This is Unofficial EPICS BASE Doxygen Site
tcpiiu.cpp
Go to the documentation of this file.
1 /*************************************************************************\
2 * Copyright (c) 2002 The University of Chicago, as Operator of Argonne
3 * National Laboratory.
4 * Copyright (c) 2002 The Regents of the University of California, as
5 * Operator of Los Alamos National Laboratory.
6 * EPICS BASE is distributed subject to a Software License Agreement found
7 * in file LICENSE that is included with this distribution.
8 \*************************************************************************/
9 
10 /*
11  * L O S A L A M O S
12  * Los Alamos National Laboratory
13  * Los Alamos, New Mexico 87545
14  *
15  * Copyright, 1986, The Regents of the University of California.
16  *
17  * Author: Jeff Hill
18  *
19  */
20 
21 #ifdef _MSC_VER
22 # pragma warning(disable:4355)
23 #endif
24 
25 #define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
26 
27 #include <stdexcept>
28 #include <string>
29 
30 #include <stdlib.h>
31 
32 #include "errlog.h"
33 
34 #include "localHostName.h"
35 #include "iocinf.h"
36 #include "virtualCircuit.h"
37 #include "inetAddrID.h"
38 #include "cac.h"
39 #include "netiiu.h"
40 #include "hostNameCache.h"
41 #include "net_convert.h"
42 #include "bhe.h"
43 #include "epicsSignal.h"
44 #include "caerr.h"
45 #include "udpiiu.h"
46 
47 using namespace std;
48 
50  class tcpiiu & iiuIn, const char * pName,
51  unsigned stackSize, unsigned priority ) :
52  thread ( *this, pName, stackSize, priority ), iiu ( iiuIn )
53 {
54 }
55 
57 {
58 }
59 
61 {
62  this->thread.start ();
63 }
64 
// Diagnostic dump hook for the send thread. The level parameter is left
// unnamed and the body is empty, so nothing is printed.
65 void tcpSendThread::show ( unsigned /* level */ ) const
66 {
67 }
68 
70 {
71  this->thread.exitWait ();
72 }
73 
// Body of the send thread for one TCP circuit.
//
// Main loop (runs with iiu.mutex held, except while waiting on
// sendThreadFlushEvent): while the circuit state is iiucs_connected it
//   - sends a flow control on/off request when busyStateDetected and
//     flowControlActive disagree,
//   - sends an echo request when echoRequestPending was set,
//   - drains createReqPend, subscripReqPend and subscripUpdateReqPend,
//     moving each channel to its next list,
//   - calls sendThreadFlush() and leaves the loop if that returns false.
// After the loop, a circuit in iiucs_clean_shutdown gets one more flush
// followed by ::shutdown ( SHUT_WR ).
//
// Any exception is caught, logged and answered with ::shutdown ( SHUT_WR ).
//
// Teardown (always executed): cancels sendDog, shuts down recvDog, waits for
// the receive thread to exit (requesting an abortive shutdown after each
// 30 second wait that fails), waits until blockingForFlush is zero and then
// hands the IIU to cacRef.destroyIIU().
//
// NOTE(review): listing lines 178 and 194 are absent from this extract; they
// presumably hold the call that fills sockErrBuf - confirm against the
// original file.
74 void tcpSendThread::run ()
75 {
76  try {
77  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
78 
79  bool laborPending = false;
80 
81  while ( true ) {
82 
83  // dont wait if there is still labor to be done below
84  if ( ! laborPending ) {
85  epicsGuardRelease < epicsMutex > unguard ( guard );
86  this->iiu.sendThreadFlushEvent.wait ();
87  }
88 
   // any state other than connected ends the send loop
89  if ( this->iiu.state != tcpiiu::iiucs_connected ) {
90  break;
91  }
92 
93  laborPending = false;
   // labor is needed when the flow control state last sent differs
   // from the busy state most recently detected
94  bool flowControlLaborNeeded =
95  this->iiu.busyStateDetected != this->iiu.flowControlActive;
   // consume the echo request flag; it is acted on below
96  bool echoLaborNeeded = this->iiu.echoRequestPending;
97  this->iiu.echoRequestPending = false;
98 
99  if ( flowControlLaborNeeded ) {
100  if ( this->iiu.flowControlActive ) {
101  this->iiu.disableFlowControlRequest ( guard );
102  this->iiu.flowControlActive = false;
103  debugPrintf ( ( "fc off\n" ) );
104  }
105  else {
106  this->iiu.enableFlowControlRequest ( guard );
107  this->iiu.flowControlActive = true;
108  debugPrintf ( ( "fc on\n" ) );
109  }
110  }
111 
112  if ( echoLaborNeeded ) {
113  this->iiu.echoRequest ( guard );
114  }
115 
   // queue a create channel request for every channel waiting for one
116  while ( nciu * pChan = this->iiu.createReqPend.get () ) {
117  this->iiu.createChannelRequest ( *pChan, guard );
118 
119  if ( CA_V42 ( this->iiu.minorProtocolVersion ) ) {
120  this->iiu.createRespPend.add ( *pChan );
121  pChan->channelNode::listMember =
122  channelNode::cs_createRespPend;
123  }
124  else {
125  // This wakes up the resp thread so that it can call
126  // the connect callback. This isnt maximally efficient
127  // but it has the excellent side effect of not requiring
128  // that the UDP thread take the callback lock. There are
129  // almost no V42 servers left at this point.
130  this->iiu.v42ConnCallbackPend.add ( *pChan );
131  pChan->channelNode::listMember =
132  channelNode::cs_v42ConnCallbackPend;
133  this->iiu.echoRequestPending = true;
134  laborPending = true;
135  }
136 
   // stop queuing once the send queue reports its flush block
   // threshold; laborPending makes the next pass skip the event wait
137  if ( this->iiu.sendQue.flushBlockThreshold () ) {
138  laborPending = true;
139  break;
140  }
141  }
142 
143  while ( nciu * pChan = this->iiu.subscripReqPend.get () ) {
144  // this installs any subscriptions as needed
145  pChan->resubscribe ( guard );
146  this->iiu.connectedList.add ( *pChan );
147  pChan->channelNode::listMember =
148  channelNode::cs_connected;
149  if ( this->iiu.sendQue.flushBlockThreshold () ) {
150  laborPending = true;
151  break;
152  }
153  }
154 
155  while ( nciu * pChan = this->iiu.subscripUpdateReqPend.get () ) {
156  // this updates any subscriptions as needed
157  pChan->sendSubscriptionUpdateRequests ( guard );
158  this->iiu.connectedList.add ( *pChan );
159  pChan->channelNode::listMember =
160  channelNode::cs_connected;
161  if ( this->iiu.sendQue.flushBlockThreshold () ) {
162  laborPending = true;
163  break;
164  }
165  }
166 
   // a false return from the flush ends the send loop
167  if ( ! this->iiu.sendThreadFlush ( guard ) ) {
168  break;
169  }
170  }
   // clean shutdown: flush once more, then close the write side
171  if ( this->iiu.state == tcpiiu::iiucs_clean_shutdown ) {
172  this->iiu.sendThreadFlush ( guard );
173  // this should cause the server to disconnect from
174  // the client
175  int status = ::shutdown ( this->iiu.sock, SHUT_WR );
176  if ( status ) {
177  char sockErrBuf[64];
179  sockErrBuf, sizeof ( sockErrBuf ) );
180  errlogPrintf ("CAC TCP clean socket shutdown error was %s\n",
181  sockErrBuf );
182  }
183  }
184  }
185  catch ( ... ) {
186  errlogPrintf (
187  "cac: tcp send thread received an unexpected exception "
188  "- disconnecting\n");
189  // this should cause the server to disconnect from
190  // the client
191  int status = ::shutdown ( this->iiu.sock, SHUT_WR );
192  if ( status ) {
193  char sockErrBuf[64];
195  sockErrBuf, sizeof ( sockErrBuf ) );
196  errlogPrintf ("CAC TCP clean socket shutdown error was %s\n",
197  sockErrBuf );
198  }
199  }
200 
   // teardown - reached after normal loop exit and after an exception
