This is Unofficial EPICS BASE Doxygen Site
clientContextImpl.cpp
Go to the documentation of this file.
1 
7 #include <iostream>
8 #include <sstream>
9 #include <memory>
10 #include <queue>
11 #include <stdexcept>
12 
13 #include <osiSock.h>
14 #include <epicsGuard.h>
15 #include <epicsAssert.h>
16 #include <epicsAtomic.h>
17 
18 #include <pv/lock.h>
19 #include <pv/timer.h>
20 #include <pv/bitSetUtil.h>
21 #include <pv/standardPVField.h>
22 #include <pv/reftrack.h>
23 
24 #define epicsExportSharedSymbols
25 #include <pv/pvAccess.h>
26 #include <pv/pvaConstants.h>
27 #include <pv/blockingUDP.h>
28 #include <pv/blockingTCP.h>
29 #include <pv/inetAddressUtil.h>
30 #include <pv/hexDump.h>
31 #include <pv/remote.h>
32 #include <pv/codec.h>
34 #include <pv/serializationHelper.h>
36 #include <pv/clientContextImpl.h>
37 #include <pv/configuration.h>
38 #include <pv/beaconHandler.h>
39 #include <pv/logger.h>
40 #include <pv/securityImpl.h>
41 
42 #include <pv/pvAccessMB.h>
43 
44 //#include <tr1/unordered_map>
45 
48 
49 using namespace std;
50 using namespace epics::pvData;
51 
namespace epics {
namespace pvAccess {

// Shared Status instances handed to operation requesters when their channel
// goes away.  STATUSTYPE_WARNING (not ERROR) because these are expected
// lifecycle events, not protocol failures.
Status ClientChannelImpl::channelDestroyed(
    Status::STATUSTYPE_WARNING, "channel destroyed");
Status ClientChannelImpl::channelDisconnected(
    Status::STATUSTYPE_WARNING, "channel disconnected");

}}
namespace {
using namespace epics::pvAccess;

class ChannelGetFieldRequestImpl;

// Maps a per-operation I/O id (ioid) to the request awaiting its response.
// Values are weak so a dropped request does not linger in the map's ownership.
// TODO consider std::unordered_map
//typedef std::tr1::unordered_map<pvAccessID, ResponseRequest::weak_pointer> IOIDResponseRequestMap;
typedef std::map<pvAccessID, ResponseRequest::weak_pointer> IOIDResponseRequestMap;


// Executes 'code' exactly once.  (Name is historical: earlier versions wrapped
// the code in a try/catch; the guard is now a no-op shell.)
#define EXCEPTION_GUARD(code) do { code; } while(0)

// Lock the weak requester pointer WEAK into local shared_ptr PTR and run
// 'code' only if the requester is still alive; silently skips otherwise.
#define EXCEPTION_GUARD3(WEAK, PTR, code) do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) { code; }}while(0)

// Deliver MSG with message-type MTYPE to the requester behind WEAK, if alive.
#define SEND_MESSAGE(WEAK, PTR, MSG, MTYPE) \
    do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) (PTR)->message(MSG, MTYPE); }while(0)
77 
82 class BaseRequestImpl :
83  public ResponseRequest,
84  public NetStats,
85  public virtual epics::pvAccess::Destroyable
86 {
87 public:
88  POINTER_DEFINITIONS(BaseRequestImpl);
89 
90  static PVDataCreatePtr pvDataCreate;
91 
92  static const Status notInitializedStatus;
93  static const Status destroyedStatus;
94  static const Status channelNotConnected;
95  static const Status channelDestroyed;
96  static const Status otherRequestPendingStatus;
97  static const Status invalidPutStructureStatus;
98  static const Status invalidPutArrayStatus;
99  static const Status pvRequestNull;
100 
101  static BitSet::shared_pointer createBitSetFor(
102  PVStructure::shared_pointer const & pvStructure,
103  BitSet::shared_pointer const & existingBitSet)
104  {
105  assert(pvStructure);
106  size_t pvStructureSize = pvStructure->getNumberFields();
107  if (existingBitSet)
108  {
109  existingBitSet->clear();
110  return existingBitSet;
111  }
112  else
113  return BitSet::shared_pointer(new BitSet(pvStructureSize));
114  }
115 
116  static PVField::shared_pointer reuseOrCreatePVField(
117  Field::const_shared_pointer const & field,
118  PVField::shared_pointer const & existingPVField)
119  {
120  if (existingPVField.get() && *field == *existingPVField->getField())
121  return existingPVField;
122  else
123  return pvDataCreate->createPVField(field);
124  }
125 
126 protected:
127 
128  const ClientChannelImpl::shared_pointer m_channel;
129 
130  /* negative... */
131  static const int NULL_REQUEST = -1;
132  static const int PURE_DESTROY_REQUEST = -2;
133  static const int PURE_CANCEL_REQUEST = -3;
134 
135  // const after activate()
136  pvAccessID m_ioid;
137 
138 private:
139  // holds: NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST, or
140  // a mask of QOS_*
141  int32 m_pendingRequest;
142 protected:
143 
144  Mutex m_mutex;
145 
146  /* ownership here is a bit complicated...
147  *
148  * each instance maintains two shared_ptr/weak_ptr
149  * 1. internal - calls 'delete' when ref count reaches zero
150  * 2. external - wraps 'internal' ref. calls ->destroy() and releases internal ref. when ref count reaches zero
151  *
152  * Any internal ref. loops must be broken by destroy()
153  *
154  * Only external refs. are returned by Channel::create*() or passed to *Requester methods.
155  *
156  * Internal refs. are held by internal relations which need to ensure memory is not
157  * prematurely free'd, but should not keep the channel/operation "alive".
158  * eg. A Channel holds an internal ref to ChannelGet
159  */
160  const BaseRequestImpl::weak_pointer m_this_internal,
161  m_this_external;
162 
163  template<class subklass>
164  std::tr1::shared_ptr<subklass> internal_from_this() {
165  ResponseRequest::shared_pointer P(m_this_internal);
166  return std::tr1::static_pointer_cast<subklass>(P);
167  }
168  template<class subklass>
169  std::tr1::shared_ptr<subklass> external_from_this() {
170  ResponseRequest::shared_pointer P(m_this_external);
171  return std::tr1::static_pointer_cast<subklass>(P);
172  }
173 public:
174  static size_t num_instances;
175  static size_t num_active;
176 
177  template<class subklass>
178  static
179  typename std::tr1::shared_ptr<subklass>
180  build(ClientChannelImpl::shared_pointer const & channel,
181  const typename subklass::requester_type::shared_pointer& requester,
182  const epics::pvData::PVStructure::shared_pointer& pvRequest)
183  {
184  std::tr1::shared_ptr<subklass> internal(new subklass(channel, requester, pvRequest)),
185  external(internal.get(),
187  // only we get to set these, but since this isn't the ctor, we aren't able to
188  // follow the rules.
189  const_cast<BaseRequestImpl::weak_pointer&>(internal->m_this_internal) = internal;
190  const_cast<BaseRequestImpl::weak_pointer&>(internal->m_this_external) = external;
191  internal->activate();
192  REFTRACE_INCREMENT(num_active);
193  return external;
194  }
195 protected:
196  bool m_destroyed;
197  bool m_initialized;
198 
199  AtomicBoolean m_lastRequest;
200 
201  AtomicBoolean m_subscribed;
202 
203  BaseRequestImpl(ClientChannelImpl::shared_pointer const & channel) :
204  m_channel(channel),
205  m_ioid(INVALID_IOID),
206  m_pendingRequest(NULL_REQUEST),
207  m_destroyed(false),
208  m_initialized(false),
209  m_subscribed()
210  {
211  REFTRACE_INCREMENT(num_instances);
212  }
213 
214  virtual ~BaseRequestImpl() {
215  REFTRACE_DECREMENT(num_instances);
216  }
217 
218  virtual void activate() {
219  // register response request
220  // ResponseRequest::shared_pointer to this instance must already exist
221  shared_pointer self(m_this_internal);
222  m_ioid = m_channel->getContext()->registerResponseRequest(self);
223  m_channel->registerResponseRequest(self);
224  }
225 
226  bool startRequest(int32 qos) {
227  Lock guard(m_mutex);
228 
229  if(qos==PURE_DESTROY_REQUEST)
230  {/* always allow destroy */}
231  else if(qos==PURE_CANCEL_REQUEST && m_pendingRequest!=PURE_DESTROY_REQUEST)
232  {/* cancel overrides all but destroy */}
233  else if(m_pendingRequest==NULL_REQUEST)
234  {/* anything whenidle */}
235  else
236  {return false; /* others not allowed */}
237 
238  m_pendingRequest = qos;
239  return true;
240  }
241 
242  int32 beginRequest() {
243  Lock guard(m_mutex);
244  int32 ret = m_pendingRequest;
245  m_pendingRequest = NULL_REQUEST;
246  return ret;
247  }
248 
249  void abortRequest() {
250  Lock guard(m_mutex);
251  m_pendingRequest = NULL_REQUEST;
252  }
253 
254 public:
255 
256  pvAccessID getIOID() const OVERRIDE FINAL {
257  return m_ioid;
258  }
259 
260  virtual void initResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) = 0;
261  virtual void normalResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) = 0;
262 
263  virtual void response(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer) OVERRIDE {
264  transport->ensureData(1);
265  int8 qos = payloadBuffer->getByte();
266 
267  Status status;
268  status.deserialize(payloadBuffer, transport.get());
269 
270  if (qos & QOS_INIT)
271  {
272  if (status.isSuccess())
273  {
274  // once created set destroy flag
275  Lock G(m_mutex);
276  m_initialized = true;
277  }
278 
279  initResponse(transport, version, payloadBuffer, qos, status);
280  }
281  else
282  {
283  bool destroyReq = false;
284 
285  if (qos & QOS_DESTROY)
286  {
287  Lock G(m_mutex);
288  m_initialized = false;
289  destroyReq = true;
290  }
291 
292  normalResponse(transport, version, payloadBuffer, qos, status);
293 
294  if (destroyReq)
295  destroy();
296  }
297  }
298 
299  virtual void cancel() OVERRIDE {
300 
301  {
302  Lock guard(m_mutex);
303  if (m_destroyed)
304  return;
305  }
306 
307  try
308  {
309  startRequest(PURE_CANCEL_REQUEST);
310  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
311  } catch (std::runtime_error& e) {
312  // assume from checkAndGetTransport() due to wrong channel state
313  } catch (std::exception& e) {
314  // noop (do not complain if fails)
315  LOG(logLevelWarn, "Ignore exception during ChanneGet::cancel: %s", e.what());
316  }
317 
318  }
319 
320  virtual Channel::shared_pointer getChannel() {
321  return m_channel;
322  }
323 
324  virtual void destroy() OVERRIDE {
325  destroy(false);
326  }
327 
328  virtual void lastRequest() {
329  m_lastRequest.set();
330  }
331 
332  virtual void destroy(bool createRequestFailed) {
333 
334  bool initd;
335  {
336  Lock guard(m_mutex);
337  if (m_destroyed)
338  return;
339  m_destroyed = true;
340  initd = m_initialized;
341  }
342 
343  // unregister response request
344  m_channel->getContext()->unregisterResponseRequest(m_ioid);
345  m_channel->unregisterResponseRequest(m_ioid);
346 
347  // destroy remote instance
348  if (!createRequestFailed && initd)
349  {
350  try
351  {
352  startRequest(PURE_DESTROY_REQUEST);
353  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
354  } catch (std::runtime_error& e) {
355  // assume from checkAndGetTransport() due to wrong channel state
356  } catch (std::exception& e) {
357  LOG(logLevelWarn, "Ignore exception during BaseRequestImpl::destroy: %s", e.what());
358  }
359 
360  }
361 
362  REFTRACE_DECREMENT(num_active);
363  }
364 
365  virtual void timeout() OVERRIDE FINAL {
366  cancel();
367  // TODO notify?
368  }
369 
370  void reportStatus(Channel::ConnectionState status) OVERRIDE FINAL {
371  // destroy, since channel (parent) was destroyed
372  if (status == Channel::DESTROYED)
373  destroy();
374  else if (status == Channel::DISCONNECTED)
375  {
376  m_subscribed.clear();
377  abortRequest();
378  }
379  // TODO notify?
380  }
381 
382  virtual void resubscribeSubscription(Transport::shared_pointer const & transport) {
383  if (transport.get() != 0 && !m_subscribed.get() && startRequest(QOS_INIT))
384  {
385  m_subscribed.set();
386  transport->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
387  }
388  }
389 
390  void updateSubscription() {}
391 
392  // sub-class send() calls me
393  void base_send(ByteBuffer* buffer, TransportSendControl* control, int8 qos) {
394  if (qos == NULL_REQUEST) {
395  return;
396  }
397  else if (qos == PURE_DESTROY_REQUEST)
398  {
399  control->startMessage((int8)CMD_DESTROY_REQUEST, 8);
400  buffer->putInt(m_channel->getServerChannelID());
401  buffer->putInt(m_ioid);
402  }
403  else if (qos == PURE_CANCEL_REQUEST)
404  {
405  control->startMessage((int8)CMD_CANCEL_REQUEST, 8);
406  buffer->putInt(m_channel->getServerChannelID());
407  buffer->putInt(m_ioid);
408  }
409  }
410 
411  virtual void stats(Stats& s) const OVERRIDE FINAL
412  {
413  s.populated = true;
414  s.operationBytes.tx = epics::atomic::get(bytesTX);
415  s.operationBytes.rx = epics::atomic::get(bytesRX);
416  Transport::shared_pointer T(m_channel->getTransport());
417  if(T) { // must be connected
418  s.transportPeer = T->getRemoteName();
419  s.transportBytes.tx = epics::atomic::get(T->_totalBytesSent);
420  s.transportBytes.rx = epics::atomic::get(T->_totalBytesRecv);
421  }
422  }
423 };
424 
// reftrack counters: total instances alive, and instances not yet destroy()'d
size_t BaseRequestImpl::num_instances;
size_t BaseRequestImpl::num_active;


PVDataCreatePtr BaseRequestImpl::pvDataCreate = getPVDataCreate();

// Shared completion Status constants (see BaseRequestImpl declarations).
const Status BaseRequestImpl::notInitializedStatus(Status::STATUSTYPE_ERROR, "request not initialized");
const Status BaseRequestImpl::destroyedStatus(Status::STATUSTYPE_ERROR, "request destroyed");
const Status BaseRequestImpl::channelNotConnected(Status::STATUSTYPE_ERROR, "channel not connected");
const Status BaseRequestImpl::channelDestroyed(Status::STATUSTYPE_ERROR, "channel destroyed");
const Status BaseRequestImpl::otherRequestPendingStatus(Status::STATUSTYPE_ERROR, "other request pending");
const Status BaseRequestImpl::invalidPutStructureStatus(Status::STATUSTYPE_ERROR, "incompatible put structure");
const Status BaseRequestImpl::invalidPutArrayStatus(Status::STATUSTYPE_ERROR, "incompatible put array");
const Status BaseRequestImpl::pvRequestNull(Status::STATUSTYPE_ERROR, "pvRequest == 0");
440 
/**
 * Client-side ChannelProcess: asks the server to "process" the channel
 * without transferring any data.  Wire command: CMD_PROCESS.
 */
class ChannelProcessRequestImpl :
    public BaseRequestImpl,
    public ChannelProcess
{
public:
    // Weak ref to the user callback so its lifetime does not keep this
    // operation alive; every notification goes through EXCEPTION_GUARD3.
    const requester_type::weak_pointer m_callback;
    const PVStructure::shared_pointer m_pvRequest;

    ChannelProcessRequestImpl(ClientChannelImpl::shared_pointer const & channel, ChannelProcessRequester::shared_pointer const & callback, PVStructure::shared_pointer const & pvRequest) :
        BaseRequestImpl(channel),
        m_callback(callback),
        m_pvRequest(pvRequest)
    {}

    virtual void activate() OVERRIDE FINAL
    {
        BaseRequestImpl::activate();

        // pvRequest can be null

        // TODO best-effort support

        try {
            // queue the QOS_INIT message if the channel is currently connected
            resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
        } catch (std::runtime_error &rte) {
            // channel already destroyed: report failure and tear down locally
            EXCEPTION_GUARD3(m_callback, cb, cb->channelProcessConnect(channelDestroyed, external_from_this<ChannelProcessRequestImpl>()));
            BaseRequestImpl::destroy(true);
        }
    }

    virtual ~ChannelProcessRequestImpl() {}

    ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }

    // TransportSender: serialize the pending request onto the wire.
    // Negative codes (destroy/cancel) are delegated to base_send().
    virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
        int32 pendingRequest = beginRequest();
        if (pendingRequest < 0)
        {
            base_send(buffer, control, pendingRequest);
            return;
        }

        control->startMessage((int8)CMD_PROCESS, 9);
        buffer->putInt(m_channel->getServerChannelID());
        buffer->putInt(m_ioid);
        buffer->putByte((int8)pendingRequest);

        if (pendingRequest & QOS_INIT)
        {
            // pvRequest
            SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
        }
    }

    // Server reply to the INIT message: just relay the connect status.
    virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
        EXCEPTION_GUARD3(m_callback, cb, cb->channelProcessConnect(status, external_from_this<ChannelProcessRequestImpl>()));
    }

    // Server reply to a process request: relay completion status.
    virtual void normalResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
        EXCEPTION_GUARD3(m_callback, cb, cb->processDone(status, external_from_this<ChannelProcessRequestImpl>()));
    }

    // User-facing request: validate state, claim the pending-request slot,
    // then queue ourselves on the transport; all failures reported via
    // processDone with the appropriate shared Status constant.
    virtual void process() OVERRIDE FINAL
    {
        ChannelProcess::shared_pointer thisPtr(external_from_this<ChannelProcessRequestImpl>());

        {
            Lock guard(m_mutex);
            if (m_destroyed) {
                EXCEPTION_GUARD3(m_callback, cb, cb->processDone(destroyedStatus, thisPtr));
                return;
            }
            if (!m_initialized) {
                EXCEPTION_GUARD3(m_callback, cb, cb->processDone(notInitializedStatus, thisPtr));
                return;
            }
        }

        if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) {
            EXCEPTION_GUARD3(m_callback, cb, cb->processDone(otherRequestPendingStatus, thisPtr));
            return;
        }

        try {
            m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
        } catch (std::runtime_error &rte) {
            abortRequest();
            EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr));
        }
    }

    virtual Channel::shared_pointer getChannel() OVERRIDE FINAL
    {
        return BaseRequestImpl::getChannel();
    }

    virtual void cancel() OVERRIDE FINAL
    {
        BaseRequestImpl::cancel();
    }

    virtual void destroy() OVERRIDE FINAL
    {
        BaseRequestImpl::destroy();
    }

    virtual void lastRequest() OVERRIDE FINAL
    {
        BaseRequestImpl::lastRequest();
    }
};
552 
553 
554 
555 
556 
557 
558 
559 
/**
 * Client-side ChannelGet: fetches channel data into a locally cached
 * PVStructure + changed-field BitSet.  Wire command: CMD_GET.
 */
