This is Unofficial EPICS BASE Doxygen Site
epicsThreadPool.h File Reference
#include <stdlib.h>
#include <stdio.h>
#include "libComAPI.h"
#include "errMdef.h"
+ Include dependency graph for epicsThreadPool.h:
+ This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Classes

struct  epicsThreadPoolConfig
 

Macros

#define S_pool_jobBusy   (M_pool| 1) /*Job already queued or running*/
 
#define S_pool_jobIdle   (M_pool| 2) /*Job was not queued or running*/
 
#define S_pool_noPool   (M_pool| 3) /*Job not associated with a pool*/
 
#define S_pool_paused   (M_pool| 4) /*Pool not currently accepting jobs*/
 
#define S_pool_noThreads   (M_pool| 5) /*Can't create worker thread*/
 
#define S_pool_timeout   (M_pool| 6) /*Pool still busy after timeout*/
 
#define EPICSJOB_SELF   epicsJobArgSelfMagic
 

Typedefs

typedef struct epicsThreadPool epicsThreadPool
 
typedef void(* epicsJobFunction) (void *arg, epicsJobMode mode)
 
typedef struct epicsJob epicsJob
 

Enumerations

enum  epicsJobMode { epicsJobModeRun, epicsJobModeCleanup }
 
enum  epicsThreadPoolOption { epicsThreadPoolQueueAdd, epicsThreadPoolQueueRun }
 

Functions

LIBCOM_API void epicsThreadPoolConfigDefaults (epicsThreadPoolConfig *)
 
LIBCOM_API epicsThreadPoolepicsThreadPoolGetShared (epicsThreadPoolConfig *opts)
 
LIBCOM_API void epicsThreadPoolReleaseShared (epicsThreadPool *pool)
 
LIBCOM_API epicsThreadPoolepicsThreadPoolCreate (epicsThreadPoolConfig *opts)
 
LIBCOM_API void epicsThreadPoolDestroy (epicsThreadPool *)
 
LIBCOM_API void epicsThreadPoolControl (epicsThreadPool *pool, epicsThreadPoolOption opt, unsigned int val)
 
LIBCOM_API int epicsThreadPoolWait (epicsThreadPool *pool, double timeout)
 
LIBCOM_API epicsJobepicsJobCreate (epicsThreadPool *pool, epicsJobFunction cb, void *user)
 
LIBCOM_API void epicsJobDestroy (epicsJob *)
 
LIBCOM_API int epicsJobMove (epicsJob *job, epicsThreadPool *pool)
 
LIBCOM_API int epicsJobQueue (epicsJob *)
 
LIBCOM_API int epicsJobUnqueue (epicsJob *)
 
LIBCOM_API void epicsThreadPoolReport (epicsThreadPool *pool, FILE *fd)
 
LIBCOM_API unsigned int epicsThreadPoolNThreads (epicsThreadPool *)
 

Variables

LIBCOM_API void * epicsJobArgSelfMagic
 

Macro Definition Documentation

#define EPICSJOB_SELF   epicsJobArgSelfMagic

Definition at line 107 of file epicsThreadPool.h.

#define S_pool_jobBusy   (M_pool| 1) /*Job already queued or running*/

Definition at line 19 of file epicsThreadPool.h.

#define S_pool_jobIdle   (M_pool| 2) /*Job was not queued or running*/

Definition at line 20 of file epicsThreadPool.h.

#define S_pool_noPool   (M_pool| 3) /*Job not associated with a pool*/

Definition at line 21 of file epicsThreadPool.h.

#define S_pool_noThreads   (M_pool| 5) /*Can't create worker thread*/

Definition at line 23 of file epicsThreadPool.h.

#define S_pool_paused   (M_pool| 4) /*Pool not currently accepting jobs*/

Definition at line 22 of file epicsThreadPool.h.

#define S_pool_timeout   (M_pool| 6) /*Pool still busy after timeout*/

Definition at line 24 of file epicsThreadPool.h.

Typedef Documentation

typedef struct epicsJob epicsJob

Definition at line 54 of file epicsThreadPool.h.

typedef void(* epicsJobFunction) (void *arg, epicsJobMode mode)

Definition at line 52 of file epicsThreadPool.h.

Definition at line 37 of file epicsThreadPool.h.

Enumeration Type Documentation

