job.C

00001 /*
00002  * This file is part of the "Archon" framework.
00003  * (http://files3d.sourceforge.net)
00004  *
00005  * Copyright © 2002 by Kristian Spangsege and Brian Kristiansen.
00006  *
00007  * Permission to use, copy, modify, and distribute this software and
00008  * its documentation under the terms of the GNU General Public License is
00009  * hereby granted. No representations are made about the suitability of
00010  * this software for any purpose. It is provided "as is" without express
00011  * or implied warranty. See the GNU General Public License
00012  * (http://www.gnu.org/copyleft/gpl.html) for more details.
00013  *
00014  * The characters in this file are ISO8859-1 encoded.
00015  *
00016  * The documentation in this file is in "Doxygen" style
00017  * (http://www.doxygen.org).
00018  */
00019 
00020 #include <iostream>
00021 #include <algorithm>
00022 #include <archon/util/logger.H>
00023 #include <archon/util/text.H>
00024 
00025 #include <archon/util/job.H>
00026 
00027 namespace Archon
00028 {
00029   namespace Utilities
00030   {
00031     using namespace std;
00032 
00033     static inline void log(string m)
00034     {
00035       Logger::get()->log(m);
00036     }
00037 
00038     void Job::Queue::worker()
00039     {
            /*
             * Body of one worker thread of the queue.
             *
             * The thread first draws a unique id from 'nextThreadId'. It then
             * loops: while jobs are pending it removes the front job and runs
             * its main(); when no job is pending it registers itself in
             * 'idleThreads' and waits on 'newJob' until a deadline of
             * now + 'maxIdleTime'. The function returns (ending the thread's
             * duty) when 'shutdown' is seen after a wait, or when the wait
             * timed out and there is still nothing to do.
             */
00040       unsigned long id;
00041 
00042       {
00043         Mutex::Lock l(mutex);
00044         id = nextThreadId++;
00045       }
00046 
00047       //log((string)"Job::Queue::worker(" + Text::toString(id) + "): started");
00048     
00049       for(;;)
00050       {
              // A fresh lock is taken at the top of every iteration.
00051         Mutex::Lock l(mutex);
00052 
00053         if(pendingJobs.empty())
00054         {
                // Nothing to do: stop counting as active and join the idle
                // list (newest idle thread is at the back).
00055           --activeThreads;
00056           idleThreads.push_back(id);
00057 
                // Last active thread going idle: wake whoever waits on
                // 'allDone' (wait() and the destructor do).
00058           if(activeThreads == 0) allDone.notifyAll();
00059 
                // The deadline is computed once; repeated waits below reuse it.
00060           Time wakeup = Time::now();
00061           wakeup += maxIdleTime;
00062 
00063           for(;;)
00064           {
00065             //log((string)"Job::Queue::worker(" + Text::toString(id) + "): going to sleep");
00066 
                  // NOTE(review): presumably timedWait() gives up 'mutex' while
                  // blocked and returns true once 'wakeup' has passed - confirm
                  // against the Condition class.
00067             const bool timedOut = newJob.timedWait(wakeup);
00068 
00069             if(shutdown)
00070             {
00071               /*
00072                * Note that the contents of 'idleThreads' is undefined after
00073                * 'shutdown' is set to true. The only thing that can be relied on
00074                * is that the size of the list equals the number of threads that
00075                * still have not ended their duty.
00076                */
00077               idleThreads.pop_back();
00078               //log((string)"Job::Queue::worker(" + Text::toString(id) + "): terminated");
                    // Lets the destructor re-check whether the idle list is empty.
00079               threadQuit.notifyAll();
00080               return;
00081             }
00082 
00083             if(timedOut)
00084             {
                    // Remove this thread's own entry, wherever it sits in the list.
00085               idleThreads.erase(find(idleThreads.begin(), idleThreads.end(), id));
                    // A job showed up in the meantime: take it instead of quitting.
00086               if(!pendingJobs.empty()) break;
00087               //log((string)"Job::Queue::worker(" + Text::toString(id) + "): timed out");
00088               threadQuit.notifyAll();
00089               return;
00090             }
00091 
                  // Woken without a timeout: only the thread at the back of the
                  // idle list takes the job; everyone else (or a wake-up with no
                  // job pending) goes back to waiting with the same deadline.
00092             if(pendingJobs.empty() || idleThreads.back() != id) continue;
00093             idleThreads.pop_back();
00094             break;
00095           }
00096 
                // Leaving the idle state with a job to run.
00097           ++activeThreads;
00098         }
00099 
00100         Ref<Job> job = pendingJobs.front();
00101         pendingJobs.pop_front();
00102 
              // Record the running thread on the job; cancel() reads this
              // field to decide whether to terminate a thread or dequeue.
00103         job->thread = Thread::self().get();
00104 
              // NOTE(review): presumably unlocks 'mutex' early so the job runs
              // without the queue lock held (the block below locks it again)
              // - confirm against Mutex::Lock.
00105         l.release();
00106 
00107         //log((string)"Job::Queue::worker(" + Text::toString(id) + "): running new job");
00108         job->main();
00109 
00110         {
                // The job is finished: detach it from this thread under the lock.
00111           Mutex::Lock l(mutex);
00112           job->thread = 0;
00113         }
00114 
              // NOTE(review): presumably clears a termination request left
              // behind by cancel() so this thread can serve further jobs
              // - confirm against Thread::selfResurrect().
00115         Thread::selfResurrect();
00116       }
00117     }
00118 
00119     Job::Queue::Queue(int maxThreads, Time maxIdleTime):
00120       maxThreads(maxThreads), maxIdleTime(maxIdleTime),
00121       newJob(mutex), threadQuit(mutex), allDone(mutex),
00122       nextThreadId(0), activeThreads(0), shutdown(false)
00123     {
00124     }
00125 
00126     Job::Queue::~Queue()
00127     {{ // The extra scope is needed to work around gcc3.2 bug #8287
00128       Mutex::Lock l(mutex);
00129 
00130       // Wait for all thread to become idle
00131       while(!pendingJobs.empty() || activeThreads) allDone.wait();
00132 
00133       // Shutdown all threads
00134       shutdown = true;
00135       newJob.notifyAll();
00136 
00137       /*
00138        * Note that the contents of 'idleThreads' is undefined after
00139        * 'shutdown' is set to true. The only thing that can be relied on
00140        * is that the size of the list equals the number of threads that
00141        * still have not ended their duty.
00142        */
00143       while(!idleThreads.empty()) threadQuit.wait();
00144     }}
00145 
00146     void Job::Queue::workerEntry(Job::Queue *q)
00147     {
            // Entry point that add() hands to Thread::run() together with
            // the queue itself; it simply runs that queue's worker loop.
00148       q->worker();
00149     }
00150 
00151     void Job::Queue::add(Ref<Job> j)
00152     {
00153       Mutex::Lock l(mutex);
00154       if(shutdown) ARCHON_THROW1(StateException, "Shutting down");
00155       pendingJobs.push_back(j);
00156       if(idleThreads.empty() && activeThreads < maxThreads)
00157       {
00158         ++activeThreads;
00159         Thread::run(workerEntry, this);
00160       }
00161       newJob.notifyAll();
00162     }
00163 
00167     void Job::Queue::cancel(Ref<Job> j)
00168     {
00169       Mutex::Lock l(mutex);
00170       if(j->thread)
00171       {
00172         Ref<Thread> t(j->thread, RefSafeIncTag());
00173         l.release();
00174         if(!t) return;
00175         t->terminate();
00176       }
00177       else pendingJobs.remove(j);
00178     }
00179 
00180     void Job::Queue::wait()
00181     {
00182       Mutex::Lock l(mutex);
00183       while(!pendingJobs.empty() || activeThreads > 0) allDone.wait();
00184     }
00185   }
00186 }

Generated on Sun Jul 30 22:55:44 2006 for Archon by  doxygen 1.4.4