This is the unofficial EPICS Base Doxygen site.
blockingTCPConnector.cpp
Go to the documentation of this file.
1 
7 #include <sstream>
8 #include <sys/types.h>
9 
10 #include <osiSock.h>
11 #include <epicsThread.h>
12 
13 #define epicsExportSharedSymbols
14 #include <pv/blockingTCP.h>
15 #include <pv/remote.h>
16 #include <pv/logger.h>
17 #include <pv/codec.h>
18 
19 using namespace epics::pvData;
20 
21 namespace epics {
22 namespace pvAccess {
23 
24 BlockingTCPConnector::BlockingTCPConnector(
25  Context::shared_pointer const & context,
26  int receiveBufferSize,
27  float heartbeatInterval) :
28  _context(context),
29  _receiveBufferSize(receiveBufferSize),
30  _heartbeatInterval(heartbeatInterval)
31 {
32 }
33 
34 SOCKET BlockingTCPConnector::tryConnect(osiSockAddr& address, int tries) {
35 
36  char strBuffer[24];
37  ipAddrToDottedIP(&address.ia, strBuffer, sizeof(strBuffer));
38 
39  for(int tryCount = 0; tryCount<tries; tryCount++) {
40 
42  "Opening socket to PVA server %s, attempt %d.",
43  strBuffer, tryCount+1);
44 
45  SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP);
46  if (socket == INVALID_SOCKET)
47  {
48  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
49  std::ostringstream temp;
50  temp<<"Socket create error: "<<strBuffer;
51  THROW_EXCEPTION2(std::runtime_error, temp.str());
52  }
53  else {
54  // TODO: use non-blocking connect() to have controllable timeout
55  if(::connect(socket, &address.sa, sizeof(sockaddr))==0) {
56  return socket;
57  }
58  else {
59  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
60  char saddr[32];
61  sockAddrToDottedIP(&address.sa, saddr, sizeof(saddr));
62  epicsSocketDestroy (socket);
63  std::ostringstream temp;
64  temp<<"error connecting to "<<saddr<<" : "<<strBuffer;
65  throw std::runtime_error(temp.str());
66  }
67  }
68  }
69  return INVALID_SOCKET;
70 }
71 
72 Transport::shared_pointer BlockingTCPConnector::connect(std::tr1::shared_ptr<ClientChannelImpl> const & client,
73  ResponseHandler::shared_pointer const & responseHandler, osiSockAddr& address,
74  int8 transportRevision, int16 priority) {
75 
76  SOCKET socket = INVALID_SOCKET;
77 
78  char ipAddrStr[24];
79  ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr));
80 
81  Context::shared_pointer context = _context.lock();
82 
83  TransportRegistry::Reservation rsvp(context->getTransportRegistry(),
84  address, priority);
85  // we are now blocking any connect() to this destination (address and prio)
86  // concurrent connect() to other destination is allowed.
87  // This prevents us from opening duplicate connections.
88 
89  Transport::shared_pointer transport = context->getTransportRegistry()->get(address, priority);
90  if(transport.get()) {
92  "Reusing existing connection to PVA server: %s.",
93  ipAddrStr);
94  if (transport->acquire(client))
95  return transport;
96  }
97 
98  try {
99  LOG(logLevelDebug, "Connecting to PVA server: %s.", ipAddrStr);
100 
101  socket = tryConnect(address, 3);
102 
103  LOG(logLevelDebug, "Socket connected to PVA server: %s.", ipAddrStr);
104 
105  // enable TCP_NODELAY (disable Nagle's algorithm)
106  int optval = 1; // true
107  int retval = ::setsockopt(socket, IPPROTO_TCP, TCP_NODELAY,
108  (char *)&optval, sizeof(int));
109  if(retval<0) {
110  char errStr[64];
111  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
112  LOG(logLevelWarn, "Error setting TCP_NODELAY: %s.", errStr);
113  }
114 
115  // enable TCP_KEEPALIVE
116  retval = ::setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE,
117  (char *)&optval, sizeof(int));
118  if(retval<0)
119  {
120  char errStr[64];
121  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
122  LOG(logLevelWarn, "Error setting SO_KEEPALIVE: %s.", errStr);
123  }
124 
125  // TODO tune buffer sizes?! Win32 defaults are 8k, which is OK
126 
127  // create transport
128  // TODO introduce factory
129  // get TCP send buffer size
130  osiSocklen_t intLen = sizeof(int);
131  int _socketSendBufferSize;
132  retval = getsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
133  if(retval<0) {
134  char strBuffer[64];
135  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
136  LOG(logLevelDebug, "Error getting SO_SNDBUF: %s.", strBuffer);
137  }
138 
139  // create() also adds to context connection pool _context->getTransportRegistry()
141  context, socket, responseHandler, _receiveBufferSize, _socketSendBufferSize,
142  client, transportRevision, _heartbeatInterval, priority);
143 
144  // verify
145  if(!transport->verify(5000)) {
146  LOG(
148  "Connection to PVA server %s failed to be validated, closing it.",
149  ipAddrStr);
150 
151  std::ostringstream temp;
152  temp<<"Failed to verify TCP connection to '"<<ipAddrStr<<"'.";
153  THROW_BASE_EXCEPTION(temp.str().c_str());
154  }
155 
156  LOG(logLevelDebug, "Connected to PVA server: %s.", ipAddrStr);
157 
158  return transport;
159  } catch(std::exception&) {
160  if(transport.get())
161  transport->close();
162  else if(socket!=INVALID_SOCKET)
163  epicsSocketDestroy(socket);
164  throw;
165  }
166 }
167 
168 }
169 }
int8_t int8
Definition: pvType.h:75
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
unsigned epicsStdCall sockAddrToDottedIP(const struct sockaddr *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:118
static shared_pointer create(Context::shared_pointer const &context, SOCKET channel, ResponseHandler::shared_pointer const &responseHandler, int32_t sendBufferSize, int32_t receiveBufferSize, std::tr1::shared_ptr< ClientChannelImpl > const &client, int8_t remoteTransportRevision, float heartbeatInterval, int16_t priority)
Definition: codec.h:579
#define INVALID_SOCKET
Definition: osdSock.h:32
#define THROW_EXCEPTION2(TYPE, MSG)
int osiSocklen_t
Definition: osdSock.h:36
Transport::shared_pointer connect(std::tr1::shared_ptr< ClientChannelImpl > const &client, ResponseHandler::shared_pointer const &responseHandler, osiSockAddr &address, epics::pvData::int8 transportRevision, epics::pvData::int16 priority)
struct sockaddr sa
Definition: osiSock.h:158
struct sockaddr_in ia
Definition: osiSock.h:157
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
Definition: osdSock.c:71
#define LOG(level, format,...)
Definition: logger.h:48
pvData
Definition: monitor.h:428
Definition: server.h:76
int SOCKET
Definition: osdSock.h:31
#define THROW_BASE_EXCEPTION(msg)
int16_t int16
Definition: pvType.h:79
C++ and C descriptions for a thread.
unsigned epicsStdCall ipAddrToDottedIP(const struct sockaddr_in *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:144