This is Unofficial EPICS BASE Doxygen Site
blockingUDP.h
Go to the documentation of this file.
1 
7 #ifndef BLOCKINGUDP_H_
8 #define BLOCKINGUDP_H_
9 
10 #ifdef epicsExportSharedSymbols
11 # define blockingUDPEpicsExportSharedSymbols
12 # undef epicsExportSharedSymbols
13 #endif
14 
15 #include <shareLib.h>
16 #include <osiSock.h>
17 #include <epicsThread.h>
18 
19 #include <pv/noDefaultMethods.h>
20 #include <pv/byteBuffer.h>
21 #include <pv/lock.h>
22 #include <pv/event.h>
23 #include <pv/pvIntrospect.h>
24 
25 #ifdef blockingUDPEpicsExportSharedSymbols
26 # define epicsExportSharedSymbols
27 # undef blockingUDPEpicsExportSharedSymbols
28 #endif
29 
30 #include <shareLib.h>
31 
32 #include <pv/remote.h>
33 #include <pv/pvaConstants.h>
34 #include <pv/inetAddressUtil.h>
35 
36 namespace epics {
37 namespace pvAccess {
38 
39 class ClientChannelImpl;
40 class BlockingUDPConnector;
41 
43 
45  public Transport,
46  public TransportSendControl,
47  public epicsThreadRunable
48 {
50 public:
52 
53  static size_t num_instances;
54 
55 private:
56  std::tr1::weak_ptr<BlockingUDPTransport> internal_this;
57  friend class BlockingUDPConnector;
58  BlockingUDPTransport(bool serverFlag,
59  ResponseHandler::shared_pointer const & responseHandler,
60  SOCKET channel, osiSockAddr &bindAddress,
61  short remoteTransportRevision);
62 public:
63 
64  virtual ~BlockingUDPTransport();
65 
66  virtual bool isClosed() OVERRIDE FINAL {
67  return _closed.get();
68  }
69 
70  virtual const osiSockAddr& getRemoteAddress() const OVERRIDE FINAL {
71  return _remoteAddress;
72  }
73 
74  virtual const std::string& getRemoteName() const OVERRIDE FINAL {
75  return _remoteName;
76  }
77 
78  virtual std::string getType() const OVERRIDE FINAL {
79  return std::string("udp");
80  }
81 
82  virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL {
83  return _receiveBuffer.getSize();
84  }
85 
87  return PVA_DEFAULT_PRIORITY;
88  }
89 
91  std::size_t /*receiveBufferSize*/) OVERRIDE FINAL {
92  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
93  }
94 
96  std::size_t /*socketReceiveBufferSize*/) OVERRIDE FINAL {
97  // noop for UDP (limited by 64k; MAX_UDP_SEND for PVA)
98  }
99 
100  virtual bool verify(epics::pvData::int32 /*timeoutMs*/) OVERRIDE FINAL {
101  // noop
102  return true;
103  }
104 
105  virtual void verified(epics::pvData::Status const & /*status*/) OVERRIDE FINAL {
106  // noop
107  }
108 
109  virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) OVERRIDE FINAL {
110  // noop
111  }
112 
113  // NOTE: this is not yet used for UDP
114  virtual void setByteOrder(int byteOrder) OVERRIDE FINAL {
115  // called from receive thread... or before processing
116  _receiveBuffer.setEndianess(byteOrder);
117 
118  // sync?!
119  _sendBuffer.setEndianess(byteOrder);
120  }
121 
122  virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender) OVERRIDE FINAL;
123 
124  virtual void flushSendQueue() OVERRIDE FINAL;
125 
126  void start();
127 
128  virtual void close() OVERRIDE FINAL;
129 
130  virtual void ensureData(std::size_t size) OVERRIDE FINAL;
131 
132  virtual void alignData(std::size_t alignment) OVERRIDE FINAL {
133  _receiveBuffer.align(alignment);
134  }
135 
136  virtual bool directSerialize(epics::pvData::ByteBuffer* /*existingBuffer*/, const char* /*toSerialize*/,
137  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
138  {
139  return false;
140  }
141 
142  virtual bool directDeserialize(epics::pvData::ByteBuffer* /*existingBuffer*/, char* /*deserializeTo*/,
143  std::size_t /*elementCount*/, std::size_t /*elementSize*/) OVERRIDE FINAL
144  {
145  return false;
146  }
147 
148  virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize = 0) OVERRIDE FINAL;
149  virtual void endMessage() OVERRIDE FINAL;
150 
151  virtual void flush(bool /*lastMessageCompleted*/) OVERRIDE FINAL {
152  // noop since all UDP requests are sent immediately
153  }
154 
155  virtual void setRecipient(const osiSockAddr& sendTo) OVERRIDE FINAL {
156  _sendToEnabled = true;
157  _sendTo = sendTo;
158  }
159 
160  void setLocalMulticastAddress(const osiSockAddr& sendTo) {
161  _localMulticastAddressEnabled = true;
162  _localMulticastAddress = sendTo;
163  }
164 
166  return _localMulticastAddressEnabled;
167  }
168 
170  return _localMulticastAddress;
171  }
172 
173  virtual void flushSerializeBuffer() OVERRIDE FINAL {
174  // noop
175  }
176 
177  virtual void ensureBuffer(std::size_t /*size*/) OVERRIDE FINAL {
178  // noop
179  }
180 
181  virtual void alignBuffer(std::size_t alignment) OVERRIDE FINAL {
182  _sendBuffer.align(alignment);
183  }
184 
185  virtual void cachedSerialize(
186  const std::tr1::shared_ptr<const epics::pvData::Field>& field, epics::pvData::ByteBuffer* buffer) OVERRIDE FINAL
187  {
188  // no cache
189  field->serialize(buffer, this);
190  }
191 
192  virtual std::tr1::shared_ptr<const epics::pvData::Field>
194  {
195  // no cache
196  // TODO
197  return epics::pvData::getFieldCreate()->deserialize(buffer, this);
198  }
199 
200  virtual bool acquire(std::tr1::shared_ptr<ClientChannelImpl> const & /*client*/) OVERRIDE FINAL
201  {
202  return false;
203  }
204 
205  virtual void release(pvAccessID /*clientId*/) OVERRIDE FINAL {}
206 
211  void setIgnoredAddresses(const InetAddrVector& addresses) {
212  _ignoredAddresses = addresses;
213  }
214 
220  return _ignoredAddresses;
221  }
222 
227  void setTappedNIF(const InetAddrVector& addresses) {
228  _tappedNIF = addresses;
229  }
230 
235  const InetAddrVector& getTappedNIF() const {
236  return _tappedNIF;
237  }
238 
239  bool send(const char* buffer, size_t length, const osiSockAddr& address);
240 
241  bool send(epics::pvData::ByteBuffer* buffer, const osiSockAddr& address);
242 
244 
250  return _sendAddresses;
251  }
252 
257  const osiSockAddr* getBindAddress() const {
258  return &_bindAddress;
259  }
260 
261  bool isBroadcastAddress(const osiSockAddr* address, const InetAddrVector& broadcastAddresses)
262  {
263  for (size_t i = 0; i < broadcastAddresses.size(); i++)
264  if (broadcastAddresses[i].ia.sin_addr.s_addr == address->ia.sin_addr.s_addr)
265  return true;
266  return false;
267  }
268 
269  // consumes arguments
270  void setSendAddresses(InetAddrVector& addresses, std::vector<bool>& address_types) {
271  _sendAddresses.swap(addresses);
272  _isSendAddressUnicast.swap(address_types);
273  }
274 
275  void join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr);
276 
277  void setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback);
278 
279 protected:
281 
285  ResponseHandler::shared_pointer _responseHandler;
286 
287  virtual void run() OVERRIDE FINAL;
288 
289 private:
290  bool processBuffer(Transport::shared_pointer const & transport, osiSockAddr& fromAddress, epics::pvData::ByteBuffer* receiveBuffer);
291 
292  void close(bool waitForThreadToComplete);
293 
294  // Context only used for logging in this class
295 
299  SOCKET _channel;
300 
301  /* When provided, this transport is used for replies (passed to handler)
302  * instead of *this. This feature is used in the situation where broadcast
303  * traffic is received on one socket, but a different socket must be used
304  * for unicast replies.
305  *
306  Transport::shared_pointer _replyTransport;
307  */
308 
312  osiSockAddr _bindAddress;
313 
317  osiSockAddr _remoteAddress;
318  std::string _remoteName;
319 
323  InetAddrVector _sendAddresses;
324 
325  std::vector<bool> _isSendAddressUnicast;
326 
330  InetAddrVector _ignoredAddresses;
331 
335  InetAddrVector _tappedNIF;
336 
340  osiSockAddr _sendTo;
341  bool _sendToEnabled;
342 
346  osiSockAddr _localMulticastAddress;
347  bool _localMulticastAddressEnabled;
348 
352  epics::pvData::ByteBuffer _receiveBuffer;
353 
357  epics::pvData::ByteBuffer _sendBuffer;
358 
362  int _lastMessageStartPosition;
363 
367  epics::pvData::Mutex _mutex;
368  epics::pvData::Mutex _sendMutex;
369 
373  epics::auto_ptr<epicsThread> _thread;
374 
375  epics::pvData::int8 _clientServerWithEndianFlag;
376 
377 };
378 
380 public:
381  POINTER_DEFINITIONS(BlockingUDPConnector);
382 
383  BlockingUDPConnector(bool serverFlag) :_serverFlag(serverFlag) {}
384 
388  BlockingUDPTransport::shared_pointer connect(
389  ResponseHandler::shared_pointer const & responseHandler,
390  osiSockAddr& bindAddress,
391  epics::pvData::int8 transportRevision);
392 
393 private:
394 
398  bool _serverFlag;
399 
400  EPICS_NOT_COPYABLE(BlockingUDPConnector)
401 };
402 
403 typedef std::vector<BlockingUDPTransport::shared_pointer> BlockingUDPTransportVector;
404 
406  bool serverFlag,
407  BlockingUDPTransportVector& udpTransports,
408  const IfaceNodeVector& ifaceList,
409  const ResponseHandler::shared_pointer& responseHandler,
410  BlockingUDPTransport::shared_pointer& sendTransport,
411  epics::pvData::int32& listenPort,
412  bool autoAddressList,
413  const std::string& addressList,
414  const std::string& ignoreAddressList);
415 
416 
417 }
418 }
419 
420 #endif /* BLOCKINGUDP_H_ */
int8_t int8
Definition: pvType.h:75
virtual void setRemoteTransportReceiveBufferSize(std::size_t) OVERRIDE FINAL
Definition: blockingUDP.h:90
virtual const std::string & getRemoteName() const OVERRIDE FINAL
Definition: blockingUDP.h:74
virtual void alignData(std::size_t alignment) OVERRIDE FINAL
Definition: blockingUDP.h:132
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize=0) OVERRIDE FINAL
epicsInt32 pvAccessID
Definition: pvaDefs.h:18
virtual void flush(bool) OVERRIDE FINAL
Definition: blockingUDP.h:151
virtual std::string getType() const OVERRIDE FINAL
Definition: blockingUDP.h:78
virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL
Definition: blockingUDP.h:82
const osiSockAddr & getLocalMulticastAddress() const
Definition: blockingUDP.h:169
virtual void ensureBuffer(std::size_t) OVERRIDE FINAL
Definition: blockingUDP.h:177
int i
Definition: scan.c:967
POINTER_DEFINITIONS(BlockingUDPTransport)
void join(const osiSockAddr &mcastAddr, const osiSockAddr &nifAddr)
struct sockaddr_in ia
Definition: osiSock.h:157
virtual bool directDeserialize(epics::pvData::ByteBuffer *, char *, std::size_t, std::size_t) OVERRIDE FINAL
Definition: blockingUDP.h:142
Definition: memory.hpp:41
virtual void setByteOrder(int byteOrder) OVERRIDE FINAL
Definition: blockingUDP.h:114
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
virtual void enqueueSendRequest(TransportSender::shared_pointer const &sender) OVERRIDE FINAL
ResponseHandler::shared_pointer _responseHandler
Definition: blockingUDP.h:285
bool isBroadcastAddress(const osiSockAddr *address, const InetAddrVector &broadcastAddresses)
Definition: blockingUDP.h:261
Mark external symbols and entry points for shared libraries.
EPICS_ALWAYS_INLINE std::size_t getSize() const
Definition: byteBuffer.h:400
void align(std::size_t size, char fill='\0')
Definition: byteBuffer.h:504
virtual void ensureData(std::size_t size) OVERRIDE FINAL
#define OVERRIDE
Definition: pvAccess.h:55
virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const &data) OVERRIDE FINAL
Definition: blockingUDP.h:109
virtual const osiSockAddr & getRemoteAddress() const OVERRIDE FINAL
Definition: blockingUDP.h:70
std::vector< BlockingUDPTransport::shared_pointer > BlockingUDPTransportVector
Definition: blockingUDP.h:403
virtual void flushSerializeBuffer() OVERRIDE FINAL
Definition: blockingUDP.h:173
virtual void cachedSerialize(const std::tr1::shared_ptr< const epics::pvData::Field > &field, epics::pvData::ByteBuffer *buffer) OVERRIDE FINAL
Definition: blockingUDP.h:185
virtual void alignBuffer(std::size_t alignment) OVERRIDE FINAL
Definition: blockingUDP.h:181
virtual bool acquire(std::tr1::shared_ptr< ClientChannelImpl > const &) OVERRIDE FINAL
Definition: blockingUDP.h:200
const InetAddrVector & getTappedNIF() const
Definition: blockingUDP.h:235
virtual bool isClosed() OVERRIDE FINAL
Definition: blockingUDP.h:66
std::vector< osiSockAddr > InetAddrVector
const osiSockAddr * getBindAddress() const
Definition: blockingUDP.h:257
const InetAddrVector & getIgnoredAddresses() const
Definition: blockingUDP.h:219
virtual void setRecipient(const osiSockAddr &sendTo) OVERRIDE FINAL
Definition: blockingUDP.h:155
virtual void flushSendQueue() OVERRIDE FINAL
void setIgnoredAddresses(const InetAddrVector &addresses)
Definition: blockingUDP.h:211
std::vector< ifaceNode > IfaceNodeVector
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
Definition: byteBuffer.h:233
int SOCKET
Definition: osdSock.h:31
void setSendAddresses(InetAddrVector &addresses, std::vector< bool > &address_types)
Definition: blockingUDP.h:270
virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL
Definition: blockingUDP.h:86
FORCE_INLINE const FieldCreatePtr & getFieldCreate()
void initializeUDPTransports(bool serverFlag, BlockingUDPTransportVector &udpTransports, const IfaceNodeVector &ifaceList, const ResponseHandler::shared_pointer &responseHandler, BlockingUDPTransport::shared_pointer &sendTransport, int32 &listenPort, bool autoAddressList, const std::string &addressList, const std::string &ignoreAddressList)
virtual void endMessage() OVERRIDE FINAL
void setLocalMulticastAddress(const osiSockAddr &sendTo)
Definition: blockingUDP.h:160
const InetAddrVector & getSendAddresses()
Definition: blockingUDP.h:249
virtual void verified(epics::pvData::Status const &) OVERRIDE FINAL
Definition: blockingUDP.h:105
virtual void release(pvAccessID) OVERRIDE FINAL
Definition: blockingUDP.h:205
void setEndianess(int byteOrder)
Definition: byteBuffer.h:285
virtual void setRemoteTransportSocketReceiveBufferSize(std::size_t) OVERRIDE FINAL
Definition: blockingUDP.h:95
void setTappedNIF(const InetAddrVector &addresses)
Definition: blockingUDP.h:227
bool send(const char *buffer, size_t length, const osiSockAddr &address)
int16_t int16
Definition: pvType.h:79
virtual std::tr1::shared_ptr< const epics::pvData::Field > cachedDeserialize(epics::pvData::ByteBuffer *buffer) OVERRIDE FINAL
Definition: blockingUDP.h:193
const epics::pvData::int16 PVA_DEFAULT_PRIORITY
Definition: pvaConstants.h:70
virtual bool verify(epics::pvData::int32) OVERRIDE FINAL
Definition: blockingUDP.h:100
epicsMutex Mutex
Definition: lock.h:28
C++ and C descriptions for a thread.
#define EPICS_NOT_COPYABLE(CLASS)
Disable implicit copyable.
int32_t int32
Definition: pvType.h:83
void setMutlicastNIF(const osiSockAddr &nifAddr, bool loopback)
#define FINAL
Definition: pvAccess.h:48
virtual bool directSerialize(epics::pvData::ByteBuffer *, const char *, std::size_t, std::size_t) OVERRIDE FINAL
Definition: blockingUDP.h:136