This is Unofficial EPICS BASE Doxygen Site
epics::pvAccess::detail::AbstractCodec Class Reference (abstract)

#include "codec.h"

+ Inheritance diagram for epics::pvAccess::detail::AbstractCodec:
+ Collaboration diagram for epics::pvAccess::detail::AbstractCodec:

Public Member Functions

 AbstractCodec (bool serverFlag, size_t sendBufferSize, size_t receiveBufferSize, int32_t socketSendBufferSize, bool blockingProcessQueue)
 
virtual void processControlMessage ()=0
 
virtual void processApplicationMessage ()=0
 
virtual const osiSockAddr * getLastReadBufferSocketAddress ()=0
 
virtual void invalidDataStreamHandler ()=0
 
virtual void readPollOne ()=0
 
virtual void writePollOne ()=0
 
virtual void scheduleSend ()=0
 
virtual void sendCompleted ()=0
 
virtual bool terminated ()=0
 
virtual int write (epics::pvData::ByteBuffer *src)=0
 
virtual int read (epics::pvData::ByteBuffer *dst)=0
 
virtual bool isOpen ()=0
 
virtual ~AbstractCodec ()
 
virtual void alignBuffer (std::size_t alignment) OVERRIDE FINAL
 
virtual void ensureData (std::size_t size) OVERRIDE FINAL
 
virtual void alignData (std::size_t alignment) OVERRIDE FINAL
 
virtual void startMessage (epics::pvData::int8 command, std::size_t ensureCapacity=0, epics::pvData::int32 payloadSize=0) OVERRIDE FINAL
 
void putControlMessage (epics::pvData::int8 command, epics::pvData::int32 data)
 
virtual void endMessage () OVERRIDE FINAL
 
virtual void ensureBuffer (std::size_t size) OVERRIDE FINAL
 
virtual void flushSerializeBuffer () OVERRIDE FINAL
 
virtual void flush (bool lastMessageCompleted) OVERRIDE FINAL
 
void processWrite ()
 
void processRead ()
 
void processSendQueue ()
 
virtual void enqueueSendRequest (TransportSender::shared_pointer const &sender) OVERRIDE FINAL
 
void enqueueSendRequest (TransportSender::shared_pointer const &sender, std::size_t requiredBufferSize)
 
void setSenderThread ()
 
virtual void setRecipient (osiSockAddr const &sendTo) OVERRIDE FINAL
 
virtual void setByteOrder (int byteOrder) OVERRIDE FINAL
 
virtual bool directSerialize (epics::pvData::ByteBuffer *, const char *, std::size_t, std::size_t) OVERRIDE
 
virtual bool directDeserialize (epics::pvData::ByteBuffer *, char *, std::size_t, std::size_t) OVERRIDE
 
bool sendQueueEmpty () const
 
epics::pvData::int8 getRevision () const
 
- Public Member Functions inherited from epics::pvAccess::TransportSendControl
 POINTER_DEFINITIONS (TransportSendControl)
 
virtual ~TransportSendControl ()
 
- Public Member Functions inherited from epics::pvData::SerializableControl
virtual ~SerializableControl ()
 
virtual void cachedSerialize (std::tr1::shared_ptr< const Field > const &field, ByteBuffer *buffer)=0
 
- Public Member Functions inherited from epics::pvAccess::Transport
 POINTER_DEFINITIONS (Transport)
 
 Transport ()
 
virtual ~Transport ()
 
virtual bool acquire (std::tr1::shared_ptr< ClientChannelImpl > const &client)=0
 
virtual void release (pvAccessID clientId)=0
 
virtual std::string getType () const =0
 
virtual const osiSockAddr * getRemoteAddress () const =0
 
virtual const std::string & getRemoteName () const =0
 
virtual std::size_t getReceiveBufferSize () const =0
 
virtual epics::pvData::int16 getPriority () const =0
 
virtual void setRemoteTransportReceiveBufferSize (std::size_t receiveBufferSize)=0
 
virtual void setRemoteTransportSocketReceiveBufferSize (std::size_t socketReceiveBufferSize)=0
 
virtual void flushSendQueue ()=0
 
virtual void verified (epics::pvData::Status const &status)=0
 
virtual bool verify (epics::pvData::int32 timeoutMs)=0
 
virtual void close ()=0
 
virtual void waitJoin ()
 Call after close() to wait for any worker threads to exit. More...
 
virtual bool isClosed ()=0
 
virtual void authNZMessage (epics::pvData::PVStructure::shared_pointer const &data)=0
 
- Public Member Functions inherited from epics::pvData::DeserializableControl
virtual ~DeserializableControl ()
 
virtual std::tr1::shared_ptr< const Field > cachedDeserialize (ByteBuffer *buffer)=0
 

