This is Unofficial EPICS BASE Doxygen Site
clientSync.cpp
Go to the documentation of this file.
1 /*
2  * Copyright information and license terms for this software can be
3  * found in the file LICENSE that is included with the distribution
4  */
5 
6 #include <epicsMutex.h>
7 #include <epicsGuard.h>
8 #include <epicsEvent.h>
9 
10 #include <pv/current_function.h>
11 #include <pv/pvData.h>
12 #include <pv/bitSet.h>
13 #include <pv/epicsException.h>
14 
15 #define epicsExportSharedSymbols
16 #include "pv/logger.h"
17 #include "pva/client.h"
18 #include "pv/pvAccess.h"
19 
20 namespace pvd = epics::pvData;
21 namespace pva = epics::pvAccess;
24 
25 namespace {
26 struct WaitCommon
27 {
29  epicsEvent event;
30  bool done;
31 
32  WaitCommon() :done(false) {}
33  void wait(double timeout)
34  {
35  Guard G(mutex);
36  while(!done) {
37  UnGuard U(G);
38  if(!event.wait(timeout)) {
39  throw pvac::Timeout();
40  }
41  }
42  }
43 };
44 
45 struct GetWait : public pvac::ClientChannel::GetCallback,
46  public WaitCommon
47 {
49 
50  GetWait() {}
51  virtual ~GetWait() {}
52  virtual void getDone(const pvac::GetEvent& evt) OVERRIDE FINAL
53  {
54  {
55  Guard G(mutex);
56  if(done) {
57  LOG(pva::logLevelWarn, "oops, double event to GetCallback");
58  } else {
59  result = evt;
60  done = true;
61  }
62  }
63  event.signal();
64  }
65 };
66 } //namespace
67 
68 namespace pvac {
69 
70 pvd::PVStructure::const_shared_pointer
72  pvd::PVStructure::const_shared_pointer pvRequest)
73 {
74  GetWait waiter;
75  {
76  Operation op(get(&waiter, pvRequest));
77  waiter.wait(timeout);
78  }
79  switch(waiter.result.event) {
80  case GetEvent::Success:
81  return waiter.result.value;
82  case GetEvent::Fail:
83  throw std::runtime_error(waiter.result.message);
84  default:
85  case GetEvent::Cancel: // cancel implies timeout, which should already be thrown
86  THROW_EXCEPTION2(std::logic_error, "Cancelled!?!?");
87  }
88 }
89 
90 pvd::PVStructure::const_shared_pointer
91 pvac::ClientChannel::rpc(double timeout,
92  const epics::pvData::PVStructure::const_shared_pointer& arguments,
93  epics::pvData::PVStructure::const_shared_pointer pvRequest)
94 {
95  GetWait waiter;
96  Operation op(rpc(&waiter, arguments, pvRequest));
97  {
98  Guard G(waiter.mutex);
99  while(!waiter.done) {
100  UnGuard U(G);
101  if(!waiter.event.wait(timeout)) {
102  op.cancel();
103  throw Timeout();
104  }
105  }
106  }
107  if(waiter.result.event==pvac::GetEvent::Success)
108  return waiter.result.value;
109  else
110  throw std::runtime_error(waiter.result.message);
111 }
112 
113 namespace {
114 struct PutValCommon : public pvac::ClientChannel::PutCallback,
115  public WaitCommon
116 {
118 
119  PutValCommon() {}
120  virtual ~PutValCommon() {}
121 
122  virtual void putDone(const PutEvent& evt) OVERRIDE FINAL
123  {
124  {
125  Guard G(mutex);
126  if(done) {
127  LOG(pva::logLevelWarn, "oops, double event to PutCallback");
128  } else {
129  result = evt;
130  done = true;
131  }
132  }
133  event.signal();
134  }
135 };
136 
137 } //namespace
138 
139 namespace detail {
140 
142  public WaitCommon
143 {
146 
148  :builder(builder)
149  {}
150  virtual ~Exec() {}
151 
152  virtual void putBuild(const epics::pvData::StructureConstPtr& build, Args& args) OVERRIDE FINAL
153  {
155  pvd::PVStructurePtr root(create->createPVStructure(build));
156 
157  for(PutBuilder::scalars_t::const_iterator it = builder.scalars.begin(), end = builder.scalars.end();
158  it!=end; ++it)
159  {
160  if(it->value.empty())
161  continue;
162 
163  pvd::PVFieldPtr fld(root->getSubField(it->name));
164  if(!fld && it->required)
165  throw std::runtime_error(std::string("Server does not have required field ")+it->name);
166  else if(!fld)
167  continue; // !it->required
168 
169  const pvd::FieldConstPtr& ftype(fld->getField());
170  if(ftype->getType()==pvd::union_) {
171  const pvd::Union *utype = static_cast<const pvd::Union*>(ftype.get());
172  pvd::PVUnion *ufld = static_cast<pvd::PVUnion*>(fld.get());
173 
174  if(utype->isVariant()) {
175  pvd::PVScalarPtr scalar(create->createPVScalar(it->value.type()));
176 
177  scalar->putFrom(it->value);
178  ufld->set(scalar);
179 
180  } else {
181  // attempt automagic assignment to descriminating union
182  pvd::int32 idx = utype->guess(pvd::scalar, it->value.type());
183 
184  if(idx==-1)
185  throw std::runtime_error(std::string("Unable to descriminate union field ")+it->name);
186 
187  ufld->select<pvd::PVScalar>(idx)->putFrom(it->value);
188  }
189 
190  } else if(ftype->getType()==pvd::scalar) {
191  static_cast<pvd::PVScalar*>(fld.get())->putFrom(it->value);
192 
193  } else {
194  throw std::runtime_error(std::string("Type mis-match assigning scalar to field ")+it->name);
195 
196  }
197 
198  args.tosend.set(fld->getFieldOffset());
199  }
200 
201  for(PutBuilder::arrays_t::const_iterator it = builder.arrays.begin(), end = builder.arrays.end();
202  it!=end; ++it)
203  {
204  if(it->value.empty())
205  continue;
206 
207  pvd::PVFieldPtr fld(root->getSubField(it->name));
208  if(!fld && it->required)
209  throw std::runtime_error(std::string("Server does not have required field ")+it->name);
210  else if(!fld)
211  continue; // !it->required
212 
213  const pvd::FieldConstPtr& ftype(fld->getField());
214  if(ftype->getType()==pvd::union_) {
215  const pvd::Union *utype = static_cast<const pvd::Union*>(ftype.get());
216  pvd::PVUnion *ufld = static_cast<pvd::PVUnion*>(fld.get());
217 
218  if(utype->isVariant()) {
219  pvd::PVScalarArrayPtr scalar(create->createPVScalarArray(it->value.original_type()));
220 
221  scalar->putFrom(it->value);
222  ufld->set(scalar);
223 
224  } else {
225  // attempt automagic assignment to descriminating union
226  pvd::int32 idx = utype->guess(pvd::scalarArray, it->value.original_type());
227 
228  if(idx==-1)
229  throw std::runtime_error(std::string("Unable to descriminate union field ")+it->name);
230 
231  ufld->select<pvd::PVScalarArray>(idx)->putFrom(it->value);
232  }
233 
234  } else if(ftype->getType()==pvd::scalarArray) {
235  static_cast<pvd::PVScalarArray*>(fld.get())->putFrom(it->value);
236 
237  // TODO
238  } else {
239  throw std::runtime_error(std::string("Type mis-match assigning scalar to field ")+it->name);
240 
241  }
242 
243  args.tosend.set(fld->getFieldOffset());
244  }
245 
246  args.root = root;
247  }
248 
249  virtual void putDone(const PutEvent& evt) OVERRIDE FINAL
250  {
251  {
252  Guard G(mutex);
253  if(done) {
254  LOG(pva::logLevelWarn, "oops, double event to PutCallback");
255  } else {
256  result = evt;
257  done = true;
258  }
259  }
260  event.signal();
261  }
262 };
263 
264 void PutBuilder::exec(double timeout)
265 {
266  Exec work(*this);
267  {
268  Operation op(channel.put(&work, request));
269  work.wait(timeout);
270  }
271  switch(work.result.event) {
272  case PutEvent::Success: return;
273  case PutEvent::Fail:
274  throw std::runtime_error(work.result.message);
275  case PutEvent::Cancel:
276  THROW_EXCEPTION2(std::logic_error, "Cancelled!?!");
277  }
278 }
279 
280 } // namespace detail
281 
283 {
284  const bool ourevent;
285  epicsEvent * const event;
286 
288  bool hadevent;
289 
291 
292  // maintained to ensure we (MonitorCallback) outlive the subscription
294 
296  :ourevent(!event)
297  ,event(ourevent ? new epicsEvent : event)
298  ,hadevent(false)
299  {}
300  virtual ~SImpl()
301  {
302  sub.cancel();
303  if(ourevent)
304  delete event;
305  }
306 
307  virtual void monitorEvent(const MonitorEvent& evt) OVERRIDE FINAL
308  {
309  {
310  Guard G(mutex);
311  last = evt;
312  hadevent = true;
313  }
314  event->signal();
315  }
316 };
317 
318 MonitorSync::MonitorSync(const Monitor& mon, const std::tr1::shared_ptr<SImpl>& simpl)
319  :Monitor(mon.impl)
320  ,simpl(simpl)
321 {
322  simpl->sub = mon;
323  event.event = MonitorEvent::Fail;
324 }
325 
327 }
328 
330 {
331  if(!simpl) throw std::logic_error("No subscription");
332  Guard G(simpl->mutex);
333  event = simpl->last;
334  simpl->last.event = MonitorEvent::Fail;
335  bool ret = simpl->hadevent;
336  simpl->hadevent = false;
337  return ret;
338 }
339 
341 {
342  if(!simpl) throw std::logic_error("No subscription");
343  simpl->event->wait();
344  Guard G(simpl->mutex);
345  event = simpl->last;
346  simpl->last.event = MonitorEvent::Fail;
347  bool ret = simpl->hadevent;
348  simpl->hadevent = false;
349  return ret;
350 }
351 
352 bool MonitorSync::wait(double timeout)
353 {
354  if(!simpl) throw std::logic_error("No subscription");
355  bool ret = simpl->event->wait(timeout);
356  if(ret) {
357  Guard G(simpl->mutex);
358  event = simpl->last;
359  simpl->last.event = MonitorEvent::Fail;
360  ret = simpl->hadevent;
361  simpl->hadevent = false;
362  }
363  return ret;
364 }
365 
367  if(simpl) simpl->event->signal();
368 }
369 
371 ClientChannel::monitor(const epics::pvData::PVStructure::const_shared_pointer &pvRequest,
372  epicsEvent *event)
373 {
374  std::tr1::shared_ptr<MonitorSync::SImpl> simpl(new MonitorSync::SImpl(event));
375  Monitor mon(monitor(simpl.get(), pvRequest));
376  return MonitorSync(mon, simpl);
377 }
378 
379 namespace {
380 
381 
382 struct InfoWait : public pvac::ClientChannel::InfoCallback,
383  public WaitCommon
384 {
386 
387  InfoWait() {}
388  virtual ~InfoWait() {}
389  virtual void infoDone(const pvac::InfoEvent& evt) OVERRIDE FINAL
390  {
391  {
392  Guard G(mutex);
393  if(done) {
394  LOG(pva::logLevelWarn, "oops, double event to InfoCallback");
395  } else {
396  result = evt;
397  done = true;
398  }
399  }
400  event.signal();
401  }
402 };
403 
404 } // namespace
405 
407 ClientChannel::info(double timeout, const std::string& subfld)
408 {
409  InfoWait waiter;
410  {
411  Operation op(info(&waiter, subfld));
412  waiter.wait(timeout);
413  }
414  switch(waiter.result.event) {
415  case InfoEvent::Success:
416  return waiter.result.type;
417  case InfoEvent::Fail:
418  throw std::runtime_error(waiter.result.message);
419  default:
420  case InfoEvent::Cancel: // cancel implies timeout, which should already be thrown
421  THROW_EXCEPTION2(std::logic_error, "Cancelled!?!?");
422  }
423 }
424 
425 
426 }//namespace pvac
Thrown by blocking methods of ClientChannel on operation timeout.
Definition: client.h:247
double timeout
Definition: pvutils.cpp:25
PVScalar is the base class for each scalar field.
Definition: pvData.h:272
This class implements introspection object for a union.
Definition: pvIntrospect.h:866
pvac::PutEvent result
Definition: clientSync.cpp:117
std::string request
Exec(detail::PutBuilder &builder)
Definition: clientSync.cpp:147
#define THROW_EXCEPTION2(TYPE, MSG)
virtual void putBuild(const epics::pvData::StructureConstPtr &build, Args &args) OVERRIDE FINAL
Definition: clientSync.cpp:152
Operation rpc(GetCallback *cb, const epics::pvData::PVStructure::const_shared_pointer &arguments, epics::pvData::PVStructure::const_shared_pointer pvRequest=epics::pvData::PVStructure::const_shared_pointer())
Definition: clientRPC.cpp:158
Information on get/rpc completion.
Definition: client.h:99
callbacks for put()
Definition: client.h:358
SImpl(epicsEvent *event)
Definition: clientSync.cpp:295
std::tr1::shared_ptr< const Structure > StructureConstPtr
Definition: pvIntrospect.h:162
Operation get(GetCallback *cb, epics::pvData::PVStructure::const_shared_pointer pvRequest=epics::pvData::PVStructure::const_shared_pointer())
Definition: clientGet.cpp:151
Handle for monitor subscription.
Definition: client.h:117
callback for get() and rpc()
Definition: client.h:319
#define OVERRIDE
Definition: pvAccess.h:55
Handle for in-progress get/put/rpc operation.
Definition: client.h:50
detail::PutBuilder & builder
Definition: clientSync.cpp:144
std::string message
Check when event==Fail.
Definition: client.h:95
std::tr1::shared_ptr< PVDataCreate > PVDataCreatePtr
Definition: pvData.h:124
Holds all PVA related.
Definition: pvif.h:34
Information on put completion.
Definition: client.h:88
#define LOG(level, format,...)
Definition: logger.h:48
virtual void putDone(const PutEvent &evt) OVERRIDE FINAL
Put operation is complete.
Definition: clientSync.cpp:249
pvData
Definition: monitor.h:428
epicsGuard< epicsMutex > Guard
Definition: clientSync.cpp:22
void cancel()
Immediate cancellation.
epicsMutex mutex
Definition: pvAccess.cpp:71
APIs for the epicsMutex mutual exclusion semaphore.
void cancel()
Definition: client.cpp:136
enum pvac::PutEvent::event_t event
Base class for a scalarArray.
Definition: pvData.h:618
std::tr1::shared_ptr< PVScalar > PVScalarPtr
Definition: pvData.h:77
epicsGuardRelease< epicsMutex > UnGuard
Definition: clientSync.cpp:23
PVUnion has a single subfield.
Definition: pvData.h:940
std::tr1::shared_ptr< const Field > FieldConstPtr
Definition: pvIntrospect.h:137
void set(PVFieldPtr const &value)
Definition: pvData.h:1028
request cancelled before completion
Definition: client.h:92
APIs for the epicsEvent binary semaphore.
std::tr1::shared_ptr< PVStructure > PVStructurePtr
Definition: pvData.h:87
PVFieldPtr select(int32 index)
Definition: PVUnion.cpp:54
Information on monitor subscription/queue change.
Definition: client.h:184
MonitorEvent event
Definition: client.h:231
request ends in failure. Check message
Definition: client.h:91
virtual void monitorEvent(const MonitorEvent &evt) OVERRIDE FINAL
Definition: clientSync.cpp:307
epicsEvent *const event
Definition: clientSync.cpp:285
std::tr1::shared_ptr< PVField > PVFieldPtr
Definition: pvData.h:66
void done(int k)
Definition: antelope.c:77
Operation info(InfoCallback *cb, const std::string &subfld=std::string())
Definition: clientInfo.cpp:99
ChannelPut::shared_pointer op
Definition: pvAccess.cpp:132
Monitor event notification.
Definition: client.h:403
bool isVariant() const
std::tr1::shared_ptr< PVScalarArray > PVScalarArrayPtr
Definition: pvData.h:82
int32 guess(Type t, ScalarType s) const
#define false
Definition: flexdef.h:85
int32_t int32
Definition: pvType.h:83
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
It worked!
Definition: client.h:93
#define FINAL
Definition: pvAccess.h:48
See Client API API.
Definition: client.cpp:30
subscription ends in an error
Definition: client.h:187
Monitor monitor(MonitorCallback *cb, epics::pvData::PVStructure::const_shared_pointer pvRequest=epics::pvData::PVStructure::const_shared_pointer())