12 #define epicsExportSharedSymbols 22 class ChannelPipelineMonitorImpl :
25 public std::tr1::enable_shared_from_this<ChannelPipelineMonitorImpl>
29 typedef vector<MonitorElement::shared_pointer> FreeElementQueue;
30 typedef queue<MonitorElement::shared_pointer> MonitorElementQueue;
32 Channel::shared_pointer m_channel;
33 MonitorRequester::shared_pointer m_monitorRequester;
34 PipelineSession::shared_pointer m_pipelineSession;
38 FreeElementQueue m_freeQueue;
39 MonitorElementQueue m_monitorQueue;
41 Mutex m_freeQueueLock;
42 Mutex m_monitorQueueLock;
45 MonitorElement::shared_pointer m_nullMonitorElement;
47 size_t m_requestedCount;
53 bool m_unlistenReported;
56 ChannelPipelineMonitorImpl(
57 Channel::shared_pointer
const & channel,
58 MonitorRequester::shared_pointer
const & monitorRequester,
59 epics::pvData::PVStructure::shared_pointer
const & pvRequest,
60 PipelineService::shared_pointer
const & pipelineService) :
62 m_monitorRequester(monitorRequester),
70 m_unlistenReported(
false)
73 m_pipelineSession = pipelineService->createPipeline(pvRequest);
82 ss << pvString->
get();
85 m_queueSize =
static_cast<size_t>(size);
87 pvString = pvOptions->getSubField<
PVString>(
"pipeline");
89 m_pipeline = (pvString->get() ==
"true");
93 size_t minQueueSize = m_pipelineSession->getMinQueueSize();
94 if (m_queueSize < minQueueSize)
95 m_queueSize = minQueueSize;
97 Structure::const_shared_pointer
structure = m_pipelineSession->getStructure();
101 Lock guard(m_freeQueueLock);
102 m_freeQueue.reserve(m_queueSize);
103 for (
size_t i = 0;
i < m_queueSize;
i++)
105 PVStructure::shared_pointer pvStructure =
getPVDataCreate()->createPVStructure(structure);
106 MonitorElement::shared_pointer monitorElement(
new MonitorElement(pvStructure));
108 monitorElement->changedBitSet->set(0);
109 m_freeQueue.push_back(monitorElement);
114 PipelineSession::shared_pointer getPipelineSession()
const {
115 return m_pipelineSession;
118 bool isPipelineEnabled()
const {
122 virtual ~ChannelPipelineMonitorImpl()
131 Lock guard(m_monitorQueueLock);
138 notify = (m_monitorQueue.size() != 0);
143 Monitor::shared_pointer thisPtr = shared_from_this();
144 m_monitorRequester->monitorEvent(thisPtr);
152 Lock guard(m_monitorQueueLock);
158 virtual MonitorElement::shared_pointer poll()
160 Lock guard(m_monitorQueueLock);
164 bool emptyQueue = m_monitorQueue.empty();
165 if (emptyQueue || m_requestedCount == 0 || !m_active)
168 if (!m_unlistenReported && m_done && emptyQueue)
170 m_unlistenReported =
true;
172 m_monitorRequester->unlisten(shared_from_this());
175 return m_nullMonitorElement;
178 MonitorElement::shared_pointer element = m_monitorQueue.front();
179 m_monitorQueue.pop();
186 virtual void release(MonitorElement::shared_pointer
const & monitorElement)
188 Lock guard(m_freeQueueLock);
189 m_freeQueue.push_back(monitorElement);
192 virtual void reportRemoteQueueStatus(
int32 freeElements)
195 size_t count =
static_cast<size_t>(freeElements);
201 Lock guard(m_monitorQueueLock);
202 m_requestedCount += count;
203 notify = m_active && (m_monitorQueue.size() != 0);
210 Monitor::shared_pointer thisPtr = shared_from_this();
211 m_monitorRequester->monitorEvent(thisPtr);
214 m_pipelineSession->request(shared_from_this(), count);
217 virtual void destroy()
219 bool notifyCancel =
false;
222 Lock guard(m_monitorQueueLock);
224 notifyCancel = !m_done;
229 m_pipelineSession->cancel();
232 virtual size_t getFreeElementCount() {
233 Lock guard(m_freeQueueLock);
234 return m_freeQueue.size();
237 virtual size_t getRequestedCount() {
239 Lock guard(m_monitorQueueLock);
240 return m_requestedCount;
243 virtual MonitorElement::shared_pointer getFreeElement() {
244 Lock guard(m_freeQueueLock);
245 if (m_freeQueue.empty())
246 return m_nullMonitorElement;
248 MonitorElement::shared_pointer freeElement = m_freeQueue.back();
249 m_freeQueue.pop_back();
254 virtual void putElement(MonitorElement::shared_pointer
const & element) {
258 Lock guard(m_monitorQueueLock);
263 m_monitorQueue.push(element);
265 notify = (m_requestedCount != 0);
271 Monitor::shared_pointer thisPtr = shared_from_this();
272 m_monitorRequester->monitorEvent(thisPtr);
276 virtual void done() {
277 Lock guard(m_monitorQueueLock);
280 bool report = !m_unlistenReported && m_monitorQueue.empty();
282 m_unlistenReported =
true;
287 m_monitorRequester->unlisten(shared_from_this());
293 class PipelineChannel :
295 public std::tr1::enable_shared_from_this<PipelineChannel>
299 static Status notSupportedStatus;
300 static Status destroyedStatus;
304 ChannelProvider::shared_pointer m_provider;
305 string m_channelName;
306 ChannelRequester::shared_pointer m_channelRequester;
308 PipelineService::shared_pointer m_pipelineService;
314 ChannelProvider::shared_pointer
const & provider,
315 string const & channelName,
316 ChannelRequester::shared_pointer
const & channelRequester,
317 PipelineService::shared_pointer
const & pipelineService) :
318 m_provider(provider),
319 m_channelName(channelName),
320 m_channelRequester(channelRequester),
321 m_pipelineService(pipelineService)
325 virtual ~PipelineChannel()
330 virtual std::tr1::shared_ptr<ChannelProvider> getProvider()
335 virtual std::string getRemoteAddress()
338 return getChannelName();
341 virtual ConnectionState getConnectionState()
343 return m_destroyed.
get() ?
348 virtual std::string getChannelName()
350 return m_channelName;
353 virtual std::tr1::shared_ptr<ChannelRequester> getChannelRequester()
355 return m_channelRequester;
358 virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer
const & )
363 virtual Monitor::shared_pointer createMonitor(
364 MonitorRequester::shared_pointer
const & monitorRequester,
365 epics::pvData::PVStructure::shared_pointer
const & pvRequest)
368 throw std::invalid_argument(
"pvRequest == null");
370 if (m_destroyed.
get())
372 Monitor::shared_pointer nullPtr;
373 epics::pvData::Structure::const_shared_pointer nullStructure;
374 monitorRequester->monitorConnect(destroyedStatus, nullPtr, nullStructure);
379 std::tr1::shared_ptr<ChannelPipelineMonitorImpl> tp(
380 new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService)
382 Monitor::shared_pointer channelPipelineMonitorImpl = tp;
384 if (tp->isPipelineEnabled())
386 monitorRequester->monitorConnect(
Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure());
387 return channelPipelineMonitorImpl;
391 Monitor::shared_pointer nullPtr;
392 epics::pvData::Structure::const_shared_pointer nullStructure;
393 Status noPipelineEnabledStatus(
Status::STATUSTYPE_ERROR,
"pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining");
394 monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure);
399 virtual ChannelArray::shared_pointer createChannelArray(
400 ChannelArrayRequester::shared_pointer
const & channelArrayRequester,
401 epics::pvData::PVStructure::shared_pointer
const & )
403 ChannelArray::shared_pointer nullPtr;
404 channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer());
409 virtual void printInfo(std::ostream& out)
411 out <<
"PipelineChannel: ";
412 out << getChannelName();
414 out << Channel::ConnectionStateNames[getConnectionState()];
418 virtual string getRequesterName()
420 return getChannelName();
423 virtual void destroy()
437 std::string
const & channelName,
438 ChannelRequester::shared_pointer
const & channelRequester,
439 PipelineService::shared_pointer
const & pipelineService)
442 std::tr1::shared_ptr<PipelineChannel> tp(
443 new PipelineChannel(provider, channelName, channelRequester, pipelineService)
445 Channel::shared_pointer channel = tp;
452 public std::tr1::enable_shared_from_this<PipelineChannelProvider> {
467 return PROVIDER_NAME;
472 return shared_from_this();
479 virtual ChannelFind::shared_pointer
channelFind(std::string
const & channelName,
480 ChannelFindRequester::shared_pointer
const & channelFindRequester)
485 found = (m_services.find(channelName) != m_services.end()) ||
486 findWildService(channelName);
488 ChannelFind::shared_pointer thisPtr(shared_from_this());
489 channelFindRequester->channelFindResult(
Status::Ok, thisPtr, found);
495 ChannelListRequester::shared_pointer
const & channelListRequester)
497 if (!channelListRequester.get())
498 throw std::runtime_error(
"null requester");
503 channelNames.
reserve(m_services.size());
504 for (PipelineServiceMap::const_iterator iter = m_services.begin();
505 iter != m_services.end();
510 ChannelFind::shared_pointer thisPtr(shared_from_this());
511 channelListRequester->channelListResult(
Status::Ok, thisPtr, freeze(channelNames),
false);
516 std::string
const & channelName,
517 ChannelRequester::shared_pointer
const & channelRequester,
520 PipelineService::shared_pointer service;
522 PipelineServiceMap::const_iterator iter;
525 iter = m_services.find(channelName);
527 if (iter != m_services.end())
528 service = iter->second;
532 service = findWildService(channelName);
536 Channel::shared_pointer nullChannel;
537 channelRequester->channelCreated(noSuchChannelStatus, nullChannel);
542 std::tr1::shared_ptr<PipelineChannel> tp(
548 Channel::shared_pointer pipelineChannel = tp;
549 channelRequester->channelCreated(
Status::Ok, pipelineChannel);
550 return pipelineChannel;
554 std::string
const & ,
555 ChannelRequester::shared_pointer
const & ,
557 std::string
const & )
560 throw std::runtime_error(
"not supported");
563 void registerService(std::string
const & serviceName, PipelineService::shared_pointer
const & service)
566 m_services[serviceName] = service;
568 if (isWildcardPattern(serviceName))
569 m_wildServices.push_back(std::make_pair(serviceName, service));
575 m_services.erase(serviceName);
577 if (isWildcardPattern(serviceName))
579 for (PipelineWildServiceList::iterator iter = m_wildServices.begin();
580 iter != m_wildServices.end();
582 if (iter->first == serviceName)
584 m_wildServices.erase(iter);
592 PipelineService::shared_pointer findWildService(
string const & wildcard)
594 if (!m_wildServices.empty())
595 for (PipelineWildServiceList::iterator iter = m_wildServices.begin();
596 iter != m_wildServices.end();
598 if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str()))
601 return PipelineService::shared_pointer();
605 bool isWildcardPattern(
string const & pattern)
608 (pattern.find(
'*') != string::npos ||
609 pattern.find(
'?') != string::npos ||
610 (pattern.find(
'[') != string::npos && pattern.find(
']') != string::npos));
613 typedef std::map<string, PipelineService::shared_pointer> PipelineServiceMap;
614 PipelineServiceMap m_services;
616 typedef std::vector<std::pair<string, PipelineService::shared_pointer> > PipelineWildServiceList;
617 PipelineWildServiceList m_wildServices;
622 const string PipelineChannelProvider::PROVIDER_NAME(
"PipelineService");
625 PipelineServer::PipelineServer()
629 .provider(m_channelProviderImpl));
640 std::cout << m_serverContext->getVersion().getVersionString() << std::endl;
641 m_serverContext->printInfo();
646 m_serverContext->run(seconds);
654 std::cerr<<
"PipelineServer::runInNewThread() only suppose seconds=0\n";
659 m_serverContext->shutdown();
664 m_channelProviderImpl->registerService(serviceName, service);
669 m_channelProviderImpl->unregisterService(serviceName);
void registerService(std::string const &serviceName, PipelineService::shared_pointer const &service)
static const string PROVIDER_NAME
A holder for a contiguous piece of memory.
PipelineChannelProvider()
virtual ChannelFind::shared_pointer channelList(ChannelListRequester::shared_pointer const &channelListRequester)
void unregisterService(std::string const &serviceName)
TODO only here because of the Lockable.
A lock for multithreading.
An element for a monitorQueue.
virtual std::tr1::shared_ptr< ChannelProvider > getChannelProvider()
storage_t::arg_type get() const
virtual Channel::shared_pointer createChannel(std::string const &, ChannelRequester::shared_pointer const &, short, std::string const &)
void unregisterService(std::string const &serviceName)
PVString is special case, since it implements SerializableArray.
Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const &provider, std::string const &channelName, ChannelRequester::shared_pointer const &channelRequester, PipelineService::shared_pointer const &pipelineService)
#define POINTER_DEFINITIONS(clazz)
void push_back(param_type v)
virtual ChannelFind::shared_pointer channelFind(std::string const &channelName, ChannelFindRequester::shared_pointer const &channelFindRequester)
Data interface for a structure,.
void registerService(std::string const &serviceName, PipelineService::shared_pointer const &service)
std::tr1::shared_ptr< PVStructure > PVStructurePtr
std::tr1::shared_ptr< PVString > PVStringPtr
virtual Channel::shared_pointer createChannel(std::string const &channelName, ChannelRequester::shared_pointer const &channelRequester, short)
static const Status noSuchChannelStatus
virtual string getProviderName()
void runInNewThread(int seconds=0)
virtual ~PipelineServer()
Options for a server insatnce.
static ServerContext::shared_pointer create(const Config &conf=Config())
void reserve(size_t i)
Set array capacity.
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()