This is Unofficial EPICS BASE Doxygen Site
blockingUDPTransport.cpp
Go to the documentation of this file.
1 
7 #ifdef _WIN32
8 // needed for ip_mreq
9 #include <ws2tcpip.h>
10 #endif
11 
12 #include <sstream>
13 
14 #include <sys/types.h>
15 #include <cstdio>
16 
17 #include <epicsThread.h>
18 #include <osiSock.h>
19 #include <epicsAtomic.h>
20 
21 #include <pv/lock.h>
22 #include <pv/byteBuffer.h>
23 #include <pv/reftrack.h>
24 
25 #define epicsExportSharedSymbols
26 #include <pv/blockingUDP.h>
27 #include <pv/pvaConstants.h>
28 #include <pv/inetAddressUtil.h>
29 #include <pv/logger.h>
30 #include <pv/likely.h>
31 #include <pv/hexDump.h>
32 
33 using namespace epics::pvData;
34 using namespace std;
36 
37 namespace epics {
38 namespace pvAccess {
39 
40 #ifdef __vxworks
41 inline int sendto(int s, const char *buf, size_t len, int flags, const struct sockaddr *to, int tolen)
42 {
43  return ::sendto(s, const_cast<char*>(buf), len, flags, const_cast<struct sockaddr *>(to), tolen);
44 }
45 #endif
46 
47 // reserve some space for CMD_ORIGIN_TAG message
48 #define RECEIVE_BUFFER_PRE_RESERVE (PVA_MESSAGE_HEADER_SIZE + 16)
49 
50 size_t BlockingUDPTransport::num_instances;
51 
52 BlockingUDPTransport::BlockingUDPTransport(bool serverFlag,
53  ResponseHandler::shared_pointer const & responseHandler, SOCKET channel,
54  osiSockAddr& bindAddress,
55  short /*remoteTransportRevision*/) :
56  _closed(),
57  _responseHandler(responseHandler),
58  _channel(channel),
59  _bindAddress(bindAddress),
60  _sendAddresses(0),
61  _ignoredAddresses(0),
62  _tappedNIF(0),
63  _sendToEnabled(false),
64  _localMulticastAddressEnabled(false),
66  _sendBuffer(MAX_UDP_RECV),
67  _lastMessageStartPosition(0),
68  _clientServerWithEndianFlag(
69  (serverFlag ? 0x40 : 0x00) | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00))
70 {
71  assert(_responseHandler.get());
72 
73  osiSocklen_t sockLen = sizeof(sockaddr);
74  // read the actual socket info
75  int retval = ::getsockname(_channel, &_remoteAddress.sa, &sockLen);
76  if(retval<0) {
77  // error obtaining remote address, fallback to bindAddress
78  _remoteAddress = _bindAddress;
79 
80  char strBuffer[64];
81  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
82  LOG(logLevelDebug, "getsockname error: %s.", strBuffer);
83  _remoteName = "<unknown>:0";
84  } else {
85  char strBuffer[64];
86  sockAddrToDottedIP(&_remoteAddress.sa, strBuffer, sizeof(strBuffer));
87  _remoteName = strBuffer;
88  LOG(logLevelDebug, "Creating datagram socket from: %s.",
89  _remoteName.c_str());
90  }
91 
92  REFTRACE_INCREMENT(num_instances);
93 }
94 
95 BlockingUDPTransport::~BlockingUDPTransport() {
96  REFTRACE_DECREMENT(num_instances);
97 
98  close(true); // close the socket and stop the thread.
99 }
100 
101 void BlockingUDPTransport::start() {
102 
103  string threadName = "UDP-rx " + inetAddressToString(_bindAddress);
104 
106  {
107  LOG(logLevelTrace, "Starting thread: %s.", threadName.c_str());
108  }
109 
110  _thread.reset(new epicsThread(*this, threadName.c_str(),
113  _thread->start();
114 }
115 
116 void BlockingUDPTransport::close() {
117  close(true);
118 }
119 
120 void BlockingUDPTransport::ensureData(std::size_t size) {
121  if (_receiveBuffer.getRemaining() >= size)
122  return;
123  std::ostringstream msg;
124  msg<<"no more data in UDP packet : "
125  <<_receiveBuffer.getPosition()<<":"<<_receiveBuffer.getLimit()
126  <<" for "<<size;
127  throw std::underflow_error(msg.str());
128 }
129 
130 void BlockingUDPTransport::close(bool waitForThreadToComplete) {
131  {
132  Lock guard(_mutex);
133  if(_closed.get()) return;
134  _closed.set();
135  }
136 
138  {
140  "UDP socket %s closed.",
141  inetAddressToString(_bindAddress).c_str());
142  }
143 
146  switch ( info )
147  {
149  epicsSocketDestroy ( _channel );
150  break;
152  {
153  /*int status =*/ ::shutdown ( _channel, SHUT_RDWR );
154  /*
155  if ( status ) {
156  char sockErrBuf[64];
157  epicsSocketConvertErrnoToString (
158  sockErrBuf, sizeof ( sockErrBuf ) );
159  LOG(logLevelDebug,
160  "UDP socket %s failed to shutdown: %s.",
161  inetAddressToString(_bindAddress).c_str(), sockErrBuf);
162  }
163  */
164  epicsSocketDestroy ( _channel );
165  }
166  break;
168  // not supported anymore anyway
169  default:
170  epicsSocketDestroy(_channel);
171  }
172 
173 
174  // wait for send thread to exit cleanly
175  if (_thread.get() && waitForThreadToComplete)
176  {
177  if (!_thread->exitWait(5.0))
178  {
180  "Receive thread for UDP socket %s has not exited.",
181  inetAddressToString(_bindAddress).c_str());
182  }
183  }
184 }
185 
186 void BlockingUDPTransport::enqueueSendRequest(TransportSender::shared_pointer const & sender) {
187  Lock lock(_sendMutex);
188 
189  _sendToEnabled = false;
190  _sendBuffer.clear();
191  {
192  epicsGuard<TransportSender> G(*sender);
193  sender->send(&_sendBuffer, this);
194  }
195  endMessage();
196  if(!_sendToEnabled)
197  send(&_sendBuffer);
198  else
199  send(&_sendBuffer, _sendTo);
200 }
201 
202 
203 void BlockingUDPTransport::flushSendQueue()
204 {
205  // noop (note different sent addresses are possible)
206 }
207 
208 void BlockingUDPTransport::startMessage(int8 command, size_t /*ensureCapacity*/, int32 payloadSize) {
209  _lastMessageStartPosition = _sendBuffer.getPosition();
210  _sendBuffer.putByte(PVA_MAGIC);
211  _sendBuffer.putByte((_clientServerWithEndianFlag&0x40) ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION);
212  _sendBuffer.putByte(_clientServerWithEndianFlag);
213  _sendBuffer.putByte(command); // command
214  _sendBuffer.putInt(payloadSize);
215 }
216 
217 void BlockingUDPTransport::endMessage() {
218  _sendBuffer.putInt(
219  _lastMessageStartPosition+(sizeof(int16)+2),
220  _sendBuffer.getPosition()-_lastMessageStartPosition-PVA_MESSAGE_HEADER_SIZE);
221 }
222 
223 void BlockingUDPTransport::run() {
224  // This function is always called from only one thread - this
225  // object's own thread.
226 
227  osiSockAddr fromAddress;
228  osiSocklen_t addrStructSize = sizeof(sockaddr);
229  Transport::shared_pointer thisTransport(internal_this);
230 
231  try {
232 
233  char* recvfrom_buffer_start = (char*)(_receiveBuffer.getBuffer()+RECEIVE_BUFFER_PRE_RESERVE);
234  size_t recvfrom_buffer_len =_receiveBuffer.getSize()-RECEIVE_BUFFER_PRE_RESERVE;
235  while(!_closed.get())
236  {
237  int bytesRead = recvfrom(_channel,
238  recvfrom_buffer_start, recvfrom_buffer_len,
239  0, (sockaddr*)&fromAddress,
240  &addrStructSize);
241 
242  if(likely(bytesRead>=0)) {
243  // successfully got datagram
244  atomic::add(_totalBytesRecv, bytesRead);
245  bool ignore = false;
246  for(size_t i = 0; i <_ignoredAddresses.size(); i++)
247  {
248  if(_ignoredAddresses[i].ia.sin_addr.s_addr==fromAddress.ia.sin_addr.s_addr)
249  {
250  ignore = true;
252  char strBuffer[64];
253  sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
254  LOG(logLevelDebug, "UDP Ignore (%d) %s x- %s", bytesRead, _remoteName.c_str(), strBuffer);
255  }
256  break;
257  }
258  }
259 
260  if(likely(!ignore)) {
262  char strBuffer[64];
263  sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
264  LOG(logLevelDebug, "UDP %s Rx (%d) %s <- %s", (_clientServerWithEndianFlag&0x40)?"Server":"Client", bytesRead, _remoteName.c_str(), strBuffer);
265  }
266 
267  _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE);
268  _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);
269 
270  try {
271  processBuffer(thisTransport, fromAddress, &_receiveBuffer);
272  } catch(std::exception& e) {
274  char strBuffer[64];
275  sockAddrToDottedIP(&fromAddress.sa, strBuffer, sizeof(strBuffer));
276  size_t epos = _receiveBuffer.getPosition();
277 
278  // of course _receiveBuffer _may_ have been modified during processing...
279  _receiveBuffer.setPosition(RECEIVE_BUFFER_PRE_RESERVE);
280  _receiveBuffer.setLimit(RECEIVE_BUFFER_PRE_RESERVE+bytesRead);
281 
282  std::cerr<<"Error on UDP RX "<<strBuffer<<" -> "<<_remoteName<<" at "<<epos<<" : "<<e.what()<<"\n"
283  <<HexDump(_receiveBuffer).limit(256u);
284  }
285  }
286  }
287  } else {
288 
289  int socketError = SOCKERRNO;
290 
291  // interrupted or timeout
292  if (socketError == SOCK_EINTR ||
293  socketError == EAGAIN || // no alias in libCom
294  // windows times out with this
295  socketError == SOCK_ETIMEDOUT ||
296  socketError == SOCK_EWOULDBLOCK)
297  continue;
298 
299  if (socketError == SOCK_ECONNREFUSED || // avoid spurious ECONNREFUSED in Linux
300  socketError == SOCK_ECONNRESET) // or ECONNRESET in Windows
301  continue;
302 
303  // log a 'recvfrom' error
304  if(!_closed.get())
305  {
306  char errStr[64];
307  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
308  LOG(logLevelError, "Socket recvfrom error: %s.", errStr);
309  }
310 
311  close(false);
312  break;
313  }
314 
315  }
316  } catch(...) {
317  // TODO: catch all exceptions, and act accordingly
318  close(false);
319  }
320 
322  {
323  string threadName = "UDP-rx "+inetAddressToString(_bindAddress);
324  LOG(logLevelTrace, "Thread '%s' exiting.", threadName.c_str());
325  }
326 }
327 
328 bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & transport,
329  osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) {
330 
331 
332  // handle response(s)
333  while(likely((int)receiveBuffer->getRemaining()>=PVA_MESSAGE_HEADER_SIZE)) {
334 
335  //
336  // read header
337  //
338 
339  // first byte is PVA_MAGIC
340  int8 magic = receiveBuffer->getByte();
341  if(unlikely(magic != PVA_MAGIC))
342  return false;
343 
344  // second byte version
345  int8 version = receiveBuffer->getByte();
346  if(version==0) {
347  // 0 -> 1 included incompatible changes
348  return false;
349  }
350 
351  int8 flags = receiveBuffer->getByte();
352  if (flags & 0x80)
353  {
354  // 7th bit set
355  receiveBuffer->setEndianess(EPICS_ENDIAN_BIG);
356  }
357  else
358  {
359  receiveBuffer->setEndianess(EPICS_ENDIAN_LITTLE);
360  }
361 
362  // command ID and paylaod
363  int8 command = receiveBuffer->getByte();
364  // TODO check this cast (size_t must be 32-bit)
365  size_t payloadSize = receiveBuffer->getInt();
366 
367  // control message check (skip message)
368  if (flags & 0x01)
369  continue;
370 
371  size_t nextRequestPosition = receiveBuffer->getPosition() + payloadSize;
372 
373  // payload size check
374  if(unlikely(nextRequestPosition>receiveBuffer->getLimit())) return false;
375 
376  // CMD_ORIGIN_TAG filtering
377  // NOTE: from design point of view this is not a right place to process application message here
378  if (unlikely(command == CMD_ORIGIN_TAG))
379  {
380  // enabled?
381  if (!_tappedNIF.empty())
382  {
383  // 128-bit IPv6 address
384  osiSockAddr originNIFAddress;
385  memset(&originNIFAddress, 0, sizeof(originNIFAddress));
386 
387  if (decodeAsIPv6Address(receiveBuffer, &originNIFAddress))
388  {
389  originNIFAddress.ia.sin_family = AF_INET;
390 
391  /*
392  LOG(logLevelDebug, "Got CMD_ORIGIN_TAG message form %s tagged as %s.",
393  inetAddressToString(fromAddress, true).c_str(),
394  inetAddressToString(originNIFAddress, false).c_str());
395  */
396 
397  // filter
398  if (originNIFAddress.ia.sin_addr.s_addr != htonl(INADDR_ANY))
399  {
400  bool accept = false;
401  for(size_t i = 0; i < _tappedNIF.size(); i++)
402  {
403  if(_tappedNIF[i].ia.sin_addr.s_addr == originNIFAddress.ia.sin_addr.s_addr)
404  {
405  accept = true;
406  break;
407  }
408  }
409 
410  // ignore messages from non-tapped NIFs
411  if (!accept)
412  return false;
413  }
414  }
415  }
416  }
417  else
418  {
419  // handle
420  _responseHandler->handleResponse(&fromAddress, transport,
421  version, command, payloadSize,
422  &_receiveBuffer);
423  }
424 
425  // set position (e.g. in case handler did not read all)
426  receiveBuffer->setPosition(nextRequestPosition);
427  }
428 
429  // all ok
430  return true;
431 }
432 
433 bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSockAddr& address)
434 {
436  {
437  LOG(logLevelDebug, "UDP Tx (%zu) %s -> %s.",
438  length, _remoteName.c_str(), inetAddressToString(address).c_str());
439  }
440 
441  int retval = sendto(_channel, buffer,
442  length, 0, &(address.sa), sizeof(sockaddr));
443  if(unlikely(retval<0))
444  {
445  char errStr[64];
446  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
447  LOG(logLevelDebug, "Socket sendto to %s error: %s.",
448  inetAddressToString(address).c_str(), errStr);
449  return false;
450  }
451  atomic::add(_totalBytesSent, length);
452 
453  return true;
454 }
455 
456 bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address) {
457 
458  buffer->flip();
459 
461  {
462  LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
463  buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(address).c_str());
464  }
465 
466  int retval = sendto(_channel, buffer->getBuffer(),
467  buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr));
468  if(unlikely(retval<0))
469  {
470  char errStr[64];
471  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
472  LOG(logLevelDebug, "Socket sendto to %s error: %s.",
473  inetAddressToString(address).c_str(), errStr);
474  return false;
475  }
476  atomic::add(_totalBytesSent, buffer->getLimit());
477 
478  // all sent
479  buffer->setPosition(buffer->getLimit());
480 
481  return true;
482 }
483 
484 bool BlockingUDPTransport::send(ByteBuffer* buffer, InetAddressType target) {
485  if(_sendAddresses.empty()) return false;
486 
487  buffer->flip();
488 
489  bool allOK = true;
490  for(size_t i = 0; i<_sendAddresses.size(); i++) {
491 
492  // filter
493  if (target != inetAddressType_all)
494  if ((target == inetAddressType_unicast && !_isSendAddressUnicast[i]) ||
495  (target == inetAddressType_broadcast_multicast && _isSendAddressUnicast[i]))
496  continue;
497 
499  {
500  LOG(logLevelDebug, "Sending %zu bytes %s -> %s.",
501  buffer->getRemaining(), _remoteName.c_str(), inetAddressToString(_sendAddresses[i]).c_str());
502  }
503 
504  int retval = sendto(_channel, buffer->getBuffer(),
505  buffer->getLimit(), 0, &(_sendAddresses[i].sa),
506  sizeof(sockaddr));
507  if(unlikely(retval<0))
508  {
509  char errStr[64];
510  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
511  LOG(logLevelDebug, "Socket sendto to %s error: %s.",
512  inetAddressToString(_sendAddresses[i]).c_str(), errStr);
513  allOK = false;
514  }
515  atomic::add(_totalBytesSent, buffer->getLimit());
516  }
517 
518  // all sent
519  buffer->setPosition(buffer->getLimit());
520 
521  return allOK;
522 }
523 
524 
525 void BlockingUDPTransport::join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr)
526 {
527  struct ip_mreq imreq;
528  memset(&imreq, 0, sizeof(struct ip_mreq));
529 
530  imreq.imr_multiaddr.s_addr = mcastAddr.ia.sin_addr.s_addr;
531  imreq.imr_interface.s_addr = nifAddr.ia.sin_addr.s_addr;
532 
533  // join multicast group on the given interface
534  int status = ::setsockopt(_channel, IPPROTO_IP, IP_ADD_MEMBERSHIP,
535  (char*)&imreq, sizeof(struct ip_mreq));
536  if (status)
537  {
538  char errStr[64];
539  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
540  throw std::runtime_error(
541  string("Failed to join to the multicast group '") +
542  inetAddressToString(mcastAddr) + "' on network interface '" +
543  inetAddressToString(nifAddr, false) + "': " + errStr);
544  }
545 }
546 
547 void BlockingUDPTransport::setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback)
548 {
549  // set the multicast outgoing interface
550  int status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_IF,
551  (char*)&nifAddr.ia.sin_addr, sizeof(struct in_addr));
552  if (status)
553  {
554  char errStr[64];
555  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
556  throw std::runtime_error(
557  string("Failed to set multicast network interface '") +
558  inetAddressToString(nifAddr, false) + "': " + errStr);
559  }
560 
561  // send multicast traffic to myself too
562  unsigned char mcast_loop = (loopback ? 1 : 0);
563  status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_LOOP,
564  (char*)&mcast_loop, sizeof(unsigned char));
565  if (status)
566  {
567  char errStr[64];
568  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
569  throw std::runtime_error(
570  string("Failed to enable multicast loopback on network interface '") +
571  inetAddressToString(nifAddr, false) + "': " + errStr);
572  }
573 
574 }
575 
576 void initializeUDPTransports(bool serverFlag,
577  BlockingUDPTransportVector& udpTransports,
578  const IfaceNodeVector& ifaceList,
579  const ResponseHandler::shared_pointer& responseHandler,
580  BlockingUDPTransport::shared_pointer& sendTransport,
581  int32& listenPort,
582  bool autoAddressList,
583  const std::string& addressList,
584  const std::string& ignoreAddressList)
585 {
586  BlockingUDPConnector connector(serverFlag);
587 
588  const int8_t protoVer = serverFlag ? PVA_SERVER_PROTOCOL_REVISION : PVA_CLIENT_PROTOCOL_REVISION;
589 
590  //
591  // Create UDP transport for sending (to all network interfaces)
592  //
593 
594  osiSockAddr anyAddress;
595  memset(&anyAddress, 0, sizeof(anyAddress));
596  anyAddress.ia.sin_family = AF_INET;
597  anyAddress.ia.sin_port = htons(0);
598  anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
599 
600  sendTransport = connector.connect(responseHandler, anyAddress, protoVer);
601  if (!sendTransport)
602  {
603  THROW_BASE_EXCEPTION("Failed to initialize UDP transport.");
604  }
605 
606  // to allow automatic assignment of listen port (for testing)
607  if (listenPort == 0)
608  {
609  listenPort = ntohs(sendTransport->getRemoteAddress().ia.sin_port);
610  LOG(logLevelDebug, "Dynamic listen UDP port set to %u.", (unsigned)listenPort);
611  }
612 
613  // TODO current implementation shares the port (aka beacon and search port)
614  int32 sendPort = listenPort;
615 
616  //
617  // compile auto address list - where to send packets
618  //
619 
620  InetAddrVector autoBCastAddr;
621  for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
622  {
623  ifaceNode node = *iter;
624 
625  // in practice, interface will have either destination (PPP)
626  // or broadcast, but never both.
627  if (node.validP2P && node.peer.ia.sin_family != AF_UNSPEC)
628  {
629  node.peer.ia.sin_port = htons(sendPort);
630  autoBCastAddr.push_back(node.peer);
631  }
632  if (node.validBcast && node.bcast.ia.sin_family != AF_UNSPEC)
633  {
634  node.bcast.ia.sin_port = htons(sendPort);
635  autoBCastAddr.push_back(node.bcast);
636  }
637  }
638 
639  //
640  // set send address list
641  //
642  {
643  InetAddrVector list;
644  getSocketAddressList(list, addressList, sendPort, autoAddressList ? &autoBCastAddr : NULL);
645 
646  // avoid duplicates in address list
647  {
648  InetAddrVector dedup;
649 
650  for (InetAddrVector::const_iterator iter = list.begin(); iter != list.end(); iter++)
651  {
652  bool match = false;
653 
654  for(InetAddrVector::const_iterator inner = dedup.begin(); !match && inner != dedup.end(); inner++)
655  {
656  match = iter->ia.sin_family==inner->ia.sin_family && iter->ia.sin_addr.s_addr==inner->ia.sin_addr.s_addr;
657  }
658 
659  if(!match)
660  dedup.push_back(*iter);
661  }
662  list.swap(dedup);
663  }
664 
665  std::vector<bool> isunicast(list.size());
666 
667  if (list.empty()) {
669  "No %s broadcast addresses found or specified - empty address list!", serverFlag ? "server" : "client");
670  }
671 
672  for (size_t i = 0; i < list.size(); i++) {
673 
674  isunicast[i] = !isMulticastAddress(&list[i]);
675 
676  for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); isunicast[i] && iter != ifaceList.end(); iter++)
677  {
678  ifaceNode node = *iter;
679  // compare with all iface bcasts
680  if(node.validBcast && list[i].ia.sin_family==iter->bcast.ia.sin_family
681  && list[i].ia.sin_addr.s_addr==iter->bcast.ia.sin_addr.s_addr) {
682  isunicast[i] = false;
683  }
684  }
686  "Broadcast address #%zu: %s. (%sunicast)", i, inetAddressToString(list[i]).c_str(),
687  isunicast[i]?"":"not ");
688  }
689 
690  sendTransport->setSendAddresses(list, isunicast);
691  }
692 
693  sendTransport->start();
694  udpTransports.push_back(sendTransport);
695 
696  // TODO configurable local NIF, address
697  osiSockAddr loAddr;
698  memset(&loAddr, 0, sizeof(loAddr));
699  loAddr.ia.sin_family = AF_INET;
700  loAddr.ia.sin_port = ntohs(0);
701  loAddr.ia.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
702 
703  // TODO configurable local multicast address
704  std::string mcastAddress("224.0.0.128");
705 
706  osiSockAddr group;
707  aToIPAddr(mcastAddress.c_str(), listenPort, &group.ia);
708 
709  //
710  // set ignore address list
711  //
712  InetAddrVector ignoreAddressVector;
713  getSocketAddressList(ignoreAddressVector, ignoreAddressList, 0, 0);
714 
715  //
716  // Setup UDP trasport(s) (per interface)
717  //
718 
719  InetAddrVector tappedNIF;
720 
721  for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
722  {
723  ifaceNode node = *iter;
724 
725  LOG(logLevelDebug, "Setting up UDP for interface %s/%s, broadcast %s, dest %s.",
726  inetAddressToString(node.addr, false).c_str(),
727  node.validBcast ? inetAddressToString(node.mask, false).c_str() : "<none>",
728  node.validBcast ? inetAddressToString(node.bcast, false).c_str() : "<none>",
729  node.validP2P ? inetAddressToString(node.peer, false).c_str() : "<none>");
730  try
731  {
732  // where to bind (listen) address
733  osiSockAddr listenLocalAddress;
734  memset(&listenLocalAddress, 0, sizeof(listenLocalAddress));
735  listenLocalAddress.ia.sin_family = AF_INET;
736  listenLocalAddress.ia.sin_port = htons(listenPort);
737  listenLocalAddress.ia.sin_addr.s_addr = node.addr.ia.sin_addr.s_addr;
738 
739  BlockingUDPTransport::shared_pointer transport = connector.connect(
740  responseHandler, listenLocalAddress, protoVer);
741  if (!transport)
742  continue;
743  listenLocalAddress = transport->getRemoteAddress();
744 
745  transport->setIgnoredAddresses(ignoreAddressVector);
746 
747  tappedNIF.push_back(listenLocalAddress);
748 
749 
750  BlockingUDPTransport::shared_pointer transport2;
751 
752  if(!node.validBcast || node.bcast.sa.sa_family != AF_INET ||
753  node.bcast.ia.sin_addr.s_addr == listenLocalAddress.ia.sin_addr.s_addr) {
754  // warning if not point-to-point
755  LOG(node.bcast.sa.sa_family != AF_INET ? logLevelDebug : logLevelWarn,
756  "Unable to find broadcast address of interface %s.", inetAddressToString(node.addr, false).c_str());
757  }
758 #if !defined(_WIN32)
759  else
760  {
761  /* An oddness of BSD sockets (not winsock) is that binding to
762  * INADDR_ANY will receive unicast and broadcast, but binding to
763  * a specific interface address receives only unicast. The trick
764  * is to bind a second socket to the interface broadcast address,
765  * which will then receive only broadcasts.
766  */
767 
768  osiSockAddr bcastAddress;
769  memset(&bcastAddress, 0, sizeof(bcastAddress));
770  bcastAddress.ia.sin_family = AF_INET;
771  bcastAddress.ia.sin_port = htons(listenPort);
772  bcastAddress.ia.sin_addr.s_addr = node.bcast.ia.sin_addr.s_addr;
773 
774  transport2 = connector.connect(responseHandler, bcastAddress, protoVer);
775  if (transport2)
776  {
777  /* The other wrinkle is that nothing should be sent from this second
778  * socket. So replies are made through the unicast socket.
779  *
780  transport2->setReplyTransport(transport);
781  */
782  // NOTE: search responses all always send from sendTransport
783 
784  transport2->setIgnoredAddresses(ignoreAddressVector);
785 
786  tappedNIF.push_back(bcastAddress);
787  }
788  }
789 #endif
790 
791  transport->setMutlicastNIF(loAddr, true);
792  transport->setLocalMulticastAddress(group);
793 
794  transport->start();
795  udpTransports.push_back(transport);
796 
797  if (transport2)
798  {
799  transport2->start();
800  udpTransports.push_back(transport2);
801  }
802  }
803  catch (std::exception& e)
804  {
805  THROW_BASE_EXCEPTION_CAUSE("Failed to initialize UDP transport.", e);
806  }
807  catch (...)
808  {
809  THROW_BASE_EXCEPTION("Failed to initialize UDP transport.");
810  }
811  }
812 
813 
814  //
815  // Setup local multicasting
816  //
817 
818  // WIN32 do not allow binding to multicast address, use any address w/ port
819 #if defined(_WIN32)
820  anyAddress.ia.sin_port = htons(listenPort);
821 #endif
822 
823  BlockingUDPTransport::shared_pointer localMulticastTransport;
824  try
825  {
826  // NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address
827  localMulticastTransport = connector.connect(
828  responseHandler,
829 #if !defined(_WIN32)
830  group,
831 #else
832  anyAddress,
833 #endif
834  protoVer);
835  if (!localMulticastTransport)
836  throw std::runtime_error("Failed to bind UDP socket.");
837 
838  localMulticastTransport->setTappedNIF(tappedNIF);
839  localMulticastTransport->join(group, loAddr);
840  localMulticastTransport->start();
841  udpTransports.push_back(localMulticastTransport);
842 
843  LOG(logLevelDebug, "Local multicast enabled on %s/%s.",
844  inetAddressToString(loAddr, false).c_str(),
845  inetAddressToString(group).c_str());
846  }
847  catch (std::exception& ex)
848  {
849  LOG(logLevelDebug, "Failed to initialize local multicast, functionality disabled. Reason: %s.", ex.what());
850  }
851 }
852 
853 
854 }
855 }
int8_t int8
Definition: pvType.h:75
#define SOCK_ECONNREFUSED
Definition: osdSock.h:58
#define INADDR_LOOPBACK
Definition: osdSock.h:76
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
unsigned epicsStdCall sockAddrToDottedIP(const struct sockaddr *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:118
LIBCOM_API int epicsStdCall aToIPAddr(const char *pAddrString, unsigned short defaultPort, struct sockaddr_in *pIP)
Definition: aToIPAddr.c:78
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
EPICS_ALWAYS_INLINE int8 getByte()
Definition: byteBuffer.h:617
epicsMutexId lock
Definition: osiClockTime.c:37
pvd::Status status
#define epicsThreadPriorityMedium
Definition: epicsThread.h:76
bool validBcast
true if bcast and mask have been set
int osiSocklen_t
Definition: osdSock.h:36
int i
Definition: scan.c:967
const char * getBuffer() const
Definition: byteBuffer.h:294
struct sockaddr sa
Definition: osiSock.h:158
const epics::pvData::int8 PVA_MAGIC
Definition: pvaConstants.h:29
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
struct sockaddr_in ia
Definition: osiSock.h:157
#define RECEIVE_BUFFER_PRE_RESERVE
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:788
#define SOCK_ETIMEDOUT
Definition: osdSock.h:54
std::size_t getLimit() const
Definition: byteBuffer.h:368
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
Definition: memory.hpp:41
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
BlockingUDPTransport::shared_pointer connect(ResponseHandler::shared_pointer const &responseHandler, osiSockAddr &bindAddress, epics::pvData::int8 transportRevision)
A lock for multithreading.
Definition: lock.h:36
#define NULL
Definition: catime.c:38
osiSockAddr mask
Net mask.
osiSockAddr peer
point to point peer
std::size_t getPosition() const
Definition: byteBuffer.h:346
epicsSocketSystemCallInterruptMechanismQueryInfo
Definition: osiSock.h:47
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
osiSockAddr addr
Our address.
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
std::vector< BlockingUDPTransport::shared_pointer > BlockingUDPTransportVector
Definition: blockingUDP.h:403
EPICS_ALWAYS_INLINE int32 getInt()
Definition: byteBuffer.h:629
#define SOCK_ECONNRESET
Definition: osdSock.h:53
#define LOG(level, format,...)
Definition: logger.h:48
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
pvData
Definition: monitor.h:428
std::vector< osiSockAddr > InetAddrVector
#define likely(x)
Definition: likely.h:14
#define EPICS_ENDIAN_BIG
Definition: epicsEndian.h:16
std::vector< ifaceNode > IfaceNodeVector
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
HexDump & limit(size_t n=(size_t)-1)
safety limit on max bytes printed
Definition: hexDump.h:44
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
Definition: byteBuffer.h:233
int SOCKET
Definition: osdSock.h:31
std::size_t getRemaining() const
Definition: byteBuffer.h:391
const epics::pvData::int8 PVA_SERVER_PROTOCOL_REVISION
Definition: pvaConstants.h:31
void initializeUDPTransports(bool serverFlag, BlockingUDPTransportVector &udpTransports, const IfaceNodeVector &ifaceList, const ResponseHandler::shared_pointer &responseHandler, BlockingUDPTransport::shared_pointer &sendTransport, int32 &listenPort, bool autoAddressList, const std::string &addressList, const std::string &ignoreAddressList)
bool decodeAsIPv6Address(ByteBuffer *buffer, osiSockAddr *address)
bool isMulticastAddress(const osiSockAddr *address)
#define unlikely(x)
Definition: likely.h:15
#define SOCKERRNO
Definition: osdSock.h:33
#define MAX_UDP_RECV
Definition: caProto.h:61
#define THROW_BASE_EXCEPTION(msg)
void getSocketAddressList(InetAddrVector &ret, const std::string &list, int defaultPort, const InetAddrVector *appendList)
#define SOCK_EINTR
Definition: osdSock.h:64
void setEndianess(int byteOrder)
Definition: byteBuffer.h:285
bool validP2P
true if peer has been set.
bool pvAccessIsLoggable(pvAccessLogLevel level)
Definition: logger.cpp:64
int16_t int16
Definition: pvType.h:79
#define EPICS_BYTE_ORDER
Definition: osdWireConfig.h:16
C++ and C descriptions for a thread.
#define THROW_BASE_EXCEPTION_CAUSE(msg, cause)
osiSockAddr bcast
sub-net broadcast address
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
#define false
Definition: flexdef.h:85
int32_t int32
Definition: pvType.h:83
#define SHUT_RDWR
Definition: osdSock.h:48
#define IS_LOGGABLE(level)
Definition: logger.h:51
#define SOCK_EWOULDBLOCK
Definition: osdSock.h:51
#define EPICS_ENDIAN_LITTLE
Definition: epicsEndian.h:15