Static Public Member Functions

static std::size_t alignedValue (std::size_t value, std::size_t alignment)
 

Public Attributes

epics::pvData::Mutex _mutex
 
- Public Attributes inherited from epics::pvAccess::Transport
size_t _totalBytesSent
 
size_t _totalBytesRecv
 

Static Public Attributes

static const std::size_t MAX_MESSAGE_PROCESS = 100
 
static const std::size_t MAX_MESSAGE_SEND = 100
 
static const std::size_t MAX_ENSURE_SIZE = 1024
 
static const std::size_t MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2
 
static const std::size_t MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE
 
static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE = 1024
 
- Static Public Attributes inherited from epics::pvAccess::Transport
static size_t num_instances
 

Protected Member Functions

virtual void sendBufferFull (int tries)=0
 
void send (epics::pvData::ByteBuffer *buffer)
 
void flushSendBuffer ()
 
virtual void setRxTimeout (bool ena)
 

Protected Attributes

ReadMode _readMode
 
int8_t _version
 
int8_t _flags
 
int8_t _command
 
int32_t _payloadSize
 
epics::pvData::int32 _remoteTransportSocketReceiveBufferSize
 
osiSockAddr _sendTo
 
epicsThreadId _senderThread
 
WriteMode _writeMode
 
bool _writeOpReady
 
epics::pvData::ByteBuffer _socketBuffer
 
epics::pvData::ByteBuffer _sendBuffer
 
fair_queue< TransportSender > _sendQueue
 
const epics::pvData::int8 _clientServerFlag
 

Detailed Description

Definition at line 127 of file codec.h.

Constructor & Destructor Documentation

epics::pvAccess::detail::AbstractCodec::AbstractCodec ( bool  serverFlag,
size_t  sendBufferSize,
size_t  receiveBufferSize,
int32_t  socketSendBufferSize,
bool  blockingProcessQueue 
)

Definition at line 90 of file codec.cpp.

95  :
96  //PROTECTED
99  _senderThread(0),
101  _writeOpReady(false),
102  _socketBuffer(bufSizeSelect(receiveBufferSize)),
103  _sendBuffer(bufSizeSelect(sendBufferSize)),
104  //PRIVATE
105  _storedPayloadSize(0), _storedPosition(0), _startPosition(0),
106  _maxSendPayloadSize(_sendBuffer.getSize() - 2*PVA_MESSAGE_HEADER_SIZE), // start msg + control
107  _lastMessageStartPosition(std::numeric_limits<size_t>::max()),_lastSegmentedMessageType(0),
108  _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0),
109  _byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00),
110  _clientServerFlag(serverFlag ? 0x40 : 0x00)
111 {
113  throw std::invalid_argument(
114  "receiveBuffer.capacity() < 2*MAX_ENSURE_SIZE");
115 
117  throw std::invalid_argument("sendBuffer() < 2*MAX_ENSURE_SIZE");
118 
119  // initialize to be empty
121  _startPosition = _socketBuffer.getPosition();
122 
123  // clear send
124  _sendBuffer.clear();
125 }
epics::pvData::ByteBuffer _socketBuffer
Definition: codec.h:231
#define max(x, y)
Definition: flexdef.h:81
std::size_t getLimit() const
Definition: byteBuffer.h:368
EPICS_ALWAYS_INLINE std::size_t getSize() const
Definition: byteBuffer.h:400
std::size_t getPosition() const
Definition: byteBuffer.h:346
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
static const std::size_t MAX_ENSURE_SIZE
Definition: codec.h:135
#define EPICS_ENDIAN_BIG
Definition: epicsEndian.h:16
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
epics::pvData::int32 _remoteTransportSocketReceiveBufferSize
Definition: codec.h:224
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
const epics::pvData::int8 _clientServerFlag
Definition: codec.h:260
#define EPICS_BYTE_ORDER
Definition: osdWireConfig.h:16
const epics::pvData::int32 MAX_TCP_RECV
Definition: pvaConstants.h:64
virtual epics::pvAccess::detail::AbstractCodec::~AbstractCodec ( )
inline virtual

Definition at line 161 of file codec.h.

162  {
163  }

Member Function Documentation

void epics::pvAccess::detail::AbstractCodec::alignBuffer ( std::size_t  alignment)
virtual

Add pad bytes to buffer.

Parameters
alignment: alignment required.

Implements epics::pvData::SerializableControl.

Definition at line 580 of file codec.cpp.

