This is Unofficial EPICS BASE Doxygen Site
epics::pvAccess::detail::BlockingTCPTransportCodec Class Reference

#include "codec.h"

+ Inheritance diagram for epics::pvAccess::detail::BlockingTCPTransportCodec:
+ Collaboration diagram for epics::pvAccess::detail::BlockingTCPTransportCodec:

Public Member Functions

 POINTER_DEFINITIONS (BlockingTCPTransportCodec)
 
 BlockingTCPTransportCodec (bool serverFlag, Context::shared_pointer const &context, SOCKET channel, ResponseHandler::shared_pointer const &responseHandler, size_t sendBufferSize, size_t receiveBufferSize, epics::pvData::int16 priority)
 
virtual ~BlockingTCPTransportCodec ()
 
virtual void readPollOne () OVERRIDE FINAL
 
virtual void writePollOne () OVERRIDE FINAL
 
virtual void scheduleSend () OVERRIDE FINAL
 
virtual void sendCompleted () OVERRIDE FINAL
 
virtual void close () OVERRIDE FINAL
 
virtual void waitJoin () OVERRIDE FINAL
 Call after close() to wait for any worker threads to exit. More...
 
virtual bool terminated () OVERRIDE FINAL
 
virtual bool isOpen () OVERRIDE FINAL
 
virtual void start ()
 
virtual int read (epics::pvData::ByteBuffer *dst) OVERRIDE FINAL
 
virtual int write (epics::pvData::ByteBuffer *src) OVERRIDE FINAL
 
virtual const osiSockAddrgetLastReadBufferSocketAddress () OVERRIDE FINAL
 
virtual void invalidDataStreamHandler () OVERRIDE FINAL
 
virtual std::string getType () const OVERRIDE FINAL
 
virtual void processControlMessage () OVERRIDE FINAL
 
virtual void processApplicationMessage () OVERRIDE FINAL
 
virtual const osiSockAddrgetRemoteAddress () const OVERRIDE FINAL
 
virtual const std::string & getRemoteName () const OVERRIDE FINAL
 
virtual std::size_t getReceiveBufferSize () const OVERRIDE FINAL
 
virtual epics::pvData::int16 getPriority () const OVERRIDE FINAL
 
virtual void setRemoteTransportReceiveBufferSize (std::size_t remoteTransportReceiveBufferSize) OVERRIDE FINAL
 
virtual void setRemoteTransportSocketReceiveBufferSize (std::size_t socketReceiveBufferSize) OVERRIDE FINAL
 
virtual std::tr1::shared_ptr< const epics::pvData::FieldcachedDeserialize (epics::pvData::ByteBuffer *buffer) OVERRIDE FINAL
 
virtual void cachedSerialize (const std::tr1::shared_ptr< const epics::pvData::Field > &field, epics::pvData::ByteBuffer *buffer) OVERRIDE FINAL
 
virtual void flushSendQueue () OVERRIDE FINAL
 
virtual bool isClosed () OVERRIDE FINAL
 
void activate ()
 
virtual bool verify (epics::pvData::int32 timeoutMs) OVERRIDE
 
virtual void verified (epics::pvData::Status const &status) OVERRIDE
 
virtual void authNZMessage (epics::pvData::PVStructure::shared_pointer const &data) OVERRIDE FINAL
 
virtual void sendSecurityPluginMessage (epics::pvData::PVStructure::const_shared_pointer const &data) OVERRIDE FINAL
 
- Public Member Functions inherited from epics::pvAccess::detail::AbstractCodec
 AbstractCodec (bool serverFlag, size_t sendBufferSize, size_t receiveBufferSize, int32_t socketSendBufferSize, bool blockingProcessQueue)
 
virtual ~AbstractCodec ()
 
virtual void alignBuffer (std::size_t alignment) OVERRIDE FINAL
 
virtual void ensureData (std::size_t size) OVERRIDE FINAL
 
virtual void alignData (std::size_t alignment) OVERRIDE FINAL
 
virtual void startMessage (epics::pvData::int8 command, std::size_t ensureCapacity=0, epics::pvData::int32 payloadSize=0) OVERRIDE FINAL
 
void putControlMessage (epics::pvData::int8 command, epics::pvData::int32 data)
 
