This is the unofficial EPICS Base Doxygen site.
blockingTCPAcceptor.cpp
Go to the documentation of this file.
1 
7 #include <sstream>
8 
9 #include <epicsThread.h>
10 #include <osiSock.h>
11 
12 #include <pv/epicsException.h>
13 
14 #define epicsExportSharedSymbols
15 #include <pv/blockingTCP.h>
16 #include <pv/codec.h>
17 #include <pv/remote.h>
18 #include <pv/logger.h>
19 
20 using std::ostringstream;
21 using namespace epics::pvData;
22 
23 namespace epics {
24 namespace pvAccess {
25 
// Constructs the acceptor and immediately sets up the listening socket.
//
// context            - stored in _context; later passed to the transport created in run().
// responseHandler    - stored in _responseHandler; later passed to the transport created in run().
// addr               - address to bind to; copied into _bindAddress in the constructor body.
// receiveBufferSize  - stored in _receiveBufferSize; later passed to the transport created in run().
//
// The constructor calls initialize(), which reports failure via THROW_BASE_EXCEPTION,
// so construction can throw.
26 BlockingTCPAcceptor::BlockingTCPAcceptor(Context::shared_pointer const & context,
27  ResponseHandler::shared_pointer const & responseHandler,
28  const osiSockAddr& addr, int receiveBufferSize) :
29  _context(context),
30  _responseHandler(responseHandler),
31  _bindAddress(),
    // No socket exists yet; initialize() creates it.
32  _serverSocketChannel(INVALID_SOCKET),
33  _receiveBufferSize(receiveBufferSize),
34  _destroyed(false),
    // NOTE(review): listing lines 36-38 (the remaining _thread constructor arguments)
    // are missing from this extract.
35  _thread(*this, "TCP-acceptor",
39 {
40  _bindAddress = addr;
    // Creates, binds and listens on the socket, then starts _thread.
41  initialize();
42 }
43 
// NOTE(review): listing line 44 is missing from this extract; presumably it is the
// destructor signature. The visible body only calls destroy().
45  destroy();
46 }
47 
// Creates the listening TCP socket, binds it to _bindAddress, starts listening and
// starts the acceptor thread.
//
// Makes at most two attempts: if binding to a non-zero configured port fails, the port
// is reset to 0 and the loop runs once more so a port is assigned dynamically.
//
// Returns the bound port (from _bindAddress, converted with ntohs).
// Throws (THROW_BASE_EXCEPTION) if the socket cannot be created, if listen() fails,
// or if no attempt succeeds in binding.
48 int BlockingTCPAcceptor::initialize() {
49 
50  char ipAddrStr[24];
    // Formatted once, before the loop: log and exception messages below keep showing the
    // originally configured address even after the port has been reset to 0.
51  ipAddrToDottedIP(&_bindAddress.ia, ipAddrStr, sizeof(ipAddrStr));
52 
53  int tryCount = 0;
54  while(tryCount<2) {
55  char strBuffer[64];
56 
57  LOG(logLevelDebug, "Creating acceptor to %s.", ipAddrStr);
58 
59  _serverSocketChannel = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP);
60  if(_serverSocketChannel==INVALID_SOCKET) {
61  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
62  ostringstream temp;
63  temp<<"Socket create error: "<<strBuffer;
64  LOG(logLevelError, "%s", temp.str().c_str());
65  THROW_BASE_EXCEPTION(temp.str().c_str());
66  }
67  else {
68 
69  //epicsSocketEnableAddressReuseDuringTimeWaitState(_serverSocketChannel);
70 
71  // try to bind
72  int retval = ::bind(_serverSocketChannel, &_bindAddress.sa, sizeof(sockaddr));
73  if(retval<0) {
74  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
75  LOG(logLevelDebug, "Socket bind error: %s.", strBuffer);
76  if(_bindAddress.ia.sin_port!=0) {
77  // failed to bind to specified bind address,
78  // try to get port dynamically, but only once
    // NOTE(review): listing line 80 (the first LOG argument, presumably the log level)
    // is missing from this extract.
79  LOG(
81  "Configured TCP port %d is unavailable, trying to assign it dynamically.",
82  ntohs(_bindAddress.ia.sin_port));
83  _bindAddress.ia.sin_port = htons(0);
    // NOTE(review): the socket created in this iteration is not destroyed before the
    // retry overwrites _serverSocketChannel with a new one; this looks like a
    // descriptor leak. Confirm.
84  }
85  else {
    // Dynamic port assignment also failed: give up and fall through to the
    // exception after the loop.
86  epicsSocketDestroy(_serverSocketChannel);
87  break; // exit while loop
88  }
89  }
90  else { // if(retval<0)
91  // bind succeeded
92 
93  // update bind address, if dynamic port selection was used
94  if(ntohs(_bindAddress.ia.sin_port)==0) {
95  osiSocklen_t sockLen = sizeof(sockaddr);
96  // read the actual socket info
97  retval = ::getsockname(_serverSocketChannel, &_bindAddress.sa, &sockLen);
98  if(retval<0) {
99  // error obtaining port number
    // Only logged: execution continues to listen(), and the port held in
    // _bindAddress is not updated by this branch.
100  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
101  LOG(logLevelDebug, "getsockname error: %s", strBuffer);
102  }
103  else {
104  LOG(
105  logLevelInfo,
106  "Using dynamically assigned TCP port %d.",
107  ntohs(_bindAddress.ia.sin_port));
108  }
109  }
110 
    // Backlog of 4 pending connections.
111  retval = ::listen(_serverSocketChannel, 4);
112  if(retval<0) {
113  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
114  ostringstream temp;
115  temp<<"Socket listen error: "<<strBuffer;
116  LOG(logLevelError, "%s", temp.str().c_str());
    // NOTE(review): throws without calling epicsSocketDestroy on the bound socket;
    // confirm whether anything else releases it.
117  THROW_BASE_EXCEPTION(temp.str().c_str());
118  }
119 
    // Socket is listening; start the acceptor thread.
120  _thread.start();
121 
122  // all OK, return
123  return ntohs(_bindAddress.ia.sin_port);
124  } // successful bind
125  } // successfully obtained socket
126  tryCount++;
127  } // while
128 
    // Reached only when binding failed on every attempt.
129  ostringstream temp;
130  temp<<"Failed to create acceptor to "<<ipAddrStr;
131  THROW_BASE_EXCEPTION(temp.str().c_str());
132 }
133 
// Accept loop of the acceptor.
//
// Each iteration takes a copy of _serverSocketChannel under _mutex (leaving the loop if
// _destroyed is set), accepts one client, sets TCP_NODELAY and SO_KEEPALIVE on it, reads
// its SO_SNDBUF size, builds a server transport for it and validates the connection.
// A failed accept ends the loop.
134 void BlockingTCPAcceptor::run() {
135  // raise level if port is assigned dynamically
    // NOTE(review): the LOG below always uses logLevelDebug; the comment above may be stale.
136  char ipAddrStr[24];
137  ipAddrToDottedIP(&_bindAddress.ia, ipAddrStr, sizeof(ipAddrStr));
138  LOG(logLevelDebug, "Accepting connections at %s.", ipAddrStr);
139 
140  bool socketOpen = true;
141  char strBuffer[64];
142 
143  while(socketOpen) {
144 
145  SOCKET sock;
146  {
    // Read the shared state under the lock; destroy() modifies both fields under _mutex.
147  Lock guard(_mutex);
148  if (_destroyed)
149  break;
150  sock = _serverSocketChannel;
151  }
152 
153  osiSockAddr address;
154  osiSocklen_t len = sizeof(sockaddr);
155 
156  SOCKET newClient = epicsSocketAccept(sock, &address.sa, &len);
157  if(newClient!=INVALID_SOCKET) {
158  // accept succeeded
    // ipAddrStr is reused from here on to hold the peer address.
159  ipAddrToDottedIP(&address.ia, ipAddrStr, sizeof(ipAddrStr));
160  LOG(logLevelDebug, "Accepted connection from PVA client: %s.", ipAddrStr);
161 
162  // enable TCP_NODELAY (disable Nagle's algorithm)
163  int optval = 1; // true
164  int retval = ::setsockopt(newClient, IPPROTO_TCP, TCP_NODELAY, (char *)&optval, sizeof(int));
165  if(retval<0) {
    // Failure is logged only; the connection is still served.
166  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
167  LOG(logLevelDebug, "Error setting TCP_NODELAY: %s.", strBuffer);
168  }
169 
170  // enable SO_KEEPALIVE
171  retval = ::setsockopt(newClient, SOL_SOCKET, SO_KEEPALIVE, (char *)&optval, sizeof(int));
172  if(retval<0) {
173  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
174  LOG(logLevelDebug, "Error setting SO_KEEPALIVE: %s.", strBuffer);
175  }
176 
177  // do NOT tune socket buffer sizes, this will disable auto-tuning
178 
179  // get TCP send buffer size
180  osiSocklen_t intLen = sizeof(int);
181  int _socketSendBufferSize;
182  retval = getsockopt(newClient, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
183  if(retval<0) {
    // NOTE(review): _socketSendBufferSize has no initializer, so if getsockopt fails it
    // appears to be passed to the transport below with an indeterminate value. Confirm.
184  epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer));
185  LOG(logLevelDebug, "Error getting SO_SNDBUF: %s.", strBuffer);
186  }
187 
    // NOTE(review): listing lines 188-190 and 192 are missing from this extract; line 192
    // is presumably the call that creates the transport from the arguments below.
191  detail::BlockingServerTCPTransportCodec::shared_pointer transport =
193  _context,
194  newClient,
195  _responseHandler,
196  _socketSendBufferSize,
197  _receiveBufferSize);
198 
199  // validate connection
200  if(!validateConnection(transport, ipAddrStr)) {
201  // TODO
202  // wait for negative response to be sent back and
203  // hold off the client for retrying at very high rate
204  epicsThreadSleep(1.0);
205 
206  transport->close();
    // NOTE(review): listing line 208 (the first LOG argument, presumably the log level)
    // is missing from this extract.
207  LOG(
209  "Connection to PVA client %s failed to be validated, closing it.",
210  ipAddrStr);
    // Go back to accepting the next client.
211  continue;
212  }
213 
214  LOG(logLevelDebug, "Serving to PVA client: %s.", ipAddrStr);
215 
216  }// accept succeeded
217  else
    // Any accept failure ends the loop. Presumably this is how destroy(), which shuts
    // down and destroys the socket, stops this thread; confirm.
218  socketOpen = false;
219  } // while
220 }
221 
// Validates a newly accepted connection.
//
// transport - transport of the accepted connection; its verify() is called with 5000
//             (presumably a timeout in milliseconds; confirm against Transport::verify).
// address   - peer address string, used only in the log message.
//
// Returns the result of transport->verify(5000), or false if verify() throws.
222 bool BlockingTCPAcceptor::validateConnection(Transport::shared_pointer const & transport, const char* address) {
223  try {
224  return transport->verify(5000);
225  } catch(...) {
    // Any exception is treated as a failed validation: it is logged and not propagated.
226  LOG(logLevelDebug, "Validation of %s failed.", address);
227  return false;
228  }
229 }
230 
// NOTE(review): listing line 231 is missing from this extract; presumably it is the
// signature of BlockingTCPAcceptor::destroy(), which the destructor body calls.
//
// Stops the acceptor: marks it destroyed, takes ownership of the listening socket,
// closes it and waits for the acceptor thread to exit. A second call returns early
// because _destroyed is already set.
232  SOCKET sock;
233  {
    // Flip the flag and detach the socket under the lock; run() reads both under _mutex.
234  Lock guard(_mutex);
235  if(_destroyed) return;
236  _destroyed = true;
237 
238  sock = _serverSocketChannel;
239  _serverSocketChannel = INVALID_SOCKET;
240  }
241 
242  if(sock!=INVALID_SOCKET) {
243  char ipAddrStr[24];
244  ipAddrToDottedIP(&_bindAddress.ia, ipAddrStr, sizeof(ipAddrStr));
245  LOG(logLevelDebug, "Stopped accepting connections at %s.", ipAddrStr);
246 
    // NOTE(review): listing lines 247, 249, 254 and 256 are missing from this extract.
    // The `break` statements suggest a switch with several case labels, presumably on
    // the result of epicsSocketSystemCallInterruptMechanismQuery(); confirm.
248  {
    // First visible branch: shut the socket down in both directions, destroy it,
    // then wait for the acceptor thread to exit.
250  shutdown(sock, SHUT_RDWR);
251  epicsSocketDestroy(sock);
252  _thread.exitWait();
253  break;
    // Second visible branch: log that this mechanism is not implemented, then destroy
    // the socket and wait for the thread without calling shutdown().
255  LOG(logLevelError, "SigAlarm close not implemented for this target\n");
257  epicsSocketDestroy(sock);
258  _thread.exitWait();
259  break;
260  }
261  }
262 }
263 
264 }
265 }
static shared_pointer create(Context::shared_pointer const &context, SOCKET channel, ResponseHandler::shared_pointer const &responseHandler, int sendBufferSize, int receiveBufferSize)
Definition: codec.h:457
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
#define INVALID_SOCKET
Definition: osdSock.h:32
#define epicsThreadPriorityMedium
Definition: epicsThread.h:76
int osiSocklen_t
Definition: osdSock.h:36
struct sockaddr sa
Definition: osiSock.h:158
struct sockaddr_in ia
Definition: osiSock.h:157
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
A lock for multithreading.
Definition: lock.h:36
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
LIBCOM_API SOCKET epicsStdCall epicsSocketCreate(int domain, int type, int protocol)
Definition: osdSock.c:71
#define LOG(level, format,...)
Definition: logger.h:48
pvData
Definition: monitor.h:428
LIBCOM_API int epicsStdCall epicsSocketAccept(int sock, struct sockaddr *pAddr, osiSocklen_t *addrlen)
Definition: osdSock.c:94
int SOCKET
Definition: osdSock.h:31
#define THROW_BASE_EXCEPTION(msg)
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
C++ and C descriptions for a thread.
#define false
Definition: flexdef.h:85
#define SHUT_RDWR
Definition: osdSock.h:48
unsigned epicsStdCall ipAddrToDottedIP(const struct sockaddr_in *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:144