This is Unofficial EPICS BASE Doxygen Site
poolJob.c File Reference
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include "dbDefs.h"
#include "errlog.h"
#include "ellLib.h"
#include "epicsThread.h"
#include "epicsMutex.h"
#include "epicsEvent.h"
#include "epicsInterrupt.h"
#include "epicsThreadPool.h"
#include "poolPriv.h"
+ Include dependency graph for poolJob.c:

Go to the source code of this file.

Functions

int createPoolThread (epicsThreadPool *pool)
 
epicsJobepicsJobCreate (epicsThreadPool *pool, epicsJobFunction func, void *arg)
 
void epicsJobDestroy (epicsJob *job)
 
int epicsJobMove (epicsJob *job, epicsThreadPool *newpool)
 
int epicsJobQueue (epicsJob *job)
 
int epicsJobUnqueue (epicsJob *job)
 

Variables

void * epicsJobArgSelfMagic = &epicsJobArgSelfMagic
 

Function Documentation

int createPoolThread ( epicsThreadPool pool)

Definition at line 117 of file poolJob.c.

118 {
119  epicsThreadId tid;
120 
121  tid = epicsThreadCreate("PoolWorker",
122  pool->conf.workerPriority,
123  pool->conf.workerStack,
124  &workerMain,
125  pool);
126  if (!tid)
127  return S_pool_noThreads;
128 
129  pool->threadsRunning++;
130  pool->threadsSleeping++;
131  return 0;
132 }
epicsThreadId epicsStdCall epicsThreadCreate(const char *name, unsigned int priority, unsigned int stackSize, EPICSTHREADFUNC funptr, void *parm)
Definition: epicsThread.cpp:33
unsigned int workerStack
unsigned int workerPriority
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
unsigned int threadsRunning
Definition: poolPriv.h:37
unsigned int threadsSleeping
Definition: poolPriv.h:35
#define S_pool_noThreads
epicsJob* epicsJobCreate ( epicsThreadPool pool,
epicsJobFunction  func,
void *  arg 
)

Definition at line 134 of file poolJob.c.

137 {
138  epicsJob *job = calloc(1, sizeof(*job));
139 
140  if (!job)
141  return NULL;
142 
143  if (arg == &epicsJobArgSelfMagic)
144  arg = job;
145 
146  job->pool = NULL;
147  job->func = func;
148  job->arg = arg;
149 
150  epicsJobMove(job, pool);
151 
152  return job;
153 }
epicsThreadPool * pool
Definition: poolPriv.h:89
void * arg
Definition: poolPriv.h:88
#define NULL
Definition: catime.c:38
epicsJobFunction func
Definition: poolPriv.h:87
int epicsJobMove(epicsJob *job, epicsThreadPool *newpool)
Definition: poolJob.c:182
void * epicsJobArgSelfMagic
Definition: poolJob.c:24
void epicsJobDestroy ( epicsJob job)

Definition at line 155 of file poolJob.c.

156 {
157  epicsThreadPool *pool;
158  if (!job || !job->pool) {
159  free(job);
160  return;
161  }
162  pool = job->pool;
163 
164  epicsMutexMustLock(pool->guard);
165 
166  assert(!job->dead);
167 
168  epicsJobUnqueue(job);
169 
170  if (job->running || job->freewhendone) {
171  job->freewhendone = 1;
172  }
173  else {
174  ellDelete(&pool->owned, &job->jobnode);
175  job->dead = 1;
176  free(job);
177  }
178 
179  epicsMutexUnlock(pool->guard);
180 }
int epicsJobUnqueue(epicsJob *job)
Definition: poolJob.c:301
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
epicsThreadPool * pool
Definition: poolPriv.h:89
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
unsigned int freewhendone
Definition: poolPriv.h:93
ELLNODE jobnode
Definition: poolPriv.h:86
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
unsigned int dead
Definition: poolPriv.h:94
unsigned int running
Definition: poolPriv.h:92
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
int epicsJobMove ( epicsJob job,
epicsThreadPool newpool 
)

Definition at line 182 of file poolJob.c.

183 {
184  epicsThreadPool *pool = job->pool;
185 
186  /* remove from current pool */
187  if (pool) {
188  epicsMutexMustLock(pool->guard);
189 
190  if (job->queued || job->running) {
191  epicsMutexUnlock(pool->guard);
192  return S_pool_jobBusy;
193  }
194 
195  ellDelete(&pool->owned, &job->jobnode);
196 
197  epicsMutexUnlock(pool->guard);
198  }
199 
200  pool = job->pool = newpool;
201 
202  /* add to new pool */
203  if (pool) {
204  epicsMutexMustLock(pool->guard);
205 
206  ellAdd(&pool->owned, &job->jobnode);
207 
208  epicsMutexUnlock(pool->guard);
209  }
210 
211  return 0;
212 }
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
epicsThreadPool * pool
Definition: poolPriv.h:89
#define S_pool_jobBusy
ELLNODE jobnode
Definition: poolPriv.h:86
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
unsigned int running
Definition: poolPriv.h:92
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int queued
Definition: poolPriv.h:91
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
int epicsJobQueue ( epicsJob job)

Definition at line 214 of file poolJob.c.

