This is Unofficial EPICS BASE Doxygen Site
epics::pvAccess::BlockingUDPTransport Class Reference

#include "blockingUDP.h"

+ Inheritance diagram for epics::pvAccess::BlockingUDPTransport:
+ Collaboration diagram for epics::pvAccess::BlockingUDPTransport:

Public Member Functions

 POINTER_DEFINITIONS (BlockingUDPTransport)
 
virtual ~BlockingUDPTransport ()
 
virtual bool isClosed () OVERRIDE FINAL
 
virtual const osiSockAddr & getRemoteAddress () const OVERRIDE FINAL
 
virtual const std::string & getRemoteName () const OVERRIDE FINAL
 
virtual std::string getType () const OVERRIDE FINAL
 
virtual std::size_t getReceiveBufferSize () const OVERRIDE FINAL
 
virtual epics::pvData::int16 getPriority () const OVERRIDE FINAL
 
virtual void setRemoteTransportReceiveBufferSize (std::size_t) OVERRIDE FINAL
 
virtual void setRemoteTransportSocketReceiveBufferSize (std::size_t) OVERRIDE FINAL
 
virtual bool verify (epics::pvData::int32) OVERRIDE FINAL
 
virtual void verified (epics::pvData::Status const &) OVERRIDE FINAL
 
virtual void authNZMessage (epics::pvData::PVStructure::shared_pointer const &data) OVERRIDE FINAL
 
virtual void setByteOrder (int byteOrder) OVERRIDE FINAL
 
virtual void enqueueSendRequest (TransportSender::shared_pointer const &sender) OVERRIDE FINAL
 
virtual void flushSendQueue () OVERRIDE FINAL
 
void start ()
 
virtual void close () OVERRIDE FINAL
 
virtual void ensureData (std::size_t size) OVERRIDE FINAL
 
virtual void alignData (std::size_t alignment) OVERRIDE FINAL
 
virtual bool directSerialize (epics::pvData::ByteBuffer *, const char *, std::size_t, std::size_t) OVERRIDE FINAL
 
virtual bool directDeserialize (epics::pvData::ByteBuffer *, char *, std::size_t, std::size_t) OVERRIDE FINAL
 
virtual void startMessage (epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize=0) OVERRIDE FINAL
 
virtual void endMessage () OVERRIDE FINAL
 
virtual void flush (bool) OVERRIDE FINAL
 
virtual void setRecipient (const osiSockAddr &sendTo) OVERRIDE FINAL
 
void setLocalMulticastAddress (const osiSockAddr &sendTo)
 
bool hasLocalMulticastAddress () const
 
const osiSockAddr & getLocalMulticastAddress () const
 
virtual void flushSerializeBuffer () OVERRIDE FINAL
 
virtual void ensureBuffer (std::size_t) OVERRIDE FINAL
 
virtual void alignBuffer (std::size_t alignment) OVERRIDE FINAL
 
virtual void cachedSerialize (const std::tr1::shared_ptr< const epics::pvData::Field > &field, epics::pvData::ByteBuffer *buffer) OVERRIDE FINAL
 
virtual std::tr1::shared_ptr< const epics::pvData::Field > cachedDeserialize (epics::pvData::ByteBuffer *buffer) OVERRIDE FINAL
 
virtual bool acquire (std::tr1::shared_ptr< ClientChannelImpl > const &) OVERRIDE FINAL
 
virtual void release (pvAccessID) OVERRIDE FINAL
 
void setIgnoredAddresses (const InetAddrVector &addresses)
 
const InetAddrVector & getIgnoredAddresses () const
 
void setTappedNIF (const InetAddrVector &addresses)
 
const InetAddrVector & getTappedNIF () const
 
bool send (const char *buffer, size_t length, const osiSockAddr &address)
 
bool send (epics::pvData::ByteBuffer *buffer, const osiSockAddr &address)
 
bool send (epics::pvData::ByteBuffer *buffer, InetAddressType target=inetAddressType_all)
 
const InetAddrVector & getSendAddresses ()
 
const osiSockAddr * getBindAddress () const
 
bool isBroadcastAddress (const osiSockAddr *address, const InetAddrVector &broadcastAddresses)
 
void setSendAddresses (InetAddrVector &addresses, std::vector< bool > &address_types)
 
void join (const osiSockAddr &mcastAddr, const osiSockAddr &nifAddr)
 
void setMutlicastNIF (const osiSockAddr &nifAddr, bool loopback)
 
- Public Member Functions inherited from epics::pvAccess::Transport
 POINTER_DEFINITIONS (Transport)
 
 Transport ()
 