580  {
581 
582  std::size_t k = (alignment - 1);
583  std::size_t pos = _sendBuffer.getPosition();
584  std::size_t newpos = (pos + k) & (~k);
585  if (pos == newpos)
586  return;
587 
588  // for safety reasons we really pad (override previous message data)
589  std::size_t padCount = newpos - pos;
590  _sendBuffer.put(PADDING_BYTES, 0, padCount);
591 }
std::size_t getPosition() const
Definition: byteBuffer.h:346
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
void epics::pvAccess::detail::AbstractCodec::alignData ( std::size_t  alignment)
virtual

Align buffer. Note that this only takes care of the alignment of the current buffer. If a streaming protocol is used, care must be taken that the entire stream is aligned.

Parameters
alignment: size in bytes, must be a power of two.

Implements epics::pvData::DeserializableControl.

Definition at line 546 of file codec.cpp.

546  {
547 
548  std::size_t k = (alignment - 1);
549  std::size_t pos = _socketBuffer.getPosition();
550  std::size_t newpos = (pos + k) & (~k);
551  if (pos == newpos)
552  return;
553 
554  std::size_t diff = _socketBuffer.getLimit() - newpos;
555  if (diff > 0)
556  {
557  _socketBuffer.setPosition(newpos);
558  return;
559  }
560 
561  ensureData(diff);
562 
563  // position has changed, recalculate
564  newpos = (_socketBuffer.getPosition() + k) & (~k);
565  _socketBuffer.setPosition(newpos);
566 }
epics::pvData::ByteBuffer _socketBuffer
Definition: codec.h:231
virtual void ensureData(std::size_t size) OVERRIDE FINAL
Definition: codec.cpp:423
std::size_t getLimit() const
Definition: byteBuffer.h:368
std::size_t getPosition() const
Definition: byteBuffer.h:346
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
std::size_t epics::pvAccess::detail::AbstractCodec::alignedValue ( std::size_t  value,
std::size_t  alignment 
)
static

Definition at line 537 of file codec.cpp.

539  {
540 
541  std::size_t k = (alignment - 1);
542  return (value + k) & (~k);
543 }
Definition: link.h:174
bool epics::pvAccess::detail::AbstractCodec::directDeserialize ( epics::pvData::ByteBuffer existingBuffer,
char *  deserializeTo,
std::size_t  elementCount,
std::size_t  elementSize 
)
virtual

Method for deserializing array data. Hook for supplying custom deserialization implementation. The deserialization implementation need not be provided. Returns true if method performs deserialization, false otherwise. This should only be used for arrays of primitive types. i.e. boolean, byte,..., double. It cannot be called for string, structure, or union arrays.

Parameters
existingBuffer: the existing buffer from the caller.
deserializeTo: location of data.
elementCount: number of elements.
elementSize: element size.
Returns
true if deserialization performed, else false.

Implements epics::pvData::DeserializableControl.

Definition at line 1003 of file codec.cpp.

1005 {
1006  return false;
1007 }
bool epics::pvAccess::detail::AbstractCodec::directSerialize ( epics::pvData::ByteBuffer existingBuffer,
const char *  toSerialize,
std::size_t  elementCount,
std::size_t  elementSize 
)
virtual

Method for serializing primitive array data. Hook for supplying custom serialization implementation. The serialization implementation need not be provided. Returns true if the method performs serialization, false otherwise. This should only be used for arrays of primitive types, i.e. boolean, byte,..., double. It cannot be called for string, structure, or union arrays.

Parameters
existingBuffer: the existing buffer from the caller.
toSerialize: location of data to be put into buffer.
elementCount: number of elements.
elementSize: element size.
Returns
true if serialization performed, else false.

Implements epics::pvData::SerializableControl.

Definition at line 961 of file codec.cpp.

963 {
964  // TODO overflow check of "size_t count", overflow int32 field of payloadSize header field
965  // TODO max message size in connection validation
966  std::size_t count = elementCount * elementSize;
967 
968  // TODO find smart limit
969  // check if direct mode actually pays off
970  if (count < 64*1024)
971  return false;
972 
973  //
974  // first end current message, and write a header of next "directly serialized" message
975  //
976 
977  // first end current message indicating the we will segment
978  endMessage(true);
979 
980  // append segmented message header with payloadSize == count
981  // TODO size_t to int32
982  startMessage(_lastSegmentedMessageCommand, 0, static_cast<int32>(count));
983 
984  // flush
985  flushSendBuffer();
986 
987  // TODO think if alignment is preserved after...
988 
989  //
990  // send toSerialize buffer
991  //
992  ByteBuffer wrappedBuffer(const_cast<char*>(toSerialize), count);
993  send(&wrappedBuffer);
994 
995  //
996  // continue where we left before calling directSerialize
997  //
998  startMessage(_lastSegmentedMessageCommand, 0);
999 
1000  return true;
1001 }
size_t elementSize(ScalarType id)
gives sizeof(T) where T depends on the scalar type id.
Definition: TypeFunc.cpp:82
virtual void endMessage() OVERRIDE FINAL
Definition: codec.cpp:632
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity=0, epics::pvData::int32 payloadSize=0) OVERRIDE FINAL
Definition: codec.cpp:594
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
Definition: byteBuffer.h:233
void send(epics::pvData::ByteBuffer *buffer)
Definition: codec.cpp:776
void epics::pvAccess::detail::AbstractCodec::endMessage ( )
virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 632 of file codec.cpp.

