16 #define epicsExportSharedSymbols 36 void ensureBuffer(std::size_t ) {}
37 void alignBuffer(std::size_t ) {}
38 void flushSerializeBuffer() {}
42 field->serialize(buffer,
this);
45 std::size_t , std::size_t )
60 static const int PAYLOAD_POSITION = 4;
63 static const double ATOMIC_PERIOD = 0.225;
64 static const double PERIOD_JITTER_MS = 0.025;
66 static const int DEFAULT_USER_VALUE = 1;
67 static const int BOOST_VALUE = 1;
69 static const int MAX_COUNT_VALUE = 1 << 8;
70 static const int MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1;
72 static const int MAX_FRAMES_AT_ONCE = 10;
73 static const int DELAY_BETWEEN_FRAMES_MS = 50;
76 ChannelSearchManager::ChannelSearchManager(Context::shared_pointer
const & context) :
94 m_responseAddress = Context::shared_pointer(m_context)->getSearchTransport()->getRemoteAddress();
97 initializeSendBuffer();
100 double period = ATOMIC_PERIOD + double(rand())/RAND_MAX*PERIOD_JITTER_MS;
102 Context::shared_pointer context(m_context.lock());
104 context->getTimer()->schedulePeriodic(shared_from_this(), period, period);
110 if (!m_canceled.
get()) {
111 LOG(
logLevelWarn,
"Logic error: ChannelSearchManager destroyed w/o cancel()");
119 if (m_canceled.
get())
123 Context::shared_pointer context(m_context.lock());
125 context->getTimer()->cancel(shared_from_this());
130 Lock guard(m_channelMutex);
131 return static_cast<int32_t
>(m_channels.size());
136 if (m_canceled.
get())
139 bool immediateTrigger;
141 Lock guard(m_channelMutex);
144 m_channels[channel->getSearchInstanceID()] = channel;
145 immediateTrigger = (m_channels.size() == 1);
147 Lock guard2(m_userValueMutex);
148 int32_t& userValue = channel->getUserValue();
149 userValue = (penalize ? MAX_FALLBACK_COUNT_VALUE : DEFAULT_USER_VALUE);
152 if (immediateTrigger)
158 Lock guard(m_channelMutex);
159 pvAccessID id = channel->getSearchInstanceID();
160 m_channels.erase(
id);
165 Lock guard(m_channelMutex);
166 m_channels_t::iterator channelsIter = m_channels.find(cid);
167 if(channelsIter == m_channels.end())
170 Context::shared_pointer ctxt(m_context.lock());
181 SearchInstance::shared_pointer si(channelsIter->second.lock());
184 m_channels.erase(cid);
190 si->searchResponse(guid, minorRevision, serverAddress);
200 void ChannelSearchManager::initializeSendBuffer()
207 m_sendBuffer.
clear();
212 m_sendBuffer.
putInt(4+1+3+16+2+1);
213 m_sendBuffer.
putInt(m_sequenceNumber);
217 m_sendBuffer.
putByte((int8_t)0);
220 m_sendBuffer.
putByte((int8_t)0);
225 m_sendBuffer.
putShort((int16_t)ntohs(m_responseAddress.
ia.sin_port));
229 m_sendBuffer.
putByte((int8_t)1);
231 MockTransportSendControl control;
236 void ChannelSearchManager::flushSendBuffer()
240 Transport::shared_pointer tt = m_context.lock()->getSearchTransport();
243 m_sendBuffer.
putByte(CAST_POSITION, (int8_t)0x80);
246 m_sendBuffer.
putByte(CAST_POSITION, (int8_t)0x00);
249 initializeSendBuffer();
253 bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_pointer
const & channel,
265 const std::string& name(channel->getSearchInstanceName());
267 const int addedPayloadSize =
sizeof(
int32)/
sizeof(
int8) + (1 +
sizeof(
int32)/
sizeof(
int8) + name.length());
268 if(((
int)requestMessage->
getRemaining()) < addedPayloadSize)
271 requestMessage->
putInt(channel->getSearchInstanceID());
275 requestMessage->
putShort(DATA_COUNT_POSITION, dataCount);
279 bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_pointer
const & channel,
280 bool allowNewFrame,
bool flush)
282 MockTransportSendControl control;
285 bool success = generateSearchRequestMessage(channel, &m_sendBuffer, &control);
291 generateSearchRequestMessage(channel, &m_sendBuffer, &control);
303 void ChannelSearchManager::boost()
305 Lock guard(m_channelMutex);
306 Lock guard2(m_userValueMutex);
307 m_channels_t::iterator channelsIter = m_channels.begin();
308 for(; channelsIter != m_channels.end(); channelsIter++)
310 SearchInstance::shared_pointer inst(channelsIter->second.lock());
312 int32_t& userValue = inst->getUserValue();
313 userValue = BOOST_VALUE;
327 if (nowMS - m_lastTimeSent < 100)
329 m_lastTimeSent = nowMS;
336 vector<SearchInstance::shared_pointer> toSend;
338 Lock guard(m_channelMutex);
339 toSend.reserve(m_channels.size());
341 for(m_channels_t::iterator channelsIter = m_channels.begin();
342 channelsIter != m_channels.end(); channelsIter++)
344 SearchInstance::shared_pointer inst(channelsIter->second.lock());
346 toSend.push_back(inst);
350 vector<SearchInstance::shared_pointer>::iterator siter = toSend.begin();
351 for (; siter != toSend.end(); siter++)
356 int32_t& countValue = (*siter)->getUserValue();
357 skip = !isPowerOfTwo(countValue);
359 if (countValue >= MAX_COUNT_VALUE)
360 countValue = MAX_FALLBACK_COUNT_VALUE;
371 if (generateSearchRequestMessage(*siter,
true,
false))
373 if (frameSent == MAX_FRAMES_AT_ONCE)
384 bool ChannelSearchManager::isPowerOfTwo(int32_t x)
386 return ((x > 0) && (x & (x - 1)) == 0);
void unregisterSearchInstance(SearchInstance::shared_pointer const &channel)
EPICS_ALWAYS_INLINE void putInt(int32 value)
virtual void searchResponse(const ServerGUID &guid, int8_t minorRevision, osiSockAddr *serverAddress)=0
const epics::pvData::int8 PVA_MAGIC
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
TODO only here because of the Lockable.
A lock for multithreading.
virtual void timerStopped() OVERRIDE FINAL
Timer stooped callback.
std::size_t getPosition() const
int32_t registeredCount()
#define LOG(level, format,...)
APIs for the epicsMutex mutual exclusion semaphore.
void encodeAsIPv6Address(ByteBuffer *buffer, const osiSockAddr *address)
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
EPICS_ALWAYS_INLINE void putByte(int8 value)
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
EPICS_ALWAYS_INLINE int16 getShort()
static void serializeString(const std::string &value, ByteBuffer *buffer, SerializableControl *flusher)
std::size_t getRemaining() const
void registerSearchInstance(SearchInstance::shared_pointer const &channel, bool penalize=false)
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
void searchResponse(const ServerGUID &guid, pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr *serverAddress)
virtual void callback() OVERRIDE FINAL
Timer callback.
shared_ptr< T > dynamic_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
const epics::pvData::int32 MAX_UDP_UNFRAGMENTED_SEND
EPICS_ALWAYS_INLINE void putShort(int16 value)
virtual ~ChannelSearchManager()
Methods for manipulating timeStamp.