00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <iostream>
00021 #include <algorithm>
00022 #include <archon/util/logger.H>
00023 #include <archon/util/text.H>
00024
00025 #include <archon/util/job.H>
00026
00027 namespace Archon
00028 {
00029 namespace Utilities
00030 {
00031 using namespace std;
00032
00033 static inline void log(string m)
00034 {
00035 Logger::get()->log(m);
00036 }
00037
00038 void Job::Queue::worker()
00039 {
00040 unsigned long id;
00041
00042 {
00043 Mutex::Lock l(mutex);
00044 id = nextThreadId++;
00045 }
00046
00047
00048
00049 for(;;)
00050 {
00051 Mutex::Lock l(mutex);
00052
00053 if(pendingJobs.empty())
00054 {
00055 --activeThreads;
00056 idleThreads.push_back(id);
00057
00058 if(activeThreads == 0) allDone.notifyAll();
00059
00060 Time wakeup = Time::now();
00061 wakeup += maxIdleTime;
00062
00063 for(;;)
00064 {
00065
00066
00067 const bool timedOut = newJob.timedWait(wakeup);
00068
00069 if(shutdown)
00070 {
00071
00072
00073
00074
00075
00076
00077 idleThreads.pop_back();
00078
00079 threadQuit.notifyAll();
00080 return;
00081 }
00082
00083 if(timedOut)
00084 {
00085 idleThreads.erase(find(idleThreads.begin(), idleThreads.end(), id));
00086 if(!pendingJobs.empty()) break;
00087
00088 threadQuit.notifyAll();
00089 return;
00090 }
00091
00092 if(pendingJobs.empty() || idleThreads.back() != id) continue;
00093 idleThreads.pop_back();
00094 break;
00095 }
00096
00097 ++activeThreads;
00098 }
00099
00100 Ref<Job> job = pendingJobs.front();
00101 pendingJobs.pop_front();
00102
00103 job->thread = Thread::self().get();
00104
00105 l.release();
00106
00107
00108 job->main();
00109
00110 {
00111 Mutex::Lock l(mutex);
00112 job->thread = 0;
00113 }
00114
00115 Thread::selfResurrect();
00116 }
00117 }
00118
00119 Job::Queue::Queue(int maxThreads, Time maxIdleTime):
00120 maxThreads(maxThreads), maxIdleTime(maxIdleTime),
00121 newJob(mutex), threadQuit(mutex), allDone(mutex),
00122 nextThreadId(0), activeThreads(0), shutdown(false)
00123 {
00124 }
00125
00126 Job::Queue::~Queue()
00127 {{
00128 Mutex::Lock l(mutex);
00129
00130
00131 while(!pendingJobs.empty() || activeThreads) allDone.wait();
00132
00133
00134 shutdown = true;
00135 newJob.notifyAll();
00136
00137
00138
00139
00140
00141
00142
00143 while(!idleThreads.empty()) threadQuit.wait();
00144 }}
00145
00146 void Job::Queue::workerEntry(Job::Queue *q)
00147 {
00148 q->worker();
00149 }
00150
00151 void Job::Queue::add(Ref<Job> j)
00152 {
00153 Mutex::Lock l(mutex);
00154 if(shutdown) ARCHON_THROW1(StateException, "Shutting down");
00155 pendingJobs.push_back(j);
00156 if(idleThreads.empty() && activeThreads < maxThreads)
00157 {
00158 ++activeThreads;
00159 Thread::run(workerEntry, this);
00160 }
00161 newJob.notifyAll();
00162 }
00163
00167 void Job::Queue::cancel(Ref<Job> j)
00168 {
00169 Mutex::Lock l(mutex);
00170 if(j->thread)
00171 {
00172 Ref<Thread> t(j->thread, RefSafeIncTag());
00173 l.release();
00174 if(!t) return;
00175 t->terminate();
00176 }
00177 else pendingJobs.remove(j);
00178 }
00179
00180 void Job::Queue::wait()
00181 {
00182 Mutex::Lock l(mutex);
00183 while(!pendingJobs.empty() || activeThreads > 0) allDone.wait();
00184 }
00185 }
00186 }