virtual ~Transport ()
 
virtual void waitJoin ()
 Call after close() to wait for any worker threads to exit. More...
 
- Public Member Functions inherited from epics::pvData::DeserializableControl
virtual ~DeserializableControl ()
 
- Public Member Functions inherited from epics::pvAccess::TransportSendControl
 POINTER_DEFINITIONS (TransportSendControl)
 
virtual ~TransportSendControl ()
 
- Public Member Functions inherited from epics::pvData::SerializableControl
virtual ~SerializableControl ()
 
virtual void cachedSerialize (std::tr1::shared_ptr< const Field > const &field, ByteBuffer *buffer)=0
 

Static Public Attributes

static size_t num_instances
 
- Static Public Attributes inherited from epics::pvAccess::Transport
static size_t num_instances
 

Protected Member Functions

virtual void run () OVERRIDE FINAL
 

Protected Attributes

AtomicBoolean _closed
 
ResponseHandler::shared_pointer _responseHandler
 

Friends

class BlockingUDPConnector
 

Additional Inherited Members

- Public Attributes inherited from epics::pvAccess::Transport
size_t _totalBytesSent
 
size_t _totalBytesRecv
 

Detailed Description

Definition at line 44 of file blockingUDP.h.

Constructor & Destructor Documentation

epics::pvAccess::BlockingUDPTransport::~BlockingUDPTransport ( )
virtual

Definition at line 95 of file blockingUDPTransport.cpp.

95  {
96  REFTRACE_DECREMENT(num_instances);
97 
98  close(true); // close the socket and stop the thread.
99 }

Member Function Documentation

virtual bool epics::pvAccess::BlockingUDPTransport::acquire ( std::tr1::shared_ptr< ClientChannelImpl > const &  client)
inline virtual

Acquires transport.

Parameters
client: client (channel) acquiring the transport
Returns
true if transport was granted, false otherwise.

Implements epics::pvAccess::Transport.

Definition at line 200 of file blockingUDP.h.

201  {
202  return false;
203  }
virtual void epics::pvAccess::BlockingUDPTransport::alignBuffer ( std::size_t  alignment)
inline virtual

Add pad bytes to buffer.

Parameters
alignment: alignment required.

Implements epics::pvData::SerializableControl.

Definition at line 181 of file blockingUDP.h.

181  {
182  _sendBuffer.align(alignment);
183  }
void align(std::size_t size, char fill='\0')
Definition: byteBuffer.h:504
virtual void epics::pvAccess::BlockingUDPTransport::alignData ( std::size_t  alignment)
inline virtual

Align buffer. Note that this only takes care of the current buffer's alignment. If a streaming protocol is used, care must be taken that the entire stream is aligned.

Parameters
alignment: size in bytes, must be power of two.

Implements epics::pvData::DeserializableControl.

Definition at line 132 of file blockingUDP.h.

132  {
133  _receiveBuffer.align(alignment);
134  }
void align(std::size_t size, char fill='\0')
Definition: byteBuffer.h:504
virtual void epics::pvAccess::BlockingUDPTransport::authNZMessage ( epics::pvData::PVStructure::shared_pointer const &  data)
inline virtual

Pass data to the active security plug-in session.

Parameters
data: the data (any data), can be null.

Implements epics::pvAccess::Transport.

Definition at line 109 of file blockingUDP.h.

109  {
110  // noop
111  }
virtual std::tr1::shared_ptr<const epics::pvData::Field> epics::pvAccess::BlockingUDPTransport::cachedDeserialize ( epics::pvData::ByteBuffer * buffer)
inline virtual

Deserialize via cache.

Parameters
buffer: buffer to be deserialized from

Implements epics::pvData::DeserializableControl.

Definition at line 193 of file blockingUDP.h.

194  {
195  // no cache
196  // TODO
197  return epics::pvData::getFieldCreate()->deserialize(buffer, this);
198  }
FORCE_INLINE const FieldCreatePtr & getFieldCreate()
virtual void epics::pvAccess::BlockingUDPTransport::cachedSerialize ( const std::tr1::shared_ptr< const epics::pvData::Field > &  field,
epics::pvData::ByteBuffer * buffer 
)
inline virtual

Definition at line 185 of file blockingUDP.h.

187  {
188  // no cache
189  field->serialize(buffer, this);
190  }
void epics::pvAccess::BlockingUDPTransport::close ( )
virtual

Close transport.

Implements epics::pvAccess::Transport.

Definition at line 116 of file blockingUDPTransport.cpp.

