WvStreams
wvhttpstream.cc
1/*
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4 *
5 * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
6 *
7 * See wvhttppool.h.
8 */
9#include "wvhttppool.h"
10#include "wvtcp.h"
11#include "wvsslstream.h"
12#include "wvbuf.h"
13#include "wvbase64.h"
14#include "strutils.h"
15#ifdef HAVE_EXECINFO_H
16#include <execinfo.h> // FIXME: add a WvCrash feature for explicit dumps
17#endif
18
19#ifdef _WIN32
20#define ETIMEDOUT WSAETIMEDOUT
21#endif
22
23WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
24 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
25 : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
26 pipeline_incompatible(_pipeline_incompatible),
27 in_doneurl(false)
28{
29 log("Opening server connection.\n");
30 http_response = "";
31 encoding = Unknown;
32 bytes_remaining = 0;
33 in_chunk_trailer = false;
34 pipeline_test_count = 0;
35 last_was_pipeline_test = false;
36
37 enable_pipelining = global_enable_pipelining
38 && !pipeline_incompatible[target.remaddr];
39 ssl = _ssl;
40
41 if (ssl)
42 cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
43
44 sent_url_request = false;
45
46 alarm(60000); // timeout if no connection, or something goes wrong
47}
48
49
50WvHttpStream::~WvHttpStream()
51{
52 log(WvLog::Debug2, "Deleting.\n");
53
54#if 0
55#ifdef HAVE_EXECINFO_H
56 void* trace[10];
57 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
58 char** tracedump = backtrace_symbols(trace, count);
59 log(WvLog::Debug, "TRACE");
60 for (int i = 0; i < count; ++i)
61 log(WvLog::Debug, ":%s", tracedump[i]);
62 log(WvLog::Debug, "\n");
63 free(tracedump);
64#endif
65#endif
66
67 if (geterr())
68 log("Error was: %s\n", errstr());
69 close();
70}
71
72
74{
75 log("close called\n");
76
77#if 0
78#ifdef HAVE_EXECINFO_H
79 void *trace[10];
80 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
81 char** tracedump = backtrace_symbols(trace, count);
82 log(WvLog::Debug, "TRACE");
83 for (int i = 0; i < count; ++i)
84 log(WvLog::Debug, ":%s", tracedump[i]);
85 log(WvLog::Debug, "\n");
86 free(tracedump);
87#endif
88#endif
89
90 // assume pipelining is broken if we're closing without doing at least
91 // one successful pipelining test and a following non-test request.
92 if (enable_pipelining && max_requests > 1
93 && (pipeline_test_count < 1
94 || (pipeline_test_count == 1 && last_was_pipeline_test)))
95 pipelining_is_broken(2);
96
97 if (isok())
98 log("Closing.\n");
100
101 if (geterr())
102 {
103 // if there was an error, count the first URL as done. This prevents
104 // retrying indefinitely.
105 WvUrlRequest *msgurl = curl;
106 if (!msgurl && !urls.isempty())
107 msgurl = urls.first();
108 if (!msgurl && !waiting_urls.isempty())
109 msgurl = waiting_urls.first();
110
111 if (msgurl)
112 {
113 log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
114 errstr());
115 curl = msgurl;
116 doneurl();
117 }
118 }
119 waiting_urls.zap();
120 if (curl)
121 {
122 log("curl is %s\n", curl->url);
123 doneurl();
124 }
125 log("close done\n");
126}
127
128
129void WvHttpStream::doneurl()
130{
131 // There is a slight chance that we might receive an error during or just before
132 // this function is called, which means that the write occuring during
133 // start_pipeline_test() would be called, which would call close() because of the
134 // error, which would call doneurl() again. We don't want to execute doneurl()
135 // a second time when we're already in the middle.
136 if (in_doneurl)
137 return;
138 in_doneurl = true;
139
140 assert(curl != NULL);
141 WvString last_response(http_response);
142 log("Done URL: %s\n", curl->url);
143
144 http_response = "";
145 encoding = Unknown;
146 in_chunk_trailer = false;
147 bytes_remaining = 0;
148
149 last_was_pipeline_test = curl->pipeline_test;
150 bool broken = false;
151 if (last_was_pipeline_test)
152 {
153 pipeline_test_count++;
154 if (pipeline_test_count == 1)
155 start_pipeline_test(&curl->url);
156 else if (pipeline_test_response != last_response)
157 {
158 // getting a bit late in the game to be detecting brokenness :(
159 // However, if the response code isn't the same for both tests,
160 // something's definitely screwy.
161 pipelining_is_broken(4);
162 broken = true;
163 }
164 pipeline_test_response = last_response;
165 }
166
167 assert(curl == urls.first());
168 curl->done();
169 curl = NULL;
170 sent_url_request = false;
171 urls.unlink_first();
172
173 if (broken)
174 close();
175
176 request_next();
177 in_doneurl = false;
178}
179
180
181static WvString encode64(WvStringParm user, WvStringParm password)
182{
183 WvBase64Encoder encoder;
184 WvString ret;
185 encoder.flushstrstr(WvString("%s:%s", user, password), ret);
186 return ret;
187}
188
189
190static WvString fixnl(WvStringParm nonl)
191{
192 WvDynBuf b;
193 const char *cptr;
194
195 for (cptr = nonl; cptr && *cptr; cptr++)
196 {
197 if (*cptr == '\r')
198 continue;
199 else if (*cptr == '\n')
200 b.put("\r", 1); // put BOTH \r and \n
201 b.put(cptr, 1);
202 }
203
204 return b.getstr();
205}
206
207
208WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
209{
210 WvString request;
211 WvString auth("");
212 if(!!url->url.getuser() && !!url->url.getpassword())
213 auth = WvString("Authorization: Basic %s\n",
214 encode64(url->url.getuser(), url->url.getpassword()));
215
216 request = fixnl(
217 WvString(
218 "%s %s HTTP/1.1\n"
219 "Host: %s:%s\n"
220 "Connection: %s\n"
221 "%s"
222 "%s"
223 "%s%s"
224 "\n",
225 url->method,
226 url->url.getfile(),
227 url->url.gethost(), url->url.getport(),
228 keepalive ? "keep-alive" : "close",
229 auth,
230 (putstream_data.used() > 0 ? WvString(
231 "Content-Length: %s\n", putstream_data.used()) : ""),
232 trim_string(url->headers.edit()),
233 !!url->headers ? "\n" : ""));
234 return request;
235}
236
237
238void WvHttpStream::send_request(WvUrlRequest *url)
239{
240 request_count++;
241 log("Request #%s: %s\n", request_count, url->url);
242 write(request_str(url, url->pipeline_test
243 || request_count < max_requests));
244 write(putstream_data);
245 sent_url_request = true;
246 alarm(60000);
247}
248
249
250void WvHttpStream::start_pipeline_test(WvUrl *url)
251{
252 WvUrl location(WvString(
253 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
254 url->getproto(), url->gethost(), url->getport()));
255 WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
256 false, true);
257 testurl->instream = this;
258 send_request(testurl);
259 urls.append(testurl, true, "sent_running_url");
260}
261
262
263void WvHttpStream::request_next()
264{
265 // Clear the putstream buffer before we start any new requests
266 putstream_data.zap();
267
268 // don't do a request if we've done too many already or we have none
269 // waiting.
270 if (request_count >= max_requests || waiting_urls.isempty())
271 return;
272
273 // don't do more than one request at a time if we're not pipelining.
274 if (!enable_pipelining && !urls.isempty())
275 return;
276
277 // okay then, we really do want to send a new request.
278 WvUrlRequest *url = waiting_urls.first();
279
280 waiting_urls.unlink_first();
281 if (!url->putstream)
282 {
283 if (enable_pipelining && !request_count && max_requests > 1)
284 start_pipeline_test(&url->url);
285 send_request(url);
286 }
287 urls.append(url, false, "sent_running_url");
288}
289
290
291void WvHttpStream::pipelining_is_broken(int why)
292{
293 if (!pipeline_incompatible[target.remaddr])
294 {
295 pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
296 log("Pipelining is broken on this server (%s)! Disabling.\n", why);
297 }
298}
299
300
302{
303 SelectRequest oldwant = si.wants;
304 WvUrlRequest *url;
305
307
308 if (!urls.isempty())
309 {
310 url = urls.first();
311 if(url && url->putstream)
312 url->putstream->pre_select(si);
313 }
314
315 si.wants = oldwant;
316}
317
318
320{
321 SelectRequest oldwant = si.wants;
322 WvUrlRequest *url;
323
325 return true;
326
327 if (!urls.isempty())
328 {
329 url = urls.first();
330 if(url && url->putstream && url->putstream->post_select(si))
331 return true;
332 }
333
334 si.wants = oldwant;
335 return false;
336}
337
338
340{
341 char buf[1024], *line;
342 size_t len;
343
345
346 // make connections timeout after some idleness
348 {
349 log(WvLog::Debug4, "urls count: %s\n", urls.count());
350 if (!urls.isempty())
351 {
352 seterr(ETIMEDOUT);
353
354 // Must check again here since seterr()
355 // will close our stream and if we only
356 // had one url then it'll be gone.
357 if (!urls.isempty())
358 {
359 WvUrlRequest *url = urls.first();
360 if (url->outstream)
361 url->outstream->seterr(ETIMEDOUT);
362 }
363 }
364 else
365 close(); // timed out, but not really an error
366 return;
367 }
368
369 // Die if somebody closed our outstream. This is so that if we were
370 // downloading a really big file, they can stop it in the middle and
371 // our next url request can start downloading immediately.
372 if (curl && !curl->outstream)
373 {
374 if (!(encoding == PostHeadInfinity
375 || encoding == PostHeadChunked
376 || encoding == PostHeadStream))
377 {
378 // don't complain about pipelining failures
379 pipeline_test_count++;
380 last_was_pipeline_test = false;
381 close();
382 }
383
384 if (curl)
385 doneurl();
386 return;
387 }
388 else if (curl)
389 curl->inuse = true;
390
391 if(!sent_url_request && !urls.isempty())
392 {
393 WvUrlRequest *url = urls.first();
394 if(url)
395 {
396 if(url->putstream)
397 {
398 int len = 0;
399 if(url->putstream->isok())
400 len = url->putstream->read(putstream_data, 1024);
401
402 if(!url->putstream->isok() || len == 0)
403 {
404 url->putstream = NULL;
405 send_request(url);
406 }
407 }
408 }
409 }
410
411 if (!curl)
412 {
413 // in the header section
414 line = getline();
415 if (line)
416 {
417 line = trim_string(line);
418 log(WvLog::Debug4, "Header: '%s'\n", line);
419 if (!http_response)
420 {
421 http_response = line;
422
423 // there are never two pipeline test requests in a row, so
424 // a second response string exactly like the pipeline test
425 // response implies that everything between the first and
426 // second test requests was lost: bad!
427 if (last_was_pipeline_test
428 && http_response == pipeline_test_response)
429 {
430 pipelining_is_broken(1);
431 close();
432 return;
433 }
434
435 // http response #400 is "invalid request", which we
436 // shouldn't be sending. If we get one of these right after
437 // a test, it probably means the stuff that came after it
438 // was mangled in some way during transmission ...and we
439 // should throw it away.
440 if (last_was_pipeline_test && !!http_response)
441 {
442 const char *cptr = strchr(http_response, ' ');
443 if (cptr && atoi(cptr+1) == 400)
444 {
445 pipelining_is_broken(3);
446 close();
447 return;
448 }
449 }
450 }
451
452 if (urls.isempty())
453 {
454 log("got unsolicited data.\n");
455 seterr("unsolicited data from server!");
456 return;
457 }
458
459 if (!strncasecmp(line, "Content-length: ", 16))
460 {
461 bytes_remaining = atoi(line+16);
462 encoding = ContentLength;
463 }
464 else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
465 && strstr(line+19, "chunked"))
466 {
467 encoding = Chunked;
468 }
469
470 if (line[0])
471 {
472 char *p;
473 WvBufUrlStream *outstream = urls.first()->outstream;
474
475 if ((p = strchr(line, ':')) != NULL)
476 {
477 *p = 0;
478 p = trim_string(p+1);
479 if (outstream) {
480 struct WvHTTPHeader *h;
481 h = new struct WvHTTPHeader(line, p);
482 outstream->headers.add(h, true);
483 }
484 }
485 else if (strncasecmp(line, "HTTP/", 5) == 0)
486 {
487 char *p = strchr(line, ' ');
488 if (p)
489 {
490 *p = 0;
491 if (outstream)
492 {
493 outstream->version = line+5;
494 outstream->status = atoi(p+1);
495 }
496 }
497 }
498 }
499 else
500 {
501 // blank line is the beginning of data section
502 curl = urls.first();
503 in_chunk_trailer = false;
504 log(WvLog::Debug4,
505 "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
506
507 if (encoding == Unknown)
508 encoding = Infinity; // go until connection closes itself
509
510 if (curl->method == "HEAD")
511 {
512 log("Got all headers.\n");
513 if (!enable_pipelining)
514 doneurl();
515
516 if (encoding == Infinity)
517 encoding = PostHeadInfinity;
518 else if (encoding == Chunked)
519 encoding = PostHeadChunked;
520 else
521 encoding = PostHeadStream;
522 }
523 }
524 }
525 }
526 else if (encoding == PostHeadInfinity
527 || encoding == PostHeadChunked
528 || encoding == PostHeadStream)
529 {
530 WvDynBuf chkbuf;
531 len = read(chkbuf, 5);
532
533 // If there is more data available right away, and it isn't an
534 // HTTP header from another request, then it's a stupid web
535 // server that likes to send bodies with HEAD requests.
536 if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
537 "HTTP/", 5))
538 {
539 if (encoding == PostHeadInfinity)
540 encoding = ChuckInfinity;
541 else if (encoding == PostHeadChunked)
542 encoding = ChuckChunked;
543 else if (encoding == PostHeadStream)
544 encoding = ChuckStream;
545 else
546 log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
547 }
548 else
549 doneurl();
550
551 unread(chkbuf, len);
552 }
553 else if (encoding == ChuckInfinity)
554 {
555 len = read(buf, sizeof(buf));
556 if (len)
557 log(WvLog::Debug5, "Chucking %s bytes.\n", len);
558 if (!isok())
559 doneurl();
560 }
561 else if (encoding == ChuckChunked && !bytes_remaining)
562 {
563 encoding = Chunked;
564 }
565 else if (encoding == ChuckChunked || encoding == ChuckStream)
566 {
567 if (bytes_remaining > sizeof(buf))
568 len = read(buf, sizeof(buf));
569 else
570 len = read(buf, bytes_remaining);
571 bytes_remaining -= len;
572 if (len)
573 log(WvLog::Debug5,
574 "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
575 if (!bytes_remaining && encoding == ContentLength)
576 doneurl();
577 if (bytes_remaining && !isok())
578 seterr("connection interrupted");
579 }
580 else if (encoding == Chunked && !bytes_remaining)
581 {
582 line = getline();
583 if (line)
584 {
585 line = trim_string(line);
586
587 if (in_chunk_trailer)
588 {
589 // in the trailer section of a chunked encoding
590 log(WvLog::Debug4, "Trailer: '%s'\n", line);
591
592 // a blank line means we're finally done!
593 if (!line[0])
594 doneurl();
595 }
596 else
597 {
598 // in the "length line" section of a chunked encoding
599 if (line[0])
600 {
601 bytes_remaining = (size_t)strtoul(line, NULL, 16);
602 if (!bytes_remaining)
603 in_chunk_trailer = true;
604 log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
605 bytes_remaining, line);
606 }
607 }
608 }
609 }
610 else if (encoding == Infinity)
611 {
612 // just read data until the connection closes, and assume all was
613 // well. It sucks, but there's no way to tell if all the data arrived
614 // okay... that's why Chunked or ContentLength encoding is better.
615 len = read(buf, sizeof(buf));
616 if (!isok())
617 return;
618
619 if (len)
620 log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
621 if (curl && curl->outstream)
622 curl->outstream->write(buf, len);
623
624 if (!isok() && curl)
625 doneurl();
626 }
627 else // not chunked or currently in a chunk - read 'bytes_remaining' bytes.
628 {
629 // in the data section of a chunked or content-length encoding,
630 // with 'bytes_remaining' bytes of data left.
631
632 if (bytes_remaining > sizeof(buf))
633 len = read(buf, sizeof(buf));
634 else
635 len = read(buf, bytes_remaining);
636 if (!isok())
637 return;
638
639 bytes_remaining -= len;
640 if (len)
641 log(WvLog::Debug5,
642 "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
643 if (curl && curl->outstream)
644 curl->outstream->write(buf, len);
645
646 if (!bytes_remaining && encoding == ContentLength && curl)
647 doneurl();
648
649 if (bytes_remaining && !isok())
650 seterr("connection interrupted");
651
652 if (!isok())
653 doneurl();
654 }
655
656 if (urls.isempty())
657 alarm(5000); // just wait a few seconds before closing connection
658 else
659 alarm(60000); // give the server a minute to respond, if we're waiting
660}
void put(const T *data, size_t count)
Writes the specified number of elements from the specified storage location into the buffer at its ta...
const T * peek(int offset, size_t count)
Returns a const pointer into the buffer at the specified offset to the specified number of elements w...
bool flushstrstr(WvStringParm instr, WvString &outstr, bool finish=false)
Flushes data through the encoder from a string to a string.
Definition wvencoder.cc:86
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Base class for streams built on Unix file descriptors.
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
virtual void close()
Close this stream.
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
An IP+Port address also includes a port number, with the resulting form www.xxx.yyy....
SSL Stream, handles SSLv2, SSLv3, and TLS Methods - If you want it to be a server,...
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
virtual void close()
Close this stream.
virtual bool isok() const
return true if the stream is actually usable right now
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
Definition wvstream.cc:875
virtual bool isok() const
return true if the stream is actually usable right now
Definition wvstream.cc:445
void alarm(time_t msec_timeout)
set an alarm, ie.
Definition wvstream.cc:1049
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition wvstream.cc:532
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
Definition wvstream.cc:844
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition wvstream.cc:490
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition wvstream.cc:451
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition wvstream.cc:1190
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
WvString is an implementation of a simple and efficient printable-string class.
char * trim_string(char *string)
Trims whitespace from the beginning and end of the character string, including carriage return / line...
Definition strutils.cc:59
the data structure used by pre_select()/post_select() and internally by select().
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...