00001 #include <unistd.h>
00002 #include <string>
00003 #include <iostream>
00004
00005 #include <archon/util/exception.H>
00006 #include <archon/util/stream.H>
00007 #include <archon/util/pipe.H>
00008
00009 using namespace std;
00010 using namespace Archon::Utilities;
00011
00012 struct Producer: Thread
00013 {
00014 Ref<Stream::Writer> w;
00015 const char *s;
00016
00017 Producer(Ref<Stream::Writer> w, const char *s):w(w), s(s) {}
00018
00019 virtual void main()
00020 {
00021 char buffer[64];
00022
00023 for(int i=0;i<20000;++i)
00024 {
00025 sprintf(buffer, "%s(%d)\n", s, i);
00026 w->writeAll(buffer, strlen(buffer));
00027 }
00028 w.reset();
00029 }
00030 };
00031
00032 struct Consumer: Thread
00033 {
00034 Ref<Stream::Reader> r;
00035
00036 Consumer(Ref<Stream::Reader> r):r(r) {}
00037
00038 virtual void main()
00039 {
00040 for(;;)
00041 {
00042 char b[1025];
00043 int n = r->read(b, 1025);
00044 if(n == 0) break;
00045 write(STDOUT_FILENO, b, n);
00046 }
00047 r.reset();
00048 }
00049 };
00050
00051 int main() throw()
00052 {
00053 set_unexpected(Exception::terminal<exceptionCatchInfo>);
00054 set_terminate (Exception::terminal<exceptionCatchInfo>);
00055
00056 Ref<Stream::Reader> r;
00057 Ref<Stream::Writer> w;
00058 Stream::Pipe::make(r, w, 253, true);
00059
00060 RefAny a = r;
00061
00062 cerr << r->getUseCount() << "\n";
00063 cerr << w->getUseCount() << "\n";
00064
00065 Thread::sleep(Time(2.0));
00066
00067 a.reset();
00068
00069 Ref<Producer> producer1 = new Producer(w, "Alpha");
00070 Ref<Producer> producer2 = new Producer(w, "Beta");
00071 Ref<Producer> producer3 = new Producer(w, "Gamma");
00072 Ref<Consumer> consumer = new Consumer(r);
00073 r.reset();
00074 w.reset();
00075 Thread::start(producer1);
00076 Thread::start(producer2);
00077 Thread::start(producer3);
00078 Thread::start(consumer);
00079
00080 Thread::mainExitWait();
00081
00082 return 0;
00083 }