This is Unofficial EPICS BASE Doxygen Site
channelSearchManager.cpp
Go to the documentation of this file.
1 
7 #include <stdlib.h>
8 #include <time.h>
9 #include <vector>
10 
11 #include <epicsMutex.h>
12 
13 #include <pv/serializationHelper.h>
14 #include <pv/timeStamp.h>
15 
16 #define epicsExportSharedSymbols
18 #include <pv/pvaConstants.h>
19 #include <pv/blockingUDP.h>
20 #include <pv/serializeHelper.h>
21 #include <pv/logger.h>
22 
23 using namespace std;
24 using namespace epics::pvData;
25 
26 namespace {
27 namespace pva = epics::pvAccess;
28 
29 class MockTransportSendControl: public pva::TransportSendControl
30 {
31 public:
32  void endMessage() {}
33  void flush(bool /*lastMessageCompleted*/) {}
34  void setRecipient(const osiSockAddr& /*sendTo*/) {}
35  void startMessage(epics::pvData::int8 /*command*/, std::size_t /*ensureCapacity*/, epics::pvData::int32 /*payloadSize*/) {}
36  void ensureBuffer(std::size_t /*size*/) {}
37  void alignBuffer(std::size_t /*alignment*/) {}
38  void flushSerializeBuffer() {}
39  void cachedSerialize(const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer)
40  {
41  // no cache
42  field->serialize(buffer, this);
43  }
44  virtual bool directSerialize(epics::pvData::ByteBuffer* /*existingBuffer*/, const char* /*toSerialize*/,
45  std::size_t /*elementCount*/, std::size_t /*elementSize*/)
46  {
47  return false;
48  }
49 };
50 
51 }// namespace
52 
53 namespace epics {
54 namespace pvAccess {
55 
56 // these are byte offset in a CMD_SEARCH request message
57 // used to mangle a buffer to support incremental construction. (ick!!!)
58 static const int DATA_COUNT_POSITION = PVA_MESSAGE_HEADER_SIZE + 4+1+3+16+2+1+4;
59 static const int CAST_POSITION = PVA_MESSAGE_HEADER_SIZE + 4;
60 static const int PAYLOAD_POSITION = 4;
61 
62 // 225ms +/- 25ms random
63 static const double ATOMIC_PERIOD = 0.225;
64 static const double PERIOD_JITTER_MS = 0.025;
65 
66 static const int DEFAULT_USER_VALUE = 1;
67 static const int BOOST_VALUE = 1;
68 // must be power of two (so that search is done)
69 static const int MAX_COUNT_VALUE = 1 << 8;
70 static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1;
71 
72 static const int MAX_FRAMES_AT_ONCE = 10;
73 static const int DELAY_BETWEEN_FRAMES_MS = 50;
74 
75 
76 ChannelSearchManager::ChannelSearchManager(Context::shared_pointer const & context) :
77  m_context(context),
78  m_responseAddress(), // initialized in activate()
79  m_canceled(),
80  m_sequenceNumber(0),
81  m_sendBuffer(MAX_UDP_UNFRAGMENTED_SEND),
82  m_channels(),
83  m_lastTimeSent(),
84  m_channelMutex(),
85  m_userValueMutex(),
86  m_mutex()
87 {
88  // initialize random seed with some random value
89  srand ( time(NULL) );
90 }
91 
93 {
94  m_responseAddress = Context::shared_pointer(m_context)->getSearchTransport()->getRemoteAddress();
95 
96  // initialize send buffer
97  initializeSendBuffer();
98 
99  // add some jitter so that all the clients do not send at the same time
100  double period = ATOMIC_PERIOD + double(rand())/RAND_MAX*PERIOD_JITTER_MS;
101 
102  Context::shared_pointer context(m_context.lock());
103  if (context)
104  context->getTimer()->schedulePeriodic(shared_from_this(), period, period);
105 }
106 
108 {
109  Lock guard(m_mutex);
110  if (!m_canceled.get()) {
111  LOG(logLevelWarn, "Logic error: ChannelSearchManager destroyed w/o cancel()");
112  }
113 }
114 
116 {
117  Lock guard(m_mutex);
118 
119  if (m_canceled.get())
120  return;
121  m_canceled.set();
122 
123  Context::shared_pointer context(m_context.lock());
124  if (context)
125  context->getTimer()->cancel(shared_from_this());
126 }
127 
129 {
130  Lock guard(m_channelMutex);
131  return static_cast<int32_t>(m_channels.size());
132 }
133 
134 void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer const & channel, bool penalize)
135 {
136  if (m_canceled.get())
137  return;
138 
139  bool immediateTrigger;
140  {
141  Lock guard(m_channelMutex);
142 
143  // overrides if already registered
144  m_channels[channel->getSearchInstanceID()] = channel;
145  immediateTrigger = (m_channels.size() == 1);
146 
147  Lock guard2(m_userValueMutex);
148  int32_t& userValue = channel->getUserValue();
149  userValue = (penalize ? MAX_FALLBACK_COUNT_VALUE : DEFAULT_USER_VALUE);
150  }
151 
152  if (immediateTrigger)
153  callback();
154 }
155 
156 void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel)
157 {
158  Lock guard(m_channelMutex);
159  pvAccessID id = channel->getSearchInstanceID();
160  m_channels.erase(id);
161 }
162 
163 void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID cid, int32_t /*seqNo*/, int8_t minorRevision, osiSockAddr* serverAddress)
164 {
165  Lock guard(m_channelMutex);
166  m_channels_t::iterator channelsIter = m_channels.find(cid);
167  if(channelsIter == m_channels.end())
168  {
169  guard.unlock();
170  Context::shared_pointer ctxt(m_context.lock());
171  // TODO: proper action if !ctxt???
172  if(!ctxt) return;
173 
174  // enable duplicate reports
175  SearchInstance::shared_pointer si = std::tr1::dynamic_pointer_cast<SearchInstance>(ctxt->getChannel(cid));
176  if (si)
177  si->searchResponse(guid, minorRevision, serverAddress);
178  }
179  else
180  {
181  SearchInstance::shared_pointer si(channelsIter->second.lock());
182 
183  // remove from search list
184  m_channels.erase(cid);
185 
186  guard.unlock();
187 
188  // then notify SearchInstance
189  if(si)
190  si->searchResponse(guid, minorRevision, serverAddress);
191  }
192 }
193 
195 {
196  boost();
197  callback();
198 }
199 
200 void ChannelSearchManager::initializeSendBuffer()
201 {
202  // for now OK, since it is only set here
203  m_sequenceNumber++;
204 
205 
206  // new buffer
207  m_sendBuffer.clear();
208  m_sendBuffer.putByte(PVA_MAGIC);
210  m_sendBuffer.putByte((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00); // data + 7-bit endianess
211  m_sendBuffer.putByte(CMD_SEARCH);
212  m_sendBuffer.putInt(4+1+3+16+2+1); // "zero" payload
213  m_sendBuffer.putInt(m_sequenceNumber);
214 
215  // multicast vs unicast mask
216  // This is CAST_POSITION, which is overwritten before send
217  m_sendBuffer.putByte((int8_t)0);
218 
219  // reserved part
220  m_sendBuffer.putByte((int8_t)0);
221  m_sendBuffer.putShort((int16_t)0);
222 
223  // NOTE: is it possible (very likely) that address is any local address ::ffff:0.0.0.0
224  encodeAsIPv6Address(&m_sendBuffer, &m_responseAddress);
225  m_sendBuffer.putShort((int16_t)ntohs(m_responseAddress.ia.sin_port));
226 
227  // TODO now only TCP is supported
228  // note: this affects DATA_COUNT_POSITION
229  m_sendBuffer.putByte((int8_t)1);
230 
231  MockTransportSendControl control;
232  SerializeHelper::serializeString("tcp", &m_sendBuffer, &control);
233  m_sendBuffer.putShort((int16_t)0); // count
234 }
235 
236 void ChannelSearchManager::flushSendBuffer()
237 {
238  Lock guard(m_mutex);
239 
240  Transport::shared_pointer tt = m_context.lock()->getSearchTransport();
241  BlockingUDPTransport::shared_pointer ut = std::tr1::static_pointer_cast<BlockingUDPTransport>(tt);
242 
243  m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x80); // unicast, no reply required
244  ut->send(&m_sendBuffer, inetAddressType_unicast);
245 
246  m_sendBuffer.putByte(CAST_POSITION, (int8_t)0x00); // b/m-cast, no reply required
247  ut->send(&m_sendBuffer, inetAddressType_broadcast_multicast);
248 
249  initializeSendBuffer();
250 }
251 
252 
253 bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel,
254  ByteBuffer* requestMessage, TransportSendControl* control)
255 {
256  epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION);
257 
258  dataCount++;
259 
260  /*
261  if(dataCount >= MAX_SEARCH_BATCH_COUNT)
262  return false;
263  */
264 
265  const std::string& name(channel->getSearchInstanceName());
266  // not nice...
267  const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length());
268  if(((int)requestMessage->getRemaining()) < addedPayloadSize)
269  return false;
270 
271  requestMessage->putInt(channel->getSearchInstanceID());
272  SerializeHelper::serializeString(name, requestMessage, control);
273 
274  requestMessage->putInt(PAYLOAD_POSITION, requestMessage->getPosition() - PVA_MESSAGE_HEADER_SIZE);
275  requestMessage->putShort(DATA_COUNT_POSITION, dataCount);
276  return true;
277 }
278 
279 bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel,
280  bool allowNewFrame, bool flush)
281 {
282  MockTransportSendControl control;
283 
284  Lock guard(m_mutex);
285  bool success = generateSearchRequestMessage(channel, &m_sendBuffer, &control);
286  // buffer full, flush
287  if(!success)
288  {
289  flushSendBuffer();
290  if(allowNewFrame)
291  generateSearchRequestMessage(channel, &m_sendBuffer, &control);
292  if (flush)
293  flushSendBuffer();
294  return true;
295  }
296 
297  if (flush)
298  flushSendBuffer();
299 
300  return flush;
301 }
302 
303 void ChannelSearchManager::boost()
304 {
305  Lock guard(m_channelMutex);
306  Lock guard2(m_userValueMutex);
307  m_channels_t::iterator channelsIter = m_channels.begin();
308  for(; channelsIter != m_channels.end(); channelsIter++)
309  {
310  SearchInstance::shared_pointer inst(channelsIter->second.lock());
311  if(!inst) continue;
312  int32_t& userValue = inst->getUserValue();
313  userValue = BOOST_VALUE;
314  }
315 }
316 
318 {
319  // high-frequency beacon anomaly trigger guard
320  {
321  Lock guard(m_mutex);
322 
324  now.getCurrent();
325  int64_t nowMS = now.getMilliseconds();
326 
327  if (nowMS - m_lastTimeSent < 100)
328  return;
329  m_lastTimeSent = nowMS;
330  }
331 
332 
333  int count = 0;
334  int frameSent = 0;
335 
336  vector<SearchInstance::shared_pointer> toSend;
337  {
338  Lock guard(m_channelMutex);
339  toSend.reserve(m_channels.size());
340 
341  for(m_channels_t::iterator channelsIter = m_channels.begin();
342  channelsIter != m_channels.end(); channelsIter++)
343  {
344  SearchInstance::shared_pointer inst(channelsIter->second.lock());
345  if(!inst) continue;
346  toSend.push_back(inst);
347  }
348  }
349 
350  vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
351  for (; siter != toSend.end(); siter++)
352  {
353  bool skip;
354  {
355  epicsGuard<epicsMutex> G(m_userValueMutex);
356  int32_t& countValue = (*siter)->getUserValue();
357  skip = !isPowerOfTwo(countValue);
358 
359  if (countValue >= MAX_COUNT_VALUE)
360  countValue = MAX_FALLBACK_COUNT_VALUE;
361  else
362  countValue++;
363  }
364 
365  // back-off
366  if (skip)
367  continue;
368 
369  count++;
370 
371  if (generateSearchRequestMessage(*siter, true, false))
372  frameSent++;
373  if (frameSent == MAX_FRAMES_AT_ONCE)
374  {
375  epicsThreadSleep(DELAY_BETWEEN_FRAMES_MS/(double)1000.0);
376  frameSent = 0;
377  }
378  }
379 
380  if (count > 0)
381  flushSendBuffer();
382 }
383 
384 bool ChannelSearchManager::isPowerOfTwo(int32_t x)
385 {
386  return ((x > 0) && (x & (x - 1)) == 0);
387 }
388 
390 {
391 }
392 
393 }
394 }
int8_t int8
Definition: pvType.h:75
void unregisterSearchInstance(SearchInstance::shared_pointer const &channel)
epicsInt32 pvAccessID
Definition: pvaDefs.h:18
Definition: assert.hpp:91
EPICS_ALWAYS_INLINE void putInt(int32 value)
Definition: byteBuffer.h:537
virtual void searchResponse(const ServerGUID &guid, int8_t minorRevision, osiSockAddr *serverAddress)=0
const epics::pvData::int8 PVA_MAGIC
Definition: pvaConstants.h:29
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
struct sockaddr_in ia
Definition: osiSock.h:157
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:788
Definition: memory.hpp:41
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
A lock for multithreading.
Definition: lock.h:36
#define NULL
Definition: catime.c:38
void unlock()
Definition: lock.h:66
virtual void timerStopped() OVERRIDE FINAL
Timer stooped callback.
std::size_t getPosition() const
Definition: byteBuffer.h:346
Holds all PVA related.
Definition: pvif.h:34
#define LOG(level, format,...)
Definition: logger.h:48
pvData
Definition: monitor.h:428
APIs for the epicsMutex mutual exclusion semaphore.
void encodeAsIPv6Address(ByteBuffer *buffer, const osiSockAddr *address)
#define EPICS_ENDIAN_BIG
Definition: epicsEndian.h:16
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
EPICS_ALWAYS_INLINE void putByte(int8 value)
Definition: byteBuffer.h:525
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
Definition: byteBuffer.h:233
EPICS_ALWAYS_INLINE int16 getShort()
Definition: byteBuffer.h:623
static void serializeString(const std::string &value, ByteBuffer *buffer, SerializableControl *flusher)
std::size_t getRemaining() const
Definition: byteBuffer.h:391
epicsEventId flush
Definition: errlog.c:70
void registerSearchInstance(SearchInstance::shared_pointer const &channel, bool penalize=false)
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
void searchResponse(const ServerGUID &guid, pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr *serverAddress)
virtual void callback() OVERRIDE FINAL
Timer callback.
shared_ptr< T > dynamic_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:808
int16_t int16
Definition: pvType.h:79
const epics::pvData::int32 MAX_UDP_UNFRAGMENTED_SEND
Definition: pvaConstants.h:55
#define EPICS_BYTE_ORDER
Definition: osdWireConfig.h:16
EPICS_ALWAYS_INLINE void putShort(int16 value)
Definition: byteBuffer.h:531
int32_t int32
Definition: pvType.h:83
Methods for manipulating timeStamp.
Definition: timeStamp.h:43