WvStreams
wvstream.cc
1/*
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4 *
5 * Unified support for streams, that is, sequences of bytes that may or
6 * may not be ready for read/write at any given time.
7 *
8 * We provide typical read and write routines, as well as a select() function
9 * for each stream.
10 */
11#include <time.h>
12#include <sys/types.h>
13#include <assert.h>
14#define __WVSTREAM_UNIT_TEST 1
15#include "wvstream.h"
16#include "wvtimeutils.h"
17#include "wvcont.h"
18#include "wvstreamsdebugger.h"
19#include "wvstrutils.h"
20#include "wvistreamlist.h"
21#include "wvlinkerhack.h"
22#include "wvmoniker.h"
23
24#ifdef _WIN32
25#define ENOBUFS WSAENOBUFS
26#undef errno
27#define errno GetLastError()
28#ifdef __GNUC__
29#include <sys/socket.h>
30#endif
31#include "streams.h"
32#else
33#include <errno.h>
34#endif
35
36#include <map>
37
38using std::make_pair;
39using std::map;
40
41
42// enable this to add some read/write trace messages (this can be VERY
43// verbose)
44#if 0
45# ifndef _MSC_VER
46# define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
47# else
48# define TRACE printf
49# endif
50#else
51# ifndef _MSC_VER
52# define TRACE(x, y...)
53# else
54# define TRACE
55# endif
56#endif
57
58WvStream *WvStream::globalstream = NULL;
59
64
65
66static map<WSID, WvStream*> *wsid_map;
67static WSID next_wsid_to_try;
68
69
70WV_LINK(WvStream);
71
72static IWvStream *create_null(WvStringParm, IObject *)
73{
74 return new WvStream();
75}
76
77static WvMoniker<IWvStream> reg("null", create_null);
78
79
80IWvStream *IWvStream::create(WvStringParm moniker, IObject *obj)
81{
82 IWvStream *s = wvcreate<IWvStream>(moniker, obj);
83 if (!s)
84 {
85 s = new WvStream();
86 s->seterr_both(EINVAL, "Unknown moniker '%s'", moniker);
87 WVRELEASE(obj); // we're not going to use it after all
88 }
89 return s;
90}
91
92
// Returns true when 'str' starts with 'prefix', ignoring case.  An empty
// prefix matches every string.
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    const size_t prefix_len = strlen(prefix);
    if (strlen(str) < prefix_len)
        return false; // str is too short to contain the prefix
    return strncasecmp(str, prefix, prefix_len) == 0;
}
98
99
// Case-insensitive strstr(): returns a pointer to the first occurrence of
// 'needle' inside 'haystack', or NULL when there is none.
//
// An empty needle matches at the start of any haystack, including an empty
// one (as strstr() does).  The needle length is computed once rather than at
// every candidate position.
static const char *strstr_insensitive(const char *haystack, const char *needle)
{
    const size_t needle_len = strlen(needle);
    if (needle_len == 0)
        return haystack;

    for (; *haystack != '\0'; ++haystack)
    {
        // strncasecmp() stops at the haystack's terminator, so a tail that
        // is shorter than the needle can never compare equal.
        if (strncasecmp(haystack, needle, needle_len) == 0)
            return haystack;
    }
    return NULL;
}
110
111
112static bool contains_insensitive(const char *haystack, const char *needle)
113{
114 return strstr_insensitive(haystack, needle) != NULL;
115}
116
117
// Column layout shared by the header row and the per-stream rows of the
// "streams" debugger command: seven fields with a separator between each.
static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";