215 {
216  int ret = 0;
217  epicsThreadPool *pool = job->pool;
218 
219  if (!pool)
220  return S_pool_noPool;
221 
222  epicsMutexMustLock(pool->guard);
223 
224  assert(!job->dead);
225 
226  if (pool->pauseadd) {
227  ret = S_pool_paused;
228  goto done;
229  }
230  else if (job->freewhendone) {
231  ret = S_pool_jobBusy;
232  goto done;
233  }
234  else if (job->queued) {
235  goto done;
236  }
237 
238  job->queued = 1;
239  /* Job may be queued from within a callback */
240  if (!job->running) {
241  ellDelete(&pool->owned, &job->jobnode);
242  ellAdd(&pool->jobs, &job->jobnode);
243  }
244  else {
245  /* some worker will find it again before sleeping */
246  goto done;
247  }
248 
249  /* Since we hold the lock, we can be certain that all awake worker are
250  * executing work functions. The current thread may be a worker.
251  * We prefer to wakeup a new worker rather then wait for a busy worker to
252  * finish. However, after we initiate a wakeup there will be a race
253  * between the worker waking up, and a busy worker finishing.
254  * Thus we can't avoid spurious wakeups.
255  */
256 
257  if (pool->threadsRunning >= pool->conf.maxThreads) {
258  /* all workers created... */
259  /* ... but some are sleeping, so wake one up */
260  if (pool->threadsWaking < pool->threadsSleeping) {
261  pool->threadsWaking++;
263  }
264  /*else one of the running workers will find this job before sleeping */
265  CHECKCOUNT(pool);
266 
267  }
268  else {
269  /* could create more workers so
270  * will either create a new worker, or wakeup an existing worker
271  */
272 
273  if (pool->threadsWaking >= pool->threadsSleeping) {
274  /* all sleeping workers have already been woken.
275  * start a new worker for this job
276  */
277  if (createPoolThread(pool) && pool->threadsRunning == 0) {
278  /* oops, we couldn't lazy create our first worker
279  * so this job would never run!
280  */
281  ret = S_pool_noThreads;
282  job->queued = 0;
283  /* if threadsRunning==0 then no jobs can be running */
284  assert(!job->running);
285  ellDelete(&pool->jobs, &job->jobnode);
286  ellAdd(&pool->owned, &job->jobnode);
287  }
288  }
289  if (ret == 0) {
290  pool->threadsWaking++;
292  }
293  CHECKCOUNT(pool);
294  }
295 
296 done:
297  epicsMutexUnlock(pool->guard);
298  return ret;
299 }
epicsEventId workerWakeup
Definition: poolPriv.h:42
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
#define CHECKCOUNT(pPool)
Definition: poolPriv.h:63
epicsThreadPool * pool
Definition: poolPriv.h:89
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
#define S_pool_jobBusy
unsigned int freewhendone
Definition: poolPriv.h:93
int createPoolThread(epicsThreadPool *pool)
Definition: poolJob.c:117
ELLNODE jobnode
Definition: poolPriv.h:86
ELLLIST jobs
Definition: poolPriv.h:21
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
unsigned int threadsWaking
Definition: poolPriv.h:33
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
unsigned int dead
Definition: poolPriv.h:94
#define epicsEventSignal(ID)
A synonym for epicsEventTrigger().
Definition: epicsEvent.h:172
unsigned int running
Definition: poolPriv.h:92
epicsThreadPoolConfig conf
Definition: poolPriv.h:59
unsigned int pauseadd
Definition: poolPriv.h:48
unsigned int threadsRunning
Definition: poolPriv.h:37
#define S_pool_paused
void done(int k)
Definition: antelope.c:77
unsigned int threadsSleeping
Definition: poolPriv.h:35
#define S_pool_noPool
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int queued
Definition: poolPriv.h:91
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75
#define S_pool_noThreads
int epicsJobUnqueue ( epicsJob job)

Definition at line 301 of file poolJob.c.

302 {
303  int ret = S_pool_jobIdle;
304  epicsThreadPool *pool = job->pool;
305 
306  if (!pool)
307  return S_pool_noPool;
308 
309  epicsMutexMustLock(pool->guard);
310 
311  assert(!job->dead);
312 
313  if (job->queued) {
314  if (!job->running) {
315  ellDelete(&pool->jobs, &job->jobnode);
316  ellAdd(&pool->owned, &job->jobnode);
317  }
318  job->queued = 0;
319  ret = 0;
320  }
321 
322  epicsMutexUnlock(pool->guard);
323 
324  return ret;
325 }
ELLLIST owned
Definition: poolPriv.h:22
epicsMutexId guard
Definition: poolPriv.h:56
epicsThreadPool * pool
Definition: poolPriv.h:89
#define assert(exp)
Declare that a condition should be true.
Definition: epicsAssert.h:70
ELLNODE jobnode
Definition: poolPriv.h:86
ELLLIST jobs
Definition: poolPriv.h:21
void epicsStdCall epicsMutexUnlock(epicsMutexId pmutexNode)
Release the semaphore.
Definition: epicsMutex.cpp:140
void ellAdd(ELLLIST *pList, ELLNODE *pNode)
Adds a node to the end of a list.
Definition: ellLib.c:24
unsigned int dead
Definition: poolPriv.h:94
unsigned int running
Definition: poolPriv.h:92
#define S_pool_jobIdle
#define S_pool_noPool
#define epicsMutexMustLock(ID)
Claim a semaphore (see epicsMutexLock()).
Definition: epicsMutex.h:214
unsigned int queued
Definition: poolPriv.h:91
void ellDelete(ELLLIST *pList, ELLNODE *pNode)
Deletes a node from a list.
Definition: ellLib.c:75

Variable Documentation

void* epicsJobArgSelfMagic = &epicsJobArgSelfMagic

Definition at line 24 of file poolJob.c.