This is Unofficial EPICS BASE Doxygen Site
threadPool.c File Reference
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include "dbDefs.h"
#include "errlog.h"
#include "ellLib.h"
#include "epicsThread.h"
#include "epicsMutex.h"
#include "epicsEvent.h"
#include "epicsInterrupt.h"
#include "cantProceed.h"
#include "epicsThreadPool.h"
#include "poolPriv.h"
+ Include dependency graph for threadPool.c:

Go to the source code of this file.

Functions

void epicsThreadPoolConfigDefaults (epicsThreadPoolConfig *opts)
 
epicsThreadPoolepicsThreadPoolCreate (epicsThreadPoolConfig *opts)
 
void epicsThreadPoolControl (epicsThreadPool *pool, epicsThreadPoolOption opt, unsigned int val)
 
int epicsThreadPoolWait (epicsThreadPool *pool, double timeout)
 
void epicsThreadPoolDestroy (epicsThreadPool *pool)
 
void epicsThreadPoolReport (epicsThreadPool *pool, FILE *fd)
 
unsigned int epicsThreadPoolNThreads (epicsThreadPool *pool)
 
LIBCOM_API epicsThreadPoolepicsThreadPoolGetShared (epicsThreadPoolConfig *opts)
 
LIBCOM_API void epicsThreadPoolReleaseShared (epicsThreadPool *pool)
 

Function Documentation

void epicsThreadPoolConfigDefaults ( epicsThreadPoolConfig opts)

Definition at line 26 of file threadPool.c.

27 {
28  memset(opts, 0, sizeof(*opts));
31 
35 }
#define epicsThreadPriorityMedium
Definition: epicsThread.h:76
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
LIBCOM_API int epicsThreadGetCPUs(void)
Definition: osdThread.c:990
LIBCOM_API epicsThreadBooleanStatus epicsStdCall epicsThreadLowestPriorityLevelAbove(unsigned int priority, unsigned *pPriorityJustAbove)
Definition: osdThread.c:757
unsigned int workerStack
unsigned int workerPriority
#define epicsThreadPriorityCAServerHigh
Definition: epicsThread.h:81
void epicsThreadPoolControl ( epicsThreadPool pool,
epicsThreadPoolOption  opt,
unsigned int  val 
)

Definition at line 152 of file threadPool.c.

153 {
154  epicsMutexMustLock(pool->guard);
155  epicsThreadPoolControlImpl(pool, opt, val);
156  epicsMutexUnlock(pool->guard);
157 }
epicsMutexId guard
Definition: poolPriv.h:56
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
epicsThreadPool* epicsThreadPoolCreate ( epicsThreadPoolConfig opts)

Definition at line 37 of file threadPool.c.

38 {
39  size_t i;
40  epicsThreadPool *pool;
41 
42  /* caller likely didn't initialize the options structure */
43  if (opts && opts->maxThreads == 0) {
44  errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized");
45  return NULL;
46  }
47 
48  pool = calloc(1, sizeof(*pool));
49  if (!pool)
50  return NULL;
51 
52  if (opts)
53  memcpy(&pool->conf, opts, sizeof(*opts));
54  else
56 
57  if (pool->conf.initialThreads > pool->conf.maxThreads)
58  pool->conf.initialThreads = pool->conf.maxThreads;
59 
63  pool->guard = epicsMutexCreate();
64 
65  if (!pool->workerWakeup || !pool->shutdownEvent ||
66  !pool->observerWakeup || !pool->guard)
67  goto cleanup;
68 
69  ellInit(&pool->jobs);
70  ellInit(&pool->owned);
71 
73 
74  for (i = 0; i < pool->conf.initialThreads; i++) {
75  createPoolThread(pool);
76  }
77 
78  if (pool->threadsRunning == 0 && pool->conf.initialThreads != 0) {
79  epicsMutexUnlock(pool->guard);
80  errlogPrintf("Error: Unable to create any threads for thread pool\n");
81  goto cleanup;
82 
83  }
84  else if (pool->threadsRunning < pool->conf.initialThreads) {
85  errlogPrintf("Warning: Unable to create all threads for thread pool (%u/%u)\n",
86  pool->threadsRunning, pool->conf.initialThreads);
87  }
88 
89  epicsMutexUnlock(pool->guard);
90 
91  return pool;
92 
93 cleanup:
94  if (pool->workerWakeup)
96  if (pool->shutdownEvent)
98  if (pool->observerWakeup)
100  if (pool->guard)
101  epicsMutexDestroy(pool->guard);
102 
103  free(pool);
104  return NULL;
105 }
epicsEventId workerWakeup
Definition: poolPriv.h:42
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
Definition: threadPool.c:26
void epicsStdCall epicsMutexDestroy(epicsMutexId pmutexNode)
Destroy an epicsMutex semaphore.
Definition: epicsMutex.cpp:127
int i
Definition: scan.c:967
int createPoolThread(epicsThreadPool *pool)
Definition: poolJob.c:117
ELLLIST jobs
Definition: poolPriv.h:21
#define NULL
Definition: catime.c:38
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
unsigned int initialThreads
LIBCOM_API void epicsEventDestroy(epicsEventId id)
Destroy an epicsEvent and any resources it holds.
Definition: osdEvent.c:70
epicsEventId shutdownEvent
Definition: poolPriv.h:43
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
unsigned int threadsRunning
Definition: poolPriv.h:37
int errlogMessage(const char *message)
Definition: errlog.c:180
#define ellInit(PLIST)
Initialize a list type.
Definition: ellLib.h:76
#define epicsMutexCreate()
Create an epicsMutex semaphore for use from C code.
Definition: epicsMutex.h:168
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
LIBCOM_API epicsEventId epicsEventCreate(epicsEventInitialState initialState)
Create an epicsEvent for use from C code, or return NULL.
Definition: osdEvent.c:47
epicsEventId observerWakeup
Definition: poolPriv.h:45
void epicsThreadPoolDestroy ( epicsThreadPool pool)