632  {
633  endMessage(false);
634 }
virtual void endMessage() OVERRIDE FINAL
Definition: codec.cpp:632
void epics::pvAccess::detail::AbstractCodec::enqueueSendRequest ( TransportSender::shared_pointer const &  sender)
virtual

Enqueue send request.

Parameters
sender

Implements epics::pvAccess::Transport.

Definition at line 873 of file codec.cpp.

874  {
875  _sendQueue.push_back(sender);
876  scheduleSend();
877 }
fair_queue< TransportSender > _sendQueue
Definition: codec.h:234
void epics::pvAccess::detail::AbstractCodec::enqueueSendRequest ( TransportSender::shared_pointer const &  sender,
std::size_t  requiredBufferSize 
)

Definition at line 928 of file codec.cpp.

930  {
931 
933  _sendQueue.empty() &&
934  _sendBuffer.getRemaining() >= requiredBufferSize)
935  {
936  processSender(sender);
937  if (_sendBuffer.getPosition() > 0)
938  {
939  scheduleSend();
940  }
941  }
942  else
943  enqueueSendRequest(sender);
944 }
std::size_t getPosition() const
Definition: byteBuffer.h:346
std::size_t getRemaining() const
Definition: byteBuffer.h:391
fair_queue< TransportSender > _sendQueue
Definition: codec.h:234
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
virtual void enqueueSendRequest(TransportSender::shared_pointer const &sender) OVERRIDE FINAL
Definition: codec.cpp:873
LIBCOM_API epicsThreadId epicsStdCall epicsThreadGetIdSelf(void)
Definition: osdThread.c:810
void epics::pvAccess::detail::AbstractCodec::ensureBuffer ( std::size_t  size)
virtual

Make sure the buffer has at least size bytes remaining. If not, flush the existing buffer and provide a new one.

Parameters
size: The number of bytes.

Implements epics::pvData::SerializableControl.

Definition at line 701 of file codec.cpp.

701  {
702 
703  if (_sendBuffer.getRemaining() >= size)
704  return;
705 
706  // too large for buffer...
707  if (_maxSendPayloadSize < size) {
708  std::ostringstream msg;
709  msg << "requested for buffer size " <<
710  size << ", but only " << _maxSendPayloadSize << " available.";
711  std::string s = msg.str();
713  "%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__);
714  throw std::invalid_argument(s);
715  }
716 
717  while (_sendBuffer.getRemaining() < size)
718  flush(false);
719 }
virtual void flush(bool lastMessageCompleted) OVERRIDE FINAL
Definition: codec.cpp:747
#define LOG(level, format,...)
Definition: logger.h:48
std::size_t getRemaining() const
Definition: byteBuffer.h:391
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
void epics::pvAccess::detail::AbstractCodec::ensureData ( std::size_t  size)
virtual

Helper method. Ensures specified size of bytes, provides it if necessary.

Parameters
size: The number of bytes.

Implements epics::pvData::DeserializableControl.

Definition at line 423 of file codec.cpp.

