14 #include <sys/types.h> 25 #define epicsExportSharedSymbols 41 inline int sendto(
int s,
const char *buf,
size_t len,
int flags,
const struct sockaddr *to,
int tolen)
43 return ::sendto(s, const_cast<char*>(buf), len, flags, const_cast<struct sockaddr *>(to), tolen);
48 #define RECEIVE_BUFFER_PRE_RESERVE (PVA_MESSAGE_HEADER_SIZE + 16) 50 size_t BlockingUDPTransport::num_instances;
52 BlockingUDPTransport::BlockingUDPTransport(
bool serverFlag,
53 ResponseHandler::shared_pointer
const & responseHandler,
SOCKET channel,
57 _responseHandler(responseHandler),
59 _bindAddress(bindAddress),
63 _sendToEnabled(
false),
64 _localMulticastAddressEnabled(
false),
67 _lastMessageStartPosition(0),
68 _clientServerWithEndianFlag(
71 assert(_responseHandler.get());
75 int retval = ::getsockname(_channel, &_remoteAddress.sa, &sockLen);
78 _remoteAddress = _bindAddress;
83 _remoteName =
"<unknown>:0";
87 _remoteName = strBuffer;
92 REFTRACE_INCREMENT(num_instances);
95 BlockingUDPTransport::~BlockingUDPTransport() {
96 REFTRACE_DECREMENT(num_instances);
101 void BlockingUDPTransport::start() {
110 _thread.reset(
new epicsThread(*
this, threadName.c_str(),
116 void BlockingUDPTransport::close() {
120 void BlockingUDPTransport::ensureData(std::size_t size) {
121 if (_receiveBuffer.getRemaining() >= size)
123 std::ostringstream msg;
124 msg<<
"no more data in UDP packet : " 125 <<_receiveBuffer.getPosition()<<
":"<<_receiveBuffer.getLimit()
127 throw std::underflow_error(msg.str());
130 void BlockingUDPTransport::close(
bool waitForThreadToComplete) {
133 if(_closed.get())
return;
140 "UDP socket %s closed.",
175 if (_thread.get() && waitForThreadToComplete)
177 if (!_thread->exitWait(5.0))
180 "Receive thread for UDP socket %s has not exited.",
186 void BlockingUDPTransport::enqueueSendRequest(TransportSender::shared_pointer
const & sender) {
189 _sendToEnabled =
false;
193 sender->send(&_sendBuffer,
this);
199 send(&_sendBuffer, _sendTo);
203 void BlockingUDPTransport::flushSendQueue()
208 void BlockingUDPTransport::startMessage(
int8 command,
size_t ,
int32 payloadSize) {
209 _lastMessageStartPosition = _sendBuffer.getPosition();
212 _sendBuffer.putByte(_clientServerWithEndianFlag);
213 _sendBuffer.putByte(command);
214 _sendBuffer.putInt(payloadSize);
217 void BlockingUDPTransport::endMessage() {
219 _lastMessageStartPosition+(
sizeof(
int16)+2),
223 void BlockingUDPTransport::run() {
229 Transport::shared_pointer thisTransport(internal_this);
235 while(!_closed.get())
237 int bytesRead = recvfrom(_channel,
238 recvfrom_buffer_start, recvfrom_buffer_len,
239 0, (sockaddr*)&fromAddress,
242 if(
likely(bytesRead>=0)) {
244 atomic::add(_totalBytesRecv, bytesRead);
246 for(
size_t i = 0;
i <_ignoredAddresses.size();
i++)
248 if(_ignoredAddresses[
i].ia.sin_addr.s_addr==fromAddress.
ia.sin_addr.s_addr)
254 LOG(
logLevelDebug,
"UDP Ignore (%d) %s x- %s", bytesRead, _remoteName.c_str(), strBuffer);
264 LOG(
logLevelDebug,
"UDP %s Rx (%d) %s <- %s", (_clientServerWithEndianFlag&0x40)?
"Server":
"Client", bytesRead, _remoteName.c_str(), strBuffer);
271 processBuffer(thisTransport, fromAddress, &_receiveBuffer);
272 }
catch(std::exception& e) {
276 size_t epos = _receiveBuffer.getPosition();
282 std::cerr<<
"Error on UDP RX "<<strBuffer<<
" -> "<<_remoteName<<
" at "<<epos<<
" : "<<e.what()<<
"\n" 293 socketError == EAGAIN ||
328 bool BlockingUDPTransport::processBuffer(Transport::shared_pointer
const & transport,
365 size_t payloadSize = receiveBuffer->
getInt();
371 size_t nextRequestPosition = receiveBuffer->
getPosition() + payloadSize;
381 if (!_tappedNIF.empty())
385 memset(&originNIFAddress, 0,
sizeof(originNIFAddress));
389 originNIFAddress.
ia.sin_family = AF_INET;
398 if (originNIFAddress.
ia.sin_addr.s_addr != htonl(INADDR_ANY))
401 for(
size_t i = 0;
i < _tappedNIF.size();
i++)
403 if(_tappedNIF[
i].ia.sin_addr.s_addr == originNIFAddress.
ia.sin_addr.s_addr)
420 _responseHandler->handleResponse(&fromAddress, transport,
421 version, command, payloadSize,
433 bool BlockingUDPTransport::send(
const char* buffer,
size_t length,
const osiSockAddr& address)
441 int retval = sendto(_channel, buffer,
442 length, 0, &(address.
sa),
sizeof(sockaddr));
451 atomic::add(_totalBytesSent, length);
466 int retval = sendto(_channel, buffer->
getBuffer(),
467 buffer->
getLimit(), 0, &(address.
sa),
sizeof(sockaddr));
476 atomic::add(_totalBytesSent, buffer->
getLimit());
485 if(_sendAddresses.empty())
return false;
490 for(
size_t i = 0;
i<_sendAddresses.size();
i++) {
504 int retval = sendto(_channel, buffer->
getBuffer(),
505 buffer->
getLimit(), 0, &(_sendAddresses[
i].sa),
515 atomic::add(_totalBytesSent, buffer->
getLimit());
527 struct ip_mreq imreq;
528 memset(&imreq, 0,
sizeof(
struct ip_mreq));
530 imreq.imr_multiaddr.s_addr = mcastAddr.
ia.sin_addr.s_addr;
531 imreq.imr_interface.s_addr = nifAddr.
ia.sin_addr.s_addr;
534 int status = ::setsockopt(_channel, IPPROTO_IP, IP_ADD_MEMBERSHIP,
535 (
char*)&imreq,
sizeof(
struct ip_mreq));
540 throw std::runtime_error(
541 string(
"Failed to join to the multicast group '") +
547 void BlockingUDPTransport::setMutlicastNIF(
const osiSockAddr & nifAddr,
bool loopback)
550 int status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_IF,
551 (
char*)&nifAddr.
ia.sin_addr,
sizeof(
struct in_addr));
556 throw std::runtime_error(
557 string(
"Failed to set multicast network interface '") +
562 unsigned char mcast_loop = (loopback ? 1 : 0);
563 status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_LOOP,
564 (
char*)&mcast_loop,
sizeof(
unsigned char));
569 throw std::runtime_error(
570 string(
"Failed to enable multicast loopback on network interface '") +
579 const ResponseHandler::shared_pointer& responseHandler,
580 BlockingUDPTransport::shared_pointer& sendTransport,
582 bool autoAddressList,
583 const std::string& addressList,
584 const std::string& ignoreAddressList)
595 memset(&anyAddress, 0,
sizeof(anyAddress));
596 anyAddress.
ia.sin_family = AF_INET;
597 anyAddress.
ia.sin_port = htons(0);
598 anyAddress.
ia.sin_addr.s_addr = htonl(INADDR_ANY);
600 sendTransport = connector.
connect(responseHandler, anyAddress, protoVer);
609 listenPort = ntohs(sendTransport->getRemoteAddress().ia.sin_port);
610 LOG(
logLevelDebug,
"Dynamic listen UDP port set to %u.", (
unsigned)listenPort);
614 int32 sendPort = listenPort;
621 for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
629 node.
peer.
ia.sin_port = htons(sendPort);
630 autoBCastAddr.push_back(node.
peer);
634 node.
bcast.
ia.sin_port = htons(sendPort);
635 autoBCastAddr.push_back(node.
bcast);
650 for (InetAddrVector::const_iterator iter = list.begin(); iter != list.end(); iter++)
654 for(InetAddrVector::const_iterator inner = dedup.begin(); !match && inner != dedup.end(); inner++)
656 match = iter->ia.sin_family==inner->ia.sin_family && iter->ia.sin_addr.s_addr==inner->ia.sin_addr.s_addr;
660 dedup.push_back(*iter);
665 std::vector<bool> isunicast(list.size());
669 "No %s broadcast addresses found or specified - empty address list!", serverFlag ?
"server" :
"client");
672 for (
size_t i = 0;
i < list.size();
i++) {
676 for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); isunicast[
i] && iter != ifaceList.end(); iter++)
680 if(node.
validBcast && list[i].ia.sin_family==iter->bcast.ia.sin_family
681 && list[i].ia.sin_addr.s_addr==iter->bcast.ia.sin_addr.s_addr) {
682 isunicast[
i] =
false;
687 isunicast[i]?
"":
"not ");
690 sendTransport->setSendAddresses(list, isunicast);
693 sendTransport->start();
694 udpTransports.push_back(sendTransport);
698 memset(&loAddr, 0,
sizeof(loAddr));
699 loAddr.
ia.sin_family = AF_INET;
700 loAddr.
ia.sin_port = ntohs(0);
704 std::string mcastAddress(
"224.0.0.128");
707 aToIPAddr(mcastAddress.c_str(), listenPort, &group.
ia);
721 for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
725 LOG(
logLevelDebug,
"Setting up UDP for interface %s/%s, broadcast %s, dest %s.",
734 memset(&listenLocalAddress, 0,
sizeof(listenLocalAddress));
735 listenLocalAddress.
ia.sin_family = AF_INET;
736 listenLocalAddress.
ia.sin_port = htons(listenPort);
737 listenLocalAddress.
ia.sin_addr.s_addr = node.
addr.
ia.sin_addr.s_addr;
739 BlockingUDPTransport::shared_pointer transport = connector.
connect(
740 responseHandler, listenLocalAddress, protoVer);
743 listenLocalAddress = transport->getRemoteAddress();
745 transport->setIgnoredAddresses(ignoreAddressVector);
747 tappedNIF.push_back(listenLocalAddress);
750 BlockingUDPTransport::shared_pointer transport2;
753 node.
bcast.
ia.sin_addr.s_addr == listenLocalAddress.
ia.sin_addr.s_addr) {
769 memset(&bcastAddress, 0,
sizeof(bcastAddress));
770 bcastAddress.
ia.sin_family = AF_INET;
771 bcastAddress.
ia.sin_port = htons(listenPort);
772 bcastAddress.
ia.sin_addr.s_addr = node.
bcast.
ia.sin_addr.s_addr;
774 transport2 = connector.
connect(responseHandler, bcastAddress, protoVer);
784 transport2->setIgnoredAddresses(ignoreAddressVector);
786 tappedNIF.push_back(bcastAddress);
791 transport->setMutlicastNIF(loAddr,
true);
792 transport->setLocalMulticastAddress(group);
795 udpTransports.push_back(transport);
800 udpTransports.push_back(transport2);
803 catch (std::exception& e)
820 anyAddress.
ia.sin_port = htons(listenPort);
823 BlockingUDPTransport::shared_pointer localMulticastTransport;
827 localMulticastTransport = connector.
connect(
835 if (!localMulticastTransport)
836 throw std::runtime_error(
"Failed to bind UDP socket.");
838 localMulticastTransport->setTappedNIF(tappedNIF);
839 localMulticastTransport->join(group, loAddr);
840 localMulticastTransport->start();
841 udpTransports.push_back(localMulticastTransport);
847 catch (std::exception& ex)
849 LOG(
logLevelDebug,
"Failed to initialize local multicast, functionality disabled. Reason: %s.", ex.what());
#define SOCK_ECONNREFUSED
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
unsigned epicsStdCall sockAddrToDottedIP(const struct sockaddr *paddr, char *pBuf, unsigned bufSize)
LIBCOM_API int epicsStdCall aToIPAddr(const char *pAddrString, unsigned short defaultPort, struct sockaddr_in *pIP)
#define assert(exp)
Declare that a condition should be true.
EPICS_ALWAYS_INLINE int8 getByte()
#define epicsThreadPriorityMedium
bool validBcast
true if bcast and mask have been set
const char * getBuffer() const
const epics::pvData::int8 PVA_MAGIC
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
#define RECEIVE_BUFFER_PRE_RESERVE
shared_ptr< T > static_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
std::size_t getLimit() const
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
TODO only here because of the Lockable.
BlockingUDPTransport::shared_pointer connect(ResponseHandler::shared_pointer const &responseHandler, osiSockAddr &bindAddress, epics::pvData::int8 transportRevision)
A lock for multithreading.
osiSockAddr mask
Net mask.
osiSockAddr peer
point to point peer
std::size_t getPosition() const
epicsSocketSystemCallInterruptMechanismQueryInfo
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
osiSockAddr addr
Our address.
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
std::vector< BlockingUDPTransport::shared_pointer > BlockingUDPTransportVector
EPICS_ALWAYS_INLINE int32 getInt()
#define LOG(level, format,...)
void setPosition(std::size_t pos)
std::vector< osiSockAddr > InetAddrVector
std::vector< ifaceNode > IfaceNodeVector
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
HexDump & limit(size_t n=(size_t)-1)
safety limit on max bytes printed
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
std::size_t getRemaining() const
const epics::pvData::int8 PVA_SERVER_PROTOCOL_REVISION
void initializeUDPTransports(bool serverFlag, BlockingUDPTransportVector &udpTransports, const IfaceNodeVector &ifaceList, const ResponseHandler::shared_pointer &responseHandler, BlockingUDPTransport::shared_pointer &sendTransport, int32 &listenPort, bool autoAddressList, const std::string &addressList, const std::string &ignoreAddressList)
bool decodeAsIPv6Address(ByteBuffer *buffer, osiSockAddr *address)
bool isMulticastAddress(const osiSockAddr *address)
#define THROW_BASE_EXCEPTION(msg)
void getSocketAddressList(InetAddrVector &ret, const std::string &list, int defaultPort, const InetAddrVector *appendList)
void setEndianess(int byteOrder)
bool validP2P
true if peer has been set.
bool pvAccessIsLoggable(pvAccessLogLevel level)
C++ and C descriptions for a thread.
#define THROW_BASE_EXCEPTION_CAUSE(msg, cause)
osiSockAddr bcast
sub-net broadcast address
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
#define IS_LOGGABLE(level)
#define EPICS_ENDIAN_LITTLE