This is Unofficial EPICS BASE Doxygen Site
chancache.cpp
Go to the documentation of this file.
1 #include <stdio.h>
2 
3 #include <epicsAtomic.h>
4 #include <errlog.h>
5 
6 #include <epicsMutex.h>
7 #include <epicsTimer.h>
8 
9 #include <pv/epicsException.h>
10 #include <pv/serverContext.h>
11 #include <pv/pvAccess.h>
12 
13 #define epicsExportSharedSymbols
14 #include "pva2pva.h"
15 #include "helper.h"
16 #include "chancache.h"
17 #include "channel.h"
18 
19 namespace pvd = epics::pvData;
20 namespace pva = epics::pvAccess;
21 
23 
// ChannelCacheEntry constructor: stores the channel name (n) and the cache
// pointer (c), and starts with dropPoke set to true.
// NOTE(review): the signature (listing line 24) and one body line (27) are
// missing from this extract. The cross-reference index gives the signature as
// ChannelCacheEntry(ChannelCache *, const std::string &n) -- confirm against
// the real source.
25  :channelName(n), cache(c), dropPoke(true)
26 {
28 }
29 
// Body of the ChannelCacheEntry destructor (the cross-reference index places
// ~ChannelCacheEntry() at listing line 30, which is missing from this extract,
// as is body line 35 -- confirm against the real source).
// Destroys the underlying channel, if one was ever assigned.
31 {
32  // Should *not* be holding cache->cacheLock
33  if(channel.get())
34  channel->destroy(); // calls channelStateChange() w/ DESTROY
36 }
37 
// Returns the fixed requester name "GWClient".
// NOTE(review): the line carrying the qualified function name (listing line
// 39) is missing from this extract; the cross-reference index lists it as
// `virtual std::string getRequesterName()`. Presumably a member of
// ChannelCacheEntry::CRequester -- confirm.
38 std::string
40 {
41  return "GWClient";
42 }
43 
45 
// Constructs the requester from the cache entry it reports to; the entry is
// stored in the member `chan` (declared as ChannelCacheEntry::weak_pointer
// according to the cross-reference index).
// NOTE(review): body line 49 is missing from this extract.
46 ChannelCacheEntry::CRequester::CRequester(const ChannelCacheEntry::shared_pointer& p)
47  :chan(p)
48 {
50 }
51 
// NOTE(review): orphaned function body -- its signature (listing line 52) and
// its only body line (54) are missing from this extract. Given its position
// directly after the CRequester constructor this is presumably
// ChannelCacheEntry::CRequester::~CRequester() -- confirm against the real
// source.
53 {
55 }
56 
57 // for ChannelRequester
// channelCreated callback: intentionally a no-op (empty body).
// NOTE(review): the line with the function name and first parameter (listing
// line 59) is missing from this extract. The cross-reference index gives the
// full signature as
//   channelCreated(const epics::pvData::Status &status,
//                  epics::pvAccess::Channel::shared_pointer const &channel).
58 void
60  pva::Channel::shared_pointer const & channel)
61 {}
62 
// Connection-state callback for the channel held by a cache entry.
//
// Returns immediately if the owning ChannelCacheEntry (reached through the
// weak pointer this->chan) no longer exists.  Otherwise:
//   1. with cacheLock held, asserts that the notifying channel is the entry's
//      own channel and runs the state switch below;
//   2. after the lock scope has ended, forwards the new state to the requester
//      of every channel currently in the entry's `interested` set.
//
// NOTE(review): listing lines 78-79 are missing from this extract. They
// presumably held the `case` labels that select which connection states drop
// the entry from the cache -- confirm against the real source.
63 void
64 ChannelCacheEntry::CRequester::channelStateChange(pva::Channel::shared_pointer const & channel,
65  pva::Channel::ConnectionState connectionState)
66 {
    // Promote the weak reference; an empty result means the entry is gone.
67  ChannelCacheEntry::shared_pointer chan(this->chan.lock());
68  if(!chan)
69  return;
70 
71  {
72  Guard G(chan->cache->cacheLock);
73 
    // The notification must come from the channel this entry holds.
74  assert(chan->channel.get()==channel.get());
75 
76  switch(connectionState)
77  {
80  // Drop from cache
81  chan->cache->entries.erase(chan->channelName);
82  // keep 'chan' as a reference so that actual destruction doesn't happen while cacheLock is held
83  break;
84  default:
85  break;
86  }
87  }
88 
89  // fanout notification
    // The snapshot is taken after the cacheLock scope above has closed.
90  ChannelCacheEntry::interested_t::vector_type interested(chan->interested.lock_vector()); // Copy
91 
92  FOREACH(ChannelCacheEntry::interested_t::vector_type::const_iterator, it, end, interested)
93  {
    // Note: this GWChannel* 'chan' shadows the outer ChannelCacheEntry 'chan'
    // for the duration of the loop body.
94  GWChannel *chan = it->get();
95  pva::ChannelRequester::shared_pointer req(chan->requester.lock());
    // Skip channels whose requester has already gone away.
96  if(req)
97  req->channelStateChange(*it, connectionState);
98  }
99 }
100 
101 
// Timer callback object that prunes idle entries from a ChannelCache each
// time the timer fires.
//
// NOTE(review): listing line 104 is missing from this extract; the
// cross-reference index gives it as the member declaration
// `ChannelCache * cache`.
102 struct ChannelCache::cacheClean : public epicsTimerNotify
103 {
    // Remembers the cache to clean.
105  cacheClean(ChannelCache *c) : cache(c) {}
    // Runs one cleaning pass and asks the timer to restart with a delay of
    // 30.0 (presumably seconds, per epicsTimer convention -- confirm).
    // currentTime is not used.
106  epicsTimerNotify::expireStatus expire(const epicsTime &currentTime)
107  {
108  // keep a reference to any cache entrys being removed so they
109  // aren't destroyed while cacheLock is held
110  std::set<ChannelCacheEntry::shared_pointer> cleaned;
111 
112  {
113  Guard G(cache->cacheLock);
114  cache->cleanerRuns++;
115 
    // 'next' is computed before a possible erase(cur) so that iteration can
    // continue after the current element has been removed.
116  ChannelCache::entries_t::iterator cur=cache->entries.begin(), next, end=cache->entries.end();
117  while(cur!=end) {
118  next = cur;
119  ++next;
120 
    // An entry is dropped only if dropPoke is already clear and nobody is
    // interested in it.  Otherwise dropPoke is cleared, so an entry with no
    // interested channels is removed on a later pass unless dropPoke has been
    // set again in the meantime.
121  if(!cur->second->dropPoke && cur->second->interested.empty()) {
122  cleaned.insert(cur->second);
123  cache->entries.erase(cur);
124  cache->cleanerDust++;
125  } else {
126  cur->second->dropPoke = false;
127  }
128 
129  cur = next;
130  }
131  }
    // 'cleaned' goes out of scope only after the lock scope above has closed.
132  return epicsTimerNotify::expireStatus(epicsTimerNotify::restart, 30.0);
133  }
134 };
135 
// Builds a channel cache on top of the given provider: allocates a timer
// queue, creates the cacheClean callback object, zeroes the cleaner counters
// and starts the clean timer with an initial delay of 30.0.
//
// Throws std::logic_error if `prov` is empty.
//
// NOTE(review): listing lines 145-146 are missing from this extract; they
// presumably obtain `cleanTimer` from `timerQueue` (createTimer() appears in
// the cross-reference index) -- confirm against the real source.
// NOTE(review): the provider check runs after `timerQueue` and `cleaner` have
// already been initialised in the member-init list.  The destructor does not
// run for a constructor that throws, so it looks like both are leaked on that
// path -- consider validating `prov` before allocating.
136 ChannelCache::ChannelCache(const pva::ChannelProvider::shared_pointer& prov)
137  :provider(prov)
138  ,timerQueue(&epicsTimerQueueActive::allocate(1, epicsThreadPriorityCAServerLow-2))
139  ,cleaner(new cacheClean(this))
140  ,cleanerRuns(0)
141  ,cleanerDust(0)
142 {
143  if(!provider)
144  throw std::logic_error("Missing 'pva' provider");
147  cleanTimer->start(*cleaner, 30.0);
148 }
149 
// Body of the ChannelCache destructor (its signature, listing line 150, is
// missing from this extract).
//
// With cacheLock held: destroys the clean timer, releases the timer queue,
// deletes the cacheClean callback object and moves every cache entry into the
// local map E.  E is declared outside the locked scope so that the entries it
// ends up holding are released only after cacheLock has been dropped.
//
// Fix: a second `entries_t E;` used to be declared inside the locked scope.
// It shadowed the outer E, so the swapped-out entries were destroyed while
// cacheLock was still held and the outer E was dead code.  The inner
// declaration has been removed.
//
// NOTE(review): cleanTimer->destroy() is called with cacheLock held, while
// cacheClean::expire() also takes cacheLock.  If destroy() waits for an
// expire() that is in progress, this could deadlock -- confirm against the
// epicsTimer documentation.
151 {
    // Outlives the lock scope below; receives all entries via swap().
152  entries_t E;
153  {
154  Guard G(cacheLock);
155 
156  cleanTimer->destroy();
157  timerQueue->release();
158  delete cleaner;
159 
    // Swap into the OUTER E so the entries are destroyed after G is released.
161  E.swap(entries);
162  }
163 }
164 
// Finds the cache entry for `newName`, creating it on first use.
//
// Returns the entry only when its channel reports isConnected() at the time
// of the call; otherwise returns an empty pointer.  A call that finds an
// existing entry sets that entry's dropPoke flag.
//
// Raises std::runtime_error (through THROW_EXCEPTION2) when
// provider->createChannel() returns an empty pointer.
//
// NOTE(review): listing line 186 is missing from this extract; going by the
// "unlock to call createChannel()" comment it presumably releases G for the
// duration of the inner scope -- confirm against the real source.
// NOTE(review): the new entry is inserted into `entries` before
// createChannel() is called, so when the exception above is raised the entry
// stays in the map with an empty `channel`.
165 ChannelCacheEntry::shared_pointer
166 ChannelCache::lookup(const std::string& newName)
167 {
168  ChannelCacheEntry::shared_pointer ret;
169 
170  Guard G(cacheLock);
171 
172  entries_t::const_iterator it = entries.find(newName);
173 
174  if(it==entries.end()) {
175  // first request, create ChannelCacheEntry
176  //TODO: async lookup
177 
178  ChannelCacheEntry::shared_pointer ent(new ChannelCacheEntry(this, newName));
    // The requester is constructed from the entry it reports to.
179  ent->requester.reset(new ChannelCacheEntry::CRequester(ent));
180 
181  entries[newName] = ent;
182 
183  pva::Channel::shared_pointer M;
184  {
185  // unlock to call createChannel()
187 
188  M = provider->createChannel(newName, ent->requester);
189  if(!M)
190  THROW_EXCEPTION2(std::runtime_error, "Failed to createChannel");
191  }
192  ent->channel = M;
193 
194  if(M->isConnected())
195  ret = ent; // immediate connect, mostly for unit-tests (thus delayed connect not covered)
196 
197  } else if(it->second->channel && it->second->channel->isConnected()) {
198  // another request, and hey we're connected this time
199 
200  ret = it->second;
201  it->second->dropPoke = true;
202 
203  } else {
204  // not connected yet, but a client is still interested
205  it->second->dropPoke = true;
206  }
207 
208  return ret;
209 }
entries_t entries
Definition: chancache.h:160
epicsTimer * cleanTimer
Definition: chancache.h:165
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
epics::pvAccess::Channel::shared_pointer channel
Definition: chancache.h:117
size_t cleanerRuns
Definition: chancache.h:168
#define FOREACH(ITERTYPE, IT, END, C)
Definition: helper.h:6
pvd::Status status
#define THROW_EXCEPTION2(TYPE, MSG)
virtual void channelCreated(const epics::pvData::Status &status, epics::pvAccess::Channel::shared_pointer const &channel)
Definition: chancache.cpp:59
virtual std::string getRequesterName()
Definition: chancache.cpp:39
#define true
Definition: flexdef.h:84
char * cache
Definition: reader.c:18
epicsMutex cacheLock
Definition: chancache.h:158
epics::pvAccess::ChannelProvider::shared_pointer provider
Definition: chancache.h:162
ChannelCache(const epics::pvAccess::ChannelProvider::shared_pointer &prov)
Definition: chancache.cpp:136
epicsTimer & createTimer()
Definition: timerQueue.cpp:204
interested_t interested
Definition: chancache.h:123
ChannelCacheEntry::weak_pointer chan
Definition: chancache.h:140
virtual void channelStateChange(epics::pvAccess::Channel::shared_pointer const &channel, epics::pvAccess::Channel::ConnectionState connectionState)
Definition: chancache.cpp:64
EPICS_ATOMIC_INLINE size_t epicsAtomicDecrSizeT(size_t *pTarget)
virtual ~ChannelCacheEntry()
Definition: chancache.cpp:30
Holds all PVA related.
Definition: pvif.h:34
pvData
Definition: monitor.h:428
APIs for the epicsMutex mutual exclusion semaphore.
ChannelCache * cache
Definition: chancache.cpp:104
char * allocate(unsigned int n)
Definition: antelope.c:230
size_t cleanerDust
Definition: chancache.h:169
std::map< std::string, ChannelCacheEntry::shared_pointer > entries_t
Definition: chancache.h:155
const requester_type::weak_pointer requester
Definition: channel.h:17
#define epicsThreadPriorityCAServerLow
Definition: epicsThread.h:80
ChannelCacheEntry::shared_pointer lookup(const std::string &name)
Definition: chancache.cpp:166
static size_t num_instances
Definition: chancache.h:108
std::vector< value_pointer > vector_type
Definition: weakset.h:64
epicsTimerNotify::expireStatus expire(const epicsTime &currentTime)
Definition: chancache.cpp:106
cacheClean * cleaner
Definition: chancache.h:166
CRequester(const ChannelCacheEntry::shared_pointer &p)
Definition: chancache.cpp:46
EPICS_ATOMIC_INLINE size_t epicsAtomicIncrSizeT(size_t *pTarget)
cacheClean(ChannelCache *c)
Definition: chancache.cpp:105
ChannelCacheEntry(ChannelCache *, const std::string &n)
Definition: chancache.cpp:24