#include <epicsMath.h>

#define epicsExportSharedSymbols

namespace epics {
namespace pvAccess {

// fragment of the MonitorFIFO::Config constructor initializer list
    ,dropEmptyUpdates(true)

size_t MonitorFIFO::num_instances;
// MonitorFIFO constructor (fragments)
MonitorFIFO::MonitorFIFO(const std::tr1::shared_ptr<MonitorRequester> &requester,
                         const pvData::PVStructure::const_shared_pointer &pvRequest,
                         const Source::shared_pointer &source,
                         Config *inconf)
    :conf(inconf ? *inconf : Config())
    // ...
{
    REFTRACE_INCREMENT(num_instances);
    // ...
    // honour a client supplied "record._options.queueSize", if present
    pvd::PVScalar::const_shared_pointer O(pvRequest->getSubField<pvd::PVScalar>("record._options.queueSize"));
    // ...
    } catch(std::exception& e) {
        std::ostringstream strm;
        strm<<"invalid queueSize : "<<e.what();
        requester->message(strm.str());
    }
    // ...
    // honour a client supplied "record._options.pipeline", if present
    O = pvRequest->getSubField<pvd::PVScalar>("record._options.pipeline");
    // ...
    } catch(std::exception& e) {
        std::ostringstream strm;
        strm<<"invalid pipeline : "<<e.what();
        requester->message(strm.str());
    }
    // ...
}

// MonitorFIFO destructor (fragment)
    REFTRACE_DECREMENT(num_instances);
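The two getSubField() lookups above consume per-request options supplied by the client. A minimal sketch (not part of this file) of building such a pvRequest with pvDataCPP's createRequest() parser; the queue size of 16 and the empty field() selection are illustrative:

#include <pv/createRequest.h>

// Parses to a structure containing "record._options.queueSize" and
// "record._options.pipeline", which the constructor above inspects.
epics::pvData::PVStructure::shared_pointer pvRequest(
    epics::pvData::createRequest("record[queueSize=16,pipeline=true]field()"));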
108 " pipeline="<<pipeline
110 <<
" freeHighLevel="<<freeHighLevel
116 case Closed: strm<<
" Closed";
break;
117 case Opened: strm<<
" Opened";
break;
118 case Error: strm<<
" Error:"<<error;
break;
121 strm<<
" running="<<running<<
" finished="<<finished<<
"\n";
122 strm<<
" #empty="<<empty.size()<<
" #returned="<<returned.size()<<
" #inuse="<<inuse.size()<<
" flowCount="<<flowCount<<
"\n";
123 strm<<
" events "<<(needConnected?
'C':
'_')<<(needEvent?
'E':
'_')<<(needUnlisten?
'U':
'_')<<(needClosed?
'X':
'_')
// MonitorFIFO::open() (fragments)
        throw std::logic_error("Monitor already open. Must close() before re-opening");
    // ...
        throw std::logic_error("Monitor needs notify() between close() and open().");
    // ...
        throw std::logic_error("Monitor finished. re-open() not possible");
    // ...
        empty.push_back(elem);
    // ...
    } catch(std::runtime_error& e){
    // ...
    needConnected = true;
    // ...
    if(message.empty()) return;
    requester_type::shared_pointer req(requester.lock());

// MonitorFIFO::close() (fragment)
    needClosed = state==Opened;

// MonitorFIFO::finish() (fragments)
        throw std::logic_error("Can not finish() a closed Monitor");
    // ...
    if(inuse.empty() && running && state==Opened)
// MonitorFIFO::tryPost() (fragments)
    if(state!=Opened || finished) return false;
    assert(!empty.empty() || !inuse.empty());
    // ...
    const bool havefree = _freeCount()>0u;
    // ...
    } else if(havefree) {
        elem = empty.front();
        // ...
        elem->changedBitSet->clear();
        // ...
        *elem->pvStructurePtr, *elem->changedBitSet);
        elem->overrunBitSet->clear();
        // ...
        if(inuse.empty() && running)
        // ...
        inuse.push_back(elem);
    // ...
        empty.push_front(elem);
    // ...
    return _freeCount()>0u;
// MonitorFIFO::post() (fragments)
    if(state!=Opened || finished) return;
    assert(!empty.empty() || !inuse.empty());
    // ...
    const bool use_empty = !empty.empty();
    // ...
        elem = empty.front();
        // ...
        *elem->changedBitSet = scratch;
        elem->overrunBitSet->clear();
        // ...
        if(inuse.empty() && running)
        // ...
        inuse.push_back(elem);
    // ...
        // no free slot: squash this update into the most recent in-use element
        elem->overrunBitSet->or_and(*elem->changedBitSet, scratch);
        *elem->changedBitSet |= scratch;
        // ...
        elem->overrunBitSet->or_and(oscratch, scratch);
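For orientation, a minimal producer-side sketch (not taken from this file): post() queues the update or squashes it as above, and notify(), whose fragments follow just below, delivers any pending requester callbacks. The helper name publish() and its arguments are illustrative, and the header path is an assumption:

#include <pv/pvAccess.h>   // assumed header declaring MonitorFIFO; may differ by version

namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;

// 'fifo' must already have been open()d with the structure type of 'value'.
void publish(pva::MonitorFIFO& fifo,
             const pvd::PVStructure& value,
             const pvd::BitSet& changed)
{
    fifo.post(value, changed);   // consume a free slot, or squash with the most recent
    fifo.notify();               // deliver pending monitorConnect()/monitorEvent() callbacks
}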
// MonitorFIFO::notify() (fragments)
    Monitor::shared_pointer self;
    MonitorRequester::shared_pointer req;
    // ...
    if(conn | evt | unl | clo) {
        req = requester.lock();
        self = shared_from_this();
    // ...
        req->monitorConnect(err, self, type);
    // ...
        req->monitorEvent(self);
    // ...
        req->channelDisconnect(false);
    Monitor::shared_pointer self;
    MonitorRequester::shared_pointer req;
    // ...
        throw std::logic_error("Monitor can't start() before open()");
    // ...
    if(running || state!=Opened)
    // ...
    self = shared_from_this();
    req = requester.lock();
    // ...
        req->monitorEvent(self);
// MonitorFIFO::poll() (fragments)
    Monitor::shared_pointer self;
    MonitorRequester::shared_pointer req;
    // ...
    if(!inuse.empty() && inuse.size() + empty.size() > 1) {
    // ...
    if(inuse.empty() && finished) {
        self = shared_from_this();
        req = requester.lock();
    // ...
    assert(!inuse.empty() || !empty.empty());
// MonitorFIFO::release() (fragments)
    assert(!inuse.empty() || !empty.empty());
    // ...
    if(elem->pvStructurePtr->getStructure()!=type
       || empty.size()+returned.size()>=conf.actualCount+1)
    // ...
        returned.push_back(elem);
    // ...
        bool below = _freeCount() <= freeHighLevel;
        // ...
        empty.push_front(elem);
        // ...
        bool above = _freeCount() > freeHighLevel;
        // ...
        if(!below || !above || !upstream)
        // ...
        nempty = _freeCount();
    // ...
    upstream->freeHighMark(this, nempty);
// MonitorFIFO::getStats() (fragment)
    s.nempty = empty.size() + returned.size();
// MonitorFIFO::reportRemoteQueueStatus() (fragments)
    if(nfree<=0 || !pipeline)
    // ...
    bool below = _freeCount() <= freeHighLevel;
    // ...
    size_t nack = std::min(size_t(nfree), returned.size());
    // ...
    buffer_t::iterator end(returned.begin());
    std::advance(end, nack);
    // ...
    // acknowledged elements go back on the free list
    empty.splice(empty.end(), returned, returned.begin(), end);
    // ...
    bool above = _freeCount() > freeHighLevel;
    // ...
    if(!below || !above || empty.size()<=1 || !upstream)
    // ...
    nempty = _freeCount();
    // ...
    upstream->freeHighMark(this, nempty);
size_t MonitorFIFO::_freeCount() const
{
    // ...
    return empty.empty() ? 0 : empty.size()-1;
}
Referenced declarations and briefs:

MonitorFIFO members:
    MonitorFIFO(const std::tr1::shared_ptr< MonitorRequester > &requester, const pvData::PVStructure::const_shared_pointer &pvRequest, const Source::shared_pointer &source=Source::shared_pointer(), Config *conf=0)
    void open(const epics::pvData::StructureConstPtr &type)
        Mark subscription as "open" with the associated structure type.
    void close()
        Abnormal closure (e.g. due to upstream disconnection).
    void finish()
        Successful closure (e.g. RDB query done).
    void post(const pvData::PVStructure &value, const epics::pvData::BitSet &changed, const epics::pvData::BitSet &overrun=epics::pvData::BitSet())
        Consume a free slot if available, otherwise squash with most recent.
    bool tryPost(const pvData::PVStructure &value, const epics::pvData::BitSet &changed, const epics::pvData::BitSet &overrun=epics::pvData::BitSet(), bool force=false)
    size_t freeCount() const
        Number of unused FIFO slots at this moment, which may change in the next.
    void setFreeHighMark(double level)
    void show(std::ostream &strm) const
    pvd::StructureConstPtr type

Monitor interface overrides:
    virtual epics::pvData::Status start() OVERRIDE FINAL
    virtual epics::pvData::Status stop() OVERRIDE FINAL
    virtual MonitorElementPtr poll() OVERRIDE FINAL
    virtual void release(MonitorElementPtr const &monitorElement) OVERRIDE FINAL
    virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) OVERRIDE FINAL
    virtual void getStats(Stats &s) const OVERRIDE FINAL
    virtual void destroy() OVERRIDE FINAL

Config fields:
    size_t maxCount
        Upper limit on requested FIFO size.
    size_t defCount
        FIFO size when client makes no request.
    size_t actualCount
        Filled in with actual FIFO size.
    bool dropEmptyUpdates
        Default true.  Drop updates which don't include any field values.
    epics::pvData::PVRequestMapper::mode_t mapperMode
        Default Mask.

Stats fields:
    size_t nfilled
        Number of elements ready to be poll()d.
    size_t nempty
        Number of elements available for new remote data.
    size_t noutstanding
        Number of elements poll()d but not yet release()d.

Request-mapper helpers (epics::pvData::PVRequestMapper):
    void compute(const PVStructure &base, const PVStructure &pvRequest, mode_t mode=Mask)
    const std::string & warnings() const
        After compute(), check if !warnings().empty().
    const StructureConstPtr & requested() const
    const BitSet & requestedMask() const
    PVStructurePtr buildRequested() const
    void copyBaseToRequested(const PVStructure &base, const BitSet &baseMask, PVStructure &request, BitSet &requestMask) const
    void maskBaseToRequested(const BitSet &baseMask, BitSet &requestMask) const

Other referenced declarations:
    PVStructure
        Data interface for a structure.
    PVScalar
        Base class for each scalar field.
    MonitorElement
        An element for a monitorQueue.
    std::tr1::shared_ptr< MonitorElement > MonitorElementPtr
    std::tr1::shared_ptr< const Structure > StructureConstPtr
    std::tr1::shared_ptr< PVDataCreate > PVDataCreatePtr
    FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
    BitSet & clear(uint32 bitIndex)
    bool logical_and(const BitSet &other) const
        Returns true if any bit is set in both *this and other.
    static Status error(const std::string &m)
    detail::pick_type< int8_t, signed char, detail::pick_type< uint8_t, char, unsigned char >::type >::type boolean
    void swap(shared_ptr< T > &a, shared_ptr< T > &b) BOOST_NOEXCEPT
    epicsGuard< epicsMutex > Guard
    epicsGuardRelease< epicsMutex > UnGuard
    #define assert(exp)
        Declare that a condition should be true.
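Putting the declarations above together, a minimal construction sketch (not from this file). It assumes the caller already has a MonitorRequester and a pvRequest; the helper name makeFifo, the defCount value, and the include path are assumptions, and error handling is omitted:

#include <pv/pvAccess.h>   // assumed header declaring MonitorFIFO; may differ by version

namespace pvd = epics::pvData;
namespace pva = epics::pvAccess;

std::tr1::shared_ptr<pva::MonitorFIFO>
makeFifo(const std::tr1::shared_ptr<pva::MonitorRequester>& requester,
         const pvd::PVStructure::const_shared_pointer& pvRequest)
{
    pva::MonitorFIFO::Config conf;
    conf.defCount = 8;              // FIFO size when the client makes no request (illustrative)
    conf.dropEmptyUpdates = true;   // default: drop updates carrying no field values

    std::tr1::shared_ptr<pva::MonitorFIFO> fifo(
        new pva::MonitorFIFO(requester, pvRequest,
                             pva::MonitorFIFO::Source::shared_pointer(), &conf));

    pva::MonitorFIFO::Stats s;
    fifo->getStats(s);              // s.nfilled, s.nempty, s.noutstanding
    return fifo;
}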