116  {
117  close(true);
118 }
virtual bool epics::pvAccess::BlockingUDPTransport::directDeserialize ( epics::pvData::ByteBuffer * existingBuffer,
char *  deserializeTo,
std::size_t  elementCount,
std::size_t  elementSize 
)
inline virtual

Method for deserializing array data. Hook for supplying custom deserialization implementation. The deserialization implementation need not be provided. Returns true if method performs deserialization, false otherwise. This should only be used for arrays of primitive types, i.e. boolean, byte,..., double. It cannot be called for string, structure, or union arrays.

Parameters
existingBuffer: the existing buffer from the caller.
deserializeTo: location of data.
elementCount: number of elements.
elementSize: element size.
Returns
true if deserialization performed, else false.

Implements epics::pvData::DeserializableControl.

Definition at line 142 of file blockingUDP.h.

144  {
145  return false;
146  }
virtual bool epics::pvAccess::BlockingUDPTransport::directSerialize ( epics::pvData::ByteBuffer * existingBuffer,
const char *  toSerialize,
std::size_t  elementCount,
std::size_t  elementSize 
)
inline virtual

Method for serializing primitive array data. Hook for supplying custom serialization implementation. The serialization implementation need not be provided. Returns true if method performs serialization, false otherwise. This should only be used for arrays of primitive types, i.e. boolean, byte,..., double. It cannot be called for string, structure, or union arrays.

Parameters
existingBuffer: the existing buffer from the caller.
toSerialize: location of data to be put into buffer.
elementCount: number of elements.
elementSize: element size.
Returns
true if serialization performed, else false.

Implements epics::pvData::SerializableControl.

Definition at line 136 of file blockingUDP.h.

138  {
139  return false;
140  }
void epics::pvAccess::BlockingUDPTransport::endMessage ( )
virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 217 of file blockingUDPTransport.cpp.

217  {
218  _sendBuffer.putInt(
219  _lastMessageStartPosition+(sizeof(int16)+2),
220  _sendBuffer.getPosition()-_lastMessageStartPosition-PVA_MESSAGE_HEADER_SIZE);
221 }
EPICS_ALWAYS_INLINE void putInt(int32 value)
Definition: byteBuffer.h:537
std::size_t getPosition() const
Definition: byteBuffer.h:346
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
int16_t int16
Definition: pvType.h:79
void epics::pvAccess::BlockingUDPTransport::enqueueSendRequest ( TransportSender::shared_pointer const &  sender)
virtual

Enqueue send request.

Parameters
sender

Implements epics::pvAccess::Transport.

Definition at line 186 of file blockingUDPTransport.cpp.

186  {
187  Lock lock(_sendMutex);
188 
189  _sendToEnabled = false;
190  _sendBuffer.clear();
191  {
192  epicsGuard<TransportSender> G(*sender);
193  sender->send(&_sendBuffer, this);
194  }
195  endMessage();
196  if(!_sendToEnabled)
197  send(&_sendBuffer);
198  else
199  send(&_sendBuffer, _sendTo);
200 }
epicsMutexId lock
Definition: osiClockTime.c:37
A lock for multithreading.
Definition: lock.h:36
virtual void endMessage() OVERRIDE FINAL
bool send(const char *buffer, size_t length, const osiSockAddr &address)
virtual void epics::pvAccess::BlockingUDPTransport::ensureBuffer ( std::size_t  size)
inline virtual

Make sure buffer has at least size bytes remaining. If not, flush the existing buffer and provide a new one.

Parameters
size: The number of bytes.

Implements epics::pvData::SerializableControl.

Definition at line 177 of file blockingUDP.h.

177  {
178  // noop
179  }
void epics::pvAccess::BlockingUDPTransport::ensureData ( std::size_t  size)
virtual

Helper method. Ensures specified size of bytes, provides it if necessary.

Parameters
size: The number of bytes.

Implements epics::pvData::DeserializableControl.

Definition at line 120 of file blockingUDPTransport.cpp.

120  {
121  if (_receiveBuffer.getRemaining() >= size)
122  return;
123  std::ostringstream msg;
124  msg<<"no more data in UDP packet : "
125  <<_receiveBuffer.getPosition()<<":"<<_receiveBuffer.getLimit()
126  <<" for "<<size;
127  throw std::underflow_error(msg.str());
128 }
std::size_t getLimit() const
Definition: byteBuffer.h:368
std::size_t getPosition() const
Definition: byteBuffer.h:346
std::size_t getRemaining() const
Definition: byteBuffer.h:391
virtual void epics::pvAccess::BlockingUDPTransport::flush ( bool  )
inline virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 151 of file blockingUDP.h.