423  {
424 
425  // enough of data?
426  if (_socketBuffer.getRemaining() >= size)
427  return;
428 
429  // to large for buffer...
430  if (size > MAX_ENSURE_DATA_SIZE) {// half for SPLIT, half for SEGMENTED
431  std::ostringstream msg;
432  msg << "requested for buffer size " << size
433  << ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed.";
435  "%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__);
436  throw std::invalid_argument(msg.str());
437  }
438 
439  try
440  {
441 
442  // subtract what was already processed
443  std::size_t pos = _socketBuffer.getPosition();
444  _storedPayloadSize -= pos - _storedPosition;
445 
446  // SPLIT message case
447  // no more data and we have some payload left => read buffer
448  // NOTE: (storedPayloadSize >= size) does not work if size
449  //spans over multiple messages
450  if (_storedPayloadSize >= (_storedLimit-pos))
451  {
452  // just read up remaining payload
453  // this will move current (<size) part of the buffer
454  // to the beginning of the buffer
455  ReadMode storedMode = _readMode;
456  _readMode = SPLIT;
457  readToBuffer(size, true);
458  _readMode = storedMode;
459  _storedPosition = _socketBuffer.getPosition();
460  _storedLimit = _socketBuffer.getLimit();
462  std::min<std::size_t>(
463  _storedPosition + _storedPayloadSize, _storedLimit));
464 
465  // check needed, if not enough data is available or
466  // we run into segmented message
467  ensureData(size);
468  }
469  // SEGMENTED message case
470  else
471  {
472  // TODO check flags
473  //if (flags && SEGMENTED_FLAGS_MASK == 0)
474  // throw IllegalStateException("segmented message expected,
475  //but current message flag does not indicate it");
476 
477 
478  // copy remaining bytes of payload to safe area
479  //[0 to MAX_ENSURE_DATA_BUFFER_SIZE/2), if any
480  // remaining is relative to payload since buffer is
481  //bounded from outside
482  std::size_t remainingBytes = _socketBuffer.getRemaining();
483  for (std::size_t i = 0; i < remainingBytes; i++)
485 
486  // restore limit (there might be some data already present
487  //and readToBuffer needs to know real limit)
488  _socketBuffer.setLimit(_storedLimit);
489 
490  // we expect segmented message, we expect header
491  // that (and maybe some control packets) needs to be "removed"
492  // so that we get combined payload
493  ReadMode storedMode = _readMode;
495  processRead();
496  _readMode = storedMode;
497 
498  // make sure we have all the data (maybe we run into SPLIT)
499  readToBuffer(size - remainingBytes, true);
500 
501  // SPLIT cannot mess with this, since start of the message,
502  //i.e. current position, is always aligned
505 
506  // copy before position (i.e. start of the payload)
507  for (int32_t i = remainingBytes - 1,
508  j = _socketBuffer.getPosition() - 1; i >= 0; i--, j--)
510 
511  _startPosition = _socketBuffer.getPosition() - remainingBytes;
512  _socketBuffer.setPosition(_startPosition);
513 
514  _storedPayloadSize += remainingBytes;
515  _storedPosition = _startPosition;
516  _storedLimit = _socketBuffer.getLimit();
518  std::min<std::size_t>(
519  _storedPosition + _storedPayloadSize, _storedLimit));
520 
521  // sequential small segmented messages in the buffer
522  ensureData(size);
523  }
524  }
525  catch (io_exception &) {
526  try {
527  close();
528  } catch (io_exception & ) {
529  // noop, best-effort close
530  }
531  throw connection_closed_exception(
532  "Failed to ensure data to read buffer.");
533  }
534 }
epics::pvData::ByteBuffer _socketBuffer
Definition: codec.h:231
static const std::size_t MAX_ENSURE_DATA_SIZE
Definition: codec.h:136
virtual void ensureData(std::size_t size) OVERRIDE FINAL
Definition: codec.cpp:423
EPICS_ALWAYS_INLINE int8 getByte()
Definition: byteBuffer.h:617
int i
Definition: scan.c:967
std::size_t getLimit() const
Definition: byteBuffer.h:368
std::size_t getPosition() const
Definition: byteBuffer.h:346
#define LOG(level, format,...)
Definition: logger.h:48
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
EPICS_ALWAYS_INLINE void putByte(int8 value)
Definition: byteBuffer.h:525
std::size_t getRemaining() const
Definition: byteBuffer.h:391
void setLimit(std::size_t limit)
Definition: byteBuffer.h:380
virtual void close()=0
void epics::pvAccess::detail::AbstractCodec::flush ( bool  lastMessageCompleted)
virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 747 of file codec.cpp.

747  {
748 
749  // automatic end
750  endMessage(!lastMessageCompleted);
751 
752  // flush send buffer
753  flushSendBuffer();
754 
755  // start with last header
756  if (!lastMessageCompleted && _lastSegmentedMessageType != 0)
757  startMessage(_lastSegmentedMessageCommand, 0);
758 }
virtual void endMessage() OVERRIDE FINAL
Definition: codec.cpp:632
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity=0, epics::pvData::int32 payloadSize=0) OVERRIDE FINAL
Definition: codec.cpp:594
void epics::pvAccess::detail::AbstractCodec::flushSendBuffer ( )
protected

Definition at line 726 of file codec.cpp.

726  {
727 
728  _sendBuffer.flip();
729 
730  try {
731  send(&_sendBuffer);
732  } catch (io_exception &) {
733  try {
734  if (isOpen())
735  close();
736  } catch (io_exception &) {
737  // noop, best-effort close
738  }
739  throw connection_closed_exception("Failed to send buffer.");
740  }
741 
742  _sendBuffer.clear();
743 
744  _lastMessageStartPosition = std::numeric_limits<size_t>::max();
745 }
#define max(x, y)
Definition: flexdef.h:81
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
void send(epics::pvData::ByteBuffer *buffer)
Definition: codec.cpp:776
virtual void close()=0
void epics::pvAccess::detail::AbstractCodec::flushSerializeBuffer ( )
virtual

