WvStreams
wvhttppool.cc
1 /*
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
6  *
7  * See wvhttppool.h.
8  */
9 #include <ctype.h>
10 #include <time.h>
11 #include "wvhttppool.h"
12 #include "wvbufstream.h"
13 #include "wvtcp.h"
14 #include "strutils.h"
15 
16 bool WvHttpStream::global_enable_pipelining = true;
17 int WvUrlStream::max_requests = 100;
18 
19 unsigned WvHash(const WvUrlStream::Target &n)
20 {
21  WvString key("%s%s", n.remaddr, n.username);
22  return (WvHash(key));
23 }
24 
25 
26 WvUrlRequest::WvUrlRequest(WvStringParm _url, WvStringParm _method,
27  WvStringParm _headers, WvStream *content_source,
28  bool _create_dirs, bool _pipeline_test)
29  : url(_url), headers(_headers)
30 {
31  instream = NULL;
32  create_dirs = _create_dirs;
33  pipeline_test = _pipeline_test;
34  method = _method;
35  is_dir = false; // for ftp primarily; set later
36 
37  if (pipeline_test)
38  {
39  outstream = NULL;
40  putstream = NULL;
41  }
42  else
43  {
45  outstream = x;
46  x->url = url;
47 
48  putstream = content_source;
49  }
50  inuse = false;
51 }
52 
53 
54 WvUrlRequest::~WvUrlRequest()
55 {
56  done();
57 }
58 
59 
60 void WvUrlRequest::done()
61 {
62  if (outstream)
63  {
64  outstream->seteof();
65  outstream = NULL;
66  }
67  if (putstream)
68  putstream = NULL;
69  inuse = false;
70 }
71 
72 
73 void WvUrlStream::addurl(WvUrlRequest *url)
74 {
75  log(WvLog::Debug4, "Adding a new url: '%s'\n", url->url);
76 
77  assert(url->outstream);
78 
79  if (!url->url.isok())
80  return;
81 
82  waiting_urls.append(url, false, "waiting_url");
83  request_next();
84 }
85 
86 
87 void WvUrlStream::delurl(WvUrlRequest *url)
88 {
89  log(WvLog::Debug4, "Removing an url: '%s'\n", url->url);
90 
91  if (url == curl)
92  doneurl();
93  waiting_urls.unlink(url);
94  urls.unlink(url);
95 }
96 
97 
98 WvHttpPool::WvHttpPool()
99  : log("HTTP Pool", WvLog::Debug), conns(10),
100  pipeline_incompatible(50)
101 {
102  log("Pool initializing.\n");
103  num_streams_created = 0;
104 }
105 
106 
107 WvHttpPool::~WvHttpPool()
108 {
109  log("Created %s individual session%s during this run.\n",
110  num_streams_created, num_streams_created == 1 ? "" : "s");
111  if (geterr())
112  log("Error was: %s\n", errstr());
113 
114  // these must get zapped before the URL list, since they have pointers
115  // to URLs.
116  zap();
117  conns.zap();
118 }
119 
120 
122 {
123  // log(WvLog::Debug5, "pre_select: main:%s conns:%s urls:%s\n",
124  // count(), conns.count(), urls.count());
125 
127 
128  WvUrlStreamDict::Iter ci(conns);
129  for (ci.rewind(); ci.next(); )
130  {
131  if (!ci->isok())
132  si.msec_timeout = 0;
133  }
134 
135  WvUrlRequestList::Iter i(urls);
136  for (i.rewind(); i.next(); )
137  {
138  if (!i->instream)
139  {
140  log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
141  if (i->url.resolve())
142  si.msec_timeout = 0;
143  else
144  dns.pre_select(i->url.gethost(), si);
145  }
146  }
147 }
148 
149 
151 {
152  bool sure = false;
153 
154  WvUrlStreamDict::Iter ci(conns);
155  for (ci.rewind(); ci.next(); )
156  {
157  if (!ci->isok())
158  {
159  log(WvLog::Debug4, "Selecting true because of a dead stream.\n");
160  unconnect(ci.ptr());
161  ci.rewind();
162  sure = true;
163  }
164  }
165 
166  WvUrlRequestList::Iter i(urls);
167  for (i.rewind(); i.next(); )
168  {
169  if ((!i->outstream && !i->inuse) || !i->url.isok())
170  {
171  //log("'%s' is dead: %s/%s\n",
172  // i->url, i->url.isok(), i.outstream->isok());
173  if (!i->url.isok())
174  {
175  log("URL not okay: '%s'\n", i->url);
176  i->done();
177  }
178  // nicely delete the url request
179  WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
180  WvUrlStream *s = conns[target];
181  if (s)
182  s->delurl(i.ptr());
183  i.xunlink();
184  continue;
185  }
186 
187  if (!i->instream)
188  {
189  log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
190  if (i->url.resolve() || dns.post_select(i->url.gethost(), si))
191  {
192  log(WvLog::Debug4, "Selecting true because of '%s'\n", i->url);
193  sure = true;
194  }
195  }
196  }
197 
198  return WvIStreamList::post_select(si) || sure;
199 }
200 
201 
203 {
205 
206  WvUrlRequestList::Iter i(urls);
207  for (i.rewind(); i.next(); )
208  {
209  WvUrlStream *s;
210 
211  if (!i->outstream || !i->url.isok() || !i->url.resolve())
212  continue; // skip it for now
213 
214  WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
215 
216  //log(WvLog::Info, "remaddr is %s; username is %s\n", target.remaddr,
217  // target.username);
218  s = conns[target];
219  //if (!s) log("conn for '%s' is not found.\n", ip);
220 
221  if (s && !s->isok())
222  {
223  unconnect(s);
224  s = NULL;
225  }
226 
227  if (!i->outstream)
228  continue; // unconnect might have caused this URL to be marked bad
229 
230  if (!s)
231  {
232  num_streams_created++;
233  if (!strncasecmp(i->url.getproto(), "http", 4))
234  s = new WvHttpStream(target.remaddr, target.username,
235  i->url.getproto() == "https",
236  pipeline_incompatible);
237  else if (!strcasecmp(i->url.getproto(), "ftp"))
238  s = new WvFtpStream(target.remaddr, target.username,
239  i->url.getpassword());
240  conns.add(s, true);
241 
242  // add it to the streamlist, so it can do things
243  append(s, false, "http/ftp stream");
244  }
245 
246  if (!i->instream)
247  {
248  s->addurl(i.ptr());
249  i->instream = s;
250  }
251  }
252 }
253 
254 
255 WvBufUrlStream *WvHttpPool::addurl(WvStringParm _url, WvStringParm _method,
256  WvStringParm _headers, WvStream *content_source, bool create_dirs)
257 {
258  log(WvLog::Debug4, "Adding a new url to pool: '%s'\n", _url);
259  WvUrlRequest *url = new WvUrlRequest(_url, _method, _headers, content_source,
260  create_dirs, false);
261  urls.append(url, true, "addurl");
262 
263  return url->outstream;
264 }
265 
266 
267 void WvHttpPool::unconnect(WvUrlStream *s)
268 {
269  if (!s->target.username)
270  log("Unconnecting stream to %s.\n", s->target.remaddr);
271  else
272  log("Unconnecting stream to %s@%s.\n", s->target.username,
273  s->target.remaddr);
274 
275  WvUrlRequestList::Iter i(urls);
276  for (i.rewind(); i.next(); )
277  {
278  if (i->instream == s)
279  i->instream = NULL;
280  }
281 
282  unlink(s);
283  conns.remove(s);
284 }
WvIStreamList::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvistreamlist.cc:141
WvHttpStream
Definition: wvhttppool.h:162
WvIStreamList::execute
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
Definition: wvistreamlist.cc:205
WvResolver::pre_select
void pre_select(WvStringParm hostname, WvStream::SelectInfo &si)
add all of our waiting fds to an fd_set for use with select().
Definition: wvresolver.cc:316
WvErrorBase::geterr
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
Definition: wverror.h:48
WvIStreamList::pre_select
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvistreamlist.cc:89
WvResolver::post_select
bool post_select(WvStringParm hostname, WvStream::SelectInfo &si)
determines whether the resolving process is complete.
Definition: wvresolver.cc:331
WvUrlStream::Target
Definition: wvhttppool.h:107
WvString
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:329
WvLog
A WvLog stream accepts log messages from applications and forwards them to all registered WvLogRcv's.
Definition: wvlog.h:56
IWvStream::SelectInfo
the data structure used by pre_select()/post_select() and internally by select().
Definition: iwvstream.h:50
WvHttpPool::post_select
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvhttppool.cc:150
WvBufUrlStream
Definition: wvhttppool.h:80
WvStream
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
Definition: wvstream.h:24
WvUrlStream
Definition: wvhttppool.h:104
WvFtpStream
Definition: wvhttppool.h:206
WvStreamClone::isok
virtual bool isok() const
return true if the stream is actually usable right now
Definition: wvstreamclone.cc:136
WvHttpPool::pre_select
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvhttppool.cc:121
WvHttpPool::execute
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
Definition: wvhttppool.cc:202
WvUrlRequest
Definition: wvhttppool.h:43