201  this->iiu.sendDog.cancel ();
202  this->iiu.recvDog.shutdown ();
203 
204  while ( ! this->iiu.recvThread.exitWait ( 30.0 ) ) {
205  // it is possible to get stuck here if the user calls
206  // ca_context_destroy() when a circuit isnt known to
207  // be unresponsive, but is. That situation is probably
208  // rare, and the IP kernel might have a timeout for
209  // such situations, nevertheless we will attempt to deal
210  // with it here after waiting a reasonable amount of time
211  // for a clean shutdown to finish.
212  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
213  this->iiu.initiateAbortShutdown ( guard );
214  }
215 
216  // user threads blocking for send backlog to be reduced
217  // will abort their attempt to get space if
218  // the state of the tcpiiu changes from connected to a
219  // disconnecting state. Nevertheless, we need to wait
220  // for them to finish prior to destroying the IIU.
221  {
222  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
   // poll every 0.1 seconds, releasing the mutex while sleeping
223  while ( this->iiu.blockingForFlush ) {
224  epicsGuardRelease < epicsMutex > unguard ( guard );
225  epicsThreadSleep ( 0.1 );
226  }
227  }
228  this->iiu.cacRef.destroyIIU ( this->iiu );
229 }
230 
// Writes bytes from pBuf to the circuit socket with ::send().
//
// pBuf        - start of the data to send
// nBytesInBuf - number of bytes available at pBuf; asserted to be
//               <= INT_MAX because it is passed to ::send() as an int
// currentTime - time stamp handed to sendDog.start()
//
// Returns the byte count reported by the first ::send() call that
// returns > 0 (this may be less than nBytesInBuf), or 0 when nothing was
// sent because the circuit is no longer in the connected or clean
// shutdown state, ::send() returned zero, or an error other than
// SOCK_EINTR / SOCK_ENOBUFS occurred. In the last two cases
// disconnectNotify() is called before returning.
//
// sendDog is started before the first attempt and cancelled before return.
//
// NOTE(review): listing line 283 is absent from this extract; it presumably
// holds the call that fills sockErrBuf - confirm against the original file.
231 unsigned tcpiiu::sendBytes ( const void *pBuf,
232  unsigned nBytesInBuf, const epicsTime & currentTime )
233 {
234  unsigned nBytes = 0u;
235  assert ( nBytesInBuf <= INT_MAX );
236 
237  this->sendDog.start ( currentTime );
238 
239  while ( true ) {
240  int status = ::send ( this->sock,
241  static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 );
242  if ( status > 0 ) {
243  nBytes = static_cast <unsigned> ( status );
244  // printf("SEND: %u\n", nBytes );
245  break;
246  }
247  else {
   // the mutex is taken only on the failure path
248  epicsGuard < epicsMutex > guard ( this->mutex );
   // circuit already shutting down: give up without logging
249  if ( this->state != iiucs_connected &&
250  this->state != iiucs_clean_shutdown ) {
251  break;
252  }
253  // winsock indicates disconnect by returning zero here
254  if ( status == 0 ) {
255  this->disconnectNotify ( guard );
256  break;
257  }
258 
   // NOTE(review): SOCKERRNO is sampled only after the mutex has been
   // acquired above - confirm that locking cannot disturb its value
259  int localError = SOCKERRNO;
260 
   // interrupted call: retry immediately
261  if ( localError == SOCK_EINTR ) {
262  continue;
263  }
264 
   // out of network buffers: log, sleep 15 seconds with the mutex
   // released, then retry
265  if ( localError == SOCK_ENOBUFS ) {
266  errlogPrintf (
267  "CAC: system low on network buffers "
268  "- send retry in 15 seconds\n" );
269  {
270  epicsGuardRelease < epicsMutex > unguard ( guard );
271  epicsThreadSleep ( 15.0 );
272  }
273  continue;
274  }
275 
   // the errors listed here disconnect silently; any other error is
   // logged before disconnecting
276  if (
277  localError != SOCK_EPIPE &&
278  localError != SOCK_ECONNRESET &&
279  localError != SOCK_ETIMEDOUT &&
280  localError != SOCK_ECONNABORTED &&
281  localError != SOCK_SHUTDOWN ) {
282  char sockErrBuf[64];
284  sockErrBuf, sizeof ( sockErrBuf ) );
285  errlogPrintf ( "CAC: unexpected TCP send error: %s\n",
286  sockErrBuf );
287  }
288 
289  this->disconnectNotify ( guard );
290  break;
291  }
292  }
293 
294  this->sendDog.cancel ();
295 
296  return nBytes;
297 }
298 
300  void * pBuf, unsigned nBytesInBuf, statusWireIO & stat )
301 {
302  assert ( nBytesInBuf <= INT_MAX );
303 
304  while ( true ) {
305  int status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
306  static_cast <int> ( nBytesInBuf ), 0 );
307 
308  if ( status > 0 ) {
309  stat.bytesCopied = static_cast <unsigned> ( status );
310  assert ( stat.bytesCopied <= nBytesInBuf );
312  return;
313  }
314  else {
315  epicsGuard < epicsMutex > guard ( this->mutex );
316 
317  if ( status == 0 ) {
318  this->disconnectNotify ( guard );
319  stat.bytesCopied = 0u;
321  return;
322  }
323 
324  // if the circuit was locally aborted then suppress
325  // warning messages about bad file descriptor etc
326  if ( this->state != iiucs_connected &&
327  this->state != iiucs_clean_shutdown ) {
328  stat.bytesCopied = 0u;
330  return;
331  }
332 
333  int localErrno = SOCKERRNO;
334 
335  if ( localErrno == SOCK_SHUTDOWN ) {
336  stat.bytesCopied = 0u;
338  return;
339  }
340 
341  if ( localErrno == SOCK_EINTR ) {
342  continue;
343  }
344 
345  if ( localErrno == SOCK_ENOBUFS ) {
346  errlogPrintf (
347  "CAC: system low on network buffers "
348  "- receive retry in 15 seconds\n" );
349  {
350  epicsGuardRelease < epicsMutex > unguard ( guard );
351  epicsThreadSleep ( 15.0 );
352  }
353  continue;
354  }
355 
356  char sockErrBuf[64];
358  sockErrBuf, sizeof ( sockErrBuf ) );
359 
360  // the replaceable printf handler isnt called here
361  // because it requires a callback lock which probably
362  // isnt appropriate here
363  char name[64];
364  this->hostNameCacheInstance.getName (
365  name, sizeof ( name ) );
366  errlogPrintf (
367  "Unexpected problem with CA circuit to"
368  " server \"%s\" was \"%s\" - disconnecting\n",
369  name, sockErrBuf );
370 
371  stat.bytesCopied = 0u;
373  return;
374  }
375  }
376 }
377 
379  class tcpiiu & iiuIn, class epicsMutex & cbMutexIn,
380  cacContextNotify & ctxNotifyIn, const char * pName,
381  unsigned int stackSize, unsigned int priority ) :
382  thread ( *this, pName, stackSize, priority ),
383  iiu ( iiuIn ), cbMutex ( cbMutexIn ),
384  ctxNotify ( ctxNotifyIn ) {}
385 
387 {
388 }
389 
391 {
392  this->thread.start ();
393 }
394 
// Diagnostic dump hook for the receive thread. The level parameter is left
// unnamed and the body is empty, so nothing is printed.
395 void tcpRecvThread::show ( unsigned /* level */ ) const
396 {
397 }
398 
// Forwards to thread.exitWait ( delay ) and returns its result unchanged.
// delay is passed straight through; tcpSendThread::run() calls this with
// 30.0 and retries while the result is false.
// NOTE(review): presumably delay is in seconds and true means the thread
// has exited - confirm against the epicsThread documentation.
399 bool tcpRecvThread::exitWait ( double delay )
400 {
401  return this->thread.exitWait ( delay );
402 }
403 
405 {
406  this->thread.exitWait ();
407 }
408 
// Decides whether the receive loop may continue after a fill from the wire.
//
// guard - guard passed on to the iiu notification / shutdown calls
// stat  - status of the fill; only stat.circuitState is inspected here
//
// Returns true only when the iiu state is connected or clean shutdown AND
// stat.circuitState is swioConnected. In every other case false is
// returned, after reacting to the reported circuit state:
//   swioPeerHangup / swioPeerAbort -> disconnectNotify()
//   swioLinkFailure                -> initiateAbortShutdown()
//   swioLocalAbort                 -> nothing (state already changed)
//   anything else                  -> log a message and disconnectNotify()
409 bool tcpRecvThread::validFillStatus (
410  epicsGuard < epicsMutex > & guard, const statusWireIO & stat )
411 {
   // the circuit is already on its way down: stop without further action
412  if ( this->iiu.state != tcpiiu::iiucs_connected &&
413  this->iiu.state != tcpiiu::iiucs_clean_shutdown ) {
414  return false;
415  }
416  if ( stat.circuitState == swioConnected ) {
417  return true;
418  }
419  if ( stat.circuitState == swioPeerHangup ||
420  stat.circuitState == swioPeerAbort ) {
421  this->iiu.disconnectNotify ( guard );
422  }
423  else if ( stat.circuitState == swioLinkFailure ) {
424  this->iiu.initiateAbortShutdown ( guard );
425  }
426  else if ( stat.circuitState == swioLocalAbort ) {
427  // state change already occurred
428  }
429  else {
   // NOTE(review): unlike the errlogPrintf messages in this file, this
   // string has no trailing "\n" - confirm whether errlogMessage adds one
430  errlogMessage ( "cac: invalid fill status - disconnecting" );
431  this->iiu.disconnectNotify ( guard );
432  }
433  return false;
434 }
435 
// Body of the receive thread for one TCP circuit.
//
// 1. Calls connect() under iiu.mutex. If the circuit did not reach
//    iiucs_connected, shuts down recvDog, destroys the IIU and returns.
//    For a name service circuit the search destination is given this
//    circuit and enabled.
// 2. Starts the send thread and attaches to the client context.
// 3. Receive loop: fill a comBuf from the wire, validate the fill status,
//    push the buffer onto recvQue, then (holding the callback lock via
//    callbackManager) deliver pending V42 connect callbacks and run
//    processIncoming(). Afterwards the busy / flow control bookkeeping is
//    updated and the send thread is woken when it has work.
// 4. On loop exit an unused comBuf is destroyed and returned to
//    comBufMemMgr.
//
// std::bad_alloc, std::exception and any other exception are logged and
// answered with initiateCleanShutdown().
//
// NOTE(review): listing line 458 is absent from this extract - confirm its
// content against the original file.
// NOTE(review): pComBuf is declared inside the try block, so the release at
// the end of the try block is skipped when an exception propagates to the
// handlers below - confirm whether a buffer can be outstanding then.
436 void tcpRecvThread::run ()
437 {
438  try {
439  {
440  bool connectSuccess = false;
441  {
442  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
443  this->connect ( guard );
444  connectSuccess = this->iiu.state == tcpiiu::iiucs_connected;
445  }
   // the connect attempt did not succeed: tear the circuit down here
446  if ( ! connectSuccess ) {
447  this->iiu.recvDog.shutdown ();
448  this->iiu.cacRef.destroyIIU ( this->iiu );
449  return;
450  }
451  if ( this->iiu.isNameService () ) {
452  this->iiu.pSearchDest->setCircuit ( &this->iiu );
453  this->iiu.pSearchDest->enable ();
454  }
455  }
456 
457  this->iiu.sendThread.start ();
459  this->iiu.cacRef.attachToClientCtx ();
460 
   // buffer currently being filled; null once ownership has moved to recvQue
461  comBuf * pComBuf = 0;
462  while ( true ) {
463 
464  //
465  // We leave the bytes pending and fetch them after
466  // callbacks are enabled when running in the old preemptive
467  // call back disabled mode so that asynchronous wakeup via
468  // file manager call backs works correctly. This does not
469  // appear to impact performance.
470  //
   // a buffer left over from an empty fill (the continue below) is reused
471  if ( ! pComBuf ) {
472  pComBuf = new ( this->iiu.comBufMemMgr ) comBuf;
473  }
474 
475  statusWireIO stat;
476  pComBuf->fillFromWire ( this->iiu, stat );
477 
478  epicsTime currentTime = epicsTime::getCurrent ();
479 
480  {
481  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
482 
483  if ( ! this->validFillStatus ( guard, stat ) ) {
484  break;
485  }
486  if ( stat.bytesCopied == 0u ) {
487  continue;
488  }
489 
   // hand the filled buffer to the receive queue
490  this->iiu.recvQue.pushLastComBufReceived ( *pComBuf );
491  pComBuf = 0;
492 
493  this->iiu._receiveThreadIsBusy = true;
494  }
495 
496  bool sendWakeupNeeded = false;
497  {
498  // only one recv thread at a time may call callbacks
499  // - pendEvent() blocks until threads waiting for
500  // this lock get a chance to run
501  callbackManager mgr ( this->ctxNotify, this->cbMutex );
502 
503  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
504 
505  // route legacy V42 channel connect through the recv thread -
506  // the only thread that should be taking the callback lock
507  while ( nciu * pChan = this->iiu.v42ConnCallbackPend.first () ) {
508  this->iiu.connectNotify ( guard, *pChan );
509  pChan->connect ( mgr.cbGuard, guard );
510  }
511 
512  this->iiu.unacknowledgedSendBytes = 0u;
513 
514  bool protocolOK = false;
515  {
   // iiu.mutex is released while the incoming messages are processed
516  epicsGuardRelease < epicsMutex > unguard ( guard );
517  // execute receive labor
518  protocolOK = this->iiu.processIncoming ( currentTime, mgr );
519  }
520 
   // NOTE(review): this break leaves _receiveThreadIsBusy set to true;
   // confirm that nothing depends on it after an abort shutdown
521  if ( ! protocolOK ) {
522  this->iiu.initiateAbortShutdown ( guard );
523  break;
524  }
525  this->iiu._receiveThreadIsBusy = false;
526  // reschedule connection activity watchdog
527  this->iiu.recvDog.messageArrivalNotify ( guard );
528  //
529  // if this thread has connected channels with subscriptions
530  // that need to be sent then wakeup the send thread
531  if ( this->iiu.subscripReqPend.count() ) {
532  sendWakeupNeeded = true;
533  }
534  }
535 
536  //
537  // we dont feel comfortable calling this with a lock applied
538  // (it might block for longer than we like)
539  //
540  // we would prefer to improve efficiency by trying, first, a
541  // recv with the new MSG_DONTWAIT flag set, but there isnt
542  // universal support
543  //
544  bool bytesArePending = this->iiu.bytesArePendingInOS ();
545  {
546  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
   // count consecutive passes that found more bytes pending; at
   // maxContiguousFrames the circuit is marked busy and the send
   // thread is woken
547  if ( bytesArePending ) {
548  if ( ! this->iiu.busyStateDetected ) {
549  this->iiu.contigRecvMsgCount++;
550  if ( this->iiu.contigRecvMsgCount >=
551  this->iiu.cacRef.maxContiguousFrames ( guard ) ) {
552  this->iiu.busyStateDetected = true;
553  sendWakeupNeeded = true;
554  }
555  }
556  }
557  else {
558  // if no bytes are pending then we must immediately
559  // switch off flow control w/o waiting for more
560  // data to arrive
561  this->iiu.contigRecvMsgCount = 0u;
562  if ( this->iiu.busyStateDetected ) {
563  sendWakeupNeeded = true;
564  this->iiu.busyStateDetected = false;
565  }
566  }
567  }
568 
569  if ( sendWakeupNeeded ) {
570  this->iiu.sendThreadFlushEvent.signal ();
571  }
572  }
573 
   // release a buffer that was allocated but never queued
574  if ( pComBuf ) {
575  pComBuf->~comBuf ();
576  this->iiu.comBufMemMgr.release ( pComBuf );
577  }
578  }
579  catch ( std::bad_alloc & ) {
580  errlogPrintf (
581  "CA client library tcp receive thread "
582  "terminating due to no space in pool "
583  "C++ exception\n" );
584  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
585  this->iiu.initiateCleanShutdown ( guard );
586  }
587  catch ( std::exception & except ) {
588  errlogPrintf (
589  "CA client library tcp receive thread "
590  "terminating due to C++ exception \"%s\"\n",
591  except.what () );
592  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
593  this->iiu.initiateCleanShutdown ( guard );
594  }
595  catch ( ... ) {
596  errlogPrintf (
597  "CA client library tcp receive thread "
598  "terminating due to a non-standard C++ exception\n" );
599  epicsGuard < epicsMutex > guard ( this->iiu.mutex );
600  this->iiu.initiateCleanShutdown ( guard );
601  }
602 }
603 
604 /*
605  * tcpRecvThread::connect ()
606  */
// Attempts to connect iiu.sock to iiu.address(), retrying as needed.
//
// guard - guard on the iiu mutex held by the caller; it is released for the
//         duration of each ::connect() call and each retry sleep
//
// The loop ends when
//   - the iiu state is no longer iiucs_connecting after a connect attempt,
//   - ::connect() succeeds (state becomes iiucs_connected and recvDog is
//     notified), or
//   - the attempt fails on a circuit that is not a name service circuit
//     (SOCK_SHUTDOWN: silently; any other error except SOCK_EINTR: logged
//     and followed by disconnectNotify()).
// SOCK_EINTR retries immediately. A name service circuit sleeps for
// cacRef.connectionTimeout() and retries after any other failure.
// The outcome is reported only through iiu.state; nothing is returned.
//
// NOTE(review): listing line 642 is absent from this extract; it presumably
// holds the call that fills sockErrBuf - confirm against the original file.
607 void tcpRecvThread::connect (
608  epicsGuard < epicsMutex > & guard )
609 {
610  // attempt to connect to a CA server
611  while ( true ) {
612  int status;
613  {
614  epicsGuardRelease < epicsMutex > unguard ( guard );
615  osiSockAddr tmp = this->iiu.address ();
616  status = ::connect ( this->iiu.sock,
617  & tmp.sa, sizeof ( tmp.sa ) );
618  }
619 
   // the state may have changed while the mutex was released
620  if ( this->iiu.state != tcpiiu::iiucs_connecting ) {
621  break;
622  }
623  if ( status >= 0 ) {
624  // put the iiu into the connected state
625  this->iiu.state = tcpiiu::iiucs_connected;
626  this->iiu.recvDog.connectNotify ( guard );
627  break;
628  }
629  else {
   // NOTE(review): SOCKERRNO is sampled after the mutex was re-acquired
   // above - confirm that locking cannot disturb its value
630  int errnoCpy = SOCKERRNO;
631 
632  if ( errnoCpy == SOCK_EINTR ) {
633  continue;
634  }
635  else if ( errnoCpy == SOCK_SHUTDOWN ) {
636  if ( ! this->iiu.isNameService () ) {
637  break;
638  }
639  }
640  else {
641  char sockErrBuf[64];
643  sockErrBuf, sizeof ( sockErrBuf ) );
644  errlogPrintf ( "CAC: Unable to connect because \"%s\"\n",
645  sockErrBuf );
646  if ( ! this->iiu.isNameService () ) {
647  this->iiu.disconnectNotify ( guard );
648  break;
649  }
650  }
   // only name service circuits reach this point: wait, then retry
651  {
652  double sleepTime = this->iiu.cacRef.connectionTimeout ( guard );
653  epicsGuardRelease < epicsMutex > unguard ( guard );
654  epicsThreadSleep ( sleepTime );
655  }
656  continue;
657  }
658  }
659  return;
660 }
661 
662 //
663 // tcpiiu::tcpiiu ()
664 //
666  cac & cac, epicsMutex & mutexIn, epicsMutex & cbMutexIn,
667  cacContextNotify & ctxNotifyIn, double connectionTimeout,
668  epicsTimerQueue & timerQueue, const osiSockAddr & addrIn,
669  comBufMemoryManager & comBufMemMgrIn,
670  unsigned minorVersion, ipAddrToAsciiEngine & engineIn,
671  const cacChannel::priLev & priorityIn,
672  SearchDestTCP * pSearchDestIn ) :
673  caServerID ( addrIn.ia, priorityIn ),
674  hostNameCacheInstance ( addrIn, engineIn ),
675  recvThread ( *this, cbMutexIn, ctxNotifyIn, "CAC-TCP-recv",
677  cac::highestPriorityLevelBelow ( cac.getInitializingThreadsPriority() ) ),
678  sendThread ( *this, "CAC-TCP-send",
680  cac::lowestPriorityLevelAbove (
681  cac.getInitializingThreadsPriority() ) ),
682  recvDog ( cbMutexIn, ctxNotifyIn, mutexIn,
683  *this, connectionTimeout, timerQueue ),
684  sendDog ( cbMutexIn, ctxNotifyIn, mutexIn,
685  *this, connectionTimeout, timerQueue ),
686  sendQue ( *this, comBufMemMgrIn ),
687  recvQue ( comBufMemMgrIn ),
688  curDataMax ( MAX_TCP ),
689  curDataBytes ( 0ul ),
690  comBufMemMgr ( comBufMemMgrIn ),
691  cacRef ( cac ),
692  pCurData ( (char*) freeListMalloc(this->cacRef.tcpSmallRecvBufFreeList) ),
693  pSearchDest ( pSearchDestIn ),
694  mutex ( mutexIn ),
695  cbMutex ( cbMutexIn ),
696  minorProtocolVersion ( minorVersion ),
697  state ( iiucs_connecting ),
698  sock ( INVALID_SOCKET ),
699  contigRecvMsgCount ( 0u ),
700  blockingForFlush ( 0u ),
701  socketLibrarySendBufferSize ( 0x1000 ),
702  unacknowledgedSendBytes ( 0u ),
703  channelCountTot ( 0u ),
704  _receiveThreadIsBusy ( false ),
705  busyStateDetected ( false ),
706  flowControlActive ( false ),
707  echoRequestPending ( false ),
708  oldMsgHeaderAvailable ( false ),
709  msgHeaderAvailable ( false ),
710  earlyFlush ( false ),
711  recvProcessPostponedFlush ( false ),
712  discardingPendingData ( false ),
713  socketHasBeenClosed ( false ),
714  unresponsiveCircuit ( false )
715 {
716  if(!pCurData)
717  throw std::bad_alloc();
718 
719  this->sock = epicsSocketCreate ( AF_INET, SOCK_STREAM, IPPROTO_TCP );
720  if ( this->sock == INVALID_SOCKET ) {
721  freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData);
722  char sockErrBuf[64];
724  sockErrBuf, sizeof ( sockErrBuf ) );
725  std :: string reason =
726  "CAC: TCP circuit creation failure because \"";
727  reason += sockErrBuf;
728  reason += "\"";
729  throw runtime_error ( reason );
730  }
731 
732  int flag = true;
733  int status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY,
734  (char *) &flag, sizeof ( flag ) );
735  if ( status < 0 ) {
736  char sockErrBuf[64];
738  sockErrBuf, sizeof ( sockErrBuf ) );
739  errlogPrintf ( "CAC: problems setting socket option TCP_NODELAY = \"%s\"\n",
740  sockErrBuf );
741  }
742 
743  flag = true;
744  status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE,
745  ( char * ) &flag, sizeof ( flag ) );
746  if ( status < 0 ) {
747  char sockErrBuf[64];
749  sockErrBuf, sizeof ( sockErrBuf ) );
750  errlogPrintf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n",
751  sockErrBuf );
752  }
753 
754  // load message queue with messages informing server
755  // of version, user, and host name of client
756  {
757  epicsGuard < epicsMutex > guard ( this->mutex );
758  this->versionMessage ( guard, this->priority() );
759  this->userNameSetRequest ( guard );
760  this->hostNameSetRequest ( guard );
761  }
762 
763 # if 0
764  {
765  int i;
766 
767  /*
768  * some concern that vxWorks will run out of mBuf's
769  * if this change is made joh 11-10-98
770  */
771  i = MAX_MSG_SIZE;
772  status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF,
773  ( char * ) &i, sizeof ( i ) );
774  if (status < 0) {
775  char sockErrBuf[64];
776  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
777  errlogPrintf ( "CAC: problems setting socket option SO_SNDBUF = \"%s\"\n",
778  sockErrBuf );
779  }
780  i = MAX_MSG_SIZE;
781  status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF,
782  ( char * ) &i, sizeof ( i ) );
783  if ( status < 0 ) {
784  char sockErrBuf[64];
785  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
786  errlogPrintf ( "CAC: problems setting socket option SO_RCVBUF = \"%s\"\n",
787  sockErrBuf );
788  }
789  }
790 # endif
791 
792  {
793  int nBytes;
794  osiSocklen_t sizeOfParameter = static_cast < int > ( sizeof ( nBytes ) );
795  status = getsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF,
796  ( char * ) &nBytes, &sizeOfParameter );
797  if ( status < 0 || nBytes < 0 ||
798  sizeOfParameter != static_cast < int > ( sizeof ( nBytes ) ) ) {
799  char sockErrBuf[64];
801  sockErrBuf, sizeof ( sockErrBuf ) );
802  errlogPrintf ("CAC: problems getting socket option SO_SNDBUF = \"%s\"\n",
803  sockErrBuf );
804  }
805  else {
806  this->socketLibrarySendBufferSize = static_cast < unsigned > ( nBytes );
807  }
808  }
809 
810  if ( isNameService() ) {
811  pSearchDest->setCircuit ( this );
812  }
813 
814  memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
815 }
816 
817 // this must always be called by the udp thread when it holds
818 // the callback lock.
820  epicsGuard < epicsMutex > & guard )
821 {
822  guard.assertIdenticalMutex ( this->mutex );
823  this->recvThread.start ();
824 }
825 
827  epicsGuard < epicsMutex > & guard )
828 {
829  guard.assertIdenticalMutex ( this->mutex );
830 
831  if ( this->state == iiucs_connected ) {
832  if ( this->unresponsiveCircuit ) {
833  this->initiateAbortShutdown ( guard );
834  }
835  else {
836  this->state = iiucs_clean_shutdown;
837  this->sendThreadFlushEvent.signal ();
838  this->flushBlockEvent.signal ();
839  }
840  }
841  else if ( this->state == iiucs_clean_shutdown ) {
842  if ( this->unresponsiveCircuit ) {
843  this->initiateAbortShutdown ( guard );
844  }
845  }
846  else if ( this->state == iiucs_connecting ) {
847  this->initiateAbortShutdown ( guard );
848  }
849 }
850 
852  epicsGuard < epicsMutex > & guard )
853 {
854  guard.assertIdenticalMutex ( this->mutex );
855  this->state = iiucs_disconnected;
856  this->sendThreadFlushEvent.signal ();
857  this->flushBlockEvent.signal ();
858 }
859 
861  epicsGuard < epicsMutex > & cbGuard,
862  epicsGuard < epicsMutex > & guard )
863 {
864  cbGuard.assertIdenticalMutex ( this->cbMutex );
865  guard.assertIdenticalMutex ( this->mutex );
866  if ( this->unresponsiveCircuit ) {
867  this->unresponsiveCircuit = false;
868  while ( nciu * pChan = this->unrespCircuit.get() ) {
869  this->subscripUpdateReqPend.add ( *pChan );
870  pChan->channelNode::listMember =
871  channelNode::cs_subscripUpdateReqPend;
872  pChan->connect ( cbGuard, guard );
873  }
874  this->sendThreadFlushEvent.signal ();
875  }
876 }
877 
879  callbackManager & mgr,
880  epicsGuard < epicsMutex > & guard )
881 {
882  mgr.cbGuard.assertIdenticalMutex ( this-> cbMutex );
883  guard.assertIdenticalMutex ( this->mutex );
884  this->unresponsiveCircuitNotify ( mgr.cbGuard, guard );
885  // setup circuit probe sequence
886  this->recvDog.sendTimeoutNotify ( mgr.cbGuard, guard );
887 }
888 
890  callbackManager & mgr,
891  epicsGuard < epicsMutex > & guard )
892 {
893  mgr.cbGuard.assertIdenticalMutex ( this->cbMutex );
894  guard.assertIdenticalMutex ( this->mutex );
895  this->unresponsiveCircuitNotify ( mgr.cbGuard, guard );
896 }
897 
899  epicsGuard < epicsMutex > & cbGuard,
900  epicsGuard < epicsMutex > & guard )
901 {
902  cbGuard.assertIdenticalMutex ( this->cbMutex );
903  guard.assertIdenticalMutex ( this->mutex );
904 
905  if ( ! this->unresponsiveCircuit ) {
906  this->unresponsiveCircuit = true;
907  this->echoRequestPending = true;
908  this->sendThreadFlushEvent.signal ();
909  this->flushBlockEvent.signal ();
910 
911  // must not hold lock when canceling timer
912  {
913  epicsGuardRelease < epicsMutex > unguard ( guard );
914  {
915  epicsGuardRelease < epicsMutex > cbUnguard ( cbGuard );
916  this->recvDog.cancel ();
917  this->sendDog.cancel ();
918  }
919  }
920 
921  if ( this->connectedList.count() ) {
922  char hostNameTmp[128];
923  this->getHostName ( guard, hostNameTmp, sizeof ( hostNameTmp ) );
924  genLocalExcep ( cbGuard, guard, this->cacRef,
925  ECA_UNRESPTMO, hostNameTmp );
926  while ( nciu * pChan = this->connectedList.get () ) {
927  // The cac lock is released herein so there is concern that
928  // the list could be changed while we are traversing it.
929  // However, this occurs only if a circuit disconnects,
930  // a user deletes a channel, or a server disconnects a
931  // channel. The callback lock must be taken in all of
932  // these situations so this code is protected.
933  this->unrespCircuit.add ( *pChan );
934  pChan->channelNode::listMember =
935  channelNode::cs_unrespCircuit;
936  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
937  }
938  }
939  }
940 }
941 
943  epicsGuard < epicsMutex > & guard )
944 {
945  guard.assertIdenticalMutex ( this->mutex );
946 
947  if ( ! this->discardingPendingData ) {
948  // force abortive shutdown sequence
949  // (discard outstanding sends and receives)
950  struct linger tmpLinger;
951  tmpLinger.l_onoff = true;
952  tmpLinger.l_linger = 0u;
953  int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
954  reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
955  if ( status != 0 ) {
956  char sockErrBuf[64];
958  sockErrBuf, sizeof ( sockErrBuf ) );
959  errlogPrintf ( "CAC TCP socket linger set error was %s\n",
960  sockErrBuf );
961  }
962  this->discardingPendingData = true;
963  }
964 
965  iiu_conn_state oldState = this->state;
966  if ( oldState != iiucs_abort_shutdown && oldState != iiucs_disconnected ) {
967  this->state = iiucs_abort_shutdown;
968 
971  switch ( info ) {
973  //
974  // on winsock and probably vxWorks shutdown() does not
975  // unblock a thread in recv() so we use close() and introduce
976  // some complexity because we must unregister the fd early
977  //
978  if ( ! this->socketHasBeenClosed ) {
979  epicsSocketDestroy ( this->sock );
980  this->socketHasBeenClosed = true;
981  }
982  break;
984  {
985  int status = ::shutdown ( this->sock, SHUT_RDWR );
986  if ( status ) {
987  char sockErrBuf[64];
989  sockErrBuf, sizeof ( sockErrBuf ) );
990  errlogPrintf ("CAC TCP socket shutdown error was %s\n",
991  sockErrBuf );
992  }
993  }
994  break;
996  this->recvThread.interruptSocketRecv ();
998  break;
999  default:
1000  break;
1001  };
1002 
1003  //
1004  // wake up the send thread if it isnt blocking in send()
1005  //
1006  this->sendThreadFlushEvent.signal ();
1007  this->flushBlockEvent.signal ();
1008  }
1009 }
1010 
1011 //
1012 // tcpiiu::~tcpiiu ()
1013 //
1015 {
1016  if ( this->pSearchDest ) {
1017  this->pSearchDest->disable ();
1018  }
1019 
1020  this->sendThread.exitWait ();
1021  this->recvThread.exitWait ();
1022  this->sendDog.cancel ();
1023  this->recvDog.shutdown ();
1024 
1025  if ( ! this->socketHasBeenClosed ) {
1026  epicsSocketDestroy ( this->sock );
1027  }
1028 
1029  // free message body cache
1030  if ( this->pCurData ) {
1031  if ( this->curDataMax <= MAX_TCP ) {
1032  freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData);
1033  }
1034  else if ( this->cacRef.tcpLargeRecvBufFreeList ) {
1035  freeListFree(this->cacRef.tcpLargeRecvBufFreeList, this->pCurData);
1036  }
1037  else {
1038  free ( this->pCurData );
1039  }
1040  }
1041 }
1042 
// Prints a diagnostic description of this virtual circuit with ::printf,
// holding the iiu mutex for the whole call.
//
// level - verbosity:
//           any  : host name, protocol version and state
//           > 1  : data cache pointer / size, receive counters and flags
//           > 2  : socket id, flush event, send / recv threads, echo flag
//                  and the channels on each channel list, with nested
//                  show() calls receiving level - 2
//
// NOTE(review): several listing lines (1049, 1055, 1072, 1080, 1088, 1096,
// 1104, 1112) are absent from this extract; they presumably hold the
// remaining printf arguments and the declarations of the pChan iterators -
// confirm against the original file.
// NOTE(review): subscripUpdateReqPend, which tcpSendThread::run() drains, is
// not listed here - confirm whether that omission is intended.
1043 void tcpiiu::show ( unsigned level ) const
1044 {
1045  epicsGuard < epicsMutex > locker ( this->mutex );
1046  char buf[256];
1047  this->hostNameCacheInstance.getName ( buf, sizeof ( buf ) );
1048  ::printf ( "Virtual circuit to \"%s\" at version V%u.%u state %u\n",
1050  this->minorProtocolVersion, this->state );
1051  if ( level > 1u ) {
1052  ::printf ( "\tcurrent data cache pointer = %p current data cache size = %lu\n",
1053  static_cast < void * > ( this->pCurData ), this->curDataMax );
1054  ::printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n",
   // NOTE(review): "\r" in the next literal is a carriage return escape
   // followed by "eceive"; "\treceive", matching the tab-prefixed lines
   // above, was presumably intended
1056  ::printf ( "\receive thread is busy=%u\n",
1057  this->_receiveThreadIsBusy );
1058  }
1059  if ( level > 2u ) {
1060  ::printf ( "\tvirtual circuit socket identifier %d\n", this->sock );
1061  ::printf ( "\tsend thread flush signal:\n" );
1062  this->sendThreadFlushEvent.show ( level-2u );
1063  ::printf ( "\tsend thread:\n" );
1064  this->sendThread.show ( level-2u );
1065  ::printf ( "\trecv thread:\n" );
1066  this->recvThread.show ( level-2u );
1067  ::printf ("\techo pending bool = %u\n", this->echoRequestPending );
   // this heading is followed directly by the channel lists below
1068  ::printf ( "IO identifier hash table:\n" );
1069 
   // each non-empty channel list is printed under its own heading
1070  if ( this->createReqPend.count () ) {
1071  ::printf ( "Create request pending channels\n" );
1073  while ( pChan.valid () ) {
1074  pChan->show ( level - 2u );
1075  pChan++;
1076  }
1077  }
1078  if ( this->createRespPend.count () ) {
1079  ::printf ( "Create response pending channels\n" );
1081  while ( pChan.valid () ) {
1082  pChan->show ( level - 2u );
1083  pChan++;
1084  }
1085  }
1086  if ( this->v42ConnCallbackPend.count () ) {
1087  ::printf ( "V42 Conn Callback pending channels\n" );
1089  while ( pChan.valid () ) {
1090  pChan->show ( level - 2u );
1091  pChan++;
1092  }
1093  }
1094  if ( this->subscripReqPend.count () ) {
1095  ::printf ( "Subscription request pending channels\n" );
1097  while ( pChan.valid () ) {
1098  pChan->show ( level - 2u );
1099  pChan++;
1100  }
1101  }
1102  if ( this->connectedList.count () ) {
1103  ::printf ( "Connected channels\n" );
1105  while ( pChan.valid () ) {
1106  pChan->show ( level - 2u );
1107  pChan++;
1108  }
1109  }
1110  if ( this->unrespCircuit.count () ) {
1111  ::printf ( "Unresponsive circuit channels\n" );
1113  while ( pChan.valid () ) {
1114  pChan->show ( level - 2u );
1115  pChan++;
1116  }
1117  }
1118  }
1119 }
1120 
1122 {
      // Marks an echo request as pending and signals sendThreadFlushEvent.
      // The return value tells the caller which branch applies: true when
      // CA_V43 accepts this circuit's minor protocol version, false
      // otherwise (see the per-branch comments below).
1123  guard.assertIdenticalMutex ( this->mutex );
1124 
1125  this->echoRequestPending = true;
1126  this->sendThreadFlushEvent.signal ();
1127  if ( CA_V43 ( this->minorProtocolVersion ) ) {
1128  // we send an echo
1129  return true;
1130  }
1131  else {
1132  // we send a NOOP
1133  return false;
1134  }
1135 }
1136 
1138  epicsGuard < epicsMutex > & guard )
1139 {
      // Performs a flush request that was postponed earlier (flag
      // recvProcessPostponedFlush) and then clears the flag. Does nothing
      // when no flush was postponed.
1140  if ( this->recvProcessPostponedFlush ) {
1141  this->flushRequest ( guard );
1142  this->recvProcessPostponedFlush = false;
1143  }
1144 }
1145 
1147  const epicsTime & currentTime,
1148  callbackManager & mgr )
1149 {
      // Consumes recvQue one protocol message at a time: header, optional
      // extended header, payload, then dispatch via
      // cacRef.executeResponse(). Returns true when the queue runs dry
      // before a message is complete (after running any postponed flush),
      // and false on a misaligned payload size or when executeResponse()
      // reports failure. Progress is kept in msgHeaderAvailable,
      // oldMsgHeaderAvailable and curDataBytes, which are only reset once
      // a message has been fully handled, so a later call resumes where
      // this one stopped.
1150  mgr.cbGuard.assertIdenticalMutex ( this->cbMutex );
1151 
1152  while ( true ) {
1153 
1154  //
1155  // fetch a complete message header
1156  //
1157  if ( ! this->msgHeaderAvailable ) {
1158  if ( ! this->oldMsgHeaderAvailable ) {
1159  this->oldMsgHeaderAvailable =
1160  this->recvQue.popOldMsgHeader ( this->curMsg );
1161  if ( ! this->oldMsgHeaderAvailable ) {
1162  epicsGuard < epicsMutex > guard ( this->mutex );
1163  this->flushIfRecvProcessRequested ( guard );
1164  return true;
1165  }
1166  }
      // a postsize of 0xffff means the real postsize and count follow
      // the header as two additional 32 bit fields
1167  if ( this->curMsg.m_postsize == 0xffff ) {
1168  static const unsigned annexSize =
1169  sizeof ( this->curMsg.m_postsize ) +
1170  sizeof ( this->curMsg.m_count );
1171  if ( this->recvQue.occupiedBytes () < annexSize ) {
1172  epicsGuard < epicsMutex > guard ( this->mutex );
1173  this->flushIfRecvProcessRequested ( guard );
1174  return true;
1175  }
1176  this->curMsg.m_postsize = this->recvQue.popUInt32 ();
1177  this->curMsg.m_count = this->recvQue.popUInt32 ();
1178  }
1179  this->msgHeaderAvailable = true;
1180 # ifdef DEBUG
1181  epicsGuard < epicsMutex > guard ( this->mutex );
1182  debugPrintf (
1183  ( "%s Cmd=%3u Type=%3u Count=%8u Size=%8u",
1184  this->pHostName ( guard ),
1185  this->curMsg.m_cmmd,
1186  this->curMsg.m_dataType,
1187  this->curMsg.m_count,
1188  this->curMsg.m_postsize) );
1189  debugPrintf (
1190  ( " Avail=%8u Cid=%8u\n",
1191  this->curMsg.m_available,
1192  this->curMsg.m_cid) );
1193 # endif
1194  }
1195 
1196  // check for 8 byte aligned protocol
1197  if ( this->curMsg.m_postsize & 0x7 ) {
1198  this->printFormated ( mgr.cbGuard,
1199  "CAC: server sent missaligned payload 0x%x\n",
1200  this->curMsg.m_postsize );
1201  return false;
1202  }
1203 
1204  //
1205  // make sure we have a large enough message body cache
1206  //
1207  if ( this->curMsg.m_postsize > this->curDataMax ) {
1208  assert (this->curMsg.m_postsize > MAX_TCP);
1209 
1210  char * newbuf = NULL;
      // NOTE(review): newsize stays unset when the large free list exists
      // but m_postsize exceeds maxRecvBytesTCP; it is only read below when
      // newbuf is non-null, which that path never makes true.
1211  arrayElementCount newsize;
1212 
1213  if ( !this->cacRef.tcpLargeRecvBufFreeList ) {
1214  // round size up to multiple of 4K
1215  newsize = ((this->curMsg.m_postsize-1)|0xfff)+1;
1216 
1217  if ( this->curDataMax <= MAX_TCP ) {
1218  // small -> large
1219  newbuf = (char*)malloc(newsize);
1220 
1221  } else {
1222  // expand large to larger
1223  newbuf = (char*)realloc(this->pCurData, newsize);
1224  }
1225 
1226  } else if ( this->curMsg.m_postsize <= this->cacRef.maxRecvBytesTCP ) {
1227  newbuf = (char*) freeListMalloc(this->cacRef.tcpLargeRecvBufFreeList);
1228  newsize = this->cacRef.maxRecvBytesTCP;
1229 
1230  }
1231 
      // on success, give the previous buffer back to the allocator it
      // came from before adopting the new one
1232  if ( newbuf) {
1233  if (this->curDataMax <= MAX_TCP) {
1234  freeListFree(this->cacRef.tcpSmallRecvBufFreeList, this->pCurData );
1235 
1236  } else if (this->cacRef.tcpLargeRecvBufFreeList) {
1237  freeListFree(this->cacRef.tcpLargeRecvBufFreeList, this->pCurData );
1238 
1239  } else {
1240  // called realloc()
1241  }
1242  this->pCurData = newbuf;
1243  this->curDataMax = newsize;
1244 
1245  } else {
1246  this->printFormated ( mgr.cbGuard,
1247  "CAC: not enough memory for message body cache (ignoring response message)\n");
1248  }
1249  }
1250 
      // payload fits in the cache: copy it out (possibly across several
      // calls) and dispatch the message
1251  if ( this->curMsg.m_postsize <= this->curDataMax ) {
1252  if ( this->curMsg.m_postsize > 0u ) {
1253  this->curDataBytes += this->recvQue.copyOutBytes (
1254  &this->pCurData[this->curDataBytes],
1255  this->curMsg.m_postsize - this->curDataBytes );
1256  if ( this->curDataBytes < this->curMsg.m_postsize ) {
1257  epicsGuard < epicsMutex > guard ( this->mutex );
1258  this->flushIfRecvProcessRequested ( guard );
1259  return true;
1260  }
1261  }
1262  bool msgOK = this->cacRef.executeResponse ( mgr, *this,
1263  currentTime, this->curMsg, this->pCurData );
1264  if ( ! msgOK ) {
1265  return false;
1266  }
1267  }
      // payload does not fit: warn once (function-local static flag) and
      // discard the payload bytes as they arrive
1268  else {
1269  static bool once = false;
1270  if ( ! once ) {
1271  this->printFormated ( mgr.cbGuard,
1272  "CAC: response with payload size=%u > EPICS_CA_MAX_ARRAY_BYTES ignored\n",
1273  this->curMsg.m_postsize );
1274  once = true;
1275  }
1276  this->curDataBytes += this->recvQue.removeBytes (
1277  this->curMsg.m_postsize - this->curDataBytes );
1278  if ( this->curDataBytes < this->curMsg.m_postsize ) {
1279  epicsGuard < epicsMutex > guard ( this->mutex );
1280  this->flushIfRecvProcessRequested ( guard );
1281  return true;
1282  }
1283  }
1284 
      // message fully handled; reset state for the next header
1285  this->oldMsgHeaderAvailable = false;
1286  this->msgHeaderAvailable = false;
1287  this->curDataBytes = 0u;
1288  }
1289 }
1290 
1292 {
      // Queues the local host name for the server. The payload is the
      // NUL terminated string from cacRef.pLocalHostName(), padded with
      // cacNillBytes up to the CA_MESSAGE_ALIGN'd size; the header
      // arguments below name CA_PROTO_HOST_NAME. Nothing is queued unless
      // CA_V41 accepts the circuit's minor protocol version.
1293  guard.assertIdenticalMutex ( this->mutex );
1294 
1295  if ( ! CA_V41 ( this->minorProtocolVersion ) ) {
1296  return;
1297  }
1298 
1299  const char * pName = this->cacRef.pLocalHostName ();
1300  unsigned size = strlen ( pName ) + 1u;
1301  unsigned postSize = CA_MESSAGE_ALIGN ( size );
1302  assert ( postSize < 0xffff );
1303 
      // 16u presumably accounts for the fixed size message header - confirm
1304  if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) {
1305  this->flushRequest ( guard );
1306  }
1307 
1308  comQueSendMsgMinder minder ( this->sendQue, guard );
1310  CA_PROTO_HOST_NAME, postSize,
1311  0u, 0u, 0u, 0u,
1312  CA_V49 ( this->minorProtocolVersion ) );
1313  this->sendQue.pushString ( pName, size );
1314  this->sendQue.pushString ( cacNillBytes, postSize - size );
1315  minder.commit ();
1316 }
1317 
1318 /*
1319  * tcpiiu::userNameSetRequest ()
1320  */
1322 {
      // Queues the client user name for the server. The payload is the
      // NUL terminated string from cacRef.userNamePointer(), padded with
      // cacNillBytes up to the CA_MESSAGE_ALIGN'd size; the header
      // arguments below name CA_PROTO_CLIENT_NAME. Nothing is queued
      // unless CA_V41 accepts the circuit's minor protocol version.
1323  guard.assertIdenticalMutex ( this->mutex );
1324 
1325  if ( ! CA_V41 ( this->minorProtocolVersion ) ) {
1326  return;
1327  }
1328 
1329  const char *pName = this->cacRef.userNamePointer ();
1330  unsigned size = strlen ( pName ) + 1u;
1331  unsigned postSize = CA_MESSAGE_ALIGN ( size );
1332  assert ( postSize < 0xffff );
1333 
      // 16u presumably accounts for the fixed size message header - confirm
1334  if ( this->sendQue.flushEarlyThreshold ( postSize + 16u ) ) {
1335  this->flushRequest ( guard );
1336  }
1337 
1338  comQueSendMsgMinder minder ( this->sendQue, guard );
1340  CA_PROTO_CLIENT_NAME, postSize,
1341  0u, 0u, 0u, 0u,
1342  CA_V49 ( this->minorProtocolVersion ) );
1343  this->sendQue.pushString ( pName, size );
1344  this->sendQue.pushString ( cacNillBytes, postSize - size );
1345  minder.commit ();
1346 }
1347 
1349  epicsGuard < epicsMutex > & guard )
1350 {
      // Queues a payload free message whose header arguments name
      // CA_PROTO_EVENTS_ON, requesting a flush first if the send queue
      // reports that its early flush threshold would be reached.
1351  guard.assertIdenticalMutex ( this->mutex );
1352 
1353  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1354  this->flushRequest ( guard );
1355  }
1356  comQueSendMsgMinder minder ( this->sendQue, guard );
1358  CA_PROTO_EVENTS_ON, 0u,
1359  0u, 0u, 0u, 0u,
1360  CA_V49 ( this->minorProtocolVersion ) );
1361  minder.commit ();
1362 }
1363 
1365  epicsGuard < epicsMutex > & guard )
1366 {
      // Queues a payload free message whose header arguments name
      // CA_PROTO_EVENTS_OFF, requesting a flush first if the send queue
      // reports that its early flush threshold would be reached.
1367  guard.assertIdenticalMutex ( this->mutex );
1368 
1369  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1370  this->flushRequest ( guard );
1371  }
1372  comQueSendMsgMinder minder ( this->sendQue, guard );
1374  CA_PROTO_EVENTS_OFF, 0u,
1375  0u, 0u, 0u, 0u,
1376  CA_V49 ( this->minorProtocolVersion ) );
1377  minder.commit ();
1378 }
1379 
1381  const cacChannel::priLev & priority )
1382 {
      // Queues a payload free message whose header arguments name
      // CA_PROTO_VERSION and carry the requested priority.
1383  guard.assertIdenticalMutex ( this->mutex );
1384 
      // the priority is narrowed to ca_uint16_t below, so it must fit
1385  assert ( priority <= 0xffff );
1386 
1387  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1388  this->flushRequest ( guard );
1389  }
1390 
1391  comQueSendMsgMinder minder ( this->sendQue, guard );
1393  CA_PROTO_VERSION, 0u,
1394  static_cast < ca_uint16_t > ( priority ),
1396  CA_V49 ( this->minorProtocolVersion ) );
1397  minder.commit ();
1398 }
1399 
1401 {
      // Queues a payload free keep-alive message: CA_PROTO_ECHO when
      // CA_V43 accepts the circuit's minor protocol version, otherwise
      // CA_PROTO_READ_SYNC as a stand-in.
1402  guard.assertIdenticalMutex ( this->mutex );
1403 
1404  epicsUInt16 command = CA_PROTO_ECHO;
1405  if ( ! CA_V43 ( this->minorProtocolVersion ) ) {
1406  // we fake an echo to early server using a read sync
1407  command = CA_PROTO_READ_SYNC;
1408  }
1409 
1410  if ( this->sendQue.flushEarlyThreshold ( 16u ) ) {
1411  this->flushRequest ( guard );
1412  }
1413  comQueSendMsgMinder minder ( this->sendQue, guard );
1415  command, 0u,
1416  0u, 0u, 0u, 0u,
1417  CA_V49 ( this->minorProtocolVersion ) );
1418  minder.commit ();
1419 }
1420 
1422  nciu &chan, unsigned type, arrayElementCount nElem, const void *pValue )
1423 {
      // Queues a write of nElem elements of the given type from pValue,
      // addressed with the channel's SID and CID.
      // Throws cacChannel::badType when INVALID_DB_REQ rejects the type.
1424  guard.assertIdenticalMutex ( this->mutex );
1425  if ( INVALID_DB_REQ ( type ) ) {
1426  throw cacChannel::badType ();
1427  }
1428  comQueSendMsgMinder minder ( this->sendQue, guard );
1430  type, nElem, chan.getSID(guard), chan.getCID(guard), pValue,
1431  CA_V49 ( this->minorProtocolVersion ) );
1432  minder.commit ();
1433 }
1434 
1435 
1437  nciu &chan, netWriteNotifyIO &io, unsigned type,
1438  arrayElementCount nElem, const void *pValue )
1439 {
      // Queues a write of nElem elements of the given type from pValue,
      // addressed with the channel's SID and the IO object's id.
      // Throws cacChannel::badType when INVALID_DB_REQ rejects the type.
1440  guard.assertIdenticalMutex ( this->mutex );
1441 
      // NOTE(review): the body of this guard (source line 1443) is not
      // present in this listing - confirm against the upstream file
1442  if ( ! this->ca_v41_ok ( guard ) ) {
1444  }
1445  if ( INVALID_DB_REQ ( type ) ) {
1446  throw cacChannel::badType ();
1447  }
1448  comQueSendMsgMinder minder ( this->sendQue, guard );
1450  type, nElem, chan.getSID(guard), io.getId(), pValue,
1451  CA_V49 ( this->minorProtocolVersion ) );
1452  minder.commit ();
1453 }
1454 
1456  nciu & chan, netReadNotifyIO & io,
1457  unsigned dataType, arrayElementCount nElem )
1458 {
      // Queues a read of nElem elements of dataType, addressed with the
      // channel's SID and the IO object's id.
      // Throws cacChannel::badType when INVALID_DB_REQ rejects dataType.
1459  guard.assertIdenticalMutex ( this->mutex );
1460  if ( INVALID_DB_REQ ( dataType ) ) {
1461  throw cacChannel::badType ();
1462  }
      // byte budget for the reply depends on whether CA_V49 accepts the
      // circuit's minor protocol version
1463  arrayElementCount maxBytes;
1464  if ( CA_V49 ( this->minorProtocolVersion ) ) {
1465  maxBytes = 0xfffffff0;
1466  }
1467  else {
1468  maxBytes = MAX_TCP;
1469  }
1470  arrayElementCount maxElem =
1471  ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType];
      // NOTE(review): the body of this bounds check (source line 1473) is
      // not present in this listing - confirm against the upstream file