class ChannelGetImpl :
    public BaseRequestImpl,
    public ChannelGet
{
public:
    // Weak ref to the user callback; notified via EXCEPTION_GUARD3 only
    // while it is still alive.
    const ChannelGetRequester::weak_pointer m_callback;

    const PVStructure::shared_pointer m_pvRequest;

    // Data containers, (re)built from the server's INIT response and
    // updated in place on each get; guarded by m_structureMutex.
    PVStructure::shared_pointer m_structure;
    BitSet::shared_pointer m_bitSet;

    Mutex m_structureMutex;

    ChannelGetImpl(ClientChannelImpl::shared_pointer const & channel,
                   ChannelGetRequester::shared_pointer const & requester,
                   PVStructure::shared_pointer const & pvRequest) :
        BaseRequestImpl(channel),
        m_callback(requester),
        m_pvRequest(pvRequest)
    {
    }

    virtual void activate() OVERRIDE FINAL
    {
        // unlike ChannelProcess, a null pvRequest is rejected here
        if (!m_pvRequest)
        {
            EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(pvRequestNull, external_from_this<ChannelGetImpl>(), StructureConstPtr()));
            return;
        }

        BaseRequestImpl::activate();

        // TODO immediate get, i.e. get data with init message
        // TODO one-time get, i.e. immediate get + lastRequest

        try {
            // queue the QOS_INIT message if the channel is currently connected
            resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
        } catch (std::runtime_error &rte) {
            // channel already destroyed: report failure and tear down locally
            EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(channelDestroyed, external_from_this<ChannelGetImpl>(), StructureConstPtr()));
            BaseRequestImpl::destroy(true);
        }
    }

    virtual ~ChannelGetImpl()
    {
    }

    ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }

    // TransportSender: serialize the pending request; negative codes
    // (destroy/cancel) are delegated to base_send().
    virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
        int32 pendingRequest = beginRequest();
        bool initStage = ((pendingRequest & QOS_INIT) != 0);

        if (pendingRequest < 0)
        {
            base_send(buffer, control, pendingRequest);
            return;
        }

        control->startMessage((int8)CMD_GET, 9);
        buffer->putInt(m_channel->getServerChannelID());
        buffer->putInt(m_ioid);
        buffer->putByte((int8)pendingRequest);

        if (initStage)
        {
            // pvRequest
            SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
        }
    }

    // INIT response: build (or reuse) the local structure + bitSet from the
    // server-supplied introspection data, then notify connect.
    virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
        if (!status.isSuccess())
        {
            EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(status, external_from_this<ChannelGetImpl>(), StructureConstPtr()));
            return;
        }

        // create data and its bitSet
        {
            Lock lock(m_structureMutex);
            m_structure = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get(), m_structure);
            m_bitSet = createBitSetFor(m_structure, m_bitSet);
        }

        // notify
        EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(status, external_from_this<ChannelGetImpl>(), m_structure->getStructure()));
    }

    // GET response: deserialize changed bits + data into the cached
    // containers and deliver them to the requester.
    virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {

        if (!status.isSuccess())
        {
            EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, external_from_this<ChannelGetImpl>(), PVStructurePtr(), BitSetPtr()));
            return;
        }

        // deserialize bitSet and data
        {
            Lock lock(m_structureMutex);
            m_bitSet->deserialize(payloadBuffer, transport.get());
            m_structure->deserialize(payloadBuffer, transport.get(), m_bitSet.get());
        }

        EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, external_from_this<ChannelGetImpl>(), m_structure, m_bitSet));
    }

    // User-facing request: validate state, claim the pending-request slot,
    // then queue ourselves on the transport; failures go to getDone().
    virtual void get() OVERRIDE FINAL {

        ChannelGet::shared_pointer thisPtr(external_from_this<ChannelGetImpl>());

        {
            Lock guard(m_mutex);
            if (m_destroyed) {
                EXCEPTION_GUARD3(m_callback, cb, cb->getDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
                return;
            }
            if (!m_initialized) {
                EXCEPTION_GUARD3(m_callback, cb, cb->getDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
                return;
            }
        }
        /*
        // TODO bulk hack
        if (lastRequest)
        {
            try {
                m_channel->checkAndGetTransport()->flushSendQueue();
            } catch (std::runtime_error &rte) {
                abortRequest();
                EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr));
            }
            return;
        }
        */
        if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) {
            EXCEPTION_GUARD3(m_callback, cb, cb->getDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
            return;
        }

        try {
            m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelGetImpl>());
            //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender);
        } catch (std::runtime_error &rte) {
            abortRequest();
            EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
        }
    }

    virtual Channel::shared_pointer getChannel() OVERRIDE FINAL
    {
        return BaseRequestImpl::getChannel();
    }

    virtual void cancel() OVERRIDE FINAL
    {
        BaseRequestImpl::cancel();
    }

    virtual void destroy() OVERRIDE FINAL
    {
        BaseRequestImpl::destroy();
    }

    virtual void lastRequest() OVERRIDE FINAL
    {
        BaseRequestImpl::lastRequest();
    }

    // Lockable interface: lets the requester lock the data containers while
    // it reads m_structure/m_bitSet.
    virtual void lock() OVERRIDE FINAL
    {
        m_structureMutex.lock();
    }

    virtual void unlock() OVERRIDE FINAL
    {
        m_structureMutex.unlock();
    }
};
740 
741 
742 
743 
744 
745 
746 
747 
748 class ChannelPutImpl :
749  public BaseRequestImpl,
750  public ChannelPut
751 {
752 public:
753  const ChannelPutRequester::weak_pointer m_callback;
754 
755  const PVStructure::shared_pointer m_pvRequest;
756 
757  PVStructure::shared_pointer m_structure;
758  BitSet::shared_pointer m_bitSet;
759 
760  Mutex m_structureMutex;
761 
762  ChannelPutImpl(ClientChannelImpl::shared_pointer const & channel,
763  ChannelPutRequester::shared_pointer const & requester,
764  PVStructure::shared_pointer const & pvRequest) :
765  BaseRequestImpl(channel),
766  m_callback(requester),
767  m_pvRequest(pvRequest)
768  {
769  }
770 
771  virtual void activate() OVERRIDE FINAL
772  {
773  if (!m_pvRequest)
774  {
775  EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(pvRequestNull, external_from_this<ChannelPutImpl>(), StructureConstPtr()));
776  return;
777  }
778 
779  BaseRequestImpl::activate();
780 
781  // TODO low-overhead put
782  // TODO best-effort put
783 
784  try {
785  resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
786  } catch (std::runtime_error &rte) {
787  EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(channelDestroyed, external_from_this<ChannelPutImpl>(), StructureConstPtr()));
788  BaseRequestImpl::destroy(true);
789  }
790  }
791 
792  virtual ~ChannelPutImpl()
793  {
794  }
795 
796  ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
797 
798  virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
799  int32 pendingRequest = beginRequest();
800  if (pendingRequest < 0)
801  {
802  base_send(buffer, control, pendingRequest);
803  return;
804  }
805 
806  control->startMessage((int8)CMD_PUT, 9);
807  buffer->putInt(m_channel->getServerChannelID());
808  buffer->putInt(m_ioid);
809  buffer->putByte((int8)pendingRequest);
810 
811  if (pendingRequest & QOS_INIT)
812  {
813  // pvRequest
814  SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
815  }
816  else if (!(pendingRequest & QOS_GET))
817  {
818  // put
819  // serialize only what has been changed
820  {
821  // no need to lock here, since it is already locked via TransportSender IF
822  //Lock lock(m_structureMutex);
823  m_bitSet->serialize(buffer, control);
824  m_structure->serialize(buffer, control, m_bitSet.get());
825  }
826  }
827  }
828 
829  virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
830  if (!status.isSuccess())
831  {
832  EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(status, external_from_this<ChannelPutImpl>(), StructureConstPtr()));
833  return;
834  }
835 
836  // create data and its bitSet
837  {
838  Lock lock(m_structureMutex);
839  m_structure = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get(), m_structure);
840  m_bitSet = createBitSetFor(m_structure, m_bitSet);
841  }
842 
843  // notify
844  EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(status, external_from_this<ChannelPutImpl>(), m_structure->getStructure()));
845  }
846 
847  virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) OVERRIDE FINAL {
848 
849  ChannelPut::shared_pointer thisPtr(external_from_this<ChannelPutImpl>());
850 
851  if (qos & QOS_GET)
852  {
853  if (!status.isSuccess())
854  {
855  EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, thisPtr, PVStructurePtr(), BitSetPtr()));
856  return;
857  }
858 
859  {
860  Lock lock(m_structureMutex);
861  m_bitSet->deserialize(payloadBuffer, transport.get());
862  m_structure->deserialize(payloadBuffer, transport.get(), m_bitSet.get());
863  }
864 
865  EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, thisPtr, m_structure, m_bitSet));
866  }
867  else
868  {
869  EXCEPTION_GUARD3(m_callback, cb, cb->putDone(status, thisPtr));
870  }
871  }
872 
873  virtual void get() OVERRIDE FINAL {
874 
875  ChannelPut::shared_pointer thisPtr(external_from_this<ChannelPutImpl>());
876 
877  {
878  Lock guard(m_mutex);
879  if (m_destroyed) {
880  EXCEPTION_GUARD3(m_callback, cb, cb->getDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
881  return;
882  }
883  if (!m_initialized) {
884  EXCEPTION_GUARD3(m_callback, cb, cb->getDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
885  return;
886  }
887  }
888 
889  if (!startRequest(m_lastRequest.get() ? QOS_GET | QOS_DESTROY : QOS_GET)) {
890  EXCEPTION_GUARD3(m_callback, cb, cb->getDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
891  return;
892  }
893 
894 
895  try {
896  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
897  } catch (std::runtime_error &rte) {
898  abortRequest();
899  EXCEPTION_GUARD3(m_callback, cb, cb->getDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
900  }
901  }
902 
903  virtual void put(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & pvPutBitSet) OVERRIDE FINAL {
904 
905  ChannelPut::shared_pointer thisPtr(external_from_this<ChannelPutImpl>());
906 
907  {
908  Lock guard(m_mutex);
909  if (m_destroyed) {
910  EXCEPTION_GUARD3(m_callback, cb, cb->putDone(destroyedStatus, thisPtr));
911  return;
912  }
913  if (!m_initialized) {
914  EXCEPTION_GUARD3(m_callback, cb, cb->putDone(notInitializedStatus, thisPtr));
915  return;
916  }
917  }
918 
919  // TODO: m_structure and m_bitSet guarded by m_structureMutex? (as below)
920  if (!(*m_structure->getStructure() == *pvPutStructure->getStructure()))
921  {
922  EXCEPTION_GUARD3(m_callback, cb, cb->putDone(invalidPutStructureStatus, thisPtr));
923  return;
924  }
925 
926  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) {
927  EXCEPTION_GUARD3(m_callback, cb, cb->putDone(otherRequestPendingStatus, thisPtr));
928  return;
929  }
930 
931  try {
932  {
934  *m_bitSet = *pvPutBitSet;
935  m_structure->copyUnchecked(*pvPutStructure, *m_bitSet);
936  }
937  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
938  } catch (std::runtime_error &rte) {
939  abortRequest();
940  EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr));
941  }
942  }
943 
944  virtual Channel::shared_pointer getChannel() OVERRIDE FINAL
945  {
946  return BaseRequestImpl::getChannel();
947  }
948 
949  virtual void cancel() OVERRIDE FINAL
950  {
951  BaseRequestImpl::cancel();
952  }
953 
954  virtual void destroy() OVERRIDE FINAL
955  {
956  BaseRequestImpl::destroy();
957  }
958 
959  virtual void lastRequest() OVERRIDE FINAL
960  {
961  BaseRequestImpl::lastRequest();
962  }
963 
964  virtual void lock() OVERRIDE FINAL
965  {
966  m_structureMutex.lock();
967  }
968 
969  virtual void unlock() OVERRIDE FINAL
970  {
971  m_structureMutex.unlock();
972  }
973 };
974 
975 
976 
977 
978 
979 
980 
981 
982 class ChannelPutGetImpl :
983  public BaseRequestImpl,
984  public ChannelPutGet
985 {
986 public:
987  const ChannelPutGetRequester::weak_pointer m_callback;
988 
989  const PVStructure::shared_pointer m_pvRequest;
990 
991  // put data container
992  PVStructure::shared_pointer m_putData;
993  BitSet::shared_pointer m_putDataBitSet;
994 
995  // get data container
996  PVStructure::shared_pointer m_getData;
997  BitSet::shared_pointer m_getDataBitSet;
998 
999  Mutex m_structureMutex;
1000 
1001  ChannelPutGetImpl(ClientChannelImpl::shared_pointer const & channel,
1002  ChannelPutGetRequester::shared_pointer const & requester,
1003  PVStructure::shared_pointer const & pvRequest) :
1004  BaseRequestImpl(channel),
1005  m_callback(requester),
1006  m_pvRequest(pvRequest)
1007  {
1008  }
1009 
1010  virtual void activate() OVERRIDE FINAL
1011  {
1012  if (!m_pvRequest)
1013  {
1014  EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(pvRequestNull, external_from_this<ChannelPutGetImpl>(), StructureConstPtr(), StructureConstPtr()));
1015  return;
1016  }
1017 
1018  BaseRequestImpl::activate();
1019 
1020  try {
1021  resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
1022  } catch (std::runtime_error &rte) {
1023  EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(channelDestroyed, external_from_this<ChannelPutGetImpl>(), StructureConstPtr(), StructureConstPtr()));
1024  BaseRequestImpl::destroy(true);
1025  }
1026  }
1027 
1028 
1029  virtual ~ChannelPutGetImpl()
1030  {
1031  }
1032 
1033  ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
1034 
    // TransportSender: serialize the pending request on the transport thread.
    // Depending on the pending QoS flags this emits the INIT message
    // (pvRequest), a get/get-put (header only), or a put-get carrying put data.
    virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
        int32 pendingRequest = beginRequest();
        // Negative value encodes a base-level subcommand (e.g. destroy); delegate.
        if (pendingRequest < 0)
        {
            base_send(buffer, control, pendingRequest);
            return;
        }

        // Common PVA message header: command, server channel id, request id.
        control->startMessage((int8)CMD_PUT_GET, 9);
        buffer->putInt(m_channel->getServerChannelID());
        buffer->putInt(m_ioid);
        if ((pendingRequest & QOS_INIT) == 0)
            buffer->putByte((int8)pendingRequest);

        if (pendingRequest & QOS_INIT)
        {
            buffer->putByte((int8)QOS_INIT);

            // pvRequest
            SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
        }
        else if (pendingRequest & (QOS_GET | QOS_GET_PUT)) {
            // noop
        }
        else
        {
            // put-get: serialize the changed-bit set followed by the put data
            {
                // no need to lock here, since it is already locked via TransportSender IF
                //Lock lock(m_structureMutex);
                m_putDataBitSet->serialize(buffer, control);
                m_putData->serialize(buffer, control, m_putDataBitSet.get());
            }
        }
    }
1069 
    // Server reply to INIT: deserialize the put- and get-side introspection
    // types, allocate the local data containers, and notify the requester.
    virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
        if (!status.isSuccess())
        {
            EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(status, external_from_this<ChannelPutGetImpl>(), StructureConstPtr(), StructureConstPtr()));
            return;
        }

        {
            Lock lock(m_structureMutex);
            // Wire order is put-type first, then get-type.
            m_putData = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get());
            m_putDataBitSet = createBitSetFor(m_putData, m_putDataBitSet);
            m_getData = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get());
            m_getDataBitSet = createBitSetFor(m_getData, m_getDataBitSet);
        }

        // notify
        // NOTE(review): m_putData/m_getData are read here after the lock is
        // released — presumably safe because no user request can start before
        // this connect callback runs; confirm against BaseRequestImpl.
        EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(status, external_from_this<ChannelPutGetImpl>(), m_putData->getStructure(), m_getData->getStructure()));
    }
1088 
1089 
    // Server reply to an issued request. The qos byte selects which operation
    // this answers: QOS_GET -> getGet(), QOS_GET_PUT -> getPut(), otherwise a
    // putGet(). Each branch deserializes into the matching container under the
    // structure mutex and then notifies the requester.
    virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) OVERRIDE FINAL {

        ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());

        if (qos & QOS_GET)
        {
            if (!status.isSuccess())
            {
                EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(status, thisPtr, PVStructurePtr(), BitSetPtr()));
                return;
            }

            {
                Lock lock(m_structureMutex);
                // deserialize get data
                m_getDataBitSet->deserialize(payloadBuffer, transport.get());
                m_getData->deserialize(payloadBuffer, transport.get(), m_getDataBitSet.get());
            }

            EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(status, thisPtr, m_getData, m_getDataBitSet));
        }
        else if (qos & QOS_GET_PUT)
        {
            if (!status.isSuccess())
            {
                EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(status, thisPtr, PVStructurePtr(), BitSetPtr()));
                return;
            }

            {
                Lock lock(m_structureMutex);
                // deserialize put data
                m_putDataBitSet->deserialize(payloadBuffer, transport.get());
                m_putData->deserialize(payloadBuffer, transport.get(), m_putDataBitSet.get());
            }

            EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(status, thisPtr, m_putData, m_putDataBitSet));
        }
        else
        {
            // put-get completion: the server returns the resulting get data.
            if (!status.isSuccess())
            {
                EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(status, thisPtr, PVStructurePtr(), BitSetPtr()));
                return;
            }

            {
                Lock lock(m_structureMutex);
                // deserialize data
                m_getDataBitSet->deserialize(payloadBuffer, transport.get());
                m_getData->deserialize(payloadBuffer, transport.get(), m_getDataBitSet.get());
            }

            EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(status, thisPtr, m_getData, m_getDataBitSet));
        }
    }
1146 
1147 
1148  virtual void putGet(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & bitSet) OVERRIDE FINAL {
1149 
1150  ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());
1151 
1152  {
1153  Lock guard(m_mutex);
1154  if (m_destroyed) {
1155  EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1156  return;
1157  }
1158  if (!m_initialized) {
1159  EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1160  return;
1161  }
1162  }
1163 
1164  if (!(*m_putData->getStructure() == *pvPutStructure->getStructure()))
1165  {
1166  EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(invalidPutStructureStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1167  return;
1168  }
1169 
1170  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) {
1171  EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1172  return;
1173  }
1174 
1175  try {
1176  {
1178  *m_putDataBitSet = *bitSet;
1179  m_putData->copyUnchecked(*pvPutStructure, *m_putDataBitSet);
1180  }
1181  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
1182  } catch (std::runtime_error &rte) {
1183  abortRequest();
1184  EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
1185  }
1186  }
1187 
1188  virtual void getGet() OVERRIDE FINAL {
1189 
1190  ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());
1191 
1192  {
1193  Lock guard(m_mutex);
1194  if (m_destroyed) {
1195  EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1196  return;
1197  }
1198  if (!m_initialized) {
1199  EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1200  return;
1201  }
1202  }
1203 
1204  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_GET)) {
1205  EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1206  return;
1207  }
1208 
1209  try {
1210  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
1211  } catch (std::runtime_error &rte) {
1212  abortRequest();
1213  EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
1214  }
1215  }
1216 
1217  virtual void getPut() OVERRIDE FINAL {
1218 
1219  ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());
1220 
1221  {
1222  Lock guard(m_mutex);
1223  if (m_destroyed) {
1224  EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(destroyedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1225  return;
1226  }
1227  if (!m_initialized) {
1228  EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(notInitializedStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1229  return;
1230  }
1231  }
1232 
1233  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) {
1234  EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(otherRequestPendingStatus, thisPtr, PVStructurePtr(), BitSetPtr()));
1235  return;
1236  }
1237 
1238  try {
1239  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
1240  } catch (std::runtime_error &rte) {
1241  abortRequest();
1242  EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(channelNotConnected, thisPtr, PVStructurePtr(), BitSetPtr()));
1243  }
1244  }
1245 
    // Delegates to BaseRequestImpl.
    virtual Channel::shared_pointer getChannel() OVERRIDE FINAL
    {
        return BaseRequestImpl::getChannel();
    }
1250 
    // Delegates to BaseRequestImpl.
    virtual void cancel() OVERRIDE FINAL
    {
        BaseRequestImpl::cancel();
    }
1255 
    // Delegates to BaseRequestImpl.
    virtual void destroy() OVERRIDE FINAL
    {
        BaseRequestImpl::destroy();
    }
1260 
    // Marks the next request as the final one (it will carry QOS_DESTROY).
    virtual void lastRequest() OVERRIDE FINAL
    {
        BaseRequestImpl::lastRequest();
    }
1265 
    // Lockable interface: lets callers access the data containers consistently.
    virtual void lock() OVERRIDE FINAL
    {
        m_structureMutex.lock();
    }
1270 
    // Counterpart of lock().
    virtual void unlock() OVERRIDE FINAL
    {
        m_structureMutex.unlock();
    }
1275 
1276 };
1277 
1278 
1279 
1280 
1281 
1282 
1283 
1284 
1285 
1286 
/**
 * Client-side implementation of the pvAccess ChannelRPC operation.
 *
 * Lifecycle: construct, then activate() (sends the INIT message). Each
 * request() serializes one argument structure to the server; the full result
 * structure is delivered asynchronously via ChannelRPCRequester::requestDone().
 * At most one request may be in flight at a time (enforced by startRequest()).
 */
