This is Unofficial EPICS BASE Doxygen Site
comQueSend.cpp
Go to the documentation of this file.
1 /*************************************************************************\
2 * Copyright (c) 2002 The University of Chicago, as Operator of Argonne
3 * National Laboratory.
4 * Copyright (c) 2002 The Regents of the University of California, as
5 * Operator of Los Alamos National Laboratory.
6 * EPICS BASE is distributed subject to a Software License Agreement found
7 * in file LICENSE that is included with this distribution.
8 \*************************************************************************/
9 
10 /*
11  *
12  *
13  * L O S A L A M O S
14  * Los Alamos National Laboratory
15  * Los Alamos, New Mexico 87545
16  *
17  * Copyright, 1986, The Regents of the University of California.
18  *
19  *
20  * Author Jeffrey O. Hill
21  * johill@lanl.gov
22  */
23 
24 //
25 // Requirements:
26 // 1) Allow sufficent headroom so that users will be able to perform
27 // a reasonable amount of IO within CA callbacks without experiencing
28 // a push/pull deadlock. If a potential push/pull deadlock situation
29 // occurs then detect and avoid it and provide diagnotic to the user
30 // via special status.
31 // 2) Return status to the user when there is insufficent memory to
32 // queue a complete message.
33 // 3) return status to the user when a message cant be flushed because
34 // a connection dropped.
35 // 4) Do not allocate too much memory in exception situatons (such as
36 // after a circuit disconnect).
37 // 5) Avoid allocating more memory than is absolutely necessary to meet
38 // the above requirements.
39 // 6) Message fragments must never be sent to the IOC when there isnt
40 // enough memory to queue part of a message (we also must not force
41 // a disconnect because the client is starved for memory).
42 // 7) avoid the need to check status for each byte pushed into the
43 // protocol stream.
44 //
45 // Implementation:
46 // 1) When queuing a complete message, first test to see if a flush is
47 // required. If it is a receive thread scheduals the flush with the
48 // send thread, and otherwise directly execute the system call. The
49 // send thread must run at a higher priority than the receive thread
50 // if we are to minimize memory consumption.
51 // 2) Preallocate space for the entire message prior to copying in the
52 // message so that message fragments are not flushed out just prior
53 // to detecting that memory is unavailable.
54 // 3) Return a special error constant when the following situations
55 // are detected when the user is attempting to queue a request
56 // from within a user callback executed by a receive thread:
57 // a) A user is queuing more requests that demand a response from a
58 // callback than are removed by the response that initiated the
59 // callback, and this situation persists for many callbacks until
60 // all buffering in the system is exausted.
61 // b) A user is queuing many requests that demand a response from one
62 // callback until all buffering in the system is exausted.
63 // c) Some combination of both (a) nad (b).
64 //
65 //
66 
67 #define epicsAssertAuthor "Jeff Hill johill@lanl.gov"
68 
69 #include "iocinf.h"
70 #include "virtualCircuit.h"
71 #include "db_access.h" // for dbr_short_t etc
72 
73 // nill message alignment pad bytes
74 const char cacNillBytes [] =
75 {
76  0, 0, 0, 0,
77  0, 0, 0, 0
78 };
79 
81  comBufMemoryManager & comBufMemMgrIn ):
82  comBufMemMgr ( comBufMemMgrIn ), wire ( wireIn ),
83  nBytesPending ( 0u )
84 {
85 }
86 
88 {
89  this->clear ();
90 }
91 
93 {
94  comBuf *pBuf;
95 
96  while ( ( pBuf = this->bufs.get () ) ) {
97  this->nBytesPending -= pBuf->occupiedBytes ();
98  pBuf->~comBuf ();
99  this->comBufMemMgr.release ( pBuf );
100  }
101  this->pFirstUncommited = tsDLIter < comBuf > ();
102  assert ( this->nBytesPending == 0 );
103 }
104 
105 void comQueSend::copy_dbr_string ( const void * pValue )
106 {
107  this->push ( static_cast < const char * > ( pValue ), MAX_STRING_SIZE );
108 }
109 
110 void comQueSend::copy_dbr_short ( const void * pValue )
111 {
112  this->push ( * static_cast <const dbr_short_t *> ( pValue ) );
113 }
114 
115 void comQueSend::copy_dbr_float ( const void * pValue )
116 {
117  this->push ( * static_cast <const dbr_float_t *> ( pValue ) );
118 }
119 
120 void comQueSend::copy_dbr_char ( const void * pValue )
121 {
122  this->push ( * static_cast <const dbr_char_t *> ( pValue ) );
123 }
124 
125 void comQueSend::copy_dbr_long ( const void * pValue )
126 {
127  this->push ( * static_cast <const dbr_long_t *> ( pValue ) );
128 }
129 
130 void comQueSend::copy_dbr_double ( const void * pValue )
131 {
132  this->push ( * static_cast <const dbr_double_t *> ( pValue ) );
133 }
134 
135 void comQueSend::copy_dbr_invalid ( const void * )
136 {
137  throw cacChannel::badType ();
138 }
139 
140 const comQueSend::copyScalarFunc_t comQueSend::dbrCopyScalar [39] = {
141  &comQueSend::copy_dbr_string,
142  &comQueSend::copy_dbr_short,
143  &comQueSend::copy_dbr_float,
144  &comQueSend::copy_dbr_short, // DBR_ENUM
145  &comQueSend::copy_dbr_char,
146  &comQueSend::copy_dbr_long,
147  &comQueSend::copy_dbr_double,
148  &comQueSend::copy_dbr_invalid, // DBR_STS_SHORT
149  &comQueSend::copy_dbr_invalid, // DBR_STS_FLOAT
150  &comQueSend::copy_dbr_invalid, // DBR_STS_ENUM
151  &comQueSend::copy_dbr_invalid, // DBR_STS_CHAR
152  &comQueSend::copy_dbr_invalid, // DBR_STS_LONG
153  &comQueSend::copy_dbr_invalid, // DBR_STS_DOUBLE
154  &comQueSend::copy_dbr_invalid, // DBR_TIME_STRING
155  &comQueSend::copy_dbr_invalid, // DBR_TIME_INT
156  &comQueSend::copy_dbr_invalid, // DBR_TIME_SHORT
157  &comQueSend::copy_dbr_invalid, // DBR_TIME_FLOAT
158  &comQueSend::copy_dbr_invalid, // DBR_TIME_ENUM
159  &comQueSend::copy_dbr_invalid, // DBR_TIME_CHAR
160  &comQueSend::copy_dbr_invalid, // DBR_TIME_LONG
161  &comQueSend::copy_dbr_invalid, // DBR_TIME_DOUBLE
162  &comQueSend::copy_dbr_invalid, // DBR_GR_STRING
163  &comQueSend::copy_dbr_invalid, // DBR_GR_SHORT
164  &comQueSend::copy_dbr_invalid, // DBR_GR_FLOAT
165  &comQueSend::copy_dbr_invalid, // DBR_GR_ENUM
166  &comQueSend::copy_dbr_invalid, // DBR_GR_CHAR
167  &comQueSend::copy_dbr_invalid, // DBR_GR_LONG
168  &comQueSend::copy_dbr_invalid, // DBR_GR_DOUBLE
169  &comQueSend::copy_dbr_invalid, // DBR_CTRL_STRING
170  &comQueSend::copy_dbr_invalid, // DBR_CTRL_SHORT
171  &comQueSend::copy_dbr_invalid, // DBR_CTRL_FLOAT
172  &comQueSend::copy_dbr_invalid, // DBR_CTRL_ENUM
173  &comQueSend::copy_dbr_invalid, // DBR_CTRL_CHAR
174  &comQueSend::copy_dbr_invalid, // DBR_CTRL_LONG
175  &comQueSend::copy_dbr_invalid, // DBR_CTRL_DOUBLE
176  &comQueSend::copy_dbr_short, // DBR_PUT_ACKT
177  &comQueSend::copy_dbr_short, // DBR_PUT_ACKS
178  &comQueSend::copy_dbr_invalid, // DBR_STSACK_STRING
179  &comQueSend::copy_dbr_invalid // DBR_CLASS_NAME
180 };
181 
182 void comQueSend::copy_dbr_string ( const void *pValue, unsigned nElem )
183 {
184  this->push ( static_cast < const char * > ( pValue ), nElem * MAX_STRING_SIZE );
185 }
186 
187 void comQueSend::copy_dbr_short ( const void *pValue, unsigned nElem )
188 {
189  this->push ( static_cast <const dbr_short_t *> ( pValue ), nElem );
190 }
191 
192 void comQueSend::copy_dbr_float ( const void *pValue, unsigned nElem )
193 {
194  this->push ( static_cast <const dbr_float_t *> ( pValue ), nElem );
195 }
196 
197 void comQueSend::copy_dbr_char ( const void *pValue, unsigned nElem )
198 {
199  this->push ( static_cast <const dbr_char_t *> ( pValue ), nElem );
200 }
201 
202 void comQueSend::copy_dbr_long ( const void *pValue, unsigned nElem )
203 {
204  this->push ( static_cast <const dbr_long_t *> ( pValue ), nElem );
205 }
206 
207 void comQueSend::copy_dbr_double ( const void *pValue, unsigned nElem )
208 {
209  this->push ( static_cast <const dbr_double_t *> ( pValue ), nElem );
210 }
211 
212 void comQueSend::copy_dbr_invalid ( const void *, unsigned )
213 {
214  throw cacChannel::badType ();
215 }
216 
217 const comQueSend::copyVectorFunc_t comQueSend::dbrCopyVector [39] = {
218  &comQueSend::copy_dbr_string,
219  &comQueSend::copy_dbr_short,
220  &comQueSend::copy_dbr_float,
221  &comQueSend::copy_dbr_short, // DBR_ENUM
222  &comQueSend::copy_dbr_char,
223  &comQueSend::copy_dbr_long,
224  &comQueSend::copy_dbr_double,
225  &comQueSend::copy_dbr_invalid, // DBR_STS_SHORT
226  &comQueSend::copy_dbr_invalid, // DBR_STS_FLOAT
227  &comQueSend::copy_dbr_invalid, // DBR_STS_ENUM
228  &comQueSend::copy_dbr_invalid, // DBR_STS_CHAR
229  &comQueSend::copy_dbr_invalid, // DBR_STS_LONG
230  &comQueSend::copy_dbr_invalid, // DBR_STS_DOUBLE
231  &comQueSend::copy_dbr_invalid, // DBR_TIME_STRING
232  &comQueSend::copy_dbr_invalid, // DBR_TIME_INT
233  &comQueSend::copy_dbr_invalid, // DBR_TIME_SHORT
234  &comQueSend::copy_dbr_invalid, // DBR_TIME_FLOAT
235  &comQueSend::copy_dbr_invalid, // DBR_TIME_ENUM
236  &comQueSend::copy_dbr_invalid, // DBR_TIME_CHAR
237  &comQueSend::copy_dbr_invalid, // DBR_TIME_LONG
238  &comQueSend::copy_dbr_invalid, // DBR_TIME_DOUBLE
239  &comQueSend::copy_dbr_invalid, // DBR_GR_STRING
240  &comQueSend::copy_dbr_invalid, // DBR_GR_SHORT
241  &comQueSend::copy_dbr_invalid, // DBR_GR_FLOAT
242  &comQueSend::copy_dbr_invalid, // DBR_GR_ENUM
243  &comQueSend::copy_dbr_invalid, // DBR_GR_CHAR
244  &comQueSend::copy_dbr_invalid, // DBR_GR_LONG
245  &comQueSend::copy_dbr_invalid, // DBR_GR_DOUBLE
246  &comQueSend::copy_dbr_invalid, // DBR_CTRL_STRING
247  &comQueSend::copy_dbr_invalid, // DBR_CTRL_SHORT
248  &comQueSend::copy_dbr_invalid, // DBR_CTRL_FLOAT
249  &comQueSend::copy_dbr_invalid, // DBR_CTRL_ENUM
250  &comQueSend::copy_dbr_invalid, // DBR_CTRL_CHAR
251  &comQueSend::copy_dbr_invalid, // DBR_CTRL_LONG
252  &comQueSend::copy_dbr_invalid, // DBR_CTRL_DOUBLE
253  &comQueSend::copy_dbr_short, // DBR_PUT_ACKT
254  &comQueSend::copy_dbr_short, // DBR_PUT_ACKS
255  &comQueSend::copy_dbr_invalid, // DBR_STSACK_STRING
256  &comQueSend::copy_dbr_invalid // DBR_CLASS_NAME
257 };
258 
260 {
261  comBuf *pBuf = this->bufs.get ();
262  if ( pBuf ) {
263  unsigned nBytesThisBuf = pBuf->occupiedBytes ();
264  if ( nBytesThisBuf ) {
265  assert ( this->nBytesPending >= nBytesThisBuf );
266  this->nBytesPending -= nBytesThisBuf;
267  }
268  else {
269  this->bufs.push ( *pBuf );
270  pBuf = 0;
271  }
272  }
273  else {
274  assert ( this->nBytesPending == 0u );
275  }
276  return pBuf;
277 }
278 
280  ca_uint16_t request, ca_uint32_t payloadSize,
281  ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid,
282  ca_uint32_t requestDependent, bool v49Ok )
283 {
284  if ( payloadSize < 0xffff && nElem < 0xffff ) {
285  comBuf * pComBuf = this->bufs.last ();
286  if ( ! pComBuf || pComBuf->unoccupiedBytes() < 16u ) {
287  pComBuf = newComBuf ();
288  this->pushComBuf ( *pComBuf );
289  }
290  pComBuf->push ( request );
291  pComBuf->push ( static_cast < ca_uint16_t > ( payloadSize ) );
292  pComBuf->push ( dataType );
293  pComBuf->push ( static_cast < ca_uint16_t > ( nElem ) );
294  pComBuf->push ( cid );
295  pComBuf->push ( requestDependent );
296  }
297  else if ( v49Ok ) {
298  comBuf * pComBuf = this->bufs.last ();
299  if ( ! pComBuf || pComBuf->unoccupiedBytes() < 24u ) {
300  pComBuf = newComBuf ();
301  this->pushComBuf ( *pComBuf );
302  }
303  pComBuf->push ( request );
304  pComBuf->push ( static_cast < ca_uint16_t > ( 0xffff ) );
305  pComBuf->push ( dataType );
306  pComBuf->push ( static_cast < ca_uint16_t > ( 0u ) );
307  pComBuf->push ( cid );
308  pComBuf->push ( requestDependent );
309  pComBuf->push ( payloadSize );
310  pComBuf->push ( nElem );
311  }
312  else {
313  throw cacChannel::outOfBounds ();
314  }
315 }
316 
318  ca_uint16_t request, unsigned dataType, arrayElementCount nElem,
319  ca_uint32_t cid, ca_uint32_t requestDependent,
320  const void * pPayload, bool v49Ok )
321 {
322  if ( INVALID_DB_REQ ( dataType ) ) {
323  throw cacChannel::badType ();
324  }
325  if ( dataType >= comQueSendCopyDispatchSize ) {
326  throw cacChannel::badType();
327  }
328  ca_uint32_t size = 0u;
329  ca_uint32_t payloadSize = 0u;
330  if ( nElem == 1 ) {
331  if ( dataType == DBR_STRING ) {
332  const char * pStr = static_cast < const char * > ( pPayload );
333  size = strlen ( pStr ) + 1u;
334  if ( size > MAX_STRING_SIZE ) {
335  throw cacChannel::outOfBounds();
336  }
337  payloadSize = CA_MESSAGE_ALIGN ( size );
338  this->insertRequestHeader ( request, payloadSize,
339  static_cast <ca_uint16_t> ( dataType ),
340  nElem, cid, requestDependent, v49Ok );
341  this->pushString ( pStr, size );
342  }
343  else {
344  size = dbr_size[dataType];
345  payloadSize = CA_MESSAGE_ALIGN ( size );
346  this->insertRequestHeader ( request, payloadSize,
347  static_cast <ca_uint16_t> ( dataType ),
348  nElem, cid, requestDependent, v49Ok );
349  ( this->*dbrCopyScalar [dataType] ) ( pPayload );
350  }
351  }
352  else {
353  arrayElementCount maxBytes;
354  if ( v49Ok ) {
355  maxBytes = 0xffffffff;
356  }
357  else {
358  maxBytes = MAX_TCP - sizeof ( caHdr );
359  }
360  arrayElementCount maxElem =
361  ( maxBytes - sizeof (dbr_double_t) - dbr_size[dataType] ) /
362  dbr_value_size[dataType];
363  if ( nElem >= maxElem ) {
364  throw cacChannel::outOfBounds();
365  }
366  // the above checks verify that the total size
367  // is lest that 0xffffffff
368  size = static_cast < ca_uint32_t >
369  ( dbr_size_n ( dataType, nElem ) );
370  payloadSize = CA_MESSAGE_ALIGN ( size );
371  this->insertRequestHeader ( request, payloadSize,
372  static_cast <ca_uint16_t> ( dataType ),
373  static_cast < ca_uint32_t > ( nElem ),
374  cid, requestDependent, v49Ok );
375  ( this->*dbrCopyVector [dataType] ) ( pPayload, nElem );
376  }
377  // set pad bytes to nill
378  unsigned padSize = payloadSize - size;
379  if ( padSize ) {
380  this->pushString ( cacNillBytes, payloadSize - size );
381  }
382 }
383 
384 void comQueSend::commitMsg ()
385 {
386  while ( this->pFirstUncommited.valid() ) {
387  this->nBytesPending += this->pFirstUncommited->uncommittedBytes ();
388  this->pFirstUncommited->commitIncomming ();
389  this->pFirstUncommited++;
390  }
391  // printf ( "NBP: %u\n", this->nBytesPending );
392 }
393 
394 
395 void comQueSend::clearUncommitedMsg ()
396 {
397  while ( this->pFirstUncommited.valid() ) {
398  tsDLIter < comBuf > next = this->pFirstUncommited;
399  next++;
400  this->pFirstUncommited->clearUncommittedIncomming ();
401  if ( this->pFirstUncommited->occupiedBytes() == 0u ) {
402  this->bufs.remove ( *this->pFirstUncommited );
403  this->pFirstUncommited->~comBuf ();
404  this->comBufMemMgr.release ( this->pFirstUncommited.pointer() );
405  }
406  this->pFirstUncommited = next;
407  }
408 }
409 
T * pointer() const
Definition: tsDLList.h:678
unsigned push(comBuf &)
Definition: comBuf.h:162
#define DBR_STRING
Definition: db_access.h:69
#define MAX_TCP
Definition: caProto.h:63
const unsigned short dbr_value_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:896
const unsigned short dbr_size[LAST_BUFFER_TYPE+1]
Definition: access.cpp:847
void commitIncomming()
Definition: comBuf.h:242
std::string request
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
void pushString(const char *pVal, unsigned nChar)
Definition: comQueSend.h:203
unsigned int ca_uint32_t
Definition: caProto.h:76
#define INVALID_DB_REQ(x)
Definition: db_access.h:115
unsigned uncommittedBytes() const
Definition: comBuf.h:157
unsigned short ca_uint16_t
Definition: caProto.h:75
bool valid() const
Definition: tsDLList.h:607
comQueSend(wireSendAdapter &, comBufMemoryManager &)
Definition: comQueSend.cpp:80
void clearUncommittedIncomming()
Definition: comBuf.h:247
T * last(void) const
Definition: tsDLList.h:199
#define CA_MESSAGE_ALIGN(A)
Definition: caProto.h:154
const char cacNillBytes[]
Definition: comQueSend.cpp:74
unsigned occupiedBytes() const
Definition: comBuf.h:152
epicsFloat64 dbr_double_t
Definition: db_access.h:47
struct ca_hdr caHdr
void insertRequestWithPayLoad(ca_uint16_t request, unsigned dataType, arrayElementCount nElem, ca_uint32_t cid, ca_uint32_t requestDependent, const void *pPayload, bool v49Ok)
Definition: comQueSend.cpp:317
unsigned long arrayElementCount
Definition: cacIO.h:57
comBuf * popNextComBufToSend()
Definition: comQueSend.cpp:259
T * get()
Definition: tsDLList.h:261
Definition: comBuf.h:75
void push(T &item)
Definition: tsDLList.h:416
#define MAX_STRING_SIZE
Definition: epicsTypes.h:65
#define dbr_size_n(TYPE, COUNT)
Definition: db_access.h:518
unsigned unoccupiedBytes() const
Definition: comBuf.h:147
void insertRequestHeader(ca_uint16_t request, ca_uint32_t payloadSize, ca_uint16_t dataType, ca_uint32_t nElem, ca_uint32_t cid, ca_uint32_t requestDependent, bool v49Ok)
Definition: comQueSend.cpp:279
void remove(T &item)
Definition: tsDLList.h:219
#define comQueSendCopyDispatchSize
Definition: comQueSend.h:34
void clear()
Definition: comQueSend.cpp:92
virtual void release(void *)=0