This is an unofficial EPICS Base Doxygen site
MonitorCacheEntry Struct Reference

#include "chancache.h"

+ Inheritance diagram for MonitorCacheEntry:
+ Collaboration diagram for MonitorCacheEntry:

Public Types

typedef std::vector< epicsUInt8 > pvrequest_t
 
typedef weak_set< MonitorUser > interested_t
 
- Public Types inherited from epics::pvAccess::MonitorRequester
typedef Monitor operation_type
 

Public Member Functions

 POINTER_DEFINITIONS (MonitorCacheEntry)
 
epicsMutex & mutex () const
 
 MonitorCacheEntry (ChannelCacheEntry *ent, const epics::pvData::PVStructure::shared_pointer &pvr)
 
virtual ~MonitorCacheEntry ()
 
virtual void monitorConnect (epics::pvData::Status const &status, epics::pvData::MonitorPtr const &monitor, epics::pvData::StructureConstPtr const &structure)
 
virtual void monitorEvent (epics::pvData::MonitorPtr const &monitor)
 
virtual void unlisten (epics::pvData::MonitorPtr const &monitor)
 
virtual std::string getRequesterName ()
 
- Public Member Functions inherited from epics::pvAccess::MonitorRequester
 POINTER_DEFINITIONS (MonitorRequester)
 
virtual ~MonitorRequester ()
 
virtual void monitorConnect (epics::pvData::Status const &status, MonitorPtr const &monitor, epics::pvData::StructureConstPtr const &structure)=0
 
virtual void monitorEvent (MonitorPtr const &monitor)=0
 
virtual void unlisten (MonitorPtr const &monitor)=0
 
- Public Member Functions inherited from epics::pvAccess::ChannelBaseRequester
 POINTER_DEFINITIONS (ChannelBaseRequester)
 
 ChannelBaseRequester ()
 
virtual ~ChannelBaseRequester ()
 
virtual void channelDisconnect (bool destroy)
 

Public Attributes

weak_pointer weakref
 
ChannelCacheEntry *const chan
 
const size_t bufferSize
 
bool havedata
 
bool done
 
size_t nwakeups
 
size_t nevents
 
epics::pvData::StructureConstPtr typedesc
 
epics::pvData::MonitorElement::shared_pointer lastelem
 
epics::pvData::MonitorPtr mon
 
epics::pvData::Status startresult
 
interested_t interested
 

Static Public Attributes

static size_t num_instances
 
- Static Public Attributes inherited from epics::pvAccess::ChannelBaseRequester
static size_t num_instances
 

Detailed Description

Definition at line 24 of file chancache.h.

Member Typedef Documentation

typedef weak_set<MonitorUser> MonitorCacheEntry::interested_t

Definition at line 52 of file chancache.h.

typedef std::vector<epicsUInt8> MonitorCacheEntry::pvrequest_t

Definition at line 37 of file chancache.h.

Constructor & Destructor Documentation

MonitorCacheEntry::MonitorCacheEntry ( ChannelCacheEntry * ent,
const epics::pvData::PVStructure::shared_pointer &  pvr 
)

Definition at line 34 of file moncache.cpp.

35  :chan(ent)
36  ,bufferSize(getS<pvd::uint32>(pvr, "record._options.queueSize", 2)) // should be same default as pvAccess, but not required
37  ,havedata(false)
38  ,done(false)
39  ,nwakeups(0)
40  ,nevents(0)
41 {
43 }
ChannelCacheEntry *const chan
Definition: chancache.h:30
const size_t bufferSize
Definition: chancache.h:32
static size_t num_instances
Definition: chancache.h:27
EPICS_ATOMIC_INLINE size_t epicsAtomicIncrSizeT(size_t *pTarget)
MonitorCacheEntry::~MonitorCacheEntry ( )
virtual

Definition at line 45 of file moncache.cpp.