1472  if ( nElem > maxElem ) {
1474  }
      // a zero count is replaced by the channel's own count unless
      // CA_V413 accepts the minor protocol version
1475  if (nElem == 0 && !CA_V413(this->minorProtocolVersion))
1476  nElem = chan.getcount();
1477  comQueSendMsgMinder minder ( this->sendQue, guard );
1480  static_cast < ca_uint16_t > ( dataType ),
1481  static_cast < ca_uint32_t > ( nElem ),
1482  chan.getSID(guard), io.getId(),
1483  CA_V49 ( this->minorProtocolVersion ) );
1484  minder.commit ();
1485 }
1486 
1488  nciu & chan, epicsGuard < epicsMutex > & guard )
1489 {
      // Queues a CA_PROTO_CREATE_CHAN message for chan. Ignored unless the
      // circuit state is iiucs_connected or iiucs_connecting.
1490  guard.assertIdenticalMutex ( this->mutex );
1491 
1492  if ( this->state != iiucs_connected &&
1493  this->state != iiucs_connecting ) {
1494  return;
1495  }
1496 
      // when ca_v44_ok() holds the channel is identified by CID and its
      // name is sent as payload; otherwise it is identified by SID and no
      // name is sent
1497  const char *pName;
1498  unsigned nameLength;
1499  ca_uint32_t identity;
1500  if ( this->ca_v44_ok ( guard ) ) {
1501  identity = chan.getCID ( guard );
1502  pName = chan.pName ( guard );
1503  nameLength = chan.nameLen ( guard );
1504  }
1505  else {
1506  identity = chan.getSID ( guard );
1507  pName = 0;
1508  nameLength = 0u;
1509  }
1510 
1511  unsigned postCnt = CA_MESSAGE_ALIGN ( nameLength );
1512 
      // NOTE(review): the body of this size check (source line 1514) is
      // not present in this listing - confirm against the upstream file
1513  if ( postCnt >= 0xffff ) {
1515  }
1516 
1517  comQueSendMsgMinder minder ( this->sendQue, guard );
1518  //
1519  // The available field is used (abused)
1520  // here to communicate the minor version number
1521  // starting with CA 4.1.
1522  //
1524  CA_PROTO_CREATE_CHAN, postCnt,
1525  0u, 0u, identity, CA_MINOR_PROTOCOL_REVISION,
1526  CA_V49 ( this->minorProtocolVersion ) );
1527  if ( nameLength ) {
1528  this->sendQue.pushString ( pName, nameLength );
1529  }
      // pad the name out to the aligned payload size