Enumerator
epicsJobModeRun 
epicsJobModeCleanup 

Definition at line 40 of file epicsThreadPool.h.

40  {
41  /* Normal run of job */
43 
44  /* Thread pool is being destroyed.
45  * A chance to cleanup the job immediately with epicsJobDestroy().
46  * If ignored, the job is orphaned (dissociated from the thread pool)
47  * and epicsJobDestroy() must be called later.
48  */
50 } epicsJobMode;
epicsJobMode
Enumerator
epicsThreadPoolQueueAdd 
epicsThreadPoolQueueRun 

Definition at line 82 of file epicsThreadPool.h.

82  {
83  epicsThreadPoolQueueAdd, /* val==0 causes epicsJobQueue to fail, 1 is default */
84  epicsThreadPoolQueueRun /* val==0 prevents workers from running jobs, 1 is default */
epicsThreadPoolOption

Function Documentation

LIBCOM_API epicsJob* epicsJobCreate ( epicsThreadPool pool,
epicsJobFunction  cb,
void *  user 
)

Definition at line 134 of file poolJob.c.

137 {
138  epicsJob *job = calloc(1, sizeof(*job));
139 
140  if (!job)
141  return NULL;
142 
143  if (arg == &epicsJobArgSelfMagic)
144  arg = job;
145 
146  job->pool = NULL;
147  job->func = func;
148  job->arg = arg;
149 
150  epicsJobMove(job, pool);
151 
152  return job;
153 }
epicsThreadPool * pool
Definition: poolPriv.h:89
void * arg
Definition: poolPriv.h:88
#define NULL
Definition: catime.c:38
epicsJobFunction func
Definition: poolPriv.h:87
int epicsJobMove(epicsJob *job, epicsThreadPool *newpool)
Definition: poolJob.c:182
void * epicsJobArgSelfMagic
Definition: poolJob.c:24
LIBCOM_API void epicsJobDestroy ( epicsJob )

Definition at line 155 of file poolJob.c.

156 {
157  epicsThreadPool *pool;
158  if (!job || !job->pool) {
159  free(job);
160  return;
161  }
162  pool = job->pool;
163 
164  epicsMutexMustLock(pool->guard);
165 
166  assert(!job->dead);
167 
168  epicsJobUnqueue(job);
169 
170  if (job->running || job->freewhendone) {
171  job->freewhendone = 1;
172  }
173  else {
174  ellDelete(&pool->owned, &job->jobnode);
175  job->dead = 1;
176  free(job);
177  }
178 
179  epicsMutexUnlock(pool->guard);
180 }
int epicsJobUnqueue(epicsJob *job)
Definition: poolJob.c:301
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
LIBCOM_API int epicsJobMove ( epicsJob job,
epicsThreadPool pool 
)

Definition at line 182 of file poolJob.c.

183 {
184  epicsThreadPool *pool = job->pool;
185 
186  /* remove from current pool */
187  if (pool) {
188  epicsMutexMustLock(pool->guard);
189 
190  if (job->queued || job->running) {
191  epicsMutexUnlock(pool->guard);
192  return S_pool_jobBusy;
193  }
194 
195  ellDelete(&pool->owned, &job->jobnode);
196 
197  epicsMutexUnlock(pool->guard);
198  }
199 
200  pool = job->pool = newpool;
201 
202  /* add to new pool */
203  if (pool) {
204  epicsMutexMustLock(pool->guard);
205 
206  ellAdd(&pool->owned, &job->jobnode);
207 
208  epicsMutexUnlock(pool->guard);
209  }
210 
211  return 0;
212 }
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
epicsThreadPool * pool
Definition: poolPriv.h:89
#define S_pool_jobBusy
ELLNODE jobnode
Definition: poolPriv.h:86
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
unsigned int running
Definition: poolPriv.h:92
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int queued
Definition: poolPriv.h:91
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
LIBCOM_API int epicsJobQueue ( epicsJob )

Definition at line 214 of file poolJob.c.

215 {
216  int ret = 0;
217  epicsThreadPool *pool = job->pool;
218 
219  if (!pool)
220  return S_pool_noPool;
221 
222  epicsMutexMustLock(pool->guard);
223 
224  assert(!job->dead);
225 
226  if (pool->pauseadd) {
227  ret = S_pool_paused;
228  goto done;
229  }
230  else if (job->freewhendone) {
231  ret = S_pool_jobBusy;
232  goto done;
233  }
234  else if (job->queued) {
235  goto done;
236  }
237 
238  job->queued = 1;
239  /* Job may be queued from within a callback */
240  if (!job->running) {
241  ellDelete(&pool->owned, &job->jobnode);
242  ellAdd(&pool->jobs, &job->jobnode);
243  }
244  else {
245  /* some worker will find it again before sleeping */
246  goto done;
247  }
248 
249  /* Since we hold the lock, we can be certain that all awake worker are
250  * executing work functions. The current thread may be a worker.
251  * We prefer to wakeup a new worker rather then wait for a busy worker to
252  * finish. However, after we initiate a wakeup there will be a race
253  * between the worker waking up, and a busy worker finishing.
254  * Thus we can't avoid spurious wakeups.
255  */
256 
257  if (pool->threadsRunning >= pool->conf.maxThreads) {
258  /* all workers created... */
259  /* ... but some are sleeping, so wake one up */
260  if (pool->threadsWaking < pool->threadsSleeping) {
261  pool->threadsWaking++;
263  }
264  /*else one of the running workers will find this job before sleeping */
265  CHECKCOUNT(pool);
266 
267  }
268  else {
269  /* could create more workers so
270  * will either create a new worker, or wakeup an existing worker
271  */
272 
273  if (pool->threadsWaking >= pool->threadsSleeping) {
274  /* all sleeping workers have already been woken.
275  * start a new worker for this job
276  */
277  if (createPoolThread(pool) && pool->threadsRunning == 0) {
278  /* oops, we couldn't lazy create our first worker
279  * so this job would never run!
280  */
281  ret = S_pool_noThreads;
282  job->queued = 0;
283  /* if threadsRunning==0 then no jobs can be running */
284  assert(!job->running);
285  ellDelete(&pool->jobs, &job->jobnode);
286  ellAdd(&pool->owned, &job->jobnode);
287  }
288  }
289  if (ret == 0) {
290  pool->threadsWaking++;
292  }
293  CHECKCOUNT(pool);
294  }
295 
296 done:
297  epicsMutexUnlock(pool->guard);
298  return ret;
299 }
epicsEventId workerWakeup
Definition: poolPriv.h:42
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
#define CHECKCOUNT(pPool)
Definition: poolPriv.h:63
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
#define S_pool_jobBusy
int createPoolThread(epicsThreadPool *pool)
Definition: poolJob.c:117
ELLLIST jobs
Definition: poolPriv.h:21
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
unsigned int threadsWaking
Definition: poolPriv.h:33
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
unsigned int pauseadd
Definition: poolPriv.h:48
unsigned int threadsRunning
Definition: poolPriv.h:37
#define S_pool_paused
void done(int k)
Definition: antelope.c:77
unsigned int threadsSleeping
Definition: poolPriv.h:35
#define S_pool_noPool
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
#define S_pool_noThreads
LIBCOM_API int epicsJobUnqueue ( epicsJob )

Definition at line 301 of file poolJob.c.

302 {
303  int ret = S_pool_jobIdle;
304  epicsThreadPool *pool = job->pool;
305 
306  if (!pool)
307  return S_pool_noPool;
308 
309  epicsMutexMustLock(pool->guard);
310 
311  assert(!job->dead);
312 
313  if (job->queued) {
314  if (!job->running) {
315  ellDelete(&pool->jobs, &job->jobnode);
316  ellAdd(&pool->owned, &job->jobnode);
317  }
318  job->queued = 0;
319  ret = 0;
320  }
321 
322  epicsMutexUnlock(pool->guard);
323 
324  return ret;
325 }
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
ELLLIST jobs
Definition: poolPriv.h:21
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
#define S_pool_jobIdle
#define S_pool_noPool
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
LIBCOM_API void epicsThreadPoolConfigDefaults ( epicsThreadPoolConfig )

Definition at line 26 of file threadPool.c.

27 {
28  memset(opts, 0, sizeof(*opts));
29  opts->maxThreads = epicsThreadGetCPUs();
31 
34  opts->workerPriority = epicsThreadPriorityMedium;
35 }
#define epicsThreadPriorityMedium
Definition: epicsThread.h:76
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
LIBCOM_API int epicsThreadGetCPUs(void)
Definition: osdThread.c:990
LIBCOM_API epicsThreadBooleanStatus epicsStdCall epicsThreadLowestPriorityLevelAbove(unsigned int priority, unsigned *pPriorityJustAbove)
Definition: osdThread.c:757
#define epicsThreadPriorityCAServerHigh
Definition: epicsThread.h:81
LIBCOM_API void epicsThreadPoolControl ( epicsThreadPool pool,
epicsThreadPoolOption  opt,
unsigned int  val 
)

Definition at line 152 of file threadPool.c.

153 {
154  epicsMutexMustLock(pool->guard);
155  epicsThreadPoolControlImpl(pool, opt, val);
156  epicsMutexUnlock(pool->guard);
157 }
epicsMutexId guard
Definition: poolPriv.h:56
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
LIBCOM_API epicsThreadPool* epicsThreadPoolCreate ( epicsThreadPoolConfig opts)

Definition at line 37 of file threadPool.c.

38 {
39  size_t i;
40  epicsThreadPool *pool;
41 
42  /* caller likely didn't initialize the options structure */
43  if (opts && opts->maxThreads == 0) {
44  errlogMessage("Error: epicsThreadPoolCreate() options provided, but not initialized");
45  return NULL;
46  }
47 
48  pool = calloc(1, sizeof(*pool));
49  if (!pool)
50  return NULL;
51 
52  if (opts)
53  memcpy(&pool->conf, opts, sizeof(*opts));
54  else
56 
57  if (pool->conf.initialThreads > pool->conf.maxThreads)
58  pool->conf.initialThreads = pool->conf.maxThreads;
59 
63  pool->guard = epicsMutexCreate();
64 
65  if (!pool->workerWakeup || !pool->shutdownEvent ||
66  !pool->observerWakeup || !pool->guard)
67  goto cleanup;
68 
69  ellInit(&pool->jobs);
70  ellInit(&pool->owned);
71 
73 
74  for (i = 0; i < pool->conf.initialThreads; i++) {
75  createPoolThread(pool);
76  }
77 
78  if (pool->threadsRunning == 0 && pool->conf.initialThreads != 0) {
79  epicsMutexUnlock(pool->guard);
80  errlogPrintf("Error: Unable to create any threads for thread pool\n");
81  goto cleanup;
82 
83  }
84  else if (pool->threadsRunning < pool->conf.initialThreads) {
85  errlogPrintf("Warning: Unable to create all threads for thread pool (%u/%u)\n",
86  pool->threadsRunning, pool->conf.initialThreads);
87  }
88 
89  epicsMutexUnlock(pool->guard);
90 
91  return pool;
92 
93 cleanup:
94  if (pool->workerWakeup)
96  if (pool->shutdownEvent)
98  if (pool->observerWakeup)
100  if (pool->guard)
101  epicsMutexDestroy(pool->guard);
102 
103  free(pool);
104  return NULL;
105 }
epicsEventId workerWakeup
Definition: poolPriv.h:42
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
Definition: threadPool.c:26
void epicsStdCall epicsMutexDestroy(epicsMutexId pmutexNode)
Destroy an epicsMutex semaphore.
Definition: epicsMutex.cpp:127
int i
Definition: scan.c:967
int createPoolThread(epicsThreadPool *pool)
Definition: poolJob.c:117
ELLLIST jobs
Definition: poolPriv.h:21
#define NULL
Definition: catime.c:38
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
unsigned int initialThreads
LIBCOM_API void epicsEventDestroy(epicsEventId id)
Destroy an epicsEvent and any resources it holds.
Definition: osdEvent.c:70
epicsEventId shutdownEvent
Definition: poolPriv.h:43
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
unsigned int threadsRunning
Definition: poolPriv.h:37
int errlogMessage(const char *message)
Definition: errlog.c:180
#define ellInit(PLIST)
Initialize a list type.
Definition: ellLib.h:76
#define epicsMutexCreate()
Create an epicsMutex semaphore for use from C code.
Definition: epicsMutex.h:168
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
LIBCOM_API epicsEventId epicsEventCreate(epicsEventInitialState initialState)
Create an epicsEvent for use from C code, or return NULL.
Definition: osdEvent.c:47
epicsEventId observerWakeup
Definition: poolPriv.h:45
LIBCOM_API void epicsThreadPoolDestroy ( epicsThreadPool )

Definition at line 199 of file threadPool.c.

200 {
201  unsigned int nThr;
202  ELLLIST notify;
203  ELLNODE *cur;
204 
205  if (!pool)
206  return;
207 
208  ellInit(&notify);
209 
210  epicsMutexMustLock(pool->guard);
211 
212  /* run remaining queued jobs */
213  epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueAdd, 0);
214  epicsThreadPoolControlImpl(pool, epicsThreadPoolQueueRun, 1);
215  nThr = pool->threadsRunning;
216  pool->freezeopt = 1;
217 
218  epicsMutexUnlock(pool->guard);
219 
220  epicsThreadPoolWait(pool, -1.0);
221  /* At this point all queued jobs have run */
222 
223  epicsMutexMustLock(pool->guard);
224 
225  pool->shutdown = 1;
226  /* wakeup all */
227  if (pool->threadsWaking < pool->threadsSleeping) {
228  pool->threadsWaking = pool->threadsSleeping;
229  epicsEventSignal(pool->workerWakeup);
230  }
231 
232  ellConcat(&notify, &pool->owned);
233  ellConcat(&notify, &pool->jobs);
234 
235  epicsMutexUnlock(pool->guard);
236 
237  if (nThr && epicsEventWait(pool->shutdownEvent) != epicsEventWaitOK){
238  errlogMessage("epicsThreadPoolDestroy: wait error");
239  return;
240  }
241 
242  /* all workers are now shutdown */
243 
244  /* notify remaining jobs that pool is being destroyed */
245  while ((cur = ellGet(&notify)) != NULL) {
246  epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
247 
248  job->running = 1;
249  job->func(job->arg, epicsJobModeCleanup);
250  job->running = 0;
251  if (job->freewhendone)
252  free(job);
253  else
254  job->pool = NULL; /* orphan */
255  }
256 
257  epicsEventDestroy(pool->workerWakeup);
258  epicsEventDestroy(pool->shutdownEvent);
259  epicsEventDestroy(pool->observerWakeup);
260  epicsMutexDestroy(pool->guard);
261 
262  free(pool);
263 }
epicsThreadPool * pool
Definition: poolPriv.h:89
void epicsStdCall epicsMutexDestroy(epicsMutexId pmutexNode)
Destroy an epicsMutex semaphore.
Definition: epicsMutex.cpp:127
#define CONTAINER(ptr, structure, member)
Find parent object from a member pointer.
Definition: dbDefs.h:66
unsigned int freewhendone
Definition: poolPriv.h:93
void * arg
Definition: poolPriv.h:88
#define epicsEventWait(ID)
Definition: osdEvent.h:19
ELLNODE * ellGet(ELLLIST *pList)
Deletes and returns the first node from a list.
Definition: ellLib.c:147
#define NULL
Definition: catime.c:38
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
epicsJobFunction func
Definition: poolPriv.h:87
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
unsigned int running
Definition: poolPriv.h:92
LIBCOM_API void epicsEventDestroy(epicsEventId id)
Destroy an epicsEvent and any resources it holds.
Definition: osdEvent.c:70
List node type.
Definition: ellLib.h:45
int epicsThreadPoolWait(epicsThreadPool *pool, double timeout)
Definition: threadPool.c:159
int errlogMessage(const char *message)
Definition: errlog.c:180
#define ellInit(PLIST)
Initialize a list type.
Definition: ellLib.h:76
#define epicsEventWaitOK
Old name provided for backwards compatibility.
Definition: epicsEvent.h:58
void ellConcat(ELLLIST *pDstList, ELLLIST *pAddList)
Concatenates a list to the end of another list. The list to be added is left empty. Either list (or both) can be empty at the beginning of the operation.
Definition: ellLib.c:46
List header type.
Definition: ellLib.h:56
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
LIBCOM_API epicsThreadPool* epicsThreadPoolGetShared ( epicsThreadPoolConfig opts)