Done with this buffer. Flush it.

Implements epics::pvData::SerializableControl.

Definition at line 722 of file codec.cpp.

722  {
723  flush(false);
724 }
virtual void flush(bool lastMessageCompleted) OVERRIDE FINAL
Definition: codec.cpp:747
virtual const osiSockAddr* epics::pvAccess::detail::AbstractCodec::getLastReadBufferSocketAddress ( )
pure virtual
epics::pvData::int8 epics::pvAccess::detail::AbstractCodec::getRevision ( ) const
inline

Definition at line 205 of file codec.h.

205  {
208  return myver < _version ? myver : _version;
209  }
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
epics::pvData::Mutex _mutex
Definition: codec.h:264
const epics::pvData::int8 PVA_SERVER_PROTOCOL_REVISION
Definition: pvaConstants.h:31
const epics::pvData::int8 _clientServerFlag
Definition: codec.h:260
virtual void epics::pvAccess::detail::AbstractCodec::invalidDataStreamHandler ( )
pure virtual
virtual bool epics::pvAccess::detail::AbstractCodec::isOpen ( )
pure virtual
virtual void epics::pvAccess::detail::AbstractCodec::processApplicationMessage ( )
pure virtual
virtual void epics::pvAccess::detail::AbstractCodec::processControlMessage ( )
pure virtual
void epics::pvAccess::detail::AbstractCodec::processRead ( )

Definition at line 129 of file codec.cpp.

129  {
130  switch (_readMode)
131  {
132  case NORMAL:
133  processReadNormal();
134  break;
135  case SEGMENTED:
136  processReadSegmented();
137  break;
138  case SPLIT:
139  throw std::logic_error("ReadMode == SPLIT not supported");
140  }
141 
142 }
void epics::pvAccess::detail::AbstractCodec::processSendQueue ( )

Definition at line 833 of file codec.cpp.

834 {
835 
836  {
837  std::size_t senderProcessed = 0;
838  while (senderProcessed++ < MAX_MESSAGE_SEND)
839  {
840  TransportSender::shared_pointer sender;
841  _sendQueue.pop_front_try(sender);
842  if (sender.get() == 0)
843  {
844  // flush
845  if (_sendBuffer.getPosition() > 0)
846  flush(true);
847 
848  sendCompleted(); // do not schedule sending
849 
850  if (terminated()) // termination
851  break;
852  // termination (we want to process even if shutdown)
853  _sendQueue.pop_front(sender);
854  }
855 
856  try {
857  processSender(sender);
858  } catch(...) {
859  if (_sendBuffer.getPosition() > 0)
860  flush(true);
861  sendCompleted();
862  throw;
863  }
864  }
865  }
866 
867  // flush
868  if (_sendBuffer.getPosition() > 0)
869  flush(true);
870 }
virtual void flush(bool lastMessageCompleted) OVERRIDE FINAL
Definition: codec.cpp:747
std::size_t getPosition() const
Definition: byteBuffer.h:346
static const std::size_t MAX_MESSAGE_SEND
Definition: codec.h:134
fair_queue< TransportSender > _sendQueue
Definition: codec.h:234
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
void epics::pvAccess::detail::AbstractCodec::processWrite ( )

Definition at line 762 of file codec.cpp.

762  {
763 
764  switch (_writeMode)
765  {
766  case PROCESS_SEND_QUEUE:
768  break;
770  _writeOpReady = true;
771  break;
772  }
773 }
void epics::pvAccess::detail::AbstractCodec::putControlMessage ( epics::pvData::int8  command,
epics::pvData::int32  data 
)

Definition at line 617 of file codec.cpp.

619  {
620 
621  _lastMessageStartPosition =
622  std::numeric_limits<size_t>::max(); // TODO revise this
626  _sendBuffer.putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message
627  _sendBuffer.putByte(command); // command
628  _sendBuffer.putInt(data); // data
629 }
#define max(x, y)
Definition: flexdef.h:81
EPICS_ALWAYS_INLINE void putInt(int32 value)
Definition: byteBuffer.h:537
const epics::pvData::int8 PVA_MAGIC
Definition: pvaConstants.h:29
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
virtual void ensureBuffer(std::size_t size) OVERRIDE FINAL
Definition: codec.cpp:701
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
EPICS_ALWAYS_INLINE void putByte(int8 value)
Definition: byteBuffer.h:525
const epics::pvData::int8 PVA_SERVER_PROTOCOL_REVISION
Definition: pvaConstants.h:31
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
const epics::pvData::int8 _clientServerFlag
Definition: codec.h:260
virtual int epics::pvAccess::detail::AbstractCodec::read ( epics::pvData::ByteBuffer dst)
pure virtual
virtual void epics::pvAccess::detail::AbstractCodec::readPollOne ( )
pure virtual
virtual void epics::pvAccess::detail::AbstractCodec::scheduleSend ( )
pure virtual
void epics::pvAccess::detail::AbstractCodec::send ( epics::pvData::ByteBuffer buffer)
protected