class ChannelRPCImpl :
    public BaseRequestImpl,
    public ChannelRPC
{
public:
    // Requester held weakly so a client dropping it releases the operation.
    const ChannelRPCRequester::weak_pointer m_callback;

    // pvRequest supplied by the caller; sent once with the INIT message.
    const PVStructure::shared_pointer m_pvRequest;

    // Argument structure of the in-flight request; cleared after serialization.
    PVStructure::shared_pointer m_structure;

    // Guards m_structure between the caller thread and the transport thread.
    Mutex m_structureMutex;

    ChannelRPCImpl(ClientChannelImpl::shared_pointer const & channel,
                   ChannelRPCRequester::shared_pointer const & requester,
                   PVStructure::shared_pointer const & pvRequest) :
        BaseRequestImpl(channel),
        m_callback(requester),
        m_pvRequest(pvRequest)
    {
    }

    // Deferred initialization: validates pvRequest, registers with the base
    // class, and sends the INIT message over the current transport.
    virtual void activate() OVERRIDE FINAL
    {
        // Null pvRequest is a caller error: report it and never register.
        if (!m_pvRequest)
        {
            EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(pvRequestNull, external_from_this<ChannelRPCImpl>()));
            return;
        }

        BaseRequestImpl::activate();

        // subscribe
        try {
            resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
        } catch (std::runtime_error &rte) {
            EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(channelDestroyed, external_from_this<ChannelRPCImpl>()));
            BaseRequestImpl::destroy(true);
        }
    }

    virtual ~ChannelRPCImpl()
    {
    }

    // Returns the requester if the client still holds it; may be null.
    ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }

    // TransportSender: serialize either the INIT message (pvRequest) or one
    // request message (argument structure). Runs on the transport thread.
    virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
        int32 pendingRequest = beginRequest();
        // Negative value encodes a base-level subcommand (e.g. destroy); delegate.
        if (pendingRequest < 0)
        {
            base_send(buffer, control, pendingRequest);
            return;
        }

        control->startMessage((int8)CMD_RPC, 9);
        buffer->putInt(m_channel->getServerChannelID());
        buffer->putInt(m_ioid);
        if ((pendingRequest & QOS_INIT) == 0)
            buffer->putByte((int8)pendingRequest);

        if (pendingRequest & QOS_INIT)
        {
            buffer->putByte((int8)QOS_INIT);

            // pvRequest
            SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
        }
        else
        {
            {
                // no need to lock here, since it is already locked via TransportSender IF
                //Lock lock(m_structureMutex);
                SerializationHelper::serializeStructureFull(buffer, control, m_structure);
                // release arguments structure
                m_structure.reset();
            }
        }
    }

    // Server reply to INIT. RPC carries no introspection data, so both the
    // success and failure paths simply forward the status to the requester.
    virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
        if (!status.isSuccess())
        {
            EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(status, external_from_this<ChannelRPCImpl>()));
            return;
        }

        // notify
        EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(status, external_from_this<ChannelRPCImpl>()));
    }

    // Server reply to a request(): deserialize the full result structure
    // (RPC results have no fixed type) and hand it to the requester.
    virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {

        ChannelRPC::shared_pointer thisPtr(external_from_this<ChannelRPCImpl>());

        if (!status.isSuccess())
        {
            EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(status, thisPtr, PVStructurePtr()));
            return;
        }


        PVStructure::shared_pointer response(SerializationHelper::deserializeStructureFull(payloadBuffer, transport.get()));
        EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(status, thisPtr, response));
    }

    // Issue an RPC: publish the argument structure (under the structure mutex,
    // for the transport thread) and enqueue this operation for sending.
    virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) OVERRIDE FINAL {

        ChannelRPC::shared_pointer thisPtr(external_from_this<ChannelRPCImpl>());

        {
            Lock guard(m_mutex);
            if (m_destroyed) {
                EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(destroyedStatus, thisPtr, PVStructurePtr()));
                return;
            }
            if (!m_initialized) {
                EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(notInitializedStatus, thisPtr, PVStructurePtr()));
                return;
            }
        }

        // Only one request may be in flight; QOS_DESTROY after lastRequest().
        if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) {
            EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(otherRequestPendingStatus, thisPtr, PVStructurePtr()));
            return;
        }

        try {
            {
                epicsGuard<epicsMutex> G(m_structureMutex);
                m_structure = pvArgument;
            }

            m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelRPCImpl>());
        } catch (std::runtime_error &rte) {
            abortRequest();
            EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(channelNotConnected, thisPtr, PVStructurePtr()));
        }
    }

    // Delegates to BaseRequestImpl.
    virtual Channel::shared_pointer getChannel() OVERRIDE FINAL
    {
        return BaseRequestImpl::getChannel();
    }

    // Delegates to BaseRequestImpl.
    virtual void cancel() OVERRIDE FINAL
    {
        BaseRequestImpl::cancel();
    }

    // Delegates to BaseRequestImpl.
    virtual void destroy() OVERRIDE FINAL
    {
        BaseRequestImpl::destroy();
    }

    // Marks the next request as the final one (it will carry QOS_DESTROY).
    virtual void lastRequest() OVERRIDE FINAL
    {
        BaseRequestImpl::lastRequest();
    }

    // Lockable interface: lets callers access m_structure consistently.
    virtual void lock() OVERRIDE FINAL
    {
        m_structureMutex.lock();
    }

    // Counterpart of lock().
    virtual void unlock() OVERRIDE FINAL
    {
        m_structureMutex.unlock();
    }
};
1457 
1458 
1459 
1460 
1461 
1462 
1463 
1464 
1465 
1466 class ChannelArrayImpl :
1467  public BaseRequestImpl,
1468  public ChannelArray
1469 {
1470 public:
1471  const ChannelArrayRequester::weak_pointer m_callback;
1472 
1473  const PVStructure::shared_pointer m_pvRequest;
1474 
1475  // data container
1476  PVArray::shared_pointer m_arrayData;
1477 
1478  size_t m_offset;
1479  size_t m_count;
1480  size_t m_stride;
1481 
1482  size_t m_length;
1483 
1484  Mutex m_structureMutex;
1485 
1486  ChannelArrayImpl(ClientChannelImpl::shared_pointer const & channel,
1487  ChannelArrayRequester::shared_pointer const & requester,
1488  PVStructure::shared_pointer const & pvRequest) :
1489  BaseRequestImpl(channel),
1490  m_callback(requester),
1491  m_pvRequest(pvRequest),
1492  m_offset(0), m_count(0),
1493  m_length(0)
1494  {
1495  }
1496 
1497  virtual void activate() OVERRIDE FINAL
1498  {
1499  if (!m_pvRequest)
1500  {
1501  EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(pvRequestNull, external_from_this<ChannelArrayImpl>(), Array::shared_pointer()));
1502  return;
1503  }
1504 
1505  BaseRequestImpl::activate();
1506 
1507  // subscribe
1508  try {
1509  resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
1510  } catch (std::runtime_error &rte) {
1511  EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(channelDestroyed, external_from_this<ChannelArrayImpl>(), Array::shared_pointer()));
1512  BaseRequestImpl::destroy(true);
1513  }
1514  }
1515 
1516  virtual ~ChannelArrayImpl()
1517  {
1518  }
1519 
1520  ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
1521 
1522  virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
1523  int32 pendingRequest = beginRequest();
1524  if (pendingRequest < 0)
1525  {
1526  base_send(buffer, control, pendingRequest);
1527  return;
1528  }
1529 
1530  control->startMessage((int8)CMD_ARRAY, 9);
1531  buffer->putInt(m_channel->getServerChannelID());
1532  buffer->putInt(m_ioid);
1533  buffer->putByte((int8)pendingRequest);
1534 
1535  if (pendingRequest & QOS_INIT)
1536  {
1537  // pvRequest
1538  SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
1539  }
1540  else if (pendingRequest & QOS_GET)
1541  {
1542  // lock... see comment below
1543  SerializeHelper::writeSize(m_offset, buffer, control);
1544  SerializeHelper::writeSize(m_count, buffer, control);
1545  SerializeHelper::writeSize(m_stride, buffer, control);
1546  }
1547  else if (pendingRequest & QOS_GET_PUT) // i.e. setLength
1548  {
1549  // lock... see comment below
1550  SerializeHelper::writeSize(m_length, buffer, control);
1551  }
1552  else if (pendingRequest & QOS_PROCESS) // i.e. getLength
1553  {
1554  // noop
1555  }
1556  // put
1557  else
1558  {
1559  {
1560  // no need to lock here, since it is already locked via TransportSender IF
1561  //Lock lock(m_structureMutex);
1562  SerializeHelper::writeSize(m_offset, buffer, control);
1563  SerializeHelper::writeSize(m_stride, buffer, control);
1564  // TODO what about count sanity check?
1565  m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array
1566  }
1567  }
1568  }
1569 
1570  virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) OVERRIDE FINAL {
1571  if (!status.isSuccess())
1572  {
1573  EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(status, external_from_this<ChannelArrayImpl>(), Array::shared_pointer()));
1574  return;
1575  }
1576 
1577  // create data and its bitSet
1578  FieldConstPtr field = transport->cachedDeserialize(payloadBuffer);
1579  {
1580  Lock lock(m_structureMutex);
1581  m_arrayData = dynamic_pointer_cast<PVArray>(getPVDataCreate()->createPVField(field));
1582  }
1583 
1584  // notify
1585  EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(status, external_from_this<ChannelArrayImpl>(), m_arrayData->getArray()));
1586  }
1587 
1588  virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) OVERRIDE FINAL {
1589 
1590  ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1591 
1592  if (qos & QOS_GET)
1593  {
1594  if (!status.isSuccess())
1595  {
1596  EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(status, thisPtr, PVArray::shared_pointer()));
1597  return;
1598  }
1599 
1600  {
1601  Lock lock(m_structureMutex);
1602  m_arrayData->deserialize(payloadBuffer, transport.get());
1603  }
1604 
1605  EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(status, thisPtr, m_arrayData));
1606  }
1607  else if (qos & QOS_GET_PUT)
1608  {
1609  EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(status, thisPtr));
1610  }
1611  else if (qos & QOS_PROCESS)
1612  {
1613  size_t length = SerializeHelper::readSize(payloadBuffer, transport.get());
1614 
1615  EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(status, thisPtr, length));
1616  }
1617  else
1618  {
1619  EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(status, thisPtr));
1620  }
1621  }
1622 
1623 
1624  virtual void getArray(size_t offset, size_t count, size_t stride) OVERRIDE FINAL {
1625 
1626  // TODO stride == 0 check
1627 
1628  ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1629 
1630  {
1631  Lock guard(m_mutex);
1632  if (m_destroyed) {
1633  EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(destroyedStatus, thisPtr, PVArray::shared_pointer()));
1634  return;
1635  }
1636  if (!m_initialized) {
1637  EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(notInitializedStatus, thisPtr, PVArray::shared_pointer()));
1638  return;
1639  }
1640  }
1641 
1642  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_GET)) {
1643  EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(otherRequestPendingStatus, thisPtr, PVArray::shared_pointer()));
1644  return;
1645  }
1646 
1647  try {
1648  {
1649  Lock lock(m_structureMutex);
1650  m_offset = offset;
1651  m_count = count;
1652  m_stride = stride;
1653  }
1654  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1655  } catch (std::runtime_error &rte) {
1656  abortRequest();
1657  EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer()));
1658  }
1659  }
1660 
1661  virtual void putArray(PVArray::shared_pointer const & putArray, size_t offset, size_t count, size_t stride) OVERRIDE FINAL {
1662 
1663  // TODO stride == 0 check
1664 
1665  ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1666 
1667  {
1668  Lock guard(m_mutex);
1669  if (m_destroyed) {
1670  EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(destroyedStatus, thisPtr));
1671  return;
1672  }
1673  if (!m_initialized) {
1674  EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(notInitializedStatus, thisPtr));
1675  return;
1676  }
1677  }
1678 
1679  if (!(*m_arrayData->getArray() == *putArray->getArray()))
1680  {
1681  EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(invalidPutArrayStatus, thisPtr));
1682  return;
1683  }
1684 
1685  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) {
1686  EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(otherRequestPendingStatus, thisPtr));
1687  return;
1688  }
1689 
1690  try {
1691  {
1692  Lock lock(m_structureMutex);
1693  m_arrayData->copyUnchecked(*putArray);
1694  m_offset = offset;
1695  m_count = count;
1696  m_stride = stride;
1697  }
1698  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1699  } catch (std::runtime_error &rte) {
1700  abortRequest();
1701  EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr));
1702  }
1703  }
1704 
1705  virtual void setLength(size_t length) OVERRIDE FINAL {
1706 
1707  ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1708 
1709  {
1710  Lock guard(m_mutex);
1711  if (m_destroyed) {
1712  EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(destroyedStatus, thisPtr));
1713  return;
1714  }
1715  if (!m_initialized) {
1716  EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(notInitializedStatus, thisPtr));
1717  return;
1718  }
1719  }
1720 
1721  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) {
1722  EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(otherRequestPendingStatus, thisPtr));
1723  return;
1724  }
1725 
1726  try {
1727  {
1728  Lock lock(m_structureMutex);
1729  m_length = length;
1730  }
1731  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1732  } catch (std::runtime_error &rte) {
1733  abortRequest();
1734  EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr));
1735  }
1736  }
1737 
1738 
1739  virtual void getLength() OVERRIDE FINAL {
1740 
1741  ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1742 
1743  {
1744  Lock guard(m_mutex);
1745  if (m_destroyed) {
1746  EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(destroyedStatus, thisPtr, 0));
1747  return;
1748  }
1749  if (!m_initialized) {
1750  EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(notInitializedStatus, thisPtr, 0));
1751  return;
1752  }
1753  }
1754 
1755  if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_PROCESS : QOS_PROCESS)) {
1756  EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(otherRequestPendingStatus, thisPtr, 0));
1757  return;
1758  }
1759 
1760  try {
1761  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1762  } catch (std::runtime_error &rte) {
1763  abortRequest();
1764  EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0));
1765  }
1766  }
1767 
1768  virtual Channel::shared_pointer getChannel() OVERRIDE FINAL
1769  {
1770  return BaseRequestImpl::getChannel();
1771  }
1772 
1773  virtual void cancel() OVERRIDE FINAL
1774  {
1775  BaseRequestImpl::cancel();
1776  }
1777 
1778  virtual void destroy() OVERRIDE FINAL
1779  {
1780  BaseRequestImpl::destroy();
1781  }
1782 
1783  virtual void lastRequest() OVERRIDE FINAL
1784  {
1785  BaseRequestImpl::lastRequest();
1786  }
1787 
1788  virtual void lock() OVERRIDE FINAL
1789  {
1790  m_structureMutex.lock();
1791  }
1792 
1793  virtual void unlock() OVERRIDE FINAL
1794  {
1795  m_structureMutex.unlock();
1796  }
1797 };
1798 
1799 
1800 
1801 
1802 
1803 
1804 
/**
 * Internal strategy interface extending Monitor with the hooks the client
 * context needs to drive a subscription from the wire.
 */
class MonitorStrategy : public Monitor {
public:
    virtual ~MonitorStrategy() {};
    // Called on (re)connect with the monitored structure's introspection type.
    virtual void init(StructureConstPtr const & structure) = 0;
    // Called for each monitor data message received from the server.
    virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0;
    // Called when the server signals that no further events will follow.
    virtual void unlisten() = 0;
};
1812 
1813 typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
1814 typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;
1815 
1816 
1817 class MonitorStrategyQueue :
1818  public MonitorStrategy,
1819  public TransportSender,
1820  public std::tr1::enable_shared_from_this<MonitorStrategyQueue>
1821 {
1822 private:
1823 
1824  const int32 m_queueSize;
1825 
1826  StructureConstPtr m_lastStructure;
1827  FreeElementQueue m_freeQueue;
1828  MonitorElementQueue m_monitorQueue;
1829 
1830 
1831  const MonitorRequester::weak_pointer m_callback;
1832 
1833  Mutex m_mutex;
1834 
1835  BitSet m_bitSet1;
1836  BitSet m_bitSet2;
1837  MonitorElement::shared_pointer m_overrunElement;
1838  bool m_overrunInProgress;
1839 
1840  PVStructure::shared_pointer m_up2datePVStructure;
1841 
1842  int32 m_releasedCount;
1843  bool m_reportQueueStateInProgress;
1844 
1845  // TODO check for cyclic-ref
1846  const ClientChannelImpl::shared_pointer m_channel;
1847  const pvAccessID m_ioid;
1848 
1849  const bool m_pipeline;
1850  const int32 m_ackAny;
1851 
1852  bool m_unlisten;
1853 
1854 public:
1855 
1856  MonitorStrategyQueue(ClientChannelImpl::shared_pointer channel, pvAccessID ioid,
1857  MonitorRequester::weak_pointer const & callback,
1858  int32 queueSize,
1859  bool pipeline, int32 ackAny) :
1860  m_queueSize(queueSize), m_lastStructure(),
1861  m_freeQueue(),
1862  m_monitorQueue(),
1863  m_callback(callback), m_mutex(),
1864  m_bitSet1(), m_bitSet2(), m_overrunInProgress(false),
1865  m_releasedCount(0),
1866  m_reportQueueStateInProgress(false),
1867  m_channel(channel), m_ioid(ioid),
1868  m_pipeline(pipeline), m_ackAny(ackAny),
1869  m_unlisten(false)
1870  {
1871  if (queueSize <= 1)
1872  throw std::invalid_argument("queueSize <= 1");
1873 
1874  m_freeQueue.reserve(m_queueSize);
1875  // TODO array based deque
1876  //m_monitorQueue.reserve(m_queueSize);
1877  }
1878 
1879  virtual ~MonitorStrategyQueue() {}
1880 
1881  virtual void init(StructureConstPtr const & structure) OVERRIDE FINAL {
1882  Lock guard(m_mutex);
1883 
1884  m_releasedCount = 0;
1885  m_reportQueueStateInProgress = false;
1886 
1887  {
1888  while (!m_monitorQueue.empty())
1889  m_monitorQueue.pop();
1890 
1891  m_freeQueue.clear();
1892 
1893  m_up2datePVStructure.reset();
1894 
1895  for (int32 i = 0; i < m_queueSize; i++)
1896  {
1897  PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure);
1898  MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure));
1899  m_freeQueue.push_back(monitorElement);
1900  }
1901 
1902  m_lastStructure = structure;
1903  }
1904  }
1905 
1906 
1907  virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) OVERRIDE FINAL {
1908 
1909  {
1910  // TODO do not lock deserialization
1911  Lock guard(m_mutex);
1912 
1913  if (m_overrunInProgress)
1914  {
1915  PVStructurePtr pvStructure = m_overrunElement->pvStructurePtr;
1916  BitSet::shared_pointer changedBitSet = m_overrunElement->changedBitSet;
1917  BitSet::shared_pointer overrunBitSet = m_overrunElement->overrunBitSet;
1918 
1919  m_bitSet1.deserialize(payloadBuffer, transport.get());
1920  pvStructure->deserialize(payloadBuffer, transport.get(), &m_bitSet1);
1921  m_bitSet2.deserialize(payloadBuffer, transport.get());
1922 
1923  // OR local overrun
1924  // TODO this does not work perfectly if bitSet is compressed !!!
1925  // uncompressed bitSets should be used !!!
1926  overrunBitSet->or_and(*(changedBitSet.get()), m_bitSet1);
1927 
1928  // OR remove change
1929  *(changedBitSet.get()) |= m_bitSet1;
1930 
1931  // OR remote overrun
1932  *(overrunBitSet.get()) |= m_bitSet2;
1933 
1934  // m_up2datePVStructure is already set
1935 
1936  return;
1937  }
1938 
1939  MonitorElementPtr newElement = m_freeQueue.back();
1940  m_freeQueue.pop_back();
1941 
1942  if (m_freeQueue.empty())
1943  {
1944  m_overrunInProgress = true;
1945  m_overrunElement = newElement;
1946  }
1947 
1948  // setup current fields
1949  PVStructurePtr pvStructure = newElement->pvStructurePtr;
1950  BitSet::shared_pointer changedBitSet = newElement->changedBitSet;
1951  BitSet::shared_pointer overrunBitSet = newElement->overrunBitSet;
1952 
1953  // deserialize changedBitSet and data, and overrun bit set
1954  changedBitSet->deserialize(payloadBuffer, transport.get());
1955  if (m_up2datePVStructure && m_up2datePVStructure.get() != pvStructure.get()) {
1956  assert(pvStructure->getStructure().get()==m_up2datePVStructure->getStructure().get());
1957  pvStructure->copyUnchecked(*m_up2datePVStructure, *changedBitSet, true);
1958  }
1959  pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
1960  overrunBitSet->deserialize(payloadBuffer, transport.get());
1961 
1962  m_up2datePVStructure = pvStructure;
1963 
1964  if (!m_overrunInProgress)
1965  m_monitorQueue.push(newElement);
1966  }
1967 
1968  if (!m_overrunInProgress)
1969  {
1970  EXCEPTION_GUARD3(m_callback, cb, cb->monitorEvent(shared_from_this()));
1971  }
1972  }
1973 
1974  virtual void unlisten() OVERRIDE FINAL
1975  {
1976  bool notifyUnlisten = false;
1977  {
1978  Lock guard(m_mutex);
1979  notifyUnlisten = m_monitorQueue.empty();
1980  m_unlisten = !notifyUnlisten;
1981  }
1982 
1983  if (notifyUnlisten)
1984  {
1985  EXCEPTION_GUARD3(m_callback, cb, cb->unlisten(shared_from_this()));
1986  }
1987  }
1988 
1989  virtual MonitorElement::shared_pointer poll() OVERRIDE FINAL {
1990  Lock guard(m_mutex);
1991 
1992  if (m_monitorQueue.empty()) {
1993 
1994  if (m_unlisten) {
1995  m_unlisten = false;
1996  guard.unlock();
1997  EXCEPTION_GUARD3(m_callback, cb, cb->unlisten(shared_from_this()));
1998  }
1999  return MonitorElement::shared_pointer();
2000  }
2001 
2002  MonitorElement::shared_pointer retVal(m_monitorQueue.front());
2003  m_monitorQueue.pop();
2004  return retVal;
2005  }
2006 
2007  // NOTE: a client must always call poll() after release() to check the presence of any new monitor elements
2008  virtual void release(MonitorElement::shared_pointer const & monitorElement) OVERRIDE FINAL {
2009 
2010  // fast sanity check check if monitorElement->pvStructurePtr->getStructure() matches
2011  // not to accept wrong structure (might happen on monitor reconnect with different type)
2012  // silent return
2013  if (monitorElement->pvStructurePtr->getStructure().get() != m_lastStructure.get())
2014  return;
2015 
2016  bool sendAck = false;
2017  {
2018  Lock guard(m_mutex);
2019 
2020  m_freeQueue.push_back(monitorElement);
2021 
2022  if (m_overrunInProgress)
2023  {
2024  // compress bit-set
2025  PVStructurePtr pvStructure = m_overrunElement->pvStructurePtr;
2026  BitSetUtil::compress(m_overrunElement->changedBitSet, pvStructure);
2027  BitSetUtil::compress(m_overrunElement->overrunBitSet, pvStructure);
2028 
2029  m_monitorQueue.push(m_overrunElement);
2030 
2031  m_overrunElement.reset();
2032  m_overrunInProgress = false;
2033  }
2034 
2035  if (m_pipeline)
2036  {
2037  m_releasedCount++;
2038  if (!m_reportQueueStateInProgress && m_releasedCount >= m_ackAny)
2039  {
2040  sendAck = true;
2041  m_reportQueueStateInProgress = true;
2042  }
2043  }
2044 
2045  if (sendAck)
2046  {
2047  guard.unlock();
2048 
2049  try
2050  {
2051  m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
2052  } catch (std::runtime_error&) {
2053  // assume wrong connection state from checkAndGetTransport()
2054  guard.lock();
2055  m_reportQueueStateInProgress = false;
2056  } catch (std::exception& e) {
2057  LOG(logLevelWarn, "Ignore exception during MonitorStrategyQueue::release: %s", e.what());
2058  guard.lock();
2059  m_reportQueueStateInProgress = false;
2060  }
2061  }
2062  }
2063  }
2064 
    // TransportSender callback: serializes the pipeline flow-control ("ack")
    // message announcing how many monitor elements the client has released.
    // Field order on the wire must not change.
    virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
        // CMD_MONITOR with QOS_GET_PUT qos identifies this as an ack update
        control->startMessage((int8)CMD_MONITOR, 9);
        buffer->putInt(m_channel->getServerChannelID());
        buffer->putInt(m_ioid);
        buffer->putByte((int8)QOS_GET_PUT);

        {
            // consume the released-element counter and clear the
            // "ack in flight" flag set by release(), atomically
            Lock guard(m_mutex);
            buffer->putInt(m_releasedCount);
            m_releasedCount = 0;
            m_reportQueueStateInProgress = false;
        }

        // immediate send
        control->flush(true);
    }
