wibble  0.1.28
pipe.h
Go to the documentation of this file.
1 // -*- C++ -*- (c) 2008 Petr Rockai <me@mornfall.net>
2 
3 #include <wibble/sys/macros.h>
4 
5 #ifdef POSIX
6 #include <fcntl.h>
7 #include <sys/select.h>
8 
9 #include <deque>
10 #include <cerrno>
11 
12 #include <wibble/exception.h>
13 #include <wibble/sys/thread.h>
14 #include <wibble/sys/mutex.h>
15 
16 #ifndef WIBBLE_SYS_PIPE_H
17 #define WIBBLE_SYS_PIPE_H
18 
19 namespace wibble {
20 namespace sys {
21 
22 namespace wexcept = wibble::exception;
23 
24 struct Pipe {
25 
26  struct Writer : wibble::sys::Thread {
27  int fd;
28  bool close;
29  std::string data;
30  bool running;
31  bool closed;
32  wibble::sys::Mutex mutex;
33 
34  Writer() : fd( -1 ), close( false ), running( false ) {}
35 
36  void *main() {
37  do {
38  int wrote = 0;
39 
40  {
41  wibble::sys::MutexLock __l( mutex );
42  wrote = ::write( fd, data.c_str(), data.length() );
43  if ( wrote > 0 )
44  data.erase( data.begin(), data.begin() + wrote );
45  }
46 
47  if ( wrote == -1 ) {
48  if ( errno == EAGAIN || errno == EWOULDBLOCK )
49  sched_yield();
50  else
51  throw wexcept::System( "writing to pipe" );
52  }
53  } while ( !done() );
54 
55  if ( close )
56  ::close( fd );
57 
58  return 0;
59  }
60 
61  bool done() {
62  wibble::sys::MutexLock __l( mutex );
63  if ( data.empty() )
64  running = false;
65  return !running;
66  }
67 
68  void run( int _fd, std::string what ) {
69  wibble::sys::MutexLock __l( mutex );
70 
71  if ( running )
72  assert_eq( _fd, fd );
73  fd = _fd;
74  assert_neq( fd, -1 );
75 
76  data += what;
77  if ( running )
78  return;
79  running = true;
80  start();
81  }
82  };
83 
84  typedef std::deque< char > Buffer;
85  Buffer buffer;
86  int fd;
87  bool _eof;
88  Writer writer;
89 
90  Pipe( int p ) : fd( p ), _eof( false )
91  {
92  if ( p == -1 )
93  return;
94  if ( fcntl( fd, F_SETFL, O_NONBLOCK ) == -1 )
95  throw wexcept::System( "fcntl on a pipe" );
96  }
97  Pipe() : fd( -1 ), _eof( false ) {}
98 
99  /* Writes data to the pipe, asynchronously. */
100  void write( std::string what ) {
101  writer.run( fd, what );
102  }
103 
104  void close() {
105  writer.close = true;
106  writer.run( fd, "" );
107  }
108 
109  bool valid() {
110  return fd != -1;
111  }
112 
113  bool active() {
114  return valid() && !eof();
115  }
116 
117  bool eof() {
118  return _eof;
119  }
120 
121  int readMore() {
122  assert( valid() );
123  char _buffer[1024];
124  int r = ::read( fd, _buffer, 1023 );
125  if ( r == -1 && errno != EAGAIN && errno != EWOULDBLOCK )
126  throw wexcept::System( "reading from pipe" );
127  else if ( r == -1 )
128  return 0;
129  if ( r == 0 )
130  _eof = true;
131  else
132  std::copy( _buffer, _buffer + r, std::back_inserter( buffer ) );
133  return r;
134  }
135 
136  std::string nextChunk() {
137  std::string line( buffer.begin(), buffer.end() );
138  buffer.clear();
139  return line;
140  }
141 
142  std::string nextLine() {
143  assert( valid() );
144  Buffer::iterator nl =
145  std::find( buffer.begin(), buffer.end(), '\n' );
146  while ( nl == buffer.end() ) {
147  if ( !readMore() )
148  return ""; // would block, so give up
149  nl = std::find( buffer.begin(), buffer.end(), '\n' );
150  }
151  std::string line( buffer.begin(), nl );
152 
153  if ( nl != buffer.end() )
154  ++ nl;
155  buffer.erase( buffer.begin(), nl );
156 
157  return line;
158  }
159 
160  /* Only returns on eof() or when data is buffered. */
161  void wait() {
162  assert( valid() );
163  fd_set fds;
164  FD_ZERO( &fds );
165  while ( buffer.empty() && !eof() ) {
166  if ( readMore() )
167  return;
168  if ( eof() )
169  return;
170  FD_SET( fd, &fds );
171  select( fd + 1, &fds, 0, 0, 0 );
172  }
173  }
174  std::string nextLineBlocking() {
175  assert( valid() );
176  std::string l;
177  while ( !eof() ) {
178  l = nextLine();
179  if ( !l.empty() )
180  return l;
181  if ( eof() )
182  return std::string( buffer.begin(), buffer.end() );
183  wait();
184  }
185  return l;
186  }
187 
188 };
189 
190 }
191 }
192 #endif
193 #endif