This is the Unofficial EPICS BASE Doxygen Site
osdMessageQueue.cpp
Go to the documentation of this file.
1 /*************************************************************************\
2 * Copyright (c) 2002 The University of Chicago, as Operator of Argonne
3 * National Laboratory.
4 * Copyright (c) 2002 The Regents of the University of California, as
5 * Operator of Los Alamos National Laboratory.
6 * EPICS BASE Versions 3.13.7
7 * and higher are distributed subject to a Software License Agreement found
8 * in file LICENSE that is included with this distribution.
9 \*************************************************************************/
10 /*
11  * Author W. Eric Norum
12  * norume@aps.anl.gov
13  * 630 252 4793
14  */
15 
16 #include <stdexcept>
17 #include <string.h>
18 #include <stdlib.h>
19 #include <stdio.h>
20 
21 #include "epicsMessageQueue.h"
22 #include <ellLib.h>
23 #include <epicsAssert.h>
24 #include <epicsEvent.h>
25 #include <epicsMutex.h>
26 
27 /*
28  * Event cache
29  */
30 struct eventNode {
33 };
34 
35 /*
36  * List of threads waiting to send or receive a message
37  */
38 struct threadNode {
40  struct eventNode *evp;
41  void *buf;
42  unsigned int size;
43  volatile bool eventSent;
44 };
45 
46 /*
47  * Message info
48  */
54 
56  unsigned long capacity;
57  unsigned long maxMessageSize;
58 
59  unsigned long *buf;
62  volatile char *inPtr;
63  volatile char *outPtr;
64  unsigned long slotSize;
65 
66  bool full;
67 };
68 
70  unsigned int capacity,
71  unsigned int maxMessageSize)
72 {
74  unsigned int slotBytes, slotLongs;
75 
76  if(capacity == 0)
77  return NULL;
78 
79  pmsg = (epicsMessageQueueId)calloc(1, sizeof(*pmsg));
80  if(!pmsg)
81  return NULL;
82 
83  pmsg->capacity = capacity;
84  pmsg->maxMessageSize = maxMessageSize;
85  slotLongs = 1 + ((maxMessageSize + sizeof(unsigned long) - 1) / sizeof(unsigned long));
86  slotBytes = slotLongs * sizeof(unsigned long);
87 
88  pmsg->mutex = epicsMutexCreate();
89  pmsg->buf = (unsigned long*)calloc(pmsg->capacity, slotBytes);
90  if(!pmsg->buf || !pmsg->mutex) {
91  if(pmsg->mutex)
92  epicsMutexDestroy(pmsg->mutex);
93  free(pmsg->buf);
94  free(pmsg);
95  return NULL;
96  }
97 
98  pmsg->inPtr = pmsg->outPtr = pmsg->firstMessageSlot = (char *)&pmsg->buf[0];
99  pmsg->lastMessageSlot = (char *)&pmsg->buf[(capacity - 1) * slotLongs];
100  pmsg->full = false;
101  pmsg->slotSize = slotBytes;
102 
103  ellInit(&pmsg->sendQueue);
104  ellInit(&pmsg->receiveQueue);
105  ellInit(&pmsg->eventFreeList);
106  return pmsg;
107 }
108 
109 static void
110 destroyEventNode(struct eventNode *enode)
111 {
112  epicsEventDestroy(enode->event);
113  free(enode);
114 }
115 
116 LIBCOM_API void epicsStdCall
118 {
119  struct eventNode *evp;
120 
121  while ((evp = reinterpret_cast < struct eventNode * >
122  ( ellGet(&pmsg->eventFreeList) ) ) != NULL) {
123  destroyEventNode(evp);
124  }
125  epicsMutexDestroy(pmsg->mutex);
126  free(pmsg->buf);
127  free(pmsg);
128 }
129 
130 static struct eventNode *
131 getEventNode(epicsMessageQueueId pmsg)
132 {
133  struct eventNode *evp;
134 
135  evp = reinterpret_cast < struct eventNode * > ( ellGet(&pmsg->eventFreeList) );
136  if (evp == NULL) {
137  evp = (struct eventNode *) calloc(1, sizeof(*evp));
138  if (evp) {
140  if (evp->event == NULL) {
141  free(evp);
142  return NULL;
143  }
144  }
145  }
146  return evp;
147 }
148 
149 static void
150 freeEventNode(epicsMessageQueueId pmsg, eventNode *evp, epicsEventStatus status)
151 {
152  if (status == epicsEventWaitTimeout) {
153  epicsEventSignal(evp->event);
154  epicsEventWait(evp->event);
155  }
156  ellAdd(&pmsg->eventFreeList, &evp->link);
157 }
158 
159 static int
160 mySend(epicsMessageQueueId pmsg, void *message, unsigned int size,
161  double timeout)
162 {
163  char *myInPtr, *nextPtr;
164  struct threadNode *pthr;
165 
166  if(size > pmsg->maxMessageSize)
167  return -1;
168 
169  /*
170  * See if message can be sent
171  */
172  epicsMutexMustLock(pmsg->mutex);
173 
174  if ((pmsg->numberOfSendersWaiting > 0)
175  || (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL))) {
176  /*
177  * Return if not allowed to wait. NB -1 means wait forever.
178  */
179  if (timeout == 0) {
180  epicsMutexUnlock(pmsg->mutex);
181  return -1;
182  }
183 
184  /*
185  * Indicate that we're waiting
186  */
187  struct threadNode threadNode;
188  threadNode.evp = getEventNode(pmsg);
189  threadNode.eventSent = false;
190  if (!threadNode.evp) {
191  epicsMutexUnlock(pmsg->mutex);
192  return -1;
193  }
194 
195  ellAdd(&pmsg->sendQueue, &threadNode.link);
196  pmsg->numberOfSendersWaiting++;
197 
198  epicsMutexUnlock(pmsg->mutex);
199 
200  /*
201  * Wait for receiver to wake us
202  */
203  epicsEventStatus status = timeout < 0 ?
204  epicsEventWait(threadNode.evp->event) :
205  epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
206 
207  epicsMutexMustLock(pmsg->mutex);
208 
209  if (!threadNode.eventSent) {
210  /* Receiver didn't take us off the sendQueue, do it ourselves */
211  ellDelete(&pmsg->sendQueue, &threadNode.link);
212  pmsg->numberOfSendersWaiting--;
213  }
214 
215  freeEventNode(pmsg, threadNode.evp, status);
216 
217  if (pmsg->full && (ellFirst(&pmsg->receiveQueue) == NULL)) {
218  /* State of the queue didn't change, exit */
219  epicsMutexUnlock(pmsg->mutex);
220  return -1;
221  }
222  }
223 
224  /*
225  * Copy message to waiting receiver
226  */
227  if ((pthr = reinterpret_cast < struct threadNode * >
228  ( ellGet(&pmsg->receiveQueue) ) ) != NULL) {
229  if(size <= pthr->size)
230  memcpy(pthr->buf, message, size);
231  pthr->size = size;
232  pthr->eventSent = true;
233  epicsEventSignal(pthr->evp->event);
234  epicsMutexUnlock(pmsg->mutex);
235  return 0;
236  }
237 
238  /*
239  * Copy to queue
240  */
241  myInPtr = (char *)pmsg->inPtr;
242  if (myInPtr == pmsg->lastMessageSlot)
243  nextPtr = pmsg->firstMessageSlot;
244  else
245  nextPtr = myInPtr + pmsg->slotSize;
246  if (nextPtr == (char *)pmsg->outPtr)
247  pmsg->full = true;
248  *(volatile unsigned long *)myInPtr = size;
249  memcpy((unsigned long *)myInPtr + 1, message, size);
250  pmsg->inPtr = nextPtr;
251  epicsMutexUnlock(pmsg->mutex);
252  return 0;
253 }
254 
255 LIBCOM_API int epicsStdCall
257  unsigned int size)
258 {
259  return mySend(pmsg, message, size, 0);
260 }
261 
262 LIBCOM_API int epicsStdCall
264  unsigned int size)
265 {
266  return mySend(pmsg, message, size, -1);
267 }
268 
269 LIBCOM_API int epicsStdCall
271  unsigned int size, double timeout)
272 {
273  return mySend(pmsg, message, size, timeout);
274 }
275 
276 static int
277 myReceive(epicsMessageQueueId pmsg, void *message, unsigned int size,
278  double timeout)
279 {
280  char *myOutPtr;
281  unsigned long l;
282  struct threadNode *pthr;
283 
284  /*
285  * If there's a message on the queue, copy it
286  */
287  epicsMutexMustLock(pmsg->mutex);
288 
289  myOutPtr = (char *)pmsg->outPtr;
290  if ((myOutPtr != pmsg->inPtr) || pmsg->full) {
291  int ret;
292  l = *(unsigned long *)myOutPtr;
293  if (l <= size) {
294  memcpy(message, (unsigned long *)myOutPtr + 1, l);
295  ret = l;
296  }
297  else {
298  ret = -1;
299  }
300  if (myOutPtr == pmsg->lastMessageSlot)
301  pmsg->outPtr = pmsg->firstMessageSlot;
302  else
303  pmsg->outPtr += pmsg->slotSize;
304  pmsg->full = false;
305 
306  /*
307  * Wake up the oldest task waiting to send
308  */
309  if ((pthr = reinterpret_cast < struct threadNode * >
310  ( ellGet(&pmsg->sendQueue) ) ) != NULL) {
311  pmsg->numberOfSendersWaiting--;
312  pthr->eventSent = true;
313  epicsEventSignal(pthr->evp->event);
314  }
315  epicsMutexUnlock(pmsg->mutex);
316  return ret;
317  }
318 
319  /*
320  * Return if not allowed to wait. NB -1 means wait forever.
321  */
322  if (timeout == 0) {
323  epicsMutexUnlock(pmsg->mutex);
324  return -1;
325  }
326 
327  /*
328  * Indicate that we're waiting
329  */
330  struct threadNode threadNode;
331  threadNode.evp = getEventNode(pmsg);
332  threadNode.buf = message;
333  threadNode.size = size;
334  threadNode.eventSent = false;
335 
336  if (!threadNode.evp) {
337  epicsMutexUnlock(pmsg->mutex);
338  return -1;
339  }
340 
341  ellAdd(&pmsg->receiveQueue, &threadNode.link);
342 
343  /*
344  * Wake up the oldest task waiting to send
345  */
346  if ((pthr = reinterpret_cast < struct threadNode * >
347  ( ellGet(&pmsg->sendQueue) ) ) != NULL) {
348  pmsg->numberOfSendersWaiting--;
349  pthr->eventSent = true;
350  epicsEventSignal(pthr->evp->event);
351  }
352 
353  epicsMutexUnlock(pmsg->mutex);
354 
355  /*
356  * Wait for a message to arrive
357  */
358  epicsEventStatus status = timeout < 0 ?
359  epicsEventWait(threadNode.evp->event) :
360  epicsEventWaitWithTimeout(threadNode.evp->event, timeout);
361 
362  epicsMutexMustLock(pmsg->mutex);
363 
364  if (!threadNode.eventSent)
365  ellDelete(&pmsg->receiveQueue, &threadNode.link);
366 
367  freeEventNode(pmsg, threadNode.evp, status);
368 
369  epicsMutexUnlock(pmsg->mutex);
370 
371  if (threadNode.eventSent && (threadNode.size <= size))
372  return threadNode.size;
373  return -1;
374 }
375 
376 LIBCOM_API int epicsStdCall
378  unsigned int size)
379 {
380  return myReceive(pmsg, message, size, 0);
381 }
382 
383 LIBCOM_API int epicsStdCall
385  unsigned int size)
386 {
387  return myReceive(pmsg, message, size, -1);
388 }
389 
390 LIBCOM_API int epicsStdCall
392  unsigned int size, double timeout)
393 {
394  return myReceive(pmsg, message, size, timeout);
395 }
396 
397 LIBCOM_API int epicsStdCall
399 {
400  char *myInPtr, *myOutPtr;
401  int nmsg;
402 
403  epicsMutexMustLock(pmsg->mutex);
404  myInPtr = (char *)pmsg->inPtr;
405  myOutPtr = (char *)pmsg->outPtr;
406  if (pmsg->full)
407  nmsg = pmsg->capacity;
408  else if (myInPtr >= myOutPtr)
409  nmsg = (myInPtr - myOutPtr) / pmsg->slotSize;
410  else
411  nmsg = pmsg->capacity - (myOutPtr - myInPtr) / pmsg->slotSize;
412  epicsMutexUnlock(pmsg->mutex);
413  return nmsg;
414 }
415 
416 LIBCOM_API void epicsStdCall
418 {
419  printf("Message Queue Used:%d Slots:%lu",
420  epicsMessageQueuePending(pmsg), pmsg->capacity);
421  if (level >= 1)
422  printf(" Maximum size:%lu", pmsg->maxMessageSize);
423  printf("\n");
424 }
double timeout
Definition: pvutils.cpp:25
struct eventNode * evp
LIBCOM_API int epicsStdCall epicsMessageQueueTryReceive(epicsMessageQueueId pmsg, void *message, unsigned int size)
Try to receive a message.
volatile char * inPtr
void epicsStdCall epicsMutexDestroy(epicsMutexId pmutexNode)
Destroy an epicsMutex semaphore.
Definition: epicsMutex.cpp:127
pvd::Status status
An EPICS-specific replacement for ANSI C's assert.
LIBCOM_API int epicsStdCall epicsMessageQueueSendWithTimeout(epicsMessageQueueId pmsg, void *message, unsigned int size, double timeout)
Send a message or timeout.
volatile char * outPtr
epicsEventId event
LIBCOM_API int epicsStdCall epicsMessageQueueTrySend(epicsMessageQueueId pmsg, void *message, unsigned int size)
Try to send a message.
#define printf
Definition: epicsStdio.h:41
#define epicsEventWait(ID)
Definition: osdEvent.h:19
ELLNODE * ellGet(ELLLIST *pList)
Deletes and returns the first node from a list.
Definition: ellLib.c:147
struct epicsMessageQueueOSD * epicsMessageQueueId
#define NULL
Definition: catime.c:38
LIBCOM_API epicsEventStatus epicsEventWaitWithTimeout(epicsEventId id, double timeOut)
Wait on the event or until the specified timeout period is over.
Definition: osdEvent.c:117
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
A doubly-linked list library.
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
LIBCOM_API epicsMessageQueueId epicsStdCall epicsMessageQueueCreate(unsigned int capacity, unsigned int maxMessageSize)
Create a message queue.
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
APIs for the epicsMutex mutual exclusion semaphore.
LIBCOM_API void epicsEventDestroy(epicsEventId id)
Destroy an epicsEvent and any resources it holds.
Definition: osdEvent.c:70
LIBCOM_API void epicsStdCall epicsMessageQueueDestroy(epicsMessageQueueId pmsg)
Destroy a message queue, release all its memory.
List node type.
Definition: ellLib.h:45
APIs for the epicsEvent binary semaphore.
LIBCOM_API int epicsStdCall epicsMessageQueueReceive(epicsMessageQueueId pmsg, void *message, unsigned int size)
Fetch the next message on the queue.
unsigned long maxMessageSize
epicsEventStatus
Return status from several C API routines.
Definition: epicsEvent.h:49
A C++ and a C facility for communication between threads.
#define ellInit(PLIST)
Initialize a list type.
Definition: ellLib.h:76
#define epicsMutexCreate()
Create an epicsMutex semaphore for use from C code.
Definition: epicsMutex.h:168
LIBCOM_API int epicsStdCall epicsMessageQueueSend(epicsMessageQueueId pmsg, void *message, unsigned int size)
Send a message.
if(yy_init)
Definition: scan.c:972
LIBCOM_API int epicsStdCall epicsMessageQueueReceiveWithTimeout(epicsMessageQueueId pmsg, void *message, unsigned int size, double timeout)
Wait for a message to be queued.
volatile bool eventSent
List header type.
Definition: ellLib.h:56
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int size
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
LIBCOM_API epicsEventId epicsEventCreate(epicsEventInitialState initialState)
Create an epicsEvent for use from C code, or return NULL.
Definition: osdEvent.c:47
LIBCOM_API void epicsStdCall epicsMessageQueueShow(epicsMessageQueueId pmsg, int level)
Displays some information about the message queue.
LIBCOM_API int epicsStdCall epicsMessageQueuePending(epicsMessageQueueId pmsg)
How many messages are queued.
#define ellFirst(PLIST)
Find the first node in list.
Definition: ellLib.h:89