// Render a boolean as "Yes" or "No" for the debugger listing.
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
123
124
125void WvStream::debugger_streams_display_header(WvStringParm cmd,
126 WvStreamsDebugger::ResultCallback result_cb)
127{
128 WvStringList result;
129 result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
130 "----------------Type", "-", "Name--------------------");
131 result_cb(cmd, result);
132}
133
134
135// Set to fit in 6 chars
136static WvString friendly_ms(time_t ms)
137{
138 if (ms <= 0)
139 return WvString("(%s)", ms);
140 else if (ms < 1000)
141 return WvString("%sms", ms);
142 else if (ms < 60*1000)
143 return WvString("%ss", ms/1000);
144 else if (ms < 60*60*1000)
145 return WvString("%sm", ms/(60*1000));
146 else if (ms <= 24*60*60*1000)
147 return WvString("%sh", ms/(60*60*1000));
148 else
149 return WvString("%sd", ms/(24*60*60*1000));
150}
151
152void WvStream::debugger_streams_display_one_stream(WvStream *s,
153 WvStringParm cmd,
154 WvStreamsDebugger::ResultCallback result_cb)
155{
156 WvStringList result;
157 s->addRef();
158 unsigned refcount = s->release();
159 result.append(list_format,
160 s->wsid(), " ",
161 refcount, " ",
162 Yes_No(s->isok()), " ",
163 Yes_No(s->uses_continue_select), " ",
164 friendly_ms(s->alarm_remaining()), " ",
165 s->wstype(), " ",
166 s->wsname());
167 result_cb(cmd, result);
168}
169
170
171void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
172 WvStringParm cmd,
173 const WvStringList &args,
174 WvStreamsDebugger::ResultCallback result_cb)
175{
176 bool show = args.isempty();
177 WvStringList::Iter arg(args);
178 for (arg.rewind(); arg.next(); )
179 {
180 WSID wsid;
181 bool is_num = wvstring_to_num(*arg, wsid);
182
183 if (is_num)
184 {
185 if (s->wsid() == wsid)
186 {
187 show = true;
188 break;
189 }
190 }
191 else
192 {
193 if ((s->wsname() && contains_insensitive(s->wsname(), *arg))
194 || (s->wstype() && contains_insensitive(s->wstype(), *arg)))
195 {
196 show = true;
197 break;
198 }
199 }
200 }
201 if (show)
202 debugger_streams_display_one_stream(s, cmd, result_cb);
203}
204
205
206WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
207 WvStringList &args,
208 WvStreamsDebugger::ResultCallback result_cb, void *)
209{
210 debugger_streams_display_header(cmd, result_cb);
211 if (wsid_map)
212 {
213 map<WSID, WvStream*>::iterator it;
214
215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
216 debugger_streams_maybe_display_one_stream(it->second, cmd, args,
217 result_cb);
218 }
219
220 return WvString::null;
221}
222
223
224WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
225 WvStringList &args,
226 WvStreamsDebugger::ResultCallback result_cb, void *)
227{
228 if (args.isempty())
229 return WvString("Usage: %s <WSID>", cmd);
230 WSID wsid;
231 WvString wsid_str = args.popstr();
232 if (!wvstring_to_num(wsid_str, wsid))
233 return WvString("Invalid WSID '%s'", wsid_str);
234 IWvStream *s = WvStream::find_by_wsid(wsid);
235 if (!s)
236 return WvString("No such stream");
237 s->close();
238 return WvString::null;
239}
240
241
242void WvStream::add_debugger_commands()
243{
244 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
245 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
246}
247
248
250 read_requires_writable(NULL),
251 write_requires_readable(NULL),
252 uses_continue_select(false),
253 personal_stack_size(131072),
254 alarm_was_ticking(false),
255 stop_read(false),
256 stop_write(false),
257 closed(false),
258 readcb(wv::bind(&WvStream::legacy_callback, this)),
259 max_outbuf_size(0),
260 outbuf_delayed_flush(false),
261 is_auto_flush(true),
262 want_to_flush(true),
263 is_flushing(false),
264 queue_min(0),
265 autoclose_time(0),
266 alarm_time(wvtime_zero),
267 last_alarm_check(wvtime_zero)
268{
269 TRACE("Creating wvstream %p\n", this);
270
271 static bool first = true;
272 if (first)
273 {
274 first = false;
275 WvStream::add_debugger_commands();
276 }
277
278 // Choose a wsid;
279 if (!wsid_map)
280 wsid_map = new map<WSID, WvStream*>;
281 WSID first_wsid_tried = next_wsid_to_try;
282 do
283 {
284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
285 break;
286 ++next_wsid_to_try;
287 } while (next_wsid_to_try != first_wsid_tried);
288 my_wsid = next_wsid_to_try++;
289 bool inserted = wsid_map->insert(make_pair(my_wsid, this)).second;
290 assert(inserted);
291
292#ifdef _WIN32
293 WSAData wsaData;
294 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
295 assert(result == 0);
296#endif
297}
298
299
300// FIXME: interfaces (IWvStream) shouldn't have implementations!
301IWvStream::IWvStream()
302{
303}
304
305
306IWvStream::~IWvStream()
307{
308}
309
310
311WvStream::~WvStream()
312{
313 TRACE("destroying %p\n", this);
314 close();
315
316 // if this assertion fails, then uses_continue_select is true, but you
317 // didn't call terminate_continue_select() or close() before destroying
318 // your object. Shame on you!
319 assert(!uses_continue_select || !call_ctx);
320
321 call_ctx = 0; // finish running the suspended callback, if any
322
323 assert(wsid_map);
324 wsid_map->erase(my_wsid);
325 if (wsid_map->empty())
326 {
327 delete wsid_map;
328 wsid_map = NULL;
329 }
330
331 // eventually, streams will auto-add themselves to the globallist. But
332 // even before then, it'll never be useful for them to be on the
333 // globallist *after* they get destroyed, so we might as well auto-remove
334 // them already. It's harmless for people to try to remove them twice.
335 WvIStreamList::globallist.unlink(this);
336
337 TRACE("done destroying %p\n", this);
338}
339
340
342{
343 TRACE("flushing in wvstream...\n");
344 flush(2000); // fixme: should not hardcode this stuff
345 TRACE("(flushed)\n");
346
347 closed = true;
348
349 if (!!closecb)
350 {
351 IWvStreamCallback cb = closecb;
352 closecb = 0; // ensure callback is only called once
353 cb();
354 }
355
356 // I would like to delete call_ctx here, but then if someone calls
357 // close() from *inside* a continuable callback, we explode. Oops!
358 //call_ctx = 0; // destroy the context, if necessary
359}
360
361
363{
364 setcallback(wv::bind(autoforward_callback, wv::ref(*this), wv::ref(s)));
366}
367
368
370{
371 setcallback(0);
373}
374
375
376void WvStream::autoforward_callback(WvStream &input, WvStream &output)
377{
378 char buf[1024];
379 size_t len;
380
381 len = input.read(buf, sizeof(buf));
382 output.write(buf, len);
383}
384
385
387{
388 execute();
389 if (!!callfunc)
390 callfunc();
391}
392
393
395{
396 _callback();
397 return NULL;
398}
399
400
402{
403 TRACE("(?)");
404
405 // if the alarm has gone off and we're calling callback... good!
406 if (alarm_remaining() == 0)
407 {
408 alarm_time = wvtime_zero;
409 alarm_was_ticking = true;
410 }
411 else
412 alarm_was_ticking = false;
413
414 assert(!uses_continue_select || personal_stack_size >= 1024);
415
416#define TEST_CONTINUES_HARSHLY 0
417#if TEST_CONTINUES_HARSHLY
418#ifndef _WIN32
419# warning "Using WvCont for *all* streams for testing!"
420#endif
421 if (1)
422#else
424#endif
425 {
426 if (!call_ctx) // no context exists yet!
427 {
428 call_ctx = WvCont(wv::bind(&WvStream::_callwrap, this, _1),
430 }
431
432 call_ctx(NULL);
433 }
434 else
435 _callback();
436
437 // if this assertion fails, a derived class's virtual execute() function
438 // didn't call its parent's execute() function, and we didn't make it
439 // all the way back up to WvStream::execute(). This doesn't always
440 // matter right now, but it could lead to obscure bugs later, so we'll
441 // enforce it.
442}
443
444
445bool WvStream::isok() const
446{
447 return !closed && WvErrorBase::isok();
448}
449
450
451void WvStream::seterr(int _errnum)
452{
453 if (!geterr()) // no pre-existing error
454 {
455 WvErrorBase::seterr(_errnum);
456 close();
457 }
458}
459
460
461size_t WvStream::read(WvBuf &outbuf, size_t count)
462{
463 // for now, just wrap the older read function
464 size_t free = outbuf.free();
465 if (count > free)
466 count = free;
467
468 WvDynBuf tmp;
469 unsigned char *buf = tmp.alloc(count);
470 size_t len = read(buf, count);
471 tmp.unalloc(count - len);
472 outbuf.merge(tmp);
473 return len;
474}
475
476
477size_t WvStream::write(WvBuf &inbuf, size_t count)
478{
479 // for now, just wrap the older write function
480 size_t avail = inbuf.used();
481 if (count > avail)
482 count = avail;
483 const unsigned char *buf = inbuf.get(count);
484 size_t len = write(buf, count);
485 inbuf.unget(count - len);
486 return len;
487}
488
489
490size_t WvStream::read(void *buf, size_t count)
491{
492 assert(!count || buf);
493
494 size_t bufu, i;
495 unsigned char *newbuf;
496
497 bufu = inbuf.used();
498 if (bufu < queue_min)
499 {
500 newbuf = inbuf.alloc(queue_min - bufu);
501 assert(newbuf);
502 i = uread(newbuf, queue_min - bufu);
503 inbuf.unalloc(queue_min - bufu - i);
504
505 bufu = inbuf.used();
506 }
507
508 if (bufu < queue_min)
509 {
511 return 0;
512 }
513
514 // if buffer is empty, do a hard read
515 if (!bufu)
516 bufu = uread(buf, count);
517 else
518 {
519 // otherwise just read from the buffer
520 if (bufu > count)
521 bufu = count;
522
523 memcpy(buf, inbuf.get(bufu), bufu);
524 }
525
526 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
528 return bufu;
529}
530
531
532size_t WvStream::write(const void *buf, size_t count)
533{
534 assert(!count || buf);
535 if (!isok() || !buf || !count || stop_write) return 0;
536
537 size_t wrote = 0;
538 if (!outbuf_delayed_flush && !outbuf.used())
539 {
540 wrote = uwrite(buf, count);
541 count -= wrote;
542 buf = (const unsigned char *)buf + wrote;
543 // if (!count) return wrote; // short circuit if no buffering needed
544 }
545 if (max_outbuf_size != 0)
546 {
547 size_t canbuffer = max_outbuf_size - outbuf.used();
548 if (count > canbuffer)
549 count = canbuffer; // can't write the whole amount
550 }
551 if (count != 0)
552 {
553 outbuf.put(buf, count);
554 wrote += count;
555 }
556
557 if (should_flush())
558 {
559 if (is_auto_flush)
560 flush(0);
561 else
562 flush_outbuf(0);
563 }
564
565 return wrote;
566}
567
568
570{
571 stop_read = true;
573}
574
575
577{
578 stop_write = true;
580}
581
582
584{
585 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
586 close();
587}
588
589
591{
592 return isok() && select(0, true, false, false);
593}
594
595
597{
598 return !stop_write && isok() && select(0, false, true, false);
599}
600
601
602char *WvStream::blocking_getline(time_t wait_msec, int separator,
603 int readahead)
604{
605 assert(separator >= 0);
606 assert(separator <= 255);
607
608 //assert(uses_continue_select || wait_msec == 0);
609
610 WvTime timeout_time(0);
611 if (wait_msec > 0)
612 timeout_time = msecadd(wvtime(), wait_msec);
613
615
616 // if we get here, we either want to wait a bit or there is data
617 // available.
618 while (isok())
619 {
620 // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
621 queuemin(0);
622
623 // if there is a newline already, we have enough data.
624 if (inbuf.strchr(separator) > 0)
625 break;
626 else if (!isok() || stop_read) // uh oh, stream is in trouble.
627 break;
628
629 // make select not return true until more data is available
630 queuemin(inbuf.used() + 1);
631
632 // compute remaining timeout
633 if (wait_msec > 0)
634 {
635 wait_msec = msecdiff(timeout_time, wvtime());
636 if (wait_msec < 0)
637 wait_msec = 0;
638 }
639
640 // FIXME: this is blocking_getline. It shouldn't
641 // call continue_select()!
642 bool hasdata;
643 if (wait_msec != 0 && uses_continue_select)
644 hasdata = continue_select(wait_msec);
645 else
646 hasdata = select(wait_msec, true, false);
647
648 if (!isok())
649 break;
650
651 if (hasdata)
652 {
653 // read a few bytes
654 WvDynBuf tmp;
655 unsigned char *buf = tmp.alloc(readahead);
656 assert(buf);
657 size_t len = uread(buf, readahead);
658 tmp.unalloc(readahead - len);
659 inbuf.put(tmp.get(len), len);
660 hasdata = len > 0; // enough?
661 }
662
663 if (!isok())
664 break;
665
666 if (!hasdata && wait_msec == 0)
667 return NULL; // handle timeout
668 }
669 if (!inbuf.used())
670 return NULL;
671
672 // return the appropriate data
673 size_t i = 0;
674 i = inbuf.strchr(separator);
675 if (i > 0) {
676 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
677 assert(eol && *eol == separator);
678 *eol = 0;
679 return const_cast<char*>((const char *)inbuf.get(i));
680 } else {
681 // handle "EOF without newline" condition
682 // FIXME: it's very silly that buffers can't return editable
683 // char* arrays.
684 inbuf.alloc(1)[0] = 0; // null-terminate it
685 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
686 }
687}
688
689
690char *WvStream::continue_getline(time_t wait_msec, int separator,
691 int readahead)
692{
693 assert(false && "not implemented, come back later!");
694 assert(uses_continue_select);
695 return NULL;
696}
697
698
700{
701 char buf[1024];
702 while (isreadable())
703 read(buf, sizeof(buf));
704}
705
706
707bool WvStream::flush(time_t msec_timeout)
708{
709 if (is_flushing) return false;
710
711 TRACE("%p flush starts\n", this);
712
713 is_flushing = true;
714 want_to_flush = true;
715 bool done = flush_internal(msec_timeout) // any other internal buffers
716 && flush_outbuf(msec_timeout); // our own outbuf
717 is_flushing = false;
718
719 TRACE("flush stops (%d)\n", done);
720 return done;
721}
722
723
725{
726 return want_to_flush;
727}
728
729
730bool WvStream::flush_outbuf(time_t msec_timeout)
731{
732 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
733 bool outbuf_was_used = outbuf.used();
734
735 // do-nothing shortcut for speed
736 // FIXME: definitely makes a "measurable" difference...
737 // but is it worth the risk?
738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
739 {
741 return true;
742 }
743
744 WvTime stoptime = msecadd(wvtime(), msec_timeout);
745
746 // flush outbuf
747 while (outbuf_was_used && isok())
748 {
749// fprintf(stderr, "%p: fd:%d/%d, used:%d\n",
750// this, getrfd(), getwfd(), outbuf.used());
751
752 size_t attempt = outbuf.optgettable();
753 size_t real = uwrite(outbuf.get(attempt), attempt);
754
755 // WARNING: uwrite() may have messed up our outbuf!
756 // This probably only happens if uwrite() closed the stream because
757 // of an error, so we'll check isok().
758 if (isok() && real < attempt)
759 {
760 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
761 assert(outbuf.ungettable() >= attempt - real);
762 outbuf.unget(attempt - real);
763 }
764
765 // since post_select() can call us, and select() calls post_select(),
766 // we need to be careful not to call select() if we don't need to!
767 // post_select() will only call us with msec_timeout==0, and we don't
768 // need to do select() in that case anyway.
769 if (!msec_timeout)
770 break;
771 if (msec_timeout >= 0
772 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
773 break;
774
775 outbuf_was_used = outbuf.used();
776 }
777
778 // handle autoclose
779 if (autoclose_time && isok())
780 {
781 time_t now = time(NULL);
782 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783 this, now - autoclose_time, outbuf.used());
784 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
785 {
786 autoclose_time = 0; // avoid infinite recursion!
787 close();
788 }
789 }
790
791 TRACE("flush_outbuf: after autoclose chunk\n");
792 if (outbuf_delayed_flush && !outbuf_was_used)
793 want_to_flush = false;
794
795 TRACE("flush_outbuf: now isok=%d\n", isok());
796
797 // if we can't flush the outbuf, at least empty it!
798 if (outbuf_was_used && !isok())
799 outbuf.zap();
800
802 TRACE("flush_outbuf stops\n");
803
804 return !outbuf_was_used;
805}
806
807
808bool WvStream::flush_internal(time_t msec_timeout)
809{
810 // once outbuf emptied, that's it for most streams
811 return true;
812}
813
814
815int WvStream::getrfd() const
816{
817 return -1;
818}
819
820
821int WvStream::getwfd() const
822{
823 return -1;
824}
825
826
827void WvStream::flush_then_close(int msec_timeout)
828{
829 time_t now = time(NULL);
830 autoclose_time = now + (msec_timeout + 999) / 1000;
831
832 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833 this, outbuf.used(), autoclose_time - now);
834
835 // as a fast track, we _could_ close here: but that's not a good idea,
836 // since flush_then_close() deals with obscure situations, and we don't
837 // want the caller to use it incorrectly. So we make things _always_
838 // break when the caller forgets to call select() later.
839
840 flush(0);
841}
842
843
845{
847
848 time_t alarmleft = alarm_remaining();
849
850 if (!isok() || (!si.inherit_request && alarmleft == 0))
851 {
852 si.msec_timeout = 0;
853 return; // alarm has rung
854 }
855
856 if (!si.inherit_request)
857 {
858 si.wants.readable |= static_cast<bool>(readcb);
859 si.wants.writable |= static_cast<bool>(writecb);
860 si.wants.isexception |= static_cast<bool>(exceptcb);
861 }
862
863 // handle read-ahead buffering
864 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
865 {
866 si.msec_timeout = 0; // already ready
867 return;
868 }
869 if (alarmleft >= 0
870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
871 si.msec_timeout = alarmleft + 10;
872}
873
874
876{
877 if (!si.inherit_request)
878 {
879 si.wants.readable |= static_cast<bool>(readcb);
880 si.wants.writable |= static_cast<bool>(writecb);
881 si.wants.isexception |= static_cast<bool>(exceptcb);
882 }
883
884 // FIXME: need sane buffer flush support for non FD-based streams
885 // FIXME: need read_requires_writable and write_requires_readable
886 // support for non FD-based streams
887
888 // note: flush(nonzero) might call select(), but flush(0) never does,
889 // so this is safe.
890 if (should_flush())
891 flush(0);
892 if (!si.inherit_request && alarm_remaining() == 0)
893 return true; // alarm ticked
894 if ((si.wants.readable || (!si.inherit_request && readcb))
895 && inbuf.used() && inbuf.used() >= queue_min)
896 return true; // already ready
897 return false;
898}
899
900
901void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
902 bool readable, bool writable, bool isexcept, bool forceable)
903{
904 FD_ZERO(&si.read);
905 FD_ZERO(&si.write);
906 FD_ZERO(&si.except);
907
908 if (forceable)
909 {
910 si.wants.readable = static_cast<bool>(readcb);
911 si.wants.writable = static_cast<bool>(writecb);
912 si.wants.isexception = static_cast<bool>(exceptcb);
913 }
914 else
915 {
916 si.wants.readable = readable;
917 si.wants.writable = writable;
918 si.wants.isexception = isexcept;
919 }
920
921 si.max_fd = -1;
922 si.msec_timeout = msec_timeout;
923 si.inherit_request = ! forceable;
924 si.global_sure = false;
925
926 wvstime_sync();
927
928 pre_select(si);
929 if (globalstream && forceable && (globalstream != this))
930 {
931 WvStream *s = globalstream;
932 globalstream = NULL; // prevent recursion
933 s->xpre_select(si, SelectRequest(false, false, false));
934 globalstream = s;
935 }
936}
937
938
939int WvStream::_do_select(SelectInfo &si)
940{
941 // prepare timeout
942 timeval tv;
943 tv.tv_sec = si.msec_timeout / 1000;
944 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
945
946#ifdef _WIN32
947 // selecting on an empty set of sockets doesn't cause a delay in win32.
948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
949 FD_SET(fakefd, &si.except);
950#endif
951
952 // block
953 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
955
956 // handle errors.
957 // EAGAIN and EINTR don't matter because they're totally normal.
958 // ENOBUFS is hopefully transient.
959 // EBADF is kind of gross and might imply that something is wrong,
960 // but it happens sometimes...
961 if (sel < 0
962 && errno != EAGAIN && errno != EINTR
963 && errno != EBADF
964 && errno != ENOBUFS
965 )
966 {
967 seterr(errno);
968 }
969#ifdef _WIN32
970 ::close(fakefd);
971#endif
972 TRACE("select() returned %d\n", sel);
973 return sel;
974}
975
976
977bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
978{
979 // We cannot move the clock backward here, because timers that
980 // were expired in pre_select could then not be expired anymore,
981 // and while time going backward is rather unsettling in general,
982 // for it to be happening between pre_select and post_select is
983 // just outright insanity.
984 wvstime_sync_forward();
985
986 bool sure = post_select(si);
987 if (globalstream && forceable && (globalstream != this))
988 {
989 WvStream *s = globalstream;
990 globalstream = NULL; // prevent recursion
991 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
992 || si.global_sure;
993 globalstream = s;
994 }
995 return sure;
996}
997
998
999bool WvStream::_select(time_t msec_timeout, bool readable, bool writable,
1000 bool isexcept, bool forceable)
1001{
1002 // Detect use of deleted stream
1003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
1004
1005 SelectInfo si;
1006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
1007 forceable);
1008
1009 bool sure = false;
1010 int sel = _do_select(si);
1011 if (sel >= 0)
1012 sure = _process_selectinfo(si, forceable);
1013 if (si.global_sure && globalstream && forceable && (globalstream != this))
1014 globalstream->callback();
1015
1016 return sure;
1017}
1018
1019
1021{
1022 return IWvStream::SelectRequest(static_cast<bool>(readcb), static_cast<bool>(writecb),
1023 static_cast<bool>(exceptcb));
1024}
1025
1026
1027void WvStream::force_select(bool readable, bool writable, bool isexception)
1028{
1029 if (readable)
1030 readcb = wv::bind(&WvStream::legacy_callback, this);
1031 if (writable)
1032 writecb = wv::bind(&WvStream::legacy_callback, this);
1033 if (isexception)
1034 exceptcb = wv::bind(&WvStream::legacy_callback, this);
1035}
1036
1037
1038void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
1039{
1040 if (readable)
1041 readcb = 0;
1042 if (writable)
1043 writecb = 0;
1044 if (isexception)
1045 exceptcb = 0;
1046}
1047
1048
1049void WvStream::alarm(time_t msec_timeout)
1050{
1051 if (msec_timeout >= 0)
1052 alarm_time = msecadd(wvstime(), msec_timeout);
1053 else
1054 alarm_time = wvtime_zero;
1055}
1056
1057
1059{
1060 if (alarm_time.tv_sec)
1061 {
1062 WvTime now = wvstime();
1063
1064 // Time is going backward!
1065 if (now < last_alarm_check)
1066 {
1067#if 0 // okay, I give up. Time just plain goes backwards on some systems.
1068 // warn only if it's a "big" difference (sigh...)
1069 if (msecdiff(last_alarm_check, now) > 200)
1070 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
1071 "(%ld:%ld %ld:%ld)\n",
1072 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
1073 now.tv_sec, now.tv_usec);
1074#endif
1075 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
1076 }
1077
1078 last_alarm_check = now;
1079
1080 time_t remaining = msecdiff(alarm_time, now);
1081 if (remaining < 0)
1082 remaining = 0;
1083 return remaining;
1084 }
1085 return -1;
1086}
1087
1088
1089bool WvStream::continue_select(time_t msec_timeout)
1090{
1091 assert(uses_continue_select);
1092
1093 // if this assertion triggers, you probably tried to do continue_select()
1094 // while inside terminate_continue_select().
1095 assert(call_ctx);
1096
1097 if (msec_timeout >= 0)
1098 alarm(msec_timeout);
1099
1100 alarm(msec_timeout);
1101 WvCont::yield();
1102 alarm(-1); // cancel the still-pending alarm, or it might go off later!
1103
1104 // when we get here, someone has jumped back into our task.
1105 // We have to select(0) here because it's possible that the alarm was
1106 // ticking _and_ data was available. This is aggravated especially if
1107 // msec_delay was zero. Note that running select() here isn't
1108 // inefficient, because if the alarm was expired then pre_select()
1109 // returned true anyway and short-circuited the previous select().
1110 TRACE("hello-%p\n", this);
1111 return !alarm_was_ticking || select(0, static_cast<bool>(readcb),
1112 static_cast<bool>(writecb), static_cast<bool>(exceptcb));
1113}
1114
1115
1117{
1118 close();
1119 call_ctx = 0; // destroy the context, if necessary
1120}
1121
1122
1123const WvAddr *WvStream::src() const
1124{
1125 return NULL;
1126}
1127
1128
1129void WvStream::setcallback(IWvStreamCallback _callfunc)
1130{
1131 callfunc = _callfunc;
1132 call_ctx = 0; // delete any in-progress WvCont
1133}
1134
1135
1136void WvStream::legacy_callback()
1137{
1138 execute();
1139 if (!!callfunc)
1140 callfunc();
1141}
1142
1143
1144IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
1145{
1146 IWvStreamCallback tmp = readcb;
1147
1148 readcb = _callback;
1149
1150 return tmp;
1151}
1152
1153
1154IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
1155{
1156 IWvStreamCallback tmp = writecb;
1157
1158 writecb = _callback;
1159
1160 return tmp;
1161}
1162
1163
1164IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
1165{
1166 IWvStreamCallback tmp = exceptcb;
1167
1168 exceptcb = _callback;
1169
1170 return tmp;
1171}
1172
1173
1174IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
1175{
1176 IWvStreamCallback tmp = closecb;
1177 if (isok())
1178 closecb = _callback;
1179 else
1180 {
1181 // already closed? notify immediately!
1182 closecb = 0;
1183 if (!!_callback)
1184 _callback();
1185 }
1186 return tmp;
1187}
1188
1189
1190void WvStream::unread(WvBuf &unreadbuf, size_t count)
1191{
1192 WvDynBuf tmp;
1193 tmp.merge(unreadbuf, count);
1194 tmp.merge(inbuf);
1195 inbuf.zap();
1196 inbuf.merge(tmp);
1197}
1198
1199
1200IWvStream *WvStream::find_by_wsid(WSID wsid)
1201{
1202 IWvStream *retval = NULL;
1203
1204 if (wsid_map)
1205 {
1206 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
1207
1208 if (it != wsid_map->end())
1209 retval = it->second;
1210 }
1211
1212 return retval;
1213}
The basic interface which is included by all other XPLC interfaces and objects.
Definition IObject.h:65
virtual unsigned int addRef()=0
Indicate you are using this object.
virtual unsigned int release()=0
Indicate that you are finished using this object.
Base class for different address types, each of which will have the ability to convert itself to/from...
Definition wvaddr.h:119
void merge(Buffer &inbuf, size_t count)
Efficiently moves count bytes from the specified buffer into this one.
Definition wvbufbase.h:558
size_t optgettable() const
Returns the optimal maximum number of elements in the buffer currently available for reading without ...
Definition wvbufbase.h:154
T * mutablepeek(int offset, size_t count)
Returns a non-const pointer into the buffer at the specified offset to the specified number of elemen...
Definition wvbufbase.h:461
size_t ungettable() const
Returns the maximum number of elements that may be ungotten at this time.
Definition wvbufbase.h:188
const T * get(size_t count)
Reads exactly the specified number of elements and returns a pointer to a storage location owned by t...
Definition wvbufbase.h:114
void put(const T *data, size_t count)
Writes the specified number of elements from the specified storage location into the buffer at its ta...
Definition wvbufbase.h:483
void unget(size_t count)
Ungets exactly the specified number of elements by returning them to the buffer for subsequent reads.
Definition wvbufbase.h:177
void unalloc(size_t count)
Unallocates exactly the specified number of elements by removing them from the buffer and releasing t...
Definition wvbufbase.h:421
T * alloc(size_t count)
Allocates exactly the specified number of elements and returns a pointer to an UNINITIALIZED storage ...
Definition wvbufbase.h:379
void zap()
Clears the buffer.
Definition wvbufbase.h:257
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition wvbufbase.h:92
WvCont provides "continuations", which are apparently also known as semi-coroutines.
Definition wvcont.h:30
static void * yield(void *ret=0)
"return" from the current callback, giving value 'ret' to the person who called us.
Definition wvcont.cc:222
virtual bool isok() const
By default, returns true if geterr() == 0.
Definition wverror.h:39
virtual void seterr(int _errnum)
Set the errnum variable – we have an error.
Definition wverror.cc:144
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
Definition wverror.h:48
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition wvstring.h:94
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Definition wvmoniker.h:62
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
Definition wvstream.h:25
bool xpost_select(SelectInfo &si, const SelectRequest &r)
Like post_select(), but still exists even if you override the other post_select() in a subclass.
Definition wvstream.h:339
void flush_then_close(int msec_timeout)
flush the output buffer automatically as select() is called.
Definition wvstream.cc:827
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
Definition wvstream.h:652
IWvStreamCallback setclosecallback(IWvStreamCallback _callback)
Sets a callback to be invoked on close().
Definition wvstream.cc:1174
void queuemin(size_t count)
force read() to not return any bytes unless 'count' bytes can be read at once.
Definition wvstream.h:232
IWvStream::SelectRequest get_select_request()
Use get_select_request() to save the current state of the selection state of this stream.
Definition wvstream.cc:1020
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
Definition wvstream.cc:875
virtual bool flush(time_t msec_timeout)
flush the output buffer, if we can do it without delaying more than msec_timeout milliseconds at a ti...
Definition wvstream.cc:707
char * blocking_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of getline() that allows you to block for more data to arrive.
Definition wvstream.cc:602
virtual bool isreadable()
Returns true if the stream is readable.
Definition wvstream.cc:590
virtual bool isok() const
return true if the stream is actually usable right now
Definition wvstream.cc:445
char * continue_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of blocking_getline() that uses continue_select to avoid blocking other streams.
Definition wvstream.cc:690
void undo_force_select(bool readable, bool writable, bool isexception=false)
Undo a previous force_select() - i.e. ...
Definition wvstream.cc:1038
void autoforward(WvStream &s)
set the callback function for this stream to an internal routine that auto-forwards all incoming stre...
Definition wvstream.cc:362
virtual size_t uread(void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by read().
Definition wvstream.h:146
virtual void nowrite()
Shuts down the writing side of the stream.
Definition wvstream.cc:576
void alarm(time_t msec_timeout)
Set an alarm, i.e. ...
Definition wvstream.cc:1049
void setcallback(IWvStreamCallback _callfunc)
define the callback function for this stream, called whenever the callback() member is run,...
Definition wvstream.cc:1129
void force_select(bool readable, bool writable, bool isexception=false)
Use force_select() to force one or more particular modes (readable, writable, or isexception) to true...
Definition wvstream.cc:1027
WvStream()
Basic constructor for just a do-nothing WvStream.
Definition wvstream.cc:249
virtual size_t uwrite(const void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by write().
Definition wvstream.h:156
bool uses_continue_select
If this is set, enables the use of continue_select().
Definition wvstream.h:45
bool stop_read
True if noread()/nowrite()/close() have been called, respectively.
Definition wvstream.h:57
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition wvstream.cc:532
void * _callwrap(void *)
A wrapper that's compatible with WvCont, but calls the "real" callback.
Definition wvstream.cc:394
virtual bool should_flush()
Returns true if we want to flush the output buffer right now.
Definition wvstream.cc:724
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
Definition wvstream.cc:844
bool select(time_t msec_timeout)
Return true if any of the requested features are true on the stream.
Definition wvstream.h:376
virtual void maybe_autoclose()
Auto-close the stream if the time is right.
Definition wvstream.cc:583
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition wvstream.cc:490
virtual const WvAddr * src() const
get the remote address from which the last data block was received.
Definition wvstream.cc:1123
void xpre_select(SelectInfo &si, const SelectRequest &r)
Like pre_select(), but still exists even if you override the other pre_select() in a subclass.
Definition wvstream.h:318
IWvStreamCallback setexceptcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is in exception state.
Definition wvstream.cc:1164
void terminate_continue_select()
you MUST run this from your destructor if you use continue_select(), or very weird things will happen...
Definition wvstream.cc:1116
void noautoforward()
Stops autoforwarding.
Definition wvstream.cc:369
void _callback()
Actually call the registered callfunc and execute().
Definition wvstream.cc:386
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
Definition wvstream.cc:341
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition wvstream.cc:1058
virtual bool iswritable()
Returns true if the stream is writable (without using the outbuf).
Definition wvstream.cc:596
IWvStreamCallback setreadcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is readable.
Definition wvstream.cc:1144
size_t personal_stack_size
Specifies the stack size to reserve for continue_select().
Definition wvstream.h:48
void drain()
drain the input buffer (read and discard data until select(0) returns false)
Definition wvstream.cc:699
bool continue_select(time_t msec_timeout)
return to the caller from execute(), but don't really return exactly; this uses WvCont::yield() to re...
Definition wvstream.cc:1089
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition wvstream.cc:451
IWvStreamCallback setwritecallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is writable.
Definition wvstream.cc:1154
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition wvstream.cc:1190
virtual void noread()
Shuts down the reading side of the stream.
Definition wvstream.cc:569
virtual void callback()
if the stream has a callback function defined, call it now.
Definition wvstream.cc:401
WvStream * read_requires_writable
If this is set, select() doesn't return true for read unless the given stream also returns true for w...
Definition wvstream.h:36
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
Definition wvstream.h:54
This is a WvList of WvStrings, and is a really handy way to parse strings.
WvString popstr()
get the first string in the list, or an empty string if the list is empty.
WvString is an implementation of a simple and efficient printable-string class.
Definition wvstring.h:330
Based on (and interchangeable with) struct timeval.
Definition wvtimeutils.h:18
the data structure used by pre_select()/post_select() and internally by select().
Definition iwvstream.h:50
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
Definition iwvstream.h:34
#define UUID_MAP_END
Marks the end of an interface map.
Definition utils.h:80
#define UUID_MAP_BEGIN(component)
Start the interface map for "component".
Definition utils.h:63
#define UUID_MAP_ENTRY(iface)
Add an entry to an interface map.
Definition utils.h:68
Various little string functions.