// Class-level Status constants of ClientChannelImpl. Both are constructed with
// Status::STATUSTYPE_WARNING; they differ only in their message text.
24 #define epicsExportSharedSymbols 55 Status ClientChannelImpl::channelDestroyed(
56 Status::STATUSTYPE_WARNING,
"channel destroyed");
// Warning-level status carrying the text "channel disconnected".
57 Status ClientChannelImpl::channelDisconnected(
58 Status::STATUSTYPE_WARNING,
"channel disconnected");
// Forward declaration only; the class body is not part of this excerpt.
64 class ChannelGetFieldRequestImpl;
// Ordered map keyed by pvAccessID whose values are weak references to
// ResponseRequest objects (the map itself does not keep them alive).
68 typedef std::map<pvAccessID, ResponseRequest::weak_pointer> IOIDResponseRequestMap;
// Callback helper macros used by the request classes below.
//  - EXCEPTION_GUARD(code): executes `code` once inside a do/while(0) wrapper.
//  - EXCEPTION_GUARD3(WEAK, PTR, code): locks the weak pointer WEAK into a
//    requester_type::shared_pointer named PTR and executes `code` only when the
//    lock produced a non-null pointer.
//  - SEND_MESSAGE(WEAK, PTR, MSG, MTYPE): same locking step, then calls
//    PTR->message(MSG, MTYPE) when PTR is non-null.
// All three rely on a `requester_type` typedef being in scope at the point of use.
// The head of class BaseRequestImpl follows the macros.
71 #define EXCEPTION_GUARD(code) do { code; } while(0) 73 #define EXCEPTION_GUARD3(WEAK, PTR, code) do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) { code; }}while(0) 75 #define SEND_MESSAGE(WEAK, PTR, MSG, MTYPE) \ 76 do{requester_type::shared_pointer PTR((WEAK).lock()); if(PTR) (PTR)->message(MSG, MTYPE); }while(0) 82 class BaseRequestImpl :
// Status constants shared by all request implementations. Their out-of-class
// definitions later in this file construct each one with
// Status::STATUSTYPE_ERROR and a fixed message.
92 static const Status notInitializedStatus;
93 static const Status destroyedStatus;
94 static const Status channelNotConnected;
95 static const Status channelDestroyed;
96 static const Status otherRequestPendingStatus;
97 static const Status invalidPutStructureStatus;
98 static const Status invalidPutArrayStatus;
99 static const Status pvRequestNull;
// Provides a BitSet suitable for pvStructure.
// Either clears and returns existingBitSet, or allocates a new BitSet sized
// with pvStructure->getNumberFields().
// NOTE(review): the condition choosing between reuse and allocation is on a
// line not visible in this excerpt.
101 static BitSet::shared_pointer createBitSetFor(
102 PVStructure::shared_pointer
const & pvStructure,
103 BitSet::shared_pointer
const & existingBitSet)
106 size_t pvStructureSize = pvStructure->getNumberFields();
// Reuse path: wipe the caller's bit set and hand it back.
109 existingBitSet->clear();
110 return existingBitSet;
// Allocation path: fresh BitSet constructed with the field count.
113 return BitSet::shared_pointer(
new BitSet(pvStructureSize));
// Returns existingPVField when it is non-null and its Field compares equal to
// `field`; otherwise asks pvDataCreate to create a new PVField for `field`.
116 static PVField::shared_pointer reuseOrCreatePVField(
117 Field::const_shared_pointer
const & field,
118 PVField::shared_pointer
const & existingPVField)
// Equality is evaluated on the dereferenced Field objects, not on the pointers.
120 if (existingPVField.get() && *field == *existingPVField->getField())
121 return existingPVField;
123 return pvDataCreate->createPVField(field);
// Channel this request was created on; fixed for the lifetime of the request.
128 const ClientChannelImpl::shared_pointer m_channel;
// Reserved negative codes stored in m_pendingRequest. NULL_REQUEST is the
// value beginRequest()/abortRequest() reset to; the PURE_* codes are passed to
// startRequest() by destroy() and cancel().
131 static const int NULL_REQUEST = -1;
132 static const int PURE_DESTROY_REQUEST = -2;
133 static const int PURE_CANCEL_REQUEST = -3;
// Request code waiting to be sent (one of the codes above or a QoS value).
141 int32 m_pendingRequest;
// Weak self-reference; assigned in build() through a const_cast.
160 const BaseRequestImpl::weak_pointer m_this_internal,
// Obtains a strong pointer to this object from the internal weak
// self-reference, typed as `subklass`.
// NOTE(review): the cast to `subklass` and the return statement are on lines
// not visible in this excerpt.
163 template<
class subklass>
164 std::tr1::shared_ptr<subklass> internal_from_this() {
165 ResponseRequest::shared_pointer P(m_this_internal);
// Same as internal_from_this(), but starting from the external weak
// self-reference (m_this_external).
168 template<
class subklass>
169 std::tr1::shared_ptr<subklass> external_from_this() {
170 ResponseRequest::shared_pointer P(m_this_external);
// Counters maintained through REFTRACE_INCREMENT / REFTRACE_DECREMENT:
// num_instances in the constructor and destructor, num_active in build() and
// destroy().
174 static size_t num_instances;
175 static size_t num_active;
// Factory for request objects of type `subklass`.
// Allocates the object, creates a second shared_ptr (`external`) over the same
// raw pointer, records both in the object's weak self-references, calls
// activate() and increments num_active.
// NOTE(review): the remaining constructor arguments of `external` (lines
// 186-188) and the return statement are not visible in this excerpt.
177 template<
class subklass>
179 typename std::tr1::shared_ptr<subklass>
180 build(ClientChannelImpl::shared_pointer
const & channel,
181 const typename subklass::requester_type::shared_pointer&
requester,
182 const epics::pvData::PVStructure::shared_pointer& pvRequest)
184 std::tr1::shared_ptr<subklass>
internal(
new subklass(channel, requester, pvRequest)),
185 external(
internal.
get(),
// The self-reference members are declared const, so they are assigned here
// through const_cast once the owning shared_ptrs exist.
189 const_cast<BaseRequestImpl::weak_pointer&
>(
internal->m_this_internal) =
internal;
190 const_cast<BaseRequestImpl::weak_pointer&
>(
internal->m_this_external) = external;
// activate() runs only after the self-references are in place; it reads
// m_this_internal.
191 internal->activate();
192 REFTRACE_INCREMENT(num_active);
// Constructor: starts with no pending request and the uninitialized state, and
// counts the new instance in num_instances. Other member initializers are on
// lines not visible in this excerpt.
203 BaseRequestImpl(ClientChannelImpl::shared_pointer
const & channel) :
206 m_pendingRequest(NULL_REQUEST),
208 m_initialized(
false),
211 REFTRACE_INCREMENT(num_instances);
// Destructor: removes this instance from the num_instances count.
214 virtual ~BaseRequestImpl() {
215 REFTRACE_DECREMENT(num_instances);
// Registers this request with the channel's context, storing the returned id
// in m_ioid, and then with the channel itself.
// Requires m_this_internal to be set already, which build() does before
// calling this.
218 virtual void activate() {
221 shared_pointer
self(m_this_internal);
222 m_ioid = m_channel->getContext()->registerResponseRequest(
self);
223 m_channel->registerResponseRequest(
self);
// Attempts to record `qos` as the pending request and reports success as bool.
// Three cases are distinguished in order: a pure destroy request; a pure
// cancel request while no destroy is pending; and any request while nothing
// is pending (m_pendingRequest == NULL_REQUEST).
// NOTE(review): the statements executed in each branch, and the rejection
// path, are on lines not visible in this excerpt.
226 bool startRequest(
int32 qos) {
229 if(qos==PURE_DESTROY_REQUEST)
231 else if(qos==PURE_CANCEL_REQUEST && m_pendingRequest!=PURE_DESTROY_REQUEST)
233 else if(m_pendingRequest==NULL_REQUEST)
238 m_pendingRequest = qos;
// Takes the pending request code: copies m_pendingRequest into a local and
// resets the member to NULL_REQUEST. The return of `ret` is on a line not
// visible in this excerpt.
242 int32 beginRequest() {
244 int32 ret = m_pendingRequest;
245 m_pendingRequest = NULL_REQUEST;
// Drops whatever request is pending by resetting m_pendingRequest to
// NULL_REQUEST.
249 void abortRequest() {
251 m_pendingRequest = NULL_REQUEST;
// Hook implemented by each concrete request; response() calls it right after
// setting m_initialized to true.
260 virtual void initResponse(Transport::shared_pointer
const & transport,
int8 version,
ByteBuffer* payloadBuffer,
int8 qos,
const Status&
status) = 0;
// Hook implemented by each concrete request; response() calls it on its
// non-init path.
261 virtual void normalResponse(Transport::shared_pointer
const & transport,
int8 version,
ByteBuffer* payloadBuffer,
int8 qos,
const Status& status) = 0;
// Entry point for a reply addressed to this request.
// Calls transport->ensureData(1), then dispatches to initResponse() or
// normalResponse().
// NOTE(review): the decoding of `qos` and `status` and the conditions guarding
// each branch are on lines not visible in this excerpt.
263 virtual void response(Transport::shared_pointer
const & transport,
int8 version,
ByteBuffer* payloadBuffer)
OVERRIDE {
264 transport->ensureData(1);
// Init path: flag the request as initialized before notifying the subclass.
276 m_initialized =
true;
279 initResponse(transport, version, payloadBuffer, qos, status);
// Non-init path: m_initialized can be cleared again here before the subclass
// hook runs (governing condition not visible).
283 bool destroyReq =
false;
288 m_initialized =
false;
292 normalResponse(transport, version, payloadBuffer, qos, status);
// Best-effort cancel: flags a PURE_CANCEL_REQUEST and queues this request on
// the channel's transport so the cancel message gets sent.
// Failures never propagate to the caller. A std::runtime_error is swallowed
// without logging in the visible lines; any other std::exception is logged at
// warning level and then ignored.
309 startRequest(PURE_CANCEL_REQUEST);
310 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
311 }
catch (std::runtime_error& e) {
313 }
catch (std::exception& e) {
// The message names the class this code actually lives in, matching the
// wording used by BaseRequestImpl::destroy.
315 LOG(
logLevelWarn,
"Ignore exception during BaseRequestImpl::cancel: %s", e.what());
320 virtual Channel::shared_pointer getChannel() {
328 virtual void lastRequest() {
// Tears the request down.
// Unregisters m_ioid from both the context and the channel. If creation did
// not fail and the request had been initialized, it also queues a
// PURE_DESTROY_REQUEST on the transport. Finally decrements num_active.
// Exceptions raised while queueing the destroy message are not propagated.
// NOTE(review): the destroyed-flag handling and locking around `initd` are on
// lines not visible in this excerpt.
332 virtual void destroy(
bool createRequestFailed) {
// Snapshot of the initialized flag, used below after unregistering.
340 initd = m_initialized;
344 m_channel->getContext()->unregisterResponseRequest(m_ioid);
345 m_channel->unregisterResponseRequest(m_ioid);
// Only notify the peer when there is something to destroy on its side.
348 if (!createRequestFailed && initd)
352 startRequest(PURE_DESTROY_REQUEST);
353 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
354 }
catch (std::runtime_error& e) {
356 }
catch (std::exception& e) {
357 LOG(
logLevelWarn,
"Ignore exception during BaseRequestImpl::destroy: %s", e.what());
362 REFTRACE_DECREMENT(num_active);
// Reaction to a channel state change: distinguishes Channel::DESTROYED from
// Channel::DISCONNECTED, and clears m_subscribed.
// NOTE(review): the enclosing method header and the body of the DESTROYED
// branch are not visible in this excerpt.
372 if (status == Channel::DESTROYED)
374 else if (status == Channel::DISCONNECTED)
376 m_subscribed.
clear();
// Queues this request on `transport` when three conditions hold: the transport
// pointer is non-null, m_subscribed.get() is false, and startRequest(QOS_INIT)
// succeeds. Otherwise nothing visible happens.
// NOTE(review): line 385 is not visible; it presumably updates m_subscribed.
382 virtual void resubscribeSubscription(Transport::shared_pointer
const & transport) {
383 if (transport.get() != 0 && !m_subscribed.
get() && startRequest(
QOS_INIT))
386 transport->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
// Deliberately empty: no action is taken on a subscription update.
390 void updateSubscription() {}
// Handling of the reserved request codes (presumably the body of base_send,
// which subclasses call when the pending request is negative; the header is
// not visible in this excerpt).
// Both PURE_DESTROY_REQUEST and PURE_CANCEL_REQUEST write the server channel
// id into the buffer; other fields of those messages are on lines not visible.
394 if (qos == NULL_REQUEST) {
397 else if (qos == PURE_DESTROY_REQUEST)
400 buffer->
putInt(m_channel->getServerChannelID());
403 else if (qos == PURE_CANCEL_REQUEST)
406 buffer->
putInt(m_channel->getServerChannelID());
// Fills the transportPeer field of `s` with the remote name of the channel's
// current transport.
// NOTE(review): line 417 is not visible; confirm that T is null-checked before
// it is dereferenced.
416 Transport::shared_pointer T(m_channel->getTransport());
418 s.transportPeer = T->getRemoteName();
// Out-of-class definitions of the instance counters. They have static storage
// duration and no initializer, so they start at zero.
425 size_t BaseRequestImpl::num_instances;
426 size_t BaseRequestImpl::num_active;
// Out-of-class definitions of the shared error statuses. Every one uses
// Status::STATUSTYPE_ERROR; the message text identifies the failure.
431 const Status BaseRequestImpl::notInitializedStatus(Status::STATUSTYPE_ERROR,
"request not initialized");
432 const Status BaseRequestImpl::destroyedStatus(Status::STATUSTYPE_ERROR,
"request destroyed");
433 const Status BaseRequestImpl::channelNotConnected(Status::STATUSTYPE_ERROR,
"channel not connected");
434 const Status BaseRequestImpl::channelDestroyed(Status::STATUSTYPE_ERROR,
"channel destroyed");
435 const Status BaseRequestImpl::otherRequestPendingStatus(Status::STATUSTYPE_ERROR,
"other request pending");
436 const Status BaseRequestImpl::invalidPutStructureStatus(Status::STATUSTYPE_ERROR,
"incompatible put structure");
437 const Status BaseRequestImpl::invalidPutArrayStatus(Status::STATUSTYPE_ERROR,
"incompatible put array");
438 const Status BaseRequestImpl::pvRequestNull(Status::STATUSTYPE_ERROR,
"pvRequest == 0");
// Client-side implementation of a "process" request, built on BaseRequestImpl
// (further base classes are on lines not visible in this excerpt).
441 class ChannelProcessRequestImpl :
442 public BaseRequestImpl,
// The requester is held weakly; every callback goes through EXCEPTION_GUARD3,
// which skips the call when the requester has expired.
446 const requester_type::weak_pointer m_callback;
447 const PVStructure::shared_pointer m_pvRequest;
// Stores the requester (weakly) and the pvRequest; the channel is handed to
// the base class.
449 ChannelProcessRequestImpl(ClientChannelImpl::shared_pointer
const & channel, ChannelProcessRequester::shared_pointer
const &
callback, PVStructure::shared_pointer
const & pvRequest) :
450 BaseRequestImpl(channel),
451 m_callback(callback),
452 m_pvRequest(pvRequest)
// Activation: register through the base class, then try to send the INIT
// request on the channel's transport.
457 BaseRequestImpl::activate();
464 resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
465 }
// If obtaining the transport throws, report channelDestroyed to the requester
// and destroy this request with createRequestFailed = true.
catch (std::runtime_error &rte) {
466 EXCEPTION_GUARD3(m_callback, cb, cb->channelProcessConnect(channelDestroyed, external_from_this<ChannelProcessRequestImpl>()));
467 BaseRequestImpl::destroy(
true);
471 virtual ~ChannelProcessRequestImpl() {}
// Returns the requester, or a null pointer if it has already been released.
473 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
return m_callback.lock(); }
// Send path: negative pending codes are delegated to base_send(); otherwise
// the server channel id is written, and the pvRequest is serialized on a
// branch whose condition is not visible in this excerpt.
476 int32 pendingRequest = beginRequest();
477 if (pendingRequest < 0)
479 base_send(buffer, control, pendingRequest);
484 buffer->
putInt(m_channel->getServerChannelID());
491 SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
// Response hooks: forward the received status to the requester.
496 EXCEPTION_GUARD3(m_callback, cb, cb->channelProcessConnect(status, external_from_this<ChannelProcessRequestImpl>()));
500 EXCEPTION_GUARD3(m_callback, cb, cb->processDone(status, external_from_this<ChannelProcessRequestImpl>()));
// process(): each failed precondition is reported through processDone() with
// the matching status constant (guarding conditions partly not visible).
505 ChannelProcess::shared_pointer thisPtr(external_from_this<ChannelProcessRequestImpl>());
510 EXCEPTION_GUARD3(m_callback, cb, cb->processDone(destroyedStatus, thisPtr));
513 if (!m_initialized) {
514 EXCEPTION_GUARD3(m_callback, cb, cb->processDone(notInitializedStatus, thisPtr));
520 EXCEPTION_GUARD3(m_callback, cb, cb->processDone(otherRequestPendingStatus, thisPtr));
525 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<BaseRequestImpl>());
526 }
// No usable transport: report channelNotConnected.
catch (std::runtime_error &rte) {
528 EXCEPTION_GUARD3(m_callback, cb, cb->processDone(channelNotConnected, thisPtr));
// The remaining visible statements forward to the BaseRequestImpl
// implementation; the enclosing method headers are not visible here.
534 return BaseRequestImpl::getChannel();
539 BaseRequestImpl::cancel();
544 BaseRequestImpl::destroy();
549 BaseRequestImpl::lastRequest();
// Client-side implementation of a "get" request, built on BaseRequestImpl
// (further base classes are on lines not visible in this excerpt).
560 class ChannelGetImpl :
561 public BaseRequestImpl,
// The requester is held weakly; callbacks are skipped once it has expired.
565 const ChannelGetRequester::weak_pointer m_callback;
567 const PVStructure::shared_pointer m_pvRequest;
// Data container and changed-field bit set, (re)created on the init response
// and filled on each normal response.
569 PVStructure::shared_pointer m_structure;
570 BitSet::shared_pointer m_bitSet;
// Locked and unlocked explicitly near the end of the class.
572 Mutex m_structureMutex;
574 ChannelGetImpl(ClientChannelImpl::shared_pointer
const & channel,
575 ChannelGetRequester::shared_pointer
const &
requester,
576 PVStructure::shared_pointer
const & pvRequest) :
577 BaseRequestImpl(channel),
578 m_callback(requester),
579 m_pvRequest(pvRequest)
// Activation: register through the base class, then try to send the INIT
// request. If obtaining the transport throws, the request is destroyed with
// createRequestFailed = true.
591 BaseRequestImpl::activate();
597 resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
598 }
catch (std::runtime_error &rte) {
600 BaseRequestImpl::destroy(
true);
604 virtual ~ChannelGetImpl()
// Returns the requester, or a null pointer if it has already been released.
608 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
return m_callback.lock(); }
// Send path: negative pending codes go to base_send(); otherwise the server
// channel id is written, and the pvRequest is serialized on a branch whose
// condition is not visible (presumably driven by initStage).
611 int32 pendingRequest = beginRequest();
612 bool initStage = ((pendingRequest &
QOS_INIT) != 0);
614 if (pendingRequest < 0)
616 base_send(buffer, control, pendingRequest);
621 buffer->
putInt(m_channel->getServerChannelID());
628 SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
// Init response: rebuild m_structure from the wire (passing the previous
// structure to the helper), obtain a matching bit set, and report the
// structure's introspection data to the requester.
642 m_structure = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.
get(), m_structure);
643 m_bitSet = createBitSetFor(m_structure, m_bitSet);
647 EXCEPTION_GUARD3(m_callback, cb, cb->channelGetConnect(status, external_from_this<ChannelGetImpl>(), m_structure->getStructure()));
// Normal response: read the bit set first, then the structure using that bit
// set, then deliver both.
661 m_bitSet->deserialize(payloadBuffer, transport.
get());
662 m_structure->deserialize(payloadBuffer, transport.
get(), m_bitSet.get());
665 EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, external_from_this<ChannelGetImpl>(), m_structure, m_bitSet));
// get(): precondition checks and error reporting are largely on lines not
// visible; on success the request is queued on the transport.
670 ChannelGet::shared_pointer thisPtr(external_from_this<ChannelGetImpl>());
678 if (!m_initialized) {
702 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelGetImpl>());
704 }
catch (std::runtime_error &rte) {
// The remaining visible statements forward to the BaseRequestImpl
// implementation; the enclosing method headers are not visible here.
712 return BaseRequestImpl::getChannel();
717 BaseRequestImpl::cancel();
722 BaseRequestImpl::destroy();
727 BaseRequestImpl::lastRequest();
// Explicit lock / unlock of the structure mutex (enclosing methods not visible).
732 m_structureMutex.lock();
737 m_structureMutex.unlock();
// Client-side implementation of a "put" request, built on BaseRequestImpl
// (further base classes are on lines not visible in this excerpt).
748 class ChannelPutImpl :
749 public BaseRequestImpl,
// The requester is held weakly; callbacks are skipped once it has expired.
753 const ChannelPutRequester::weak_pointer m_callback;
755 const PVStructure::shared_pointer m_pvRequest;
// Internal copy of the data to send and the bit set selecting its fields.
757 PVStructure::shared_pointer m_structure;
758 BitSet::shared_pointer m_bitSet;
// Locked and unlocked explicitly near the end of the class.
760 Mutex m_structureMutex;
762 ChannelPutImpl(ClientChannelImpl::shared_pointer
const & channel,
763 ChannelPutRequester::shared_pointer
const &
requester,
764 PVStructure::shared_pointer
const & pvRequest) :
765 BaseRequestImpl(channel),
766 m_callback(requester),
767 m_pvRequest(pvRequest)
// Activation: register through the base class, then try to send the INIT
// request. If obtaining the transport throws, the request is destroyed with
// createRequestFailed = true.
779 BaseRequestImpl::activate();
785 resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
786 }
catch (std::runtime_error &rte) {
788 BaseRequestImpl::destroy(
true);
792 virtual ~ChannelPutImpl()
// Returns the requester, or a null pointer if it has already been released.
796 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
return m_callback.lock(); }
// Send path: negative pending codes go to base_send(); otherwise the server
// channel id is written. The pvRequest is serialized on a branch whose
// condition is not visible. When the QOS_GET bit is not set, the bit set and
// the fields it selects are serialized as the put payload.
799 int32 pendingRequest = beginRequest();
800 if (pendingRequest < 0)
802 base_send(buffer, control, pendingRequest);
807 buffer->
putInt(m_channel->getServerChannelID());
814 SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
816 else if (!(pendingRequest &
QOS_GET))
823 m_bitSet->serialize(buffer, control);
824 m_structure->serialize(buffer, control, m_bitSet.get());
// Init response: rebuild m_structure from the wire, obtain a matching bit set,
// and report the structure's introspection data to the requester.
839 m_structure = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.
get(), m_structure);
840 m_bitSet = createBitSetFor(m_structure, m_bitSet);
844 EXCEPTION_GUARD3(m_callback, cb, cb->channelPutConnect(status, external_from_this<ChannelPutImpl>(), m_structure->getStructure()));
// Normal response carrying data (the reply to get()): read the bit set, then
// the structure, then deliver both through getDone().
849 ChannelPut::shared_pointer thisPtr(external_from_this<ChannelPutImpl>());
861 m_bitSet->deserialize(payloadBuffer, transport.
get());
862 m_structure->deserialize(payloadBuffer, transport.
get(), m_bitSet.get());
865 EXCEPTION_GUARD3(m_callback, cb, cb->getDone(status, thisPtr, m_structure, m_bitSet));
// get(): precondition checks are largely on lines not visible; on success the
// request is queued on the transport.
875 ChannelPut::shared_pointer thisPtr(external_from_this<ChannelPutImpl>());
883 if (!m_initialized) {
896 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
897 }
catch (std::runtime_error &rte) {
// put(): validates state and structure compatibility, copies the caller's data
// into the internal buffers and queues the request. Each failure is reported
// through putDone() with the matching status constant.
903 virtual void put(PVStructure::shared_pointer
const & pvPutStructure, BitSet::shared_pointer
const & pvPutBitSet)
OVERRIDE FINAL {
905 ChannelPut::shared_pointer thisPtr(external_from_this<ChannelPutImpl>());
913 if (!m_initialized) {
914 EXCEPTION_GUARD3(m_callback, cb, cb->putDone(notInitializedStatus, thisPtr));
// The caller's structure must have the same introspection type as the one
// negotiated at init time.
920 if (!(*m_structure->getStructure() == *pvPutStructure->getStructure()))
922 EXCEPTION_GUARD3(m_callback, cb, cb->putDone(invalidPutStructureStatus, thisPtr));
927 EXCEPTION_GUARD3(m_callback, cb, cb->putDone(otherRequestPendingStatus, thisPtr));
// Copy the bit set by value, then copy the selected fields of the caller's
// structure into m_structure.
934 *m_bitSet = *pvPutBitSet;
935 m_structure->copyUnchecked(*pvPutStructure, *m_bitSet);
937 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutImpl>());
938 }
// No usable transport: report channelNotConnected.
catch (std::runtime_error &rte) {
940 EXCEPTION_GUARD3(m_callback, cb, cb->putDone(channelNotConnected, thisPtr));
// The remaining visible statements forward to the BaseRequestImpl
// implementation; the enclosing method headers are not visible here.
946 return BaseRequestImpl::getChannel();
951 BaseRequestImpl::cancel();
956 BaseRequestImpl::destroy();
961 BaseRequestImpl::lastRequest();
// Explicit lock / unlock of the structure mutex (enclosing methods not visible).
966 m_structureMutex.lock();
971 m_structureMutex.unlock();
// Client-side implementation of a combined "put-get" request, built on
// BaseRequestImpl (further base classes are on lines not visible here).
982 class ChannelPutGetImpl :
983 public BaseRequestImpl,
// The requester is held weakly; callbacks are skipped once it has expired.
987 const ChannelPutGetRequester::weak_pointer m_callback;
989 const PVStructure::shared_pointer m_pvRequest;
// Separate data container and bit set for the put side ...
992 PVStructure::shared_pointer m_putData;
993 BitSet::shared_pointer m_putDataBitSet;
// ... and for the get side.
996 PVStructure::shared_pointer m_getData;
997 BitSet::shared_pointer m_getDataBitSet;
// Locked and unlocked explicitly near the end of the class.
999 Mutex m_structureMutex;
1001 ChannelPutGetImpl(ClientChannelImpl::shared_pointer
const & channel,
1002 ChannelPutGetRequester::shared_pointer
const &
requester,
1003 PVStructure::shared_pointer
const & pvRequest) :
1004 BaseRequestImpl(channel),
1005 m_callback(requester),
1006 m_pvRequest(pvRequest)
// Activation: register through the base class, then try to send the INIT
// request. If obtaining the transport throws, the request is destroyed with
// createRequestFailed = true.
1018 BaseRequestImpl::activate();
1021 resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
1022 }
catch (std::runtime_error &rte) {
1024 BaseRequestImpl::destroy(
true);
1029 virtual ~ChannelPutGetImpl()
// Returns the requester, or a null pointer if it has already been released.
1033 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
return m_callback.lock(); }
// Send path: negative pending codes go to base_send(); otherwise the server
// channel id is written. Two consecutive tests on the QOS_INIT bit follow; the
// statement guarded by the first one is not visible. The pvRequest and the
// put payload (bit set, then selected fields) are serialized below.
1036 int32 pendingRequest = beginRequest();
1037 if (pendingRequest < 0)
1039 base_send(buffer, control, pendingRequest);
1044 buffer->
putInt(m_channel->getServerChannelID());
1046 if ((pendingRequest &
QOS_INIT) == 0)
1049 if (pendingRequest & QOS_INIT)
1054 SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
1064 m_putDataBitSet->serialize(buffer, control);
1065 m_putData->serialize(buffer, control, m_putDataBitSet.get());
// Init response: the put structure arrives first, then the get structure. Each
// gets a matching bit set, and both introspection types are reported.
1079 m_putData = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.
get());
1080 m_putDataBitSet = createBitSetFor(m_putData, m_putDataBitSet);
1081 m_getData = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.
get());
1082 m_getDataBitSet = createBitSetFor(m_getData, m_getDataBitSet);
1086 EXCEPTION_GUARD3(m_callback, cb, cb->channelPutGetConnect(status, external_from_this<ChannelPutGetImpl>(), m_putData->getStructure(), m_getData->getStructure()));
// Normal response: three data-carrying cases, each reading a bit set followed
// by its structure (the conditions choosing between them are not visible).
1092 ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());
// Case 1: get-side data, delivered through getGetDone().
1105 m_getDataBitSet->deserialize(payloadBuffer, transport.
get());
1106 m_getData->deserialize(payloadBuffer, transport.
get(), m_getDataBitSet.get());
1109 EXCEPTION_GUARD3(m_callback, cb, cb->getGetDone(status, thisPtr, m_getData, m_getDataBitSet));
// Case 2: put-side data, delivered through getPutDone().
1122 m_putDataBitSet->deserialize(payloadBuffer, transport.
get());
1123 m_putData->deserialize(payloadBuffer, transport.
get(), m_putDataBitSet.get());
1126 EXCEPTION_GUARD3(m_callback, cb, cb->getPutDone(status, thisPtr, m_putData, m_putDataBitSet));
// Case 3: result of a putGet(), delivered through putGetDone().
1139 m_getDataBitSet->deserialize(payloadBuffer, transport.
get());
1140 m_getData->deserialize(payloadBuffer, transport.
get(), m_getDataBitSet.get());
1143 EXCEPTION_GUARD3(m_callback, cb, cb->putGetDone(status, thisPtr, m_getData, m_getDataBitSet));
// putGet(): under m_mutex, validates state and structure compatibility, copies
// the caller's data into the put buffers and queues the request. The error
// callbacks are on lines not visible in this excerpt.
1148 virtual void putGet(PVStructure::shared_pointer
const & pvPutStructure, BitSet::shared_pointer
const & bitSet)
OVERRIDE FINAL {
1150 ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());
1153 Lock guard(m_mutex);
1158 if (!m_initialized) {
1164 if (!(*m_putData->getStructure() == *pvPutStructure->getStructure()))
1178 *m_putDataBitSet = *bitSet;
1179 m_putData->copyUnchecked(*pvPutStructure, *m_putDataBitSet);
1181 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
1182 }
catch (std::runtime_error &rte) {
// Two further request methods follow the same shape: take m_mutex, check
// m_initialized, queue the request. Their headers are not visible here.
1190 ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());
1193 Lock guard(m_mutex);
1198 if (!m_initialized) {
1210 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
1211 }
catch (std::runtime_error &rte) {
1219 ChannelPutGet::shared_pointer thisPtr(external_from_this<ChannelPutGetImpl>());
1222 Lock guard(m_mutex);
1227 if (!m_initialized) {
1239 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelPutGetImpl>());
1240 }
catch (std::runtime_error &rte) {
// The remaining visible statements forward to the BaseRequestImpl
// implementation; the enclosing method headers are not visible here.
1248 return BaseRequestImpl::getChannel();
1253 BaseRequestImpl::cancel();
1258 BaseRequestImpl::destroy();
1263 BaseRequestImpl::lastRequest();
// Explicit lock / unlock of the structure mutex (enclosing methods not visible).
1268 m_structureMutex.lock();
1273 m_structureMutex.unlock();
// Client-side implementation of an RPC request, built on BaseRequestImpl
// (further base classes are on lines not visible in this excerpt).
1287 class ChannelRPCImpl :
1288 public BaseRequestImpl,
// The requester is held weakly; callbacks are skipped once it has expired.
1292 const ChannelRPCRequester::weak_pointer m_callback;
1294 const PVStructure::shared_pointer m_pvRequest;
// Argument of the call currently waiting to be sent; set by request() and
// reset once serialized.
1296 PVStructure::shared_pointer m_structure;
// Locked and unlocked explicitly near the end of the class.
1298 Mutex m_structureMutex;
1300 ChannelRPCImpl(ClientChannelImpl::shared_pointer
const & channel,
1301 ChannelRPCRequester::shared_pointer
const &
requester,
1302 PVStructure::shared_pointer
const & pvRequest) :
1303 BaseRequestImpl(channel),
1304 m_callback(requester),
1305 m_pvRequest(pvRequest)
// Activation: a pvRequestNull status is reported on a branch whose condition
// is not visible (presumably a null pvRequest). Otherwise the request is
// registered and the INIT request sent.
1313 EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(pvRequestNull, external_from_this<ChannelRPCImpl>()));
1317 BaseRequestImpl::activate();
1321 resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
1322 }
// If obtaining the transport throws, report channelDestroyed and destroy this
// request with createRequestFailed = true.
catch (std::runtime_error &rte) {
1323 EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(channelDestroyed, external_from_this<ChannelRPCImpl>()));
1324 BaseRequestImpl::destroy(
true);
1328 virtual ~ChannelRPCImpl()
// Returns the requester, or a null pointer if it has already been released.
1332 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
return m_callback.lock(); }
// Send path: negative pending codes go to base_send(); otherwise the server
// channel id is written. Two consecutive tests on the QOS_INIT bit follow; the
// statement guarded by the first one is not visible. The pvRequest is
// serialized, and separately the call argument is serialized in full and then
// released.
1335 int32 pendingRequest = beginRequest();
1336 if (pendingRequest < 0)
1338 base_send(buffer, control, pendingRequest);
1343 buffer->
putInt(m_channel->getServerChannelID());
1345 if ((pendingRequest &
QOS_INIT) == 0)
1348 if (pendingRequest & QOS_INIT)
1353 SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
1360 SerializationHelper::serializeStructureFull(buffer, control, m_structure);
// Drop the reference so the argument is not kept alive after sending.
1362 m_structure.reset();
// Init response: forward the status to the requester. Two call sites are
// visible; the conditions separating them are not.
1370 EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(status, external_from_this<ChannelRPCImpl>()));
1375 EXCEPTION_GUARD3(m_callback, cb, cb->channelRPCConnect(status, external_from_this<ChannelRPCImpl>()));
// Normal response: deserialize the full result structure and deliver it.
1380 ChannelRPC::shared_pointer thisPtr(external_from_this<ChannelRPCImpl>());
1389 PVStructure::shared_pointer response(SerializationHelper::deserializeStructureFull(payloadBuffer, transport.
get()));
1390 EXCEPTION_GUARD3(m_callback, cb, cb->requestDone(status, thisPtr, response));
// request(): under m_mutex, checks state, stores the argument and queues the
// request. The error callbacks are on lines not visible in this excerpt.
1393 virtual void request(epics::pvData::PVStructure::shared_pointer
const & pvArgument)
OVERRIDE FINAL {
1395 ChannelRPC::shared_pointer thisPtr(external_from_this<ChannelRPCImpl>());
1398 Lock guard(m_mutex);
1403 if (!m_initialized) {
// The argument is shared, not copied.
1417 m_structure = pvArgument;
1420 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelRPCImpl>());
1421 }
catch (std::runtime_error &rte) {
// The remaining visible statements forward to the BaseRequestImpl
// implementation; the enclosing method headers are not visible here.
1429 return BaseRequestImpl::getChannel();
1434 BaseRequestImpl::cancel();
1439 BaseRequestImpl::destroy();
1444 BaseRequestImpl::lastRequest();
// Explicit lock / unlock of the structure mutex (enclosing methods not visible).
1449 m_structureMutex.lock();
1454 m_structureMutex.unlock();
// Client-side implementation of an array request (get/put of array slices and
// get/set of the length), built on BaseRequestImpl (further base classes are
// on lines not visible in this excerpt).
1466 class ChannelArrayImpl :
1467 public BaseRequestImpl,
// The requester is held weakly; callbacks are skipped once it has expired.
1471 const ChannelArrayRequester::weak_pointer m_callback;
1473 const PVStructure::shared_pointer m_pvRequest;
// Local array buffer used for both received and outgoing data.
1476 PVArray::shared_pointer m_arrayData;
// Locked and unlocked explicitly near the end of the class.
1484 Mutex m_structureMutex;
1486 ChannelArrayImpl(ClientChannelImpl::shared_pointer
const & channel,
1487 ChannelArrayRequester::shared_pointer
const &
requester,
1488 PVStructure::shared_pointer
const & pvRequest) :
1489 BaseRequestImpl(channel),
1490 m_callback(requester),
1491 m_pvRequest(pvRequest),
1492 m_offset(0), m_count(0),
// Activation: a pvRequestNull status is reported on a branch whose condition
// is not visible (presumably a null pvRequest). Otherwise the request is
// registered and the INIT request sent.
1501 EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(pvRequestNull, external_from_this<ChannelArrayImpl>(), Array::shared_pointer()));
1505 BaseRequestImpl::activate();
1509 resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
1510 }
// If obtaining the transport throws, report channelDestroyed with a null
// Array and destroy this request with createRequestFailed = true.
catch (std::runtime_error &rte) {
1511 EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(channelDestroyed, external_from_this<ChannelArrayImpl>(), Array::shared_pointer()));
1512 BaseRequestImpl::destroy(
true);
1516 virtual ~ChannelArrayImpl()
// Returns the requester, or a null pointer if it has already been released.
1520 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
return m_callback.lock(); }
// Send path: negative pending codes go to base_send(); otherwise the server
// channel id is written and the payload depends on the request kind.
1523 int32 pendingRequest = beginRequest();
1524 if (pendingRequest < 0)
1526 base_send(buffer, control, pendingRequest);
1531 buffer->
putInt(m_channel->getServerChannelID());
1538 SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
// Get request: offset, count and stride are written as sizes.
1540 else if (pendingRequest &
QOS_GET)
1543 SerializeHelper::writeSize(m_offset, buffer, control);
1544 SerializeHelper::writeSize(m_count, buffer, control);
1545 SerializeHelper::writeSize(m_stride, buffer, control);
// Length is written on a branch whose condition is not visible (presumably
// the set-length request).
1550 SerializeHelper::writeSize(m_length, buffer, control);
// Put request: offset and stride, then the array elements starting at index 0.
// A zero m_count means "send the whole array".
1562 SerializeHelper::writeSize(m_offset, buffer, control);
1563 SerializeHelper::writeSize(m_stride, buffer, control);
1565 m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength());
// Init response: one path reports the status with a null Array; the other
// deserializes the field description and reports m_arrayData's array type.
// The condition separating them and the creation of m_arrayData are not
// visible in this excerpt.
1573 EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(status, external_from_this<ChannelArrayImpl>(), Array::shared_pointer()));
1578 FieldConstPtr field = transport->cachedDeserialize(payloadBuffer);
1585 EXCEPTION_GUARD3(m_callback, cb, cb->channelArrayConnect(status, external_from_this<ChannelArrayImpl>(), m_arrayData->getArray()));
// Normal response: a get reply either reports a null array or deserializes
// into m_arrayData and delivers it; a get-length reply reads one size value.
1590 ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1596 EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(status, thisPtr, PVArray::shared_pointer()));
1602 m_arrayData->deserialize(payloadBuffer, transport.
get());
1605 EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(status, thisPtr, m_arrayData));
1613 size_t length = SerializeHelper::readSize(payloadBuffer, transport.
get());
1615 EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(status, thisPtr, length));
// getArray(): under m_mutex, each failed precondition is reported through
// getArrayDone() with the matching status constant and a null array;
// otherwise the request is queued on the transport.
1624 virtual void getArray(
size_t offset,
size_t count,
size_t stride)
OVERRIDE FINAL {
1628 ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1631 Lock guard(m_mutex);
1633 EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(destroyedStatus, thisPtr, PVArray::shared_pointer()));
1636 if (!m_initialized) {
1637 EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(notInitializedStatus, thisPtr, PVArray::shared_pointer()));
1643 EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(otherRequestPendingStatus, thisPtr, PVArray::shared_pointer()));
1654 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1655 }
catch (std::runtime_error &rte) {
1657 EXCEPTION_GUARD3(m_callback, cb, cb->getArrayDone(channelNotConnected, thisPtr, PVArray::shared_pointer()));
// putArray(): same precondition pattern, reported through putArrayDone(). In
// addition the caller's array type must equal the negotiated one. On success
// the data is copied into m_arrayData and the request is queued.
1661 virtual void putArray(PVArray::shared_pointer
const & putArray,
size_t offset,
size_t count,
size_t stride)
OVERRIDE FINAL {
1665 ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1668 Lock guard(m_mutex);
1670 EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(destroyedStatus, thisPtr));
1673 if (!m_initialized) {
1674 EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(notInitializedStatus, thisPtr));
1679 if (!(*m_arrayData->getArray() == *putArray->getArray()))
1681 EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(invalidPutArrayStatus, thisPtr));
1686 EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(otherRequestPendingStatus, thisPtr));
1693 m_arrayData->copyUnchecked(*putArray);
1698 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1699 }
catch (std::runtime_error &rte) {
1701 EXCEPTION_GUARD3(m_callback, cb, cb->putArrayDone(channelNotConnected, thisPtr));
// Set-length request: same precondition pattern, reported via setLengthDone().
1707 ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1710 Lock guard(m_mutex);
1712 EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(destroyedStatus, thisPtr));
1715 if (!m_initialized) {
1716 EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(notInitializedStatus, thisPtr));
1722 EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(otherRequestPendingStatus, thisPtr));
1731 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1732 }
catch (std::runtime_error &rte) {
1734 EXCEPTION_GUARD3(m_callback, cb, cb->setLengthDone(channelNotConnected, thisPtr));
// Get-length request: same precondition pattern, reported via getLengthDone()
// with a length of 0 on every error path.
1741 ChannelArray::shared_pointer thisPtr(external_from_this<ChannelArrayImpl>());
1744 Lock guard(m_mutex);
1746 EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(destroyedStatus, thisPtr, 0));
1749 if (!m_initialized) {
1750 EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(notInitializedStatus, thisPtr, 0));
1756 EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(otherRequestPendingStatus, thisPtr, 0));
1761 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelArrayImpl>());
1762 }
catch (std::runtime_error &rte) {
1764 EXCEPTION_GUARD3(m_callback, cb, cb->getLengthDone(channelNotConnected, thisPtr, 0));
// The remaining visible statements forward to the BaseRequestImpl
// implementation; the enclosing method headers are not visible here.
1770 return BaseRequestImpl::getChannel();
1775 BaseRequestImpl::cancel();
1780 BaseRequestImpl::destroy();
1785 BaseRequestImpl::lastRequest();
// Explicit lock / unlock of the structure mutex (enclosing methods not visible).
1790 m_structureMutex.lock();
1795 m_structureMutex.unlock();
// Abstract extension of Monitor used by the client-side monitor
// implementation. Concrete strategies must handle incoming update payloads
// (response) and the unlisten notification.
1805 class MonitorStrategy :
public Monitor {
1807 virtual ~MonitorStrategy() {};
// Consumes one monitor update from payloadBuffer received over `transport`.
1809 virtual void response(Transport::shared_pointer
const & transport,
ByteBuffer* payloadBuffer) = 0;
// Called for the unlisten event; the exact semantics are defined by the
// implementing class.
1810 virtual void unlisten() = 0;
// Pool of unused monitor elements; used with push_back/pop_back.
1813 typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
// FIFO of filled monitor elements awaiting poll().
1814 typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;
1817 class MonitorStrategyQueue :
1818 public MonitorStrategy,
1820 public std::tr1::enable_shared_from_this<MonitorStrategyQueue>
1824 const int32 m_queueSize;
1827 FreeElementQueue m_freeQueue;
1828 MonitorElementQueue m_monitorQueue;
1831 const MonitorRequester::weak_pointer m_callback;
1837 MonitorElement::shared_pointer m_overrunElement;
1838 bool m_overrunInProgress;
1840 PVStructure::shared_pointer m_up2datePVStructure;
1842 int32 m_releasedCount;
1843 bool m_reportQueueStateInProgress;
1846 const ClientChannelImpl::shared_pointer m_channel;
1849 const bool m_pipeline;
1850 const int32 m_ackAny;
1856 MonitorStrategyQueue(ClientChannelImpl::shared_pointer channel,
pvAccessID ioid,
1857 MonitorRequester::weak_pointer
const &
callback,
1859 bool pipeline,
int32 ackAny) :
1860 m_queueSize(queueSize), m_lastStructure(),
1863 m_callback(callback), m_mutex(),
1864 m_bitSet1(), m_bitSet2(), m_overrunInProgress(
false),
1866 m_reportQueueStateInProgress(
false),
1867 m_channel(channel), m_ioid(ioid),
1868 m_pipeline(pipeline), m_ackAny(ackAny),
1872 throw std::invalid_argument(
"queueSize <= 1");
1874 m_freeQueue.reserve(m_queueSize);
1879 virtual ~MonitorStrategyQueue() {}
1882 Lock guard(m_mutex);
1884 m_releasedCount = 0;
1885 m_reportQueueStateInProgress =
false;
1888 while (!m_monitorQueue.empty())
1889 m_monitorQueue.pop();
1891 m_freeQueue.clear();
1893 m_up2datePVStructure.reset();
1895 for (
int32 i = 0;
i < m_queueSize;
i++)
1897 PVStructure::shared_pointer pvStructure =
getPVDataCreate()->createPVStructure(structure);
1898 MonitorElement::shared_pointer monitorElement(
new MonitorElement(pvStructure));
1899 m_freeQueue.push_back(monitorElement);
1907 virtual void response(Transport::shared_pointer
const & transport,
ByteBuffer* payloadBuffer)
OVERRIDE FINAL {
1911 Lock guard(m_mutex);
1913 if (m_overrunInProgress)
1916 BitSet::shared_pointer changedBitSet = m_overrunElement->changedBitSet;
1917 BitSet::shared_pointer overrunBitSet = m_overrunElement->overrunBitSet;
1920 pvStructure->deserialize(payloadBuffer, transport.
get(), &m_bitSet1);
1926 overrunBitSet->or_and(*(changedBitSet.get()), m_bitSet1);
1929 *(changedBitSet.get()) |= m_bitSet1;
1932 *(overrunBitSet.get()) |= m_bitSet2;
1940 m_freeQueue.pop_back();
1942 if (m_freeQueue.empty())
1944 m_overrunInProgress =
true;
1945 m_overrunElement = newElement;
1950 BitSet::shared_pointer changedBitSet = newElement->changedBitSet;
1951 BitSet::shared_pointer overrunBitSet = newElement->overrunBitSet;
1955 if (m_up2datePVStructure && m_up2datePVStructure.get() != pvStructure.get()) {
1956 assert(pvStructure->getStructure().get()==m_up2datePVStructure->getStructure().get());
1957 pvStructure->copyUnchecked(*m_up2datePVStructure, *changedBitSet,
true);
1959 pvStructure->deserialize(payloadBuffer, transport.
get(), changedBitSet.get());
1960 overrunBitSet->deserialize(payloadBuffer, transport.
get());
1962 m_up2datePVStructure = pvStructure;
1964 if (!m_overrunInProgress)
1965 m_monitorQueue.push(newElement);
1968 if (!m_overrunInProgress)
1976 bool notifyUnlisten =
false;
1978 Lock guard(m_mutex);
1979 notifyUnlisten = m_monitorQueue.empty();
1980 m_unlisten = !notifyUnlisten;
1989 virtual MonitorElement::shared_pointer poll()
OVERRIDE FINAL {
1990 Lock guard(m_mutex);
1992 if (m_monitorQueue.empty()) {
1999 return MonitorElement::shared_pointer();
2002 MonitorElement::shared_pointer retVal(m_monitorQueue.front());
2003 m_monitorQueue.pop();
// Returns a previously polled element to the free queue. If an overrun was in
// progress, the retained overrun element is compressed and queued. When enough
// elements have been released, a queue-state report is scheduled for sending.
2008 virtual void release(MonitorElement::shared_pointer
const & monitorElement)
OVERRIDE FINAL {
// Elements whose structure differs from m_lastStructure are treated specially
// (the branch body is not visible here).
2013 if (monitorElement->pvStructurePtr->getStructure().get() != m_lastStructure.get())
2016 bool sendAck =
false;
2018 Lock guard(m_mutex);
2020 m_freeQueue.push_back(monitorElement);
// A slot is free again: finalize the overrun element and hand it to the
// client-visible queue, then leave overrun mode.
2022 if (m_overrunInProgress)
2026 BitSetUtil::compress(m_overrunElement->changedBitSet, pvStructure);
2027 BitSetUtil::compress(m_overrunElement->overrunBitSet, pvStructure);
2029 m_monitorQueue.push(m_overrunElement);
2031 m_overrunElement.reset();
2032 m_overrunInProgress =
false;
// Schedule an acknowledgement once at least m_ackAny elements were released,
// unless a report is already pending.
2038 if (!m_reportQueueStateInProgress && m_releasedCount >= m_ackAny)
2041 m_reportQueueStateInProgress =
true;
// Enqueue this object as the sender; checkAndGetTransport() throws
// std::runtime_error when the channel is destroyed or not connected.
2051 m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
2052 }
catch (std::runtime_error&) {
// Could not enqueue: clear the pending flag so a later release() may retry.
2055 m_reportQueueStateInProgress =
false;
2056 }
catch (std::exception& e) {
// Any other exception is logged and otherwise ignored (best effort).
2057 LOG(
logLevelWarn,
"Ignore exception during MonitorStrategyQueue::release: %s", e.what());
2059 m_reportQueueStateInProgress =
false;
// NOTE(review): the enclosing function header is not visible; this appears to
// be the send() that serializes the queue-state report - confirm.
2067 buffer->
putInt(m_channel->getServerChannelID());
2072 Lock guard(m_mutex);
// Write the number of released elements, then reset the counter and the
// "report pending" flag under the same lock.
2073 buffer->
putInt(m_releasedCount);
2074 m_releasedCount = 0;
2075 m_reportQueueStateInProgress =
false;
2079 control->
flush(
true);
// NOTE(review): the enclosing function header is not visible. The visible
// statements move every queued element, and a retained overrun element if any,
// back to the free queue and leave overrun mode.
2083 Lock guard(m_mutex);
2084 while (!m_monitorQueue.empty())
2086 m_freeQueue.push_back(m_monitorQueue.front());
2087 m_monitorQueue.pop();
2089 if (m_overrunElement)
2091 m_freeQueue.push_back(m_overrunElement);
2092 m_overrunElement.reset();
2094 m_overrunInProgress =
false;
// Client-side monitor request, built on BaseRequestImpl.
2110 class ChannelMonitorImpl :
2111 public BaseRequestImpl,
// Requester to notify; held weakly.
2115 const MonitorRequester::weak_pointer m_callback;
// The pvRequest structure passed at construction.
2118 const PVStructure::shared_pointer m_pvRequest;
// Strategy that receives responses and serves poll()/release().
2120 std::tr1::shared_ptr<MonitorStrategy> m_monitorStrategy;
// Constructor (parameter list and initializers as visible here): stores the
// requester and pvRequest, then reads the queue size, "pipeline" and "ackAny"
// options from the request options.
2127 ClientChannelImpl::shared_pointer
const & channel,
2128 MonitorRequester::shared_pointer
const &
requester,
2129 PVStructure::shared_pointer
const & pvRequest)
2131 BaseRequestImpl(channel),
2132 m_callback(requester),
2134 m_pvRequest(pvRequest),
// Queue size option; a conversion failure is caught by the handler below.
2154 m_queueSize = option->getAs<
int32>();
2157 }
catch(std::runtime_error& e){
2162 option = pvOptions->getSubField<
PVScalar>(
"pipeline");
2166 }
catch(std::runtime_error& e){
// Default acknowledgement threshold: half of the queue.
2175 m_ackAny = m_queueSize/2;
2177 option = pvOptions->getSubField<
PVScalar>(
"ackAny");
// "ackAny" may be a string ending in '%', read as a percentage of the queue
// size ...
2182 if(option->getScalar()->getScalarType()==
pvString) {
2183 std::string sval(option->getAs<std::string>());
2185 if(!sval.empty() && sval[sval.size()-1]==
'%') {
2187 double percent = castUnsafe<double>(sval.substr(0, sval.size()-1));
2188 size = (m_queueSize * percent) / 100.0;
2190 }
catch(std::runtime_error&){
// ... or a plain integer count.
2198 size = option->getAs<
int32>();
2200 }
catch(std::runtime_error&){
2206 }
else if (size <= 0) {
// Clamp the requested threshold to the queue size. The condition previously
// tested m_ackAny (always m_queueSize/2 at this point) instead of size, so an
// oversized request was never clamped and the threshold could exceed the
// number of elements the queue can hold.
2209 m_ackAny = (size <= m_queueSize) ? size : m_queueSize;
// NOTE(review): the enclosing function header is not visible (presumably
// activate()). Activates the base request, creates the queue strategy and
// attempts to subscribe immediately.
2215 BaseRequestImpl::activate();
2217 std::tr1::shared_ptr<MonitorStrategyQueue> tp(
2218 new MonitorStrategyQueue(m_channel, m_ioid, m_callback, m_queueSize,
2219 m_pipeline, m_ackAny)
2221 m_monitorStrategy = tp;
// checkDestroyedAndGetTransport() throws std::runtime_error when the channel
// is destroyed; in that case the request is destroyed as well.
2225 resubscribeSubscription(m_channel->checkDestroyedAndGetTransport());
2226 }
catch (std::runtime_error &rte) {
2228 BaseRequestImpl::destroy(
true);
// Enqueues this request for sending on the given transport when a transport is
// supplied and m_subscribed is not set (a further condition on the following
// line is not visible here).
2233 virtual void resubscribeSubscription(Transport::shared_pointer
const & transport)
OVERRIDE FINAL {
2234 if (transport.get() != 0 && !m_subscribed.get() &&
2238 transport->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
// Empty virtual destructor.
2242 virtual ~ChannelMonitorImpl() {}
// Returns the monitor requester, or a null pointer if it no longer exists
// (m_callback is a weak pointer).
2244 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
return m_callback.lock(); }
// NOTE(review): the enclosing function header is not visible; this is the
// request serialization path of the monitor.
2247 int32 pendingRequest = beginRequest();
// Negative values are the special request codes declared on BaseRequestImpl
// (NULL_REQUEST, PURE_DESTROY_REQUEST, PURE_CANCEL_REQUEST); the handling of
// this branch is not visible here.
2248 if (pendingRequest < 0)
2250 base_send(buffer, control, pendingRequest);
2255 buffer->
putInt(m_channel->getServerChannelID());
// The pvRequest is serialized into the message, followed (further down) by
// the queue size.
2262 SerializationHelper::serializePVRequest(buffer, control, m_pvRequest);
2268 buffer->
putInt(m_queueSize);
// Handles the init response: reads the structure description from the payload,
// initializes the monitor strategy with it and reports monitorConnect() to the
// requester.
2273 virtual void initResponse(
2274 Transport::shared_pointer
const & transport,
2288 transport->cachedDeserialize(payloadBuffer)
// A missing structure is treated as an error.
2291 throw std::runtime_error(
"initResponse() w/o Structure");
2292 m_monitorStrategy->init(structure);
// Capture m_started before the callback; it is consulted again afterwards
// (the body of that branch is not visible here).
2294 bool restoreStartedState = m_started;
// EXCEPTION_GUARD3 only invokes the callback if the weak requester can be locked.
2297 EXCEPTION_GUARD3(m_callback, cb, cb->monitorConnect(status, external_from_this<ChannelMonitorImpl>(), structure));
2299 if (restoreStartedState)
// Handles a non-init response by forwarding the payload to the monitor
// strategy and/or signalling unlisten; the conditions selecting between the
// visible calls are not visible here.
2303 virtual void normalResponse(
2304 Transport::shared_pointer
const & transport,
2319 m_monitorStrategy->response(transport, payloadBuffer);
2322 m_monitorStrategy->unlisten();
2326 m_monitorStrategy->response(transport, payloadBuffer);
// Entry point for responses addressed to this request: dispatches to
// initResponse() or normalResponse(). The QoS decoding and the conditions for
// each branch are not visible here.
2331 virtual void response(
2332 Transport::shared_pointer
const & transport,
// At least one byte must be available before reading the header.
2336 transport->ensureData(1);
2345 m_initialized =
true;
2347 initResponse(transport, version, payloadBuffer, qos, status);
2356 m_initialized =
false;
2359 normalResponse(transport, version, payloadBuffer, qos, status);
// This call passes Status::Ok rather than a deserialized status.
2363 normalResponse(transport, version, payloadBuffer, qos, Status::Ok);
// NOTE(review): the enclosing function header is not visible (presumably
// start()). Returns one of the BaseRequestImpl status constants on failure;
// the conditions guarding each return are not visible here.
2370 Lock guard(m_mutex);
2373 return BaseRequestImpl::destroyedStatus;
2375 return BaseRequestImpl::notInitializedStatus;
2377 m_monitorStrategy->start();
2381 return BaseRequestImpl::otherRequestPendingStatus;
// Remember m_started so it can be rolled back if the send request cannot be
// enqueued.
2383 bool restore = m_started;
2390 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
2392 }
catch (std::runtime_error &rte) {
2395 m_started = restore;
2397 return BaseRequestImpl::channelNotConnected;
// NOTE(review): the enclosing function header is not visible (presumably
// stop()). Returns one of the BaseRequestImpl status constants on failure;
// the conditions guarding each return are not visible here.
2403 Lock guard(m_mutex);
2406 return BaseRequestImpl::destroyedStatus;
2408 return BaseRequestImpl::notInitializedStatus;
2410 m_monitorStrategy->stop();
2414 return BaseRequestImpl::otherRequestPendingStatus;
// Remember m_started so it can be rolled back if the send request cannot be
// enqueued.
2416 bool restore = m_started;
2423 m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this<ChannelMonitorImpl>());
2425 }
catch (std::runtime_error &rte) {
2428 m_started = restore;
2430 return BaseRequestImpl::channelNotConnected;
// Delegates destruction to the base request (enclosing header not visible).
2437 BaseRequestImpl::destroy();
// poll() is forwarded to the monitor strategy.
2440 virtual MonitorElement::shared_pointer poll()
OVERRIDE FINAL 2442 return m_monitorStrategy->poll();
// release() is forwarded to the monitor strategy.
2445 virtual void release(MonitorElement::shared_pointer
const & monitorElement)
OVERRIDE FINAL 2447 m_monitorStrategy->release(monitorElement);
// Owning client context, held weakly; users must lock() it before use.
2457 const ClientContextImpl::weak_pointer _context;
// Passes the raw context pointer and description to ResponseHandler and keeps
// a weak reference to the context.
2459 AbstractClientResponseHandler(ClientContextImpl::shared_pointer
const & context,
string const & description) :
2460 ResponseHandler(context.get(), description), _context(ClientContextImpl::weak_pointer(context)) {
2463 virtual ~AbstractClientResponseHandler() {
// Response handler that adds nothing visible beyond AbstractClientResponseHandler;
// used below as the "Ignore" handler in the client handler table.
2467 class NoopResponse :
public AbstractClientResponseHandler {
2469 NoopResponse(ClientContextImpl::shared_pointer
const & context,
string const & description) :
2470 AbstractClientResponseHandler(context, description)
2474 virtual ~NoopResponse() {
// Routes a "Data response" message to the ResponseRequest registered under the
// IOID found at the start of the payload.
2479 class ResponseRequestHandler :
public AbstractClientResponseHandler {
2481 ResponseRequestHandler(ClientContextImpl::shared_pointer
const & context) :
2482 AbstractClientResponseHandler(context,
"Data response")
2486 virtual ~ResponseRequestHandler() {
2489 virtual void handleResponse(
osiSockAddr* responseFrom,
2490 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2493 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
// 4 bytes: the IOID read below with getInt().
2495 transport->ensureData(4);
// NOTE(review): _context.lock() is dereferenced without a null check; if the
// context has already been released this would dereference a null pointer -
// confirm the lifetime guarantee.
2497 ResponseRequest::shared_pointer rr = _context.lock()->getResponseRequest(payloadBuffer->
getInt());
// Account received bytes on the request, then let it process the payload.
2500 epics::atomic::add(rr->bytesRX, payloadSize);
2501 rr->response(transport, version, payloadBuffer);
// Handles a "Multiple data response" message: IOIDs are read from the payload
// and each is dispatched to its registered ResponseRequest. The loop structure
// and termination condition are not visible here.
2511 class MultipleResponseRequestHandler :
public AbstractClientResponseHandler {
2513 MultipleResponseRequestHandler(ClientContextImpl::shared_pointer
const & context) :
2514 AbstractClientResponseHandler(context,
"Multiple data response")
2518 virtual ~MultipleResponseRequestHandler() {
2521 virtual void handleResponse(
osiSockAddr* responseFrom,
2522 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2525 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
// NOTE(review): no null check on the locked context is visible here - confirm.
2530 ClientContextImpl::shared_pointer context = _context.lock();
2533 transport->ensureData(4);
2538 ResponseRequest::shared_pointer rr = context->getResponseRequest(ioid);
// NOTE(review): the full payloadSize is added for each dispatched request.
2541 epics::atomic::add(rr->bytesRX, payloadSize);
2542 rr->response(transport, version, payloadBuffer);
// Handles a "Search response": decodes the server address from the payload and
// reports each found channel to the ChannelSearchManager.
2550 class SearchResponseHandler :
public AbstractClientResponseHandler {
2552 SearchResponseHandler(ClientContextImpl::shared_pointer
const & context) :
2553 AbstractClientResponseHandler(context,
"Search response")
2557 virtual ~SearchResponseHandler() {
2560 virtual void handleResponse(
osiSockAddr* responseFrom,
2561 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2564 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
// Fixed-size header; presumably GUID (12) + sequence id (4) + address (16) +
// port (2) - confirm against the protocol specification.
2566 transport->ensureData(12+4+16+2);
2574 memset(&serverAddress, 0,
sizeof(serverAddress));
2575 serverAddress.
ia.sin_family = AF_INET;
// An unspecified (INADDR_ANY) address in the message is replaced by the
// address the response was actually received from.
2581 if (serverAddress.
ia.sin_addr.s_addr == INADDR_ANY)
2582 serverAddress.
ia.sin_addr = responseFrom->
ia.sin_addr;
// Port is stored in network byte order.
2586 serverAddress.
ia.sin_port = htons(port);
2588 SerializeHelper::deserializeString(payloadBuffer, transport.
get());
// One byte: non-zero means the server reports the channel(s) as found.
2590 transport->ensureData(1);
2591 bool found = payloadBuffer->
getByte() != 0;
2597 ClientContextImpl::shared_pointer context(_context.lock());
2600 std::tr1::shared_ptr<epics::pvAccess::ChannelSearchManager> csm = context->getChannelSearchManager();
// One 4-byte entry per reported channel; each is forwarded to the search manager.
2602 for (
int i = 0;
i < count;
i++)
2604 transport->ensureData(4);
2606 csm->searchResponse(guid, cid, searchSequenceId, version, &serverAddress);
// Handles a "Search" message received by the client. When the 0x80 bit of the
// QoS byte is set and a transport with a local multicast address is available,
// the payload is modified and passed on using that address (details of the
// forwarding call are not visible here).
2613 class SearchHandler :
public AbstractClientResponseHandler {
2615 SearchHandler(ClientContextImpl::shared_pointer
const & context) :
2616 AbstractClientResponseHandler(context,
"Search")
2620 virtual ~SearchHandler() {
2623 virtual void handleResponse(
osiSockAddr* responseFrom,
2624 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2627 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2629 transport->ensureData(4+1+3+16+2);
// Remember where the message starts so bytes can be patched in place later.
2631 size_t startPosition = payloadBuffer->
getPosition();
2641 memset(&responseAddress, 0,
sizeof(responseAddress));
2642 responseAddress.
ia.sin_family = AF_INET;
// An unspecified (INADDR_ANY) response address is replaced by the sender's address.
2648 if (responseAddress.
ia.sin_addr.s_addr == INADDR_ANY)
2649 responseAddress.
ia.sin_addr = responseFrom->
ia.sin_addr;
2653 responseAddress.
ia.sin_port = htons(port);
// Only messages with the 0x80 QoS bit set take the path below.
2661 if ((qosCode & 0x80) == 0x80)
2664 ClientContextImpl::shared_pointer context = _context.lock();
2669 if (bt && bt->hasLocalMulticastAddress())
2678 payloadBuffer->
putInt(16);
// Clear the 0x80 bit in the QoS byte stored at offset 4 of the message.
2683 payloadBuffer->
put(startPosition+4, (
int8)(qosCode & ~0x80));
2693 bt->getLocalMulticastAddress());
// Handles a "Beacon" message: decodes the server address and protocol from the
// payload, looks up the BeaconHandler for the sender and notifies it.
2702 class BeaconResponseHandler :
public AbstractClientResponseHandler {
2704 BeaconResponseHandler(ClientContextImpl::shared_pointer
const & context) :
2705 AbstractClientResponseHandler(context,
"Beacon")
2708 virtual ~BeaconResponseHandler() {}
2710 virtual void handleResponse(
osiSockAddr* responseFrom,
2711 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2718 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
// Fixed-size part of the beacon that must be available before decoding.
2720 transport->ensureData(12+2+2+16+2);
2730 memset(&serverAddress, 0,
sizeof(serverAddress));
2731 serverAddress.
ia.sin_family = AF_INET;
// An unspecified (INADDR_ANY) address in the beacon is replaced by the
// address the beacon was actually received from.
2737 if (serverAddress.
ia.sin_addr.s_addr == INADDR_ANY)
2738 serverAddress.
ia.sin_addr = responseFrom->
ia.sin_addr;
// Port is stored in network byte order.
2742 serverAddress.
ia.sin_port = htons(port);
2744 string protocol(SerializeHelper::deserializeString(payloadBuffer, transport.
get()));
2749 ClientContextImpl::shared_pointer context = _context.lock();
2753 std::tr1::shared_ptr<epics::pvAccess::BeaconHandler> beaconHandler = context->getBeaconHandler(responseFrom);
// Optional data attached to the beacon.
2764 data->deserialize(payloadBuffer, transport.
get());
// The timestamp is passed by address. The argument had been corrupted to a
// multiplication sign followed by "tamp" (a mis-decoded "&times" prefix) and
// is restored to &timestamp here.
2768 beaconHandler->beaconNotify(responseFrom, version, &timestamp, guid, sequentalID, changeCount, data);
// Handles the server's "Connection validation" message: records the remote
// receive buffer size and collects the list of security plugins offered by
// the server.
2772 class ClientConnectionValidationHandler :
public AbstractClientResponseHandler {
2774 ClientConnectionValidationHandler(ClientContextImpl::shared_pointer context) :
2775 AbstractClientResponseHandler(context,
"Connection validation")
2778 virtual ~ClientConnectionValidationHandler() {}
2780 virtual void handleResponse(
osiSockAddr* responseFrom,
2781 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2784 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2786 transport->ensureData(4+2);
// First int of the payload: receive buffer size of the remote side.
2788 transport->setRemoteTransportReceiveBufferSize(payloadBuffer->
getInt());
// NOTE(review): 'size' comes straight from the wire and is passed to reserve()
// without an upper bound - confirm whether a sanity limit is applied elsewhere.
2794 size_t size = SerializeHelper::readSize(payloadBuffer, transport.
get());
2795 vector<string> offeredSecurityPlugins;
2796 offeredSecurityPlugins.reserve(size);
2797 for (
size_t i = 0;
i < size;
i++)
2798 offeredSecurityPlugins.push_back(
2799 SerializeHelper::deserializeString(payloadBuffer, transport.
get())
// Handles the "Connection validated" message by passing the resulting status
// to the transport (how 'status' is obtained is not visible here).
2810 class ClientConnectionValidatedHandler :
public AbstractClientResponseHandler {
2812 ClientConnectionValidatedHandler(ClientContextImpl::shared_pointer context) :
2813 AbstractClientResponseHandler(context,
"Connection validated")
2816 virtual ~ClientConnectionValidatedHandler() {}
2818 virtual void handleResponse(
osiSockAddr* responseFrom,
2819 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2822 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2826 transport->verified(status);
// Handles a server "Message": delivers the text to the requester of the
// request identified by the IOID, or prints it to std::cerr as an orphaned
// message (the condition selecting the fallback is not visible here).
2831 class MessageHandler :
public AbstractClientResponseHandler {
2833 MessageHandler(ClientContextImpl::shared_pointer
const & context) :
2834 AbstractClientResponseHandler(context,
"Message")
2837 virtual ~MessageHandler() {}
2839 virtual void handleResponse(
osiSockAddr* responseFrom,
2840 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2843 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
// Presumably 4 bytes IOID + 1 byte message type - confirm.
2845 transport->ensureData(5);
2849 string message = SerializeHelper::deserializeString(payloadBuffer, transport.
get());
// NOTE(review): _context.lock() is dereferenced without a null check.
2852 ResponseRequest::shared_pointer rr = _context.lock()->getResponseRequest(ioid);
2855 epics::atomic::add(rr->bytesRX, payloadSize);
2856 Requester::shared_pointer
requester = rr->getRequester();
2858 requester->message(message, type);
2863 std::cerr<<
"Orphaned server message "<<type<<
" : "<<message<<
"\n";
// Handles the "Create channel" response: on failure builds an error text and
// calls createChannelFailed() on the channel, otherwise completes the
// connection with the server channel id. The status decoding and channel
// lookup are not visible here.
2867 class CreateChannelHandler :
public AbstractClientResponseHandler {
2869 CreateChannelHandler(ClientContextImpl::shared_pointer
const & context) :
2870 AbstractClientResponseHandler(context,
"Create channel")
2873 virtual ~CreateChannelHandler() {}
2875 virtual void handleResponse(
osiSockAddr* responseFrom,
2876 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2879 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
// Presumably client channel id (4) + server channel id (4) - confirm.
2881 transport->ensureData(8);
2897 std::stringstream ss;
2898 ss <<
"Failed to create channel '" << channel->
getChannelName() <<
"': ";
2905 channel->createChannelFailed();
2911 channel->connectionCompleted(sid);
// Handles the "Destroy channel" message. Only the base-class call and the
// 8-byte availability check are visible here.
2917 class DestroyChannelHandler :
public AbstractClientResponseHandler {
2919 DestroyChannelHandler(ClientContextImpl::shared_pointer
const & context) :
2920 AbstractClientResponseHandler(context,
"Destroy channel")
2923 virtual ~DestroyChannelHandler() {}
2925 virtual void handleResponse(
osiSockAddr* responseFrom,
2926 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
2929 AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
2931 transport->ensureData(8);
// Dispatch table indexed by command code; several commands share one handler
// instance.
2955 vector<ResponseHandler::shared_pointer> m_handlerTable;
2959 virtual ~ClientResponseHandler() {}
// Builds the table: a shared "Ignore" handler, a shared data-response handler
// for the request/response commands, and dedicated handlers for the rest.
2964 ClientResponseHandler(ClientContextImpl::shared_pointer
const & context)
2967 ResponseHandler::shared_pointer ignoreResponse(
new NoopResponse(context,
"Ignore"));
2968 ResponseHandler::shared_pointer dataResponse(
new ResponseRequestHandler(context));
2972 m_handlerTable[
CMD_BEACON].reset(
new BeaconResponseHandler(context));
2974 m_handlerTable[
CMD_ECHO] = ignoreResponse;
2975 m_handlerTable[
CMD_SEARCH].reset(
new SearchHandler(context));
2982 m_handlerTable[
CMD_GET] = dataResponse;
2983 m_handlerTable[
CMD_PUT] = dataResponse;
2986 m_handlerTable[
CMD_ARRAY] = dataResponse;
2990 m_handlerTable[
CMD_MESSAGE].reset(
new MessageHandler(context));
2991 m_handlerTable[
CMD_MULTIPLE_DATA].reset(
new MultipleResponseRequestHandler(context));
2992 m_handlerTable[
CMD_RPC] = dataResponse;
// Looks up the handler for 'command' and forwards the message; out-of-range
// commands are reported on std::cerr.
2996 virtual void handleResponse(
osiSockAddr* responseFrom,
2997 Transport::shared_pointer
const & transport,
int8 version,
int8 command,
// NOTE(review): the table size is narrowed to int8 for the comparison; this is
// only correct while the table holds fewer than 128 entries.
3000 if (command < 0 || command >= (
int8)m_handlerTable.size())
3003 std::cerr<<
"Invalid (or unsupported) command: "<<
std::hex<<(int)(0xFF&command)<<
"\n" 3009 m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer);
3025 CONTEXT_NOT_INITIALIZED,
3030 CONTEXT_INITIALIZED,
3041 class InternalClientContextImpl :
// channelFind is not implemented by this provider: after validating the
// arguments it reports a "not implemented" error to the requester and returns
// a null ChannelFind.
3053 virtual ChannelFind::shared_pointer channelFind(
3054 std::string
const & channelName,
3055 ChannelFindRequester::shared_pointer
const & channelFindRequester)
OVERRIDE FINAL 3059 checkChannelName(channelName);
// A null requester is a caller error.
3061 if (!channelFindRequester.get())
3062 throw std::runtime_error(
"null requester");
3064 Status errorStatus(Status::STATUSTYPE_ERROR,
"not implemented");
3065 ChannelFind::shared_pointer nullChannelFind;
3066 EXCEPTION_GUARD(channelFindRequester->channelFindResult(errorStatus, nullChannelFind,
false));
3067 return nullChannelFind;
// channelList is not implemented by this provider: it reports a
// "not implemented" error to the requester and returns a null ChannelFind.
3070 virtual ChannelFind::shared_pointer channelList(
3071 ChannelListRequester::shared_pointer
const & channelListRequester)
OVERRIDE FINAL 3073 if (!channelListRequester.get())
3074 throw std::runtime_error(
"null requester");
3076 Status errorStatus(Status::STATUSTYPE_ERROR,
"not implemented");
3077 ChannelFind::shared_pointer nullChannelFind;
// 'none' is passed as the name list (its declaration is not visible here).
3079 EXCEPTION_GUARD(channelListRequester->channelListResult(errorStatus, nullChannelFind, none,
false));
3080 return nullChannelFind;
// Overload without an address list: delegates to the overload below with an
// empty address string.
3083 virtual Channel::shared_pointer createChannel(
3084 std::string
const & channelName,
3085 ChannelRequester::shared_pointer
const & channelRequester,
3088 return createChannel(channelName, channelRequester, priority, std::string());
// Overload with addresses: creates the channel through
// createChannelInternal() and reports channelCreated(Status::Ok, ...) to the
// requester (any guard around that call is not visible here).
3091 virtual Channel::shared_pointer createChannel(
3092 std::string
const & channelName,
3093 ChannelRequester::shared_pointer
const & channelRequester,
3100 Channel::shared_pointer channel = createChannelInternal(channelName, channelRequester, priority, addresses);
3102 channelRequester->channelCreated(Status::Ok, channel);
// Client channel implementation. State and members as visible here.
3112 class InternalChannelImpl :
// Copy constructor and copy assignment are declared only (presumably to forbid
// copying - confirm they are private and left undefined).
3116 InternalChannelImpl(InternalChannelImpl&);
3117 InternalChannelImpl& operator=(
const InternalChannelImpl&);
// Weak self references; assigned once in create() via const_cast.
3122 const weak_pointer m_external_this, m_internal_this;
// Both helpers construct a shared_pointer from the weak pointer, which throws
// if the referenced object no longer exists.
3124 shared_pointer external_from_this() {
3125 return shared_pointer(m_external_this);
3127 shared_pointer internal_from_this() {
3128 return shared_pointer(m_internal_this);
// Owning context.
3134 const std::tr1::shared_ptr<InternalClientContextImpl> m_context;
// Channel name.
3144 const string m_name;
// Channel requester, held weakly.
3149 const ChannelRequester::weak_pointer m_requester;
3154 std::tr1::shared_ptr<ChannelGetFieldRequestImpl> m_getfield;
3160 const short m_priority;
// Current connection state; accessed under m_channelMutex.
3175 ConnectionState m_connectionState;
// Requests registered on this channel, keyed by IOID; guarded by
// m_responseRequestsMutex.
3180 IOIDResponseRequestMap m_responseRequests;
3185 Mutex m_responseRequestsMutex;
3187 bool m_needSubscriptionUpdate;
3192 bool m_allowCreation;
// Transport currently used by the channel (may be null).
3201 Transport::shared_pointer m_transport;
3211 Mutex m_channelMutex;
// Selects which message send() emits (see the send path further down).
3216 bool m_issueCreateMessage;
3219 int32_t m_userValue;
// Instance counters updated with REFTRACE_INCREMENT/REFTRACE_DECREMENT.
3227 static size_t num_instances;
3228 static size_t num_active;
// Constructor: stores identity and requester, and starts in NEVER_CONNECTED
// state with creation allowed and no server channel id (0xFFFFFFFF).
3238 InternalChannelImpl(
3239 InternalClientContextImpl::shared_pointer
const & context,
3241 string const & name,
3242 ChannelRequester::shared_pointer
const &
requester,
3246 m_channelID(channelID),
3248 m_requester(requester),
3249 m_priority(priority),
3250 m_addresses(addresses),
3252 m_connectionState(NEVER_CONNECTED),
3253 m_needSubscriptionUpdate(
false),
3254 m_allowCreation(
true),
3255 m_serverChannelID(0xFFFFFFFF),
3256 m_issueCreateMessage(
true)
3258 REFTRACE_INCREMENT(num_instances);
// Registration with the context uses internal_from_this(), which needs the
// weak self pointers set in create(), so it cannot run in the constructor.
// NOTE(review): the header of the function containing the next two lines is
// not visible (presumably activate()).
3264 m_context->registerChannel(internal_from_this());
3269 REFTRACE_INCREMENT(num_active);
// Factory: constructs the channel, wires up its internal/external weak self
// pointers and activates it. The construction of 'external' is not visible here.
3274 static ClientChannelImpl::shared_pointer create(InternalClientContextImpl::shared_pointer context,
3276 string const & name,
3277 ChannelRequester::shared_pointer requester,
3281 std::tr1::shared_ptr<InternalChannelImpl>
internal(
3282 new InternalChannelImpl(context, channelID, name, requester, priority, addresses)),
// The weak self pointers are declared const; they are set exactly once here,
// hence the const_cast.
3284 const_cast<weak_pointer&
>(
internal->m_internal_this) =
internal;
3285 const_cast<weak_pointer&
>(
internal->m_external_this) = external;
3286 internal->activate();
// Destructor: updates the instance counter.
3290 virtual ~InternalChannelImpl()
3292 REFTRACE_DECREMENT(num_instances);
// NOTE(review): the enclosing function header is not visible (presumably
// destroy()). Tears the channel down: stops searching, fails pending I/O,
// disconnects or releases the transport, marks the channel DESTROYED,
// unregisters it from the context and reports the state change.
// old_transport is declared before the lock guard, so it is destroyed after
// the guard releases m_channelMutex.
3298 Transport::shared_pointer old_transport;
3300 Lock guard(m_channelMutex);
// An already destroyed channel gets no further teardown (branch body not visible).
3301 if (m_connectionState == DESTROYED)
3303 REFTRACE_DECREMENT(num_active);
3305 old_transport = m_transport;
3310 shared_pointer thisChannelPointer = internal_from_this();
3311 m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer);
3313 disconnectPendingIO(
true);
// Connected: disconnect without restarting the search and with a remote
// destroy; otherwise just release a held transport.
3315 if (m_connectionState == CONNECTED)
3317 disconnect(
false,
true);
3319 else if (m_transport)
3322 m_transport->release(getID());
3323 m_transport.reset();
3327 setConnectionState(DESTROYED);
3330 m_context->unregisterChannel(thisChannelPointer);
3334 reportChannelStateChange();
// Simple accessors. Several function headers are not visible here; comments
// describe only the visible statements.
3339 return getChannelName();
// The provider is the owning context's external handle.
3349 virtual ChannelProvider::shared_pointer getProvider()
OVERRIDE FINAL 3351 return m_context->external_from_this();
// The remote name is only taken from the transport while CONNECTED (the
// not-connected result is not visible here).
3357 Lock guard(m_channelMutex);
3358 if (m_connectionState != CONNECTED) {
3363 return m_transport->getRemoteName();
// Constructing a shared_pointer from the weak m_requester throws if the
// requester no longer exists.
3372 virtual ChannelRequester::shared_pointer getChannelRequester()
OVERRIDE FINAL 3374 return ChannelRequester::shared_pointer(m_requester);
3379 Lock guard(m_channelMutex);
3380 return m_connectionState;
3397 return m_context.get();
3404 virtual const string& getSearchInstanceName()
OVERRIDE FINAL {
3409 Lock guard(m_channelMutex);
3410 return m_serverChannelID;
// Registers a request under its IOID; only a weak reference is stored, so the
// map does not keep the request alive.
3413 virtual void registerResponseRequest(ResponseRequest::shared_pointer
const & responseRequest)
OVERRIDE FINAL 3415 Lock guard(m_responseRequestsMutex);
3416 m_responseRequests[responseRequest->getIOID()] = ResponseRequest::weak_pointer(responseRequest);
// Removes the entry for 'ioid' (function header not visible here).
3422 Lock guard(m_responseRequestsMutex);
3423 m_responseRequests.erase(ioid);
// NOTE(review): three functions whose headers are not visible follow.
// First fragment: rejects a destroyed channel; the action taken for a
// not-yet-connected channel is not visible.
3427 Lock guard(m_channelMutex);
3429 if (m_connectionState == DESTROYED)
3430 throw std::runtime_error(
"Channel destroyed.");
3431 else if (m_connectionState != CONNECTED)
// Second fragment: user-initiated disconnect. old_transport is declared before
// the guard, so the transport reference is dropped after the mutex is released.
3438 Transport::shared_pointer old_transport;
3439 Lock guard(m_channelMutex);
3440 old_transport = m_transport;
3443 if (m_connectionState == DESTROYED)
3444 throw std::runtime_error(
"Channel destroyed.");
3445 else if (m_connectionState == CONNECTED)
3446 disconnect(
false,
true);
3450 reportChannelStateChange();
3454 createChannelFailed();
// Third fragment: channel creation failed - release the transport and search
// again with the penalize flag set.
3463 Transport::shared_pointer old_transport;
3464 Lock guard(m_channelMutex);
3468 m_transport->release(getID());
3469 old_transport.swap(m_transport);
3473 initiateSearch(
true);
// NOTE(review): the enclosing function header is not visible (the log text
// names it connectionCompleted()). Stores the server channel id, resubscribes
// existing subscriptions and switches to CONNECTED; exceptions are logged.
3484 Lock guard(m_channelMutex);
// A channel destroyed in the meantime is handled separately (body not visible).
3489 if (m_connectionState == DESTROYED)
3496 m_serverChannelID = sid;
3504 resubscribeSubscriptions();
3505 setConnectionState(CONNECTED);
3507 catch (std::exception& e) {
3508 LOG(
logLevelError,
"connectionCompleted() %d '%s' unhandled exception: %s\n", sid, m_name.c_str(), e.what());
// State changes queued by setConnectionState() are delivered here.
3514 reportChannelStateChange();
// Disconnects a connected channel.
//   initiateSearch - when true, start searching for the channel again.
//   remoteDestroy  - when true, enqueue this channel on the transport with
//                    m_issueCreateMessage cleared.
3522 void disconnect(
bool initiateSearch,
bool remoteDestroy) {
// Declared before the guard so the old transport reference is dropped after
// m_channelMutex is released.
3525 Transport::shared_pointer oldchan;
3526 Lock guard(m_channelMutex);
// Nothing to do unless currently CONNECTED (branch body not visible).
3528 if (m_connectionState != CONNECTED)
3531 if (!initiateSearch) {
3533 m_context->getChannelSearchManager()->unregisterSearchInstance(internal_from_this());
3535 setConnectionState(DISCONNECTED);
3537 disconnectPendingIO(
false);
3542 if (remoteDestroy) {
3543 m_issueCreateMessage =
false;
3544 m_transport->enqueueSendRequest(internal_from_this());
3547 m_transport->release(getID());
3548 oldchan.swap(m_transport);
// 'this->' is needed because the parameter shadows the member function name.
3552 this->initiateSearch();
// Caller fragment (header not visible): disconnect, search again, then report
// the queued state change.
3559 disconnect(
true,
false);
3562 reportChannelStateChange();
// Starts looking for the channel. With no configured addresses the channel is
// registered with the ChannelSearchManager; otherwise a timer callback is
// scheduled (the delay arguments are not visible here).
3566 #define STATIC_SEARCH_BASE_DELAY_SEC 5 3567 #define STATIC_SEARCH_MAX_MULTIPLIER 10 3572 void initiateSearch(
bool penalize =
false)
3574 Lock guard(m_channelMutex);
// Allow a subsequent search response to create the channel.
3576 m_allowCreation =
true;
3578 if (m_addresses.empty())
3580 m_context->getChannelSearchManager()->registerSearchInstance(internal_from_this(), penalize);
3584 m_context->getTimer()->scheduleAfterDelay(internal_from_this(),
// The configured addresses are indexed with m_addressIndex modulo the list
// size (enclosing function header not visible).
3593 int ix = m_addressIndex % m_addresses.size();
// All-zero GUID used for this path.
3599 static ServerGUID guid = { { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 } };
// NOTE(review): function headers are not visible; these fragments handle a
// search response and the subsequent channel creation on a transport.
3610 Transport::shared_pointer old_transport;
3612 Lock guard(m_channelMutex);
3613 Transport::shared_pointer transport(m_transport);
// The requester is told when more than one server answers for this name (the
// condition and the rest of the message are not visible here).
3621 EXCEPTION_GUARD3(m_requester, req, req->message(
"More than one channel with name '" + m_name +
3630 transport = m_context->getTransport(internal_from_this(), serverAddress, minorRevision, m_priority);
3633 createChannelFailed();
3643 Lock guard(m_channelMutex);
// Creation is permitted once per search; later responses are ignored until
// initiateSearch() sets m_allowCreation again.
3646 if (!m_allowCreation)
3648 m_allowCreation =
false;
// Switching to a different transport: fail pending I/O and release the old one.
3651 if (m_transport && m_transport.get() != transport.get())
3653 disconnectPendingIO(
false);
3655 m_transport->release(getID());
3657 else if (m_transport.get() == transport.get())
// Install the new transport and enqueue this channel on it.
3665 old_transport.swap(m_transport);
3666 m_transport.swap(transport);
3668 m_transport->enqueueSendRequest(internal_from_this());
3673 disconnect(
true,
false);
3676 reportChannelStateChange();
// Throws std::runtime_error unless the channel is CONNECTED (the return
// statement is not visible here).
3679 virtual Transport::shared_pointer checkAndGetTransport()
OVERRIDE FINAL 3681 Lock guard(m_channelMutex);
3683 if (m_connectionState == DESTROYED)
3684 throw std::runtime_error(
"Channel destroyed.");
3685 else if (m_connectionState != CONNECTED)
3686 throw std::runtime_error(
"Channel not connected.");
// Throws only when the channel is DESTROYED; ends by returning a null transport.
3690 virtual Transport::shared_pointer checkDestroyedAndGetTransport()
OVERRIDE FINAL 3692 Lock guard(m_channelMutex);
3694 if (m_connectionState == DESTROYED)
3695 throw std::runtime_error(
"Channel destroyed.");
3696 else if (m_connectionState == CONNECTED)
3699 return Transport::shared_pointer();
// Variant that takes m_channelMutex and performs no visible state checks.
3702 virtual Transport::shared_pointer getTransport()
OVERRIDE FINAL 3704 Lock guard(m_channelMutex);
// Updates the connection state and, when it actually changes, queues the new
// state for later delivery by reportChannelStateChange().
3712 void setConnectionState(ConnectionState connectionState)
3714 Lock guard(m_channelMutex);
3715 if (m_connectionState != connectionState)
3717 m_connectionState = connectionState;
3726 channelStateChangeQueue.push(connectionState);
// Pending state changes, filled above and drained by reportChannelStateChange().
3732 std::queue<ConnectionState> channelStateChangeQueue;
// Delivers queued connection state changes to the channel requester and, for
// DISCONNECTED/DESTROYED, notifies the requesters of the registered
// operations via channelDisconnect().
3734 void reportChannelStateChange()
// Prefer the external handle; fall back to the internal one (the condition is
// not visible here).
3741 shared_pointer
self(m_external_this.lock());
3743 self = internal_from_this();
3747 std::vector<ResponseRequest::weak_pointer> ops;
3748 ConnectionState connectionState;
// Take one queued state under m_channelMutex.
3750 Lock guard(m_channelMutex);
3751 if (channelStateChangeQueue.empty())
3753 connectionState = channelStateChangeQueue.front();
3754 channelStateChangeQueue.pop();
// Snapshot the registered operations under their mutex so the notifications
// below iterate over the copy rather than the map.
3756 if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
3757 Lock guard(m_responseRequestsMutex);
3758 ops.reserve(m_responseRequests.size());
3759 for(IOIDResponseRequestMap::const_iterator it = m_responseRequests.begin(),
3760 end = m_responseRequests.end();
3763 ops.push_back(it->second);
// Notify the channel requester if it still exists.
3768 EXCEPTION_GUARD3(m_requester, req, req->channelStateChange(
self, connectionState));
3770 if(connectionState==Channel::DISCONNECTED || connectionState==Channel::DESTROYED) {
3771 for(
size_t i=0, N=ops.size();
i<N;
i++) {
3772 ResponseRequest::shared_pointer R(ops[
i].
lock());
3774 ChannelBaseRequester::shared_pointer req(R->getRequester());
// The argument is true only for the DESTROYED state.
3776 EXCEPTION_GUARD(req->channelDisconnect(connectionState==Channel::DESTROYED););
// NOTE(review): the enclosing function header is not visible; this is the
// channel's send path. m_issueCreateMessage is read under m_channelMutex and
// selects between the two messages below; only the first writes the name.
3786 m_channelMutex.lock();
3787 bool issueCreateMessage = m_issueCreateMessage;
3788 m_channelMutex.unlock();
3790 if (issueCreateMessage)
// Message with client channel id and channel name.
3797 buffer->
putInt(m_channelID);
3798 SerializeHelper::serializeString(m_name, buffer, control);
3801 control->
flush(
true);
// Other message: a value is read under the mutex (not visible here) and the
// client channel id is written.
3807 m_channelMutex.lock();
3809 m_channelMutex.unlock();
3812 buffer->
putInt(m_channelID);
3815 control->
flush(
true);
// Reports a status to every request registered on this channel; 'destroy'
// selects the reported status (the selection and the call are not visible here).
3824 void disconnectPendingIO(
bool destroy)
3828 Lock guard(m_responseRequestsMutex);
// Subscriptions will need refreshing after reconnect.
3830 m_needSubscriptionUpdate =
true;
// Copy the weak references into a vector first and walk the copy afterwards.
3835 std::vector<ResponseRequest::weak_pointer> rrs(m_responseRequests.size());
3836 for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
3837 iter != m_responseRequests.end();
3840 rrs[count++] = iter->second;
// Requests that no longer exist (lock() fails) are skipped.
3843 ResponseRequest::shared_pointer ptr;
3844 for (
size_t i = 0;
i< count;
i++)
3846 if((ptr = rrs[
i].
lock()))
// Declared here, defined elsewhere.
3857 void resubscribeSubscriptions();
// Walks the registered requests when m_needSubscriptionUpdate is set, and
// clears the flag; what is done with each request is not visible here.
3863 void updateSubscriptions()
3865 Lock guard(m_responseRequestsMutex);
3867 if (m_needSubscriptionUpdate)
3868 m_needSubscriptionUpdate =
false;
3873 for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
3874 iter != m_responseRequests.end();
// Entries whose request no longer exists yield a null pointer here.
3877 ResponseRequest::shared_pointer ptr = iter->second.lock();
// Declared here, defined elsewhere.
3887 virtual void getField(GetFieldRequester::shared_pointer
const & requester,std::string
const & subField)
OVERRIDE FINAL;
// The factories below all follow one pattern: BaseRequestImpl::build<T>() is
// given the external handle of this channel, the requester and the pvRequest.
// Creates a process request.
3889 virtual ChannelProcess::shared_pointer createChannelProcess(
3890 ChannelProcessRequester::shared_pointer
const & requester,
3891 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
OVERRIDE FINAL
3893 return BaseRequestImpl::build<ChannelProcessRequestImpl>(external_from_this(),
requester, pvRequest);
// Creates a get request.
3896 virtual ChannelGet::shared_pointer createChannelGet(
3897 ChannelGetRequester::shared_pointer
const & requester,
3898 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
OVERRIDE FINAL
3900 return BaseRequestImpl::build<ChannelGetImpl>(external_from_this(),
requester, pvRequest);
// Creates a put request.
3903 virtual ChannelPut::shared_pointer createChannelPut(
3904 ChannelPutRequester::shared_pointer
const & requester,
3905 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
OVERRIDE FINAL
3907 return BaseRequestImpl::build<ChannelPutImpl>(external_from_this(),
requester, pvRequest);
// Creates a put-get request.
3910 virtual ChannelPutGet::shared_pointer createChannelPutGet(
3911 ChannelPutGetRequester::shared_pointer
const & requester,
3912 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
OVERRIDE FINAL
3914 return BaseRequestImpl::build<ChannelPutGetImpl>(external_from_this(),
requester, pvRequest);
// Creates an RPC request.
3917 virtual ChannelRPC::shared_pointer createChannelRPC(
3918 ChannelRPCRequester::shared_pointer
const & requester,
3919 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
OVERRIDE FINAL
3921 return BaseRequestImpl::build<ChannelRPCImpl>(external_from_this(),
requester, pvRequest);
// Creates a monitor.
3924 virtual Monitor::shared_pointer createMonitor(
3925 MonitorRequester::shared_pointer
const & requester,
3926 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
OVERRIDE FINAL
3928 return BaseRequestImpl::build<ChannelMonitorImpl>(external_from_this(),
requester, pvRequest);
// Creates an array request.
3931 virtual ChannelArray::shared_pointer createChannelArray(
3932 ChannelArrayRequester::shared_pointer
const & requester,
3933 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
OVERRIDE FINAL
3935 return BaseRequestImpl::build<ChannelArrayImpl>(external_from_this(),
requester, pvRequest);
// Writes the channel name, its connection state and, when connected, the
// remote address to 'out'.
3938 virtual void printInfo(std::ostream& out)
OVERRIDE FINAL {
3941 out <<
"CHANNEL : " << m_name << std::endl;
// NOTE(review): m_connectionState is read here; whether m_channelMutex is held
// at this point is not visible.
3942 out <<
"STATE : " << ConnectionStateNames[m_connectionState] << std::endl;
3943 if (m_connectionState == CONNECTED)
3945 out <<
"ADDRESS : " << getRemoteAddress() << std::endl;
// Instance counter updated with REFTRACE_INCREMENT.
3955 static size_t num_instances;
// Constructor: sets defaults (empty address list, auto address list on,
// connection timeout 30.0, beacon period 15.0), seeds the channel and IO id
// counters, records the client version and loads the configuration.
3957 InternalClientContextImpl(
const Configuration::shared_pointer& conf) :
3958 m_addressList(
""), m_autoAddressList(
true), m_connectionTimeout(30.0f), m_beaconPeriod(15.0f),
// Starting values for the id counters.
3960 m_lastCID(0x10203040),
3961 m_lastIOID(0x80706050),
3962 m_version(
"pvAccess Client",
"cpp",
3963 EPICS_PVA_MAJOR_VERSION,
3964 EPICS_PVA_MINOR_VERSION,
3965 EPICS_PVA_MAINTENANCE_VERSION,
3966 EPICS_PVA_DEVELOPMENT_FLAG),
3967 m_contextState(CONTEXT_NOT_INITIALIZED),
3968 m_configuration(conf)
3970 REFTRACE_INCREMENT(num_instances);
// Fall back to the "pvAccess-client" configuration when none was supplied.
3972 if(!m_configuration) m_configuration = ConfigurationFactory::getConfiguration(
"pvAccess-client");
3973 m_flushTransports.reserve(64);
3974 loadConfiguration();
3977 virtual Configuration::const_shared_pointer getConfiguration()
OVERRIDE FINAL {
3978 return m_configuration;
3992 return &m_transportRegistry;
3995 virtual Transport::shared_pointer getSearchTransport()
OVERRIDE FINAL 3997 return m_searchTransport;
4003 if (m_contextState == CONTEXT_DESTROYED)
4004 throw std::runtime_error(
"Context destroyed!");
4005 else if (m_contextState == CONTEXT_INITIALIZED)
4006 throw std::runtime_error(
"Context already initialized.");
4008 internalInitialize();
4010 m_contextState = CONTEXT_INITIALIZED;
4016 out <<
"CLASS : ::epics::pvAccess::ClientContextImpl" << std::endl;
4017 out <<
"VERSION : " << m_version.getVersionString() << std::endl;
4018 out <<
"ADDR_LIST : " << m_addressList << std::endl;
4019 out <<
"AUTO_ADDR_LIST : " << (m_autoAddressList ?
"true" :
"false") << std::endl;
4020 out <<
"CONNECTION_TIMEOUT : " << m_connectionTimeout << std::endl;
4021 out <<
"BEACON_PERIOD : " << m_beaconPeriod << std::endl;
4022 out <<
"BROADCAST_PORT : " << m_broadcastPort << std::endl;;
4023 out <<
"RCV_BUFFER_SIZE : " << m_receiveBufferSize << std::endl;
4025 switch (m_contextState)
4027 case CONTEXT_NOT_INITIALIZED:
4028 out <<
"CONTEXT_NOT_INITIALIZED" << std::endl;
4030 case CONTEXT_INITIALIZED:
4031 out <<
"CONTEXT_INITIALIZED" << std::endl;
4033 case CONTEXT_DESTROYED:
4034 out <<
"CONTEXT_DESTROYED" << std::endl;
4037 out <<
"UNKNOWN" << std::endl;
4044 Lock guard(m_contextMutex);
4046 if (m_contextState == CONTEXT_DESTROYED)
4050 m_contextState = CONTEXT_DESTROYED;
4059 m_channelSearchManager->cancel();
4062 destroyAllChannels();
4065 for (BlockingUDPTransportVector::const_iterator iter = m_udpTransports.begin();
4066 iter != m_udpTransports.end(); iter++)
4068 m_udpTransports.clear();
4071 if (m_searchTransport)
4072 m_searchTransport->close();
4077 while ((transportCount = m_transportRegistry.size()) && tries--)
4081 Lock guard(m_beaconMapMutex);
4082 m_beaconHandlers.clear();
4086 LOG(
logLevelDebug,
"PVA client context destroyed with %u transport(s) active.", (
unsigned)transportCount);
4089 virtual ~InternalClientContextImpl()
4091 REFTRACE_DECREMENT(num_instances);
4094 const weak_pointer m_external_this, m_internal_this;
4095 shared_pointer internal_from_this()
const {
4096 return shared_pointer(m_internal_this);
4098 shared_pointer external_from_this()
const {
4099 return shared_pointer(m_external_this);
// Re-read the context tunables from m_configuration.
// Every lookup passes the member's current value as second argument -
// presumably the fallback returned when the property is not set (confirm
// against the Configuration API).
// NOTE(review): several original lines between the signature and the first
// assignment are absent from this listing.
4103 void loadConfiguration() {
4110 m_addressList = m_configuration->getPropertyAsString(
"EPICS_PVA_ADDR_LIST", m_addressList);
4111 m_autoAddressList = m_configuration->getPropertyAsBoolean(
"EPICS_PVA_AUTO_ADDR_LIST", m_autoAddressList);
4112 m_connectionTimeout = m_configuration->getPropertyAsFloat(
"EPICS_PVA_CONN_TMO", m_connectionTimeout);
4113 m_beaconPeriod = m_configuration->getPropertyAsFloat(
"EPICS_PVA_BEACON_PERIOD", m_beaconPeriod);
4114 m_broadcastPort = m_configuration->getPropertyAsInteger(
"EPICS_PVA_BROADCAST_PORT", m_broadcastPort);
// Note the property name: the receive buffer size comes from EPICS_PVA_MAX_ARRAY_BYTES.
4115 m_receiveBufferSize = m_configuration->getPropertyAsInteger(
"EPICS_PVA_MAX_ARRAY_BYTES", m_receiveBufferSize);
// Build the runtime machinery of the context. From the lines visible here:
//   - a BlockingTCPConnector is created with this context, the receive
//     buffer size and the connection timeout;
//   - a ClientResponseHandler bound to this context is created;
//   - std::logic_error is thrown when the socket used to introspect network
//     interfaces cannot be created;
//   - an error is logged when interface introspection fails or yields nothing;
//   - the channel search manager is activated last.
// NOTE(review): many original lines (conditions, the call that receives the
// broadcast port / address list arguments, creation of the search manager)
// are absent from this listing; consult the full source for exact ordering.
4118 void internalInitialize() {
// Strong reference to this context, shared by the objects created below.
4122 InternalClientContextImpl::shared_pointer thisPointer(internal_from_this());
4124 m_connector.reset(
new BlockingTCPConnector(thisPointer, m_receiveBufferSize, m_connectionTimeout));
4127 m_responseHandler.reset(
new ClientResponseHandler(thisPointer));
4140 throw std::logic_error(
"Failed to create a socket to introspect network interfaces.");
4146 LOG(
logLevelError,
"Failed to introspect interfaces or no network interfaces available.");
4151 m_broadcastPort, m_autoAddressList, m_addressList, std::string());
4157 m_channelSearchManager->activate();
// Walk every registered channel. Visible steps:
//   1. take m_cidMapMutex;
//   2. copy the weak references stored in m_channelsByCID into a local
//      vector sized to the map, counting them in `count`;
//   3. loop over the first `count` copies and lock() each into `ptr`.
// NOTE(review): what is done with `ptr` (and whether the mutex is released
// before the second loop) is not visible in this listing.
4162 void destroyAllChannels() {
4163 Lock guard(m_cidMapMutex);
4166 std::vector<ClientChannelImpl::weak_pointer> channels(m_channelsByCID.size());
4167 for (CIDChannelMap::iterator iter = m_channelsByCID.begin();
4168 iter != m_channelsByCID.end();
4171 channels[count++] = iter->second;
4177 ClientChannelImpl::shared_pointer ptr;
4178 for (
int i = 0;
i < count;
i++)
// lock() yields an empty pointer for channels that are already gone.
4180 ptr = channels[
i].lock();
// Validate a channel name.
// Throws std::runtime_error with "0 or empty channel name" or "name too long".
// NOTE(review): the conditions guarding the two throws are absent from this
// listing; the messages suggest an emptiness check and a length limit.
4191 void checkChannelName(std::string
const & name)
OVERRIDE FINAL {
4193 throw std::runtime_error(
"0 or empty channel name");
4195 throw std::runtime_error(
"name too long");
4204 if (m_contextState == CONTEXT_DESTROYED)
4205 throw std::runtime_error(
"Context destroyed.");
4206 else if (m_contextState == CONTEXT_NOT_INITIALIZED)
4214 void registerChannel(ClientChannelImpl::shared_pointer
const & channel)
OVERRIDE FINAL 4216 Lock guard(m_cidMapMutex);
4217 m_channelsByCID[channel->getChannelID()] = ClientChannelImpl::weak_pointer(channel);
4224 void unregisterChannel(ClientChannelImpl::shared_pointer
const & channel)
OVERRIDE FINAL 4226 Lock guard(m_cidMapMutex);
4227 m_channelsByCID.erase(channel->getChannelID());
4237 Lock guard(m_cidMapMutex);
4238 CIDChannelMap::iterator it = m_channelsByCID.find(channelID);
4248 Lock guard(m_cidMapMutex);
4251 while (m_channelsByCID.find(++m_lastCID) != m_channelsByCID.end()) ;
4253 m_channelsByCID[m_lastCID].reset();
4260 void freeCID(
int cid)
4262 Lock guard(m_cidMapMutex);
4263 m_channelsByCID.erase(cid);
4274 Lock guard(m_ioidMapMutex);
4275 IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
4276 if (it == m_pendingResponseRequests.end())
return ResponseRequest::shared_pointer();
4277 return it->second.lock();
4287 Lock guard(m_ioidMapMutex);
4289 m_pendingResponseRequests[ioid] = ResponseRequest::weak_pointer(request);
4300 if (ioid ==
INVALID_IOID)
return ResponseRequest::shared_pointer();
4302 Lock guard(m_ioidMapMutex);
4303 IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid);
4304 if (it == m_pendingResponseRequests.end())
4305 return ResponseRequest::shared_pointer();
4307 ResponseRequest::shared_pointer retVal = it->second.lock();
4308 m_pendingResponseRequests.erase(it);
4318 Lock guard(m_ioidMapMutex);
4322 while (m_pendingResponseRequests.find(++m_lastIOID) != m_pendingResponseRequests.end()) ;
4326 m_pendingResponseRequests[m_lastIOID].reset();
4335 if (m_channelSearchManager)
4336 m_channelSearchManager->newServerDetected();
4347 Lock guard(m_beaconMapMutex);
4348 AddressBeaconHandlerMap::iterator it = m_beaconHandlers.find(*responseFrom);
4349 BeaconHandler::shared_pointer handler;
4350 if (it == m_beaconHandlers.end())
4353 handler.reset(
new BeaconHandler(internal_from_this(), responseFrom));
4354 m_beaconHandlers[*responseFrom] = handler;
4357 handler = it->second;
4371 Transport::shared_pointer t = m_connector->connect(client, m_responseHandler, *serverAddress, minorRevision, priority);
4374 catch (std::exception& e)
4377 return Transport::shared_pointer();
// Validate the arguments and create a channel through
// InternalChannelImpl::create().
//
// Visible behaviour:
//   - the name is validated with checkChannelName();
//   - std::runtime_error("0 requester") is thrown (condition not shown,
//     presumably a null requester);
//   - std::range_error is thrown when `priority` lies outside
//     [ChannelProvider::PRIORITY_MIN, ChannelProvider::PRIORITY_MAX];
//   - a std::exception caught by the handler below results in an empty
//     pointer being returned instead of the exception propagating.
// NOTE(review): part of the parameter list, the origin of `cid` and the
// start of the try block are absent from this listing.
4386 ClientChannelImpl::shared_pointer createChannelInternal(std::string
const & name, ChannelRequester::shared_pointer
const &
requester,
short priority,
4390 checkChannelName(name);
4393 throw std::runtime_error(
"0 requester");
4395 if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX)
4396 throw std::range_error(
"priority out of bounds");
4405 return InternalChannelImpl::create(internal_from_this(), cid, name, requester, priority, addresses);
4406 }
catch(std::exception& e) {
4408 return ClientChannelImpl::shared_pointer();
4416 ChannelSearchManager::shared_pointer getChannelSearchManager()
OVERRIDE FINAL {
4417 return m_channelSearchManager;
// --- Tunables; loadConfiguration() reads each from the named property. ---
// EPICS_PVA_ADDR_LIST
4424 string m_addressList;
// EPICS_PVA_AUTO_ADDR_LIST
4429 bool m_autoAddressList;
// EPICS_PVA_CONN_TMO; also passed to the BlockingTCPConnector.
4437 float m_connectionTimeout;
// EPICS_PVA_BEACON_PERIOD
4442 float m_beaconPeriod;
// EPICS_PVA_BROADCAST_PORT
4447 int32 m_broadcastPort;
// EPICS_PVA_MAX_ARRAY_BYTES; also passed to the BlockingTCPConnector.
4452 int m_receiveBufferSize;
4457 Timer::shared_pointer m_timer;
// Transport returned by getSearchTransport().
4467 BlockingUDPTransport::shared_pointer m_searchTransport;
// Created in internalInitialize(); used to open connections to servers.
4472 epics::auto_ptr<BlockingTCPConnector> m_connector;
// Created in internalInitialize(); handed to m_connector->connect().
4483 ClientResponseHandler::shared_pointer m_responseHandler;
// Channels by channel ID, held weakly; accesses in this file take m_cidMapMutex first.
4489 typedef std::map<pvAccessID, ClientChannelImpl::weak_pointer> CIDChannelMap;
4490 CIDChannelMap m_channelsByCID;
4495 Mutex m_cidMapMutex;
// Pending requests by I/O ID, held weakly; accesses in this file take m_ioidMapMutex first.
4505 IOIDResponseRequestMap m_pendingResponseRequests;
4510 Mutex m_ioidMapMutex;
4521 ChannelSearchManager::shared_pointer m_channelSearchManager;
// Beacon handlers by server address; accesses in this file take m_beaconMapMutex first.
4527 typedef std::map<osiSockAddr, BeaconHandler::shared_pointer, comp_osiSock_lt> AddressBeaconHandlerMap;
4528 AddressBeaconHandlerMap m_beaconHandlers;
4533 Mutex m_beaconMapMutex;
// Taken around the m_contextState check/transition when the context is destroyed.
4550 Mutex m_contextMutex;
4552 friend class ChannelProviderImpl;
// Source of the tunables above; defaults to the "pvAccess-client" configuration.
4554 Configuration::shared_pointer m_configuration;
// Out-of-class definitions of the static counters. Their addresses are passed
// to registerRefCounter() elsewhere in this file; num_instances of the context
// is adjusted with REFTRACE_INCREMENT / REFTRACE_DECREMENT in its
// constructor and destructor.
4559 size_t InternalClientContextImpl::num_instances;
4560 size_t InternalClientContextImpl::InternalChannelImpl::num_instances;
4561 size_t InternalClientContextImpl::InternalChannelImpl::num_active;
// Request object for a single "get field" query on a channel.
//
// What the visible lines establish:
//   - it keeps a strong reference to the owning channel (m_channel) and only
//     a weak reference to the requester callback (m_callback);
//   - when started it registers itself with the context (which yields
//     m_ioid) and with the channel, swaps itself into m_channel->m_getfield
//     while holding m_channelMutex, and enqueues itself on the transport
//     obtained from checkAndGetTransport();
//   - the request payload contains the server channel ID followed by the
//     serialized sub-field name;
//   - on cleanup it resets m_channel->m_getfield if that still refers to this
//     object and unregisters m_ioid from both the context and the channel;
//   - a response is decoded with transport->cachedDeserialize() and passed to
//     notify() together with the status.
// NOTE(review): most method headers, braces and several statements are
// absent from this listing, so which method each fragment belongs to must be
// checked against the full source.
4563 class ChannelGetFieldRequestImpl :
4566 public std::tr1::enable_shared_from_this<ChannelGetFieldRequestImpl>
4572 const InternalClientContextImpl::InternalChannelImpl::shared_pointer m_channel;
4574 const GetFieldRequester::weak_pointer m_callback;
4584 ChannelGetFieldRequestImpl(InternalClientContextImpl::InternalChannelImpl::shared_pointer
const & channel,
4585 GetFieldRequester::shared_pointer
const &
callback,
4586 std::string
const & subField) :
4588 m_callback(callback),
4589 m_subField(subField),
4599 ChannelGetFieldRequestImpl::shared_pointer
self(shared_from_this());
4600 m_ioid = m_channel->getContext()->registerResponseRequest(
self);
4601 m_channel->registerResponseRequest(
self);
4603 Lock L(m_channel->m_channelMutex);
// After the swap, `self` holds whatever request was stored in m_getfield before.
4604 m_channel->m_getfield.swap(
self);
4611 m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
4612 }
catch (std::runtime_error &rte) {
4618 virtual ~ChannelGetFieldRequestImpl()
4635 ChannelBaseRequester::shared_pointer getRequester()
OVERRIDE FINAL {
// Empty when the requester has already been released.
4636 return m_callback.lock();
4645 buffer->
putInt(m_channel->getServerChannelID());
4647 SerializeHelper::serializeString(m_subField, buffer, control);
4651 virtual Channel::shared_pointer getChannel()
4667 if (status == Channel::DESTROYED)
4675 Lock guard(m_mutex);
4682 Lock L(m_channel->m_channelMutex);
// Only clear the slot if it still refers to this request.
4683 if(m_channel->m_getfield.get()==
this)
4684 m_channel->m_getfield.reset();
4688 m_channel->getContext()->unregisterResponseRequest(m_ioid);
4689 m_channel->unregisterResponseRequest(m_ioid);
4700 field = transport->cachedDeserialize(payloadBuffer);
4702 notify(status, field);
// Begin a "get field" query on this channel: constructs a
// ChannelGetFieldRequestImpl bound to this channel (internal_from_this()),
// the given requester and the sub-field name.
// NOTE(review): the statements that follow the construction (presumably
// the call that starts the request) are absent from this listing.
4711 void InternalClientContextImpl::InternalChannelImpl::getField(GetFieldRequester::shared_pointer
const &
requester,std::string
const & subField)
4713 ChannelGetFieldRequestImpl::shared_pointer
self(
new ChannelGetFieldRequestImpl(internal_from_this(), requester, subField));
// Re-issue outstanding work on the current transport. Visible steps:
//   - take m_responseRequestsMutex;
//   - fetch the current transport with getTransport();
//   - enqueue the stored get-field request (m_getfield) on it;
//   - iterate m_responseRequests, locking each weak entry into `ptr`.
// NOTE(review): the guarding conditions and what is done with each `ptr`
// are absent from this listing.
4718 void InternalClientContextImpl::InternalChannelImpl::resubscribeSubscriptions()
4720 Lock guard(m_responseRequestsMutex);
4722 Transport::shared_pointer transport = getTransport();
4725 transport->enqueueSendRequest(m_getfield);
4729 for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin();
4730 iter != m_responseRequests.end();
// Entries are weak; lock() is empty for requests that no longer exist.
4733 ResponseRequest::shared_pointer ptr = iter->second.lock();
4745 namespace pvAccess {
4749 registerRefCounter(
"InternalClientContextImpl", &InternalClientContextImpl::num_instances);
4750 registerRefCounter(
"InternalChannelImpl", &InternalClientContextImpl::InternalChannelImpl::num_instances);
4751 registerRefCounter(
"InternalChannelImpl (Active)", &InternalClientContextImpl::InternalChannelImpl::num_active);
4754 InternalClientContextImpl::shared_pointer
internal(
new InternalClientContextImpl(conf)),
4756 const_cast<InternalClientContextImpl::weak_pointer&
>(
internal->m_external_this) = external;
4757 const_cast<InternalClientContextImpl::weak_pointer&
>(
internal->m_internal_this) =
internal;
4758 internal->initialize();
void deserialize(ByteBuffer *buffer, DeserializableControl *flusher)
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
virtual void deserialize(ByteBuffer *buffer, DeserializableControl *flusher)
PVScalar is the base class for each scalar field.
#define assert(exp)
Declare that a condition should be true.
std::tr1::shared_ptr< detail::SharedPut > put
EPICS_ALWAYS_INLINE int8 getByte()
An EPICS-specific replacement for ANSI C's assert.
EPICS_ALWAYS_INLINE void putInt(int32 value)
const char * getBuffer() const
Class that must be implemented by code that makes Timer requests.
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
#define SET_LOG_LEVEL(level)
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
std::size_t getLimit() const
pvd::StructureConstPtr type
#define STATIC_SEARCH_BASE_DELAY_SEC
#define STATIC_SEARCH_MAX_MULTIPLIER
epicsShareExtern const std::string PVACCESS_DEBUG
TODO only here because of the Lockable.
std::tr1::shared_ptr< const Structure > StructureConstPtr
A lock for multithreading.
An element for a monitorQueue.
#define EXCEPTION_GUARD3(WEAK, PTR, code)
std::size_t getPosition() const
const std::string & getMessage() const
void copy(PVValueArray< T > &pvFrom, size_t fromOffset, size_t fromStride, PVValueArray< T > &pvTo, size_t toOffset, size_t toStride, size_t count)
Copy a subarray from one scalar array to another.
virtual void flush(bool lastMessageCompleted)=0
const epics::pvData::int32 PVA_SERVER_PORT
std::vector< BlockingUDPTransport::shared_pointer > BlockingUDPTransportVector
std::tr1::shared_ptr< PVDataCreate > PVDataCreatePtr
Support for delayed or periodic callback execution.
EPICS_ALWAYS_INLINE int32 getInt()
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
#define SEND_MESSAGE(WEAK, PTR, MSG, MTYPE)
#define LOG(level, format,...)
void setPosition(std::size_t pos)
This class implements introspection object for a structure.
#define POINTER_DEFINITIONS(clazz)
std::vector< osiSockAddr > InetAddrVector
ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer &conf)
void authNZInitialize(const std::vector< std::string > &offeredSecurityPlugins)
void encodeAsIPv6Address(ByteBuffer *buffer, const osiSockAddr *address)
std::vector< ifaceNode > IfaceNodeVector
const std::string & getStackDump() const
const ChannelProcessRequester::weak_pointer requester
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
const epics::pvData::int32 PVA_BROADCAST_PORT
HexDump & limit(size_t n=(size_t)-1)
safety limit on max bytes printed
EPICS_ALWAYS_INLINE void putByte(int8 value)
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
std::tr1::shared_ptr< PVScalar > PVScalarPtr
std::vector< Transport::shared_pointer > transportVector_t
EPICS_ALWAYS_INLINE int16 getShort()
Data interface for a structure,.
std::size_t getRemaining() const
std::tr1::shared_ptr< const Field > FieldConstPtr
#define EXCEPTION_GUARD(code)
virtual std::string getChannelName()=0
FORCE_INLINE const FieldCreatePtr & getFieldCreate()
void initializeUDPTransports(bool serverFlag, BlockingUDPTransportVector &udpTransports, const IfaceNodeVector &ifaceList, const ResponseHandler::shared_pointer &responseHandler, BlockingUDPTransport::shared_pointer &sendTransport, int32 &listenPort, bool autoAddressList, const std::string &addressList, const std::string &ignoreAddressList)
void registerRefCounter(const char *name, const size_t *counter)
bool decodeAsIPv6Address(ByteBuffer *buffer, osiSockAddr *address)
Expose statistics related to network transport.
std::tr1::shared_ptr< PVStructure > PVStructurePtr
void getSocketAddressList(InetAddrVector &ret, const std::string &list, int defaultPort, const InetAddrVector *appendList)
const epics::pvData::int32 INVALID_IOID
std::tr1::shared_ptr< PVField > PVFieldPtr
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
virtual void ensureBuffer(std::size_t size)=0
void getVersion(epics::pvData::PVDataVersion *ptr)
std::tr1::shared_ptr< MonitorElement > MonitorElementPtr
std::tr1::shared_ptr< BitSet > BitSetPtr
detail::pick_type< int8_t, signed char, detail::pick_type< uint8_t, char, unsigned char >::type >::type boolean
Instance declaring destroy method.
shared_ptr< T > dynamic_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
const epics::pvData::uint32 MAX_CHANNEL_NAME_LENGTH
virtual void channelDestroyedOnServer()=0
PVArray is the base class for all array types.
const epics::pvData::int32 MAX_TCP_RECV
#define EPICS_NOT_COPYABLE(CLASS)
Disable implicit copyable.
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize=0)=0
int epicsStdCall sockAddrAreIdentical(const osiSockAddr *plhs, const osiSockAddr *prhs)
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
int discoverInterfaces(IfaceNodeVector &list, SOCKET socket, const osiSockAddr *pMatchAddr)
EPICS_ALWAYS_INLINE void putShort(int16 value)
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Methods for manipulating timeStamp.
#define IS_LOGGABLE(level)