151  {
152  // noop since all UDP requests are sent immediately
153  }
void epics::pvAccess::BlockingUDPTransport::flushSendQueue ( )
virtual

Flush send queue (sent messages).

Implements epics::pvAccess::Transport.

Definition at line 203 of file blockingUDPTransport.cpp.

204 {
205  // noop (note different sent addresses are possible)
206 }
virtual void epics::pvAccess::BlockingUDPTransport::flushSerializeBuffer ( )
inline virtual

Done with this buffer. Flush it.

Implements epics::pvData::SerializableControl.

Definition at line 173 of file blockingUDP.h.

173  {
174  // noop
175  }
const osiSockAddr* epics::pvAccess::BlockingUDPTransport::getBindAddress ( ) const
inline

Get bind address.

Returns
bind address.

Definition at line 257 of file blockingUDP.h.

257  {
258  return &_bindAddress;
259  }
const InetAddrVector& epics::pvAccess::BlockingUDPTransport::getIgnoredAddresses ( ) const
inline

Get list of ignored addresses.

Returns
ignored addresses.

Definition at line 219 of file blockingUDP.h.

219  {
220  return _ignoredAddresses;
221  }
const osiSockAddr& epics::pvAccess::BlockingUDPTransport::getLocalMulticastAddress ( ) const
inline

Definition at line 169 of file blockingUDP.h.

169  {
170  return _localMulticastAddress;
171  }
virtual epics::pvData::int16 epics::pvAccess::BlockingUDPTransport::getPriority ( ) const
inline virtual

Transport priority.

Returns
protocol priority.

Implements epics::pvAccess::Transport.

Definition at line 86 of file blockingUDP.h.

86  {
87  return PVA_DEFAULT_PRIORITY;
88  }
const epics::pvData::int16 PVA_DEFAULT_PRIORITY
Definition: pvaConstants.h:70
virtual std::size_t epics::pvAccess::BlockingUDPTransport::getReceiveBufferSize ( ) const
inline virtual

Get receive buffer size.

Returns
receive buffer size.

Implements epics::pvAccess::Transport.

Definition at line 82 of file blockingUDP.h.

82  {
83  return _receiveBuffer.getSize();
84  }
EPICS_ALWAYS_INLINE std::size_t getSize() const
Definition: byteBuffer.h:400
virtual const osiSockAddr& epics::pvAccess::BlockingUDPTransport::getRemoteAddress ( ) const
inline virtual

Implements epics::pvAccess::Transport.

Definition at line 70 of file blockingUDP.h.

70  {
71  return _remoteAddress;
72  }
virtual const std::string& epics::pvAccess::BlockingUDPTransport::getRemoteName ( ) const
inline virtual

Implements epics::pvAccess::Transport.

Definition at line 74 of file blockingUDP.h.

74  {
75  return _remoteName;
76  }
const InetAddrVector& epics::pvAccess::BlockingUDPTransport::getSendAddresses ( )
inline

Get list of send addresses.

Returns
send addresses.

Definition at line 249 of file blockingUDP.h.

249  {
250  return _sendAddresses;
251  }
const InetAddrVector& epics::pvAccess::BlockingUDPTransport::getTappedNIF ( ) const
inline

Get list of tapped NIF addresses.

Returns
tapped NIF addresses.

Definition at line 235 of file blockingUDP.h.

235  {
236  return _tappedNIF;
237  }
virtual std::string epics::pvAccess::BlockingUDPTransport::getType ( ) const
inline virtual

Get protocol type (tcp, udp, ssl, etc.).

Returns
protocol type.

Implements epics::pvAccess::Transport.

Definition at line 78 of file blockingUDP.h.

78  {
79  return std::string("udp");
80  }
bool epics::pvAccess::BlockingUDPTransport::hasLocalMulticastAddress ( ) const
inline

Definition at line 165 of file blockingUDP.h.

165  {
166  return _localMulticastAddressEnabled;
167  }
bool epics::pvAccess::BlockingUDPTransport::isBroadcastAddress ( const osiSockAddr * address,
const InetAddrVector & broadcastAddresses 
)
inline

Definition at line 261 of file blockingUDP.h.

262  {
263  for (size_t i = 0; i < broadcastAddresses.size(); i++)
264  if (broadcastAddresses[i].ia.sin_addr.s_addr == address->ia.sin_addr.s_addr)
265  return true;
266  return false;
267  }
int i
Definition: scan.c:967
struct sockaddr_in ia
Definition: osiSock.h:157
virtual bool epics::pvAccess::BlockingUDPTransport::isClosed ( )
inline virtual

