00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef ARCHON_UTILITIES_PIPE_H
00021 #define ARCHON_UTILITIES_PIPE_H
00022
00023 #include <string.h>
00024
00025 #include <archon/util/mutex.H>
00026 #include <archon/util/stream.H>
00027
00028 namespace Archon
00029 {
00030 namespace Utilities
00031 {
00032 namespace Stream
00033 {
00034 using namespace std;
00035
00048 template<class C>
00049 struct BasicPipe: virtual ReaderBase<C>, virtual WriterBase<C>
00050 {
00058 static void make(Ref<ReaderBase<C> > &r,
00059 Ref<WriterBase<C> > &w,
00060 int bufferSize = 4096, bool expand = false)
00061 {
00062 BasicPipe<C> *p = new BasicPipe<C>(bufferSize, expand);
00063 r.reset(p);
00064 w.reset(p);
00065 }
00066
00071 void raiseReadError(string message)
00072 {
00073 Mutex::Lock l(*this);
00074 readError = message;
00075 }
00076
00081 void raiseWriteError(string message)
00082 {
00083 Mutex::Lock l(*this);
00084 writeError = message;
00085 }
00086
00087 int read(C *b, int n)
00088 throw(ReadException, UnexpectedException)
00089 {
00090 if(n == 0) return 0;
00091 Mutex::Lock l(*this);
00092
00093 for(;;)
00094 {
00095 if(readError.size()) ARCHON_THROW1(ReadException, readError);
00096 if(buffers.size() > 1 || readPos != writePos) break;
00097 if(!WriterBase<C>::getWriteCountNoLock()) return 0;
00098 nonEmpty.wait();
00099 }
00100
00101 int rel = writePos - readPos;
00102 int capasity = buffers.size() > 1 || rel < 0 ? bufferSize - readPos : rel;
00103 if(capasity < n) n = capasity;
00104
00105 memcpy(b, buffers.front() + readPos, n*sizeof(C));
00106
00107 readPos += n;
00108 if(readPos == bufferSize)
00109 {
00110 if(expand)
00111 {
00112 delete[] buffers.front();
00113 buffers.pop_front();
00114 }
00115 readPos = 0;
00116 }
00117
00118 l.release();
00119 nonFull.notifyAll();
00120 return n;
00121 }
00122
00123 int write(const C *b, int n)
00124 throw(WriteException, UnexpectedException)
00125 {
00126 if(n == 0) return 0;
00127 Mutex::Lock l(*this);
00128
00129 int capasity;
00130 if(expand)
00131 {
00132 if(writeError.size()) ARCHON_THROW1(WriteException, writeError);
00133 if(!ReaderBase<C>::getReadCountNoLock())
00134 ARCHON_THROW1(WriteException, "Pipe has no more readers");
00135 capasity = bufferSize - writePos;
00136 if(!writePos) buffers.push_back(new C[bufferSize]);
00137 }
00138 else for(;;)
00139 {
00140 if(writeError.size()) ARCHON_THROW1(WriteException, writeError);
00141 if(!ReaderBase<C>::getReadCountNoLock())
00142 ARCHON_THROW1(WriteException, "Pipe has no more readers");
00143 if(writePos+1 != (readPos ? readPos : bufferSize))
00144 {
00145 if(readPos)
00146 {
00147 int rel = readPos - writePos;
00148 capasity = rel > 0 ? rel - 1 : bufferSize - writePos;
00149 }
00150 else capasity = bufferSize - writePos - 1;
00151 if(!buffers.size()) buffers.push_back(new C[bufferSize]);
00152 break;
00153 }
00154 nonFull.wait();
00155 }
00156
00157 if(capasity < n) n = capasity;
00158
00159 memcpy(buffers.back() + writePos, b, n*sizeof(C));
00160
00161 writePos += n;
00162 if(writePos == bufferSize) writePos = 0;
00163
00164 l.release();
00165 nonEmpty.notifyAll();
00166 return n;
00167 }
00168
00169 protected:
00170 BasicPipe(int bufferSize, bool expand):
00171 bufferSize(bufferSize), expand(expand),
00172 readPos(0), writePos(0),
00173 nonEmpty(*this), nonFull(*this) {}
00174
00175 virtual ~BasicPipe()
00176 {
00177 list<char *>::iterator i = buffers.begin();
00178 while(i != buffers.end()) delete[] *i++;
00179 }
00180
00181
00182
00183 void refDispose(Mutex::Lock &l)
00184 {
00185 if(ReaderBase<C>::getReadCountNoLock()) nonEmpty.notifyAll();
00186 else if(WriterBase<C>::getWriteCountNoLock()) nonFull.notifyAll();
00187 else
00188 {
00189 l.release();
00190 delete this;
00191 }
00192 }
00193
00194 private:
00199 int bufferSize;
00200
00207 bool expand;
00208
00225 list<C *> buffers;
00226
00239 int readPos;
00240
00254 int writePos;
00255
00256 string readError;
00257 string writeError;
00258
00259 Condition nonEmpty, nonFull;
00260 };
00261
00262 typedef BasicPipe<char> Pipe;
00263 }
00264 }
00265 }
00266
00267 #endif // ARCHON_UTILITIES_PIPE_H