This is an unofficial EPICS BASE Doxygen site
rpcClient.cpp
Go to the documentation of this file.
1 
7 #include <iostream>
8 #include <string>
9 
10 #include <epicsEvent.h>
11 #include <pv/pvData.h>
12 #include <pv/event.h>
13 #include <pv/current_function.h>
14 
15 #define epicsExportSharedSymbols
16 #include <pv/pvAccess.h>
17 #include <pv/clientFactory.h>
18 #include <pv/logger.h>
19 #include <pv/rpcService.h>
20 
21 #include "pv/rpcClient.h"
22 
23 #if 0
24 # define TRACE(msg) std::cerr<<"TRACE: "<<CURRENT_FUNCTION<<" : "<< msg <<"\n"
25 #else
26 # define TRACE(msg)
27 #endif
28 
29 namespace pvd = epics::pvData;
30 namespace pva = epics::pvAccess;
31 
32 namespace epics{namespace pvAccess{
33 
35 {
37 
39  ChannelRPC::shared_pointer op;
41  epics::pvData::PVStructure::shared_pointer next_args, last_data;
44 
46  :conn_status(pvd::Status::error("Never connected"))
47  ,resp_status(pvd::Status::error("Never connected"))
48  ,inprogress(false)
49  ,last(false)
50  {}
51  virtual ~RPCRequester() {}
52 
53  virtual std::string getRequesterName() { return "RPCClient::RPCRequester"; }
54 
55  virtual void channelRPCConnect(
56  const pvd::Status& status,
57  ChannelRPC::shared_pointer const & operation)
58  {
59  bool lastreq, inprog;
60  pvd::PVStructure::shared_pointer args;
61  {
62  pvd::Lock L(mutex);
63  TRACE("status="<<status);
64  op = operation;
65  conn_status = status;
66  args.swap(next_args);
67  lastreq = last;
68  inprog = inprogress;
69  if(inprog && args)
70  TRACE("request deferred: "<<args);
71  }
72  if(inprog && args) {
73  TRACE("request deferred: "<<args);
74  if(lastreq)
75  operation->lastRequest();
76  operation->request(args);
77  }
78  event.signal();
79  }
80 
81  virtual void requestDone(
82  const pvd::Status& status,
83  ChannelRPC::shared_pointer const & operation,
84  pvd::PVStructure::shared_pointer const & pvResponse)
85  {
86  TRACE("status="<<status<<" response:\n"<<pvResponse<<"\n");
87  {
88  pvd::Lock L(mutex);
89  if(!inprogress) {
90  std::cerr<<"pva provider give RPC requestDone() when no request in progress\n";
91  } else {
92  resp_status = status;
93  last_data = pvResponse;
94  if(resp_status.isSuccess() && !last_data) {
95  resp_status = pvd::Status::error("No reply data");
96  }
97  inprogress = false;
98  }
99  }
100  event.signal();
101  }
102 
103  virtual void channelDisconnect(bool destroy)
104  {
105  TRACE("destroy="<<destroy);
106  {
107  pvd::Lock L(mutex);
108  resp_status = conn_status = pvd::Status::error("Connection lost");
109  last_data.reset();
110  next_args.reset();
111  inprogress = false;
112  }
113  event.signal();
114  }
115 };
116 
117 
118 RPCClient::RPCClient(const std::string & serviceName,
119  pvd::PVStructure::shared_pointer const & pvRequest,
120  const ChannelProvider::shared_pointer &provider,
121  const std::string &address)
122  : m_serviceName(serviceName)
123  , m_provider(provider)
124  , m_pvRequest(pvRequest ? pvRequest : pvd::createRequest(""))
125 {
127  if(!m_provider)
128  m_provider = ChannelProviderRegistry::clients()->getProvider("pva");
129  if(!m_provider)
130  throw std::logic_error("Unknown Provider");
131 
132  m_channel = m_provider->createChannel(serviceName, DefaultChannelRequester::build(),
134  address);
135 
136  if(!m_channel)
137  throw std::logic_error("provider createChannel() succeeds w/ NULL Channel");
138 
139  m_rpc_requester.reset(new RPCRequester);
140  m_rpc = m_channel->createChannelRPC(m_rpc_requester, m_pvRequest);
141  if(!m_rpc)
142  throw std::logic_error("channel createChannelRPC() NULL");
143 }
144 
146 {
147  if (m_channel)
148  {
149  m_channel->destroy();
150  m_channel.reset();
151  }
152  if (m_rpc)
153  {
154  m_rpc->destroy();
155  m_rpc.reset();
156  }
157 }
158 
160 {
161  issueConnect();
162  return waitConnect(timeout);
163 }
164 
166 {
167 }
168 
170 {
171  pvd::Lock L(m_rpc_requester->mutex);
172  TRACE("timeout="<<timeout);
173  while(!m_rpc_requester->conn_status.isSuccess()) {
174  L.unlock();
175  if(!m_rpc_requester->event.wait(timeout)) {
176  TRACE("TIMEOUT");
177  return false;
178  }
179  L.lock();
180  }
181  TRACE("Connected");
182  return true;
183 }
184 
185 
186 
187 pvd::PVStructure::shared_pointer RPCClient::request(
188  pvd::PVStructure::shared_pointer const & pvArgument,
189  double timeout,
190  bool lastRequest)
191 {
192  if (connect(timeout))
193  {
194  issueRequest(pvArgument, lastRequest);
195  return waitResponse(timeout); // TODO reduce timeout for a time spent on connect
196  }
197  else
199 }
200 
202  pvd::PVStructure::shared_pointer const & pvArgument,
203  bool lastRequest)
204 {
205  {
206  pvd::Lock L(m_rpc_requester->mutex);
207  TRACE("conn_status="<<m_rpc_requester->conn_status
208  <<" resp_status="<<m_rpc_requester->resp_status
209  <<" args:\n"<<pvArgument);
210  if(m_rpc_requester->inprogress)
211  throw std::logic_error("Request already in progress");
212  m_rpc_requester->inprogress = true;
213  m_rpc_requester->resp_status = pvd::Status::error("No Data");
214  if(!m_rpc_requester->conn_status.isSuccess()) {
215  TRACE("defer");
216  m_rpc_requester->last = lastRequest;
217  m_rpc_requester->next_args = pvArgument;
218  return;
219  }
220  TRACE("request args: "<<pvArgument);
221  }
222  if(lastRequest)
223  m_rpc->lastRequest();
224  m_rpc->request(pvArgument);
225 }
226 
227 pvd::PVStructure::shared_pointer RPCClient::waitResponse(double timeout)
228 {
229  pvd::Lock L(m_rpc_requester->mutex);
230  TRACE("timeout="<<timeout);
231 
232  while(m_rpc_requester->inprogress)
233  {
234  L.unlock();
235  if(!m_rpc_requester->event.wait(timeout)) {
236  TRACE("TIMEOUT");
238  }
239  L.lock();
240  }
241  TRACE("Complete: conn_status="<<m_rpc_requester->conn_status
242  <<" resp_status="<<m_rpc_requester->resp_status
243  <<" data:\n"<<m_rpc_requester->last_data);
244 
245  if(!m_rpc_requester->conn_status.isSuccess())
246  throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, m_rpc_requester->conn_status.getMessage());
247 
248  if(!m_rpc_requester->resp_status.isSuccess())
249  throw RPCRequestException(pvd::Status::STATUSTYPE_ERROR, m_rpc_requester->resp_status.getMessage());
250 
251  // consume last_data so that we can't possibly return it twice
252  pvd::PVStructure::shared_pointer data;
253  data.swap(m_rpc_requester->last_data);
254 
255  if(!data)
256  throw std::logic_error("No request in progress");
257 
258  // copy it so that the caller need not worry about whether it will overwritten
259  // when the next request is issued
260  pvd::PVStructure::shared_pointer ret(pvd::getPVDataCreate()->createPVStructure(data->getStructure()));
261  ret->copyUnchecked(*data);
262 
263  return ret;
264 }
265 
266 RPCClient::shared_pointer RPCClient::create(const std::string & serviceName,
267  pvd::PVStructure::shared_pointer const & pvRequest)
268 {
269  return RPCClient::shared_pointer(new RPCClient(serviceName, pvRequest));
270 }
271 
272 
273 }}// namespace epics::pvAccess
double timeout
Definition: pvutils.cpp:25
RPCClient(const std::string &serviceName, epics::pvData::PVStructure::shared_pointer const &pvRequest, const ChannelProvider::shared_pointer &provider=ChannelProvider::shared_pointer(), const std::string &address=std::string())
pvd::Status status
virtual void channelRPCConnect(const pvd::Status &status, ChannelRPC::shared_pointer const &operation)
Definition: rpcClient.cpp:55
static Status error(const std::string &m)
Definition: status.h:50
virtual void requestDone(const pvd::Status &status, ChannelRPC::shared_pointer const &operation, pvd::PVStructure::shared_pointer const &pvResponse)
Definition: rpcClient.cpp:81
void lock()
Definition: lock.h:55
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
A lock for multithreading.
Definition: lock.h:36
void unlock()
Definition: lock.h:66
PVStructure::shared_pointer createRequest(std::string const &request)
const std::tr1::weak_ptr< Process2PutProxy > operation
Definition: pvAccess.cpp:69
virtual std::string getRequesterName()
Definition: rpcClient.cpp:53
bool connect(double timeout=RPCCLIENT_DEFAULT_TIMEOUT)
Definition: rpcClient.cpp:159
Holds all PVA related.
Definition: pvif.h:34
epics::pvData::PVStructure::shared_pointer last_data
Definition: rpcClient.cpp:41
epics::pvData::PVStructure::shared_pointer request(epics::pvData::PVStructure::shared_pointer const &pvArgument, double timeout=RPCCLIENT_DEFAULT_TIMEOUT, bool lastRequest=false)
Definition: rpcClient.cpp:187
pvData
Definition: monitor.h:428
bool isSuccess() const
Definition: status.h:103
static ChannelProviderRegistry::shared_pointer clients()
virtual void channelDisconnect(bool destroy)
Definition: rpcClient.cpp:103
APIs for the epicsEvent binary semaphore.
bool waitConnect(double timeout=RPCCLIENT_DEFAULT_TIMEOUT)
Definition: rpcClient.cpp:169
#define TRACE(msg)
Definition: rpcClient.cpp:26
epics::pvData::PVStructure::shared_pointer waitResponse(double timeout=RPCCLIENT_DEFAULT_TIMEOUT)
Definition: rpcClient.cpp:227
epics::pvData::PVStructure::shared_pointer next_args
Definition: rpcClient.cpp:41
ChannelRPC::shared_pointer op
Definition: rpcClient.cpp:39
static const short PRIORITY_DEFAULT
Definition: pvAccess.h:1241
void issueRequest(epics::pvData::PVStructure::shared_pointer const &pvArgument, bool lastRequest=false)
Definition: rpcClient.cpp:201
epicsMutex Mutex
Definition: lock.h:28
static ChannelRequester::shared_pointer build()
Definition: pvAccess.cpp:444
#define false
Definition: flexdef.h:85
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
static shared_pointer create(const std::string &serviceName, epics::pvData::PVStructure::shared_pointer const &pvRequest=epics::pvData::PVStructure::shared_pointer())
Definition: rpcClient.cpp:266