This is Unofficial EPICS BASE Doxygen Site
pdbgroup.cpp
Go to the documentation of this file.
1 
2 #include <stdio.h>
3 
4 // redirect stdio/stderr for iocsh
5 #include <epicsStdio.h>
6 
7 #include <epicsAtomic.h>
8 #include <dbAccess.h>
9 #include <dbChannel.h>
10 #include <dbStaticLib.h>
11 
12 #include <pv/pvAccess.h>
13 #include <pv/configuration.h>
14 #include <pv/epicsException.h>
15 
16 #include "helper.h"
17 #include "pdbgroup.h"
18 #include "pdb.h"
19 
20 namespace pvd = epics::pvData;
21 namespace pva = epics::pvAccess;
22 
27 
29 
30 void pdb_group_event(void *user_arg, struct dbChannel *chan,
31  int eventsRemaining, struct db_field_log *pfl)
32 {
33  DBEvent *evt=(DBEvent*)user_arg;
34  unsigned idx = evt->index;
35  try{
36  PDBGroupPV::shared_pointer self(std::tr1::static_pointer_cast<PDBGroupPV>(((PDBGroupPV*)evt->self)->shared_from_this()));
37  PDBGroupPV::Info& info = self->members[idx];
38 
40  {
41 
42  Guard G(self->lock);
43 
44  self->scratch.clear();
45  if(evt->dbe_mask&DBE_PROPERTY || !self->monatomic)
46  {
47  DBScanLocker L(dbChannelRecord(info.chan));
48  self->members[idx].pvif->put(self->scratch, evt->dbe_mask, pfl);
49 
50  } else {
51  // we ignore 'pfl' (and the dbEvent queue) when collecting an atomic snapshot
52 
53  DBManyLocker L(info.locker); // lock only those records in the triggers list
54  FOREACH(PDBGroupPV::Info::triggers_t::const_iterator, it, end, info.triggers)
55  {
56  size_t i = *it;
57  // to get a consistent snapshot we must ignore the db_field_log which came through the dbEvent buffer
58  LocalFL FL(NULL, self->members[i].chan); // create a read fl if needed
59  self->members[i].pvif->put(self->scratch, evt->dbe_mask, FL.pfl);
60  }
61  }
62 
63  if(!(evt->dbe_mask&DBE_PROPERTY)) {
64  if(!info.had_initial_VALUE) {
65  info.had_initial_VALUE = true;
66  assert(self->initial_waits>0);
67  self->initial_waits--;
68  }
69  } else {
70  if(!info.had_initial_PROPERTY) {
71  info.had_initial_PROPERTY = true;
72  assert(self->initial_waits>0);
73  self->initial_waits--;
74  }
75  }
76 
77  if(self->initial_waits==0) {
78  self->interested_iterating = true;
79 
80  FOREACH(PDBGroupPV::interested_t::const_iterator, it, end, self->interested) {
81  PDBGroupMonitor& mon = **it;
82  mon.post(G, self->scratch); // G unlocked
83  }
84 
85  {
86  Guard G(self->lock);
87 
88  assert(self->interested_iterating);
89 
90  while(!self->interested_add.empty()) {
91  PDBGroupPV::interested_t::iterator first(self->interested_add.begin());
92  self->interested.insert(*first);
93  self->interested_add.erase(first);
94  }
95 
96  temp.swap(self->interested_remove);
97  for(PDBGroupPV::interested_remove_t::iterator it(temp.begin()),
98  end(temp.end()); it != end; ++it)
99  {
100  self->interested.erase(static_cast<PDBGroupMonitor*>(it->get()));
101  }
102 
103  self->interested_iterating = false;
104 
105  self->finalizeMonitor();
106  }
107  }
108  }
109 
110  }catch(std::tr1::bad_weak_ptr&){
111  /* We are racing destruction of the PDBGroupPV, but things are ok.
112  * The destructor is running, but has not completed db_cancel_event()
113  * so storage is still valid.
114  * Just do nothing
115  */
116  }catch(std::exception& e){
117  std::cerr<<"Unhandled exception in pdb_group_event(): "<<e.what()<<"\n"
118  <<SHOW_EXCEPTION(e)<<"\n";
119  }
120 }
121 
123  :pgatomic(false)
124  ,monatomic(false)
125  ,interested_iterating(false)
126  ,initial_waits(0)
127 {
128  epics::atomic::increment(num_instances);
129 }
130 
132 {
133  epics::atomic::decrement(num_instances);
134 }
135 
// Create a new PDBGroupChannel bound to this group PV.
//
// prov - provider handed through to the PDBGroupChannel constructor
// req  - channel requester; handed to the constructor and also used to
//        populate the channel's credentials (ret->cred)
//
// Returns the newly created channel (converted to pva::Channel::shared_pointer).
136 pva::Channel::shared_pointer
137 PDBGroupPV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
138  const pva::ChannelRequester::shared_pointer& req)
139 {
140  PDBGroupChannel::shared_pointer ret(new PDBGroupChannel(shared_from_this(), prov, req));
141 
// fill in the channel's credentials from the requester before they are used below
142  ret->cred.update(req);
143 
// one aspvt entry per group member, each registered with that member's
// dbChannel and the credentials just obtained
// NOTE(review): aspvt presumably holds per-member access security state - confirm against its declaration
144  ret->aspvt.resize(members.size());
145  for(size_t i=0, N=members.size(); i<N; i++)
146  {
147  ret->aspvt[i].add(members[i].chan, ret->cred);
148  }
149 
150  return ret;
151 }
152 
153 // caller must not hold lock
155 {
156  Guard G(lock);
157  if(interested.empty() && interested_add.empty()) {
158  // first monitor
159  // start subscriptions
160 
161  size_t ievts = 0;
162  for(size_t i=0; i<members.size(); i++) {
163  PDBGroupPV::Info& info = members[i];
164 
165  if(!!info.evt_VALUE) {
166  db_event_enable(info.evt_VALUE.subscript);
167  db_post_single_event(info.evt_VALUE.subscript);
168  ievts++;
169  info.had_initial_VALUE = false;
170  } else {
171  info.had_initial_VALUE = true;
172  }
174  db_event_enable(info.evt_PROPERTY.subscript);
175  db_post_single_event(info.evt_PROPERTY.subscript);
176  ievts++;
177  info.had_initial_PROPERTY = false;
178  }
179  initial_waits = ievts;
180 
181  } else if(initial_waits==0) {
182  // new subscriber and already had initial update
183  mon->post(G);
184  } // else new subscriber, but no initial update. so just wait
185 
187  interested_add.insert(mon);
188  else
189  interested.insert(mon);
190 }
191 
192 // caller must not hold lock
194 {
195  Guard G(lock);
196 
197  if(interested_add.erase(mon)) {
198  // add+remove while iterating. no-op
199 
200  } else if(interested_iterating) {
201  // keep the monitor alive until we've finished iterating
202  interested_remove.insert(mon->shared_from_this());
203 
204  } else {
205  interested.erase(mon);
206  finalizeMonitor();
207  }
208 }
209 
210 // must hold lock
212 {
214 
215  if(!interested.empty())
216  return;
217 
218  // last subscriber
219  for(size_t i=0; i<members.size(); i++) {
220  PDBGroupPV::Info& info = members[i];
221 
222  if(!!info.evt_VALUE) {
223  db_event_disable(info.evt_VALUE.subscript);
224  }
225  db_event_disable(info.evt_PROPERTY.subscript);
226  }
227 }
228 
229 void PDBGroupPV::show(int lvl)
230 {
231  // no locking as we only print things which are const after initialization
232 
233  printf(" Atomic Get/Put:%s Monitor:%s Members:%zu\n",
234  pgatomic?"yes":"no", monatomic?"yes":"no", members.size());
235 
236  if(lvl<=1)
237  return;
238 
240  it != end; ++it)
241  {
242  const Info& info = *it;
243  printf(" ");
244  info.attachment.show(); // printf()s
245  printf("\t<-> %s\n", dbChannelName(info.chan));
246  }
247 }
248 
249 
// Construct a channel for the group PV 'pv'.
//
// pv   - the group PV this channel refers to; a shared reference is kept in this->pv
// prov - provider forwarded to BaseChannel
// req  - channel requester forwarded to BaseChannel
//
// The PV's name and field description (pv->name, pv->fielddesc) are forwarded
// to the BaseChannel constructor.
250 PDBGroupChannel::PDBGroupChannel(const PDBGroupPV::shared_pointer& pv,
251  const std::tr1::shared_ptr<pva::ChannelProvider>& prov,
252  const pva::ChannelRequester::shared_pointer& req)
253  :BaseChannel(pv->name, prov, req, pv->fielddesc)
254  ,pv(pv)
255 {
// instance counter for this class, incremented once per constructed channel
256  epics::atomic::increment(num_instances);
257 }
258 
260 {
261  epics::atomic::decrement(num_instances);
262 }
263 
// Write a short identification of this channel to 'out'.
// Only the literal text "PDBGroupChannel" is written (no trailing newline).
264 void PDBGroupChannel::printInfo(std::ostream& out)
265 {
266  out<<"PDBGroupChannel";
267 }
268 
269 pva::ChannelPut::shared_pointer
271  pva::ChannelPutRequester::shared_pointer const & requester,
272  pvd::PVStructure::shared_pointer const & pvRequest)
273 {
274  PDBGroupPut::shared_pointer ret(new PDBGroupPut(shared_from_this(), requester, pvRequest));
275  requester->channelPutConnect(pvd::Status(), ret, fielddesc);
276  return ret;
277 }
278 
279 pva::Monitor::shared_pointer
281  pva::MonitorRequester::shared_pointer const & requester,
282  pvd::PVStructure::shared_pointer const & pvRequest)
283 {
284  PDBGroupMonitor::shared_pointer ret(new PDBGroupMonitor(pv->shared_from_this(), requester, pvRequest));
285  ret->weakself = ret;
286  assert(!!pv->complete);
287  guard_t G(pv->lock);
288  ret->connect(G, pv->complete);
289  return ret;
290 }
291 
292 
293 
294 PDBGroupPut::PDBGroupPut(const PDBGroupChannel::shared_pointer& channel,
295  const requester_type::shared_pointer& requester,
296  const epics::pvData::PVStructure::shared_pointer &pvReq)
297  :channel(channel)
298  ,requester(requester)
299  ,atomic(channel->pv->pgatomic)
300  ,doWait(false)
301  ,doProc(PVIF::ProcPassive)
302  ,changed(new pvd::BitSet(channel->fielddesc->getNumberFields()))
303  ,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc))
304 {
305  epics::atomic::increment(num_instances);
306  try {
307  getS<pvd::boolean>(pvReq, "record._options.atomic", atomic);
308 
309  getS<pvd::boolean>(pvReq, "record._options.block", doWait);
310 
311  std::string proccmd;
312  if(getS<std::string>(pvReq, "record._options.process", proccmd)) {
313  if(proccmd=="true") {
315  } else if(proccmd=="false") {
317  doWait = false; // no point in waiting
318  } else if(proccmd=="passive") {
320  } else {
321  requester->message("process= expects: true|false|passive", pva::warningMessage);
322  }
323  }
324  }catch(std::exception& e){
325  requester->message(std::string("Error processing request options: ")+e.what());
326  }
327 
328  pvf->getSubFieldT<pvd::PVBoolean>("record._options.atomic")->put(atomic);
329 
330 
331  const size_t npvs = channel->pv->members.size();
332  pvif.resize(npvs);
333  for(size_t i=0; i<npvs; i++)
334  {
335  PDBGroupPV::Info& info = channel->pv->members[i];
336 
337  pvif[i].reset(info.builder->attach(info.chan, pvf, info.attachment));
338  }
339 }
340 
342 {
343  epics::atomic::decrement(num_instances);
344 }
345 
346 void PDBGroupPut::put(pvd::PVStructure::shared_pointer const & value,
347  pvd::BitSet::shared_pointer const & changed)
348 {
349  // assume value may be a different struct each time... lot of wasted prep work
350  const size_t npvs = channel->pv->members.size();
351  std::vector<std::tr1::shared_ptr<PVIF> > putpvif(npvs);
352 
353  for(size_t i=0; i<npvs; i++)
354  {
355  PDBGroupPV::Info& info = channel->pv->members[i];
356  if(!info.allowProc) continue;
357 
358  putpvif[i].reset(info.builder->attach(info.chan, value, info.attachment));
359  }
360 
361  pvd::Status ret;
362  if(atomic) {
363  DBManyLocker L(channel->pv->locker);
364  for(size_t i=0; ret && i<npvs; i++) {
365  if(!putpvif[i].get()) continue;
366 
367  ret |= putpvif[i]->get(*changed, doProc);
368  }
369 
370  } else {
371  for(size_t i=0; ret && i<npvs; i++)
372  {
373  if(!putpvif[i].get()) continue;
374 
375  PDBGroupPV::Info& info = channel->pv->members[i];
376 
377  DBScanLocker L(dbChannelRecord(info.chan));
378 
379  ret |= putpvif[i]->get(*changed,
381  channel->aspvt[i].canWrite());
382  }
383  }
384 
385  requester_type::shared_pointer req(requester.lock());
386  if(req)
387  req->putDone(ret, shared_from_this());
388 }
389 
391 {
392  const size_t npvs = pvif.size();
393 
394  changed->clear();
395  if(atomic) {
396  DBManyLocker L(channel->pv->locker);
397  for(size_t i=0; i<npvs; i++)
399  } else {
400 
401  for(size_t i=0; i<npvs; i++)
402  {
403  PDBGroupPV::Info& info = channel->pv->members[i];
404 
405  DBScanLocker L(dbChannelRecord(info.chan));
407  }
408  }
409  //TODO: report unused fields as changed?
410  changed->clear();
411  changed->set(0);
412 
413  requester_type::shared_pointer req(requester.lock());
414  if(req)
415  req->getDone(pvd::Status(), shared_from_this(), pvf, changed);
416 }
417 
// Construct a monitor attached to the group PV 'pv'.
//
// pv        - the group PV to monitor; a shared reference is kept in this->pv
// requester - weak reference to the monitor requester, forwarded to BaseMonitor
// pvReq     - the client's request structure, forwarded to BaseMonitor
//
// BaseMonitor is given pv->lock, i.e. the group PV's own mutex, rather than
// a mutex private to this monitor.
418 PDBGroupMonitor::PDBGroupMonitor(const PDBGroupPV::shared_pointer& pv,
419  const epics::pvAccess::MonitorRequester::weak_pointer &requester,
420  const pvd::PVStructure::shared_pointer& pvReq)
421  :BaseMonitor(pv->lock, requester, pvReq)
422  ,pv(pv)
423 {
// instance counter for this class, incremented once per constructed monitor
424  epics::atomic::increment(num_instances);
425 }
426 
428 {
429  destroy();
430  epics::atomic::decrement(num_instances);
431 }
432 
434 {
436  PDBGroupPV::shared_pointer pv;
437  {
438  Guard G(lock);
439  this->pv.swap(pv);
440  }
441 }
442 
444 {
445  pv->addMonitor(this);
446 }
447 
449 {
450  pv->removeMonitor(this);
451 }
452 
454 {
455  Guard G(pv->lock);
456  post(G);
457 }
std::vector< std::tr1::shared_ptr< PVIF > > pvif
Definition: pdbgroup.h:177
DBManyLock locker
Definition: pdbgroup.h:96
virtual void printInfo()
Definition: pvAccess.h:1126
Definition: link.h:174
members_t members
Definition: pdbgroup.h:104
epics::pvData::PVStructurePtr pvf
Definition: pdbgroup.h:176
PDBGroupPV::shared_pointer pv
Definition: pdbgroup.h:200
void * self
Definition: pvif.h:218
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
dbEventSubscription subscript
Definition: pvif.h:216
bool doWait
Definition: pdbgroup.h:172
static size_t num_instances
Definition: pdbgroup.h:146
bool monatomic
Definition: pdbgroup.h:85
#define FOREACH(ITERTYPE, IT, END, C)
Definition: helper.h:6
int i
Definition: scan.c:967
void pdb_group_event(void *user_arg, struct dbChannel *chan, int eventsRemaining, struct db_field_log *pfl)
Definition: pdbgroup.cpp:30
Definition: tool_lib.h:67
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:788
virtual epics::pvData::Monitor::shared_pointer createMonitor(epics::pvData::MonitorRequester::shared_pointer const &requester, epics::pvData::PVStructure::shared_pointer const &pvRequest) OVERRIDE FINAL
Definition: pdbgroup.cpp:280
virtual epics::pvAccess::ChannelPut::shared_pointer createChannelPut(epics::pvAccess::ChannelPutRequester::shared_pointer const &requester, epics::pvData::PVStructure::shared_pointer const &pvRequest) OVERRIDE FINAL
Definition: pdbgroup.cpp:270
#define printf
Definition: epicsStdio.h:41
std::set< BaseMonitor::shared_pointer > interested_remove_t
Definition: pdbgroup.h:114
virtual epics::pvAccess::Channel::shared_pointer connect(const std::tr1::shared_ptr< PDBProvider > &prov, const epics::pvAccess::ChannelRequester::shared_pointer &req) OVERRIDE FINAL
Definition: pdbgroup.cpp:137
#define NULL
Definition: catime.c:38
#define DBE_ALARM
Definition: caeventmask.h:41
db_field_log * pfl
Definition: pvif.h:243
interested_t interested_add
Definition: pdbgroup.h:112
static size_t num_instances
Definition: pdbgroup.h:179
virtual void get() OVERRIDE FINAL
Definition: pdbgroup.cpp:390
PDBGroupPut(const PDBGroupChannel::shared_pointer &channel, const epics::pvAccess::ChannelPutRequester::shared_pointer &requester, const epics::pvData::PVStructure::shared_pointer &pvReq)
Definition: pdbgroup.cpp:294
size_t initial_waits
Definition: pdbgroup.h:117
virtual void destroy() OVERRIDE FINAL
Definition: pdbgroup.cpp:433
#define SHOW_EXCEPTION(EI)
epics::pvData::StructureConstPtr fielddesc
Definition: pdb.h:19
#define DBE_VALUE
Definition: caeventmask.h:38
PDBGroupChannel(const PDBGroupPV::shared_pointer &pv, const std::tr1::shared_ptr< epics::pvAccess::ChannelProvider > &prov, const epics::pvAccess::ChannelRequester::shared_pointer &req)
Definition: pdbgroup.cpp:250
Holds all PVA related.
Definition: pvif.h:34
bool had_initial_PROPERTY
Definition: pdbgroup.h:99
shared_pointer shared_from_this()
Definition: pvahelper.h:73
pvData
Definition: monitor.h:428
void show() const
Definition: pdb.cpp:739
PDBGroupChannel::shared_pointer channel
Definition: pdbgroup.h:168
Definition: pvif.h:241
interested_remove_t interested_remove
Definition: pdbgroup.h:115
virtual void onStop() OVERRIDE FINAL
Definition: pdbgroup.cpp:448
virtual void onStart() OVERRIDE FINAL
Definition: pdbgroup.cpp:443
virtual ~PDBGroupMonitor()
Definition: pdbgroup.cpp:427
void addMonitor(PDBGroupMonitor *)
Definition: pdbgroup.cpp:154
std::string name
Definition: pdbgroup.h:88
meta::decorate_const< Info >::type * const_iterator
Definition: sharedVector.h:300
virtual void show(int lvl) OVERRIDE
Definition: pdbgroup.cpp:229
bool interested_iterating
Definition: pdbgroup.h:111
size_t size() const
Number of elements visible through this vector.
Definition: sharedVector.h:220
epics::pvData::BitSetPtr changed
Definition: pdbgroup.h:175
requester_type::weak_pointer requester
Definition: pdbgroup.h:169
void removeMonitor(PDBGroupMonitor *)
Definition: pdbgroup.cpp:193
#define DBE_PROPERTY
Definition: caeventmask.h:42
unsigned dbe_mask
Definition: pvif.h:217
bool post(guard_t &guard, const epics::pvData::BitSet &updated, no_overflow)
post update if queue not full, if full return false w/o overflow
Definition: pvahelper.h:136
unsigned index
Definition: pvif.h:219
virtual void lock()
Definition: pvAccess.h:97
shared_pointer shared_from_this()
Definition: pdbgroup.h:77
static size_t num_instances
Definition: pdbgroup.h:119
Class that holds the data for each possible scalar type.
Definition: pvData.h:54
Definition: pvif.h:214
PDBGroupMonitor(const PDBGroupPV::shared_pointer &pv, const requester_type::weak_pointer &requester, const epics::pvData::PVStructure::shared_pointer &pvReq)
Definition: pdbgroup.cpp:418
FieldName attachment
Definition: pdbgroup.h:93
DBEvent evt_VALUE
Definition: pdbgroup.h:98
Definition: pvif.h:355
triggers_t triggers
Definition: pdbgroup.h:95
virtual ~PDBGroupPut()
Definition: pdbgroup.cpp:341
void finalizeMonitor()
Definition: pdbgroup.cpp:211
bool pgatomic
Definition: pdbgroup.h:85
const requester_type::weak_pointer requester
Definition: pvahelper.h:35
bool atomic
Definition: pdbgroup.h:172
epicsGuard< epicsMutex > Guard
Definition: pdbgroup.cpp:28
bool had_initial_VALUE
Definition: pdbgroup.h:99
virtual void requestUpdate() OVERRIDE FINAL
Definition: pdbgroup.cpp:453
virtual ~PDBGroupChannel()
Definition: pdbgroup.cpp:259
interested_t interested
Definition: pdbgroup.h:112
std::tr1::shared_ptr< PVIFBuilder > builder
Definition: pdbgroup.h:92
static size_t num_instances
Definition: pdbgroup.h:204
virtual void destroy()
Definition: pvahelper.h:273
DBEvent evt_PROPERTY
Definition: pdbgroup.h:98
epicsMutex & lock
Definition: pvahelper.h:77
#define false
Definition: flexdef.h:85
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
const epics::pvData::StructureConstPtr fielddesc
Definition: pvahelper.h:36
virtual ~PDBGroupPV()
Definition: pdbgroup.cpp:131
virtual void put(epics::pvData::PVStructure::shared_pointer const &pvPutStructure, epics::pvData::BitSet::shared_pointer const &putBitSet) OVERRIDE FINAL
Definition: pdbgroup.cpp:346
PVIF::proc_t doProc
Definition: pdbgroup.h:173
epicsMutex lock
Definition: pdbgroup.h:83