Definition at line 199 of file threadPool.c.

200 {
201  unsigned int nThr;
202  ELLLIST notify;
203  ELLNODE *cur;
204 
205  if (!pool)
206  return;
207 
208  ellInit(&notify);
209 
210  epicsMutexMustLock(pool->guard);
211 
212  /* run remaining queued jobs */
213  epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0);
214  epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1);
215  nThr = pool->threadsRunning;
216  pool->freezeopt = 1;
217 
218  epicsMutexUnlock(pool->guard);
219 
220  epicsThreadPoolWait(pool, -1.0);
221  /* At this point all queued jobs have run */
222 
223  epicsMutexMustLock(pool->guard);
224 
225  pool->shutdown = 1;
226  /* wakeup all */
227  if (pool->threadsWaking < pool->threadsSleeping) {
228  pool->threadsWaking = pool->threadsSleeping;
230  }
231 
232  ellConcat(&notify, &pool->owned);
233  ellConcat(&notify, &pool->jobs);
234 
235  epicsMutexUnlock(pool->guard);
236 
237  if (nThr && epicsEventWait(pool->shutdownEvent) != epicsEventWaitOK){
238  errlogMessage("epicsThreadPoolDestroy: wait error");
239  return;
240  }
241 
242  /* all workers are now shutdown */
243 
244  /* notify remaining jobs that pool is being destroyed */
245  while ((cur = ellGet(&notify)) != NULL) {
246  epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
247 
248  job->running = 1;
249  job->func(job->arg, epicsJobModeCleanup);
250  job->running = 0;
251  if (job->freewhendone)
252  free(job);
253  else
254  job->pool = NULL; /* orphan */
255  }
256 
260  epicsMutexDestroy(pool->guard);
261 
262  free(pool);
263 }
epicsEventId workerWakeup
Definition: poolPriv.h:42
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
epicsThreadPool * pool
Definition: poolPriv.h:89
void epicsStdCall epicsMutexDestroy(epicsMutexId pmutexNode)
Destroy an epicsMutex semaphore.
Definition: epicsMutex.cpp:127
#define CONTAINER(ptr, structure, member)
Find parent object from a member pointer.
Definition: dbDefs.h:66
unsigned int freewhendone
Definition: poolPriv.h:93
void * arg
Definition: poolPriv.h:88
#define epicsEventWait(ID)
Definition: osdEvent.h:19
ELLLIST jobs
Definition: poolPriv.h:21
ELLNODE * ellGet(ELLLIST *pList)
Deletes and returns the first node from a list.
Definition: ellLib.c:147
#define NULL
Definition: catime.c:38
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
epicsJobFunction func
Definition: poolPriv.h:87
unsigned int threadsWaking
Definition: poolPriv.h:33
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
unsigned int running
Definition: poolPriv.h:92
LIBCOM_API void epicsEventDestroy(epicsEventId id)
Destroy an epicsEvent and any resources it holds.
Definition: osdEvent.c:70
epicsEventId shutdownEvent
Definition: poolPriv.h:43
List node type.
Definition: ellLib.h:45
unsigned int threadsRunning
Definition: poolPriv.h:37
int epicsThreadPoolWait(epicsThreadPool *pool, double timeout)
Definition: threadPool.c:159
int errlogMessage(const char *message)
Definition: errlog.c:180
unsigned int freezeopt
Definition: poolPriv.h:52
#define ellInit(PLIST)
Initialize a list type.
Definition: ellLib.h:76
#define epicsEventWaitOK
Old name provided for backwards compatibility.
Definition: epicsEvent.h:58
unsigned int threadsSleeping
Definition: poolPriv.h:35
void ellConcat(ELLLIST *pDstList, ELLLIST *pAddList)
Concatenates a list to the end of another list. The list to be added is left empty. Either list (or both) can be empty at the beginning of the operation.
Definition: ellLib.c:46
List header type.
Definition: ellLib.h:56
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int shutdown
Definition: poolPriv.h:54
epicsEventId observerWakeup
Definition: poolPriv.h:45
LIBCOM_API epicsThreadPool* epicsThreadPoolGetShared ( epicsThreadPoolConfig opts)

