00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 #ifndef ARCHON_UTILITIES_PIPE_H
00021 #define ARCHON_UTILITIES_PIPE_H
00022 
00023 #include <string.h>
00024 
00025 #include <archon/util/mutex.H>
00026 #include <archon/util/stream.H>
00027 
00028 namespace Archon
00029 {
00030   namespace Utilities
00031   {
00032     namespace Stream
00033     {
00034       using namespace std;
00035 
00048       template<class C>
00049       struct BasicPipe: virtual ReaderBase<C>, virtual WriterBase<C>
00050       {
00058         static void make(Ref<ReaderBase<C> > &r,
00059                          Ref<WriterBase<C> > &w,
00060                          int bufferSize = 4096, bool expand = false)
00061         {
00062           BasicPipe<C> *p = new BasicPipe<C>(bufferSize, expand);
00063           r.reset(p);
00064           w.reset(p);
00065         }
00066 
00071         void raiseReadError(string message)
00072         {
00073           Mutex::Lock l(*this);
00074           readError = message;
00075         }
00076 
00081         void raiseWriteError(string message)
00082         {
00083           Mutex::Lock l(*this);
00084           writeError = message;
00085         }
00086 
00087         int read(C *b, int n)
00088           throw(ReadException, UnexpectedException)
00089         {
00090           if(n == 0) return 0;
00091           Mutex::Lock l(*this);
00092 
00093           for(;;)
00094           {
00095             if(readError.size()) ARCHON_THROW1(ReadException, readError);
00096             if(buffers.size() > 1 || readPos != writePos) break;
00097             if(!WriterBase<C>::getWriteCountNoLock()) return 0;
00098             nonEmpty.wait();
00099           }
00100 
00101           int rel = writePos - readPos;
00102           int capasity = buffers.size() > 1 || rel < 0 ? bufferSize - readPos : rel;
00103           if(capasity < n) n = capasity;
00104 
00105           memcpy(b, buffers.front() + readPos, n*sizeof(C));
00106 
00107           readPos += n;
00108           if(readPos == bufferSize)
00109           {
00110             if(expand)
00111             {
00112               delete[] buffers.front();
00113               buffers.pop_front();
00114             }
00115             readPos = 0;
00116           }
00117 
00118           l.release();
00119           nonFull.notifyAll();
00120           return n;
00121         }
00122 
00123         int write(const C *b, int n)
00124           throw(WriteException, UnexpectedException)
00125         {
00126           if(n == 0) return 0;
00127           Mutex::Lock l(*this);
00128 
00129           int capasity;
00130           if(expand)
00131           {
00132             if(writeError.size()) ARCHON_THROW1(WriteException, writeError);
00133             if(!ReaderBase<C>::getReadCountNoLock())
00134               ARCHON_THROW1(WriteException, "Pipe has no more readers");
00135             capasity = bufferSize - writePos;
00136             if(!writePos) buffers.push_back(new C[bufferSize]);
00137           }
00138           else for(;;)
00139           {
00140             if(writeError.size()) ARCHON_THROW1(WriteException, writeError);
00141             if(!ReaderBase<C>::getReadCountNoLock())
00142               ARCHON_THROW1(WriteException, "Pipe has no more readers");
00143             if(writePos+1 != (readPos ? readPos : bufferSize))
00144             {
00145               if(readPos)
00146               {
00147                 int rel = readPos - writePos;
00148                 capasity = rel > 0 ? rel - 1 : bufferSize - writePos;
00149               }
00150               else capasity = bufferSize - writePos - 1;
00151               if(!buffers.size()) buffers.push_back(new C[bufferSize]);
00152               break;
00153             }
00154             nonFull.wait();
00155           }
00156 
00157           if(capasity < n) n = capasity;
00158           
00159           memcpy(buffers.back() + writePos, b, n*sizeof(C));
00160 
00161           writePos += n;
00162           if(writePos == bufferSize) writePos = 0;
00163 
00164           l.release();
00165           nonEmpty.notifyAll();
00166           return n;
00167         }
00168 
00169       protected:
00170         BasicPipe(int bufferSize, bool expand):
00171           bufferSize(bufferSize), expand(expand),
00172           readPos(0), writePos(0),
00173           nonEmpty(*this), nonFull(*this) {}
00174 
00175         virtual ~BasicPipe()
00176         {
00177           list<char *>::iterator i = buffers.begin();
00178           while(i != buffers.end()) delete[] *i++;
00179         }
00180 
00181         
00182         
00183         void refDispose(Mutex::Lock &l)
00184         {
00185           if(ReaderBase<C>::getReadCountNoLock()) nonEmpty.notifyAll();
00186           else if(WriterBase<C>::getWriteCountNoLock()) nonFull.notifyAll();
00187           else
00188           {
00189             l.release();
00190             delete this;
00191           }
00192         }
00193 
00194       private:
00199         int bufferSize;
00200 
00207         bool expand;
00208 
00225         list<C *> buffers;
00226 
00239         int readPos;
00240 
00254         int writePos;
00255 
00256         string readError;
00257         string writeError;
00258 
00259         Condition nonEmpty, nonFull;
00260       };
00261 
00262       typedef BasicPipe<char> Pipe;
00263     }
00264   }
00265 }
00266 
00267 #endif // ARCHON_UTILITIES_PIPE_H