1530  if ( postCnt > nameLength ) {
1531  this->sendQue.pushString ( cacNillBytes, postCnt - nameLength );
1532  }
1533  minder.commit ();
1534 }
1535 
1537  ca_uint32_t sid, ca_uint32_t cid )
1538 {
      // Queues a payload free message carrying the given sid and cid.
      // Silently ignored unless the circuit state is iiucs_connected.
1539  guard.assertIdenticalMutex ( this->mutex );
1540  // there are situations where the circuit is disconnected, but
1541  // the channel does not know this yet
1542  if ( this->state != iiucs_connected ) {
1543  return;
1544  }
1545  comQueSendMsgMinder minder ( this->sendQue, guard );
1548  0u, 0u, sid, cid,
1549  CA_V49 ( this->minorProtocolVersion ) );
1550  minder.commit ();
1551 }
1552 
1553 //
1554 // this routine returns void because if this internally fails the best response
1555 // is to try again the next time that we reconnect
1556 //
1558  epicsGuard < epicsMutex > & guard,
1559  nciu & chan, netSubscription & subscr )
1560 {
      // Queues a CA_PROTO_EVENT_ADD message for subscr on chan, followed by
      // a 16 byte extension carrying three zeroed floats, the event mask
      // and a pad word.
1561  guard.assertIdenticalMutex ( this->mutex );
1562  // there are situations where the circuit is disconnected, but
1563  // the channel does not know this yet
1564  if ( this->state != iiucs_connected &&
1565  this->state != iiucs_connecting ) {
1566  return;
1567  }
1568  unsigned mask = subscr.getMask(guard);
      // NOTE(review): the body of this mask check (source line 1570) is
      // not present in this listing - confirm against the upstream file
1569  if ( mask > 0xffff ) {
1571  }
1572  arrayElementCount nElem = subscr.getCount (
1573  guard, CA_V413(this->minorProtocolVersion) );
1574  arrayElementCount maxBytes;
1575  if ( CA_V49 ( this->minorProtocolVersion ) ) {
1576  maxBytes = 0xfffffff0;
1577  }
1578  else {
1579  maxBytes = MAX_TCP;
1580  }
1581  unsigned dataType = subscr.getType ( guard );
1582  // data type bounds checked when subscription created
1583  arrayElementCount maxElem = ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType];
      // NOTE(review): the body of this bounds check (source line 1585) is
      // not present in this listing - confirm against the upstream file