2081 
2082  Status start() OVERRIDE FINAL {
2083  Lock guard(m_mutex);
2084  while (!m_monitorQueue.empty())
2085  {
2086  m_freeQueue.push_back(m_monitorQueue.front());
2087  m_monitorQueue.pop();
2088  }
2089  if (m_overrunElement)
2090  {
2091  m_freeQueue.push_back(m_overrunElement);
2092  m_overrunElement.reset();
2093  }
2094  m_overrunInProgress = false;
2095  return Status::Ok;
2096  }
2097 
    // Stopping requires no queue manipulation here; start() performs the
    // reset when monitoring resumes.
    Status stop() OVERRIDE FINAL {
        return Status::Ok;
    }
2101 
    // No explicit cleanup needed for this strategy.
    void destroy() OVERRIDE FINAL {
    }
2104 
2105 };
2106 
2107 
2108 
2109 
2110 class ChannelMonitorImpl :
2111  public BaseRequestImpl,
2112  public Monitor
2113 {
2114 public:
2115  const MonitorRequester::weak_pointer m_callback;
2116  bool m_started;
2117 
2118  const PVStructure::shared_pointer m_pvRequest;
2119 
2120  std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
2121 
2122  int32 m_queueSize;
2123  bool m_pipeline;
2124  int32 m_ackAny;
2125 
2126  ChannelMonitorImpl(
2127  ClientChannelImpl::shared_pointer const & channel,
2128  MonitorRequester::shared_pointer const & requester,
2129  PVStructure::shared_pointer const & pvRequest)
2130  :
2131  BaseRequestImpl(channel),
2132  m_callback(requester),
2133  m_started(false),
2134  m_pvRequest(pvRequest),
2135  m_queueSize(2),
2136  m_pipeline(false),
2137  m_ackAny(0)
2138  {
2139  }
2140 
2141  virtual void activate() OVERRIDE FINAL
2142  {
2143  if (!m_pvRequest)
2144  {
2145  EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(pvRequestNull, external_from_this<ChannelMonitorImpl>(), StructureConstPtr()));
2146  return;
2147  }
2148 
2149  PVStructurePtr pvOptions = m_pvRequest->getSubField<PVStructure>("record._options");
2150  if (pvOptions) {
2151  PVScalarPtr option(pvOptions->getSubField<PVScalar>("queueSize"));
2152  if (option) {
2153  try {
2154  m_queueSize = option->getAs<int32>();
2155  if(m_queueSize<2)
2156  m_queueSize = 2;
2157  }catch(std::runtime_error& e){
2158  SEND_MESSAGE(m_callback, cb, "Invalid queueSize=", warningMessage);
2159  }
2160  }
2161 
2162  option = pvOptions->getSubField<PVScalar>("pipeline");
2163  if (option) {
2164  try {
2165  m_pipeline = option->getAs<epics::pvData::boolean>();
2166  }catch(std::runtime_error& e){
2167  SEND_MESSAGE(m_callback, cb, "Invalid pipeline=", warningMessage);
2168  }
2169  }
2170 
2171  // pipeline options
2172  if (m_pipeline)
2173  {
2174  // defaults to queueSize/2
2175  m_ackAny = m_queueSize/2;
2176 
2177  option = pvOptions->getSubField<PVScalar>("ackAny");
2178  if (option) {
2179  bool done = false;
2180  int32 size = -1; /* -1 only to silence warning, should never be used */
2181 
2182  if(option->getScalar()->getScalarType()==pvString) {
2183  std::string sval(option->getAs<std::string>());
2184 
2185  if(!sval.empty() && sval[sval.size()-1]=='%') {
2186  try {
2187  double percent = castUnsafe<double>(sval.substr(0, sval.size()-1));
2188  size = (m_queueSize * percent) / 100.0;
2189  done = true;
2190  }catch(std::runtime_error&){
2191  SEND_MESSAGE(m_callback, cb, "ackAny= invalid precentage", warningMessage);
2192  }
2193  }
2194  }
2195 
2196  if(!done) {
2197  try {
2198  size = option->getAs<int32>();
2199  done = true;
2200  }catch(std::runtime_error&){
2201  SEND_MESSAGE(m_callback, cb, "ackAny= invalid value", warningMessage);
2202  }
2203  }
2204 
2205  if(!done) {
2206  } else if (size <= 0) {
2207  m_ackAny = 1;
2208  } else {
2209  m_ackAny = (m_ackAny <= m_queueSize) ? size : m_queueSize;
2210  }
2211  }
2212  }
2213  }
2214 
2215  BaseRequestImpl::activate();
2216 
2217  std::tr1::shared_ptr<MonitorStrategyQueue> tp(
2218  new MonitorStrategyQueue(m_channel, m_ioid, m_callback, m_queueSize,
2219  m_pipeline, m_ackAny)
2220  );
2221  m_monitorStrategy = tp;
2222 
2223  // subscribe
2224  try {
2225  resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
2226  } catch (std::runtime_error &rte) {
2227  EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(channelDestroyed, external_from_this<ChannelMonitorImpl>(), StructureConstPtr()));
2228  BaseRequestImpl::destroy(true);
2229  }
2230  }
2231 
2232  // override default impl. to provide pipeline QoS flag
2233  virtual void resubscribeSubscription(Transport::shared_pointer const & transport) OVERRIDE FINAL {
2234  if (transport.get() != 0 && !m_subscribed.get() &&
2235  startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT))
2236  {
2237  m_subscribed.set();
2238  transport->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
2239  }
2240  }
2241 
2242  virtual ~ChannelMonitorImpl() {}
2243 
2244  ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL { return m_callback.lock(); }
2245 
2246  virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
2247  int32 pendingRequest = beginRequest();
2248  if (pendingRequest < 0)
2249  {
2250  base_send(buffer, control, pendingRequest);
2251  return;
2252  }
2253 
2254  control->startMessage((int8)CMD_MONITOR, 9);
2255  buffer->putInt(m_channel->getServerChannelID());
2256  buffer->putInt(m_ioid);
2257  buffer->putByte((int8)pendingRequest);
2258 
2259  if (pendingRequest & QOS_INIT)
2260  {
2261  // pvRequest
2262  SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
2263 
2264  // if streaming
2265  if (pendingRequest & QOS_GET_PUT)
2266  {
2267  control->ensureBuffer(4);
2268  buffer->putInt(m_queueSize);
2269  }
2270  }
2271  }
2272 
2273  virtual void initResponse(
2274  Transport::shared_pointer const & transport,
2275  int8 /*version*/,
2276  ByteBuffer* payloadBuffer,
2277  int8 /*qos*/,
2278  const Status& status) OVERRIDE FINAL
2279  {
2280  if (!status.isSuccess())
2281  {
2282  EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(status, external_from_this<ChannelMonitorImpl>(), StructureConstPtr()));
2283  return;
2284  }
2285 
2288  transport->cachedDeserialize(payloadBuffer)
2289  );
2290  if(!structure)
2291  throw std::runtime_error("initResponse() w/o Structure");
2292  m_monitorStrategy->init(structure);
2293 
2294  bool restoreStartedState = m_started;
2295 
2296  // notify
2297  EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(status, external_from_this<ChannelMonitorImpl>(), structure));
2298 
2299  if (restoreStartedState)
2300  start();
2301  }
2302 
2303  virtual void normalResponse(
2304  Transport::shared_pointer const & transport,
2305  int8 /*version*/,
2306  ByteBuffer* payloadBuffer,
2307  int8 qos,
2308  const Status& /*status*/) OVERRIDE FINAL
2309  {
2310  if (qos & QOS_GET)
2311  {
2312  // TODO not supported by IF yet...
2313  }
2314  else if (qos & QOS_DESTROY)
2315  {
2316  // TODO for now status is ignored
2317 
2318  if (payloadBuffer->getRemaining())
2319  m_monitorStrategy->response(transport, payloadBuffer);
2320 
2321  // unlisten will be called when all the elements in the queue gets processed
2322  m_monitorStrategy->unlisten();
2323  }
2324  else
2325  {
2326  m_monitorStrategy->response(transport, payloadBuffer);
2327  }
2328  }
2329 
2330  // override, since we optimize status
2331  virtual void response(
2332  Transport::shared_pointer const & transport,
2333  int8 version,
2334  ByteBuffer* payloadBuffer) OVERRIDE FINAL
2335  {
2336  transport->ensureData(1);
2337  int8 qos = payloadBuffer->getByte();
2338  if (qos & QOS_INIT)
2339  {
2340  Status status;
2341  status.deserialize(payloadBuffer, transport.get());
2342  if (status.isSuccess())
2343  {
2344  Lock G(m_mutex);
2345  m_initialized = true;
2346  }
2347  initResponse(transport, version, payloadBuffer, qos, status);
2348  }
2349  else if (qos & QOS_DESTROY)
2350  {
2351  Status status;
2352  status.deserialize(payloadBuffer, transport.get());
2353 
2354  {
2355  Lock G(m_mutex);
2356  m_initialized = false;
2357  }
2358 
2359  normalResponse(transport, version, payloadBuffer, qos, status);
2360  }
2361  else
2362  {
2363  normalResponse(transport, version, payloadBuffer, qos, Status::Ok);
2364  }
2365 
2366  }
2367 
2368  virtual Status start() OVERRIDE FINAL
2369  {
2370  Lock guard(m_mutex);
2371 
2372  if (m_destroyed)
2373  return BaseRequestImpl::destroyedStatus;
2374  if (!m_initialized)
2375  return BaseRequestImpl::notInitializedStatus;
2376 
2377  m_monitorStrategy->start();
2378 
2379  // start == process + get
2380  if (!startRequest(QOS_PROCESS | QOS_GET))
2381  return BaseRequestImpl::otherRequestPendingStatus;
2382 
2383  bool restore = m_started;
2384  m_started = true;
2385 
2386  guard.unlock();
2387 
2388  try
2389  {
2390  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
2391  return Status::Ok;
2392  } catch (std::runtime_error &rte) {
2393  guard.lock();
2394 
2395  m_started = restore;
2396  abortRequest();
2397  return BaseRequestImpl::channelNotConnected;
2398  }
2399  }
2400 
2401  virtual Status stop() OVERRIDE FINAL
2402  {
2403  Lock guard(m_mutex);
2404 
2405  if (m_destroyed)
2406  return BaseRequestImpl::destroyedStatus;
2407  if (!m_initialized)
2408  return BaseRequestImpl::notInitializedStatus;
2409 
2410  m_monitorStrategy->stop();
2411 
2412  // stop == process + no get
2413  if (!startRequest(QOS_PROCESS))
2414  return BaseRequestImpl::otherRequestPendingStatus;
2415 
2416  bool restore = m_started;
2417  m_started = false;
2418 
2419  guard.unlock();
2420 
2421  try
2422  {
2423  m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
2424  return Status::Ok;
2425  } catch (std::runtime_error &rte) {
2426  guard.lock();
2427 
2428  m_started = restore;
2429  abortRequest();
2430  return BaseRequestImpl::channelNotConnected;
2431  }
2432  }
2433 
2434 
2435  virtual void destroy() OVERRIDE FINAL
2436  {
2437  BaseRequestImpl::destroy();
2438  }
2439 
2440  virtual MonitorElement::shared_pointer poll() OVERRIDE FINAL
2441  {
2442  return m_monitorStrategy->poll();
2443  }
2444 
2445  virtual void release(MonitorElement::shared_pointer const & monitorElement) OVERRIDE FINAL
2446  {
2447  m_monitorStrategy->release(monitorElement);
2448  }
2449 
2450 };
2451 
2452 
2453 
/**
 * Base class for all client-side response handlers; stores a weak reference
 * to the owning context so handlers do not keep it alive.
 */
class AbstractClientResponseHandler : public ResponseHandler {
    EPICS_NOT_COPYABLE(AbstractClientResponseHandler)
protected:
    // Owning context; weak — must be lock()-ed (and checked) before use.
    const ClientContextImpl::weak_pointer _context;
public:
    AbstractClientResponseHandler(ClientContextImpl::shared_pointer const & context, string const & description) :
        ResponseHandler(context.get(), description), _context(ClientContextImpl::weak_pointer(context)) {
    }

    virtual ~AbstractClientResponseHandler() {
    }
};
2466 
/**
 * Handler that intentionally ignores a message (inherits the base
 * handleResponse(), which only performs common bookkeeping/logging).
 */
class NoopResponse : public AbstractClientResponseHandler {
public:
    NoopResponse(ClientContextImpl::shared_pointer const & context, string const & description) :
        AbstractClientResponseHandler(context, description)
    {
    }

    virtual ~NoopResponse() {
    }
};
2477 
2478 
2479 class ResponseRequestHandler : public AbstractClientResponseHandler {
2480 public:
2481  ResponseRequestHandler(ClientContextImpl::shared_pointer const & context) :
2482  AbstractClientResponseHandler(context, "Data response")
2483  {
2484  }
2485 
2486  virtual ~ResponseRequestHandler() {
2487  }
2488 
2489  virtual void handleResponse(osiSockAddr* responseFrom,
2490  Transport::shared_pointer const & transport, int8 version, int8 command,
2491  size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
2492  {
2493  AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2494 
2495  transport->ensureData(4);
2496  // TODO check and optimize?
2497  ResponseRequest::shared_pointer rr = _context.lock()->getResponseRequest(payloadBuffer->getInt());
2498  if (rr)
2499  {
2500  epics::atomic::add(rr->bytesRX, payloadSize);
2501  rr->response(transport, version, payloadBuffer);
2502  } else {
2503  // oh no, we can't complete parsing this message!
2504  // This might contain updates to our introspectionRegistry, which will lead to failures later on.
2505  // TODO: seperate message parsing from user Operation lifetime...
2506  }
2507  }
2508 };
2509 
2510 
2511 class MultipleResponseRequestHandler : public AbstractClientResponseHandler {
2512 public:
2513  MultipleResponseRequestHandler(ClientContextImpl::shared_pointer const & context) :
2514  AbstractClientResponseHandler(context, "Multiple data response")
2515  {
2516  }
2517 
2518  virtual ~MultipleResponseRequestHandler() {
2519  }
2520 
2521  virtual void handleResponse(osiSockAddr* responseFrom,
2522  Transport::shared_pointer const & transport, int8 version, int8 command,
2523  size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
2524  {
2525  AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2526 
2527  // TODO add submessage payload size, so that non-existant IOID can be skipped
2528  // and others not lost
2529 
2530  ClientContextImpl::shared_pointer context = _context.lock();
2531  while (true)
2532  {
2533  transport->ensureData(4);
2534  pvAccessID ioid = payloadBuffer->getInt();
2535  if (ioid == INVALID_IOID)
2536  return;
2537 
2538  ResponseRequest::shared_pointer rr = context->getResponseRequest(ioid);
2539  if (rr)
2540  {
2541  epics::atomic::add(rr->bytesRX, payloadSize);
2542  rr->response(transport, version, payloadBuffer);
2543  }
2544  else
2545  return; // we cannot deserialize, we are lost in stream, we must stop
2546  }
2547  }
2548 };
2549 
/**
 * Handles CMD_SEARCH_RESPONSE: decodes the responding server's GUID and
 * address, then forwards each found channel CID to the search manager.
 */
class SearchResponseHandler : public AbstractClientResponseHandler {
public:
    SearchResponseHandler(ClientContextImpl::shared_pointer const & context) :
        AbstractClientResponseHandler(context, "Search response")
    {
    }

    virtual ~SearchResponseHandler() {
    }

    virtual void handleResponse(osiSockAddr* responseFrom,
                                Transport::shared_pointer const & transport, int8 version, int8 command,
                                size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
    {
        AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);

        // fixed header: GUID(12) + seqId(4) + IPv6 address(16) + port(2)
        transport->ensureData(12+4+16+2);

        ServerGUID guid;
        payloadBuffer->get(guid.value, 0, sizeof(guid.value));

        int32 searchSequenceId = payloadBuffer->getInt();

        osiSockAddr serverAddress;
        memset(&serverAddress, 0, sizeof(serverAddress));
        serverAddress.ia.sin_family = AF_INET;

        // 128-bit IPv6 address
        if (!decodeAsIPv6Address(payloadBuffer, &serverAddress)) return;

        // accept given address if explicitly specified by sender
        if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY)
            serverAddress.ia.sin_addr = responseFrom->ia.sin_addr;

        // NOTE: htons might be a macro (e.g. vxWorks)
        int16 port = payloadBuffer->getShort();
        serverAddress.ia.sin_port = htons(port);

        // protocol string is deserialized (to advance the buffer) but unused
        /*string protocol =*/ SerializeHelper::deserializeString(payloadBuffer, transport.get());

        transport->ensureData(1);
        bool found = payloadBuffer->getByte() != 0;
        if (!found)
            return;

        // reads CIDs
        // TODO optimize
        ClientContextImpl::shared_pointer context(_context.lock());
        if(!context)
            return;
        std::tr1::shared_ptr<epics::pvAccess::ChannelSearchManager> csm = context->getChannelSearchManager();
        int16 count = payloadBuffer->getShort();
        for (int i = 0; i < count; i++)
        {
            transport->ensureData(4);
            pvAccessID cid = payloadBuffer->getInt();
            csm->searchResponse(guid, cid, searchSequenceId, version, &serverAddress);
        }


    }
};
2612 
/**
 * Handles an incoming CMD_SEARCH request. A client normally only cares about
 * one case: a unicast search (qosCode bit 0x80 set) received on a transport
 * with a local multicast group — it is then re-broadcast locally so other
 * processes on this host can answer.
 */
class SearchHandler : public AbstractClientResponseHandler {
public:
    SearchHandler(ClientContextImpl::shared_pointer const & context) :
        AbstractClientResponseHandler(context, "Search")
    {
    }

    virtual ~SearchHandler() {
    }

    virtual void handleResponse(osiSockAddr* responseFrom,
                                Transport::shared_pointer const & transport, int8 version, int8 command,
                                size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
    {
        AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);

        // fixed part: seqId(4) + qos(1) + reserved(3) + IPv6 address(16) + port(2)
        transport->ensureData(4+1+3+16+2);

        // remember where the search payload begins; used below to rewrite
        // the message in place for local multicast forwarding
        size_t startPosition = payloadBuffer->getPosition();

        /*const int32 searchSequenceId =*/ payloadBuffer->getInt();
        const int8 qosCode = payloadBuffer->getByte();

        // reserved part
        payloadBuffer->getByte();
        payloadBuffer->getShort();

        osiSockAddr responseAddress;
        memset(&responseAddress, 0, sizeof(responseAddress));
        responseAddress.ia.sin_family = AF_INET;

        // 128-bit IPv6 address
        if (!decodeAsIPv6Address(payloadBuffer, &responseAddress)) return;

        // accept given address if explicitly specified by sender
        if (responseAddress.ia.sin_addr.s_addr == INADDR_ANY)
            responseAddress.ia.sin_addr = responseFrom->ia.sin_addr;

        // NOTE: htons might be a macro (e.g. vxWorks)
        int16 port = payloadBuffer->getShort();
        responseAddress.ia.sin_port = htons(port);

        // we ignore the rest, since we care only about data relevant
        // to do the local multicast

        //
        // locally broadcast if unicast (qosCode & 0x80 == 0x80) via UDP
        //
        if ((qosCode & 0x80) == 0x80)
        {
            // TODO optimize
            ClientContextImpl::shared_pointer context = _context.lock();
            if (!context)
                return;

            BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast<BlockingUDPTransport>(transport);
            if (bt && bt->hasLocalMulticastAddress())
            {
                // RECEIVE_BUFFER_PRE_RESERVE allows to pre-fix message
                // rewind before the original header to make room for a
                // prepended CMD_ORIGIN_TAG message (header + 16-byte address)
                size_t newStartPos = (startPosition-PVA_MESSAGE_HEADER_SIZE)-PVA_MESSAGE_HEADER_SIZE-16;
                payloadBuffer->setPosition(newStartPos);

                // copy part of a header, and add: command, payloadSize, NIF address
                payloadBuffer->put(payloadBuffer->getBuffer(), startPosition-PVA_MESSAGE_HEADER_SIZE, PVA_MESSAGE_HEADER_SIZE-5);
                payloadBuffer->putByte(CMD_ORIGIN_TAG);
                payloadBuffer->putInt(16);
                // encode this socket bind address
                encodeAsIPv6Address(payloadBuffer, bt->getBindAddress());

                // clear unicast flag
                payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80));

                // update response address
                payloadBuffer->setPosition(startPosition+8);
                encodeAsIPv6Address(payloadBuffer, &responseAddress);

                // set to end of a message
                payloadBuffer->setPosition(payloadBuffer->getLimit());

                // forward the rewritten datagram to the local multicast group
                bt->send(payloadBuffer->getBuffer()+newStartPos, payloadBuffer->getPosition()-newStartPos,
                         bt->getLocalMulticastAddress());

                return;
            }
        }

    }
};
2701 
/**
 * Handles CMD_BEACON: decodes the beacon (server GUID, sequence id, change
 * count, server address, optional extra data) and forwards it to the
 * per-server BeaconHandler, if this context tracks that server.
 */
