This is Unofficial EPICS BASE Doxygen Site
codec.cpp
Go to the documentation of this file.
1 
7 #include <map>
8 #include <string>
9 #include <vector>
10 #include <limits>
11 #include <stdexcept>
12 #include <sstream>
13 #include <sys/types.h>
14 
15 #include <osiSock.h>
16 #include <epicsTime.h>
17 #include <epicsThread.h>
18 #include <epicsVersion.h>
19 #include <errlog.h>
20 #include <epicsAtomic.h>
21 
22 #include <pv/byteBuffer.h>
23 #include <pv/pvType.h>
24 #include <pv/lock.h>
25 #include <pv/timer.h>
26 #include <pv/event.h>
27 #include <pv/reftrack.h>
28 
29 #define epicsExportSharedSymbols
30 #include <pv/blockingTCP.h>
31 #include <pv/remote.h>
32 #include <pv/inetAddressUtil.h>
33 #include <pv/hexDump.h>
34 #include <pv/logger.h>
35 #include <pv/likely.h>
36 #include <pv/codec.h>
37 #include <pv/serializationHelper.h>
38 #include <pv/serverChannelImpl.h>
39 #include <pv/clientContextImpl.h>
40 
41 using namespace std;
42 using namespace epics::pvData;
43 using namespace epics::pvAccess;
44 
46 
47 namespace {
// TransportSender with no state of its own. An instance is enqueued by the
// transport's close path ("Break sender from queue wait") so that the sending
// side wakes up from its queue wait.
48 struct BreakTransport : TransportSender
49 {
50  virtual ~BreakTransport() {}
51  virtual void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) OVERRIDE FINAL
52  {
  // NOTE(review): the statement inside this body (listing line 53) is not visible here — confirm what send() does.
54  }
55 };
56 } // namespace
57 
58 namespace epics {
59 namespace pvAccess {
60 
61 size_t Transport::num_instances;
62 
63 Transport::Transport()
64  :_totalBytesSent(0u)
65  ,_totalBytesRecv(0u)
66 {
67  REFTRACE_INCREMENT(num_instances);
68 }
69 
// NOTE(review): the signature line for this body (listing line 70) is not visible;
// presumably this is Transport::~Transport — confirm.
// Decrements the same instance counter that Transport::Transport() increments.
71 {
72  REFTRACE_DECREMENT(num_instances);
73 }
74 
75 namespace detail {
76 
// Upper bound on loop iterations in processReadNormal().
77 const std::size_t AbstractCodec::MAX_MESSAGE_PROCESS = 100;
// Upper bound on senders handled in one pass of the send-queue loop.
78 const std::size_t AbstractCodec::MAX_MESSAGE_SEND = 100;
// Offset readToBuffer() uses as the new start position when it moves unread bytes.
79 const std::size_t AbstractCodec::MAX_ENSURE_SIZE = 1024;
// Largest size ensureData() accepts before throwing std::invalid_argument.
80 const std::size_t AbstractCodec::MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2;
// Not referenced in the visible part of this file.
81 const std::size_t AbstractCodec::MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE;
// Added to MAX_TCP_RECV by bufSizeSelect() to form the minimum buffer size.
82 const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024;
83 
84 static
85 size_t bufSizeSelect(size_t request)
86 {
87  return std::max(request, size_t(MAX_TCP_RECV + AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE));
88 }
89 
// Initializes the codec's read/write state and its two buffers.
//   serverFlag           - selects _clientServerFlag (0x40 when true, otherwise 0x00)
//   sendBufferSize       - requested send buffer size, raised to the bufSizeSelect() minimum
//   receiveBufferSize    - requested receive buffer size, raised to the bufSizeSelect() minimum
//   socketSendBufferSize - not used in the visible part of this constructor
//   blockingProcessQueue - not used in the visible part of this constructor
// Throws std::invalid_argument from the two buffer-size checks below.
90 AbstractCodec::AbstractCodec(
91  bool serverFlag,
92  size_t sendBufferSize,
93  size_t receiveBufferSize,
94  int32_t socketSendBufferSize,
95  bool blockingProcessQueue):
96  //PROTECTED
97  _readMode(NORMAL), _version(0), _flags(0), _command(0), _payloadSize(0),
98  _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV),
99  _senderThread(0),
100  _writeMode(PROCESS_SEND_QUEUE),
101  _writeOpReady(false),
102  _socketBuffer(bufSizeSelect(receiveBufferSize)),
103  _sendBuffer(bufSizeSelect(sendBufferSize)),
104  //PRIVATE
105  _storedPayloadSize(0), _storedPosition(0), _startPosition(0),
  // room is reserved for two headers in the send buffer (see the trailing comment)
106  _maxSendPayloadSize(_sendBuffer.getSize() - 2*PVA_MESSAGE_HEADER_SIZE), // start msg + control
  // size_t max is the "no message in progress" marker tested by endMessage(bool)
107  _lastMessageStartPosition(std::numeric_limits<size_t>::max()),_lastSegmentedMessageType(0),
108  _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0),
  // 0x80 marks big-endian byte order in the header flags
109  _byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00),
110  _clientServerFlag(serverFlag ? 0x40 : 0x00)
111 {
  // NOTE(review): the condition guarding this throw (listing line 112) is not visible;
  // judging by the message it compares the receive buffer with 2*MAX_ENSURE_SIZE — confirm.
113  throw std::invalid_argument(
114  "receiveBuffer.capacity() < 2*MAX_ENSURE_SIZE");
115 
  // NOTE(review): the condition guarding this throw (listing line 116) is not visible — confirm.
117  throw std::invalid_argument("sendBuffer() < 2*MAX_ENSURE_SIZE");
118 
119  // initialize to be empty
  // NOTE(review): the statement for the comment above (listing line 120) is not visible.
121  _startPosition = _socketBuffer.getPosition();
122 
123  // clear send
124  _sendBuffer.clear();
125 }
126 
127 
// NOTE(review): the signature line (listing line 129) is not visible; presumably
// AbstractCodec::processRead(), which receiveThread() and ensureData() call — confirm.
// Dispatches one read pass according to _readMode.
128 // throws io_exception, connection_closed_exception, invalid_stream_exception
130  switch (_readMode)
131  {
132  case NORMAL:
133  processReadNormal();
134  break;
135  case SEGMENTED:
136  processReadSegmented();
137  break;
138  case SPLIT:
  // SPLIT is only set temporarily by ensureData() around readToBuffer(); it is rejected as an entry mode here
139  throw std::logic_error("ReadMode == SPLIT not supported");
140  }
141 
142 }
143 
144 
// Reads the message header fields from _socketBuffer while holding _mutex.
// Throws invalid_data_stream_exception when the magic byte differs from
// PVA_MAGIC or the version is 0.
// NOTE(review): several statements are not visible in this listing (the reads
// of flags, command and payload size, and the logging call); see notes below.
145 void AbstractCodec::processHeader() {
146 
147  Guard G(_mutex); // guards access to _version et al.
148 
149  // magic code
150  int8_t magicCode = _socketBuffer.getByte();
151 
152  // version
153  int8_t ver = _socketBuffer.getByte();
154  if(_version!=ver) {
155  // enable timeout if both ends support
156  _version = ver;
  // NOTE(review): the statement that acts on the version change (listing line 157) is not visible.
158  }
159 
160  // flags
  // NOTE(review): statement not visible (listing line 161); presumably assigns _flags — confirm.
162 
163  // command
  // NOTE(review): statement not visible (listing line 164); presumably assigns _command — confirm.
165 
166  // read payload size
  // NOTE(review): statement not visible (listing line 167); presumably assigns _payloadSize — confirm.
168 
169  // check magic code
170  if (magicCode != PVA_MAGIC || _version==0)
171  {
  // NOTE(review): the call these arguments belong to (listing lines 172, 174, 176) is not visible;
  // presumably a log call followed by a close/invalid-stream action — confirm.
173  "Invalid header received from the client : %s %02x%02x%02x%02x disconnecting...",
175  unsigned(magicCode), unsigned(_version), unsigned(_flags), unsigned(_command));
177  throw invalid_data_stream_exception("invalid header received");
178  }
179 
180 }
181 
182 
// Processes up to MAX_MESSAGE_PROCESS messages from the socket buffer in NORMAL mode.
// Returns early when readToBuffer() reports that no header is available or when
// the codec is no longer open. connection_closed_exception is swallowed here.
// NOTE(review): several statements are not visible in this listing; see notes below.
183 void AbstractCodec::processReadNormal() {
184 
185  try
186  {
187  std::size_t messageProcessCount = 0;
188  while (messageProcessCount++ < MAX_MESSAGE_PROCESS)
189  {
190  // read as much as available, but at least for a header
191  // readFromSocket checks if reading from socket is really necessary
  // non-persistent read: false means not enough data yet, so give up for now
192  if (!readToBuffer(PVA_MESSAGE_HEADER_SIZE, false)) {
193  return;
194  }
195 
196  // read header fields
197  processHeader();
  // bit 0 of the flags marks a control message
198  bool isControl = ((_flags & 0x01) == 0x01);
199  if (isControl) {
  // NOTE(review): the control-message handling statement (listing line 200) is not visible.
201  }
202  else
203  {
204  // segmented sanity check
  // bit 0x20 marks a message that is not the first segment
205  bool notFirstSegment = (_flags & 0x20) != 0;
206  if (notFirstSegment)
207  {
208  // not-first segmented message with zero payload is "kind of" valid
209  // TODO this should check if previous message was first- or middle-segmented message
210  if (_payloadSize == 0)
211  continue;
212 
  // NOTE(review): the calls around these arguments (listing lines 213, 217, 218) are not visible;
  // presumably a log call and a throw of an invalid-stream exception — confirm.
214  "Protocol Violation: Not-a-first segmented message received in normal mode"
215  " from the client at %s:%d: %s, disconnecting...",
216  __FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
219  "not-a-first segmented message received in normal mode");
220  }
221 
  // remember the payload bounds and restrict the buffer limit to this message's payload
222  _storedPayloadSize = _payloadSize;
223  _storedPosition = _socketBuffer.getPosition();
224  _storedLimit = _socketBuffer.getLimit();
225  _socketBuffer.setLimit(std::min(_storedPosition + _storedPayloadSize, _storedLimit));
  // postProcess stays true until post-processing has been started on the normal path
226  bool postProcess = true;
227  try
228  {
229  // handle response
  // NOTE(review): the application-message handling call (listing line 230) is not visible.
231 
232  if (!isOpen())
233  return;
234 
235  postProcess = false;
236  postProcessApplicationMessage();
237  }
238  catch(...)
239  {
240  if (!isOpen())
241  return;
242 
  // run post-processing only if the handler threw before the normal path reached it
243  if (postProcess)
244  {
245  postProcessApplicationMessage();
246  }
247 
248  throw;
249  }
250  }
251  }
252 
253  }
  // NOTE(review): the catch clause opening this handler (listing line 254) is not visible.