Definition at line 328 of file threadPool.c.

329 {
330  ELLNODE *node;
331  epicsThreadPool *cur;
332  epicsThreadPoolConfig defopts;
333  size_t N = epicsThreadGetCPUs();
334 
335  if (!opts) {
337  opts = &defopts;
338  }
339  /* shared pools must have a minimum allowed number of workers.
340  * Use the number of CPU cores
341  */
342  if (opts->maxThreads < N)
343  opts->maxThreads = N;
344 
345  epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL);
346 
347  epicsMutexMustLock(sharedPoolsGuard);
348 
349  for (node = ellFirst(&sharedPools); node; node = ellNext(node)) {
350  cur = CONTAINER(node, epicsThreadPool, sharedNode);
351 
352  /* Must have exactly the requested priority
353  * At least the requested max workers
354  * and at least the requested stack size
355  */
356  if (cur->conf.workerPriority != opts->workerPriority)
357  continue;
358  if (cur->conf.maxThreads < opts->maxThreads)
359  continue;
360  if (cur->conf.workerStack < opts->workerStack)
361  continue;
362 
363  cur->sharedCount++;
364  assert(cur->sharedCount > 0);
365  epicsMutexUnlock(sharedPoolsGuard);
366 
368  *opts = cur->conf;
369  epicsMutexUnlock(cur->guard);
370  return cur;
371  }
372 
373  cur = epicsThreadPoolCreate(opts);
374  if (!cur) {
375  epicsMutexUnlock(sharedPoolsGuard);
376  return NULL;
377  }
378  cur->sharedCount = 1;
379 
380  ellAdd(&sharedPools, &cur->sharedNode);
381  epicsMutexUnlock(sharedPoolsGuard);
382  return cur;
383 }
epicsMutexId guard
Definition: poolPriv.h:56
void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
Definition: threadPool.c:26
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
#define CONTAINER(ptr, structure, member)
Find parent object from a member pointer.
Definition: dbDefs.h:66
#define NULL
Definition: catime.c:38
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
epicsThreadPool * epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
Definition: threadPool.c:37
ELLNODE sharedNode
Definition: poolPriv.h:18
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
#define ellNext(PNODE)
Find the next node in list.
Definition: ellLib.h:99
LIBCOM_API int epicsThreadGetCPUs(void)
Definition: osdThread.c:990
LIBCOM_API void epicsStdCall epicsThreadOnce(epicsThreadOnceId *id, EPICSTHREADFUNC, void *arg)
unsigned int workerStack
unsigned int workerPriority
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
List node type.
Definition: ellLib.h:45
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
size_t sharedCount
Definition: poolPriv.h:19
#define ellFirst(PLIST)
Find the first node in list.
Definition: ellLib.h:89
unsigned int epicsThreadPoolNThreads ( epicsThreadPool pool)

Definition at line 302 of file threadPool.c.

303 {
304  unsigned int ret;
305 
306  epicsMutexMustLock(pool->guard);
307  ret = pool->threadsRunning;
308  epicsMutexUnlock(pool->guard);
309 
310  return ret;
311 }
epicsMutexId guard
Definition: poolPriv.h:56
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
unsigned int threadsRunning
Definition: poolPriv.h:37
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
LIBCOM_API void epicsThreadPoolReleaseShared ( epicsThreadPool pool)

Definition at line 385 of file threadPool.c.