class BeaconResponseHandler : public AbstractClientResponseHandler {
public:
    BeaconResponseHandler(ClientContextImpl::shared_pointer const & context) :
        AbstractClientResponseHandler(context, "Beacon")
    {}

    virtual ~BeaconResponseHandler() {}

    virtual void handleResponse(osiSockAddr* responseFrom,
                                Transport::shared_pointer const & transport, int8 version, int8 command,
                                size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
    {
        // reception timestamp
        TimeStamp timestamp;
        timestamp.getCurrent();

        AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);

        // fixed part: GUID(12) + qos/seq(2) + changeCount(2) + address(16) + port(2)
        transport->ensureData(12+2+2+16+2);

        ServerGUID guid;
        payloadBuffer->get(guid.value, 0, sizeof(guid.value));

        /*int8 qosCode =*/ payloadBuffer->getByte();
        int8 sequentalID = payloadBuffer->getByte();
        int16 changeCount = payloadBuffer->getShort();

        osiSockAddr serverAddress;
        memset(&serverAddress, 0, sizeof(serverAddress));
        serverAddress.ia.sin_family = AF_INET;

        // 128-bit IPv6 address
        if (!decodeAsIPv6Address(payloadBuffer, &serverAddress)) return;

        // accept given address if explicitly specified by sender
        if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY)
            serverAddress.ia.sin_addr = responseFrom->ia.sin_addr;

        // NOTE: htons might be a macro (e.g. vxWorks)
        int16 port = payloadBuffer->getShort();
        serverAddress.ia.sin_port = htons(port);

        // only TCP servers are of interest here
        string protocol(SerializeHelper::deserializeString(payloadBuffer, transport.get()));
        if(protocol!="tcp")
            return;

        // TODO optimize
        ClientContextImpl::shared_pointer context = _context.lock();
        if (!context)
            return;

        std::tr1::shared_ptr<epics::pvAccess::BeaconHandler> beaconHandler = context->getBeaconHandler(responseFrom);
        // currently we care only for servers used by this context
        if (!beaconHandler)
            return;

        // extra data (optional, self-describing field)
        PVFieldPtr data;
        const FieldConstPtr field = getFieldCreate()->deserialize(payloadBuffer, transport.get());
        if (field)
        {
            data = getPVDataCreate()->createPVField(field);
            data->deserialize(payloadBuffer, transport.get());
        }

        // notify beacon handler
        beaconHandler->beaconNotify(responseFrom, version, &timestamp, guid, sequentalID, changeCount, data);
    }
};
2771 
/**
 * Handles CMD_CONNECTION_VALIDATION: records the server's receive buffer
 * size, reads the list of security plugins the server offers and starts
 * the authNZ handshake on the TCP transport.
 */
class ClientConnectionValidationHandler : public AbstractClientResponseHandler {
public:
    ClientConnectionValidationHandler(ClientContextImpl::shared_pointer context) :
        AbstractClientResponseHandler(context, "Connection validation")
    {}

    virtual ~ClientConnectionValidationHandler() {}

    virtual void handleResponse(osiSockAddr* responseFrom,
                                Transport::shared_pointer const & transport, int8 version, int8 command,
                                size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
    {
        AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);

        // buffer size (4) + registry max size (2)
        transport->ensureData(4+2);

        transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt());
        // TODO
        // TODO serverIntrospectionRegistryMaxSize
        /*int serverIntrospectionRegistryMaxSize = */ payloadBuffer->getShort();

        // authNZ: list of security plugin names offered by the server
        size_t size = SerializeHelper::readSize(payloadBuffer, transport.get());
        vector<string> offeredSecurityPlugins;
        offeredSecurityPlugins.reserve(size);
        for (size_t i = 0; i < size; i++)
            offeredSecurityPlugins.push_back(
                SerializeHelper::deserializeString(payloadBuffer, transport.get())
            );

        // this message only arrives on a client TCP transport, hence the
        // static_cast is expected to be safe here
        epics::pvAccess::detail::BlockingClientTCPTransportCodec* cliTransport(static_cast<epics::pvAccess::detail::BlockingClientTCPTransportCodec*>(transport.get()));
        //TODO: simplify byzantine class heirarchy...
        assert(cliTransport);

