This is Unofficial EPICS BASE Doxygen Site
caChannel.cpp
Go to the documentation of this file.
1 
8 #include <epicsVersion.h>
9 
10 #include <pv/standardField.h>
11 #include <pv/logger.h>
12 #include <pv/pvAccess.h>
13 #include "channelConnectThread.h"
14 #include "monitorEventThread.h"
15 #include "getDoneThread.h"
16 #include "putDoneThread.h"
17 
18 #define epicsExportSharedSymbols
19 #include "caChannel.h"
20 
21 using namespace epics::pvData;
22 using std::string;
23 using std::cout;
24 using std::cerr;
25 using std::endl;
26 
27 namespace epics {
28 namespace pvAccess {
29 namespace ca {
30 
31 #define EXCEPTION_GUARD(code) try { code; } \
32  catch (std::exception &e) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
33  catch (...) { LOG(logLevelError, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
34 
35 
36 CAChannel::shared_pointer CAChannel::create(CAChannelProvider::shared_pointer const & channelProvider,
37  std::string const & channelName,
38  short priority,
39  ChannelRequester::shared_pointer const & channelRequester)
40 {
41  if(DEBUG_LEVEL>0) {
42  cout<< "CAChannel::create " << channelName << endl;
43  }
44  CAChannelPtr caChannel(
45  new CAChannel(channelName, channelProvider, channelRequester));
46  caChannel->activate(priority);
47  return caChannel;
48 }
49 
50 static void ca_connection_handler(struct connection_handler_args args)
51 {
52  CAChannel *channel = static_cast<CAChannel*>(ca_puser(args.chid));
53 
54  if (args.op == CA_OP_CONN_UP) {
55  channel->connect(true);
56  } else if (args.op == CA_OP_CONN_DOWN) {
57  channel->connect(false);
58  }
59 }
60 
61 void CAChannel::connect(bool isConnected)
62 {
63  if(DEBUG_LEVEL>0) {
64  cout<< "CAChannel::connect " << channelName << endl;
65  }
66  {
67  Lock lock(requestsMutex);
68  channelConnected = isConnected;
69  }
70  channelConnectThread->channelConnected(notifyChannelRequester);
71 }
72 
73 void CAChannel::notifyClient()
74 {
75  if(DEBUG_LEVEL>0) {
76  cout<< "CAChannel::notifyClient " << channelName << endl;
77  }
78  CAChannelProviderPtr provider(channelProvider.lock());
79  if(!provider) return;
80  bool isConnected = false;
81  {
82  Lock lock(requestsMutex);
83  isConnected = channelConnected;
84  }
85  if(!isConnected) {
86  ChannelRequester::shared_pointer req(channelRequester.lock());
87  if(req) {
88  EXCEPTION_GUARD(req->channelStateChange(
89  shared_from_this(), Channel::DISCONNECTED));
90  }
91  return;
92  }
93  while(!getFieldQueue.empty()) {
94  getFieldQueue.front()->activate();
95  getFieldQueue.pop();
96  }
97  while(!putQueue.empty()) {
98  putQueue.front()->activate();
99  putQueue.pop();
100  }
101  while(!getQueue.empty()) {
102  getQueue.front()->activate();
103  getQueue.pop();
104  }
105  while(!monitorQueue.empty()) {
106  CAChannelMonitorPtr monitor(monitorQueue.front());
107  monitor->activate();
108  addMonitor(monitor);
109  monitorQueue.pop();
110  }
111  ChannelRequester::shared_pointer req(channelRequester.lock());
112  if(req) {
113  EXCEPTION_GUARD(req->channelStateChange(
114  shared_from_this(), Channel::CONNECTED));
115  }
116 }
117 
118 
119 CAChannel::CAChannel(std::string const & channelName,
120  CAChannelProvider::shared_pointer const & channelProvider,
121  ChannelRequester::shared_pointer const & channelRequester) :
122  channelName(channelName),
123  channelProvider(channelProvider),
124  channelRequester(channelRequester),
125  channelID(0),
126  channelCreated(false),
127  channelConnected(false),
128  channelConnectThread(ChannelConnectThread::get())
129 {
130  if(DEBUG_LEVEL>0) {
131  cout<< "CAChannel::CAChannel " << channelName << endl;
132  }
133 }
134 
135 void CAChannel::activate(short priority)
136 {
137  ChannelRequester::shared_pointer req(channelRequester.lock());
138  if(!req) return;
139  if(DEBUG_LEVEL>0) {
140  cout<< "CAChannel::activate " << channelName << endl;
141  }
142  notifyChannelRequester = NotifyChannelRequesterPtr(new NotifyChannelRequester());
143  notifyChannelRequester->setChannel(shared_from_this());
144  attachContext();
145  int result = ca_create_channel(channelName.c_str(),
146  ca_connection_handler,
147  this,
148  priority, // TODO mapping
149  &channelID);
150  if (result == ECA_NORMAL)
151  {
152  channelCreated = true;
153  CAChannelProviderPtr provider(channelProvider.lock());
154  if(provider) provider->addChannel(shared_from_this());
155  EXCEPTION_GUARD(req->channelCreated(Status::Ok, shared_from_this()));
156  } else {
157  Status errorStatus(Status::STATUSTYPE_ERROR, string(ca_message(result)));
158  EXCEPTION_GUARD(req->channelCreated(errorStatus, shared_from_this()));
159  }
160 }
161 
162 CAChannel::~CAChannel()
163 {
164  if(DEBUG_LEVEL>0) {
165  cout << "CAChannel::~CAChannel() " << channelName
166  << " channelCreated " << (channelCreated ? "true" : "false")
167  << endl;
168  }
169  {
170  Lock lock(requestsMutex);
171  if(!channelCreated) return;
172  }
173  disconnectChannel();
174 }
175 
176 void CAChannel::disconnectChannel()
177 {
178  if(DEBUG_LEVEL>0) {
179  cout << "CAChannel::disconnectChannel() "
180  << channelName
181  << " channelCreated " << (channelCreated ? "true" : "false")
182  << endl;
183  }
184  {
185  Lock lock(requestsMutex);
186  if(!channelCreated) return;
187  channelCreated = false;
188  }
189  std::vector<CAChannelMonitorWPtr>::iterator it;
190  for(it = monitorlist.begin(); it!=monitorlist.end(); ++it)
191  {
192  CAChannelMonitorPtr mon = (*it).lock();
193  if(!mon) continue;
194  mon->stop();
195  }
196  monitorlist.resize(0);
197  /* Clear CA Channel */
198  CAChannelProviderPtr provider(channelProvider.lock());
199  if(provider) {
200  std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
201  }
202  int result = ca_clear_channel(channelID);
203  if (result == ECA_NORMAL) return;
204  string mess("CAChannel::disconnectChannel() ");
205  mess += ca_message(result);
206  cerr << mess << endl;
207 }
208 
209 chid CAChannel::getChannelID()
210 {
211  return channelID;
212 }
213 
214 std::tr1::shared_ptr<ChannelProvider> CAChannel::getProvider()
215 {
216  return channelProvider.lock();
217 }
218 
219 
220 std::string CAChannel::getRemoteAddress()
221 {
222  return std::string(ca_host_name(channelID));
223 }
224 
225 
226 static Channel::ConnectionState cs2CS[] =
227 {
228  Channel::NEVER_CONNECTED, // cs_never_conn
229  Channel::DISCONNECTED, // cs_prev_conn
230  Channel::CONNECTED, // cs_conn
231  Channel::DESTROYED // cs_closed
232 };
233 
234 Channel::ConnectionState CAChannel::getConnectionState()
235 {
236  return cs2CS[ca_state(channelID)];
237 }
238 
239 
240 std::string CAChannel::getChannelName()
241 {
242  return channelName;
243 }
244 
245 
246 std::tr1::shared_ptr<ChannelRequester> CAChannel::getChannelRequester()
247 {
248  return channelRequester.lock();
249 }
250 
251 void CAChannel::getField(GetFieldRequester::shared_pointer const & requester,
252  std::string const & subField)
253 {
254  if(DEBUG_LEVEL>0) {
255  cout << "CAChannel::getField " << channelName << endl;
256  }
257  CAChannelGetFieldPtr getField(
258  new CAChannelGetField(shared_from_this(),requester,subField));
259  {
260  Lock lock(requestsMutex);
261  if(getConnectionState()!=Channel::CONNECTED) {
262  getFieldQueue.push(getField);
263  return;
264  }
265  }
266  getField->callRequester(shared_from_this());
267 }
268 
269 
270 AccessRights CAChannel::getAccessRights(PVField::shared_pointer const & /*pvField*/)
271 {
272  if (ca_write_access(channelID))
273  return readWrite;
274  else if (ca_read_access(channelID))
275  return read;
276  else
277  return none;
278 }
279 
280 
281 ChannelGet::shared_pointer CAChannel::createChannelGet(
282  ChannelGetRequester::shared_pointer const & channelGetRequester,
283  PVStructure::shared_pointer const & pvRequest)
284 {
285  if(DEBUG_LEVEL>0) {
286  cout << "CAChannel::createChannelGet " << channelName << endl;
287  }
288  CAChannelGetPtr channelGet =
289  CAChannelGet::create(shared_from_this(), channelGetRequester, pvRequest);
290  {
291  Lock lock(requestsMutex);
292  if(getConnectionState()!=Channel::CONNECTED) {
293  getQueue.push(channelGet);
294  return channelGet;
295  }
296  }
297  channelGet->activate();
298  return channelGet;
299 }
300 
301 
302 ChannelPut::shared_pointer CAChannel::createChannelPut(
303  ChannelPutRequester::shared_pointer const & channelPutRequester,
304  PVStructure::shared_pointer const & pvRequest)
305 {
306  if(DEBUG_LEVEL>0) {
307  cout << "CAChannel::createChannelPut " << channelName << endl;
308  }
309  CAChannelPutPtr channelPut =
310  CAChannelPut::create(shared_from_this(), channelPutRequester, pvRequest);
311  {
312  Lock lock(requestsMutex);
313  if(getConnectionState()!=Channel::CONNECTED) {
314  putQueue.push(channelPut);
315  return channelPut;
316  }
317  }
318  channelPut->activate();
319  return channelPut;
320 }
321 
322 
323 Monitor::shared_pointer CAChannel::createMonitor(
324  MonitorRequester::shared_pointer const & monitorRequester,
325  PVStructure::shared_pointer const & pvRequest)
326 {
327  if(DEBUG_LEVEL>0) {
328  cout << "CAChannel::createMonitor " << channelName << endl;
329  }
330  CAChannelMonitorPtr channelMonitor =
331  CAChannelMonitor::create(shared_from_this(), monitorRequester, pvRequest);
332  {
333  Lock lock(requestsMutex);
334  if(getConnectionState()!=Channel::CONNECTED) {
335  monitorQueue.push(channelMonitor);
336  return channelMonitor;
337  }
338  }
339  channelMonitor->activate();
340  addMonitor(channelMonitor);
341  return channelMonitor;
342 }
343 
344 void CAChannel::addMonitor(CAChannelMonitorPtr const & monitor)
345 {
346  std::vector<CAChannelMonitorWPtr>::iterator it;
347  for(it = monitorlist.begin(); it!=monitorlist.end(); ++it)
348  {
349  CAChannelMonitorWPtr mon = *it;
350  if(mon.lock()) continue;
351  mon = monitor;
352  return;
353  }
354  monitorlist.push_back(monitor);
355 }
356 
357 void CAChannel::printInfo(std::ostream& out)
358 {
359  out << "CHANNEL : " << getChannelName() << std::endl;
360 
361  ConnectionState state = getConnectionState();
362  out << "STATE : " << ConnectionStateNames[state] << std::endl;
363  if (state == CONNECTED)
364  {
365  out << "ADDRESS : " << getRemoteAddress() << std::endl;
366  //out << "RIGHTS : " << getAccessRights() << std::endl;
367  }
368 }
369 
370 
371 CAChannelGetField::CAChannelGetField(
372  CAChannelPtr const &channel,
373  GetFieldRequester::shared_pointer const & requester,std::string const & subField)
374  : channel(channel),
375  getFieldRequester(requester),
376  subField(subField)
377 {
378  if(DEBUG_LEVEL>0) {
379  cout << "CAChannelGetField::CAChannelGetField()\n";
380  }
381 }
382 
384 {
385  CAChannelPtr chan(channel.lock());
386  if(chan) callRequester(chan);
387 }
388 
390 {
391  if(DEBUG_LEVEL>0) {
392  cout << "CAChannelGetField::~CAChannelGetField()\n";
393  }
394 }
395 
397 {
398  if(DEBUG_LEVEL>0) {
399  cout << "CAChannelGetField::callRequester\n";
400  }
401  GetFieldRequester::shared_pointer requester(getFieldRequester.lock());
402  if(!requester) return;
403  PVStructurePtr pvRequest(createRequest(""));
404  DbdToPvPtr dbdToPv = DbdToPv::create(caChannel,pvRequest,getIO);
405  Structure::const_shared_pointer structure(dbdToPv->getStructure());
406  Field::const_shared_pointer field =
407  subField.empty() ?
409  structure->getField(subField);
410 
411  if (field)
412  {
413  EXCEPTION_GUARD(requester->getDone(Status::Ok, field));
414  }
415  else
416  {
417  Status errorStatus(Status::STATUSTYPE_ERROR, "field '" + subField + "' not found");
418  EXCEPTION_GUARD(requester->getDone(errorStatus, FieldConstPtr()));
419  }
420 }
421 
422 /* ---------------------------------------------------------- */
423 
425 {
426  CAChannelProviderPtr provider(channelProvider.lock());
427  if(provider) {
428  std::tr1::static_pointer_cast<CAChannelProvider>(provider)->attachContext();
429  return;
430  }
431  string mess("CAChannel::attachContext provider does not exist ");
432  mess += getChannelName();
433  throw std::runtime_error(mess);
434 }
435 
437  CAChannel::shared_pointer const & channel,
438  ChannelGetRequester::shared_pointer const & channelGetRequester,
439  PVStructure::shared_pointer const & pvRequest)
440 {
441  if(DEBUG_LEVEL>0) {
442  cout << "CAChannelGet::create " << channel->getChannelName() << endl;
443  }
444  return CAChannelGetPtr(new CAChannelGet(channel, channelGetRequester, pvRequest));
445 }
446 
447 CAChannelGet::CAChannelGet(CAChannel::shared_pointer const & channel,
448  ChannelGetRequester::shared_pointer const & channelGetRequester,
449  PVStructure::shared_pointer const & pvRequest)
450  :
451  channel(channel),
452  channelGetRequester(channelGetRequester),
453  pvRequest(pvRequest),
454  getStatus(Status::Ok),
455  getDoneThread(GetDoneThread::get())
456 {}
457 
459 {
460  if(DEBUG_LEVEL>0) {
461  std::cout << "CAChannelGet::~CAChannelGet() " << channel->getChannelName() << endl;
462  }
463 }
464 
466 {
467  ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
468  if(!getRequester) return;
469  if(DEBUG_LEVEL>0) {
470  std::cout << "CAChannelGet::activate " << channel->getChannelName() << endl;
471  }
472  dbdToPv = DbdToPv::create(channel,pvRequest,getIO);
473  dbdToPv->getChoices(channel);
474  pvStructure = dbdToPv->createPVStructure();
475  bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
476  notifyGetRequester = NotifyGetRequesterPtr(new NotifyGetRequester());
477  notifyGetRequester->setChannelGet(shared_from_this());
478  EXCEPTION_GUARD(getRequester->channelGetConnect(Status::Ok, shared_from_this(),
479  pvStructure->getStructure()));
480 }
481 
482 
483 
484 std::string CAChannelGet::getRequesterName() { return "CAChannelGet";}
485 
486 namespace {
487 
488 static void ca_get_handler(struct event_handler_args args)
489 {
490  CAChannelGet *channelGet = static_cast<CAChannelGet*>(args.usr);
491  channelGet->getDone(args);
492 }
493 
494 } // namespace
495 
497 {
498  if(DEBUG_LEVEL>1) {
499  std::cout << "CAChannelGet::getDone "
500  << channel->getChannelName() << endl;
501  }
502 
503  ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
504  if(!getRequester) return;
505  getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
506  getDoneThread->getDone(notifyGetRequester);
507 }
508 
510 {
511  if(DEBUG_LEVEL>1) {
512  std::cout << "CAChannelGet::notifyClient " << channel->getChannelName() << endl;
513  }
514  ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
515  if(!getRequester) return;
516  EXCEPTION_GUARD(getRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet));
517 }
518 
520 {
521  if(DEBUG_LEVEL>1) {
522  std::cout << "CAChannelGet::get " << channel->getChannelName() << endl;
523  }
524  ChannelGetRequester::shared_pointer getRequester(channelGetRequester.lock());
525  if(!getRequester) return;
526  channel->attachContext();
527  bitSet->clear();
528  int result = ca_array_get_callback(dbdToPv->getRequestType(),
529  0,
530  channel->getChannelID(), ca_get_handler, this);
531  if (result == ECA_NORMAL)
532  {
533  result = ca_flush_io();
534  }
535  if (result != ECA_NORMAL)
536  {
537  string mess("CAChannelGet::get ");
538  mess += channel->getChannelName() + " message " + ca_message(result);
539  getStatus = Status(Status::STATUSTYPE_ERROR,mess);
540  notifyClient();
541  }
542 }
543 
544 Channel::shared_pointer CAChannelGet::getChannel()
545 {
546  return channel;
547 }
548 
550 {
551 }
552 
554 {
555 }
556 
558  CAChannel::shared_pointer const & channel,
559  ChannelPutRequester::shared_pointer const & channelPutRequester,
560  PVStructure::shared_pointer const & pvRequest)
561 {
562  if(DEBUG_LEVEL>0) {
563  cout << "CAChannelPut::create " << channel->getChannelName() << endl;
564  }
565  return CAChannelPutPtr(new CAChannelPut(channel, channelPutRequester, pvRequest));
566 }
567 
568 CAChannelPut::CAChannelPut(CAChannel::shared_pointer const & channel,
569  ChannelPutRequester::shared_pointer const & channelPutRequester,
570  PVStructure::shared_pointer const & pvRequest)
571 :
572  channel(channel),
573  channelPutRequester(channelPutRequester),
574  pvRequest(pvRequest),
575  block(false),
576  isPut(false),
577  getStatus(Status::Ok),
578  putStatus(Status::Ok),
579  putDoneThread(PutDoneThread::get())
580 {}
581 
583 {
584  if(DEBUG_LEVEL>0) {
585  std::cout << "CAChannelPut::~CAChannelPut() " << channel->getChannelName() << endl;
586  }
587 }
588 
589 
591 {
592  ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
593  if(!putRequester) return;
594  if(DEBUG_LEVEL>0) {
595  cout << "CAChannelPut::activate " << channel->getChannelName() << endl;
596  }
597  dbdToPv = DbdToPv::create(channel,pvRequest,putIO);
598  dbdToPv->getChoices(channel);
599  pvStructure = dbdToPv->createPVStructure();
600  bitSet = BitSetPtr(new BitSet(pvStructure->getStructure()->getNumberFields()));
601  PVStringPtr pvString = pvRequest->getSubField<PVString>("record._options.block");
602  if(pvString) {
603  std::string val = pvString->get();
604  if(val.compare("true")==0) block = true;
605  }
606  notifyPutRequester = NotifyPutRequesterPtr(new NotifyPutRequester());
607  notifyPutRequester->setChannelPut(shared_from_this());
608  EXCEPTION_GUARD(putRequester->channelPutConnect(Status::Ok, shared_from_this(),
609  pvStructure->getStructure()));
610 }
611 
612 std::string CAChannelPut::getRequesterName() { return "CAChannelPut";}
613 
614 
615 /* --------------- epics::pvAccess::ChannelPut --------------- */
616 
617 namespace {
618 
619 static void ca_put_handler(struct event_handler_args args)
620 {
621  CAChannelPut *channelPut = static_cast<CAChannelPut*>(args.usr);
622  channelPut->putDone(args);
623 }
624 
625 static void ca_put_get_handler(struct event_handler_args args)
626 {
627  CAChannelPut *channelPut = static_cast<CAChannelPut*>(args.usr);
628  channelPut->getDone(args);
629 }
630 
631 } // namespace
632 
633 
634 void CAChannelPut::put(PVStructure::shared_pointer const & pvPutStructure,
635  BitSet::shared_pointer const & /*putBitSet*/)
636 {
637  if(DEBUG_LEVEL>1) {
638  cout << "CAChannelPut::put " << channel->getChannelName() << endl;
639  }
640  ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
641  if(!putRequester) return;
642  {
643  Lock lock(mutex);
644  isPut = true;
645  }
646  putStatus = dbdToPv->putToDBD(channel,pvPutStructure,block,&ca_put_handler,this);
647  if(!block || !putStatus.isOK()) {
648  EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this()));
649  }
650 }
651 
652 
654 {
655  if(DEBUG_LEVEL>1) {
656  cout << "CAChannelPut::putDone " << channel->getChannelName() << endl;
657  }
658  ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
659  if(!putRequester) return;
660  if(args.status!=ECA_NORMAL)
661  {
662  putStatus = Status(Status::STATUSTYPE_ERROR, string(ca_message(args.status)));
663  } else {
664  putStatus = Status::Ok;
665  }
666  putDoneThread->putDone(notifyPutRequester);
667 }
668 
670 {
671  if(DEBUG_LEVEL>1) {
672  cout << "CAChannelPut::getDone " << channel->getChannelName() << endl;
673  }
674 
675  ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
676  if(!putRequester) return;
677  getStatus = dbdToPv->getFromDBD(pvStructure,bitSet,args);
678  putDoneThread->putDone(notifyPutRequester);
679 }
680 
682 {
683  ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
684  if(!putRequester) return;
685  if(isPut) {
686  EXCEPTION_GUARD(putRequester->putDone(putStatus, shared_from_this()));
687  } else {
688  EXCEPTION_GUARD(putRequester->getDone(getStatus, shared_from_this(), pvStructure, bitSet));
689  }
690 }
691 
692 
694 {
695  if(DEBUG_LEVEL>1) {
696  std::cout << "CAChannelPut::get " << channel->getChannelName() << endl;
697  }
698  ChannelPutRequester::shared_pointer putRequester(channelPutRequester.lock());
699  if(!putRequester) return;
700  {
701  Lock lock(mutex);
702  isPut = false;
703  }
704 
705  channel->attachContext();
706  bitSet->clear();
707  int result = ca_array_get_callback(dbdToPv->getRequestType(),
708  0,
709  channel->getChannelID(), ca_put_get_handler, this);
710  if (result == ECA_NORMAL)
711  {
712  result = ca_flush_io();
713  }
714  if (result != ECA_NORMAL)
715  {
716  string mess("CAChannelPut::get ");
717  mess += channel->getChannelName() + " message " +ca_message(result);
719  EXCEPTION_GUARD(putRequester->getDone(status, shared_from_this(), pvStructure, bitSet));
720  }
721 }
722 
723 
724 Channel::shared_pointer CAChannelPut::getChannel()
725 {
726  return channel;
727 }
728 
730 {
731 }
732 
734 {
735 }
736 
737 
738 /* --------------- Monitor --------------- */
739 
740 
741 static void ca_subscription_handler(struct event_handler_args args)
742 {
743  CAChannelMonitor *channelMonitor = static_cast<CAChannelMonitor*>(args.usr);
744  channelMonitor->subscriptionEvent(args);
745 }
746 
748  public std::tr1::enable_shared_from_this<CACMonitorQueue>
749 {
750 public:
752 private:
753  size_t queueSize;
754  bool isStarted;
755  Mutex mutex;
756 
757  std::queue<MonitorElementPtr> monitorElementQueue;
758 public:
760  int32 queueSize)
761  : queueSize(queueSize),
762  isStarted(false)
763  {}
765  {
766  }
767  void start()
768  {
769  Lock guard(mutex);
770  while(!monitorElementQueue.empty()) monitorElementQueue.pop();
771  isStarted = true;
772  }
773  void stop()
774  {
775  Lock guard(mutex);
776  while(!monitorElementQueue.empty()) monitorElementQueue.pop();
777  isStarted = false;
778  }
779 
780  bool event(
781  const PVStructurePtr &pvStructure,
782  const MonitorElementPtr & activeElement)
783  {
784  Lock guard(mutex);
785  if(!isStarted) return false;
786  if(monitorElementQueue.size()==queueSize) return false;
787  PVStructure::shared_pointer pvs =
788  getPVDataCreate()->createPVStructure(pvStructure);
789  MonitorElementPtr monitorElement(new MonitorElement(pvs));
790  *(monitorElement->changedBitSet) = *(activeElement->changedBitSet);
791  *(monitorElement->overrunBitSet) = *(activeElement->overrunBitSet);
792  monitorElementQueue.push(monitorElement);
793  return true;
794  }
796  {
797  Lock guard(mutex);
798  if(!isStarted) return MonitorElementPtr();
799  if(monitorElementQueue.empty()) return MonitorElementPtr();
800  MonitorElementPtr retval = monitorElementQueue.front();
801  return retval;
802  }
803  void release(MonitorElementPtr const & monitorElement)
804  {
805  Lock guard(mutex);
806  if(!isStarted) return;
807  if(monitorElementQueue.empty()) {
808  string mess("CAChannelMonitor::release client error calling release ");
809  throw std::runtime_error(mess);
810  }
811  monitorElementQueue.pop();
812  }
813 };
814 
816  CAChannel::shared_pointer const & channel,
817  MonitorRequester::shared_pointer const & monitorRequester,
818  PVStructure::shared_pointer const & pvRequest)
819 {
820  if(DEBUG_LEVEL>0) {
821  cout << "CAChannelMonitor::create " << channel->getChannelName() << endl;
822  }
823  return CAChannelMonitorPtr(new CAChannelMonitor(channel, monitorRequester, pvRequest));
824 }
825 
826 CAChannelMonitor::CAChannelMonitor(
827  CAChannel::shared_pointer const & channel,
828  MonitorRequester::shared_pointer const & monitorRequester,
829  PVStructurePtr const & pvRequest)
830 :
831  channel(channel),
832  monitorRequester(monitorRequester),
833  pvRequest(pvRequest),
834  isStarted(false),
835  monitorEventThread(MonitorEventThread::get()),
836  pevid(NULL),
837  eventMask(DBE_VALUE | DBE_ALARM)
838 {}
839 
841 {
842  if(DEBUG_LEVEL>0) {
843  std::cout << "CAChannelMonitor::~CAChannelMonitor() "
844  << channel->getChannelName()
845  << " isStarted " << (isStarted ? "true" : "false")
846  << endl;
847  }
848  stop();
849 }
850 
852 {
853  MonitorRequester::shared_pointer requester(monitorRequester.lock());
854  if(!requester) return;
855  if(DEBUG_LEVEL>0) {
856  std::cout << "CAChannelMonitor::activate " << channel->getChannelName() << endl;
857  }
858  dbdToPv = DbdToPv::create(channel,pvRequest,monitorIO);
859  dbdToPv->getChoices(channel);
860  pvStructure = dbdToPv->createPVStructure();
861  activeElement = MonitorElementPtr(new MonitorElement(pvStructure));
862  int32 queueSize = 2;
863  PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
864  if (pvOptions) {
865  PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
866  if (pvString) {
867  int size=0;
868  std::stringstream ss;
869  ss << pvString->get();
870  ss >> size;
871  if (size > 1) queueSize = size;
872  }
873  pvString = pvOptions->getSubField<PVString>("DBE");
874  if(pvString) {
875  std::string value(pvString->get());
876  eventMask = 0;
877  if(value.find("VALUE")!=std::string::npos) eventMask|=DBE_VALUE;
878  if(value.find("ARCHIVE")!=std::string::npos) eventMask|=DBE_ARCHIVE;
879  if(value.find("ALARM")!=std::string::npos) eventMask|=DBE_ALARM;
880  if(value.find("PROPERTY")!=std::string::npos) eventMask|=DBE_PROPERTY;
881  }
882  }
883  notifyMonitorRequester = NotifyMonitorRequesterPtr(new NotifyMonitorRequester());
884  notifyMonitorRequester->setChannelMonitor(shared_from_this());
885  monitorQueue = CACMonitorQueuePtr(new CACMonitorQueue(queueSize));
886  EXCEPTION_GUARD(requester->monitorConnect(Status::Ok, shared_from_this(),
887  pvStructure->getStructure()));
888 }
889 
890 std::string CAChannelMonitor::getRequesterName() { return "CAChannelMonitor";}
891 
893 {
894  if(DEBUG_LEVEL>1) {
895  std::cout << "CAChannelMonitor::subscriptionEvent "
896  << channel->getChannelName() << endl;
897  }
898  {
899  Lock lock(mutex);
900  if(!isStarted) return;
901  }
902  MonitorRequester::shared_pointer requester(monitorRequester.lock());
903  if(!requester) return;
904  Status status = dbdToPv->getFromDBD(pvStructure,activeElement->changedBitSet,args);
905  if(status.isOK())
906  {
907  if(monitorQueue->event(pvStructure,activeElement)) {
908  activeElement->changedBitSet->clear();
909  activeElement->overrunBitSet->clear();
910  } else {
911  *(activeElement->overrunBitSet) |= *(activeElement->changedBitSet);
912  }
913  monitorEventThread->event(notifyMonitorRequester);
914  }
915  else
916  {
917  string mess("CAChannelMonitor::subscriptionEvent ");
918  mess += channel->getChannelName();
919  mess += ca_message(args.status);
920  throw std::runtime_error(mess);
921  }
922 }
923 
924 
926 {
927  {
928  Lock lock(mutex);
929  if(!isStarted) return;
930  }
931  MonitorRequester::shared_pointer requester(monitorRequester.lock());
932  if(!requester) return;
933  requester->monitorEvent(shared_from_this());
934 }
935 
937 {
938  if(DEBUG_LEVEL>0) {
939  std::cout << "CAChannelMonitor::start " << channel->getChannelName() << endl;
940  }
942  {
943  Lock lock(mutex);
944  if(isStarted) {
945  status = Status(Status::STATUSTYPE_WARNING,"already started");
946  return status;
947  }
948  isStarted = true;
949  monitorQueue->start();
950  }
951  channel->attachContext();
952  int result = ca_create_subscription(dbdToPv->getRequestType(),
953  0,
954  channel->getChannelID(), eventMask,
955  ca_subscription_handler, this,
956  &pevid);
957  if (result == ECA_NORMAL)
958  {
959  result = ca_flush_io();
960  }
961  if (result == ECA_NORMAL) return status;
962  isStarted = false;
963  string message(ca_message(result));
964  return Status(Status::STATUSTYPE_ERROR,message);
965 }
966 
968 {
969  if(DEBUG_LEVEL>0) {
970  std::cout << "CAChannelMonitor::stop "
971  << channel->getChannelName()
972  << " isStarted " << (isStarted ? "true" : "false")
973  << endl;
974  }
975  {
976  Lock lock(mutex);
977  if(!isStarted) return Status(Status::STATUSTYPE_WARNING,"already stopped");
978  isStarted = false;
979  }
980  monitorQueue->stop();
981  int result = ca_clear_subscription(pevid);
982  if(result==ECA_NORMAL) return Status::Ok;
983  return Status(Status::STATUSTYPE_ERROR,string(ca_message(result)));
984 }
985 
986 
988 {
989  if(DEBUG_LEVEL>1) {
990  std::cout << "CAChannelMonitor::poll " << channel->getChannelName() << endl;
991  }
992  {
993  Lock lock(mutex);
994  if(!isStarted) return MonitorElementPtr();
995  }
996  return monitorQueue->poll();
997 }
998 
999 
1000 void CAChannelMonitor::release(MonitorElementPtr const & monitorElement)
1001 {
1002  if(DEBUG_LEVEL>1) {
1003  std::cout << "CAChannelMonitor::release " << channel->getChannelName() << endl;
1004  }
1005  monitorQueue->release(monitorElement);
1006 }
1007 
1008 /* --------------- ChannelRequest --------------- */
1009 
1011 {
1012  // noop
1013 }
1014 
1015 
1016 }}}
#define CA_OP_CONN_UP
Definition: cadef.h:128
Definition: link.h:174
virtual Channel::shared_pointer getChannel()
Definition: caChannel.cpp:724
LIBCA_API int epicsStdCall ca_array_get_callback(chtype type, unsigned long count, chid chanId, caEventCallBackFunc *pFunc, void *pArg)
#define DEBUG_LEVEL
Definition: caProviderPvt.h:25
bool event(const PVStructurePtr &pvStructure, const MonitorElementPtr &activeElement)
Definition: caChannel.cpp:780
LIBCA_API unsigned epicsStdCall ca_write_access(chid chan)
pvac::PutEvent result
Definition: clientSync.cpp:117
virtual Channel::shared_pointer getChannel()
Definition: caChannel.cpp:544
epicsMutexId lock
Definition: osiClockTime.c:37
pvd::Status status
std::tr1::shared_ptr< CACMonitorQueue > CACMonitorQueuePtr
Definition: caChannel.h:232
static Status Ok
Definition: status.h:47
int epicsStdCall ca_create_channel(const char *name_str, caCh *conn_func, void *puser, capri priority, chid *chanptr)
Definition: access.cpp:288
void release(MonitorElementPtr const &monitorElement)
Definition: caChannel.cpp:803
LIBCA_API enum channel_state epicsStdCall ca_state(chid chan)
void getDone(struct event_handler_args &args)
Definition: caChannel.cpp:669
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:788
LIBCA_API const char *epicsStdCall ca_host_name(chid channel)
static CAChannelGet::shared_pointer create(CAChannel::shared_pointer const &channel, ChannelGetRequester::shared_pointer const &channelGetRequester, epics::pvData::PVStructurePtr const &pvRequest)
Definition: caChannel.cpp:436
std::tr1::shared_ptr< NotifyChannelRequester > NotifyChannelRequesterPtr
Definition: caChannel.h:37
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
virtual epics::pvData::Status stop()
Definition: caChannel.cpp:967
LIBCA_API void *epicsStdCall ca_puser(chid chan)
A lock for multithreading.
Definition: lock.h:36
#define NULL
Definition: catime.c:38
virtual std::string getRequesterName()
Definition: caChannel.cpp:612
A vector of bits.
Definition: bitSet.h:56
#define DBE_ALARM
Definition: caeventmask.h:41
An element for a monitorQueue.
Definition: monitor.h:54
static PutDoneThreadPtr get()
void putDone(struct event_handler_args &args)
Definition: caChannel.cpp:653
#define DBE_ARCHIVE
Definition: caeventmask.h:39
PVStructure::shared_pointer createRequest(std::string const &request)
storage_t::arg_type get() const
Definition: pvData.h:396
virtual void put(epics::pvData::PVStructure::shared_pointer const &pvPutStructure, epics::pvData::BitSet::shared_pointer const &putBitSet)
Definition: caChannel.cpp:634
std::tr1::shared_ptr< CAChannel > CAChannelPtr
Definition: caChannel.h:31
This class implements introspection object for field.
Definition: pvIntrospect.h:336
#define DBE_VALUE
Definition: caeventmask.h:38
static CAChannelPut::shared_pointer create(CAChannel::shared_pointer const &channel, ChannelPutRequester::shared_pointer const &channelPutRequester, epics::pvData::PVStructurePtr const &pvRequest)
Definition: caChannel.cpp:557
PVString is special case, since it implements SerializableArray.
Definition: pvData.h:521
pvData
Definition: monitor.h:428
#define ECA_NORMAL
Definition: caerr.h:77
std::tr1::shared_ptr< CAChannelProvider > CAChannelProviderPtr
Definition: caProviderPvt.h:43
std::tr1::shared_ptr< CAChannelGetField > CAChannelGetFieldPtr
Definition: caChannel.h:57
void subscriptionEvent(struct event_handler_args &args)
Definition: caChannel.cpp:892
std::tr1::shared_ptr< NotifyGetRequester > NotifyGetRequesterPtr
Definition: caChannel.h:45
epicsMutex mutex
Definition: pvAccess.cpp:71
int epicsStdCall ca_flush_io()
Definition: access.cpp:509
std::tr1::shared_ptr< NotifyMonitorRequester > NotifyMonitorRequesterPtr
Definition: caChannel.h:40
static GetDoneThreadPtr get()
const ChannelProcessRequester::weak_pointer requester
Definition: pvAccess.cpp:68
LIBCA_API int epicsStdCall ca_clear_subscription(evid pMon)
std::tr1::shared_ptr< CAChannelPut > CAChannelPutPtr
Definition: caChannel.h:60
std::tr1::weak_ptr< CAChannelMonitor > CAChannelMonitorWPtr
Definition: caChannel.h:68
static MonitorEventThreadPtr get()
LIBCA_API unsigned epicsStdCall ca_read_access(chid chan)
std::tr1::shared_ptr< CAChannelMonitor > CAChannelMonitorPtr
Definition: caChannel.h:66
virtual MonitorElementPtr poll()
Definition: caChannel.cpp:987
Data interface for a structure,.
Definition: pvData.h:712
std::tr1::shared_ptr< const Field > FieldConstPtr
Definition: pvIntrospect.h:137
int epicsStdCall ca_clear_channel(chid pChan)
Definition: access.cpp:363
static CAChannelMonitor::shared_pointer create(CAChannel::shared_pointer const &channel, MonitorRequester::shared_pointer const &monitorRequester, epics::pvData::PVStructurePtr const &pvRequest)
Definition: caChannel.cpp:815
std::tr1::shared_ptr< PVStructure > PVStructurePtr
Definition: pvData.h:87
#define CA_OP_CONN_DOWN
Definition: cadef.h:129
#define DBE_PROPERTY
Definition: caeventmask.h:42
std::tr1::shared_ptr< PVString > PVStringPtr
Definition: pvData.h:540
const char *epicsStdCall ca_message(long ca_status)
Definition: access.cpp:561
std::tr1::shared_ptr< NotifyPutRequester > NotifyPutRequesterPtr
Definition: caChannel.h:51
#define EXCEPTION_GUARD(code)
Definition: caChannel.cpp:31
virtual epics::pvData::Status start()
Definition: caChannel.cpp:936
std::tr1::shared_ptr< DbdToPv > DbdToPvPtr
Definition: dbdToPv.h:73
Definition: caget.c:48
std::tr1::shared_ptr< CAChannelGet > CAChannelGetPtr
Definition: caChannel.h:63
std::tr1::shared_ptr< MonitorElement > MonitorElementPtr
Definition: monitor.h:40
virtual std::string getRequesterName()
Definition: caChannel.cpp:484
std::tr1::shared_ptr< BitSet > BitSetPtr
Definition: bitSet.h:26
bool isOK() const
Definition: status.h:95
void connect(bool isConnected)
Definition: caChannel.cpp:61
static DbdToPvPtr create(CAChannelPtr const &caChannel, epics::pvData::PVStructurePtr const &pvRequest, IOType ioType)
Definition: dbdToPv.cpp:37
epicsMutex Mutex
Definition: lock.h:28
void callRequester(CAChannelPtr const &caChannel)
Definition: caChannel.cpp:396
void * usr
Definition: cadef.h:85
LIBCA_API int epicsStdCall ca_create_subscription(chtype type, unsigned long count, chid chanId, long mask, caEventCallBackFunc *pFunc, void *pArg, evid *pEventID)
#define false
Definition: flexdef.h:85
int32_t int32
Definition: pvType.h:83
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
void getDone(struct event_handler_args &args)
Definition: caChannel.cpp:496
virtual void release(MonitorElementPtr const &monitorElement)
Definition: caChannel.cpp:1000
virtual std::string getRequesterName()
Definition: caChannel.cpp:890