Definition at line 776 of file codec.cpp.

777 {
778 
779  // On Windows, limiting the buffer size is important to prevent
780  // poor throughput performances when transferring large amount of
781  // data over non-blocking socket. See Microsoft KB article KB823764.
782  // We do it also for other systems just to be safe.
783  std::size_t maxBytesToSend = (size_t)-1;
784  // std::min<int32_t>(
785  // _socketSendBufferSize, _remoteTransportSocketReceiveBufferSize) / 2;
786 
787  std::size_t limit = buffer->getLimit();
788  std::size_t bytesToSend = limit - buffer->getPosition();
789 
790  // limit sending
791  if (bytesToSend > maxBytesToSend)
792  {
793  bytesToSend = maxBytesToSend;
794  buffer->setLimit(buffer->getPosition() + bytesToSend);
795  }
796 
797  int tries = 0;
798  while (buffer->getRemaining() > 0)
799  {
800 
801  //int p = buffer.position();
802  int bytesSent = write(buffer);
803 
804  if (bytesSent < 0)
805  {
806  // connection lost
807  close();
808  throw connection_closed_exception("bytesSent < 0");
809  }
810  else if (bytesSent == 0)
811  {
812  sendBufferFull(tries++);
813  continue;
814  }
815 
816  atomic::add(_totalBytesSent, bytesSent);
817 
818  // readjust limit
819  if (bytesToSend == maxBytesToSend)
820  {
821  bytesToSend = limit - buffer->getPosition();
822 
823  if(bytesToSend > maxBytesToSend)
824  bytesToSend = maxBytesToSend;
825 
826  buffer->setLimit(buffer->getPosition() + bytesToSend);
827  }
828  tries = 0;
829  }
830 }
virtual void sendBufferFull(int tries)=0
std::size_t getLimit() const
Definition: byteBuffer.h:368
std::size_t getPosition() const
Definition: byteBuffer.h:346
std::size_t getRemaining() const
Definition: byteBuffer.h:391
virtual int write(epics::pvData::ByteBuffer *src)=0
void setLimit(std::size_t limit)
Definition: byteBuffer.h:380
virtual void close()=0
virtual void epics::pvAccess::detail::AbstractCodec::sendBufferFull ( int  tries)
protected pure virtual
virtual void epics::pvAccess::detail::AbstractCodec::sendCompleted ( )
pure virtual
bool epics::pvAccess::detail::AbstractCodec::sendQueueEmpty ( ) const
inline

Definition at line 201 of file codec.h.

201  {
202  return _sendQueue.empty();
203  }
fair_queue< TransportSender > _sendQueue
Definition: codec.h:234
void epics::pvAccess::detail::AbstractCodec::setByteOrder ( int  byteOrder)
virtual

Set byte order.

Parameters
byteOrder: byte order to set.

Implements epics::pvAccess::Transport.

Definition at line 952 of file codec.cpp.

953 {
954  _socketBuffer.setEndianess(byteOrder);
955  // TODO sync
956  _sendBuffer.setEndianess(byteOrder);
957  _byteOrderFlag = EPICS_ENDIAN_BIG == byteOrder ? 0x80 : 0x00;
958 }
epics::pvData::ByteBuffer _socketBuffer
Definition: codec.h:231
#define EPICS_ENDIAN_BIG
Definition: epicsEndian.h:16
void setEndianess(int byteOrder)
Definition: byteBuffer.h:285
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
void epics::pvAccess::detail::AbstractCodec::setRecipient ( osiSockAddr const &  sendTo)
virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 947 of file codec.cpp.

947  {
948  _sendTo = sendTo;
949 }
virtual void epics::pvAccess::detail::AbstractCodec::setRxTimeout ( bool  ena)
inline protected virtual

Reimplemented in epics::pvAccess::detail::BlockingTCPTransportCodec.

Definition at line 217 of file codec.h.

217 {}
void epics::pvAccess::detail::AbstractCodec::setSenderThread ( )

Definition at line 880 of file codec.cpp.

881 {
883 }
LIBCOM_API epicsThreadId epicsStdCall epicsThreadGetIdSelf(void)
Definition: osdThread.c:810
void epics::pvAccess::detail::AbstractCodec::startMessage ( epics::pvData::int8  command,
std::size_t  ensureCapacity = 0,
epics::pvData::int32  payloadSize = 0 
)
virtual

Implements epics::pvAccess::TransportSendControl.