Definition at line 328 of file threadPool.c.

329 {
330  ELLNODE *node;
331  epicsThreadPool *cur;
332  epicsThreadPoolConfig defopts;
333  size_t N = epicsThreadGetCPUs();
334 
335  if (!opts) {
337  opts = &defopts;
338  }
339  /* shared pools must have a minimum allowed number of workers.
340  * Use the number of CPU cores
341  */
342  if (opts->maxThreads < N)
343  opts->maxThreads = N;
344 
345  epicsThreadOnce(&sharedPoolsOnce, &sharedPoolsInit, NULL);
346 
347  epicsMutexMustLock(sharedPoolsGuard);
348 
349  for (node = ellFirst(&sharedPools); node; node = ellNext(node)) {
350  cur = CONTAINER(node, epicsThreadPool, sharedNode);
351 
352  /* Must have exactly the requested priority
353  * At least the requested max workers
354  * and at least the requested stack size
355  */
356  if (cur->conf.workerPriority != opts->workerPriority)
357  continue;
358  if (cur->conf.maxThreads < opts->maxThreads)
359  continue;
360  if (cur->conf.workerStack < opts->workerStack)
361  continue;
362 
363  cur->sharedCount++;
364  assert(cur->sharedCount > 0);
365  epicsMutexUnlock(sharedPoolsGuard);
366 
368  *opts = cur->conf;
369  epicsMutexUnlock(cur->guard);
370  return cur;
371  }
372 
373  cur = epicsThreadPoolCreate(opts);
374  if (!cur) {
375  epicsMutexUnlock(sharedPoolsGuard);
376  return NULL;
377  }
378  cur->sharedCount = 1;
379 
380  ellAdd(&sharedPools, &cur->sharedNode);
381  epicsMutexUnlock(sharedPoolsGuard);
382  return cur;
383 }
epicsMutexId guard
Definition: poolPriv.h:56
void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
Definition: threadPool.c:26
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
#define CONTAINER(ptr, structure, member)
Find parent object from a member pointer.
Definition: dbDefs.h:66
#define NULL
Definition: catime.c:38
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
epicsThreadPool * epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
Definition: threadPool.c:37
ELLNODE sharedNode
Definition: poolPriv.h:18
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
#define ellNext(PNODE)
Find the next node in list.
Definition: ellLib.h:99
LIBCOM_API int epicsThreadGetCPUs(void)
Definition: osdThread.c:990
LIBCOM_API void epicsStdCall epicsThreadOnce(epicsThreadOnceId *id, EPICSTHREADFUNC, void *arg)
unsigned int workerStack
unsigned int workerPriority
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
List node type.
Definition: ellLib.h:45
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
size_t sharedCount
Definition: poolPriv.h:19
#define ellFirst(PLIST)
Find the first node in list.
Definition: ellLib.h:89
LIBCOM_API unsigned int epicsThreadPoolNThreads ( epicsThreadPool )

Definition at line 302 of file threadPool.c.

303 {
304  unsigned int ret;
305 
306  epicsMutexMustLock(pool->guard);
307  ret = pool->threadsRunning;
308  epicsMutexUnlock(pool->guard);
309 
310  return ret;
311 }
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
LIBCOM_API void epicsThreadPoolReleaseShared ( epicsThreadPool pool)

Definition at line 385 of file threadPool.c.

386 {
387  if (!pool)
388  return;
389 
390  epicsMutexMustLock(sharedPoolsGuard);
391 
392  assert(pool->sharedCount > 0);
393 
394  pool->sharedCount--;
395 
396  if (pool->sharedCount == 0) {
397  ellDelete(&sharedPools, &pool->sharedNode);
399  }
400 
401  epicsMutexUnlock(sharedPoolsGuard);
402 }
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
ELLNODE sharedNode
Definition: poolPriv.h:18
void epicsThreadPoolDestroy(epicsThreadPool *pool)
Definition: threadPool.c:199
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
size_t sharedCount
Definition: poolPriv.h:19
LIBCOM_API void epicsThreadPoolReport ( epicsThreadPool pool,
FILE *  fd 
)

Definition at line 266 of file threadPool.c.

267 {
268  ELLNODE *cur;
269  epicsMutexMustLock(pool->guard);
270 
271  fprintf(fd, "Thread Pool with %u/%u threads\n"
272  " running %d jobs with %u threads\n",
273  pool->threadsRunning,
274  pool->conf.maxThreads,
275  ellCount(&pool->jobs),
276  pool->threadsAreAwake);
277  if (pool->pauseadd)
278  fprintf(fd, " Inhibit queueing\n");
279  if (pool->pauserun)
280  fprintf(fd, " Pause workers\n");
281  if (pool->shutdown)
282  fprintf(fd, " Shutdown in progress\n");
283 
284  for (cur = ellFirst(&pool->jobs); cur; cur = ellNext(cur)) {
285  epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
286 
287  fprintf(fd, " job %p func: %p, arg: %p ",
288  job, job->func,
289  job->arg);
290  if (job->queued)
291  fprintf(fd, "Queued ");
292  if (job->running)
293  fprintf(fd, "Running ");
294  if (job->freewhendone)
295  fprintf(fd, "Free ");
296  fprintf(fd, "\n");
297  }
298 
299  epicsMutexUnlock(pool->guard);
300 }
epicsMutexId guard
Definition: poolPriv.h:56
#define ellCount(PLIST)
Report the number of nodes in a list.
Definition: ellLib.h:84
#define CONTAINER(ptr, structure, member)
Find parent object from a member pointer.
Definition: dbDefs.h:66
unsigned int freewhendone
Definition: poolPriv.h:93
void * arg
Definition: poolPriv.h:88
ELLLIST jobs
Definition: poolPriv.h:21
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
epicsJobFunction func
Definition: poolPriv.h:87
#define ellNext(PNODE)
Find the next node in list.
Definition: ellLib.h:99
unsigned int threadsAreAwake
Definition: poolPriv.h:31
unsigned int running
Definition: poolPriv.h:92
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
List node type.
Definition: ellLib.h:45
unsigned int pauseadd
Definition: poolPriv.h:48
unsigned int threadsRunning
Definition: poolPriv.h:37
unsigned int pauserun
Definition: poolPriv.h:50
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int queued
Definition: poolPriv.h:91
unsigned int shutdown
Definition: poolPriv.h:54
#define ellFirst(PLIST)
Find the first node in list.
Definition: ellLib.h:89
LIBCOM_API int epicsThreadPoolWait ( epicsThreadPool pool,
double  timeout 
)

Definition at line 159 of file threadPool.c.

160 {
161  int ret = 0;
162  epicsMutexMustLock(pool->guard);
163 
164  while (ellCount(&pool->jobs) > 0 || pool->threadsAreAwake > 0) {
165  pool->observerCount++;
166  epicsMutexUnlock(pool->guard);
167 
168  if (timeout < 0.0) {
170  }
171  else {
173  case epicsEventWaitError:
174  cantProceed("epicsThreadPoolWait: failed to wait for Event");
175  break;
177  ret = S_pool_timeout;
178  break;
179  case epicsEventWaitOK:
180  ret = 0;
181  break;
182  }
183  }
184 
185  epicsMutexMustLock(pool->guard);
186  pool->observerCount--;
187 
188  if (pool->observerCount)
190 
191  if (ret != 0)
192  break;
193  }
194 
195  epicsMutexUnlock(pool->guard);
196  return ret;
197 }
double timeout
Definition: pvutils.cpp:25
epicsMutexId guard
Definition: poolPriv.h:56
#define ellCount(PLIST)
Report the number of nodes in a list.
Definition: ellLib.h:84
ELLLIST jobs
Definition: poolPriv.h:21
LIBCOM_API epicsEventStatus epicsEventWaitWithTimeout(epicsEventId id, double timeOut)
Wait an the event or until the specified timeout period is over.
Definition: osdEvent.c:117
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
LIBCOM_API void epicsEventMustWait(epicsEventId id)
Wait for an event (see epicsEventWait()).
Definition: epicsEvent.cpp:123
unsigned int threadsAreAwake
Definition: poolPriv.h:31
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
unsigned int observerCount
Definition: poolPriv.h:40
#define epicsEventWaitOK
Old name provided for backwards compatibility.
Definition: epicsEvent.h:58
#define S_pool_timeout
LIBCOM_API void cantProceed(const char *msg,...)
Definition: cantProceed.c:54
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
#define epicsEventWaitError
Old name provided for backwards compatibility.
Definition: epicsEvent.h:60
epicsEventId observerWakeup
Definition: poolPriv.h:45

Variable Documentation

LIBCOM_API void* epicsJobArgSelfMagic

Definition at line 24 of file poolJob.c.