46 {
47  pvd::Monitor::shared_pointer M;
48  M.swap(mon);
49  if(M) {
50  M->destroy();
51  }
53  const_cast<ChannelCacheEntry*&>(chan) = NULL; // spoil to fault use after free
54 }
epics::pvData::MonitorPtr mon
Definition: chancache.h:49
ChannelCacheEntry *const chan
Definition: chancache.h:30
#define NULL
Definition: catime.c:38
EPICS_ATOMIC_INLINE size_t epicsAtomicDecrSizeT(size_t *pTarget)
static size_t num_instances
Definition: chancache.h:27

Member Function Documentation

std::string MonitorCacheEntry::getRequesterName ( )
virtual

Definition at line 239 of file moncache.cpp.

240 {
241  return "MonitorCacheEntry";
242 }
void MonitorCacheEntry::monitorConnect ( epics::pvData::Status const &  status,
epics::pvData::MonitorPtr const &  monitor,
epics::pvData::StructureConstPtr const &  structure 
)
virtual

Definition at line 57 of file moncache.cpp.

60 {
62  {
63  Guard G(mutex());
64  if(typedesc) {
65  // we shouldn't have to deal with monitor type change since we
66  // destroy() Monitors on Channel disconnect.
67  std::cerr<<"monitorConnect() w/ new type. Monitor has outlived it's connection.\n";
68  monitor->stop();
69  //TODO: unlisten()
70  return;
71  }
73 
74  if(status.isSuccess()) {
75  startresult = monitor->start();
76  } else {
78  }
79 
80  if(startresult.isSuccess()) {
81  lastelem.reset(new pvd::MonitorElement(pvd::getPVDataCreate()->createPVStructure(structure)));
82  }
83 
84  // set typedesc and startresult for futured MonitorUsers
85  // and copy snapshot of already interested MonitorUsers
86  tonotify = interested.lock_vector();
87  }
88 
89  if(!startresult.isSuccess())
90  std::cout<<"upstream monitor start() fails\n";
91 
92  shared_pointer self(weakref); // keeps us alive all MonitorUsers are destroy()ed
93 
94  for(interested_t::vector_type::const_iterator it = tonotify.begin(),
95  end = tonotify.end(); it!=end; ++it)
96  {
97  pvd::MonitorRequester::shared_pointer req((*it)->req);
98  if(req) {
99  req->monitorConnect(startresult, *it, structure);
100  }
101  }
102 }
pvd::Status status
epicsMutex & mutex() const
Definition: chancache.h:35
An element for a monitorQueue.
Definition: monitor.h:54
epics::pvData::Status startresult
Definition: chancache.h:50
weak_pointer weakref
Definition: chancache.h:28
interested_t interested
Definition: chancache.h:53
epics::pvData::StructureConstPtr typedesc
Definition: chancache.h:44
vector_type lock_vector() const
Definition: weakset.h:268
bool isSuccess() const
Definition: status.h:103
std::vector< value_pointer > vector_type
Definition: weakset.h:64
epics::pvData::MonitorElement::shared_pointer lastelem
Definition: chancache.h:48
FORCE_INLINE const PVDataCreatePtr & getPVDataCreate()
Definition: pvData.h:1648
void MonitorCacheEntry::monitorEvent ( epics::pvData::MonitorPtr const &  monitor)
virtual

Definition at line 110 of file moncache.cpp.

