This is Unofficial EPICS BASE Doxygen Site
monitor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright information and license terms for this software can be
3  * found in the file LICENSE that is included with the distribution
4  */
5 
6 #include <sstream>
7 #include <stdexcept>
8 
9 #include <epicsGuard.h>
10 #include <epicsMath.h>
11 #include <pv/reftrack.h>
12 
13 #define epicsExportSharedSymbols
14 #include <pv/monitor.h>
15 #include <pv/pvAccess.h>
16 #include <pv/createRequest.h>
17 
18 namespace pvd = epics::pvData;
19 
22 
23 namespace epics {namespace pvAccess {
24 
26  :maxCount(4)
27  ,defCount(4)
28  ,actualCount(0) // readback
29  ,dropEmptyUpdates(true)
30  ,mapperMode(pvd::PVRequestMapper::Mask)
31 {}
32 
33 size_t MonitorFIFO::num_instances;
34 
36 
37 MonitorFIFO::MonitorFIFO(const std::tr1::shared_ptr<MonitorRequester> &requester,
38  const pvData::PVStructure::const_shared_pointer &pvRequest,
39  const Source::shared_pointer &source, Config *inconf)
40  :conf(inconf ? *inconf : Config())
41  ,requester(requester)
42  ,pvRequest(pvRequest)
43  ,upstream(source)
44  ,state(Closed)
45  ,pipeline(false)
46  ,running(false)
47  ,finished(false)
48  ,needConnected(false)
49  ,needEvent(false)
50  ,needUnlisten(false)
51  ,needClosed(false)
52  ,freeHighLevel(0u)
53  ,flowCount(0)
54 {
55  REFTRACE_INCREMENT(num_instances);
56 
57  if(conf.maxCount==0)
58  conf.maxCount = 1;
59 
60  if(conf.defCount==0)
61  conf.defCount = 1;
62 
63  pvd::PVScalar::const_shared_pointer O(pvRequest->getSubField<pvd::PVScalar>("record._options.queueSize"));
64  if(O && conf.actualCount==0) {
65  try {
66  conf.actualCount = O->getAs<pvd::uint32>();
67  } catch(std::exception& e) {
68  std::ostringstream strm;
69  strm<<"invalid queueSize : "<<e.what();
70  requester->message(strm.str());
71  }
72  }
73 
74  if(conf.actualCount==0)
75  conf.actualCount = conf.defCount;
76 
77  if(conf.actualCount > conf.maxCount)
78  conf.actualCount = conf.maxCount;
79 
80  O = pvRequest->getSubField<pvd::PVScalar>("record._options.pipeline");
81  if(O) {
82  try {
83  pipeline = O->getAs<pvd::boolean>();
84  } catch(std::exception& e) {
85  std::ostringstream strm;
86  strm<<"invalid pipeline : "<<e.what();
87  requester->message(strm.str());
88  }
89  }
90 
91  setFreeHighMark(0.00);
92 
93  if(inconf)
94  *inconf = conf;
95 }
96 
98  REFTRACE_DECREMENT(num_instances);
99 }
100 
102 {}
103 
104 void MonitorFIFO::show(std::ostream& strm) const
105 {
106  // const (after ctor) bits
107  strm<<"MonitorFIFO"
108  " pipeline="<<pipeline
109  <<" size="<<conf.actualCount
110  <<" freeHighLevel="<<freeHighLevel
111  <<"\n";
112 
113  Guard G(mutex);
114 
115  switch(state) {
116  case Closed: strm<<" Closed"; break;
117  case Opened: strm<<" Opened"; break;
118  case Error: strm<<" Error:"<<error; break;
119  }
120 
121  strm<<" running="<<running<<" finished="<<finished<<"\n";
122  strm<<" #empty="<<empty.size()<<" #returned="<<returned.size()<<" #inuse="<<inuse.size()<<" flowCount="<<flowCount<<"\n";
123  strm<<" events "<<(needConnected?'C':'_')<<(needEvent?'E':'_')<<(needUnlisten?'U':'_')<<(needClosed?'X':'_')
124  <<"\n";
125 }
126 
128 {
129  level = std::max(0.0, std::min(level, 1.0));
130  pvd::uint32 lvl = std::max(size_t(0), std::min(size_t(conf.actualCount * level), conf.actualCount-1));
131 
132  Guard G(mutex);
133 
134  freeHighLevel = lvl;
135 }
136 
138 {
139  std::string message;
140  {
141  Guard G(mutex);
142 
143  if(state!=Closed)
144  throw std::logic_error("Monitor already open. Must close() before re-openning");
145  else if(needClosed)
146  throw std::logic_error("Monitor needs notify() between close() and open().");
147  else if(finished)
148  throw std::logic_error("Monitor finished. re-open() not possible");
149 
150  // keep the code simpler.
151  // never try to re-use elements, even on re-open w/o type change.
152  empty.clear();
153  inuse.clear();
154  returned.clear();
155 
156  // fill up empty.
158 
159  try {
160  mapper.compute(*create->createPVStructure(type), *pvRequest, conf.mapperMode);
161  message = mapper.warnings();
162 
163  while(empty.size() < conf.actualCount+1) {
164  MonitorElementPtr elem(new MonitorElement(mapper.buildRequested()));
165  empty.push_back(elem);
166  }
167 
168  state = Opened;
169  error = pvd::Status(); // ok
170 
171  assert(inuse.empty());
172  assert(empty.size()>=2);
173  assert(returned.empty());
174  assert(conf.actualCount>=1);
175 
176  }catch(std::runtime_error& e){
177  // error from compute()
178  error = pvd::Status::error(e.what());
179  state = Error;
180  }
181  needConnected = true;
182  }
183  if(message.empty()) return;
184  requester_type::shared_pointer req(requester.lock());
185  if(req) {
186  req->message(message, warningMessage);
187  }
188 }
189 
191 {
192  Guard G(mutex);
193  needClosed = state==Opened;
194  state = Closed;
195 }
196 
198 {
199  Guard G(mutex);
200  if(state==Closed)
201  throw std::logic_error("Can not finish() a closed Monitor");
202  else if(finished)
203  return; // no-op
204 
205  finished = true;
206  if(inuse.empty() && running && state==Opened)
207  needUnlisten = true;
208 }
209 
211  const pvd::BitSet& changed,
212  const pvd::BitSet& overrun,
213  bool force)
214 {
215  Guard G(mutex);
216 
217  if(state!=Opened || finished) return false; // when Error, act as always "full"
218  assert(!empty.empty() || !inuse.empty());
219 
220  const bool havefree = _freeCount()>0u;
221 
222  MonitorElementPtr elem;
223  if(conf.dropEmptyUpdates && !changed.logical_and(mapper.requestedMask())) {
224  // drop empty update
225  } else if(havefree) {
226  // take an empty element
227  elem = empty.front();
228  empty.pop_front();
229  } else if(force) {
230  // allocate an extra element
231  elem.reset(new MonitorElement(mapper.buildRequested()));
232  }
233 
234  if(elem) {
235  try {
236  elem->changedBitSet->clear();
237  mapper.copyBaseToRequested(value, changed,
238  *elem->pvStructurePtr, *elem->changedBitSet);
239  elem->overrunBitSet->clear();
240  mapper.maskBaseToRequested(overrun, *elem->overrunBitSet);
241 
242  if(inuse.empty() && running)
243  needEvent = true;
244  inuse.push_back(elem);
245  }catch(...){
246  if(havefree) {
247  empty.push_front(elem);
248  }
249  throw;
250  }
251  if(pipeline)
252  flowCount--;
253  }
254 
255  return _freeCount()>0u;
256 }
257 
258 
260  const pvd::BitSet& changed,
261  const pvd::BitSet& overrun)
262 {
263  Guard G(mutex);
264 
265  if(state!=Opened || finished) return;
266  assert(!empty.empty() || !inuse.empty());
267 
268  const bool use_empty = !empty.empty();
269 
270  MonitorElementPtr elem;
271 
272  if(use_empty) {
273  // space in window, or entering overflow, fill an empty element
274 
275  assert(!empty.empty());
276 
277  elem = empty.front();
278 
279  } else {
280  // window full and already in overflow
281  // squash with last element
282  assert(!inuse.empty());
283  elem = inuse.back();
284  }
285 
286  if(conf.dropEmptyUpdates && !changed.logical_and(mapper.requestedMask()))
287  return; // drop empty update
288 
289  scratch.clear();
290  mapper.copyBaseToRequested(value, changed, *elem->pvStructurePtr, scratch);
291 
292  if(use_empty) {
293  *elem->changedBitSet = scratch;
294  elem->overrunBitSet->clear();
295  mapper.maskBaseToRequested(overrun, *elem->overrunBitSet);
296 
297  if(inuse.empty() && running)
298  needEvent = true;
299 
300  inuse.push_back(elem);
301  empty.pop_front();
302  if(pipeline)
303  flowCount--;
304 
305  } else {
306  // in overflow
307  // squash
308  elem->overrunBitSet->or_and(*elem->changedBitSet, scratch);
309  *elem->changedBitSet |= scratch;
310  oscratch.clear();
311  mapper.maskBaseToRequested(overrun, oscratch);
312  elem->overrunBitSet->or_and(oscratch, scratch);
313 
314  // leave as inuse.back()
315  }
316 }
317 
319 {
320  Monitor::shared_pointer self;
321  MonitorRequester::shared_pointer req;
323  bool conn = false,
324  evt = false,
325  unl = false,
326  clo = false;
327  pvd::Status err;
328 
329  {
330  Guard G(mutex);
331 
332  std::swap(conn, needConnected);
333  std::swap(evt, needEvent);
334  std::swap(unl, needUnlisten);
335  std::swap(clo, needClosed);
336  std::swap(err, error);
337 
338  if(conn | evt | unl | clo) {
339  req = requester.lock();
340  self = shared_from_this();
341  }
342  if(conn && err.isSuccess())
343  type = mapper.requested();
344  }
345 
346  if(!req)
347  return;
348  if(conn && err.isSuccess())
349  req->monitorConnect(pvd::Status(), self, type);
350  else if(conn)
351  req->monitorConnect(err, self, type);
352  if(evt)
353  req->monitorEvent(self);
354  if(unl)
355  req->unlisten(self);
356  if(clo)
357  req->channelDisconnect(false);
358 }
359 
361 {
362  Monitor::shared_pointer self;
363  MonitorRequester::shared_pointer req;
364 
365  {
366  Guard G(mutex);
367 
368  if(state==Closed)
369  throw std::logic_error("Monitor can't start() before open()");
370 
371  if(running || state!=Opened)
372  return pvd::Status();
373 
374  if(!inuse.empty()) {
375  self = shared_from_this();
376  req = requester.lock();
377  }
378 
379  running = true;
380  }
381 
382  if(req)
383  req->monitorEvent(self);
384 
385  return pvd::Status();
386 }
387 
389 {
390  Guard G(mutex);
391 
392  running = false;
393 
394  return pvd::Status();
395 }
396 
398 {
399  MonitorElementPtr ret;
400  Monitor::shared_pointer self;
401  MonitorRequester::shared_pointer req;
402 
403  {
404  Guard G(mutex);
405 
406  if(!inuse.empty() && inuse.size() + empty.size() > 1) {
407  ret = inuse.front();
408  inuse.pop_front();
409  if(inuse.empty() && finished) {
410  self = shared_from_this();
411  req = requester.lock();
412  }
413  }
414 
415  assert(!inuse.empty() || !empty.empty());
416  }
417 
418  if(req) {
419  req->unlisten(self);
420  }
421 
422  return ret;
423 }
424 
426 {
427  size_t nempty;
428  {
429  Guard G(mutex);
430 
431  assert(!inuse.empty() || !empty.empty());
432 
433  const pvd::StructureConstPtr& type((!inuse.empty() ? inuse.front() : empty.back())->pvStructurePtr->getStructure());
434 
435  if(elem->pvStructurePtr->getStructure() != type // return of old type
436  || empty.size()+returned.size()>=conf.actualCount+1) // return of force'd
437  return; // ignore it
438 
439  if(pipeline) {
440  // work done during reportRemoteQueueStatus()
441  returned.push_back(elem);
442  return;
443  }
444 
445  bool below = _freeCount() <= freeHighLevel;
446 
447  empty.push_front(elem);
448 
449  bool above = _freeCount() > freeHighLevel;
450 
451  if(!below || !above || !upstream)
452  return;
453 
454  nempty = _freeCount();
455  }
456 
457  upstream->freeHighMark(this, nempty);
458  notify();
459 }
460 
462 {
463  Guard G(mutex);
464  s.nempty = empty.size() + returned.size();
465  s.nfilled = inuse.size();
466  s.noutstanding = conf.actualCount - s.nempty - s.nfilled;
467 }
468 
470 {
471  if(nfree<=0 || !pipeline)
472  return; // paranoia
473 
474  size_t nempty;
475  {
476  Guard G(mutex);
477 
478  bool below = _freeCount() <= freeHighLevel;
479 
480  size_t nack = std::min(size_t(nfree), returned.size());
481  flowCount += nfree;
482 
483  buffer_t::iterator end(returned.begin());
484  std::advance(end, nack);
485 
486  // remove[0, nack) from returned and append to empty
487  empty.splice(empty.end(), returned, returned.begin(), end);
488 
489  bool above = _freeCount() > freeHighLevel;
490 
491  if(!below || !above || empty.size()<=1 || !upstream)
492  return;
493 
494  nempty = _freeCount();
495  }
496 
497  upstream->freeHighMark(this, nempty);
498  notify();
499 }
500 
502 {
503  Guard G(mutex);
504  return _freeCount();
505 }
506 
507 // caller must hold lock
508 size_t MonitorFIFO::_freeCount() const
509 {
510  if(pipeline) {
511  return std::max(0, std::min(flowCount, epicsInt32(empty.size())));
512  } else {
513  return empty.empty() ? 0 : empty.size()-1;
514  }
515 }
516 
517 }} // namespace epics::pvAccess
void copyBaseToRequested(const PVStructure &base, const BitSet &baseMask, PVStructure &request, BitSet &requestMask) const
Definition: link.h:174
#define max(x, y)
Definition: flexdef.h:81
PVScalar is the base class for each scalar field.
Definition: pvData.h:272
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) OVERRIDE FINAL
Definition: monitor.cpp:469
BitSet & clear(uint32 bitIndex)
Definition: bitSet.cpp:112
void open(const epics::pvData::StructureConstPtr &type)
Mark subscription as "open" with the associated structure type.
Definition: monitor.cpp:137
const BitSet & requestedMask() const
#define true
Definition: flexdef.h:84
const std::string & warnings() const
After compute(), check if !warnings().empty()
static Status error(const std::string &m)
Definition: status.h:50
virtual epics::pvData::Status start() OVERRIDE FINAL
Definition: monitor.cpp:360
#define min(x, y)
Definition: flexdef.h:78
void compute(const PVStructure &base, const PVStructure &pvRequest, mode_t mode=Mask)
void maskBaseToRequested(const BitSet &baseMask, BitSet &requestMask) const
PVStructurePtr buildRequested() const
pvd::StructureConstPtr type
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
std::tr1::shared_ptr< const Structure > StructureConstPtr
Definition: pvIntrospect.h:162
MonitorFIFO(const std::tr1::shared_ptr< MonitorRequester > &requester, const pvData::PVStructure::const_shared_pointer &pvRequest, const Source::shared_pointer &source=Source::shared_pointer(), Config *conf=0)
Definition: monitor.cpp:37
A vector of bits.
Definition: bitSet.h:56
An element for a monitorQueue.
Definition: monitor.h:54
void setFreeHighMark(double level)
Definition: monitor.cpp:127
epics::pvData::PVRequestMapper::mode_t mapperMode
default Mask.
Definition: monitor.h:279
epicsGuardRelease< epicsMutex > UnGuard
Definition: monitor.cpp:21
virtual epics::pvData::Status stop() OVERRIDE FINAL
Definition: monitor.cpp:388
size_t actualCount
filled in with actual FIFO size
Definition: monitor.h:275
virtual void release(MonitorElementPtr const &monitorElement) OVERRIDE FINAL
Definition: monitor.cpp:425
std::tr1::shared_ptr< PVDataCreate > PVDataCreatePtr
Definition: pvData.h:124
size_t nfilled
of elements ready to be poll()d
Definition: monitor.h:101
pvData
Definition: monitor.h:428
size_t maxCount
upper limit on requested FIFO size
Definition: monitor.h:275
bool isSuccess() const
Definition: status.h:103
virtual void getStats(Stats &s) const OVERRIDE FINAL
Definition: monitor.cpp:461
epicsGuard< epicsMutex > Guard
Definition: monitor.cpp:20
bool logical_and(const BitSet &other) const
Returns true if any bit is set in both *this and other.
Definition: bitSet.cpp:214
void post(const pvData::PVStructure &value, const epics::pvData::BitSet &changed, const epics::pvData::BitSet &overrun=epics::pvData::BitSet())
Consume a free slot if available, otherwise squash with most recent.
Definition: monitor.cpp:259
Data interface for a structure,.
Definition: pvData.h:712
size_t defCount
FIFO size when client makes no request.
Definition: monitor.h:275
virtual MonitorElementPtr poll() OVERRIDE FINAL
Definition: monitor.cpp:397
size_t nempty
of elements available for new remote data
Definition: monitor.h:103
void finish()
Successful closure (eg. RDB query done)
Definition: monitor.cpp:197
void swap(shared_ptr< T > &a, shared_ptr< T > &b) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:783
size_t noutstanding
of elements poll()d but not released()d
Definition: monitor.h:102
std::tr1::shared_ptr< MonitorElement > MonitorElementPtr
Definition: monitor.h:40
detail::pick_type< int8_t, signed char, detail::pick_type< uint8_t, char, unsigned char >::type >::type boolean
Definition: pvType.h:71
size_t freeCount() const
Number of unused FIFO slots at this moment, which may changed in the next.
Definition: monitor.cpp:501
const StructureConstPtr & requested() const
void show(std::ostream &strm) const
Definition: monitor.cpp:104
bool tryPost(const pvData::PVStructure &value, const epics::pvData::BitSet &changed, const epics::pvData::BitSet &overrun=epics::pvData::BitSet(), bool force=false)
Definition: monitor.cpp:210
bool dropEmptyUpdates
default true. Drop updates which don&#39;t include an field values.
Definition: monitor.h:278
#define false
Definition: flexdef.h:85
int32_t int32
Definition: pvType.h:83
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
int epicsInt32
Definition: epicsTypes.h:42
void close()
Abnormal closure (eg. due to upstream dis-connection)
Definition: monitor.cpp:190
virtual void destroy() OVERRIDE FINAL
Definition: monitor.cpp:101
uint32_t uint32
Definition: pvType.h:99