responseHandlers.cpp
1 
7 #include <sstream>
8 #include <time.h>
9 #include <stdlib.h>
10 
11 #ifdef _WIN32
12 #define GETPID() GetCurrentProcessId()
13 #endif
14 
15 #ifdef vxWorks
16 #include <taskLib.h>
17 #define GETPID() taskIdSelf()
18 #endif
19 
20 #ifndef GETPID
21 #define GETPID() getpid()
22 #endif
23 
24 #include <osiSock.h>
25 #include <osiProcess.h>
26 #include <epicsAssert.h>
27 #include <epicsAtomic.h>
28 
29 #include <pv/byteBuffer.h>
30 #include <pv/timer.h>
31 
32 #define epicsExportSharedSymbols
33 #include <pv/responseHandlers.h>
34 #include <pv/remote.h>
35 #include <pv/hexDump.h>
36 #include <pv/serializationHelper.h>
37 #include <pv/logger.h>
38 #include <pv/pvAccessMB.h>
39 #include <pv/codec.h>
40 #include <pv/rpcServer.h>
41 #include <pv/securityImpl.h>
42 
43 using std::string;
44 using std::ostringstream;
45 using std::hex;
46 
49 
50 using namespace epics::pvData;
51 
52 namespace epics {
53 namespace pvAccess {
54 
55 // TODO this is a copy from clientContextImpl.cpp
56 static PVDataCreatePtr pvDataCreate = getPVDataCreate();
57 
58 
59 static BitSet::shared_pointer createBitSetFor(
60  PVStructure::shared_pointer const & pvStructure,
61  BitSet::shared_pointer const & existingBitSet)
62 {
63  assert(pvStructure);
64  int pvStructureSize = pvStructure->getNumberFields();
65  if (existingBitSet.get() && static_cast<int32>(existingBitSet->size()) >= pvStructureSize)
66  {
67  // clear existing BitSet
68  // also necessary if larger BitSet is reused
69  existingBitSet->clear();
70  return existingBitSet;
71  }
72  else
73  return BitSet::shared_pointer(new BitSet(pvStructureSize));
74 }
75 
76 static PVField::shared_pointer reuseOrCreatePVField(
77  Field::const_shared_pointer const & field,
78  PVField::shared_pointer const & existingPVField)
79 {
80  if (existingPVField.get() && *field == *existingPVField->getField())
81  return existingPVField;
82  else
83  return pvDataCreate->createPVField(field);
84 }
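// --- Illustrative sketch, not part of responseHandlers.cpp ---
// The two helpers above implement a simple reuse policy: keep the previous
// PVStructure/BitSet pair of a request and only reallocate when the
// introspection type (or field count) actually changes. The standalone
// sketch below shows the same policy in isolation; ResponseCache and
// update() are hypothetical names invented for this example.
#include <pv/pvData.h>
#include <pv/bitSet.h>

struct ResponseCache {
    epics::pvData::PVStructure::shared_pointer pv;
    epics::pvData::BitSet::shared_pointer changed;

    void update(epics::pvData::Structure::const_shared_pointer const & type) {
        using namespace epics::pvData;
        // reuse the previous PVStructure only if the type is unchanged
        if (!pv || !(*type == *pv->getStructure()))
            pv = getPVDataCreate()->createPVStructure(type);
        // clear the change BitSet if it is still large enough, otherwise resize
        size_t nfields = pv->getNumberFields();
        if (changed && changed->size() >= nfields)
            changed->clear();
        else
            changed.reset(new BitSet(nfields));
    }
};
// --- end of sketch ---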
85 
86 
87 
88 void ServerBadResponse::handleResponse(osiSockAddr* responseFrom,
89  Transport::shared_pointer const & transport, int8 version, int8 command,
90  size_t payloadSize, ByteBuffer* payloadBuffer)
91 {
92  AbstractServerResponseHandler::handleResponse(responseFrom,
93  transport, version, command, payloadSize, payloadBuffer);
94 
95  char ipAddrStr[24];
96  ipAddrToDottedIP(&responseFrom->ia, ipAddrStr, sizeof(ipAddrStr));
97 
 98  LOG(logLevelInfo,
 99  "Undecipherable message (bad response type %d) from %s.",
100  command, ipAddrStr);
101 
102 }
103 
104 ServerResponseHandler::ServerResponseHandler(ServerContextImpl::shared_pointer const & context)
105  :ResponseHandler(context.get(), "ServerResponseHandler")
106  ,handle_bad(context)
107  ,handle_beacon(context, "Beacon")
108  ,handle_validation(context)
109  ,handle_echo(context)
110  ,handle_search(context)
111  ,handle_authnz(context.get())
112  ,handle_create(context)
113  ,handle_destroy(context)
114  ,handle_get(context)
115  ,handle_put(context)
116  ,handle_putget(context)
117  ,handle_monitor(context)
118  ,handle_array(context)
119  ,handle_close(context)
120  ,handle_process(context)
121  ,handle_getfield(context)
122  ,handle_rpc(context)
123  ,handle_cancel(context)
124  ,m_handlerTable(CMD_CANCEL_REQUEST+1, &handle_bad)
125 {
126 
127  m_handlerTable[CMD_BEACON] = &handle_beacon; /* 0 */
128  m_handlerTable[CMD_CONNECTION_VALIDATION] = &handle_validation; /* 1 */
129  m_handlerTable[CMD_ECHO] = &handle_echo; /* 2 */
130  m_handlerTable[CMD_SEARCH] = &handle_search; /* 3 */
131  m_handlerTable[CMD_SEARCH_RESPONSE] = &handle_bad;
132  m_handlerTable[CMD_AUTHNZ] = &handle_authnz; /* 5 */
133  m_handlerTable[CMD_ACL_CHANGE] = &handle_bad; /* 6 - access right change */
134  m_handlerTable[CMD_CREATE_CHANNEL] = &handle_create; /* 7 */
135  m_handlerTable[CMD_DESTROY_CHANNEL] = &handle_destroy; /* 8 */
136  m_handlerTable[CMD_CONNECTION_VALIDATED] = &handle_bad; /* 9 */
137 
138  m_handlerTable[CMD_GET] = &handle_get; /* 10 - get response */
139  m_handlerTable[CMD_PUT] = &handle_put; /* 11 - put response */
140  m_handlerTable[CMD_PUT_GET] = &handle_putget; /* 12 - put-get response */
141  m_handlerTable[CMD_MONITOR] = &handle_monitor; /* 13 - monitor response */
142  m_handlerTable[CMD_ARRAY] = &handle_array; /* 14 - array response */
143  m_handlerTable[CMD_DESTROY_REQUEST] = &handle_close; /* 15 - destroy request */
144  m_handlerTable[CMD_PROCESS] = &handle_process; /* 16 - process response */
145  m_handlerTable[CMD_GET_FIELD] = &handle_getfield; /* 17 - get field response */
146  m_handlerTable[CMD_MESSAGE] = &handle_bad; /* 18 - message to Requester */
147  m_handlerTable[CMD_MULTIPLE_DATA] = &handle_bad; /* 19 - grouped monitors */
148 
149  m_handlerTable[CMD_RPC] = &handle_rpc; /* 20 - RPC response */
150  m_handlerTable[CMD_CANCEL_REQUEST] = &handle_cancel; /* 21 - cancel request */
151 }
152 
 153 void ServerResponseHandler::handleResponse(osiSockAddr* responseFrom,
 154  Transport::shared_pointer const & transport, int8 version, int8 command,
 155  size_t payloadSize, ByteBuffer* payloadBuffer)
 156 {
 157  if(command<0||command>=(int8)m_handlerTable.size())
 158  {
 159  LOG(logLevelError,
 160  "Invalid (or unsupported) command: %x.", (0xFF&command));
 161 
 162  if(IS_LOGGABLE(logLevelDebug)) {
 163  std::cerr<<"Invalid (or unsupported) command: "<<std::hex<<(int)(0xFF&command)<<"\n"
164  <<HexDump(*payloadBuffer, payloadSize).limit(256u);
165  }
166  return;
167  }
168 
169  // delegate
170  m_handlerTable[command]->handleResponse(responseFrom, transport,
171  version, command, payloadSize, payloadBuffer);
172 }
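// --- Illustrative sketch, not part of responseHandlers.cpp ---
// ServerResponseHandler above dispatches on the command byte through a
// fixed-size table whose slots all default to the "bad response" handler;
// only the commands the server is willing to accept get a real handler, and
// out-of-range commands are rejected before indexing. The miniature below
// uses hypothetical names (sketch::Dispatcher, sketch::Handler).
#include <cstdio>
#include <vector>

namespace sketch {

struct Handler {
    virtual ~Handler() {}
    virtual void handle(int command) = 0;
};

struct BadHandler : Handler {
    virtual void handle(int command) { std::printf("bad command %d\n", command); }
};

struct EchoHandler : Handler {
    virtual void handle(int command) { std::printf("echo (command %d)\n", command); }
};

struct Dispatcher {
    BadHandler bad;
    EchoHandler echo;
    std::vector<Handler*> table;

    Dispatcher() : table(22, &bad) {   // CMD_CANCEL_REQUEST+1 slots, all "bad" by default
        table[2] = &echo;              // CMD_ECHO is slot 2 in the table above
    }

    void dispatch(int command) {
        // bounds check first, exactly as handleResponse() does above
        if (command < 0 || command >= (int)table.size()) {
            bad.handle(command);
            return;
        }
        table[command]->handle(command);
    }
};

} // namespace sketch
// --- end of sketch ---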
173 
 174 void ServerConnectionValidationHandler::handleResponse(
 175  osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version,
 176  int8 command, size_t payloadSize,
 177  ByteBuffer* payloadBuffer)
 178 {
 179  AbstractServerResponseHandler::handleResponse(responseFrom,
 180  transport, version, command, payloadSize, payloadBuffer);
181 
182  transport->ensureData(4+2+2);
183  transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt());
184  // TODO clientIntrospectionRegistryMaxSize
185  /* int clientIntrospectionRegistryMaxSize = */ payloadBuffer->getShort();
186  // TODO connectionQoS
187  /* int16 connectionQoS = */ payloadBuffer->getShort();
188 
189  // authNZ
190  std::string securityPluginName = SerializeHelper::deserializeString(payloadBuffer, transport.get());
191 
192  // optional authNZ plug-in initialization data
193  PVStructure::shared_pointer data;
194  if (payloadBuffer->getRemaining()) {
195  PVField::shared_pointer raw(SerializationHelper::deserializeFull(payloadBuffer, transport.get()));
 196  if(raw && raw->getField()->getType()==structure) {
 197  data = std::tr1::static_pointer_cast<PVStructure>(raw);
 198  } else {
199  // was originally allowed, but never used
200  }
201  }
202 
203  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
 204  //TODO: simplify byzantine class hierarchy...
205  assert(casTransport);
206 
207  try {
208  casTransport->authNZInitialize(securityPluginName, data);
209  }catch(std::exception& e){
211  {
212  LOG(logLevelDebug, "Security plug-in '%s' failed to create a session for PVA client: %s.", securityPluginName.c_str(), casTransport->getRemoteName().c_str());
213  }
214  casTransport->verified(pvData::Status::error(e.what()));
215  throw;
216  }
217 }
218 
219 
220 
221 
222 
223 
 224 void ServerEchoHandler::handleResponse(osiSockAddr* responseFrom,
 225  Transport::shared_pointer const & transport, int8 version, int8 command,
 226  size_t payloadSize, ByteBuffer* payloadBuffer)
 227 {
 228  AbstractServerResponseHandler::handleResponse(responseFrom,
 229  transport, version, command, payloadSize, payloadBuffer);
230 
231  // send back
232  TransportSender::shared_pointer echoReply(new EchoTransportSender(responseFrom, payloadSize, *payloadBuffer));
233  transport->enqueueSendRequest(echoReply);
234 }
235 
236 /****************************************************************************************/
237 
238 const std::string ServerSearchHandler::SUPPORTED_PROTOCOL = "tcp";
239 
 240 ServerSearchHandler::ServerSearchHandler(ServerContextImpl::shared_pointer const & context) :
 241  AbstractServerResponseHandler(context, "Search request")
242 {
243  // initialize random seed with some random value
244  srand ( time(NULL) );
245 }
246 
 247 void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
 248  Transport::shared_pointer const & transport, int8 version, int8 command,
 249  size_t payloadSize, ByteBuffer* payloadBuffer)
 250 {
 251  AbstractServerResponseHandler::handleResponse(responseFrom,
 252  transport, version, command, payloadSize, payloadBuffer);
253 
254  transport->ensureData(4+1+3+16+2);
255 
256  size_t startPosition = payloadBuffer->getPosition();
257 
258  const int32 searchSequenceId = payloadBuffer->getInt();
259  const int8 qosCode = payloadBuffer->getByte();
260 
261  // reserved part
262  payloadBuffer->getByte();
263  payloadBuffer->getShort();
264 
265  osiSockAddr responseAddress;
266  memset(&responseAddress, 0, sizeof(responseAddress));
267  responseAddress.ia.sin_family = AF_INET;
268 
269  // 128-bit IPv6 address
270  if (!decodeAsIPv6Address(payloadBuffer, &responseAddress)) return;
271 
272  // accept given address if explicitly specified by sender
273  if (responseAddress.ia.sin_addr.s_addr == INADDR_ANY)
274  responseAddress.ia.sin_addr = responseFrom->ia.sin_addr;
275 
276  // NOTE: htons might be a macro (e.g. vxWorks)
277  int16 port = payloadBuffer->getShort();
278  responseAddress.ia.sin_port = htons(port);
279 
280  size_t protocolsCount = SerializeHelper::readSize(payloadBuffer, transport.get());
281  bool allowed = (protocolsCount == 0);
282  for (size_t i = 0; i < protocolsCount; i++)
283  {
284  string protocol = SerializeHelper::deserializeString(payloadBuffer, transport.get());
285  if (SUPPORTED_PROTOCOL == protocol)
286  allowed = true;
287  }
288 
289  // NOTE: we do not stop reading the buffer
290 
291  transport->ensureData(2);
292  const int32 count = payloadBuffer->getShort() & 0xFFFF;
293 
294  // TODO DoS attack?
295  // You bet! With a reply address encoded in the request we don't even need a forged UDP header.
296  const bool responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0;
297 
298  //
299  // locally broadcast if unicast (qosCode & 0x80 == 0x80) via UDP
300  //
301  if ((qosCode & 0x80) == 0x80)
302  {
303  BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast<BlockingUDPTransport>(transport);
304  if (bt && bt->hasLocalMulticastAddress())
305  {
 306  // RECEIVE_BUFFER_PRE_RESERVE allows us to prefix the message
307  size_t newStartPos = (startPosition-PVA_MESSAGE_HEADER_SIZE)-PVA_MESSAGE_HEADER_SIZE-16;
308  payloadBuffer->setPosition(newStartPos);
309 
310  // copy part of a header, and add: command, payloadSize, NIF address
311  payloadBuffer->put(payloadBuffer->getBuffer(), startPosition-PVA_MESSAGE_HEADER_SIZE, PVA_MESSAGE_HEADER_SIZE-5);
312  payloadBuffer->putByte(CMD_ORIGIN_TAG);
313  payloadBuffer->putInt(16);
314  // encode this socket bind address
315  encodeAsIPv6Address(payloadBuffer, bt->getBindAddress());
316 
317  // clear unicast flag
318  payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80));
319 
320  // update response address
321  payloadBuffer->setPosition(startPosition+8);
322  encodeAsIPv6Address(payloadBuffer, &responseAddress);
323 
324  // set to end of a message
325  payloadBuffer->setPosition(payloadBuffer->getLimit());
326 
327  bt->send(payloadBuffer->getBuffer()+newStartPos, payloadBuffer->getPosition()-newStartPos,
328  bt->getLocalMulticastAddress());
329 
330  return;
331  }
332  }
333 
334  PeerInfo::shared_pointer info;
335  if(allowed) {
336  info.reset(new PeerInfo);
337  info->transport = "pva";
338  info->peer = inetAddressToString(*responseFrom);
339  info->transportVersion = version;
340  }
341 
342  if (count > 0)
343  {
344  // regular name search
345  for (int32 i = 0; i < count; i++)
346  {
347  transport->ensureData(4);
348  const int32 cid = payloadBuffer->getInt();
349  const string name = SerializeHelper::deserializeString(payloadBuffer, transport.get());
350  // no name check here...
351 
352  if (allowed)
353  {
354  const std::vector<ChannelProvider::shared_pointer>& _providers = _context->getChannelProviders();
355 
356  int providerCount = _providers.size();
357  std::tr1::shared_ptr<ServerChannelFindRequesterImpl> tp(new ServerChannelFindRequesterImpl(_context, info, providerCount));
358  tp->set(name, searchSequenceId, cid, responseAddress, responseRequired, false);
359 
360  for (int i = 0; i < providerCount; i++)
361  _providers[i]->channelFind(name, tp);
362  }
363  }
364  }
365  else
366  {
367  // server discovery ping by pvlist
368  if (allowed)
369  {
370  // ~random hold-off to reduce impact of all servers responding.
371  // in [0.05, 0.15]
372  double delay = double(rand())/RAND_MAX; // [0, 1]
373  delay = delay*0.1 + 0.05;
374 
375  std::tr1::shared_ptr<ServerChannelFindRequesterImpl> tp(new ServerChannelFindRequesterImpl(_context, info, 1));
376  tp->set("", searchSequenceId, 0, responseAddress, true, true);
377 
378  TimerCallback::shared_pointer tc = tp;
379  _context->getTimer()->scheduleAfterDelay(tc, delay);
380  }
381  }
382 }
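// --- Illustrative sketch, not part of responseHandlers.cpp ---
// For a server-discovery ping (count == 0) the reply is deliberately delayed
// by a small random hold-off so that many servers do not all answer the same
// broadcast at the same instant. The helper below reproduces the arithmetic
// used a few lines above: rand()/RAND_MAX is uniform-ish in [0, 1] and is
// mapped into [0.05, 0.15] seconds. discoveryHoldOffSeconds is a name
// invented for this example.
#include <cstdlib>

static double discoveryHoldOffSeconds() {
    double u = double(std::rand()) / RAND_MAX;  // in [0, 1]
    return u * 0.1 + 0.05;                      // scaled/shifted into [0.05, 0.15] s
}
// --- end of sketch ---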
383 
 384 ServerChannelFindRequesterImpl::ServerChannelFindRequesterImpl(ServerContextImpl::shared_pointer const & context, const PeerInfo::const_shared_pointer& peer,
 385  int32 expectedResponseCount) :
386  _guid(context->getGUID()),
387  _sendTo(),
388  _wasFound(false),
389  _context(context),
390  _peer(peer),
391  _expectedResponseCount(expectedResponseCount),
392  _responseCount(0),
393  _serverSearch(false)
394 {}
395 
 396 void ServerChannelFindRequesterImpl::clear()
 397 {
398  Lock guard(_mutex);
399  _wasFound = false;
400  _responseCount = 0;
401  _serverSearch = false;
402 }
403 
 404 void ServerChannelFindRequesterImpl::callback()
 405 {
406  channelFindResult(Status::Ok, ChannelFind::shared_pointer(), false);
407 }
408 
 409 void ServerChannelFindRequesterImpl::timerStopped()
 410 {
411  // noop
412 }
413 
414 ServerChannelFindRequesterImpl* ServerChannelFindRequesterImpl::set(std::string name, int32 searchSequenceId, int32 cid, osiSockAddr const & sendTo,
415  bool responseRequired, bool serverSearch)
416 {
417  Lock guard(_mutex);
418  _name = name;
419  _searchSequenceId = searchSequenceId;
420  _cid = cid;
421  _sendTo = sendTo;
422  _responseRequired = responseRequired;
423  _serverSearch = serverSearch;
424  return this;
425 }
426 
427 void ServerChannelFindRequesterImpl::channelFindResult(const Status& /*status*/, ChannelFind::shared_pointer const & channelFind, bool wasFound)
428 {
429  // TODO status
430  Lock guard(_mutex);
431 
432  _responseCount++;
433  if (_responseCount > _expectedResponseCount)
434  {
 435  if (_responseCount == (_expectedResponseCount+1))
 436  {
 437  LOG(logLevelDebug,"[ServerChannelFindRequesterImpl::channelFindResult] More responses received than expected for channel '%s'!", _name.c_str());
438  }
439  return;
440  }
441 
442  if (wasFound && _wasFound)
443  {
444  LOG(logLevelDebug,"[ServerChannelFindRequesterImpl::channelFindResult] Channel '%s' is hosted by different channel providers!", _name.c_str());
445  return;
446  }
447 
448  if (wasFound || (_responseRequired && (_responseCount == _expectedResponseCount)))
449  {
450  if (wasFound && _expectedResponseCount > 1)
451  {
452  Lock L(_context->_mutex);
453  _context->s_channelNameToProvider[_name] = channelFind->getChannelProvider();
454  }
455  _wasFound = wasFound;
456 
457  BlockingUDPTransport::shared_pointer bt = _context->getBroadcastTransport();
458  if (bt)
459  {
460  TransportSender::shared_pointer thisSender = shared_from_this();
461  bt->enqueueSendRequest(thisSender);
462  }
463  }
464 }
465 
466 std::tr1::shared_ptr<const PeerInfo> ServerChannelFindRequesterImpl::getPeerInfo()
467 {
468  return _peer;
469 }
470 
 471 void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
 472 {
473  control->startMessage(CMD_SEARCH_RESPONSE, 12+4+16+2);
474 
475  Lock guard(_mutex);
476  buffer->put(_guid.value, 0, sizeof(_guid.value));
477  buffer->putInt(_searchSequenceId);
478 
 479  // NOTE: it is possible (very likely) that the address is the wildcard local address ::ffff:0.0.0.0
480  encodeAsIPv6Address(buffer, _context->getServerInetAddress());
481  buffer->putShort((int16)_context->getServerPort());
482 
483  SerializeHelper::serializeString(ServerSearchHandler::SUPPORTED_PROTOCOL, buffer, control);
484 
485  control->ensureBuffer(1);
486  buffer->putByte(_wasFound ? (int8)1 : (int8)0);
487 
488  if (!_serverSearch)
489  {
490  // TODO for now we do not gather search responses
491  buffer->putShort((int16)1);
492  buffer->putInt(_cid);
493  }
494  else
495  {
496  buffer->putShort((int16)0);
497  }
498 
499  control->setRecipient(_sendTo);
500 }
501 
502 /****************************************************************************************/
503 
 504 class ChannelListRequesterImpl :
 505  public ChannelListRequester
 506 {
 507 public:
 508  POINTER_DEFINITIONS(ChannelListRequesterImpl);
 509 
 510  Status status;
 511  PVStringArray::const_svector channelNames;
 512 
513  virtual void channelListResult(
514  const epics::pvData::Status& status,
515  ChannelFind::shared_pointer const & channelFind,
516  PVStringArray::const_svector const & channelNames,
517  bool hasDynamic)
518  {
519  epics::pvData::Lock lock(_waitMutex);
520 
521  this->status = status;
522  this->channelNames = channelNames;
523 
524  _waitEvent.signal();
525  }
526 
527  bool waitForCompletion(int32 timeoutSec) {
528  return _waitEvent.wait(timeoutSec);
529  }
530 
531  void resetEvent() {
532  _waitEvent.tryWait();
533  }
534 
535 private:
536  epics::pvData::Mutex _waitMutex;
537  epics::pvData::Event _waitEvent;
538 
539 };
540 
541 }}
542 namespace {
543 using namespace epics::pvAccess;
544 
545 // TODO move out to a separate class
546 class ServerRPCService : public RPCService {
547 
548 private:
549  static int32 TIMEOUT_SEC;
550 
551  static Structure::const_shared_pointer helpStructure;
552  static Structure::const_shared_pointer channelListStructure;
553  static Structure::const_shared_pointer infoStructure;
554 
555  static const std::string helpString;
556 
557  ServerContextImpl::shared_pointer m_serverContext;
558 
559  // s1 starts with s2 check
560  static bool starts_with(const string& s1, const string& s2) {
561  return s2.size() <= s1.size() && s1.compare(0, s2.size(), s2) == 0;
562  }
563 
564 public:
565 
566  ServerRPCService(ServerContextImpl::shared_pointer const & context) :
567  m_serverContext(context)
568  {
569  }
570 
571  virtual epics::pvData::PVStructure::shared_pointer request(
572  epics::pvData::PVStructure::shared_pointer const & arguments
573  )
574  {
575  // NTURI support
576  PVStructure::shared_pointer args(
577  (starts_with(arguments->getStructure()->getID(), "epics:nt/NTURI:1.")) ?
578  arguments->getSubField<PVStructure>("query") :
579  arguments
580  );
581 
582  // help support
583  if (args->getSubField("help"))
584  {
585  PVStructure::shared_pointer help = getPVDataCreate()->createPVStructure(helpStructure);
586  help->getSubFieldT<PVString>("value")->put(helpString);
587  return help;
588  }
589 
590  PVString::shared_pointer opField = args->getSubField<PVString>("op");
591  if (!opField)
592  throw RPCRequestException(Status::STATUSTYPE_ERROR, "unspecified 'string op' field");
593 
594  string op = opField->get();
595  if (op == "channels")
596  {
597  PVStructure::shared_pointer result =
598  getPVDataCreate()->createPVStructure(channelListStructure);
599  PVStringArray::shared_pointer allChannelNames = result->getSubFieldT<PVStringArray>("value");
600 
601  ChannelListRequesterImpl::shared_pointer listListener(new ChannelListRequesterImpl());
602  const std::vector<ChannelProvider::shared_pointer>& providers = m_serverContext->getChannelProviders();
603 
604  size_t providerCount = providers.size();
605  for (size_t i = 0; i < providerCount; i++)
606  {
607  providers[i]->channelList(listListener);
608  if (!listListener->waitForCompletion(TIMEOUT_SEC))
609  throw RPCRequestException(Status::STATUSTYPE_ERROR, "failed to fetch channel list due to timeout");
610 
611  Status& status = listListener->status;
612  if (!status.isSuccess())
613  {
614  string errorMessage = "failed to fetch channel list: " + status.getMessage();
615  if (!status.getStackDump().empty())
616  errorMessage += "\n" + status.getStackDump();
617  if (providerCount == 1)
618  throw RPCRequestException(Status::STATUSTYPE_ERROR, errorMessage);
619  else
620  {
621  LOG(logLevelDebug, "%s: %s", providers[i]->getProviderName().c_str(), errorMessage.c_str());
622  }
623  }
624 
625  // optimization
626  if (providerCount == 1)
627  {
628  allChannelNames->replace(listListener->channelNames);
629  }
630  else
631  {
632  PVStringArray::svector list(allChannelNames->reuse());
633  std::copy(listListener->channelNames.begin(), listListener->channelNames.end(),
634  back_inserter(list));
635  allChannelNames->replace(freeze(list));
636  }
637 
638  listListener->resetEvent();
639  }
640 
641  return result;
642  }
643  else if (op == "info")
644  {
645  PVStructure::shared_pointer result =
646  getPVDataCreate()->createPVStructure(infoStructure);
647 
648  // TODO cache hostname in InetAddressUtil
649  char buffer[256];
650  std::string hostName("localhost");
651  if (gethostname(buffer, sizeof(buffer)) == 0)
652  hostName = buffer;
653 
654  std::stringstream ret;
655  ret << EPICS_PVA_MAJOR_VERSION << '.' <<
656  EPICS_PVA_MINOR_VERSION << '.' <<
657  EPICS_PVA_MAINTENANCE_VERSION;
658  if (EPICS_PVA_DEVELOPMENT_FLAG)
659  ret << "-SNAPSHOT";
660 
661  result->getSubFieldT<PVString>("version")->put(ret.str());
662  result->getSubFieldT<PVString>("implLang")->put("cpp");
663  result->getSubFieldT<PVString>("host")->put(hostName);
664 
665  std::stringstream sspid;
666  sspid << GETPID();
667  result->getSubFieldT<PVString>("process")->put(sspid.str());
668 
669  char timeText[64];
670  epicsTimeToStrftime(timeText, 64, "%Y-%m-%dT%H:%M:%S.%03f", &m_serverContext->getStartTime());
671 
672  result->getSubFieldT<PVString>("startTime")->put(timeText);
673 
674 
675  return result;
676  }
677  else
678  throw RPCRequestException(Status::STATUSTYPE_ERROR, "unsupported operation '" + op + "'.");
679  }
680 };
681 
682 int32 ServerRPCService::TIMEOUT_SEC = 3;
683 Structure::const_shared_pointer ServerRPCService::helpStructure =
684  getFieldCreate()->createFieldBuilder()->
685  setId("epics:nt/NTScalar:1.0")->
686  add("value", pvString)->
687  createStructure();
688 
689 Structure::const_shared_pointer ServerRPCService::channelListStructure =
690  getFieldCreate()->createFieldBuilder()->
691  setId("epics:nt/NTScalarArray:1.0")->
692  addArray("value", pvString)->
693  createStructure();
694 
695 Structure::const_shared_pointer ServerRPCService::infoStructure =
696  getFieldCreate()->createFieldBuilder()->
697  add("process", pvString)->
698  add("startTime", pvString)->
699  add("version", pvString)->
700  add("implLang", pvString)->
701  add("host", pvString)->
702 // add("os", pvString)->
703 // add("arch", pvString)->
704 // add("CPUs", pvInt)->
705  createStructure();
706 
707 
708 const std::string ServerRPCService::helpString =
709  "pvAccess server RPC service.\n"
710  "arguments:\n"
711  "\tstring op\toperation to execute\n"
712  "\n"
713  "\toperations:\n"
714  "\t\tinfo\t\treturns some information about the server\n"
715  "\t\tchannels\treturns a list of 'static' channels the server can provide\n"
716 // "\t\t\t (no arguments)\n"
717  "\n";
718 
719 }
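// --- Illustrative sketch, not part of responseHandlers.cpp ---
// ServerRPCService above is exposed through the built-in "server" channel and
// expects either an NTURI whose "query" carries the arguments, or the argument
// structure directly: a "help" field, or a string "op" field whose value is
// "channels" or "info". The sketch below only builds such an argument
// structure; sending it (e.g. with an RPC client utility) is left out and
// depends on the client tooling used. makeServerInfoArgs is a hypothetical name.
#include <pv/pvData.h>

static epics::pvData::PVStructure::shared_pointer makeServerInfoArgs() {
    using namespace epics::pvData;
    StructureConstPtr argType(getFieldCreate()->createFieldBuilder()
        ->add("op", pvString)               // "info" or "channels"
        ->createStructure());
    PVStructure::shared_pointer args(getPVDataCreate()->createPVStructure(argType));
    args->getSubFieldT<PVString>("op")->put("info");
    return args;
}
// --- end of sketch ---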
720 namespace epics {
721 namespace pvAccess {
722 
723 const std::string ServerCreateChannelHandler::SERVER_CHANNEL_NAME = "server";
724 
 725 void ServerCreateChannelHandler::handleResponse(osiSockAddr* responseFrom,
 726  Transport::shared_pointer const & transport, int8 version, int8 command,
 727  size_t payloadSize, ByteBuffer* payloadBuffer)
 728 {
 729  AbstractServerResponseHandler::handleResponse(responseFrom,
 730  transport, version, command, payloadSize, payloadBuffer);
731 
 732  // TODO for now only one request at a time is supported, i.e. dataCount == 1
733  transport->ensureData((sizeof(int32)+sizeof(int16))/sizeof(int8));
734  const int16 count = payloadBuffer->getShort();
735  if (count != 1)
736  {
737  THROW_BASE_EXCEPTION("only 1 supported for now");
738  }
739  const pvAccessID cid = payloadBuffer->getInt();
740 
741  string channelName = SerializeHelper::deserializeString(payloadBuffer, transport.get());
742  if (channelName.size() == 0)
743  {
744  LOG(logLevelDebug,"Zero length channel name, disconnecting client: %s", transport->getRemoteName().c_str());
745  disconnect(transport);
746  return;
747  }
748  else if (channelName.size() > MAX_CHANNEL_NAME_LENGTH)
749  {
750  LOG(logLevelDebug,"Unreasonable channel name length, disconnecting client: %s", transport->getRemoteName().c_str());
751  disconnect(transport);
752  return;
753  }
754 
755  if (channelName == SERVER_CHANNEL_NAME)
756  {
757  // TODO singleton!!!
758  ServerRPCService::shared_pointer serverRPCService(new ServerRPCService(_context));
759 
760  // TODO use std::make_shared
761  std::tr1::shared_ptr<ServerChannelRequesterImpl> tp(new ServerChannelRequesterImpl(transport, channelName, cid));
762  ChannelRequester::shared_pointer cr = tp;
763  Channel::shared_pointer serverChannel = createRPCChannel(ChannelProvider::shared_pointer(), channelName, cr, serverRPCService);
764  cr->channelCreated(Status::Ok, serverChannel);
765  }
766  else
767  {
768  const std::vector<ChannelProvider::shared_pointer>& _providers(_context->getChannelProviders());
769  ServerContextImpl::s_channelNameToProvider_t::const_iterator it;
770 
771  if (_providers.size() == 1)
772  ServerChannelRequesterImpl::create(_providers[0], transport, channelName, cid);
773  else {
774  ChannelProvider::shared_pointer prov;
775  {
776  Lock L(_context->_mutex);
777  if((it = _context->s_channelNameToProvider.find(channelName)) != _context->s_channelNameToProvider.end())
778  prov = it->second.lock();
779  }
780  if(prov)
781  ServerChannelRequesterImpl::create(prov, transport, channelName, cid);
782  }
783  }
784 }
785 
786 void ServerCreateChannelHandler::disconnect(Transport::shared_pointer const & transport)
787 {
788  transport->close();
789 }
790 
791 ServerChannelRequesterImpl::ServerChannelRequesterImpl(const Transport::shared_pointer &transport,
792  const string channelName, const pvAccessID cid) :
793  _serverChannel(),
794  _transport(std::tr1::static_pointer_cast<detail::BlockingServerTCPTransportCodec>(transport)),
795  _channelName(channelName),
796  _cid(cid),
797  _created(false)
798 {
799 }
800 
801 ChannelRequester::shared_pointer ServerChannelRequesterImpl::create(
802  ChannelProvider::shared_pointer const & provider, Transport::shared_pointer const & transport,
803  const string channelName, const pvAccessID cid)
804 {
805  // TODO use std::make_shared
806  std::tr1::shared_ptr<ServerChannelRequesterImpl> tp(new ServerChannelRequesterImpl(transport, channelName, cid));
807  ChannelRequester::shared_pointer cr = tp;
808  // TODO exception guard and report error back
809  provider->createChannel(channelName, cr, transport->getPriority());
810  return cr;
811 }
812 
813 void ServerChannelRequesterImpl::channelCreated(const Status& status, Channel::shared_pointer const & channel)
814 {
815  if(_created)
816  throw std::logic_error("Channel already created");
817  if(detail::BlockingServerTCPTransportCodec::shared_pointer transport = _transport.lock())
818  {
819  ServerChannel::shared_pointer serverChannel;
820  try
821  {
822  if (status.isSuccess())
823  {
824  //
825  // create a new channel instance
826  //
827  pvAccessID sid = transport->preallocateChannelSID();
828  try
829  {
830  serverChannel.reset(new ServerChannel(channel, shared_from_this(), _cid, sid));
831 
832  // ack allocation and register
833  transport->registerChannel(sid, serverChannel);
834 
835  } catch (...)
836  {
837  // depreallocate and rethrow
838  transport->depreallocateChannelSID(sid);
839  throw;
840  }
841  }
842 
843 
844  {
845  Lock guard(_mutex);
846  _status = status;
847  _serverChannel = serverChannel;
848  _created = true;
849  }
850 
851  TransportSender::shared_pointer thisSender = shared_from_this();
852  transport->enqueueSendRequest(thisSender);
853  }
854  catch (std::exception& e)
855  {
856  LOG(logLevelDebug, "Exception caught when creating channel '%s': %s", _channelName.c_str(), e.what());
857  {
858  Lock guard(_mutex);
859  _status = Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what());
860  }
861  TransportSender::shared_pointer thisSender = shared_from_this();
862  transport->enqueueSendRequest(thisSender);
863  }
864  catch (...)
865  {
866  LOG(logLevelDebug, "Exception caught when creating channel: %s", _channelName.c_str());
867  {
868  Lock guard(_mutex);
869  _status = Status(Status::STATUSTYPE_FATAL, "failed to create channel");
870  }
871  TransportSender::shared_pointer thisSender = shared_from_this();
872  transport->enqueueSendRequest(thisSender);
873  }
874  }
875 }
876 
877 void ServerChannelRequesterImpl::channelStateChange(Channel::shared_pointer const & /*channel*/, const Channel::ConnectionState isConnected)
878 {
879  if(isConnected==Channel::CONNECTED || isConnected==Channel::NEVER_CONNECTED)
880  return;
881 
882  if(detail::BlockingServerTCPTransportCodec::shared_pointer transport = _transport.lock())
883  {
884  ServerChannel::shared_pointer channel;
885  {
886  Lock guard(_mutex);
887  _created = false;
888  channel= dynamic_pointer_cast<ServerChannel>(_serverChannel.lock());
889  }
890 
891  if (!channel)
892  return;
893 
894  // destroy
895  channel->destroy();
896 
897  // .. and unregister
898  transport->unregisterChannel(channel->getSID());
899 
900  // send response back
901  TransportSender::shared_pointer sr(new ServerDestroyChannelHandlerTransportSender(channel->getCID(), channel->getSID()));
902  transport->enqueueSendRequest(sr);
903  }
904 }
905 
906 std::tr1::shared_ptr<const PeerInfo> ServerChannelRequesterImpl::getPeerInfo()
907 {
908  if(detail::BlockingServerTCPTransportCodec::shared_pointer transport = _transport.lock()) {
909  epicsGuard<epicsMutex> G(transport->_mutex);
910  return transport->_peerInfo;
911 
912  } else {
913  return std::tr1::shared_ptr<const PeerInfo>();
914  }
915 }
916 
 917 std::string ServerChannelRequesterImpl::getRequesterName()
 918 {
919  detail::BlockingServerTCPTransportCodec::shared_pointer transport = _transport.lock();
920  if (transport)
921  return transport->getRemoteName();
922  else
923  return "<unknown>:0";
924 }
925 
926 void ServerChannelRequesterImpl::message(std::string const & message, MessageType messageType)
927 {
928  LOG(logLevelDebug, "[%s] %s", getMessageTypeName(messageType).c_str(), message.c_str());
929 }
930 
 931 void ServerChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
 932 {
933  ServerChannel::shared_pointer serverChannel;
934  Status status;
935  {
936  Lock guard(_mutex);
937  serverChannel = _serverChannel.lock();
938  status = _status;
939  }
940 
941  if (detail::BlockingServerTCPTransportCodec::shared_pointer transport = _transport.lock())
942  {
943  // error response
944  if (!serverChannel)
945  {
946  control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
947  buffer->putInt(_cid);
948  buffer->putInt(-1);
949  // error status is expected or channel has been destroyed locally
950  if (status.isSuccess())
951  status = Status(Status::STATUSTYPE_ERROR, "channel has been destroyed");
952  status.serialize(buffer, control);
953  }
954  // OK
955  else
956  {
957  ServerChannel::shared_pointer serverChannelImpl = dynamic_pointer_cast<ServerChannel>(serverChannel);
958  control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
959  buffer->putInt(serverChannelImpl->getCID());
960  buffer->putInt(serverChannelImpl->getSID());
961  status.serialize(buffer, control);
962  }
963  }
964 }
965 
966 /****************************************************************************************/
967 
 968 void ServerDestroyChannelHandler::handleResponse(osiSockAddr* responseFrom,
 969  Transport::shared_pointer const & transport, int8 version, int8 command,
 970  size_t payloadSize, ByteBuffer* payloadBuffer)
 971 {
 972  AbstractServerResponseHandler::handleResponse(responseFrom,
 973  transport, version, command, payloadSize, payloadBuffer);
974 
975  // NOTE: we do not explicitly check if transport OK
976  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
977 
978 
979  transport->ensureData(8);
980  const pvAccessID sid = payloadBuffer->getInt();
981  const pvAccessID cid = payloadBuffer->getInt();
982 
983  // get channel by SID
984  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
985  if (channel.get() == NULL)
986  {
987  if (!transport->isClosed())
988  {
989  char host[100];
990  sockAddrToDottedIP(&responseFrom->sa,host,100);
991  LOG(logLevelDebug, "Trying to destroy a channel that no longer exists (SID: %d, CID %d, client: %s).", sid, cid, host);
992  }
993  return;
994  }
995 
996  // destroy
997  channel->destroy();
998 
999  // .. and unregister
1000  casTransport->unregisterChannel(sid);
1001 
1002  // send response back
1003  TransportSender::shared_pointer sr(new ServerDestroyChannelHandlerTransportSender(cid, sid));
1004  transport->enqueueSendRequest(sr);
1005 }
1006 
1007 /****************************************************************************************/
1008 
 1009 void ServerGetHandler::handleResponse(osiSockAddr* responseFrom,
 1010  Transport::shared_pointer const & transport, int8 version, int8 command,
 1011  size_t payloadSize, ByteBuffer* payloadBuffer)
 1012 {
 1013  AbstractServerResponseHandler::handleResponse(responseFrom,
 1014  transport, version, command, payloadSize, payloadBuffer);
1015 
1016  // NOTE: we do not explicitly check if transport is OK
1017  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
1018 
1019  transport->ensureData(2*sizeof(int32)/sizeof(int8)+1);
1020  const pvAccessID sid = payloadBuffer->getInt();
1021  const pvAccessID ioid = payloadBuffer->getInt();
1022 
1023  // mode
1024  const int8 qosCode = payloadBuffer->getByte();
1025 
1026  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
1027  if (channel.get() == NULL)
 1028  {
 1029  BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus);
1030  return;
1031  }
1032 
1033  const bool init = (QOS_INIT & qosCode) != 0;
1034  if (init)
1035  {
1036  // pvRequest
1037  PVStructure::shared_pointer pvRequest(SerializationHelper::deserializePVRequest(payloadBuffer, transport.get()));
1038 
1039  // create...
1040  ServerChannelGetRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
1041  }
1042  else
1043  {
1044  const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
1045 
 1046  ServerChannelGetRequesterImpl::shared_pointer request = std::tr1::dynamic_pointer_cast<ServerChannelGetRequesterImpl>(channel->getRequest(ioid));
 1047  if (!request)
 1048  {
 1049  BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus);
1050  return;
1051  }
1052  atomic::add(request->bytesRX, payloadSize);
1053 
1054  if (!request->startRequest(qosCode))
 1055  {
 1056  BaseChannelRequester::sendFailureMessage((int8)CMD_GET, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus);
1057  return;
1058  }
1059 
1060  ChannelGet::shared_pointer channelGet = request->getChannelGet();
1061  if (lastRequest)
1062  channelGet->lastRequest();
1063  channelGet->get();
1064  }
1065 }
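// --- Illustrative sketch, not part of responseHandlers.cpp ---
// Every request handler above decodes the same qosCode byte: QOS_INIT marks
// the first message (carrying the pvRequest), QOS_DESTROY marks a
// "lastRequest" operation, and QOS_GET selects a read instead of a write for
// put/put-get. The decoder below shows the bit-test pattern only; the flag
// values are placeholders chosen for this sketch, not the real pvAccess
// constants.
#include <stdint.h>

namespace sketch {

enum QosFlag {              // hypothetical values, for illustration only
    FLAG_INIT    = 1 << 0,
    FLAG_DESTROY = 1 << 1,
    FLAG_GET     = 1 << 2
};

struct DecodedQos {
    bool init;          // first message of a request: deserialize the pvRequest
    bool lastRequest;   // destroy the server-side request after this operation
    bool get;           // for put/put-get: fetch the current value instead of writing
};

inline DecodedQos decodeQos(int8_t qosCode) {
    DecodedQos d;
    d.init        = (qosCode & FLAG_INIT)    != 0;
    d.lastRequest = (qosCode & FLAG_DESTROY) != 0;
    d.get         = (qosCode & FLAG_GET)     != 0;
    return d;
}

} // namespace sketch
// --- end of sketch ---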
1066 
1067 #define INIT_EXCEPTION_GUARD(cmd, var, code) \
1068  try { \
1069  operation_type::shared_pointer op(code); \
1070  epicsGuard<epicsMutex> G(_mutex); \
1071  var = op; \
1072  } \
1073  catch (std::exception &e) { \
1074  Status status(Status::STATUSTYPE_FATAL, e.what()); \
1075  BaseChannelRequester::sendFailureMessage((int8)cmd, _transport, _ioid, (int8)QOS_INIT, status); \
1076  destroy(); \
1077  } \
1078  catch (...) { \
1079  Status status(Status::STATUSTYPE_FATAL, "unknown exception caught"); \
1080  BaseChannelRequester::sendFailureMessage((int8)cmd, _transport, _ioid, (int8)QOS_INIT, status); \
1081  destroy(); \
1082  }
1083 
1084 #define DESERIALIZE_EXCEPTION_GUARD(code) \
1085  try { \
1086  code; \
1087  } \
1088  catch (std::exception &e) { \
1089  Status status(Status::STATUSTYPE_ERROR, e.what()); \
1090  BaseChannelRequester::sendFailureMessage((int8)command, transport, ioid, qosCode, status); \
1091  throw; \
1092  } \
1093  catch (...) { \
1094  Status status(Status::STATUSTYPE_ERROR, "unknown exception caught"); \
1095  BaseChannelRequester::sendFailureMessage((int8)command, transport, ioid, qosCode, status); \
1096  throw; \
1097  }
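// --- Illustrative sketch, not part of responseHandlers.cpp ---
// INIT_EXCEPTION_GUARD above wraps the creation of a channel operation: the
// factory call may throw, and in that case a failure status is sent to the
// client and the requester destroys itself rather than staying registered in
// a half-built state. The standalone analogue below uses only standard C++
// and hypothetical names (sketch::Requester, reportFailure, factory).
#include <cstdio>
#include <exception>
#include <string>

namespace sketch {

struct Operation {};

struct Requester {
    Operation* op;

    Requester() : op(0) {}
    ~Requester() { destroy(); }

    void reportFailure(const std::string& msg) { std::printf("init failed: %s\n", msg.c_str()); }
    void destroy() { delete op; op = 0; }

    // factory may throw; mirrors createChannelGet()/createChannelPut() above
    void activate(Operation* (*factory)()) {
        try {
            op = factory();
        } catch (std::exception& e) {
            reportFailure(e.what());
            destroy();
        } catch (...) {
            reportFailure("unknown exception caught");
            destroy();
        }
    }
};

} // namespace sketch
// --- end of sketch ---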
1098 
1099 ServerChannelGetRequesterImpl::ServerChannelGetRequesterImpl(ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport) :
1100  BaseChannelRequester(context, channel, ioid, transport)
1101 
1102 {
1103 }
1104 
1105 ChannelGetRequester::shared_pointer ServerChannelGetRequesterImpl::create(ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport,
1106  PVStructure::shared_pointer const & pvRequest)
1107 {
1108  // TODO use std::make_shared
1109  std::tr1::shared_ptr<ServerChannelGetRequesterImpl> tp(new ServerChannelGetRequesterImpl(context, channel, ioid, transport));
1110  ChannelGetRequester::shared_pointer thisPointer = tp;
1111  static_cast<ServerChannelGetRequesterImpl*>(thisPointer.get())->activate(pvRequest);
1112  return thisPointer;
1113 }
1114 
1115 void ServerChannelGetRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
1116 {
1118  shared_pointer thisPointer(shared_from_this());
1119  _channel->registerRequest(_ioid, thisPointer);
1120  INIT_EXCEPTION_GUARD(CMD_GET, _channelGet, _channel->getChannel()->createChannelGet(thisPointer, pvRequest));
1121 }
1122 
1123 void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, ChannelGet::shared_pointer const & channelGet, Structure::const_shared_pointer const & structure)
1124 {
1125  {
1126  Lock guard(_mutex);
1127  _status = status;
1128  _channelGet = channelGet;
1129 
1130  if (_status.isSuccess())
1131  {
1132  _pvStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(structure, _pvStructure));
1133  _bitSet = createBitSetFor(_pvStructure, _bitSet);
1134  }
1135  }
1136 
1137  TransportSender::shared_pointer thisSender = shared_from_this();
1138  _transport->enqueueSendRequest(thisSender);
1139 
1140  // self-destruction
1141  if (!status.isSuccess())
1142  {
1143  destroy();
1144  }
1145 }
1146 
1147 void ServerChannelGetRequesterImpl::getDone(const Status& status, ChannelGet::shared_pointer const & /*channelGet*/,
1148  PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
1149 {
1150  {
1151  Lock guard(_mutex);
1152  _status = status;
1153  if (_status.isSuccess())
1154  {
1155  *_bitSet = *bitSet;
1156  _pvStructure->copyUnchecked(*pvStructure, *_bitSet);
1157  }
1158  }
1159 
1160  TransportSender::shared_pointer thisSender = shared_from_this();
1161  _transport->enqueueSendRequest(thisSender);
1162 }
1163 
 1164 void ServerChannelGetRequesterImpl::destroy()
 1165 {
1166  // keep a reference to ourselves as the owner
1167  // could release its reference and we don't want to be
1168  // destroyed prematurely
1169  shared_pointer self(shared_from_this());
1170 
1171  // hold a reference to channelGet so that _channelGet.reset()
1172  // does not call ~ChannelGet (external code) while we are holding a lock
1173  ChannelGet::shared_pointer channelGet = _channelGet;
1174  {
1175  Lock guard(_mutex);
1176  _channel->unregisterRequest(_ioid);
1177 
1178  if (_channelGet)
1179  {
1180  _channelGet->destroy();
1181  _channelGet.reset();
1182  }
1183  }
1184 }
1185 
 1186 ChannelGet::shared_pointer ServerChannelGetRequesterImpl::getChannelGet()
 1187 {
1188  return _channelGet;
1189 }
1190 
 1191 // TODO get rid of all these mutexes
 1192 void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
 1193 {
1194  const int32 request = getPendingRequest();
1195 
1196  ChannelGet::shared_pointer channelGet;
1197  {
1198  Lock guard(_mutex);
1199  channelGet = _channelGet;
1200  // we must respond to QOS_INIT (e.g. creation error)
1201  if (!channelGet && !(request & QOS_INIT))
1202  return;
1203  }
1204 
1205  control->startMessage((int8)CMD_GET, sizeof(int32)/sizeof(int8) + 1);
1206  buffer->putInt(_ioid);
1207  buffer->put((int8)request);
1208  {
1209  Lock guard(_mutex);
1210  _status.serialize(buffer, control);
1211  }
1212 
1213  // TODO !!!
1214  // if we call stopRequest() below (the second one, commented out), we might be too late
1215  // since between last serialization data and stopRequest() a buffer can be already flushed
1216  // (i.e. in case of directSerialize)
1217  // if we call it here, then a bad client can issue another request just after stopRequest() was called
1218  stopRequest();
1219 
1220  if (_status.isSuccess())
1221  {
1222  if (request & QOS_INIT)
1223  {
1224  Lock guard(_mutex);
1225  control->cachedSerialize(_pvStructure->getStructure(), buffer);
1226  }
1227  else
1228  {
1229  ScopedLock lock(channelGet);
1230 
1231  _bitSet->serialize(buffer, control);
1232  _pvStructure->serialize(buffer, control, _bitSet.get());
1233  }
1234  }
1235 
1236  //stopRequest();
1237 
1238  // lastRequest
1239  if (request & QOS_DESTROY)
1240  {
1241  destroy();
1242  }
1243 }
1244 /****************************************************************************************/
 1245 void ServerPutHandler::handleResponse(osiSockAddr* responseFrom,
 1246  Transport::shared_pointer const & transport, int8 version, int8 command,
 1247  size_t payloadSize, ByteBuffer* payloadBuffer) {
 1248  AbstractServerResponseHandler::handleResponse(responseFrom,
 1249  transport, version, command, payloadSize, payloadBuffer);
1250 
1251 
1252  // NOTE: we do not explicitly check if transport is OK
1253  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
1254 
1255  transport->ensureData(2*sizeof(int32)/sizeof(int8)+1);
1256  const pvAccessID sid = payloadBuffer->getInt();
1257  const pvAccessID ioid = payloadBuffer->getInt();
1258 
1259  // mode
1260  const int8 qosCode = payloadBuffer->getByte();
1261 
1262  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
1263  if (!channel.get())
1264  {
1266  return;
1267  }
1268 
1269  const bool init = (QOS_INIT & qosCode) != 0;
1270  if (init)
1271  {
1272  // pvRequest
1273  PVStructure::shared_pointer pvRequest(SerializationHelper::deserializePVRequest(payloadBuffer, transport.get()));
1274 
1275  // create...
1276  ServerChannelPutRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
1277  }
1278  else
1279  {
1280  const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
1281  const bool get = (QOS_GET & qosCode) != 0;
1282 
 1283  ServerChannelPutRequesterImpl::shared_pointer request = std::tr1::dynamic_pointer_cast<ServerChannelPutRequesterImpl>(channel->getRequest(ioid));
 1284  if (!request.get())
 1285  {
 1286  BaseChannelRequester::sendFailureMessage((int8)CMD_PUT, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus);
1287  return;
1288  }
1289  atomic::add(request->bytesRX, payloadSize);
1290 
1291  if (!request->startRequest(qosCode))
 1292  {
 1293  BaseChannelRequester::sendFailureMessage((int8)CMD_PUT, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus);
1294  return;
1295  }
1296 
1297  ChannelPut::shared_pointer channelPut = request->getChannelPut();
1298 
1299  if (lastRequest)
1300  channelPut->lastRequest();
1301 
1302  if (get)
1303  {
1304  channelPut->get();
1305  }
1306  else
1307  {
1308  // deserialize bitSet and do a put
1309 
1310  {
1311  ScopedLock lock(channelPut); // TODO not needed if put is processed by the same thread
1312  BitSet::shared_pointer putBitSet = request->getPutBitSet();
1313  PVStructure::shared_pointer putPVStructure = request->getPutPVStructure();
 1314 
 1315  DESERIALIZE_EXCEPTION_GUARD(
 1316  putBitSet->deserialize(payloadBuffer, transport.get());
1317  putPVStructure->deserialize(payloadBuffer, transport.get(), putBitSet.get());
1318  );
1319 
1320  lock.unlock();
1321 
1322  channelPut->put(putPVStructure, putBitSet);
1323  }
1324  }
1325  }
1326 }
1327 
1328 ServerChannelPutRequesterImpl::ServerChannelPutRequesterImpl(ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
1329  const pvAccessID ioid, Transport::shared_pointer const & transport):
1330  BaseChannelRequester(context, channel, ioid, transport)
1331 {
1332 }
1333 
1334 ChannelPutRequester::shared_pointer ServerChannelPutRequesterImpl::create(ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
1335  const pvAccessID ioid, Transport::shared_pointer const & transport, PVStructure::shared_pointer const & pvRequest)
1336 {
1337  // TODO use std::make_shared
1338  std::tr1::shared_ptr<ServerChannelPutRequesterImpl> tp(new ServerChannelPutRequesterImpl(context, channel, ioid, transport));
1339  ChannelPutRequester::shared_pointer thisPointer = tp;
1340  static_cast<ServerChannelPutRequesterImpl*>(thisPointer.get())->activate(pvRequest);
1341  return thisPointer;
1342 }
1343 
1344 void ServerChannelPutRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
1345 {
1347  shared_pointer thisPointer(shared_from_this());
1348  _channel->registerRequest(_ioid, thisPointer);
1349  INIT_EXCEPTION_GUARD(CMD_PUT, _channelPut, _channel->getChannel()->createChannelPut(thisPointer, pvRequest));
1350 }
1351 
1352 void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, ChannelPut::shared_pointer const & channelPut, Structure::const_shared_pointer const & structure)
1353 {
1354  {
1355  Lock guard(_mutex);
1356  _status = status;
1357  _channelPut = channelPut;
1358  if (_status.isSuccess())
1359  {
1360  _pvStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(structure, _pvStructure));
1361  _bitSet = createBitSetFor(_pvStructure, _bitSet);
1362  }
1363  }
1364 
1365  TransportSender::shared_pointer thisSender = shared_from_this();
1366  _transport->enqueueSendRequest(thisSender);
1367 
1368  // self-destruction
1369  if (!status.isSuccess())
1370  {
1371  destroy();
1372  }
1373 }
1374 
1375 void ServerChannelPutRequesterImpl::putDone(const Status& status, ChannelPut::shared_pointer const & /*channelPut*/)
1376 {
1377  {
1378  Lock guard(_mutex);
1379  _status = status;
1380  }
1381  TransportSender::shared_pointer thisSender = shared_from_this();
1382  _transport->enqueueSendRequest(thisSender);
1383 }
1384 
1385 void ServerChannelPutRequesterImpl::getDone(const Status& status, ChannelPut::shared_pointer const & /*channelPut*/, PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
1386 {
1387  {
1388  Lock guard(_mutex);
1389  _status = status;
1390  if (_status.isSuccess())
1391  {
1392  *_bitSet = *bitSet;
1393  _pvStructure->copyUnchecked(*pvStructure, *_bitSet);
1394  }
1395  }
1396  TransportSender::shared_pointer thisSender = shared_from_this();
1397  _transport->enqueueSendRequest(thisSender);
1398 }
1399 
 1400 void ServerChannelPutRequesterImpl::destroy()
 1401 {
1402  // keep a reference to ourselves as the owner
1403  // could release its reference and we don't want to be
1404  // destroyed prematurely
1405  shared_pointer self(shared_from_this());
1406 
 1407  // hold a reference to channelPut so that _channelPut.reset()
1408  // does not call ~ChannelPut (external code) while we are holding a lock
1409  ChannelPut::shared_pointer channelPut = _channelPut;
1410  {
1411  Lock guard(_mutex);
1412  _channel->unregisterRequest(_ioid);
1413 
1414  if (_channelPut)
1415  {
1416  _channelPut->destroy();
1417  _channelPut.reset();
1418  }
1419  }
1420 }
1421 
 1422 ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut()
 1423 {
1424  //Lock guard(_mutex);
1425  return _channelPut;
1426 }
1427 
 1428 BitSet::shared_pointer ServerChannelPutRequesterImpl::getPutBitSet()
 1429 {
1430  //Lock guard(_mutex);
1431  return _bitSet;
1432 }
1433 
 1434 PVStructure::shared_pointer ServerChannelPutRequesterImpl::getPutPVStructure()
 1435 {
1436  //Lock guard(_mutex);
1437  return _pvStructure;
1438 }
1439 
 1440 void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
 1441 {
1442  const int32 request = getPendingRequest();
1443 
1444  ChannelPut::shared_pointer channelPut;
1445  {
1446  Lock guard(_mutex);
1447  channelPut = _channelPut;
1448  // we must respond to QOS_INIT (e.g. creation error)
1449  if (!channelPut && !(request & QOS_INIT))
1450  return;
1451  }
1452 
1453  control->startMessage((int32)CMD_PUT, sizeof(int32)/sizeof(int8) + 1);
1454  buffer->putInt(_ioid);
1455  buffer->putByte((int8)request);
1456  {
1457  Lock guard(_mutex);
1458  _status.serialize(buffer, control);
1459  }
1460 
1461  if (_status.isSuccess())
1462  {
1463  if ((QOS_INIT & request) != 0)
1464  {
1465  Lock guard(_mutex);
1466  control->cachedSerialize(_pvStructure->getStructure(), buffer);
1467  }
1468  else if ((QOS_GET & request) != 0)
1469  {
1470  ScopedLock lock(channelPut);
1471  _bitSet->serialize(buffer, control);
1472  _pvStructure->serialize(buffer, control, _bitSet.get());
1473  }
1474  }
1475 
1476  stopRequest();
1477 
1478  // lastRequest
1479  if ((QOS_DESTROY & request) != 0)
1480  destroy();
1481 }
1482 
1483 
1484 /****************************************************************************************/
 1485 void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom,
 1486  Transport::shared_pointer const & transport, int8 version, int8 command,
 1487  size_t payloadSize, ByteBuffer* payloadBuffer) {
 1488  AbstractServerResponseHandler::handleResponse(responseFrom,
 1489  transport, version, command, payloadSize, payloadBuffer);
1490 
1491  // NOTE: we do not explicitly check if transport is OK
1492  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
1493 
1494  transport->ensureData(2*sizeof(int32)/sizeof(int8)+1);
1495  const pvAccessID sid = payloadBuffer->getInt();
1496  const pvAccessID ioid = payloadBuffer->getInt();
1497 
1498  // mode
1499  const int8 qosCode = payloadBuffer->getByte();
1500 
1501  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
1502  if (!channel.get())
 1503  {
 1504  BaseChannelRequester::sendFailureMessage((int8)CMD_PUT_GET, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus);
1505  return;
1506  }
1507 
1508  const bool init = (QOS_INIT & qosCode) != 0;
1509  if (init)
1510  {
1511  // pvRequest
1512  PVStructure::shared_pointer pvRequest(SerializationHelper::deserializePVRequest(payloadBuffer, transport.get()));
1513 
1514  // create...
1515  ServerChannelPutGetRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
1516  }
1517  else
1518  {
1519  const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
1520  const bool getGet = (QOS_GET & qosCode) != 0;
1521  const bool getPut = (QOS_GET_PUT & qosCode) != 0;
1522 
 1523  ServerChannelPutGetRequesterImpl::shared_pointer request = std::tr1::dynamic_pointer_cast<ServerChannelPutGetRequesterImpl>(channel->getRequest(ioid));
 1524  if (!request.get())
 1525  {
 1526  BaseChannelRequester::sendFailureMessage((int8)CMD_PUT_GET, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus);
1527  return;
1528  }
1529  atomic::add(request->bytesRX, payloadSize);
1530 
1531  if (!request->startRequest(qosCode))
 1532  {
 1533  BaseChannelRequester::sendFailureMessage((int8)CMD_PUT_GET, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus);
1534  return;
1535  }
1536 
1537  ChannelPutGet::shared_pointer channelPutGet = request->getChannelPutGet();
1538  if (lastRequest)
1539  channelPutGet->lastRequest();
1540 
1541  if (getGet)
1542  {
1543  channelPutGet->getGet();
1544  }
1545  else if(getPut)
1546  {
1547  channelPutGet->getPut();
1548  }
1549  else
1550  {
1551  // deserialize bitSet and do a put
1552  {
1553  ScopedLock lock(channelPutGet); // TODO not necessary if read is done in putGet
1554  BitSet::shared_pointer putBitSet = request->getPutGetBitSet();
1555  PVStructure::shared_pointer putPVStructure = request->getPutGetPVStructure();
 1556 
 1557  DESERIALIZE_EXCEPTION_GUARD(
 1558  putBitSet->deserialize(payloadBuffer, transport.get());
1559  putPVStructure->deserialize(payloadBuffer, transport.get(), putBitSet.get());
1560  );
1561 
1562  lock.unlock();
1563 
1564  channelPutGet->putGet(putPVStructure, putBitSet);
1565  }
1566  }
1567  }
1568 }
1569 
 1570 ServerChannelPutGetRequesterImpl::ServerChannelPutGetRequesterImpl(ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
 1571  const pvAccessID ioid, Transport::shared_pointer const & transport):
1572  BaseChannelRequester(context, channel, ioid, transport), _channelPutGet(), _pvPutStructure(), _pvGetStructure()
1573 {
1574 }
1575 
1576 ChannelPutGetRequester::shared_pointer ServerChannelPutGetRequesterImpl::create(ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
1577  const pvAccessID ioid, Transport::shared_pointer const & transport,PVStructure::shared_pointer const & pvRequest)
1578 {
1579  // TODO use std::make_shared
1580  std::tr1::shared_ptr<ServerChannelPutGetRequesterImpl> tp(new ServerChannelPutGetRequesterImpl(context, channel, ioid, transport));
1581  ChannelPutGetRequester::shared_pointer thisPointer = tp;
1582  static_cast<ServerChannelPutGetRequesterImpl*>(thisPointer.get())->activate(pvRequest);
1583  return thisPointer;
1584 }
1585 
1586 void ServerChannelPutGetRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
1587 {
1589  shared_pointer thisPointer(shared_from_this());
1590  _channel->registerRequest(_ioid, thisPointer);
1591  INIT_EXCEPTION_GUARD(CMD_PUT_GET, _channelPutGet, _channel->getChannel()->createChannelPutGet(thisPointer, pvRequest));
1592 }
1593 
1594 void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status, ChannelPutGet::shared_pointer const & channelPutGet,
1595  Structure::const_shared_pointer const & putStructure, Structure::const_shared_pointer const & getStructure)
1596 {
1597  {
1598  Lock guard(_mutex);
1599  _status = status;
1600  _channelPutGet = channelPutGet;
1601  if (_status.isSuccess())
1602  {
1603  _pvPutStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(putStructure, _pvPutStructure));
1604  _pvPutBitSet = createBitSetFor(_pvPutStructure, _pvPutBitSet);
1605 
1606  _pvGetStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(getStructure, _pvGetStructure));
1607  _pvGetBitSet = createBitSetFor(_pvGetStructure, _pvGetBitSet);
1608  }
1609  }
1610 
1611  TransportSender::shared_pointer thisSender = shared_from_this();
1612  _transport->enqueueSendRequest(thisSender);
1613 
1614  // self-destruction
1615  if (!status.isSuccess())
1616  {
1617  destroy();
1618  }
1619 }
1620 
1621 void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/,
1622  PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
1623 {
1624  {
1625  Lock guard(_mutex);
1626  _status = status;
1627  if (_status.isSuccess())
1628  {
1629  *_pvGetBitSet = *bitSet;
1630  _pvGetStructure->copyUnchecked(*pvStructure, *_pvGetBitSet);
1631  }
1632  }
1633  TransportSender::shared_pointer thisSender = shared_from_this();
1634  _transport->enqueueSendRequest(thisSender);
1635 }
1636 
1637 void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/,
1638  PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
1639 {
1640  {
1641  Lock guard(_mutex);
1642  _status = status;
1643  if (_status.isSuccess())
1644  {
1645  *_pvPutBitSet = *bitSet;
1646  _pvPutStructure->copyUnchecked(*pvStructure, *_pvPutBitSet);
1647  }
1648  }
1649  TransportSender::shared_pointer thisSender = shared_from_this();
1650  _transport->enqueueSendRequest(thisSender);
1651 }
1652 
1653 void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/,
1654  PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
1655 {
1656  {
1657  Lock guard(_mutex);
1658  _status = status;
1659  if (_status.isSuccess())
1660  {
1661  *_pvGetBitSet = *bitSet;
1662  _pvGetStructure->copyUnchecked(*pvStructure, *_pvGetBitSet);
1663  }
1664  }
1665  TransportSender::shared_pointer thisSender = shared_from_this();
1666  _transport->enqueueSendRequest(thisSender);
1667 }
1668 
 1669 void ServerChannelPutGetRequesterImpl::destroy()
 1670 {
1671  // keep a reference to ourselves as the owner
1672  // could release its reference and we don't want to be
1673  // destroyed prematurely
1674  shared_pointer self(shared_from_this());
1675 
1676  // hold a reference to channelPutGet so that _channelPutGet.reset()
1677  // does not call ~ChannelPutGet (external code) while we are holding a lock
1678  ChannelPutGet::shared_pointer channelPutGet = _channelPutGet;
1679  {
1680  Lock guard(_mutex);
1681  _channel->unregisterRequest(_ioid);
1682 
1683  if (_channelPutGet)
1684  {
1685  _channelPutGet->destroy();
1686  _channelPutGet.reset();
1687  }
1688  }
1689 }
1690 
 1691 ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet()
 1692 {
1693  //Lock guard(_mutex);
1694  return _channelPutGet;
1695 }
1696 
 1697 PVStructure::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetPVStructure()
 1698 {
1699  //Lock guard(_mutex);
1700  return _pvPutStructure;
1701 }
1702 
 1703 BitSet::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetBitSet()
 1704 {
1705  //Lock guard(_mutex);
1706  return _pvPutBitSet;
1707 }
1708 
 1709 void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
 1710 {
1711  const int32 request = getPendingRequest();
1712 
1713  ChannelPutGet::shared_pointer channelPutGet;
1714  {
1715  Lock guard(_mutex);
1716  channelPutGet = _channelPutGet;
1717  // we must respond to QOS_INIT (e.g. creation error)
1718  if (!channelPutGet && !(request & QOS_INIT))
1719  return;
1720  }
1721 
1722  control->startMessage(CMD_PUT_GET, sizeof(int32)/sizeof(int8) + 1);
1723  buffer->putInt(_ioid);
1724  buffer->putByte((int8)request);
1725  {
1726  Lock guard(_mutex);
1727  _status.serialize(buffer, control);
1728  }
1729 
1730  if (_status.isSuccess())
1731  {
1732  if ((QOS_INIT & request) != 0)
1733  {
1734  Lock guard(_mutex);
1735  control->cachedSerialize(_pvPutStructure->getStructure(), buffer);
1736  control->cachedSerialize(_pvGetStructure->getStructure(), buffer);
1737  }
1738  else if ((QOS_GET & request) != 0)
1739  {
1740  Lock guard(_mutex);
1741  _pvGetBitSet->serialize(buffer, control);
1742  _pvGetStructure->serialize(buffer, control, _pvGetBitSet.get());
1743  }
1744  else if ((QOS_GET_PUT & request) != 0)
1745  {
1746  ScopedLock lock(channelPutGet);
1747  //Lock guard(_mutex);
1748  _pvPutBitSet->serialize(buffer, control);
1749  _pvPutStructure->serialize(buffer, control, _pvPutBitSet.get());
1750  }
1751  else
1752  {
1753  ScopedLock lock(channelPutGet);
1754  //Lock guard(_mutex);
1755  _pvGetBitSet->serialize(buffer, control);
1756  _pvGetStructure->serialize(buffer, control, _pvGetBitSet.get());
1757  }
1758  }
1759 
1760  stopRequest();
1761 
1762  // lastRequest
1763  if ((QOS_DESTROY & request) != 0)
1764  destroy();
1765 }
1766 
1767 /****************************************************************************************/
 1768 void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom,
 1769  Transport::shared_pointer const & transport, int8 version, int8 command,
 1770  size_t payloadSize, ByteBuffer* payloadBuffer) {
 1771  AbstractServerResponseHandler::handleResponse(responseFrom,
 1772  transport, version, command, payloadSize, payloadBuffer);
1773 
1774  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
1775  assert(!!casTransport);
1776 
1777  transport->ensureData(2*sizeof(int32)/sizeof(int8)+1);
1778  const pvAccessID sid = payloadBuffer->getInt();
1779  const pvAccessID ioid = payloadBuffer->getInt();
1780 
1781  // mode
1782  const int8 qosCode = payloadBuffer->getByte();
1783 
1784  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
1785  if (!channel.get())
 1786  {
 1787  BaseChannelRequester::sendFailureMessage((int8)CMD_MONITOR, transport, ioid, qosCode, BaseChannelRequester::badCIDStatus);
1788  return;
1789  }
1790 
1791  const bool init = (QOS_INIT & qosCode) != 0;
1792  if (init)
1793  {
1794  // pvRequest
1795  PVStructure::shared_pointer pvRequest(SerializationHelper::deserializePVRequest(payloadBuffer, transport.get()));
1796 
1797  // create...
1798  ServerMonitorRequesterImpl::shared_pointer request(ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest));
1799 
1800  // pipelining monitor (i.e. w/ flow control)
1801  const bool ack = (QOS_GET_PUT & qosCode) != 0;
1802  if (ack)
1803  {
1804  transport->ensureData(4);
1805  int32 nfree = payloadBuffer->getInt();
1806 
1807  request->ack(nfree);
1808  }
1809 
1810  }
1811  else
1812  {
1813  const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
1814  const bool get = (QOS_GET & qosCode) != 0;
1815  const bool process = (QOS_PROCESS & qosCode) != 0;
1816  const bool ack = (QOS_GET_PUT & qosCode) != 0;
1817 
1819  if (!request.get())
1820  {
1822  return;
1823  }
1824  atomic::add(request->bytesRX, payloadSize);
1825 
1826  if (ack)
1827  {
1828  transport->ensureData(4);
1829  int32 nfree = payloadBuffer->getInt();
1830  request->ack(nfree);
1831  return;
1832  // note: not possible to ack and destroy
1833  }
1834 
1835  /*
1836  if (!request->startRequest(qosCode))
1837  {
1838  BaseChannelRequester::sendFailureMessage((int8)CMD_MONITOR, transport, ioid, qosCode, BaseChannelRequester::otherRequestPendingStatus);
1839  return;
1840  }
1841  */
1842 
1843  if (process)
1844  {
1845  if (get)
1846  request->getChannelMonitor()->start();
1847  else
1848  request->getChannelMonitor()->stop();
1849  //request.stopRequest();
1850  }
1851  else if (get)
1852  {
1853  // not supported
1854  }
1855 
1856  if (lastRequest)
1857  request->destroy();
1858  }
1859 }
1860 
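The handler above multiplexes several monitor operations onto one command via the qos byte: QOS_INIT creates the monitor, QOS_GET_PUT carries a pipeline acknowledgement, QOS_PROCESS combined with QOS_GET starts or stops event delivery, and QOS_DESTROY tears the request down. A compact sketch of that decoding, with placeholder bit values (the real constants live elsewhere in pvAccess):

    namespace sketch {
    // Placeholder bit values; only the decision structure mirrors the handler above.
    const int QOS_INIT = 0x01, QOS_DESTROY = 0x02, QOS_GET = 0x04,
              QOS_PROCESS = 0x08, QOS_GET_PUT = 0x10;

    enum MonitorAction { CreateMonitor, AckWindow, Start, Stop, None };

    MonitorAction decodeMonitorSubcommand(int qos)
    {
        if (qos & QOS_INIT)
            return CreateMonitor;               // a pvRequest follows; may carry an initial window size
        if (qos & QOS_GET_PUT)
            return AckWindow;                   // pipeline ack: handled and returned before any destroy
        if (qos & QOS_PROCESS)
            return (qos & QOS_GET) ? Start : Stop;
        return None;                            // QOS_GET alone is not supported; QOS_DESTROY alone
                                                // only destroys the request afterwards
    }
    }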
1862  ServerContextImpl::shared_pointer const & context,
1863  ServerChannel::shared_pointer const & channel,
1864  const pvAccessID ioid, Transport::shared_pointer const & transport)
1865  :BaseChannelRequester(context, channel, ioid, transport)
1866  ,_window_open(0u)
1867  ,_unlisten(false)
1868  ,_pipeline(false)
1869 {}
1870 
1872  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
1873  const pvAccessID ioid, Transport::shared_pointer const & transport,PVStructure::shared_pointer const & pvRequest)
1874 {
1875  std::tr1::shared_ptr<ServerMonitorRequesterImpl> tp(new ServerMonitorRequesterImpl(context, channel, ioid, transport));
1876  tp->activate(pvRequest);
1877  return tp;
1878 }
1879 
1880 void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
1881 {
1882  epics::pvData::PVScalar::const_shared_pointer O(pvRequest->getSubField<epics::pvData::PVScalar>("record._options.pipeline"));
1883  if(O) {
1884  try{
1885  _pipeline = O->getAs<epics::pvData::boolean>();
1886  }catch(std::exception& e){
1887  std::ostringstream strm;
1888  strm<<"Ignoring invalid pipeline= : "<<e.what();
1889  message(strm.str(), epics::pvData::errorMessage);
1890  }
1891  }
1893  shared_pointer thisPointer(shared_from_this());
1894  _channel->registerRequest(_ioid, thisPointer);
1895  INIT_EXCEPTION_GUARD(CMD_MONITOR, _channelMonitor, _channel->getChannel()->createMonitor(thisPointer, pvRequest));
1896 }
1897 
1898 void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::shared_pointer const & monitor, epics::pvData::StructureConstPtr const & structure)
1899 {
1900  {
1901  Lock guard(_mutex);
1902  _status = status;
1903  _channelMonitor = monitor;
1904  _structure = structure;
1905  }
1906  TransportSender::shared_pointer thisSender = shared_from_this();
1907  _transport->enqueueSendRequest(thisSender);
1908 
1909  // self-destruction
1910  if (!status.isSuccess())
1911  {
1912  destroy();
1913  }
1914 }
1915 
1916 void ServerMonitorRequesterImpl::unlisten(Monitor::shared_pointer const & /*monitor*/)
1917 {
1918  {
1919  Lock guard(_mutex);
1920  _unlisten = true;
1921  }
1922  TransportSender::shared_pointer thisSender = shared_from_this();
1923  _transport->enqueueSendRequest(thisSender);
1924 }
1925 
1926 void ServerMonitorRequesterImpl::monitorEvent(Monitor::shared_pointer const & /*monitor*/)
1927 {
1928  TransportSender::shared_pointer thisSender = shared_from_this();
1929  _transport->enqueueSendRequest(thisSender);
1930 }
1931 
1933 {
1934  // keep a reference to ourselves, as the owner
1935  // could release its reference and we don't want to be
1936  // destroyed prematurely
1937  shared_pointer self(shared_from_this());
1938 
1939  // hold a reference to channelMonitor so that _channelMonitor.reset()
1940  // does not call ~Monitor (external code) while we are holding a lock
1941  Monitor::shared_pointer monitor;
1942  window_t window;
1943  {
1944  Lock guard(_mutex);
1945  _channel->unregisterRequest(_ioid);
1946 
1947  window.swap(_window_closed);
1948 
1949  monitor.swap(_channelMonitor);
1950  }
1951  window.clear();
1952  if(monitor) {
1953  monitor->destroy();
1954  }
1955 }
1956 
1958 {
1959  Lock guard(_mutex);
1960  return _channelMonitor;
1961 }
1962 
1964 {
1965  const int32 request = getPendingRequest();
1966 
1967  if ((QOS_INIT & request) != 0)
1968  {
1969  control->startMessage((int32)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1);
1970  buffer->putInt(_ioid);
1971  buffer->putByte((int8)request);
1972 
1973  {
1974  Lock guard(_mutex);
1975  _status.serialize(buffer, control);
1976  }
1977 
1978  if (_status.isSuccess())
1979  {
1980  // valid due to _mutex lock above
1981  control->cachedSerialize(_structure, buffer);
1982  }
1983  stopRequest();
1985  }
1986  else
1987  {
1988  Monitor::shared_pointer monitor(getChannelMonitor());
1989  if (!monitor)
1990  return;
1991 
1992  // TODO asCheck ?
1993 
1994  bool busy = false;
1995  if(_pipeline) {
1996  Lock guard(_mutex);
1997  busy = _window_open==0;
1998  }
1999 
2000  MonitorElement::Ref element;
2001  if(!busy) {
2002  MonitorElement::Ref E(monitor);
2003  E.swap(element);
2004  }
2005  if (element)
2006  {
2007  control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1);
2008  buffer->putInt(_ioid);
2009  buffer->putByte((int8)request);
2010 
2011  // changedBitSet and data, if not notify only (i.e. queueSize == -1)
2012  const BitSet::shared_pointer& changedBitSet = element->changedBitSet;
2013  if (changedBitSet)
2014  {
2015  changedBitSet->serialize(buffer, control);
2016  element->pvStructurePtr->serialize(buffer, control, changedBitSet.get());
2017 
2018  // overrunBitset
2019  element->overrunBitSet->serialize(buffer, control);
2020  }
2021 
2022  {
2023  Lock guard(_mutex);
2024  if(!_pipeline) {
2025  } else if(_window_open==0) {
2026  // This really shouldn't happen as the above ensures that _window_open *was* non-zero,
2027  // and only we (the sender) will decrement.
2028  message("Monitor Logic Error: send outside of window", epics::pvData::warningMessage);
2029  LOG(logLevelError, "Monitor Logic Error: send outside of window %zu", _window_closed.size());
2030 
2031  } else {
2032  _window_closed.push_back(element.letGo());
2033  _window_open--;
2034  }
2035  }
2036 
2037  element.reset(); // calls Monitor::release() if not swap()'d
2038 
2039  // TODO if we try to process several monitors at once, then fairness suffers
2040  // TODO combine several monitors into one message (reduces payload)
2041  TransportSender::shared_pointer thisSender = shared_from_this();
2042  _transport->enqueueSendRequest(thisSender);
2043  }
2044  else
2045  {
2046  bool unlisten;
2047  window_t window;
2048  {
2049  Lock guard(_mutex);
2050  unlisten = _unlisten;
2051  _unlisten = false;
2052  if(unlisten) {
2053  window.swap(_window_closed);
2054  _window_open = 0u;
2055  }
2056  }
2057 
2058  for(window_t::iterator it(window.begin()), end(window.end()); it!=end; ++it) {
2059  monitor->release(*it);
2060  }
2061  window.clear();
2062 
2063  if (unlisten)
2064  {
2065  control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1);
2066  buffer->putInt(_ioid);
2067  buffer->putByte((int8)QOS_DESTROY);
2068  Status::Ok.serialize(buffer, control);
2069  }
2070  }
2071 
2072  }
2073 }
2074 
2076 {
2077  typedef std::vector<MonitorElementPtr> acking_t;
2078  acking_t acking;
2079  Monitor::shared_pointer mon;
2080  {
2081  Lock guard(_mutex);
2082 
2083  // cnt may exceed _window_closed.size() if this is the initial window update,
2084  // or if the window is being enlarged.
2085  size_t nack = std::min(cnt, _window_closed.size());
2086 
2087  _window_open += cnt;
2088 
2089  window_t::iterator it, end(_window_closed.begin());
2090  std::advance(end, nack);
2091 
2092  acking.resize(nack);
2093 
2094  size_t i;
2095  for(i=0, it=_window_closed.begin(); i<nack; i++, ++it)
2096  {
2097  acking[i].swap(*it);
2098  }
2099 
2100  _window_closed.erase(_window_closed.begin(), end);
2101 
2102  mon = _channelMonitor;
2103  }
2104 
2105  for(acking_t::iterator it(acking.begin()), end(acking.end()); it!=end; ++it) {
2106  mon->release(*it);
2107  }
2108 
2109  mon->reportRemoteQueueStatus(cnt);
2110 }
2111 
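send() and ack() above implement the pipelined-monitor flow control: while pipelining, every element sent closes one slot of _window_open and is parked in _window_closed until the client acknowledges it; ack() then releases up to cnt parked elements and re-opens the window. A simplified standalone model of that bookkeeping (plain pointers and a std::deque instead of MonitorElementPtr):

    #include <algorithm>
    #include <cstddef>
    #include <deque>

    namespace sketch {
    struct Element {};                        // stand-in for a queued MonitorElement

    // Simplified model of the _window_open / _window_closed bookkeeping above.
    class MonitorWindow {
        std::size_t open_;                    // slots the client will still accept
        std::deque<Element*> closed_;         // elements sent but not yet acknowledged
    public:
        explicit MonitorWindow(std::size_t initial) : open_(initial) {}

        bool trySend(Element* e)              // send() path when pipelining is enabled
        {
            if (open_ == 0)
                return false;                 // window exhausted: leave the element queued
            closed_.push_back(e);
            --open_;
            return true;
        }

        std::size_t ack(std::size_t cnt)      // ack() path: client frees cnt slots
        {
            std::size_t nack = std::min(cnt, closed_.size());
            closed_.erase(closed_.begin(), closed_.begin() + nack);
            open_ += cnt;                     // window grows by the full acknowledged count
            return nack;                      // the real code releases these elements to the Monitor
        }
    };
    }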
2112 /****************************************************************************************/
2114  Transport::shared_pointer const & transport, int8 version, int8 command,
2115  size_t payloadSize, ByteBuffer* payloadBuffer) {
2117  transport, version, command, payloadSize, payloadBuffer);
2118 
2119  // NOTE: we do not explicitly check if transport is OK
2120  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
2121 
2122  transport->ensureData(2*sizeof(int32)/sizeof(int8)+1);
2123  const pvAccessID sid = payloadBuffer->getInt();
2124  const pvAccessID ioid = payloadBuffer->getInt();
2125 
2126  // mode
2127  const int8 qosCode = payloadBuffer->getByte();
2128 
2129  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
2130  if (!channel.get())
2131  {
2133  return;
2134  }
2135 
2136  const bool init = (QOS_INIT & qosCode) != 0;
2137  if (init)
2138  {
2139  // pvRequest
2140  PVStructure::shared_pointer pvRequest(SerializationHelper::deserializePVRequest(payloadBuffer, transport.get()));
2141 
2142  // create...
2143  ServerChannelArrayRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
2144  }
2145  else
2146  {
2147  const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
2148  const bool get = (QOS_GET & qosCode) != 0;
2149  const bool setLength = (QOS_GET_PUT & qosCode) != 0;
2150  const bool getLength = (QOS_PROCESS & qosCode) != 0;
2151 
2153  if (!request.get())
2154  {
2156  return;
2157  }
2158  atomic::add(request->bytesRX, payloadSize);
2159 
2160  if (!request->startRequest(qosCode))
2161  {
2163  return;
2164  }
2165 
2166  ChannelArray::shared_pointer channelArray = request->getChannelArray();
2167  if (lastRequest)
2168  channelArray->lastRequest();
2169 
2170  if (get)
2171  {
2172  size_t offset = SerializeHelper::readSize(payloadBuffer, transport.get());
2173  size_t count = SerializeHelper::readSize(payloadBuffer, transport.get());
2174  size_t stride = SerializeHelper::readSize(payloadBuffer, transport.get());
2175 
2176  request->getChannelArray()->getArray(offset, count, stride);
2177  }
2178  else if (setLength)
2179  {
2180  size_t length = SerializeHelper::readSize(payloadBuffer, transport.get());
2181 
2182  request->getChannelArray()->setLength(length);
2183  }
2184  else if (getLength)
2185  {
2186  request->getChannelArray()->getLength();
2187  }
2188  else
2189  {
2190  // deserialize data to put
2191  size_t offset;
2192  size_t stride;
2193  PVArray::shared_pointer array = request->getPVArray();
2194  {
2195  ScopedLock lock(channelArray); // TODO not needed if read by the same thread
2196 
2197  DESERIALIZE_EXCEPTION_GUARD(
2198  offset = SerializeHelper::readSize(payloadBuffer, transport.get());
2199  stride = SerializeHelper::readSize(payloadBuffer, transport.get());
2200  array->deserialize(payloadBuffer, transport.get());
2201  );
2202  }
2203 
2204  channelArray->putArray(array, offset, array->getLength(), stride);
2205  }
2206  }
2207 }
2208 
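For a get, the handler above reads an (offset, count, stride) triple and forwards it to ChannelArray::getArray(). The sketch below shows the addressing this implies on a plain std::vector<double>; treating count == 0 as "to the end" is an assumption about the usual ChannelArray convention, not something stated in this file:

    #include <cstddef>
    #include <vector>

    namespace sketch {
    // (offset, count, stride) addressing, shown on a plain vector rather than a pvData array.
    std::vector<double> subArray(const std::vector<double>& src,
                                 std::size_t offset, std::size_t count, std::size_t stride)
    {
        std::vector<double> out;
        if (stride == 0)
            stride = 1;                                    // guard for this sketch only
        for (std::size_t i = offset, taken = 0;
             i < src.size() && (count == 0 || taken < count);
             i += stride, ++taken)
            out.push_back(src[i]);                         // every stride-th element from offset
        return out;
    }
    }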
2210  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
2211  const pvAccessID ioid, Transport::shared_pointer const & transport):
2212  BaseChannelRequester(context, channel, ioid, transport)
2213 {
2214 }
2215 
2216 ChannelArrayRequester::shared_pointer ServerChannelArrayRequesterImpl::create(
2217  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
2218  const pvAccessID ioid, Transport::shared_pointer const & transport,PVStructure::shared_pointer const & pvRequest)
2219 {
2220  // TODO use std::make_shared
2221  std::tr1::shared_ptr<ServerChannelArrayRequesterImpl> tp(new ServerChannelArrayRequesterImpl(context, channel, ioid, transport));
2222  ChannelArrayRequester::shared_pointer thisPointer = tp;
2223  static_cast<ServerChannelArrayRequesterImpl*>(thisPointer.get())->activate(pvRequest);
2224  return thisPointer;
2225 }
2226 
2227 void ServerChannelArrayRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
2228 {
2230  shared_pointer thisPointer(shared_from_this());
2231  _channel->registerRequest(_ioid, thisPointer);
2232  INIT_EXCEPTION_GUARD(CMD_ARRAY, _channelArray, _channel->getChannel()->createChannelArray(thisPointer, pvRequest));
2233 }
2234 
2235 void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, ChannelArray::shared_pointer const & channelArray, Array::const_shared_pointer const & array)
2236 {
2237  if (status.isSuccess() && array->getArraySizeType() == Array::fixed)
2238  {
2239  Lock guard(_mutex);
2240  _status = Status(Status::STATUSTYPE_ERROR, "fixed sized array returned as a ChannelArray array instance");
2241  _channelArray.reset();
2242  _pvArray.reset();
2243  }
2244  else
2245  {
2246  Lock guard(_mutex);
2247  _status = status;
2248  _channelArray = channelArray;
2249  if (_status.isSuccess())
2250  {
2251  _pvArray = std::tr1::static_pointer_cast<PVArray>(reuseOrCreatePVField(array, _pvArray));
2252  }
2253  }
2254 
2255  TransportSender::shared_pointer thisSender = shared_from_this();
2256  _transport->enqueueSendRequest(thisSender);
2257 
2258  // self-destruction
2259  if (!status.isSuccess())
2260  {
2261  destroy();
2262  }
2263 }
2264 
2265 void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/, PVArray::shared_pointer const & pvArray)
2266 {
2267  {
2268  Lock guard(_mutex);
2269  _status = status;
2270  if (_status.isSuccess())
2271  {
2272  _pvArray->copyUnchecked(*pvArray);
2273  }
2274  }
2275  TransportSender::shared_pointer thisSender = shared_from_this();
2276  _transport->enqueueSendRequest(thisSender);
2277 }
2278 
2279 void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/)
2280 {
2281  {
2282  Lock guard(_mutex);
2283  _status = status;
2284  }
2285  TransportSender::shared_pointer thisSender = shared_from_this();
2286  _transport->enqueueSendRequest(thisSender);
2287 }
2288 
2289 void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/)
2290 {
2291  {
2292  Lock guard(_mutex);
2293  _status = status;
2294  }
2295  TransportSender::shared_pointer thisSender = shared_from_this();
2296  _transport->enqueueSendRequest(thisSender);
2297 }
2298 
2299 void ServerChannelArrayRequesterImpl::getLengthDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/,
2300  size_t length)
2301 {
2302  {
2303  Lock guard(_mutex);
2304  _status = status;
2305  _length = length;
2306  }
2307  TransportSender::shared_pointer thisSender = shared_from_this();
2308  _transport->enqueueSendRequest(thisSender);
2309 }
2310 
2312 {
2313  // keep a reference to ourselves, as the owner
2314  // could release its reference and we don't want to be
2315  // destroyed prematurely
2316  shared_pointer self(shared_from_this());
2317 
2318  // hold a reference to channelArray so that _channelArray.reset()
2319  // does not call ~ChannelArray (external code) while we are holding a lock
2320  ChannelArray::shared_pointer channelArray = _channelArray;
2321  {
2322  Lock guard(_mutex);
2323  _channel->unregisterRequest(_ioid);
2324 
2325  if (_channelArray)
2326  {
2327  _channelArray->destroy();
2328  _channelArray.reset();
2329  }
2330  }
2331 }
2332 
2334 {
2335  //Lock guard(_mutex);
2336  return _channelArray;
2337 }
2338 
2340 {
2341  //Lock guard(_mutex);
2342  return _pvArray;
2343 }
2344 
2346 {
2347  const int32 request = getPendingRequest();
2348 
2349  ChannelArray::shared_pointer channelArray;
2350  {
2351  Lock guard(_mutex);
2352  channelArray = _channelArray;
2353  // we must respond to QOS_INIT (e.g. creation error)
2354  if (!channelArray && !(request & QOS_INIT))
2355  return;
2356  }
2357 
2358  control->startMessage((int32)CMD_ARRAY, sizeof(int32)/sizeof(int8) + 1);
2359  buffer->putInt(_ioid);
2360  buffer->putByte((int8)request);
2361  {
2362  Lock guard(_mutex);
2363  _status.serialize(buffer, control);
2364  }
2365 
2366  if (_status.isSuccess())
2367  {
2368  if ((QOS_GET & request) != 0)
2369  {
2370  //Lock guard(_mutex);
2371  ScopedLock lock(channelArray);
2372  _pvArray->serialize(buffer, control, 0, _pvArray->getLength());
2373  }
2374  else if ((QOS_PROCESS & request) != 0)
2375  {
2376  //Lock guard(_mutex);
2377  SerializeHelper::writeSize(_length, buffer, control);
2378  }
2379  else if ((QOS_INIT & request) != 0)
2380  {
2381  Lock guard(_mutex);
2382  control->cachedSerialize(_pvArray->getArray(), buffer);
2383  }
2384  }
2385 
2386  stopRequest();
2387 
2388  // lastRequest
2389  if ((QOS_DESTROY & request) != 0)
2390  destroy();
2391 }
2392 
2393 /****************************************************************************************/
2395  Transport::shared_pointer const & transport, int8 version, int8 command,
2396  size_t payloadSize, ByteBuffer* payloadBuffer) {
2398  transport, version, command, payloadSize, payloadBuffer);
2399 
2400  // NOTE: we do not explicitly check if transport is OK
2401  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
2402 
2403  transport->ensureData(2*sizeof(int32)/sizeof(int8));
2404  const pvAccessID sid = payloadBuffer->getInt();
2405  const pvAccessID ioid = payloadBuffer->getInt();
2406 
2407  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
2408  if (!channel.get())
2409  {
2410  failureResponse(transport, ioid, BaseChannelRequester::badCIDStatus);
2411  return;
2412  }
2413 
2414  Destroyable::shared_pointer request = channel->getRequest(ioid);
2415  if (!request.get())
2416  {
2417  failureResponse(transport, ioid, BaseChannelRequester::badIOIDStatus);
2418  return;
2419  }
2420  // atomic::add(request->bytesRX, payloadSize);
2421 
2422  // destroy
2423  request->destroy();
2424 
2425  // ... and remove from channel
2426  channel->unregisterRequest(ioid);
2427 }
2428 
2429 void ServerDestroyRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus)
2430 {
2431  BaseChannelRequester::message(transport, ioid, errorStatus.getMessage(), warningMessage);
2432 }
2433 
2434 /****************************************************************************************/
2436  Transport::shared_pointer const & transport, int8 version, int8 command,
2437  size_t payloadSize, ByteBuffer* payloadBuffer) {
2439  transport, version, command, payloadSize, payloadBuffer);
2440 
2441  // NOTE: we do not explicitly check if transport is OK
2442  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
2443 
2444  transport->ensureData(2*sizeof(int32)/sizeof(int8));
2445  const pvAccessID sid = payloadBuffer->getInt();
2446  const pvAccessID ioid = payloadBuffer->getInt();
2447 
2448  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
2449  if (!channel)
2450  {
2451  failureResponse(transport, ioid, BaseChannelRequester::badCIDStatus);
2452  return;
2453  }
2454 
2455  BaseChannelRequester::shared_pointer request(channel->getRequest(ioid));
2456  if (!request)
2457  {
2458  failureResponse(transport, ioid, BaseChannelRequester::badIOIDStatus);
2459  return;
2460  }
2461  //atomic::add(request->bytesRX, payloadSize);
2462 
2463  ChannelRequest::shared_pointer cr = dynamic_pointer_cast<ChannelRequest>(request->getOperation());
2464  if (!cr)
2465  {
2466  failureResponse(transport, ioid, BaseChannelRequester::notAChannelRequestStatus);
2467  return;
2468  }
2469 
2470  cr->cancel();
2471 }
2472 
2473 void ServerCancelRequestHandler::failureResponse(Transport::shared_pointer const & transport, pvAccessID ioid, const Status& errorStatus)
2474 {
2475  BaseChannelRequester::message(transport, ioid, errorStatus.getMessage(), warningMessage);
2476 }
2477 
2478 /****************************************************************************************/
2480  Transport::shared_pointer const & transport, int8 version, int8 command,
2481  size_t payloadSize, ByteBuffer* payloadBuffer) {
2483  transport, version, command, payloadSize, payloadBuffer);
2484 
2485  // NOTE: we do not explicitly check if transport is OK
2486  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
2487 
2488  transport->ensureData(2*sizeof(int32)/sizeof(int8)+1);
2489  const pvAccessID sid = payloadBuffer->getInt();
2490  const pvAccessID ioid = payloadBuffer->getInt();
2491 
2492  // mode
2493  const int8 qosCode = payloadBuffer->getByte();
2494 
2495  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
2496  if (!channel.get())
2497  {
2499  return;
2500  }
2501 
2502  const bool init = (QOS_INIT & qosCode) != 0;
2503  if (init)
2504  {
2505  // pvRequest
2506  PVStructure::shared_pointer pvRequest(SerializationHelper::deserializePVRequest(payloadBuffer, transport.get()));
2507 
2508  // create...
2509  ServerChannelProcessRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
2510  }
2511  else
2512  {
2513  const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
2514 
2516  if (!request.get())
2517  {
2519  return;
2520  }
2521  atomic::add(request->bytesRX, payloadSize);
2522 
2523  if (!request->startRequest(qosCode))
2524  {
2526  return;
2527  }
2528 
2529  if (lastRequest)
2530  request->getChannelProcess()->lastRequest();
2531 
2532  request->getChannelProcess()->process();
2533  }
2534 }
2535 
2537  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
2538  const pvAccessID ioid, Transport::shared_pointer const & transport):
2539  BaseChannelRequester(context, channel, ioid, transport), _channelProcess()
2540 {
2541 }
2542 
2543 ChannelProcessRequester::shared_pointer ServerChannelProcessRequesterImpl::create(
2544  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
2545  const pvAccessID ioid, Transport::shared_pointer const & transport,PVStructure::shared_pointer const & pvRequest)
2546 {
2547  // TODO use std::make_shared
2548  std::tr1::shared_ptr<ServerChannelProcessRequesterImpl> tp(new ServerChannelProcessRequesterImpl(context, channel, ioid, transport));
2549  ChannelProcessRequester::shared_pointer thisPointer = tp;
2550  static_cast<ServerChannelProcessRequesterImpl*>(thisPointer.get())->activate(pvRequest);
2551  return thisPointer;
2552 }
2553 
2554 void ServerChannelProcessRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
2555 {
2557  shared_pointer thisPointer(shared_from_this());
2558  _channel->registerRequest(_ioid, thisPointer);
2559  INIT_EXCEPTION_GUARD(CMD_PROCESS, _channelProcess, _channel->getChannel()->createChannelProcess(thisPointer, pvRequest));
2560 }
2561 
2562 void ServerChannelProcessRequesterImpl::channelProcessConnect(const Status& status, ChannelProcess::shared_pointer const & channelProcess)
2563 {
2564  {
2565  Lock guard(_mutex);
2566  _status = status;
2567  _channelProcess = channelProcess;
2568  }
2569  TransportSender::shared_pointer thisSender = shared_from_this();
2570  _transport->enqueueSendRequest(thisSender);
2571 
2572  // self-destruction
2573  if (!status.isSuccess())
2574  {
2575  destroy();
2576  }
2577 }
2578 
2579 void ServerChannelProcessRequesterImpl::processDone(const Status& status, ChannelProcess::shared_pointer const & /*channelProcess*/)
2580 {
2581  {
2582  Lock guard(_mutex);
2583  _status = status;
2584  }
2585  TransportSender::shared_pointer thisSender = shared_from_this();
2586  _transport->enqueueSendRequest(thisSender);
2587 }
2588 
2590 {
2591  // keep a reference to ourselves, as the owner
2592  // could release its reference and we don't want to be
2593  // destroyed prematurely
2594  shared_pointer self(shared_from_this());
2595 
2596  {
2597  Lock guard(_mutex);
2598  _channel->unregisterRequest(_ioid);
2599 
2600  if (_channelProcess.get())
2601  {
2602  _channelProcess->destroy();
2603  }
2604  }
2605  // TODO
2606  _channelProcess.reset();
2607 }
2608 
2610 {
2611  //Lock guard(_mutex);
2612  return _channelProcess;
2613 }
2614 
2616 {
2617  const int32 request = getPendingRequest();
2618 
2619  control->startMessage((int32)CMD_PROCESS, sizeof(int32)/sizeof(int8) + 1);
2620  buffer->putInt(_ioid);
2621  buffer->putByte((int8)request);
2622  {
2623  Lock guard(_mutex);
2624  _status.serialize(buffer, control);
2625  }
2626 
2627  stopRequest();
2628 
2629  // lastRequest
2630  if ((QOS_DESTROY & request) != 0)
2631  {
2632  destroy();
2633  }
2634 }
2635 
2636 
2637 /****************************************************************************************/
2639  Transport::shared_pointer const & transport, int8 version, int8 command,
2640  size_t payloadSize, ByteBuffer* payloadBuffer) {
2642  transport, version, command, payloadSize, payloadBuffer);
2643 
2644  // NOTE: we do not explicitly check if transport is OK
2645  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
2646 
2647  transport->ensureData(2*sizeof(int32)/sizeof(int8));
2648  const pvAccessID sid = payloadBuffer->getInt();
2649  const pvAccessID ioid = payloadBuffer->getInt();
2650 
2651  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
2652  if (!channel.get())
2653  {
2654  getFieldFailureResponse(transport, ioid, BaseChannelRequester::badCIDStatus);
2655  return;
2656  }
2657 
2658  string subField = SerializeHelper::deserializeString(payloadBuffer, transport.get());
2659 
2660  // issue request
2661  GetFieldRequester::shared_pointer req;
2662  {
2663  std::tr1::shared_ptr<ServerGetFieldRequesterImpl> tp(new ServerGetFieldRequesterImpl(_context, channel, ioid, transport));
2664  req = tp;
2665  }
2666 
2667  channel->installGetField(req);
2668 
2669  // TODO exception check
2670  channel->getChannel()->getField(req, subField);
2671 }
2672 
2673 void ServerGetFieldHandler::getFieldFailureResponse(Transport::shared_pointer const & transport, const pvAccessID ioid, const Status& errorStatus)
2674 {
2675  TransportSender::shared_pointer sender(new ServerGetFieldHandlerTransportSender(ioid,errorStatus,transport));
2676  transport->enqueueSendRequest(sender);
2677 }
2678 
2680  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
2681  const pvAccessID ioid, Transport::shared_pointer const & transport) :
2682  BaseChannelRequester(context, channel, ioid, transport), done(false)
2683 {
2684 }
2685 
2687 {
2688  bool twice;
2689  {
2690  Lock guard(_mutex);
2691  _status = status;
2692  _field = field;
2693  twice = done;
2694  done = true;
2695  }
2696  if(!twice) {
2697  TransportSender::shared_pointer thisSender = shared_from_this();
2698  _transport->enqueueSendRequest(thisSender);
2699  }
2700  _channel->completeGetField(this);
2701 }
2702 
2704 {
2705 }
2706 
2708 {
2709  control->startMessage((int8)CMD_GET_FIELD, sizeof(int32)/sizeof(int8));
2710  buffer->putInt(_ioid);
2711  {
2712  Lock guard(_mutex);
2713  _status.serialize(buffer, control);
2714  if (_status.isSuccess())
2715  control->cachedSerialize(_field, buffer);
2716  }
2717 }
2718 
2719 /****************************************************************************************/
2721  Transport::shared_pointer const & transport, int8 version, int8 command,
2722  size_t payloadSize, ByteBuffer* payloadBuffer) {
2724  transport, version, command, payloadSize, payloadBuffer);
2725 
2726  // NOTE: we do not explicitly check if transport is OK
2727  detail::BlockingServerTCPTransportCodec* casTransport(static_cast<detail::BlockingServerTCPTransportCodec*>(transport.get()));
2728 
2729  transport->ensureData(2*sizeof(int32)/sizeof(int8)+1);
2730  const pvAccessID sid = payloadBuffer->getInt();
2731  const pvAccessID ioid = payloadBuffer->getInt();
2732 
2733  // mode
2734  const int8 qosCode = payloadBuffer->getByte();
2735 
2736  ServerChannel::shared_pointer channel = casTransport->getChannel(sid);
2737  if (!channel.get())
2738  {
2740  return;
2741  }
2742 
2743  const bool init = (QOS_INIT & qosCode) != 0;
2744  if (init)
2745  {
2746  // pvRequest
2747  PVStructure::shared_pointer pvRequest(SerializationHelper::deserializePVRequest(payloadBuffer, transport.get()));
2748 
2749  // create...
2750  ServerChannelRPCRequesterImpl::create(_context, channel, ioid, transport, pvRequest);
2751  }
2752  else
2753  {
2754  const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
2755 
2757  if (!request.get())
2758  {
2760  return;
2761  }
2762  atomic::add(request->bytesRX, payloadSize);
2763 
2764  if (!request->startRequest(qosCode))
2765  {
2767  return;
2768  }
2769 
2770  // deserialize put data
2771  ChannelRPC::shared_pointer channelRPC = request->getChannelRPC();
2772  // pvArgument
2773  PVStructure::shared_pointer pvArgument;
2774 
2775  DESERIALIZE_EXCEPTION_GUARD(
2776  pvArgument = SerializationHelper::deserializeStructureFull(payloadBuffer, transport.get());
2777  );
2778 
2779  if (lastRequest)
2780  channelRPC->lastRequest();
2781 
2782  channelRPC->request(pvArgument);
2783  }
2784 }
2785 
2787  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
2788  const pvAccessID ioid, Transport::shared_pointer const & transport):
2789  BaseChannelRequester(context, channel, ioid, transport),
2790  _channelRPC(), _pvResponse()
2791  ,_status(Status::fatal("Invalid State"))
2792 
2793 {
2794 }
2795 
2796 ChannelRPCRequester::shared_pointer ServerChannelRPCRequesterImpl::create(
2797  ServerContextImpl::shared_pointer const & context, ServerChannel::shared_pointer const & channel,
2798  const pvAccessID ioid, Transport::shared_pointer const & transport, PVStructure::shared_pointer const & pvRequest)
2799 {
2800  // TODO use std::make_shared
2801  std::tr1::shared_ptr<ServerChannelRPCRequesterImpl> tp(new ServerChannelRPCRequesterImpl(context, channel, ioid, transport));
2802  tp->activate(pvRequest);
2803  return tp;
2804 }
2805 
2806 void ServerChannelRPCRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest)
2807 {
2809  shared_pointer thisPointer(shared_from_this());
2810  _channel->registerRequest(_ioid, thisPointer);
2811  INIT_EXCEPTION_GUARD(CMD_RPC, _channelRPC, _channel->getChannel()->createChannelRPC(thisPointer, pvRequest));
2812 }
2813 
2814 void ServerChannelRPCRequesterImpl::channelRPCConnect(const Status& status, ChannelRPC::shared_pointer const & channelRPC)
2815 {
2816  {
2817  Lock guard(_mutex);
2818  _status = status;
2819  _channelRPC = channelRPC;
2820  }
2821  TransportSender::shared_pointer thisSender = shared_from_this();
2822  _transport->enqueueSendRequest(thisSender);
2823 
2824  // self-destruction
2825  if (!status.isSuccess())
2826  {
2827  destroy();
2828  }
2829 }
2830 
2831 void ServerChannelRPCRequesterImpl::requestDone(const Status& status, ChannelRPC::shared_pointer const & /*channelRPC*/, PVStructure::shared_pointer const & pvResponse)
2832 {
2833  {
2834  Lock guard(_mutex);
2835  _status = status;
2836  _pvResponse = pvResponse;
2837  }
2838  TransportSender::shared_pointer thisSender = shared_from_this();
2839  _transport->enqueueSendRequest(thisSender);
2840 }
2841 
2843 {
2844  // keep a reference to ourselves, as the owner
2845  // could release its reference and we don't want to be
2846  // destroyed prematurely
2847  shared_pointer self(shared_from_this());
2848 
2849  {
2850  Lock guard(_mutex);
2851  _channel->unregisterRequest(_ioid);
2852 
2853  if (_channelRPC.get())
2854  {
2855  _channelRPC->destroy();
2856  }
2857  }
2858  // TODO
2859  _channelRPC.reset();
2860 }
2861 
2863 {
2864  //Lock guard(_mutex);
2865  return _channelRPC;
2866 }
2867 
2869 {
2870  const int32 request = getPendingRequest();
2871 
2872  control->startMessage((int32)CMD_RPC, sizeof(int32)/sizeof(int8) + 1);
2873  buffer->putInt(_ioid);
2874  buffer->putByte((int8)request);
2875 
2876  {
2877  Lock guard(_mutex);
2878  _status.serialize(buffer, control);
2879 
2880  if (_status.isSuccess())
2881  {
2882  if ((QOS_INIT & request) != 0)
2883  {
2884  // noop
2885  }
2886  else
2887  {
2888  SerializationHelper::serializeStructureFull(buffer, control, _pvResponse);
2889  }
2890  }
2891  _status = Status::fatal("Stale state");
2892  }
2893 
2894  stopRequest();
2895 
2896  // lastRequest
2897  if ((QOS_DESTROY & request) != 0)
2898  destroy();
2899 }
2900 
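The send() above emits the stored status and, for non-INIT replies, the full response structure, then immediately overwrites _status with a fatal "Stale state" so leftover state can never be re-sent. A minimal sketch of that one-shot reply pattern with plain std::string stand-ins (hypothetical OneShotReply type, not pvAccess API):

    #include <string>
    #include <utility>

    namespace sketch {
    // One-shot reply slot: the stored result can be taken exactly once; afterwards the
    // slot reports a stale/fatal state, mirroring the _status reset in the RPC send() above.
    class OneShotReply {
        std::string status_ = "fatal: Invalid State";     // initial state before requestDone()
        std::string payload_;
    public:
        void complete(std::string status, std::string payload)   // requestDone() analogue
        {
            status_ = std::move(status);
            payload_ = std::move(payload);
        }
        std::pair<std::string, std::string> take()                // send() analogue
        {
            std::pair<std::string, std::string> out(status_, payload_);
            status_ = "fatal: Stale state";                       // cannot be re-sent by accident
            payload_.clear();
            return out;
        }
    };
    }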
2901 }
2902 }