WvStreams
wvhttpstream.cc
1 /*
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
6  *
7  * See wvhttppool.h.
8  */
9 #include "wvhttppool.h"
10 #include "wvtcp.h"
11 #include "wvsslstream.h"
12 #include "wvbuf.h"
13 #include "wvbase64.h"
14 #include "strutils.h"
15 #ifdef HAVE_EXECINFO_H
16 #include <execinfo.h> // FIXME: add a WvCrash feature for explicit dumps
17 #endif
18 
19 #ifdef _WIN32
20 #define ETIMEDOUT WSAETIMEDOUT
21 #endif
22 
23 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
24  bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
25  : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
26  pipeline_incompatible(_pipeline_incompatible),
27  in_doneurl(false)
28 {
29  log("Opening server connection.\n");
30  http_response = "";
31  encoding = Unknown;
32  bytes_remaining = 0;
33  in_chunk_trailer = false;
34  pipeline_test_count = 0;
35  last_was_pipeline_test = false;
36 
37  enable_pipelining = global_enable_pipelining
38  && !pipeline_incompatible[target.remaddr];
39  ssl = _ssl;
40 
41  if (ssl)
42  cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
43 
44  sent_url_request = false;
45 
46  alarm(60000); // timeout if no connection, or something goes wrong
47 }
48 
49 
50 WvHttpStream::~WvHttpStream()
51 {
52  log(WvLog::Debug2, "Deleting.\n");
53 
54 #if 0
55 #ifdef HAVE_EXECINFO_H
56  void* trace[10];
57  int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
58  char** tracedump = backtrace_symbols(trace, count);
59  log(WvLog::Debug, "TRACE");
60  for (int i = 0; i < count; ++i)
61  log(WvLog::Debug, ":%s", tracedump[i]);
62  log(WvLog::Debug, "\n");
63  free(tracedump);
64 #endif
65 #endif
66 
67  if (geterr())
68  log("Error was: %s\n", errstr());
69  close();
70 }
71 
72 
74 {
75  log("close called\n");
76 
77 #if 0
78 #ifdef HAVE_EXECINFO_H
79  void *trace[10];
80  int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
81  char** tracedump = backtrace_symbols(trace, count);
82  log(WvLog::Debug, "TRACE");
83  for (int i = 0; i < count; ++i)
84  log(WvLog::Debug, ":%s", tracedump[i]);
85  log(WvLog::Debug, "\n");
86  free(tracedump);
87 #endif
88 #endif
89 
90  // assume pipelining is broken if we're closing without doing at least
91  // one successful pipelining test and a following non-test request.
92  if (enable_pipelining && max_requests > 1
93  && (pipeline_test_count < 1
94  || (pipeline_test_count == 1 && last_was_pipeline_test)))
95  pipelining_is_broken(2);
96 
97  if (isok())
98  log("Closing.\n");
100 
101  if (geterr())
102  {
103  // if there was an error, count the first URL as done. This prevents
104  // retrying indefinitely.
105  WvUrlRequest *msgurl = curl;
106  if (!msgurl && !urls.isempty())
107  msgurl = urls.first();
108  if (!msgurl && !waiting_urls.isempty())
109  msgurl = waiting_urls.first();
110 
111  if (msgurl)
112  {
113  log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
114  errstr());
115  curl = msgurl;
116  doneurl();
117  }
118  }
119  waiting_urls.zap();
120  if (curl)
121  {
122  log("curl is %s\n", curl->url);
123  doneurl();
124  }
125  log("close done\n");
126 }
127 
128 
129 void WvHttpStream::doneurl()
130 {
131  // There is a slight chance that we might receive an error during or just before
132  // this function is called, which means that the write occuring during
133  // start_pipeline_test() would be called, which would call close() because of the
134  // error, which would call doneurl() again. We don't want to execute doneurl()
135  // a second time when we're already in the middle.
136  if (in_doneurl)
137  return;
138  in_doneurl = true;
139 
140  assert(curl != NULL);
141  WvString last_response(http_response);
142  log("Done URL: %s\n", curl->url);
143 
144  http_response = "";
145  encoding = Unknown;
146  in_chunk_trailer = false;
147  bytes_remaining = 0;
148 
149  last_was_pipeline_test = curl->pipeline_test;
150  bool broken = false;
151  if (last_was_pipeline_test)
152  {
153  pipeline_test_count++;
154  if (pipeline_test_count == 1)
155  start_pipeline_test(&curl->url);
156  else if (pipeline_test_response != last_response)
157  {
158  // getting a bit late in the game to be detecting brokenness :(
159  // However, if the response code isn't the same for both tests,
160  // something's definitely screwy.
161  pipelining_is_broken(4);
162  broken = true;
163  }
164  pipeline_test_response = last_response;
165  }
166 
167  assert(curl == urls.first());
168  curl->done();
169  curl = NULL;
170  sent_url_request = false;
171  urls.unlink_first();
172 
173  if (broken)
174  close();
175 
176  request_next();
177  in_doneurl = false;
178 }
179 
180 
181 static WvString encode64(WvStringParm user, WvStringParm password)
182 {
183  WvBase64Encoder encoder;
184  WvString ret;
185  encoder.flushstrstr(WvString("%s:%s", user, password), ret);
186  return ret;
187 }
188 
189 
190 static WvString fixnl(WvStringParm nonl)
191 {
192  WvDynBuf b;
193  const char *cptr;
194 
195  for (cptr = nonl; cptr && *cptr; cptr++)
196  {
197  if (*cptr == '\r')
198  continue;
199  else if (*cptr == '\n')
200  b.put("\r", 1); // put BOTH \r and \n
201  b.put(cptr, 1);
202  }
203 
204  return b.getstr();
205 }
206 
207 
208 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
209 {
210  WvString request;
211  WvString auth("");
212  if(!!url->url.getuser() && !!url->url.getpassword())
213  auth = WvString("Authorization: Basic %s\n",
214  encode64(url->url.getuser(), url->url.getpassword()));
215 
216  request = fixnl(
217  WvString(
218  "%s %s HTTP/1.1\n"
219  "Host: %s:%s\n"
220  "Connection: %s\n"
221  "%s"
222  "%s"
223  "%s%s"
224  "\n",
225  url->method,
226  url->url.getfile(),
227  url->url.gethost(), url->url.getport(),
228  keepalive ? "keep-alive" : "close",
229  auth,
230  (putstream_data.used() > 0 ? WvString(
231  "Content-Length: %s\n", putstream_data.used()) : ""),
232  trim_string(url->headers.edit()),
233  !!url->headers ? "\n" : ""));
234  return request;
235 }
236 
237 
238 void WvHttpStream::send_request(WvUrlRequest *url)
239 {
240  request_count++;
241  log("Request #%s: %s\n", request_count, url->url);
242  write(request_str(url, url->pipeline_test
243  || request_count < max_requests));
244  write(putstream_data);
245  sent_url_request = true;
246  alarm(60000);
247 }
248 
249 
250 void WvHttpStream::start_pipeline_test(WvUrl *url)
251 {
252  WvUrl location(WvString(
253  "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
254  url->getproto(), url->gethost(), url->getport()));
255  WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
256  false, true);
257  testurl->instream = this;
258  send_request(testurl);
259  urls.append(testurl, true, "sent_running_url");
260 }
261 
262 
263 void WvHttpStream::request_next()
264 {
265  // Clear the putstream buffer before we start any new requests
266  putstream_data.zap();
267 
268  // don't do a request if we've done too many already or we have none
269  // waiting.
270  if (request_count >= max_requests || waiting_urls.isempty())
271  return;
272 
273  // don't do more than one request at a time if we're not pipelining.
274  if (!enable_pipelining && !urls.isempty())
275  return;
276 
277  // okay then, we really do want to send a new request.
278  WvUrlRequest *url = waiting_urls.first();
279 
280  waiting_urls.unlink_first();
281  if (!url->putstream)
282  {
283  if (enable_pipelining && !request_count && max_requests > 1)
284  start_pipeline_test(&url->url);
285  send_request(url);
286  }
287  urls.append(url, false, "sent_running_url");
288 }
289 
290 
291 void WvHttpStream::pipelining_is_broken(int why)
292 {
293  if (!pipeline_incompatible[target.remaddr])
294  {
295  pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
296  log("Pipelining is broken on this server (%s)! Disabling.\n", why);
297  }
298 }
299 
300 
302 {
303  SelectRequest oldwant = si.wants;
304  WvUrlRequest *url;
305 
307 
308  if (!urls.isempty())
309  {
310  url = urls.first();
311  if(url && url->putstream)
312  url->putstream->pre_select(si);
313  }
314 
315  si.wants = oldwant;
316 }
317 
318 
320 {
321  SelectRequest oldwant = si.wants;
322  WvUrlRequest *url;
323 
324  if (WvUrlStream::post_select(si))
325  return true;
326 
327  if (!urls.isempty())
328  {
329  url = urls.first();
330  if(url && url->putstream && url->putstream->post_select(si))
331  return true;
332  }
333 
334  si.wants = oldwant;
335  return false;
336 }
337 
338 
340 {
341  char buf[1024], *line;
342  size_t len;
343 
345 
346  // make connections timeout after some idleness
347  if (alarm_was_ticking)
348  {
349  log(WvLog::Debug4, "urls count: %s\n", urls.count());
350  if (!urls.isempty())
351  {
352  seterr(ETIMEDOUT);
353 
354  // Must check again here since seterr()
355  // will close our stream and if we only
356  // had one url then it'll be gone.
357  if (!urls.isempty())
358  {
359  WvUrlRequest *url = urls.first();
360  if (url->outstream)
361  url->outstream->seterr(ETIMEDOUT);
362  }
363  }
364  else
365  close(); // timed out, but not really an error
366  return;
367  }
368 
369  // Die if somebody closed our outstream. This is so that if we were
370  // downloading a really big file, they can stop it in the middle and
371  // our next url request can start downloading immediately.
372  if (curl && !curl->outstream)
373  {
374  if (!(encoding == PostHeadInfinity
375  || encoding == PostHeadChunked
376  || encoding == PostHeadStream))
377  {
378  // don't complain about pipelining failures
379  pipeline_test_count++;
380  last_was_pipeline_test = false;
381  close();
382  }
383 
384  if (curl)
385  doneurl();
386  return;
387  }
388  else if (curl)
389  curl->inuse = true;
390 
391  if(!sent_url_request && !urls.isempty())
392  {
393  WvUrlRequest *url = urls.first();
394  if(url)
395  {
396  if(url->putstream)
397  {
398  int len = 0;
399  if(url->putstream->isok())
400  len = url->putstream->read(putstream_data, 1024);
401 
402  if(!url->putstream->isok() || len == 0)
403  {
404  url->putstream = NULL;
405  send_request(url);
406  }
407  }
408  }
409  }
410 
411  if (!curl)
412  {
413  // in the header section
414  line = getline();
415  if (line)
416  {
417  line = trim_string(line);
418  log(WvLog::Debug4, "Header: '%s'\n", line);
419  if (!http_response)
420  {
421  http_response = line;
422 
423  // there are never two pipeline test requests in a row, so
424  // a second response string exactly like the pipeline test
425  // response implies that everything between the first and
426  // second test requests was lost: bad!
427  if (last_was_pipeline_test
428  && http_response == pipeline_test_response)
429  {
430  pipelining_is_broken(1);
431  close();
432  return;
433  }
434 
435  // http response #400 is "invalid request", which we
436  // shouldn't be sending. If we get one of these right after
437  // a test, it probably means the stuff that came after it
438  // was mangled in some way during transmission ...and we
439  // should throw it away.
440  if (last_was_pipeline_test && !!http_response)
441  {
442  const char *cptr = strchr(http_response, ' ');
443  if (cptr && atoi(cptr+1) == 400)
444  {
445  pipelining_is_broken(3);
446  close();
447  return;
448  }
449  }
450  }
451 
452  if (urls.isempty())
453  {
454  log("got unsolicited data.\n");
455  seterr("unsolicited data from server!");
456  return;
457  }
458 
459  if (!strncasecmp(line, "Content-length: ", 16))
460  {
461  bytes_remaining = atoi(line+16);
462  encoding = ContentLength;
463  }
464  else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
465  && strstr(line+19, "chunked"))
466  {
467  encoding = Chunked;
468  }
469 
470  if (line[0])
471  {
472  char *p;
473  WvBufUrlStream *outstream = urls.first()->outstream;
474 
475  if ((p = strchr(line, ':')) != NULL)
476  {
477  *p = 0;
478  p = trim_string(p+1);
479  if (outstream) {
480  struct WvHTTPHeader *h;
481  h = new struct WvHTTPHeader(line, p);
482  outstream->headers.add(h, true);
483  }
484  }
485  else if (strncasecmp(line, "HTTP/", 5) == 0)
486  {
487  char *p = strchr(line, ' ');
488  if (p)
489  {
490  *p = 0;
491  if (outstream)
492  {
493  outstream->version = line+5;
494  outstream->status = atoi(p+1);
495  }
496  }
497  }
498  }
499  else
500  {
501  // blank line is the beginning of data section
502  curl = urls.first();
503  in_chunk_trailer = false;
504  log(WvLog::Debug4,
505  "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
506 
507  if (encoding == Unknown)
508  encoding = Infinity; // go until connection closes itself
509 
510  if (curl->method == "HEAD")
511  {
512  log("Got all headers.\n");
513  if (!enable_pipelining)
514  doneurl();
515 
516  if (encoding == Infinity)
517  encoding = PostHeadInfinity;
518  else if (encoding == Chunked)
519  encoding = PostHeadChunked;
520  else
521  encoding = PostHeadStream;
522  }
523  }
524  }
525  }
526  else if (encoding == PostHeadInfinity
527  || encoding == PostHeadChunked
528  || encoding == PostHeadStream)
529  {
530  WvDynBuf chkbuf;
531  len = read(chkbuf, 5);
532 
533  // If there is more data available right away, and it isn't an
534  // HTTP header from another request, then it's a stupid web
535  // server that likes to send bodies with HEAD requests.
536  if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
537  "HTTP/", 5))
538  {
539  if (encoding == PostHeadInfinity)
540  encoding = ChuckInfinity;
541  else if (encoding == PostHeadChunked)
542  encoding = ChuckChunked;
543  else if (encoding == PostHeadStream)
544  encoding = ChuckStream;
545  else
546  log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
547  }
548  else
549  doneurl();
550 
551  unread(chkbuf, len);
552  }
553  else if (encoding == ChuckInfinity)
554  {
555  len = read(buf, sizeof(buf));
556  if (len)
557  log(WvLog::Debug5, "Chucking %s bytes.\n", len);
558  if (!isok())
559  doneurl();
560  }
561  else if (encoding == ChuckChunked && !bytes_remaining)
562  {
563  encoding = Chunked;
564  }
565  else if (encoding == ChuckChunked || encoding == ChuckStream)
566  {
567  if (bytes_remaining > sizeof(buf))
568  len = read(buf, sizeof(buf));
569  else
570  len = read(buf, bytes_remaining);
571  bytes_remaining -= len;
572  if (len)
573  log(WvLog::Debug5,
574  "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
575  if (!bytes_remaining && encoding == ContentLength)
576  doneurl();
577  if (bytes_remaining && !isok())
578  seterr("connection interrupted");
579  }
580  else if (encoding == Chunked && !bytes_remaining)
581  {
582  line = getline();
583  if (line)
584  {
585  line = trim_string(line);
586 
587  if (in_chunk_trailer)
588  {
589  // in the trailer section of a chunked encoding
590  log(WvLog::Debug4, "Trailer: '%s'\n", line);
591 
592  // a blank line means we're finally done!
593  if (!line[0])
594  doneurl();
595  }
596  else
597  {
598  // in the "length line" section of a chunked encoding
599  if (line[0])
600  {
601  bytes_remaining = (size_t)strtoul(line, NULL, 16);
602  if (!bytes_remaining)
603  in_chunk_trailer = true;
604  log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
605  bytes_remaining, line);
606  }
607  }
608  }
609  }
610  else if (encoding == Infinity)
611  {
612  // just read data until the connection closes, and assume all was
613  // well. It sucks, but there's no way to tell if all the data arrived
614  // okay... that's why Chunked or ContentLength encoding is better.
615  len = read(buf, sizeof(buf));
616  if (!isok())
617  return;
618 
619  if (len)
620  log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
621  if (curl && curl->outstream)
622  curl->outstream->write(buf, len);
623 
624  if (!isok() && curl)
625  doneurl();
626  }
627  else // not chunked or currently in a chunk - read 'bytes_remaining' bytes.
628  {
629  // in the data section of a chunked or content-length encoding,
630  // with 'bytes_remaining' bytes of data left.
631 
632  if (bytes_remaining > sizeof(buf))
633  len = read(buf, sizeof(buf));
634  else
635  len = read(buf, bytes_remaining);
636  if (!isok())
637  return;
638 
639  bytes_remaining -= len;
640  if (len)
641  log(WvLog::Debug5,
642  "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
643  if (curl && curl->outstream)
644  curl->outstream->write(buf, len);
645 
646  if (!bytes_remaining && encoding == ContentLength && curl)
647  doneurl();
648 
649  if (bytes_remaining && !isok())
650  seterr("connection interrupted");
651 
652  if (!isok())
653  doneurl();
654  }
655 
656  if (urls.isempty())
657  alarm(5000); // just wait a few seconds before closing connection
658  else
659  alarm(60000); // give the server a minute to respond, if we're waiting
660 }
WvString::edit
char * edit()
make the string editable, and return a non-const (char*)
Definition: wvstring.h:397
WvStream::write
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition: wvstream.cc:532
WvStreamClone::execute
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
Definition: wvstreamclone.cc:272
WvStreamClone::close
virtual void close()
Close this stream.
Definition: wvstreamclone.cc:83
WvSSLStream
SSL Stream, handles SSLv2, SSLv3, and TLS Methods - If you want it to be a server,...
Definition: wvsslstream.h:35
WvBufBase< unsigned char >::getstr
WvString getstr()
Returns the entire buffer as a null-terminated WvString.
Definition: wvbuffer.cc:17
WvHttpStream::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvhttpstream.cc:319
WvStream::alarm_was_ticking
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
Definition: wvstream.h:54
trim_string
char * trim_string(char *string)
Trims whitespace from the beginning and end of the character string, including carriage return / line...
Definition: strutils.cc:59
WvBase64Encoder
A base 64 encoder.
Definition: wvbase64.h:20
WvStream::seterr
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition: wvstream.cc:451
WvString
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:329
WvHttpStream::execute
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
Definition: wvhttpstream.cc:339
IWvStream::SelectInfo
the data structure used by pre_select()/post_select() and internally by select().
Definition: iwvstream.h:50
WvStream::getline
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition: wvstream.h:175
WvIPPortAddr
An IP+Port address also includes a port number, with the resulting form www.xxx.yyy....
Definition: wvaddr.h:393
WvStream::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvstream.cc:875
WvHttpStream::pre_select
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvhttpstream.cc:301
WvStream::read
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition: wvstream.cc:490
WvBufBaseCommonImpl::zap
void zap()
Clears the buffer.
Definition: wvbufbase.h:257
WvStream::unread
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition: wvstream.cc:1191
WvStream::alarm
void alarm(time_t msec_timeout)
set an alarm, ie.
Definition: wvstream.cc:1048
WvBufUrlStream
Definition: wvhttppool.h:80
WvStream::isok
virtual bool isok() const
return true if the stream is actually usable right now
Definition: wvstream.cc:445
WvStreamClone::pre_select
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvstreamclone.cc:199
WvEncoder::flushstrstr
bool flushstrstr(WvStringParm instr, WvString &outstr, bool finish=false)
Flushes data through the encoder from a string to a string.
Definition: wvencoder.cc:86
WvUrlStream
Definition: wvhttppool.h:104
WvStreamClone::geterr
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
Definition: wvstreamclone.cc:149
IWvStream::SelectRequest
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
Definition: iwvstream.h:34
WvStreamClone::isok
virtual bool isok() const
return true if the stream is actually usable right now
Definition: wvstreamclone.cc:136
WvDynBufBase< unsigned char >
WvFdStream
Base class for streams built on Unix file descriptors.
Definition: wvfdstream.h:20
WvUrl
Definition: wvurl.h:16
WvBufBaseCommonImpl::used
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition: wvbufbase.h:92
WvHTTPHeader
Definition: wvhttppool.h:30
WvBufBaseCommonImpl::peek
const T * peek(int offset, size_t count)
Returns a const pointer into the buffer at the specified offset to the specified number of elements w...
Definition: wvbufbase.h:225
WvHttpStream::close
virtual void close()
Close this stream.
Definition: wvhttpstream.cc:73
WvStream::pre_select
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvstream.cc:844
WvStreamClone::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvstreamclone.cc:222
WvUrlRequest
Definition: wvhttppool.h:43