Check connection status.

Returns
true if the transport is closed.

Implements epics::pvAccess::Transport.

Definition at line 66 of file blockingUDP.h.

66  {
67  return _closed.get();
68  }
void epics::pvAccess::BlockingUDPTransport::join ( const osiSockAddr & mcastAddr,
const osiSockAddr & nifAddr 
)

Definition at line 525 of file blockingUDPTransport.cpp.

526 {
527  struct ip_mreq imreq;
528  memset(&imreq, 0, sizeof(struct ip_mreq));
529 
530  imreq.imr_multiaddr.s_addr = mcastAddr.ia.sin_addr.s_addr;
531  imreq.imr_interface.s_addr = nifAddr.ia.sin_addr.s_addr;
532 
533  // join multicast group on the given interface
534  int status = ::setsockopt(_channel, IPPROTO_IP, IP_ADD_MEMBERSHIP,
535  (char*)&imreq, sizeof(struct ip_mreq));
536  if (status)
537  {
538  char errStr[64];
539  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
540  throw std::runtime_error(
541  string("Failed to join to the multicast group '") +
542  inetAddressToString(mcastAddr) + "' on network interface '" +
543  inetAddressToString(nifAddr, false) + "': " + errStr);
544  }
545 }
pvd::Status status
struct sockaddr_in ia
Definition: osiSock.h:157
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
epics::pvAccess::BlockingUDPTransport::POINTER_DEFINITIONS ( BlockingUDPTransport  )
virtual void epics::pvAccess::BlockingUDPTransport::release ( pvAccessID  clientId)
inline virtual

Releases transport.

Parameters
clientId: client (channel) releasing the transport

Implements epics::pvAccess::Transport.

Definition at line 205 of file blockingUDP.h.

205 {}
void epics::pvAccess::BlockingUDPTransport::run ( )
protected virtual

Definition at line 223 of file blockingUDPTransport.cpp.