255  {
256  // noop, should be already handled (and logged)
257  }
258  catch (connection_closed_exception & )
259  {
260  // noop, should be already handled (and logged)
261  }
262 }
263 
// Restores the socket buffer limit saved in _storedLimit and moves the position
// to the end of the current payload (_storedPosition + _storedPayloadSize),
// so that payload bytes the handler did not consume are skipped.
// NOTE(review): the statements handling the "unprocessed read buffer" case are
// only partly visible in this listing.
264 void AbstractCodec::postProcessApplicationMessage()
265 {
266  // can be closed by now
267  // isOpen() should be efficiently implemented
268  while (true)
269  //while (isOpen())
270  {
271  // set position as whole message was read
272  //(in case code haven't done so)
273  std::size_t newPosition = _storedPosition + _storedPayloadSize;
274 
275  // aligned buffer size ensures that there is enough space
276  //in buffer,
277  // however data might not be fully read
278 
279  // discard the rest of the packet
  // taken when the payload end lies beyond the data that was in the buffer
280  if (newPosition > _storedLimit)
281  {
282  // processApplicationMessage() did not read up
283  //quite some buffer
284 
285  // we only handle unused alignment bytes
  // NOTE(review): a size_t difference is stored in an int here; the assert below relies on that conversion.
286  int bytesNotRead =
287  newPosition - _socketBuffer.getPosition();
288  assert(bytesNotRead>=0);
289 
290  if (bytesNotRead==0)
291  {
292  // reveal currently existing padding
293  _socketBuffer.setLimit(_storedLimit);
294  continue;
295  }
296 
297  // TODO we do not handle this for now (maybe never)
  // NOTE(review): the calls around these arguments (listing lines 298, 302, 303) are not visible;
  // presumably a log call and a throw — confirm.
299  "unprocessed read buffer from client at %s:%d: %s,"
300  " disconnecting...",
301  __FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
304  "unprocessed read buffer");
305  }
306  _socketBuffer.setLimit(_storedLimit);
307  _socketBuffer.setPosition(newPosition);
308  break;
309  }
310 }
311 
// Reads headers until a non-control message flagged as not-first segment (0x20)
// arrives, stores its payload size in _storedPayloadSize and returns.
// Uses persistent reads, so readToBuffer() keeps polling until a header is available.
// NOTE(review): the control-message branch and the protocol-violation handling
// are only partly visible in this listing.
312 void AbstractCodec::processReadSegmented() {
313 
314  while (true)
315  {
316  // read as much as available, but at least for a header
317  // readFromSocket checks if reading from socket is really necessary
318  readToBuffer(PVA_MESSAGE_HEADER_SIZE, true);
319 
320  // read header fields
321  processHeader();
322 
323  bool isControl = ((_flags & 0x01) == 0x01);
324  if (isControl)
  // NOTE(review): the statement executed for control messages (listing line 325) is not visible.
326  else
327  {
328  // last segment bit set (means in-between segment or last segment)
329  // we expect this, no non-control messages between
330  //segmented message are supported
331  // NOTE: for now... it is easy to support non-segmented
332  //messages between segmented messages
333  bool notFirstSegment = (_flags & 0x20) != 0;
334  if (!notFirstSegment)
335  {
  // NOTE(review): the calls around these arguments (listing lines 336, 340, 341) are not visible;
  // presumably a log call and a throw — confirm.
337  "Protocol Violation: Not-a-first segmented message expected from the client at"
338  " %s:%d: %s, disconnecting...",
339  __FILE__, __LINE__, inetAddressToString(*getLastReadBufferSocketAddress()).c_str());
342  "not-a-first segmented message expected");
343  }
344 
345  _storedPayloadSize = _payloadSize;
346 
347  // return control to caller code
348  return;
349  }
350  }
351 
352 }
353 
354 
// Makes at least requiredBytes readable in _socketBuffer.
//   requiredBytes - number of bytes that must be available on return
//   persistent    - when read() returns 0: true polls via readPollOne() and retries,
//                   false gives up and returns false
// Returns true when the bytes are available, false only in the non-persistent case.
// Throws connection_closed_exception (after close()) when read() returns a negative value.
// NOTE(review): the byte-copy statement and the limit updates are on lines not
// visible in this listing; see notes below.
355 bool AbstractCodec::readToBuffer(
356  std::size_t requiredBytes,
357  bool persistent) {
358 
359  // do we already have requiredBytes available?
360  std::size_t remainingBytes = _socketBuffer.getRemaining();
361  if (remainingBytes >= requiredBytes) {
362  return true;
363  }
364 
365  // assumption: remainingBytes < MAX_ENSURE_DATA_BUFFER_SIZE &&
366  // requiredBytes < (socketBuffer.capacity() - 1)
367 
368  //
369  // copy unread part to the beginning of the buffer
370  // to make room for new data (as much as we can read)
371  // NOTE: requiredBytes is expected to be small (order of 10 bytes)
372  //
373 
374  // a new start position, we are careful to preserve alignment
375  _startPosition = MAX_ENSURE_SIZE;
376 
377  std::size_t endPosition = _startPosition + remainingBytes;
378 
379  for (std::size_t i = _startPosition; i < endPosition; i++)
  // NOTE(review): the loop body (listing line 380) is not visible; presumably it moves one unread byte per iteration — confirm.
381 
382  // update buffer to the new position
  // NOTE(review): a statement (listing line 383) is not visible here; presumably it widens the limit for reading — confirm.
384  _socketBuffer.setPosition(endPosition);
385 
386  // read at least requiredBytes bytes
387  std::size_t requiredPosition = _startPosition + requiredBytes;
388  while (_socketBuffer.getPosition() < requiredPosition)
389  {
390  int bytesRead = read(&_socketBuffer);
391 
392  if (bytesRead < 0)
393  {
394  close();
395  throw connection_closed_exception("bytesRead < 0");
396  }
397  // non-blocking IO support
398  else if (bytesRead == 0)
399  {
400  if (persistent)
401  readPollOne();
402  else
403  {
404  // set pointers (aka flip)
  // NOTE(review): a statement (listing line 405) is not visible; presumably it sets the limit to the current position — confirm.
406  _socketBuffer.setPosition(_startPosition);
407 
408  return false;
409  }
410  }
411 
  // also reached with bytesRead == 0 after a persistent poll, adding nothing
412  atomic::add(_totalBytesRecv, bytesRead);
413  }
414 
415  // set pointers (aka flip)
  // NOTE(review): a statement (listing line 416) is not visible; presumably it sets the limit to the current position — confirm.
417  _socketBuffer.setPosition(_startPosition);
418 
419  return true;
420 }
421 
422 
// Ensures that at least `size` bytes of payload are readable from _socketBuffer,
// reading more of the current message (SPLIT case) or continuing into the next
// segment (SEGMENTED case) as needed. Recurses until the request is satisfied.
//   size - number of bytes required; must not exceed MAX_ENSURE_DATA_SIZE
// Throws std::invalid_argument when size is larger than MAX_ENSURE_DATA_SIZE.
// An io_exception raised while reading triggers a best-effort close().
// NOTE(review): several statements are on lines not visible in this listing; see notes below.
423 void AbstractCodec::ensureData(std::size_t size) {
424 
425  // enough of data?
426  if (_socketBuffer.getRemaining() >= size)
427  return;
428 
429  // too large for buffer...
430  if (size > MAX_ENSURE_DATA_SIZE) {// half for SPLIT, half for SEGMENTED
431  std::ostringstream msg;
432  msg << "requested for buffer size " << size
433  << ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed.";
  // NOTE(review): the call these arguments belong to (listing line 434) is not visible; presumably a log call — confirm.
435  "%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__);
436  throw std::invalid_argument(msg.str());
437  }
438 
439  try
440  {
441 
442  // subtract what was already processed
443  std::size_t pos = _socketBuffer.getPosition();
444  _storedPayloadSize -= pos - _storedPosition;
445 
446  // SPLIT message case
447  // no more data and we have some payload left => read buffer
448  // NOTE: (storedPayloadSize >= size) does not work if size
449  //spans over multiple messages
450  if (_storedPayloadSize >= (_storedLimit-pos))
451  {
452  // just read up remaining payload
453  // this will move current (<size) part of the buffer
454  // to the beginning of the buffer
  // the read mode is switched to SPLIT only for the duration of this read
455  ReadMode storedMode = _readMode;
456  _readMode = SPLIT;
457  readToBuffer(size, true);
458  _readMode = storedMode;
459  _storedPosition = _socketBuffer.getPosition();
460  _storedLimit = _socketBuffer.getLimit();
  // NOTE(review): the call these arguments belong to (listing line 461) is not visible;
  // presumably _socketBuffer.setLimit( — confirm.
462  std::min<std::size_t>(
463  _storedPosition + _storedPayloadSize, _storedLimit));
464 
465  // check needed, if not enough data is available or
466  // we run into segmented message
467  ensureData(size);
468  }
469  // SEGMENTED message case
470  else
471  {
472  // TODO check flags
473  //if (flags && SEGMENTED_FLAGS_MASK == 0)
474  // throw IllegalStateException("segmented message expected,
475  //but current message flag does not indicate it");
476 
477 
478  // copy remaining bytes of payload to safe area
479  //[0 to MAX_ENSURE_DATA_BUFFER_SIZE/2), if any
480  // remaining is relative to payload since buffer is
481  //bounded from outside
482  std::size_t remainingBytes = _socketBuffer.getRemaining();
483  for (std::size_t i = 0; i < remainingBytes; i++)
  // NOTE(review): the loop body (listing line 484) is not visible.
485 
486  // restore limit (there might be some data already present
487  //and readToBuffer needs to know real limit)
488  _socketBuffer.setLimit(_storedLimit);
489 
490  // we expect segmented message, we expect header
491  // that (and maybe some control packets) needs to be "removed"
492  // so that we get combined payload
493  ReadMode storedMode = _readMode;
  // NOTE(review): a statement (listing line 494) is not visible; presumably it sets _readMode to SEGMENTED — confirm.
495  processRead();
496  _readMode = storedMode;
497 
498  // make sure we have all the data (maybe we run into SPLIT)
499  readToBuffer(size - remainingBytes, true);
500 
501  // SPLIT cannot mess with this, since start of the message,
502  //i.e. current position, is always aligned
  // NOTE(review): statements on listing lines 503-504 are not visible.
505 
506  // copy before position (i.e. start of the payload)
  // NOTE(review): size_t values are converted to int32_t loop counters here.
507  for (int32_t i = remainingBytes - 1,
508  j = _socketBuffer.getPosition() - 1; i >= 0; i--, j--)
  // NOTE(review): the loop body (listing line 509) is not visible.
510 
  // rewind so the saved bytes and the new payload are read as one contiguous payload
511  _startPosition = _socketBuffer.getPosition() - remainingBytes;
512  _socketBuffer.setPosition(_startPosition);
513 
514  _storedPayloadSize += remainingBytes;
515  _storedPosition = _startPosition;
516  _storedLimit = _socketBuffer.getLimit();
  // NOTE(review): the call these arguments belong to (listing line 517) is not visible;
  // presumably _socketBuffer.setLimit( — confirm.
518  std::min<std::size_t>(
519  _storedPosition + _storedPayloadSize, _storedLimit));
520 
521  // sequential small segmented messages in the buffer
522  ensureData(size);
523  }
524  }
525  catch (io_exception &) {
526  try {
527  close();
528  } catch (io_exception & ) {
529  // noop, best-effort close
530  }
  // NOTE(review): the start of the statement using this message (listing line 531) is not visible; presumably a throw — confirm.
532  "Failed to ensure data to read buffer.");
533  }
534 }
535 
536 
// NOTE(review): the first signature line (listing line 537) is not visible;
// presumably std::size_t AbstractCodec::alignedValue( — confirm.
// Rounds `value` up to the next multiple of `alignment` using a bit mask.
// The mask arithmetic is only a round-up when alignment is a power of two.
538  std::size_t value,
539  std::size_t alignment) {
540 
541  std::size_t k = (alignment - 1);
542  return (value + k) & (~k);
543 }
544 
545 
546 void AbstractCodec::alignData(std::size_t alignment) {
547 
548  std::size_t k = (alignment - 1);
549  std::size_t pos = _socketBuffer.getPosition();
550  std::size_t newpos = (pos + k) & (~k);
551  if (pos == newpos)
552  return;
553 
554  std::size_t diff = _socketBuffer.getLimit() - newpos;
555  if (diff > 0)
556  {
557  _socketBuffer.setPosition(newpos);
558  return;
559  }
560 
561  ensureData(diff);
562 
563  // position has changed, recalculate
564  newpos = (_socketBuffer.getPosition() + k) & (~k);
565  _socketBuffer.setPosition(newpos);
566 }
567 
/// Eight 0xFF bytes used by alignBuffer() as padding for the send buffer.
static const char PADDING_BYTES[] =
{
    '\xFF', '\xFF', '\xFF', '\xFF',
    '\xFF', '\xFF', '\xFF', '\xFF'
};
579 
580 void AbstractCodec::alignBuffer(std::size_t alignment) {
581 
582  std::size_t k = (alignment - 1);
583  std::size_t pos = _sendBuffer.getPosition();
584  std::size_t newpos = (pos + k) & (~k);
585  if (pos == newpos)
586  return;
587 
588  // for safety reasons we really pad (override previous message data)
589  std::size_t padCount = newpos - pos;
590  _sendBuffer.put(PADDING_BYTES, 0, padCount);
591 }
592 
593 
// NOTE(review): the first signature line (listing line 594) is not visible;
// presumably void AbstractCodec::startMessage( — confirm.
// Begins a data message in the send buffer: ensures room for the header plus
// ensureCapacity and any pending payload offset, records the header start in
// _lastMessageStartPosition, then writes the header fields.
//   command        - command byte written into the header
//   ensureCapacity - payload bytes to guarantee room for, in addition to the header
//   payloadSize    - value written into the header's payload-size field
595  epics::pvData::int8 command,
596  std::size_t ensureCapacity,
597  epics::pvData::int32 payloadSize) {
  // invalidate the marker first so that a flush triggered by ensureBuffer() does not touch an old header
598  _lastMessageStartPosition =
599  std::numeric_limits<size_t>::max(); // TODO revise this
600  ensureBuffer(
601  PVA_MESSAGE_HEADER_SIZE + ensureCapacity + _nextMessagePayloadOffset);
602  _lastMessageStartPosition = _sendBuffer.getPosition();
  // NOTE(review): the statements on listing lines 603-605 are not visible; presumably they write
  // the magic and version bytes and open the putByte( call completed below — confirm.
606  (_lastSegmentedMessageType | _byteOrderFlag | _clientServerFlag)); // data message
607  _sendBuffer.putByte(command); // command
608  _sendBuffer.putInt(payloadSize);
609 
610  // apply offset
611  if (_nextMessagePayloadOffset > 0)
  // NOTE(review): the call this argument belongs to (listing line 612) is not visible; presumably _sendBuffer.setPosition( — confirm.