111 {
112  /* PVA is being tricky, the Monitor* passed to monitorConnect()
113  * isn't the same one we see here!
114  * The original was a ChannelMonitorImpl, we now see a MonitorStrategyQueue
115  * owned by the original, which delegates deserialization and accumulation
116  * of deltas into complete events for us.
117  * However, we don't want to keep the MonitorStrategyQueue as it's
118  * destroy() method is a no-op!
119  */
120 
122 
123  shared_pointer self(weakref); // keeps us alive in case all MonitorUsers are destroy()ed
124 
125  pva::MonitorElementPtr update;
126 
127  typedef std::vector<MonitorUser::shared_pointer> dsnotify_t;
128  dsnotify_t dsnotify;
129 
130  {
131  Guard G(mutex()); // MCE and MU guarded by the same mutex
132  if(!havedata)
133  havedata = true;
134 
135  //TODO: flow control, if all MU buffers are full, break before poll()==NULL
136  while((update=monitor->poll()))
137  {
139 
140  lastelem->pvStructurePtr->copyUnchecked(*update->pvStructurePtr,
141  *update->changedBitSet);
142  *lastelem->changedBitSet = *update->changedBitSet;
143  *lastelem->overrunBitSet = *update->overrunBitSet;
144  monitor->release(update);
145  update.reset();
146 
147  interested_t::iterator IIT(interested); // recursively locks interested.mutex() (assumes this->mutex() is interestd.mutex())
148  for(interested_t::value_pointer pusr = IIT.next(); pusr; pusr = IIT.next())
149  {
150  MonitorUser *usr = pusr.get();
151 
152  {
153  Guard G(usr->mutex());
154  if(usr->initial)
155  continue; // no start() yet
156  // TODO: track overflow when !running (after stop())?
157  if(!usr->running || usr->empty.empty()) {
158  usr->inoverflow = true;
159 
160  /* overrun |= lastelem->overrun // upstream overflows
161  * overrun |= changed & lastelem->changed // downstream overflows
162  * changed |= lastelem->changed // accumulate changes
163  */
164 
165  *usr->overflowElement->overrunBitSet |= *lastelem->overrunBitSet;
166  usr->overflowElement->overrunBitSet->or_and(*usr->overflowElement->changedBitSet,
167  *lastelem->changedBitSet);
168  *usr->overflowElement->changedBitSet |= *lastelem->changedBitSet;
169 
170  usr->overflowElement->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr,
171  *lastelem->changedBitSet);
172 
174  continue;
175  }
176  // we only come out of overflow when downstream release()s an element to us
177  // empty.empty() does not imply inoverflow,
178  // however inoverflow does imply empty.empty()
179  assert(!usr->inoverflow);
180 
181  if(usr->filled.empty())
182  dsnotify.push_back(pusr);
183 
184  pvd::MonitorElementPtr elem(usr->empty.front());
185 
186  *elem->overrunBitSet = *lastelem->overrunBitSet;
187  *elem->changedBitSet = *lastelem->changedBitSet;
188  // Note: can't use changed mask to optimize this copy since we don't know
189  // the state of the free element
190  elem->pvStructurePtr->copyUnchecked(*lastelem->pvStructurePtr);
191 
192  usr->filled.push_back(elem);
193  usr->empty.pop_front();
194 
196  }
197  }
198  }
199  }
200 
201  // unlock here, race w/ stop(), unlisten()?
202  //TODO: notify from worker thread
203 
204  FOREACH(dsnotify_t::iterator, it,end,dsnotify) {
205  MonitorUser *usr = (*it).get();
206  pvd::MonitorRequester::shared_pointer req(usr->req);
208  req->monitorEvent(*it); // notify when first item added to empty queue, may call poll(), release(), and others
209  }
210 }
epics::pvData::MonitorRequester::weak_pointer req
Definition: chancache.h:76
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
#define FOREACH(ITERTYPE, IT, END, C)
Definition: helper.h:6
bool running
Definition: chancache.h:81
epicsMutex & mutex() const
Definition: chancache.h:35
std::deque< epics::pvData::MonitorElementPtr > empty
Definition: chancache.h:87
size_t nwakeups
Definition: chancache.h:83
epics::pvData::MonitorElementPtr overflowElement
Definition: chancache.h:90
weak_pointer weakref
Definition: chancache.h:28
interested_t interested
Definition: chancache.h:53
bool initial
Definition: chancache.h:80
size_t nevents
Definition: chancache.h:84
epicsMutex & mutex() const
Definition: chancache.h:73
std::tr1::shared_ptr< MonitorElement > MonitorElementPtr
Definition: monitor.h:40
std::tr1::shared_ptr< MonitorUser > value_pointer
Definition: weakset.h:61
EPICS_ATOMIC_INLINE size_t epicsAtomicIncrSizeT(size_t *pTarget)
epics::pvData::MonitorElement::shared_pointer lastelem
Definition: chancache.h:48
bool inoverflow
Definition: chancache.h:82
void * usr
Definition: cadef.h:85
size_t ndropped
Definition: chancache.h:85
std::deque< epics::pvData::MonitorElementPtr > filled
Definition: chancache.h:87
epicsMutex& MonitorCacheEntry::mutex ( ) const
inline

