WvStreams
wvstream.cc
1 /*
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * Unified support for streams, that is, sequences of bytes that may or
6  * may not be ready for read/write at any given time.
7  *
8  * We provide typical read and write routines, as well as a select() function
9  * for each stream.
10  */
11 #include <time.h>
12 #include <sys/types.h>
13 #include <assert.h>
14 #define __WVSTREAM_UNIT_TEST 1
15 #include "wvstream.h"
16 #include "wvtimeutils.h"
17 #include "wvcont.h"
18 #include "wvstreamsdebugger.h"
19 #include "wvstrutils.h"
20 #include "wvistreamlist.h"
21 #include "wvlinkerhack.h"
22 #include "wvmoniker.h"
23 
24 #ifdef _WIN32
25 #define ENOBUFS WSAENOBUFS
26 #undef errno
27 #define errno GetLastError()
28 #ifdef __GNUC__
29 #include <sys/socket.h>
30 #endif
31 #include "streams.h"
32 #else
33 #include <errno.h>
34 #endif
35 
36 #include <map>
37 
38 using std::make_pair;
39 using std::map;
40 
41 
42 // enable this to add some read/write trace messages (this can be VERY
43 // verbose)
44 #if 0
45 # ifndef _MSC_VER
46 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
47 # else
48 # define TRACE printf
49 # endif
50 #else
51 # ifndef _MSC_VER
52 # define TRACE(x, y...)
53 # else
54 # define TRACE
55 # endif
56 #endif
57 
58 WvStream *WvStream::globalstream = NULL;
59 
64 
65 
66 static map<WSID, WvStream*> *wsid_map;
67 static WSID next_wsid_to_try;
68 
69 
70 WV_LINK(WvStream);
71 
72 static IWvStream *create_null(WvStringParm, IObject *)
73 {
74  return new WvStream();
75 }
76 
77 static WvMoniker<IWvStream> reg("null", create_null);
78 
79 
80 IWvStream *IWvStream::create(WvStringParm moniker, IObject *obj)
81 {
82  IWvStream *s = wvcreate<IWvStream>(moniker, obj);
83  if (!s)
84  {
85  s = new WvStream();
86  s->seterr_both(EINVAL, "Unknown moniker '%s'", moniker);
87  WVRELEASE(obj); // we're not going to use it after all
88  }
89  return s;
90 }
91 
92 
// Returns true if 'str' starts with 'prefix', compared case-insensitively.
// An empty prefix matches every string.
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    // strncasecmp() stops at str's terminating NUL, and prefix has no NUL
    // within its first strlen(prefix) characters, so a str shorter than
    // prefix can never compare equal.  Measuring str first is unnecessary
    // (and made the substring search below quadratic in the haystack).
    return strncasecmp(str, prefix, strlen(prefix)) == 0;
}


// Case-insensitive strstr(): returns a pointer to the first occurrence of
// 'needle' inside 'haystack', or NULL if there is none.  As with strstr(),
// an empty needle matches at the very start, even of an empty haystack.
static const char *strstr_insensitive(const char *haystack, const char *needle)
{
    if (*needle == '\0')
        return haystack;

    for (const char *pos = haystack; *pos != '\0'; ++pos)
    {
        if (is_prefix_insensitive(pos, needle))
            return pos;
    }
    return NULL;
}


// Returns true if 'needle' occurs anywhere in 'haystack', ignoring case.
static bool contains_insensitive(const char *haystack, const char *needle)
{
    return strstr_insensitive(haystack, needle) != NULL;
}
116 
117 
// Column layout shared by the header row and the per-stream rows of the
// "streams" debugger command: seven fields with a separator string between
// each pair (13 arguments in total).
static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";