        cliTransport->authNZInitialize(offeredSecurityPlugins);
    }
};
2809 
2810 class ClientConnectionValidatedHandler : public AbstractClientResponseHandler {
2811 public:
2812  ClientConnectionValidatedHandler(ClientContextImpl::shared_pointer context) :
2813  AbstractClientResponseHandler(context, "Connection validated")
2814  {}
2815 
2816  virtual ~ClientConnectionValidatedHandler() {}
2817 
2818  virtual void handleResponse(osiSockAddr* responseFrom,
2819  Transport::shared_pointer const & transport, int8 version, int8 command,
2820  size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
2821  {
2822  AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2823 
2824  Status status;
2825  status.deserialize(payloadBuffer, transport.get());
2826  transport->verified(status);
2827 
2828  }
2829 };
2830 
2831 class MessageHandler : public AbstractClientResponseHandler {
2832 public:
2833  MessageHandler(ClientContextImpl::shared_pointer const & context) :
2834  AbstractClientResponseHandler(context, "Message")
2835  {}
2836 
2837  virtual ~MessageHandler() {}
2838 
2839  virtual void handleResponse(osiSockAddr* responseFrom,
2840  Transport::shared_pointer const & transport, int8 version, int8 command,
2841  size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
2842  {
2843  AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2844 
2845  transport->ensureData(5);
2846  int32 ioid = payloadBuffer->getInt();
2847  MessageType type = (MessageType)payloadBuffer->getByte();
2848 
2849  string message = SerializeHelper::deserializeString(payloadBuffer, transport.get());
2850 
2851  bool shown = false;
2852  ResponseRequest::shared_pointer rr = _context.lock()->getResponseRequest(ioid);
2853  if (rr)
2854  {
2855  epics::atomic::add(rr->bytesRX, payloadSize);
2856  Requester::shared_pointer requester = rr->getRequester();
2857  if (requester) {
2858  requester->message(message, type);
2859  shown = true;
2860  }
2861  }
2862  if(!shown)
2863  std::cerr<<"Orphaned server message "<<type<<" : "<<message<<"\n";
2864  }
2865 };
2866 
2867 class CreateChannelHandler : public AbstractClientResponseHandler {
2868 public:
2869  CreateChannelHandler(ClientContextImpl::shared_pointer const & context) :
2870  AbstractClientResponseHandler(context, "Create channel")
2871  {}
2872 
2873  virtual ~CreateChannelHandler() {}
2874 
2875  virtual void handleResponse(osiSockAddr* responseFrom,
2876  Transport::shared_pointer const & transport, int8 version, int8 command,
2877  size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
2878  {
2879  AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2880 
2881  transport->ensureData(8);
2882  pvAccessID cid = payloadBuffer->getInt();
2883  pvAccessID sid = payloadBuffer->getInt();
2884 
2885  Status status;
2886  status.deserialize(payloadBuffer, transport.get());
2887 
2888  // TODO optimize
2889  ClientChannelImpl::shared_pointer channel = static_pointer_cast<ClientChannelImpl>(_context.lock()->getChannel(cid));
2890  if (channel.get())
2891  {
2892  // failed check
2893  if (!status.isSuccess()) {
2894 
2896  {
2897  std::stringstream ss;
2898  ss << "Failed to create channel '" << channel->getChannelName() << "': ";
2899  ss << status.getMessage();
2900  if (!status.getStackDump().empty())
2901  ss << std::endl << status.getStackDump();
2902  LOG(logLevelDebug, "%s", ss.str().c_str());
2903  }
2904 
2905  channel->createChannelFailed();
2906  return;
2907  }
2908 
2909  //int16 acl = payloadBuffer->getShort();
2910 
2911  channel->connectionCompleted(sid);
2912  }
2913  }
2914 };
2915 
2916 
2917 class DestroyChannelHandler : public AbstractClientResponseHandler {
2918 public:
2919  DestroyChannelHandler(ClientContextImpl::shared_pointer const & context) :
2920  AbstractClientResponseHandler(context, "Destroy channel")
2921  {}
2922 
2923  virtual ~DestroyChannelHandler() {}
2924 
2925  virtual void handleResponse(osiSockAddr* responseFrom,
2926  Transport::shared_pointer const & transport, int8 version, int8 command,
2927  size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) OVERRIDE FINAL
2928  {
2929  AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2930 
2931  transport->ensureData(8);
2932  pvAccessID sid = payloadBuffer->getInt();
2933  pvAccessID cid = payloadBuffer->getInt();
2934  (void)sid;
2935 
2936  // TODO optimize
2937  ClientChannelImpl::shared_pointer channel = static_pointer_cast<ClientChannelImpl>(_context.lock()->getChannel(cid));
2938  if (channel.get())
2939  channel->channelDestroyedOnServer();
2940  }
2941 };
2942 
2943 
2948 class ClientResponseHandler : public ResponseHandler {
2949  EPICS_NOT_COPYABLE(ClientResponseHandler)
2950 private:
2951 
2955  vector<ResponseHandler::shared_pointer> m_handlerTable;
2956 
2957 public:
2958 
2959  virtual ~ClientResponseHandler() {}
2960 
2964  ClientResponseHandler(ClientContextImpl::shared_pointer const & context)
2965  :ResponseHandler(context.get(), "ClientResponseHandler")
2966  {
2967  ResponseHandler::shared_pointer ignoreResponse(new NoopResponse(context, "Ignore"));
2968  ResponseHandler::shared_pointer dataResponse(new ResponseRequestHandler(context));
2969 
2970  m_handlerTable.resize(CMD_CANCEL_REQUEST+1);
2971 
2972  m_handlerTable[CMD_BEACON].reset(new BeaconResponseHandler(context)); /* 0 */
2973  m_handlerTable[CMD_CONNECTION_VALIDATION].reset(new ClientConnectionValidationHandler(context)); /* 1 */
2974  m_handlerTable[CMD_ECHO] = ignoreResponse; /* 2 */
2975  m_handlerTable[CMD_SEARCH].reset(new SearchHandler(context)); /* 3 */
2976  m_handlerTable[CMD_SEARCH_RESPONSE].reset(new SearchResponseHandler(context)); /* 4 */
2977  m_handlerTable[CMD_AUTHNZ].reset(new AuthNZHandler(context.get())); /* 5 */
2978  m_handlerTable[CMD_ACL_CHANGE] = ignoreResponse; /* 6 */
2979  m_handlerTable[CMD_CREATE_CHANNEL].reset(new CreateChannelHandler(context)); /* 7 */
2980  m_handlerTable[CMD_DESTROY_CHANNEL].reset(new DestroyChannelHandler(context)); /* 8 */
2981  m_handlerTable[CMD_CONNECTION_VALIDATED].reset(new ClientConnectionValidatedHandler(context)); /* 9 */
2982  m_handlerTable[CMD_GET] = dataResponse; /* 10 - get response */
2983  m_handlerTable[CMD_PUT] = dataResponse; /* 11 - put response */
2984  m_handlerTable[CMD_PUT_GET] = dataResponse; /* 12 - put-get response */
2985  m_handlerTable[CMD_MONITOR] = dataResponse; /* 13 - monitor response */
2986  m_handlerTable[CMD_ARRAY] = dataResponse; /* 14 - array response */
2987  m_handlerTable[CMD_DESTROY_REQUEST] = ignoreResponse; /* 15 - destroy request */
2988  m_handlerTable[CMD_PROCESS] = dataResponse; /* 16 - process response */
2989  m_handlerTable[CMD_GET_FIELD] = dataResponse; /* 17 - get field response */
2990  m_handlerTable[CMD_MESSAGE].reset(new MessageHandler(context)); /* 18 - message to Requester */
2991  m_handlerTable[CMD_MULTIPLE_DATA].reset(new MultipleResponseRequestHandler(context)); /* 19 - grouped monitors */
2992  m_handlerTable[CMD_RPC] = dataResponse; /* 20 - RPC response */
2993  m_handlerTable[CMD_CANCEL_REQUEST] = ignoreResponse; /* 21 - cancel request */
2994  }
2995 
2996  virtual void handleResponse(osiSockAddr* responseFrom,
2997  Transport::shared_pointer const & transport, int8 version, int8 command,
2998  size_t payloadSize, ByteBuffer* payloadBuffer) OVERRIDE FINAL
2999  {
3000  if (command < 0 || command >= (int8)m_handlerTable.size())
3001  {
3002  if(IS_LOGGABLE(logLevelError)) {
3003  std::cerr<<"Invalid (or unsupported) command: "<<std::hex<<(int)(0xFF&command)<<"\n"
3004  <<HexDump(*payloadBuffer, payloadSize).limit(256u);
3005  }
3006  return;
3007  }
3008  // delegate
3009  m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
3010  }
3011 };
3012 
3013 
3014 
3015 
3016 
3017 
3025  CONTEXT_NOT_INITIALIZED,
3026 
3030  CONTEXT_INITIALIZED,
3031 
3035  CONTEXT_DESTROYED
3036 };
3037 
3038 
3039 
3040 
3041 class InternalClientContextImpl :
3042  public ClientContextImpl,
3043  public ChannelProvider
3044 {
3045 public:
3046  POINTER_DEFINITIONS(InternalClientContextImpl);
3047 
3048  virtual std::string getProviderName() OVERRIDE FINAL
3049  {
3050  return "pva";
3051  }
3052 
/**
 * Channel discovery entry point. Not implemented by this provider:
 * validates arguments, then reports a "not implemented" error status to
 * the requester and returns an empty ChannelFind handle.
 */
virtual ChannelFind::shared_pointer channelFind(
    std::string const & channelName,
    ChannelFindRequester::shared_pointer const & channelFindRequester) OVERRIDE FINAL
{
    // TODO not implemented

    checkChannelName(channelName);

    if (!channelFindRequester.get())
        throw std::runtime_error("null requester");

    Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented");
    ChannelFind::shared_pointer nullChannelFind;
    // EXCEPTION_GUARD swallows nothing here; it simply executes the callback
    EXCEPTION_GUARD(channelFindRequester->channelFindResult(errorStatus, nullChannelFind, false));
    return nullChannelFind;
}
3069 
3070  virtual ChannelFind::shared_pointer channelList(
3071  ChannelListRequester::shared_pointer const & channelListRequester) OVERRIDE FINAL
3072  {
3073  if (!channelListRequester.get())
3074  throw std::runtime_error("null requester");
3075 
3076  Status errorStatus(Status::STATUSTYPE_ERROR, "not implemented");
3077  ChannelFind::shared_pointer nullChannelFind;
3079  EXCEPTION_GUARD(channelListRequester->channelListResult(errorStatus, nullChannelFind, none, false));
3080  return nullChannelFind;
3081  }
3082 
3083  virtual Channel::shared_pointer createChannel(
3084  std::string const & channelName,
3085  ChannelRequester::shared_pointer const & channelRequester,
3086  short priority) OVERRIDE FINAL
3087  {
3088  return createChannel(channelName, channelRequester, priority, std::string());
3089  }
3090 
/**
 * Create a channel, optionally restricted to an explicit server address
 * list; reports creation success to the requester.
 * @param addressesStr address-list string (empty => use search/discovery);
 *        port defaults to PVA_SERVER_PORT.
 */
virtual Channel::shared_pointer createChannel(
    std::string const & channelName,
    ChannelRequester::shared_pointer const & channelRequester,
    short priority,
    std::string const & addressesStr) OVERRIDE FINAL
{
    InetAddrVector addresses;
    getSocketAddressList(addresses, addressesStr, PVA_SERVER_PORT);

    Channel::shared_pointer channel = createChannelInternal(channelName, channelRequester, priority, addresses);
    if (channel.get())
        channelRequester->channelCreated(Status::Ok, channel);
    return channel;

    // NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors
}
3107 
3108 public:
/**
 * Client-side PVA channel implementation. Also a TimerCallback so it can
 * re-try connection attempts against a static address list.
 */
class InternalChannelImpl :
    public ClientChannelImpl,
    public TimerCallback
{
    // non-copyable
    InternalChannelImpl(InternalChannelImpl&);
    InternalChannelImpl& operator=(const InternalChannelImpl&);
public:
    POINTER_DEFINITIONS(InternalChannelImpl);
private:

    // The two shared_ptr "faces" of this object (see create()):
    // m_external_this is handed to user code (its deleter destroy()s the
    // channel), m_internal_this is used for internal registrations.
    const weak_pointer m_external_this, m_internal_this;

    // NOTE: constructing shared_ptr from an expired weak_ptr throws
    // std::bad_weak_ptr.
    shared_pointer external_from_this() {
        return shared_pointer(m_external_this);
    }
    shared_pointer internal_from_this() {
        return shared_pointer(m_internal_this);
    }

    // Owning context; keeps the context alive while the channel exists.
    const std::tr1::shared_ptr<InternalClientContextImpl> m_context;

    // Client-side channel ID (CID).
    const pvAccessID m_channelID;

    // Channel (PV) name.
    const string m_name;

    // User's requester; weak so the channel does not pin user code.
    const ChannelRequester::weak_pointer m_requester;

public:
    // Pending getField() request, if any; reset in destroy().
    std::tr1::shared_ptr<ChannelGetFieldRequestImpl> m_getfield;
private:

    // Channel priority forwarded to the transport layer.
    const short m_priority;

    // Explicit server addresses; empty => use search/discovery.
    InetAddrVector m_addresses;

    // Rotating index into m_addresses (static-address retry; see callback()).
    int m_addressIndex;

    // Current state; guarded by m_channelMutex.
    ConnectionState m_connectionState;

    // Pending requests keyed by IOID; guarded by m_responseRequestsMutex.
    IOIDResponseRequestMap m_responseRequests;

    Mutex m_responseRequestsMutex;

    // Set when pending I/O is disconnected; consumed by updateSubscriptions().
    bool m_needSubscriptionUpdate;

    // Guards against issuing duplicate create-channel requests (searchResponse()).
    bool m_allowCreation;

    /* ****************** */
    /* PVA protocol fields */
    /* ****************** */

    // TCP transport to the server; reset while disconnected.
    Transport::shared_pointer m_transport;

    // Server-side channel ID (SID); 0xFFFFFFFF until connected.
    pvAccessID m_serverChannelID;
public:
    // Guards connection state, transport and protocol fields above.
    Mutex m_channelMutex;
private:
    // true => send() issues create-channel; false => destroy-channel.
    bool m_issueCreateMessage;

    // Opaque per-instance value exposed via getUserValue(); presumably
    // scratch storage for the search manager — TODO confirm.
    int32_t m_userValue;

    // GUID of the connected server (used to detect duplicate servers).
    ServerGUID m_guid;

public:
    static size_t num_instances;
    static size_t num_active;
private:
/**
 * Constructor; performs no registration or searching — create() calls
 * activate() to complete setup once both shared_ptr faces exist.
 */
InternalChannelImpl(
    InternalClientContextImpl::shared_pointer const & context,
    pvAccessID channelID,
    string const & name,
    ChannelRequester::shared_pointer const & requester,
    short priority,
    const InetAddrVector& addresses) :
    m_context(context),
    m_channelID(channelID),
    m_name(name),
    m_requester(requester),
    m_priority(priority),
    m_addresses(addresses),
    m_addressIndex(0),
    m_connectionState(NEVER_CONNECTED),
    m_needSubscriptionUpdate(false),
    m_allowCreation(true),
    m_serverChannelID(0xFFFFFFFF),   // sentinel: no server-side ID yet
    m_issueCreateMessage(true)
{
    REFTRACE_INCREMENT(num_instances);
}
3260 
/**
 * Second-phase construction: register with the context and start
 * connecting. Must run after m_internal_this/m_external_this are set,
 * hence not done in the constructor.
 */
void activate()
{
    // register before issuing search request
    m_context->registerChannel(internal_from_this());

    // connect
    connect();

    REFTRACE_INCREMENT(num_active);
}
3271 
public:

/**
 * Factory. Builds the "two-face" ownership scheme: 'internal' owns the
 * object; 'external' aliases it with a cleaner deleter so that when the
 * last user reference drops, destroy() runs. The weak back-references
 * must be assigned before activate() (which hands out shared_ptrs).
 */
static ClientChannelImpl::shared_pointer create(InternalClientContextImpl::shared_pointer context,
        pvAccessID channelID,
        string const & name,
        ChannelRequester::shared_pointer requester,
        short priority,
        const InetAddrVector& addresses)
{
    std::tr1::shared_ptr<InternalChannelImpl> internal(
        new InternalChannelImpl(context, channelID, name, requester, priority, addresses)),
        external(internal.get(), epics::pvAccess::Destroyable::cleaner(internal));
    // members are const; cast away to set them exactly once during creation
    const_cast<weak_pointer&>(internal->m_internal_this) = internal;
    const_cast<weak_pointer&>(internal->m_external_this) = external;
    internal->activate();
    return external;
}
3289 
// Destructor only updates reference-tracking; all teardown is in destroy().
virtual ~InternalChannelImpl()
{
    REFTRACE_DECREMENT(num_instances);
}
3294 
/**
 * Tear down the channel: stop searching, fail pending I/O, disconnect or
 * release the transport, mark DESTROYED and unregister from the context.
 * Idempotent. State-change notification is delivered after the lock is
 * released.
 */
virtual void destroy() OVERRIDE FINAL
{
    // Hack. Prevent Transport from being dtor'd while m_channelMutex is held
    Transport::shared_pointer old_transport;
    {
        Lock guard(m_channelMutex);
        if (m_connectionState == DESTROYED)
            return;          // already destroyed — idempotent
        REFTRACE_DECREMENT(num_active);

        old_transport = m_transport;

        m_getfield.reset();

        // stop searching...
        shared_pointer thisChannelPointer = internal_from_this();
        m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer);

        // report DESTROYED to all pending requests
        disconnectPendingIO(true);

        if (m_connectionState == CONNECTED)
        {
            disconnect(false, true);
        }
        else if (m_transport)
        {
            // unresponsive state, do not forget to release transport
            m_transport->release(getID());
            m_transport.reset();
        }


        setConnectionState(DESTROYED);

        // unregister
        m_context->unregisterChannel(thisChannelPointer);
    }

    // should be called without any lock hold
    reportChannelStateChange();
}
3336 
3337  virtual string getRequesterName() OVERRIDE FINAL
3338  {
3339  return getChannelName();
3340  }
3341 
private:

// intentionally returning non-const reference
// (caller — presumably the search manager — stores state directly in it)
int32_t& getUserValue() OVERRIDE FINAL {
    return m_userValue;
}
3348 
// Returns the context's external (user-facing) shared_ptr face.
virtual ChannelProvider::shared_pointer getProvider() OVERRIDE FINAL
{
    return m_context->external_from_this();
}
3353 
3354  // NOTE: synchronization guarantees that <code>transport</code> is non-<code>0</code> and <code>state == CONNECTED</code>.
3355  virtual std::string getRemoteAddress() OVERRIDE FINAL
3356  {
3357  Lock guard(m_channelMutex);
3358  if (m_connectionState != CONNECTED) {
3359  return "";
3360  }
3361  else
3362  {
3363  return m_transport->getRemoteName();
3364  }
3365  }
3366 
3367  virtual std::string getChannelName() OVERRIDE FINAL
3368  {
3369  return m_name;
3370  }
3371 
// Promotes the weak requester reference; NOTE this throws
// std::bad_weak_ptr if the user's requester has already been released.
virtual ChannelRequester::shared_pointer getChannelRequester() OVERRIDE FINAL
{
    return ChannelRequester::shared_pointer(m_requester);
}
3376 
3377  virtual ConnectionState getConnectionState() OVERRIDE FINAL
3378  {
3379  Lock guard(m_channelMutex);
3380  return m_connectionState;
3381  }
3382 
// Access rights are not tracked by this client; always claims read/write.
virtual AccessRights getAccessRights(std::tr1::shared_ptr<epics::pvData::PVField> const &) OVERRIDE FINAL
{
    return readWrite;
}
3387 
// Both accessors return the client-side channel ID (CID); they satisfy
// two different interfaces.
virtual pvAccessID getID() OVERRIDE FINAL {
    return m_channelID;
}

pvAccessID getChannelID() OVERRIDE FINAL {
    return m_channelID;
}
public:
// Raw (non-owning) pointer to the owning context.
virtual ClientContextImpl* getContext() OVERRIDE FINAL {
    return m_context.get();
}

// Search-instance identity: the channel's CID and name.
virtual pvAccessID getSearchInstanceID() OVERRIDE FINAL {
    return m_channelID;
}

virtual const string& getSearchInstanceName() OVERRIDE FINAL {
    return m_name;
}

// Server-side channel ID (SID); only meaningful while connected.
virtual pvAccessID getServerChannelID() OVERRIDE FINAL {
    Lock guard(m_channelMutex);
    return m_serverChannelID;
}
3412 
// Track a pending request by its IOID so it can be notified on
// disconnect/destroy (see disconnectPendingIO()). Stored weak.
virtual void registerResponseRequest(ResponseRequest::shared_pointer const & responseRequest) OVERRIDE FINAL
{
    Lock guard(m_responseRequestsMutex);
    m_responseRequests[responseRequest->getIOID()] = ResponseRequest::weak_pointer(responseRequest);
}

virtual void unregisterResponseRequest(pvAccessID ioid) OVERRIDE FINAL
{
    if (ioid == INVALID_IOID) return;   // never registered
    Lock guard(m_responseRequestsMutex);
    m_responseRequests.erase(ioid);
}
3425 
// Start (or restart) connecting; no-op if already connected.
// @throws std::runtime_error if the channel was destroyed.
void connect() {
    Lock guard(m_channelMutex);
    // if not destroyed...
    if (m_connectionState == DESTROYED)
        throw std::runtime_error("Channel destroyed.");
    else if (m_connectionState != CONNECTED)
        initiateSearch();
}
3434 
// User-initiated disconnect (no re-search); notifies requesters after
// the lock is released.
// @throws std::runtime_error if the channel was destroyed.
void disconnect() {
    {
        // Hack. Prevent Transport from being dtor'd while m_channelMutex is held
        Transport::shared_pointer old_transport;
        Lock guard(m_channelMutex);
        old_transport = m_transport;

        // if not destroyed...
        if (m_connectionState == DESTROYED)
            throw std::runtime_error("Channel destroyed.");
        else if (m_connectionState == CONNECTED)
            disconnect(false, true);
    }

    // should be called without any lock hold
    reportChannelStateChange();
}
3452 
// TimerCallback hook: a static-address connection attempt timed out.
virtual void timeout() {
    createChannelFailed();
}
3456 
/**
 * Called when channel creation on the server failed: release the
 * transport (if any) and restart searching with a penalty.
 */
virtual void createChannelFailed() OVERRIDE FINAL
{
    // Hack. Prevent Transport from being dtor'd while m_channelMutex is held
    Transport::shared_pointer old_transport;
    Lock guard(m_channelMutex);
    // release transport if active
    if (m_transport)
    {
        m_transport->release(getID());
        old_transport.swap(m_transport);
    }

    // ... and search again, with penalty
    initiateSearch(true);
}
3475 
/**
 * Called when the server acknowledged channel creation.
 * @param sid server-assigned channel ID.
 */
virtual void connectionCompleted(pvAccessID sid/*, rights*/) OVERRIDE FINAL
{
    {
        Lock guard(m_channelMutex);

        try
        {
            // do this silently
            if (m_connectionState == DESTROYED)
            {
                // end connection request
                return;
            }

            // store data
            m_serverChannelID = sid;
            //setAccessRights(rights);

            m_addressIndex = 0; // reset

            // user might create monitors in listeners, so this has to be done before this can happen
            // however, it would not be nice if events would come before connection event is fired
            // but this cannot happen since transport (TCP) is serving in this thread
            resubscribeSubscriptions();
            setConnectionState(CONNECTED);
        }
        catch (std::exception& e) {
            LOG(logLevelError, "connectionCompleted() %d '%s' unhandled exception: %s\n", sid, m_name.c_str(), e.what());
            // noop
        }
    }

    // should be called without any lock hold
    reportChannelStateChange();
}
3516 
/**
 * Internal disconnect while CONNECTED (caller must NOT already hold other
 * locks that would invert the lock order).
 * @param initiateSearch  restart searching afterwards (keeps the search
 *        registration alive); false also unregisters from the search manager.
 * @param remoteDestroy   queue a destroy-channel message to the server
 *        before releasing the transport (see send()).
 */
void disconnect(bool initiateSearch, bool remoteDestroy) {
    // order of oldchan and guard is important to ensure
    // oldchan is destoryed after unlock
    Transport::shared_pointer oldchan;
    Lock guard(m_channelMutex);

    if (m_connectionState != CONNECTED)
        return;

    if (!initiateSearch) {
        // stop searching...
        m_context->getChannelSearchManager()->unregisterSearchInstance(internal_from_this());
    }
    setConnectionState(DISCONNECTED);

    disconnectPendingIO(false);

    // release transport
    if (m_transport)
    {
        if (remoteDestroy) {
            // flip send() into destroy-channel mode, then queue us as sender
            m_issueCreateMessage = false;
            m_transport->enqueueSendRequest(internal_from_this());
        }

        m_transport->release(getID());
        oldchan.swap(m_transport);
    }

    if (initiateSearch)
        this->initiateSearch();

}
3555 
// Server destroyed its end of the channel: disconnect (no destroy message
// back — the server already knows) and start searching again.
void channelDestroyedOnServer() OVERRIDE FINAL {
    if (isConnected())
    {
        disconnect(true, false);

        // should be called without any lock hold
        reportChannelStateChange();
    }
}
3565 
#define STATIC_SEARCH_BASE_DELAY_SEC 5
#define STATIC_SEARCH_MAX_MULTIPLIER 10

/**
 * Begin looking for a server: either via the search manager (discovery)
 * or, with an explicit address list, via a timer-driven retry whose delay
 * grows each time the whole list has been cycled (see callback()).
 * @param penalize apply a search penalty (after a failure).
 */
void initiateSearch(bool penalize = false)
{
    Lock guard(m_channelMutex);

    m_allowCreation = true;

    if (m_addresses.empty())
    {
        m_context->getChannelSearchManager()->registerSearchInstance(internal_from_this(), penalize);
    }
    else
    {
        // delay = (completed cycles through the list) * base delay
        m_context->getTimer()->scheduleAfterDelay(internal_from_this(),
                (m_addressIndex / m_addresses.size())*STATIC_SEARCH_BASE_DELAY_SEC);
    }
}
3588 
// Timer callback for static-address connecting: try the next address in
// the list. Only scheduled from initiateSearch() when m_addresses is
// non-empty, so the modulo below cannot divide by zero.
virtual void callback() OVERRIDE FINAL {
    // TODO cancellaction?!
    // TODO not in this timer thread !!!
    // TODO boost when a server (from address list) is started!!! IP vs address !!!
    int ix = m_addressIndex % m_addresses.size();
    m_addressIndex++;
    // cap the index so the retry delay stops growing at MAX_MULTIPLIER
    if (m_addressIndex >= static_cast<int>(m_addresses.size()*(STATIC_SEARCH_MAX_MULTIPLIER+1)))
        m_addressIndex = m_addresses.size()*STATIC_SEARCH_MAX_MULTIPLIER;

    // NOTE: calls channelConnectFailed() on failure
    static ServerGUID guid = { { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 } };
    // m_addresses[ix] is modified by the following
    searchResponse(guid, PVA_CLIENT_PROTOCOL_REVISION, &m_addresses[ix]);
}
3603 
// TimerCallback hook; nothing to clean up when the timer shuts down.
virtual void timerStopped() OVERRIDE FINAL {
    // noop
}
3607 
/**
 * A server answered our search (or a static-address attempt fired):
 * acquire/create a transport to it and queue the create-channel request.
 * NOTE(review): m_channelMutex is locked here and again below; this relies
 * on the mutex being recursive (epicsMutex is) — confirm before changing.
 */
virtual void searchResponse(const ServerGUID & guid, int8 minorRevision, osiSockAddr* serverAddress) OVERRIDE FINAL {
    // Hack. Prevent Transport from being dtor'd while m_channelMutex is held
    Transport::shared_pointer old_transport;

    Lock guard(m_channelMutex);
    Transport::shared_pointer transport(m_transport);
    if (transport)
    {
        // GUID check case: same server listening on different NIF

        if (!sockAddrAreIdentical(&transport->getRemoteAddress(), serverAddress) &&
                !std::equal(guid.value, guid.value + 12, m_guid.value))
        {
            // a *different* server also claims this channel name — warn the user
            EXCEPTION_GUARD3(m_requester, req, req->message("More than one channel with name '" + m_name +
                    "' detected, connected to: " + transport->getRemoteName() + ", ignored: " + inetAddressToString(*serverAddress), warningMessage));
        }

        // do not pass (create transports) with we already have one
        return;
    }

    // NOTE: this creates a new or acquires an existing transport (implies increases usage count)
    transport = m_context->getTransport(internal_from_this(), serverAddress, minorRevision, m_priority);
    if (!transport)
    {
        createChannelFailed();
        return;
    }


    // remember GUID
    std::copy(guid.value, guid.value + 12, m_guid.value);

    // create channel
    {
        Lock guard(m_channelMutex);

        // do not allow duplicate creation to the same transport
        if (!m_allowCreation)
            return;
        m_allowCreation = false;

        // check existing transport
        if (m_transport && m_transport.get() != transport.get())
        {
            disconnectPendingIO(false);

            m_transport->release(getID());
        }
        else if (m_transport.get() == transport.get())
        {
            // request to sent create request to same transport, ignore
            // this happens when server is slower (processing search requests) than client generating it
            return;
        }

        // rotate: transport -> m_transport -> old_transport ->
        old_transport.swap(m_transport);
        m_transport.swap(transport);

        // queue ourselves as sender; send() will issue create-channel
        m_transport->enqueueSendRequest(internal_from_this());
    }
}
3671 
// Transport layer lost the connection: disconnect and search again.
virtual void transportClosed() OVERRIDE FINAL {
    disconnect(true, false);

    // should be called without any lock hold
    reportChannelStateChange();
}
3678 
3679  virtual Transport::shared_pointer checkAndGetTransport() OVERRIDE FINAL
3680  {
3681  Lock guard(m_channelMutex);
3682 
3683  if (m_connectionState == DESTROYED)
3684  throw std::runtime_error("Channel destroyed.");
3685  else if (m_connectionState != CONNECTED)
3686  throw std::runtime_error("Channel not connected.");
3687  return m_transport;
3688  }
3689 
/**
 * Like checkAndGetTransport() but tolerant of disconnection: returns an
 * empty pointer instead of throwing when merely not connected.
 * @throws std::runtime_error only if the channel was destroyed.
 */
virtual Transport::shared_pointer checkDestroyedAndGetTransport() OVERRIDE FINAL
{
    Lock guard(m_channelMutex);

    if (m_connectionState == DESTROYED)
        throw std::runtime_error("Channel destroyed.");
    else if (m_connectionState == CONNECTED)
        return m_transport;
    else
        return Transport::shared_pointer();
}
3701 
3702  virtual Transport::shared_pointer getTransport() OVERRIDE FINAL
3703  {
3704  Lock guard(m_channelMutex);
3705  return m_transport;
3706  }
3707 
/**
 * Record a state transition and queue it for later delivery via
 * reportChannelStateChange() (which must run without locks held).
 */
void setConnectionState(ConnectionState connectionState)
{
    Lock guard(m_channelMutex);
    if (m_connectionState != connectionState)
    {
        m_connectionState = connectionState;

        //bool connectionStatusToReport = (connectionState == CONNECTED);
        //if (connectionStatusToReport != lastReportedConnectionState)
        {
            //lastReportedConnectionState = connectionStatusToReport;
            // TODO via dispatcher ?!!!
            //Channel::shared_pointer thisPointer = shared_from_this();
            //EXCEPTION_GUARD(m_requester->channelStateChange(thisPointer, connectionState));
            channelStateChangeQueue.push(connectionState);
        }
    }
}


// Pending state changes; guarded by m_channelMutex, drained by
// reportChannelStateChange().
std::queue<ConnectionState> channelStateChangeQueue;
3733 
/**
 * Drain channelStateChangeQueue, delivering each state change to the
 * requester outside the lock, and on DISCONNECTED/DESTROYED also notify
 * every pending request. Lock order: m_channelMutex before
 * m_responseRequestsMutex (consistent with disconnect()/disconnectPendingIO()).
 */
void reportChannelStateChange()
{
    // hack
    // we should always use the external shared_ptr.
    // however, this is already dead during destroy(),
    // but we still want to give notification.
    // so give the internal ref and hope it isn't stored...
    shared_pointer self(m_external_this.lock());
    if(!self)
        self = internal_from_this();

    while (true)
    {
        std::vector<ResponseRequest::weak_pointer> ops;
        ConnectionState connectionState;
        {
            Lock guard(m_channelMutex);
            if (channelStateChangeQueue.empty())
                break;
            connectionState = channelStateChangeQueue.front();
            channelStateChangeQueue.pop();

            // snapshot pending requests so callbacks run lock-free below
            if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
                Lock guard(m_responseRequestsMutex);
                ops.reserve(m_responseRequests.size());
                for(IOIDResponseRequestMap::const_iterator it = m_responseRequests.begin(),
                        end = m_responseRequests.end();
                        it!=end; ++it)
                {
                    ops.push_back(it->second);
                }
            }
        }

        EXCEPTION_GUARD3(m_requester, req, req->channelStateChange(self, connectionState));

        if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
            for(size_t i=0, N=ops.size(); i<N; i++) {
                ResponseRequest::shared_pointer R(ops[i].lock());
                if(!R) continue;   // request already gone
                ChannelBaseRequester::shared_pointer req(R->getRequester());
                if(!req) continue;
                EXCEPTION_GUARD(req->channelDisconnect(connectionState==Channel::DESTROYED););
            }
        }
    }


}
3783 
3784 
/**
 * TransportSender hook: serialize either a create-channel or a
 * destroy-channel message, depending on m_issueCreateMessage (flipped to
 * false by disconnect(..., remoteDestroy=true)).
 */
virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
    // read the flag under the lock, serialize outside it
    m_channelMutex.lock();
    bool issueCreateMessage = m_issueCreateMessage;
    m_channelMutex.unlock();

    if (issueCreateMessage)
    {
        control->startMessage((int8)CMD_CREATE_CHANNEL, 2+4);

        // count
        buffer->putShort((int16)1);
        // array of CIDs and names
        buffer->putInt(m_channelID);
        SerializeHelper::serializeString(m_name, buffer, control);
        // send immediately
        // TODO
        control->flush(true);
    }
    else
    {
        control->startMessage((int8)CMD_DESTROY_CHANNEL, 4+4);
        // SID
        m_channelMutex.lock();
        pvAccessID sid = m_serverChannelID;
        m_channelMutex.unlock();
        buffer->putInt(sid);
        // CID
        buffer->putInt(m_channelID);
        // send immediately
        // TODO
        control->flush(true);
    }
}
3818 
3819 
/**
 * Report DISCONNECTED (or DESTROYED when destroying) to every pending
 * request. Caller typically holds m_channelMutex; m_responseRequestsMutex
 * is taken here (lock order: channel -> responseRequests).
 * @param destroy true when the channel itself is being destroyed.
 */
