This is Unofficial EPICS BASE Doxygen Site
tcpiiu Class Reference

#include "virtualCircuit.h"

+ Inheritance diagram for tcpiiu:
+ Collaboration diagram for tcpiiu:

Public Types

enum  iiu_conn_state {
  iiucs_connecting, iiucs_connected, iiucs_clean_shutdown, iiucs_disconnected,
  iiucs_abort_shutdown
}
 

Public Member Functions

 tcpiiu (cac &cac, epicsMutex &mutualExclusion, epicsMutex &callbackControl, cacContextNotify &, double connectionTimeout, epicsTimerQueue &timerQueue, const osiSockAddr &addrIn, comBufMemoryManager &, unsigned minorVersion, ipAddrToAsciiEngine &engineIn, const cacChannel::priLev &priorityIn, SearchDestTCP *pSearchDestIn=NULL)
 
 ~tcpiiu ()
 
void start (epicsGuard< epicsMutex > &)
 
void responsiveCircuitNotify (epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
 
void sendTimeoutNotify (callbackManager &cbMgr, epicsGuard< epicsMutex > &guard)
 
void receiveTimeoutNotify (callbackManager &, epicsGuard< epicsMutex > &)
 
void beaconAnomalyNotify (epicsGuard< epicsMutex > &)
 
void beaconArrivalNotify (epicsGuard< epicsMutex > &)
 
void probeResponseNotify (epicsGuard< epicsMutex > &)
 
void flushRequest (epicsGuard< epicsMutex > &)
 
unsigned requestMessageBytesPending (epicsGuard< epicsMutex > &mutualExclusionGuard)
 
void flush (epicsGuard< epicsMutex > &mutualExclusionGuard)
 
void show (unsigned level) const
 
bool setEchoRequestPending (epicsGuard< epicsMutex > &)
 
void requestRecvProcessPostponedFlush (epicsGuard< epicsMutex > &)
 
void clearChannelRequest (epicsGuard< epicsMutex > &, ca_uint32_t sid, ca_uint32_t cid)
 
bool ca_v41_ok (epicsGuard< epicsMutex > &) const
 
bool ca_v42_ok (epicsGuard< epicsMutex > &) const
 
bool ca_v44_ok (epicsGuard< epicsMutex > &) const
 
bool ca_v49_ok (epicsGuard< epicsMutex > &) const
 
unsigned getHostName (epicsGuard< epicsMutex > &, char *pBuf, unsigned bufLength) const throw ()
 
bool alive (epicsGuard< epicsMutex > &) const
 
bool connecting (epicsGuard< epicsMutex > &) const
 
bool receiveThreadIsBusy (epicsGuard< epicsMutex > &)
 
osiSockAddr getNetworkAddress (epicsGuard< epicsMutex > &) const
 
int printFormated (epicsGuard< epicsMutex > &cbGuard, const char *pformat,...)
 
unsigned channelCount (epicsGuard< epicsMutex > &)
 
void disconnectAllChannels (epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard, class udpiiu &)
 
void unlinkAllChannels (epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
 
void installChannel (epicsGuard< epicsMutex > &, nciu &chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn)
 
void uninstallChan (epicsGuard< epicsMutex > &guard, nciu &chan)
 
bool connectNotify (epicsGuard< epicsMutex > &, nciu &chan)
 
void searchRespNotify (const epicsTime &, const caHdrLargeArray &)
 
void versionRespNotify (const caHdrLargeArray &)
 
void * operator new (size_t size, tsFreeList< class tcpiiu, 32, epicsMutexNOOP > &)
 
bool processIncoming (const epicsTime &currentTime, callbackManager &)
 
unsigned sendBytes (const void *pBuf, unsigned nBytesInBuf, const epicsTime &currentTime)
 
void recvBytes (void *pBuf, unsigned nBytesInBuf, statusWireIO &)
 
const char * pHostName (epicsGuard< epicsMutex > &) const throw ()
 
double receiveWatchdogDelay (epicsGuard< epicsMutex > &) const
 
void unresponsiveCircuitNotify (epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
 
void initiateCleanShutdown (epicsGuard< epicsMutex > &)
 
void initiateAbortShutdown (epicsGuard< epicsMutex > &)
 
void disconnectNotify (epicsGuard< epicsMutex > &)
 
bool bytesArePendingInOS () const
 
void decrementBlockingForFlushCount (epicsGuard< epicsMutex > &guard)
 
bool isNameService () const
 
void echoRequest (epicsGuard< epicsMutex > &)
 
void versionMessage (epicsGuard< epicsMutex > &, const cacChannel::priLev &priority)
 
void disableFlowControlRequest (epicsGuard< epicsMutex > &)
 
void enableFlowControlRequest (epicsGuard< epicsMutex > &)
 
void hostNameSetRequest (epicsGuard< epicsMutex > &)
 
void userNameSetRequest (epicsGuard< epicsMutex > &)
 
void createChannelRequest (nciu &, epicsGuard< epicsMutex > &)
 
void writeRequest (epicsGuard< epicsMutex > &, nciu &, unsigned type, arrayElementCount nElem, const void *pValue)
 
void writeNotifyRequest (epicsGuard< epicsMutex > &, nciu &, netWriteNotifyIO &, unsigned type, arrayElementCount nElem, const void *pValue)
 
void readNotifyRequest (epicsGuard< epicsMutex > &, nciu &, netReadNotifyIO &, unsigned type, arrayElementCount nElem)
 
void subscriptionRequest (epicsGuard< epicsMutex > &, nciu &, netSubscription &subscr)
 
void subscriptionUpdateRequest (epicsGuard< epicsMutex > &, nciu &chan, netSubscription &subscr)
 
void subscriptionCancelRequest (epicsGuard< epicsMutex > &, nciu &chan, netSubscription &subscr)
 
void flushIfRecvProcessRequested (epicsGuard< epicsMutex > &)
 
bool sendThreadFlush (epicsGuard< epicsMutex > &)
 
void uninstallChanDueToSuccessfulSearchResponse (epicsGuard< epicsMutex > &, nciu &, const class epicsTime &)
 
bool searchMsg (epicsGuard< epicsMutex > &, ca_uint32_t id, const char *pName, unsigned nameLength)
 
 tcpiiu (const tcpiiu &)
 
tcpiiu & operator= (const tcpiiu &)
 
void operator delete (void *)
 
- Public Member Functions inherited from netiiu
virtual ~netiiu ()=0
 
- Public Member Functions inherited from tsDLNode< tcpiiu >
 tsDLNode ()
 
 tsDLNode (const tsDLNode< tcpiiu > &)
 
const tsDLNode< tcpiiu > & operator= (const tsDLNode< tcpiiu > &)
 
- Public Member Functions inherited from tsSLNode< tcpiiu >
 tsSLNode ()
 
tsSLNode< tcpiiu > & operator= (const tsSLNode< tcpiiu > &)
 
- Public Member Functions inherited from caServerID
 caServerID (const struct sockaddr_in &addrIn, unsigned priority)
 
bool operator== (const caServerID &) const
 
resTableIndex hash () const
 
osiSockAddr address () const
 
unsigned priority () const
 

Public Attributes

epicsPlacementDeleteOperator((void *, tsFreeList< class tcpiiu, 32, epicsMutexNOOP > &)) private tcpRecvThrea recvThread )
 
tcpSendThread sendThread
 
tcpRecvWatchdog recvDog
 
tcpSendWatchdog sendDog
 
comQueSend sendQue
 
comQueRecv recvQue
 
tsDLList< nciu > createReqPend
 
tsDLList< nciu > createRespPend
 
tsDLList< nciu > v42ConnCallbackPend
 
tsDLList< nciu > subscripReqPend
 
tsDLList< nciu > connectedList
 
tsDLList< nciu > unrespCircuit
 
tsDLList< nciu > subscripUpdateReqPend
 
caHdrLargeArray curMsg
 
arrayElementCount curDataMax
 
arrayElementCount curDataBytes
 
comBufMemoryManager & comBufMemMgr
 
cac & cacRef
 
char * pCurData
 
SearchDestTCP * pSearchDest
 
epicsMutex & mutex
 
epicsMutex & cbMutex
 
unsigned minorProtocolVersion
 
enum tcpiiu::iiu_conn_state state
 
epicsEvent sendThreadFlushEvent
 
epicsEvent flushBlockEvent
 
SOCKET sock
 
unsigned contigRecvMsgCount
 
unsigned blockingForFlush
 
unsigned socketLibrarySendBufferSize
 
unsigned unacknowledgedSendBytes
 
unsigned channelCountTot
 
bool _receiveThreadIsBusy
 
bool busyStateDetected
 
bool flowControlActive
 
bool echoRequestPending
 
bool oldMsgHeaderAvailable
 
bool msgHeaderAvailable
 
bool earlyFlush
 
bool recvProcessPostponedFlush
 
bool discardingPendingData
 
bool socketHasBeenClosed
 
bool unresponsiveCircuit
 

Friends

class tcpRecvThread
 
class tcpSendThread
 
void SearchDestTCP::searchRequest (epicsGuard< epicsMutex > &guard, const char *pbuf, size_t len)
 

Detailed Description

Definition at line 111 of file virtualCircuit.h.

Member Enumeration Documentation

Enumerator
iiucs_connecting 
iiucs_connected 
iiucs_clean_shutdown 
iiucs_disconnected 
iiucs_abort_shutdown 

Definition at line 233 of file virtualCircuit.h.

233  {
234  iiucs_connecting, // pending circuit connect
235  iiucs_connected, // live circuit
236  iiucs_clean_shutdown, // live circuit will shutdown when flush completes
237  iiucs_disconnected, // socket informed us of disconnect
238  iiucs_abort_shutdown // socket has been closed
239  } state;
enum tcpiiu::iiu_conn_state state

Constructor & Destructor Documentation

tcpiiu::tcpiiu ( cac & cac,
epicsMutex &  mutualExclusion,
epicsMutex &  callbackControl,
cacContextNotify & ctxNotifyIn,
double  connectionTimeout,
epicsTimerQueue &  timerQueue,
const osiSockAddr & addrIn,
comBufMemoryManager & comBufMemMgrIn,
unsigned  minorVersion,
ipAddrToAsciiEngine & engineIn,
const cacChannel::priLev & priorityIn,
SearchDestTCP * pSearchDestIn = NULL 
)

Definition at line 665 of file tcpiiu.cpp.

672  :
673  caServerID ( addrIn.ia, priorityIn ),
674  hostNameCacheInstance ( addrIn, engineIn ),
675  recvThread ( *this, cbMutexIn, ctxNotifyIn, "CAC-TCP-recv",
678  sendThread ( *this, "CAC-TCP-send",
682  recvDog ( cbMutexIn, ctxNotifyIn, mutexIn,
683  *this, connectionTimeout, timerQueue ),
684  sendDog ( cbMutexIn, ctxNotifyIn, mutexIn,
685  *this, connectionTimeout, timerQueue ),
686  sendQue ( *this, comBufMemMgrIn ),
687  recvQue ( comBufMemMgrIn ),
688  curDataMax ( MAX_TCP ),
689  curDataBytes ( 0ul ),
690  comBufMemMgr ( comBufMemMgrIn ),
691  cacRef ( cac ),
692  pCurData ( (char*) freeListMalloc(this->cacRef.tcpSmallRecvBufFreeList) ),
693  pSearchDest ( pSearchDestIn ),
694  mutex ( mutexIn ),
695  cbMutex ( cbMutexIn ),
696  minorProtocolVersion ( minorVersion ),
698  sock ( INVALID_SOCKET ),
699  contigRecvMsgCount ( 0u ),
700  blockingForFlush ( 0u ),
701  socketLibrarySendBufferSize ( 0x1000 ),
703  channelCountTot ( 0u ),
704  _receiveThreadIsBusy ( false ),
705  busyStateDetected ( false ),
706  flowControlActive ( false ),
707  echoRequestPending ( false ),
708  oldMsgHeaderAvailable ( false ),
709  msgHeaderAvailable ( false ),
710  earlyFlush ( false ),
711  recvProcessPostponedFlush ( false ),
712  discardingPendingData ( false ),
713  socketHasBeenClosed ( false ),
714  unresponsiveCircuit ( false )
715 {
716  if(!pCurData)
717  throw std::bad_alloc();
718 
719  this->sock = epicsSocketCreate ( AF_INET, SOCK_STREAM, IPPROTO_TCP );
720  if ( this->sock == INVALID_SOCKET ) {
721  freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData);
722  char sockErrBuf[64];
723  epicsSocketConvertErrnoToString (
724  sockErrBuf, sizeof ( sockErrBuf ) );
725  std :: string reason =
726  "CAC: TCP circuit creation failure because \"";
727  reason += sockErrBuf;
728  reason += "\"";
729  throw runtime_error ( reason );
730  }
731 
732  int flag = true;
733  int status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY,
734  (char *) &flag, sizeof ( flag ) );
735  if ( status < 0 ) {
736  char sockErrBuf[64];
737  epicsSocketConvertErrnoToString (
738  sockErrBuf, sizeof ( sockErrBuf ) );
739  errlogPrintf ( "CAC: problems setting socket option TCP_NODELAY = \"%s\"\n",
740  sockErrBuf );
741  }
742 
743  flag = true;
744  status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE,
745  ( char * ) &flag, sizeof ( flag ) );
746  if ( status < 0 ) {
747  char sockErrBuf[64];
748  epicsSocketConvertErrnoToString (
749  sockErrBuf, sizeof ( sockErrBuf ) );
750  errlogPrintf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n",
751  sockErrBuf );
752  }
753 
754  // load message queue with messages informing server
755  // of version, user, and host name of client
756  {
757  epicsGuard < epicsMutex > guard ( this->mutex );
758  this->versionMessage ( guard, this->priority() );
759  this->userNameSetRequest ( guard );
760  this->hostNameSetRequest ( guard );
761  }
762 
763 # if 0
764  {
765  int i;
766 
767  /*
768  * some concern that vxWorks will run out of mBuf's
769  * if this change is made joh 11-10-98
770  */
771  i = MAX_MSG_SIZE;
772  status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF,
773  ( char * ) &i, sizeof ( i ) );
774  if (status < 0) {
775  char sockErrBuf[64];
776  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
777  errlogPrintf ( "CAC: problems setting socket option SO_SNDBUF = \"%s\"\n",
778  sockErrBuf );
779  }
780  i = MAX_MSG_SIZE;
781  status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF,
782  ( char * ) &i, sizeof ( i ) );
783  if ( status < 0 ) {
784  char sockErrBuf[64];
785  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
786  errlogPrintf ( "CAC: problems setting socket option SO_RCVBUF = \"%s\"\n",
787  sockErrBuf );
788  }
789  }
790 # endif
791 
792  {
793  int nBytes;
794  osiSocklen_t sizeOfParameter = static_cast < int > ( sizeof ( nBytes ) );
795  status = getsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF,
796  ( char * ) &nBytes, &sizeOfParameter );
797  if ( status < 0 || nBytes < 0 ||
798  sizeOfParameter != static_cast < int > ( sizeof ( nBytes ) ) ) {
799  char sockErrBuf[64];
800  epicsSocketConvertErrnoToString (
801  sockErrBuf, sizeof ( sockErrBuf ) );
802  errlogPrintf ("CAC: problems getting socket option SO_SNDBUF = \"%s\"\n",
803  sockErrBuf );
804  }
805  else {
806  this->socketLibrarySendBufferSize = static_cast < unsigned > ( nBytes );
807  }
808  }
809 
810  if ( isNameService() ) {
811  pSearchDest->setCircuit ( this );
812  }
813 
814  memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
815 }
#define MAX_TCP
Definition: caProto.h:63
char * pCurData
unsigned getInitializingThreadsPriority() const
Definition: cac.h:353
#define INVALID_SOCKET
Definition: osdSock.h:32
bool socketHasBeenClosed
unsigned minorProtocolVersion
pvd::Status status
int osiSocklen_t
Definition: osdSock.h:36
int i
Definition: scan.c:967
epicsMutex & cbMutex
arrayElementCount curDataMax
struct sockaddr_in ia
Definition: osiSock.h:157
bool unresponsiveCircuit
unsigned blockingForFlush
cac & cacRef
void versionMessage(epicsGuard< epicsMutex > &, const cacChannel::priLev &priority)
Definition: tcpiiu.cpp:1380
arrayElementCount curDataBytes
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
static unsigned highestPriorityLevelBelow(unsigned priority)
Definition: cac.cpp:374
bool earlyFlush
bool recvProcessPostponedFlush
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
Definition: osdSock.c:71
void hostNameSetRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1291
comQueSend sendQue
bool isNameService() const
epicsPlacementDeleteOperator((void *, tsFreeList< class tcpiiu, 32, epicsMutexNOOP > &)) private tcpRecvThrea recvThread)
tcpSendWatchdog sendDog
unsigned channelCountTot
tcpSendThread sendThread
tcpRecvWatchdog recvDog
LIBCOM_API void epicsStdCall freeListFree(void *pvt, void *pmem)
Definition: freeListLib.c:131
unsigned unacknowledgedSendBytes
unsigned priority() const
Definition: caServerID.h:80
epicsMutex & mutex
caHdrLargeArray curMsg
enum tcpiiu::iiu_conn_state state
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
unsigned socketLibrarySendBufferSize
unsigned contigRecvMsgCount
bool busyStateDetected
void setCircuit(tcpiiu *)
bool flowControlActive
comQueRecv recvQue
bool discardingPendingData
#define MAX_MSG_SIZE
Definition: caProto.h:64
caServerID(const struct sockaddr_in &addrIn, unsigned priority)
Definition: caServerID.h:39
void userNameSetRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1321
static unsigned lowestPriorityLevelAbove(unsigned priority)
Definition: cac.cpp:362
SOCKET sock
bool oldMsgHeaderAvailable
SearchDestTCP * pSearchDest
comBufMemoryManager & comBufMemMgr
bool _receiveThreadIsBusy
bool echoRequestPending
bool msgHeaderAvailable
LIBCOM_API void *epicsStdCall freeListMalloc(void *pvt)
Definition: freeListLib.c:74
tcpiiu::~tcpiiu ( )

Definition at line 1014 of file tcpiiu.cpp.

1015 {
1016  if ( this->pSearchDest ) {
1017  this->pSearchDest->disable ();
1018  }
1019 
1020  this->sendThread.exitWait ();
1021  this->recvThread.exitWait ();
1022  this->sendDog.cancel ();
1023  this->recvDog.shutdown ();
1024 
1025  if ( ! this->socketHasBeenClosed ) {
1026  epicsSocketDestroy ( this->sock );
1027  }
1028 
1029  // free message body cache
1030  if ( this->pCurData ) {
1031  if ( this->curDataMax <= MAX_TCP ) {
1032  freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData);
1033  }
1034  else if ( this->cacRef.tcpLargeRecvBufFreeList ) {
1035  freeListFree(this->cacRef.tcpLargeRecvBufFreeList, this->pCurData);
1036  }
1037  else {
1038  free ( this->pCurData );
1039  }
1040  }
1041 }
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
#define MAX_TCP
Definition: caProto.h:63
char * pCurData
bool socketHasBeenClosed
arrayElementCount curDataMax
cac & cacRef
void exitWait()
Definition: tcpiiu.cpp:69
epicsPlacementDeleteOperator((void *, tsFreeList< class tcpiiu, 32, epicsMutexNOOP > &)) private tcpRecvThrea recvThread)
void disable()
Definition: tcpiiu.cpp:2144
tcpSendWatchdog sendDog
tcpSendThread sendThread
tcpRecvWatchdog recvDog
LIBCOM_API void epicsStdCall freeListFree(void *pvt, void *pmem)
Definition: freeListLib.c:131
SOCKET sock
SearchDestTCP * pSearchDest
tcpiiu::tcpiiu ( const tcpiiu & )

Member Function Documentation

bool tcpiiu::alive ( epicsGuard< epicsMutex > &  ) const
inline

Definition at line 371 of file virtualCircuit.h.

373 {
374  return ( this->state == iiucs_connecting ||
375  this->state == iiucs_connected );
376 }
enum tcpiiu::iiu_conn_state state
void tcpiiu::beaconAnomalyNotify ( epicsGuard< epicsMutex > &  guard)
inline

Definition at line 391 of file virtualCircuit.h.

393 {
394  //guard.assertIdenticalMutex ( this->cacRef.mutexRef () );
395  this->recvDog.beaconAnomalyNotify ( guard );
396 }
tcpRecvWatchdog recvDog
void beaconAnomalyNotify(epicsGuard< epicsMutex > &)
void tcpiiu::beaconArrivalNotify ( epicsGuard< epicsMutex > &  guard)
inline

Definition at line 398 of file virtualCircuit.h.

400 {
401  //guard.assertIdenticalMutex ( this->cacRef.mutexRef () );
402  this->recvDog.beaconArrivalNotify ( guard );
403 }
tcpRecvWatchdog recvDog
void beaconArrivalNotify(epicsGuard< epicsMutex > &)
bool tcpiiu::bytesArePendingInOS ( ) const

Definition at line 2045 of file tcpiiu.cpp.

2046 {
2047 #if 0
2048  FD_SET readBits;
2049  FD_ZERO ( & readBits );
2050  FD_SET ( this->sock, & readBits );
2051  struct timeval tmo;
2052  tmo.tv_sec = 0;
2053  tmo.tv_usec = 0;
2054  int status = select ( this->sock + 1, & readBits, NULL, NULL, & tmo );
2055  if ( status > 0 ) {
2056  if ( FD_ISSET ( this->sock, & readBits ) ) {
2057  return true;
2058  }
2059  }
2060  return false;
2061 #else
2062  osiSockIoctl_t bytesPending = 0; /* shut up purifys yapping */
2063  int status = socket_ioctl ( this->sock,
2064  FIONREAD, & bytesPending );
2065  if ( status >= 0 ) {
2066  if ( bytesPending > 0 ) {
2067  return true;
2068  }
2069  }
2070  return false;
2071 #endif
2072 }
pvd::Status status
#define NULL
Definition: catime.c:38
#define socket_ioctl(A, B, C)
Definition: osdSock.h:34
int osiSockIoctl_t
Definition: osdSock.h:35
BSD and SRV5 Unix timestamp.
Definition: epicsTime.h:52
int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
SOCKET sock
bool tcpiiu::ca_v41_ok ( epicsGuard< epicsMutex > &  ) const
inline virtual

Implements netiiu.

Definition at line 353 of file virtualCircuit.h.

355 {
356  return CA_V41 ( this->minorProtocolVersion );
357 }
unsigned minorProtocolVersion
#define CA_V41(MINOR)
Definition: caProto.h:32
bool tcpiiu::ca_v42_ok ( epicsGuard< epicsMutex > &  guard) const
virtual

Implements netiiu.

Definition at line 1777 of file tcpiiu.cpp.

1779 {
1780  guard.assertIdenticalMutex ( this->mutex );
1781  return CA_V42 ( this->minorProtocolVersion );
1782 }
unsigned minorProtocolVersion
#define CA_V42(MINOR)
Definition: caProto.h:33
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
bool tcpiiu::ca_v44_ok ( epicsGuard< epicsMutex > &  ) const
inline

Definition at line 359 of file virtualCircuit.h.

361 {
362  return CA_V44 ( this->minorProtocolVersion );
363 }
unsigned minorProtocolVersion
#define CA_V44(MINOR)
Definition: caProto.h:35
bool tcpiiu::ca_v49_ok ( epicsGuard< epicsMutex > &  ) const
inline

Definition at line 365 of file virtualCircuit.h.

367 {
368  return CA_V49 ( this->minorProtocolVersion );
369 }
unsigned minorProtocolVersion
#define CA_V49(MINOR)
Definition: caProto.h:40
unsigned tcpiiu::channelCount ( epicsGuard< epicsMutex > &  guard)

Definition at line 2113 of file tcpiiu.cpp.

2114 {
2115  guard.assertIdenticalMutex ( this->mutex );
2116  return this->channelCountTot;
2117 }
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
unsigned channelCountTot
epicsMutex & mutex
void tcpiiu::clearChannelRequest ( epicsGuard< epicsMutex > &  guard,
ca_uint32_t  sid,
ca_uint32_t  cid 
)
virtual

Implements netiiu.

Definition at line 1536 of file tcpiiu.cpp.

1538 {
1539  guard.assertIdenticalMutex ( this->mutex );
1540  // there are situations where the circuit is disconnected, but
1541  // the channel does not know this yet
1542  if ( this->state != iiucs_connected ) {
1543  return;
1544  }
1545  comQueSendMsgMinder minder ( this->sendQue, guard );
1546  this->sendQue.insertRequestHeader (
1547  CA_PROTO_CLEAR_CHANNEL, 0u,
1548  0u, 0u, sid, cid,
1549  CA_V49 ( this->minorProtocolVersion ) );
1550  minder.commit ();
1551 }
unsigned minorProtocolVersion
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define CA_V49(MINOR)
Definition: caProto.h:40
comQueSend sendQue
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
#define CA_PROTO_CLEAR_CHANNEL
Definition: caProto.h:95
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
bool tcpiiu::connecting ( epicsGuard< epicsMutex > &  ) const
inline

Definition at line 378 of file virtualCircuit.h.

380 {
381  return ( this->state == iiucs_connecting );
382 }
enum tcpiiu::iiu_conn_state state
bool tcpiiu::connectNotify ( epicsGuard< epicsMutex > &  guard,
nciu & chan 
)

Definition at line 1958 of file tcpiiu.cpp.

1960 {
1961  guard.assertIdenticalMutex ( this->mutex );
1962  bool wasExpected = false;
1963  // this improves robustness in the face of a server sending
1964  // protocol that does not match its declared protocol revision
1965  if ( chan.channelNode::listMember == channelNode::cs_createRespPend ) {
1966  this->createRespPend.remove ( chan );
1967  this->subscripReqPend.add ( chan );
1968  chan.channelNode::listMember = channelNode::cs_subscripReqPend;
1969  wasExpected = true;
1970  }
1971  else if ( chan.channelNode::listMember == channelNode::cs_v42ConnCallbackPend ) {
1972  this->v42ConnCallbackPend.remove ( chan );
1973  this->subscripReqPend.add ( chan );
1974  chan.channelNode::listMember = channelNode::cs_subscripReqPend;
1975  wasExpected = true;
1976  }
1977  // the TCP send thread is awakened by its receive thread whenever the receive thread
1978  // is about to block if this->subscripReqPend has items in it
1979  return wasExpected;
1980 }
void add(T &item)
Definition: tsDLList.h:313
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
void remove(T &item)
Definition: tsDLList.h:219
tsDLList< nciu > createRespPend
tsDLList< nciu > subscripReqPend
tsDLList< nciu > v42ConnCallbackPend
void tcpiiu::createChannelRequest ( nciu & chan,
epicsGuard< epicsMutex > &  guard 
)

Definition at line 1487 of file tcpiiu.cpp.

1489 {
1490  guard.assertIdenticalMutex ( this->mutex );
1491 
1492  if ( this->state != iiucs_connected &&
1493  this->state != iiucs_connecting ) {
1494  return;
1495  }
1496 
1497  const char *pName;
1498  unsigned nameLength;
1499  ca_uint32_t identity;
1500  if ( this->ca_v44_ok ( guard ) ) {
1501  identity = chan.getCID ( guard );
1502  pName = chan.pName ( guard );
1503  nameLength = chan.nameLen ( guard );
1504  }
1505  else {
1506  identity = chan.getSID ( guard );
1507  pName = 0;
1508  nameLength = 0u;
1509  }
1510 
1511  unsigned postCnt = CA_MESSAGE_ALIGN ( nameLength );
1512 
1513  if ( postCnt >= 0xffff ) {
1515  }
1516 
1517  comQueSendMsgMinder minder ( this->sendQue, guard );
1518  //
1519  // The available field is used (abused)
1520  // here to communicate the minor version number
1521  // starting with CA 4.1.
1522  //
1523  this->sendQue.insertRequestHeader (
1524  CA_PROTO_CREATE_CHAN, postCnt,
1525  0u, 0u, identity, CA_MINOR_PROTOCOL_REVISION,
1526  CA_V49 ( this->minorProtocolVersion ) );
1527  if ( nameLength ) {
1528  this->sendQue.pushString ( pName, nameLength );
1529  }
1530  if ( postCnt > nameLength ) {
1531  this->sendQue.pushString ( cacNillBytes, postCnt - nameLength );
1532  }
1533  minder.commit ();
1534 }
bool ca_v44_ok(epicsGuard< epicsMutex > &) const
#define CA_PROTO_CREATE_CHAN
Definition: caProto.h:101
unsigned nameLen(epicsGuard< epicsMutex > &) const
Definition: nciu.cpp:252
unsigned minorProtocolVersion
void pushString(const char *pVal, unsigned nChar)
Definition: comQueSend.h:203
const char * pName(epicsGuard< epicsMutex > &) const
Definition: nciu.cpp:226
ca_uint32_t getCID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:297
unsigned int ca_uint32_t
Definition: caProto.h:76
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_MINOR_PROTOCOL_REVISION
Definition: nciu.h:35
comQueSend sendQue
#define CA_MESSAGE_ALIGN(A)
Definition: caProto.h:154
const char cacNillBytes[]
Definition: comQueSend.cpp:74
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291
void tcpiiu::decrementBlockingForFlushCount ( epicsGuard< epicsMutex > &  guard)

Definition at line 1758 of file tcpiiu.cpp.

1760 {
1761  guard.assertIdenticalMutex ( this->mutex );
1762  assert ( this->blockingForFlush > 0u );
1763  this->blockingForFlush--;
1764  if ( this->blockingForFlush > 0 ) {
1765  this->flushBlockEvent.signal ();
1766  }
1767 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
epicsEvent flushBlockEvent
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
unsigned blockingForFlush
epicsMutex & mutex
void tcpiiu::disableFlowControlRequest ( epicsGuard< epicsMutex > &  guard)

Definition at line 1348 of file tcpiiu.cpp.

1350 {
1351  guard.assertIdenticalMutex ( this->mutex );
1352 
1353  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1354  this->flushRequest ( guard );
1355  }
1356  comQueSendMsgMinder minder ( this->sendQue, guard );
1357  this->sendQue.insertRequestHeader (
1358  CA_PROTO_EVENTS_ON, 0u,
1359  0u, 0u, 0u, 0u,
1360  CA_V49 ( this->minorProtocolVersion ) );
1361  minder.commit ();
1362 }
unsigned minorProtocolVersion
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define CA_V49(MINOR)
Definition: caProto.h:40
comQueSend sendQue
epicsMutex & mutex
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
#define CA_PROTO_EVENTS_ON
Definition: caProto.h:92
void tcpiiu::disconnectAllChannels ( epicsGuard< epicsMutex > &  cbGuard,
epicsGuard< epicsMutex > &  guard,
class udpiiu & discIIU 
)

Definition at line 1806 of file tcpiiu.cpp.

1810 {
1811  cbGuard.assertIdenticalMutex ( this->cbMutex );
1812  guard.assertIdenticalMutex ( this->mutex );
1813 
1814  while ( nciu * pChan = this->createReqPend.get () ) {
1815  discIIU.installDisconnectedChannel ( guard, *pChan );
1816  }
1817 
1818  while ( nciu * pChan = this->createRespPend.get () ) {
1819  // we dont yet know the server's id so we cant
1820  // send a channel delete request and will instead
1821  // trust that the server can do the proper cleanup
1822  // when the circuit disconnects
1823  discIIU.installDisconnectedChannel ( guard, *pChan );
1824  }
1825 
1826  while ( nciu * pChan = this->v42ConnCallbackPend.get () ) {
1827  this->clearChannelRequest ( guard,
1828  pChan->getSID(guard), pChan->getCID(guard) );
1829  discIIU.installDisconnectedChannel ( guard, *pChan );
1830  }
1831 
1832  while ( nciu * pChan = this->subscripReqPend.get () ) {
1833  pChan->disconnectAllIO ( cbGuard, guard );
1834  this->clearChannelRequest ( guard,
1835  pChan->getSID(guard), pChan->getCID(guard) );
1836  discIIU.installDisconnectedChannel ( guard, *pChan );
1837  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1838  }
1839 
1840  while ( nciu * pChan = this->connectedList.get () ) {
1841  pChan->disconnectAllIO ( cbGuard, guard );
1842  this->clearChannelRequest ( guard,
1843  pChan->getSID(guard), pChan->getCID(guard) );
1844  discIIU.installDisconnectedChannel ( guard, *pChan );
1845  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1846  }
1847 
1848  while ( nciu * pChan = this->unrespCircuit.get () ) {
1849  // if we know that the circuit is unresponsive
1850  // then we dont send a channel delete request and
1851  // will instead trust that the server can do the
1852  // proper cleanup when the circuit disconnects
1853  pChan->disconnectAllIO ( cbGuard, guard );
1854  discIIU.installDisconnectedChannel ( guard, *pChan );
1855  }
1856 
1857  while ( nciu * pChan = this->subscripUpdateReqPend.get () ) {
1858  pChan->disconnectAllIO ( cbGuard, guard );
1859  this->clearChannelRequest ( guard,
1860  pChan->getSID(guard), pChan->getCID(guard) );
1861  discIIU.installDisconnectedChannel ( guard, *pChan );
1862  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1863  }
1864 
1865  this->channelCountTot = 0u;
1866  this->initiateCleanShutdown ( guard );
1867 }
tsDLList< nciu > subscripUpdateReqPend
tsDLList< nciu > connectedList
epicsMutex & cbMutex
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
unsigned channelCountTot
epicsMutex & mutex
void initiateCleanShutdown(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:826
T * get()
Definition: tsDLList.h:261
void clearChannelRequest(epicsGuard< epicsMutex > &, ca_uint32_t sid, ca_uint32_t cid)
Definition: tcpiiu.cpp:1536
tsDLList< nciu > createReqPend
tsDLList< nciu > createRespPend
tsDLList< nciu > subscripReqPend
tsDLList< nciu > unrespCircuit
Definition: nciu.h:127
tsDLList< nciu > v42ConnCallbackPend
void tcpiiu::disconnectNotify ( epicsGuard< epicsMutex > &  guard)

Definition at line 851 of file tcpiiu.cpp.

853 {
854  guard.assertIdenticalMutex ( this->mutex );
855  this->state = iiucs_disconnected;
856  this->sendThreadFlushEvent.signal ();
857  this->flushBlockEvent.signal ();
858 }
epicsEvent flushBlockEvent
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
epicsEvent sendThreadFlushEvent
void tcpiiu::echoRequest ( epicsGuard< epicsMutex > &  guard)

Definition at line 1400 of file tcpiiu.cpp.

1401 {
1402  guard.assertIdenticalMutex ( this->mutex );
1403 
1404  epicsUInt16 command = CA_PROTO_ECHO;
1405  if ( ! CA_V43 ( this->minorProtocolVersion ) ) {
1406  // we fake an echo to early server using a read sync
1407  command = CA_PROTO_READ_SYNC;
1408  }
1409 
1410  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1411  this->flushRequest ( guard );
1412  }
1413  comQueSendMsgMinder minder ( this->sendQue, guard );
1414  this->sendQue.insertRequestHeader (
1415  command, 0u,
1416  0u, 0u, 0u, 0u,
1417  CA_V49 ( this->minorProtocolVersion ) );
1418  minder.commit ();
1419 }
unsigned minorProtocolVersion
#define CA_V43(MINOR)
Definition: caProto.h:34
unsigned short epicsUInt16
Definition: epicsTypes.h:41
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_PROTO_READ_SYNC
Definition: caProto.h:93
comQueSend sendQue
epicsMutex & mutex
#define CA_PROTO_ECHO
Definition: caProto.h:106
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
void tcpiiu::enableFlowControlRequest ( epicsGuard< epicsMutex > &  guard)

Definition at line 1364 of file tcpiiu.cpp.

1366 {
1367  guard.assertIdenticalMutex ( this->mutex );
1368 
1369  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1370  this->flushRequest ( guard );
1371  }
1372  comQueSendMsgMinder minder ( this->sendQue, guard );
1373  this->sendQue.insertRequestHeader (
1374  CA_PROTO_EVENTS_OFF, 0u,
1375  0u, 0u, 0u, 0u,
1376  CA_V49 ( this->minorProtocolVersion ) );
1377  minder.commit ();
1378 }
unsigned minorProtocolVersion
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define CA_PROTO_EVENTS_OFF
Definition: caProto.h:91
#define CA_V49(MINOR)
Definition: caProto.h:40
comQueSend sendQue
epicsMutex & mutex
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
void tcpiiu::flush ( epicsGuard< epicsMutex > &  mutualExclusionGuard)
virtual

Implements netiiu.

Definition at line 1709 of file tcpiiu.cpp.

1710 {
1711  this->flushRequest ( guard );
1712  // the process thread is not permitted to flush as this
1713  // can result in a push / pull deadlock on the TCP pipe.
1714  // Instead, the process thread schedules the flush with the
1715  // send thread which runs at a higher priority than the
1716  // receive thread. The same applies to the UDP thread for
1717  // locking hierarchy reasons.
1718  if ( ! epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
1719  // enable / disable of call back preemption must occur here
1720  // because the tcpiiu might disconnect while waiting and its
1721  // pointer to this cac might become invalid
1722  assert ( this->blockingForFlush < UINT_MAX );
1723  this->blockingForFlush++;
1724  while ( this->sendQue.flushBlockThreshold() ) {
1725 
1726  bool userRequestsCanBeAccepted =
1727  this->state == iiucs_connected ||
1728  ( ! this->ca_v42_ok ( guard ) &&
1729  this->state == iiucs_connecting );
1730  // fail the users request if we have a disconnected
1731  // or unresponsive circuit
1732  if ( ! userRequestsCanBeAccepted ||
1733  this->unresponsiveCircuit ) {
1734  this->decrementBlockingForFlushCount ( guard );
1735  throw cacChannel::notConnected ();
1736  }
1737 
1738  epicsGuardRelease < epicsMutex > unguard ( guard );
1739  this->flushBlockEvent.wait ( 30.0 );
1740  }
1741  this->decrementBlockingForFlushCount ( guard );
1742  }
1743 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
epicsEvent flushBlockEvent
LIBCOM_API void *epicsStdCall epicsThreadPrivateGet(epicsThreadPrivateId)
Definition: osdThread.c:973
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool unresponsiveCircuit
unsigned blockingForFlush
bool ca_v42_ok(epicsGuard< epicsMutex > &) const
Definition: tcpiiu.cpp:1777
epicsThreadPrivateId caClientCallbackThreadId
void decrementBlockingForFlushCount(epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:1758
comQueSend sendQue
bool flushBlockThreshold() const
Definition: comQueSend.h:221
enum tcpiiu::iiu_conn_state state
void tcpiiu::flushIfRecvProcessRequested ( epicsGuard< epicsMutex > &  guard)

Definition at line 1137 of file tcpiiu.cpp.

1139 {
1140  if ( this->recvProcessPostponedFlush ) {
1141  this->flushRequest ( guard );
1142  this->recvProcessPostponedFlush = false;
1143  }
1144 }
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool recvProcessPostponedFlush
void tcpiiu::flushRequest ( epicsGuard< epicsMutex > &  )
virtual

Implements netiiu.

Definition at line 2038 of file tcpiiu.cpp.

2039 {
2040  if ( this->sendQue.occupiedBytes () > 0 ) {
2041  this->sendThreadFlushEvent.signal ();
2042  }
2043 }
unsigned occupiedBytes() const
Definition: comQueSend.h:216
comQueSend sendQue
epicsEvent sendThreadFlushEvent
unsigned tcpiiu::getHostName ( epicsGuard< epicsMutex > &  guard,
char *  pBuf,
unsigned  bufLength 
) const
throw (
)
virtual

Implements netiiu.

Definition at line 1791 of file tcpiiu.cpp.

1794 {
1795  guard.assertIdenticalMutex ( this->mutex );
1796  return this->hostNameCacheInstance.getName ( pBuf, bufLength );
1797 }
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
osiSockAddr tcpiiu::getNetworkAddress ( epicsGuard< epicsMutex > &  guard) const
virtual

Implements netiiu.

Definition at line 1769 of file tcpiiu.cpp.

1771 {
1772  guard.assertIdenticalMutex ( this->mutex );
1773  return this->address();
1774 }
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
osiSockAddr address() const
Definition: caServerID.h:73
void tcpiiu::hostNameSetRequest ( epicsGuard< epicsMutex > &  guard)

Definition at line 1291 of file tcpiiu.cpp.

1292 {
1293  guard.assertIdenticalMutex ( this->mutex );
1294 
1295  if ( ! CA_V41 ( this->minorProtocolVersion ) ) {
1296  return;
1297  }
1298 
1299  const char * pName = this->cacRef.pLocalHostName ();
1300  unsigned size = strlen ( pName ) + 1u;
1301  unsigned postSize = CA_MESSAGE_ALIGN ( size );
1302  assert ( postSize < 0xffff );
1303 
1304  if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) {
1305  this->flushRequest ( guard );
1306  }
1307 
1308  comQueSendMsgMinder minder ( this->sendQue, guard );
1309  this->sendQue.insertRequestHeader (
1310  CA_PROTO_HOST_NAME, postSize,
1311  0u, 0u, 0u, 0u,
1312  CA_V49 ( this->minorProtocolVersion ) );
1313  this->sendQue.pushString ( pName, size );
1314  this->sendQue.pushString ( cacNillBytes, postSize - size );
1315  minder.commit ();
1316 }
const char * pLocalHostName()
Definition: cac.h:408
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
unsigned minorProtocolVersion
void pushString(const char *pVal, unsigned nChar)
Definition: comQueSend.h:203
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
cac & cacRef
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_PROTO_HOST_NAME
Definition: caProto.h:104
comQueSend sendQue
#define CA_MESSAGE_ALIGN(A)
Definition: caProto.h:154
const char cacNillBytes[]
Definition: comQueSend.cpp:74
epicsMutex & mutex
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
#define CA_V41(MINOR)
Definition: caProto.h:32
void tcpiiu::initiateAbortShutdown ( epicsGuard< epicsMutex > &  guard)

Definition at line 942 of file tcpiiu.cpp.

944 {
945  guard.assertIdenticalMutex ( this->mutex );
946 
947  if ( ! this->discardingPendingData ) {
948  // force abortive shutdown sequence
949  // (discard outstanding sends and receives)
950  struct linger tmpLinger;
951  tmpLinger.l_onoff = true;
952  tmpLinger.l_linger = 0u;
953  int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
954  reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
955  if ( status != 0 ) {
956  char sockErrBuf[64];
957  epicsSocketConvertErrnoToString (
958  sockErrBuf, sizeof ( sockErrBuf ) );
959  errlogPrintf ( "CAC TCP socket linger set error was %s\n",
960  sockErrBuf );
961  }
962  this->discardingPendingData = true;
963  }
964 
965  iiu_conn_state oldState = this->state;
966  if ( oldState != iiucs_abort_shutdown && oldState != iiucs_disconnected ) {
967  this->state = iiucs_abort_shutdown;
968 
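969  epicsSocketSystemCallInterruptMechanismQueryInfo info =
970  epicsSocketSystemCallInterruptMechanismQuery ();
971  switch ( info ) {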
972  case esscimqi_socketCloseRequired:
973  //
974  // on winsock and probably vxWorks shutdown() does not
975  // unblock a thread in recv() so we use close() and introduce
976  // some complexity because we must unregister the fd early
977  //
978  if ( ! this->socketHasBeenClosed ) {
979  epicsSocketDestroy ( this->sock );
980  this->socketHasBeenClosed = true;
981  }
982  break;
983  case esscimqi_socketBothShutdownRequired:
984  {
985  int status = ::shutdown ( this->sock, SHUT_RDWR );
986  if ( status ) {
987  char sockErrBuf[64];
988  epicsSocketConvertErrnoToString (
989  sockErrBuf, sizeof ( sockErrBuf ) );
990  errlogPrintf ("CAC TCP socket shutdown error was %s\n",
991  sockErrBuf );
992  }
993  }
994  break;
995  case esscimqi_socketSigAlarmRequired:
996  this->recvThread.interruptSocketRecv ();
997  this->sendThread.interruptSocketSend ();
998  break;
999  default:
1000  break;
1001  };
1002 
1003  //
1004  // wake up the send thread if it isnt blocking in send()
1005  //
1006  this->sendThreadFlushEvent.signal ();
1007  this->flushBlockEvent.signal ();
1008  }
1009 }
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
bool socketHasBeenClosed
pvd::Status status
epicsEvent flushBlockEvent
void interruptSocketSend()
Definition: tcpiiu.cpp:2093
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsSocketSystemCallInterruptMechanismQueryInfo
Definition: osiSock.h:47
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
tcpRecvThread recvThread
tcpSendThread sendThread
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
epicsEvent sendThreadFlushEvent
bool discardingPendingData
SOCKET sock
#define SHUT_RDWR
Definition: osdSock.h:48
void tcpiiu::initiateCleanShutdown ( epicsGuard< epicsMutex > &  guard)

Definition at line 826 of file tcpiiu.cpp.

828 {
829  guard.assertIdenticalMutex ( this->mutex );
830 
831  if ( this->state == iiucs_connected ) {
832  if ( this->unresponsiveCircuit ) {
833  this->initiateAbortShutdown ( guard );
834  }
835  else {
836  this->state = iiucs_clean_shutdown;
837  this->sendThreadFlushEvent.signal ();
838  this->flushBlockEvent.signal ();
839  }
840  }
841  else if ( this->state == iiucs_clean_shutdown ) {
842  if ( this->unresponsiveCircuit ) {
843  this->initiateAbortShutdown ( guard );
844  }
845  }
846  else if ( this->state == iiucs_connecting ) {
847  this->initiateAbortShutdown ( guard );
848  }
849 }
epicsEvent flushBlockEvent
bool unresponsiveCircuit
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
epicsEvent sendThreadFlushEvent
void initiateAbortShutdown(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:942
void tcpiiu::installChannel ( epicsGuard< epicsMutex > &  guard,
nciu &  chan,
unsigned  sidIn,
ca_uint16_t  typeIn,
arrayElementCount  countIn 
)

Definition at line 1942 of file tcpiiu.cpp.

1946 {
1947  guard.assertIdenticalMutex ( this->mutex );
1948 
1949  this->createReqPend.add ( chan );
1950  this->channelCountTot++;
1951  chan.channelNode::listMember = channelNode::cs_createReqPend;
1952  chan.searchReplySetUp ( *this, sidIn, typeIn, countIn, guard );
1953  // The tcp send thread runs at a priority below the udp thread
1954  // so that this will not send small packets
1955  this->sendThreadFlushEvent.signal ();
1956 }
void add(T &item)
Definition: tsDLList.h:313
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
void searchReplySetUp(netiiu &iiu, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn, epicsGuard< epicsMutex > &)
Definition: nciu.h:311
unsigned channelCountTot
epicsMutex & mutex
epicsEvent sendThreadFlushEvent
tsDLList< nciu > createReqPend
bool tcpiiu::isNameService ( ) const
inline

Definition at line 411 of file virtualCircuit.h.

412 {
413  return ( this->pSearchDest != NULL );
414 }
#define NULL
Definition: catime.c:38
SearchDestTCP * pSearchDest
void tcpiiu::operator delete ( void *  )

Definition at line 2101 of file tcpiiu.cpp.

2102 {
2103  // Visual C++ .net appears to require operator delete if
2104  // placement operator delete is defined? I smell a ms rat
2105  // because if I declare placement new and delete, but
2106  // comment out the placement delete definition there are
2107  // no undefined symbols.
2108  errlogPrintf ( "%s:%d this compiler is confused about "
2109  "placement delete - memory was probably leaked",
2110  __FILE__, __LINE__ );
2111 }
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
void * tcpiiu::operator new ( size_t  size,
tsFreeList< class tcpiiu, 32, epicsMutexNOOP > &  mgr 
)
inline

Definition at line 339 of file virtualCircuit.h.

341 {
342  return mgr.allocate ( size );
343 }
void * allocate(size_t size)
Definition: tsFreeList.h:126
tcpiiu& tcpiiu::operator= ( const tcpiiu &  )
const char * tcpiiu::pHostName ( epicsGuard< epicsMutex > &  guard) const
throw (
)
virtual

Implements netiiu.

Definition at line 1799 of file tcpiiu.cpp.

1801 {
1802  guard.assertIdenticalMutex ( this->mutex );
1803  return this->hostNameCacheInstance.pointer ();
1804 }
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
int tcpiiu::printFormated ( epicsGuard< epicsMutex > &  cbGuard,
const char *  pformat,
  ... 
)

Definition at line 2020 of file tcpiiu.cpp.

2023 {
2024  cbGuard.assertIdenticalMutex ( this->cbMutex );
2025 
2026  va_list theArgs;
2027  int status;
2028 
2029  va_start ( theArgs, pformat );
2030 
2031  status = this->cacRef.varArgsPrintFormated ( cbGuard, pformat, theArgs );
2032 
2033  va_end ( theArgs );
2034 
2035  return status;
2036 }
pvd::Status status
epicsMutex & cbMutex
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
cac & cacRef
int varArgsPrintFormated(epicsGuard< epicsMutex > &callbackControl, const char *pformat, va_list args) const
Definition: cac.h:363
void tcpiiu::probeResponseNotify ( epicsGuard< epicsMutex > &  cbGuard)
inline

Definition at line 405 of file virtualCircuit.h.

407 {
408  this->recvDog.probeResponseNotify ( cbGuard );
409 }
void probeResponseNotify(epicsGuard< epicsMutex > &)
tcpRecvWatchdog recvDog
bool tcpiiu::processIncoming ( const epicsTime &  currentTime,
callbackManager &  mgr 
)

Definition at line 1146 of file tcpiiu.cpp.

1149 {
1150  mgr.cbGuard.assertIdenticalMutex ( this->cbMutex );
1151 
1152  while ( true ) {
1153 
1154  //
1155  // fetch a complete message header
1156  //
1157  if ( ! this->msgHeaderAvailable ) {
1158  if ( ! this->oldMsgHeaderAvailable ) {
1159  this->oldMsgHeaderAvailable =
1160  this->recvQue.popOldMsgHeader ( this->curMsg );
1161  if ( ! this->oldMsgHeaderAvailable ) {
1162  epicsGuard < epicsMutex > guard ( this->mutex );
1163  this->flushIfRecvProcessRequested ( guard );
1164  return true;
1165  }
1166  }
1167  if ( this->curMsg.m_postsize == 0xffff ) {
1168  static const unsigned annexSize =
1169  sizeof ( this->curMsg.m_postsize ) +
1170  sizeof ( this->curMsg.m_count );
1171  if ( this->recvQue.occupiedBytes () < annexSize ) {
1172  epicsGuard < epicsMutex > guard ( this->mutex );
1173  this->flushIfRecvProcessRequested ( guard );
1174  return true;
1175  }
1176  this->curMsg.m_postsize = this->recvQue.popUInt32 ();
1177  this->curMsg.m_count = this->recvQue.popUInt32 ();
1178  }
1179  this->msgHeaderAvailable = true;
1180 # ifdef DEBUG
1181  epicsGuard < epicsMutex > guard ( this->mutex );
1182  debugPrintf (
1183  ( "%s Cmd=%3u Type=%3u Count=%8u Size=%8u",
1184  this->pHostName ( guard ),
1185  this->curMsg.m_cmmd,
1186  this->curMsg.m_dataType,
1187  this->curMsg.m_count,
1188  this->curMsg.m_postsize) );
1189  debugPrintf (
1190  ( " Avail=%8u Cid=%8u\n",
1191  this->curMsg.m_available,
1192  this->curMsg.m_cid) );
1193 # endif
1194  }
1195 
1196  // check for 8 byte aligned protocol
1197  if ( this->curMsg.m_postsize & 0x7 ) {
1198  this->printFormated ( mgr.cbGuard,
1199  "CAC: server sent missaligned payload 0x%x\n",
1200  this->curMsg.m_postsize );
1201  return false;
1202  }
1203 
1204  //
1205  // make sure we have a large enough message body cache
1206  //
1207  if ( this->curMsg.m_postsize > this->curDataMax ) {
1208  assert (this->curMsg.m_postsize > MAX_TCP);
1209 
1210  char * newbuf = NULL;
1211  arrayElementCount newsize;
1212 
1213  if ( !this->cacRef.tcpLargeRecvBufFreeList ) {
1214  // round size up to multiple of 4K
1215  newsize = ((this->curMsg.m_postsize-1)|0xfff)+1;
1216 
1217  if ( this->curDataMax <= MAX_TCP ) {
1218  // small -> large
1219  newbuf = (char*)malloc(newsize);
1220 
1221  } else {
1222  // expand large to larger
1223  newbuf = (char*)realloc(this->pCurData, newsize);
1224  }
1225 
1226  } else if ( this->curMsg.m_postsize <= this->cacRef.maxRecvBytesTCP ) {
1227  newbuf = (char*) freeListMalloc(this->cacRef.tcpLargeRecvBufFreeList);
1228  newsize = this->cacRef.maxRecvBytesTCP;
1229 
1230  }
1231 
1232  if ( newbuf) {
1233  if (this->curDataMax <= MAX_TCP) {
1234  freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData );
1235 
1236  } else if (this->cacRef.tcpLargeRecvBufFreeList) {
1237  freeListFree(this->cacRef.tcpLargeRecvBufFreeList, this->pCurData );
1238 
1239  } else {
1240  // called realloc()
1241  }
1242  this->pCurData = newbuf;
1243  this->curDataMax = newsize;
1244 
1245  } else {
1246  this->printFormated ( mgr.cbGuard,
1247  "CAC: not enough memory for message body cache (ignoring response message)\n");
1248  }
1249  }
1250 
1251  if ( this->curMsg.m_postsize <= this->curDataMax ) {
1252  if ( this->curMsg.m_postsize > 0u ) {
1253  this->curDataBytes += this->recvQue.copyOutBytes (
1254  &this->pCurData[this->curDataBytes],
1255  this->curMsg.m_postsize - this->curDataBytes );
1256  if ( this->curDataBytes < this->curMsg.m_postsize ) {
1257  epicsGuard < epicsMutex > guard ( this->mutex );
1258  this->flushIfRecvProcessRequested ( guard );
1259  return true;
1260  }
1261  }
1262  bool msgOK = this->cacRef.executeResponse ( mgr, *this,
1263  currentTime, this->curMsg, this->pCurData );
1264  if ( ! msgOK ) {
1265  return false;
1266  }
1267  }
1268  else {
1269  static bool once = false;
1270  if ( ! once ) {
1271  this->printFormated ( mgr.cbGuard,
1272  "CAC: response with payload size=%u > EPICS_CA_MAX_ARRAY_BYTES ignored\n",
1273  this->curMsg.m_postsize );
1274  once = true;
1275  }
1276  this->curDataBytes += this->recvQue.removeBytes (
1277  this->curMsg.m_postsize - this->curDataBytes );
1278  if ( this->curDataBytes < this->curMsg.m_postsize ) {
1279  epicsGuard < epicsMutex > guard ( this->mutex );
1280  this->flushIfRecvProcessRequested ( guard );
1281  return true;
1282  }
1283  }
1284 
1285  this->oldMsgHeaderAvailable = false;
1286  this->msgHeaderAvailable = false;
1287  this->curDataBytes = 0u;
1288  }
1289 }
#define MAX_TCP
Definition: caProto.h:63
char * pCurData
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
epicsMutex & cbMutex
arrayElementCount curDataMax
ca_uint32_t m_postsize
bool popOldMsgHeader(struct caHdrLargeArray &)
Definition: comQueRecv.cpp:230
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define NULL
Definition: catime.c:38
cac & cacRef
arrayElementCount curDataBytes
int printFormated(epicsGuard< epicsMutex > &cbGuard, const char *pformat,...)
Definition: tcpiiu.cpp:2020
const char * pHostName(epicsGuard< epicsMutex > &) const
Definition: tcpiiu.cpp:1799
ca_uint32_t m_count
bool executeResponse(callbackManager &, tcpiiu &, const epicsTime &currentTime, caHdrLargeArray &, char *pMsgBody)
Definition: cac.cpp:1204
void flushIfRecvProcessRequested(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1137
unsigned removeBytes(unsigned nBytes)
Definition: comQueRecv.cpp:69
LIBCOM_API void epicsStdCall freeListFree(void *pvt, void *pmem)
Definition: freeListLib.c:131
unsigned occupiedBytes() const
Definition: comQueRecv.h:60
epicsMutex & mutex
caHdrLargeArray curMsg
epicsGuard< epicsMutex > cbGuard
Definition: cac.h:94
unsigned long arrayElementCount
Definition: cacIO.h:57
comQueRecv recvQue
unsigned copyOutBytes(epicsInt8 *pBuf, unsigned nBytes)
Definition: comQueRecv.cpp:48
ca_uint16_t m_cmmd
ca_uint32_t m_available
#define debugPrintf(argsInParen)
Definition: iocinf.h:30
bool oldMsgHeaderAvailable
epicsUInt32 popUInt32()
Definition: comQueRecv.cpp:211
bool msgHeaderAvailable
LIBCOM_API void *epicsStdCall freeListMalloc(void *pvt)
Definition: freeListLib.c:74
void tcpiiu::readNotifyRequest ( epicsGuard< epicsMutex > &  guard,
nciu &  chan,
netReadNotifyIO &  io,
unsigned  type,
arrayElementCount  nElem 
)
virtual

Implements netiiu.

Definition at line 1455 of file tcpiiu.cpp.

1458 {
1459  guard.assertIdenticalMutex ( this->mutex );
1460  if ( INVALID_DB_REQ ( dataType ) ) {
1461  throw cacChannel::badType ();
1462  }
1463  arrayElementCount maxBytes;
1464  if ( CA_V49 ( this->minorProtocolVersion ) ) {
1465  maxBytes = 0xfffffff0;
1466  }
1467  else {
1468  maxBytes = MAX_TCP;
1469  }
1470  arrayElementCount maxElem =
1471  ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType];
1472  if ( nElem > maxElem ) {
1474  }
1475  if (nElem == 0 && !CA_V413(this->minorProtocolVersion))
1476  nElem = chan.getcount();
1477  comQueSendMsgMinder minder ( this->sendQue, guard );
1478  this->sendQue.insertRequestHeader (
1479  CA_PROTO_READ_NOTIFY, 0u,
1480  static_cast < ca_uint16_t > ( dataType ),
1481  static_cast < ca_uint32_t > ( nElem ),
1482  chan.getSID(guard), io.getId(),
1483  CA_V49 ( this->minorProtocolVersion ) );
1484  minder.commit ();
1485 }
#define MAX_TCP
Definition: caProto.h:63
const unsigned short dbr_value_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:896
const unsigned short dbr_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:847
unsigned minorProtocolVersion
#define INVALID_DB_REQ(x)
Definition: db_access.h:115
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
const T getId() const
Definition: resourceLib.h:1021
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_V413(MINOR)
Definition: caProto.h:44
comQueSend sendQue
epicsMutex & mutex
unsigned getcount() const
Definition: nciu.h:200
unsigned long arrayElementCount
Definition: cacIO.h:57
#define CA_PROTO_READ_NOTIFY
Definition: caProto.h:98
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291
bool tcpiiu::receiveThreadIsBusy ( epicsGuard< epicsMutex > &  guard)
inline

Definition at line 384 of file virtualCircuit.h.

386 {
387  guard.assertIdenticalMutex ( this->mutex );
388  return this->_receiveThreadIsBusy;
389 }
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
bool _receiveThreadIsBusy
void tcpiiu::receiveTimeoutNotify ( callbackManager &  mgr,
epicsGuard< epicsMutex > &  guard 
)

Definition at line 889 of file tcpiiu.cpp.

892 {
893  mgr.cbGuard.assertIdenticalMutex ( this->cbMutex );
894  guard.assertIdenticalMutex ( this->mutex );
895  this->unresponsiveCircuitNotify ( mgr.cbGuard, guard );
896 }
epicsMutex & cbMutex
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
void unresponsiveCircuitNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:898
epicsMutex & mutex
epicsGuard< epicsMutex > cbGuard
Definition: cac.h:94
double tcpiiu::receiveWatchdogDelay ( epicsGuard< epicsMutex > &  ) const
virtual

Implements netiiu.

Definition at line 2074 of file tcpiiu.cpp.

2076 {
2077  return this->recvDog.delay ();
2078 }
double delay() const
tcpRecvWatchdog recvDog
void tcpiiu::recvBytes ( void *  pBuf,
unsigned  nBytesInBuf,
statusWireIO &  stat 
)
virtual

Implements wireRecvAdapter.

Definition at line 299 of file tcpiiu.cpp.

301 {
302  assert ( nBytesInBuf <= INT_MAX );
303 
304  while ( true ) {
305  int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
306  static_cast <int> ( nBytesInBuf ), 0 );
307 
308  if ( status > 0 ) {
309  stat.bytesCopied = static_cast <unsigned> ( status );
310  assert ( stat.bytesCopied <= nBytesInBuf );
312  return;
313  }
314  else {
315  epicsGuard < epicsMutex > guard ( this->mutex );
316 
317  if ( status == 0 ) {
318  this->disconnectNotify ( guard );
319  stat.bytesCopied = 0u;
321  return;
322  }
323 
324  // if the circuit was locally aborted then suppress
325  // warning messages about bad file descriptor etc
326  if ( this->state != iiucs_connected &&
327  this->state != iiucs_clean_shutdown ) {
328  stat.bytesCopied = 0u;
330  return;
331  }
332 
333  int localErrno = SOCKERRNO;
334 
335  if ( localErrno == SOCK_SHUTDOWN ) {
336  stat.bytesCopied = 0u;
338  return;
339  }
340 
341  if ( localErrno == SOCK_EINTR ) {
342  continue;
343  }
344 
345  if ( localErrno == SOCK_ENOBUFS ) {
346  errlogPrintf (
347  "CAC: system low on network buffers "
348  "- receive retry in 15 seconds\n" );
349  {
350  epicsGuardRelease < epicsMutex > unguard ( guard );
351  epicsThreadSleep ( 15.0 );
352  }
353  continue;
354  }
355 
356  char sockErrBuf[64];
357  epicsSocketConvertErrnoToString (
358  sockErrBuf, sizeof ( sockErrBuf ) );
359 
360  // the replaceable printf handler isn't called here
361  // because it requires a callback lock which probably
362  // isn't appropriate here
363  char name[64];
364  this->hostNameCacheInstance.getName (
365  name, sizeof ( name ) );
366  errlogPrintf (
367  "Unexpected problem with CA circuit to"
368  " server \"%s\" was \"%s\" - disconnecting\n",
369  name, sockErrBuf );
370 
371  stat.bytesCopied = 0u;
373  return;
374  }
375  }
376 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
pvd::Status status
#define SOCK_ENOBUFS
Definition: osdSock.h:52
swioCircuitState circuitState
Definition: comBuf.h:64
#define SOCK_SHUTDOWN
Definition: osdSock.h:67
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
unsigned bytesCopied
Definition: comBuf.h:63
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
#define SOCKERRNO
Definition: osdSock.h:33
void disconnectNotify(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:851
#define SOCK_EINTR
Definition: osdSock.h:64
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
SOCKET sock
unsigned tcpiiu::requestMessageBytesPending ( epicsGuard< epicsMutex > &  mutualExclusionGuard)
virtual

Implements netiiu.

Definition at line 1745 of file tcpiiu.cpp.

1747 {
1748  guard.assertIdenticalMutex ( this->mutex );
1749 #if 0
1750  if ( ! this->earlyFlush && this->sendQue.flushEarlyThreshold(0u) ) {
1751  this->earlyFlush = true;
1752  this->sendThreadFlushEvent.signal ();
1753  }
1754 #endif
1755  return sendQue.occupiedBytes ();
1756 }
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
bool earlyFlush
unsigned occupiedBytes() const
Definition: comQueSend.h:216
comQueSend sendQue
epicsMutex & mutex
epicsEvent sendThreadFlushEvent
void tcpiiu::requestRecvProcessPostponedFlush ( epicsGuard< epicsMutex > &  guard)
virtual

Implements netiiu.

Definition at line 1784 of file tcpiiu.cpp.

1786 {
1787  guard.assertIdenticalMutex ( this->mutex );
1788  this->recvProcessPostponedFlush = true;
1789 }
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
bool recvProcessPostponedFlush
epicsMutex & mutex
void tcpiiu::responsiveCircuitNotify ( epicsGuard< epicsMutex > &  cbGuard,
epicsGuard< epicsMutex > &  guard 
)

Definition at line 860 of file tcpiiu.cpp.

863 {
864  cbGuard.assertIdenticalMutex ( this->cbMutex );
865  guard.assertIdenticalMutex ( this->mutex );
866  if ( this->unresponsiveCircuit ) {
867  this->unresponsiveCircuit = false;
868  while ( nciu * pChan = this->unrespCircuit.get() ) {
869  this->subscripUpdateReqPend.add ( *pChan );
870  pChan->channelNode::listMember =
871  channelNode::cs_subscripUpdateReqPend;
872  pChan->connect ( cbGuard, guard );
873  }
874  this->sendThreadFlushEvent.signal ();
875  }
876 }
tsDLList< nciu > subscripUpdateReqPend
void add(T &item)
Definition: tsDLList.h:313
epicsMutex & cbMutex
bool unresponsiveCircuit
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
epicsEvent sendThreadFlushEvent
T * get()
Definition: tsDLList.h:261
tsDLList< nciu > unrespCircuit
Definition: nciu.h:127
bool tcpiiu::searchMsg ( epicsGuard< epicsMutex > &  guard,
ca_uint32_t  id,
const char *  pName,
unsigned  nameLength 
)
virtual

Implements netiiu.

Definition at line 2127 of file tcpiiu.cpp.

2130 {
2131  return netiiu::searchMsg (
2132  guard, id, pName, nameLength );
2133 }
virtual bool searchMsg(epicsGuard< epicsMutex > &, ca_uint32_t id, const char *pName, unsigned nameLength)=0
Definition: netiiu.cpp:166
void tcpiiu::searchRespNotify ( const epicsTime &  currentTime,
const caHdrLargeArray &  msg 
)

Definition at line 2193 of file tcpiiu.cpp.

2195 {
2196  /*
2197  * the type field is abused to carry the port number
2198  * so that we can have multiple servers on one host
2199  */
2200  osiSockAddr serverAddr;
2201  if ( msg.m_cid != INADDR_BROADCAST ) {
2202  serverAddr.ia.sin_family = AF_INET;
2203  serverAddr.ia.sin_addr.s_addr = htonl ( msg.m_cid );
2204  serverAddr.ia.sin_port = htons ( msg.m_dataType );
2205  }
2206  else {
2207  serverAddr = this->address ();
2208  }
2209  cacRef.transferChanToVirtCircuit
2210  ( msg.m_available, msg.m_cid, 0xffff,
2211  0, minorProtocolVersion, serverAddr, currentTime );
2212 }
unsigned minorProtocolVersion
struct sockaddr_in ia
Definition: osiSock.h:157
ca_uint16_t m_dataType
cac & cacRef
void transferChanToVirtCircuit(unsigned cid, unsigned sid, ca_uint16_t typeCode, arrayElementCount count, unsigned minorVersionNumber, const osiSockAddr &, const epicsTime &currentTime)
Definition: cac.cpp:587
ca_uint32_t m_cid
ca_uint32_t m_available
osiSockAddr address() const
Definition: caServerID.h:73
unsigned tcpiiu::sendBytes ( const void *  pBuf,
unsigned  nBytesInBuf,
const epicsTime &  currentTime 
)

Definition at line 231 of file tcpiiu.cpp.

233 {
234  unsigned nBytes = 0u;
235  assert ( nBytesInBuf <= INT_MAX );
236 
237  this->sendDog.start ( currentTime );
238 
239  while ( true ) {
240  int status = ::send ( this->sock,
241  static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 );
242  if ( status > 0 ) {
243  nBytes = static_cast <unsigned> ( status );
244  // printf("SEND: %u\n", nBytes );
245  break;
246  }
247  else {
248  epicsGuard < epicsMutex > guard ( this->mutex );
249  if ( this->state != iiucs_connected &&
250  this->state != iiucs_clean_shutdown ) {
251  break;
252  }
253  // winsock indicates disconnect by returning zero here
254  if ( status == 0 ) {
255  this->disconnectNotify ( guard );
256  break;
257  }
258 
259  int localError = SOCKERRNO;
260 
261  if ( localError == SOCK_EINTR ) {
262  continue;
263  }
264 
265  if ( localError == SOCK_ENOBUFS ) {
266  errlogPrintf (
267  "CAC: system low on network buffers "
268  "- send retry in 15 seconds\n" );
269  {
270  epicsGuardRelease < epicsMutex > unguard ( guard );
271  epicsThreadSleep ( 15.0 );
272  }
273  continue;
274  }
275 
276  if (
277  localError != SOCK_EPIPE &&
278  localError != SOCK_ECONNRESET &&
279  localError != SOCK_ETIMEDOUT &&
280  localError != SOCK_ECONNABORTED &&
281  localError != SOCK_SHUTDOWN ) {
282  char sockErrBuf[64];
283  epicsSocketConvertErrnoToString (
284  sockErrBuf, sizeof ( sockErrBuf ) );
285  errlogPrintf ( "CAC: unexpected TCP send error: %s\n",
286  sockErrBuf );
287  }
288 
289  this->disconnectNotify ( guard );
290  break;
291  }
292  }
293 
294  this->sendDog.cancel ();
295 
296  return nBytes;
297 }
#define SOCK_ECONNABORTED
Definition: osdSock.h:59
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
void start(const epicsTime &)
pvd::Status status
#define SOCK_ENOBUFS
Definition: osdSock.h:52
#define SOCK_ETIMEDOUT
Definition: osdSock.h:54
#define SOCK_SHUTDOWN
Definition: osdSock.h:67
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
#define SOCK_ECONNRESET
Definition: osdSock.h:53
tcpSendWatchdog sendDog
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
#define SOCKERRNO
Definition: osdSock.h:33
void disconnectNotify(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:851
#define SOCK_EPIPE
Definition: osdSock.h:65
#define SOCK_EINTR
Definition: osdSock.h:64
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
SOCKET sock
bool tcpiiu::sendThreadFlush ( epicsGuard< epicsMutex > &  guard)

Definition at line 1665 of file tcpiiu.cpp.

1666 {
1667  guard.assertIdenticalMutex ( this->mutex );
1668 
1669  if ( this->sendQue.occupiedBytes() > 0 ) {
1670  while ( comBuf * pBuf = this->sendQue.popNextComBufToSend () ) {
1671  epicsTime current = epicsTime::getCurrent ();
1672 
1673  unsigned bytesToBeSent = pBuf->occupiedBytes ();
1674  bool success = false;
1675  {
1676  // no lock while blocking to send
1677  epicsGuardRelease < epicsMutex > unguard ( guard );
1678  success = pBuf->flushToWire ( *this, current );
1679  pBuf->~comBuf ();
1680  this->comBufMemMgr.release ( pBuf );
1681  }
1682 
1683  if ( ! success ) {
1684  while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) {
1685  pBuf->~comBuf ();
1686  this->comBufMemMgr.release ( pBuf );
1687  }
1688  return false;
1689  }
1690 
1691  // set it here with this odd order because we must have
1692  // the lock and we must have already sent the bytes
1693  this->unacknowledgedSendBytes += bytesToBeSent;
1694  if ( this->unacknowledgedSendBytes >
1695  this->socketLibrarySendBufferSize ) {
1696  this->recvDog.sendBacklogProgressNotify ( guard );
1697  }
1698  }
1699  }
1700 
1701  this->earlyFlush = false;
1702  if ( this->blockingForFlush ) {
1703  this->flushBlockEvent.signal ();
1704  }
1705 
1706  return true;
1707 }
epicsEvent flushBlockEvent
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
unsigned blockingForFlush
void sendBacklogProgressNotify(epicsGuard< epicsMutex > &)
bool earlyFlush
unsigned occupiedBytes() const
Definition: comQueSend.h:216
comQueSend sendQue
tcpRecvWatchdog recvDog
unsigned unacknowledgedSendBytes
epicsMutex & mutex
unsigned socketLibrarySendBufferSize
comBuf * popNextComBufToSend()
Definition: comQueSend.cpp:259
Definition: comBuf.h:75
comBufMemoryManager & comBufMemMgr
virtual void release(void *)=0
void tcpiiu::sendTimeoutNotify ( callbackManager cbMgr,
epicsGuard< epicsMutex > &  guard 
)

Definition at line 878 of file tcpiiu.cpp.

881 {
882  mgr.cbGuard.assertIdenticalMutex ( this-> cbMutex );
883  guard.assertIdenticalMutex ( this->mutex );
884  this->unresponsiveCircuitNotify ( mgr.cbGuard, guard );
885  // setup circuit probe sequence
886  this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard );
887 }
void sendTimeoutNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
epicsMutex & cbMutex
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
void unresponsiveCircuitNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:898
tcpRecvWatchdog recvDog
epicsMutex & mutex
bool tcpiiu::setEchoRequestPending ( epicsGuard< epicsMutex > &  guard)

Definition at line 1121 of file tcpiiu.cpp.

1122 {
1123  guard.assertIdenticalMutex ( this->mutex );
1124 
1125  this->echoRequestPending = true;
1126  this->sendThreadFlushEvent.signal ();
1127  if ( CA_V43 ( this->minorProtocolVersion ) ) {
1128  // we send an echo
1129  return true;
1130  }
1131  else {
1132  // we send a NOOP
1133  return false;
1134  }
1135 }
unsigned minorProtocolVersion
#define CA_V43(MINOR)
Definition: caProto.h:34
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
epicsMutex & mutex
epicsEvent sendThreadFlushEvent
bool echoRequestPending
void tcpiiu::show ( unsigned  level) const

Definition at line 1043 of file tcpiiu.cpp.

1044 {
1045  epicsGuard < epicsMutex > locker ( this->mutex );
1046  char buf[256];
1047  this->hostNameCacheInstance.getName ( buf, sizeof ( buf ) );
1048  ::printf ( "Virtual circuit to \"%s\" at version V%u.%u state %u\n",
1050  this->minorProtocolVersion, this->state );
1051  if ( level > 1u ) {
1052  ::printf ( "\tcurrent data cache pointer = %p current data cache size = %lu\n",
1053  static_cast < void * > ( this->pCurData ), this->curDataMax );
1054  ::printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n",
1056  ::printf ( "\receive thread is busy=%u\n",
1057  this->_receiveThreadIsBusy );
1058  }
1059  if ( level > 2u ) {
1060  ::printf ( "\tvirtual circuit socket identifier %d\n", this->sock );
1061  ::printf ( "\tsend thread flush signal:\n" );
1062  this->sendThreadFlushEvent.show ( level-2u );
1063  ::printf ( "\tsend thread:\n" );
1064  this->sendThread.show ( level-2u );
1065  ::printf ( "\trecv thread:\n" );
1066  this->recvThread.show ( level-2u );
1067  ::printf ("\techo pending bool = %u\n", this->echoRequestPending );
1068  ::printf ( "IO identifier hash table:\n" );
1069 
1070  if ( this->createReqPend.count () ) {
1071  ::printf ( "Create request pending channels\n" );
1073  while ( pChan.valid () ) {
1074  pChan->show ( level - 2u );
1075  pChan++;
1076  }
1077  }
1078  if ( this->createRespPend.count () ) {
1079  ::printf ( "Create response pending channels\n" );
1081  while ( pChan.valid () ) {
1082  pChan->show ( level - 2u );
1083  pChan++;
1084  }
1085  }
1086  if ( this->v42ConnCallbackPend.count () ) {
1087  ::printf ( "V42 Conn Callback pending channels\n" );
1089  while ( pChan.valid () ) {
1090  pChan->show ( level - 2u );
1091  pChan++;
1092  }
1093  }
1094  if ( this->subscripReqPend.count () ) {
1095  ::printf ( "Subscription request pending channels\n" );
1097  while ( pChan.valid () ) {
1098  pChan->show ( level - 2u );
1099  pChan++;
1100  }
1101  }
1102  if ( this->connectedList.count () ) {
1103  ::printf ( "Connected channels\n" );
1105  while ( pChan.valid () ) {
1106  pChan->show ( level - 2u );
1107  pChan++;
1108  }
1109  }
1110  if ( this->unrespCircuit.count () ) {
1111  ::printf ( "Unresponsive circuit channels\n" );
1113  while ( pChan.valid () ) {
1114  pChan->show ( level - 2u );
1115  pChan++;
1116  }
1117  }
1118  }
1119 }
tsDLList< nciu > connectedList
char * pCurData
unsigned minorProtocolVersion
#define CA_MAJOR_PROTOCOL_REVISION
Definition: caProto.h:26
void show(unsigned level) const
Definition: tcpiiu.cpp:65
arrayElementCount curDataMax
tsDLIterConst< T > firstIter() const
Definition: tsDLList.h:459
#define printf
Definition: epicsStdio.h:41
unsigned count() const
Definition: tsDLList.h:181
bool valid() const
Definition: tsDLList.h:506
tcpRecvThread recvThread
tcpSendThread sendThread
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
epicsEvent sendThreadFlushEvent
unsigned contigRecvMsgCount
bool busyStateDetected
bool flowControlActive
void show(unsigned level) const
Definition: nciu.cpp:472
SOCKET sock
tsDLList< nciu > createReqPend
tsDLList< nciu > createRespPend
bool _receiveThreadIsBusy
bool echoRequestPending
tsDLList< nciu > subscripReqPend
tsDLList< nciu > unrespCircuit
tsDLList< nciu > v42ConnCallbackPend
void tcpiiu::start ( epicsGuard< epicsMutex > &  guard)

Definition at line 819 of file tcpiiu.cpp.

821 {
822  guard.assertIdenticalMutex ( this->mutex );
823  this->recvThread.start ();
824 }
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
tcpRecvThread recvThread
epicsMutex & mutex
void tcpiiu::subscriptionCancelRequest ( epicsGuard< epicsMutex > &  guard,
nciu chan,
netSubscription subscr 
)
virtual

Implements netiiu.

Definition at line 1645 of file tcpiiu.cpp.

1647 {
1648  guard.assertIdenticalMutex ( this->mutex );
1649  // there are situations where the circuit is disconnected, but
1650  // the channel does not know this yet
1651  if ( this->state != iiucs_connected ) {
1652  return;
1653  }
1654  comQueSendMsgMinder minder ( this->sendQue, guard );
1657  static_cast < ca_uint16_t > ( subscr.getType ( guard ) ),
1658  static_cast < ca_uint16_t > ( subscr.getCount (
1659  guard, CA_V413(this->minorProtocolVersion) ) ),
1660  chan.getSID(guard), subscr.getId(),
1661  CA_V49 ( this->minorProtocolVersion ) );
1662  minder.commit ();
1663 }
unsigned minorProtocolVersion
#define CA_PROTO_EVENT_CANCEL
Definition: caProto.h:85
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
const T getId() const
Definition: resourceLib.h:1021
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_V413(MINOR)
Definition: caProto.h:44
comQueSend sendQue
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
arrayElementCount getCount(epicsGuard< epicsMutex > &, bool allow_zero) const
Definition: netIO.h:240
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291
unsigned getType(epicsGuard< epicsMutex > &) const
Definition: netIO.h:253
void tcpiiu::subscriptionRequest ( epicsGuard< epicsMutex > &  guard,
nciu chan,
netSubscription subscr 
)
virtual

Implements netiiu.

Definition at line 1557 of file tcpiiu.cpp.

1560 {
1561  guard.assertIdenticalMutex ( this->mutex );
1562  // there are situations where the circuit is disconnected, but
1563  // the channel does not know this yet
1564  if ( this->state != iiucs_connected &&
1565  this->state != iiucs_connecting ) {
1566  return;
1567  }
1568  unsigned mask = subscr.getMask(guard);
1569  if ( mask > 0xffff ) {
1571  }
1572  arrayElementCount nElem = subscr.getCount (
1573  guard, CA_V413(this->minorProtocolVersion) );
1574  arrayElementCount maxBytes;
1575  if ( CA_V49 ( this->minorProtocolVersion ) ) {
1576  maxBytes = 0xfffffff0;
1577  }
1578  else {
1579  maxBytes = MAX_TCP;
1580  }
1581  unsigned dataType = subscr.getType ( guard );
1582  // data type bounds checked when subscription created
1583  arrayElementCount maxElem = ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType];
1584  if ( nElem > maxElem ) {
1586  }
1587  comQueSendMsgMinder minder ( this->sendQue, guard );
1588  // nElement bounds checked above
1590  CA_PROTO_EVENT_ADD, 16u,
1591  static_cast < ca_uint16_t > ( dataType ),
1592  static_cast < ca_uint32_t > ( nElem ),
1593  chan.getSID(guard), subscr.getId(),
1594  CA_V49 ( this->minorProtocolVersion ) );
1595 
1596  // extension
1597  this->sendQue.pushFloat32 ( 0.0f ); // m_lval
1598  this->sendQue.pushFloat32 ( 0.0f ); // m_hval
1599  this->sendQue.pushFloat32 ( 0.0f ); // m_toval
1600  this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( mask ) ); // m_mask
1601  this->sendQue.pushUInt16 ( 0u ); // m_pad
1602  minder.commit ();
1603 }
#define MAX_TCP
Definition: caProto.h:63
const unsigned short dbr_value_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:896
const unsigned short dbr_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:847
#define CA_PROTO_EVENT_ADD
Definition: caProto.h:84
unsigned minorProtocolVersion
void pushUInt16(const ca_uint16_t value)
Definition: comQueSend.h:188
unsigned getMask(epicsGuard< epicsMutex > &) const
Definition: netIO.h:258
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
const T getId() const
Definition: resourceLib.h:1021
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_V413(MINOR)
Definition: caProto.h:44
void pushFloat32(const ca_float32_t value)
Definition: comQueSend.h:198
comQueSend sendQue
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
arrayElementCount getCount(epicsGuard< epicsMutex > &, bool allow_zero) const
Definition: netIO.h:240
unsigned long arrayElementCount
Definition: cacIO.h:57
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291
unsigned getType(epicsGuard< epicsMutex > &) const
Definition: netIO.h:253
void tcpiiu::subscriptionUpdateRequest ( epicsGuard< epicsMutex > &  guard,
nciu chan,
netSubscription subscr 
)
virtual

Implements netiiu.

Definition at line 1609 of file tcpiiu.cpp.

1612 {
1613  guard.assertIdenticalMutex ( this->mutex );
1614  // there are situations where the circuit is disconnected, but
1615  // the channel does not know this yet
1616  if ( this->state != iiucs_connected ) {
1617  return;
1618  }
1619  arrayElementCount nElem = subscr.getCount (
1620  guard, CA_V413(this->minorProtocolVersion) );
1621  arrayElementCount maxBytes;
1622  if ( CA_V49 ( this->minorProtocolVersion ) ) {
1623  maxBytes = 0xfffffff0;
1624  }
1625  else {
1626  maxBytes = MAX_TCP;
1627  }
1628  unsigned dataType = subscr.getType ( guard );
1629  // data type bounds checked when subscription constructed
1630  arrayElementCount maxElem = ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType];
1631  if ( nElem > maxElem ) {
1633  }
1634  comQueSendMsgMinder minder ( this->sendQue, guard );
1635  // nElem bounds checked above
1638  static_cast < ca_uint16_t > ( dataType ),
1639  static_cast < ca_uint32_t > ( nElem ),
1640  chan.getSID (guard), subscr.getId (),
1641  CA_V49 ( this->minorProtocolVersion ) );
1642  minder.commit ();
1643 }
#define MAX_TCP
Definition: caProto.h:63
const unsigned short dbr_value_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:896
const unsigned short dbr_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:847
unsigned minorProtocolVersion
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
const T getId() const
Definition: resourceLib.h:1021
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_V413(MINOR)
Definition: caProto.h:44
comQueSend sendQue
epicsMutex & mutex
enum tcpiiu::iiu_conn_state state
arrayElementCount getCount(epicsGuard< epicsMutex > &, bool allow_zero) const
Definition: netIO.h:240
unsigned long arrayElementCount
Definition: cacIO.h:57
#define CA_PROTO_READ_NOTIFY
Definition: caProto.h:98
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291
unsigned getType(epicsGuard< epicsMutex > &) const
Definition: netIO.h:253
void tcpiiu::uninstallChan ( epicsGuard< epicsMutex > &  guard,
nciu chan 
)
virtual

Implements netiiu.

Definition at line 1982 of file tcpiiu.cpp.

1984 {
1985  guard.assertIdenticalMutex ( this->mutex );
1986 
1987  switch ( chan.channelNode::listMember ) {
1988  case channelNode::cs_createReqPend:
1989  this->createReqPend.remove ( chan );
1990  break;
1991  case channelNode::cs_createRespPend:
1992  this->createRespPend.remove ( chan );
1993  break;
1994  case channelNode::cs_v42ConnCallbackPend:
1995  this->v42ConnCallbackPend.remove ( chan );
1996  break;
1997  case channelNode::cs_subscripReqPend:
1998  this->subscripReqPend.remove ( chan );
1999  break;
2000  case channelNode::cs_connected:
2001  this->connectedList.remove ( chan );
2002  break;
2003  case channelNode::cs_unrespCircuit:
2004  this->unrespCircuit.remove ( chan );
2005  break;
2006  case channelNode::cs_subscripUpdateReqPend:
2007  this->subscripUpdateReqPend.remove ( chan );
2008  break;
2009  default:
2010  errlogPrintf (
2011  "cac: attempt to uninstall channel from tcp iiu, but it inst installed there?" );
2012  }
2013  chan.channelNode::listMember = channelNode::cs_none;
2014  this->channelCountTot--;
2015  if ( this->channelCountTot == 0 && ! this->isNameService() ) {
2016  this->initiateCleanShutdown ( guard );
2017  }
2018 }
tsDLList< nciu > subscripUpdateReqPend
tsDLList< nciu > connectedList
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
bool isNameService() const
unsigned channelCountTot
epicsMutex & mutex
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
void initiateCleanShutdown(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:826
void remove(T &item)
Definition: tsDLList.h:219
tsDLList< nciu > createReqPend
tsDLList< nciu > createRespPend
tsDLList< nciu > subscripReqPend
tsDLList< nciu > unrespCircuit
tsDLList< nciu > v42ConnCallbackPend
void tcpiiu::uninstallChanDueToSuccessfulSearchResponse ( epicsGuard< epicsMutex > &  guard,
nciu chan,
const class epicsTime &  currentTime 
)
virtual

Implements netiiu.

Definition at line 2119 of file tcpiiu.cpp.

2122 {
2124  guard, chan, currentTime );
2125 }
virtual void uninstallChanDueToSuccessfulSearchResponse(epicsGuard< epicsMutex > &, nciu &, const class epicsTime &currentTime)=0
Definition: netiiu.cpp:159
void tcpiiu::unlinkAllChannels ( epicsGuard< epicsMutex > &  cbGuard,
epicsGuard< epicsMutex > &  guard 
)

Definition at line 1869 of file tcpiiu.cpp.

1872 {
1873  cbGuard.assertIdenticalMutex ( this->cbMutex );
1874  guard.assertIdenticalMutex ( this->mutex );
1875 
1876  while ( nciu * pChan = this->createReqPend.get () ) {
1877  pChan->channelNode::listMember =
1878  channelNode::cs_none;
1879  pChan->serviceShutdownNotify ( cbGuard, guard );
1880  }
1881 
1882  while ( nciu * pChan = this->createRespPend.get () ) {
1883  pChan->channelNode::listMember =
1884  channelNode::cs_none;
1885  // we dont yet know the server's id so we cant
1886  // send a channel delete request and will instead
1887  // trust that the server can do the proper cleanup
1888  // when the circuit disconnects
1889  pChan->serviceShutdownNotify ( cbGuard, guard );
1890  }
1891 
1892  while ( nciu * pChan = this->v42ConnCallbackPend.get () ) {
1893  pChan->channelNode::listMember =
1894  channelNode::cs_none;
1895  this->clearChannelRequest ( guard,
1896  pChan->getSID(guard), pChan->getCID(guard) );
1897  pChan->serviceShutdownNotify ( cbGuard, guard );
1898  }
1899 
1900  while ( nciu * pChan = this->subscripReqPend.get () ) {
1901  pChan->channelNode::listMember =
1902  channelNode::cs_none;
1903  pChan->disconnectAllIO ( cbGuard, guard );
1904  this->clearChannelRequest ( guard,
1905  pChan->getSID(guard), pChan->getCID(guard) );
1906  pChan->serviceShutdownNotify ( cbGuard, guard );
1907  }
1908 
1909  while ( nciu * pChan = this->connectedList.get () ) {
1910  pChan->channelNode::listMember =
1911  channelNode::cs_none;
1912  pChan->disconnectAllIO ( cbGuard, guard );
1913  this->clearChannelRequest ( guard,
1914  pChan->getSID(guard), pChan->getCID(guard) );
1915  pChan->serviceShutdownNotify ( cbGuard, guard );
1916  }
1917 
1918  while ( nciu * pChan = this->unrespCircuit.get () ) {
1919  pChan->channelNode::listMember =
1920  channelNode::cs_none;
1921  pChan->disconnectAllIO ( cbGuard, guard );
1922  // if we know that the circuit is unresponsive
1923  // then we dont send a channel delete request and
1924  // will instead trust that the server can do the
1925  // proper cleanup when the circuit disconnects
1926  pChan->serviceShutdownNotify ( cbGuard, guard );
1927  }
1928 
1929  while ( nciu * pChan = this->subscripUpdateReqPend.get () ) {
1930  pChan->channelNode::listMember =
1931  channelNode::cs_none;
1932  pChan->disconnectAllIO ( cbGuard, guard );
1933  this->clearChannelRequest ( guard,
1934  pChan->getSID(guard), pChan->getCID(guard) );
1935  pChan->serviceShutdownNotify ( cbGuard, guard );
1936  }
1937 
1938  this->channelCountTot = 0u;
1939  this->initiateCleanShutdown ( guard );
1940 }
tsDLList< nciu > subscripUpdateReqPend
tsDLList< nciu > connectedList
epicsMutex & cbMutex
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
unsigned channelCountTot
epicsMutex & mutex
void initiateCleanShutdown(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:826
T * get()
Definition: tsDLList.h:261
void clearChannelRequest(epicsGuard< epicsMutex > &, ca_uint32_t sid, ca_uint32_t cid)
Definition: tcpiiu.cpp:1536
tsDLList< nciu > createReqPend
tsDLList< nciu > createRespPend
tsDLList< nciu > subscripReqPend
tsDLList< nciu > unrespCircuit
Definition: nciu.h:127
tsDLList< nciu > v42ConnCallbackPend
void tcpiiu::unresponsiveCircuitNotify ( epicsGuard< epicsMutex > &  cbGuard,
epicsGuard< epicsMutex > &  guard 
)

Definition at line 898 of file tcpiiu.cpp.

901 {
902  cbGuard.assertIdenticalMutex ( this->cbMutex );
903  guard.assertIdenticalMutex ( this->mutex );
904 
905  if ( ! this->unresponsiveCircuit ) {
906  this->unresponsiveCircuit = true;
907  this->echoRequestPending = true;
908  this->sendThreadFlushEvent.signal ();
909  this->flushBlockEvent.signal ();
910 
911  // must not hold lock when canceling timer
912  {
913  epicsGuardRelease < epicsMutex > unguard ( guard );
914  {
915  epicsGuardRelease < epicsMutex > cbUnguard ( cbGuard );
916  this->recvDog.cancel ();
917  this->sendDog.cancel ();
918  }
919  }
920 
921  if ( this->connectedList.count() ) {
922  char hostNameTmp[128];
923  this->getHostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) );
924  genLocalExcep ( cbGuard, guard, this->cacRef,
925  ECA_UNRESPTMO, hostNameTmp );
926  while ( nciu * pChan = this->connectedList.get () ) {
927  // The cac lock is released herein so there is concern that
928  // the list could be changed while we are traversing it.
929  // However, this occurs only if a circuit disconnects,
930  // a user deletes a channel, or a server disconnects a
931  // channel. The callback lock must be taken in all of
932  // these situations so this code is protected.
933  this->unrespCircuit.add ( *pChan );
934  pChan->channelNode::listMember =
935  channelNode::cs_unrespCircuit;
936  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
937  }
938  }
939  }
940 }
tsDLList< nciu > connectedList
void add(T &item)
Definition: tsDLList.h:313
epicsEvent flushBlockEvent
epicsMutex & cbMutex
bool unresponsiveCircuit
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
cac & cacRef
unsigned count() const
Definition: tsDLList.h:181
tcpSendWatchdog sendDog
tcpRecvWatchdog recvDog
#define ECA_UNRESPTMO
Definition: caerr.h:137
epicsMutex & mutex
epicsEvent sendThreadFlushEvent
T * get()
Definition: tsDLList.h:261
#define genLocalExcep(CBGUARD, GUARD, CAC, STAT, PCTX)
Definition: iocinf.h:66
unsigned getHostName(epicsGuard< epicsMutex > &, char *pBuf, unsigned bufLength) const
Definition: tcpiiu.cpp:1791
bool echoRequestPending
tsDLList< nciu > unrespCircuit
Definition: nciu.h:127
void tcpiiu::userNameSetRequest ( epicsGuard< epicsMutex > &  guard)

Definition at line 1321 of file tcpiiu.cpp.

1322 {
1323  guard.assertIdenticalMutex ( this->mutex );
1324 
1325  if ( ! CA_V41 ( this->minorProtocolVersion ) ) {
1326  return;
1327  }
1328 
1329  const char *pName = this->cacRef.userNamePointer ();
1330  unsigned size = strlen ( pName ) + 1u;
1331  unsigned postSize = CA_MESSAGE_ALIGN ( size );
1332  assert ( postSize < 0xffff );
1333 
1334  if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) {
1335  this->flushRequest ( guard );
1336  }
1337 
1338  comQueSendMsgMinder minder ( this->sendQue, guard );
1340  CA_PROTO_CLIENT_NAME, postSize,
1341  0u, 0u, 0u, 0u,
1342  CA_V49 ( this->minorProtocolVersion ) );
1343  this->sendQue.pushString ( pName, size );
1344  this->sendQue.pushString ( cacNillBytes, postSize - size );
1345  minder.commit ();
1346 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
unsigned minorProtocolVersion
void pushString(const char *pVal, unsigned nChar)
Definition: comQueSend.h:203
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
cac & cacRef
#define CA_V49(MINOR)
Definition: caProto.h:40
comQueSend sendQue
#define CA_MESSAGE_ALIGN(A)
Definition: caProto.h:154
const char cacNillBytes[]
Definition: comQueSend.cpp:74
epicsMutex & mutex
#define CA_PROTO_CLIENT_NAME
Definition: caProto.h:103
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
const char * userNamePointer() const
Definition: cac.h:348
#define CA_V41(MINOR)
Definition: caProto.h:32
void tcpiiu::versionMessage ( epicsGuard< epicsMutex > &  guard,
const cacChannel::priLev priority 
)

Definition at line 1380 of file tcpiiu.cpp.

1382 {
1383  guard.assertIdenticalMutex ( this->mutex );
1384 
1385  assert ( priority <= 0xffff );
1386 
1387  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1388  this->flushRequest ( guard );
1389  }
1390 
1391  comQueSendMsgMinder minder ( this->sendQue, guard );
1393  CA_PROTO_VERSION, 0u,
1394  static_cast < ca_uint16_t > ( priority ),
1396  CA_V49 ( this->minorProtocolVersion ) );
1397  minder.commit ();
1398 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
unsigned minorProtocolVersion
#define CA_PROTO_VERSION
Definition: caProto.h:83
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define CA_V49(MINOR)
Definition: caProto.h:40
#define CA_MINOR_PROTOCOL_REVISION
Definition: nciu.h:35
comQueSend sendQue
unsigned priority() const
Definition: caServerID.h:80
epicsMutex & mutex
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
void tcpiiu::versionRespNotify ( const caHdrLargeArray msg)

Definition at line 2188 of file tcpiiu.cpp.

2189 {
2190  this->minorProtocolVersion = msg.m_count;
2191 }
unsigned minorProtocolVersion
ca_uint32_t m_count
void tcpiiu::writeNotifyRequest ( epicsGuard< epicsMutex > &  guard,
nciu chan,
netWriteNotifyIO io,
unsigned  type,
arrayElementCount  nElem,
const void *  pValue 
)
virtual

Implements netiiu.

Definition at line 1436 of file tcpiiu.cpp.

1439 {
1440  guard.assertIdenticalMutex ( this->mutex );
1441 
1442  if ( ! this->ca_v41_ok ( guard ) ) {
1444  }
1445  if ( INVALID_DB_REQ ( type ) ) {
1446  throw cacChannel::badType ();
1447  }
1448  comQueSendMsgMinder minder ( this->sendQue, guard );
1450  type, nElem, chan.getSID(guard), io.getId(), pValue,
1451  CA_V49 ( this->minorProtocolVersion ) );
1452  minder.commit ();
1453 }
unsigned minorProtocolVersion
#define CA_PROTO_WRITE_NOTIFY
Definition: caProto.h:102
pvd::StructureConstPtr type
#define INVALID_DB_REQ(x)
Definition: db_access.h:115
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
const T getId() const
Definition: resourceLib.h:1021
#define CA_V49(MINOR)
Definition: caProto.h:40
comQueSend sendQue
epicsMutex & mutex
bool ca_v41_ok(epicsGuard< epicsMutex > &) const
void insertRequestWithPayLoad(ca_uint16_t request, unsigned dataType, arrayElementCount nElem, ca_uint32_t cid, ca_uint32_t requestDependent, const void *pPayload, bool v49Ok)
Definition: comQueSend.cpp:317
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291
void tcpiiu::writeRequest ( epicsGuard< epicsMutex > &  guard,
nciu chan,
unsigned  type,
arrayElementCount  nElem,
const void *  pValue 
)
virtual

Implements netiiu.

Definition at line 1421 of file tcpiiu.cpp.

1423 {
1424  guard.assertIdenticalMutex ( this->mutex );
1425  if ( INVALID_DB_REQ ( type ) ) {
1426  throw cacChannel::badType ();
1427  }
1428  comQueSendMsgMinder minder ( this->sendQue, guard );
1430  type, nElem, chan.getSID(guard), chan.getCID(guard), pValue,
1431  CA_V49 ( this->minorProtocolVersion ) );
1432  minder.commit ();
1433 }
unsigned minorProtocolVersion
#define CA_PROTO_WRITE
Definition: caProto.h:87
ca_uint32_t getCID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:297
pvd::StructureConstPtr type
#define INVALID_DB_REQ(x)
Definition: db_access.h:115
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define CA_V49(MINOR)
Definition: caProto.h:40
comQueSend sendQue
epicsMutex & mutex
void insertRequestWithPayLoad(ca_uint16_t request, unsigned dataType, arrayElementCount nElem, ca_uint32_t cid, ca_uint32_t requestDependent, const void *pPayload, bool v49Ok)
Definition: comQueSend.cpp:317
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291

Friends And Related Function Documentation

void SearchDestTCP::searchRequest ( epicsGuard< epicsMutex > &  guard,
const char *  pbuf,
size_t  len 
)
friend
friend class tcpRecvThread
friend

Definition at line 331 of file virtualCircuit.h.

friend class tcpSendThread
friend

Definition at line 332 of file virtualCircuit.h.

Member Data Documentation

bool tcpiiu::_receiveThreadIsBusy

Definition at line 248 of file virtualCircuit.h.

unsigned tcpiiu::blockingForFlush

Definition at line 244 of file virtualCircuit.h.

bool tcpiiu::busyStateDetected

Definition at line 249 of file virtualCircuit.h.

cac& tcpiiu::cacRef

Definition at line 227 of file virtualCircuit.h.

epicsMutex& tcpiiu::cbMutex

Definition at line 231 of file virtualCircuit.h.

unsigned tcpiiu::channelCountTot

Definition at line 247 of file virtualCircuit.h.

comBufMemoryManager& tcpiiu::comBufMemMgr

Definition at line 226 of file virtualCircuit.h.

tsDLList< nciu > tcpiiu::connectedList

Definition at line 220 of file virtualCircuit.h.

unsigned tcpiiu::contigRecvMsgCount

Definition at line 243 of file virtualCircuit.h.

tsDLList< nciu > tcpiiu::createReqPend

Definition at line 216 of file virtualCircuit.h.

tsDLList< nciu > tcpiiu::createRespPend

Definition at line 217 of file virtualCircuit.h.

arrayElementCount tcpiiu::curDataBytes

Definition at line 225 of file virtualCircuit.h.

arrayElementCount tcpiiu::curDataMax

Definition at line 224 of file virtualCircuit.h.

caHdrLargeArray tcpiiu::curMsg

Definition at line 223 of file virtualCircuit.h.

bool tcpiiu::discardingPendingData

Definition at line 256 of file virtualCircuit.h.

bool tcpiiu::earlyFlush

Definition at line 254 of file virtualCircuit.h.

bool tcpiiu::echoRequestPending

Definition at line 251 of file virtualCircuit.h.

bool tcpiiu::flowControlActive

Definition at line 250 of file virtualCircuit.h.

epicsEvent tcpiiu::flushBlockEvent

Definition at line 241 of file virtualCircuit.h.

unsigned tcpiiu::minorProtocolVersion

Definition at line 232 of file virtualCircuit.h.

bool tcpiiu::msgHeaderAvailable

Definition at line 253 of file virtualCircuit.h.

epicsMutex& tcpiiu::mutex

Definition at line 230 of file virtualCircuit.h.

bool tcpiiu::oldMsgHeaderAvailable

Definition at line 252 of file virtualCircuit.h.

char* tcpiiu::pCurData

Definition at line 228 of file virtualCircuit.h.

SearchDestTCP* tcpiiu::pSearchDest

Definition at line 229 of file virtualCircuit.h.

tcpRecvWatchdog tcpiiu::recvDog

Definition at line 210 of file virtualCircuit.h.

bool tcpiiu::recvProcessPostponedFlush

Definition at line 255 of file virtualCircuit.h.

comQueRecv tcpiiu::recvQue

Definition at line 213 of file virtualCircuit.h.

tcpRecvThread tcpiiu::recvThread

Definition at line 203 of file virtualCircuit.h.

tcpSendWatchdog tcpiiu::sendDog

Definition at line 211 of file virtualCircuit.h.

comQueSend tcpiiu::sendQue

Definition at line 212 of file virtualCircuit.h.

tcpSendThread tcpiiu::sendThread

Definition at line 209 of file virtualCircuit.h.

epicsEvent tcpiiu::sendThreadFlushEvent

Definition at line 240 of file virtualCircuit.h.

SOCKET tcpiiu::sock

Definition at line 242 of file virtualCircuit.h.

bool tcpiiu::socketHasBeenClosed

Definition at line 257 of file virtualCircuit.h.

unsigned tcpiiu::socketLibrarySendBufferSize

Definition at line 245 of file virtualCircuit.h.

enum tcpiiu::iiu_conn_state tcpiiu::state
tsDLList< nciu > tcpiiu::subscripReqPend

Definition at line 219 of file virtualCircuit.h.

tsDLList< nciu > tcpiiu::subscripUpdateReqPend

Definition at line 222 of file virtualCircuit.h.

unsigned tcpiiu::unacknowledgedSendBytes

Definition at line 246 of file virtualCircuit.h.

tsDLList< nciu > tcpiiu::unrespCircuit

Definition at line 221 of file virtualCircuit.h.

bool tcpiiu::unresponsiveCircuit

Definition at line 258 of file virtualCircuit.h.

tsDLList< nciu > tcpiiu::v42ConnCallbackPend

Definition at line 218 of file virtualCircuit.h.


The documentation for this class was generated from the following files: