pipelineServer.cpp
#include <stdexcept>
#include <vector>
#include <queue>
#include <utility>
#include <sstream>    // std::stringstream, used to parse pvRequest options below
#include <map>        // std::map, used for the service registry
#include <iostream>   // std::cout / std::cerr in PipelineServer

#define epicsExportSharedSymbols
#include <pv/pipelineServer.h>
#include <pv/wildcard.h>

using namespace epics::pvData;
using namespace std;

namespace {
using namespace epics::pvAccess;

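// Server-side monitor that implements the pvAccess "pipeline" flow-control
// protocol: elements produced by a PipelineSession are staged in m_monitorQueue
// and handed to the client only while m_requestedCount (the credit reported by
// the remote side via reportRemoteQueueStatus) is non-zero; consumed elements
// are recycled through m_freeQueue.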
class ChannelPipelineMonitorImpl :
    public PipelineMonitor,
    public PipelineControl,
    public std::tr1::enable_shared_from_this<ChannelPipelineMonitorImpl>
{
private:

    typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
    typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;

    Channel::shared_pointer m_channel;
    MonitorRequester::shared_pointer m_monitorRequester;
    PipelineSession::shared_pointer m_pipelineSession;

    size_t m_queueSize;

    FreeElementQueue m_freeQueue;
    MonitorElementQueue m_monitorQueue;

    Mutex m_freeQueueLock;
    Mutex m_monitorQueueLock;

    bool m_active;
    MonitorElement::shared_pointer m_nullMonitorElement;

    size_t m_requestedCount;

    bool m_pipeline;

    bool m_done;

    bool m_unlistenReported;

public:
    ChannelPipelineMonitorImpl(
        Channel::shared_pointer const & channel,
        MonitorRequester::shared_pointer const & monitorRequester,
        epics::pvData::PVStructure::shared_pointer const & pvRequest,
        PipelineService::shared_pointer const & pipelineService) :
        m_channel(channel),
        m_monitorRequester(monitorRequester),
        m_queueSize(2),
        m_freeQueueLock(),
        m_monitorQueueLock(),
        m_active(false),
        m_requestedCount(0),
        m_pipeline(false),
        m_done(false),
        m_unlistenReported(false)
    {
        m_pipelineSession = pipelineService->createPipeline(pvRequest);

        // extract the queueSize and pipeline options from the pvRequest
        PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
        if (pvOptions) {
            PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
            if (pvString) {
                int32 size;
                std::stringstream ss;
                ss << pvString->get();
                ss >> size;
                if (size > 1)
                    m_queueSize = static_cast<size_t>(size);
            }
            pvString = pvOptions->getSubField<PVString>("pipeline");
            if (pvString)
                m_pipeline = (pvString->get() == "true");
        }

        // the server-side queue must be at least as large as the pipeline session requires
        size_t minQueueSize = m_pipelineSession->getMinQueueSize();
        if (m_queueSize < minQueueSize)
            m_queueSize = minQueueSize;

        Structure::const_shared_pointer structure = m_pipelineSession->getStructure();

        // create free elements
        {
            Lock guard(m_freeQueueLock);
            m_freeQueue.reserve(m_queueSize);
            for (size_t i = 0; i < m_queueSize; i++)
            {
                PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure);
                MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure));
                // we always send the entire structure (bit 0 marks the whole structure as changed)
                monitorElement->changedBitSet->set(0);
                m_freeQueue.push_back(monitorElement);
            }
        }
    }

    PipelineSession::shared_pointer getPipelineSession() const {
        return m_pipelineSession;
    }

    bool isPipelineEnabled() const {
        return m_pipeline;
    }

    virtual ~ChannelPipelineMonitorImpl()
    {
        destroy();
    }

    virtual Status start()
    {
        bool notify = false;
        {
            Lock guard(m_monitorQueueLock);

            // already started
            if (m_active)
                return Status::Ok;
            m_active = true;

            notify = (m_monitorQueue.size() != 0);
        }

        if (notify)
        {
            Monitor::shared_pointer thisPtr = shared_from_this();
            m_monitorRequester->monitorEvent(thisPtr);
        }

        return Status::Ok;
    }

    virtual Status stop()
    {
        Lock guard(m_monitorQueueLock);
        m_active = false;
        return Status::Ok;
    }

    // return the next queued monitor element, or null if none may be sent
    virtual MonitorElement::shared_pointer poll()
    {
        Lock guard(m_monitorQueueLock);

        // do not send more elements than m_requestedCount allows,
        // even if m_monitorQueue is not empty
        bool emptyQueue = m_monitorQueue.empty();
        if (emptyQueue || m_requestedCount == 0 || !m_active)
        {
            // report "unlisten" event if the queue is empty and we are done; release the lock first
            if (!m_unlistenReported && m_done && emptyQueue)
            {
                m_unlistenReported = true;
                guard.unlock();
                m_monitorRequester->unlisten(shared_from_this());
            }

            return m_nullMonitorElement;
        }

        MonitorElement::shared_pointer element = m_monitorQueue.front();
        m_monitorQueue.pop();

        m_requestedCount--;

        return element;
    }

    virtual void release(MonitorElement::shared_pointer const & monitorElement)
    {
        Lock guard(m_freeQueueLock);
        m_freeQueue.push_back(monitorElement);
    }

    virtual void reportRemoteQueueStatus(int32 freeElements)
    {
        // TODO check
        size_t count = static_cast<size_t>(freeElements);

        //std::cout << "reportRemoteQueueStatus(" << count << ')' << std::endl;

        bool notify = false;
        {
            Lock guard(m_monitorQueueLock);
            m_requestedCount += count;
            notify = m_active && (m_monitorQueue.size() != 0);
        }

        // notify
        // TODO too many notify calls?
        if (notify)
        {
            Monitor::shared_pointer thisPtr = shared_from_this();
            m_monitorRequester->monitorEvent(thisPtr);
        }

        m_pipelineSession->request(shared_from_this(), count);
    }

    virtual void destroy()
    {
        bool notifyCancel = false;

        {
            Lock guard(m_monitorQueueLock);
            m_active = false;
            notifyCancel = !m_done;
            m_done = true;
        }

        if (notifyCancel)
            m_pipelineSession->cancel();
    }

    virtual size_t getFreeElementCount() {
        Lock guard(m_freeQueueLock);
        return m_freeQueue.size();
    }

    virtual size_t getRequestedCount() {
        // TODO consider using atomic ops
        Lock guard(m_monitorQueueLock);
        return m_requestedCount;
    }

    virtual MonitorElement::shared_pointer getFreeElement() {
        Lock guard(m_freeQueueLock);
        if (m_freeQueue.empty())
            return m_nullMonitorElement;

        MonitorElement::shared_pointer freeElement = m_freeQueue.back();
        m_freeQueue.pop_back();

        return freeElement;
    }

    virtual void putElement(MonitorElement::shared_pointer const & element) {

        bool notify = false;
        {
            Lock guard(m_monitorQueueLock);
            if (m_done)
                return;
            // throw std::logic_error("putElement called after done");

            m_monitorQueue.push(element);
            // TODO far too many notifications, one per putElement call
            notify = (m_requestedCount != 0);
        }

        // notify
        if (notify)
        {
            Monitor::shared_pointer thisPtr = shared_from_this();
            m_monitorRequester->monitorEvent(thisPtr);
        }
    }

    virtual void done() {
        Lock guard(m_monitorQueueLock);
        m_done = true;

        bool report = !m_unlistenReported && m_monitorQueue.empty();
        if (report)
            m_unlistenReported = true;

        guard.unlock();

        if (report)
            m_monitorRequester->unlisten(shared_from_this());
    }

};

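// Thin server-side Channel that only supports createMonitor(); the created
// monitor is backed by a PipelineService and requires the client to request
// pipelining (e.g. "record[queueSize=16,pipeline=true]field(value)").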
class PipelineChannel :
    public Channel,
    public std::tr1::enable_shared_from_this<PipelineChannel>
{
private:

    static Status notSupportedStatus;
    static Status destroyedStatus;

    AtomicBoolean m_destroyed;

    ChannelProvider::shared_pointer m_provider;
    string m_channelName;
    ChannelRequester::shared_pointer m_channelRequester;

    PipelineService::shared_pointer m_pipelineService;

public:
    POINTER_DEFINITIONS(PipelineChannel);

    PipelineChannel(
        ChannelProvider::shared_pointer const & provider,
        string const & channelName,
        ChannelRequester::shared_pointer const & channelRequester,
        PipelineService::shared_pointer const & pipelineService) :
        m_provider(provider),
        m_channelName(channelName),
        m_channelRequester(channelRequester),
        m_pipelineService(pipelineService)
    {
    }

    virtual ~PipelineChannel()
    {
        destroy();
    }

    virtual std::tr1::shared_ptr<ChannelProvider> getProvider()
    {
        return m_provider;
    }

    virtual std::string getRemoteAddress()
    {
        // local
        return getChannelName();
    }

    virtual ConnectionState getConnectionState()
    {
        return m_destroyed.get() ?
               Channel::DESTROYED :
               Channel::CONNECTED;
    }

    virtual std::string getChannelName()
    {
        return m_channelName;
    }

    virtual std::tr1::shared_ptr<ChannelRequester> getChannelRequester()
    {
        return m_channelRequester;
    }

    virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/)
    {
        return none;
    }

    virtual Monitor::shared_pointer createMonitor(
        MonitorRequester::shared_pointer const & monitorRequester,
        epics::pvData::PVStructure::shared_pointer const & pvRequest)
    {
        if (!pvRequest)
            throw std::invalid_argument("pvRequest == null");

        if (m_destroyed.get())
        {
            Monitor::shared_pointer nullPtr;
            epics::pvData::Structure::const_shared_pointer nullStructure;
            monitorRequester->monitorConnect(destroyedStatus, nullPtr, nullStructure);
            return nullPtr;
        }

        // TODO use std::make_shared
        std::tr1::shared_ptr<ChannelPipelineMonitorImpl> tp(
            new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService)
        );
        Monitor::shared_pointer channelPipelineMonitorImpl = tp;

        if (tp->isPipelineEnabled())
        {
            monitorRequester->monitorConnect(Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure());
            return channelPipelineMonitorImpl;
        }
        else
        {
            Monitor::shared_pointer nullPtr;
            epics::pvData::Structure::const_shared_pointer nullStructure;
            Status noPipelineEnabledStatus(Status::STATUSTYPE_ERROR, "pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining");
            monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure);
            return nullPtr;
        }
    }

    virtual ChannelArray::shared_pointer createChannelArray(
        ChannelArrayRequester::shared_pointer const & channelArrayRequester,
        epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
    {
        ChannelArray::shared_pointer nullPtr;
        channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer());
        return nullPtr;
    }

    virtual void printInfo(std::ostream& out)
    {
        out << "PipelineChannel: ";
        out << getChannelName();
        out << " [";
        out << Channel::ConnectionStateNames[getConnectionState()];
        out << "]";
    }

    virtual string getRequesterName()
    {
        return getChannelName();
    }

    virtual void destroy()
    {
        m_destroyed.set();
    }
};

Status PipelineChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only monitor (aka pipeline) requests are supported by this channel");
Status PipelineChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed");

} // namespace
namespace epics {
namespace pvAccess {

Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider,
        std::string const & channelName,
        ChannelRequester::shared_pointer const & channelRequester,
        PipelineService::shared_pointer const & pipelineService)
{
    // TODO use std::make_shared
    std::tr1::shared_ptr<PipelineChannel> tp(
        new PipelineChannel(provider, channelName, channelRequester, pipelineService)
    );
    Channel::shared_pointer channel = tp;
    return channel;
}

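// ChannelProvider that maps channel names (including wildcard patterns) to
// registered PipelineService instances and hands out PipelineChannel objects
// for them; it also serves as its own ChannelFind implementation.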
class PipelineChannelProvider :
    public virtual ChannelProvider,
    public virtual ChannelFind,
    public std::tr1::enable_shared_from_this<PipelineChannelProvider> {

public:
    POINTER_DEFINITIONS(PipelineChannelProvider);

    static const string PROVIDER_NAME;

    static const Status noSuchChannelStatus;

    // TODO thread pool support

    PipelineChannelProvider() {
    }

    virtual string getProviderName() {
        return PROVIDER_NAME;
    }

    virtual std::tr1::shared_ptr<ChannelProvider> getChannelProvider()
    {
        return shared_from_this();
    }

    virtual void cancel() {}

    virtual void destroy() {}

    virtual ChannelFind::shared_pointer channelFind(std::string const & channelName,
            ChannelFindRequester::shared_pointer const & channelFindRequester)
    {
        bool found;
        {
            Lock guard(m_mutex);
            found = (m_services.find(channelName) != m_services.end()) ||
                    findWildService(channelName);
        }
        ChannelFind::shared_pointer thisPtr(shared_from_this());
        channelFindRequester->channelFindResult(Status::Ok, thisPtr, found);
        return thisPtr;
    }

    virtual ChannelFind::shared_pointer channelList(
        ChannelListRequester::shared_pointer const & channelListRequester)
    {
        if (!channelListRequester.get())
            throw std::runtime_error("null requester");

        PVStringArray::svector channelNames;
        {
            Lock guard(m_mutex);
            channelNames.reserve(m_services.size());
            for (PipelineServiceMap::const_iterator iter = m_services.begin();
                    iter != m_services.end();
                    iter++)
                channelNames.push_back(iter->first);
        }

        ChannelFind::shared_pointer thisPtr(shared_from_this());
        channelListRequester->channelListResult(Status::Ok, thisPtr, freeze(channelNames), false);
        return thisPtr;
    }

    virtual Channel::shared_pointer createChannel(
        std::string const & channelName,
        ChannelRequester::shared_pointer const & channelRequester,
        short /*priority*/)
    {
        PipelineService::shared_pointer service;

        {
            // look up the service (and any matching wildcard service) while holding the lock
            Lock guard(m_mutex);
            PipelineServiceMap::const_iterator iter = m_services.find(channelName);
            if (iter != m_services.end())
                service = iter->second;

            // check for wild services
            if (!service)
                service = findWildService(channelName);
        }

        if (!service)
        {
            Channel::shared_pointer nullChannel;
            channelRequester->channelCreated(noSuchChannelStatus, nullChannel);
            return nullChannel;
        }

        // TODO use std::make_shared
        std::tr1::shared_ptr<PipelineChannel> tp(
            new PipelineChannel(
                shared_from_this(),
                channelName,
                channelRequester,
                service));
        Channel::shared_pointer pipelineChannel = tp;
        channelRequester->channelCreated(Status::Ok, pipelineChannel);
        return pipelineChannel;
    }

    virtual Channel::shared_pointer createChannel(
        std::string const & /*channelName*/,
        ChannelRequester::shared_pointer const & /*channelRequester*/,
        short /*priority*/,
        std::string const & /*address*/)
    {
        // this will never get called by the pvAccess server
        throw std::runtime_error("not supported");
    }

    void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service)
    {
        Lock guard(m_mutex);
        m_services[serviceName] = service;

        if (isWildcardPattern(serviceName))
            m_wildServices.push_back(std::make_pair(serviceName, service));
    }

    void unregisterService(std::string const & serviceName)
    {
        Lock guard(m_mutex);
        m_services.erase(serviceName);

        if (isWildcardPattern(serviceName))
        {
            for (PipelineWildServiceList::iterator iter = m_wildServices.begin();
                    iter != m_wildServices.end();
                    iter++)
                if (iter->first == serviceName)
                {
                    m_wildServices.erase(iter);
                    break;
                }
        }
    }

private:
    // callers must hold m_mutex (synchronizes access to the service containers)
    PipelineService::shared_pointer findWildService(string const & wildcard)
    {
        if (!m_wildServices.empty())
            for (PipelineWildServiceList::iterator iter = m_wildServices.begin();
                    iter != m_wildServices.end();
                    iter++)
                if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str()))
                    return iter->second;

        return PipelineService::shared_pointer();
    }

    // (too) simple check for a wildcard pattern
    bool isWildcardPattern(string const & pattern)
    {
        return
            (pattern.find('*') != string::npos ||
             pattern.find('?') != string::npos ||
             (pattern.find('[') != string::npos && pattern.find(']') != string::npos));
    }

    typedef std::map<string, PipelineService::shared_pointer> PipelineServiceMap;
    PipelineServiceMap m_services;

    typedef std::vector<std::pair<string, PipelineService::shared_pointer> > PipelineWildServiceList;
    PipelineWildServiceList m_wildServices;

    epics::pvData::Mutex m_mutex;
};

const string PipelineChannelProvider::PROVIDER_NAME("PipelineService");
const Status PipelineChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel");

PipelineServer::PipelineServer()
    :m_channelProviderImpl(new PipelineChannelProvider)
{
    m_serverContext = ServerContext::create(ServerContext::Config()
                                            .provider(m_channelProviderImpl));
}

PipelineServer::~PipelineServer()
{
    // multiple destroy calls are OK
    destroy();
}

void PipelineServer::printInfo()
{
    std::cout << m_serverContext->getVersion().getVersionString() << std::endl;
    m_serverContext->printInfo();
}

void PipelineServer::run(int seconds)
{
    m_serverContext->run(seconds);
}

void PipelineServer::runInNewThread(int seconds)
{
    if (seconds != 0)
        std::cerr << "PipelineServer::runInNewThread() only supports seconds=0\n";
}

void PipelineServer::destroy()
{
    m_serverContext->shutdown();
}

void PipelineServer::registerService(std::string const & serviceName, PipelineService::shared_pointer const & service)
{
    m_channelProviderImpl->registerService(serviceName, service);
}

void PipelineServer::unregisterService(std::string const & serviceName)
{
    m_channelProviderImpl->unregisterService(serviceName);
}

} // namespace pvAccess
} // namespace epics
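A minimal usage sketch (not part of pipelineServer.cpp), shown to illustrate the server-side API defined above. MyPipelineService is a hypothetical PipelineService implementation; PipelineServer, registerService(), printInfo(), run() and destroy() are the members implemented in this file, and clients have to opt in to pipelining with a pvRequest such as 'record[queueSize=16,pipeline=true]field(value)'.

#include <pv/pipelineServer.h>

int main()
{
    using namespace epics::pvAccess;

    // MyPipelineService is a placeholder for a user-provided PipelineService
    // whose createPipeline() returns a PipelineSession for each monitor.
    PipelineService::shared_pointer service(new MyPipelineService());

    PipelineServer server;
    server.registerService("examplePipeline", service);

    server.printInfo();
    server.run(0);   // 0 = block until destroy()/shutdown is requested

    return 0;
}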