WvStreams
wvstream.cc
1/*
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4 *
5 * Unified support for streams, that is, sequences of bytes that may or
6 * may not be ready for read/write at any given time.
7 *
8 * We provide typical read and write routines, as well as a select() function
9 * for each stream.
10 */
11#include <time.h>
12#include <sys/types.h>
13#include <assert.h>
14#define __WVSTREAM_UNIT_TEST 1
15#include "wvstream.h"
16#include "wvtimeutils.h"
17#include "wvcont.h"
18#include "wvstreamsdebugger.h"
19#include "wvstrutils.h"
20#include "wvistreamlist.h"
21#include "wvlinkerhack.h"
22#include "wvmoniker.h"
23
24#ifdef _WIN32
25#define ENOBUFS WSAENOBUFS
26#undef errno
27#define errno GetLastError()
28#ifdef __GNUC__
29#include <sys/socket.h>
30#endif
31#include "streams.h"
32#else
33#include <errno.h>
34#endif
35
36#include <map>
37
38using std::make_pair;
39using std::map;
40
41
42// enable this to add some read/write trace messages (this can be VERY
43// verbose)
44#if 0
45# ifndef _MSC_VER
46# define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
47# else
48# define TRACE printf
49# endif
50#else
51# ifndef _MSC_VER
52# define TRACE(x, y...)
53# else
54# define TRACE
55# endif
56#endif
57
58WvStream *WvStream::globalstream = NULL;
59
64
65
// Registry of WvStream objects keyed by WSID.  It is allocated on demand,
// looked up by find_by_wsid(), and deleted again once it becomes empty.
66static map<WSID, WvStream*> *wsid_map;
// Next WSID candidate to try when a stream needs an id.
67static WSID next_wsid_to_try;
68
69
70WV_LINK(WvStream);
71
72static IWvStream *create_null(WvStringParm, IObject *)
73{
74 return new WvStream();
75}
76
// Registers create_null() as the factory for the "null" IWvStream moniker.
77static WvMoniker<IWvStream> reg("null", create_null);
78
79
80IWvStream *IWvStream::create(WvStringParm moniker, IObject *obj)
81{
82 IWvStream *s = wvcreate<IWvStream>(moniker, obj);
83 if (!s)
84 {
85 s = new WvStream();
86 s->seterr_both(EINVAL, "Unknown moniker '%s'", moniker);
87 WVRELEASE(obj); // we're not going to use it after all
88 }
89 return s;
90}
91
92
// Returns true if 'str' starts with 'prefix', comparing case-insensitively.
// An empty prefix matches every string.
//
// strncasecmp() stops at the first NUL byte, so a 'str' that is shorter than
// 'prefix' already compares unequal; measuring 'str' separately is not
// needed and only added a full scan of 'str' to every call.
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    return strncasecmp(str, prefix, strlen(prefix)) == 0;
}
98
99
// Case-insensitive strstr(): returns a pointer to the first occurrence of
// 'needle' inside 'haystack', or NULL if there is none.
//
// As with strstr(), an empty needle matches at the start of the haystack,
// including when the haystack itself is empty.  The needle length is
// measured once up front instead of at every candidate position.
static const char *strstr_insensitive(const char *haystack, const char *needle)
{
    const size_t needle_len = strlen(needle);
    if (needle_len == 0)
        return haystack;

    for (; *haystack != '\0'; ++haystack)
    {
        // strncasecmp() stops at NUL, so a too-short tail never matches
        if (strncasecmp(haystack, needle, needle_len) == 0)
            return haystack;
    }
    return NULL;
}
110
111
112static bool contains_insensitive(const char *haystack, const char *needle)
113{
114 return strstr_insensitive(haystack, needle) != NULL;
115}
116
117
// Row layout shared by the "streams" debugger header and per-stream rows:
// the WSID, RC, Ok, Cs, AlRem and Type columns, each followed by a
// separator argument, and finally the Name.
118static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
// Maps a boolean onto the literal "Yes" or "No" for display.
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
123
124
125void WvStream::debugger_streams_display_header(WvStringParm cmd,
126 WvStreamsDebugger::ResultCallback result_cb)
127{
128 WvStringList result;
129 result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
130 "----------------Type", "-", "Name--------------------");
131 result_cb(cmd, result);
132}
133
134
135// Set to fit in 6 chars
136static WvString friendly_ms(time_t ms)
137{
138 if (ms <= 0)
139 return WvString("(%s)", ms);
140 else if (ms < 1000)
141 return WvString("%sms", ms);
142 else if (ms < 60*1000)
143 return WvString("%ss", ms/1000);
144 else if (ms < 60*60*1000)
145 return WvString("%sm", ms/(60*1000));
146 else if (ms <= 24*60*60*1000)
147 return WvString("%sh", ms/(60*60*1000));
148 else
149 return WvString("%sd", ms/(24*60*60*1000));
150}
151
152void WvStream::debugger_streams_display_one_stream(WvStream *s,
153 WvStringParm cmd,
154 WvStreamsDebugger::ResultCallback result_cb)
155{
156 WvStringList result;
157 s->addRef();
158 unsigned refcount = s->release();
159 result.append(list_format,
160 s->wsid(), " ",
161 refcount, " ",
162 Yes_No(s->isok()), " ",
163 Yes_No(s->uses_continue_select), " ",
164 friendly_ms(s->alarm_remaining()), " ",
165 s->wstype(), " ",
166 s->wsname());
167 result_cb(cmd, result);
168}
169
170
171void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
172 WvStringParm cmd,
173 const WvStringList &args,
174 WvStreamsDebugger::ResultCallback result_cb)
175{
176 bool show = args.isempty();
177 WvStringList::Iter arg(args);
178 for (arg.rewind(); arg.next(); )
179 {
180 WSID wsid;
181 bool is_num = wvstring_to_num(*arg, wsid);
182
183 if (is_num)
184 {
185 if (s->wsid() == wsid)
186 {
187 show = true;
188 break;
189 }
190 }
191 else
192 {
193 if ((s->wsname() && contains_insensitive(s->wsname(), *arg))
194 || (s->wstype() && contains_insensitive(s->wstype(), *arg)))
195 {
196 show = true;
197 break;
198 }
199 }
200 }
201 if (show)
202 debugger_streams_display_one_stream(s, cmd, result_cb);
203}
204
205
206WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
207 WvStringList &args,
208 WvStreamsDebugger::ResultCallback result_cb, void *)
209{
210 debugger_streams_display_header(cmd, result_cb);
211 if (wsid_map)
212 {
213 map<WSID, WvStream*>::iterator it;
214
215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
216 debugger_streams_maybe_display_one_stream(it->second, cmd, args,
217 result_cb);
218 }
219
220 return WvString::null;
221}
222
223
224WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
225 WvStringList &args,
226 WvStreamsDebugger::ResultCallback result_cb, void *)
227{
228 if (args.isempty())
229 return WvString("Usage: %s <WSID>", cmd);
230 WSID wsid;
231 WvString wsid_str = args.popstr();
232 if (!wvstring_to_num(wsid_str, wsid))
233 return WvString("Invalid WSID '%s'", wsid_str);
234 IWvStream *s = WvStream::find_by_wsid(wsid);
235 if (!s)
236 return WvString("No such stream");
237 s->close();
238 return WvString::null;
239}
240
241
// Registers the "streams" and "close" commands with WvStreamsDebugger,
// handled by debugger_streams_run_cb and debugger_close_run_cb respectively.
242void WvStream::add_debugger_commands()
243{
244 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
245 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
246}
247
248
250 read_requires_writable(NULL),
251 write_requires_readable(NULL),
252 uses_continue_select(false),
253 personal_stack_size(131072),
254 alarm_was_ticking(false),
255 stop_read(false),
256 stop_write(false),
257 closed(false),
258 readcb(wv::bind(&WvStream::legacy_callback, this)),
259 max_outbuf_size(0),
260 outbuf_delayed_flush(false),
261 is_auto_flush(true),
262 want_to_flush(true),
263 is_flushing(false),
264 queue_min(0),
265 autoclose_time(0),
266 alarm_time(wvtime_zero),
267 last_alarm_check(wvtime_zero)
268{
269 TRACE("Creating wvstream %p\n", this);
270
271 static bool first = true;
272 if (first)
273 {
274 first = false;
275 WvStream::add_debugger_commands();
276 }
277
278 // Choose a wsid;
279 if (!wsid_map)
280 wsid_map = new map<WSID, WvStream*>;
281 WSID first_wsid_tried = next_wsid_to_try;
282 do
283 {
284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
285 break;
286 ++next_wsid_to_try;
287 } while (next_wsid_to_try != first_wsid_tried);
288 my_wsid = next_wsid_to_try++;
289 bool inserted = wsid_map->insert(make_pair(my_wsid, this)).second;
290 assert(inserted);
291
292#ifdef _WIN32
293 WSAData wsaData;
294 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
295 assert(result == 0);
296#endif
297}
298
299
300// FIXME: interfaces (IWvStream) shouldn't have implementations!
// Out-of-line constructor for the IWvStream interface; intentionally empty.
301IWvStream::IWvStream()
302{
303}
304
305
// Out-of-line destructor for the IWvStream interface; intentionally empty.
306IWvStream::~IWvStream()
307{
308}
309
310
// Destructor: closes the stream, drops any continuation context, removes
// the stream from the WSID registry and unlinks it from the global list.
311WvStream::~WvStream()
312{
313 TRACE("destroying %p\n", this);
314 close();
315
316 // if this assertion fails, then uses_continue_select is true, but you
317 // didn't call terminate_continue_select() or close() before destroying
318 // your object. Shame on you!
319 assert(!uses_continue_select || !call_ctx);
320
321 call_ctx = 0; // finish running the suspended callback, if any
322
 // forget this stream's WSID; the registry itself is freed along with
 // its last entry
323 assert(wsid_map);
324 wsid_map->erase(my_wsid);
325 if (wsid_map->empty())
326 {
327 delete wsid_map;
328 wsid_map = NULL;
329 }
330
331 // eventually, streams will auto-add themselves to the globallist. But
332 // even before then, it'll never be useful for them to be on the
333 // globallist *after* they get destroyed, so we might as well auto-remove
334 // them already. It's harmless for people to try to remove them twice.
335 WvIStreamList::globallist.unlink(this);
336
337 TRACE("done destroying %p\n", this);
338}
339
340
342{
343 TRACE("flushing in wvstream...\n");
344 flush(2000); // fixme: should not hardcode this stuff
345 TRACE("(flushed)\n");
346
347 closed = true;
348
349 if (!!closecb)
350 {
351 IWvStreamCallback cb = closecb;
352 closecb = 0; // ensure callback is only called once
353 cb();
354 }
355
356 // I would like to delete call_ctx here, but then if someone calls
357 // close() from *inside* a continuable callback, we explode. Oops!
358 //call_ctx = 0; // destroy the context, if necessary
359}
360
361
363{
364 setcallback(wv::bind(autoforward_callback, wv::ref(*this), wv::ref(s)));
366}
367
368
370{
371 setcallback(0);
373}
374
375
376void WvStream::autoforward_callback(WvStream &input, WvStream &output)
377{
378 char buf[1024];
379 size_t len;
380
381 len = input.read(buf, sizeof(buf));
382 output.write(buf, len);
383}
384
385
387{
388 execute();
389 if (!!callfunc)
390 callfunc();
391}
392
393
395{
396 _callback();
397 return NULL;
398}
399
400
402{
403 TRACE("(?)");
404
405 // if the alarm has gone off and we're calling callback... good!
406 if (alarm_remaining() == 0)
407 {
408 alarm_time = wvtime_zero;
409 alarm_was_ticking = true;
410 }
411 else
412 alarm_was_ticking = false;
413
414 assert(!uses_continue_select || personal_stack_size >= 1024);
415
416#define TEST_CONTINUES_HARSHLY 0
417#if TEST_CONTINUES_HARSHLY
418#ifndef _WIN32
419# warning "Using WvCont for *all* streams for testing!"
420#endif
421 if (1)
422#else
424#endif
425 {
426 if (!call_ctx) // no context exists yet!
427 {
428 call_ctx = WvCont(wv::bind(&WvStream::_callwrap, this, _1),
430 }
431
432 call_ctx(NULL);
433 }
434 else
435 _callback();
436
437 // if this assertion fails, a derived class's virtual execute() function
438 // didn't call its parent's execute() function, and we didn't make it
439 // all the way back up to WvStream::execute(). This doesn't always
440 // matter right now, but it could lead to obscure bugs later, so we'll
441 // enforce it.
442}
443
444
445bool WvStream::isok() const
446{
447 return !closed && WvErrorBase::isok();
448}
449
450
451void WvStream::seterr(int _errnum)
452{
453 if (!geterr()) // no pre-existing error
454 {
455 WvErrorBase::seterr(_errnum);
456 close();
457 }
458}
459
460
461size_t WvStream::read(WvBuf &outbuf, size_t count)
462{
463 // for now, just wrap the older read function
464 size_t free = outbuf.free();
465 if (count > free)
466 count = free;
467
468 WvDynBuf tmp;
469 unsigned char *buf = tmp.alloc(count);
470 size_t len = read(buf, count);
471 tmp.unalloc(count - len);
472 outbuf.merge(tmp);
473 return len;
474}
475
476
477size_t WvStream::write(WvBuf &inbuf, size_t count)
478{
479 // for now, just wrap the older write function
480 size_t avail = inbuf.used();
481 if (count > avail)
482 count = avail;
483 const unsigned char *buf = inbuf.get(count);
484 size_t len = write(buf, count);
485 inbuf.unget(count - len);
486 return len;
487}
488
489
// Buffered read of up to 'count' bytes into 'buf'; returns the number of
// bytes delivered.
//
// If inbuf holds fewer than queue_min bytes, uread() is first used to try
// to top it up, and 0 is returned when the minimum still is not met.
// Otherwise data comes straight from uread() when inbuf is empty, or from
// inbuf alone when it is not.
// NOTE(review): original source lines 510 and 527 are absent from this
// listing, so this body may be incomplete - confirm against the real file.
490size_t WvStream::read(void *buf, size_t count)
491{
492 assert(!count || buf);
493
494 size_t bufu, i;
495 unsigned char *newbuf;
496
497 bufu = inbuf.used();
498 if (bufu < queue_min)
499 {
500 newbuf = inbuf.alloc(queue_min - bufu);
501 assert(newbuf);
502 i = uread(newbuf, queue_min - bufu);
 // release the part of the allocation that uread() did not fill
503 inbuf.unalloc(queue_min - bufu - i);
504
505 bufu = inbuf.used();
506 }
507
508 if (bufu < queue_min)
509 {
511 return 0;
512 }
513
514 // if buffer is empty, do a hard read
515 if (!bufu)
516 bufu = uread(buf, count);
517 else
518 {
519 // otherwise just read from the buffer
520 if (bufu > count)
521 bufu = count;
522
523 memcpy(buf, inbuf.get(bufu), bufu);
524 }
525
526 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
528 return bufu;
529}
530
531
532size_t WvStream::write(const void *buf, size_t count)
533{
534 assert(!count || buf);
535 if (!isok() || !buf || !count || stop_write) return 0;
536
537 size_t wrote = 0;
538 if (!outbuf_delayed_flush && !outbuf.used())
539 {
540 wrote = uwrite(buf, count);
541 count -= wrote;
542 buf = (const unsigned char *)buf + wrote;
543 // if (!count) return wrote; // short circuit if no buffering needed
544 }
545 if (max_outbuf_size != 0)
546 {
547 size_t canbuffer = max_outbuf_size - outbuf.used();
548 if (count > canbuffer)
549 count = canbuffer; // can't write the whole amount
550 }
551 if (count != 0)
552 {
553 outbuf.put(buf, count);
554 wrote += count;
555 }
556
557 if (should_flush())
558 {
559 if (is_auto_flush)
560 flush(0);
561 else
562 flush_outbuf(0);
563 }
564
565 return wrote;
566}
567
568
570{
571 stop_read = true;
573}
574
575
577{
578 stop_write = true;
580}
581
582
584{
585 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
586 close();
587}
588
589
591{
592 return isok() && select(0, true, false, false);
593}
594
595
597{
598 return !stop_write && isok() && select(0, false, true, false);
599}
600
601
// Waits for a complete line (terminated by 'separator') to show up in inbuf
// and returns it as a NUL-terminated string stored inside inbuf.
//
// wait_msec: how long to wait; a positive value is turned into a deadline
//            and recomputed on every pass of the loop.
// separator: line terminator, must be in 0..255; it is overwritten by NUL
//            in the returned string.
// readahead: number of bytes requested from uread() per attempt.
//
// Returns NULL when no data arrived and the wait has run out, or when inbuf
// is empty once the loop ends.  If the loop ends with data but without a
// separator, everything buffered is returned with a NUL appended.
// NOTE(review): original source line 614 is absent from this listing -
// confirm against the real file.
602char *WvStream::blocking_getline(time_t wait_msec, int separator,
603 int readahead)
604{
605 assert(separator >= 0);
606 assert(separator <= 255);
607
608 //assert(uses_continue_select || wait_msec == 0);
609
610 WvTime timeout_time(0);
611 if (wait_msec > 0)
612 timeout_time = msecadd(wvtime(), wait_msec);
613
615
616 // if we get here, we either want to wait a bit or there is data
617 // available.
618 while (isok())
619 {
620 // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
621 queuemin(0);
622
623 // if there is a newline already, we have enough data.
624 if (inbuf.strchr(separator) > 0)
625 break;
626 else if (!isok() || stop_read) // uh oh, stream is in trouble.
627 break;
628
629 // make select not return true until more data is available
630 queuemin(inbuf.used() + 1);
631
632 // compute remaining timeout
633 if (wait_msec > 0)
634 {
635 wait_msec = msecdiff(timeout_time, wvtime());
636 if (wait_msec < 0)
637 wait_msec = 0;
638 }
639
640 // FIXME: this is blocking_getline. It shouldn't
641 // call continue_select()!
642 bool hasdata;
643 if (wait_msec != 0 && uses_continue_select)
644 hasdata = continue_select(wait_msec);
645 else
646 hasdata = select(wait_msec, true, false);
647
648 if (!isok())
649 break;
650
651 if (hasdata)
652 {
653 // read a few bytes
654 WvDynBuf tmp;
655 unsigned char *buf = tmp.alloc(readahead);
656 assert(buf);
657 size_t len = uread(buf, readahead);
658 tmp.unalloc(readahead - len);
659 inbuf.put(tmp.get(len), len);
660 hasdata = len > 0; // enough?
661 }
662
663 if (!isok())
664 break;
665
666 if (!hasdata && wait_msec == 0)
667 return NULL; // handle timeout
668 }
669 if (!inbuf.used())
670 return NULL;
671
672 // return the appropriate data
673 size_t i = 0;
674 i = inbuf.strchr(separator);
675 if (i > 0) {
 // replace the separator (last of the i bytes) with a terminator
676 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
677 assert(eol && *eol == separator);
678 *eol = 0;
679 return const_cast<char*>((const char *)inbuf.get(i));
680 } else {
681 // handle "EOF without newline" condition
682 // FIXME: it's very silly that buffers can't return editable
683 // char* arrays.
684 inbuf.alloc(1)[0] = 0; // null-terminate it
685 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
686 }
687}
688
689
// Placeholder: not implemented.  Asserts unconditionally; when assertions
// are compiled out it simply returns NULL.  All parameters are unused.
690char *WvStream::continue_getline(time_t wait_msec, int separator,
691 int readahead)
692{
693 assert(false && "not implemented, come back later!");
694 assert(uses_continue_select);
695 return NULL;
696}
697
698
700{
701 char buf[1024];
702 while (isreadable())
703 read(buf, sizeof(buf));
704}
705
706
707bool WvStream::flush(time_t msec_timeout)
708{
709 if (is_flushing) return false;
710
711 TRACE("%p flush starts\n", this);
712
713 is_flushing = true;
714 want_to_flush = true;
715 bool done = flush_internal(msec_timeout) // any other internal buffers
716 && flush_outbuf(msec_timeout); // our own outbuf
717 is_flushing = false;
718
719 TRACE("flush stops (%d)\n", done);
720 return done;
721}
722
723
725{
726 return want_to_flush;
727}
728
729
// Pushes the contents of outbuf out through uwrite(), retrying until the
// buffer is empty, the stream stops being ok, or the timeout handling below
// breaks out of the loop.  Also performs the autoclose check set up by
// flush_then_close().
//
// Returns true if outbuf was empty the last time it was checked.
// NOTE(review): original source lines 740 and 800-801 are absent from this
// listing - confirm against the real file.
730bool WvStream::flush_outbuf(time_t msec_timeout)
731{
732 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
733 bool outbuf_was_used = outbuf.used();
734
735 // do-nothing shortcut for speed
736 // FIXME: definitely makes a "measurable" difference...
737 // but is it worth the risk?
738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
739 {
741 return true;
742 }
743
744 WvTime stoptime = msecadd(wvtime(), msec_timeout);
745
746 // flush outbuf
747 while (outbuf_was_used && isok())
748 {
749// fprintf(stderr, "%p: fd:%d/%d, used:%d\n",
750// this, getrfd(), getwfd(), outbuf.used());
751
752 size_t attempt = outbuf.optgettable();
753 size_t real = uwrite(outbuf.get(attempt), attempt);
754
755 // WARNING: uwrite() may have messed up our outbuf!
756 // This probably only happens if uwrite() closed the stream because
757 // of an error, so we'll check isok().
758 if (isok() && real < attempt)
759 {
760 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
761 assert(outbuf.ungettable() >= attempt - real);
762 outbuf.unget(attempt - real);
763 }
764
765 // since post_select() can call us, and select() calls post_select(),
766 // we need to be careful not to call select() if we don't need to!
767 // post_select() will only call us with msec_timeout==0, and we don't
768 // need to do select() in that case anyway.
769 if (!msec_timeout)
770 break;
771 if (msec_timeout >= 0
772 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
773 break;
774
775 outbuf_was_used = outbuf.used();
776 }
777
778 // handle autoclose
779 if (autoclose_time && isok())
780 {
781 time_t now = time(NULL);
782 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783 this, now - autoclose_time, outbuf.used());
784 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
785 {
786 autoclose_time = 0; // avoid infinite recursion!
787 close();
788 }
789 }
790
791 TRACE("flush_outbuf: after autoclose chunk\n");
792 if (outbuf_delayed_flush && !outbuf_was_used)
793 want_to_flush = false;
794
795 TRACE("flush_outbuf: now isok=%d\n", isok());
796
797 // if we can't flush the outbuf, at least empty it!
798 if (outbuf_was_used && !isok())
799 outbuf.zap();
800
802 TRACE("flush_outbuf stops\n");
803
804 return !outbuf_was_used;
805}
806
807
// Flushes any buffers other than outbuf.  This base implementation has
// none, ignores 'msec_timeout' and always reports success.
808bool WvStream::flush_internal(time_t msec_timeout)
809{
810 // once outbuf emptied, that's it for most streams
811 return true;
812}
813
814
// Read-side file descriptor; this base implementation always returns -1.
815int WvStream::getrfd() const
816{
817 return -1;
818}
819
820
// Write-side file descriptor; this base implementation always returns -1.
821int WvStream::getwfd() const
822{
823 return -1;
824}
825
826
827void WvStream::flush_then_close(int msec_timeout)
828{
829 time_t now = time(NULL);
830 autoclose_time = now + (msec_timeout + 999) / 1000;
831
832 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833 this, outbuf.used(), autoclose_time - now);
834
835 // as a fast track, we _could_ close here: but that's not a good idea,
836 // since flush_then_close() deals with obscure situations, and we don't
837 // want the caller to use it incorrectly. So we make things _always_
838 // break when the caller forgets to call select() later.
839
840 flush(0);
841}
842
843
845{
847
848 time_t alarmleft = alarm_remaining();
849
850 if (!isok() || (!si.inherit_request && alarmleft == 0))
851 {
852 si.msec_timeout = 0;
853 return; // alarm has rung
854 }
855
856 if (!si.inherit_request)
857 {
858 si.wants.readable |= static_cast<bool>(readcb);
859 si.wants.writable |= static_cast<bool>(writecb);
860 si.wants.isexception |= static_cast<bool>(exceptcb);
861 }
862
863 // handle read-ahead buffering
864 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
865 {
866 si.msec_timeout = 0; // already ready
867 return;
868 }
869 if (alarmleft >= 0
870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
871 si.msec_timeout = alarmleft + 10;
872}
873
874
876{
877 if (!si.inherit_request)
878 {
879 si.wants.readable |= static_cast<bool>(readcb);
880 si.wants.writable |= static_cast<bool>(writecb);
881 si.wants.isexception |= static_cast<bool>(exceptcb);
882 }
883
884 // FIXME: need sane buffer flush support for non FD-based streams
885 // FIXME: need read_requires_writable and write_requires_readable
886 // support for non FD-based streams
887
888 // note: flush(nonzero) might call select(), but flush(0) never does,
889 // so this is safe.
890 if (should_flush())
891 flush(0);
892 if (!si.inherit_request && alarm_remaining() == 0)
893 return true; // alarm ticked
894 if ((si.wants.readable || (!si.inherit_request && readcb))
895 && inbuf.used() && inbuf.used() >= queue_min)
896 return true; // already ready
897 return false;
898}
899
900
901void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
902 bool readable, bool writable, bool isexcept, bool forceable)
903{
904 FD_ZERO(&si.read);
905 FD_ZERO(&si.write);
906 FD_ZERO(&si.except);
907
908 if (forceable)
909 {
910 si.wants.readable = static_cast<bool>(readcb);
911 si.wants.writable = static_cast<bool>(writecb);
912 si.wants.isexception = static_cast<bool>(exceptcb);
913 }
914 else
915 {
916 si.wants.readable = readable;
917 si.wants.writable = writable;
918 si.wants.isexception = isexcept;
919 }
920
921 si.max_fd = -1;
922 si.msec_timeout = msec_timeout;
923 si.inherit_request = ! forceable;
924 si.global_sure = false;
925
926 wvstime_sync();
927
928 pre_select(si);
929 if (globalstream && forceable && (globalstream != this))
930 {
931 WvStream *s = globalstream;
932 globalstream = NULL; // prevent recursion
933 s->xpre_select(si, SelectRequest(false, false, false));
934 globalstream = s;
935 }
936}
937
938
939int WvStream::_do_select(SelectInfo &si)
940{
941 // prepare timeout
942 timeval tv;
943 tv.tv_sec = si.msec_timeout / 1000;
944 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
945
946#ifdef _WIN32
947 // selecting on an empty set of sockets doesn't cause a delay in win32.
948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
949 FD_SET(fakefd, &si.except);
950#endif
951
952 // block
953 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
955
956 // handle errors.
957 // EAGAIN and EINTR don't matter because they're totally normal.
958 // ENOBUFS is hopefully transient.
959 // EBADF is kind of gross and might imply that something is wrong,
960 // but it happens sometimes...
961 if (sel < 0
962 && errno != EAGAIN && errno != EINTR
963 && errno != EBADF
964 && errno != ENOBUFS
965 )
966 {
967 seterr(errno);
968 }
969#ifdef _WIN32
970 ::close(fakefd);
971#endif
972 TRACE("select() returned %d\n", sel);
973 return sel;
974}
975
976
977bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
978{
979 // We cannot move the clock backward here, because timers that
980 // were expired in pre_select could then not be expired anymore,
981 // and while time going backward is rather unsettling in general,
982 // for it to be happening between pre_select and post_select is
983 // just outright insanity.
984 wvstime_sync_forward();
985
986 bool sure = post_select(si);
987 if (globalstream && forceable && (globalstream != this))
988 {
989 WvStream *s = globalstream;
990 globalstream = NULL; // prevent recursion
991 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
992 || si.global_sure;
993 globalstream = s;
994 }
995 return sure;
996}
997
998
999bool WvStream::_select(time_t msec_timeout, bool readable, bool writable,
1000 bool isexcept, bool forceable)
1001{
1002 // Detect use of deleted stream
1003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
1004
1005 SelectInfo si;
1006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
1007 forceable);
1008
1009 bool sure = false;
1010 int sel = _do_select(si);
1011 if (sel >= 0)
1012 sure = _process_selectinfo(si, forceable);
1013 if (si.global_sure && globalstream && forceable && (globalstream != this))
1014 globalstream->callback();
1015
1016 return sure;
1017}
1018
1019
1021{
1022 return IWvStream::SelectRequest(static_cast<bool>(readcb), static_cast<bool>(writecb),
1023 static_cast<bool>(exceptcb));
1024}
1025
1026
// For each requested condition, installs legacy_callback() as the
// corresponding read / write / exception callback.  Conditions passed as
// false are left untouched.
1027void WvStream::force_select(bool readable, bool writable, bool isexception)
1028{
1029 if (readable)
1030 readcb = wv::bind(&WvStream::legacy_callback, this);
1031 if (writable)
1032 writecb = wv::bind(&WvStream::legacy_callback, this);
1033 if (isexception)
1034 exceptcb = wv::bind(&WvStream::legacy_callback, this);
1035}
1036
1037
// For each requested condition, clears the corresponding read / write /
// exception callback.  Conditions passed as false are left untouched.
1038void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
1039{
1040 if (readable)
1041 readcb = 0;
1042 if (writable)
1043 writecb = 0;
1044 if (isexception)
1045 exceptcb = 0;
1046}
1047
1048
1049void WvStream::alarm(time_t msec_timeout)
1050{
1051 if (msec_timeout >= 0)
1052 alarm_time = msecadd(wvstime(), msec_timeout);
1053 else
1054 alarm_time = wvtime_zero;
1055}
1056
1057
1059{
1060 if (alarm_time.tv_sec)
1061 {
1062 WvTime now = wvstime();
1063
1064 // Time is going backward!
1065 if (now < last_alarm_check)
1066 {
1067#if 0 // okay, I give up. Time just plain goes backwards on some systems.
1068 // warn only if it's a "big" difference (sigh...)
1069 if (msecdiff(last_alarm_check, now) > 200)
1070 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
1071 "(%ld:%ld %ld:%ld)\n",
1072 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
1073 now.tv_sec, now.tv_usec);
1074#endif
1075 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
1076 }
1077
1078 last_alarm_check = now;
1079
1080 time_t remaining = msecdiff(alarm_time, now);
1081 if (remaining < 0)
1082 remaining = 0;
1083 return remaining;
1084 }
1085 return -1;
1086}
1087
1088
1089bool WvStream::continue_select(time_t msec_timeout)
1090{
1091 assert(uses_continue_select);
1092
1093 // if this assertion triggers, you probably tried to do continue_select()
1094 // while inside terminate_continue_select().
1095 assert(call_ctx);
1096
1097 if (msec_timeout >= 0)
1098 alarm(msec_timeout);
1099
1100 alarm(msec_timeout);
1101 WvCont::yield();
1102 alarm(-1); // cancel the still-pending alarm, or it might go off later!
1103
1104 // when we get here, someone has jumped back into our task.
1105 // We have to select(0) here because it's possible that the alarm was
1106 // ticking _and_ data was available. This is aggravated especially if
1107 // msec_delay was zero. Note that running select() here isn't
1108 // inefficient, because if the alarm was expired then pre_select()
1109 // returned true anyway and short-circuited the previous select().
1110 TRACE("hello-%p\n", this);
1111 return !alarm_was_ticking || select(0, static_cast<bool>(readcb),
1112 static_cast<bool>(writecb), static_cast<bool>(exceptcb));
1113}
1114
1115
1117{
1118 close();
1119 call_ctx = 0; // destroy the context, if necessary
1120}
1121
1122
// Source address of the stream; this base implementation always returns
// NULL.
1123const WvAddr *WvStream::src() const
1124{
1125 return NULL;
1126}
1127
1128
// Installs '_callfunc' as the user callback (the one legacy_callback()
// invokes after execute()) and drops any continuation context in progress.
1129void WvStream::setcallback(IWvStreamCallback _callfunc)
1130{
1131 callfunc = _callfunc;
1132 call_ctx = 0; // delete any in-progress WvCont
1133}
1134
1135
1136void WvStream::legacy_callback()
1137{
1138 execute();
1139 if (!!callfunc)
1140 callfunc();
1141}
1142
1143
1144IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
1145{
1146 IWvStreamCallback tmp = readcb;
1147
1148 readcb = _callback;
1149
1150 return tmp;
1151}
1152
1153
1154IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
1155{
1156 IWvStreamCallback tmp = writecb;
1157
1158 writecb = _callback;
1159
1160 return tmp;
1161}
1162
1163
1164IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
1165{
1166 IWvStreamCallback tmp = exceptcb;
1167
1168 exceptcb = _callback;
1169
1170 return tmp;
1171}
1172
1173
1174IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
1175{
1176 IWvStreamCallback tmp = closecb;
1177 if (isok())
1178 closecb = _callback;
1179 else
1180 {
1181 // already closed? notify immediately!
1182 closecb = 0;
1183 if (!!_callback)
1184 _callback();
1185 }
1186 return tmp;
1187}
1188
1189
1190void WvStream::unread(WvBuf &unreadbuf, size_t count)
1191{
1192 WvDynBuf tmp;
1193 tmp.merge(unreadbuf, count);
1194 tmp.merge(inbuf);
1195 inbuf.zap();
1196 inbuf.merge(tmp);
1197}
1198
1199
1200IWvStream *WvStream::find_by_wsid(WSID wsid)
1201{
1202 IWvStream *retval = NULL;
1203
1204 if (wsid_map)
1205 {
1206 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
1207
1208 if (it != wsid_map->end())
1209 retval = it->second;
1210 }
1211
1212 return retval;
1213}
The basic interface which is included by all other XPLC interfaces and objects.
virtual unsigned int addRef()=0
Indicate you are using this object.
virtual unsigned int release()=0
Indicate that you are finished using this object.
Base class for different address types, each of which will have the ability to convert itself to/from...
void merge(Buffer &inbuf, size_t count)
Efficiently moves count bytes from the specified buffer into this one.
size_t optgettable() const
Returns the optimal maximum number of elements in the buffer currently available for reading without ...
T * mutablepeek(int offset, size_t count)
Returns a non-const pointer into the buffer at the specified offset to the specified number of elemen...
size_t ungettable() const
Returns the maximum number of elements that may be ungotten at this time.
const T * get(size_t count)
Reads exactly the specified number of elements and returns a pointer to a storage location owned by t...
void put(const T *data, size_t count)
Writes the specified number of elements from the specified storage location into the buffer at its ta...
void unget(size_t count)
Ungets exactly the specified number of elements by returning them to the buffer for subsequent reads.
void unalloc(size_t count)
Unallocates exactly the specified number of elements by removing them from the buffer and releasing t...
T * alloc(size_t count)
Allocates exactly the specified number of elements and returns a pointer to an UNINITIALIZED storage ...
size_t used() const
Returns the number of elements in the buffer currently available for reading.
WvCont provides "continuations", which are apparently also known as semi-coroutines.
static void * yield(void *ret=0)
"return" from the current callback, giving value 'ret' to the person who called us.
Definition wvcont.cc:222
virtual bool isok() const
By default, returns true if geterr() == 0.
virtual void seterr(int _errnum)
Set the errnum variable – we have an error.
Definition wverror.cc:144
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
bool xpost_select(SelectInfo &si, const SelectRequest &r)
Like post_select(), but still exists even if you override the other post_select() in a subclass.
void flush_then_close(int msec_timeout)
flush the output buffer automatically as select() is called.
Definition wvstream.cc:827
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
IWvStreamCallback setclosecallback(IWvStreamCallback _callback)
Sets a callback to be invoked on close().
Definition wvstream.cc:1174
void queuemin(size_t count)
force read() to not return any bytes unless 'count' bytes can be read at once.
IWvStream::SelectRequest get_select_request()
Use get_select_request() to save the current state of the selection state of this stream.
Definition wvstream.cc:1020
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
Definition wvstream.cc:875
virtual bool flush(time_t msec_timeout)
flush the output buffer, if we can do it without delaying more than msec_timeout milliseconds at a ti...
Definition wvstream.cc:707
char * blocking_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of getline() that allows you to block for more data to arrive.
Definition wvstream.cc:602
virtual bool isreadable()
Returns true if the stream is readable.
Definition wvstream.cc:590
virtual bool isok() const
return true if the stream is actually usable right now
Definition wvstream.cc:445
char * continue_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of blocking_getline() that uses continue_select to avoid blocking other streams.
Definition wvstream.cc:690
void undo_force_select(bool readable, bool writable, bool isexception=false)
Undo a previous force_select().
Definition wvstream.cc:1038
void autoforward(WvStream &s)
set the callback function for this stream to an internal routine that auto-forwards all incoming stre...
Definition wvstream.cc:362
virtual size_t uread(void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by read().
virtual void nowrite()
Shuts down the writing side of the stream.
Definition wvstream.cc:576
void alarm(time_t msec_timeout)
set an alarm.
Definition wvstream.cc:1049
void setcallback(IWvStreamCallback _callfunc)
define the callback function for this stream, called whenever the callback() member is run,...
Definition wvstream.cc:1129
void force_select(bool readable, bool writable, bool isexception=false)
Use force_select() to force one or more particular modes (readable, writable, or isexception) to true...
Definition wvstream.cc:1027
WvStream()
Basic constructor for just a do-nothing WvStream.
Definition wvstream.cc:249
virtual size_t uwrite(const void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by write().
bool uses_continue_select
If this is set, enables the use of continue_select().
bool stop_read
True if noread()/nowrite()/close() have been called, respectively.
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition wvstream.cc:532
void * _callwrap(void *)
A wrapper that's compatible with WvCont, but calls the "real" callback.
Definition wvstream.cc:394
virtual bool should_flush()
Returns true if we want to flush the output buffer right now.
Definition wvstream.cc:724
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
Definition wvstream.cc:844
bool select(time_t msec_timeout)
Return true if any of the requested features are true on the stream.
WvStream * read_requires_writable
If this is set, select() doesn't return true for read unless the given stream also returns true for w...
virtual void maybe_autoclose()
Auto-close the stream if the time is right.
Definition wvstream.cc:583
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition wvstream.cc:490
virtual const WvAddr * src() const
get the remote address from which the last data block was received.
Definition wvstream.cc:1123
void xpre_select(SelectInfo &si, const SelectRequest &r)
Like pre_select(), but still exists even if you override the other pre_select() in a subclass.
IWvStreamCallback setexceptcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is in exception state.
Definition wvstream.cc:1164
void terminate_continue_select()
you MUST run this from your destructor if you use continue_select(), or very weird things will happen...
Definition wvstream.cc:1116
void noautoforward()
Stops autoforwarding.
Definition wvstream.cc:369
void _callback()
Actually call the registered callfunc and execute().
Definition wvstream.cc:386
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
Definition wvstream.cc:341
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition wvstream.cc:1058
virtual bool iswritable()
Returns true if the stream is writable (without using the outbuf).
Definition wvstream.cc:596
IWvStreamCallback setreadcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is readable.
Definition wvstream.cc:1144
size_t personal_stack_size
Specifies the stack size to reserve for continue_select().
void drain()
drain the input buffer (read and discard data until select(0) returns false)
Definition wvstream.cc:699
bool continue_select(time_t msec_timeout)
return to the caller from execute(), but don't really return exactly; this uses WvCont::yield() to re...
Definition wvstream.cc:1089
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition wvstream.cc:451
IWvStreamCallback setwritecallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is writable.
Definition wvstream.cc:1154
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition wvstream.cc:1190
virtual void noread()
Shuts down the reading side of the stream.
Definition wvstream.cc:569
virtual void callback()
if the stream has a callback function defined, call it now.
Definition wvstream.cc:401
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
This is a WvList of WvStrings, and is a really handy way to parse strings.
WvString popstr()
get the first string in the list, or an empty string if the list is empty.
WvString is an implementation of a simple and efficient printable-string class.
Based on (and interchangeable with) struct timeval.
#define UUID_MAP_END
Marks the end of an interface map.
#define UUID_MAP_BEGIN(component)
Start the interface map for "component".
#define UUID_MAP_ENTRY(iface)
Add an entry to an interface map.
the data structure used by pre_select()/post_select() and internally by select().
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...