This is Unofficial EPICS BASE Doxygen Site
pdbsingle.cpp
Go to the documentation of this file.
1 #include <sstream>
2 
3 #include <string.h>
4 
5 #include <dbAccess.h>
6 #include <dbChannel.h>
7 #include <dbStaticLib.h>
8 #include <errlog.h>
9 #include <dbNotify.h>
10 #include <osiSock.h>
11 #include <epicsAtomic.h>
12 
13 #include <pv/epicsException.h>
14 #include <pv/pvAccess.h>
15 #include <pv/security.h>
16 #include <pv/configuration.h>
17 
18 #include "helper.h"
19 #include "pdbsingle.h"
20 #include "pdb.h"
21 
22 namespace pvd = epics::pvData;
23 namespace pva = epics::pvAccess;
24 
29 
31 
32 static
33 void pdb_single_event(void *user_arg, struct dbChannel *chan,
34  int eventsRemaining, struct db_field_log *pfl)
35 {
36  DBEvent *evt=(DBEvent*)user_arg;
37  try{
38  PDBSinglePV::shared_pointer self(std::tr1::static_pointer_cast<PDBSinglePV>(((PDBSinglePV*)evt->self)->shared_from_this()));
40  {
41  Guard G(self->lock);
42 
43  // we have exclusive use of self->scratch
44  self->scratch.clear();
45  {
46  DBScanLocker L(dbChannelRecord(self->chan));
47  // dbGet() into self->complete
48  self->pvif->put(self->scratch, evt->dbe_mask, pfl);
49  }
50 
51  if(evt->dbe_mask&DBE_PROPERTY)
52  self->hadevent_PROPERTY = true;
53  else
54  self->hadevent_VALUE = true;
55 
56  if(self->hadevent_VALUE && self->hadevent_PROPERTY) {
57  self->interested_iterating = true;
58 
59  FOREACH(PDBSinglePV::interested_t::const_iterator, it, end, self->interested) {
60  PDBSingleMonitor& mon = **it;
61  // from self->complete into monitor queue element
62  mon.post(G, self->scratch); // G unlocked during call
63  }
64 
65  while(!self->interested_add.empty()) {
66  PDBSinglePV::interested_t::iterator first(self->interested_add.begin());
67  self->interested.insert(*first);
68  self->interested_add.erase(first);
69  }
70 
71  temp.swap(self->interested_remove);
72  for(PDBSinglePV::interested_remove_t::iterator it(temp.begin()),
73  end(temp.end()); it != end; ++it)
74  {
75  self->interested.erase(static_cast<PDBSingleMonitor*>(it->get()));
76  }
77 
78  self->interested_iterating = false;
79 
80  self->finalizeMonitor();
81  }
82  }
83 
84  }catch(std::tr1::bad_weak_ptr&){
85  /* We are racing destruction of the PDBSinglePV, but things are ok.
86  * The destructor is running, but has not completed db_cancel_event()
87  * so storage is still valid.
88  * Just do nothing
89  */
90  }catch(std::exception& e){
91  std::cerr<<"Unhandled exception in pdb_single_event(): "<<e.what()<<"\n"
92  <<SHOW_EXCEPTION(e)<<"\n";
93  }
94 }
95 
97  const PDBProvider::shared_pointer& prov)
98  :provider(prov)
99  ,builder(new ScalarBuilder)
100  ,interested_iterating(false)
101  ,evt_VALUE(this)
102  ,evt_PROPERTY(this)
103  ,hadevent_VALUE(false)
104  ,hadevent_PROPERTY(false)
105 {
106  this->chan.swap(chan);
107  fielddesc = std::tr1::static_pointer_cast<const pvd::Structure>(builder->dtype(this->chan));
108 
109  complete = pvd::getPVDataCreate()->createPVStructure(fielddesc);
110  FieldName temp;
111  pvif.reset(builder->attach(this->chan, complete, temp));
112 
113  epics::atomic::increment(num_instances);
114 }
115 
117 {
118  epics::atomic::decrement(num_instances);
119 }
120 
122 {
123  evt_VALUE.create(provider->event_context, this->chan, &pdb_single_event, DBE_VALUE|DBE_ALARM);
124  evt_PROPERTY.create(provider->event_context, this->chan, &pdb_single_event, DBE_PROPERTY);
125 }
126 
127 pva::Channel::shared_pointer
128 PDBSinglePV::connect(const std::tr1::shared_ptr<PDBProvider>& prov,
129  const pva::ChannelRequester::shared_pointer& req)
130 {
131  PDBSingleChannel::shared_pointer ret(new PDBSingleChannel(shared_from_this(), req));
132 
133  ret->cred.update(req);
134 
135  ret->aspvt.add(chan, ret->cred);
136 
137  return ret;
138 }
139 
141 {
142  Guard G(lock);
143  if(interested.empty() && interested_add.empty()) {
144  // first monitor
145  // start subscription
146 
147  hadevent_VALUE = false;
148  hadevent_PROPERTY = false;
149  db_event_enable(evt_VALUE.subscript);
150  db_event_enable(evt_PROPERTY.subscript);
151  db_post_single_event(evt_VALUE.subscript);
152  db_post_single_event(evt_PROPERTY.subscript);
153 
154  } else if(hadevent_VALUE && hadevent_PROPERTY) {
155  // new subscriber and already had initial update
156  mon->post(G);
157  } // else new subscriber, but no initial update. so just wait
158 
160  interested_add.insert(mon);
161  } else {
162  interested.insert(mon);
163  }
164 }
165 
167 {
168  Guard G(lock);
169 
170  if(interested_add.erase(mon)) {
171  // and+remove while iterating. no-op
172 
173  } else if(interested_iterating) {
174  // keep monitor alive while iterating
175  interested_remove.insert(mon->shared_from_this());
176 
177  } else {
178  interested.erase(mon);
179  finalizeMonitor();
180  }
181 }
182 
184 {
186 
187  if(interested.empty()) {
188  db_event_disable(evt_VALUE.subscript);
189  db_event_disable(evt_PROPERTY.subscript);
190  }
191 }
192 
193 PDBSingleChannel::PDBSingleChannel(const PDBSinglePV::shared_pointer& pv,
194  const pva::ChannelRequester::shared_pointer& req)
195  :BaseChannel(dbChannelName(pv->chan), pv->provider, req, pv->fielddesc)
196  ,pv(pv)
197 {
198  assert(!!this->pv);
199  epics::atomic::increment(num_instances);
200 }
201 
203 {
204  epics::atomic::decrement(num_instances);
205 }
206 
207 void PDBSingleChannel::printInfo(std::ostream& out)
208 {
209  if(aspvt.canWrite())
210  out << "RW ";
211  else
212  out << "RO ";
213  out<<(&cred.user[0])<<'@'<<(&cred.host[0]);
214  for(size_t i=0, N=cred.groups.size(); i<N; i++) {
215  out<<", "<<(&cred.groups[i][0]);
216  }
217  out<<"\n";
218 }
219 
220 pva::ChannelPut::shared_pointer
222  pva::ChannelPutRequester::shared_pointer const & requester,
223  pvd::PVStructure::shared_pointer const & pvRequest)
224 {
225  PDBSinglePut::shared_pointer ret(new PDBSinglePut(shared_from_this(), requester, pvRequest));
226  requester->channelPutConnect(pvd::Status(), ret, fielddesc);
227  return ret;
228 }
229 
230 
231 pva::Monitor::shared_pointer
233  pva::MonitorRequester::shared_pointer const & requester,
234  pvd::PVStructure::shared_pointer const & pvRequest)
235 {
236  PDBSingleMonitor::shared_pointer ret(new PDBSingleMonitor(pv->shared_from_this(), requester, pvRequest));
237  ret->weakself = ret;
238  assert(!!pv->complete);
239  guard_t G(pv->lock);
240  ret->connect(G, pv->complete);
241  return ret;
242 }
243 
244 
245 static
246 int single_put_callback(struct processNotify *notify,notifyPutType type)
247 {
248  PDBSinglePut *self = (PDBSinglePut*)notify->usrPvt;
249 
250  if(notify->status!=notifyOK) return 0;
251 
252  // we've previously ensured that wait_changed&DBE_VALUE is true
253 
254  switch(type) {
255  case putDisabledType:
256  return 0;
257  case putFieldType:
258  {
259  DBScanLocker L(notify->chan);
260  self->wait_pvif->get(*self->wait_changed);
261  }
262  break;
263  case putType:
264  self->wait_pvif->get(*self->wait_changed);
265  break;
266  }
267  return 1;
268 }
269 
270 static
271 void single_done_callback(struct processNotify *notify)
272 {
273  PDBSinglePut *self = (PDBSinglePut*)notify->usrPvt;
274  pvd::Status sts;
275 
276  // busy state should be 1 (normal completion) or 2 (if cancel in progress)
277  if(epics::atomic::compareAndSwap(self->notifyBusy, 1, 0)==0) {
278  std::cerr<<"PDBSinglePut dbNotify state error?\n";
279  }
280 
281  switch(notify->status) {
282  case notifyOK:
283  break;
284  case notifyCanceled:
285  return; // skip notification
286  case notifyError:
287  sts = pvd::Status::error("Error in dbNotify");
288  break;
289  case notifyPutDisabled:
290  sts = pvd::Status::error("Put disabled");
291  break;
292  }
293 
294  PDBSinglePut::requester_type::shared_pointer req(self->requester.lock());
295  if(req)
296  req->putDone(sts, self->shared_from_this());
297 }
298 
299 PDBSinglePut::PDBSinglePut(const PDBSingleChannel::shared_pointer &channel,
300  const pva::ChannelPutRequester::shared_pointer &requester,
301  const pvd::PVStructure::shared_pointer &pvReq)
302  :channel(channel)
303  ,requester(requester)
304  ,changed(new pvd::BitSet(channel->fielddesc->getNumberFields()))
305  ,pvf(pvd::getPVDataCreate()->createPVStructure(channel->fielddesc))
306  ,pvif(channel->pv->builder->attach(channel->pv->chan, pvf, FieldName()))
307  ,notifyBusy(0)
308  ,doProc(PVIF::ProcPassive)
309  ,doWait(false)
310 {
311  epics::atomic::increment(num_instances);
312  dbChannel *chan = channel->pv->chan;
313 
314  try {
315  getS<pvd::boolean>(pvReq, "record._options.block", doWait);
316  } catch(std::runtime_error& e) {
317  requester->message(std::string("block= not understood : ")+e.what(), pva::warningMessage);
318  }
319 
320  std::string proccmd;
321  if(getS<std::string>(pvReq, "record._options.process", proccmd)) {
322  if(proccmd=="true") {
324  } else if(proccmd=="false") {
326  doWait = false; // no point in waiting
327  } else if(proccmd=="passive") {
329  } else {
330  requester->message("process= expects: true|false|passive", pva::warningMessage);
331  }
332  }
333 
334  memset((void*)&notify, 0, sizeof(notify));
335  notify.usrPvt = (void*)this;
336  notify.chan = chan;
337  notify.putCallback = &single_put_callback;
338  notify.doneCallback = &single_done_callback;
339 }
340 
342 {
343  cancel();
344  epics::atomic::decrement(num_instances);
345 }
346 
347 void PDBSinglePut::put(pvd::PVStructure::shared_pointer const & value,
348  pvd::BitSet::shared_pointer const & changed)
349 {
350  dbChannel *chan = channel->pv->chan;
351  dbFldDes *fld = dbChannelFldDes(chan);
352 
353  pvd::Status ret;
354  if(!channel->aspvt.canWrite()) {
355  ret = pvd::Status::error("Put not permitted");
356 
357  } else if(dbChannelFieldType(chan)>=DBF_INLINK && dbChannelFieldType(chan)<=DBF_FWDLINK) {
358  try{
359  std::string lval(value->getSubFieldT<pvd::PVScalar>("value")->getAs<std::string>());
360  long status = dbChannelPutField(chan, DBF_STRING, lval.c_str(), 1);
361  if(status)
362  ret = pvd::Status(pvd::Status::error("dbPutField() error"));
363  }catch(std::exception& e) {
364  std::ostringstream strm;
365  strm<<"Failed to put link field "<<dbChannelName(chan)<<"."<<fld->name<<" : "<<e.what()<<"\n";
366  ret = pvd::Status(pvd::Status::error(strm.str()));
367  }
368 
369  } else if(doWait) {
370  // TODO: dbNotify doesn't allow us for force processing
371 
372  // assume value may be a different struct each time
373  p2p::auto_ptr<PVIF> putpvif(channel->pv->builder->attach(channel->pv->chan, value, FieldName()));
374  unsigned mask = putpvif->dbe(*changed);
375 
376  if(mask!=DBE_VALUE) {
377  requester_type::shared_pointer req(requester.lock());
378  if(req)
379  req->message("block=true only supports .value (empty put mask)", pva::warningMessage);
380  }
381 
382  if(epics::atomic::compareAndSwap(notifyBusy, 0, 1)!=0)
383  throw std::logic_error("Previous put() not complete");
384 
385  notify.requestType = (mask&DBE_VALUE) ? putProcessRequest : processRequest;
386 
387  wait_pvif = PTRMOVE(putpvif);
389 
390  dbProcessNotify(&notify);
391 
392  return; // skip notification
393  } else {
394  // assume value may be a different struct each time
395  p2p::auto_ptr<PVIF> putpvif(channel->pv->builder->attach(channel->pv->chan, value, FieldName()));
396  try{
397  DBScanLocker L(chan);
398  putpvif->get(*changed, doProc);
399 
400  }catch(std::runtime_error& e){
401  ret = pvd::Status::error(e.what());
402  }
403  }
404  requester_type::shared_pointer req(requester.lock());
405  if(req)
406  req->putDone(ret, shared_from_this());
407 }
408 
410 {
411  if(epics::atomic::compareAndSwap(notifyBusy, 1, 2)==1) {
412  dbNotifyCancel(&notify);
413  wait_changed.reset();
414  wait_pvif.reset();
415  epics::atomic::set(notifyBusy, 0);
416  }
417 }
418 
420 {
421  changed->clear();
422  {
423  DBScanLocker L(pvif->chan);
425  }
426  //TODO: report unused fields as changed?
427  changed->clear();
428  changed->set(0);
429 
430  requester_type::shared_pointer req(requester.lock());
431  if(req)
432  req->getDone(pvd::Status(), shared_from_this(), pvf, changed);
433 }
434 
435 PDBSingleMonitor::PDBSingleMonitor(const PDBSinglePV::shared_pointer& pv,
436  const requester_t::shared_pointer& requester,
437  const pvd::PVStructure::shared_pointer& pvReq)
438  :BaseMonitor(pv->lock, requester, pvReq)
439  ,pv(pv)
440 {
441  epics::atomic::increment(num_instances);
442 }
443 
445 {
446  destroy();
447  epics::atomic::decrement(num_instances);
448 }
449 
451 {
453 }
454 
456 {
457  pv->addMonitor(this);
458 }
459 
461 {
462  guard_t G(pv->lock);
463 
464  pv->removeMonitor(this);
465 }
466 
468 {
469  guard_t G(pv->lock);
470  post(G);
471 }
bool interested_iterating
Definition: pdbsingle.h:49
virtual void onStart() OVERRIDE FINAL
Definition: pdbsingle.cpp:455
void activate()
Definition: pdbsingle.cpp:121
interested_t interested
Definition: pdbsingle.h:50
virtual void printInfo()
Definition: pvAccess.h:1126
bool canWrite()
Definition: pvif.cpp:153
Definition: link.h:174
PVScalar is the base class for each scalar field.
Definition: pvData.h:272
PDBSingleChannel(const PDBSinglePV::shared_pointer &pv, const epics::pvAccess::ChannelRequester::shared_pointer &req)
Definition: pdbsingle.cpp:193
p2p::auto_ptr< PVIF > pvif
Definition: pdbsingle.h:44
epics::pvData::BitSetPtr changed
Definition: pdbsingle.h:111
void * self
Definition: pvif.h:218
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
dbEventSubscription subscript
Definition: pvif.h:216
static size_t num_instances
Definition: pdbsingle.h:121
shared_pointer shared_from_this()
Definition: pdbsingle.h:27
virtual epics::pvAccess::ChannelPut::shared_pointer createChannelPut(epics::pvAccess::ChannelPutRequester::shared_pointer const &requester, epics::pvData::PVStructure::shared_pointer const &pvRequest) OVERRIDE FINAL
Definition: pdbsingle.cpp:221
#define FOREACH(ITERTYPE, IT, END, C)
Definition: helper.h:6
pvd::Status status
static Status error(const std::string &m)
Definition: status.h:50
int i
Definition: scan.c:967
PDBSingleMonitor(const PDBSinglePV::shared_pointer &pv, const requester_t::shared_pointer &requester, const epics::pvData::PVStructure::shared_pointer &pvReq)
Definition: pdbsingle.cpp:435
virtual ~PDBSingleChannel()
Definition: pdbsingle.cpp:202
PDBSingleChannel::shared_pointer channel
Definition: pdbsingle.h:108
virtual ~PDBSinglePut()
Definition: pdbsingle.cpp:341
interested_remove_t interested_remove
Definition: pdbsingle.h:53
virtual void put(epics::pvData::PVStructure::shared_pointer const &pvPutStructure, epics::pvData::BitSet::shared_pointer const &putBitSet) OVERRIDE FINAL
Definition: pdbsingle.cpp:347
Definition: tool_lib.h:67
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:788
std::vector< char > host
Definition: pvif.h:93
ASCLIENT aspvt
Definition: pdbsingle.h:84
virtual void onStop() OVERRIDE FINAL
Definition: pdbsingle.cpp:460
int notifyBusy
Definition: pdbsingle.h:115
pvd::StructureConstPtr type
bool hadevent_PROPERTY
Definition: pdbsingle.h:56
#define NULL
Definition: catime.c:38
#define DBE_ALARM
Definition: caeventmask.h:41
virtual void get() OVERRIDE FINAL
Definition: pdbsingle.cpp:419
virtual void destroy() OVERRIDE FINAL
Definition: pdbsingle.cpp:450
p2p::auto_ptr< PVIF > pvif
Definition: pdbsingle.h:113
p2p::auto_ptr< PVIF > wait_pvif
Definition: pdbsingle.h:113
void swap(DBCH &)
Definition: pvif.cpp:67
epicsMutex lock
Definition: pdbsingle.h:41
#define SHOW_EXCEPTION(EI)
epicsGuard< epicsMutex > Guard
Definition: pdbsingle.cpp:30
PDBProvider::shared_pointer provider
Definition: pdbsingle.h:35
epics::pvData::StructureConstPtr fielddesc
Definition: pdb.h:19
virtual ~PDBSingleMonitor()
Definition: pdbsingle.cpp:444
#define DBE_VALUE
Definition: caeventmask.h:38
std::vector< std::vector< char > > groups
Definition: pvif.h:94
Holds all PVA related.
Definition: pvif.h:34
void removeMonitor(PDBSingleMonitor *)
Definition: pdbsingle.cpp:166
DBEvent evt_PROPERTY
Definition: pdbsingle.h:55
shared_pointer shared_from_this()
Definition: pvahelper.h:73
requester_t::weak_pointer requester
Definition: pdbsingle.h:109
pvData
Definition: monitor.h:428
This class implements introspection object for a structure.
Definition: pvIntrospect.h:697
virtual epics::pvAccess::Channel::shared_pointer connect(const std::tr1::shared_ptr< PDBProvider > &prov, const epics::pvAccess::ChannelRequester::shared_pointer &req) OVERRIDE FINAL
Definition: pdbsingle.cpp:128
PVIF::proc_t doProc
Definition: pdbsingle.h:118
void create(dbEventCtx ctx, dbChannel *ch, EVENTFUNC *fn, unsigned mask)
Definition: pvif.h:224
static size_t num_instances
Definition: pdbsingle.h:86
epics::pvData::PVStructurePtr complete
Definition: pdbsingle.h:46
static size_t num_instances
Definition: pdbsingle.h:144
p2p::auto_ptr< ScalarBuilder > builder
Definition: pdbsingle.h:43
void addMonitor(PDBSingleMonitor *)
Definition: pdbsingle.cpp:140
DBEvent evt_VALUE
Definition: pdbsingle.h:55
DBCH chan
Definition: pdbsingle.h:34
Definition: pvif.h:72
#define DBE_PROPERTY
Definition: caeventmask.h:42
void finalizeMonitor()
Definition: pdbsingle.cpp:183
unsigned dbe_mask
Definition: pvif.h:217
processNotify notify
Definition: pdbsingle.h:114
interested_t interested_add
Definition: pdbsingle.h:50
virtual epics::pvData::Monitor::shared_pointer createMonitor(epics::pvData::MonitorRequester::shared_pointer const &requester, epics::pvData::PVStructure::shared_pointer const &pvRequest) OVERRIDE FINAL
Definition: pdbsingle.cpp:232
bool post(guard_t &guard, const epics::pvData::BitSet &updated, no_overflow)
post update if queue not full, if full return false w/o overflow
Definition: pvahelper.h:136
virtual void lock()
Definition: pvAccess.h:97
std::set< BaseMonitor::shared_pointer > interested_remove_t
Definition: pdbsingle.h:52
virtual void requestUpdate() OVERRIDE FINAL
Definition: pdbsingle.cpp:467
Definition: pvif.h:214
epics::pvData::PVStructurePtr pvf
Definition: pdbsingle.h:112
Definition: pvif.h:355
static size_t num_instances
Definition: pdbsingle.h:58
epics::pvData::BitSetPtr wait_changed
Definition: pdbsingle.h:111
virtual void cancel() OVERRIDE FINAL
Definition: pdbsingle.cpp:409
const requester_type::weak_pointer requester
Definition: pvahelper.h:35
#define PTRMOVE(AUTO)
Definition: helper.h:15
std::vector< char > user
Definition: pvif.h:93
char * name
Definition: dbBase.h:81
bool hadevent_VALUE
Definition: pdbsingle.h:56
virtual ~PDBSinglePV()
Definition: pdbsingle.cpp:116
virtual void destroy()
Definition: pvahelper.h:273
T getAs() const
Definition: pvData.h:302
#define false
Definition: flexdef.h:85
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
PDBSinglePV(DBCH &chan, const PDBProvider::shared_pointer &prov)
Definition: pdbsingle.cpp:96
const epics::pvData::StructureConstPtr fielddesc
Definition: pvahelper.h:36
PDBSinglePut(const PDBSingleChannel::shared_pointer &channel, const epics::pvAccess::ChannelPutRequester::shared_pointer &requester, const epics::pvData::PVStructure::shared_pointer &pvReq)
Definition: pdbsingle.cpp:299