613  _sendBuffer.getPosition() + _nextMessagePayloadOffset);
614 }
615 
616 
// NOTE(review): the first signature line (listing line 617) is not visible;
// presumably void AbstractCodec::putControlMessage( — confirm.
// Writes a control message (flag bit 0x01 set) with the given command and data
// word into the send buffer. _lastMessageStartPosition is reset to the
// "no message" marker, so endMessage() will not patch a payload size for it.
618  epics::pvData::int8 command,
619  epics::pvData::int32 data) {
620 
621  _lastMessageStartPosition =
622  std::numeric_limits<size_t>::max(); // TODO revise this
  // NOTE(review): the statements on listing lines 623-625 are not visible; presumably they ensure
  // buffer space and write the magic and version bytes — confirm.
626  _sendBuffer.putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message
627  _sendBuffer.putByte(command); // command
628  _sendBuffer.putInt(data); // data
629 }
630 
631 
// NOTE(review): the signature line (listing line 632) is not visible;
// presumably the no-argument AbstractCodec::endMessage() — confirm.
// Ends the current message with no further segments to follow.
633  endMessage(false);
634 }
635 
636 
637 void AbstractCodec::endMessage(bool hasMoreSegments) {
638 
639  if (_lastMessageStartPosition != std::numeric_limits<size_t>::max())
640  {
641  std::size_t lastPayloadBytePosition = _sendBuffer.getPosition();
642 
643  // set paylaod size (non-aligned)
644  std::size_t payloadSize =
645  lastPayloadBytePosition -
646  _lastMessageStartPosition - PVA_MESSAGE_HEADER_SIZE;
647 
648  _sendBuffer.putInt(_lastMessageStartPosition + 4, payloadSize);
649 
650  // set segmented bit
651  if (hasMoreSegments) {
652  // first segment
653  if (_lastSegmentedMessageType == 0)
654  {
655  std::size_t flagsPosition = _lastMessageStartPosition + 2;
656  epics::pvData::int8 type = _sendBuffer.getByte(flagsPosition);
657  // set first segment bit
658  _sendBuffer.putByte(flagsPosition, (type | 0x10));
659  // first + last segment bit == in-between segment
660  _lastSegmentedMessageType = type | 0x30;
661  _lastSegmentedMessageCommand =
662  _sendBuffer.getByte(flagsPosition + 1);
663  }
664  _nextMessagePayloadOffset = 0;
665  }
666  else
667  {
668  // last segment
669  if (_lastSegmentedMessageType != 0)
670  {
671  std::size_t flagsPosition = _lastMessageStartPosition + 2;
672  // set last segment bit (by clearing first segment bit)
673  _sendBuffer.putByte(flagsPosition,
674  (_lastSegmentedMessageType & 0xEF));
675  _lastSegmentedMessageType = 0;
676  }
677  _nextMessagePayloadOffset = 0;
678  }
679 
680  // TODO
681  /*
682  // manage markers
683  final int position = sendBuffer.position();
684  final int bytesLeft = sendBuffer.remaining();
685  if (position >= nextMarkerPosition && bytesLeft >=
686  PVAConstants.PVA_MESSAGE_HEADER_SIZE)
687  {
688  sendBuffer.put(PVAConstants.PVA_MAGIC);
689  sendBuffer.put(PVAConstants.PVA_VERSION);
690  sendBuffer.put((byte)(0x01 | byteOrderFlag)); // control data
691  sendBuffer.put((byte)0); // marker
692  sendBuffer.putInt((int)(totalBytesSent + position +
693  PVAConstants.PVA_MESSAGE_HEADER_SIZE));
694  nextMarkerPosition = position + markerPeriodBytes;
695  }
696  */
697  _lastMessageStartPosition = std::numeric_limits<size_t>::max();
698  }
699 }
700 
// Guarantees that `size` bytes can be written to the send buffer, flushing
// (as an unfinished, segmented message) until enough room is free.
// Throws std::invalid_argument when size exceeds _maxSendPayloadSize.
701 void AbstractCodec::ensureBuffer(std::size_t size) {
702 
703  if (_sendBuffer.getRemaining() >= size)
704  return;
705 
706  // too large for buffer...
707  if (_maxSendPayloadSize < size) {
708  std::ostringstream msg;
709  msg << "requested for buffer size " <<
710  size << ", but only " << _maxSendPayloadSize << " available.";
711  std::string s = msg.str();
  // NOTE(review): the call these arguments belong to (listing line 712) is not visible; presumably a log call — confirm.
713  "%s at %s:%d.,", msg.str().c_str(), __FILE__, __LINE__);
714  throw std::invalid_argument(s);
715  }
716 
  // flush(false) marks the current message as continuing in a further segment
717  while (_sendBuffer.getRemaining() < size)
718  flush(false);
719 }
720 
// NOTE(review): the signature line (listing line 722) is not visible;
// presumably void AbstractCodec::flushSerializeBuffer() — confirm.
// Flushes the send buffer while leaving the current message open for a further segment.
721 // assumes startMessage was called (or header is in place), because endMessage(true) is later called that peeks and sets _lastSegmentedMessageType
723  flush(false);
724 }
725 
// NOTE(review): the signature line (listing line 726) is not visible;
// presumably void AbstractCodec::flushSendBuffer() — confirm.
// Flips the send buffer, sends its content, then clears it and resets the
// message-start marker. An io_exception from send() leads to a best-effort
// close() and is rethrown as connection_closed_exception.
727 
728  _sendBuffer.flip();
729 
730  try {
731  send(&_sendBuffer);
732  } catch (io_exception &) {
733  try {
734  if (isOpen())
735  close();
736  } catch (io_exception &) {
737  // noop, best-effort close
738  }
739  throw connection_closed_exception("Failed to send buffer.");
740  }
741 
742  _sendBuffer.clear();
743 
  // the buffer is empty now, so no header remains to be patched by endMessage()