void disconnectPendingIO(bool destroy)
{
    Channel::ConnectionState state = destroy ? Channel::DESTROYED : Channel::DISCONNECTED;

    Lock guard(m_responseRequestsMutex);

    m_needSubscriptionUpdate = true;

    // make a copy so that ResponseRequest::reportStatus() can
    // remove itself from m_responseRequests
    size_t count = 0;
    std::vector<ResponseRequest::weak_pointer> rrs(m_responseRequests.size());
    for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
            iter != m_responseRequests.end();
            iter++)
    {
        rrs[count++] = iter->second;
    }

    ResponseRequest::shared_pointer ptr;
    for (size_t i = 0; i< count; i++)
    {
        if((ptr = rrs[i].lock()))
        {
            EXCEPTION_GUARD(ptr->reportStatus(state));
        }
    }
}

// Re-issue all subscriptions after reconnect (defined elsewhere).
// TODO to be called from non-transport thread !!!!!!
void resubscribeSubscriptions();
3858 
3862  // TODO to be called from non-transport thread !!!!!!
3863  void updateSubscriptions()
3864  {
3865  Lock guard(m_responseRequestsMutex);
3866 
3867  if (m_needSubscriptionUpdate)
3868  m_needSubscriptionUpdate = false;
3869  else
3870  return; // noop
3871 
3872  // NOTE: elements cannot be removed within rrs->updateSubscription callbacks
3873  for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
3874  iter != m_responseRequests.end();
3875  iter++)
3876  {
3877  ResponseRequest::shared_pointer ptr = iter->second.lock();
3878  if (ptr)
3879  {
3880  BaseRequestImpl::shared_pointer rrs = dynamic_pointer_cast<BaseRequestImpl>(ptr);
3881  if (rrs)
3882  EXCEPTION_GUARD(rrs->updateSubscription());
3883  }
3884  }
3885  }
3886 
3887  virtual void getField(GetFieldRequester::shared_pointer const & requester,std::string const & subField) OVERRIDE FINAL;
3888 
// Operation factories: each delegates to BaseRequestImpl::build with the
// concrete request implementation type; the external channel shared_ptr
// keeps the channel alive while the operation exists.

