00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <errno.h>
00021 #include <time.h>
00022 #include <stdlib.h>
00023 #include <signal.h>
00024 #include <iostream>
00025
00026 #include <archon/util/condition.H>
00027 #include <archon/util/thread.H>
00028
00029 namespace Archon
00030 {
00031 namespace Utilities
00032 {
00033 using namespace std;
00034
00035 Condition::Condition(Mutex &m): mutex(&m)
00036 {
00037 const int e = pthread_cond_init(&cond, 0);
00038 if(e != 0)
00039 ARCHON_THROW1(ResourceException,
00040 "Could not initialize condition");
00041 }
00042
00043 Condition::~Condition()
00044 {{
00045 const int e = pthread_cond_destroy(&cond);
00046 if(e != 0)
00047 {
00048 if(e == EBUSY)
00049 ARCHON_THROW1(StateException,
00050 "Attempt to destroy condition on which "
00051 "threads are waiting");
00052 ARCHON_THROW1(InternalException,
00053 "Attempt to destroy condition failed");
00054 }
00055 }}
00056
00057 void Condition::wait() throw(UnexpectedException)
00058 {
00059 Ref<Thread> self = Thread::self();
00060 {
00061 Mutex::Lock l(self->waitConditionMutex);
00062 self->waitCondition = this;
00063 }
00064
00065 int error = 0;
00066 if(!self->terminateRequest)
00067 error = pthread_cond_wait(&cond, &mutex->mutex);
00068
00069 {
00070 Mutex::Lock l(self->waitConditionMutex);
00071 self->waitCondition = 0;
00072 }
00073
00074 if(error) ARCHON_THROW1(InternalException,
00075 "pthread_cond_wait failed");
00076 if(self->terminateRequest) ARCHON_THROW(ThreadTerminatedException);
00077 }
00078
00079 bool Condition::timedWait(Time timeout)
00080 throw(UnexpectedException)
00081 {
00082 Ref<Thread> self = Thread::self();
00083 {
00084 Mutex::Lock l(self->waitConditionMutex);
00085 self->waitCondition = this;
00086 }
00087
00088 int error = 0;
00089 if(!self->terminateRequest)
00090 error = pthread_cond_timedwait(&cond, &mutex->mutex, &timeout.ts);
00091
00092 {
00093 Mutex::Lock l(self->waitConditionMutex);
00094 self->waitCondition = 0;
00095 }
00096
00097 bool timedout = false;
00098 if(error)
00099 {
00100 if(error == ETIMEDOUT) timedout = true;
00101 else if(error != EINTR)
00102 ARCHON_THROW1(InternalException,
00103 "pthread_cond_timedwait failed");
00104 }
00105 if(self->terminateRequest) ARCHON_THROW(ThreadTerminatedException);
00106 return timedout;
00107 }
00108
00109
namespace
{
  /// Signal sent by Condition::notifySigThreads to the threads that
  /// Condition::select has registered while they sit in pselect.
  const int signo = SIGUSR1;

  /// Set to true by Condition::select right before its first call to
  /// setupSignal(), so that the handler is installed only once.
  bool setupSignalDone = false;

  /// Handler that deliberately does nothing. Having a handler installed
  /// makes delivery of \c signo interrupt a blocking call instead of
  /// triggering the default action for the signal.
  void sigDummy(int) {}

  /**
   * Install sigDummy as the handler for \c signo.
   *
   * The flags of the previous disposition are used as the starting
   * point, then adjusted:
   *
   * - SA_RESTART is cleared so that an interrupted system call is not
   *   restarted automatically.
   * - SA_RESETHAND is cleared so that the handler stays installed
   *   after the first delivery.
   * - SA_SIGINFO is cleared because sigDummy is a one-argument
   *   \c sa_handler, not a three-argument \c sa_sigaction.
   * - SA_NODEFER is set so that the signal is not blocked while the
   *   handler runs.
   *
   * The return values of sigaction are ignored, as before.
   */
  void setupSignal()
  {
    struct sigaction sa;
    sigaction(signo, 0, &sa);
    sa.sa_handler = &sigDummy;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags &= ~(SA_RESTART|SA_RESETHAND|SA_SIGINFO);
    sa.sa_flags |= SA_NODEFER;
    sigaction(signo, &sa, 0);
  }
}
00127
00128
00138 int Condition::select(int n, fd_set *readfds, fd_set *writefds,
00139 fd_set *exceptfds, Time timeout)
00140 throw(UnexpectedException)
00141 {
00142 Time now = Time::now();
00143 if(timeout < now) timeout = Time();
00144 else timeout -= now;
00145
00146 Ref<Thread> self = Thread::self();
00147 {
00148 Mutex::Lock l(self->waitConditionMutex);
00149 self->waitCondition = this;
00150 }
00151
00152 int m = 0;
00153 int e;
00154 if(!self->terminateRequest)
00155 {
00156
00157 sigset_t oldSigset;
00158 sigset_t blockSigset;
00159 sigemptyset(&blockSigset);
00160 sigaddset(&blockSigset, signo);
00161 if(pthread_sigmask(SIG_BLOCK, &blockSigset, &oldSigset)!=0)
00162 ARCHON_THROW1(InternalException,
00163 "pthread_sigmask failed");
00164
00165 if(!setupSignalDone)
00166 {
00167 setupSignalDone = true;
00168 setupSignal();
00169 }
00170
00171 list<pthread_t>::iterator sigThread;
00172 {
00173 Mutex::Lock l(sigThreadsMutex);
00174 sigThreads.push_back(pthread_self());
00175 sigThread = --sigThreads.end();
00176 }
00177
00178 mutex->unlock();
00179
00180 sigset_t unblockSigset = oldSigset;
00181 sigdelset(&blockSigset, signo);
00182 m = pselect(n, readfds, writefds, exceptfds,
00183 &timeout.ts, &unblockSigset);
00184 e = errno;
00185
00186 mutex->lock();
00187
00188 {
00189 Mutex::Lock l(sigThreadsMutex);
00190 sigThreads.erase(sigThread);
00191 }
00192
00193
00194 if(pthread_sigmask(SIG_SETMASK, &oldSigset, 0)!=0)
00195 ARCHON_THROW1(InternalException,
00196 "pthread_sigmask failed");
00197 }
00198
00199 {
00200 Mutex::Lock l(self->waitConditionMutex);
00201 self->waitCondition = 0;
00202 }
00203
00204 if(m < 0 && e != EINTR)
00205 ARCHON_THROW1(InternalException, "pselect failed: " +
00206 string(strerror(e)));
00207 if(self->terminateRequest)
00208 ARCHON_THROW(ThreadTerminatedException);
00209 return m;
00210 }
00211
00212 void Condition::notifySigThreads()
00213 {
00214 list<pthread_t>::iterator i = sigThreads.begin();
00215 while(i != sigThreads.end())
00216 {
00217 if(pthread_kill(*i, signo)!=0)
00218 ARCHON_THROW1(InternalException, "pthread_kill failed");
00219 ++i;
00220 }
00221 }
00222 }
00223 }