223  {
224  // This function is always called from only one thread - this
225  // object's own thread.
226 
227  osiSockAddr fromAddress;
228  osiSocklen_t addrStructSize = sizeof(sockaddr);
229  Transport::shared_pointer thisTransport(internal_this);
230 
231  try {
232 
233  char* recvfrom_buffer_start = (char*)(_receiveBuffer.getBuffer()+RECEIVE_BUFFER_PRE_RESERVE);
234  size_t recvfrom_buffer_len =_receiveBuffer.getSize()-RECEIVE_BUFFER_PRE_RESERVE;
235  while(!_closed.get())
236  {
237  int bytesRead = recvfrom(_channel,
238  recvfrom_buffer_start, recvfrom_buffer_len,
239  0, (sockaddr*)&fromAddress,
240  &addrStructSize);
241 
242  if(likely(bytesRead>=0)) {
243  // successfully got datagram
244  atomic::add(_totalBytesRecv, bytesRead);
245  bool ignore = false;
246  for(size_t i = 0; i <_ignoredAddresses.size(); i++)
247  {
248  if(_ignoredAddresses[i].ia.sin_addr.s_addr==fromAddress.ia.sin_addr.s_addr)
249  {
250  ignore = true;
252  char strBuffer[64];
253  sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
254  LOG(logLevelDebug, "UDP Ignore (%d) %s x- %s", bytesRead, _remoteName.c_str(), strBuffer);
255  }
256  break;
257  }
258  }
259 
260  if(likely(!ignore)) {
262  char strBuffer[64];
263  sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
264  LOG(logLevelDebug, "UDP %s Rx (%d) %s <- %s", (_clientServerWithEndianFlag&0x40)?"Server":"Client", bytesRead, _remoteName.c_str(), strBuffer);
265  }
266 
267  _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE);
268  _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);
269 
270  try {
271  processBuffer(thisTransport, fromAddress, &_receiveBuffer);
272  } catch(std::exception& e) {
274  char strBuffer[64];
275  sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
276  size_t epos = _receiveBuffer.getPosition();
277 
278  // of course _receiveBuffer _may_ have been modified during processing...
279  _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE);
280  _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);
281 
282  std::cerr<<"Error on UDP RX "<<strBuffer<<" -> "<<_remoteName<<" at "<<epos<<" : "<<e.what()<<"\n"
283  <<HexDump(_receiveBuffer).limit(256u);
284  }
285  }
286  }
287  } else {
288 
289  int socketError = SOCKERRNO;
290 
291  // interrupted or timeout
292  if (socketError == SOCK_EINTR ||
293  socketError == EAGAIN || // no alias in libCom
294  // windows times out with this
295  socketError == SOCK_ETIMEDOUT ||
296  socketError == SOCK_EWOULDBLOCK)
297  continue;
298 
299  if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux
300  socketError == SOCK_ECONNRESET) // or ECONNRESET in Windows
301  continue;
302 
303  // log a 'recvfrom' error
304  if(!_closed.get())
305  {
306  char errStr[64];
307  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
308  LOG(logLevelError, "Socket recvfrom error: %s.", errStr);
309  }
310 
311  close(false);
312  break;
313  }
314 
315  }
316  } catch(...) {
317  // TODO: catch all exceptions, and act accordingly
318  close(false);
319  }
320 
322  {
323  string threadName = "UDP-rx "+inetAddressToString(_bindAddress);
324  LOG(logLevelTrace, "Thread '%s' exiting.", threadName.c_str());
325  }
326 }
#define SOCK_ECONNREFUSED
Definition: osdSock.h:58
unsigned epicsStdCall sockAddrToDottedIP(const struct sockaddr *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:118
int osiSocklen_t
Definition: osdSock.h:36
int i
Definition: scan.c:967
const char * getBuffer() const
Definition: byteBuffer.h:294
struct sockaddr sa
Definition: osiSock.h:158
struct sockaddr_in ia
Definition: osiSock.h:157
#define RECEIVE_BUFFER_PRE_RESERVE
#define SOCK_ETIMEDOUT
Definition: osdSock.h:54
EPICS_ALWAYS_INLINE std::size_t getSize() const
Definition: byteBuffer.h:400
std::size_t getPosition() const
Definition: byteBuffer.h:346
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
#define SOCK_ECONNRESET
Definition: osdSock.h:53
#define LOG(level, format,...)
Definition: logger.h:48
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
#define likely(x)
Definition: likely.h:14
#define SOCKERRNO
Definition: osdSock.h:33
#define SOCK_EINTR
Definition: osdSock.h:64
bool pvAccessIsLoggable(pvAccessLogLevel level)
Definition: logger.cpp:64
void setLimit(std::size_t limit)
Definition: byteBuffer.h:380
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
#define IS_LOGGABLE(level)
Definition: logger.h:51
#define SOCK_EWOULDBLOCK
Definition: osdSock.h:51
bool epics::pvAccess::BlockingUDPTransport::send ( const char *  buffer,
size_t  length,
const osiSockAddr & address 
)

Definition at line 433 of file blockingUDPTransport.cpp.

434 {
436  {
437  LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
438  length, _remoteName.c_str(), inetAddressToString(address).c_str());
439  }
440 
441  int retval = sendto(_channel, buffer,
442  length, 0, &(address.sa), sizeof(sockaddr));
443  if(unlikely(retval<0))
444  {
445  char errStr[64];
446  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
447  LOG(logLevelDebug, "Socket sendto to %s error: %s.",
448  inetAddressToString(address).c_str(), errStr);
449  return false;
450  }
451  atomic::add(_totalBytesSent, length);
452 
453  return true;
454 }
struct sockaddr sa
Definition: osiSock.h:158
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
#define LOG(level, format,...)
Definition: logger.h:48
#define unlikely(x)
Definition: likely.h:15
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
#define IS_LOGGABLE(level)
Definition: logger.h:51
bool epics::pvAccess::BlockingUDPTransport::send ( epics::pvData::ByteBuffer * buffer,
const osiSockAddr & address 
)

Definition at line 456 of file blockingUDPTransport.cpp.

456  {
457 
458  buffer->flip();
459 
461  {
462  LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
463  buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
464  }
465 
466  int retval = sendto(_channel, buffer->getBuffer(),
467  buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr));
468  if(unlikely(retval<0))
469  {
470  char errStr[64];
471  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
472  LOG(logLevelDebug, "Socket sendto to %s error: %s.",
473  inetAddressToString(address).c_str(), errStr);
474  return false;
475  }
476  atomic::add(_totalBytesSent, buffer->getLimit());
477 
478  // all sent
479  buffer->setPosition(buffer->getLimit());
480 
481  return true;
482 }
const char * getBuffer() const
Definition: byteBuffer.h:294
struct sockaddr sa
Definition: osiSock.h:158
std::size_t getLimit() const
Definition: byteBuffer.h:368
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
#define LOG(level, format,...)
Definition: logger.h:48
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
std::size_t getRemaining() const
Definition: byteBuffer.h:391
#define unlikely(x)
Definition: likely.h:15
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
#define IS_LOGGABLE(level)
Definition: logger.h:51
bool epics::pvAccess::BlockingUDPTransport::send ( epics::pvData::ByteBuffer * buffer,
InetAddressType  target = inetAddressType_all 
)