virtual void endMessage () OVERRIDE FINAL
 
virtual void ensureBuffer (std::size_t size) OVERRIDE FINAL
 
virtual void flushSerializeBuffer () OVERRIDE FINAL
 
virtual void flush (bool lastMessageCompleted) OVERRIDE FINAL
 
void processWrite ()
 
void processRead ()
 
void processSendQueue ()
 
virtual void enqueueSendRequest (TransportSender::shared_pointer const &sender) OVERRIDE FINAL
 
void enqueueSendRequest (TransportSender::shared_pointer const &sender, std::size_t requiredBufferSize)
 
void setSenderThread ()
 
virtual void setRecipient (osiSockAddr const &sendTo) OVERRIDE FINAL
 
virtual void setByteOrder (int byteOrder) OVERRIDE FINAL
 
virtual bool directSerialize (epics::pvData::ByteBuffer *, const char *, std::size_t, std::size_t) OVERRIDE
 
virtual bool directDeserialize (epics::pvData::ByteBuffer *, char *, std::size_t, std::size_t) OVERRIDE
 
bool sendQueueEmpty () const
 
epics::pvData::int8 getRevision () const
 
- Public Member Functions inherited from epics::pvAccess::TransportSendControl
 POINTER_DEFINITIONS (TransportSendControl)
 
virtual ~TransportSendControl ()
 
- Public Member Functions inherited from epics::pvData::SerializableControl
virtual ~SerializableControl ()
 
virtual void cachedSerialize (std::tr1::shared_ptr< const Field > const &field, ByteBuffer *buffer)=0
 
- Public Member Functions inherited from epics::pvAccess::Transport
 POINTER_DEFINITIONS (Transport)
 
 Transport ()
 
virtual ~Transport ()
 
virtual bool acquire (std::tr1::shared_ptr< ClientChannelImpl > const &client)=0
 
virtual void release (pvAccessID clientId)=0
 
- Public Member Functions inherited from epics::pvData::DeserializableControl
virtual ~DeserializableControl ()
 
- Public Member Functions inherited from epics::pvAccess::AuthenticationPluginControl
 POINTER_DEFINITIONS (AuthenticationPluginControl)
 
virtual ~AuthenticationPluginControl ()
 
virtual void authenticationCompleted (const epics::pvData::Status &status, const std::tr1::shared_ptr< PeerInfo > &peer)=0
 

Public Attributes

PeerInfo::const_shared_pointer _peerInfo
 
- Public Attributes inherited from epics::pvAccess::detail::AbstractCodec
epics::pvData::Mutex _mutex
 
- Public Attributes inherited from epics::pvAccess::Transport
size_t _totalBytesSent
 
size_t _totalBytesRecv
 

Static Public Attributes

static size_t num_instances
 
- Static Public Attributes inherited from epics::pvAccess::detail::AbstractCodec
static const std::size_t MAX_MESSAGE_PROCESS = 100
 
static const std::size_t MAX_MESSAGE_SEND = 100
 
static const std::size_t MAX_ENSURE_SIZE = 1024
 
static const std::size_t MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2
 
static const std::size_t MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE
 
static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE = 1024
 
- Static Public Attributes inherited from epics::pvAccess::Transport
static size_t num_instances
 

Protected Member Functions

virtual void setRxTimeout (bool ena) OVERRIDE FINAL
 
virtual void sendBufferFull (int tries) OVERRIDE FINAL
 
virtual void internalClose ()
 
- Protected Member Functions inherited from epics::pvAccess::detail::AbstractCodec
void send (epics::pvData::ByteBuffer *buffer)
 
void flushSendBuffer ()
 

Protected Attributes

osiSockAddr _socketAddress
 
std::string _socketName
 
Context::shared_pointer _context
 
IntrospectionRegistry _incomingIR
 
IntrospectionRegistry _outgoingIR
 
std::string _authSessionName
 
AuthenticationSession::shared_pointer _authSession
 
bool _verified
 
epics::pvData::Event _verifiedEvent
 
- Protected Attributes inherited from epics::pvAccess::detail::AbstractCodec
ReadMode _readMode
 
int8_t _version
 
int8_t _flags
 
int8_t _command
 
int32_t _payloadSize
 
epics::pvData::int32 _remoteTransportSocketReceiveBufferSize
 
