This is Unofficial EPICS BASE Doxygen Site
rpcServer.cpp
Go to the documentation of this file.
1 
7 #include <stdexcept>
8 #include <vector>
9 #include <utility>
10 
11 #define epicsExportSharedSymbols
12 #include <pv/rpcServer.h>
13 #include <pv/serverContextImpl.h>
14 #include <pv/wildcard.h>
15 
16 using namespace epics::pvData;
17 using std::string;
18 
19 namespace epics {
20 namespace pvAccess {
21 
22 
24  public ChannelRPC,
25  public RPCResponseCallback,
26  public std::tr1::enable_shared_from_this<ChannelRPCServiceImpl>
27 {
28 private:
29  Channel::shared_pointer m_channel;
30  ChannelRPCRequester::shared_pointer m_channelRPCRequester;
31  RPCServiceAsync::shared_pointer m_rpcService;
32  AtomicBoolean m_lastRequest;
33 
34 public:
36  Channel::shared_pointer const & channel,
37  ChannelRPCRequester::shared_pointer const & channelRPCRequester,
38  RPCServiceAsync::shared_pointer const & rpcService) :
39  m_channel(channel),
40  m_channelRPCRequester(channelRPCRequester),
41  m_rpcService(rpcService),
42  m_lastRequest()
43  {
44  }
45 
47  {
48  destroy();
49  }
50 
51  virtual void requestDone(
53  epics::pvData::PVStructure::shared_pointer const & result
54  )
55  {
56  m_channelRPCRequester->requestDone(status, shared_from_this(), result);
57 
58  if (m_lastRequest.get())
59  destroy();
60  }
61 
62  virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
63  {
64  try
65  {
66  m_rpcService->request(pvArgument, shared_from_this());
67  }
68  catch (std::exception& ex)
69  {
70  // handle user unexpected errors
71  Status errorStatus(Status::STATUSTYPE_FATAL, ex.what());
72 
73  m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer());
74 
75  if (m_lastRequest.get())
76  destroy();
77  }
78  catch (...)
79  {
80  // handle user unexpected errors
81  Status errorStatus(Status::STATUSTYPE_FATAL,
82  "Unexpected exception caught while calling RPCServiceAsync.request(PVStructure, RPCResponseCallback).");
83 
84  m_channelRPCRequester->requestDone(errorStatus, shared_from_this(), PVStructure::shared_pointer());
85 
86  if (m_lastRequest.get())
87  destroy();
88  }
89 
90  // we wait for callback to be called
91  }
92 
93  void lastRequest()
94  {
95  m_lastRequest.set();
96  }
97 
98  virtual Channel::shared_pointer getChannel()
99  {
100  return m_channel;
101  }
102 
103  virtual void cancel()
104  {
105  // noop
106  }
107 
108  virtual void destroy()
109  {
110  // noop
111  }
112 };
113 
114 
115 
116 
117 class RPCChannel :
118  public Channel,
119  public std::tr1::enable_shared_from_this<RPCChannel>
120 {
121 private:
122 
123  AtomicBoolean m_destroyed;
124 
125  ChannelProvider::shared_pointer m_provider;
126  string m_channelName;
127  ChannelRequester::shared_pointer m_channelRequester;
128 
129  RPCServiceAsync::shared_pointer m_rpcService;
130 
131 public:
133 
135  ChannelProvider::shared_pointer const & provider,
136  string const & channelName,
137  ChannelRequester::shared_pointer const & channelRequester,
138  RPCServiceAsync::shared_pointer const & rpcService) :
139  m_provider(provider),
140  m_channelName(channelName),
141  m_channelRequester(channelRequester),
142  m_rpcService(rpcService)
143  {
144  }
145 
146  virtual ~RPCChannel()
147  {
148  destroy();
149  }
150 
151  virtual std::tr1::shared_ptr<ChannelProvider> getProvider()
152  {
153  return m_provider;
154  }
155 
156  virtual std::string getRemoteAddress()
157  {
158  // local
159  return getChannelName();
160  }
161 
163  {
164  return (!m_destroyed.get()) ?
165  Channel::CONNECTED :
166  Channel::DESTROYED;
167  }
168 
169  virtual std::string getChannelName()
170  {
171  return m_channelName;
172  }
173 
174  virtual std::tr1::shared_ptr<ChannelRequester> getChannelRequester()
175  {
176  return m_channelRequester;
177  }
178 
179  virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/)
180  {
181  return none;
182  }
183 
184  virtual ChannelRPC::shared_pointer createChannelRPC(
185  ChannelRPCRequester::shared_pointer const & channelRPCRequester,
186  epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
187  {
188  // nothing expected to be in pvRequest
189 
190  if (channelRPCRequester.get() == 0)
191  throw std::invalid_argument("channelRPCRequester == null");
192 
193  if (m_destroyed.get())
194  {
195  ChannelRPC::shared_pointer nullPtr;
196  channelRPCRequester->channelRPCConnect(epics::pvData::Status(epics::pvData::Status::STATUSTYPE_ERROR, "channel destroyed"), nullPtr);
197  return nullPtr;
198  }
199 
200  // TODO use std::make_shared
201  std::tr1::shared_ptr<ChannelRPCServiceImpl> tp(
202  new ChannelRPCServiceImpl(shared_from_this(), channelRPCRequester, m_rpcService)
203  );
204  ChannelRPC::shared_pointer channelRPCImpl = tp;
205  channelRPCRequester->channelRPCConnect(Status::Ok, channelRPCImpl);
206  return channelRPCImpl;
207  }
208 
209  virtual void printInfo(std::ostream& out)
210  {
211  out << "RPCChannel: ";
212  out << getChannelName();
213  out << " [";
214  out << Channel::ConnectionStateNames[getConnectionState()];
215  out << "]";
216  }
217 
218  virtual string getRequesterName()
219  {
220  return getChannelName();
221  }
222 
223  virtual void destroy()
224  {
225  m_destroyed.set();
226  }
227 };
228 
229 Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const & provider,
230  std::string const & channelName,
231  ChannelRequester::shared_pointer const & channelRequester,
232  RPCServiceAsync::shared_pointer const & rpcService)
233 {
234  // TODO use std::make_shared
235  std::tr1::shared_ptr<RPCChannel> tp(
236  new RPCChannel(provider, channelName, channelRequester, rpcService)
237  );
238  Channel::shared_pointer channel = tp;
239  return channel;
240 }
241 
242 
244  public virtual ChannelProvider,
245  public virtual ChannelFind,
246  public std::tr1::enable_shared_from_this<RPCChannelProvider> {
247 
248 public:
250 
251  static const string PROVIDER_NAME;
252 
254 
255  // TODO thread pool support
256 
258  }
259 
260  virtual string getProviderName() {
261  return PROVIDER_NAME;
262  }
263 
264  virtual std::tr1::shared_ptr<ChannelProvider> getChannelProvider()
265  {
266  return shared_from_this();
267  }
268 
269  virtual void cancel() {}
270 
271  virtual void destroy() {}
272 
273  virtual ChannelFind::shared_pointer channelFind(std::string const & channelName,
274  ChannelFindRequester::shared_pointer const & channelFindRequester)
275  {
276  bool found;
277  {
278  Lock guard(m_mutex);
279  found = (m_services.find(channelName) != m_services.end()) ||
280  findWildService(channelName);
281  }
282  ChannelFind::shared_pointer thisPtr(shared_from_this());
283  channelFindRequester->channelFindResult(Status::Ok, thisPtr, found);
284  return thisPtr;
285  }
286 
287 
288  virtual ChannelFind::shared_pointer channelList(
289  ChannelListRequester::shared_pointer const & channelListRequester)
290  {
291  if (!channelListRequester.get())
292  throw std::runtime_error("null requester");
293 
294  PVStringArray::svector channelNames;
295  {
296  Lock guard(m_mutex);
297  channelNames.reserve(m_services.size());
298  for (RPCServiceMap::const_iterator iter = m_services.begin();
299  iter != m_services.end();
300  iter++)
301  channelNames.push_back(iter->first);
302  }
303 
304  ChannelFind::shared_pointer thisPtr(shared_from_this());
305  channelListRequester->channelListResult(Status::Ok, thisPtr, freeze(channelNames), false);
306  return thisPtr;
307  }
308 
309  virtual Channel::shared_pointer createChannel(
310  std::string const & channelName,
311  ChannelRequester::shared_pointer const & channelRequester,
312  short /*priority*/)
313  {
314  RPCServiceAsync::shared_pointer service;
315 
316  RPCServiceMap::const_iterator iter;
317  {
318  Lock guard(m_mutex);
319  iter = m_services.find(channelName);
320  }
321  if (iter != m_services.end())
322  service = iter->second;
323 
324  // check for wild services
325  if (!service)
326  service = findWildService(channelName);
327 
328  if (!service)
329  {
330  Channel::shared_pointer nullChannel;
331  channelRequester->channelCreated(noSuchChannelStatus, nullChannel);
332  return nullChannel;
333  }
334 
335  // TODO use std::make_shared
336  std::tr1::shared_ptr<RPCChannel> tp(
337  new RPCChannel(
338  shared_from_this(),
339  channelName,
340  channelRequester,
341  service));
342  Channel::shared_pointer rpcChannel = tp;
343  channelRequester->channelCreated(Status::Ok, rpcChannel);
344  return rpcChannel;
345  }
346 
347  virtual Channel::shared_pointer createChannel(
348  std::string const & /*channelName*/,
349  ChannelRequester::shared_pointer const & /*channelRequester*/,
350  short /*priority*/,
351  std::string const & /*address*/)
352  {
353  // this will never get called by the pvAccess server
354  throw std::runtime_error("not supported");
355  }
356 
357  void registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service)
358  {
359  Lock guard(m_mutex);
360  m_services[serviceName] = service;
361 
362  if (isWildcardPattern(serviceName))
363  m_wildServices.push_back(std::make_pair(serviceName, service));
364  }
365 
366  void unregisterService(std::string const & serviceName)
367  {
368  Lock guard(m_mutex);
369  m_services.erase(serviceName);
370 
371  if (isWildcardPattern(serviceName))
372  {
373  for (RPCWildServiceList::iterator iter = m_wildServices.begin();
374  iter != m_wildServices.end();
375  iter++)
376  if (iter->first == serviceName)
377  {
378  m_wildServices.erase(iter);
379  break;
380  }
381  }
382  }
383 
384 private:
385  // assumes sync on services
386  RPCServiceAsync::shared_pointer findWildService(string const & wildcard)
387  {
388  if (!m_wildServices.empty())
389  for (RPCWildServiceList::iterator iter = m_wildServices.begin();
390  iter != m_wildServices.end();
391  iter++)
392  if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str()))
393  return iter->second;
394 
395  return RPCServiceAsync::shared_pointer();
396  }
397 
398  // (too) simple check
399  bool isWildcardPattern(string const & pattern)
400  {
401  return
402  (pattern.find('*') != string::npos ||
403  pattern.find('?') != string::npos ||
404  (pattern.find('[') != string::npos && pattern.find(']') != string::npos));
405  }
406 
407  typedef std::map<string, RPCServiceAsync::shared_pointer> RPCServiceMap;
408  RPCServiceMap m_services;
409 
410  typedef std::vector<std::pair<string, RPCServiceAsync::shared_pointer> > RPCWildServiceList;
411  RPCWildServiceList m_wildServices;
412 
413  epics::pvData::Mutex m_mutex;
414 };
415 
416 const string RPCChannelProvider::PROVIDER_NAME("rpcService");
417 const Status RPCChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel");
418 
419 
420 RPCServer::RPCServer(const Configuration::const_shared_pointer &conf)
421  :m_channelProviderImpl(new RPCChannelProvider)
422 {
423  m_serverContext = ServerContext::create(ServerContext::Config()
424  .config(conf)
425  .provider(m_channelProviderImpl));
426 }
427 
429 {
430  // multiple destroy call is OK
431  destroy();
432 }
433 
435 {
436  std::cout << m_serverContext->getVersion().getVersionString() << std::endl;
437  m_serverContext->printInfo();
438 }
439 
440 void RPCServer::run(int seconds)
441 {
442  m_serverContext->run(seconds);
443 }
444 
446  RPCServer::shared_pointer server;
448 };
449 
450 static void threadRunner(void* usr)
451 {
452  ThreadRunnerParam* pusr = static_cast<ThreadRunnerParam*>(usr);
453  ThreadRunnerParam param = *pusr;
454  delete pusr;
455 
456  param.server->run(param.timeToRun);
457 }
458 
461 void RPCServer::runInNewThread(int seconds)
462 {
463  epics::auto_ptr<ThreadRunnerParam> param(new ThreadRunnerParam());
464  param->server = shared_from_this();
465  param->timeToRun = seconds;
466 
467  epicsThreadCreate("RPCServer thread",
470  threadRunner, param.get());
471 
472  // let the thread delete 'param'
473  param.release();
474 }
475 
477 {
478  m_serverContext->shutdown();
479 }
480 
481 void RPCServer::registerService(std::string const & serviceName, RPCServiceAsync::shared_pointer const & service)
482 {
483  m_channelProviderImpl->registerService(serviceName, service);
484 }
485 
486 void RPCServer::unregisterService(std::string const & serviceName)
487 {
488  m_channelProviderImpl->unregisterService(serviceName);
489 }
490 
491 }
492 }
virtual std::tr1::shared_ptr< ChannelProvider > getChannelProvider()
Definition: rpcServer.cpp:264
virtual ChannelRPC::shared_pointer createChannelRPC(ChannelRPCRequester::shared_pointer const &channelRPCRequester, epics::pvData::PVStructure::shared_pointer const &)
Definition: rpcServer.cpp:184
pvac::PutEvent result
Definition: clientSync.cpp:117
A holder for a contiguous piece of memory.
Definition: sharedVector.h:27
pvd::Status status
#define epicsThreadPriorityMedium
Definition: epicsThread.h:76
static Status Ok
Definition: status.h:47
static const string PROVIDER_NAME
Definition: rpcServer.cpp:251
virtual std::tr1::shared_ptr< ChannelRequester > getChannelRequester()
Definition: rpcServer.cpp:174
virtual Channel::shared_pointer getChannel()
Definition: rpcServer.cpp:98
virtual Channel::shared_pointer createChannel(std::string const &, ChannelRequester::shared_pointer const &, short, std::string const &)
Definition: rpcServer.cpp:347
void unregisterService(std::string const &serviceName)
Definition: rpcServer.cpp:486
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
virtual ChannelFind::shared_pointer channelFind(std::string const &channelName, ChannelFindRequester::shared_pointer const &channelFindRequester)
Definition: rpcServer.cpp:273
A lock for multithreading.
Definition: lock.h:36
void unregisterService(std::string const &serviceName)
Definition: rpcServer.cpp:366
virtual void request(epics::pvData::PVStructure::shared_pointer const &pvArgument)
Definition: rpcServer.cpp:62
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
epicsThreadId epicsStdCall epicsThreadCreate(const char *name, unsigned int priority, unsigned int stackSize, EPICSTHREADFUNC funptr, void *parm)
Definition: epicsThread.cpp:33
void registerService(std::string const &serviceName, RPCServiceAsync::shared_pointer const &service)
Definition: rpcServer.cpp:357
virtual std::tr1::shared_ptr< ChannelProvider > getProvider()
Definition: rpcServer.cpp:151
RPCChannel(ChannelProvider::shared_pointer const &provider, string const &channelName, ChannelRequester::shared_pointer const &channelRequester, RPCServiceAsync::shared_pointer const &rpcService)
Definition: rpcServer.cpp:134
pvData
Definition: monitor.h:428
void registerService(std::string const &serviceName, RPCServiceAsync::shared_pointer const &service)
Definition: rpcServer.cpp:481
#define POINTER_DEFINITIONS(clazz)
Definition: sharedPtr.h:198
void push_back(param_type v)
Definition: sharedVector.h:602
ChannelRPCServiceImpl(Channel::shared_pointer const &channel, ChannelRPCRequester::shared_pointer const &channelRPCRequester, RPCServiceAsync::shared_pointer const &rpcService)
Definition: rpcServer.cpp:35
virtual void printInfo(std::ostream &out)
Definition: rpcServer.cpp:209
virtual string getRequesterName()
Definition: rpcServer.cpp:218
epicsShareFunc Channel::shared_pointer createRPCChannel(ChannelProvider::shared_pointer const &provider, std::string const &channelName, ChannelRequester::shared_pointer const &channelRequester, RPCServiceAsync::shared_pointer const &rpcService)
Definition: rpcServer.cpp:229
virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const &)
Definition: rpcServer.cpp:179
virtual ConnectionState getConnectionState()
Definition: rpcServer.cpp:162
virtual ChannelFind::shared_pointer channelList(ChannelListRequester::shared_pointer const &channelListRequester)
Definition: rpcServer.cpp:288
virtual std::string getChannelName()
Definition: rpcServer.cpp:169
virtual std::string getRemoteAddress()
Definition: rpcServer.cpp:156
RPCServer::shared_pointer server
Definition: rpcServer.cpp:446
virtual Channel::shared_pointer createChannel(std::string const &channelName, ChannelRequester::shared_pointer const &channelRequester, short)
Definition: rpcServer.cpp:309
void runInNewThread(int seconds=0)
Definition: rpcServer.cpp:461
void run(int seconds=0)
Definition: rpcServer.cpp:440
if(yy_init)
Definition: scan.c:972
Options for a server insatnce.
static ServerContext::shared_pointer create(const Config &conf=Config())
static const Status noSuchChannelStatus
Definition: rpcServer.cpp:253
epicsMutex Mutex
Definition: lock.h:28
void reserve(size_t i)
Set array capacity.
Definition: sharedVector.h:428
virtual void requestDone(epics::pvData::Status const &status, epics::pvData::PVStructure::shared_pointer const &result)
Definition: rpcServer.cpp:51