22 # pragma warning(disable:4355) 25 #define epicsAssertAuthor "Jeff Hill johill@lanl.gov" 50 class tcpiiu & iiuIn,
const char * pName,
51 unsigned stackSize,
unsigned priority ) :
52 thread ( *this, pName, stackSize, priority ), iiu ( iiuIn )
62 this->thread.start ();
71 this->thread.exitWait ();
74 void tcpSendThread::run ()
79 bool laborPending =
false;
84 if ( ! laborPending ) {
94 bool flowControlLaborNeeded =
99 if ( flowControlLaborNeeded ) {
112 if ( echoLaborNeeded ) {
121 pChan->channelNode::listMember =
122 channelNode::cs_createRespPend;
131 pChan->channelNode::listMember =
132 channelNode::cs_v42ConnCallbackPend;
145 pChan->resubscribe ( guard );
147 pChan->channelNode::listMember =
148 channelNode::cs_connected;
157 pChan->sendSubscriptionUpdateRequests ( guard );
159 pChan->channelNode::listMember =
160 channelNode::cs_connected;
179 sockErrBuf,
sizeof ( sockErrBuf ) );
180 errlogPrintf (
"CAC TCP clean socket shutdown error was %s\n",
187 "cac: tcp send thread received an unexpected exception " 188 "- disconnecting\n");
195 sockErrBuf,
sizeof ( sockErrBuf ) );
196 errlogPrintf (
"CAC TCP clean socket shutdown error was %s\n",
204 while ( ! this->iiu.
recvThread.exitWait ( 30.0 ) ) {
232 unsigned nBytesInBuf,
const epicsTime &
currentTime )
234 unsigned nBytes = 0u;
235 assert ( nBytesInBuf <= INT_MAX );
237 this->sendDog.start ( currentTime );
240 int status = ::send ( this->sock,
241 static_cast < const char * > (pBuf), (
int) nBytesInBuf, 0 );
243 nBytes = static_cast <
unsigned> (
status );
249 if ( this->state != iiucs_connected &&
250 this->state != iiucs_clean_shutdown ) {
255 this->disconnectNotify ( guard );
267 "CAC: system low on network buffers " 268 "- send retry in 15 seconds\n" );
284 sockErrBuf,
sizeof ( sockErrBuf ) );
289 this->disconnectNotify ( guard );
294 this->sendDog.cancel ();
300 void * pBuf,
unsigned nBytesInBuf,
statusWireIO & stat )
302 assert ( nBytesInBuf <= INT_MAX );
305 int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
306 static_cast <int> ( nBytesInBuf ), 0 );
318 this->disconnectNotify ( guard );
326 if ( this->state != iiucs_connected &&
327 this->state != iiucs_clean_shutdown ) {
347 "CAC: system low on network buffers " 348 "- receive retry in 15 seconds\n" );
358 sockErrBuf,
sizeof ( sockErrBuf ) );
364 this->hostNameCacheInstance.getName (
365 name,
sizeof ( name ) );
367 "Unexpected problem with CA circuit to" 368 " server \"%s\" was \"%s\" - disconnecting\n",
381 unsigned int stackSize,
unsigned int priority ) :
382 thread ( *this, pName, stackSize, priority ),
383 iiu ( iiuIn ), cbMutex ( cbMutexIn ),
384 ctxNotify ( ctxNotifyIn ) {}
392 this->thread.start ();
401 return this->thread.exitWait ( delay );
406 this->thread.exitWait ();
409 bool tcpRecvThread::validFillStatus (
430 errlogMessage (
"cac: invalid fill status - disconnecting" );
436 void tcpRecvThread::run ()
440 bool connectSuccess =
false;
443 this->connect ( guard );
446 if ( ! connectSuccess ) {
483 if ( ! this->validFillStatus ( guard, stat ) ) {
496 bool sendWakeupNeeded =
false;
509 pChan->connect ( mgr.
cbGuard, guard );
514 bool protocolOK =
false;
521 if ( ! protocolOK ) {
532 sendWakeupNeeded =
true;
547 if ( bytesArePending ) {
551 this->iiu.cacRef.maxContiguousFrames ( guard ) ) {
553 sendWakeupNeeded =
true;
563 sendWakeupNeeded =
true;
569 if ( sendWakeupNeeded ) {
579 catch ( std::bad_alloc & ) {
581 "CA client library tcp receive thread " 582 "terminating due to no space in pool " 587 catch ( std::exception & except ) {
589 "CA client library tcp receive thread " 590 "terminating due to C++ exception \"%s\"\n",
597 "CA client library tcp receive thread " 598 "terminating due to a non-standard C++ exception\n" );
607 void tcpRecvThread::connect (
616 status = ::connect ( this->iiu.
sock,
617 & tmp.
sa, sizeof ( tmp.
sa ) );
643 sockErrBuf,
sizeof ( sockErrBuf ) );
644 errlogPrintf (
"CAC: Unable to connect because \"%s\"\n",
674 hostNameCacheInstance ( addrIn, engineIn ),
675 recvThread ( *this, cbMutexIn, ctxNotifyIn,
"CAC-TCP-recv",
677 cac::highestPriorityLevelBelow ( cac.getInitializingThreadsPriority() ) ),
678 sendThread ( *this,
"CAC-TCP-send",
680 cac::lowestPriorityLevelAbove (
681 cac.getInitializingThreadsPriority() ) ),
682 recvDog ( cbMutexIn, ctxNotifyIn, mutexIn,
683 *this, connectionTimeout, timerQueue ),
684 sendDog ( cbMutexIn, ctxNotifyIn, mutexIn,
685 *this, connectionTimeout, timerQueue ),
686 sendQue ( *this, comBufMemMgrIn ),
687 recvQue ( comBufMemMgrIn ),
689 curDataBytes ( 0ul ),
690 comBufMemMgr ( comBufMemMgrIn ),
692 pCurData ( (char*)
freeListMalloc(this->cacRef.tcpSmallRecvBufFreeList) ),
693 pSearchDest ( pSearchDestIn ),
695 cbMutex ( cbMutexIn ),
696 minorProtocolVersion ( minorVersion ),
697 state ( iiucs_connecting ),
699 contigRecvMsgCount ( 0u ),
700 blockingForFlush ( 0u ),
701 socketLibrarySendBufferSize ( 0x1000 ),
702 unacknowledgedSendBytes ( 0u ),
703 channelCountTot ( 0u ),
704 _receiveThreadIsBusy (
false ),
705 busyStateDetected (
false ),
706 flowControlActive (
false ),
707 echoRequestPending (
false ),
708 oldMsgHeaderAvailable (
false ),
709 msgHeaderAvailable (
false ),
710 earlyFlush (
false ),
711 recvProcessPostponedFlush (
false ),
712 discardingPendingData (
false ),
713 socketHasBeenClosed (
false ),
714 unresponsiveCircuit (
false )
717 throw std::bad_alloc();
724 sockErrBuf,
sizeof ( sockErrBuf ) );
725 std :: string reason =
726 "CAC: TCP circuit creation failure because \"";
727 reason += sockErrBuf;
729 throw runtime_error ( reason );
733 int status = setsockopt ( this->
sock, IPPROTO_TCP, TCP_NODELAY,
734 (
char *) &flag,
sizeof ( flag ) );
738 sockErrBuf,
sizeof ( sockErrBuf ) );
739 errlogPrintf (
"CAC: problems setting socket option TCP_NODELAY = \"%s\"\n",
744 status = setsockopt ( this->
sock , SOL_SOCKET, SO_KEEPALIVE,
745 (
char * ) &flag,
sizeof ( flag ) );
749 sockErrBuf,
sizeof ( sockErrBuf ) );
750 errlogPrintf (
"CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n",
772 status = setsockopt ( this->
sock, SOL_SOCKET, SO_SNDBUF,
773 (
char * ) &i,
sizeof ( i ) );
777 errlogPrintf (
"CAC: problems setting socket option SO_SNDBUF = \"%s\"\n",
781 status = setsockopt ( this->
sock, SOL_SOCKET, SO_RCVBUF,
782 (
char * ) &i,
sizeof ( i ) );
786 errlogPrintf (
"CAC: problems setting socket option SO_RCVBUF = \"%s\"\n",
794 osiSocklen_t sizeOfParameter = static_cast <
int > (
sizeof ( nBytes ) );
795 status = getsockopt ( this->
sock, SOL_SOCKET, SO_SNDBUF,
796 (
char * ) &nBytes, &sizeOfParameter );
797 if ( status < 0 || nBytes < 0 ||
798 sizeOfParameter != static_cast < int > (
sizeof ( nBytes ) ) ) {
801 sockErrBuf,
sizeof ( sockErrBuf ) );
802 errlogPrintf (
"CAC: problems getting socket option SO_SNDBUF = \"%s\"\n",
814 memset ( (
void *) &this->
curMsg,
'\0',
sizeof ( this->
curMsg ) );
870 pChan->channelNode::listMember =
871 channelNode::cs_subscripUpdateReqPend;
872 pChan->connect ( cbGuard, guard );
922 char hostNameTmp[128];
923 this->
getHostName ( guard, hostNameTmp,
sizeof ( hostNameTmp ) );
934 pChan->channelNode::listMember =
935 channelNode::cs_unrespCircuit;
936 pChan->unresponsiveCircuitNotify ( cbGuard, guard );
950 struct linger tmpLinger;
951 tmpLinger.l_onoff =
true;
952 tmpLinger.l_linger = 0u;
953 int status = setsockopt ( this->
sock, SOL_SOCKET, SO_LINGER,
954 reinterpret_cast <char *> ( &tmpLinger ),
sizeof (tmpLinger) );
958 sockErrBuf,
sizeof ( sockErrBuf ) );
959 errlogPrintf (
"CAC TCP socket linger set error was %s\n",
989 sockErrBuf,
sizeof ( sockErrBuf ) );
1034 else if ( this->
cacRef.tcpLargeRecvBufFreeList ) {
1047 this->hostNameCacheInstance.getName ( buf,
sizeof ( buf ) );
1048 ::printf (
"Virtual circuit to \"%s\" at version V%u.%u state %u\n",
1052 ::printf (
"\tcurrent data cache pointer = %p current data cache size = %lu\n",
1054 ::printf (
"\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n",
1056 ::printf (
"\receive thread is busy=%u\n",
1060 ::printf (
"\tvirtual circuit socket identifier %d\n", this->
sock );
1061 ::printf (
"\tsend thread flush signal:\n" );
1068 ::printf (
"IO identifier hash table:\n" );
1071 ::printf (
"Create request pending channels\n" );
1073 while ( pChan.
valid () ) {
1074 pChan->
show ( level - 2u );
1079 ::printf (
"Create response pending channels\n" );
1081 while ( pChan.
valid () ) {
1082 pChan->
show ( level - 2u );
1087 ::printf (
"V42 Conn Callback pending channels\n" );
1089 while ( pChan.
valid () ) {
1090 pChan->
show ( level - 2u );
1095 ::printf (
"Subscription request pending channels\n" );
1097 while ( pChan.
valid () ) {
1098 pChan->
show ( level - 2u );
1103 ::printf (
"Connected channels\n" );
1105 while ( pChan.
valid () ) {
1106 pChan->
show ( level - 2u );
1111 ::printf (
"Unresponsive circuit channels\n" );
1113 while ( pChan.
valid () ) {
1114 pChan->
show ( level - 2u );
1168 static const unsigned annexSize =
1183 (
"%s Cmd=%3u Type=%3u Count=%8u Size=%8u",
1186 this->curMsg.m_dataType,
1187 this->curMsg.m_count,
1188 this->curMsg.m_postsize) );
1190 (
" Avail=%8u Cid=%8u\n",
1192 this->curMsg.m_cid) );
1199 "CAC: server sent missaligned payload 0x%x\n",
1200 this->curMsg.m_postsize );
1210 char * newbuf =
NULL;
1213 if ( !this->
cacRef.tcpLargeRecvBufFreeList ) {
1219 newbuf = (
char*)malloc(newsize);
1223 newbuf = (
char*)realloc(this->
pCurData, newsize);
1228 newsize = this->
cacRef.maxRecvBytesTCP;
1236 }
else if (this->
cacRef.tcpLargeRecvBufFreeList) {
1247 "CAC: not enough memory for message body cache (ignoring response message)\n");
1269 static bool once =
false;
1272 "CAC: response with payload size=%u > EPICS_CA_MAX_ARRAY_BYTES ignored\n",
1273 this->curMsg.m_postsize );
1300 unsigned size = strlen ( pName ) + 1u;
1302 assert ( postSize < 0xffff );
1330 unsigned size = strlen ( pName ) + 1u;
1332 assert ( postSize < 0xffff );
1385 assert ( priority <= 0xffff );
1394 static_cast < ca_uint16_t > ( priority ),
1430 type, nElem, chan.
getSID(guard), chan.
getCID(guard), pValue,
1450 type, nElem, chan.
getSID(guard), io.
getId(), pValue,
1465 maxBytes = 0xfffffff0;
1472 if ( nElem > maxElem ) {
1480 static_cast < ca_uint16_t > ( dataType ),
1481 static_cast < ca_uint32_t > ( nElem ),
1498 unsigned nameLength;
1501 identity = chan.
getCID ( guard );
1502 pName = chan.
pName ( guard );
1503 nameLength = chan.
nameLen ( guard );
1506 identity = chan.
getSID ( guard );
1513 if ( postCnt >= 0xffff ) {
1530 if ( postCnt > nameLength ) {
1568 unsigned mask = subscr.
getMask(guard);
1569 if ( mask > 0xffff ) {
1576 maxBytes = 0xfffffff0;
1581 unsigned dataType = subscr.
getType ( guard );
1584 if ( nElem > maxElem ) {
1591 static_cast < ca_uint16_t > ( dataType ),
1592 static_cast < ca_uint32_t > ( nElem ),
1623 maxBytes = 0xfffffff0;
1628 unsigned dataType = subscr.
getType ( guard );
1631 if ( nElem > maxElem ) {
1638 static_cast < ca_uint16_t > ( dataType ),
1639 static_cast < ca_uint32_t > ( nElem ),
1657 static_cast < ca_uint16_t > ( subscr.
getType ( guard ) ),
1658 static_cast < ca_uint16_t > ( subscr.
getCount (
1671 epicsTime current = epicsTime::getCurrent ();
1673 unsigned bytesToBeSent = pBuf->occupiedBytes ();
1674 bool success =
false;
1678 success = pBuf->flushToWire ( *
this, current );
1726 bool userRequestsCanBeAccepted =
1732 if ( ! userRequestsCanBeAccepted ||
1793 char * pBuf,
unsigned bufLength )
const throw ()
1796 return this->hostNameCacheInstance.getName ( pBuf, bufLength );
1803 return this->hostNameCacheInstance.pointer ();
1828 pChan->getSID(guard), pChan->getCID(guard) );
1833 pChan->disconnectAllIO ( cbGuard, guard );
1835 pChan->getSID(guard), pChan->getCID(guard) );
1837 pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1841 pChan->disconnectAllIO ( cbGuard, guard );
1843 pChan->getSID(guard), pChan->getCID(guard) );
1845 pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1853 pChan->disconnectAllIO ( cbGuard, guard );
1858 pChan->disconnectAllIO ( cbGuard, guard );
1860 pChan->getSID(guard), pChan->getCID(guard) );
1862 pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1877 pChan->channelNode::listMember =
1878 channelNode::cs_none;
1879 pChan->serviceShutdownNotify ( cbGuard, guard );
1883 pChan->channelNode::listMember =
1884 channelNode::cs_none;
1889 pChan->serviceShutdownNotify ( cbGuard, guard );
1893 pChan->channelNode::listMember =
1894 channelNode::cs_none;
1896 pChan->getSID(guard), pChan->getCID(guard) );
1897 pChan->serviceShutdownNotify ( cbGuard, guard );
1901 pChan->channelNode::listMember =
1902 channelNode::cs_none;
1903 pChan->disconnectAllIO ( cbGuard, guard );
1905 pChan->getSID(guard), pChan->getCID(guard) );
1906 pChan->serviceShutdownNotify ( cbGuard, guard );
1910 pChan->channelNode::listMember =
1911 channelNode::cs_none;
1912 pChan->disconnectAllIO ( cbGuard, guard );
1914 pChan->getSID(guard), pChan->getCID(guard) );
1915 pChan->serviceShutdownNotify ( cbGuard, guard );
1919 pChan->channelNode::listMember =
1920 channelNode::cs_none;
1921 pChan->disconnectAllIO ( cbGuard, guard );
1926 pChan->serviceShutdownNotify ( cbGuard, guard );
1930 pChan->channelNode::listMember =
1931 channelNode::cs_none;
1932 pChan->disconnectAllIO ( cbGuard, guard );
1934 pChan->getSID(guard), pChan->getCID(guard) );
1935 pChan->serviceShutdownNotify ( cbGuard, guard );
1944 nciu & chan,
unsigned sidIn,
1951 chan.channelNode::listMember = channelNode::cs_createReqPend;
1962 bool wasExpected =
false;
1965 if ( chan.channelNode::listMember == channelNode::cs_createRespPend ) {
1968 chan.channelNode::listMember = channelNode::cs_subscripReqPend;
1971 else if ( chan.channelNode::listMember == channelNode::cs_v42ConnCallbackPend ) {
1974 chan.channelNode::listMember = channelNode::cs_subscripReqPend;
1987 switch ( chan.channelNode::listMember ) {
1988 case channelNode::cs_createReqPend:
1991 case channelNode::cs_createRespPend:
1994 case channelNode::cs_v42ConnCallbackPend:
1997 case channelNode::cs_subscripReqPend:
2000 case channelNode::cs_connected:
2003 case channelNode::cs_unrespCircuit:
2006 case channelNode::cs_subscripUpdateReqPend:
2011 "cac: attempt to uninstall channel from tcp iiu, but it inst installed there?" );
2013 chan.channelNode::listMember = channelNode::cs_none;
2022 const char *pformat, ... )
2029 va_start ( theArgs, pformat );
2049 FD_ZERO ( & readBits );
2050 FD_SET ( this->
sock, & readBits );
2056 if ( FD_ISSET ( this->
sock, & readBits ) ) {
2064 FIONREAD, & bytesPending );
2065 if ( status >= 0 ) {
2066 if ( bytesPending > 0 ) {
2101 void tcpiiu::operator
delete (
void * )
2108 errlogPrintf (
"%s:%d this compiler is confused about " 2109 "placement delete - memory was probably leaked",
2110 __FILE__, __LINE__ );
2124 guard, chan, currentTime );
2129 const char * pName,
unsigned nameLength )
2132 guard,
id, pName, nameLength );
2157 const char * pBuf,
size_t len )
2166 piiu->
start ( guard );
2185 :: printf (
"tcpiiu :: SearchDestTCP\n" );
2190 this->minorProtocolVersion = msg.
m_count;
2201 if ( msg.
m_cid != INADDR_BROADCAST ) {
2202 serverAddr.
ia.sin_family = AF_INET;
2203 serverAddr.
ia.sin_addr.s_addr = htonl ( msg.
m_cid );
2207 serverAddr = this->address ();
2209 cacRef.transferChanToVirtCircuit
2211 0, minorProtocolVersion, serverAddr, currentTime );
void subscriptionCancelRequest(epicsGuard< epicsMutex > &, nciu &chan, netSubscription &subscr)
const char * pLocalHostName()
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
void destroyIIU(tcpiiu &iiu)
void interruptSocketRecv()
tsDLList< nciu > subscripUpdateReqPend
void disconnectAllChannels(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard, class udpiiu &)
void searchRequest(epicsGuard< epicsMutex > &guard, const char *pbuf, size_t len)
tsDLList< nciu > connectedList
bool ca_v44_ok(epicsGuard< epicsMutex > &) const
const unsigned short dbr_value_size[LAST_BUFFER_TYPE+1]
void requestRecvProcessPostponedFlush(epicsGuard< epicsMutex > &)
const unsigned short dbr_size[LAST_BUFFER_TYPE+1]
#define SOCK_ECONNABORTED
void writeNotifyRequest(epicsGuard< epicsMutex > &, nciu &, netWriteNotifyIO &, unsigned type, arrayElementCount nElem, const void *pValue)
void sendTimeoutNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
#define assert(exp)
Declare that a condition should be true.
#define CA_PROTO_CREATE_CHAN
unsigned nameLen(epicsGuard< epicsMutex > &) const
#define CA_PROTO_EVENT_ADD
bool searchMsg(epicsGuard< epicsMutex > &, ca_uint32_t id, const char *pName, unsigned nameLength)
unsigned minorProtocolVersion
#define CA_MAJOR_PROTOCOL_REVISION
void pushString(const char *pVal, unsigned nChar)
void pushUInt16(const ca_uint16_t value)
epicsEvent flushBlockEvent
osiSockAddr getNetworkAddress(epicsGuard< epicsMutex > &) const
void show(unsigned level) const
arrayElementCount curDataMax
unsigned getMask(epicsGuard< epicsMutex > &) const
#define CA_PROTO_WRITE_NOTIFY
LIBCOM_API void *epicsStdCall epicsThreadPrivateGet(epicsThreadPrivateId)
unsigned short epicsUInt16
void flushRequest(epicsGuard< epicsMutex > &)
#define CA_PROTO_EVENT_CANCEL
unsigned requestMessageBytesPending(epicsGuard< epicsMutex > &mutualExclusionGuard)
LIBCOM_API void epicsStdCall epicsSignalRaiseSigAlarm(struct epicsThreadOSD *)
bool popOldMsgHeader(struct caHdrLargeArray &)
bool processIncoming(const epicsTime ¤tTime, callbackManager &)
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
tsDLIterConst< T > firstIter() const
tcpiiu(cac &cac, epicsMutex &mutualExclusion, epicsMutex &callbackControl, cacContextNotify &, double connectionTimeout, epicsTimerQueue &timerQueue, const osiSockAddr &addrIn, comBufMemoryManager &, unsigned minorVersion, ipAddrToAsciiEngine &engineIn, const cacChannel::priLev &priorityIn, SearchDestTCP *pSearchDestIn=NULL)
const char * pName(epicsGuard< epicsMutex > &) const
void interruptSocketSend()
ca_uint32_t getCID(epicsGuard< epicsMutex > &) const
void unlinkAllChannels(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
pvd::StructureConstPtr type
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
void fillFromWire(wireRecvAdapter &, statusWireIO &)
#define INVALID_DB_REQ(x)
#define CA_UKN_MINOR_VERSION
void assertIdenticalMutex(const T &) const
unsigned blockingForFlush
void receiveTimeoutNotify(callbackManager &, epicsGuard< epicsMutex > &)
bool ca_v42_ok(epicsGuard< epicsMutex > &) const
void sendTimeoutNotify(callbackManager &cbMgr, epicsGuard< epicsMutex > &guard)
void start(epicsGuard< epicsMutex > &)
void unresponsiveCircuitNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
double connectionTimeout(epicsGuard< epicsMutex > &)
void versionMessage(epicsGuard< epicsMutex > &, const cacChannel::priLev &priority)
tcpSendThread(class tcpiiu &iiuIn, const char *pName, unsigned int stackSize, unsigned int priority)
unsigned sendBytes(const void *pBuf, unsigned nBytesInBuf, const epicsTime ¤tTime)
void searchReplySetUp(netiiu &iiu, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn, epicsGuard< epicsMutex > &)
epicsThreadPrivateId caClientCallbackThreadId
#define CA_PROTO_EVENTS_OFF
arrayElementCount curDataBytes
unsigned short ca_uint16_t
swioCircuitState circuitState
epicsSocketSystemCallInterruptMechanismQueryInfo
void sendBacklogProgressNotify(epicsGuard< epicsMutex > &)
void uninstallChan(epicsGuard< epicsMutex > &guard, nciu &chan)
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
void flush(epicsGuard< epicsMutex > &mutualExclusionGuard)
void uninstallChanDueToSuccessfulSearchResponse(epicsGuard< epicsMutex > &, nciu &, const class epicsTime &)
void responsiveCircuitNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
int printFormated(epicsGuard< epicsMutex > &cbGuard, const char *pformat,...)
bool recvProcessPostponedFlush
#define CA_PROTO_READ_SYNC
#define CA_PROTO_HOST_NAME
#define CA_MINOR_PROTOCOL_REVISION
void pushFloat32(const ca_float32_t value)
void installDisconnectedChannel(epicsGuard< epicsMutex > &, nciu &)
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
unsigned occupiedBytes() const
#define socket_ioctl(A, B, C)
const char * pHostName(epicsGuard< epicsMutex > &) const
void decrementBlockingForFlushCount(epicsGuard< epicsMutex > &guard)
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
void hostNameSetRequest(epicsGuard< epicsMutex > &)
bool isNameService() const
epicsPlacementDeleteOperator((void *, tsFreeList< class tcpiiu, 32, epicsMutexNOOP > &)) private tcpRecvThrea recvThread)
void show(epicsGuard< epicsMutex > &guard, unsigned level) const
int varArgsPrintFormated(epicsGuard< epicsMutex > &callbackControl, const char *pformat, va_list args) const
void subscriptionUpdateRequest(epicsGuard< epicsMutex > &, nciu &chan, netSubscription &subscr)
bool executeResponse(callbackManager &, tcpiiu &, const epicsTime ¤tTime, caHdrLargeArray &, char *pMsgBody)
void flushIfRecvProcessRequested(epicsGuard< epicsMutex > &)
#define CA_MESSAGE_ALIGN(A)
unsigned removeBytes(unsigned nBytes)
bool connectNotify(epicsGuard< epicsMutex > &, nciu &chan)
const char cacNillBytes[]
virtual void uninstallChanDueToSuccessfulSearchResponse(epicsGuard< epicsMutex > &, nciu &, const class epicsTime ¤tTime)=0
void writeRequest(epicsGuard< epicsMutex > &, nciu &, unsigned type, arrayElementCount nElem, const void *pValue)
LIBCOM_API void epicsStdCall freeListFree(void *pvt, void *pmem)
LIBCOM_API void epicsStdCall epicsThreadPrivateSet(epicsThreadPrivateId, void *)
void show(unsigned level) const
unsigned unacknowledgedSendBytes
double receiveWatchdogDelay(epicsGuard< epicsMutex > &) const
void readNotifyRequest(epicsGuard< epicsMutex > &, nciu &, netReadNotifyIO &, unsigned type, arrayElementCount nElem)
void enableFlowControlRequest(epicsGuard< epicsMutex > &)
bool bytesArePendingInOS() const
void searchRespNotify(const epicsTime &, const caHdrLargeArray &)
bool flushBlockThreshold() const
void recvBytes(void *pBuf, unsigned nBytesInBuf, statusWireIO &)
SearchDestTCP(cac &, const osiSockAddr &)
unsigned priority() const
BSD and SRV5 Unix timestamp.
unsigned occupiedBytes() const
void connectNotify(epicsGuard< epicsMutex > &)
unsigned getcount() const
enum tcpiiu::iiu_conn_state state
int errlogPrintf(const char *pFormat,...)
void initiateCleanShutdown(epicsGuard< epicsMutex > &)
unsigned socketLibrarySendBufferSize
int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
bool ca_v41_ok(epicsGuard< epicsMutex > &) const
arrayElementCount getCount(epicsGuard< epicsMutex > &, bool allow_zero) const
void messageArrivalNotify(epicsGuard< epicsMutex > &guard)
epicsEvent sendThreadFlushEvent
void installChannel(epicsGuard< epicsMutex > &, nciu &chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn)
#define CA_PROTO_CLEAR_CHANNEL
epicsGuard< epicsMutex > cbGuard
void insertRequestWithPayLoad(ca_uint16_t request, unsigned dataType, arrayElementCount nElem, ca_uint32_t cid, ca_uint32_t requestDependent, const void *pPayload, bool v49Ok)
unsigned contigRecvMsgCount
void subscriptionRequest(epicsGuard< epicsMutex > &, nciu &, netSubscription &subscr)
unsigned long arrayElementCount
bool findOrCreateVirtCircuit(epicsGuard< epicsMutex > &, const osiSockAddr &, unsigned, tcpiiu *&, unsigned, SearchDestTCP *pSearchDest=NULL)
comBuf * popNextComBufToSend()
#define CA_PROTO_CLIENT_NAME
#define CA_PROTO_READ_NOTIFY
int errlogMessage(const char *message)
void setCircuit(tcpiiu *)
void disconnectNotify(epicsGuard< epicsMutex > &)
virtual bool searchMsg(epicsGuard< epicsMutex > &, ca_uint32_t id, const char *pName, unsigned nameLength)=0
bool discardingPendingData
tcpRecvThread(class tcpiiu &iiuIn, epicsMutex &cbMutexIn, cacContextNotify &, const char *pName, unsigned int stackSize, unsigned int priority)
unsigned copyOutBytes(epicsInt8 *pBuf, unsigned nBytes)
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
#define genLocalExcep(CBGUARD, GUARD, CAC, STAT, PCTX)
void echoRequest(epicsGuard< epicsMutex > &)
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
void userNameSetRequest(epicsGuard< epicsMutex > &)
void show(unsigned level) const
#define debugPrintf(argsInParen)
unsigned channelCount(epicsGuard< epicsMutex > &)
OS-independent routines for ignoring Posix signals.
#define CA_PROTO_EVENTS_ON
void disableFlowControlRequest(epicsGuard< epicsMutex > &)
bool oldMsgHeaderAvailable
void clearChannelRequest(epicsGuard< epicsMutex > &, ca_uint32_t sid, ca_uint32_t cid)
osiSockAddr address() const
tsDLList< nciu > createReqPend
bool sendThreadFlush(epicsGuard< epicsMutex > &)
void show(unsigned level) const
SearchDestTCP * pSearchDest
void initiateAbortShutdown(epicsGuard< epicsMutex > &)
comBufMemoryManager & comBufMemMgr
tsDLList< nciu > createRespPend
unsigned getHostName(epicsGuard< epicsMutex > &, char *pBuf, unsigned bufLength) const
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
bool _receiveThreadIsBusy
tsDLList< nciu > subscripReqPend
const char * userNamePointer() const
void createChannelRequest(nciu &, epicsGuard< epicsMutex > &)
unsigned getType(epicsGuard< epicsMutex > &) const
static const priLev priorityDefault
void pushLastComBufReceived(comBuf &)
bool setEchoRequestPending(epicsGuard< epicsMutex > &)
tsDLList< nciu > unrespCircuit
void versionRespNotify(const caHdrLargeArray &)
tsDLList< nciu > v42ConnCallbackPend
virtual void release(void *)=0
LIBCOM_API void *epicsStdCall freeListMalloc(void *pvt)