osiSockAddr _sendTo
 
epicsThreadId _senderThread
 
WriteMode _writeMode
 
bool _writeOpReady
 
epics::pvData::ByteBuffer _socketBuffer
 
epics::pvData::ByteBuffer _sendBuffer
 
fair_queue< TransportSender_sendQueue
 
const epics::pvData::int8 _clientServerFlag
 

Additional Inherited Members

- Static Public Member Functions inherited from epics::pvAccess::detail::AbstractCodec
static std::size_t alignedValue (std::size_t value, std::size_t alignment)
 

Detailed Description

Definition at line 268 of file codec.h.

Constructor & Destructor Documentation

epics::pvAccess::detail::BlockingTCPTransportCodec::BlockingTCPTransportCodec ( bool  serverFlag,
Context::shared_pointer const &  context,
SOCKET  channel,
ResponseHandler::shared_pointer const &  responseHandler,
size_t  sendBufferSize,
size_t  receiveBufferSize,
epics::pvData::int16  priority 
)

Definition at line 1226 of file codec.cpp.

1230  :AbstractCodec(
1231  serverFlag,
1232  sendBufferSize,
1233  receiveBufferSize,
1234  sendBufferSize,
1235  true)
1236  ,_readThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::receiveThread)
1238  .name("TCP-rx")
1239  .stack(epicsThreadStackBig)
1240  .autostart(false))
1241  ,_sendThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::sendThread)
1243  .name("TCP-tx")
1244  .stack(epicsThreadStackBig)
1245  .autostart(false))
1246  ,_channel(channel)
1247  ,_context(context), _responseHandler(responseHandler)
1248  ,_remoteTransportReceiveBufferSize(MAX_TCP_RECV)
1249  ,_priority(priority)
1250  ,_verified(false)
1251 {
1252  REFTRACE_INCREMENT(num_instances);
1253 
1254  _isOpen.getAndSet(true);
1255 
1256  // get remote address
1257  osiSocklen_t saSize = sizeof(sockaddr);
1258  int retval = getpeername(_channel, &(_socketAddress.sa), &saSize);
1259  if(unlikely(retval<0)) {
1260  char errStr[64];
1261  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
1263  "Error fetching socket remote address: %s.",
1264  errStr);
1265  _socketName = "<unknown>:0";
1266  } else {
1267  char ipAddrStr[24];
1268  ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
1269  _socketName = ipAddrStr;
1270  }
1271 }
int osiSocklen_t
Definition: osdSock.h:36
struct sockaddr sa
Definition: osiSock.h:158
struct sockaddr_in ia
Definition: osiSock.h:157
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
#define LOG(level, format,...)
Definition: logger.h:48
AbstractCodec(bool serverFlag, size_t sendBufferSize, size_t receiveBufferSize, int32_t socketSendBufferSize, bool blockingProcessQueue)
Definition: codec.cpp:90
Create a new thread using the given.
Definition: thread.h:95
#define unlikely(x)
Definition: likely.h:15
#define epicsThreadPriorityCAServerLow
Definition: epicsThread.h:80
const epics::pvData::int32 MAX_TCP_RECV
Definition: pvaConstants.h:64
unsigned epicsStdCall ipAddrToDottedIP(const struct sockaddr_in *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:144
epics::pvAccess::detail::BlockingTCPTransportCodec::~BlockingTCPTransportCodec ( )
virtual

Definition at line 1016 of file codec.cpp.

1017 {
1018  REFTRACE_DECREMENT(num_instances);
1019 
1020  waitJoin();
1021 }
virtual void waitJoin() OVERRIDE FINAL
Call after close() to wait for any worker threads to exit.
Definition: codec.cpp:1049

Member Function Documentation

void epics::pvAccess::detail::BlockingTCPTransportCodec::activate ( )
inline

Definition at line 380 of file codec.h.

380  {
381  Transport::shared_pointer thisSharedPtr = shared_from_this();
382  _context->getTransportRegistry()->install(thisSharedPtr);
383 
384  start();
385  }
void epics::pvAccess::detail::BlockingTCPTransportCodec::authNZMessage ( epics::pvData::PVStructure::shared_pointer const &  data)
virtual

Pass data to the active security plug-in session.

Parameters
datathe data (any data), can be null.

Implements epics::pvAccess::Transport.

Definition at line 1388 of file codec.cpp.

1388  {
1389  AuthenticationSession::shared_pointer sess;
1390  {
1391  Guard G(_mutex);
1392  sess = _authSession;
1393  }
1394  if (sess)
1395  sess->messageReceived(data);
1396  else
1397  {
1398  char ipAddrStr[24];
1399  ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
1400  LOG(logLevelWarn, "authNZ message received from '%s' but no security plug-in session active.", ipAddrStr);
1401  }
1402 }
struct sockaddr_in ia
Definition: osiSock.h:157
epics::pvData::Mutex _mutex
Definition: codec.h:264
#define LOG(level, format,...)
Definition: logger.h:48
AuthenticationSession::shared_pointer _authSession
Definition: codec.h:425
unsigned epicsStdCall ipAddrToDottedIP(const struct sockaddr_in *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:144
virtual std::tr1::shared_ptr<const epics::pvData::Field> epics::pvAccess::detail::BlockingTCPTransportCodec::cachedDeserialize ( epics::pvData::ByteBuffer buffer)
inlinevirtual

deserialize via cache

Parameters
bufferbuffer to be deserialized from

Implements epics::pvData::DeserializableControl.

Definition at line 358 of file codec.h.

359  {
360  return _incomingIR.deserialize(buffer, this);
361  }
epics::pvData::FieldConstPtr deserialize(epics::pvData::ByteBuffer *buffer, epics::pvData::DeserializableControl *control)
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::cachedSerialize ( const std::tr1::shared_ptr< const epics::pvData::Field > &  field,
epics::pvData::ByteBuffer buffer 
)
inlinevirtual

Definition at line 364 of file codec.h.

367  {
368  _outgoingIR.serialize(field, buffer, this);
369  }
void serialize(epics::pvData::FieldConstPtr const &field, epics::pvData::ByteBuffer *buffer, epics::pvData::SerializableControl *control)
void epics::pvAccess::detail::BlockingTCPTransportCodec::close ( )
virtual

Close transport.

Implements epics::pvAccess::Transport.

Definition at line 1033 of file codec.cpp.

1033  {
1034 
1035  if (_isOpen.getAndSet(false))
1036  {
1037  // always close in the same thread, same way, etc.
1038  // wakeup processSendQueue
1039 
1040  // clean resources (close socket)
1041  internalClose();
1042 
1043  // Break sender from queue wait
1044  BreakTransport::shared_pointer B(new BreakTransport);
1045  enqueueSendRequest(B);
1046  }
1047 }
virtual void enqueueSendRequest(TransportSender::shared_pointer const &sender) OVERRIDE FINAL
Definition: codec.cpp:873
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::flushSendQueue ( )
inlinevirtual

Flush send queue (sent messages).

Implements epics::pvAccess::Transport.

Definition at line 372 of file codec.h.

372 { }
virtual const osiSockAddr* epics::pvAccess::detail::BlockingTCPTransportCodec::getLastReadBufferSocketAddress ( )
inlinevirtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 302 of file codec.h.

302  {
303  return &_socketAddress;
304  }
virtual epics::pvData::int16 epics::pvAccess::detail::BlockingTCPTransportCodec::getPriority ( ) const
inlinevirtual

Transport priority.

Returns
protocol priority.

Implements epics::pvAccess::Transport.

Definition at line 340 of file codec.h.

340  {
341  return _priority;
342  }
virtual std::size_t epics::pvAccess::detail::BlockingTCPTransportCodec::getReceiveBufferSize ( ) const
inlinevirtual

Get receive buffer size.

Returns
receive buffer size.

Implements epics::pvAccess::Transport.

Definition at line 335 of file codec.h.

335  {
336  return _socketBuffer.getSize();
337  }
epics::pvData::ByteBuffer _socketBuffer
Definition: codec.h:231
EPICS_ALWAYS_INLINE std::size_t getSize() const
Definition: byteBuffer.h:400
virtual const osiSockAddr& epics::pvAccess::detail::BlockingTCPTransportCodec::getRemoteAddress ( ) const
inlinevirtual

Implements epics::pvAccess::Transport.

Definition at line 326 of file codec.h.

326  {
327  return _socketAddress;
328  }
virtual const std::string& epics::pvAccess::detail::BlockingTCPTransportCodec::getRemoteName ( ) const
inlinevirtual

Implements epics::pvAccess::Transport.

Definition at line 330 of file codec.h.

330  {
331  return _socketName;
332  }
virtual std::string epics::pvAccess::detail::BlockingTCPTransportCodec::getType ( ) const
inlinevirtual

Get protocol type (tcp, udp, ssl, etc.).

Returns
protocol type.

Implements epics::pvAccess::Transport.

Definition at line 307 of file codec.h.

307  {
308  return std::string("tcp");
309  }
void epics::pvAccess::detail::BlockingTCPTransportCodec::internalClose ( )
protectedvirtual

Called from close(). after start of shutdown (isOpen()==false) but before worker thread shutdown.

Reimplemented in epics::pvAccess::detail::BlockingClientTCPTransportCodec, and epics::pvAccess::detail::BlockingServerTCPTransportCodec.

Definition at line 1056 of file codec.cpp.

1057 {
1058  {
1059 
1062  switch ( info )
1063  {
1065  epicsSocketDestroy ( _channel );
1066  break;
1068  {
1069  /*int status =*/ ::shutdown ( _channel, SHUT_RDWR );
1070  /*
1071  if ( status ) {
1072  char sockErrBuf[64];
1073  epicsSocketConvertErrnoToString (
1074  sockErrBuf, sizeof ( sockErrBuf ) );
1075  LOG(logLevelDebug,
1076  "TCP socket to %s failed to shutdown: %s.",
1077  inetAddressToString(_socketAddress).c_str(), sockErrBuf);
1078  }
1079  */
1080  epicsSocketDestroy ( _channel );
1081  }
1082  break;
1084  // not supported anymore anyway
1085  default:
1086  epicsSocketDestroy(_channel);
1087  }
1088  }
1089 
1090  Transport::shared_pointer thisSharedPtr = this->shared_from_this();
1091  _context->getTransportRegistry()->remove(thisSharedPtr);
1092 
1094  {
1096  "TCP socket to %s is to be closed.",
1097  _socketName.c_str());
1098  }
1099 }
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
epicsSocketSystemCallInterruptMechanismQueryInfo
Definition: osiSock.h:47
#define LOG(level, format,...)
Definition: logger.h:48
#define SHUT_RDWR
Definition: osdSock.h:48
#define IS_LOGGABLE(level)
Definition: logger.h:51
void epics::pvAccess::detail::BlockingTCPTransportCodec::invalidDataStreamHandler ( )
virtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1274 of file codec.cpp.

1274  {
1275  close();
1276 }
virtual bool epics::pvAccess::detail::BlockingTCPTransportCodec::isClosed ( )
inlinevirtual

Check connection status.

Returns
true if connected.

Implements epics::pvAccess::Transport.

Definition at line 375 of file codec.h.

375  {
376  return !isOpen();
377  }
virtual bool isOpen() OVERRIDE FINAL
Definition: codec.cpp:1106
bool epics::pvAccess::detail::BlockingTCPTransportCodec::isOpen ( )
virtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1106 of file codec.cpp.

1106  {
1107  return _isOpen.get();
1108 }
epics::pvAccess::detail::BlockingTCPTransportCodec::POINTER_DEFINITIONS ( BlockingTCPTransportCodec  )
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::processApplicationMessage ( )
inlinevirtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 320 of file codec.h.

320  {
321  _responseHandler->handleResponse(&_socketAddress, shared_from_this(),
323  }
epics::pvData::ByteBuffer _socketBuffer
Definition: codec.h:231
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::processControlMessage ( )
inlinevirtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 311 of file codec.h.

311  {
313  {
314  // check 7-th bit
316  }
317  }
virtual void setByteOrder(int byteOrder) OVERRIDE FINAL
Definition: codec.cpp:952
#define EPICS_ENDIAN_BIG
Definition: epicsEndian.h:16
#define EPICS_ENDIAN_LITTLE
Definition: epicsEndian.h:15
int epics::pvAccess::detail::BlockingTCPTransportCodec::read ( epics::pvData::ByteBuffer dst)
virtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1316 of file codec.cpp.

1316  {
1317 
1318  std::size_t remaining;
1319  while((remaining=dst->getRemaining()) > 0) {
1320 
1321  // read
1322  std::size_t pos = dst->getPosition();
1323 
1324  int bytesRead = ::recv(_channel,
1325  (char*)(dst->getBuffer()+pos), remaining, 0);
1326 
1327  // NOTE: do not log here, you might override SOCKERRNO relevant to recv() operation above
1328 
1329  if(unlikely(bytesRead==0)) {
1330  return -1; // 0 means connection loss for blocking transport, notify codec by returning -1
1331 
1332  } else if(unlikely(bytesRead<0)) {
1333  int err = SOCKERRNO;
1334 
1335  if(err==SOCK_EINTR) {
1336  // interrupted by signal. Retry
1337  continue;
1338 
1339  } else if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINPROGRESS
1340  || err==SOCK_ETIMEDOUT
1341  || err==SOCK_ECONNABORTED || err==SOCK_ECONNRESET
1342  ) {
1343  // different ways of saying timeout.
1344  // Linux: EAGAIN or EWOULDBLOCK, or EINPROGRESS
1345  // WIN32: WSAETIMEDOUT
1346  // others that RSRV checks for, but may not need to, ECONNABORTED, ECONNRESET
1347 
1348  // Note: with windows, after ETIMEOUT leaves the socket in an undefined state.
1349  // so it must be closed. (cf. SO_RCVTIMEO)
1350 
1351  return -1;
1352 
1353  } else {
1354  // some other (fatal) error
1355  if(_isOpen.get())
1356  errlogPrintf("%s : Connection closed with RX socket error %d\n", _socketName.c_str(), err);
1357  return -1;
1358  }
1359  }
1360 
1361  dst->setPosition(dst->getPosition() + bytesRead);
1362  return bytesRead;
1363  }
1364 
1365  return 0;
1366 }
#define SOCK_ECONNABORTED
Definition: osdSock.h:59
#define SOCK_EINPROGRESS
Definition: osdSock.h:60
const char * getBuffer() const
Definition: byteBuffer.h:294
#define SOCK_ETIMEDOUT
Definition: osdSock.h:54
std::size_t getPosition() const
Definition: byteBuffer.h:346
#define SOCK_ECONNRESET
Definition: osdSock.h:53
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
std::size_t getRemaining() const
Definition: byteBuffer.h:391
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
#define unlikely(x)
Definition: likely.h:15
#define SOCKERRNO
Definition: osdSock.h:33
#define SOCK_EINTR
Definition: osdSock.h:64
#define SOCK_EWOULDBLOCK
Definition: osdSock.h:51
void epics::pvAccess::detail::BlockingTCPTransportCodec::readPollOne ( )
virtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1023 of file codec.cpp.

1023  {
1024  throw std::logic_error("should not be called for blocking IO");
1025 }
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::scheduleSend ( )
inlinevirtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 292 of file codec.h.

292 {}
void epics::pvAccess::detail::BlockingTCPTransportCodec::sendBufferFull ( int  tries)
protectedvirtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1211 of file codec.cpp.

1211  {
1212  // TODO constants
1213  epicsThreadSleep(std::max<double>(tries * 0.1, 1));
1214 }
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::sendCompleted ( )
inlinevirtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 293 of file codec.h.

293 {}
void epics::pvAccess::detail::BlockingTCPTransportCodec::sendSecurityPluginMessage ( epics::pvData::PVStructure::const_shared_pointer const &  data)
virtual

Send AUTHZ to peer with payload. caller gives up ownership of data, which must not be modified.

Implements epics::pvAccess::AuthenticationPluginControl.

Definition at line 1425 of file codec.cpp.

1425  {
1426  SecurityPluginMessageTransportSender::shared_pointer spmts(new SecurityPluginMessageTransportSender(data));
1427  enqueueSendRequest(spmts);
1428 }
virtual void enqueueSendRequest(TransportSender::shared_pointer const &sender) OVERRIDE FINAL
Definition: codec.cpp:873
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::setRemoteTransportReceiveBufferSize ( std::size_t  receiveBufferSize)
inlinevirtual

Set remote transport receive buffer size.

Parameters
receiveBufferSizereceive buffer size.

Implements epics::pvAccess::Transport.

Definition at line 345 of file codec.h.

346  {
347  _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize;
348  }
virtual void epics::pvAccess::detail::BlockingTCPTransportCodec::setRemoteTransportSocketReceiveBufferSize ( std::size_t  socketReceiveBufferSize)
inlinevirtual

Set remote transport socket receive buffer size.

Parameters
socketReceiveBufferSizeremote socket receive buffer size.

Implements epics::pvAccess::Transport.

Definition at line 351 of file codec.h.

352  {
353  _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize;
354  }
epics::pvData::int32 _remoteTransportSocketReceiveBufferSize
Definition: codec.h:224
void epics::pvAccess::detail::BlockingTCPTransportCodec::setRxTimeout ( bool  ena)
protectedvirtual

Reimplemented from epics::pvAccess::detail::AbstractCodec.

Definition at line 1189 of file codec.cpp.

1190 {
1191  double timeout = !ena ? 0.0 : std::max(0.0, _context->getConfiguration()->getPropertyAsDouble("EPICS_PVA_CONN_TMO", 30.0));
1192 #ifdef _WIN32
1193  DWORD timo = DWORD(timeout*1000); // in milliseconds
1194 #else
1195  timeval timo;
1196  timo.tv_sec = unsigned(timeout);
1197  timo.tv_usec = (timeout-timo.tv_sec)*1e6;
1198 #endif
1199 
1200  int ret = setsockopt(_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timo, sizeof(timo));
1201  if(ret==-1) {
1202  int err = SOCKERRNO;
1203  static int lasterr;
1204  if(err!=lasterr) {
1205  errlogPrintf("%s: Unable to set RX timeout: %d\n", _socketName.c_str(), err);
1206  lasterr = err;
1207  }
1208  }
1209 }
double timeout
Definition: pvutils.cpp:25
#define max(x, y)
Definition: flexdef.h:81
BSD and SRV5 Unix timestamp.
Definition: epicsTime.h:52
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
#define SOCKERRNO
Definition: osdSock.h:33
void epics::pvAccess::detail::BlockingTCPTransportCodec::start ( )
virtual

Reimplemented in epics::pvAccess::detail::BlockingClientTCPTransportCodec.

Definition at line 1112 of file codec.cpp.

1112  {
1113 
1114  _readThread.start();
1115 
1116  _sendThread.start();
1117 
1118 }
bool epics::pvAccess::detail::BlockingTCPTransportCodec::terminated ( )
virtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1101 of file codec.cpp.

1101  {
1102  return !isOpen();
1103 }
virtual bool isOpen() OVERRIDE FINAL
Definition: codec.cpp:1106
void epics::pvAccess::detail::BlockingTCPTransportCodec::verified ( epics::pvData::Status const &  status)
virtual

Notify transport that it is has been verified.

Parameters
statusvefification status;

Implements epics::pvAccess::Transport.

Reimplemented in epics::pvAccess::detail::BlockingClientTCPTransportCodec, and epics::pvAccess::detail::BlockingServerTCPTransportCodec.

Definition at line 1373 of file codec.cpp.

1373  {
1375 
1376  if (IS_LOGGABLE(logLevelDebug) && !status.isOK())
1377  {
1378  LOG(logLevelDebug, "Failed to verify connection to %s: %s.", _socketName.c_str(), status.getMessage().c_str());
1379  }
1380 
1381  {
1382  Guard G(_mutex);
1384  }
1386 }
epicsMutexId lock
Definition: osiClockTime.c:37
pvd::Status status
A lock for multithreading.
Definition: lock.h:36
epics::pvData::Mutex _mutex
Definition: codec.h:264
const std::string & getMessage() const
Definition: status.h:80
#define LOG(level, format,...)
Definition: logger.h:48
bool isSuccess() const
Definition: status.h:103
bool isOK() const
Definition: status.h:95
#define IS_LOGGABLE(level)
Definition: logger.h:51
bool epics::pvAccess::detail::BlockingTCPTransportCodec::verify ( epics::pvData::int32  timeoutMs)
virtual

Waits (if needed) until transport is verified, i.e. verified() method is being called.

Parameters
timeoutMstimeout to wait for verification, infinite if 0.

Implements epics::pvAccess::Transport.

Reimplemented in epics::pvAccess::detail::BlockingServerTCPTransportCodec.

Definition at line 1369 of file codec.cpp.

1369  {
1370  return _verifiedEvent.wait(timeoutMs/1000.0) && _verified;
1371 }
void epics::pvAccess::detail::BlockingTCPTransportCodec::waitJoin ( )
virtual

Call after close() to wait for any worker threads to exit.

Reimplemented from epics::pvAccess::Transport.

Definition at line 1049 of file codec.cpp.

1050 {
1051  assert(!_isOpen.get());
1052  _sendThread.exitWait();
1053  _readThread.exitWait();
1054 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
int epics::pvAccess::detail::BlockingTCPTransportCodec::write ( epics::pvData::ByteBuffer src)
virtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1279 of file codec.cpp.

1280  {
1281 
1282  std::size_t remaining;
1283  while((remaining=src->getRemaining()) > 0) {
1284 
1285  int bytesSent = ::send(_channel,
1286  &src->getBuffer()[src->getPosition()],
1287  remaining, 0);
1288 
1289  // NOTE: do not log here, you might override SOCKERRNO relevant to recv() operation above
1290 
1291  // TODO winsock return 0 on disconnect (blocking socket) ?
1292 
1293  if(unlikely(bytesSent<0)) {
1294 
1295  int socketError = SOCKERRNO;
1296 
1297  // spurious EINTR check
1298  if (socketError==SOCK_EINTR)
1299  continue;
1300  else if (socketError==SOCK_ENOBUFS)
1301  return 0;
1302  }
1303 
1304  if (bytesSent > 0) {
1305  src->setPosition(src->getPosition() + bytesSent);
1306  }
1307 
1308  return bytesSent;
1309 
1310  }
1311 
1312  return 0;
1313 }
const char * getBuffer() const
Definition: byteBuffer.h:294
#define SOCK_ENOBUFS
Definition: osdSock.h:52
std::size_t getPosition() const
Definition: byteBuffer.h:346
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
std::size_t getRemaining() const
Definition: byteBuffer.h:391
#define unlikely(x)
Definition: likely.h:15
#define SOCKERRNO
Definition: osdSock.h:33
#define SOCK_EINTR
Definition: osdSock.h:64
void send(epics::pvData::ByteBuffer *buffer)
Definition: codec.cpp:776
void epics::pvAccess::detail::BlockingTCPTransportCodec::writePollOne ( )
virtual

Implements epics::pvAccess::detail::AbstractCodec.

Definition at line 1028 of file codec.cpp.

1028  {
1029  throw std::logic_error("should not be called for blocking IO");
1030 }

Member Data Documentation

AuthenticationSession::shared_pointer epics::pvAccess::detail::BlockingTCPTransportCodec::_authSession
protected

Definition at line 425 of file codec.h.

std::string epics::pvAccess::detail::BlockingTCPTransportCodec::_authSessionName
protected

Definition at line 424 of file codec.h.

Context::shared_pointer epics::pvAccess::detail::BlockingTCPTransportCodec::_context
protected

Definition at line 418 of file codec.h.

IntrospectionRegistry epics::pvAccess::detail::BlockingTCPTransportCodec::_incomingIR
protected

Definition at line 420 of file codec.h.

IntrospectionRegistry epics::pvAccess::detail::BlockingTCPTransportCodec::_outgoingIR
protected

Definition at line 421 of file codec.h.

PeerInfo::const_shared_pointer epics::pvAccess::detail::BlockingTCPTransportCodec::_peerInfo

Definition at line 428 of file codec.h.

osiSockAddr epics::pvAccess::detail::BlockingTCPTransportCodec::_socketAddress
protected

Definition at line 415 of file codec.h.

std::string epics::pvAccess::detail::BlockingTCPTransportCodec::_socketName
protected

Definition at line 416 of file codec.h.

bool epics::pvAccess::detail::BlockingTCPTransportCodec::_verified
protected

Definition at line 437 of file codec.h.

epics::pvData::Event epics::pvAccess::detail::BlockingTCPTransportCodec::_verifiedEvent
protected

Definition at line 438 of file codec.h.

size_t epics::pvAccess::detail::BlockingTCPTransportCodec::num_instances
static

Definition at line 278 of file codec.h.


The documentation for this class was generated from the following files: