This is Unofficial EPICS BASE Doxygen Site
tpool.cpp
Go to the documentation of this file.
1 
2 #include <typeinfo>
3 #include <stdexcept>
4 
5 #include <epicsEvent.h>
6 #include <epicsGuard.h>
7 #include <epicsThread.h>
8 #include <errlog.h>
9 
10 #include <pv/sharedPtr.h>
11 
12 #include "helper.h"
13 #include "tpool.h"
14 
17 
18 WorkQueue::WorkQueue(const std::string& name)
19  :name(name)
20  ,state(Idle)
21 {}
22 
24 
25 void WorkQueue::start(unsigned nworkers, unsigned prio)
26 {
27  Guard G(mutex);
28 
29  if(state!=Idle)
30  throw std::logic_error("Already started");
31 
32  try {
33  state = Active;
34 
35  for(unsigned i=0; i<nworkers; i++) {
36  p2p::auto_ptr<epicsThread> worker(new epicsThread(*this, name.c_str(),
38  prio));
39 
40  worker->start();
41 
42  workers.push_back(worker.get());
43  worker.release();
44  }
45  }catch(...){
46  UnGuard U(G); // unlock as close() blocks to join any workers which were started
47  close();
48  throw;
49  }
50 }
51 
53 {
54  workers_t temp;
55 
56  {
57  Guard G(mutex);
58  if(state!=Active)
59  return;
60 
61  temp.swap(workers);
62  state = Stopping;
63  }
64 
65  wakeup.signal();
66 
67  for(workers_t::iterator it(temp.begin()), end(temp.end()); it!=end; ++it)
68  {
69  (*it)->exitWait();
70  delete *it;
71  }
72 
73  {
74  Guard G(mutex);
75  state = Idle;
76  }
77 }
78 
79 void WorkQueue::add(const value_type& work)
80 {
81  bool empty;
82 
83  {
84  Guard G(mutex);
85  if(state!=Active)
86  return;
87 
88  empty = queue.empty();
89 
90  queue.push_back(work);
91  }
92 
93  if(empty) {
94  wakeup.signal();
95  }
96 }
97 
98 void WorkQueue::run()
99 {
100  Guard G(mutex);
101 
102  std::tr1::shared_ptr<epicsThreadRunable> work;
103 
104  while(state==Active) {
105 
106  if(!queue.empty()) {
107  work = queue.front().lock();
108  queue.pop_front();
109  }
110 
111  bool last = queue.empty();
112 
113  {
114  UnGuard U(G);
115 
116  if(work) {
117  try {
118  work->run();
119  work.reset();
120  }catch(std::exception& e){
121  errlogPrintf("%s Unhandled exception from %s: %s\n",
122  name.c_str(), typeid(work.get()).name(), e.what());
123  work.reset();
124  }
125  }
126 
127  if(last) {
128  wakeup.wait();
129  }
130  }
131  }
132 
133  // pass along the close() signal to next worker
134  wakeup.signal();
135 }
epics::pvData::BitSetPtr empty
Definition: pvAccess.cpp:135
virtual ~WorkQueue()
Definition: tpool.cpp:23
epicsGuard< epicsMutex > Guard
Definition: tpool.cpp:15
int i
Definition: scan.c:967
std::tr1::weak_ptr< epicsThreadRunable > value_type
Definition: tpool.h:20
void close()
Definition: tpool.cpp:52
WorkQueue(const std::string &name)
Definition: tpool.cpp:18
void add(const value_type &work)
Definition: tpool.cpp:79
LIBCOM_API unsigned int epicsStdCall epicsThreadGetStackSize(epicsThreadStackSizeClass size)
Definition: osdThread.c:466
int errlogPrintf(const char *pFormat,...)
Definition: errlog.c:105
APIs for the epicsEvent binary semaphore.
epicsGuardRelease< epicsMutex > UnGuard
Definition: tpool.cpp:16
C++ and C descriptions for a thread.
void start(unsigned nworkers=1, unsigned prio=epicsThreadPriorityLow)
Definition: tpool.cpp:25