386 {
387  if (!pool)
388  return;
389 
390  epicsMutexMustLock(sharedPoolsGuard);
391 
392  assert(pool->sharedCount > 0);
393 
394  pool->sharedCount--;
395 
396  if (pool->sharedCount == 0) {
397  ellDelete(&sharedPools, &pool->sharedNode);
399  }
400 
401  epicsMutexUnlock(sharedPoolsGuard);
402 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
ELLNODE sharedNode
Definition: poolPriv.h:18
void epicsThreadPoolDestroy(epicsThreadPool *pool)
Definition: threadPool.c:199
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
size_t sharedCount
Definition: poolPriv.h:19
void epicsThreadPoolReport ( epicsThreadPool pool,
FILE *  fd 
)

Definition at line 266 of file threadPool.c.

267 {
268  ELLNODE *cur;
269  epicsMutexMustLock(pool->guard);
270 
271  fprintf(fd, "Thread Pool with %u/%u threads\n"
272  " running %d jobs with %u threads\n",
273  pool->threadsRunning,
274  pool->conf.maxThreads,
275  ellCount(&pool->jobs),
276  pool->threadsAreAwake);
277  if (pool->pauseadd)
278  fprintf(fd, " Inhibit queueing\n");
279  if (pool->pauserun)
280  fprintf(fd, " Pause workers\n");
281  if (pool->shutdown)
282  fprintf(fd, " Shutdown in progress\n");
283 
284  for (cur = ellFirst(&pool->jobs); cur; cur = ellNext(cur)) {
285  epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
286 
287  fprintf(fd, " job %p func: %p, arg: %p ",
288  job, job->func,
289  job->arg);
290  if (job->queued)
291  fprintf(fd, "Queued ");
292  if (job->running)
293  fprintf(fd, "Running ");
294  if (job->freewhendone)
295  fprintf(fd, "Free ");
296  fprintf(fd, "\n");
297  }
298 
299  epicsMutexUnlock(pool->guard);
300 }
epicsMutexId guard
Definition: poolPriv.h:56
#define ellCount(PLIST)
Report the number of nodes in a list.
Definition: ellLib.h:84
#define CONTAINER(ptr, structure, member)
Find parent object from a member pointer.
Definition: dbDefs.h:66
unsigned int freewhendone
Definition: poolPriv.h:93
void * arg
Definition: poolPriv.h:88
ELLLIST jobs
Definition: poolPriv.h:21
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
epicsJobFunction func
Definition: poolPriv.h:87
#define ellNext(PNODE)
Find the next node in list.
Definition: ellLib.h:99
unsigned int threadsAreAwake
Definition: poolPriv.h:31
unsigned int running
Definition: poolPriv.h:92
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
List node type.
Definition: ellLib.h:45
unsigned int pauseadd
Definition: poolPriv.h:48
unsigned int threadsRunning
Definition: poolPriv.h:37
unsigned int pauserun
Definition: poolPriv.h:50
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int queued
Definition: poolPriv.h:91
unsigned int shutdown
Definition: poolPriv.h:54
#define ellFirst(PLIST)
Find the first node in list.
Definition: ellLib.h:89
int epicsThreadPoolWait ( epicsThreadPool pool,
double  timeout 
)

Definition at line 159 of file threadPool.c.

160 {
161  int ret = 0;
162  epicsMutexMustLock(pool->guard);
163 
164  while (ellCount(&pool->jobs) > 0 || pool->threadsAreAwake > 0) {
165  pool->observerCount++;
166  epicsMutexUnlock(pool->guard);
167 
168  if (timeout < 0.0) {
170  }
171  else {
173  case epicsEventWaitError:
174  cantProceed("epicsThreadPoolWait: failed to wait for Event");
175  break;
177  ret = S_pool_timeout;
178  break;
179  case epicsEventWaitOK:
180  ret = 0;
181  break;
182  }
183  }
184 
185  epicsMutexMustLock(pool->guard);
186  pool->observerCount--;
187 
188  if (pool->observerCount)
190 
191  if (ret != 0)
192  break;
193  }
194 
195  epicsMutexUnlock(pool->guard);
196  return ret;
197 }
double timeout
Definition: pvutils.cpp:25
epicsMutexId guard
Definition: poolPriv.h:56
#define ellCount(PLIST)
Report the number of nodes in a list.
Definition: ellLib.h:84
ELLLIST jobs
Definition: poolPriv.h:21
LIBCOM_API epicsEventStatus epicsEventWaitWithTimeout(epicsEventId id, double timeOut)
Wait an the event or until the specified timeout period is over.
Definition: osdEvent.c:117
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
LIBCOM_API void epicsEventMustWait(epicsEventId id)
Wait for an event (see epicsEventWait()).
Definition: epicsEvent.cpp:123
unsigned int threadsAreAwake
Definition: poolPriv.h:31
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
unsigned int observerCount
Definition: poolPriv.h:40
#define epicsEventWaitOK
Old name provided for backwards compatibility.
Definition: epicsEvent.h:58
#define S_pool_timeout
LIBCOM_API void cantProceed(const char *msg,...)
Definition: cantProceed.c:54
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
#define epicsEventWaitError
Old name provided for backwards compatibility.
Definition: epicsEvent.h:60
epicsEventId observerWakeup
Definition: poolPriv.h:45