1584  if ( nElem > maxElem ) {
1586  }
1587  comQueSendMsgMinder minder ( this->sendQue, guard );
1588  // nElement bounds checked above
1590  CA_PROTO_EVENT_ADD, 16u,
1591  static_cast < ca_uint16_t > ( dataType ),
1592  static_cast < ca_uint32_t > ( nElem ),
1593  chan.getSID(guard), subscr.getId(),
1594  CA_V49 ( this->minorProtocolVersion ) );
1595 
1596  // extension
1597  this->sendQue.pushFloat32 ( 0.0f ); // m_lval
1598  this->sendQue.pushFloat32 ( 0.0f ); // m_hval
1599  this->sendQue.pushFloat32 ( 0.0f ); // m_toval
1600  this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( mask ) ); // m_mask
1601  this->sendQue.pushUInt16 ( 0u ); // m_pad
1602  minder.commit ();
1603 }
1604 
1605 //
1606 // this routine returns void because if this internally fails the best response
1607 // is to try again the next time that we reconnect
1608 //
1610  epicsGuard < epicsMutex > & guard,
1611  nciu & chan, netSubscription & subscr )
1612 {
      // Queues a payload free request for subscr on chan, carrying the
      // subscription's type, element count and id. Ignored unless the
      // circuit state is iiucs_connected.
1613  guard.assertIdenticalMutex ( this->mutex );
1614  // there are situations where the circuit is disconnected, but
1615  // the channel does not know this yet
1616  if ( this->state != iiucs_connected ) {
1617  return;
1618  }
1619  arrayElementCount nElem = subscr.getCount (
1620  guard, CA_V413(this->minorProtocolVersion) );
1621  arrayElementCount maxBytes;
1622  if ( CA_V49 ( this->minorProtocolVersion ) ) {
1623  maxBytes = 0xfffffff0;
1624  }
1625  else {
1626  maxBytes = MAX_TCP;
1627  }
1628  unsigned dataType = subscr.getType ( guard );
1629  // data type bounds checked when subscription constructed
1630  arrayElementCount maxElem = ( maxBytes - dbr_size[dataType] ) / dbr_value_size[dataType];
      // NOTE(review): the body of this bounds check (source line 1632) is
      // not present in this listing - confirm against the upstream file
1631  if ( nElem > maxElem ) {
1633  }
1634  comQueSendMsgMinder minder ( this->sendQue, guard );
1635  // nElem bounds checked above
1638  static_cast < ca_uint16_t > ( dataType ),
1639  static_cast < ca_uint32_t > ( nElem ),
1640  chan.getSID (guard), subscr.getId (),
1641  CA_V49 ( this->minorProtocolVersion ) );
1642  minder.commit ();
1643 }
1644 
1646  nciu & chan, netSubscription & subscr )
1647 {
      // Queues a payload free request for subscr on chan, carrying the
      // subscription's type, element count and id. Ignored unless the
      // circuit state is iiucs_connected.
1648  guard.assertIdenticalMutex ( this->mutex );
1649  // there are situations where the circuit is disconnected, but
1650  // the channel does not know this yet
1651  if ( this->state != iiucs_connected ) {
1652  return;
1653  }
1654  comQueSendMsgMinder minder ( this->sendQue, guard );
      // NOTE(review): the element count is narrowed to ca_uint16_t here,
      // whereas the other subscription requests in this file cast it to
      // ca_uint32_t - confirm the truncation is intended
1657  static_cast < ca_uint16_t > ( subscr.getType ( guard ) ),
1658  static_cast < ca_uint16_t > ( subscr.getCount (
1659  guard, CA_V413(this->minorProtocolVersion) ) ),
1660  chan.getSID(guard), subscr.getId(),
1661  CA_V49 ( this->minorProtocolVersion ) );
1662  minder.commit ();
1663 }
1664 
1666 {
      // Writes every queued comBuf to the wire, releasing the mutex around
      // each blocking write. Returns false as soon as a write fails (after
      // discarding whatever is still queued), true otherwise. On the
      // success path it also clears earlyFlush and signals flushBlockEvent
      // when blockingForFlush is non-zero.
1667  guard.assertIdenticalMutex ( this->mutex );
1668 
1669  if ( this->sendQue.occupiedBytes() > 0 ) {
1670  while ( comBuf * pBuf = this->sendQue.popNextComBufToSend () ) {
1671  epicsTime current = epicsTime::getCurrent ();
1672 
      // capture the size now; pBuf is destroyed before it is needed
1673  unsigned bytesToBeSent = pBuf->occupiedBytes ();
1674  bool success = false;
1675  {
1676  // no lock while blocking to send
1677  epicsGuardRelease < epicsMutex > unguard ( guard );
1678  success = pBuf->flushToWire ( *this, current );
1679  pBuf->~comBuf ();
1680  this->comBufMemMgr.release ( pBuf );
1681  }
1682 
      // on failure, destroy and release everything still queued
1683  if ( ! success ) {
1684  while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) {
1685  pBuf->~comBuf ();
1686  this->comBufMemMgr.release ( pBuf );
1687  }
1688  return false;
1689  }
1690 
1691  // set it here with this odd order because we must have
1692  // the lock and we must have already sent the bytes
1693  this->unacknowledgedSendBytes += bytesToBeSent;
1694  if ( this->unacknowledgedSendBytes >
1695  this->socketLibrarySendBufferSize ) {
1696  this->recvDog.sendBacklogProgressNotify ( guard );
1697  }
1698  }
1699  }
1700 
1701  this->earlyFlush = false;
1702  if ( this->blockingForFlush ) {
1703  this->flushBlockEvent.signal ();
1704  }
1705 
1706  return true;
1707 }
1708 
1710 {
      // Requests a flush and then waits, in 30 second slices with the
      // mutex released, while sendQue.flushBlockThreshold() holds.
      // Throws cacChannel::notConnected when the circuit can no longer
      // accept user requests or is marked unresponsive; the
      // blockingForFlush count is decremented on both exits.
1711  this->flushRequest ( guard );
1712  // the process thread is not permitted to flush as this
1713  // can result in a push / pull deadlock on the TCP pipe.
1714  // Instead, the process thread schedules the flush with the
1715  // send thread which runs at a higher priority than the
1716  // receive thread. The same applies to the UDP thread for
1717  // locking hierarchy reasons.
      // NOTE(review): the statement opening the block closed at listing
      // line 1742 (source line 1718) is not present in this listing -
      // confirm against the upstream file
1719  // enable / disable of call back preemption must occur here
1720  // because the tcpiiu might disconnect while waiting and its
1721  // pointer to this cac might become invalid
1722  assert ( this->blockingForFlush < UINT_MAX );
1723  this->blockingForFlush++;
1724  while ( this->sendQue.flushBlockThreshold() ) {
1725 
1726  bool userRequestsCanBeAccepted =
1727  this->state == iiucs_connected ||
1728  ( ! this->ca_v42_ok ( guard ) &&
1729  this->state == iiucs_connecting );
1730  // fail the users request if we have a disconnected
1731  // or unresponsive circuit
1732  if ( ! userRequestsCanBeAccepted ||
1733  this->unresponsiveCircuit ) {
1734  this->decrementBlockingForFlushCount ( guard );
1735  throw cacChannel::notConnected ();
1736  }
1737 
1738  epicsGuardRelease < epicsMutex > unguard ( guard );
1739  this->flushBlockEvent.wait ( 30.0 );
1740  }
1741  this->decrementBlockingForFlushCount ( guard );
1742  }
1743 }
1744 
1746  epicsGuard < epicsMutex > & guard )
1747 {
      // Returns the number of bytes currently occupied in the send queue.
      // The early flush trigger below is compiled out with #if 0.
1748  guard.assertIdenticalMutex ( this->mutex );
1749 #if 0
1750  if ( ! this->earlyFlush && this->sendQue.flushEarlyThreshold(0u) ) {
1751  this->earlyFlush = true;
1752  this->sendThreadFlushEvent.signal ();
1753  }
1754 #endif
1755  return sendQue.occupiedBytes ();
1756 }
1757 
1759  epicsGuard < epicsMutex > & guard )
1760 {
      // Decrements blockingForFlush (which must be non-zero) and, if the
      // count is still non-zero afterwards, signals flushBlockEvent again -
      // presumably so another waiter wakes up; confirm.
1761  guard.assertIdenticalMutex ( this->mutex );
1762  assert ( this->blockingForFlush > 0u );
1763  this->blockingForFlush--;
1764  if ( this->blockingForFlush > 0 ) {
1765  this->flushBlockEvent.signal ();
1766  }
1767 }
1768 
1770  epicsGuard < epicsMutex > & guard ) const
1771 {
      // Returns this circuit's address() after checking that the caller
      // holds this object's mutex.
1772  guard.assertIdenticalMutex ( this->mutex );
1773  return this->address();
1774 }
1775 
1776 // not inline because it's virtual
1778  epicsGuard < epicsMutex > & guard ) const
1779 {
      // Reports whether CA_V42 accepts this circuit's minor protocol
      // version.
1780  guard.assertIdenticalMutex ( this->mutex );
1781  return CA_V42 ( this->minorProtocolVersion );
1782 }
1783 
1785  epicsGuard < epicsMutex > & guard )
1786 {
      // Sets the recvProcessPostponedFlush flag; the flush itself happens
      // later, when the flag is examined.
1787  guard.assertIdenticalMutex ( this->mutex );
1788  this->recvProcessPostponedFlush = true;
1789 }
1790 
1792  epicsGuard < epicsMutex > & guard,
1793  char * pBuf, unsigned bufLength ) const throw ()
1794 {
      // Forwards to hostNameCacheInstance.getName() with the caller's
      // buffer and length, and returns its result.
1795  guard.assertIdenticalMutex ( this->mutex );
1796  return this->hostNameCacheInstance.getName ( pBuf, bufLength );
1797 }
1798 
1799 const char * tcpiiu::pHostName (
1800  epicsGuard < epicsMutex > & guard ) const throw ()
1801 {
      // Returns the pointer held by hostNameCacheInstance. The caller must
      // hold this object's mutex (asserted below).
1802  guard.assertIdenticalMutex ( this->mutex );
1803  return this->hostNameCacheInstance.pointer ();
1804 }
1805 
1807  epicsGuard < epicsMutex > & cbGuard,
1808  epicsGuard < epicsMutex > & guard,
1809  class udpiiu & discIIU )
1810 {
      // Empties every per-state channel list of this circuit, handing each
      // channel to discIIU via installDisconnectedChannel(). Depending on
      // the list, a channel's IO is disconnected, a clear channel request
      // is queued and/or unresponsiveCircuitNotify() is called. Finally the
      // channel count is zeroed and a clean shutdown is initiated.
1811  cbGuard.assertIdenticalMutex ( this->cbMutex );
1812  guard.assertIdenticalMutex ( this->mutex );
1813 
1814  while ( nciu * pChan = this->createReqPend.get () ) {
1815  discIIU.installDisconnectedChannel ( guard, *pChan );
1816  }
1817 
1818  while ( nciu * pChan = this->createRespPend.get () ) {
1819  // we don't yet know the server's id so we can't
1820  // send a channel delete request and will instead
1821  // trust that the server can do the proper cleanup
1822  // when the circuit disconnects
1823  discIIU.installDisconnectedChannel ( guard, *pChan );
1824  }
1825 
1826  while ( nciu * pChan = this->v42ConnCallbackPend.get () ) {
1827  this->clearChannelRequest ( guard,
1828  pChan->getSID(guard), pChan->getCID(guard) );
1829  discIIU.installDisconnectedChannel ( guard, *pChan );
1830  }
1831 
1832  while ( nciu * pChan = this->subscripReqPend.get () ) {
1833  pChan->disconnectAllIO ( cbGuard, guard );
1834  this->clearChannelRequest ( guard,
1835  pChan->getSID(guard), pChan->getCID(guard) );
1836  discIIU.installDisconnectedChannel ( guard, *pChan );
1837  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1838  }
1839 
1840  while ( nciu * pChan = this->connectedList.get () ) {
1841  pChan->disconnectAllIO ( cbGuard, guard );
1842  this->clearChannelRequest ( guard,
1843  pChan->getSID(guard), pChan->getCID(guard) );
1844  discIIU.installDisconnectedChannel ( guard, *pChan );
1845  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1846  }
1847 
1848  while ( nciu * pChan = this->unrespCircuit.get () ) {
1849  // if we know that the circuit is unresponsive
1850  // then we don't send a channel delete request and
1851  // will instead trust that the server can do the
1852  // proper cleanup when the circuit disconnects
1853  pChan->disconnectAllIO ( cbGuard, guard );
1854  discIIU.installDisconnectedChannel ( guard, *pChan );
1855  }
1856 
1857  while ( nciu * pChan = this->subscripUpdateReqPend.get () ) {
1858  pChan->disconnectAllIO ( cbGuard, guard );
1859  this->clearChannelRequest ( guard,
1860  pChan->getSID(guard), pChan->getCID(guard) );
1861  discIIU.installDisconnectedChannel ( guard, *pChan );
1862  pChan->unresponsiveCircuitNotify ( cbGuard, guard );
1863  }
1864 
1865  this->channelCountTot = 0u;
1866  this->initiateCleanShutdown ( guard );
1867 }
1868 
1870  epicsGuard < epicsMutex > & cbGuard,
1871  epicsGuard < epicsMutex > & guard )
1872 {
      // Empties every per-state channel list of this circuit, marking each
      // channel cs_none and calling its serviceShutdownNotify(). Depending
      // on the list, a channel's IO is disconnected and/or a clear channel
      // request is queued first. Finally the channel count is zeroed and a
      // clean shutdown is initiated.
1873  cbGuard.assertIdenticalMutex ( this->cbMutex );
1874  guard.assertIdenticalMutex ( this->mutex );
1875 
1876  while ( nciu * pChan = this->createReqPend.get () ) {
1877  pChan->channelNode::listMember =
1878  channelNode::cs_none;
1879  pChan->serviceShutdownNotify ( cbGuard, guard );
1880  }
1881 
1882  while ( nciu * pChan = this->createRespPend.get () ) {
1883  pChan->channelNode::listMember =
1884  channelNode::cs_none;
1885  // we don't yet know the server's id so we can't
1886  // send a channel delete request and will instead
1887  // trust that the server can do the proper cleanup
1888  // when the circuit disconnects
1889  pChan->serviceShutdownNotify ( cbGuard, guard );
1890  }
1891 
1892  while ( nciu * pChan = this->v42ConnCallbackPend.get () ) {
1893  pChan->channelNode::listMember =
1894  channelNode::cs_none;
1895  this->clearChannelRequest ( guard,
1896  pChan->getSID(guard), pChan->getCID(guard) );
1897  pChan->serviceShutdownNotify ( cbGuard, guard );
1898  }
1899 
1900  while ( nciu * pChan = this->subscripReqPend.get () ) {
1901  pChan->channelNode::listMember =
1902  channelNode::cs_none;
1903  pChan->disconnectAllIO ( cbGuard, guard );
1904  this->clearChannelRequest ( guard,
1905  pChan->getSID(guard), pChan->getCID(guard) );
1906  pChan->serviceShutdownNotify ( cbGuard, guard );
1907  }
1908 
1909  while ( nciu * pChan = this->connectedList.get () ) {
1910  pChan->channelNode::listMember =
1911  channelNode::cs_none;
1912  pChan->disconnectAllIO ( cbGuard, guard );
1913  this->clearChannelRequest ( guard,
1914  pChan->getSID(guard), pChan->getCID(guard) );
1915  pChan->serviceShutdownNotify ( cbGuard, guard );
1916  }
1917 
1918  while ( nciu * pChan = this->unrespCircuit.get () ) {
1919  pChan->channelNode::listMember =
1920  channelNode::cs_none;
1921  pChan->disconnectAllIO ( cbGuard, guard );
1922  // if we know that the circuit is unresponsive
1923  // then we don't send a channel delete request and
1924  // will instead trust that the server can do the
1925  // proper cleanup when the circuit disconnects
1926  pChan->serviceShutdownNotify ( cbGuard, guard );
1927  }
1928 
1929  while ( nciu * pChan = this->subscripUpdateReqPend.get () ) {
1930  pChan->channelNode::listMember =
1931  channelNode::cs_none;
1932  pChan->disconnectAllIO ( cbGuard, guard );
1933  this->clearChannelRequest ( guard,
1934  pChan->getSID(guard), pChan->getCID(guard) );
1935  pChan->serviceShutdownNotify ( cbGuard, guard );
1936  }
1937 
1938  this->channelCountTot = 0u;
1939  this->initiateCleanShutdown ( guard );
1940 }
1941 
1943  epicsGuard < epicsMutex > & guard,
1944  nciu & chan, unsigned sidIn,
1945  ca_uint16_t typeIn, arrayElementCount countIn )
1946 {
      // Adds chan to the createReqPend list, bumps the channel count,
      // passes the server id, type and count on to the channel via
      // searchReplySetUp(), and wakes the send thread.
1947  guard.assertIdenticalMutex ( this->mutex );
1948 
1949  this->createReqPend.add ( chan );
1950  this->channelCountTot++;
1951  chan.channelNode::listMember = channelNode::cs_createReqPend;
1952  chan.searchReplySetUp ( *this, sidIn, typeIn, countIn, guard );
1953  // The tcp send thread runs at a priority below the udp thread
1954  // so that this will not send small packets
1955  this->sendThreadFlushEvent.signal ();
1956 }
1957 
1959  epicsGuard < epicsMutex > & guard, nciu & chan )
1960 {
      // Moves chan onto the subscripReqPend list if it is currently on the
      // createRespPend or v42ConnCallbackPend list. Returns true when such
      // a move happened and false when the channel was in any other state.
1961  guard.assertIdenticalMutex ( this->mutex );
1962  bool wasExpected = false;
1963  // this improves robustness in the face of a server sending
1964  // protocol that does not match its declared protocol revision
1965  if ( chan.channelNode::listMember == channelNode::cs_createRespPend ) {
1966  this->createRespPend.remove ( chan );
1967  this->subscripReqPend.add ( chan );
1968  chan.channelNode::listMember = channelNode::cs_subscripReqPend;
1969  wasExpected = true;
1970  }
1971  else if ( chan.channelNode::listMember == channelNode::cs_v42ConnCallbackPend ) {
1972  this->v42ConnCallbackPend.remove ( chan );
1973  this->subscripReqPend.add ( chan );
1974  chan.channelNode::listMember = channelNode::cs_subscripReqPend;
1975  wasExpected = true;
1976  }
1977  // the TCP send thread is awakened by its receive thread whenever the receive thread
1978  // is about to block if this->subscripReqPend has items in it
1979  return wasExpected;
1980 }
1981 
1983  epicsGuard < epicsMutex > & guard, nciu & chan )
1984 {
      // Removes chan from whichever per-state list its listMember field
      // names, marks it cs_none and decrements the channel count. When the
      // count reaches zero and this circuit is not a name service, a clean
      // shutdown is initiated.
1985  guard.assertIdenticalMutex ( this->mutex );
1986 
1987  switch ( chan.channelNode::listMember ) {
1988  case channelNode::cs_createReqPend:
1989  this->createReqPend.remove ( chan );
1990  break;
1991  case channelNode::cs_createRespPend:
1992  this->createRespPend.remove ( chan );
1993  break;
1994  case channelNode::cs_v42ConnCallbackPend:
1995  this->v42ConnCallbackPend.remove ( chan );
1996  break;
1997  case channelNode::cs_subscripReqPend:
1998  this->subscripReqPend.remove ( chan );
1999  break;
2000  case channelNode::cs_connected:
2001  this->connectedList.remove ( chan );
2002  break;
2003  case channelNode::cs_unrespCircuit:
2004  this->unrespCircuit.remove ( chan );
2005  break;
2006  case channelNode::cs_subscripUpdateReqPend:
2007  this->subscripUpdateReqPend.remove ( chan );
2008  break;
      // NOTE(review): an unrecognised state is only logged; execution still
      // falls through to the count decrement below even though nothing was
      // removed - confirm this is intended
2009  default:
2010  errlogPrintf (
2011  "cac: attempt to uninstall channel from tcp iiu, but it inst installed there?" );
2012  }
2013  chan.channelNode::listMember = channelNode::cs_none;
2014  this->channelCountTot--;
2015  if ( this->channelCountTot == 0 && ! this->isNameService() ) {
2016  this->initiateCleanShutdown ( guard );
2017  }
2018 }
2019 
2021  epicsGuard < epicsMutex > & cbGuard,
2022  const char *pformat, ... )
2023 {
      // printf style front end: packages the variadic arguments into a
      // va_list, forwards them to cacRef.varArgsPrintFormated() and
      // returns that call's status.
2024  cbGuard.assertIdenticalMutex ( this->cbMutex );
2025 
2026  va_list theArgs;
2027  int status;
2028 
2029  va_start ( theArgs, pformat );
2030 
2031  status = this->cacRef.varArgsPrintFormated ( cbGuard, pformat, theArgs );
2032 
2033  va_end ( theArgs );
2034 
2035  return status;
2036 }
2037 
2039 {
      // Wakes the send thread, but only when the send queue holds bytes.
2040  if ( this->sendQue.occupiedBytes () > 0 ) {
2041  this->sendThreadFlushEvent.signal ();
2042  }
2043 }
2044 
2046 {
      // Reports whether the socket has unread bytes. The active branch asks
      // the socket via socket_ioctl ( FIONREAD ); a failed ioctl is treated
      // as "nothing pending". The select() based variant is compiled out.
2047 #if 0
2048  FD_SET readBits;
2049  FD_ZERO ( & readBits );
2050  FD_SET ( this->sock, & readBits );
2051  struct timeval tmo;
2052  tmo.tv_sec = 0;
2053  tmo.tv_usec = 0;
2054  int status = select ( this->sock + 1, & readBits, NULL, NULL, & tmo );
2055  if ( status > 0 ) {
2056  if ( FD_ISSET ( this->sock, & readBits ) ) {
2057  return true;
2058  }
2059  }
2060  return false;
2061 #else
2062  osiSockIoctl_t bytesPending = 0; /* shut up purifys yapping */
2063  int status = socket_ioctl ( this->sock,
2064  FIONREAD, & bytesPending );
2065  if ( status >= 0 ) {
2066  if ( bytesPending > 0 ) {
2067  return true;
2068  }
2069  }
2070  return false;
2071 #endif
2072 }
2073 
2075  epicsGuard < epicsMutex > & ) const
2076 {
      // Returns recvDog.delay(); the guard argument is unused.
2077  return this->recvDog.delay ();
2078 }
2079 
2080 /*
2081  * Certain OS, such as HPUX, do not unblock a socket system call
2082  * when another thread asynchronously calls both shutdown() and
2083  * close(). To solve this problem we need to employ OS specific
2084  * mechanisms.
2085  */
2087 {
      // Raises the alarm signal on this object's thread via
      // epicsSignalRaiseSigAlarm(), provided the thread id is non-null.
2088  epicsThreadId threadId = this->thread.getId ();
2089  if ( threadId ) {
2090  epicsSignalRaiseSigAlarm ( threadId );
2091  }
2092 }
2094 {
      // Raises the alarm signal on this object's thread via
      // epicsSignalRaiseSigAlarm(), provided the thread id is non-null.
2095  epicsThreadId threadId = this->thread.getId ();
2096  if ( threadId ) {
2097  epicsSignalRaiseSigAlarm ( threadId );
2098  }
2099 }
2100 
2101 void tcpiiu::operator delete ( void * /* pCadaver */ )
2102 {
      // This overload frees nothing: it only logs a complaint, because it
      // is not expected to be called (see the explanation below).
2103  // Visual C++ .net appears to require operator delete if
2104  // placement operator delete is defined? I smell a ms rat
2105  // because if I declare placement new and delete, but
2106  // comment out the placement delete definition there are
2107  // no undefined symbols.
2108  errlogPrintf ( "%s:%d this compiler is confused about "
2109  "placement delete - memory was probably leaked",
2110  __FILE__, __LINE__ );
2111 }
2112 
2114 {
      // Returns the total number of channels counted on this circuit.
2115  guard.assertIdenticalMutex ( this->mutex );
2116  return this->channelCountTot;
2117 }
2118 
2120  epicsGuard < epicsMutex > & guard, nciu & chan,
2121  const class epicsTime & currentTime )
2122 {
      // NOTE(review): the line naming the callee (source line 2123) is not
      // present in this listing; only its arguments - guard, chan and
      // currentTime, passed through unchanged - are visible
2124  guard, chan, currentTime );
2125 }
2126 
2129  const char * pName, unsigned nameLength )
2130 {
      // Delegates unchanged to the base class implementation,
      // netiiu::searchMsg(), and returns its result.
2131  return netiiu::searchMsg (
2132  guard, id, pName, nameLength );
2133 }
2134 
2136  cac & cacIn, const osiSockAddr & addrIn ) :
2137  _ptcpiiu ( NULL ),
2138  _cac ( cacIn ),
2139  _addr ( addrIn ),
2140  _active ( false )
2141 {
      // Starts out inactive and with no circuit attached; only the cac
      // reference and the destination address are recorded.
2142 }
2143 
2145 {
      // Marks this destination inactive and forgets its circuit pointer.
2146  _active = false;
2147  _ptcpiiu = NULL;
2148 }
2149 
2151 {
      // Marks this destination active.
2152  _active = true;
2153 }
2154 
2156  epicsGuard < epicsMutex > & guard,
2157  const char * pBuf, size_t len )
2158 {
      // Sends the len bytes at pBuf over this destination's TCP circuit,
      // (re)creating the circuit first when none is attached. The bytes are
      // only queued when CA_V412 accepts the circuit's minor protocol
      // version; len must already be CA_MESSAGE_ALIGN'd (asserted).
2159  // restart circuit if it was shut down
2160  if ( ! _ptcpiiu ) {
2161  tcpiiu * piiu = NULL;
2162  bool newIIU = _cac.findOrCreateVirtCircuit (
2163  guard, _addr, cacChannel::priorityDefault,
2164  piiu, CA_UKN_MINOR_VERSION, this );
2165  if ( newIIU ) {
2166  piiu->start ( guard );
2167  }
      // NOTE(review): piiu is dereferenced below without a null check -
      // confirm findOrCreateVirtCircuit() always sets it
2168  _ptcpiiu = piiu;
2169  }
2170 
2171  // does this server support TCP-based name resolution?
2172  if ( CA_V412 ( _ptcpiiu->minorProtocolVersion ) ) {
2173  guard.assertIdenticalMutex ( _ptcpiiu->mutex );
2174  assert ( CA_MESSAGE_ALIGN ( len ) == len );
2175  comQueSendMsgMinder minder ( _ptcpiiu->sendQue, guard );
2176  _ptcpiiu->sendQue.pushString ( pBuf, len );
2177  minder.commit ();
2178  _ptcpiiu->flushRequest ( guard );
2179  }
2180 }
2181 
2183  epicsGuard < epicsMutex > & guard, unsigned level ) const
2184 {
      // Prints a fixed identification line; guard and level are unused.
2185  :: printf ( "tcpiiu :: SearchDestTCP\n" );
2186 }
2187 
2189 {
      // Adopts the count field of the received message as this circuit's
      // minor protocol version.
2190  this->minorProtocolVersion = msg.m_count;
2191 }
2192 
2194  const epicsTime & currentTime, const caHdrLargeArray & msg )
2195 {
      // Works out the server address a search reply refers to and hands the
      // channel over through cacRef.transferChanToVirtCircuit(). When
      // m_cid equals INADDR_BROADCAST this circuit's own address is used.
2196  /*
2197  * the type field is abused to carry the port number
2198  * so that we can have multiple servers on one host
2199  */
      // NOTE(review): serverAddr has no initializer and this branch assigns
      // only three fields of serverAddr.ia - confirm that the remaining
      // bytes are never read
2200  osiSockAddr serverAddr;
2201  if ( msg.m_cid != INADDR_BROADCAST ) {
2202  serverAddr.ia.sin_family = AF_INET;
2203  serverAddr.ia.sin_addr.s_addr = htonl ( msg.m_cid );
2204  serverAddr.ia.sin_port = htons ( msg.m_dataType );
2205  }
2206  else {
2207  serverAddr = this->address ();
2208  }
2209  cacRef.transferChanToVirtCircuit
2210  ( msg.m_available, msg.m_cid, 0xffff,
2211  0, minorProtocolVersion, serverAddr, currentTime );
2212 }
void subscriptionCancelRequest(epicsGuard< epicsMutex > &, nciu &chan, netSubscription &subscr)
Definition: tcpiiu.cpp:1645
const char * pLocalHostName()
Definition: cac.h:408
#define SHUT_WR
Definition: osdSock.h:45
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
void destroyIIU(tcpiiu &iiu)
Definition: cac.cpp:1227
void interruptSocketRecv()
Definition: tcpiiu.cpp:2086
tsDLList< nciu > subscripUpdateReqPend
void disconnectAllChannels(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard, class udpiiu &)
Definition: tcpiiu.cpp:1806
void searchRequest(epicsGuard< epicsMutex > &guard, const char *pbuf, size_t len)
Definition: tcpiiu.cpp:2155
tsDLList< nciu > connectedList
bool ca_v44_ok(epicsGuard< epicsMutex > &) const
#define MAX_TCP
Definition: caProto.h:63
const unsigned short dbr_value_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:896
char * pCurData
void requestRecvProcessPostponedFlush(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1784
const unsigned short dbr_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:847
#define SOCK_ECONNABORTED
Definition: osdSock.h:59
void writeNotifyRequest(epicsGuard< epicsMutex > &, nciu &, netWriteNotifyIO &, unsigned type, arrayElementCount nElem, const void *pValue)
Definition: tcpiiu.cpp:1436
Definition: cac.h:97
void sendTimeoutNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
virtual ~tcpRecvThread()
Definition: tcpiiu.cpp:386
#define INVALID_SOCKET
Definition: osdSock.h:32
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
bool socketHasBeenClosed
#define CA_PROTO_CREATE_CHAN
Definition: caProto.h:101
unsigned nameLen(epicsGuard< epicsMutex > &) const
Definition: nciu.cpp:252
void add(T &item)
Definition: tsDLList.h:313
#define CA_PROTO_EVENT_ADD
Definition: caProto.h:84
void start()
Definition: tcpiiu.cpp:60
bool searchMsg(epicsGuard< epicsMutex > &, ca_uint32_t id, const char *pName, unsigned nameLength)
Definition: tcpiiu.cpp:2127
unsigned minorProtocolVersion
#define CA_MAJOR_PROTOCOL_REVISION
Definition: caProto.h:26
pvd::Status status
#define CA_PROTO_VERSION
Definition: caProto.h:83
void pushString(const char *pVal, unsigned nChar)
Definition: comQueSend.h:203
int osiSocklen_t
Definition: osdSock.h:36
void pushUInt16(const ca_uint16_t value)
Definition: comQueSend.h:188
epicsEvent flushBlockEvent
osiSockAddr getNetworkAddress(epicsGuard< epicsMutex > &) const
Definition: tcpiiu.cpp:1769
int i
Definition: scan.c:967
void show(unsigned level) const
Definition: tcpiiu.cpp:65
#define SOCK_ENOBUFS
Definition: osdSock.h:52
struct sockaddr sa
Definition: osiSock.h:158
#define CA_PROTO_WRITE
Definition: caProto.h:87
epicsMutex & cbMutex
arrayElementCount curDataMax
ca_uint32_t m_postsize
unsigned getMask(epicsGuard< epicsMutex > &) const
Definition: netIO.h:258
#define CA_PROTO_WRITE_NOTIFY
Definition: caProto.h:102
#define CA_V43(MINOR)
Definition: caProto.h:34
LIBCOM_API void *epicsStdCall epicsThreadPrivateGet(epicsThreadPrivateId)
Definition: osdThread.c:973
unsigned short epicsUInt16
Definition: epicsTypes.h:41
void flushRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2038
struct sockaddr_in ia
Definition: osiSock.h:157
#define CA_PROTO_EVENT_CANCEL
Definition: caProto.h:85
unsigned requestMessageBytesPending(epicsGuard< epicsMutex > &mutualExclusionGuard)
Definition: tcpiiu.cpp:1745
LIBCOM_API void epicsStdCall epicsSignalRaiseSigAlarm(struct epicsThreadOSD *)
Definition: osdSignal.cpp:19
bool popOldMsgHeader(struct caHdrLargeArray &)
Definition: comQueRecv.cpp:230
bool unresponsiveCircuit
bool processIncoming(const epicsTime &currentTime, callbackManager &)
Definition: tcpiiu.cpp:1146
bool flushEarlyThreshold(unsigned nBytesThisMsg) const
Definition: comQueSend.h:226
tsDLIterConst< T > firstIter() const
Definition: tsDLList.h:459
#define SOCK_ETIMEDOUT
Definition: osdSock.h:54
tcpiiu(cac &cac, epicsMutex &mutualExclusion, epicsMutex &callbackControl, cacContextNotify &, double connectionTimeout, epicsTimerQueue &timerQueue, const osiSockAddr &addrIn, comBufMemoryManager &, unsigned minorVersion, ipAddrToAsciiEngine &engineIn, const cacChannel::priLev &priorityIn, SearchDestTCP *pSearchDestIn=NULL)
Definition: tcpiiu.cpp:665
const char * pName(epicsGuard< epicsMutex > &) const
Definition: nciu.cpp:226
#define printf
Definition: epicsStdio.h:41
void interruptSocketSend()
Definition: tcpiiu.cpp:2093
#define CA_V42(MINOR)
Definition: caProto.h:33
ca_uint32_t getCID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:297
void unlinkAllChannels(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:1869
pvd::StructureConstPtr type
unsigned int ca_uint32_t
Definition: caProto.h:76
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
Definition: memory.hpp:41
void fillFromWire(wireRecvAdapter &, statusWireIO &)
Definition: comBuf.h:176
#define INVALID_DB_REQ(x)
Definition: db_access.h:115
#define CA_UKN_MINOR_VERSION
Definition: caProto.h:29
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
unsigned blockingForFlush
void receiveTimeoutNotify(callbackManager &, epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:889
void attachToClientCtx()
Definition: cac.h:371
bool ca_v42_ok(epicsGuard< epicsMutex > &) const
Definition: tcpiiu.cpp:1777
void sendTimeoutNotify(callbackManager &cbMgr, epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:878
#define NULL
Definition: catime.c:38
void start(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:819
ca_uint16_t m_dataType
cac & cacRef
void unresponsiveCircuitNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:898
double connectionTimeout(epicsGuard< epicsMutex > &)
Definition: cac.h:420
void versionMessage(epicsGuard< epicsMutex > &, const cacChannel::priLev &priority)
Definition: tcpiiu.cpp:1380
tcpSendThread(class tcpiiu &iiuIn, const char *pName, unsigned int stackSize, unsigned int priority)
Definition: tcpiiu.cpp:49
unsigned sendBytes(const void *pBuf, unsigned nBytesInBuf, const epicsTime &currentTime)
Definition: tcpiiu.cpp:231
void searchReplySetUp(netiiu &iiu, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn, epicsGuard< epicsMutex > &)
Definition: nciu.h:311
epicsThreadPrivateId caClientCallbackThreadId
unsigned count() const
Definition: tsDLList.h:181
#define CA_PROTO_EVENTS_OFF
Definition: caProto.h:91
const T getId() const
Definition: resourceLib.h:1021
arrayElementCount curDataBytes
unsigned short ca_uint16_t
Definition: caProto.h:75
swioCircuitState circuitState
Definition: comBuf.h:64
unsigned priLev
Definition: cacIO.h:165
epicsSocketSystemCallInterruptMechanismQueryInfo
Definition: osiSock.h:47
void sendBacklogProgressNotify(epicsGuard< epicsMutex > &)
void uninstallChan(epicsGuard< epicsMutex > &guard, nciu &chan)
Definition: tcpiiu.cpp:1982
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
void flush(epicsGuard< epicsMutex > &mutualExclusionGuard)
Definition: tcpiiu.cpp:1709
#define CA_V49(MINOR)
Definition: caProto.h:40
void uninstallChanDueToSuccessfulSearchResponse(epicsGuard< epicsMutex > &, nciu &, const class epicsTime &)
Definition: tcpiiu.cpp:2119
bool valid() const
Definition: tsDLList.h:506
#define SOCK_SHUTDOWN
Definition: osdSock.h:67
virtual ~tcpSendThread()
Definition: tcpiiu.cpp:56
void responsiveCircuitNotify(epicsGuard< epicsMutex > &cbGuard, epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:860
double delay() const
bool earlyFlush
int printFormated(epicsGuard< epicsMutex > &cbGuard, const char *pformat,...)
Definition: tcpiiu.cpp:2020
bool recvProcessPostponedFlush
#define CA_PROTO_READ_SYNC
Definition: caProto.h:93
#define CA_PROTO_HOST_NAME
Definition: caProto.h:104
#define CA_V413(MINOR)
Definition: caProto.h:44
#define CA_MINOR_PROTOCOL_REVISION
Definition: nciu.h:35
void pushFloat32(const ca_float32_t value)
Definition: comQueSend.h:198
void installDisconnectedChannel(epicsGuard< epicsMutex > &, nciu &)
Definition: udpiiu.cpp:1262
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
unsigned bytesCopied
Definition: comBuf.h:63
unsigned occupiedBytes() const
Definition: comQueSend.h:216
#define socket_ioctl(A, B, C)
Definition: osdSock.h:34
void exitWait()
Definition: tcpiiu.cpp:404
const char * pHostName(epicsGuard< epicsMutex > &) const
Definition: tcpiiu.cpp:1799
void exitWait()
Definition: tcpiiu.cpp:69
#define SOCK_ECONNRESET
Definition: osdSock.h:53
void decrementBlockingForFlushCount(epicsGuard< epicsMutex > &guard)
Definition: tcpiiu.cpp:1758
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
Definition: osdSock.c:71
void hostNameSetRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1291
comQueSend sendQue
bool isNameService() const
epicsPlacementDeleteOperator((void *, tsFreeList< class tcpiiu, 32, epicsMutexNOOP > &)) private: tcpRecvThread recvThread
ca_uint32_t m_count
void disable()
Definition: tcpiiu.cpp:2144
void show(epicsGuard< epicsMutex > &guard, unsigned level) const
Definition: tcpiiu.cpp:2182
int varArgsPrintFormated(epicsGuard< epicsMutex > &callbackControl, const char *pformat, va_list args) const
Definition: cac.h:363
void subscriptionUpdateRequest(epicsGuard< epicsMutex > &, nciu &chan, netSubscription &subscr)
Definition: tcpiiu.cpp:1609
tcpSendWatchdog sendDog
bool executeResponse(callbackManager &, tcpiiu &, const epicsTime &currentTime, caHdrLargeArray &, char *pMsgBody)
Definition: cac.cpp:1204
unsigned channelCountTot
void flushIfRecvProcessRequested(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1137
#define CA_MESSAGE_ALIGN(A)
Definition: caProto.h:154
int osiSockIoctl_t
Definition: osdSock.h:35
tcpSendThread sendThread
unsigned removeBytes(unsigned nBytes)
Definition: comQueRecv.cpp:69
bool connectNotify(epicsGuard< epicsMutex > &, nciu &chan)
Definition: tcpiiu.cpp:1958
epicsMutex mutex
Definition: pvAccess.cpp:71
const char cacNillBytes[]
Definition: comQueSend.cpp:74
virtual void uninstallChanDueToSuccessfulSearchResponse(epicsGuard< epicsMutex > &, nciu &, const class epicsTime &currentTime)=0
Definition: netiiu.cpp:159
tcpRecvWatchdog recvDog
void writeRequest(epicsGuard< epicsMutex > &, nciu &, unsigned type, arrayElementCount nElem, const void *pValue)
Definition: tcpiiu.cpp:1421
LIBCOM_API void epicsStdCall freeListFree(void *pvt, void *pmem)
Definition: freeListLib.c:131
LIBCOM_API void epicsStdCall epicsThreadPrivateSet(epicsThreadPrivateId, void *)
Definition: osdThread.c:961
void show(unsigned level) const
Definition: tcpiiu.cpp:395
unsigned unacknowledgedSendBytes
double receiveWatchdogDelay(epicsGuard< epicsMutex > &) const
Definition: tcpiiu.cpp:2074
void readNotifyRequest(epicsGuard< epicsMutex > &, nciu &, netReadNotifyIO &, unsigned type, arrayElementCount nElem)
Definition: tcpiiu.cpp:1455
void enableFlowControlRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1364
bool bytesArePendingInOS() const
Definition: tcpiiu.cpp:2045
void searchRespNotify(const epicsTime &, const caHdrLargeArray &)
Definition: tcpiiu.cpp:2193
bool flushBlockThreshold() const
Definition: comQueSend.h:221
void recvBytes(void *pBuf, unsigned nBytesInBuf, statusWireIO &)
Definition: tcpiiu.cpp:299
#define ECA_UNRESPTMO
Definition: caerr.h:137
SearchDestTCP(cac &, const osiSockAddr &)
Definition: tcpiiu.cpp:2135
unsigned priority() const
Definition: caServerID.h:80
BSD and SRV5 Unix timestamp.
Definition: epicsTime.h:52
unsigned occupiedBytes() const
Definition: comQueRecv.h:60
epicsMutex & mutex
void start()
Definition: tcpiiu.cpp:390
caHdrLargeArray curMsg
void connectNotify(epicsGuard< epicsMutex > &)
unsigned getcount() const
Definition: nciu.h:200
enum tcpiiu::iiu_conn_state state
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
void initiateCleanShutdown(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:826
unsigned socketLibrarySendBufferSize
int select(int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
bool ca_v41_ok(epicsGuard< epicsMutex > &) const
~tcpiiu()
Definition: tcpiiu.cpp:1014
arrayElementCount getCount(epicsGuard< epicsMutex > &, bool allow_zero) const
Definition: netIO.h:240
void messageArrivalNotify(epicsGuard< epicsMutex > &guard)
epicsEvent sendThreadFlushEvent
void installChannel(epicsGuard< epicsMutex > &, nciu &chan, unsigned sidIn, ca_uint16_t typeIn, arrayElementCount countIn)
Definition: tcpiiu.cpp:1942
#define CA_PROTO_CLEAR_CHANNEL
Definition: caProto.h:95
#define CA_PROTO_ECHO
Definition: caProto.h:106
epicsGuard< epicsMutex > cbGuard
Definition: cac.h:94
#define SOCKERRNO
Definition: osdSock.h:33
ca_uint32_t m_cid
void insertRequestWithPayLoad(ca_uint16_t request, unsigned dataType, arrayElementCount nElem, ca_uint32_t cid, ca_uint32_t requestDependent, const void *pPayload, bool v49Ok)
Definition: comQueSend.cpp:317
unsigned contigRecvMsgCount
void subscriptionRequest(epicsGuard< epicsMutex > &, nciu &, netSubscription &subscr)
Definition: tcpiiu.cpp:1557
unsigned long arrayElementCount
Definition: cacIO.h:57
bool busyStateDetected
bool findOrCreateVirtCircuit(epicsGuard< epicsMutex > &, const osiSockAddr &, unsigned, tcpiiu *&, unsigned, SearchDestTCP *pSearchDest=NULL)
Definition: cac.cpp:535
comBuf * popNextComBufToSend()
Definition: comQueSend.cpp:259
#define CA_PROTO_CLIENT_NAME
Definition: caProto.h:103
T * get()
Definition: tsDLList.h:261
#define CA_PROTO_READ_NOTIFY
Definition: caProto.h:98
int errlogMessage(const char *message)
Definition: errlog.c:180
void setCircuit(tcpiiu *)
Definition: comBuf.h:75
void disconnectNotify(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:851
#define SOCK_EPIPE
Definition: osdSock.h:65
virtual bool searchMsg(epicsGuard< epicsMutex > &, ca_uint32_t id, const char *pName, unsigned nameLength)=0
Definition: netiiu.cpp:166
bool flowControlActive
comQueRecv recvQue
bool discardingPendingData
#define SOCK_EINTR
Definition: osdSock.h:64
tcpRecvThread(class tcpiiu &iiuIn, epicsMutex &cbMutexIn, cacContextNotify &, const char *pName, unsigned int stackSize, unsigned int priority)
Definition: tcpiiu.cpp:378
unsigned copyOutBytes(epicsInt8 *pBuf, unsigned nBytes)
Definition: comQueRecv.cpp:48
#define MAX_MSG_SIZE
Definition: caProto.h:64
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
#define genLocalExcep(CBGUARD, GUARD, CAC, STAT, PCTX)
Definition: iocinf.h:66
void echoRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1400
ca_uint16_t m_cmmd
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
Definition: udpiiu.h:79
void userNameSetRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1321
void show(unsigned level) const
Definition: nciu.cpp:472
void remove(T &item)
Definition: tsDLList.h:219
ca_uint32_t m_available
#define debugPrintf(argsInParen)
Definition: iocinf.h:30
unsigned channelCount(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:2113
OS-independent routines for ignoring Posix signals.
SOCKET sock
#define CA_PROTO_EVENTS_ON
Definition: caProto.h:92
void disableFlowControlRequest(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1348
bool oldMsgHeaderAvailable
void clearChannelRequest(epicsGuard< epicsMutex > &, ca_uint32_t sid, ca_uint32_t cid)
Definition: tcpiiu.cpp:1536
osiSockAddr address() const
Definition: caServerID.h:73
tsDLList< nciu > createReqPend
bool sendThreadFlush(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1665
void show(unsigned level) const
Definition: tcpiiu.cpp:1043
SearchDestTCP * pSearchDest
#define CA_V412(MINOR)
Definition: caProto.h:43
void initiateAbortShutdown(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:942
comBufMemoryManager & comBufMemMgr
tsDLList< nciu > createRespPend
unsigned getHostName(epicsGuard< epicsMutex > &, char *pBuf, unsigned bufLength) const
Definition: tcpiiu.cpp:1791
ca_uint32_t getSID(epicsGuard< epicsMutex > &) const
Definition: nciu.h:291
bool _receiveThreadIsBusy
bool echoRequestPending
tsDLList< nciu > subscripReqPend
const char * userNamePointer() const
Definition: cac.h:348
void createChannelRequest(nciu &, epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1487
#define false
Definition: flexdef.h:85
unsigned getType(epicsGuard< epicsMutex > &) const
Definition: netIO.h:253
static const priLev priorityDefault
Definition: cacIO.h:168
void pushLastComBufReceived(comBuf &)
Definition: comQueRecv.cpp:102
bool setEchoRequestPending(epicsGuard< epicsMutex > &)
Definition: tcpiiu.cpp:1121
#define CA_V41(MINOR)
Definition: caProto.h:32
epicsUInt32 popUInt32()
Definition: comQueRecv.cpp:211
T * first(void) const
Definition: tsDLList.h:190
#define SHUT_RDWR
Definition: osdSock.h:48
tsDLList< nciu > unrespCircuit
void enable()
Definition: tcpiiu.cpp:2150
bool msgHeaderAvailable
void versionRespNotify(const caHdrLargeArray &)
Definition: tcpiiu.cpp:2188
Definition: nciu.h:127
tsDLList< nciu > v42ConnCallbackPend
virtual void release(void *)=0
LIBCOM_API void *epicsStdCall freeListMalloc(void *pvt)
Definition: freeListLib.c:74