Definition at line 484 of file blockingUDPTransport.cpp.

484  {
485  if(_sendAddresses.empty()) return false;
486 
487  buffer->flip();
488 
489  bool allOK = true;
490  for(size_t i = 0; i<_sendAddresses.size(); i++) {
491 
492  // filter
493  if (target != inetAddressType_all)
494  if ((target == inetAddressType_unicast && !_isSendAddressUnicast[i]) ||
495  (target == inetAddressType_broadcast_multicast && _isSendAddressUnicast[i]))
496  continue;
497 
499  {
500  LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
501  buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
502  }
503 
504  int retval = sendto(_channel, buffer->getBuffer(),
505  buffer->getLimit(), 0, &(_sendAddresses[i].sa),
506  sizeof(sockaddr));
507  if(unlikely(retval<0))
508  {
509  char errStr[64];
510  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
511  LOG(logLevelDebug, "Socket sendto to %s error: %s.",
512  inetAddressToString(_sendAddresses[i]).c_str(), errStr);
513  allOK = false;
514  }
515  atomic::add(_totalBytesSent, buffer->getLimit());
516  }
517 
518  // all sent
519  buffer->setPosition(buffer->getLimit());
520 
521  return allOK;
522 }
int i
Definition: scan.c:967
const char * getBuffer() const
Definition: byteBuffer.h:294
std::size_t getLimit() const
Definition: byteBuffer.h:368
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
#define LOG(level, format,...)
Definition: logger.h:48
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
std::size_t getRemaining() const
Definition: byteBuffer.h:391
#define unlikely(x)
Definition: likely.h:15
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
#define IS_LOGGABLE(level)
Definition: logger.h:51
virtual void epics::pvAccess::BlockingUDPTransport::setByteOrder ( int  byteOrder)
inline virtual

Set byte order.

Parameters
byteOrder: byte order to set.

Implements epics::pvAccess::Transport.

Definition at line 114 of file blockingUDP.h.

114  {
115  // called from receive thread... or before processing
116  _receiveBuffer.setEndianess(byteOrder);
117 
118  // sync?!
119  _sendBuffer.setEndianess(byteOrder);
120  }
void setEndianess(int byteOrder)
Definition: byteBuffer.h:285
void epics::pvAccess::BlockingUDPTransport::setIgnoredAddresses ( const InetAddrVector & addresses)
inline

Set ignore list.

Parameters
addresses: list of ignored addresses.

Definition at line 211 of file blockingUDP.h.

211  {
212  _ignoredAddresses = addresses;
213  }
void epics::pvAccess::BlockingUDPTransport::setLocalMulticastAddress ( const osiSockAddr & sendTo)
inline

Definition at line 160 of file blockingUDP.h.

160  {
161  _localMulticastAddressEnabled = true;
162  _localMulticastAddress = sendTo;
163  }
void epics::pvAccess::BlockingUDPTransport::setMutlicastNIF ( const osiSockAddr & nifAddr,
bool  loopback 
)

Definition at line 547 of file blockingUDPTransport.cpp.

548 {
549  // set the multicast outgoing interface
550  int status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_IF,
551  (char*)&nifAddr.ia.sin_addr, sizeof(struct in_addr));
552  if (status)
553  {
554  char errStr[64];
555  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
556  throw std::runtime_error(
557  string("Failed to set multicast network interface '") +
558  inetAddressToString(nifAddr, false) + "': " + errStr);
559  }
560 
561  // send multicast traffic to myself too
562  unsigned char mcast_loop = (loopback ? 1 : 0);
563  status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_LOOP,
564  (char*)&mcast_loop, sizeof(unsigned char));
565  if (status)
566  {
567  char errStr[64];
568  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
569  throw std::runtime_error(
570  string("Failed to enable multicast loopback on network interface '") +
571  inetAddressToString(nifAddr, false) + "': " + errStr);
572  }
573 
574 }
pvd::Status status
struct sockaddr_in ia
Definition: osiSock.h:157
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
virtual void epics::pvAccess::BlockingUDPTransport::setRecipient ( const osiSockAddr & sendTo)
inline virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 155 of file blockingUDP.h.

155  {
156  _sendToEnabled = true;
157  _sendTo = sendTo;
158  }
virtual void epics::pvAccess::BlockingUDPTransport::setRemoteTransportReceiveBufferSize ( std::size_t  receiveBufferSize)
inline virtual

