sharedstate_pv.cpp
/*
 * Copyright information and license terms for this software can be
 * found in the file LICENSE that is included with the distribution
 */

#include <list>

#include <epicsMutex.h>
#include <epicsGuard.h>
#include <errlog.h>

#include <shareLib.h>
#include <pv/sharedPtr.h>
#include <pv/noDefaultMethods.h>
#include <pv/sharedVector.h>
#include <pv/bitSet.h>
#include <pv/pvData.h>
#include <pv/createRequest.h>
#include <pv/status.h>
#include <pv/reftrack.h>

#define epicsExportSharedSymbols
#include "sharedstateimpl.h"


namespace {
struct MailboxHandler : public pvas::SharedPV::Handler {
    virtual ~MailboxHandler() {}
    virtual void onPut(const pvas::SharedPV::shared_pointer& self, pvas::Operation& op) OVERRIDE FINAL
    {
        self->post(op.value(), op.changed());
        op.complete();
    }
    static std::tr1::shared_ptr<pvas::SharedPV::Handler> build() {
        std::tr1::shared_ptr<MailboxHandler> ret(new MailboxHandler);
        return ret;
    }
};
} // namespace
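// Illustrative sketch (not part of this file): a custom Handler follows the
// same pattern as MailboxHandler above. "ClampHandler" is a hypothetical name;
// the out-of-range check is a placeholder for application logic.
//
//   struct ClampHandler : public pvas::SharedPV::Handler {
//       virtual ~ClampHandler() {}
//       virtual void onPut(const pvas::SharedPV::shared_pointer& self,
//                          pvas::Operation& op) OVERRIDE FINAL
//       {
//           // inspect op.value() and op.changed() here, then either accept:
//           self->post(op.value(), op.changed());
//           op.complete();
//           // ...or reject: op.complete(pvd::Status::error("value out of range"));
//       }
//   };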

namespace pvas {

SharedPV::Handler::~Handler() {}

void SharedPV::Handler::onPut(const SharedPV::shared_pointer& pv, Operation& op)
{
    op.complete(pvd::Status::error("Put not supported"));
}

void SharedPV::Handler::onRPC(const SharedPV::shared_pointer& pv, Operation& op)
{
    op.complete(pvd::Status::error("RPC not supported"));
}

SharedPV::Config::Config()
    :dropEmptyUpdates(true)
    ,mapperMode(pvd::PVRequestMapper::Mask)
{}

size_t SharedPV::num_instances;

SharedPV::shared_pointer SharedPV::build(const std::tr1::shared_ptr<Handler>& handler, Config *conf)
{
    assert(!!handler);
    SharedPV::shared_pointer ret(new SharedPV(handler, conf));
    ret->internal_self = ret;
    return ret;
}

SharedPV::shared_pointer SharedPV::buildReadOnly(Config *conf)
{
    SharedPV::shared_pointer ret(new SharedPV(std::tr1::shared_ptr<Handler>(), conf));
    ret->internal_self = ret;
    return ret;
}

SharedPV::shared_pointer SharedPV::buildMailbox(pvas::SharedPV::Config *conf)
{
    std::tr1::shared_ptr<Handler> handler(new MailboxHandler);
    SharedPV::shared_pointer ret(new SharedPV(handler, conf));
    ret->internal_self = ret;
    return ret;
}
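// Usage sketch (illustrative, not part of this file): a server typically
// builds a SharedPV and registers it with a provider. This assumes
// pvas::StaticProvider from pva/server.h in pvAccessCPP; the PV name is
// arbitrary.
//
//   pvas::SharedPV::shared_pointer pv(pvas::SharedPV::buildMailbox());
//   pvas::StaticProvider provider("example");
//   provider.add("demo:pv", pv);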

SharedPV::SharedPV(const std::tr1::shared_ptr<Handler> &handler, pvas::SharedPV::Config *conf)
    :config(conf ? *conf : Config())
    ,handler(handler)
    ,notifiedConn(false)
    ,debugLvl(0)
{
    REFTRACE_INCREMENT(num_instances);
}

SharedPV::~SharedPV() {
    close();
    REFTRACE_DECREMENT(num_instances);
}

void SharedPV::setHandler(const std::tr1::shared_ptr<Handler>& handler)
{
    Guard G(mutex);
    this->handler = handler;
}

SharedPV::Handler::shared_pointer SharedPV::getHandler() const
{
    Guard G(mutex);
    return handler;
}


bool SharedPV::isOpen() const
{
    Guard G(mutex);
    return !!type;
}

namespace {
struct PutInfo { // oh to be able to use std::tuple ...
    std::tr1::shared_ptr<detail::SharedPut> put;
    pvd::StructureConstPtr type;
    pvd::Status status;
    PutInfo(const std::tr1::shared_ptr<detail::SharedPut>& put, const pvd::StructureConstPtr& type, const pvd::Status& status)
        :put(put), type(type), status(status)
    {}
    PutInfo(const std::tr1::shared_ptr<detail::SharedPut>& put, const pvd::StructureConstPtr& type, const std::string& message)
        :put(put), type(type)
    {
        if(!message.empty())
            status = pvd::Status::warn(message);
    }
};
}

void SharedPV::open(const pvd::PVStructure& value, const pvd::BitSet& valid)
{
    typedef std::vector<PutInfo> xputs_t;
    typedef std::vector<std::tr1::shared_ptr<detail::SharedRPC> > xrpcs_t;
    typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
    typedef std::vector<std::tr1::shared_ptr<pva::GetFieldRequester> > xgetfields_t;

    const pvd::StructureConstPtr newtype(value.getStructure());
    pvd::PVStructurePtr newvalue(pvd::getPVDataCreate()->createPVStructure(newtype));
    newvalue->copyUnchecked(value, valid);

    xputs_t p_put;
    xrpcs_t p_rpc;
    xmonitors_t p_monitor;
    xgetfields_t p_getfield;
    {
        Guard I(mutex);

        if(type)
            throw std::logic_error("Already open()");

        p_put.reserve(puts.size());
        p_rpc.reserve(rpcs.size());
        p_monitor.reserve(monitors.size());
        p_getfield.reserve(getfields.size());

        type = newtype;
        current = newvalue;
        this->valid = valid;

        FOR_EACH(puts_t::const_iterator, it, end, puts) {
            if((*it)->channel->dead) continue;
            try {
                try {
                    (*it)->mapper.compute(*current, *(*it)->pvRequest, config.mapperMode);
                    p_put.push_back(PutInfo((*it)->shared_from_this(), (*it)->mapper.requested(), (*it)->mapper.warnings()));
                }catch(std::runtime_error& e) {
                    // compute() error
                    p_put.push_back(PutInfo((*it)->shared_from_this(), pvd::StructureConstPtr(), pvd::Status::error(e.what())));
                }
            }catch(std::tr1::bad_weak_ptr&) {
                // racing destruction
            }
        }
        FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
            if((*it)->connected || (*it)->channel->dead) continue;
            try {
                p_rpc.push_back((*it)->shared_from_this());
            }catch(std::tr1::bad_weak_ptr&) {}
        }
        FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
            if((*it)->channel->dead) continue;
            try {
                (*it)->open(newtype);
                // post initial update
                (*it)->post(*current, valid);
                p_monitor.push_back((*it)->shared_from_this());
            }catch(std::tr1::bad_weak_ptr&) {}
        }
        // consume getField
        FOR_EACH(getfields_t::iterator, it, end, getfields) {
            // TODO: this may be on a dead Channel
            p_getfield.push_back(it->lock());
        }
        getfields.clear(); // consume
    }
    // unlock for callbacks
    FOR_EACH(xputs_t::iterator, it, end, p_put) {
        detail::SharedPut::requester_type::shared_pointer requester(it->put->requester.lock());
        if(requester) {
            if(it->status.getType()==pvd::Status::STATUSTYPE_WARNING)
                requester->message(it->status.getMessage(), pvd::warningMessage);
            requester->channelPutConnect(it->status, it->put, it->type);
        }
    }
    FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) {
        detail::SharedRPC::requester_type::shared_pointer requester((*it)->requester.lock());
        if(requester) requester->channelRPCConnect(pvd::Status(), *it);
    }
    FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) {
        (*it)->notify();
    }
    FOR_EACH(xgetfields_t::iterator, it, end, p_getfield) {
        if(*it) (*it)->getDone(pvd::Status(), newtype);
    }
}

void SharedPV::open(const pvd::PVStructure& value)
{
    // consider all fields to have non-default values, for users who don't keep track of this.
    open(value, pvd::BitSet().set(0));
}

void SharedPV::open(const pvd::StructureConstPtr& type)
{
    pvd::PVStructurePtr value(pvd::getPVDataCreate()->createPVStructure(type));
    open(*value);
}
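// Usage sketch (illustrative, not part of this file): after building a
// SharedPV, a server defines its type and marks it open. FieldBuilder is
// from pvDataCPP; the field name "value" is arbitrary here.
//
//   pvd::StructureConstPtr type(pvd::getFieldCreate()->createFieldBuilder()
//                               ->add("value", pvd::pvDouble)
//                               ->createStructure());
//   pv->open(type); // initial value is the type's default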

void SharedPV::realClose(bool destroy, bool closing, const epics::pvAccess::ChannelProvider* provider)
{
    typedef std::vector<std::tr1::shared_ptr<pva::ChannelPutRequester> > xputs_t;
    typedef std::vector<std::tr1::shared_ptr<pva::ChannelRPCRequester> > xrpcs_t;
    typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
    typedef std::vector<std::tr1::shared_ptr<detail::SharedChannel> > xchannels_t;

    xputs_t p_put;
    xrpcs_t p_rpc;
    xmonitors_t p_monitor;
    xchannels_t p_channel;
    Handler::shared_pointer p_handler;
    {
        Guard I(mutex);

        FOR_EACH(rpcs_t::const_iterator, it, end, rpcs) {
            if(!(*it)->connected) continue;
            p_rpc.push_back((*it)->requester.lock());
        }

        if(type) {

            p_put.reserve(puts.size());
            p_rpc.reserve(rpcs.size());
            p_monitor.reserve(monitors.size());
            p_channel.reserve(channels.size());

            FOR_EACH(puts_t::const_iterator, it, end, puts) {
                if(provider && (*it)->channel->provider.lock().get()!=provider)
                    continue;
                (*it)->mapper.reset();
                p_put.push_back((*it)->requester.lock());
            }
            FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
                (*it)->close();
                try {
                    p_monitor.push_back((*it)->shared_from_this());
                }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
            }
            FOR_EACH(channels_t::const_iterator, it, end, channels) {
                try {
                    p_channel.push_back((*it)->shared_from_this());
                }catch(std::tr1::bad_weak_ptr&) { /* ignore, racing dtor */ }
            }

            if(closing) {
                type.reset();
                current.reset();
            }
        }

        if(destroy) {
            // forget about all clients, to prevent the possibility of our
            // sending a second destroy notification.
            puts.clear();
            rpcs.clear();
            monitors.clear();
            if(!channels.empty() && notifiedConn) {
                p_handler = handler;
                notifiedConn = false;
            }
            channels.clear();
        }
    }
    FOR_EACH(xputs_t::iterator, it, end, p_put) {
        if(*it) (*it)->channelDisconnect(destroy);
    }
    FOR_EACH(xrpcs_t::iterator, it, end, p_rpc) {
        if(*it) (*it)->channelDisconnect(destroy);
    }
    FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) {
        (*it)->notify();
    }
    FOR_EACH(xchannels_t::iterator, it, end, p_channel) {
        pva::ChannelRequester::shared_pointer req((*it)->requester.lock());
        if(!req) continue;
        req->channelStateChange(*it, destroy ? pva::Channel::DESTROYED : pva::Channel::DISCONNECTED);
    }
    if(p_handler) {
        shared_pointer self(internal_self);
        p_handler->onLastDisconnect(self);
    }
}

pvd::PVStructure::shared_pointer SharedPV::build()
{
    Guard G(mutex);
    if(!type)
        throw std::logic_error("Can't build() before open()");
    return pvd::getPVDataCreate()->createPVStructure(type);
}

void SharedPV::post(const pvd::PVStructure& value,
                    const pvd::BitSet& changed)
{
    typedef std::vector<std::tr1::shared_ptr<pva::MonitorFIFO> > xmonitors_t;
    xmonitors_t p_monitor;
    {
        Guard I(mutex);

        if(!type)
            throw std::logic_error("Not open()");
        else if(*type!=*value.getStructure())
            throw std::logic_error("Type mis-match");

        if(current) {
            current->copyUnchecked(value, changed);
            valid |= changed;
        }

        p_monitor.reserve(monitors.size()); // ick, for lack of a list with thread-safe iteration

        FOR_EACH(monitors_t::const_iterator, it, end, monitors) {
            (*it)->post(value, changed);
            p_monitor.push_back((*it)->shared_from_this());
        }
    }
    FOR_EACH(xmonitors_t::iterator, it, end, p_monitor) {
        (*it)->notify();
    }
}
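// Usage sketch (illustrative, not part of this file): post() publishes a
// partial update to all monitors. build() allocates a scratch structure of
// the open()'d type; the "value" field name assumes the type sketched above.
//
//   pvd::PVStructurePtr scratch(pv->build());
//   pvd::PVDoublePtr fld(scratch->getSubFieldT<pvd::PVDouble>("value"));
//   fld->put(42.0);
//   pvd::BitSet changed;
//   changed.set(fld->getFieldOffset()); // mark only this field as changed
//   pv->post(*scratch, changed);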

void SharedPV::fetch(pvd::PVStructure& value, pvd::BitSet& valid)
{
    Guard I(mutex);
    if(!type)
        throw std::logic_error("Not open()");
    else if(value.getStructure()!=type)
        throw std::logic_error("Types do not match");

    value.copy(*current);
    valid = this->valid;
}
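// Usage sketch (illustrative, not part of this file): fetch() copies out the
// current value, which is the open() value plus accumulated post() updates,
// along with the set of fields marked valid so far.
//
//   pvd::PVStructurePtr snap(pv->build());
//   pvd::BitSet validFields;
//   pv->fetch(*snap, validFields);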


std::tr1::shared_ptr<pva::Channel>
SharedPV::connect(const std::tr1::shared_ptr<epics::pvAccess::ChannelProvider> &provider,
                  const std::string &channelName,
                  const std::tr1::shared_ptr<pva::ChannelRequester>& requester)
{
    shared_pointer self(internal_self);
    std::tr1::shared_ptr<detail::SharedChannel> ret(new detail::SharedChannel(self, provider, channelName, requester));
    return ret;
}

void
SharedPV::disconnect(bool destroy, const epics::pvAccess::ChannelProvider* provider)
{
    realClose(destroy, false, provider);
}

void SharedPV::setDebug(int lvl)
{
    Guard G(mutex);
    debugLvl = lvl;
}

int SharedPV::isDebug() const
{
    Guard G(mutex);
    return debugLvl;
}

} // namespace pvas