00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <errno.h>
00021 #include <iostream>
00022
00023 #include <archon/util/thread.H>
00024
00025 namespace Archon
00026 {
00027 namespace Utilities
00028 {
00029 using namespace std;
00030
00031 namespace
00032 {
00036 pthread_key_t selfKey;
00037
00041 pthread_once_t selfKeyOnce = PTHREAD_ONCE_INIT;
00042
00046 pthread_mutex_t activeThreadsMutex = PTHREAD_MUTEX_INITIALIZER;
00047
00051 pthread_cond_t lastActiveThread = PTHREAD_COND_INITIALIZER;
00052
00058 unsigned long activeThreads = 0;
00059
00060 void lockMutex(pthread_mutex_t *m)
00061 {
00062 int e = pthread_mutex_lock(m);
00063 if(e == 0) return;
00064 if(e == EDEADLK)
00065 ARCHON_THROW1(StateException,
00066 "Attempt to lock a locked mutex");
00067 if(e == EINVAL)
00068 ARCHON_THROW1(InternalException,
00069 "Attempt to lock uninitialized mutex");
00070 ARCHON_THROW1(InternalException,
00071 "Attempt to lock mutex failed");
00072 }
00073
00074 void unlockMutex(pthread_mutex_t *m)
00075 {
00076 int e = pthread_mutex_unlock(m);
00077 if(e == 0) return;
00078 if(e == EPERM)
00079 ARCHON_THROW1(StateException,
00080 "Attempt to unlock mutex that is locked by another thread");
00081 if(e == EINVAL)
00082 ARCHON_THROW1(InternalException,
00083 "Attempt to unlock uninitialized mutex");
00084 ARCHON_THROW1(InternalException,
00085 "Attempt to unlock mutex failed");
00086 }
00087 }
00088
00089
00090 void Thread::activate(Thread *t)
00091 {
00092 lockMutex(&activeThreadsMutex);
00093 ++activeThreads;
00094 unlockMutex(&activeThreadsMutex);
00095 t->refInc();
00096 }
00097
00109 void Thread::deactivate(Thread *t)
00110 {
00111 t->refDec();
00112 lockMutex(&activeThreadsMutex);
00113 bool l = --activeThreads == 1;
00114 unlockMutex(&activeThreadsMutex);
00115 if(l)
00116 {
00117 const int e = pthread_cond_broadcast(&lastActiveThread);
00118 if(e != 0)
00119 ARCHON_THROW1(InternalException,
00120 "Attempt to broadcast on condition failed");
00121 }
00122 }
00123
00130 void Thread::selfKeyDestroy(void *v) throw()
00131 {
00132 Thread *t = reinterpret_cast<Thread *>(v);
00133 {
00134 Mutex::Lock l(*t);
00135 t->terminated = true;
00136 }
00137 t->termination.notifyAll();
00138 deactivate(t);
00139 }
00140
00144 void Thread::selfKeyAlloc() throw()
00145 {
00146 int error = pthread_key_create(&selfKey, &Thread::selfKeyDestroy);
00147 if(error)
00148 {
00149 if(error == EAGAIN)
00150 ARCHON_THROW1(ResourceException,
00151 "Too many TSD keys");
00152 ARCHON_THROW1(InternalException,
00153 "Could not create new TSD key");
00154 }
00155 }
00156
00160 void Thread::registerSelf(Thread *t)
00161 {
00162 int error = pthread_once(&selfKeyOnce, &selfKeyAlloc);
00163 if(error)
00164 {
00165 deactivate(t);
00166 ARCHON_THROW1(InternalException, "pthread_once failed");
00167 }
00168 error = pthread_setspecific(selfKey, reinterpret_cast<void *>(t));
00169 if(error)
00170 {
00171 deactivate(t);
00172
00173 if(error == EINVAL)
00174 ARCHON_THROW1(InternalException,
00175 "Invalid TSD key in pthread_setspecific");
00176 ARCHON_THROW1(InternalException,
00177 "pthread_setspecific failed");
00178 }
00179 }
00180
00181 Thread::Thread():
00182 started(false), terminated(false), termination(*this),
00183 terminateRequest(false), waitCondition(0)
00184 {
00185 }
00186
00187 Thread::~Thread()
00188 {
00189 }
00190
00194 void *Thread::entry(Thread *t) throw()
00195 {
00196 int error = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, 0);
00197 if(error) ARCHON_THROW1(ResourceException,
00198 "pthread_setcancelstate failed");
00199 registerSelf(t);
00200 try
00201 {
00202 t->main();
00203 }
00204 catch(ThreadTerminatedException &) {}
00205 return 0;
00206 }
00207
00208 void Thread::start(Ref<Thread> t) throw(AlreadyStartedException)
00209 {
00210 if(!t) ARCHON_THROW1(ArgumentException, "Got null thread");
00211 {
00212 Mutex::Lock l(*t.get());
00213 if(t->started) ARCHON_THROW(AlreadyStartedException);
00214 t->started = true;
00215 }
00216
00217 pthread_attr_t attr;
00218 int error = pthread_attr_init(&attr);
00219 if(error) ARCHON_THROW1(ResourceException, "pthread_attr_init failed");
00220 error = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00221 if(error) ARCHON_THROW1(ResourceException,
00222 "pthread_attr_setdetachstate failed");
00223
00224 activate(t.get());
00225 error = pthread_create
00226 (&t->pthread, 0, reinterpret_cast<void *(*)(void *)>(t->entry), t.get());
00227 pthread_attr_destroy(&attr);
00228
00229 if(!error) return;
00230 deactivate(t.get());
00231 if(error == EAGAIN)
00232 ARCHON_THROW1(ResourceException,
00233 "Not enough system resources to create a new thread");
00234 ARCHON_THROW1(InternalException,
00235 "Could not create a new thread");
00236 }
00237
00238
00239 struct Thread::SelfThread: Thread
00240 {
00241 SelfThread()
00242 {
00243 pthread = pthread_self();
00244 started = true;
00245 }
00246
00247 void main() {}
00248 };
00249
00250 Ref<Thread> Thread::self()
00251 {
00252 int error = pthread_once(&selfKeyOnce, &selfKeyAlloc);
00253 if(error) ARCHON_THROW1(InternalException, "pthread_once failed");
00254 Thread *t = reinterpret_cast<Thread *>(pthread_getspecific(selfKey));
00255 if(t) return t;
00256 t = new SelfThread;
00257 activate(t);
00258 registerSelf(t);
00259 return t;
00260 }
00261
00262 void Thread::terminate()
00263 {
00264 terminateRequest = true;
00265 Mutex::Lock l1(waitConditionMutex);
00266 if(waitCondition)
00267 {
00268 Mutex::Lock l2(*waitCondition->mutex);
00269 waitCondition->notifyAll();
00270 }
00271 l1.release();
00272 }
00273
00274 void Thread::selfResurrect()
00275 {
00276 self()->terminateRequest = false;
00277 }
00278
00279 void Thread::wait() throw(UnexpectedException, NotStartedException)
00280 {
00281 Mutex::Lock l(*this);
00282 if(!started) ARCHON_THROW(NotStartedException);
00283 while(!terminated) termination.wait();
00284 }
00285
00286 void Thread::sleep(const Time &period) throw(UnexpectedException)
00287 {
00288 Time timeout(Time::now());
00289 timeout += period;
00290 Mutex mutex;
00291 Condition cond(mutex);
00292 Mutex::Lock lock(mutex);
00293 while(!cond.timedWait(timeout));
00294 }
00295
00296 void Thread::sleepUntil(const Time &timeout) throw(UnexpectedException)
00297 {
00298 Mutex mutex;
00299 Condition cond(mutex);
00300 Mutex::Lock lock(mutex);
00301 while(!cond.timedWait(timeout));
00302 }
00303
00304 void Thread::acceptTermination() throw(UnexpectedException)
00305 {
00306 if(self()->terminateRequest) ARCHON_THROW(ThreadTerminatedException);
00307 }
00308
00309 void Thread::mainExitWait()
00310 {
00311 Thread::self();
00312
00313 lockMutex(&activeThreadsMutex);
00314 while(activeThreads > 1)
00315 if(pthread_cond_wait(&lastActiveThread, &activeThreadsMutex))
00316 ARCHON_THROW1(InternalException, "pthread_cond_wait failed");
00317 unlockMutex(&activeThreadsMutex);
00318 }
00319
00320
00321 void Semaphore::down() throw(UnexpectedException)
00322 {
00323 Mutex::Lock l(mutex);
00324 while(value == 0) nonZero.wait();
00325 --value;
00326 }
00327
00328 void Semaphore::up()
00329 {
00330 Mutex::Lock l(mutex);
00331 ++value;
00332 l.release();
00333 nonZero.notifyAll();
00334 }
00335 }
00336 }