Set remote transport receive buffer size.

Parameters
receiveBufferSize: receive buffer size.

Implements epics::pvAccess::Transport.

Definition at line 90 of file blockingUDP.h.

91  {
92  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
93  }
virtual void epics::pvAccess::BlockingUDPTransport::setRemoteTransportSocketReceiveBufferSize ( std::size_t  socketReceiveBufferSize)
inline virtual

Set remote transport socket receive buffer size.

Parameters
socketReceiveBufferSize: remote socket receive buffer size.

Implements epics::pvAccess::Transport.

Definition at line 95 of file blockingUDP.h.

96  {
97  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
98  }
void epics::pvAccess::BlockingUDPTransport::setSendAddresses ( InetAddrVector & addresses,
std::vector< bool > &  address_types 
)
inline

Definition at line 270 of file blockingUDP.h.

270  {
271  _sendAddresses.swap(addresses);
272  _isSendAddressUnicast.swap(address_types);
273  }
void epics::pvAccess::BlockingUDPTransport::setTappedNIF ( const InetAddrVector & addresses)
inline

Set tapped NIF list.

Parameters
addresses: NIF address list to tap.

Definition at line 227 of file blockingUDP.h.

227  {
228  _tappedNIF = addresses;
229  }
void epics::pvAccess::BlockingUDPTransport::start ( )

Definition at line 101 of file blockingUDPTransport.cpp.

101  {
102 
103  string threadName = "UDP-rx " + inetAddressToString(_bindAddress);
104 
106  {
107  LOG(logLevelTrace, "Starting thread: %s.", threadName.c_str());
108  }
109 
110  _thread.reset(new epicsThread(*this, threadName.c_str(),
113  _thread->start();
114 }
#define epicsThreadPriorityMedium
Definition: epicsThread.h:76
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
#define LOG(level, format,...)
Definition: logger.h:48
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
#define IS_LOGGABLE(level)
Definition: logger.h:51
void epics::pvAccess::BlockingUDPTransport::startMessage ( epics::pvData::int8  command,
std::size_t  ensureCapacity,
epics::pvData::int32  payloadSize = 0 
)
virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 208 of file blockingUDPTransport.cpp.

208  {
209  _lastMessageStartPosition = _sendBuffer.getPosition();
210  _sendBuffer.putByte(PVA_MAGIC);
211  _sendBuffer.putByte((_clientServerWithEndianFlag&0x40) ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION);
212  _sendBuffer.putByte(_clientServerWithEndianFlag);
213  _sendBuffer.putByte(command); // command
214  _sendBuffer.putInt(payloadSize);
215 }
EPICS_ALWAYS_INLINE void putInt(int32 value)
Definition: byteBuffer.h:537
const epics::pvData::int8 PVA_MAGIC
Definition: pvaConstants.h:29
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
std::size_t getPosition() const
Definition: byteBuffer.h:346
EPICS_ALWAYS_INLINE void putByte(int8 value)
Definition: byteBuffer.h:525
const epics::pvData::int8 PVA_SERVER_PROTOCOL_REVISION
Definition: pvaConstants.h:31
virtual void epics::pvAccess::BlockingUDPTransport::verified ( epics::pvData::Status const &  status)
inline virtual

Notify transport that it has been verified.

Parameters
status: verification status.

Implements epics::pvAccess::Transport.

Definition at line 105 of file blockingUDP.h.

105  {
106  // noop
107  }
virtual bool epics::pvAccess::BlockingUDPTransport::verify ( epics::pvData::int32  timeoutMs)
inline virtual

Waits (if needed) until transport is verified, i.e. verified() method is being called.

Parameters
timeoutMs: timeout to wait for verification, infinite if 0.

Implements epics::pvAccess::Transport.

Definition at line 100 of file blockingUDP.h.

100  {
101  // noop
102  return true;
103  }

Friends And Related Function Documentation

friend class BlockingUDPConnector
friend

Definition at line 57 of file blockingUDP.h.

Member Data Documentation

AtomicBoolean epics::pvAccess::BlockingUDPTransport::_closed
protected

Definition at line 280 of file blockingUDP.h.

ResponseHandler::shared_pointer epics::pvAccess::BlockingUDPTransport::_responseHandler
protected

Response handler.

Definition at line 285 of file blockingUDP.h.

size_t epics::pvAccess::BlockingUDPTransport::num_instances
static

Definition at line 53 of file blockingUDP.h.


The documentation for this class was generated from the following files: