This is the Unofficial EPICS BASE Doxygen Site
poolJob.c
Go to the documentation of this file.
1 /*************************************************************************\
2 * Copyright (c) 2014 Brookhaven Science Associates, as Operator of
3 * Brookhaven National Laboratory.
4 * EPICS BASE is distributed subject to a Software License Agreement found
5 * in file LICENSE that is included with this distribution.
6 \*************************************************************************/
7 
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 
12 
13 #include "dbDefs.h"
14 #include "errlog.h"
15 #include "ellLib.h"
16 #include "epicsThread.h"
17 #include "epicsMutex.h"
18 #include "epicsEvent.h"
19 #include "epicsInterrupt.h"
20 
21 #include "epicsThreadPool.h"
22 #include "poolPriv.h"
23 
25 
26 static
27 void workerMain(void *arg)
28 {
30  unsigned int nrun, ocnt;
31 
32  /* workers are created with counts
33  * in the running, sleeping, and (possibly) waking counters
34  */
35 
37  pool->threadsAreAwake++;
38  pool->threadsSleeping--;
39 
40  while (1) {
41  ELLNODE *cur;
42 
43  pool->threadsAreAwake--;
44  pool->threadsSleeping++;
45  epicsMutexUnlock(pool->guard);
46 
48 
50  pool->threadsSleeping--;
51  pool->threadsAreAwake++;
52 
53  if (pool->threadsWaking==0)
54  continue;
55 
56  pool->threadsWaking--;
57 
58  CHECKCOUNT(pool);
59 
60  if (pool->shutdown)
61  break;
62 
63  if (pool->pauserun)
64  continue;
65 
66  /* more threads to wakeup */
67  if (pool->threadsWaking) {
69  }
70 
71  while ((cur=ellGet(&pool->jobs)) != NULL) {
72  epicsJob *job = CONTAINER(cur, epicsJob, jobnode);
73 
74  assert(job->queued && !job->running);
75 
76  job->queued=0;
77  job->running=1;
78 
79  epicsMutexUnlock(pool->guard);
80  (*job->func)(job->arg, epicsJobModeRun);
82 
83  if (job->freewhendone) {
84  job->dead=1;
85  free(job);
86  }
87  else {
88  job->running=0;
89  /* job may be re-queued from within callback */
90  if (job->queued)
91  ellAdd(&pool->jobs, &job->jobnode);
92  else
93  ellAdd(&pool->owned, &job->jobnode);
94  }
95  }
96 
97  if (pool->observerCount)
99  }
100 
101  pool->threadsAreAwake--;
102  pool->threadsRunning--;
103 
104  nrun = pool->threadsRunning;
105  ocnt = pool->observerCount;
106  epicsMutexUnlock(pool->guard);
107 
108  if (ocnt)
110 
111  if (nrun)
112  epicsEventSignal(pool->workerWakeup); /* pass along */
113  else
115 }
116 
118 {
119  epicsThreadId tid;
120 
121  tid = epicsThreadCreate("PoolWorker",
122  pool->conf.workerPriority,
123  pool->conf.workerStack,
124  &workerMain,
125  pool);
126  if (!tid)
127  return S_pool_noThreads;
128 
129  pool->threadsRunning++;
130  pool->threadsSleeping++;
131  return 0;
132 }
133 
136  void *arg)
137 {
138  epicsJob *job = calloc(1, sizeof(*job));
139 
140  if (!job)
141  return NULL;
142 
143  if (arg == &epicsJobArgSelfMagic)
144  arg = job;
145 
146  job->pool = NULL;
147  job->func = func;
148  job->arg = arg;
149 
150  epicsJobMove(job, pool);
151 
152  return job;
153 }
154 
156 {
158  if (!job || !job->pool) {
159  free(job);
160  return;
161  }
162  pool = job->pool;
163 
164  epicsMutexMustLock(pool->guard);
165 
166  assert(!job->dead);
167 
168  epicsJobUnqueue(job);
169 
170  if (job->running || job->freewhendone) {
171  job->freewhendone = 1;
172  }
173  else {
174  ellDelete(&pool->owned, &job->jobnode);
175  job->dead = 1;
176  free(job);
177  }
178 
179  epicsMutexUnlock(pool->guard);
180 }
181 
183 {
184  epicsThreadPool *pool = job->pool;
185 
186  /* remove from current pool */
187  if (pool) {
188  epicsMutexMustLock(pool->guard);
189 
190  if (job->queued || job->running) {
191  epicsMutexUnlock(pool->guard);
192  return S_pool_jobBusy;
193  }
194 
195  ellDelete(&pool->owned, &job->jobnode);
196 
197  epicsMutexUnlock(pool->guard);
198  }
199 
200  pool = job->pool = newpool;
201 
202  /* add to new pool */
203  if (pool) {
204  epicsMutexMustLock(pool->guard);
205 
206  ellAdd(&pool->owned, &job->jobnode);
207 
208  epicsMutexUnlock(pool->guard);
209  }
210 
211  return 0;
212 }
213 
215 {
216  int ret = 0;
217  epicsThreadPool *pool = job->pool;
218 
219  if (!pool)
220  return S_pool_noPool;
221 
222  epicsMutexMustLock(pool->guard);
223 
224  assert(!job->dead);
225 
226  if (pool->pauseadd) {
227  ret = S_pool_paused;
228  goto done;
229  }
230  else if (job->freewhendone) {
231  ret = S_pool_jobBusy;
232  goto done;
233  }
234  else if (job->queued) {
235  goto done;
236  }
237 
238  job->queued = 1;
239  /* Job may be queued from within a callback */
240  if (!job->running) {
241  ellDelete(&pool->owned, &job->jobnode);
242  ellAdd(&pool->jobs, &job->jobnode);
243  }
244  else {
245  /* some worker will find it again before sleeping */
246  goto done;
247  }
248 
249  /* Since we hold the lock, we can be certain that all awake worker are
250  * executing work functions. The current thread may be a worker.
251  * We prefer to wakeup a new worker rather then wait for a busy worker to
252  * finish. However, after we initiate a wakeup there will be a race
253  * between the worker waking up, and a busy worker finishing.
254  * Thus we can't avoid spurious wakeups.
255  */
256 
257  if (pool->threadsRunning >= pool->conf.maxThreads) {
258  /* all workers created... */
259  /* ... but some are sleeping, so wake one up */
260  if (pool->threadsWaking < pool->threadsSleeping) {
261  pool->threadsWaking++;
263  }
264  /*else one of the running workers will find this job before sleeping */
265  CHECKCOUNT(pool);
266 
267  }
268  else {
269  /* could create more workers so
270  * will either create a new worker, or wakeup an existing worker
271  */
272 
273  if (pool->threadsWaking >= pool->threadsSleeping) {
274  /* all sleeping workers have already been woken.
275  * start a new worker for this job
276  */
277  if (createPoolThread(pool) && pool->threadsRunning == 0) {
278  /* oops, we couldn't lazy create our first worker
279  * so this job would never run!
280  */
281  ret = S_pool_noThreads;
282  job->queued = 0;
283  /* if threadsRunning==0 then no jobs can be running */
284  assert(!job->running);
285  ellDelete(&pool->jobs, &job->jobnode);
286  ellAdd(&pool->owned, &job->jobnode);
287  }
288  }
289  if (ret == 0) {
290  pool->threadsWaking++;
292  }
293  CHECKCOUNT(pool);
294  }
295 
296 done:
297  epicsMutexUnlock(pool->guard);
298  return ret;
299 }
300 
302 {
303  int ret = S_pool_jobIdle;
304  epicsThreadPool *pool = job->pool;
305 
306  if (!pool)
307  return S_pool_noPool;
308 
309  epicsMutexMustLock(pool->guard);
310 
311  assert(!job->dead);
312 
313  if (job->queued) {
314  if (!job->running) {
315  ellDelete(&pool->jobs, &job->jobnode);
316  ellAdd(&pool->owned, &job->jobnode);
317  }
318  job->queued = 0;
319  ret = 0;
320  }
321 
322  epicsMutexUnlock(pool->guard);
323 
324  return ret;
325 }
326 
epicsEventId workerWakeup
Definition: poolPriv.h:42
int epicsJobUnqueue(epicsJob *job)
Definition: poolJob.c:301
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
#define CHECKCOUNT(pPool)
Definition: poolPriv.h:63
epicsThreadPool * pool
Definition: poolPriv.h:89
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
int epicsJobQueue(epicsJob *job)
Definition: poolJob.c:214
#define S_pool_jobBusy
#define CONTAINER(ptr, structure, member)
Find parent object from a member pointer.
Definition: dbDefs.h:66
unsigned int freewhendone
Definition: poolPriv.h:93
void epicsJobDestroy(epicsJob *job)
Definition: poolJob.c:155
int createPoolThread(epicsThreadPool *pool)
Definition: poolJob.c:117
void * arg
Definition: poolPriv.h:88
ELLNODE jobnode
Definition: poolPriv.h:86
ELLLIST jobs
Definition: poolPriv.h:21
ELLNODE * ellGet(ELLLIST *pList)
Deletes and returns the first node from a list.
Definition: ellLib.c:147
#define NULL
Definition: catime.c:38
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
Miscellaneous macro definitions.
LIBCOM_API void epicsEventMustWait(epicsEventId id)
Wait for an event (see epicsEventWait()).
Definition: epicsEvent.cpp:123
epicsThreadId epicsStdCall epicsThreadCreate(const char *name, unsigned int priority, unsigned int stackSize, EPICSTHREADFUNC funptr, void *parm)
Definition: epicsThread.cpp:33
epicsJob * epicsJobCreate(epicsThreadPool *pool, epicsJobFunction func, void *arg)
Definition: poolJob.c:134
epicsJobFunction func
Definition: poolPriv.h:87
int epicsJobMove(epicsJob *job, epicsThreadPool *newpool)
Definition: poolJob.c:182
unsigned int threadsWaking
Definition: poolPriv.h:33
A doubly-linked list library.
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
unsigned int dead
Definition: poolPriv.h:94
unsigned int threadsAreAwake
Definition: poolPriv.h:31
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
APIs for the epicsMutex mutual exclusion semaphore.
unsigned int workerStack
unsigned int workerPriority
unsigned int running
Definition: poolPriv.h:92
#define S_pool_jobIdle
epicsEventId shutdownEvent
Definition: poolPriv.h:43
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
unsigned int observerCount
Definition: poolPriv.h:40
List node type.
Definition: ellLib.h:45
APIs for the epicsEvent binary semaphore.
unsigned int pauseadd
Definition: poolPriv.h:48
unsigned int threadsRunning
Definition: poolPriv.h:37
#define S_pool_paused
void done(int k)
Definition: antelope.c:77
unsigned int threadsSleeping
Definition: poolPriv.h:35
#define S_pool_noPool
void(* epicsJobFunction)(void *arg, epicsJobMode mode)
unsigned int pauserun
Definition: poolPriv.h:50
C++ and C descriptions for a thread.
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int queued
Definition: poolPriv.h:91
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
unsigned int shutdown
Definition: poolPriv.h:54
void * epicsJobArgSelfMagic
Definition: poolJob.c:24
#define S_pool_noThreads
epicsEventId observerWakeup
Definition: poolPriv.h:45