pvalink_channel.cpp

#include <alarm.h>

#include <pv/reftrack.h>

#include "pvalink.h"

int pvaLinkNWorkers = 1;

namespace pvalink {

pvaGlobal_t *pvaGlobal;

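// pvaGlobal_t is the process-wide singleton shared by every PVA link.  It
// owns the pvData factory ('create') and the "PVAL" work queue on which
// pvaLinkChannel::run() executes; the lock, channel cache, and client
// providers used below are presumably further members declared in pvalink.h.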
pvaGlobal_t::pvaGlobal_t()
    :create(pvd::getPVDataCreate())
    ,queue("PVAL")
    ,running(false)
{
    // worker should be above PVA worker priority?
    queue.start(std::max(1, pvaLinkNWorkers), epicsThreadPriorityMedium);
}

pvaGlobal_t::~pvaGlobal_t()
{
}

size_t pvaLinkChannel::num_instances;
size_t pvaLink::num_instances;

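// Comparator for the per-channel 'links' container: order pvaLinks by their
// 'monorder' value, falling back to pointer comparison so that links sharing
// a monorder still have a deterministic strict weak ordering.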
bool pvaLinkChannel::LinkSort::operator()(const pvaLink *L, const pvaLink *R) const {
    if(L->monorder==R->monorder)
        return L < R;
    return L->monorder < R->monorder;
}

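// A pvaLinkChannel is the shared client-side state for all record links that
// resolve to the same channel cache key (PV name and, presumably, pvRequest);
// the destructor below removes it from pvaGlobal->channels again.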
// being called with pvaGlobal::lock held
pvaLinkChannel::pvaLinkChannel(const pvaGlobal_t::channels_key_t &key, const pvd::PVStructure::const_shared_pointer& pvRequest)
    :key(key)
    ,pvRequest(pvRequest)
    ,num_disconnect(0u)
    ,num_type_change(0u)
    ,connected(false)
    ,connected_latched(false)
    ,isatomic(false)
    ,queued(false)
    ,debug(false)
    ,links_changed(false)
{}

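// Drop out of the global channel cache first (under pvaGlobal->lock), then
// verify under our own lock that every pvaLink has already detached before
// decrementing the reftrack instance counter.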
pvaLinkChannel::~pvaLinkChannel() {
    {
        Guard G(pvaGlobal->lock);
        pvaGlobal->channels.erase(key);
    }

    Guard G(lock);

    assert(links.empty());
    REFTRACE_DECREMENT(num_instances);
}

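// Establish the client side of the link: try the in-process (PDB/QSRV)
// provider first, fall back to the remote PVA provider unless isolation is
// requested, then start the monitor subscription whose updates drive run().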
void pvaLinkChannel::open()
{
    Guard G(lock);

    try {
        chan = pvaGlobal->provider_local.connect(key.first);
        DEBUG(this, <<key.first<<" OPEN Local");
        providerName = pvaGlobal->provider_local.name();
    } catch(std::exception& e){
        // The PDBProvider doesn't have a way to communicate to us
        // whether this is an invalid record or group name,
        // or if this is some sort of internal error.
        // So we are forced to assume it is an invalid name.
        DEBUG(this, <<key.first<<" OPEN Not local "<<e.what());
    }
    if(!pvaLinkIsolate && !chan) {
        chan = pvaGlobal->provider_remote.connect(key.first);
        DEBUG(this, <<key.first<<" OPEN Remote ");
        providerName = pvaGlobal->provider_remote.name();
    }

    op_mon = chan.monitor(this, pvRequest);

    REFTRACE_INCREMENT(num_instances);
}

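// Prototype pvRequest used for network puts.  The empty "field" substructure
// selects all fields, while "record._options.block" and
// "record._options.process" are filled in by put() below; in pvRequest text
// notation this is roughly "record[block=false,process=...]field()".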
static
pvd::StructureConstPtr putRequestType = pvd::getFieldCreate()->createFieldBuilder()
        ->addNestedStructure("field")
        ->endNested()
        ->addNestedStructure("record")
            ->addNestedStructure("_options")
                ->add("block", pvd::pvBoolean)
                ->add("process", pvd::pvString) // "true", "false", or "passive"
            ->endNested()
        ->endNested()
        ->createStructure();

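// Move each link's scratch value into its put queue and, if anything is now
// pending (or 'force' is set), start a network put.  The links' PP/NPP/CP/CPP
// modes decide whether the remote record is asked to process.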
// call with channel lock held
void pvaLinkChannel::put(bool force)
{
    pvd::PVStructurePtr pvReq(pvd::getPVDataCreate()->createPVStructure(putRequestType));
    pvReq->getSubFieldT<pvd::PVBoolean>("record._options.block")->put(false); // TODO: some way to expose completion...

    unsigned reqProcess = 0;
    bool doit = force;
    for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
    {
        pvaLink *link = *it;

        if(!link->used_scratch) continue;

        pvd::shared_vector<const void> temp;
        temp.swap(link->put_scratch);
        link->used_scratch = false;
        temp.swap(link->put_queue);
        link->used_queue = true;

        doit = true;

        switch(link->pp) {
        case pvaLink::NPP:
            reqProcess |= 1;
            break;
        case pvaLink::Default:
            break;
        case pvaLink::PP:
        case pvaLink::CP:
        case pvaLink::CPP:
            reqProcess |= 2;
            break;
        }
    }

    /* By default, use remote default (passive).
     * Request processing, or not, if any link asks.
     * Prefer PP over NPP if both are specified.
     *
     * TODO: per field granularity?
     */
    const char *proc = "passive";
    if((reqProcess&2) || force) {
        proc = "true";
    } else if(reqProcess&1) {
        proc = "false";
    }
    pvReq->getSubFieldT<pvd::PVString>("record._options.process")->put(proc);

    DEBUG(this, <<key.first<<"Start put "<<doit);
    if(doit) {
        // start net Put, cancels in-progress put
        op_put = chan.put(this, pvReq);
    }
}

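// pvac put callback, invoked once the server-side Structure is known: build
// the PVStructure to send, copy each link's queued DBF data into the
// addressed sub-field (drilling into a "value" sub-field of NTScalar-like
// structures), and record the bits to transmit in args.tosend.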
void pvaLinkChannel::putBuild(const epics::pvData::StructureConstPtr& build, pvac::ClientChannel::PutCallback::Args& args)
{
    Guard G(lock);

    pvd::PVStructurePtr top(pvaGlobal->create->createPVStructure(build));

    for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
    {
        pvaLink *link = *it;

        if(!link->used_queue) continue;
        link->used_queue = false; // clear early so unexpected exception won't get us in a retry loop

        pvd::PVFieldPtr value(link->fieldName.empty() ? pvd::PVFieldPtr(top) : top->getSubField(link->fieldName));
        if(value && value->getField()->getType()==pvd::structure) {
            // maybe drill into NTScalar et al.
            pvd::PVFieldPtr sub(static_cast<pvd::PVStructure*>(value.get())->getSubField("value"));
            if(sub)
                value.swap(sub);
        }

        if(!value) continue; // TODO: how to signal error?

        pvd::PVStringArray::const_svector choices; // TODO populate from op_mon

        DEBUG(this, <<key.first<<" <- "<<value->getFullName());
        copyDBF2PVD(link->put_queue, value, args.tosend, choices);

        link->put_queue.clear();
    }
    DEBUG(this, <<key.first<<" Put built");

    args.root = top;
}

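// Completion callback for a network put: log failures, release the operation
// handle, and immediately start another put if more data was queued while
// this one was in flight.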
void pvaLinkChannel::putDone(const pvac::PutEvent& evt)
{
    if(evt.event==pvac::PutEvent::Fail) {
        errlogPrintf("%s PVA link put ERROR: %s\n", key.first.c_str(), evt.message.c_str());
    }

    Guard G(lock);

    DEBUG(this, <<key.first<<" Put result "<<evt.event);

    op_put = pvac::Operation();

    if(evt.event==pvac::PutEvent::Success) {
        // see if we need start a queue'd put
        put();
    }
}

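// Monitor callback from the pvac client: update 'connected' for
// Data/Disconnect/Fail events and schedule run() on the shared work queue,
// taking care not to queue this channel more than once at a time.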
void pvaLinkChannel::monitorEvent(const pvac::MonitorEvent& evt)
{
    bool queue = false;

    {
        DEBUG(this, <<key.first<<" EVENT "<<evt.event);
        Guard G(lock);

        switch(evt.event) {
        case pvac::MonitorEvent::Disconnect:
        case pvac::MonitorEvent::Data:
            connected = evt.event == pvac::MonitorEvent::Data;
            queue = true;
            break;
        case pvac::MonitorEvent::Cancel:
            break; // no-op
        case pvac::MonitorEvent::Fail:
            connected = false;
            queue = true;
            errlogPrintf("%s: PVA link monitor ERROR: %s\n", chan.name().c_str(), evt.message.c_str());
            break;
        }

        if(queued)
            return; // already scheduled

        queued = queue;
    }

    if(queue) {
        pvaGlobal->queue.add(shared_from_this());
    }
}

// the work in calling dbProcess() which is common to
// both dbScanLock() and dbScanLockMany()
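// A record is skipped when its link asked for passive-only scanning but the
// record is not SCAN=Passive, or (while connected) when the update did not
// change any field that link cares about.  A record found already active
// (PACT) is marked for reprocessing (RPRO) before dbProcess() is called.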
void pvaLinkChannel::run_dbProcess(size_t idx)
{
    dbCommon *precord = scan_records[idx];

    if(scan_check_passive[idx] && precord->scan!=0) {
        return;

    } else if(connected_latched && !op_mon.changed.logical_and(scan_changed[idx])) {
        return;

    } else if (precord->pact) {
        if (precord->tpro)
            printf("%s: Active %s\n",
                   epicsThreadGetNameSelf(), precord->name);
        precord->rpro = TRUE;

    }
    dbProcess(precord);
}

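// Consume one queued monitor update: latch the connection state, tell the
// links about disconnects or type changes, rebuild the cached scan list when
// links have been added or removed, process the affected records, and
// re-queue this channel while more updates remain in the monitor queue.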
// Running from global WorkQueue thread
void pvaLinkChannel::run()
{
    bool requeue = false;
    {
        Guard G(lock);

        queued = false;

        connected_latched = connected;

        // pop next update from monitor queue.
        // still under lock to safeguard concurrent calls to lset functions
        if(connected && !op_mon.poll()) {
            DEBUG(this, <<key.first<<" RUN "<<"empty");
            run_done.signal();
            return; // monitor queue is empty, nothing more to do here
        }

        DEBUG(this, <<key.first<<" RUN "<<(connected_latched?"connected":"disconnected"));

        assert(!connected || !!op_mon.root);

        if(!connected) {
            num_disconnect++;

            // cancel pending put operations
            op_put = pvac::Operation();

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                link->onDisconnect();
            }

            // Don't clear previous_root on disconnect.
            // We will usually re-connect with the same type,
            // and may get back the same PVStructure.

        } else if(previous_root.get() != (const void*)op_mon.root.get()) {
            num_type_change++;

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                link->onTypeChange();
            }

            previous_root = std::tr1::static_pointer_cast<const void>(op_mon.root);
        }

        // at this point we know we will re-queue, but not immediately
        // so an expected error won't get us stuck in a tight loop.
        requeue = queued = connected_latched;

        if(links_changed) {
            // a link has been added or removed since the last update.
            // rebuild our cached list of records to (maybe) process.

            scan_records.clear();
            scan_check_passive.clear();
            scan_changed.clear();

            for(links_t::iterator it(links.begin()), end(links.end()); it!=end; ++it)
            {
                pvaLink *link = *it;
                assert(link && link->alive);

                if(!link->plink) continue;

                // NPP and none/Default don't scan
                // PP, CP, and CPP do scan
                // PP and CPP only if SCAN=Passive
                if(link->pp != pvaLink::PP && link->pp != pvaLink::CPP && link->pp != pvaLink::CP)
                    continue;

                scan_records.push_back(link->plink->precord);
                scan_check_passive.push_back(link->pp != pvaLink::CP);
                scan_changed.push_back(link->proc_changed);
            }

            DBManyLock ML(scan_records);

            atomic_lock.swap(ML);

            links_changed = false;
        }
    }

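    // Process the gathered records outside of the channel lock: for an atomic
    // group spanning more than one record take the prepared DBManyLock in one
    // go, otherwise lock and process each record individually.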
    if(scan_records.empty()) {
        // Nothing to do, so don't bother locking

    } else if(isatomic && scan_records.size() > 1u) {
        DBManyLocker L(atomic_lock);

        for(size_t i=0, N=scan_records.size(); i<N; i++) {
            run_dbProcess(i);
        }

    } else {
        for(size_t i=0, N=scan_records.size(); i<N; i++) {
            DBScanLocker L(scan_records[i]);
            run_dbProcess(i);
        }
    }

    if(requeue) {
        // re-queue until monitor queue is empty
        pvaGlobal->queue.add(shared_from_this());
    } else {
        run_done.signal();
    }
}

} // namespace pvalink