12 #include <sys/types.h>
14 #define __WVSTREAM_UNIT_TEST 1
16 #include "wvtimeutils.h"
18 #include "wvstreamsdebugger.h"
20 #include "wvistreamlist.h"
21 #include "wvlinkerhack.h"
22 #include "wvmoniker.h"
25 #define ENOBUFS WSAENOBUFS
27 #define errno GetLastError()
29 #include <sys/socket.h>
46 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
52 # define TRACE(x, y...)
58 WvStream *WvStream::globalstream = NULL;
66 static map<WSID, WvStream*> *wsid_map;
67 static WSID next_wsid_to_try;
82 IWvStream *s = wvcreate<IWvStream>(moniker, obj);
86 s->seterr_both(EINVAL,
"Unknown moniker '%s'", moniker);
/**
 * Reports whether 'str' starts with 'prefix', comparing case-insensitively.
 *
 * @param str     NUL-terminated string to inspect; must not be NULL.
 * @param prefix  NUL-terminated prefix to look for; must not be NULL.
 * @return true if the first strlen(prefix) characters of 'str' match
 *         'prefix' ignoring case; an empty 'prefix' always matches.
 */
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    // strncasecmp() stops at the first NUL in either string, so a 'str'
    // that is shorter than 'prefix' already compares unequal.  There is no
    // need to measure 'str' first; doing so made every call scan the whole
    // remaining string, which is quadratic when called once per position
    // of a haystack.
    return strncasecmp(str, prefix, strlen(prefix)) == 0;
}
100 static const char *strstr_insensitive(
const char *haystack,
const char *needle)
102 while (*haystack !=
'\0')
104 if (is_prefix_insensitive(haystack, needle))
112 static bool contains_insensitive(
const char *haystack,
const char *needle)
114 return strstr_insensitive(haystack, needle) != NULL;
// Row layout shared by the stream listing's header and per-stream rows
// (both pass it to result.append()).  It holds 13 string conversions:
// seven fields with minimum widths 6, 2, 3, 3, 6, 20 and an unpadded last
// one, with one extra %s between each pair of fields for a separator.
// NOTE(review): assumes result.append() honours printf-style width
// specifiers -- confirm against WvString's formatter.
118 static const char *list_format =
"%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
/**
 * Maps a boolean onto a human-readable string literal.
 *
 * @param val  flag to describe.
 * @return "Yes" when 'val' is true, "No" otherwise.
 */
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
125 void WvStream::debugger_streams_display_header(WvStringParm cmd,
126 WvStreamsDebugger::ResultCallback result_cb)
129 result.append(list_format,
"--WSID",
"-",
"RC",
"-",
"-Ok",
"-",
"-Cs",
"-",
"-AlRem",
"-",
130 "----------------Type",
"-",
"Name--------------------");
131 result_cb(cmd, result);
136 static WvString friendly_ms(time_t ms)
142 else if (ms < 60*1000)
144 else if (ms < 60*60*1000)
145 return WvString(
"%sm", ms/(60*1000));
146 else if (ms <= 24*60*60*1000)
147 return WvString(
"%sh", ms/(60*60*1000));
149 return WvString(
"%sd", ms/(24*60*60*1000));
152 void WvStream::debugger_streams_display_one_stream(
WvStream *s,
154 WvStreamsDebugger::ResultCallback result_cb)
158 unsigned refcount = s->
release();
159 result.append(list_format,
162 Yes_No(s->
isok()),
" ",
167 result_cb(cmd, result);
171 void WvStream::debugger_streams_maybe_display_one_stream(
WvStream *s,
174 WvStreamsDebugger::ResultCallback result_cb)
176 bool show = args.isempty();
177 WvStringList::Iter arg(args);
178 for (arg.rewind(); arg.next(); )
181 bool is_num = wvstring_to_num(*arg, wsid);
185 if (s->wsid() == wsid)
193 if ((s->wsname() && contains_insensitive(s->wsname(), *arg))
194 || (s->wstype() && contains_insensitive(s->wstype(), *arg)))
202 debugger_streams_display_one_stream(s, cmd, result_cb);
206 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
208 WvStreamsDebugger::ResultCallback result_cb,
void *)
210 debugger_streams_display_header(cmd, result_cb);
213 map<WSID, WvStream*>::iterator it;
215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
216 debugger_streams_maybe_display_one_stream(it->second, cmd, args,
220 return WvString::null;
224 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
226 WvStreamsDebugger::ResultCallback result_cb,
void *)
229 return WvString(
"Usage: %s <WSID>", cmd);
232 if (!wvstring_to_num(wsid_str, wsid))
233 return WvString(
"Invalid WSID '%s'", wsid_str);
234 IWvStream *s = WvStream::find_by_wsid(wsid);
238 return WvString::null;
// Registers the two debugger commands this file implements with
// WvStreamsDebugger: "streams", handled by debugger_streams_run_cb, and
// "close", handled by debugger_close_run_cb.  The literal 0 arguments are
// passed through as-is; their meaning is defined by
// WvStreamsDebugger::add_command(), which is not visible here.
242 void WvStream::add_debugger_commands()
244 WvStreamsDebugger::add_command(
"streams", 0, debugger_streams_run_cb, 0);
245 WvStreamsDebugger::add_command(
"close", 0, debugger_close_run_cb, 0);
250 read_requires_writable(NULL),
251 write_requires_readable(NULL),
252 uses_continue_select(false),
253 personal_stack_size(131072),
254 alarm_was_ticking(false),
258 readcb(wv::bind(&
WvStream::legacy_callback, this)),
260 outbuf_delayed_flush(false),
266 alarm_time(wvtime_zero),
267 last_alarm_check(wvtime_zero)
269 TRACE(
"Creating wvstream %p\n",
this);
271 static bool first =
true;
275 WvStream::add_debugger_commands();
280 wsid_map =
new map<WSID, WvStream*>;
281 WSID first_wsid_tried = next_wsid_to_try;
284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
287 }
while (next_wsid_to_try != first_wsid_tried);
288 my_wsid = next_wsid_to_try++;
289 bool inserted = wsid_map->insert(make_pair(my_wsid,
this)).second;
294 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
301 IWvStream::IWvStream()
306 IWvStream::~IWvStream()
311 WvStream::~WvStream()
313 TRACE(
"destroying %p\n",
this);
324 wsid_map->erase(my_wsid);
325 if (wsid_map->empty())
335 WvIStreamList::globallist.unlink(
this);
337 TRACE(
"done destroying %p\n",
this);
343 TRACE(
"flushing in wvstream...\n");
345 TRACE(
"(flushed)\n");
351 IWvStreamCallback cb = closecb;
364 setcallback(wv::bind(autoforward_callback, wv::ref(*
this), wv::ref(s)));
381 len = input.
read(buf,
sizeof(buf));
382 output.
write(buf, len);
408 alarm_time = wvtime_zero;
416 #define TEST_CONTINUES_HARSHLY 0
417 #if TEST_CONTINUES_HARSHLY
419 # warning "Using WvCont for *all* streams for testing!"
464 size_t free = outbuf.
free();
469 unsigned char *buf = tmp.
alloc(count);
470 size_t len =
read(buf, count);
480 size_t avail = inbuf.
used();
483 const unsigned char *buf = inbuf.
get(count);
484 size_t len =
write(buf, count);
485 inbuf.
unget(count - len);
492 assert(!count || buf);
495 unsigned char *newbuf;
498 if (bufu < queue_min)
500 newbuf = inbuf.
alloc(queue_min - bufu);
502 i =
uread(newbuf, queue_min - bufu);
503 inbuf.
unalloc(queue_min - bufu - i);
508 if (bufu < queue_min)
516 bufu =
uread(buf, count);
523 memcpy(buf, inbuf.
get(bufu), bufu);
526 TRACE(
"read obj 0x%08x, bytes %d/%d\n", (
unsigned int)
this, bufu, count);
534 assert(!count || buf);
535 if (!
isok() || !buf || !count || stop_write)
return 0;
538 if (!outbuf_delayed_flush && !outbuf.
used())
540 wrote =
uwrite(buf, count);
542 buf = (
const unsigned char *)buf + wrote;
545 if (max_outbuf_size != 0)
547 size_t canbuffer = max_outbuf_size - outbuf.
used();
548 if (count > canbuffer)
553 outbuf.put(buf, count);
592 return isok() &&
select(0,
true,
false,
false);
598 return !stop_write &&
isok() &&
select(0,
false,
true,
false);
605 assert(separator >= 0);
606 assert(separator <= 255);
612 timeout_time = msecadd(wvtime(), wait_msec);
624 if (inbuf.
strchr(separator) > 0)
635 wait_msec = msecdiff(timeout_time, wvtime());
646 hasdata =
select(wait_msec,
true,
false);
655 unsigned char *buf = tmp.
alloc(readahead);
657 size_t len =
uread(buf, readahead);
659 inbuf.put(tmp.
get(len), len);
666 if (!hasdata && wait_msec == 0)
674 i = inbuf.
strchr(separator);
677 assert(eol && *eol == separator);
679 return const_cast<char*
>((
const char *)inbuf.
get(i));
684 inbuf.
alloc(1)[0] = 0;
685 return const_cast<char *
>((
const char *)inbuf.
get(inbuf.
used()));
693 assert(
false &&
"not implemented, come back later!");
703 read(buf,
sizeof(buf));
709 if (is_flushing)
return false;
711 TRACE(
"%p flush starts\n",
this);
714 want_to_flush =
true;
715 bool done = flush_internal(msec_timeout)
716 && flush_outbuf(msec_timeout);
719 TRACE(
"flush stops (%d)\n", done);
726 return want_to_flush;
730 bool WvStream::flush_outbuf(time_t msec_timeout)
732 TRACE(
"%p flush_outbuf starts (isok=%d)\n",
this,
isok());
733 bool outbuf_was_used = outbuf.
used();
738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
744 WvTime stoptime = msecadd(wvtime(), msec_timeout);
747 while (outbuf_was_used &&
isok())
753 size_t real =
uwrite(outbuf.
get(attempt), attempt);
758 if (
isok() && real < attempt)
760 TRACE(
"flush_outbuf: unget %d-%d\n", attempt, real);
761 assert(outbuf.
ungettable() >= attempt - real);
762 outbuf.
unget(attempt - real);
771 if (msec_timeout >= 0
772 && (stoptime < wvtime() || !
select(msec_timeout,
false,
true)))
775 outbuf_was_used = outbuf.
used();
779 if (autoclose_time &&
isok())
781 time_t now = time(NULL);
782 TRACE(
"Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783 this, now - autoclose_time, outbuf.
used());
784 if ((flush_internal(0) && !outbuf.
used()) || now > autoclose_time)
791 TRACE(
"flush_outbuf: after autoclose chunk\n");
792 if (outbuf_delayed_flush && !outbuf_was_used)
793 want_to_flush =
false;
795 TRACE(
"flush_outbuf: now isok=%d\n",
isok());
798 if (outbuf_was_used && !
isok())
802 TRACE(
"flush_outbuf stops\n");
804 return !outbuf_was_used;
808 bool WvStream::flush_internal(time_t msec_timeout)
815 int WvStream::getrfd()
const
821 int WvStream::getwfd()
const
829 time_t now = time(NULL);
830 autoclose_time = now + (msec_timeout + 999) / 1000;
832 TRACE(
"Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833 this, outbuf.
used(), autoclose_time - now);
850 if (!
isok() || (!si.inherit_request && alarmleft == 0))
856 if (!si.inherit_request)
858 si.wants.readable |=
static_cast<bool>(readcb);
859 si.wants.writable |=
static_cast<bool>(writecb);
860 si.wants.isexception |=
static_cast<bool>(exceptcb);
864 if (si.wants.readable && inbuf.
used() && inbuf.
used() >= queue_min)
870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
871 si.msec_timeout = alarmleft + 10;
877 if (!si.inherit_request)
879 si.wants.readable |=
static_cast<bool>(readcb);
880 si.wants.writable |=
static_cast<bool>(writecb);
881 si.wants.isexception |=
static_cast<bool>(exceptcb);
894 if ((si.wants.readable || (!si.inherit_request && readcb))
895 && inbuf.
used() && inbuf.
used() >= queue_min)
901 void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
902 bool readable,
bool writable,
bool isexcept,
bool forceable)
910 si.wants.readable =
static_cast<bool>(readcb);
911 si.wants.writable =
static_cast<bool>(writecb);
912 si.wants.isexception =
static_cast<bool>(exceptcb);
916 si.wants.readable = readable;
917 si.wants.writable = writable;
918 si.wants.isexception = isexcept;
922 si.msec_timeout = msec_timeout;
923 si.inherit_request = ! forceable;
924 si.global_sure =
false;
929 if (globalstream && forceable && (globalstream !=
this))
933 s->
xpre_select(si, SelectRequest(
false,
false,
false));
939 int WvStream::_do_select(SelectInfo &si)
943 tv.tv_sec = si.msec_timeout / 1000;
944 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
949 FD_SET(fakefd, &si.except);
953 int sel =
::select(si.max_fd+1, &si.read, &si.write, &si.except,
954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
962 && errno != EAGAIN && errno != EINTR
972 TRACE(
"select() returned %d\n", sel);
977 bool WvStream::_process_selectinfo(SelectInfo &si,
bool forceable)
984 wvstime_sync_forward();
987 if (globalstream && forceable && (globalstream !=
this))
991 si.global_sure = s->
xpost_select(si, SelectRequest(
false,
false,
false))
999 bool WvStream::_select(time_t msec_timeout,
bool readable,
bool writable,
1000 bool isexcept,
bool forceable)
1003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
1006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
1010 int sel = _do_select(si);
1012 sure = _process_selectinfo(si, forceable);
1013 if (si.global_sure && globalstream && forceable && (globalstream !=
this))
1022 return IWvStream::SelectRequest(
static_cast<bool>(readcb),
static_cast<bool>(writecb),
static_cast<bool>(exceptcb));
1029 readcb = wv::bind(&WvStream::legacy_callback,
this);
1031 writecb = wv::bind(&WvStream::legacy_callback,
this);
1033 exceptcb = wv::bind(&WvStream::legacy_callback,
this);
1050 if (msec_timeout >= 0)
1051 alarm_time = msecadd(wvstime(), msec_timeout);
1053 alarm_time = wvtime_zero;
1059 if (alarm_time.tv_sec)
1064 if (now < last_alarm_check)
1066 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
1068 if (msecdiff(last_alarm_check, now) > 200)
1069 fprintf(stderr,
" ************* TIME WENT BACKWARDS! "
1070 "(%ld:%ld %ld:%ld)\n",
1071 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
1072 now.tv_sec, now.tv_usec);
1074 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
1077 last_alarm_check = now;
1079 time_t remaining = msecdiff(alarm_time, now);
1096 if (msec_timeout >= 0)
1097 alarm(msec_timeout);
1099 alarm(msec_timeout);
1109 TRACE(
"hello-%p\n",
this);
1111 static_cast<bool>(readcb),
1112 static_cast<bool>(writecb),
1113 static_cast<bool>(exceptcb));
1132 callfunc = _callfunc;
1137 void WvStream::legacy_callback()
1147 IWvStreamCallback tmp = readcb;
1157 IWvStreamCallback tmp = writecb;
1167 IWvStreamCallback tmp = exceptcb;
1177 IWvStreamCallback tmp = closecb;
1194 tmp.
merge(unreadbuf, count);
1201 IWvStream *WvStream::find_by_wsid(WSID wsid)
1207 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
1209 if (it != wsid_map->end())
1210 retval = it->second;