744  _lastMessageStartPosition = std::numeric_limits<size_t>::max();
745 }
746 
747 void AbstractCodec::flush(bool lastMessageCompleted) {
748 
749  // automatic end
750  endMessage(!lastMessageCompleted);
751 
752  // flush send buffer
753  flushSendBuffer();
754 
755  // start with last header
756  if (!lastMessageCompleted && _lastSegmentedMessageType != 0)
757  startMessage(_lastSegmentedMessageCommand, 0);
758 }
759 
760 
// NOTE(review): the signature line (listing line 762) is not visible; presumably
// void AbstractCodec::processWrite(), which sendThread() calls — confirm.
// Dispatches on _writeMode.
761 // throws io_exception, connection_closed_exception
763 
764  switch (_writeMode)
765  {
766  case PROCESS_SEND_QUEUE:
  // NOTE(review): the statement for this case (listing line 767) is not visible; presumably processSendQueue() — confirm.
768  break;
  // NOTE(review): the case label for the branch below (listing line 769) is not visible.
770  _writeOpReady = true;
771  break;
772  }
773 }
774 
775 
// NOTE(review): the signature line (listing line 776) is not visible; presumably
// void AbstractCodec::send(ByteBuffer *buffer) — confirm.
// Writes the remaining content of `buffer` using write(), retrying until the
// buffer is drained. A negative write() result closes the codec and throws
// connection_closed_exception. A zero result calls sendBufferFull() with the
// number of consecutive attempts that made no progress.
777 {
778 
779  // On Windows, limiting the buffer size is important to prevent
780  // poor throughput performances when transferring large amount of
781  // data over non-blocking socket. See Microsoft KB article KB823764.
782  // We do it also for other systems just to be safe.
  // the limit is effectively disabled here: (size_t)-1 is the largest size_t value
783  std::size_t maxBytesToSend = (size_t)-1;
784  // std::min<int32_t>(
785  // _socketSendBufferSize, _remoteTransportSocketReceiveBufferSize) / 2;
786 
787  std::size_t limit = buffer->getLimit();
788  std::size_t bytesToSend = limit - buffer->getPosition();
789 
790  // limit sending
791  if (bytesToSend > maxBytesToSend)
792  {
793  bytesToSend = maxBytesToSend;
794  buffer->setLimit(buffer->getPosition() + bytesToSend);
795  }
796 
797  int tries = 0;
798  while (buffer->getRemaining() > 0)
799  {
800 
801  //int p = buffer.position();
802  int bytesSent = write(buffer);
803 
804  if (bytesSent < 0)
805  {
806  // connection lost
807  close();
808  throw connection_closed_exception("bytesSent < 0");
809  }
810  else if (bytesSent == 0)
811  {
812  sendBufferFull(tries++);
813  continue;
814  }
815 
816  atomic::add(_totalBytesSent, bytesSent);
817 
818  // readjust limit
819  if (bytesToSend == maxBytesToSend)
820  {
821  bytesToSend = limit - buffer->getPosition();
822 
823  if(bytesToSend > maxBytesToSend)
824  bytesToSend = maxBytesToSend;
825 
826  buffer->setLimit(buffer->getPosition() + bytesToSend);
827  }
  // progress was made, so restart the attempt counter
828  tries = 0;
829  }
830 }
831 
832 
// NOTE(review): the signature line (listing line 833) is not visible; presumably
// void AbstractCodec::processSendQueue() — confirm.
// Handles up to MAX_MESSAGE_SEND senders from _sendQueue. When the queue is
// found empty the send buffer is flushed and, unless terminated() is true,
// the loop waits for the next sender via pop_front(). Any exception from
// processSender() flushes pending data and is rethrown.
834 {
835 
836  {
837  std::size_t senderProcessed = 0;
838  while (senderProcessed++ < MAX_MESSAGE_SEND)
839  {
840  TransportSender::shared_pointer sender;
  // non-waiting attempt; sender stays null when the queue is empty
841  _sendQueue.pop_front_try(sender);
842  if (sender.get() == 0)
843  {
844  // flush
845  if (_sendBuffer.getPosition() > 0)
846  flush(true);
847 
848  sendCompleted(); // do not schedule sending
849 
850  if (terminated()) // termination
851  break;
852  // termination (we want to process even if shutdown)
853  _sendQueue.pop_front(sender);
854  }
855 
856  try {
857  processSender(sender);
858  } catch(...) {
859  if (_sendBuffer.getPosition() > 0)
860  flush(true);
861  sendCompleted();
862  throw;
863  }
864  }
865  }
866 
867  // flush
868  if (_sendBuffer.getPosition() > 0)
869  flush(true);
870 }
871 
872 
// NOTE(review): the first signature line (listing line 873) is not visible;
// presumably void AbstractCodec::enqueueSendRequest( — confirm.
// Appends the sender to _sendQueue and schedules sending.
874  TransportSender::shared_pointer const & sender) {
875  _sendQueue.push_back(sender);
876  scheduleSend();
877 }
878 
879 
// NOTE(review): the signature (listing line 880) and the only body statement
// (listing line 882) are not visible; presumably AbstractCodec::setSenderThread(),
// which sendThread() calls at start-up — confirm.
881 {
883 }
884 
885 
// Lets one sender serialize its message into the send buffer while holding
// the sender's lock, ends the message, and adds the produced byte count to
// sender->bytesTX.
// connection_closed_exception propagates unchanged. Any other std::exception
// is logged, triggers a best-effort close() and is rethrown as
// connection_closed_exception carrying the same message.
886 void AbstractCodec::processSender(
887  TransportSender::shared_pointer const & sender)
888 {
889 
890  ScopedLock lock(sender);
891 
892  try {
  // mark the message start so endMessage() can patch the payload size
893  _lastMessageStartPosition = _sendBuffer.getPosition();
894 
  // total bytes sent plus buffered bytes, sampled before the sender runs
895  size_t before = atomic::get(_totalBytesSent) + _sendBuffer.getPosition();
896 
897  sender->send(&_sendBuffer, this);
898 
899  // automatic end (to set payload size)
900  endMessage(false);
901 
  // NOTE(review): the definition of `after` (listing line 902) is not visible; presumably the same
  // expression as `before`, sampled after sending — confirm.
903 
904  atomic::add(sender->bytesTX, after - before);
905  }
906  catch (connection_closed_exception & ) {
907  throw;
908  }
909  catch (std::exception &e ) {
910 
911  std::ostringstream msg;
912  msg << "an exception caught while processing a send message: "
913  << e.what();
914  LOG(logLevelWarn, "%s at %s:%d.",
915  msg.str().c_str(), __FILE__, __LINE__);
916 
917  try {
918  close();
919  } catch (io_exception & ) {
920  // noop
921  }
922 
923  throw connection_closed_exception(msg.str());
924  }
925 }
926 
927 
// NOTE(review): the first signature line (listing line 928) and the first part
// of the condition (listing line 932) are not visible; presumably the two-argument
// enqueueSendRequest overload with a check on the calling thread — confirm.
// Processes the sender immediately when the visible conditions hold (empty
// queue and at least requiredBufferSize bytes free in the send buffer);
// otherwise the sender is queued.
929  TransportSender::shared_pointer const & sender,
930  std::size_t requiredBufferSize) {
931 
933  _sendQueue.empty() &&
934  _sendBuffer.getRemaining() >= requiredBufferSize)
935  {
936  processSender(sender);
  // data was buffered but not sent yet, so schedule sending
937  if (_sendBuffer.getPosition() > 0)
938  {
939  scheduleSend();
940  }
941  }
942  else
943  enqueueSendRequest(sender);
944 }
945 
946 
// NOTE(review): the signature line (listing line 947) is not visible; presumably
// void AbstractCodec::setRecipient(... sendTo) — confirm.
// Stores the given value in _sendTo.
948  _sendTo = sendTo;
949 }
950 
951 
952 void AbstractCodec::setByteOrder(int byteOrder)
953 {
954  _socketBuffer.setEndianess(byteOrder);
955  // TODO sync
956  _sendBuffer.setEndianess(byteOrder);
957  _byteOrderFlag = EPICS_ENDIAN_BIG == byteOrder ? 0x80 : 0x00;
958 }
959 
960 
961 bool AbstractCodec::directSerialize(ByteBuffer* /*existingBuffer*/, const char* toSerialize,
962  std::size_t elementCount, std::size_t elementSize)
963 {
964  // TODO overflow check of "size_t count", overflow int32 field of payloadSize header field
965  // TODO max message size in connection validation
966  std::size_t count = elementCount * elementSize;
967 
968  // TODO find smart limit
969  // check if direct mode actually pays off
970  if (count < 64*1024)
971  return false;
972 
973  //
974  // first end current message, and write a header of next "directly serialized" message
975  //
976 
977  // first end current message indicating the we will segment
978  endMessage(true);
979 
980  // append segmented message header with payloadSize == count
981  // TODO size_t to int32
982  startMessage(_lastSegmentedMessageCommand, 0, static_cast<int32>(count));
983 
984  // flush
985  flushSendBuffer();
986 
987  // TODO think if alignment is preserved after...
988 
989  //
990  // send toSerialize buffer
991  //
992  ByteBuffer wrappedBuffer(const_cast<char*>(toSerialize), count);
993  send(&wrappedBuffer);
994 
995  //
996  // continue where we left before calling directSerialize
997  //
998  startMessage(_lastSegmentedMessageCommand, 0);
999 
1000  return true;
1001 }
1002 
1003 bool AbstractCodec::directDeserialize(ByteBuffer *existingBuffer, char* deserializeTo,
1004  std::size_t elementCount, std::size_t elementSize)
1005 {
1006  return false;
1007 }
1008 
1009 //
1010 //
1011 // BlockingAbstractCodec
1012 //
1013 //
1014 //
1015 
// NOTE(review): the signature line (listing line 1016) is not visible; presumably
// BlockingTCPTransportCodec::~BlockingTCPTransportCodec() — confirm.
// Decrements the instance counter and waits for both worker threads to exit.
1017 {
1018  REFTRACE_DECREMENT(num_instances);
1019 
1020  waitJoin();
1021 }
1022 
// NOTE(review): the signature line (listing line 1023) is not visible; presumably
// readPollOne() of the blocking codec — confirm.
// Polling hook that has no meaning for blocking IO; always throws std::logic_error.
1024  throw std::logic_error("should not be called for blocking IO");
1025 }
1026 
1027 
// NOTE(review): the signature line (listing line 1028) is not visible; presumably
// writePollOne() of the blocking codec — confirm.
// Polling hook that has no meaning for blocking IO; always throws std::logic_error.
1029  throw std::logic_error("should not be called for blocking IO");
1030 }
1031 
1032 
// NOTE(review): the signature line (listing line 1033) is not visible; presumably
// void BlockingTCPTransportCodec::close() — confirm.
// Closes the transport once: only the caller that flips _isOpen from true to
// false runs internalClose() and enqueues a BreakTransport to wake the sender.
1034 
1035  if (_isOpen.getAndSet(false))
1036  {
1037  // always close in the same thread, same way, etc.
1038  // wakeup processSendQueue
1039 
1040  // clean resources (close socket)
1041  internalClose();
1042 
1043  // Break sender from queue wait
1044  BreakTransport::shared_pointer B(new BreakTransport);
1045  enqueueSendRequest(B);
1046  }
1047 }
1048 
// NOTE(review): the signature line (listing line 1049) is not visible; presumably
// void BlockingTCPTransportCodec::waitJoin() — confirm.
// Waits for the send and receive threads to exit. The transport must already
// be closed (asserted).
1050 {
1051  assert(!_isOpen.get());
1052  _sendThread.exitWait();
1053  _readThread.exitWait();
1054 }
1055 
// NOTE(review): the signature line (listing line 1056) is not visible; presumably
// void BlockingTCPTransportCodec::internalClose() — confirm.
// Destroys the socket (_channel), optionally shutting it down first depending
// on `info`, removes this transport from the context's transport registry and
// logs the closure.
// NOTE(review): the definition of `info`, the case labels and the logging call
// are on lines not visible in this listing.
1057 {
1058  {
1059 
  // NOTE(review): listing lines 1060-1061 are not visible; they presumably define `info` — confirm.
1062  switch ( info )
1063  {
  // NOTE(review): case label (listing line 1064) not visible.
1065  epicsSocketDestroy ( _channel );
1066  break;
  // NOTE(review): case label (listing line 1067) not visible.
1068  {
  // the shutdown result is deliberately ignored (see the disabled code below)
1069  /*int status =*/ ::shutdown ( _channel, SHUT_RDWR );
1070  /*
1071  if ( status ) {
1072  char sockErrBuf[64];
1073  epicsSocketConvertErrnoToString (
1074  sockErrBuf, sizeof ( sockErrBuf ) );
1075  LOG(logLevelDebug,
1076  "TCP socket to %s failed to shutdown: %s.",
1077  inetAddressToString(_socketAddress).c_str(), sockErrBuf);
1078  }
1079  */
1080  epicsSocketDestroy ( _channel );
1081  }
1082  break;
  // NOTE(review): case label (listing line 1083) not visible.
1084  // not supported anymore anyway
1085  default:
1086  epicsSocketDestroy(_channel);
1087  }
1088  }
1089 
1090  Transport::shared_pointer thisSharedPtr = this->shared_from_this();
1091  _context->getTransportRegistry()->remove(thisSharedPtr);
1092 
  // NOTE(review): the condition (listing line 1093) and the call opening (listing line 1095) are not
  // visible; presumably a log-level check followed by a LOG( call — confirm.
1094  {
1096  "TCP socket to %s is to be closed.",
1097  _socketName.c_str());
1098  }
1099 }
1100 
// NOTE(review): the signature line (listing line 1101) is not visible; presumably
// bool BlockingTCPTransportCodec::terminated() — confirm.
// Reports the inverse of isOpen().
1102  return !isOpen();
1103 }
1104 
1105 
// NOTE(review): the signature line (listing line 1106) is not visible; presumably
// bool BlockingTCPTransportCodec::isOpen() — confirm.
// Returns the current value of the _isOpen flag.
1107  return _isOpen.get();
1108 }
1109 
1110 
// NOTE(review): the signature line (listing line 1112) is not visible; presumably
// void BlockingTCPTransportCodec::start() — confirm.
// Starts the receive thread and then the send thread. Both are created with
// autostart(false) in the constructor.
1111 // NOTE: must not be called from constructor (e.g. needs shared_from_this())
1113 
1114  _readThread.start();
1115 
1116  _sendThread.start();
1117 
1118 }
1119 
1120 
// Body of the "TCP-rx" thread: enables the receive timeout, then calls
// processRead() repeatedly while the transport is open. Any exception that
// escapes processRead() is logged and closes the transport, which ends the loop.
1121 void BlockingTCPTransportCodec::receiveThread()
1122 {
1123  /* This innocuous ref. is an important hack.
1124  * The code behind Transport::close() will cause
1125  * channels and operations to drop references
1126  * to this transport. This ref. keeps it from
1127  * being destroyed way down the call stack, from
1128  * which it is apparently not possible to return
1129  * safely. Rather than try to untangle this
1130  * knot, just keep this ref...
1131  */
1132  Transport::shared_pointer ptr(this->shared_from_this());
1133 
1134  // initially enable timeout for all clients to weed out
1135  // impersonators (security scanners?)
1136  setRxTimeout(true);
1137 
1138  while (this->isOpen())
1139  {
1140  try {
1141  this->processRead();
  // success: skip the close() below and re-check isOpen()
1142  continue;
1143  } catch (std::exception &e) {
1144  PRINT_EXCEPTION(e);
  // NOTE(review): the call opening for these arguments (listing line 1145) is not visible; presumably LOG(<level>, — confirm.
1146  "an exception caught while in receiveThread at %s:%d: %s",
1147  __FILE__, __LINE__, e.what());
1148  } catch (...) {
  // NOTE(review): the call opening for these arguments (listing line 1149) is not visible; presumably LOG(<level>, — confirm.
1150  "unknown exception caught while in receiveThread at %s:%d.",
1151  __FILE__, __LINE__);
1152  }
1153  // exception
1154  close();
1155  }
1156 }
1157 
1158 
1159 void BlockingTCPTransportCodec::sendThread()
1160 {
1161  // cf. the comment in receiveThread()
1162  Transport::shared_pointer ptr(this->shared_from_this());
1163 
1164  this->setSenderThread();
1165 
1166  while (this->isOpen())
1167  {
1168  try {
1169  this->processWrite();
1170  continue;
1171  } catch (connection_closed_exception &cce) {
1172  // noop
1173  } catch (std::exception &e) {
1174  PRINT_EXCEPTION(e);
1175  LOG(logLevelWarn,
1176  "an exception caught while in sendThread at %s:%d: %s",
1177  __FILE__, __LINE__, e.what());
1178  } catch (...) {
1179  LOG(logLevelWarn,
1180  "unknown exception caught while in sendThread at %s:%d.",
1181  __FILE__, __LINE__);
1182  }
1183  // exception
1184  close();
1185  }
1186  _sendQueue.clear();
1187 }
1188 
// NOTE(review): the signature line (listing line 1189) is not visible; presumably
// void BlockingTCPTransportCodec::setRxTimeout(bool ena) — confirm.
// Sets SO_RCVTIMEO on the socket. With ena == false the timeout value is 0.0;
// otherwise it is the EPICS_PVA_CONN_TMO configuration value (default 30.0),
// clamped to be non-negative. A failing setsockopt() is reported through
// errlogPrintf(), but only when the error code differs from the last one reported.
1190 {
1191  double timeout = !ena ? 0.0 : std::max(0.0, _context->getConfiguration()->getPropertyAsDouble("EPICS_PVA_CONN_TMO", 30.0));
1192 #ifdef _WIN32
1193  DWORD timo = DWORD(timeout*1000); // in milliseconds
1194 #else
  // split the timeout into whole seconds and the fractional part in microseconds
1195  timeval timo;
1196  timo.tv_sec = unsigned(timeout);
1197  timo.tv_usec = (timeout-timo.tv_sec)*1e6;
1198 #endif
1199 
1200  int ret = setsockopt(_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timo, sizeof(timo));
1201  if(ret==-1) {
1202  int err = SOCKERRNO;
  // function-local static shared by all instances; it is read and written without synchronization here
1203  static int lasterr;
1204  if(err!=lasterr) {
1205  errlogPrintf("%s: Unable to set RX timeout: %d\n", _socketName.c_str(), err);
1206  lasterr = err;
1207  }
1208  }
1209 }
1210 
// NOTE(review): the signature line (listing line 1211) is not visible; presumably
// void BlockingTCPTransportCodec::sendBufferFull(int tries), called from the send
// loop when write() returns 0 — confirm.
// Sleeps for max(tries * 0.1, 1) via epicsThreadSleep().
// NOTE(review): because std::max is used, the delay is never shorter than 1 and
// grows beyond it after 10 tries; if a back-off capped at 1 was intended,
// std::min would be expected — confirm.
1212  // TODO constants
1213  epicsThreadSleep(std::max<double>(tries * 0.1, 1));
1214 }
1215 
1216 
1217 //
1218 //
1219 // BlockingTCPTransportCodec
1220 //
1221 //
1222 //
1223 
1225 
// Creates a blocking TCP transport on an already connected socket.
//   serverFlag        - forwarded to AbstractCodec
//   context           - stored in _context
//   channel           - the socket, stored in _channel
//   responseHandler   - stored in _responseHandler
//   sendBufferSize    - forwarded to AbstractCodec, both as the send buffer size and
//                       as its socketSendBufferSize argument
//   receiveBufferSize - forwarded to AbstractCodec
//   priority          - stored in _priority
// The two worker threads are configured but not started (autostart(false)).
// The transport is marked open and the peer address is resolved into
// _socketName ("<unknown>:0" when getpeername() fails).
1226 BlockingTCPTransportCodec::BlockingTCPTransportCodec(bool serverFlag, const Context::shared_pointer &context,
1227  SOCKET channel, const ResponseHandler::shared_pointer &responseHandler,
1228  size_t sendBufferSize,
1229  size_t receiveBufferSize, int16 priority)
1230  :AbstractCodec(
1231  serverFlag,
1232  sendBufferSize,
1233  receiveBufferSize,
1234  sendBufferSize,
1235  true)
1236  ,_readThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::receiveThread)
  // NOTE(review): one builder call (listing line 1237) is not visible here.