Definition at line 35 of file chancache.h.

35 { return interested.mutex(); }
epicsMutex & mutex() const
Definition: weakset.h:199
interested_t interested
Definition: chancache.h:53
MonitorCacheEntry::POINTER_DEFINITIONS ( MonitorCacheEntry  )
void MonitorCacheEntry::unlisten ( epics::pvData::MonitorPtr const &  monitor)
virtual

Definition at line 214 of file moncache.cpp.

215 {
216  pvd::Monitor::shared_pointer M;
217  interested_t::vector_type tonotify;
218  {
219  Guard G(mutex());
220  M.swap(mon);
221  tonotify = interested.lock_vector();
222  // assume that upstream won't call monitorEvent() again
223 
224  // cause future downstream start() to error
225  startresult = pvd::Status(pvd::Status::STATUSTYPE_ERROR, "upstream unlisten()");
226  }
227  if(M) {
228  M->destroy();
229  }
230  FOREACH(interested_t::vector_type::iterator, it, end, tonotify) {
231  MonitorUser *usr = it->get();
232  pvd::MonitorRequester::shared_pointer req(usr->req);
233  if(usr->inuse.empty()) // TODO: what about stopped?
234  req->unlisten(*it);
235  }
236 }
epics::pvData::MonitorRequester::weak_pointer req
Definition: chancache.h:76
epics::pvData::MonitorPtr mon
Definition: chancache.h:49
#define FOREACH(ITERTYPE, IT, END, C)
Definition: helper.h:6
epicsMutex & mutex() const
Definition: chancache.h:35
epics::pvData::Status startresult
Definition: chancache.h:50
interested_t interested
Definition: chancache.h:53
vector_type lock_vector() const
Definition: weakset.h:268
std::vector< value_pointer > vector_type
Definition: weakset.h:64
void * usr
Definition: cadef.h:85
std::set< epics::pvData::MonitorElementPtr > inuse
Definition: chancache.h:88

Member Data Documentation

const size_t MonitorCacheEntry::bufferSize

Definition at line 32 of file chancache.h.

ChannelCacheEntry* const MonitorCacheEntry::chan

Definition at line 30 of file chancache.h.

bool MonitorCacheEntry::done

Definition at line 40 of file chancache.h.

bool MonitorCacheEntry::havedata

Definition at line 39 of file chancache.h.

interested_t MonitorCacheEntry::interested

Definition at line 53 of file chancache.h.

epics::pvData::MonitorElement::shared_pointer MonitorCacheEntry::lastelem

Value of the upstream monitor (the accumulation of all deltas), together with the changed/overflow bit masks of the last delta.

Definition at line 48 of file chancache.h.

epics::pvData::MonitorPtr MonitorCacheEntry::mon

Definition at line 49 of file chancache.h.

size_t MonitorCacheEntry::nevents

Definition at line 42 of file chancache.h.

size_t MonitorCacheEntry::num_instances
static

Definition at line 27 of file chancache.h.

size_t MonitorCacheEntry::nwakeups

Definition at line 41 of file chancache.h.

epics::pvData::Status MonitorCacheEntry::startresult

Definition at line 50 of file chancache.h.

epics::pvData::StructureConstPtr MonitorCacheEntry::typedesc

Definition at line 44 of file chancache.h.

weak_pointer MonitorCacheEntry::weakref

Definition at line 28 of file chancache.h.


The documentation for this struct was generated from the following files: