This is Unofficial EPICS BASE Doxygen Site
pvaClientRPC.cpp
Go to the documentation of this file.
1 /* pvaClientRPC.cpp */
12 #include <sstream>
13 #include <pv/event.h>
14 #include <pv/bitSetUtil.h>
15 #include <pv/rpcService.h>
16 
17 #define epicsExportSharedSymbols
18 
19 #include <pv/pvaClient.h>
20 
21 using namespace epics::pvData;
22 using namespace epics::pvAccess;
23 using namespace std;
24 
25 namespace epics { namespace pvaClient {
26 
28 {
29  PvaClientRPC::weak_pointer pvaClientRPC;
30  PvaClient::weak_pointer pvaClient;
31 public:
33  PvaClientRPCPtr const & pvaClientRPC,
34  PvaClientPtr const &pvaClient)
35  : pvaClientRPC(pvaClientRPC),
36  pvaClient(pvaClient)
37  {}
38  virtual ~RPCRequesterImpl() {
39  if(PvaClient::getDebug()) std::cout << "~RPCRequesterImpl" << std::endl;
40  }
41 
42  virtual std::string getRequesterName() {
43  PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
44  if(!clientRPC) return string("pvaClientRPC is null");
45  return clientRPC->getRequesterName();
46  }
47 
48  virtual void message(std::string const & message, epics::pvData::MessageType messageType) {
49  PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
50  if(!clientRPC) return;
51  clientRPC->message(message,messageType);
52  }
53 
54  virtual void channelRPCConnect(
56  ChannelRPC::shared_pointer const & channelRPC)
57  {
58  PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
59  if(!clientRPC) return;
60  clientRPC->rpcConnect(status,channelRPC);
61  }
62 
63  virtual void requestDone(
64  const Status& status,
65  ChannelRPC::shared_pointer const & channelRPC,
66  PVStructure::shared_pointer const & pvResponse)
67  {
68  PvaClientRPCPtr clientRPC(pvaClientRPC.lock());
69  if(!clientRPC) return;
70  clientRPC->requestDone(status,channelRPC,pvResponse);
71  }
72 };
73 
74 PvaClientRPCPtr PvaClientRPC::create(
75  PvaClientPtr const &pvaClient,
76  Channel::shared_pointer const & channel)
77 {
78  StructureConstPtr structure(getFieldCreate()->createStructure());
79  PVStructurePtr pvRequest(getPVDataCreate()->createPVStructure(structure));
80  return create(pvaClient,channel,pvRequest);
81 }
82 PvaClientRPCPtr PvaClientRPC::create(
83  PvaClientPtr const &pvaClient,
84  Channel::shared_pointer const & channel,
85  PVStructurePtr const &pvRequest)
86 {
87  PvaClientRPCPtr epv(new PvaClientRPC(pvaClient,channel,pvRequest));
88  epv->rpcRequester = RPCRequesterImplPtr(
89  new RPCRequesterImpl(epv,pvaClient));
90  return epv;
91 }
92 
93 PvaClientRPC::PvaClientRPC(
94  PvaClientPtr const &pvaClient,
95  Channel::shared_pointer const & channel,
96  PVStructurePtr const &pvRequest)
97 :
98  connectState(connectIdle),
99  pvaClient(pvaClient),
100  channel(channel),
101  pvRequest(pvRequest),
102  rpcState(rpcIdle),
103  responseTimeout(0.0)
104 {
105  if(PvaClient::getDebug()) {
106  cout<< "PvaClientRPC::PvaClientRPC()"
107  << " channelName " << channel->getChannelName()
108  << endl;
109  }
110 }
111 
112 PvaClientRPC::~PvaClientRPC()
113 {
114  if(PvaClient::getDebug()) {
115  string channelName("disconnected");
116  Channel::shared_pointer chan(channel.lock());
117  if(chan) channelName = chan->getChannelName();
118  cout<< "PvaClientRPC::~PvaClientRPC"
119  << " channelName " << channelName
120  << endl;
121  }
122 }
123 
124 void PvaClientRPC::checkRPCState()
125 {
126  if(PvaClient::getDebug()) {
127  string channelName("disconnected");
128  Channel::shared_pointer chan(channel.lock());
129  if(chan) channelName = chan->getChannelName();
130  cout << "PvaClientRPC::checkRPCState"
131  << " channelName " << channelName
132  << " connectState " << connectState
133  << endl;
134  }
135  if(connectState==connectIdle) connect();
136 }
137 
138 string PvaClientRPC::getRequesterName()
139 {
140  PvaClientPtr yyy = pvaClient.lock();
141  if(!yyy) return string("PvaClientRPC::getRequesterName() PvaClient isDestroyed");
142  return yyy->getRequesterName();
143 }
144 
145 void PvaClientRPC::message(string const & message,MessageType messageType)
146 {
147  PvaClientPtr yyy = pvaClient.lock();
148  if(!yyy) return;
149  yyy->message(message, messageType);
150 }
151 
152 void PvaClientRPC::rpcConnect(
153  const Status& status,
154  ChannelRPC::shared_pointer const & channelRPC)
155 {
156  Channel::shared_pointer chan(channel.lock());
157  if(PvaClient::getDebug()) {
158  string channelName("disconnected");
159  Channel::shared_pointer chan(channel.lock());
160  if(chan) channelName = chan->getChannelName();
161  cout << "PvaClientRPC::rpcConnect"
162  << " channelName " << channelName
163  << " status.isOK " << (status.isOK() ? "true" : "false")
164  << endl;
165  }
166  if(!chan) return;
167  connectStatus = status;
168  connectState = connected;
169  if(PvaClient::getDebug()) {
170  cout << "PvaClientRPC::rpcConnect calling waitForConnect.signal\n";
171  }
172  waitForConnect.signal();
173 
174 }
175 
176 void PvaClientRPC::requestDone(
177  const Status& status,
178  ChannelRPC::shared_pointer const & channelRPC,
179  PVStructure::shared_pointer const & pvResponse)
180 {
181  PvaClientRPCRequesterPtr req = pvaClientRPCRequester.lock();
182  {
183  Lock xx(mutex);
184  requestStatus = status;
185  if(PvaClient::getDebug()) {
186  string channelName("disconnected");
187  Channel::shared_pointer chan(channel.lock());
188  if(chan) channelName = chan->getChannelName();
189  cout << "PvaClientRPC::requestDone"
190  << " channelName " << channelName
191  << endl;
192  }
193  if(rpcState!=rpcActive) {
194  string channelName("disconnected");
195  Channel::shared_pointer chan(channel.lock());
196  if(chan) channelName = chan->getChannelName();
197  string message = "channel "
198  + channelName
199  +" PvaClientRPC::requestDone"
200  + " but not active";
201  throw std::runtime_error(message);
202  }
203  if(req && (responseTimeout<=0.0)) {
204  rpcState = rpcIdle;
205  } else {
206  rpcState = rpcComplete;
207  if(!req) this->pvResponse = pvResponse;
208  waitForDone.signal();
209  }
210  }
211  if(req) {
212  req->requestDone(status,shared_from_this(),pvResponse);
213  }
214 }
215 
216 void PvaClientRPC::connect()
217 {
218  if(PvaClient::getDebug()) cout << "PvaClientRPC::connect\n";
219  issueConnect();
220  Status status = waitConnect();
221  if(status.isOK()) return;
222  Channel::shared_pointer chan(channel.lock());
223  string channelName("disconnected");
224  if(chan) channelName = chan->getChannelName();
225  string message = string("channel ")
226  + channelName
227  + " PvaClientRPC::connect "
228  + status.getMessage();
230 }
231 
232 void PvaClientRPC::issueConnect()
233 {
234  if(PvaClient::getDebug()) cout << "PvaClientRPC::issueConnect\n";
235  Channel::shared_pointer chan(channel.lock());
236  if(connectState!=connectIdle) {
237  string channelName("disconnected");
238  if(chan) channelName = chan->getChannelName();
239  string message = string("channel ")
240  + channelName
241  + " pvaClientRPC already connected ";
242  throw std::runtime_error(message);
243  }
244  if(chan) {
245  connectState = connectActive;
246  channelRPC = chan->createChannelRPC(rpcRequester,pvRequest);
247  return;
248  }
249  throw std::runtime_error("PvaClientRPC::issueConnect() but channel disconnected");
250 }
251 
252 Status PvaClientRPC::waitConnect()
253 {
254  if(PvaClient::getDebug()) cout << "PvaClientRPC::waitConnect\n";
255  if(connectState==connected) {
256  if(!connectStatus.isOK()) connectState = connectIdle;
257  return connectStatus;
258  }
259  if(connectState!=connectActive) {
260  Channel::shared_pointer chan(channel.lock());
261  string channelName("disconnected");
262  if(chan) channelName = chan->getChannelName();
263  string message = string("channel ")
264  + channelName
265  + " PvaClientRPC::waitConnect illegal connect state ";
266  throw std::runtime_error(message);
267  }
268  if(PvaClient::getDebug()) {
269  cout << "PvaClientRPC::waitConnect calling waitForConnect.wait\n";
270  }
271  waitForConnect.wait();
272  connectState = connectStatus.isOK() ? connected : connectIdle;
273  if(PvaClient::getDebug()) {
274  cout << "PvaClientRPC::waitConnect"
275  << " connectStatus " << (connectStatus.isOK() ? "connected" : "not connected");
276  }
277  return connectStatus;
278 }
279 
280 PVStructure::shared_pointer PvaClientRPC::request(PVStructure::shared_pointer const & pvArgument)
281 {
282  checkRPCState();
283  {
284  Lock xx(mutex);
285  if(rpcState!=rpcIdle) {
286  Channel::shared_pointer chan(channel.lock());
287  string channelName("disconnected");
288  if(chan) channelName = chan->getChannelName();
289  string message = "channel "
290  + channelName
291  + " PvaClientRPC::request request aleady active ";
292  throw std::runtime_error(message);
293  }
294  rpcState = rpcActive;
295  }
296  channelRPC->request(pvArgument);
297  if(responseTimeout>0.0) {
298  waitForDone.wait(responseTimeout);
299  } else {
300  waitForDone.wait();
301  }
302  Lock xx(mutex);
303  if(rpcState!=rpcComplete) {
304  Channel::shared_pointer chan(channel.lock());
305  string channelName("disconnected");
306  if(chan) channelName = chan->getChannelName();
307  string message = "channel "
308  + channelName
309  + " PvaClientRPC::request request timeout ";
311  }
312  rpcState = rpcIdle;
313  if(requestStatus.isOK()) return pvResponse;
314  Channel::shared_pointer chan(channel.lock());
315  string channelName("disconnected");
316  if(chan) channelName = chan->getChannelName();
317  string message = "channel "
318  + channelName
319  + " PvaClientRPC::request status ";
320  message += requestStatus.getMessage();
322 }
323 
324 
326  PVStructure::shared_pointer const & pvArgument,
327  PvaClientRPCRequesterPtr const & pvaClientRPCRequester)
328 {
329  checkRPCState();
330  this->pvaClientRPCRequester = pvaClientRPCRequester;
331  if(responseTimeout<=0.0) {
332  {
333  Lock xx(mutex);
334  if(rpcState!=rpcIdle) {
335  Channel::shared_pointer chan(channel.lock());
336  string channelName("disconnected");
337  if(chan) channelName = chan->getChannelName();
338  string message = "channel "
339  + channelName
340  + " PvaClientRPC::request request aleady active ";
341  throw std::runtime_error(message);
342  }
343  rpcState = rpcActive;
344  }
345  channelRPC->request(pvArgument);
346  return;
347  }
348  request(pvArgument);
349 }
350 
351 
352 }}
std::tr1::shared_ptr< PvaClient > PvaClientPtr
Definition: pvaClient.h:46
std::string request
virtual void message(std::string const &message, epics::pvData::MessageType messageType)
pvd::Status status
Definition: memory.hpp:41
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
std::tr1::shared_ptr< const Structure > StructureConstPtr
Definition: pvIntrospect.h:162
A lock for multithreading.
Definition: lock.h:36
An easy to use alternative to RPC.
Definition: pvaClient.h:1736
const std::string & getMessage() const
Definition: status.h:80
virtual void requestDone(const Status &status, ChannelRPC::shared_pointer const &channelRPC, PVStructure::shared_pointer const &pvResponse)
Holds all PVA related.
Definition: pvif.h:34
virtual std::string getRequesterName()
pvData
Definition: monitor.h:428
epicsMutex mutex
Definition: pvAccess.cpp:71
FORCE_INLINE const FieldCreatePtr & getFieldCreate()
std::tr1::shared_ptr< PVStructure > PVStructurePtr
Definition: pvData.h:87
std::tr1::shared_ptr< RPCRequesterImpl > RPCRequesterImplPtr
Definition: pvaClient.h:1728
virtual void channelRPCConnect(const epics::pvData::Status &status, ChannelRPC::shared_pointer const &channelRPC)
std::tr1::shared_ptr< PvaClientRPC > PvaClientRPCPtr
Definition: pvaClient.h:90
bool isOK() const
Definition: status.h:95
std::tr1::shared_ptr< PvaClientRPCRequester > PvaClientRPCRequesterPtr
Definition: pvaClient.h:92
RPCRequesterImpl(PvaClientRPCPtr const &pvaClientRPC, PvaClientPtr const &pvaClient)
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648