26 # pragma warning(disable:4355) 45 const unsigned ca_client_context :: flushBlockThreshold = 0x58000;
51 assert ( caClientCallbackThreadId );
52 ca_client_context::pDefaultServiceInstallMutex = newEpicsMutex;
57 cacService * ca_client_context::pDefaultService = 0;
58 epicsMutex * ca_client_context::pDefaultServiceInstallMutex;
61 mutex(__FILE__, __LINE__),
62 cbMutex(__FILE__, __LINE__),
64 ca_exception_func ( 0 ), ca_exception_arg ( 0 ),
65 pVPrintfFunc (
errlogVprintf ), fdRegFunc ( 0 ), fdRegArg ( 0 ),
66 pndRecvCnt ( 0u ), ioSeqNo ( 0u ), callbackThreadsPending ( 0u ),
67 localPort ( 0 ), fdRegFuncNeedsToBeCalled (
false ),
68 noWakeupSincePend (
true )
70 static const unsigned short PORT_ANY = 0u;
79 if ( ca_client_context::pDefaultService ) {
80 this->pServiceContext.reset (
81 & ca_client_context::pDefaultService->contextCreate (
82 this->mutex, this->cbMutex, *
this ) );
85 this->pServiceContext.reset (
new cac ( this->mutex, this->cbMutex, *
this ) );
94 "ca_client_context: unable to create " 95 "datagram socket because = \"%s\"\n",
109 "%s: non blocking IO set fail because \"%s\"\n",
110 __FILE__, sockErrBuf );
119 memset ( (
char *)&addr, 0 ,
sizeof ( addr ) );
120 addr.
ia.sin_family = AF_INET;
121 addr.
ia.sin_addr.s_addr = htonl ( INADDR_ANY );
122 addr.
ia.sin_port = htons ( PORT_ANY );
123 int status = bind (this->sock, &addr.
sa, sizeof (addr) );
129 "CAC: unable to bind to an unconstrained " 130 "address because = \"%s\"\n",
139 int status = getsockname ( this->sock, & tmpAddr.
sa, & saddr_length );
144 this->
printFormated (
"CAC: getsockname () error was \"%s\"\n", sockErrBuf );
147 if ( tmpAddr.
sa.sa_family != AF_INET) {
149 this->
printFormated (
"CAC: UDP socket was not inet addr family\n" );
152 this->localPort = htons ( tmpAddr.
ia.sin_port );
155 std::auto_ptr < CallbackGuard > pCBGuard;
156 if ( ! enablePreemptiveCallback ) {
161 this->pCallbackGuard = pCBGuard;
166 if ( this->fdRegFunc ) {
168 ( this->fdRegArg, this->sock,
false );
179 if ( this->pCallbackGuard.get() ) {
181 this->pServiceContext.reset ( 0 );
184 this->pServiceContext.reset ( 0 );
193 this->getCopyFreeList.
release ( & gc );
201 this->getCallbackFreeList.
release ( & gcb );
209 this->putCallbackFreeList.
release ( & pcb );
217 this->subscriptionFreeList.
release ( & os );
224 this->ca_exception_func = pfunc;
225 this->ca_exception_arg = arg;
233 if ( ca_printf_func ) {
234 this->pVPrintfFunc = ca_printf_func;
246 this->fdRegFunc = pFunc;
247 this->fdRegArg = pArg;
248 this->fdRegFuncNeedsToBeCalled =
true;
252 this->_sendWakeupMsg ();
258 const char *pformat, ... )
const 263 va_start ( theArgs, pformat );
273 const char *pformat, va_list args )
const 278 pFunc = this->pVPrintfFunc;
281 return ( *pFunc ) ( pformat, args );
284 return :: vfprintf (
stderr, pformat, args );
288 void ca_client_context::exception (
290 const char * pFile,
unsigned lineNo )
294 void * pArg = this->ca_exception_arg;
317 void ca_client_context::exception (
324 void * pArg = this->ca_exception_arg;
343 "op=%u, channel=%s, type=%s, count=%lu, ctx=\"%s\"",
352 int lineno,
const char * pFormat, ... )
355 va_start ( theArgs, pFormat );
356 this->
vSignal ( ca_status, pfilenm, lineno, pFormat, theArgs);
361 int ca_status,
const char *pfilenm,
362 int lineno,
const char *pFormat, va_list args )
364 static const char *severity[] =
376 this->
printFormated (
"CA.Client.Exception...............................................\n" );
393 epicsTime current = epicsTime::getCurrent ();
395 current.strftime ( date,
sizeof ( date ),
"%a %b %d %Y %H:%M:%S.%f");
407 this->
printFormated (
"..................................................................\n" );
414 ::printf (
"ca_client_context at %p pndRecvCnt=%u ioSeqNo=%u\n",
415 static_cast <const void *> (
this ),
416 this->pndRecvCnt, this->ioSeqNo );
419 this->pServiceContext->show ( guard, level - 1u );
420 ::printf (
"\tpreemptive callback is %s\n",
421 this->pCallbackGuard.get() ?
"disabled" :
"enabled" );
422 ::printf (
"\tthere are %u unsatisfied IO operations blocking ca_pend_io()\n",
424 ::printf (
"\tthe current io sequence number is %u\n",
427 this->ioDone.show ( level - 1u );
428 ::printf (
"Synchronous group identifier hash table:\n" );
429 this->sgTable.
show ( level - 1u );
433 void ca_client_context::attachToClientCtx ()
443 if ( this->ioSeqNo == ioSeqNoIn ) {
444 assert ( this->pndRecvCnt < UINT_MAX );
453 if ( this->ioSeqNo == ioSeqNoIn ) {
454 assert ( this->pndRecvCnt > 0u );
456 if ( this->pndRecvCnt == 0u ) {
457 this->ioDone.signal ();
477 epicsTime beg_time = epicsTime::getCurrent ();
482 this->
flush ( guard );
484 while ( this->pndRecvCnt > 0 ) {
495 double delay = epicsTime::getCurrent () - beg_time;
496 if ( delay < timeout ) {
497 remaining = timeout - delay;
505 this->pndRecvCnt = 0u;
524 epicsTime current = epicsTime::getCurrent ();
528 this->
flush ( guard );
532 if ( this->pCallbackGuard.get() ) {
544 if ( this->fdRegFunc ) {
554 status = recvfrom ( this->sock, & buf,
sizeof ( buf ),
555 0, & tmpAddr.
sa, & addrSize );
556 }
while ( status > 0 );
558 while ( this->callbackThreadsPending > 0 ) {
560 this->callbackThreadActivityComplete.wait ( 30.0 );
562 this->noWakeupSincePend =
true;
565 double elapsed = epicsTime::getCurrent() - current;
568 if ( timeout > elapsed ) {
569 delay = timeout - elapsed;
576 if ( this->pCallbackGuard.get() ) {
591 if ( this->pCallbackGuard.get() ) {
593 event.wait ( timeout );
596 event.wait ( timeout );
600 void ca_client_context::callbackProcessingInitiateNotify ()
603 if ( this->pCallbackGuard.get() ) {
604 bool sendNeeded =
false;
607 this->callbackThreadsPending++;
608 if ( this->fdRegFunc && this->noWakeupSincePend ) {
609 this->noWakeupSincePend =
false;
619 void ca_client_context :: _sendWakeupMsg ()
624 tmpAddr.
ia.sin_family = AF_INET;
626 tmpAddr.
ia.sin_port = htons ( this->localPort );
628 sendto ( this->sock, & buf,
sizeof ( buf ),
629 0, & tmpAddr.
sa, sizeof ( tmpAddr.
sa ) );
632 void ca_client_context::callbackProcessingCompleteNotify ()
635 if ( this->pCallbackGuard.get() ) {
636 bool signalNeeded =
false;
639 if ( this->callbackThreadsPending <= 1 ) {
640 if ( this->callbackThreadsPending == 1 ) {
641 this->callbackThreadsPending = 0;
646 this->callbackThreadsPending--;
649 if ( signalNeeded ) {
650 this->callbackThreadActivityComplete.signal ();
660 return this->pServiceContext->createChannel (
661 guard, pChannelName, chan, pri );
666 this->pServiceContext->flush ( guard );
672 return this->pServiceContext->circuitCount ( guard );
678 return this->pServiceContext->beaconAnomaliesSinceProgramStart ( guard );
692 this->sgTable.
remove ( sg );
701 if ( ! psg->
verify ( guard ) ) {
712 this->pServiceContext->selfTest ( guard );
720 cacContext & ca_client_context::createNetworkContext (
723 return *
new cac ( mutexIn, cbMutexIn, *
this );
731 if ( ca_client_context::pDefaultService ) {
732 throw std::logic_error
733 (
"CA in-memory service already installed and can't be replaced");
735 ca_client_context::pDefaultService = & service;
760 if ( cac.pCallbackGuard.get() &&
763 pMon->
cancel ( *cac.pCallbackGuard.get(), guard );
776 pMon->
cancel ( cbGuard, guard );
785 ca_client_context :: flushBlockThreshold ) {
786 if ( this->pCallbackGuard.get() &&
793 * this->pCallbackGuard.get() );
796 chan.
flush ( nestedGuard );
801 chan.
flush ( guard );
void exception epicsGuard< epicsMutex > int status
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify unsigned type
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
void exception epicsGuard< epicsMutex > int const char const char * pFileName
void destroySubscription(epicsGuard< epicsMutex > &, oldSubscription &)
void installCASG(epicsGuard< epicsMutex > &, CASG &)
void epicsStdCall caInstallDefaultService(cacService &service)
#define assert(exp)
Declare that a condition should be true.
void uninstallCASG(epicsGuard< epicsMutex > &, CASG &)
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify unsigned arrayElementCount count
LIBCOM_API void *epicsStdCall epicsThreadPrivateGet(epicsThreadPrivateId)
int errlogVprintf(const char *pFormat, va_list pvar)
ca_client_context & getClientCtx()
void destroyGetCallback(epicsGuard< epicsMutex > &, getCallback &)
void caExceptionHandler(struct exception_handler_args)
void incrementOutstandingIO(epicsGuard< epicsMutex > &, unsigned ioSeqNo)
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify unsigned arrayElementCount unsigned op
int printFormated(const char *pformat,...) const
void destroyGetCopy(epicsGuard< epicsMutex > &, getCopy &)
void assertIdenticalMutex(const T &) const
epicsMutex & mutexRef() const
friend int epicsStdCall ca_clear_subscription(evid pMon)
epicsThreadPrivateId caClientCallbackThreadId
T * lookup(const ID &idIn) const
void CAFDHANDLER(void *parg, int fd, int opened)
#define dbr_type_to_text(type)
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
int varArgsPrintFormated(const char *pformat, va_list args) const
#define socket_ioctl(A, B, C)
friend void cacOnceFunc(void *)
bool verify(epicsGuard< epicsMutex > &) const
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
virtual void flush(epicsGuard< epicsMutex > &mutualExclusionGuard)=0
#define EPICS_THREAD_ONCE_INIT
ca_client_context(bool enablePreemptiveCallback=false)
unsigned beaconAnomaliesSinceProgramStart() const
virtual unsigned requestMessageBytesPending(epicsGuard< epicsMutex > &mutualExclusionGuard)=0
void idAssignAdd(ITEM &item)
LIBCOM_API void epicsStdCall epicsThreadOnce(epicsThreadOnceId *id, EPICSTHREADFUNC, void *arg)
LIBCOM_API void epicsStdCall epicsThreadPrivateSet(epicsThreadPrivateId, void *)
Extended replacement for the Posix exit and atexit routines.
void cancel(CallbackGuard &callbackGuard, epicsGuard< epicsMutex > &mutualExclusionGuard)
LIBCA_API const char *epicsStdCall ca_name(chid chan)
void exception epicsGuard< epicsMutex > int const char * pContext
static void installDefaultService(cacService &)
int caPrintfFunc(const char *pformat, va_list args)
epicsThreadPrivateId caClientContextId
void changeExceptionEvent(caExceptionHandler *pfunc, void *arg)
void date(const char *format)
void exception epicsGuard< epicsMutex > int const char const char unsigned lineNo
void destroyPutCallback(epicsGuard< epicsMutex > &, putCallback &)
int pendEvent(const double &timeout)
T * remove(const ID &idIn)
unsigned long arrayElementCount
const char *epicsStdCall ca_message(long ca_status)
oldChannelNotify & channel() const
virtual ~ca_client_context()
void show(unsigned level) const
void flush(epicsGuard< epicsMutex > &)
CASG * lookupCASG(epicsGuard< epicsMutex > &, unsigned id)
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
unsigned circuitCount() const
void eliminateExcessiveSendBacklog(epicsGuard< epicsMutex > &, cacChannel &)
void decrementOutstandingIO(epicsGuard< epicsMutex > &, unsigned ioSeqNo)
#define CAC_SIGNIFICANT_DELAY
void replaceErrLogHandler(caPrintfFunc *ca_printf_func)
void registerForFileDescriptorCallBack(CAFDHANDLER *pFunc, void *pArg)
#define CA_EXTRACT_SEVERITY(code)
void eliminateExcessiveSendBacklog(epicsGuard< epicsMutex > &)
int pendIO(const double &timeout)
void vSignal(int ca_status, const char *pfilenm, int lineno, const char *pFormat, va_list args)
#define throwWithLocation(parm)
LIBCOM_API epicsThreadPrivateId epicsStdCall epicsThreadPrivateCreate(void)
cacChannel & createChannel(epicsGuard< epicsMutex > &, const char *pChannelName, cacChannelNotify &, cacChannel::priLev pri)
void blockForEventAndEnableCallbacks(epicsEvent &event, const double &timeout)
void exception epicsGuard< epicsMutex > int const char const char unsigned oldChannelNotify & chan
void signal(int ca_status, const char *pfilenm, int lineno, const char *pFormat,...)
void show(unsigned level) const
LIBCOM_API epicsThreadId epicsStdCall epicsThreadGetIdSelf(void)