This is Unofficial EPICS BASE Doxygen Site
client.cpp
Go to the documentation of this file.
1 /*
2  * Copyright information and license terms for this software can be
3  * found in the file LICENSE that is included with the distribution
4  */
5 
6 #include <typeinfo>
7 
8 #include <epicsMutex.h>
9 #include <epicsGuard.h>
10 #include <epicsEvent.h>
11 #include <epicsThread.h>
12 
13 #include <pv/pvData.h>
14 #include <pv/bitSet.h>
15 #include <pv/reftrack.h>
16 #include <pv/epicsException.h>
17 
18 #define epicsExportSharedSymbols
19 #include "pv/logger.h"
20 #include "clientpvt.h"
21 #include "pv/pvAccess.h"
22 #include "pv/configuration.h"
23 
24 namespace pvd = epics::pvData;
25 namespace pva = epics::pvAccess;
26 
29 
30 namespace pvac {
31 
33  :std::runtime_error("Timeout")
34 {}
35 
37  public pvac::detail::wrapped_shared_from_this<ClientChannel::Impl>
38 {
40  pva::Channel::shared_pointer channel;
41  // assume few listeners per channel, store in vector
42  typedef std::vector<ClientChannel::ConnectCallback*> listeners_t;
43  listeners_t listeners;
46 
47  static size_t num_instances;
48 
49  Impl() :listeners_inprogress(false) {REFTRACE_INCREMENT(num_instances);}
50  virtual ~Impl() {REFTRACE_DECREMENT(num_instances);}
51 
52  // called automatically via wrapped_shared_from_this
53  void cancel()
54  {
55  // ClientChannel destroy implicitly removes all callbacks,
56  // but doesn't destroy the Channel or cancel Operations
57  Guard G(mutex);
58  while(listeners_inprogress) {
59  UnGuard U(G);
60  listeners_done.wait();
61  }
62  listeners.clear();
63  }
64 
65  virtual std::string getRequesterName() OVERRIDE FINAL { return "ClientChannel::Impl"; }
66 
67  virtual void channelCreated(const pvd::Status& status, pva::Channel::shared_pointer const & channel) OVERRIDE FINAL {}
68 
69  virtual void channelStateChange(pva::Channel::shared_pointer const & channel, pva::Channel::ConnectionState connectionState) OVERRIDE FINAL
70  {
71  listeners_t notify;
72  {
73  Guard G(mutex);
74  notify = listeners; // copy vector
75  listeners_inprogress = true;
76  }
77  try {
78  ConnectEvent evt;
79  evt.connected = connectionState==pva::Channel::CONNECTED;
80  if(evt.connected)
81  evt.peerName = channel->getRemoteAddress();
82  for(listeners_t::const_iterator it=notify.begin(), end=notify.end(); it!=end; ++it)
83  {
84  try {
85  (*it)->connectEvent(evt);
86  }catch(std::exception& e){
87  LOG(pva::logLevelError, "Unhandled exception in connection state listener: %s\n", e.what());
88 
89  Guard G(mutex);
90  for(listeners_t::iterator it2=listeners.begin(), end2=listeners.end(); it2!=end2; ++it2) {
91  if(*it==*it2) {
92  listeners.erase(it2);
93  break;
94  }
95  }
96  }
97  }
98  }catch(...){
99  {
100  Guard G(mutex);
101  listeners_inprogress = false;
102  }
103  listeners_done.signal();
104  }
105  {
106  Guard G(mutex);
107  listeners_inprogress = false;
108  }
109  listeners_done.signal();
110  }
111 };
112 
114 
116  :priority(0)
117  ,address()
118 {}
119 
121 {
122  return priority<O.priority || (priority==O.priority && address<O.address);
123 }
124 
125 Operation::Operation(const std::tr1::shared_ptr<Impl>& i)
126  :impl(i)
127 {}
128 
130 
131 std::string Operation::name() const
132 {
133  return impl ? impl->name() : "<NULL>";
134 }
135 
137 {
138  if(impl) impl->cancel();
139 }
140 
141 ClientChannel::ClientChannel(const std::tr1::shared_ptr<pva::ChannelProvider>& provider,
142  const std::string& name,
143  const Options& opt)
144  :impl(Impl::build())
145 {
146  if(name.empty())
147  THROW_EXCEPTION2(std::logic_error, "empty channel name not allowed");
148  if(!provider)
149  THROW_EXCEPTION2(std::logic_error, "NULL ChannelProvider");
150  impl->channel = provider->createChannel(name, impl->internal_shared_from_this(),
151  opt.priority, opt.address);
152  if(!impl->channel)
153  throw std::runtime_error("ChannelProvider failed to create Channel");
154 }
155 
157 
158 std::string ClientChannel::name() const
159 {
160  return impl ? impl->channel->getChannelName() : std::string();
161 }
162 
164 {
165  if(!impl) throw std::logic_error("Dead Channel");
166  ConnectEvent evt;
167  {
168  Guard G(impl->mutex);
169 
170  for(Impl::listeners_t::const_iterator it=impl->listeners.begin(), end=impl->listeners.end(); it!=end; ++it)
171  {
172  if(cb==*it) return; // no duplicates
173  }
174  impl->listeners.push_back(cb);
175  evt.connected = impl->channel->isConnected();
176  }
177  try{
178  cb->connectEvent(evt);
179  }catch(...){
180  removeConnectListener(cb);
181  throw;
182  }
183 }
184 
186 {
187  if(!impl) throw std::logic_error("Dead Channel");
188  Guard G(impl->mutex);
189 
190  // ensure no in-progress callbacks
191  while(impl->listeners_inprogress) {
192  UnGuard U(G);
193  impl->listeners_done.wait();
194  }
195 
196  for(Impl::listeners_t::iterator it=impl->listeners.begin(), end=impl->listeners.end(); it!=end; ++it)
197  {
198  if(cb==*it) {
199  impl->listeners.erase(it);
200  return;
201  }
202  }
203 }
204 
205 
206 void ClientChannel::show(std::ostream& strm) const
207 {
208  if(impl) {
209  strm<<typeid(*impl->channel.get()).name()<<" : ";
210  impl->channel->printInfo(strm);
211  } else {
212  strm<<"NULL Channel";
213  }
214 }
215 
216 static
217 void register_reftrack()
218 {
219  static volatile int done;
220  if(done) return;
221  done = 1;
222  // done is an optimization, duplicate calls to registerRef* are no-ops
229 }
230 
231 std::tr1::shared_ptr<epics::pvAccess::Channel>
232 ClientChannel::getChannel()
233 { return impl->channel; }
234 
236 {
237  static size_t num_instances;
238  Impl() {register_reftrack(); REFTRACE_INCREMENT(num_instances);}
239  ~Impl() {REFTRACE_DECREMENT(num_instances);}
240 
241  pva::ChannelProvider::shared_pointer provider;
242 
244  typedef std::map<std::pair<std::string, ClientChannel::Options>, std::tr1::weak_ptr<ClientChannel::Impl> > channels_t;
245  channels_t channels;
246 };
247 
249 
250 ClientProvider::ClientProvider(const std::string& providerName,
251  const std::tr1::shared_ptr<epics::pvAccess::Configuration>& conf)
252  :impl(new Impl)
253 {
254  std::string name;
255  pva::ChannelProviderRegistry::shared_pointer reg;
256 
257  if(strncmp("server:", providerName.c_str(), 7)==0) {
258  name = providerName.substr(7);
260  } else if(strncmp("client:", providerName.c_str(), 7)==0) {
261  name = providerName.substr(7);
263  } else {
264  name = providerName;
266  }
267  impl->provider = reg->createProvider(name,
268  conf ? conf : pva::ConfigurationBuilder()
269  .push_env()
270  .build());
271 
272  if(!impl->provider)
273  THROW_EXCEPTION2(std::invalid_argument, providerName);
274 }
275 
276 ClientProvider::ClientProvider(const std::tr1::shared_ptr<epics::pvAccess::ChannelProvider>& provider)
277  :impl(new Impl)
278 {
279  impl->provider = provider;
280 
281  if(!impl->provider)
282  THROW_EXCEPTION2(std::invalid_argument, "null ChannelProvider");
283 }
284 
286 
287 std::string
289 {
290  if(!impl) throw std::logic_error("Dead Provider");
291  return impl->provider->getProviderName();
292 }
293 
295 ClientProvider::connect(const std::string& name,
296  const ClientChannel::Options& conf)
297 {
298  if(!impl) throw std::logic_error("Dead Provider");
299  Guard G(impl->mutex);
300  Impl::channels_t::key_type K(name, conf);
301  Impl::channels_t::iterator it(impl->channels.find(K));
302  if(it!=impl->channels.end()) {
303  // cache hit
304  std::tr1::shared_ptr<ClientChannel::Impl> chan(it->second.lock());
305  if(chan)
306  return ClientChannel(chan);
307  else
308  impl->channels.erase(it); // remove stale
309  }
310  // cache miss
311  ClientChannel ret(impl->provider, name, conf);
312  impl->channels[K] = ret.impl;
313  return ret;
314 }
315 
316 bool ClientProvider::disconnect(const std::string& name,
317  const ClientChannel::Options& conf)
318 {
319  if(!impl) throw std::logic_error("Dead Provider");
320  Guard G(impl->mutex);
321 
322  Impl::channels_t::iterator it(impl->channels.find(std::make_pair(name, conf)));
323  bool found = it!=impl->channels.end();
324  if(found)
325  impl->channels.erase(it);
326  return found;
327 }
328 
330 {
331  if(!impl) throw std::logic_error("Dead Provider");
332  Guard G(impl->mutex);
333  impl->channels.clear();
334 }
335 
336 ::std::ostream& operator<<(::std::ostream& strm, const Operation& op)
337 {
338  if(op.impl) {
339  op.impl->show(strm);
340  } else {
341  strm << "Operation()";
342  }
343  return strm;
344 }
345 
346 ::std::ostream& operator<<(::std::ostream& strm, const ClientChannel& op)
347 {
348  if(op.impl) {
349  strm << "ClientChannel("
350  << typeid(*op.impl->channel.get()).name()<<", "
351  "\"" << op.impl->channel->getChannelName() <<"\", "
352  "\"" << op.impl->channel->getProvider()->getProviderName() <<"\", "
353  "connected="<<(op.impl->channel->isConnected()?"true":"false")
354  <<"\")";
355  } else {
356  strm << "ClientChannel()";
357  }
358  return strm;
359 }
360 
361 ::std::ostream& operator<<(::std::ostream& strm, const ClientProvider& op)
362 {
363  if(op.impl) {
364  strm << "ClientProvider("
365  << typeid(*op.impl->provider.get()).name()<<", "
366  "\""<<op.impl->provider->getProviderName()<<"\")";
367  } else {
368  strm << "ClientProvider()";
369  }
370  return strm;
371 }
372 
373 namespace detail {
374 
376 {
377  epics::registerRefCounter("pvac::ClientChannel::Impl", &ClientChannel::Impl::num_instances);
378  epics::registerRefCounter("pvac::ClientProvider::Impl", &ClientProvider::Impl::num_instances);
379 }
380 
381 }
382 
383 } //namespace pvac
ClientProvider()
Construct a null provider. All methods throw. May later be assigned from a valid ClientProvider.
Definition: client.h:526
static ChannelProviderRegistry::shared_pointer servers()
pvd::Status status
#define THROW_EXCEPTION2(TYPE, MSG)
friend epicsShareFunc::std::ostream & operator<<(::std::ostream &strm, const ClientProvider &op)
Definition: client.cpp:361
void registerRefTrackInfo()
Definition: clientInfo.cpp:117
int i
Definition: scan.c:967
void show(std::ostream &strm) const
Definition: client.cpp:206
void registerRefTrack()
Definition: client.cpp:375
pva::Channel::shared_pointer channel
Definition: client.cpp:40
std::string name() const
Channel name.
Definition: client.cpp:131
static size_t num_instances
Definition: client.cpp:237
Definition: memory.hpp:41
std::string name() const
Channel name or an empty string.
Definition: client.cpp:158
std::tr1::shared_ptr< Impl > impl
Definition: client.h:84
#define OVERRIDE
Definition: pvAccess.h:55
Handle for in-progress get/put/rpc operation.
Definition: client.h:50
void registerRefTrackMonitor()
void registerRefTrackPut()
Definition: clientPut.cpp:233
epicsGuard< epicsMutex > Guard
Definition: client.cpp:27
bool operator<(const Options &) const
Definition: client.cpp:120
Holds all PVA related.
Definition: pvif.h:34
std::string name() const
Definition: client.cpp:288
virtual std::string getRequesterName() OVERRIDE FINAL
Definition: client.cpp:65
bool connected
Is this a connection, or disconnection, event.
Definition: client.h:240
#define LOG(level, format,...)
Definition: logger.h:48
void registerRefTrackGet()
Definition: clientGet.cpp:171
epicsEvent listeners_done
Definition: client.cpp:45
pvData
Definition: monitor.h:428
listeners_t listeners
Definition: client.cpp:43
virtual void connectEvent(const ConnectEvent &evt)=0
APIs for the epicsMutex mutual exclusion semaphore.
std::string peerName
Definition: client.h:243
void cancel()
Definition: client.cpp:136
Connection state change CB.
Definition: client.h:448
static ChannelProviderRegistry::shared_pointer clients()
information on connect/disconnect
Definition: client.h:237
ClientChannel()
Construct a null channel. All methods throw. May later be assigned from a valid ClientChannel.
Definition: client.h:290
pva::ChannelProvider::shared_pointer provider
Definition: client.cpp:241
Central client context.
Definition: client.h:517
void registerRefCounter(const char *name, const size_t *counter)
Definition: reftrack.cpp:59
APIs for the epicsEvent binary semaphore.
virtual void channelStateChange(pva::Channel::shared_pointer const &channel, pva::Channel::ConnectionState connectionState) OVERRIDE FINAL
Definition: client.cpp:69
ClientChannel connect(const std::string &name, const ClientChannel::Options &conf=ClientChannel::Options())
Definition: client.cpp:295
void disconnect()
Clear channel cache.
Definition: client.cpp:329
void done(int k)
Definition: antelope.c:77
epicsGuardRelease< epicsMutex > UnGuard
Definition: client.cpp:28
std::vector< ClientChannel::ConnectCallback * > listeners_t
Definition: client.cpp:42
ChannelPut::shared_pointer op
Definition: pvAccess.cpp:132
void registerRefTrackRPC()
Definition: clientRPC.cpp:179
C++ and C descriptions for a thread.
void removeConnectListener(ConnectCallback *)
Remove from list of listeners.
Definition: client.cpp:185
std::map< std::pair< std::string, ClientChannel::Options >, std::tr1::weak_ptr< ClientChannel::Impl > > channels_t
Definition: client.cpp:244
#define false
Definition: flexdef.h:85
#define FINAL
Definition: pvAccess.h:48
void addConnectListener(ConnectCallback *)
Definition: client.cpp:163
See Client API.
Definition: client.cpp:30
static size_t num_instances
Definition: client.cpp:47
virtual void channelCreated(const pvd::Status &status, pva::Channel::shared_pointer const &channel) OVERRIDE FINAL
Definition: client.cpp:67
Channel creation options.
Definition: client.h:282