This is Unofficial EPICS BASE Doxygen Site
monitorFactory.cpp
Go to the documentation of this file.
1 /* monitorFactory.cpp */
12 #include <sstream>
13 
14 #include <epicsGuard.h>
15 #include <pv/thread.h>
16 #include <pv/bitSetUtil.h>
17 #include <pv/pvData.h>
18 #include <pv/pvAccess.h>
19 #include <pv/pvTimeStamp.h>
20 #include <pv/rpcService.h>
21 #include <pv/serverContext.h>
22 #include <pv/timeStamp.h>
23 
24 #define epicsExportSharedSymbols
25 #include "pv/pvStructureCopy.h"
26 #include "pv/pvDatabase.h"
28 
29 using namespace epics::pvData;
30 using namespace epics::pvAccess;
31 using namespace epics::pvCopy;
33 using std::cout;
34 using std::endl;
35 using std::string;
36 
37 namespace epics { namespace pvDatabase {
38 
40 typedef std::tr1::shared_ptr<MonitorLocal> MonitorLocalPtr;
41 
42 static MonitorPtr nullMonitor;
43 static MonitorElementPtr NULLMonitorElement;
44 static Status failedToCreateMonitorStatus(
45  Status::STATUSTYPE_ERROR,"failed to create monitor");
46 static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,"already started");
47 static Status notStartedStatus(Status::STATUSTYPE_ERROR,"not started");
48 static Status deletedStatus(Status::STATUSTYPE_ERROR,"record is deleted");
49 
51 typedef std::tr1::shared_ptr<MonitorElementQueue> MonitorElementQueuePtr;
52 
54 {
55 private:
56  MonitorElementPtrArray elements;
57  // TODO use size_t instead
58  int size;
59  int numberFree;
60  int numberUsed;
61  int nextGetFree;
62  int nextSetUsed;
63  int nextGetUsed;
64  int nextReleaseUsed;
65 public:
67 
68  MonitorElementQueue(std::vector<MonitorElementPtr> monitorElementArray)
69  : elements(monitorElementArray),
70  size(monitorElementArray.size()),
71  numberFree(size),
72  numberUsed(0),
73  nextGetFree(0),
74  nextSetUsed(0),
75  nextGetUsed(0),
76  nextReleaseUsed(0)
77  {
78  }
79 
80  virtual ~MonitorElementQueue() {}
81 
82  void clear()
83  {
84  numberFree = size;
85  numberUsed = 0;
86  nextGetFree = 0;
87  nextSetUsed = 0;
88  nextGetUsed = 0;
89  nextReleaseUsed = 0;
90  }
91 
93  {
94  if(numberFree==0) return MonitorElementPtr();
95  numberFree--;
96  int ind = nextGetFree;
97  MonitorElementPtr queueElement = elements[nextGetFree++];
98  if(nextGetFree>=size) nextGetFree = 0;
99  return elements[ind];
100  }
101 
102  void setUsed(MonitorElementPtr const &element)
103  {
104  if(element!=elements[nextSetUsed++]) {
105  throw std::logic_error("not correct queueElement");
106  }
107  numberUsed++;
108  if(nextSetUsed>=size) nextSetUsed = 0;
109  }
110 
112  {
113  if(numberUsed==0) return MonitorElementPtr();
114  int ind = nextGetUsed;
115  MonitorElementPtr queueElement = elements[nextGetUsed++];
116  if(nextGetUsed>=size) nextGetUsed = 0;
117  return elements[ind];
118  }
119  void releaseUsed(MonitorElementPtr const &element)
120  {
121  if(element!=elements[nextReleaseUsed++]) {
122  throw std::logic_error(
123  "not queueElement returned by last call to getUsed");
124  }
125  if(nextReleaseUsed>=size) nextReleaseUsed = 0;
126  numberUsed--;
127  numberFree++;
128  }
129 };
130 
131 
132 typedef std::tr1::shared_ptr<MonitorRequester> MonitorRequesterPtr;
133 
134 
136  public Monitor,
137  public PVListener,
138  public std::tr1::enable_shared_from_this<MonitorLocal>
139 {
140  enum MonitorState {idle,active,deleted};
141 public:
143  virtual ~MonitorLocal();
144  virtual void destroy() {} // DEPRECATED
145  virtual Status start();
146  virtual Status stop();
147  virtual MonitorElementPtr poll();
148  virtual void detach(PVRecordPtr const & pvRecord){}
149  virtual void release(MonitorElementPtr const & monitorElement);
150  virtual void dataPut(PVRecordFieldPtr const & pvRecordField);
151  virtual void dataPut(
152  PVRecordStructurePtr const & requested,
153  PVRecordFieldPtr const & pvRecordField);
154  virtual void beginGroupPut(PVRecordPtr const & pvRecord);
155  virtual void endGroupPut(PVRecordPtr const & pvRecord);
156  virtual void unlisten(PVRecordPtr const & pvRecord);
157  MonitorElementPtr getActiveElement();
158  void releaseActiveElement();
159  bool init(PVStructurePtr const & pvRequest);
160  MonitorLocal(
161  MonitorRequester::shared_pointer const & channelMonitorRequester,
162  PVRecordPtr const &pvRecord);
163  PVCopyPtr getPVCopy() { return pvCopy;}
164 private:
165  MonitorLocalPtr getPtrSelf()
166  {
167  return shared_from_this();
168  }
169  MonitorRequester::weak_pointer monitorRequester;
170  PVRecordPtr pvRecord;
171  MonitorState state;
172  PVCopyPtr pvCopy;
173  MonitorElementQueuePtr queue;
174  MonitorElementPtr activeElement;
175  bool isGroupPut;
176  bool dataChanged;
177  Mutex mutex;
178  Mutex queueMutex;
179 };
180 
181 MonitorLocal::MonitorLocal(
182  MonitorRequester::shared_pointer const & channelMonitorRequester,
183  PVRecordPtr const &pvRecord)
184 : monitorRequester(channelMonitorRequester),
185  pvRecord(pvRecord),
186  state(idle),
187  isGroupPut(false),
188  dataChanged(false)
189 {
190 }
191 
193 {
194 //cout << "MonitorLocal::~MonitorLocal()" << endl;
195 }
196 
197 
199 {
200  if(pvRecord->getTraceLevel()>0)
201  {
202  cout << "MonitorLocal::start state " << state << endl;
203  }
204  {
205  Lock xx(mutex);
206  if(state==active) return alreadyStartedStatus;
207  if(state==deleted) return deletedStatus;
208  }
209  pvRecord->addListener(getPtrSelf(),pvCopy);
210  epicsGuard <PVRecord> guard(*pvRecord);
211  Lock xx(mutex);
212  state = active;
213  queue->clear();
214  isGroupPut = false;
215  activeElement = queue->getFree();
216  activeElement->changedBitSet->clear();
217  activeElement->overrunBitSet->clear();
218  activeElement->changedBitSet->set(0);
220  return Status::Ok;
221 }
222 
224 {
225  if(pvRecord->getTraceLevel()>0){
226  cout << "MonitorLocal::stop state " << state << endl;
227  }
228  {
229  Lock xx(mutex);
230  if(state==idle) return notStartedStatus;
231  if(state==deleted) return deletedStatus;
232  state = idle;
233  }
234  pvRecord->removeListener(getPtrSelf(),pvCopy);
235  return Status::Ok;
236 }
237 
239 {
240  if(pvRecord->getTraceLevel()>1)
241  {
242  cout << "MonitorLocal::poll state " << state << endl;
243  }
244  {
245  Lock xx(queueMutex);
246  if(state!=active) return NULLMonitorElement;
247  return queue->getUsed();
248  }
249 }
250 
251 void MonitorLocal::release(MonitorElementPtr const & monitorElement)
252 {
253  if(pvRecord->getTraceLevel()>1)
254  {
255  cout << "MonitorLocal::release state " << state << endl;
256  }
257  {
258  Lock xx(queueMutex);
259  if(state!=active) return;
260  queue->releaseUsed(monitorElement);
261  }
262 }
263 
265 {
266  if(pvRecord->getTraceLevel()>1)
267  {
268  cout << "MonitorLocal::releaseActiveElement state " << state << endl;
269  }
270  {
271  Lock xx(queueMutex);
272  if(state!=active) return;
273  bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
274  if(!result) return;
275  MonitorElementPtr newActive = queue->getFree();
276  if(!newActive) return;
277  BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
278  BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
279  queue->setUsed(activeElement);
280  activeElement = newActive;
281  activeElement->changedBitSet->clear();
282  activeElement->overrunBitSet->clear();
283  }
284  MonitorRequesterPtr requester = monitorRequester.lock();
285  if(!requester) return;
286  requester->monitorEvent(getPtrSelf());
287  return;
288 }
289 
290 void MonitorLocal::dataPut(PVRecordFieldPtr const & pvRecordField)
291 {
292  if(pvRecord->getTraceLevel()>1)
293  {
294  cout << "MonitorLocal::dataPut(pvRecordField)" << endl;
295  }
296  if(state!=active) return;
297  {
298  Lock xx(mutex);
299  size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
300  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
301  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
302  bool isSet = changedBitSet->get(offset);
303  changedBitSet->set(offset);
304  if(isSet) overrunBitSet->set(offset);
305  dataChanged = true;
306  }
307  if(!isGroupPut) {
309  dataChanged = false;
310  }
311 }
312 
314  PVRecordStructurePtr const & requested,
315  PVRecordFieldPtr const & pvRecordField)
316 {
317  if(pvRecord->getTraceLevel()>1)
318  {
319  cout << "MonitorLocal::dataPut(requested,pvRecordField)" << endl;
320  }
321  if(state!=active) return;
322  {
323  Lock xx(mutex);
324  BitSetPtr const &changedBitSet = activeElement->changedBitSet;
325  BitSetPtr const &overrunBitSet = activeElement->overrunBitSet;
326  size_t offsetCopyRequested = pvCopy->getCopyOffset(
327  requested->getPVField());
328  size_t offset = offsetCopyRequested
329  + (pvRecordField->getPVField()->getFieldOffset()
330  - requested->getPVField()->getFieldOffset());
331  bool isSet = changedBitSet->get(offset);
332  changedBitSet->set(offset);
333  if(isSet) overrunBitSet->set(offset);
334  dataChanged = true;
335  }
336  if(!isGroupPut) {
338  dataChanged = false;
339  }
340 }
341 
343 {
344  if(pvRecord->getTraceLevel()>1)
345  {
346  cout << "MonitorLocal::beginGroupPut()" << endl;
347  }
348  if(state!=active) return;
349  {
350  Lock xx(mutex);
351  isGroupPut = true;
352  dataChanged = false;
353  }
354 }
355 
357 {
358  if(pvRecord->getTraceLevel()>1)
359  {
360  cout << "MonitorLocal::endGroupPut dataChanged " << dataChanged << endl;
361  }
362  if(state!=active) return;
363  {
364  Lock xx(mutex);
365  isGroupPut = false;
366  }
367  if(dataChanged) {
368  dataChanged = false;
370  }
371 }
372 
373 void MonitorLocal::unlisten(PVRecordPtr const & pvRecord)
374 {
375  if(pvRecord->getTraceLevel()>1)
376  {
377  cout << "MonitorLocal::unlisten\n";
378  }
379  {
380  Lock xx(mutex);
381  state = deleted;
382  }
383  MonitorRequesterPtr requester = monitorRequester.lock();
384  if(requester) {
385  if(pvRecord->getTraceLevel()>1)
386  {
387  cout << "MonitorLocal::unlisten calling requester->unlisten\n";
388  }
389  requester->unlisten(getPtrSelf());
390  }
391 }
392 
393 
394 bool MonitorLocal::init(PVStructurePtr const & pvRequest)
395 {
396  PVFieldPtr pvField;
397  size_t queueSize = 2;
398  PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>("record._options");
399  MonitorRequesterPtr requester = monitorRequester.lock();
400  if(!requester) return false;
401  if(pvOptions) {
402  PVStringPtr pvString = pvOptions->getSubField<PVString>("queueSize");
403  if(pvString) {
404  try {
405  int32 size;
406  std::stringstream ss;
407  ss << pvString->get();
408  ss >> size;
409  queueSize = size;
410  } catch (...) {
411  requester->message("queueSize " +pvString->get() + " illegal",errorMessage);
412  return false;
413  }
414  }
415  }
416  pvField = pvRequest->getSubField("field");
417  if(!pvField) {
418  pvCopy = PVCopy::create(
419  pvRecord->getPVRecordStructure()->getPVStructure(),
420  pvRequest,"");
421  if(!pvCopy) {
422  requester->message("illegal pvRequest",errorMessage);
423  return false;
424  }
425  } else {
426  if(pvField->getField()->getType()!=structure) {
427  requester->message("illegal pvRequest",errorMessage);
428  return false;
429  }
430  pvCopy = PVCopy::create(
431  pvRecord->getPVRecordStructure()->getPVStructure(),
432  pvRequest,"field");
433  if(!pvCopy) {
434  requester->message("illegal pvRequest",errorMessage);
435  return false;
436  }
437  }
438  if(queueSize<2) queueSize = 2;
439  std::vector<MonitorElementPtr> monitorElementArray;
440  monitorElementArray.reserve(queueSize);
441  for(size_t i=0; i<queueSize; i++) {
442  PVStructurePtr pvStructure = pvCopy->createPVStructure();
443  MonitorElementPtr monitorElement(
444  new MonitorElement(pvStructure));
445  monitorElementArray.push_back(monitorElement);
446  }
447  queue = MonitorElementQueuePtr(new MonitorElementQueue(monitorElementArray));
448  requester->monitorConnect(
449  Status::Ok,
450  getPtrSelf(),
451  pvCopy->getStructure());
452  return true;
453 }
454 
456  PVRecordPtr const & pvRecord,
457  MonitorRequester::shared_pointer const & monitorRequester,
458  PVStructurePtr const & pvRequest)
459 {
460  MonitorLocalPtr monitor(new MonitorLocal(
461  monitorRequester,pvRecord));
462  bool result = monitor->init(pvRequest);
463  if(!result) {
464  MonitorPtr monitor;
466  monitorRequester->monitorConnect(
467  failedToCreateMonitorStatus,monitor,structure);
468  return nullMonitor;
469  }
470  if(pvRecord->getTraceLevel()>0)
471  {
472  cout << "MonitorFactory::createMonitor"
473  << " recordName " << pvRecord->getRecordName() << endl;
474  }
475  return monitor;
476 }
477 
478 }}
virtual void beginGroupPut(PVRecordPtr const &pvRecord)
Begin a set of puts.
virtual void release(MonitorElementPtr const &monitorElement)
pvac::PutEvent result
Definition: clientSync.cpp:117
int i
Definition: scan.c:967
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Definition: pvPlugin.h:25
Listener for PVRecord::message.
Definition: pvDatabase.h:428
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:788
virtual MonitorElementPtr poll()
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
std::tr1::shared_ptr< const Structure > StructureConstPtr
Definition: pvIntrospect.h:162
A lock for multithreading.
Definition: lock.h:36
An element for a monitorQueue.
Definition: monitor.h:54
storage_t::arg_type get() const
Definition: pvData.h:396
virtual void endGroupPut(PVRecordPtr const &pvRecord)
End a set of puts.
Holds all PVA related.
Definition: pvif.h:34
PVString is special case, since it implements SerializableArray.
Definition: pvData.h:521
std::vector< MonitorElementPtr > MonitorElementPtrArray
Definition: monitor.h:42
pvData
Definition: monitor.h:428
void releaseUsed(MonitorElementPtr const &element)
#define POINTER_DEFINITIONS(clazz)
Definition: sharedPtr.h:198
epicsMutex mutex
Definition: pvAccess.cpp:71
epicsShareFunc epics::pvData::MonitorPtr createMonitorLocal(PVRecordPtr const &pvRecord, epics::pvData::MonitorRequester::shared_pointer const &monitorRequester, epics::pvData::PVStructurePtr const &pvRequest)
virtual void dataPut(PVRecordFieldPtr const &pvRecordField)
pvField has been modified.
const ChannelProcessRequester::weak_pointer requester
Definition: pvAccess.cpp:68
std::tr1::shared_ptr< MonitorLocal > MonitorLocalPtr
void setUsed(MonitorElementPtr const &element)
MonitorElementQueue(std::vector< MonitorElementPtr > monitorElementArray)
Data interface for a structure,.
Definition: pvData.h:712
std::tr1::shared_ptr< MonitorRequester > MonitorRequesterPtr
std::tr1::shared_ptr< PVRecord > PVRecordPtr
Definition: pvDatabase.h:21
virtual void unlisten(PVRecordPtr const &pvRecord)
Connection to record is being terminated.
std::tr1::shared_ptr< PVStructure > PVStructurePtr
Definition: pvData.h:87
std::tr1::shared_ptr< PVString > PVStringPtr
Definition: pvData.h:540
std::tr1::shared_ptr< PVRecordStructure > PVRecordStructurePtr
Definition: pvDatabase.h:31
std::tr1::shared_ptr< PVField > PVFieldPtr
Definition: pvData.h:66
std::tr1::shared_ptr< PVRecordField > PVRecordFieldPtr
Definition: pvDatabase.h:26
std::tr1::shared_ptr< MonitorElementQueue > MonitorElementQueuePtr
MonitorLocal(MonitorRequester::shared_pointer const &channelMonitorRequester, PVRecordPtr const &pvRecord)
virtual void detach(PVRecordPtr const &pvRecord)
Detach from the record because it is being removed.
std::tr1::shared_ptr< MonitorElement > MonitorElementPtr
Definition: monitor.h:40
std::tr1::shared_ptr< BitSet > BitSetPtr
Definition: bitSet.h:26
epicsMutex Mutex
Definition: lock.h:28
bool init(PVStructurePtr const &pvRequest)
#define false
Definition: flexdef.h:85
int32_t int32
Definition: pvType.h:83
std::tr1::shared_ptr< Monitor > MonitorPtr
Definition: monitor.h:44