virtual ChannelProcess::shared_pointer createChannelProcess(
    ChannelProcessRequester::shared_pointer const & requester,
    epics::pvData::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL
{
    return BaseRequestImpl::build<ChannelProcessRequestImpl>(external_from_this(), requester, pvRequest);
}

virtual ChannelGet::shared_pointer createChannelGet(
    ChannelGetRequester::shared_pointer const & requester,
    epics::pvData::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL
{
    return BaseRequestImpl::build<ChannelGetImpl>(external_from_this(), requester, pvRequest);
}

virtual ChannelPut::shared_pointer createChannelPut(
    ChannelPutRequester::shared_pointer const & requester,
    epics::pvData::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL
{
    return BaseRequestImpl::build<ChannelPutImpl>(external_from_this(), requester, pvRequest);
}

virtual ChannelPutGet::shared_pointer createChannelPutGet(
    ChannelPutGetRequester::shared_pointer const & requester,
    epics::pvData::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL
{
    return BaseRequestImpl::build<ChannelPutGetImpl>(external_from_this(), requester, pvRequest);
}

virtual ChannelRPC::shared_pointer createChannelRPC(
    ChannelRPCRequester::shared_pointer const & requester,
    epics::pvData::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL
{
    return BaseRequestImpl::build<ChannelRPCImpl>(external_from_this(), requester, pvRequest);
}

virtual Monitor::shared_pointer createMonitor(
    MonitorRequester::shared_pointer const & requester,
    epics::pvData::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL
{
    return BaseRequestImpl::build<ChannelMonitorImpl>(external_from_this(), requester, pvRequest);
}

virtual ChannelArray::shared_pointer createChannelArray(
    ChannelArrayRequester::shared_pointer const & requester,
    epics::pvData::PVStructure::shared_pointer const & pvRequest) OVERRIDE FINAL
{
    return BaseRequestImpl::build<ChannelArrayImpl>(external_from_this(), requester, pvRequest);
}
3937 
// Human-readable channel status dump (name, state, peer address).
// NOTE(review): reads m_connectionState without the lock — racy but benign
// for diagnostics; the commented-out lock suggests this is deliberate.
virtual void printInfo(std::ostream& out) OVERRIDE FINAL {
    //Lock lock(m_channelMutex);

    out << "CHANNEL : " << m_name << std::endl;
    out << "STATE : " << ConnectionStateNames[m_connectionState] << std::endl;
    if (m_connectionState == CONNECTED)
    {
        out << "ADDRESS : " << getRemoteAddress() << std::endl;
        //out << "RIGHTS : " << getAccessRights() << std::endl;
    }
}
3949  };
3950 
3951 
3952 
3953 
public:
    // Reference-tracking counter for contexts.
    static size_t num_instances;

    /**
     * Construct the client context with defaults, then override them from
     * the supplied configuration (or "pvAccess-client" configuration when
     * none is given). Does not start any networking — see initialize().
     */
    InternalClientContextImpl(const Configuration::shared_pointer& conf) :
        m_addressList(""), m_autoAddressList(true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f),
        m_broadcastPort(PVA_BROADCAST_PORT), m_receiveBufferSize(MAX_TCP_RECV),
        m_lastCID(0x10203040),
        m_lastIOID(0x80706050),
        m_version("pvAccess Client", "cpp",
                  EPICS_PVA_MAJOR_VERSION,
                  EPICS_PVA_MINOR_VERSION,
                  EPICS_PVA_MAINTENANCE_VERSION,
                  EPICS_PVA_DEVELOPMENT_FLAG),
        m_contextState(CONTEXT_NOT_INITIALIZED),
        m_configuration(conf)
    {
        REFTRACE_INCREMENT(num_instances);

        if(!m_configuration) m_configuration = ConfigurationFactory::getConfiguration("pvAccess-client");
        m_flushTransports.reserve(64);
        loadConfiguration();
    }
3976 
    // Simple accessors for context-wide singletons/settings.

    virtual Configuration::const_shared_pointer getConfiguration() OVERRIDE FINAL {
        return m_configuration;
    }

    virtual const Version& getVersion() OVERRIDE FINAL {
        return m_version;
    }

    virtual Timer::shared_pointer getTimer() OVERRIDE FINAL
    {
        return m_timer;
    }

    virtual TransportRegistry* getTransportRegistry() OVERRIDE FINAL
    {
        return &m_transportRegistry;
    }

    // UDP transport used to broadcast search requests.
    virtual Transport::shared_pointer getSearchTransport() OVERRIDE FINAL
    {
        return m_searchTransport;
    }
3999 
    /**
     * One-time context startup (networking, timer, search manager).
     * @throws std::runtime_error if destroyed or already initialized.
     */
    virtual void initialize() OVERRIDE FINAL {
        Lock lock(m_contextMutex);

        if (m_contextState == CONTEXT_DESTROYED)
            throw std::runtime_error("Context destroyed!");
        else if (m_contextState == CONTEXT_INITIALIZED)
            throw std::runtime_error("Context already initialized.");

        internalInitialize();

        m_contextState = CONTEXT_INITIALIZED;
    }
4012 
4013  virtual void printInfo(std::ostream& out) OVERRIDE FINAL {
4014  Lock lock(m_contextMutex);
4015 
4016  out << "CLASS : ::epics::pvAccess::ClientContextImpl" << std::endl;
4017  out << "VERSION : " << m_version.getVersionString() << std::endl;
4018  out << "ADDR_LIST : " << m_addressList << std::endl;
4019  out << "AUTO_ADDR_LIST : " << (m_autoAddressList ? "true" : "false") << std::endl;
4020  out << "CONNECTION_TIMEOUT : " << m_connectionTimeout << std::endl;
4021  out << "BEACON_PERIOD : " << m_beaconPeriod << std::endl;
4022  out << "BROADCAST_PORT : " << m_broadcastPort << std::endl;;
4023  out << "RCV_BUFFER_SIZE : " << m_receiveBufferSize << std::endl;
4024  out << "STATE : ";
4025  switch (m_contextState)
4026  {
4027  case CONTEXT_NOT_INITIALIZED:
4028  out << "CONTEXT_NOT_INITIALIZED" << std::endl;
4029  break;
4030  case CONTEXT_INITIALIZED:
4031  out << "CONTEXT_INITIALIZED" << std::endl;
4032  break;
4033  case CONTEXT_DESTROYED:
4034  out << "CONTEXT_DESTROYED" << std::endl;
4035  break;
4036  default:
4037  out << "UNKNOWN" << std::endl;
4038  }
4039  }
4040 
    /**
     * Shut the context down: mark destroyed, stop timer and search
     * manager, destroy all channels (closing TCP transports), close UDP
     * transports, then wait (up to ~1 s) for transports to drain.
     * Idempotent.
     */
    virtual void destroy() OVERRIDE FINAL
    {
        {
            Lock guard(m_contextMutex);

            if (m_contextState == CONTEXT_DESTROYED)
                return;

            // go into destroyed state ASAP
            m_contextState = CONTEXT_DESTROYED;
        }

        //
        // cleanup
        //

        m_timer->close();

        m_channelSearchManager->cancel();

        // this will also close all PVA transports
        destroyAllChannels();

        // stop UDPs
        for (BlockingUDPTransportVector::const_iterator iter = m_udpTransports.begin();
                iter != m_udpTransports.end(); iter++)
            (*iter)->close();
        m_udpTransports.clear();

        // stop UDPs
        if (m_searchTransport)
            m_searchTransport->close();

        // wait for all transports to cleanly exit
        // (40 tries * 25 ms = max ~1 second)
        int tries = 40;
        epics::pvData::int32 transportCount;
        while ((transportCount = m_transportRegistry.size()) && tries--)
            epicsThreadSleep(0.025);

        {
            Lock guard(m_beaconMapMutex);
            m_beaconHandlers.clear();
        }

        if (transportCount)
            LOG(logLevelDebug, "PVA client context destroyed with %u transport(s) active.", (unsigned)transportCount);
    }
4088 
    // Destructor only updates reference-tracking; teardown is in destroy().
    virtual ~InternalClientContextImpl()
    {
        REFTRACE_DECREMENT(num_instances);
    }

    // Weak back-references to the context's internal/external shared_ptr
    // faces (same two-face scheme as InternalChannelImpl). NOTE: promotion
    // throws std::bad_weak_ptr once the corresponding owner is gone.
    const weak_pointer m_external_this, m_internal_this;
    shared_pointer internal_from_this() const {
        return shared_pointer(m_internal_this);
    }
    shared_pointer external_from_this() const {
        return shared_pointer(m_external_this);
    }
4101 private:
4102 
4103  void loadConfiguration() {
4104 
4105  // TODO for now just a simple switch
4106  int32 debugLevel = m_configuration->getPropertyAsInteger(PVACCESS_DEBUG, 0);
4107  if (debugLevel > 0)
4109 
4110  m_addressList = m_configuration->getPropertyAsString("EPICS_PVA_ADDR_LIST", m_addressList);
4111  m_autoAddressList = m_configuration->getPropertyAsBoolean("EPICS_PVA_AUTO_ADDR_LIST", m_autoAddressList);
4112  m_connectionTimeout = m_configuration->getPropertyAsFloat("EPICS_PVA_CONN_TMO", m_connectionTimeout);
4113  m_beaconPeriod = m_configuration->getPropertyAsFloat("EPICS_PVA_BEACON_PERIOD", m_beaconPeriod);
4114  m_broadcastPort = m_configuration->getPropertyAsInteger("EPICS_PVA_BROADCAST_PORT", m_broadcastPort);
4115  m_receiveBufferSize = m_configuration->getPropertyAsInteger("EPICS_PVA_MAX_ARRAY_BYTES", m_receiveBufferSize);
4116  }
4117 
    /**
     * Bring up all runtime machinery: sockets, timer, TCP connector,
     * response handler, search manager and UDP transports.
     */
    void internalInitialize() {

        osiSockAttach();
        m_timer.reset(new Timer("pvAccess-client timer", lowPriority));
        InternalClientContextImpl::shared_pointer thisPointer(internal_from_this());
        // stores weak_ptr
        m_connector.reset(new BlockingTCPConnector(thisPointer, m_receiveBufferSize, m_connectionTimeout));

        // stores many weak_ptr
        m_responseHandler.reset(new ClientResponseHandler(thisPointer));

        m_channelSearchManager.reset(new ChannelSearchManager(thisPointer));

        // TODO put memory barrier here... (if not already called within a lock?)

        // setup UDP transport
        {

            // query broadcast addresses of all IFs
            SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0);
            if (socket == INVALID_SOCKET)
            {
                throw std::logic_error("Failed to create a socket to introspect network interfaces.");
            }

            IfaceNodeVector ifaceList;
            if (discoverInterfaces(ifaceList, socket, 0) || ifaceList.size() == 0)
            {
                // non-fatal: continue with whatever (possibly empty) list we got
                LOG(logLevelError, "Failed to introspect interfaces or no network interfaces available.");
            }
            epicsSocketDestroy (socket);

            initializeUDPTransports(false, m_udpTransports, ifaceList, m_responseHandler, m_searchTransport,
                                    m_broadcastPort, m_autoAddressList, m_addressList, std::string());

        }

        // setup search manager
        // Starts timer
        m_channelSearchManager->activate();

        // TODO what if initialization failed!!!
    }
4161 
4162  void destroyAllChannels() {
4163  Lock guard(m_cidMapMutex);
4164 
4165  int count = 0;
4166  std::vector<ClientChannelImpl::weak_pointer> channels(m_channelsByCID.size());
4167  for (CIDChannelMap::iterator iter = m_channelsByCID.begin();
4168  iter != m_channelsByCID.end();
4169  iter++)
4170  {
4171  channels[count++] = iter->second;
4172  }
4173 
4174  guard.unlock();
4175 
4176 
4177  ClientChannelImpl::shared_pointer ptr;
4178  for (int i = 0; i < count; i++)
4179  {
4180  ptr = channels[i].lock();
4181  if (ptr)
4182  {
4183  EXCEPTION_GUARD(ptr->destroy());
4184  }
4185  }
4186  }
4187 
4191  void checkChannelName(std::string const & name) OVERRIDE FINAL {
4192  if (name.empty())
4193  throw std::runtime_error("0 or empty channel name");
4194  else if (name.length() > MAX_CHANNEL_NAME_LENGTH)
4195  throw std::runtime_error("name too long");
4196  }
4197 
    /**
     * Ensure the context is usable, lazily initializing it on first use.
     * @throws std::runtime_error if the context has been destroyed.
     */
    void checkState() {
        Lock lock(m_contextMutex); // TODO check double-lock?!!!

        if (m_contextState == CONTEXT_DESTROYED)
            throw std::runtime_error("Context destroyed.");
        else if (m_contextState == CONTEXT_NOT_INITIALIZED)
            initialize();
    }
4209 
4214  void registerChannel(ClientChannelImpl::shared_pointer const & channel) OVERRIDE FINAL
4215  {
4216  Lock guard(m_cidMapMutex);
4217  m_channelsByCID[channel->getChannelID()] = ClientChannelImpl::weak_pointer(channel);
4218  }
4219 
4224  void unregisterChannel(ClientChannelImpl::shared_pointer const & channel) OVERRIDE FINAL
4225  {
4226  Lock guard(m_cidMapMutex);
4227  m_channelsByCID.erase(channel->getChannelID());
4228  }
4229 
4235  Channel::shared_pointer getChannel(pvAccessID channelID) OVERRIDE FINAL
4236  {
4237  Lock guard(m_cidMapMutex);
4238  CIDChannelMap::iterator it = m_channelsByCID.find(channelID);
4239  return (it == m_channelsByCID.end() ? Channel::shared_pointer() : static_pointer_cast<Channel>(it->second.lock()));
4240  }
4241 
4246  pvAccessID generateCID()
4247  {
4248  Lock guard(m_cidMapMutex);
4249 
4250  // search first free (theoretically possible loop of death)
4251  while (m_channelsByCID.find(++m_lastCID) != m_channelsByCID.end()) ;
4252  // reserve CID
4253  m_channelsByCID[m_lastCID].reset();
4254  return m_lastCID;
4255  }
4256 
4260  void freeCID(int cid)
4261  {
4262  Lock guard(m_cidMapMutex);
4263  m_channelsByCID.erase(cid);
4264  }
4265 
4266 
4272  ResponseRequest::shared_pointer getResponseRequest(pvAccessID ioid) OVERRIDE FINAL
4273  {
4274  Lock guard(m_ioidMapMutex);
4275  IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
4276  if (it == m_pendingResponseRequests.end()) return ResponseRequest::shared_pointer();
4277  return it->second.lock();
4278  }
4279 
4285  pvAccessID registerResponseRequest(ResponseRequest::shared_pointer const & request) OVERRIDE FINAL
4286  {
4287  Lock guard(m_ioidMapMutex);
4288  pvAccessID ioid = generateIOID();
4289  m_pendingResponseRequests[ioid] = ResponseRequest::weak_pointer(request);
4290  return ioid;
4291  }
4292 
4298  ResponseRequest::shared_pointer unregisterResponseRequest(pvAccessID ioid) OVERRIDE FINAL
4299  {
4300  if (ioid == INVALID_IOID) return ResponseRequest::shared_pointer();
4301 
4302  Lock guard(m_ioidMapMutex);
4303  IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
4304  if (it == m_pendingResponseRequests.end())
4305  return ResponseRequest::shared_pointer();
4306 
4307  ResponseRequest::shared_pointer retVal = it->second.lock();
4308  m_pendingResponseRequests.erase(it);
4309  return retVal;
4310  }
4311 
    /**
     * Generate a fresh, unused IOID (skipping INVALID_IOID) and reserve it
     * in the pending-requests map with an empty weak_ptr placeholder.
     * NOTE: also called from registerResponseRequest() with m_ioidMapMutex
     * already held — presumably relies on epicsMutex being recursive
     * (TODO confirm against libCom docs).
     */
    pvAccessID generateIOID()
    {
        Lock guard(m_ioidMapMutex);

        // search first free (theoretically possible loop of death)
        do {
            while (m_pendingResponseRequests.find(++m_lastIOID) != m_pendingResponseRequests.end()) ;
        } while (m_lastIOID == INVALID_IOID);

        // reserve IOID
        m_pendingResponseRequests[m_lastIOID].reset();
        return m_lastIOID;
    }
4329 
4333  virtual void newServerDetected() OVERRIDE FINAL
4334  {
4335  if (m_channelSearchManager)
4336  m_channelSearchManager->newServerDetected();
4337  }
4338 
4345  BeaconHandler::shared_pointer getBeaconHandler(osiSockAddr* responseFrom) OVERRIDE FINAL
4346  {
4347  Lock guard(m_beaconMapMutex);
4348  AddressBeaconHandlerMap::iterator it = m_beaconHandlers.find(*responseFrom);
4349  BeaconHandler::shared_pointer handler;
4350  if (it == m_beaconHandlers.end())
4351  {
4352  // stores weak_ptr
4353  handler.reset(new BeaconHandler(internal_from_this(), responseFrom));
4354  m_beaconHandlers[*responseFrom] = handler;
4355  }
4356  else
4357  handler = it->second;
4358  return handler;
4359  }
4360 
4367  Transport::shared_pointer getTransport(ClientChannelImpl::shared_pointer const & client, osiSockAddr* serverAddress, int8 minorRevision, int16 priority) OVERRIDE FINAL
4368  {
4369  try
4370  {
4371  Transport::shared_pointer t = m_connector->connect(client, m_responseHandler, *serverAddress, minorRevision, priority);
4372  return t;
4373  }
4374  catch (std::exception& e)
4375  {
4376  LOG(logLevelDebug, "getTransport() fails: %s", e.what());
4377  return Transport::shared_pointer();
4378  }
4379  }
4380 
    /**
     * Create a channel bound to this context, after validating the context
     * state, channel name, requester and priority.
     * @return the new channel, or a null shared_pointer on failure
     *         (the failure is logged).
     * @throws std::runtime_error / std::range_error on invalid arguments.
     */
    // TODO no minor version with the addresses
    // TODO what if there is an channel with the same name, but on different host!
    ClientChannelImpl::shared_pointer createChannelInternal(std::string const & name, ChannelRequester::shared_pointer const & requester, short priority,
            const InetAddrVector& addresses) OVERRIDE FINAL { // TODO addresses

        checkState();
        checkChannelName(name);

        if (!requester)
            throw std::runtime_error("0 requester");

        if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX)
            throw std::range_error("priority out of bounds");

        try {
            /* Note that our channels have an internal ref. to us.
             * Thus having active channels will *not* keep us alive.
             * Use code must explicitly keep our external ref. as well
             * as our channels.
             */
            pvAccessID cid = generateCID();
            return InternalChannelImpl::create(internal_from_this(), cid, name, requester, priority, addresses);
        } catch(std::exception& e) {
            LOG(logLevelError, "createChannelInternal() exception: %s\n", e.what());
            return ClientChannelImpl::shared_pointer();
        }
    }
4411 
    /**
     * @return the channel search manager owned by this context.
     */
    ChannelSearchManager::shared_pointer getChannelSearchManager() OVERRIDE FINAL {
        return m_channelSearchManager;
    }
4419 
    // Address list to use when searching for channels (EPICS_PVA_ADDR_LIST).
    string m_addressList;

    // Whether to automatically append local broadcast addresses to the
    // address list (EPICS_PVA_AUTO_ADDR_LIST).
    bool m_autoAddressList;

    // TCP connection timeout in seconds (EPICS_PVA_CONN_TMO).
    float m_connectionTimeout;

    // Beacon period in seconds (EPICS_PVA_BEACON_PERIOD).
    float m_beaconPeriod;

    // UDP broadcast port used for search/beacons (EPICS_PVA_BROADCAST_PORT).
    int32 m_broadcastPort;

    // Receive buffer size in bytes (EPICS_PVA_MAX_ARRAY_BYTES).
    int m_receiveBufferSize;

    // Timer used for client-side periodic/delayed tasks.
    Timer::shared_pointer m_timer;

    // UDP transports created by internalInitialize() (one per interface).
    BlockingUDPTransportVector m_udpTransports;

    // UDP transport used to send search requests.
    BlockingUDPTransport::shared_pointer m_searchTransport;

    // TCP connector; holds a weak_ptr back to this context.
    epics::auto_ptr<BlockingTCPConnector> m_connector;

    // Registry of active TCP transports.
    TransportRegistry m_transportRegistry;

    // Response handler dispatching incoming messages; holds weak_ptrs to us.
    ClientResponseHandler::shared_pointer m_responseHandler;

    // Map of client channel IDs to channels (weak references only).
    // TODO consider std::unordered_map
    typedef std::map<pvAccessID, ClientChannelImpl::weak_pointer> CIDChannelMap;
    CIDChannelMap m_channelsByCID;

    // Guards m_channelsByCID and m_lastCID.
    Mutex m_cidMapMutex;

    // Last client channel ID handed out by generateCID().
    pvAccessID m_lastCID;

    // Map of IOIDs to pending response requests (weak references only).
    IOIDResponseRequestMap m_pendingResponseRequests;

    // Guards m_pendingResponseRequests and m_lastIOID.
    Mutex m_ioidMapMutex;

    // Last IOID handed out by generateIOID().
    pvAccessID m_lastIOID;

    // Channel search (name resolution) manager; holds weak_ptr to us.
    ChannelSearchManager::shared_pointer m_channelSearchManager;

    // Map of beacon source addresses to their beacon handlers.
    // TODO consider std::unordered_map
    typedef std::map<osiSockAddr, BeaconHandler::shared_pointer, comp_osiSock_lt> AddressBeaconHandlerMap;
    AddressBeaconHandlerMap m_beaconHandlers;

    // Guards m_beaconHandlers.
    Mutex m_beaconMapMutex;

    // Provider version information.
    Version m_version;
4539 
private:

    // Lifecycle state (not initialized / initialized / destroyed);
    // see checkState().
    ContextState m_contextState;

    // Guards context state transitions.
    Mutex m_contextMutex;

    friend class ChannelProviderImpl;

    // Configuration source consulted by loadConfiguration().
    Configuration::shared_pointer m_configuration;

    // Scratch vector reused when flushing registered transports.
    // NOTE(review): usage not visible in this chunk — confirm against the
    // rest of the file.
    TransportRegistry::transportVector_t m_flushTransports;
};
4558 
// Storage for reference-count diagnostics; registered with the reftrack
// facility in createClientProvider() below.
size_t InternalClientContextImpl::num_instances;
size_t InternalClientContextImpl::InternalChannelImpl::num_instances;
size_t InternalClientContextImpl::InternalChannelImpl::num_active;
4562 
4563 class ChannelGetFieldRequestImpl :
4564  public ResponseRequest,
4566  public std::tr1::enable_shared_from_this<ChannelGetFieldRequestImpl>
4567 {
4568 public:
4569  typedef GetFieldRequester requester_type;
4570  POINTER_DEFINITIONS(ChannelGetFieldRequestImpl);
4571 
4572  const InternalClientContextImpl::InternalChannelImpl::shared_pointer m_channel;
4573 
4574  const GetFieldRequester::weak_pointer m_callback;
4575  string m_subField;
4576 
4577  // const after activate()
4578  pvAccessID m_ioid;
4579 
4580  Mutex m_mutex;
4581  bool m_destroyed;
4582  bool m_notified;
4583 
4584  ChannelGetFieldRequestImpl(InternalClientContextImpl::InternalChannelImpl::shared_pointer const & channel,
4585  GetFieldRequester::shared_pointer const & callback,
4586  std::string const & subField) :
4587  m_channel(channel),
4588  m_callback(callback),
4589  m_subField(subField),
4590  m_ioid(INVALID_IOID),
4591  m_destroyed(false),
4592  m_notified(false)
4593  {}
4594 
4595  void activate()
4596  {
4597  {
4598  // register response request
4599  ChannelGetFieldRequestImpl::shared_pointer self(shared_from_this());
4600  m_ioid = m_channel->getContext()->registerResponseRequest(self);
4601  m_channel->registerResponseRequest(self);
4602  {
4603  Lock L(m_channel->m_channelMutex);
4604  m_channel->m_getfield.swap(self);
4605  }
4606  // self goes out of scope, may call GetFieldRequester::getDone() from dtor
4607  }
4608 
4609  // enqueue send request
4610  try {
4611  m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
4612  } catch (std::runtime_error &rte) {
4613  //notify(BaseRequestImpl::channelNotConnected, FieldConstPtr());
4614  }
4615  }
4616 
4617 public:
4618  virtual ~ChannelGetFieldRequestImpl()
4619  {
4620  destroy();
4621  notify(BaseRequestImpl::channelDestroyed, FieldConstPtr());
4622  }
4623 
4624  void notify(const Status& sts, const FieldConstPtr& field)
4625  {
4626  {
4627  Lock G(m_mutex);
4628  if(m_notified)
4629  return;
4630  m_notified = true;
4631  }
4632  EXCEPTION_GUARD3(m_callback, cb, cb->getDone(sts, field));
4633  }
4634 
4635  ChannelBaseRequester::shared_pointer getRequester() OVERRIDE FINAL {
4636  return m_callback.lock();
4637  }
4638 
4639  pvAccessID getIOID() const OVERRIDE FINAL {
4640  return m_ioid;
4641  }
4642 
4643  virtual void send(ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL {
4644  control->startMessage((int8)CMD_GET_FIELD, 8);
4645  buffer->putInt(m_channel->getServerChannelID());
4646  buffer->putInt(m_ioid);
4647  SerializeHelper::serializeString(m_subField, buffer, control);
4648  }
4649 
4650 
4651  virtual Channel::shared_pointer getChannel()
4652  {
4653  return m_channel;
4654  }
4655 
4656  virtual void cancel() OVERRIDE FINAL {
4657  // TODO
4658  // noop
4659  }
4660 
4661  virtual void timeout() OVERRIDE FINAL {
4662  cancel();
4663  }
4664 
4665  void reportStatus(Channel::ConnectionState status) OVERRIDE FINAL {
4666  // destroy, since channel (parent) was destroyed
4667  if (status == Channel::DESTROYED)
4668  destroy();
4669  // TODO notify?
4670  }
4671 
4672  virtual void destroy() OVERRIDE FINAL
4673  {
4674  {
4675  Lock guard(m_mutex);
4676  if (m_destroyed)
4677  return;
4678  m_destroyed = true;
4679  }
4680 
4681  {
4682  Lock L(m_channel->m_channelMutex);
4683  if(m_channel->m_getfield.get()==this)
4684  m_channel->m_getfield.reset();
4685  }
4686 
4687  // unregister response request
4688  m_channel->getContext()->unregisterResponseRequest(m_ioid);
4689  m_channel->unregisterResponseRequest(m_ioid);
4690  }
4691 
4692  virtual void response(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer) OVERRIDE FINAL {
4693 
4694  Status status;
4695  FieldConstPtr field;
4696  status.deserialize(payloadBuffer, transport.get());
4697  if (status.isSuccess())
4698  {
4699  // deserialize Field...
4700  field = transport->cachedDeserialize(payloadBuffer);
4701  }
4702  notify(status, field);
4703 
4704  destroy();
4705  }
4706 
4707 
4708 };
4709 
4710 
/**
 * Issue a "get field" (introspection) request for the given sub-field.
 * The request object keeps itself alive via the channel (activate()
 * stores it in m_getfield) until the response arrives or the channel
 * is destroyed.
 */
void InternalClientContextImpl::InternalChannelImpl::getField(GetFieldRequester::shared_pointer const & requester,std::string const & subField)
{
    ChannelGetFieldRequestImpl::shared_pointer self(new ChannelGetFieldRequestImpl(internal_from_this(), requester, subField));
    self->activate();
    // activate() stores self in channel
}
4717 
/**
 * Re-send pending work after the channel (re)connects: the in-flight
 * getField request (if any) and every registered request that supports
 * resubscription (monitors etc.).
 * Holds m_responseRequestsMutex for the whole iteration.
 */
void InternalClientContextImpl::InternalChannelImpl::resubscribeSubscriptions()
{
    Lock guard(m_responseRequestsMutex);

    Transport::shared_pointer transport = getTransport();

    // a getField issued while disconnected is re-enqueued here
    if(m_getfield) {
        transport->enqueueSendRequest(m_getfield);
    }

    // NOTE: elements cannot be removed within rrs->updateSubscription callbacks
    for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
            iter != m_responseRequests.end();
            iter++)
    {
        ResponseRequest::shared_pointer ptr = iter->second.lock();
        if (ptr)
        {
            // only BaseRequestImpl-derived requests know how to resubscribe
            BaseRequestImpl::shared_pointer rrs = dynamic_pointer_cast<BaseRequestImpl>(ptr);
            if (rrs)
                EXCEPTION_GUARD(rrs->resubscribeSubscription(transport));
        }
    }
}
4742 
4743 }//namespace
4744 namespace epics {
4745 namespace pvAccess {
4746 
/**
 * Factory for the pvAccess client channel provider.
 *
 * Creates the context with the internal/external shared_ptr pattern:
 * 'internal' owns the object; 'external' aliases the same object but its
 * deleter (Destroyable::cleaner) triggers destroy() when the last
 * external reference is dropped — so internal references held by
 * channels/transports never keep the provider "alive" from the user's
 * point of view. Both self-pointers are stored via const_cast because
 * the members are declared const-initialized once.
 */
ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer& conf)
{
    registerRefCounter("InternalClientContextImpl", &InternalClientContextImpl::num_instances);
    registerRefCounter("InternalChannelImpl", &InternalClientContextImpl::InternalChannelImpl::num_instances);
    registerRefCounter("InternalChannelImpl (Active)", &InternalClientContextImpl::InternalChannelImpl::num_active);
    registerRefCounter("BaseRequestImpl", &BaseRequestImpl::num_instances);
    registerRefCounter("BaseRequestImpl (Active)", &BaseRequestImpl::num_active);
    InternalClientContextImpl::shared_pointer internal(new InternalClientContextImpl(conf)),
            external(internal.get(), epics::pvAccess::Destroyable::cleaner(internal));
    const_cast<InternalClientContextImpl::weak_pointer&>(internal->m_external_this) = external;
    const_cast<InternalClientContextImpl::weak_pointer&>(internal->m_internal_this) = internal;
    internal->initialize();
    return external;
}
4761 
4762 }
4763 };
int8_t int8
Definition: pvType.h:75
void deserialize(ByteBuffer *buffer, DeserializableControl *flusher)
Definition: status.cpp:61
double timeout
Definition: pvutils.cpp:25
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
virtual void deserialize(ByteBuffer *buffer, DeserializableControl *flusher)
Definition: bitSet.cpp:339
PVScalar is the base class for each scalar field.
Definition: pvData.h:272
std::string request
#define INVALID_SOCKET
Definition: osdSock.h:32
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
epicsInt32 pvAccessID
Definition: pvaDefs.h:18
std::tr1::shared_ptr< detail::SharedPut > put
EPICS_ALWAYS_INLINE int8 getByte()
Definition: byteBuffer.h:617
epicsMutexId lock
Definition: osiClockTime.c:37
pvd::Status status
An EPICS-specific replacement for ANSI C&#39;s assert.
int i
Definition: scan.c:967
EPICS_ALWAYS_INLINE void putInt(int32 value)
Definition: byteBuffer.h:537
const char * getBuffer() const
Definition: byteBuffer.h:294
Class that must be implemented by code that makes Timer requests.
Definition: timer.h:40
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
struct sockaddr_in ia
Definition: osiSock.h:157
#define SET_LOG_LEVEL(level)
Definition: logger.h:50
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:788
void lock()
Definition: lock.h:55
std::size_t getLimit() const
Definition: byteBuffer.h:368
pvd::StructureConstPtr type
#define STATIC_SEARCH_BASE_DELAY_SEC
#define STATIC_SEARCH_MAX_MULTIPLIER
epicsShareExtern const std::string PVACCESS_DEBUG
Definition: pvaConstants.h:88
Definition: memory.hpp:41
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
std::tr1::shared_ptr< const Structure > StructureConstPtr
Definition: pvIntrospect.h:162
A lock for multithreading.
Definition: lock.h:36
#define initialize
Definition: aaiRecord.c:54
A vector of bits.
Definition: bitSet.h:56
An element for a monitorQueue.
Definition: monitor.h:54
void unlock()
Definition: lock.h:66
#define EXCEPTION_GUARD3(WEAK, PTR, code)
std::size_t getPosition() const
Definition: byteBuffer.h:346
#define OVERRIDE
Definition: pvAccess.h:55
const std::string & getMessage() const
Definition: status.h:80
void copy(PVValueArray< T > &pvFrom, size_t fromOffset, size_t fromStride, PVValueArray< T > &pvTo, size_t toOffset, size_t toStride, size_t count)
Copy a subarray from one scalar array to another.
virtual void flush(bool lastMessageCompleted)=0
const epics::pvData::int32 PVA_SERVER_PORT
Definition: pvaConstants.h:41
std::vector< BlockingUDPTransport::shared_pointer > BlockingUDPTransportVector
Definition: blockingUDP.h:403
std::tr1::shared_ptr< PVDataCreate > PVDataCreatePtr
Definition: pvData.h:124
Support for delayed or periodic callback execution.
Definition: timer.h:71
EPICS_ALWAYS_INLINE int32 getInt()
Definition: byteBuffer.h:629
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
Definition: osdSock.c:71
Holds all PVA related.
Definition: pvif.h:34
#define SEND_MESSAGE(WEAK, PTR, MSG, MTYPE)
#define LOG(level, format,...)
Definition: logger.h:48
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
pvData
Definition: monitor.h:428
This class implements introspection object for a structure.
Definition: pvIntrospect.h:697
#define POINTER_DEFINITIONS(clazz)
Definition: sharedPtr.h:198
std::vector< osiSockAddr > InetAddrVector
ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer &conf)
void authNZInitialize(const std::vector< std::string > &offeredSecurityPlugins)
Definition: codec.cpp:1898
Definition: server.h:76
void encodeAsIPv6Address(ByteBuffer *buffer, const osiSockAddr *address)
Definition: caget.c:48
std::vector< ifaceNode > IfaceNodeVector
const std::string & getStackDump() const
Definition: status.h:86
const ChannelProcessRequester::weak_pointer requester
Definition: pvAccess.cpp:68
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
const epics::pvData::int32 PVA_BROADCAST_PORT
Definition: pvaConstants.h:44
bool isSuccess() const
Definition: status.h:103
HexDump & limit(size_t n=(size_t)-1)
safety limit on max bytes printed
Definition: hexDump.h:44
EPICS_ALWAYS_INLINE void putByte(int8 value)
Definition: byteBuffer.h:525
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
Definition: byteBuffer.h:233
std::tr1::shared_ptr< PVScalar > PVScalarPtr
Definition: pvData.h:77
std::vector< Transport::shared_pointer > transportVector_t
int SOCKET
Definition: osdSock.h:31
EPICS_ALWAYS_INLINE int16 getShort()
Definition: byteBuffer.h:623
Data interface for a structure,.
Definition: pvData.h:712
std::size_t getRemaining() const
Definition: byteBuffer.h:391
std::tr1::shared_ptr< const Field > FieldConstPtr
Definition: pvIntrospect.h:137
#define EXCEPTION_GUARD(code)
virtual std::string getChannelName()=0
FORCE_INLINE const FieldCreatePtr & getFieldCreate()
void initializeUDPTransports(bool serverFlag, BlockingUDPTransportVector &udpTransports, const IfaceNodeVector &ifaceList, const ResponseHandler::shared_pointer &responseHandler, BlockingUDPTransport::shared_pointer &sendTransport, int32 &listenPort, bool autoAddressList, const std::string &addressList, const std::string &ignoreAddressList)
void registerRefCounter(const char *name, const size_t *counter)
Definition: reftrack.cpp:59
bool decodeAsIPv6Address(ByteBuffer *buffer, osiSockAddr *address)
Expose statistics related to network transport.
Definition: pvAccess.h:156
std::tr1::shared_ptr< PVStructure > PVStructurePtr
Definition: pvData.h:87
void getSocketAddressList(InetAddrVector &ret, const std::string &list, int defaultPort, const InetAddrVector *appendList)
const epics::pvData::int32 INVALID_IOID
Definition: pvaConstants.h:79
std::tr1::shared_ptr< PVField > PVFieldPtr
Definition: pvData.h:66
void done(int k)
Definition: antelope.c:77
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
Definition: tool_lib.h:64
Definition: caget.c:48
virtual void ensureBuffer(std::size_t size)=0
void getVersion(epics::pvData::PVDataVersion *ptr)
Definition: pvdVersion.cpp:13
std::tr1::shared_ptr< MonitorElement > MonitorElementPtr
Definition: monitor.h:40
std::tr1::shared_ptr< BitSet > BitSetPtr
Definition: bitSet.h:26
int osiSockAttach()
Definition: osdSock.c:54
detail::pick_type< int8_t, signed char, detail::pick_type< uint8_t, char, unsigned char >::type >::type boolean
Definition: pvType.h:71
Instance declaring destroy method.
Definition: destroyable.h:24
shared_ptr< T > dynamic_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:808
int16_t int16
Definition: pvType.h:79
const epics::pvData::uint32 MAX_CHANNEL_NAME_LENGTH
Definition: pvaConstants.h:73
epicsMutex Mutex
Definition: lock.h:28
virtual void channelDestroyedOnServer()=0
PVArray is the base class for all array types.
Definition: pvData.h:551
const epics::pvData::int32 MAX_TCP_RECV
Definition: pvaConstants.h:64
#define EPICS_NOT_COPYABLE(CLASS)
Disable implicit copyable.
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize=0)=0
int epicsStdCall sockAddrAreIdentical(const osiSockAddr *plhs, const osiSockAddr *prhs)
Definition: osiSock.c:35
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
int discoverInterfaces(IfaceNodeVector &list, SOCKET socket, const osiSockAddr *pMatchAddr)
EPICS_ALWAYS_INLINE void putShort(int16 value)
Definition: byteBuffer.h:531
T getAs() const
Definition: pvData.h:302
int32_t int32
Definition: pvType.h:83
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
#define FINAL
Definition: pvAccess.h:48
Methods for manipulating timeStamp.
Definition: timeStamp.h:43
#define IS_LOGGABLE(level)
Definition: logger.h:51