// Render a boolean as the text shown in the debugger listing.
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
123 
124 
125 void WvStream::debugger_streams_display_header(WvStringParm cmd,
126  WvStreamsDebugger::ResultCallback result_cb)
127 {
128  WvStringList result;
129  result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
130  "----------------Type", "-", "Name--------------------");
131  result_cb(cmd, result);
132 }
133 
134 
135 // Set to fit in 6 chars
136 static WvString friendly_ms(time_t ms)
137 {
138  if (ms <= 0)
139  return WvString("(%s)", ms);
140  else if (ms < 1000)
141  return WvString("%sms", ms);
142  else if (ms < 60*1000)
143  return WvString("%ss", ms/1000);
144  else if (ms < 60*60*1000)
145  return WvString("%sm", ms/(60*1000));
146  else if (ms <= 24*60*60*1000)
147  return WvString("%sh", ms/(60*60*1000));
148  else
149  return WvString("%sd", ms/(24*60*60*1000));
150 }
151 
152 void WvStream::debugger_streams_display_one_stream(WvStream *s,
153  WvStringParm cmd,
154  WvStreamsDebugger::ResultCallback result_cb)
155 {
156  WvStringList result;
157  s->addRef();
158  unsigned refcount = s->release();
159  result.append(list_format,
160  s->wsid(), " ",
161  refcount, " ",
162  Yes_No(s->isok()), " ",
163  Yes_No(s->uses_continue_select), " ",
164  friendly_ms(s->alarm_remaining()), " ",
165  s->wstype(), " ",
166  s->wsname());
167  result_cb(cmd, result);
168 }
169 
170 
171 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
172  WvStringParm cmd,
173  const WvStringList &args,
174  WvStreamsDebugger::ResultCallback result_cb)
175 {
176  bool show = args.isempty();
177  WvStringList::Iter arg(args);
178  for (arg.rewind(); arg.next(); )
179  {
180  WSID wsid;
181  bool is_num = wvstring_to_num(*arg, wsid);
182 
183  if (is_num)
184  {
185  if (s->wsid() == wsid)
186  {
187  show = true;
188  break;
189  }
190  }
191  else
192  {
193  if ((s->wsname() && contains_insensitive(s->wsname(), *arg))
194  || (s->wstype() && contains_insensitive(s->wstype(), *arg)))
195  {
196  show = true;
197  break;
198  }
199  }
200  }
201  if (show)
202  debugger_streams_display_one_stream(s, cmd, result_cb);
203 }
204 
205 
206 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
207  WvStringList &args,
208  WvStreamsDebugger::ResultCallback result_cb, void *)
209 {
210  debugger_streams_display_header(cmd, result_cb);
211  if (wsid_map)
212  {
213  map<WSID, WvStream*>::iterator it;
214 
215  for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
216  debugger_streams_maybe_display_one_stream(it->second, cmd, args,
217  result_cb);
218  }
219 
220  return WvString::null;
221 }
222 
223 
224 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
225  WvStringList &args,
226  WvStreamsDebugger::ResultCallback result_cb, void *)
227 {
228  if (args.isempty())
229  return WvString("Usage: %s <WSID>", cmd);
230  WSID wsid;
231  WvString wsid_str = args.popstr();
232  if (!wvstring_to_num(wsid_str, wsid))
233  return WvString("Invalid WSID '%s'", wsid_str);
234  IWvStream *s = WvStream::find_by_wsid(wsid);
235  if (!s)
236  return WvString("No such stream");
237  s->close();
238  return WvString::null;
239 }
240 
241 
242 void WvStream::add_debugger_commands()
243 {
244  WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
245  WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
246 }
247 
248 
250  read_requires_writable(NULL),
251  write_requires_readable(NULL),
252  uses_continue_select(false),
253  personal_stack_size(131072),
254  alarm_was_ticking(false),
255  stop_read(false),
256  stop_write(false),
257  closed(false),
258  readcb(wv::bind(&WvStream::legacy_callback, this)),
259  max_outbuf_size(0),
260  outbuf_delayed_flush(false),
261  is_auto_flush(true),
262  want_to_flush(true),
263  is_flushing(false),
264  queue_min(0),
265  autoclose_time(0),
266  alarm_time(wvtime_zero),
267  last_alarm_check(wvtime_zero)
268 {
269  TRACE("Creating wvstream %p\n", this);
270 
271  static bool first = true;
272  if (first)
273  {
274  first = false;
275  WvStream::add_debugger_commands();
276  }
277 
278  // Choose a wsid;
279  if (!wsid_map)
280  wsid_map = new map<WSID, WvStream*>;
281  WSID first_wsid_tried = next_wsid_to_try;
282  do
283  {
284  if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
285  break;
286  ++next_wsid_to_try;
287  } while (next_wsid_to_try != first_wsid_tried);
288  my_wsid = next_wsid_to_try++;
289  bool inserted = wsid_map->insert(make_pair(my_wsid, this)).second;
290  assert(inserted);
291 
292 #ifdef _WIN32
293  WSAData wsaData;
294  int result = WSAStartup(MAKEWORD(2,0), &wsaData);
295  assert(result == 0);
296 #endif
297 }
298 
299 
300 // FIXME: interfaces (IWvStream) shouldn't have implementations!
301 IWvStream::IWvStream()
302 {
303 }
304 
305 
306 IWvStream::~IWvStream()
307 {
308 }
309 
310 
311 WvStream::~WvStream()
312 {
313  TRACE("destroying %p\n", this);
314  close();
315 
316  // if this assertion fails, then uses_continue_select is true, but you
317  // didn't call terminate_continue_select() or close() before destroying
318  // your object. Shame on you!
319  assert(!uses_continue_select || !call_ctx);
320 
321  call_ctx = 0; // finish running the suspended callback, if any
322 
323  assert(wsid_map);
324  wsid_map->erase(my_wsid);
325  if (wsid_map->empty())
326  {
327  delete wsid_map;
328  wsid_map = NULL;
329  }
330 
331  // eventually, streams will auto-add themselves to the globallist. But
332  // even before then, it'll never be useful for them to be on the
333  // globallist *after* they get destroyed, so we might as well auto-remove
334  // them already. It's harmless for people to try to remove them twice.
335  WvIStreamList::globallist.unlink(this);
336 
337  TRACE("done destroying %p\n", this);
338 }
339 
340 
342 {
343  TRACE("flushing in wvstream...\n");
344  flush(2000); // fixme: should not hardcode this stuff
345  TRACE("(flushed)\n");
346 
347  closed = true;
348 
349  if (!!closecb)
350  {
351  IWvStreamCallback cb = closecb;
352  closecb = 0; // ensure callback is only called once
353  cb();
354  }
355 
356  // I would like to delete call_ctx here, but then if someone calls
357  // close() from *inside* a continuable callback, we explode. Oops!
358  //call_ctx = 0; // destroy the context, if necessary
359 }
360 
361 
363 {
364  setcallback(wv::bind(autoforward_callback, wv::ref(*this), wv::ref(s)));
366 }
367 
368 
370 {
371  setcallback(0);
372  read_requires_writable = NULL;
373 }
374 
375 
376 void WvStream::autoforward_callback(WvStream &input, WvStream &output)
377 {
378  char buf[1024];
379  size_t len;
380 
381  len = input.read(buf, sizeof(buf));
382  output.write(buf, len);
383 }
384 
385 
387 {
388  execute();
389  if (!!callfunc)
390  callfunc();
391 }
392 
393 
394 void *WvStream::_callwrap(void *)
395 {
396  _callback();
397  return NULL;
398 }
399 
400 
402 {
403  TRACE("(?)");
404 
405  // if the alarm has gone off and we're calling callback... good!
406  if (alarm_remaining() == 0)
407  {
408  alarm_time = wvtime_zero;
409  alarm_was_ticking = true;
410  }
411  else
412  alarm_was_ticking = false;
413 
414  assert(!uses_continue_select || personal_stack_size >= 1024);
415 
416 #define TEST_CONTINUES_HARSHLY 0
417 #if TEST_CONTINUES_HARSHLY
418 #ifndef _WIN32
419 # warning "Using WvCont for *all* streams for testing!"
420 #endif
421  if (1)
422 #else
424 #endif
425  {
426  if (!call_ctx) // no context exists yet!
427  {
428  call_ctx = WvCont(wv::bind(&WvStream::_callwrap, this, _1),
430  }
431 
432  call_ctx(NULL);
433  }
434  else
435  _callback();
436 
437  // if this assertion fails, a derived class's virtual execute() function
438  // didn't call its parent's execute() function, and we didn't make it
439  // all the way back up to WvStream::execute(). This doesn't always
440  // matter right now, but it could lead to obscure bugs later, so we'll
441  // enforce it.
442 }
443 
444 
445 bool WvStream::isok() const
446 {
447  return !closed && WvErrorBase::isok();
448 }
449 
450 
451 void WvStream::seterr(int _errnum)
452 {
453  if (!geterr()) // no pre-existing error
454  {
455  WvErrorBase::seterr(_errnum);
456  close();
457  }
458 }
459 
460 
461 size_t WvStream::read(WvBuf &outbuf, size_t count)
462 {
463  // for now, just wrap the older read function
464  size_t free = outbuf.free();
465  if (count > free)
466  count = free;
467 
468  WvDynBuf tmp;
469  unsigned char *buf = tmp.alloc(count);
470  size_t len = read(buf, count);
471  tmp.unalloc(count - len);
472  outbuf.merge(tmp);
473  return len;
474 }
475 
476 
477 size_t WvStream::write(WvBuf &inbuf, size_t count)
478 {
479  // for now, just wrap the older write function
480  size_t avail = inbuf.used();
481  if (count > avail)
482  count = avail;
483  const unsigned char *buf = inbuf.get(count);
484  size_t len = write(buf, count);
485  inbuf.unget(count - len);
486  return len;
487 }
488 
489 
490 size_t WvStream::read(void *buf, size_t count)
491 {
492  assert(!count || buf);
493 
494  size_t bufu, i;
495  unsigned char *newbuf;
496 
497  bufu = inbuf.used();
498  if (bufu < queue_min)
499  {
500  newbuf = inbuf.alloc(queue_min - bufu);
501  assert(newbuf);
502  i = uread(newbuf, queue_min - bufu);
503  inbuf.unalloc(queue_min - bufu - i);
504 
505  bufu = inbuf.used();
506  }
507 
508  if (bufu < queue_min)
509  {
510  maybe_autoclose();
511  return 0;
512  }
513 
514  // if buffer is empty, do a hard read
515  if (!bufu)
516  bufu = uread(buf, count);
517  else
518  {
519  // otherwise just read from the buffer
520  if (bufu > count)
521  bufu = count;
522 
523  memcpy(buf, inbuf.get(bufu), bufu);
524  }
525 
526  TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
527  maybe_autoclose();
528  return bufu;
529 }
530 
531 
532 size_t WvStream::write(const void *buf, size_t count)
533 {
534  assert(!count || buf);
535  if (!isok() || !buf || !count || stop_write) return 0;
536 
537  size_t wrote = 0;
538  if (!outbuf_delayed_flush && !outbuf.used())
539  {
540  wrote = uwrite(buf, count);
541  count -= wrote;
542  buf = (const unsigned char *)buf + wrote;
543  // if (!count) return wrote; // short circuit if no buffering needed
544  }
545  if (max_outbuf_size != 0)
546  {
547  size_t canbuffer = max_outbuf_size - outbuf.used();
548  if (count > canbuffer)
549  count = canbuffer; // can't write the whole amount
550  }
551  if (count != 0)
552  {
553  outbuf.put(buf, count);
554  wrote += count;
555  }
556 
557  if (should_flush())
558  {
559  if (is_auto_flush)
560  flush(0);
561  else
562  flush_outbuf(0);
563  }
564 
565  return wrote;
566 }
567 
568 
570 {
571  stop_read = true;
572  maybe_autoclose();
573 }
574 
575 
577 {
578  stop_write = true;
579  maybe_autoclose();
580 }
581 
582 
584 {
585  if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
586  close();
587 }
588 
589 
591 {
592  return isok() && select(0, true, false, false);
593 }
594 
595 
597 {
598  return !stop_write && isok() && select(0, false, true, false);
599 }
600 
601 
602 char *WvStream::blocking_getline(time_t wait_msec, int separator,
603  int readahead)
604 {
605  assert(separator >= 0);
606  assert(separator <= 255);
607 
608  //assert(uses_continue_select || wait_msec == 0);
609 
610  WvTime timeout_time(0);
611  if (wait_msec > 0)
612  timeout_time = msecadd(wvtime(), wait_msec);
613 
614  maybe_autoclose();
615 
616  // if we get here, we either want to wait a bit or there is data
617  // available.
618  while (isok())
619  {
620  // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
621  queuemin(0);
622 
623  // if there is a newline already, we have enough data.
624  if (inbuf.strchr(separator) > 0)
625  break;
626  else if (!isok() || stop_read) // uh oh, stream is in trouble.
627  break;
628 
629  // make select not return true until more data is available
630  queuemin(inbuf.used() + 1);
631 
632  // compute remaining timeout
633  if (wait_msec > 0)
634  {
635  wait_msec = msecdiff(timeout_time, wvtime());
636  if (wait_msec < 0)
637  wait_msec = 0;
638  }
639 
640  // FIXME: this is blocking_getline. It shouldn't
641  // call continue_select()!
642  bool hasdata;
643  if (wait_msec != 0 && uses_continue_select)
644  hasdata = continue_select(wait_msec);
645  else
646  hasdata = select(wait_msec, true, false);
647 
648  if (!isok())
649  break;
650 
651  if (hasdata)
652  {
653  // read a few bytes
654  WvDynBuf tmp;
655  unsigned char *buf = tmp.alloc(readahead);
656  assert(buf);
657  size_t len = uread(buf, readahead);
658  tmp.unalloc(readahead - len);
659  inbuf.put(tmp.get(len), len);
660  hasdata = len > 0; // enough?
661  }
662 
663  if (!isok())
664  break;
665 
666  if (!hasdata && wait_msec == 0)
667  return NULL; // handle timeout
668  }
669  if (!inbuf.used())
670  return NULL;
671 
672  // return the appropriate data
673  size_t i = 0;
674  i = inbuf.strchr(separator);
675  if (i > 0) {
676  char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
677  assert(eol && *eol == separator);
678  *eol = 0;
679  return const_cast<char*>((const char *)inbuf.get(i));
680  } else {
681  // handle "EOF without newline" condition
682  // FIXME: it's very silly that buffers can't return editable
683  // char* arrays.
684  inbuf.alloc(1)[0] = 0; // null-terminate it
685  return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
686  }
687 }
688 
689 
690 char *WvStream::continue_getline(time_t wait_msec, int separator,
691  int readahead)
692 {
693  assert(false && "not implemented, come back later!");
694  assert(uses_continue_select);
695  return NULL;
696 }
697 
698 
700 {
701  char buf[1024];
702  while (isreadable())
703  read(buf, sizeof(buf));
704 }
705 
706 
707 bool WvStream::flush(time_t msec_timeout)
708 {
709  if (is_flushing) return false;
710 
711  TRACE("%p flush starts\n", this);
712 
713  is_flushing = true;
714  want_to_flush = true;
715  bool done = flush_internal(msec_timeout) // any other internal buffers
716  && flush_outbuf(msec_timeout); // our own outbuf
717  is_flushing = false;
718 
719  TRACE("flush stops (%d)\n", done);
720  return done;
721 }
722 
723 
725 {
726  return want_to_flush;
727 }
728 
729 
730 bool WvStream::flush_outbuf(time_t msec_timeout)
731 {
732  TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
733  bool outbuf_was_used = outbuf.used();
734 
735  // do-nothing shortcut for speed
736  // FIXME: definitely makes a "measurable" difference...
737  // but is it worth the risk?
738  if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
739  {
740  maybe_autoclose();
741  return true;
742  }
743 
744  WvTime stoptime = msecadd(wvtime(), msec_timeout);
745 
746  // flush outbuf
747  while (outbuf_was_used && isok())
748  {
749 // fprintf(stderr, "%p: fd:%d/%d, used:%d\n",
750 // this, getrfd(), getwfd(), outbuf.used());
751 
752  size_t attempt = outbuf.optgettable();
753  size_t real = uwrite(outbuf.get(attempt), attempt);
754 
755  // WARNING: uwrite() may have messed up our outbuf!
756  // This probably only happens if uwrite() closed the stream because
757  // of an error, so we'll check isok().
758  if (isok() && real < attempt)
759  {
760  TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
761  assert(outbuf.ungettable() >= attempt - real);
762  outbuf.unget(attempt - real);
763  }
764 
765  // since post_select() can call us, and select() calls post_select(),
766  // we need to be careful not to call select() if we don't need to!
767  // post_select() will only call us with msec_timeout==0, and we don't
768  // need to do select() in that case anyway.
769  if (!msec_timeout)
770  break;
771  if (msec_timeout >= 0
772  && (stoptime < wvtime() || !select(msec_timeout, false, true)))
773  break;
774 
775  outbuf_was_used = outbuf.used();
776  }
777 
778  // handle autoclose
779  if (autoclose_time && isok())
780  {
781  time_t now = time(NULL);
782  TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783  this, now - autoclose_time, outbuf.used());
784  if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
785  {
786  autoclose_time = 0; // avoid infinite recursion!
787  close();
788  }
789  }
790 
791  TRACE("flush_outbuf: after autoclose chunk\n");
792  if (outbuf_delayed_flush && !outbuf_was_used)
793  want_to_flush = false;
794 
795  TRACE("flush_outbuf: now isok=%d\n", isok());
796 
797  // if we can't flush the outbuf, at least empty it!
798  if (outbuf_was_used && !isok())
799  outbuf.zap();
800 
801  maybe_autoclose();
802  TRACE("flush_outbuf stops\n");
803 
804  return !outbuf_was_used;
805 }
806 
807 
808 bool WvStream::flush_internal(time_t msec_timeout)
809 {
810  // once outbuf emptied, that's it for most streams
811  return true;
812 }
813 
814 
815 int WvStream::getrfd() const
816 {
817  return -1;
818 }
819 
820 
821 int WvStream::getwfd() const
822 {
823  return -1;
824 }
825 
826 
827 void WvStream::flush_then_close(int msec_timeout)
828 {
829  time_t now = time(NULL);
830  autoclose_time = now + (msec_timeout + 999) / 1000;
831 
832  TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833  this, outbuf.used(), autoclose_time - now);
834 
835  // as a fast track, we _could_ close here: but that's not a good idea,
836  // since flush_then_close() deals with obscure situations, and we don't
837  // want the caller to use it incorrectly. So we make things _always_
838  // break when the caller forgets to call select() later.
839 
840  flush(0);
841 }
842 
843 
845 {
846  maybe_autoclose();
847 
848  time_t alarmleft = alarm_remaining();
849 
850  if (!isok() || (!si.inherit_request && alarmleft == 0))
851  {
852  si.msec_timeout = 0;
853  return; // alarm has rung
854  }
855 
856  if (!si.inherit_request)
857  {
858  si.wants.readable |= static_cast<bool>(readcb);
859  si.wants.writable |= static_cast<bool>(writecb);
860  si.wants.isexception |= static_cast<bool>(exceptcb);
861  }
862 
863  // handle read-ahead buffering
864  if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
865  {
866  si.msec_timeout = 0; // already ready
867  return;
868  }
869  if (alarmleft >= 0
870  && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
871  si.msec_timeout = alarmleft + 10;
872 }
873 
874 
876 {
877  if (!si.inherit_request)
878  {
879  si.wants.readable |= static_cast<bool>(readcb);
880  si.wants.writable |= static_cast<bool>(writecb);
881  si.wants.isexception |= static_cast<bool>(exceptcb);
882  }
883 
884  // FIXME: need sane buffer flush support for non FD-based streams
885  // FIXME: need read_requires_writable and write_requires_readable
886  // support for non FD-based streams
887 
888  // note: flush(nonzero) might call select(), but flush(0) never does,
889  // so this is safe.
890  if (should_flush())
891  flush(0);
892  if (!si.inherit_request && alarm_remaining() == 0)
893  return true; // alarm ticked
894  if ((si.wants.readable || (!si.inherit_request && readcb))
895  && inbuf.used() && inbuf.used() >= queue_min)
896  return true; // already ready
897  return false;
898 }
899 
900 
901 void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
902  bool readable, bool writable, bool isexcept, bool forceable)
903 {
904  FD_ZERO(&si.read);
905  FD_ZERO(&si.write);
906  FD_ZERO(&si.except);
907 
908  if (forceable)
909  {
910  si.wants.readable = static_cast<bool>(readcb);
911  si.wants.writable = static_cast<bool>(writecb);
912  si.wants.isexception = static_cast<bool>(exceptcb);
913  }
914  else
915  {
916  si.wants.readable = readable;
917  si.wants.writable = writable;
918  si.wants.isexception = isexcept;
919  }
920 
921  si.max_fd = -1;
922  si.msec_timeout = msec_timeout;
923  si.inherit_request = ! forceable;
924  si.global_sure = false;
925 
926  wvstime_sync();
927 
928  pre_select(si);
929  if (globalstream && forceable && (globalstream != this))
930  {
931  WvStream *s = globalstream;
932  globalstream = NULL; // prevent recursion
933  s->xpre_select(si, SelectRequest(false, false, false));
934  globalstream = s;
935  }
936 }
937 
938 
939 int WvStream::_do_select(SelectInfo &si)
940 {
941  // prepare timeout
942  timeval tv;
943  tv.tv_sec = si.msec_timeout / 1000;
944  tv.tv_usec = (si.msec_timeout % 1000) * 1000;
945 
946 #ifdef _WIN32
947  // selecting on an empty set of sockets doesn't cause a delay in win32.
948  SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
949  FD_SET(fakefd, &si.except);
950 #endif
951 
952  // block
953  int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
954  si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
955 
956  // handle errors.
957  // EAGAIN and EINTR don't matter because they're totally normal.
958  // ENOBUFS is hopefully transient.
959  // EBADF is kind of gross and might imply that something is wrong,
960  // but it happens sometimes...
961  if (sel < 0
962  && errno != EAGAIN && errno != EINTR
963  && errno != EBADF
964  && errno != ENOBUFS
965  )
966  {
967  seterr(errno);
968  }
969 #ifdef _WIN32
970  ::close(fakefd);
971 #endif
972  TRACE("select() returned %d\n", sel);
973  return sel;
974 }
975 
976 
977 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
978 {
979  // We cannot move the clock backward here, because timers that
980  // were expired in pre_select could then not be expired anymore,
981  // and while time going backward is rather unsettling in general,
982  // for it to be happening between pre_select and post_select is
983  // just outright insanity.
984  wvstime_sync_forward();
985 
986  bool sure = post_select(si);
987  if (globalstream && forceable && (globalstream != this))
988  {
989  WvStream *s = globalstream;
990  globalstream = NULL; // prevent recursion
991  si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
992  || si.global_sure;
993  globalstream = s;
994  }
995  return sure;
996 }
997 
998 
999 bool WvStream::_select(time_t msec_timeout, bool readable, bool writable,
1000  bool isexcept, bool forceable)
1001 {
1002  // Detect use of deleted stream
1003  assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
1004 
1005  SelectInfo si;
1006  _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
1007  forceable);
1008 
1009  bool sure = false;
1010  int sel = _do_select(si);
1011  if (sel >= 0)
1012  sure = _process_selectinfo(si, forceable);
1013  if (si.global_sure && globalstream && forceable && (globalstream != this))
1014  globalstream->callback();
1015 
1016  return sure;
1017 }
1018 
1019 
1021 {
1022  return IWvStream::SelectRequest(static_cast<bool>(readcb), static_cast<bool>(writecb), static_cast<bool>(exceptcb));
1023 }
1024 
1025 
1026 void WvStream::force_select(bool readable, bool writable, bool isexception)
1027 {
1028  if (readable)
1029  readcb = wv::bind(&WvStream::legacy_callback, this);
1030  if (writable)
1031  writecb = wv::bind(&WvStream::legacy_callback, this);
1032  if (isexception)
1033  exceptcb = wv::bind(&WvStream::legacy_callback, this);
1034 }
1035 
1036 
1037 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
1038 {
1039  if (readable)
1040  readcb = 0;
1041  if (writable)
1042  writecb = 0;
1043  if (isexception)
1044  exceptcb = 0;
1045 }
1046 
1047 
1048 void WvStream::alarm(time_t msec_timeout)
1049 {
1050  if (msec_timeout >= 0)
1051  alarm_time = msecadd(wvstime(), msec_timeout);
1052  else
1053  alarm_time = wvtime_zero;
1054 }
1055 
1056 
1058 {
1059  if (alarm_time.tv_sec)
1060  {
1061  WvTime now = wvstime();
1062 
1063  // Time is going backward!
1064  if (now < last_alarm_check)
1065  {
1066 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
1067  // warn only if it's a "big" difference (sigh...)
1068  if (msecdiff(last_alarm_check, now) > 200)
1069  fprintf(stderr, " ************* TIME WENT BACKWARDS! "
1070  "(%ld:%ld %ld:%ld)\n",
1071  last_alarm_check.tv_sec, last_alarm_check.tv_usec,
1072  now.tv_sec, now.tv_usec);
1073 #endif
1074  alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
1075  }
1076 
1077  last_alarm_check = now;
1078 
1079  time_t remaining = msecdiff(alarm_time, now);
1080  if (remaining < 0)
1081  remaining = 0;
1082  return remaining;
1083  }
1084  return -1;
1085 }
1086 
1087 
1088 bool WvStream::continue_select(time_t msec_timeout)
1089 {
1090  assert(uses_continue_select);
1091 
1092  // if this assertion triggers, you probably tried to do continue_select()
1093  // while inside terminate_continue_select().
1094  assert(call_ctx);
1095 
1096  if (msec_timeout >= 0)
1097  alarm(msec_timeout);
1098 
1099  alarm(msec_timeout);
1100  WvCont::yield();
1101  alarm(-1); // cancel the still-pending alarm, or it might go off later!
1102 
1103  // when we get here, someone has jumped back into our task.
1104  // We have to select(0) here because it's possible that the alarm was
1105  // ticking _and_ data was available. This is aggravated especially if
1106  // msec_delay was zero. Note that running select() here isn't
1107  // inefficient, because if the alarm was expired then pre_select()
1108  // returned true anyway and short-circuited the previous select().
1109  TRACE("hello-%p\n", this);
1110  return !alarm_was_ticking || select(0,
1111  static_cast<bool>(readcb),
1112  static_cast<bool>(writecb),
1113  static_cast<bool>(exceptcb));
1114 }
1115 
1116 
1118 {
1119  close();
1120  call_ctx = 0; // destroy the context, if necessary
1121 }
1122 
1123 
1124 const WvAddr *WvStream::src() const
1125 {
1126  return NULL;
1127 }
1128 
1129 
1130 void WvStream::setcallback(IWvStreamCallback _callfunc)
1131 {
1132  callfunc = _callfunc;
1133  call_ctx = 0; // delete any in-progress WvCont
1134 }
1135 
1136 
1137 void WvStream::legacy_callback()
1138 {
1139  execute();
1140  if (!!callfunc)
1141  callfunc();
1142 }
1143 
1144 
1145 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
1146 {
1147  IWvStreamCallback tmp = readcb;
1148 
1149  readcb = _callback;
1150 
1151  return tmp;
1152 }
1153 
1154 
1155 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
1156 {
1157  IWvStreamCallback tmp = writecb;
1158 
1159  writecb = _callback;
1160 
1161  return tmp;
1162 }
1163 
1164 
1165 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
1166 {
1167  IWvStreamCallback tmp = exceptcb;
1168 
1169  exceptcb = _callback;
1170 
1171  return tmp;
1172 }
1173 
1174 
1175 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
1176 {
1177  IWvStreamCallback tmp = closecb;
1178  if (isok())
1179  closecb = _callback;
1180  else
1181  {
1182  // already closed? notify immediately!
1183  closecb = 0;
1184  if (!!_callback)
1185  _callback();
1186  }
1187  return tmp;
1188 }
1189 
1190 
1191 void WvStream::unread(WvBuf &unreadbuf, size_t count)
1192 {
1193  WvDynBuf tmp;
1194  tmp.merge(unreadbuf, count);
1195  tmp.merge(inbuf);
1196  inbuf.zap();
1197  inbuf.merge(tmp);
1198 }
1199 
1200 
1201 IWvStream *WvStream::find_by_wsid(WSID wsid)
1202 {
1203  IWvStream *retval = NULL;
1204 
1205  if (wsid_map)
1206  {
1207  map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
1208 
1209  if (it != wsid_map->end())
1210  retval = it->second;
1211  }
1212 
1213  return retval;
1214 }
WvBufBaseCommonImpl::get
const T * get(size_t count)
Reads exactly the specified number of elements and returns a pointer to a storage location owned by t...
Definition: wvbufbase.h:114
WvStream::maybe_autoclose
virtual void maybe_autoclose()
Auto-close the stream if the time is right.
Definition: wvstream.cc:583
WvStream::alarm_remaining
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition: wvstream.cc:1057
WvStream::write
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition: wvstream.cc:532
WvErrorBase::geterr
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
Definition: wverror.h:48
WvStringList::popstr
WvString popstr()
get the first string in the list, or an empty string if the list is empty.
Definition: wvstringlist.cc:55
WvBufBaseCommonImpl::unget
void unget(size_t count)
Ungets exactly the specified number of elements by returning them to the buffer for subsequent reads.
Definition: wvbufbase.h:177
WvStream::blocking_getline
char * blocking_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of getline() that allows you to block for more data to arrive.
Definition: wvstream.cc:602
WvStream::flush_then_close
void flush_then_close(int msec_timeout)
flush the output buffer automatically as select() is called.
Definition: wvstream.cc:827
WvBufBaseCommonImpl::alloc
T * alloc(size_t count)
Allocates exactly the specified number of elements and returns a pointer to an UNINITIALIZED storage ...
Definition: wvbufbase.h:379
WvStream::execute
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
Definition: wvstream.h:652
WvStream::read_requires_writable
WvStream * read_requires_writable
If this is set, select() doesn't return true for read unless the given stream also returns true for w...
Definition: wvstream.h:36
WvStream::uread
virtual size_t uread(void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by read().
Definition: wvstream.h:146
WvBufBaseCommonImpl::unalloc
void unalloc(size_t count)
Unallocates exactly the specified number of elements by removing them from the buffer and releasing t...
Definition: wvbufbase.h:421
WvStream::flush
virtual bool flush(time_t msec_timeout)
flush the output buffer, if we can do it without delaying more than msec_timeout milliseconds at a ti...
Definition: wvstream.cc:707
UUID_MAP_ENTRY
#define UUID_MAP_ENTRY(iface)
Add an entry to an interface map.
Definition: utils.h:68
WvBufBaseCommonImpl::free
size_t free() const
Returns the number of elements that the buffer can currently accept for writing.
Definition: wvbufbase.h:353
WvBufBaseCommonImpl::optgettable
size_t optgettable() const
Returns the optimal maximum number of elements in the buffer currently available for reading without ...
Definition: wvbufbase.h:154
WvBufBaseCommonImpl::ungettable
size_t ungettable() const
Returns the maximum number of elements that may be ungotten at this time.
Definition: wvbufbase.h:188
WvStream::select
bool select(time_t msec_timeout)
Return true if any of the requested features are true on the stream.
Definition: wvstream.h:376
UUID_MAP_BEGIN
#define UUID_MAP_BEGIN(component)
Start the interface map for "component".
Definition: utils.h:63
WvStream::alarm_was_ticking
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
Definition: wvstream.h:54
WvTime
Based on (and interchangeable with) struct timeval.
Definition: wvtimeutils.h:17
WvErrorBase::isok
virtual bool isok() const
By default, returns true if geterr() == 0.
Definition: wverror.h:39
IWvStream
Definition: iwvstream.h:24
WvStream::callback
virtual void callback()
if the stream has a callback function defined, call it now.
Definition: wvstream.cc:401
WvStream::queuemin
void queuemin(size_t count)
force read() to not return any bytes unless 'count' bytes can be read at once.
Definition: wvstream.h:232
WvErrorBase::seterr
virtual void seterr(int _errnum)
Set the errnum variable – we have an error.
Definition: wverror.cc:144
WvStream::setexceptcallback
IWvStreamCallback setexceptcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is in exception state.
Definition: wvstream.cc:1165
WvStream::setreadcallback
IWvStreamCallback setreadcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is readable.
Definition: wvstream.cc:1145
WvStream::iswritable
virtual bool iswritable()
Returns true if the stream is writable (without using the outbuf).
Definition: wvstream.cc:596
WvStream::WvStream
WvStream()
Basic constructor for just a do-nothing WvStream.
Definition: wvstream.cc:249
WvStream::personal_stack_size
size_t personal_stack_size
Specifies the stack size to reserve for continue_select().
Definition: wvstream.h:48
WvStream::close
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
Definition: wvstream.cc:341
WvStream::seterr
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition: wvstream.cc:451
WvStream::uses_continue_select
bool uses_continue_select
If this is set, enables the use of continue_select().
Definition: wvstream.h:45
WvString
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:329
WvCont
WvCont provides "continuations", which are apparently also known as semi-coroutines.
Definition: wvcont.h:29
WvStream::force_select
void force_select(bool readable, bool writable, bool isexception=false)
Use force_select() to force one or more particular modes (readable, writable, or isexception) to true...
Definition: wvstream.cc:1026
WvStream::isreadable
virtual bool isreadable()
Returns true if the stream is readable.
Definition: wvstream.cc:590
wvstrutils.h
IWvStream::SelectInfo
the data structure used by pre_select()/post_select() and internally by select().
Definition: iwvstream.h:50
WvStream::continue_getline
char * continue_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of blocking_getline() that uses continue_select to avoid blocking other streams.
Definition: wvstream.cc:690
WvStream::terminate_continue_select
void terminate_continue_select()
you MUST run this from your destructor if you use continue_select(), or very weird things will happen...
Definition: wvstream.cc:1117
WvStream::_callwrap
void * _callwrap(void *)
A wrapper that's compatible with WvCont, but calls the "real" callback.
Definition: wvstream.cc:394
WvMoniker
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Definition: wvmoniker.h:61
WvAddr
Base class for different address types, each of which will have the ability to convert itself to/from...
Definition: wvaddr.h:118
WvBufBase< unsigned char >
Specialization of WvBufBase for unsigned char type buffers intended for use with raw memory buffers.
Definition: wvbuf.h:22
WvStream::_callback
void _callback()
Actually call the registered callfunc and execute().
Definition: wvstream.cc:386
WvStream::xpre_select
void xpre_select(SelectInfo &si, const SelectRequest &r)
Like pre_select(), but still exists even if you override the other pre_select() in a subclass.
Definition: wvstream.h:318
WvStream::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvstream.cc:875
IObject::release
virtual unsigned int release()=0
Indicate that you are finished using this object.
WvStream::read
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition: wvstream.cc:490
UUID_MAP_END
#define UUID_MAP_END
Marks the end of an interface map.
Definition: utils.h:80
WvBufBaseCommonImpl::zap
void zap()
Clears the buffer.
Definition: wvbufbase.h:257
WvStream::autoforward
void autoforward(WvStream &s)
set the callback function for this stream to an internal routine that auto-forwards all incoming stre...
Definition: wvstream.cc:362
WvStream::unread
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition: wvstream.cc:1191
IObject
Definition: IObject.h:65
WvStream::alarm
void alarm(time_t msec_timeout)
set an alarm, i.e. arrange for the alarm to go off after msec_timeout milliseconds.
Definition: wvstream.cc:1048
WvStream::isok
virtual bool isok() const
return true if the stream is actually usable right now
Definition: wvstream.cc:445
WvStream
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
Definition: wvstream.h:24
WvStream::should_flush
virtual bool should_flush()
Returns true if we want to flush the output buffer right now.
Definition: wvstream.cc:724
IWvStream::SelectRequest
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
Definition: iwvstream.h:34
WvDynBufBase< unsigned char >
WvStream::setcallback
void setcallback(IWvStreamCallback _callfunc)
define the callback function for this stream, called whenever the callback() member is run,...
Definition: wvstream.cc:1130
WvStream::noautoforward
void noautoforward()
Stops autoforwarding.
Definition: wvstream.cc:369
WvStream::get_select_request
IWvStream::SelectRequest get_select_request()
Use get_select_request() to save the current state of the selection state of this stream.
Definition: wvstream.cc:1020
WvStream::nowrite
virtual void nowrite()
Shuts down the writing side of the stream.
Definition: wvstream.cc:576
WvBufBaseCommonImpl::used
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition: wvbufbase.h:92
WvStream::setclosecallback
IWvStreamCallback setclosecallback(IWvStreamCallback _callback)
Sets a callback to be invoked on close().
Definition: wvstream.cc:1175
WvStream::undo_force_select
void undo_force_select(bool readable, bool writable, bool isexception=false)
Undo a previous force_select() - i.e. stop forcing the given modes (readable, writable, or isexception) to true.
Definition: wvstream.cc:1037
WvStream::setwritecallback
IWvStreamCallback setwritecallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is writable.
Definition: wvstream.cc:1155
WvStream::continue_select
bool continue_select(time_t msec_timeout)
return to the caller from execute(), but don't really return exactly; this uses WvCont::yield() to re...
Definition: wvstream.cc:1088
WvStream::uwrite
virtual size_t uwrite(const void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by write().
Definition: wvstream.h:156
WvStream::xpost_select
bool xpost_select(SelectInfo &si, const SelectRequest &r)
Like post_select(), but still exists even if you override the other post_select() in a subclass.
Definition: wvstream.h:339
WvBufBaseCommonImpl::mutablepeek
T * mutablepeek(int offset, size_t count)
Returns a non-const pointer into the buffer at the specified offset to the specified number of elemen...
Definition: wvbufbase.h:461
WvStream::src
virtual const WvAddr * src() const
get the remote address from which the last data block was received.
Definition: wvstream.cc:1124
WvBufBaseCommonImpl::merge
void merge(Buffer &inbuf, size_t count)
Efficiently moves count bytes from the specified buffer into this one.
Definition: wvbufbase.h:558
WvStringList
This is a WvList of WvStrings, and is a really handy way to parse strings.
Definition: wvstringlist.h:27
WvStream::pre_select
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvstream.cc:844
IObject::addRef
virtual unsigned int addRef()=0
Indicate you are using this object.
WvBufBase< unsigned char >::strchr
size_t strchr(int ch)
Returns the number of characters that would have to be read to find the first instance of the charact...
Definition: wvbuffer.cc:46
WvCont::yield
static void * yield(void *ret=0)
"return" from the current callback, giving value 'ret' to the person who called us.
Definition: wvcont.cc:222
WvStream::noread
virtual void noread()
Shuts down the reading side of the stream.
Definition: wvstream.cc:569
WvStream::stop_read
bool stop_read
True if noread()/nowrite()/close() have been called, respectively.
Definition: wvstream.h:57
WvStream::drain
void drain()
drain the input buffer (read and discard data until select(0) returns false)
Definition: wvstream.cc:699