WvStreams
wvstream.h
1 /* -*- Mode: C++ -*-
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * Provides basic streaming I/O support.
6  */
7 #ifndef __WVSTREAM_H
8 #define __WVSTREAM_H
9 
10 #include "iwvstream.h"
11 #include "wvtimeutils.h"
12 #include "wvstreamsdebugger.h"
13 #include <errno.h>
14 #include <limits.h>
15 #include "wvattrs.h"
16 
24 class WvStream: public IWvStream
25 {
26  IMPLEMENT_IOBJECT(WvStream);
27 
28  WvString my_wsname;
29  WSID my_wsid;
30  WvAttrs attrs;
31 public:
37 
43 
46 
49 
55 
57  bool stop_read, stop_write, closed;
58 
60  WvStream();
61  virtual ~WvStream();
62 
70  virtual void close();
71 
73  virtual void seterr(int _errnum);
74  void seterr(WvStringParm specialerr)
75  { WvErrorBase::seterr(specialerr); }
76  void seterr(WVSTRING_FORMAT_DECL)
77  { seterr(WvString(WVSTRING_FORMAT_CALL)); }
78 
80  virtual bool isok() const;
81 
83  virtual size_t read(void *buf, size_t count);
84 
94  virtual size_t read(WvBuf &outbuf, size_t count);
95 
101  virtual void unread(WvBuf &outbuf, size_t count);
102 
109  virtual size_t write(const void *buf, size_t count);
110 
118  virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
119 
129  void outbuf_limit(size_t size)
130  { max_outbuf_size = size; }
131 
132  virtual void noread();
133  virtual void nowrite();
134  virtual void maybe_autoclose();
135 
136  virtual bool isreadable();
137  virtual bool iswritable();
138 
146  virtual size_t uread(void *buf, size_t count)
147  { return 0; /* basic WvStream doesn't actually do anything! */ }
148 
156  virtual size_t uwrite(const void *buf, size_t count)
157  { return count; /* basic WvStream doesn't actually do anything! */ }
158 
175  char *getline(time_t wait_msec = 0,
176  char separator = '\n', int readahead = 1024)
177  {
178  return blocking_getline(wait_msec, separator, readahead);
179  }
180 
182  char *getline(int wait_msec,
183  char separator = '\n', int readahead = 1024)
184  {
185  return getline(time_t(wait_msec), separator, readahead);
186  }
187 
189  char *getline(double wait_msec,
190  char separator = '\n', int readahead = 1024)
191  {
192  return getline(time_t(wait_msec), separator, readahead);
193  }
194 
195 private:
200  char *getline(char, int i = 0);
201  char *getline(bool, int i = 0);
202 public:
203 
215  char *blocking_getline(time_t wait_msec, int separator = '\n',
216  int readahead = 1024);
217 
222  char *continue_getline(time_t wait_msec, int separator = '\n',
223  int readahead = 1024);
224 
232  void queuemin(size_t count)
233  { queue_min = count; }
234 
239  void drain();
240 
246  void delay_output(bool is_delayed)
247  {
248  outbuf_delayed_flush = is_delayed;
249  want_to_flush = !is_delayed;
250  }
251 
258  void auto_flush(bool is_automatic)
259  { is_auto_flush = is_automatic; }
260 
267  virtual bool flush(time_t msec_timeout);
268 
269  virtual bool should_flush();
270 
277  void flush_then_close(int msec_timeout);
278 
300  virtual void pre_select(SelectInfo &si);
301 
306  void pre_select(SelectInfo &si, const SelectRequest &r)
307  {
308  SelectRequest oldwant = si.wants;
309  si.wants = r;
310  pre_select(si);
311  si.wants = oldwant;
312  }
313 
319  { pre_select(si, r); }
320 
333  virtual bool post_select(SelectInfo &si);
334 
340  { return post_select(si, r); }
341 
347  {
348  SelectRequest oldwant = si.wants;
349  si.wants = r;
350  bool val = post_select(si);
351  si.wants = oldwant;
352  return val;
353  }
354 
376  bool select(time_t msec_timeout)
377  { return _select(msec_timeout, false, false, false, true); }
378 
391  void runonce(time_t msec_timeout = -1)
392  { if (select(msec_timeout)) callback(); }
393 
415  bool select(time_t msec_timeout,
416  bool readable, bool writable, bool isex = false)
417  { return _select(msec_timeout, readable, writable, isex, false); }
418 
425 
434  void force_select(bool readable, bool writable, bool isexception = false);
435 
440  void undo_force_select(bool readable, bool writable,
441  bool isexception = false);
442 
460  bool continue_select(time_t msec_timeout);
461 
468 
473  virtual const WvAddr *src() const;
474 
479  void setcallback(IWvStreamCallback _callfunc);
480 
482  IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
483 
485  IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
486 
489  IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
490 
492  IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
493 
499  void autoforward(WvStream &s);
500 
502  void noautoforward();
503  static void autoforward_callback(WvStream &input, WvStream &output);
504 
508  void *_callwrap(void *);
509 
513  void _callback();
514 
519  virtual void callback();
520 
525  void alarm(time_t msec_timeout);
526 
532  time_t alarm_remaining();
533 
538  size_t write(WvStringParm s)
539  { return write(s.cstr(), s.len()); }
540  size_t print(WvStringParm s)
541  { return write(s); }
542  size_t operator() (WvStringParm s)
543  { return write(s); }
544 
546  size_t print(WVSTRING_FORMAT_DECL)
547  { return write(WvString(WVSTRING_FORMAT_CALL)); }
548  size_t operator() (WVSTRING_FORMAT_DECL)
549  { return write(WvString(WVSTRING_FORMAT_CALL)); }
550 
551  const char *wsname() const
552  { return my_wsname; }
553  void set_wsname(WvStringParm wsname)
554  { my_wsname = wsname; }
555  void set_wsname(WVSTRING_FORMAT_DECL)
556  { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
557 
558  const char *wstype() const { return "WvStream"; }
559 
560  WSID wsid() const { return my_wsid; }
561  static IWvStream *find_by_wsid(WSID wsid);
562 
563  virtual WvString getattr(WvStringParm name) const
564  { return attrs.get(name); }
565 
566  // ridiculous hackery for now so that the wvstream unit test can poke
567  // around in the insides of WvStream. Eventually, inbuf will go away
568  // from the base WvStream class, so nothing like this will be needed.
569 #ifdef __WVSTREAM_UNIT_TEST
570 public:
571  size_t outbuf_used()
572  { return outbuf.used(); }
573  size_t inbuf_used()
574  { return inbuf.used(); }
575  void inbuf_putstr(WvStringParm t)
576  { inbuf.putstr(t); }
577 #endif
578 
579 protected:
580  void setattr(WvStringParm name, WvStringParm value)
581  { attrs.set(name, value); }
582  // builds the SelectInfo data structure (runs pre_select)
583  // returns true if there are callbacks to be dispatched
584  //
585  // all of the fields are filled in with new values
586  // si.msec_timeout contains the time until the next alarm expires
587  void _build_selectinfo(SelectInfo &si, time_t msec_timeout,
588  bool readable, bool writable, bool isexcept,
589  bool forceable);
590 
591  // runs the actual select() function over the given
592  // SelectInfo data structure, returns the number of descriptors
593  // in the set, and sets the error code if a problem occurs
594  int _do_select(SelectInfo &si);
595 
596  // processes the SelectInfo data structure (runs post_select)
597  // returns true if there are callbacks to be dispatched
598  bool _process_selectinfo(SelectInfo &si, bool forceable);
599 
600  // tries to empty the output buffer if the stream is writable
601  // not quite the same as flush() since it merely empties the output
602  // buffer asynchronously whereas flush() might have other semantics
603  // also handles autoclose (eg. after flush)
604  bool flush_outbuf(time_t msec_timeout);
605 
606  // called once flush() has emptied outbuf to ensure that any other
607  // internal stream buffers actually do get flushed before it returns
608  virtual bool flush_internal(time_t msec_timeout);
609 
610  // the real implementations for these are actually in WvFDStream, which
611  // is where they belong. By IWvStream needs them to exist for now, so
612  // it's a hack. In standard WvStream they return -1.
613  virtual int getrfd() const;
614  virtual int getwfd() const;
615 
616  // FIXME: this one is so bad, I'm not touching it. Quick hack to
617  // make it work anyway.
618  friend class WvHTTPClientProxyStream;
619 
620  WvDynBuf inbuf, outbuf;
621 
622  IWvStreamCallback callfunc;
623  wv::function<void*(void*)> call_ctx;
624 
625  IWvStreamCallback readcb, writecb, exceptcb, closecb;
626 
627  size_t max_outbuf_size;
628  bool outbuf_delayed_flush;
629  bool is_auto_flush;
630 
631  // Used to guard against excessive flushing when using delay_flush
632  bool want_to_flush;
633 
634  // Used to ensure we don't flush recursively.
635  bool is_flushing;
636 
637  size_t queue_min; // minimum bytes to read()
638  time_t autoclose_time; // close eventually, even if output is queued
639  WvTime alarm_time; // select() returns true at this time
640  WvTime last_alarm_check; // last time we checked the alarm_remaining
641 
652  virtual void execute()
653  { }
654 
655  // every call to select() selects on the globalstream.
656  static WvStream *globalstream;
657 
658  static void debugger_streams_display_header(WvStringParm cmd,
659  WvStreamsDebugger::ResultCallback result_cb);
660  static void debugger_streams_display_one_stream(WvStream *s,
661  WvStringParm cmd,
662  WvStreamsDebugger::ResultCallback result_cb);
663  static void debugger_streams_maybe_display_one_stream(WvStream *s,
664  WvStringParm cmd,
665  const WvStringList &args,
666  WvStreamsDebugger::ResultCallback result_cb);
667 
668 private:
670  bool _select(time_t msec_timeout,
671  bool readable, bool writable, bool isexcept,
672  bool forceable);
673 
674  void legacy_callback();
675 
677  WvStream(const WvStream &s);
678  WvStream& operator= (const WvStream &s);
679  static void add_debugger_commands();
680 
681  static WvString debugger_streams_run_cb(WvStringParm cmd,
682  WvStringList &args,
683  WvStreamsDebugger::ResultCallback result_cb, void *);
684  static WvString debugger_close_run_cb(WvStringParm cmd,
685  WvStringList &args,
686  WvStreamsDebugger::ResultCallback result_cb, void *);
687 };
688 
695 extern WvStream *wvcon; // tied stdin and stdout stream
696 extern WvStream *wvin; // stdin stream
697 extern WvStream *wvout; // stdout stream
698 extern WvStream *wverr; // stderr stream
699 
700 #endif // __WVSTREAM_H
WvStream::delay_output
void delay_output(bool is_delayed)
force write() to always buffer output.
Definition: wvstream.h:246
WvStream::write_requires_readable
WvStream * write_requires_readable
If this is set, select() doesn't return true for write unless the given stream also returns true for ...
Definition: wvstream.h:42
WvStream::maybe_autoclose
virtual void maybe_autoclose()
Auto-close the stream if the time is right.
Definition: wvstream.cc:583
WvStream::pre_select
void pre_select(SelectInfo &si, const SelectRequest &r)
A more convenient version of pre_select() usable for overriding the 'want' value temporarily.
Definition: wvstream.h:306
WvStream::alarm_remaining
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition: wvstream.cc:1057
WvStream::runonce
void runonce(time_t msec_timeout=-1)
Exactly the same as: if (select(timeout)) callback();.
Definition: wvstream.h:391
WvStream::write
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition: wvstream.cc:532
WvStream::blocking_getline
char * blocking_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of getline() that allows you to block for more data to arrive.
Definition: wvstream.cc:602
WvStream::auto_flush
void auto_flush(bool is_automatic)
if true, force write() to call flush() each time, the default behavour.
Definition: wvstream.h:258
WvStream::flush_then_close
void flush_then_close(int msec_timeout)
flush the output buffer automatically as select() is called.
Definition: wvstream.cc:827
WvStream::execute
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
Definition: wvstream.h:652
WvStream::read_requires_writable
WvStream * read_requires_writable
If this is set, select() doesn't return true for read unless the given stream also returns true for w...
Definition: wvstream.h:36
WvStream::uread
virtual size_t uread(void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by read().
Definition: wvstream.h:146
WvStream::flush
virtual bool flush(time_t msec_timeout)
flush the output buffer, if we can do it without delaying more than msec_timeout milliseconds at a ti...
Definition: wvstream.cc:707
WvStream::select
bool select(time_t msec_timeout)
Return true if any of the requested features are true on the stream.
Definition: wvstream.h:376
WvBufBase< unsigned char >::putstr
void putstr(WvStringParm str)
Copies a WvString into the buffer, excluding the null-terminator.
Definition: wvbuffer.cc:11
WvStream::alarm_was_ticking
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
Definition: wvstream.h:54
WvTime
Based on (and interchangeable with) struct timeval.
Definition: wvtimeutils.h:17
IWvStream
Definition: iwvstream.h:24
WvStream::callback
virtual void callback()
if the stream has a callback function defined, call it now.
Definition: wvstream.cc:401
WvStream::queuemin
void queuemin(size_t count)
force read() to not return any bytes unless 'count' bytes can be read at once.
Definition: wvstream.h:232
WvErrorBase::seterr
virtual void seterr(int _errnum)
Set the errnum variable – we have an error.
Definition: wverror.cc:144
WvStream::setexceptcallback
IWvStreamCallback setexceptcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is in exception state.
Definition: wvstream.cc:1165
WvStream::setreadcallback
IWvStreamCallback setreadcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is readable.
Definition: wvstream.cc:1145
WvStream::iswritable
virtual bool iswritable()
Returns true if the stream is writable (without using the outbuf).
Definition: wvstream.cc:596
WvStream::WvStream
WvStream()
Basic constructor for just a do-nothing WvStream.
Definition: wvstream.cc:249
WvStream::personal_stack_size
size_t personal_stack_size
Specifies the stack size to reserve for continue_select().
Definition: wvstream.h:48
WvStream::close
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
Definition: wvstream.cc:341
WvStream::seterr
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition: wvstream.cc:451
WvStream::uses_continue_select
bool uses_continue_select
If this is set, enables the use of continue_select().
Definition: wvstream.h:45
WvString
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:329
WvStream::force_select
void force_select(bool readable, bool writable, bool isexception=false)
Use force_select() to force one or more particular modes (readable, writable, or isexception) to true...
Definition: wvstream.cc:1026
WvAttrs
Definition: wvattrs.h:6
WvStream::isreadable
virtual bool isreadable()
Returns true if the stream is readable.
Definition: wvstream.cc:590
WvStream::post_select
bool post_select(SelectInfo &si, const SelectRequest &r)
A more convenient version of post_select() usable for overriding the 'want' value temporarily.
Definition: wvstream.h:346
IWvStream::SelectInfo
the data structure used by pre_select()/post_select() and internally by select().
Definition: iwvstream.h:50
WvStream::getline
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition: wvstream.h:175
WvStream::continue_getline
char * continue_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of blocking_getline() that uses continue_select to avoid blocking other streams.
Definition: wvstream.cc:690
WvStream::terminate_continue_select
void terminate_continue_select()
you MUST run this from your destructor if you use continue_select(), or very weird things will happen...
Definition: wvstream.cc:1117
WvStream::_callwrap
void * _callwrap(void *)
A wrapper that's compatible with WvCont, but calls the "real" callback.
Definition: wvstream.cc:394
WvAddr
Base class for different address types, each of which will have the ability to convert itself to/from...
Definition: wvaddr.h:118
WvBufBase< unsigned char >
Specialization of WvBufBase for unsigned char type buffers intended for use with raw memory buffers.
Definition: wvbuf.h:22
WvStream::_callback
void _callback()
Actually call the registered callfunc and execute().
Definition: wvstream.cc:386
WvStream::xpre_select
void xpre_select(SelectInfo &si, const SelectRequest &r)
Like pre_select(), but still exists even if you override the other pre_select() in a subclass.
Definition: wvstream.h:318
WvStream::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvstream.cc:875
WvStream::read
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition: wvstream.cc:490
WvStream::autoforward
void autoforward(WvStream &s)
set the callback function for this stream to an internal routine that auto-forwards all incoming stre...
Definition: wvstream.cc:362
WvStream::unread
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition: wvstream.cc:1191
WvStream::alarm
void alarm(time_t msec_timeout)
set an alarm, ie.
Definition: wvstream.cc:1048
WvStream::isok
virtual bool isok() const
return true if the stream is actually usable right now
Definition: wvstream.cc:445
WvStream
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
Definition: wvstream.h:24
WvStream::getline
char * getline(double wait_msec, char separator='\n', int readahead=1024)
Auto-convert double to time_t.
Definition: wvstream.h:189
WvStream::should_flush
virtual bool should_flush()
Returns true if we want to flush the output buffer right now.
Definition: wvstream.cc:724
IWvStream::SelectRequest
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
Definition: iwvstream.h:34
WvDynBufBase< unsigned char >
WvStream::setcallback
void setcallback(IWvStreamCallback _callfunc)
define the callback function for this stream, called whenever the callback() member is run,...
Definition: wvstream.cc:1130
WvStream::noautoforward
void noautoforward()
Stops autoforwarding.
Definition: wvstream.cc:369
WvStream::get_select_request
IWvStream::SelectRequest get_select_request()
Use get_select_request() to save the current state of the selection state of this stream.
Definition: wvstream.cc:1020
WvStream::nowrite
virtual void nowrite()
Shuts down the writing side of the stream.
Definition: wvstream.cc:576
WvStream::getline
char * getline(int wait_msec, char separator='\n', int readahead=1024)
Auto-convert int to time_t.
Definition: wvstream.h:182
WvBufBaseCommonImpl::used
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition: wvbufbase.h:92
WvStream::setclosecallback
IWvStreamCallback setclosecallback(IWvStreamCallback _callback)
Sets a callback to be invoked on close().
Definition: wvstream.cc:1175
WvStream::undo_force_select
void undo_force_select(bool readable, bool writable, bool isexception=false)
Undo a previous force_select() - ie.
Definition: wvstream.cc:1037
WvStream::setwritecallback
IWvStreamCallback setwritecallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is writable.
Definition: wvstream.cc:1155
WvStream::continue_select
bool continue_select(time_t msec_timeout)
return to the caller from execute(), but don't really return exactly; this uses WvCont::yield() to re...
Definition: wvstream.cc:1088
WvStream::uwrite
virtual size_t uwrite(const void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by write().
Definition: wvstream.h:156
WvStream::xpost_select
bool xpost_select(SelectInfo &si, const SelectRequest &r)
Like post_select(), but still exists even if you override the other post_select() in a subclass.
Definition: wvstream.h:339
WvStream::print
size_t print(WVSTRING_FORMAT_DECL)
preformat and write() a string.
Definition: wvstream.h:546
WvStream::src
virtual const WvAddr * src() const
get the remote address from which the last data block was received.
Definition: wvstream.cc:1124
WvStringList
This is a WvList of WvStrings, and is a really handy way to parse strings.
Definition: wvstringlist.h:27
WvStream::select
bool select(time_t msec_timeout, bool readable, bool writable, bool isex=false)
This version of select() sets forceable==false, so we use the exact readable/writable/isexception opt...
Definition: wvstream.h:415
WvStream::outbuf_limit
void outbuf_limit(size_t size)
set the maximum size of outbuf, beyond which a call to write() will return 0.
Definition: wvstream.h:129
WvStream::write
size_t write(WvStringParm s)
print a preformatted WvString to the stream.
Definition: wvstream.h:538
WvStream::pre_select
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvstream.cc:844
WvStream::noread
virtual void noread()
Shuts down the reading side of the stream.
Definition: wvstream.cc:569
WvStream::stop_read
bool stop_read
True if noread()/nowrite()/close() have been called, respectively.
Definition: wvstream.h:57
WvStream::drain
void drain()
drain the input buffer (read and discard data until select(0) returns false)
Definition: wvstream.cc:699