Definition at line 594 of file codec.cpp.

597  {
598  _lastMessageStartPosition =
599  std::numeric_limits<size_t>::max(); // TODO revise this
600  ensureBuffer(
601  PVA_MESSAGE_HEADER_SIZE + ensureCapacity + _nextMessagePayloadOffset);
602  _lastMessageStartPosition = _sendBuffer.getPosition();
606  (_lastSegmentedMessageType | _byteOrderFlag | _clientServerFlag)); // data message
607  _sendBuffer.putByte(command); // command
608  _sendBuffer.putInt(payloadSize);
609 
610  // apply offset
611  if (_nextMessagePayloadOffset > 0)
613  _sendBuffer.getPosition() + _nextMessagePayloadOffset);
614 }
#define max(x, y)
Definition: flexdef.h:81
EPICS_ALWAYS_INLINE void putInt(int32 value)
Definition: byteBuffer.h:537
const epics::pvData::int8 PVA_MAGIC
Definition: pvaConstants.h:29
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
virtual void ensureBuffer(std::size_t size) OVERRIDE FINAL
Definition: codec.cpp:701
std::size_t getPosition() const
Definition: byteBuffer.h:346
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
EPICS_ALWAYS_INLINE void putByte(int8 value)
Definition: byteBuffer.h:525
const epics::pvData::int8 PVA_SERVER_PROTOCOL_REVISION
Definition: pvaConstants.h:31
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
const epics::pvData::int8 _clientServerFlag
Definition: codec.h:260
virtual bool epics::pvAccess::detail::AbstractCodec::terminated ( )
pure virtual
virtual int epics::pvAccess::detail::AbstractCodec::write ( epics::pvData::ByteBuffer src)
pure virtual
virtual void epics::pvAccess::detail::AbstractCodec::writePollOne ( )
pure virtual

Member Data Documentation

const epics::pvData::int8 epics::pvAccess::detail::AbstractCodec::_clientServerFlag
protected

Definition at line 260 of file codec.h.

int8_t epics::pvAccess::detail::AbstractCodec::_command
protected

Definition at line 222 of file codec.h.

int8_t epics::pvAccess::detail::AbstractCodec::_flags
protected

Definition at line 221 of file codec.h.

epics::pvData::Mutex epics::pvAccess::detail::AbstractCodec::_mutex
mutable

Definition at line 264 of file codec.h.

int32_t epics::pvAccess::detail::AbstractCodec::_payloadSize
protected

Definition at line 223 of file codec.h.

ReadMode epics::pvAccess::detail::AbstractCodec::_readMode
protected

Definition at line 219 of file codec.h.

epics::pvData::int32 epics::pvAccess::detail::AbstractCodec::_remoteTransportSocketReceiveBufferSize
protected

Definition at line 224 of file codec.h.

epics::pvData::ByteBuffer epics::pvAccess::detail::AbstractCodec::_sendBuffer
protected

Definition at line 232 of file codec.h.

epicsThreadId epics::pvAccess::detail::AbstractCodec::_senderThread
protected

Definition at line 227 of file codec.h.

fair_queue<TransportSender> epics::pvAccess::detail::AbstractCodec::_sendQueue
protected

Definition at line 234 of file codec.h.

osiSockAddr epics::pvAccess::detail::AbstractCodec::_sendTo
protected

Definition at line 226 of file codec.h.

epics::pvData::ByteBuffer epics::pvAccess::detail::AbstractCodec::_socketBuffer
protected

Definition at line 231 of file codec.h.

int8_t epics::pvAccess::detail::AbstractCodec::_version
protected

Definition at line 220 of file codec.h.

WriteMode epics::pvAccess::detail::AbstractCodec::_writeMode
protected

Definition at line 228 of file codec.h.

bool epics::pvAccess::detail::AbstractCodec::_writeOpReady
protected

Definition at line 229 of file codec.h.

const std::size_t epics::pvAccess::detail::AbstractCodec::MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE
static

Definition at line 137 of file codec.h.

const std::size_t epics::pvAccess::detail::AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024
static

Definition at line 138 of file codec.h.

const std::size_t epics::pvAccess::detail::AbstractCodec::MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2
static

Definition at line 136 of file codec.h.

const std::size_t epics::pvAccess::detail::AbstractCodec::MAX_ENSURE_SIZE = 1024
static

Definition at line 135 of file codec.h.

const std::size_t epics::pvAccess::detail::AbstractCodec::MAX_MESSAGE_PROCESS = 100
static

Definition at line 133 of file codec.h.

const std::size_t epics::pvAccess::detail::AbstractCodec::MAX_MESSAGE_SEND = 100
static

Definition at line 134 of file codec.h.


The documentation for this class was generated from the following files: