This is Unofficial EPICS BASE Doxygen Site
pvalink.cpp
Go to the documentation of this file.
1 
2 #include <set>
3 #include <map>
4 
5 #define EPICS_DBCA_PRIVATE_API
6 #include <epicsGuard.h>
7 #include <dbAccess.h>
8 #include <dbCommon.h>
9 #include <dbLink.h>
10 #include <dbScan.h>
11 #include <errlog.h>
12 #include <initHooks.h>
13 #include <alarm.h>
14 #include <epicsExit.h>
15 #include <epicsAtomic.h>
16 #include <link.h>
17 #include <dbJLink.h>
18 #include <epicsUnitTest.h>
19 #include <epicsString.h>
20 
21 #include <epicsStdio.h> /* redirects stdout/stderr */
22 
23 #include <pv/pvAccess.h>
24 #include <pv/clientFactory.h>
25 #include <pv/iocshelper.h>
26 #include <pv/reftrack.h>
27 #include <pva/client.h>
28 
29 #include "pv/qsrv.h"
30 #include "helper.h"
31 #include "pvif.h"
32 #include "pvalink.h"
33 
34 #include <epicsExport.h> /* defines epicsExportSharedSymbols */
35 
38 
39 using namespace pvalink;
40 
41 namespace {
42 
43 // halt, and clear, scan workers before dbCloseLinks() (cf. iocShutdown())
44 static void shutdownStep1()
45 {
46  // no locking here as we assume that shutdown doesn't race startup
47  if(!pvaGlobal) return;
48 
49  pvaGlobal->queue.close();
50 }
51 
52 // Cleanup pvaGlobal, including PVA client and QSRV providers ahead of PDB cleanup
53 // specifically QSRV provider must be free'd prior to db_cleanup_events()
54 static void shutdownStep2()
55 {
56  if(!pvaGlobal) return;
57 
58  {
59  Guard G(pvaGlobal->lock);
60  if(pvaGlobal->channels.size()) {
61  fprintf(stderr, "pvaLink leaves %zu channels open\n",
62  pvaGlobal->channels.size());
63  }
64  }
65 
66  delete pvaGlobal;
67  pvaGlobal = NULL;
68 }
69 
70 static void stopPVAPool(void*)
71 {
72  try {
73  shutdownStep1();
74  }catch(std::exception& e){
75  fprintf(stderr, "Error while stopping PVA link pool : %s\n", e.what());
76  }
77 }
78 
79 static void finalizePVA(void*)
80 {
81  try {
82  shutdownStep2();
83  }catch(std::exception& e){
84  fprintf(stderr, "Error initializing pva link handling : %s\n", e.what());
85  }
86 }
87 
88 bool atexitInstalled;
89 
90 /* The Initialization game...
91  *
92  * # Parse links during dbPutString() (calls our jlif*)
93  * # announce initHookAfterCaLinkInit
94  * # dbChannelInit() (needed for QSRV to work)
95  * # Re-parse links (calls to our jlif*)
96  * # Open links. Calls jlif::get_lset() and then lset::openLink()
97  * # announce initHookAfterInitDatabase
98  * # ... scan threads start ...
99  * # announce initHookAfterIocBuilt
100  */
101 void initPVALink(initHookState state)
102 {
103  try {
104  if(state==initHookAfterCaLinkInit) {
105  // before epicsExit(exitDatabase),
106  // so hook registered here will be run after iocShutdown()
107  // which closes links
108  if(pvaGlobal) {
109  cantProceed("# Missing call to testqsrvShutdownOk() and/or testqsrvCleanup()");
110  }
111  pvaGlobal = new pvaGlobal_t;
112 
113  if(!atexitInstalled) {
114  epicsAtExit(finalizePVA, NULL);
115  atexitInstalled = true;
116  }
117 
118  } else if(state==initHookAfterInitDatabase) {
119  pvac::ClientProvider local("server:QSRV"),
120  remote("pva");
121  pvaGlobal->provider_local = local;
122  pvaGlobal->provider_remote = remote;
123 
124  } else if(state==initHookAfterIocBuilt) {
125  // after epicsExit(exitDatabase)
126  // so hook registered here will be run before iocShutdown()
127  epicsAtExit(stopPVAPool, NULL);
128 
129  Guard G(pvaGlobal->lock);
130  pvaGlobal->running = true;
131 
132  for(pvaGlobal_t::channels_t::iterator it(pvaGlobal->channels.begin()), end(pvaGlobal->channels.end());
133  it != end; ++it)
134  {
135  std::tr1::shared_ptr<pvaLinkChannel> chan(it->second.lock());
136  if(!chan) continue;
137 
138  chan->open();
139  }
140  }
141  }catch(std::exception& e){
142  cantProceed("Error initializing pva link handling : %s\n", e.what());
143  }
144 }
145 
146 } // namespace
147 
148 // halt, and clear, scan workers before dbCloseLinks() (cf. iocShutdown())
150 {
151  try {
152  shutdownStep1();
153  }catch(std::exception& e){
154  testAbort("Error while stopping PVA link pool : %s\n", e.what());
155  }
156 }
157 
158 void testqsrvCleanup(void)
159 {
160  try {
161  shutdownStep2();
162  }catch(std::exception& e){
163  testAbort("Error initializing pva link handling : %s\n", e.what());
164  }
165 }
166 
167 void testqsrvWaitForLinkEvent(struct link *plink)
168 {
169  std::tr1::shared_ptr<pvaLinkChannel> lchan;
170  {
171  DBScanLocker lock(plink->precord);
172 
173  if(plink->type!=JSON_LINK || !plink->value.json.jlink || plink->value.json.jlink->pif!=&lsetPVA) {
174  testAbort("Not a PVA link");
175  }
176  pvaLink *pval = static_cast<pvaLink*>(plink->value.json.jlink);
177  lchan = pval->lchan;
178  }
179  if(lchan) {
180  lchan->run_done.wait();
181  }
182 }
183 
184 extern "C"
185 void dbpvar(const char *precordname, int level)
186 {
187  try {
188  if(!pvaGlobal) {
189  printf("PVA links not initialized\n");
190  return;
191  }
192 
193  if (!precordname || precordname[0] == '\0' || !strcmp(precordname, "*")) {
194  precordname = NULL;
195  printf("PVA links in all records\n\n");
196  } else {
197  printf("PVA links in record named '%s'\n\n", precordname);
198  }
199 
200  size_t nchans = 0, nlinks = 0, nconn = 0;
201 
202  pvaGlobal_t::channels_t channels;
203  {
204  Guard G(pvaGlobal->lock);
205  channels = pvaGlobal->channels; // copy snapshot
206  }
207 
208  for(pvaGlobal_t::channels_t::const_iterator it(channels.begin()), end(channels.end());
209  it != end; ++it)
210  {
211  std::tr1::shared_ptr<pvaLinkChannel> chan(it->second.lock());
212  if(!chan) continue;
213 
214  Guard G(chan->lock);
215 
216  if(precordname) {
217  // only show links fields of these records
218  bool match = false;
219  for(pvaLinkChannel::links_t::const_iterator it2(chan->links.begin()), end2(chan->links.end());
220  it2 != end2; ++it2)
221  {
222  const pvaLink *pval = *it2;
223  // plink==NULL shouldn't happen, but we are called for debugging, so be paranoid.
224  if(pval->plink && epicsStrGlobMatch(pval->plink->precord->name, precordname)) {
225  match = true;
226  nlinks++;
227  }
228  }
229  if(!match)
230  continue;
231  }
232 
233  nchans++;
234  if(chan->connected_latched)
235  nconn++;
236 
237  if(!precordname)
238  nlinks += chan->links.size();
239 
240  if(level<=0)
241  continue;
242 
243  if(level>=2 || (!chan->connected_latched && level==1)) {
244  if(chan->key.first.size()<=28) {
245  printf("%28s ", chan->key.first.c_str());
246  } else {
247  printf("%s\t", chan->key.first.c_str());
248  }
249 
250  printf("conn=%c %zu disconnects, %zu type changes",
251  chan->connected_latched?'T':'F',
252  chan->num_disconnect,
253  chan->num_type_change);
254  if(chan->op_put.valid()) {
255  printf(" Put");
256  }
257 
258  if(level>=3) {
259  printf(", provider '%s'", chan->providerName.c_str());
260  }
261  printf("\n");
262  // level 4 reserved for channel/provider details
263 
264  if(level>=5) {
265  for(pvaLinkChannel::links_t::const_iterator it2(chan->links.begin()), end2(chan->links.end());
266  it2 != end2; ++it2)
267  {
268  const pvaLink *pval = *it2;
269 
270  if(!pval->plink)
271  continue;
272  else if(precordname && !epicsStrGlobMatch(pval->plink->precord->name, precordname))
273  continue;
274 
275  const char *fldname = "???";
276  pdbRecordIterator rec(pval->plink->precord);
277  for(bool done = !!dbFirstField(&rec.ent, 0); !done; done = !!dbNextField(&rec.ent, 0))
278  {
279  if(rec.ent.pfield == (void*)pval->plink) {
280  fldname = rec.ent.pflddes->name;
281  break;
282  }
283  }
284 
285  printf("%*s%s.%s", 30, "", pval->plink ? pval->plink->precord->name : "<NULL>", fldname);
286 
287  switch(pval->pp) {
288  case pvaLinkConfig::NPP: printf(" NPP"); break;
289  case pvaLinkConfig::Default: printf(" Def"); break;
290  case pvaLinkConfig::PP: printf(" PP"); break;
291  case pvaLinkConfig::CP: printf(" CP"); break;
292  case pvaLinkConfig::CPP: printf(" CPP"); break;
293  }
294  switch(pval->ms) {
295  case pvaLinkConfig::NMS: printf(" NMS"); break;
296  case pvaLinkConfig::MS: printf(" MS"); break;
297  case pvaLinkConfig::MSI: printf(" MSI"); break;
298  }
299 
300  printf(" Q=%u pipe=%c defer=%c time=%c retry=%c morder=%d\n",
301  unsigned(pval->queueSize),
302  pval->pipeline ? 'T' : 'F',
303  pval->defer ? 'T' : 'F',
304  pval->time ? 'T' : 'F',
305  pval->retry ? 'T' : 'F',
306  pval->monorder);
307  }
308  printf("\n");
309  }
310  }
311  }
312 
313  printf(" %zu/%zu channels connected used by %zu links\n",
314  nconn, nchans, nlinks);
315 
316  } catch(std::exception& e) {
317  fprintf(stderr, "Error: %s\n", e.what());
318  }
319 }
320 
321 static
322 void installPVAAddLinkHook()
323 {
324  initHookRegister(&initPVALink);
325  epics::iocshRegister<const char*, int, &dbpvar>("dbpvar", "record name", "level");
328 }
329 
330 extern "C" {
331  epicsExportRegistrar(installPVAAddLinkHook);
335 }
long dbFirstField(DBENTRY *pdbentry, int dctonly)
Definition: dbStaticLib.c:1316
epicsMutexId lock
Definition: osiClockTime.c:37
dbFldDes * pflddes
Definition: dbStaticLib.h:36
int epicsStrGlobMatch(const char *str, const char *pattern)
Definition: epicsString.c:279
#define printf
Definition: epicsStdio.h:41
#define NULL
Definition: catime.c:38
void * pfield
Definition: dbStaticLib.h:39
void testAbort(const char *fmt,...)
long dbNextField(DBENTRY *pdbentry, int dctonly)
Definition: dbStaticLib.c:1323
Extended replacement for the Posix exit and atexit routines.
Central client context.
Definition: client.h:517
void registerRefCounter(const char *name, const size_t *counter)
Definition: reftrack.cpp:59
int initHookRegister(initHookFunction func)
Definition: initHooks.c:51
Unit test routines.
void done(int k)
Definition: antelope.c:77
LIBCOM_API void cantProceed(const char *msg,...)
Definition: cantProceed.c:54
initHookState
Definition: initHooks.h:25
#define stderr
Definition: epicsStdio.h:32
char * name
Definition: dbBase.h:81
#define epicsAtExit(F, A)
Convenience macro to register a function and context value to be run when the process exits...
Definition: epicsExit.h:70
struct json_link json
Definition: link.h:177
DBENTRY ent
Definition: pvif.h:129
Exporting IOC objects.