1238  .name("TCP-rx")
1239  .stack(epicsThreadStackBig)
1240  .autostart(false))
1241  ,_sendThread(epics::pvData::Thread::Config(this, &BlockingTCPTransportCodec::sendThread)
  // NOTE(review): one builder call (listing line 1242) is not visible here.
1243  .name("TCP-tx")
1244  .stack(epicsThreadStackBig)
1245  .autostart(false))
1246  ,_channel(channel)
1247  ,_context(context), _responseHandler(responseHandler)
1248  ,_remoteTransportReceiveBufferSize(MAX_TCP_RECV)
1249  ,_priority(priority)
1250  ,_verified(false)
1251 {
1252  REFTRACE_INCREMENT(num_instances);
1253 
1254  _isOpen.getAndSet(true);
1255 
1256  // get remote address
1257  osiSocklen_t saSize = sizeof(sockaddr);
1258  int retval = getpeername(_channel, &(_socketAddress.sa), &saSize);
1259  if(unlikely(retval<0)) {
1260  char errStr[64];
1261  epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
  // NOTE(review): the call opening for these arguments (listing line 1262) is not visible; presumably LOG(<level>, — confirm.
1263  "Error fetching socket remote address: %s.",
1264  errStr);
1265  _socketName = "<unknown>:0";
1266  } else {
1267  char ipAddrStr[24];
1268  ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
1269  _socketName = ipAddrStr;
1270  }
1271 }
1272 
1273 
1275  close();
1276 }
1277 
1278 
1281 
1282  std::size_t remaining;
1283  while((remaining=src->getRemaining()) > 0) {
1284 
1285  int bytesSent = ::send(_channel,
1286  &src->getBuffer()[src->getPosition()],
1287  remaining, 0);
1288 
1289  // NOTE: do not log here, you might overwrite SOCKERRNO relevant to send() operation above
1290 
1291  // TODO winsock return 0 on disconnect (blocking socket) ?
1292 
1293  if(unlikely(bytesSent<0)) {
1294 
1295  int socketError = SOCKERRNO;
1296 
1297  // spurious EINTR check
1298  if (socketError==SOCK_EINTR)
1299  continue;
1300  else if (socketError==SOCK_ENOBUFS)
1301  return 0;
1302  }
1303 
1304  if (bytesSent > 0) {
1305  src->setPosition(src->getPosition() + bytesSent);
1306  }
1307 
1308  return bytesSent;
1309 
1310  }
1311 
1312  return 0;
1313 }
1314 
1315 
1317 
1318  std::size_t remaining;
1319  while((remaining=dst->getRemaining()) > 0) {
1320 
1321  // read
1322  std::size_t pos = dst->getPosition();
1323 
1324  int bytesRead = ::recv(_channel,
1325  (char*)(dst->getBuffer()+pos), remaining, 0);
1326 
1327  // NOTE: do not log here, you might overwrite SOCKERRNO relevant to recv() operation above
1328 
1329  if(unlikely(bytesRead==0)) {
1330  return -1; // 0 means connection loss for blocking transport, notify codec by returning -1
1331 
1332  } else if(unlikely(bytesRead<0)) {
1333  int err = SOCKERRNO;
1334 
1335  if(err==SOCK_EINTR) {
1336  // interrupted by signal. Retry
1337  continue;
1338 
1339  } else if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINPROGRESS
1340  || err==SOCK_ETIMEDOUT
1341  || err==SOCK_ECONNABORTED || err==SOCK_ECONNRESET
1342  ) {
1343  // different ways of saying timeout.
1344  // Linux: EAGAIN or EWOULDBLOCK, or EINPROGRESS
1345  // WIN32: WSAETIMEDOUT
1346  // others that RSRV checks for, but may not need to, ECONNABORTED, ECONNRESET
1347 
1348  // Note: on Windows, a receive timeout (WSAETIMEDOUT) leaves the socket in an undefined state,
1349  // so it must be closed. (cf. SO_RCVTIMEO)
1350 
1351  return -1;
1352 
1353  } else {
1354  // some other (fatal) error
1355  if(_isOpen.get())
1356  errlogPrintf("%s : Connection closed with RX socket error %d\n", _socketName.c_str(), err);
1357  return -1;
1358  }
1359  }
1360 
1361  dst->setPosition(dst->getPosition() + bytesRead);
1362  return bytesRead;
1363  }
1364 
1365  return 0;
1366 }
1367 
1368 
1370  return _verifiedEvent.wait(timeoutMs/1000.0) && _verified;
1371 }
1372 
1375 
1376  if (IS_LOGGABLE(logLevelDebug) && !status.isOK())
1377  {
1378  LOG(logLevelDebug, "Failed to verify connection to %s: %s.", _socketName.c_str(), status.getMessage().c_str());
1379  }
1380 
1381  {
1382  Guard G(_mutex);
1383  _verified = status.isSuccess();
1384  }
1386 }
1387 
1388 void BlockingTCPTransportCodec::authNZMessage(epics::pvData::PVStructure::shared_pointer const & data) {
1389  AuthenticationSession::shared_pointer sess;
1390  {
1391  Guard G(_mutex);
1392  sess = _authSession;
1393  }
1394  if (sess)
1395  sess->messageReceived(data);
1396  else
1397  {
1398  char ipAddrStr[24];
1399  ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr));
1400  LOG(logLevelWarn, "authNZ message received from '%s' but no security plug-in session active.", ipAddrStr);
1401  }
1402 }
1403 
1404 
1406 public:
1408 
1409  SecurityPluginMessageTransportSender(PVStructure::const_shared_pointer const & data) :
1410  _data(data)
1411  {
1412  }
1413 
1414  void send(ByteBuffer* buffer, TransportSendControl* control) {
1415  control->startMessage(CMD_AUTHNZ, 0);
1416  SerializationHelper::serializeFull(buffer, control, _data);
1417  // send immediately
1418  control->flush(true);
1419  }
1420 
1421 private:
1422  PVStructure::const_shared_pointer _data;
1423 };
1424 
1425 void BlockingTCPTransportCodec::sendSecurityPluginMessage(epics::pvData::PVStructure::const_shared_pointer const & data) {
1426  SecurityPluginMessageTransportSender::shared_pointer spmts(new SecurityPluginMessageTransportSender(data));
1427  enqueueSendRequest(spmts);
1428 }
1429 
1430 
1431 
1432 
1433 
1435  Context::shared_pointer const & context,
1436  SOCKET channel,
1437  ResponseHandler::shared_pointer const & responseHandler,
1438  int32_t sendBufferSize,
1439  int32_t receiveBufferSize)
1440  :BlockingTCPTransportCodec(true, context, channel, responseHandler,
1441  sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY)
1442  ,_lastChannelSID(0x12003400)
1443  ,_verificationStatus(pvData::Status::fatal("Uninitialized error"))
1444  ,_verifyOrVerified(false)
1445 {
1446  // NOTE: priority not yet known, default priority is used to
1447  //register/unregister
1448  // TODO implement priorities in Reactor... not that user will
1449  // change it.. still getPriority() must return "registered" priority!
1450 }
1451 
1452 
1454 }
1455 
1456 
1458 
1459  Lock lock(_channelsMutex);
1460  // search first free (theoretically possible loop of death)
1461  pvAccessID sid = ++_lastChannelSID;
1462  while(_channels.find(sid)!=_channels.end())
1463  sid = ++_lastChannelSID;
1464  return sid;
1465 }
1466 
1467 
1469  pvAccessID sid,
1470  ServerChannel::shared_pointer const & channel) {
1471 
1472  Lock lock(_channelsMutex);
1473  _channels[sid] = channel;
1474 
1475 }
1476 
1477 
1479 
1480  Lock lock(_channelsMutex);
1481  _channels.erase(sid);
1482 }
1483 
1484 
1485 ServerChannel::shared_pointer
1487 
1488  Lock lock(_channelsMutex);
1489 
1490  std::map<pvAccessID, ServerChannel::shared_pointer>::iterator it =
1491  _channels.find(sid);
1492 
1493  if(it!=_channels.end()) return it->second;
1494 
1495  return ServerChannel::shared_pointer();
1496 }
1497 
1498 
1500 
1501  Lock lock(_channelsMutex);
1502  return _channels.size();
1503 }
1504 
1505 void BlockingServerTCPTransportCodec::getChannels(std::vector<ServerChannel::shared_pointer>& channels) const
1506 {
1507  Lock lock(_channelsMutex);
1508  for(_channels_t::const_iterator it(_channels.begin()), end(_channels.end());
1509  it!=end; ++it)
1510  {
1511  channels.push_back(it->second);
1512  }
1513 }
1514 
1516  TransportSendControl* control) {
1517 
1518  if (!_verifyOrVerified)
1519  {
1520  _verifyOrVerified = true;
1521 
1522  //
1523  // set byte order control message
1524  //
1525 
1527  buffer->putByte(PVA_MAGIC);
1529  buffer->putByte(
1530  0x01 | 0x40 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG)
1531  ? 0x80 : 0x00)); // control + server + endian
1532  buffer->putByte(CMD_SET_ENDIANESS); // set byte order
1533  buffer->putInt(0);
1534 
1535 
1536  //
1537  // send verification message
1538  //
1539  control->startMessage(CMD_CONNECTION_VALIDATION, 4+2);
1540 
1541  // receive buffer size
1542  buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
1543 
1544  // server introspection registry max size
1545  // TODO
1546  buffer->putShort(0x7FFF);
1547 
1548  // list of authNZ plugin names advertised to this client
1549 
1551  AuthenticationRegistry::servers().snapshot(plugins); // copy
1552  std::vector<std::string> validSPNames;
1553  validSPNames.reserve(plugins.size()); // assume all will be valid
1554 
1555  PeerInfo info;
1556  info.transport = "pva";
1557  info.peer = _socketName;
1558  info.transportVersion = this->getRevision();
1559 
1560  // filter plugins which may be used by this peer
1561  for(AuthenticationRegistry::list_t::iterator it(plugins.begin()), end(plugins.end());
1562  it!=end; ++it)
1563  {
1564  info.authority = it->first;
1565  if(it->second->isValidFor(info))
1566  validSPNames.push_back(it->first);
1567  }
1568 
1569  SerializeHelper::writeSize(validSPNames.size(), buffer, this);
1570  for (vector<string>::const_iterator iter(validSPNames.begin()), end(validSPNames.end());
1571  iter != end; iter++)
1572  {
1573  SerializeHelper::serializeString(*iter, buffer, this);
1574  }
1575 
1576  {
1577  Guard G(_mutex);
1578  advertisedAuthPlugins.swap(validSPNames);
1579  }
1580 
1581  // send immediately
1582  control->flush(true);
1583  }
1584  else
1585  {
1586  //
1587  // send verified message
1588  //
1590 
1591  pvData::Status sts;
1592  {
1593  Lock lock(_mutex);
1594  sts = _verificationStatus;
1595  }
1596  sts.serialize(buffer, control);
1597 
1598  // send immediately
1599  control->flush(true);
1600 
1601  }
1602 }
1603 
1605  Lock lock(_channelsMutex);
1606  if(_channels.size()==0) return;
1607 
1609  {
1610  LOG(
1611  logLevelDebug,
1612  "Transport to %s still has %zu channel(s) active and closing...",
1613  _socketName.c_str(), _channels.size());
1614  }
1615 
1616  _channels_t temp;
1617  temp.swap(_channels);
1618 
1619  for(_channels_t::iterator it(temp.begin()), end(temp.end()); it!=end; ++it)
1620  it->second->destroy();
1621 }
1622 
1624  Transport::shared_pointer thisSharedPtr = shared_from_this();
1627 }
1628 
1630  const std::tr1::shared_ptr<PeerInfo>& peer)
1631 {
1633  {
1634  LOG(logLevelDebug, "Authentication completed with status '%s' for PVA client: %s.", Status::StatusTypeName[status.getType()], _socketName.c_str());
1635  }
1636 
1637  if(peer)
1639 
1640  bool isVerified;
1641  {
1642  Guard G(_mutex);
1643  isVerified = _verified;
1644  if(status.isSuccess())
1645  _peerInfo = peer;
1646  else
1647  _peerInfo.reset();
1648  }
1649 
1650  if (!isVerified)
1651  verified(status);
1652  else if (!status.isSuccess())
1653  {
1654  string errorMessage = "Re-authentication failed: " + status.getMessage();
1655  if (!status.getStackDump().empty())
1656  errorMessage += "\n" + status.getStackDump();
1657  LOG(logLevelInfo, "%s", errorMessage.c_str());
1658 
1659  close();
1660  }
1661 }
1662 
1663 void BlockingServerTCPTransportCodec::authNZInitialize(const std::string& securityPluginName,
1664  const epics::pvData::PVStructure::shared_pointer& data)
1665 {
1666  AuthenticationPlugin::shared_pointer plugin(AuthenticationRegistry::servers().lookup(securityPluginName));
1667  // attempting the force use of an un-advertised/non-existant plugin is treated as a protocol error.
1668  // We cheat here by assuming the the registry doesn't often change after server start,
1669  // and don't test if securityPluginName is in advertisedAuthPlugins
1670  if(!plugin)
1671  throw std::runtime_error(_socketName+" failing attempt to select non-existant auth. plugin "+securityPluginName);
1672 
1673  PeerInfo::shared_pointer info(new PeerInfo);
1674  info->peer = _socketName;
1675  info->transport = "pva";
1676  info->transportVersion = getRevision();
1677  info->authority = securityPluginName;
1678 
1679  if (!plugin->isValidFor(*info))
1680  verified(pvData::Status::error("invalid security plug-in name"));
1681 
1683  {
1684  LOG(logLevelDebug, "Accepted security plug-in '%s' for PVA client: %s.", securityPluginName.c_str(), _socketName.c_str());
1685  }
1686 
1687  AuthenticationSession::shared_pointer sess(plugin->createSession(info, shared_from_this(), data));
1688 
1689  Guard G(_mutex);
1690  _authSessionName = securityPluginName;
1691  _authSession.swap(sess);
1692 }
1693 
1694 
1695 
1696 
1697 
1699  Context::shared_pointer const & context,
1700  SOCKET channel,
1701  ResponseHandler::shared_pointer const & responseHandler,
1702  int32_t sendBufferSize,
1703  int32_t receiveBufferSize,
1704  ClientChannelImpl::shared_pointer const & client,
1705  epics::pvData::int8 /*remoteTransportRevision*/,
1706  float heartbeatInterval,
1707  int16_t priority ) :
1708  BlockingTCPTransportCodec(false, context, channel, responseHandler,
1709  sendBufferSize, receiveBufferSize, priority),
1710  _connectionTimeout(heartbeatInterval),
1711  _verifyOrEcho(true),
1712  sendQueued(true) // don't start sending echo until after auth complete
1713 {
1714  // initialize owners list, send queue
1715  acquire(client);
1716 
1717  // use immediate for clients
1718  //setFlushStrategy(DELAYED);
1719 }
1720 
1722 {
1724  // add some randomness to our timer phase
1725  double R = float(rand())/RAND_MAX; // [0, 1]
1726  // shape a bit
1727  R = R*0.5 + 0.5; // [0.5, 1.0]
1728  _context->getTimer()->schedulePeriodic(tcb, _connectionTimeout/2.0*R, _connectionTimeout/2.0);
1730 }
1731 
1733 }
1734 
1735 
1736 
1737 
1738 
1739 
1740 
1741 
1742 
1744 {
1745  {
1746  Guard G(_mutex);
1747  if(sendQueued) return;
1748  sendQueued = true;
1749  }
1750  // send echo
1751  TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
1752  enqueueSendRequest(transportSender);
1753 }
1754 
// Run `code` inside a try block and log, rather than propagate, anything it throws.
// std::exception-derived errors are logged with their what() text; any other
// thrown object is logged with the source location only.
#define EXCEPTION_GUARD(code) \
    try { \
        code; \
    } catch (std::exception &ex) { \
        LOG(logLevelError, "Unhandled exception caught from code at %s:%d: %s", __FILE__, __LINE__, ex.what()); \
    } catch (...) { \
        LOG(logLevelError, "Unhandled exception caught from code at %s:%d.", __FILE__, __LINE__); \
    }
