This is Unofficial EPICS BASE Doxygen Site
ca_client_context.cpp
Go to the documentation of this file.
1 /*************************************************************************\
2 * Copyright (c) 2002 The University of Chicago, as Operator of Argonne
3 * National Laboratory.
4 * Copyright (c) 2002 The Regents of the University of California, as
5 * Operator of Los Alamos National Laboratory.
6 * EPICS BASE is distributed subject to a Software License Agreement found
7 * in file LICENSE that is included with this distribution.
8 \*************************************************************************/
9 
10 /*
11  *
12  *
13  * L O S A L A M O S
14  * Los Alamos National Laboratory
15  * Los Alamos, New Mexico 87545
16  *
17  * Copyright, 1986, The Regents of the University of California.
18  *
19  *
20  * Author Jeffrey O. Hill
21  * johill@lanl.gov
22  * 505 665 1831
23  */
24 
25 #ifdef _MSC_VER
26 # pragma warning(disable:4355)
27 #endif
28 
29 #include <stdexcept>
30 #include <string> // vxWorks 6.0 requires this include
31 #include <stdio.h>
32 
33 #include "epicsExit.h"
34 #include "errlog.h"
35 #include "locationException.h"
36 
37 #include "iocinf.h"
38 #include "oldAccess.h"
39 #include "cac.h"
40 
42 
44 
// Limit that eliminateExcessiveSendBacklog () compares against
// chan.requestMessageBytesPending ( guard ); a channel whose pending
// request total exceeds this value is flushed.
45 const unsigned ca_client_context :: flushBlockThreshold = 0x58000;
46 
47 // runs once only for each process
48 extern "C" void cacOnceFunc ( void * )
49 {
50  caClientCallbackThreadId = epicsThreadPrivateCreate ();
51  assert ( caClientCallbackThreadId );
52  ca_client_context::pDefaultServiceInstallMutex = newEpicsMutex;
53 }
54 
56 
// Optional process-wide service. While this is null the constructor creates
// its own cac instance; otherwise it calls contextCreate () on this service.
57 cacService * ca_client_context::pDefaultService = 0;
// Taken around every read and write of pDefaultService in this file;
// allocated by cacOnceFunc ().
58 epicsMutex * ca_client_context::pDefaultServiceInstallMutex;
59 
60 ca_client_context::ca_client_context ( bool enablePreemptiveCallback ) :
61  mutex(__FILE__, __LINE__),
62  cbMutex(__FILE__, __LINE__),
63  createdByThread ( epicsThreadGetIdSelf () ),
64  ca_exception_func ( 0 ), ca_exception_arg ( 0 ),
65  pVPrintfFunc ( errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 ),
66  pndRecvCnt ( 0u ), ioSeqNo ( 0u ), callbackThreadsPending ( 0u ),
67  localPort ( 0 ), fdRegFuncNeedsToBeCalled ( false ),
68  noWakeupSincePend ( true )
69 {
70  static const unsigned short PORT_ANY = 0u;
71 
72  if ( ! osiSockAttach () ) {
74  }
75 
76  epicsThreadOnce ( & cacOnce, cacOnceFunc, 0 );
77  {
78  epicsGuard < epicsMutex > guard ( *ca_client_context::pDefaultServiceInstallMutex );
79  if ( ca_client_context::pDefaultService ) {
80  this->pServiceContext.reset (
81  & ca_client_context::pDefaultService->contextCreate (
82  this->mutex, this->cbMutex, *this ) );
83  }
84  else {
85  this->pServiceContext.reset ( new cac ( this->mutex, this->cbMutex, *this ) );
86  }
87  }
88 
89  this->sock = epicsSocketCreate ( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
90  if ( this->sock == INVALID_SOCKET ) {
91  char sockErrBuf[64];
92  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
93  this->printFormated (
94  "ca_client_context: unable to create "
95  "datagram socket because = \"%s\"\n",
96  sockErrBuf );
98  }
99 
100  {
101  osiSockIoctl_t yes = true;
102  int status = socket_ioctl ( this->sock,
103  FIONBIO, & yes);
104  if ( status < 0 ) {
105  char sockErrBuf[64];
106  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
107  epicsSocketDestroy ( this->sock );
108  this->printFormated (
109  "%s: non blocking IO set fail because \"%s\"\n",
110  __FILE__, sockErrBuf );
112  }
113  }
114 
115  // force a bind to an unconstrained address so we can obtain
116  // the local port number below
117  {
118  osiSockAddr addr;
119  memset ( (char *)&addr, 0 , sizeof ( addr ) );
120  addr.ia.sin_family = AF_INET;
121  addr.ia.sin_addr.s_addr = htonl ( INADDR_ANY );
122  addr.ia.sin_port = htons ( PORT_ANY );
123  int status = bind (this->sock, &addr.sa, sizeof (addr) );
124  if ( status < 0 ) {
125  char sockErrBuf[64];
126  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
127  epicsSocketDestroy (this->sock);
128  this->printFormated (
129  "CAC: unable to bind to an unconstrained "
130  "address because = \"%s\"\n",
131  sockErrBuf );
133  }
134  }
135 
136  {
137  osiSockAddr tmpAddr;
138  osiSocklen_t saddr_length = sizeof ( tmpAddr );
139  int status = getsockname ( this->sock, & tmpAddr.sa, & saddr_length );
140  if ( status < 0 ) {
141  char sockErrBuf[64];
142  epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) );
143  epicsSocketDestroy ( this->sock );
144  this->printFormated ( "CAC: getsockname () error was \"%s\"\n", sockErrBuf );
146  }
147  if ( tmpAddr.sa.sa_family != AF_INET) {
148  epicsSocketDestroy ( this->sock );
149  this->printFormated ( "CAC: UDP socket was not inet addr family\n" );
151  }
152  this->localPort = htons ( tmpAddr.ia.sin_port );
153  }
154 
155  std::auto_ptr < CallbackGuard > pCBGuard;
156  if ( ! enablePreemptiveCallback ) {
157  pCBGuard.reset ( new CallbackGuard ( this->cbMutex ) );
158  }
159 
160  // multiple steps ensure exception safety
161  this->pCallbackGuard = pCBGuard;
162 }
163 
165 {
166  if ( this->fdRegFunc ) {
167  ( *this->fdRegFunc )
168  ( this->fdRegArg, this->sock, false );
169  }
170  epicsSocketDestroy ( this->sock );
171 
172  osiSockRelease ();
173 
174  // force a logical shutdown order
175  // so that the cac class does not hang its
176  // receive threads during their shutdown sequence
177 // and so that classes using this class's mutex
178  // are destroyed before the mutex is destroyed
179  if ( this->pCallbackGuard.get() ) {
180  epicsGuardRelease < epicsMutex > unguard ( *this->pCallbackGuard );
181  this->pServiceContext.reset ( 0 );
182  }
183  else {
184  this->pServiceContext.reset ( 0 );
185  }
186 }
187 
189  epicsGuard < epicsMutex > & guard, getCopy & gc )
190 {
191  guard.assertIdenticalMutex ( this->mutex );
192  gc.~getCopy ();
193  this->getCopyFreeList.release ( & gc );
194 }
195 
197  epicsGuard < epicsMutex > & guard, getCallback & gcb )
198 {
199  guard.assertIdenticalMutex ( this->mutex );
200  gcb.~getCallback ();
201  this->getCallbackFreeList.release ( & gcb );
202 }
203 
205  epicsGuard < epicsMutex > & guard, putCallback & pcb )
206 {
207  guard.assertIdenticalMutex ( this->mutex );
208  pcb.~putCallback ();
209  this->putCallbackFreeList.release ( & pcb );
210 }
211 
214 {
215  guard.assertIdenticalMutex ( this->mutex );
216  os.~oldSubscription ();
217  this->subscriptionFreeList.release ( & os );
218 }
219 
221  caExceptionHandler * pfunc, void * arg )
222 {
223  epicsGuard < epicsMutex > guard ( this->mutex );
224  this->ca_exception_func = pfunc;
225  this->ca_exception_arg = arg;
226 // should block here until related callback in progress completes
227 }
228 
230  caPrintfFunc * ca_printf_func )
231 {
232  epicsGuard < epicsMutex > guard ( this->mutex );
233  if ( ca_printf_func ) {
234  this->pVPrintfFunc = ca_printf_func;
235  }
236  else {
237  this->pVPrintfFunc = epicsVprintf;
238  }
239 // should block here until related callback in progress completes
240 }
241 
243  CAFDHANDLER *pFunc, void *pArg )
244 {
245  epicsGuard < epicsMutex > guard ( this->mutex );
246  this->fdRegFunc = pFunc;
247  this->fdRegArg = pArg;
248  this->fdRegFuncNeedsToBeCalled = true;
249  if ( pFunc ) {
250  // the receive thread might already be blocking
251  // w/o having sent the wakeup message
252  this->_sendWakeupMsg ();
253  }
254 // should block here until related callback in progress completes
255 }
256 
258  const char *pformat, ... ) const
259 {
260  va_list theArgs;
261  int status;
262 
263  va_start ( theArgs, pformat );
264 
265  status = this->ca_client_context :: varArgsPrintFormated ( pformat, theArgs );
266 
267  va_end ( theArgs );
268 
269  return status;
270 }
271 
273  const char *pformat, va_list args ) const
274 {
275  caPrintfFunc * pFunc;
276  {
277  epicsGuard < epicsMutex > guard ( this->mutex );
278  pFunc = this->pVPrintfFunc;
279  }
280  if ( pFunc ) {
281  return ( *pFunc ) ( pformat, args );
282  }
283  else {
284  return :: vfprintf ( stderr, pformat, args );
285  }
286 }
287 
288 void ca_client_context::exception (
289  epicsGuard < epicsMutex > & guard, int stat, const char * pCtx,
290  const char * pFile, unsigned lineNo )
291 {
292  struct exception_handler_args args;
293  caExceptionHandler * pFunc = this->ca_exception_func;
294  void * pArg = this->ca_exception_arg;
295  {
296  epicsGuardRelease < epicsMutex > unguard ( guard );
297  // NOOP if they disable exceptions
298  if ( pFunc ) {
299  args.chid = NULL;
300  args.type = TYPENOTCONN;
301  args.count = 0;
302  args.addr = NULL;
303  args.stat = stat;
304  args.op = CA_OP_OTHER;
305  args.ctx = pCtx;
306  args.pFile = pFile;
307  args.lineNo = lineNo;
308  args.usr = pArg;
309  ( *pFunc ) ( args );
310  }
311  else {
312  this->signal ( stat, pFile, lineNo, pCtx );
313  }
314  }
315 }
316 
317 void ca_client_context::exception (
318  epicsGuard < epicsMutex > & guard, int status, const char * pContext,
319  const char * pFileName, unsigned lineNo, oldChannelNotify & chan,
320  unsigned type, arrayElementCount count, unsigned op )
321 {
322  struct exception_handler_args args;
323  caExceptionHandler * pFunc = this->ca_exception_func;
324  void * pArg = this->ca_exception_arg;
325  {
326  epicsGuardRelease < epicsMutex > unguard ( guard );
327  // NOOP if they disable exceptions
328  if ( pFunc ) {
329  args.chid = &chan;
330  args.type = type;
331  args.count = count;
332  args.addr = NULL;
333  args.stat = status;
334  args.op = op;
335  args.ctx = pContext;
336  args.pFile = pFileName;
337  args.lineNo = lineNo;
338  args.usr = pArg;
339  ( *pFunc ) ( args );
340  }
341  else {
342  this->signal ( status, pFileName, lineNo,
343  "op=%u, channel=%s, type=%s, count=%lu, ctx=\"%s\"",
344  op, ca_name ( &chan ),
345  dbr_type_to_text ( static_cast <int> ( type ) ),
346  count, pContext );
347  }
348  }
349 }
350 
351 void ca_client_context::signal ( int ca_status, const char * pfilenm,
352  int lineno, const char * pFormat, ... )
353 {
354  va_list theArgs;
355  va_start ( theArgs, pFormat );
356  this->vSignal ( ca_status, pfilenm, lineno, pFormat, theArgs);
357  va_end ( theArgs );
358 }
359 
361  int ca_status, const char *pfilenm,
362  int lineno, const char *pFormat, va_list args )
363 {
364  static const char *severity[] =
365  {
366  "Warning",
367  "Success",
368  "Error",
369  "Info",
370  "Fatal",
371  "Fatal",
372  "Fatal",
373  "Fatal"
374  };
375 
376  this->printFormated ( "CA.Client.Exception...............................................\n" );
377 
378  this->printFormated ( " %s: \"%s\"\n",
379  severity[ CA_EXTRACT_SEVERITY ( ca_status ) ],
380  ca_message ( ca_status ) );
381 
382  if ( pFormat ) {
383  this->printFormated ( " Context: \"" );
384  this->varArgsPrintFormated ( pFormat, args );
385  this->printFormated ( "\"\n" );
386  }
387 
388  if ( pfilenm ) {
389  this->printFormated ( " Source File: %s line %d\n",
390  pfilenm, lineno );
391  }
392 
393  epicsTime current = epicsTime::getCurrent ();
394  char date[64];
395  current.strftime ( date, sizeof ( date ), "%a %b %d %Y %H:%M:%S.%f");
396  this->printFormated ( " Current Time: %s\n", date );
397 
398  /*
399  * Terminate execution if unsuccessful
400  */
401  if( ! ( ca_status & CA_M_SUCCESS ) &&
402  CA_EXTRACT_SEVERITY ( ca_status ) != CA_K_WARNING ){
403  errlogFlush ();
404  abort ();
405  }
406 
407  this->printFormated ( "..................................................................\n" );
408 }
409 
410 void ca_client_context::show ( unsigned level ) const
411 {
412  epicsGuard < epicsMutex > guard ( this->mutex );
413 
414  ::printf ( "ca_client_context at %p pndRecvCnt=%u ioSeqNo=%u\n",
415  static_cast <const void *> ( this ),
416  this->pndRecvCnt, this->ioSeqNo );
417 
418  if ( level > 0u ) {
419  this->pServiceContext->show ( guard, level - 1u );
420  ::printf ( "\tpreemptive callback is %s\n",
421  this->pCallbackGuard.get() ? "disabled" : "enabled" );
422  ::printf ( "\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
423  this->pndRecvCnt );
424  ::printf ( "\tthe current io sequence number is %u\n",
425  this->ioSeqNo );
426  ::printf ( "IO done event:\n");
427  this->ioDone.show ( level - 1u );
428  ::printf ( "Synchronous group identifier hash table:\n" );
429  this->sgTable.show ( level - 1u );
430  }
431 }
432 
433 void ca_client_context::attachToClientCtx ()
434 {
435  assert ( ! epicsThreadPrivateGet ( caClientContextId ) );
436  epicsThreadPrivateSet ( caClientContextId, this );
437 }
438 
440  epicsGuard < epicsMutex > & guard, unsigned ioSeqNoIn )
441 {
442  guard.assertIdenticalMutex ( this->mutex );
443  if ( this->ioSeqNo == ioSeqNoIn ) {
444  assert ( this->pndRecvCnt < UINT_MAX );
445  this->pndRecvCnt++;
446  }
447 }
448 
450  epicsGuard < epicsMutex > & guard, unsigned ioSeqNoIn )
451 {
452  guard.assertIdenticalMutex ( this->mutex );
453  if ( this->ioSeqNo == ioSeqNoIn ) {
454  assert ( this->pndRecvCnt > 0u );
455  this->pndRecvCnt--;
456  if ( this->pndRecvCnt == 0u ) {
457  this->ioDone.signal ();
458  }
459  }
460 }
461 
462 // !!!! This routine is only visible in the old interface - or in a new ST interface.
463 // !!!! In the old interface we restrict thread attach so that calls from threads
464 // !!!! other than the initializing thread are not allowed if preemptive callback
465 // !!!! is disabled. This prevents the preemptive callback lock from being released
466 // !!!! by other threads than the one that locked it.
467 //
468 int ca_client_context::pendIO ( const double & timeout )
469 {
470  // prevent recursion nightmares by disabling calls to
471  // pendIO () from within a CA callback.
472  if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
473  return ECA_EVDISALLOW;
474  }
475 
476  int status = ECA_NORMAL;
477  epicsTime beg_time = epicsTime::getCurrent ();
478  double remaining = timeout;
479 
480  epicsGuard < epicsMutex > guard ( this->mutex );
481 
482  this->flush ( guard );
483 
484  while ( this->pndRecvCnt > 0 ) {
485  if ( remaining < CAC_SIGNIFICANT_DELAY ) {
486  status = ECA_TIMEOUT;
487  break;
488  }
489 
490  {
491  epicsGuardRelease < epicsMutex > unguard ( guard );
492  this->blockForEventAndEnableCallbacks ( this->ioDone, remaining );
493  }
494 
495  double delay = epicsTime::getCurrent () - beg_time;
496  if ( delay < timeout ) {
497  remaining = timeout - delay;
498  }
499  else {
500  remaining = 0.0;
501  }
502  }
503 
504  this->ioSeqNo++;
505  this->pndRecvCnt = 0u;
506 
507  return status;
508 }
509 
510 // !!!! This routine is only visible in the old interface - or in a new ST interface.
511 // !!!! In the old interface we restrict thread attach so that calls from threads
512 // !!!! other than the initializing thread are not allowed if preemptive callback
513 // !!!! is disabled. This prevents the preemptive callback lock from being released
514 // !!!! by other threads than the one that locked it.
515 //
516 int ca_client_context::pendEvent ( const double & timeout )
517 {
518  // prevent recursion nightmares by disabling calls to
519  // pendIO () from within a CA callback.
520  if ( epicsThreadPrivateGet ( caClientCallbackThreadId ) ) {
521  return ECA_EVDISALLOW;
522  }
523 
524  epicsTime current = epicsTime::getCurrent ();
525 
526  {
527  epicsGuard < epicsMutex > guard ( this->mutex );
528  this->flush ( guard );
529  }
530 
531  // process at least once if preemptive callback is disabled
532  if ( this->pCallbackGuard.get() ) {
533  epicsGuardRelease < epicsMutex > cbUnguard ( *this->pCallbackGuard );
534  epicsGuard < epicsMutex > guard ( this->mutex );
535 
536  //
537  // This is needed because in non-preemptive callback mode
538  // legacy applications that use file descriptor managers
539  // will register for ca receive thread activity and keep
540  // calling ca_pend_event until all of the socket data has
541  // been read. We must guarantee that other threads get a
542  // chance to run if there is data in any of the sockets.
543  //
544  if ( this->fdRegFunc ) {
545  epicsGuardRelease < epicsMutex > unguard ( guard );
546 
547  // remove short udp message sent to wake
548  // up a file descriptor manager
549  osiSockAddr tmpAddr;
550  osiSocklen_t addrSize = sizeof ( tmpAddr.sa );
551  char buf = 0;
552  int status = 0;
553  do {
554  status = recvfrom ( this->sock, & buf, sizeof ( buf ),
555  0, & tmpAddr.sa, & addrSize );
556  } while ( status > 0 );
557  }
558  while ( this->callbackThreadsPending > 0 ) {
559  epicsGuardRelease < epicsMutex > unguard ( guard );
560  this->callbackThreadActivityComplete.wait ( 30.0 );
561  }
562  this->noWakeupSincePend = true;
563  }
564 
565  double elapsed = epicsTime::getCurrent() - current;
566  double delay;
567 
568  if ( timeout > elapsed ) {
569  delay = timeout - elapsed;
570  }
571  else {
572  delay = 0.0;
573  }
574 
575  if ( delay >= CAC_SIGNIFICANT_DELAY ) {
576  if ( this->pCallbackGuard.get() ) {
577  epicsGuardRelease < epicsMutex > unguard ( *this->pCallbackGuard );
578  epicsThreadSleep ( delay );
579  }
580  else {
581  epicsThreadSleep ( delay );
582  }
583  }
584 
585  return ECA_TIMEOUT;
586 }
587 
589  epicsEvent & event, const double & timeout )
590 {
591  if ( this->pCallbackGuard.get() ) {
592  epicsGuardRelease < epicsMutex > unguard ( *this->pCallbackGuard );
593  event.wait ( timeout );
594  }
595  else {
596  event.wait ( timeout );
597  }
598 }
599 
600 void ca_client_context::callbackProcessingInitiateNotify ()
601 {
602  // if preemptive callback is enabled then this is a noop
603  if ( this->pCallbackGuard.get() ) {
604  bool sendNeeded = false;
605  {
606  epicsGuard < epicsMutex > guard ( this->mutex );
607  this->callbackThreadsPending++;
608  if ( this->fdRegFunc && this->noWakeupSincePend ) {
609  this->noWakeupSincePend = false;
610  sendNeeded = true;
611  }
612  }
613  if ( sendNeeded ) {
614  _sendWakeupMsg ();
615  }
616  }
617 }
618 
619 void ca_client_context :: _sendWakeupMsg ()
620 {
621  // send short udp message to wake up a file descriptor manager
622  // when a message arrives
623  osiSockAddr tmpAddr;
624  tmpAddr.ia.sin_family = AF_INET;
625  tmpAddr.ia.sin_addr.s_addr = htonl ( INADDR_LOOPBACK );
626  tmpAddr.ia.sin_port = htons ( this->localPort );
627  char buf = 0;
628  sendto ( this->sock, & buf, sizeof ( buf ),
629  0, & tmpAddr.sa, sizeof ( tmpAddr.sa ) );
630 }
631 
632 void ca_client_context::callbackProcessingCompleteNotify ()
633 {
634  // if preemptive callback is enabled then this is a noop
635  if ( this->pCallbackGuard.get() ) {
636  bool signalNeeded = false;
637  {
638  epicsGuard < epicsMutex > guard ( this->mutex );
639  if ( this->callbackThreadsPending <= 1 ) {
640  if ( this->callbackThreadsPending == 1 ) {
641  this->callbackThreadsPending = 0;
642  signalNeeded = true;
643  }
644  }
645  else {
646  this->callbackThreadsPending--;
647  }
648  }
649  if ( signalNeeded ) {
650  this->callbackThreadActivityComplete.signal ();
651  }
652  }
653 }
654 
656  epicsGuard < epicsMutex > & guard, const char * pChannelName,
658 {
659  guard.assertIdenticalMutex ( this->mutex );
660  return this->pServiceContext->createChannel (
661  guard, pChannelName, chan, pri );
662 }
663 
665 {
666  this->pServiceContext->flush ( guard );
667 }
668 
670 {
671  epicsGuard < epicsMutex > guard ( this->mutex );
672  return this->pServiceContext->circuitCount ( guard );
673 }
674 
676 {
677  epicsGuard < epicsMutex > guard ( this->mutex );
678  return this->pServiceContext->beaconAnomaliesSinceProgramStart ( guard );
679 }
680 
682  epicsGuard < epicsMutex > & guard, CASG & sg )
683 {
684  guard.assertIdenticalMutex ( this->mutex );
685  this->sgTable.idAssignAdd ( sg );
686 }
687 
689  epicsGuard < epicsMutex > & guard, CASG & sg )
690 {
691  guard.assertIdenticalMutex ( this->mutex );
692  this->sgTable.remove ( sg );
693 }
694 
696  epicsGuard < epicsMutex > & guard, unsigned idIn )
697 {
698  guard.assertIdenticalMutex ( this->mutex );
699  CASG * psg = this->sgTable.lookup ( idIn );
700  if ( psg ) {
701  if ( ! psg->verify ( guard ) ) {
702  psg = 0;
703  }
704  }
705  return psg;
706 }
707 
709 {
710  epicsGuard < epicsMutex > guard ( this->mutex );
711  this->sgTable.verify ();
712  this->pServiceContext->selfTest ( guard );
713 }
714 
716 {
717  return this->mutex;
718 }
719 
720 cacContext & ca_client_context::createNetworkContext (
721  epicsMutex & mutexIn, epicsMutex & cbMutexIn )
722 {
723  return * new cac ( mutexIn, cbMutexIn, *this );
724 }
725 
727 {
728  epicsThreadOnce ( & cacOnce, cacOnceFunc, 0 );
729 
730  epicsGuard < epicsMutex > guard ( *ca_client_context::pDefaultServiceInstallMutex );
731  if ( ca_client_context::pDefaultService ) {
732  throw std::logic_error
733  ( "CA in-memory service already installed and can't be replaced");
734  }
735  ca_client_context::pDefaultService = & service;
736 }
737 
738 void epicsStdCall caInstallDefaultService ( cacService & service )
739 {
741 }
742 
743 LIBCA_API int epicsStdCall ca_clear_subscription ( evid pMon )
744 {
745  oldChannelNotify & chan = pMon->channel ();
746  ca_client_context & cac = chan.getClientCtx ();
747  // !!!! the order in which we take the mutex here prevents deadlocks
748  {
749  epicsGuard < epicsMutex > guard ( cac.mutex );
750  try {
751  // if this stalls out on a live circuit then an exception
752  // can be forthcoming which we must ignore as the clear
753  // request must always be successful
754  chan.eliminateExcessiveSendBacklog ( guard );
755  }
756  catch ( cacChannel::notConnected & ) {
757  // intentionally ignored
758  }
759  }
760  if ( cac.pCallbackGuard.get() &&
761  cac.createdByThread == epicsThreadGetIdSelf () ) {
762  epicsGuard < epicsMutex > guard ( cac.mutex );
763  pMon->cancel ( *cac.pCallbackGuard.get(), guard );
764  }
765  else {
766  //
767  // we will definately stall out here if all of the
768  // following are true
769  //
770  // o user creates non-preemtive mode client library context
771  // o user doesnt periodically call a ca function
772  // o user calls this function from an auxiillary thread
773  //
774  CallbackGuard cbGuard ( cac.cbMutex );
775  epicsGuard < epicsMutex > guard ( cac.mutex );
776  pMon->cancel ( cbGuard, guard );
777  }
778  return ECA_NORMAL;
779 }
780 
782  epicsGuard < epicsMutex > & guard, cacChannel & chan )
783 {
784  if ( chan.requestMessageBytesPending ( guard ) >
785  ca_client_context :: flushBlockThreshold ) {
786  if ( this->pCallbackGuard.get() &&
787  this->createdByThread == epicsThreadGetIdSelf () ) {
788  // we need to be very careful about lock hierarchy
789  // inversion in this situation
790  epicsGuardRelease < epicsMutex > unguard ( guard );
791  {
793  * this->pCallbackGuard.get() );
794  {
795  epicsGuard < epicsMutex > nestedGuard ( this->mutex );
796  chan.flush ( nestedGuard );
797  }
798  }
799  }
800  else {
801  chan.flush ( guard );
802  }
803  }
804 }
#define ECA_EVDISALLOW
Definition: caerr.h:103
void exception epicsGuard< epicsMutex > int status
Definition: oldAccess.h:311
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify unsigned type
Definition: oldAccess.h:314
#define INADDR_LOOPBACK
Definition: osdSock.h:76
void errlogFlush(void)
Definition: errlog.c:529
#define CA_M_SUCCESS
Definition: caerr.h:41
double timeout
Definition: pvutils.cpp:25
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
void exception epicsGuard< epicsMutex > int const char const char * pFileName
Definition: oldAccess.h:311
void destroySubscription(epicsGuard< epicsMutex > &, oldSubscription &)
Definition: cac.h:97
void installCASG(epicsGuard< epicsMutex > &, CASG &)
void epicsStdCall caInstallDefaultService(cacService &service)
#define INVALID_SOCKET
Definition: osdSock.h:32
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
int osiSocklen_t
Definition: osdSock.h:36
#define true
Definition: flexdef.h:84
void uninstallCASG(epicsGuard< epicsMutex > &, CASG &)
int lineno
Definition: antelope.c:33
void osiSockRelease()
Definition: osdSock.c:62
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify unsigned arrayElementCount count
Definition: oldAccess.h:314
struct sockaddr sa
Definition: osiSock.h:158
LIBCOM_API void *epicsStdCall epicsThreadPrivateGet(epicsThreadPrivateId)
Definition: osdThread.c:973
int errlogVprintf(const char *pFormat, va_list pvar)
Definition: errlog.c:144
ca_client_context & getClientCtx()
Definition: oldAccess.h:428
struct sockaddr_in ia
Definition: osiSock.h:157
void destroyGetCallback(epicsGuard< epicsMutex > &, getCallback &)
void caExceptionHandler(struct exception_handler_args)
Definition: cadef.h:251
const char * ctx
Definition: cadef.h:108
void incrementOutstandingIO(epicsGuard< epicsMutex > &, unsigned ioSeqNo)
#define printf
Definition: epicsStdio.h:41
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify unsigned arrayElementCount unsigned op
Definition: oldAccess.h:314
const char * pFile
Definition: cadef.h:109
int printFormated(const char *pformat,...) const
void destroyGetCopy(epicsGuard< epicsMutex > &, getCopy &)
void assertIdenticalMutex(const T &) const
Definition: epicsGuard.h:80
#define NULL
Definition: catime.c:38
~getCopy()
Definition: getCopy.cpp:47
epicsMutex & mutexRef() const
friend int epicsStdCall ca_clear_subscription(evid pMon)
epicsThreadPrivateId caClientCallbackThreadId
unsigned priLev
Definition: cacIO.h:165
#define CA_K_WARNING
Definition: caerr.h:34
T * lookup(const ID &idIn) const
Definition: resourceLib.h:342
void CAFDHANDLER(void *parg, int fd, int opened)
Definition: cadef.h:657
#define dbr_type_to_text(type)
Definition: db_access.h:687
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
#define CA_OP_OTHER
Definition: cadef.h:123
int varArgsPrintFormated(const char *pformat, va_list args) const
#define socket_ioctl(A, B, C)
Definition: osdSock.h:34
friend void cacOnceFunc(void *)
bool verify(epicsGuard< epicsMutex > &) const
Definition: CASG.cpp:54
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
Definition: osdSock.c:71
virtual void flush(epicsGuard< epicsMutex > &mutualExclusionGuard)=0
#define EPICS_THREAD_ONCE_INIT
Definition: epicsThread.h:109
ca_client_context(bool enablePreemptiveCallback=false)
#define ECA_NORMAL
Definition: caerr.h:77
#define epicsVprintf
Definition: errlog.h:52
unsigned beaconAnomaliesSinceProgramStart() const
int osiSockIoctl_t
Definition: osdSock.h:35
virtual unsigned requestMessageBytesPending(epicsGuard< epicsMutex > &mutualExclusionGuard)=0
void idAssignAdd(ITEM &item)
Definition: resourceLib.h:964
epicsMutex mutex
Definition: pvAccess.cpp:71
LIBCOM_API void epicsStdCall epicsThreadOnce(epicsThreadOnceId *id, EPICSTHREADFUNC, void *arg)
LIBCOM_API void epicsStdCall epicsThreadPrivateSet(epicsThreadPrivateId, void *)
Definition: osdThread.c:961
void cacOnceFunc(void *)
Extended replacement for the Posix exit and atexit routines.
void cancel(CallbackGuard &callbackGuard, epicsGuard< epicsMutex > &mutualExclusionGuard)
Definition: oldAccess.h:502
LIBCA_API const char *epicsStdCall ca_name(chid chan)
void exception epicsGuard< epicsMutex > int const char * pContext
Definition: oldAccess.h:311
static void installDefaultService(cacService &)
int caPrintfFunc(const char *pformat, va_list args)
Definition: cadef.h:821
epicsThreadPrivateId caClientContextId
Definition: access.cpp:46
void changeExceptionEvent(caExceptionHandler *pfunc, void *arg)
void date(const char *format)
void exception epicsGuard< epicsMutex > int const char const char unsigned lineNo
Definition: oldAccess.h:311
void destroyPutCallback(epicsGuard< epicsMutex > &, putCallback &)
int pendEvent(const double &timeout)
T * remove(const ID &idIn)
Definition: resourceLib.h:297
unsigned long arrayElementCount
Definition: cacIO.h:57
const char *epicsStdCall ca_message(long ca_status)
Definition: access.cpp:561
oldChannelNotify & channel() const
Definition: oldAccess.h:509
#define TYPENOTCONN
Definition: cadef.h:149
void show(unsigned level) const
Definition: resourceLib.h:371
void flush(epicsGuard< epicsMutex > &)
CASG * lookupCASG(epicsGuard< epicsMutex > &, unsigned id)
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
unsigned circuitCount() const
void release(void *p)
Definition: tsFreeList.h:176
#define ECA_TIMEOUT
Definition: caerr.h:87
void eliminateExcessiveSendBacklog(epicsGuard< epicsMutex > &, cacChannel &)
int osiSockAttach()
Definition: osdSock.c:54
void decrementOutstandingIO(epicsGuard< epicsMutex > &, unsigned ioSeqNo)
#define stderr
Definition: epicsStdio.h:32
#define CAC_SIGNIFICANT_DELAY
Definition: iocinf.h:36
void replaceErrLogHandler(caPrintfFunc *ca_printf_func)
void registerForFileDescriptorCallBack(CAFDHANDLER *pFunc, void *pArg)
#define CA_EXTRACT_SEVERITY(code)
Definition: caerr.h:56
void eliminateExcessiveSendBacklog(epicsGuard< epicsMutex > &)
Definition: oldAccess.h:468
int pendIO(const double &timeout)
void vSignal(int ca_status, const char *pfilenm, int lineno, const char *pFormat, va_list args)
#define throwWithLocation(parm)
LIBCOM_API epicsThreadPrivateId epicsStdCall epicsThreadPrivateCreate(void)
Definition: osdThread.c:934
cacChannel & createChannel(epicsGuard< epicsMutex > &, const char *pChannelName, cacChannelNotify &, cacChannel::priLev pri)
void verify() const
Definition: resourceLib.h:432
#define false
Definition: flexdef.h:85
void blockForEventAndEnableCallbacks(epicsEvent &event, const double &timeout)
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify & chan
Definition: oldAccess.h:314
void signal(int ca_status, const char *pfilenm, int lineno, const char *pFormat,...)
void show(unsigned level) const
LIBCOM_API epicsThreadId epicsStdCall epicsThreadGetIdSelf(void)
Definition: osdThread.c:810