1758 
1759 bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer const & client) {
1760  Lock lock(_mutex);
1761  if(isClosed()) return false;
1762 
1764  {
1765  LOG(logLevelDebug, "Acquiring transport to %s.", _socketName.c_str());
1766  }
1767 
1768  _owners[client->getID()] = ClientChannelImpl::weak_pointer(client);
1769  //_owners.insert(ClientChannelImpl::weak_pointer(client));
1770 
1771  return true;
1772 }
1773 
1774 // _mutex is held when this method is called
1777 
1779  _context->getTimer()->cancel(tcb);
1780 
1781  // _owners cannot change when transport is closed
1782 
1783  // Notifies clients about disconnect.
1784 
1785  // check if still acquired
1786  size_t refs = _owners.size();
1787  if(refs>0) {
1788 
1790  {
1791  LOG(
1792  logLevelDebug,
1793  "Transport to %s still has %zu client(s) active and closing...",
1794  _socketName.c_str(), refs);
1795  }
1796 
1797  TransportClientMap_t::iterator it = _owners.begin();
1798  for(; it!=_owners.end(); it++) {
1799  ClientChannelImpl::shared_pointer client = it->second.lock();
1800  if (client)
1801  {
1802  EXCEPTION_GUARD(client->transportClosed());
1803  }
1804  }
1805 
1806  }
1807 
1808  _owners.clear();
1809 }
1810 
1811 //void BlockingClientTCPTransportCodec::release(ClientChannelImpl::shared_pointer const & client) {
1813  Lock lock(_mutex);
1814  if(isClosed()) return;
1815 
1817  {
1818  LOG(logLevelDebug, "Releasing TCP transport to %s.", _socketName.c_str());
1819  }
1820 
1821  _owners.erase(clientID);
1822  //_owners.erase(ClientChannelImpl::weak_pointer(client));
1823 
1824  // not used anymore, close it
1825  // TODO consider delayed destruction (can improve performance!!!)
1826  if(_owners.size()==0) {
1827  lock.unlock();
1828  close();
1829  }
1830 }
1831 
1833  TransportSendControl* control)
1834 {
1835  bool voe;
1836  {
1837  Guard G(_mutex);
1838  sendQueued = false;
1839  voe = _verifyOrEcho;
1840  _verifyOrEcho = false;
1841  }
1842 
1843  if(voe) {
1844  /*
1845  * send verification response message
1846  */
1847 
1848  control->startMessage(CMD_CONNECTION_VALIDATION, 4+2+2);
1849 
1850  // receive buffer size
1851  buffer->putInt(static_cast<int32>(getReceiveBufferSize()));
1852 
1853  // max introspection registry size
1854  // TODO
1855  buffer->putShort(0x7FFF);
1856 
1857  // QoS (aka connection priority)
1858  buffer->putShort(getPriority());
1859 
1860  std::string pluginName;
1861  AuthenticationSession::shared_pointer session;
1862  {
1863  Guard G(_mutex);
1864  pluginName = _authSessionName;
1865  session = _authSession;
1866  }
1867 
1868  if (session)
1869  {
1870  // selected authNZ plug-in name
1872 
1873  // optional authNZ plug-in initialization data
1874  SerializationHelper::serializeFull(buffer, control, session->initializationData());
1875  }
1876  else
1877  {
1878  //TODO: allowed?
1879  // empty authNZ plug-in name
1880  SerializeHelper::serializeString("", buffer, control);
1881 
1882  // no authNZ plug-in initialization data
1883  SerializationHelper::serializeNullField(buffer, control);
1884  }
1885 
1886  // send immediately
1887  control->flush(true);
1888  }
1889  else {
1890  control->startMessage(CMD_ECHO, 0);
1891  // send immediately
1892  control->flush(true);
1893  }
1894 
1895 }
1896 
1897 
1898 void BlockingClientTCPTransportCodec::authNZInitialize(const std::vector<std::string>& offeredSecurityPlugins)
1899 {
1901  std::string selectedName;
1902  AuthenticationPlugin::shared_pointer plugin;
1903 
1904  // because of a missing break; the original SecurityPlugin effectively treated the offered list as being
1905  // in order of increasing preference (last is preferred).
1906  // we continue with this because, hey isn't compatibility fun...
1907 
1908  for(std::vector<std::string>::const_reverse_iterator it(offeredSecurityPlugins.rbegin()), end(offeredSecurityPlugins.rend());
1909  it!=end; ++it)
1910  {
1911  plugin = plugins.lookup(*it);
1912  if(plugin) {
1913  selectedName = *it;
1914  break;
1915  }
1916  }
1917 
1918  if(!plugin) {
1919  // mis-match and legacy. some early servers (java?) don't advertise any plugins.
1920  // treat this as anonymous
1921  selectedName = "anonymous";
1922  plugin = plugins.lookup(selectedName);
1923  assert(plugin); // fallback required
1924  }
1925 
1926  {
1927  PeerInfo::shared_pointer info(new PeerInfo);
1928  info->peer = _socketName; // this is the server name
1929  info->transport = "pva";
1930  info->transportVersion = getRevision();
1931  info->authority = selectedName;
1932 
1933  AuthenticationSession::shared_pointer sess(plugin->createSession(info, shared_from_this(), pvData::PVStructure::shared_pointer()));
1934 
1935  Guard G(_mutex);
1936  _authSessionName = selectedName;
1937  _authSession = sess;
1938  }
1939 
1940  TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
1941  enqueueSendRequest(transportSender);
1942 }
1943 
1945  const std::tr1::shared_ptr<PeerInfo>& peer)
1946 {
1947  // noop for client side (server will send ConnectionValidation message)
1948 }
1949 
1951 {
1952  AuthenticationSession::shared_pointer sess;
1953  {
1954  Guard G(_mutex);
1955  sess = _authSession;
1956  }
1957  if(sess)
1958  sess->authenticationComplete(status);
1960 }
1961 
1962 }
1963 }
1964 }
int8_t int8
Definition: pvType.h:75
double timeout
Definition: pvutils.cpp:25
LIBCOM_API void epicsStdCall epicsSocketDestroy(SOCKET s)
Definition: osdSock.c:117
void fatal(char *msg) NORETURN
Definition: error.c:15
epics::pvData::ByteBuffer _socketBuffer
Definition: codec.h:231
static const std::size_t MAX_ENSURE_DATA_SIZE
Definition: codec.h:136
#define EXCEPTION_GUARD(code)
Definition: codec.cpp:1755
Definition: link.h:174
#define max(x, y)
Definition: flexdef.h:81
#define SOCK_ECONNABORTED
Definition: osdSock.h:59
virtual int read(epics::pvData::ByteBuffer *dst)=0
std::string request
virtual void ensureData(std::size_t size) OVERRIDE FINAL
Definition: codec.cpp:423
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
epicsInt32 pvAccessID
Definition: pvaDefs.h:18
size_t elementSize(ScalarType id)
gives sizeof(T) where T depends on the scalar type id.
Definition: TypeFunc.cpp:82
static const std::size_t MAX_MESSAGE_PROCESS
Definition: codec.h:133
Information provded by a client to a server-type ChannelProvider.
Definition: security.h:119
EPICS_ALWAYS_INLINE int8 getByte()
Definition: byteBuffer.h:617
virtual bool directDeserialize(epics::pvData::ByteBuffer *, char *, std::size_t, std::size_t) OVERRIDE
Definition: codec.cpp:1003
epicsMutexId lock
Definition: osiClockTime.c:37
pvd::Status status
int osiSocklen_t
Definition: osdSock.h:36
#define true
Definition: flexdef.h:84
#define SOCK_EINPROGRESS
Definition: osdSock.h:60
static Status error(const std::string &m)
Definition: status.h:50
virtual void sendBufferFull(int tries)=0
int i
Definition: scan.c:967
EPICS_ALWAYS_INLINE void putInt(int32 value)
Definition: byteBuffer.h:537
const char * getBuffer() const
Definition: byteBuffer.h:294
#define SOCK_ENOBUFS
Definition: osdSock.h:52
struct sockaddr sa
Definition: osiSock.h:158
Class that must be implemented by code that makes Timer requests.
Definition: timer.h:40
static void serializeFull(epics::pvData::ByteBuffer *buffer, epics::pvData::SerializableControl *control, const epics::pvData::PVField::const_shared_pointer &pvField)
virtual void waitJoin() OVERRIDE FINAL
Call after close() to wait for any worker threads to exit.
Definition: codec.cpp:1049
#define min(x, y)
Definition: flexdef.h:78
const epics::pvData::int8 PVA_MAGIC
Definition: pvaConstants.h:29
virtual void send(epics::pvData::ByteBuffer *buffer, TransportSendControl *control) OVERRIDE FINAL
Definition: codec.cpp:1515
void run(const std::tr1::shared_ptr< PeerInfo > &peer)
Definition: security.cpp:290
virtual void flush(bool lastMessageCompleted) OVERRIDE FINAL
Definition: codec.cpp:747
const epics::pvData::int8 PVA_CLIENT_PROTOCOL_REVISION
Definition: pvaConstants.h:32
virtual void readPollOne() OVERRIDE FINAL
Definition: codec.cpp:1023
struct sockaddr_in ia
Definition: osiSock.h:157
virtual void alignData(std::size_t alignment) OVERRIDE FINAL
Definition: codec.cpp:546
#define SOCK_ETIMEDOUT
Definition: osdSock.h:54
std::string transport
transport protocol used eg. "pva". Must not be empty.
Definition: security.h:125
std::size_t getLimit() const
Definition: byteBuffer.h:368
static void serializeNullField(epics::pvData::ByteBuffer *buffer, epics::pvData::SerializableControl *control)
pvd::StructureConstPtr type
virtual int write(epics::pvData::ByteBuffer *src) OVERRIDE FINAL
Definition: codec.cpp:1279
virtual void waitJoin()
Call after close() to wait for any worker threads to exit.
Definition: remote.h:258
enum epicsSocketSystemCallInterruptMechanismQueryInfo epicsSocketSystemCallInterruptMechanismQuery()
Definition: memory.hpp:41
virtual void endMessage() OVERRIDE FINAL
Definition: codec.cpp:632
TODO only here because of the Lockable.
Definition: ntaggregate.cpp:16
virtual void setRxTimeout(bool ena)
Definition: codec.h:217
virtual void ensureBuffer(std::size_t size) OVERRIDE FINAL
Definition: codec.cpp:701
epicsGuard< epicsMutex > Guard
Definition: codec.cpp:45
virtual bool verify(epics::pvData::int32 timeoutMs) OVERRIDE
Definition: codec.cpp:1369
virtual void invalidDataStreamHandler() OVERRIDE FINAL
Definition: codec.cpp:1274
virtual void sendBufferFull(int tries) OVERRIDE FINAL
Definition: codec.cpp:1211
A lock for multithreading.
Definition: lock.h:36
virtual void verified(epics::pvData::Status const &status) OVERRIDE FINAL
Definition: codec.cpp:1950
virtual void setRxTimeout(bool ena) OVERRIDE FINAL
Definition: codec.cpp:1189
#define PRINT_EXCEPTION(EI)
static void writeSize(std::size_t s, ByteBuffer *buffer, SerializableControl *flusher)
void unlock()
Definition: lock.h:66
EPICS_ALWAYS_INLINE std::size_t getSize() const
Definition: byteBuffer.h:400
bucket * lookup(char *name)
Definition: symtab.c:66
virtual void flushSerializeBuffer() OVERRIDE FINAL
Definition: codec.cpp:722
virtual void release(pvAccessID clientId) OVERRIDE FINAL
Definition: codec.cpp:1812
std::size_t getPosition() const
Definition: byteBuffer.h:346
epics::pvData::Mutex _mutex
Definition: codec.h:264
epicsSocketSystemCallInterruptMechanismQueryInfo
Definition: osiSock.h:47
#define OVERRIDE
Definition: pvAccess.h:55
static std::size_t alignedValue(std::size_t value, std::size_t alignment)
Definition: codec.cpp:537
const std::string & getMessage() const
Definition: status.h:80
virtual void writePollOne() OVERRIDE FINAL
Definition: codec.cpp:1028
virtual void flush(bool lastMessageCompleted)=0
BlockingTCPTransportCodec(bool serverFlag, Context::shared_pointer const &context, SOCKET channel, ResponseHandler::shared_pointer const &responseHandler, size_t sendBufferSize, size_t receiveBufferSize, epics::pvData::int16 priority)
Definition: codec.cpp:1226
void epicsSocketConvertErrnoToString(char *pBuf, unsigned bufSize)
EPICS_ALWAYS_INLINE int32 getInt()
Definition: byteBuffer.h:629
virtual bool terminated() OVERRIDE FINAL
Definition: codec.cpp:1101
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity=0, epics::pvData::int32 payloadSize=0) OVERRIDE FINAL
Definition: codec.cpp:594
virtual void setByteOrder(int byteOrder) OVERRIDE FINAL
Definition: codec.cpp:952
#define SOCK_ECONNRESET
Definition: osdSock.h:53
Holds all PVA related.
Definition: pvif.h:34
static const std::size_t MAX_MESSAGE_SEND
Definition: codec.h:134
virtual void sendSecurityPluginMessage(epics::pvData::PVStructure::const_shared_pointer const &data) OVERRIDE FINAL
Definition: codec.cpp:1425
#define LOG(level, format,...)
Definition: logger.h:48
static AuthenticationRegistry & servers()
The server side of the conversation.
Definition: security.cpp:204
void authNZInitialize(const std::string &securityPluginName, const epics::pvData::PVStructure::shared_pointer &data)
Definition: codec.cpp:1663
void setPosition(std::size_t pos)
Definition: byteBuffer.h:357
std::string peer
network address of remote peer. eg. "192.168.1.1:5075".
Definition: security.h:124
pvData
Definition: monitor.h:428
AuthenticationSession::shared_pointer _authSession
Definition: codec.h:425
void send(ByteBuffer *buffer, TransportSendControl *control)
Definition: codec.cpp:1414
virtual void send(epics::pvData::ByteBuffer *buffer, TransportSendControl *control) OVERRIDE FINAL
Definition: codec.cpp:1832
void authNZInitialize(const std::vector< std::string > &offeredSecurityPlugins)
Definition: codec.cpp:1898
Definition: server.h:76
void serialize(ByteBuffer *buffer, SerializableControl *flusher) const
Definition: status.cpp:45
virtual bool isOpen() OVERRIDE FINAL
Definition: codec.cpp:1106
std::tr1::shared_ptr< ServerChannel > getChannel(pvAccessID sid)
Definition: codec.cpp:1486
void getChannels(std::vector< std::tr1::shared_ptr< ServerChannel > > &channels) const
Definition: codec.cpp:1505
epics::pvData::int8 getRevision() const
Definition: codec.h:205
virtual bool isClosed() OVERRIDE FINAL
Definition: codec.h:375
std::tr1::shared_ptr< TimerCallback > TimerCallbackPtr
Definition: timer.h:32
void snapshot(list_t &plugmap) const
Save a copy of the current registry in order of increasing priority.
Definition: security.cpp:211
SecurityPluginMessageTransportSender(PVStructure::const_shared_pointer const &data)
Definition: codec.cpp:1409
static const std::size_t MAX_ENSURE_SIZE
Definition: codec.h:135
#define EPICS_ENDIAN_BIG
Definition: epicsEndian.h:16
epicsMutexId lock
Definition: server.h:82
const std::string & getStackDump() const
Definition: status.h:86
const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE
Definition: pvaConstants.h:47
bool isSuccess() const
Definition: status.h:103
AuthenticationPlugin::shared_pointer lookup(const std::string &name) const
Definition: security.cpp:242
virtual void verified(epics::pvData::Status const &status) OVERRIDE FINAL
Definition: codec.h:511
EPICS_ALWAYS_INLINE void putByte(int8 value)
Definition: byteBuffer.h:525
This class implements a Bytebuffer that is like the java.nio.ByteBuffer.
Definition: byteBuffer.h:233
BSD and SRV5 Unix timestamp.
Definition: epicsTime.h:52
int SOCKET
Definition: osdSock.h:31
virtual int read(epics::pvData::ByteBuffer *dst) OVERRIDE FINAL
Definition: codec.cpp:1316
static const char * StatusTypeName[]
Definition: status.h:45
BlockingServerTCPTransportCodec(Context::shared_pointer const &context, SOCKET channel, ResponseHandler::shared_pointer const &responseHandler, int32_t sendBufferSize, int32_t receiveBufferSize)
Definition: codec.cpp:1434
static void serializeString(const std::string &value, ByteBuffer *buffer, SerializableControl *flusher)
std::size_t getRemaining() const
Definition: byteBuffer.h:391
virtual void authenticationCompleted(epics::pvData::Status const &status, const std::tr1::shared_ptr< PeerInfo > &peer) OVERRIDE FINAL
Definition: codec.cpp:1944
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
const epics::pvData::int8 PVA_SERVER_PROTOCOL_REVISION
Definition: pvaConstants.h:31
virtual epics::pvData::int16 getPriority() const OVERRIDE FINAL
Definition: codec.h:340
#define unlikely(x)
Definition: likely.h:15
#define SOCKERRNO
Definition: osdSock.h:33
#define epicsThreadPriorityCAServerLow
Definition: epicsThread.h:80
virtual std::size_t getReceiveBufferSize() const OVERRIDE FINAL
Definition: codec.h:335
virtual int write(epics::pvData::ByteBuffer *src)=0
virtual void authenticationCompleted(epics::pvData::Status const &status, const std::tr1::shared_ptr< PeerInfo > &peer) OVERRIDE FINAL
Definition: codec.cpp:1629
virtual void lock()
Definition: pvAccess.h:97
#define SOCK_EINTR
Definition: osdSock.h:64
void setEndianess(int byteOrder)
Definition: byteBuffer.h:285
LIBCOM_API void epicsStdCall epicsThreadSleep(double seconds)
Block the calling thread for at least the specified time.
Definition: osdThread.c:790
PeerInfo::const_shared_pointer _peerInfo
Definition: codec.h:428
Definition: caget.c:48
fair_queue< TransportSender > _sendQueue
Definition: codec.h:234
C++ wrapper for epicsThread from EPICS base.
Definition: thread.h:65
epics::pvData::ByteBuffer _sendBuffer
Definition: codec.h:232
bool isOK() const
Definition: status.h:95
void registerChannel(pvAccessID sid, std::tr1::shared_ptr< ServerChannel > const &channel)
Definition: codec.cpp:1468
shared_ptr< T > dynamic_pointer_cast(shared_ptr< U > const &r) BOOST_NOEXCEPT
Definition: shared_ptr.hpp:808
static AuthorizationRegistry & plugins()
Definition: security.cpp:258
int16_t int16
Definition: pvType.h:79
static AuthenticationRegistry & clients()
The client side of the conversation.
Definition: security.cpp:197
std::vector< map_t::mapped_type > list_t
Definition: security.h:239
const epics::pvData::int8 _clientServerFlag
Definition: codec.h:260
const epics::pvData::int16 PVA_DEFAULT_PRIORITY
Definition: pvaConstants.h:70
EPICS time-stamps (epicsTimeStamp), epicsTime C++ class and C functions for handling wall-clock times...
virtual bool acquire(std::tr1::shared_ptr< ClientChannelImpl > const &client) OVERRIDE FINAL
Definition: codec.cpp:1759
#define EPICS_BYTE_ORDER
Definition: osdWireConfig.h:16
virtual bool directSerialize(epics::pvData::ByteBuffer *, const char *, std::size_t, std::size_t) OVERRIDE
Definition: codec.cpp:961
std::string authority
authentication mechanism used. eg. "anonymous" or "gssapi". Must not be empty.
Definition: security.h:126
static size_t num_instances
Definition: remote.h:168
C++ and C descriptions for a thread.
void send(epics::pvData::ByteBuffer *buffer)
Definition: codec.cpp:776
const epics::pvData::int32 MAX_TCP_RECV
Definition: pvaConstants.h:64
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize=0)=0
virtual void alignBuffer(std::size_t alignment) OVERRIDE FINAL
Definition: codec.cpp:580
unsigned transportVersion
If applicable, the protocol minor version number.
Definition: security.h:137
virtual const osiSockAddr * getLastReadBufferSocketAddress()=0
virtual void enqueueSendRequest(TransportSender::shared_pointer const &sender) OVERRIDE FINAL
Definition: codec.cpp:873
StatusType getType() const
Definition: status.h:74
void setLimit(std::size_t limit)
Definition: byteBuffer.h:380
virtual void close()=0
string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex)
BlockingClientTCPTransportCodec(Context::shared_pointer const &context, SOCKET channel, ResponseHandler::shared_pointer const &responseHandler, int32_t sendBufferSize, int32_t receiveBufferSize, std::tr1::shared_ptr< ClientChannelImpl > const &client, epics::pvData::int8 remoteTransportRevision, float heartbeatInterval, int16_t priority)
Definition: codec.cpp:1698
EPICS_ALWAYS_INLINE void putShort(int16 value)
Definition: byteBuffer.h:531
#define false
Definition: flexdef.h:85
int32_t int32
Definition: pvType.h:83
void putControlMessage(epics::pvData::int8 command, epics::pvData::int32 data)
Definition: codec.cpp:617
#define FINAL
Definition: pvAccess.h:48
virtual void setRecipient(osiSockAddr const &sendTo) OVERRIDE FINAL
Definition: codec.cpp:947
virtual void authNZMessage(epics::pvData::PVStructure::shared_pointer const &data) OVERRIDE FINAL
Definition: codec.cpp:1388
#define SHUT_RDWR
Definition: osdSock.h:48
#define IS_LOGGABLE(level)
Definition: logger.h:51
#define SOCK_EWOULDBLOCK
Definition: osdSock.h:51
virtual void verified(epics::pvData::Status const &status) OVERRIDE
Definition: codec.cpp:1373
unsigned epicsStdCall ipAddrToDottedIP(const struct sockaddr_in *paddr, char *pBuf, unsigned bufSize)
Definition: osiSock.c:144
LIBCOM_API epicsThreadId epicsStdCall epicsThreadGetIdSelf(void)
Definition: osdThread.c:810