14#define __WVSTREAM_UNIT_TEST 1
16#include "wvtimeutils.h"
18#include "wvstreamsdebugger.h"
19#include "wvstrutils.h"
20#include "wvistreamlist.h"
21#include "wvlinkerhack.h"
25#define ENOBUFS WSAENOBUFS
27#define errno GetLastError()
29#include <sys/socket.h>
46# define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
52# define TRACE(x, y...)
58WvStream *WvStream::globalstream = NULL;
66static map<WSID, WvStream*> *wsid_map;
67static WSID next_wsid_to_try;
82 IWvStream *s = wvcreate<IWvStream>(moniker, obj);
86 s->seterr_both(EINVAL,
"Unknown moniker '%s'", moniker);
93static bool is_prefix_insensitive(
const char *str,
const char *prefix)
95 size_t len = strlen(prefix);
96 return strlen(str) >= len && strncasecmp(str, prefix, len) == 0;
100static const char *strstr_insensitive(
const char *haystack,
const char *needle)
102 while (*haystack !=
'\0')
104 if (is_prefix_insensitive(haystack, needle))
112static bool contains_insensitive(
const char *haystack,
const char *needle)
114 return strstr_insensitive(haystack, needle) != NULL;
118static const char *list_format =
"%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
119static inline const char *Yes_No(
bool val)
121 return val?
"Yes":
"No";
125void WvStream::debugger_streams_display_header(
WvStringParm cmd,
126 WvStreamsDebugger::ResultCallback result_cb)
129 result.append(list_format,
"--WSID",
"-",
"RC",
"-",
"-Ok",
"-",
"-Cs",
"-",
"-AlRem",
"-",
130 "----------------Type",
"-",
"Name--------------------");
131 result_cb(cmd, result);
136static WvString friendly_ms(time_t ms)
142 else if (ms < 60*1000)
144 else if (ms < 60*60*1000)
145 return WvString(
"%sm", ms/(60*1000));
146 else if (ms <= 24*60*60*1000)
147 return WvString(
"%sh", ms/(60*60*1000));
149 return WvString(
"%sd", ms/(24*60*60*1000));
152void WvStream::debugger_streams_display_one_stream(
WvStream *s,
154 WvStreamsDebugger::ResultCallback result_cb)
158 unsigned refcount = s->
release();
159 result.append(list_format,
162 Yes_No(s->
isok()),
" ",
167 result_cb(cmd, result);
171void WvStream::debugger_streams_maybe_display_one_stream(
WvStream *s,
174 WvStreamsDebugger::ResultCallback result_cb)
176 bool show = args.isempty();
177 WvStringList::Iter arg(args);
178 for (arg.rewind(); arg.next(); )
181 bool is_num = wvstring_to_num(*arg, wsid);
185 if (s->wsid() == wsid)
193 if ((s->wsname() && contains_insensitive(s->wsname(), *arg))
194 || (s->wstype() && contains_insensitive(s->wstype(), *arg)))
202 debugger_streams_display_one_stream(s, cmd, result_cb);
208 WvStreamsDebugger::ResultCallback result_cb,
void *)
210 debugger_streams_display_header(cmd, result_cb);
213 map<WSID, WvStream*>::iterator it;
215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
216 debugger_streams_maybe_display_one_stream(it->second, cmd, args,
220 return WvString::null;
226 WvStreamsDebugger::ResultCallback result_cb,
void *)
229 return WvString(
"Usage: %s <WSID>", cmd);
232 if (!wvstring_to_num(wsid_str, wsid))
233 return WvString(
"Invalid WSID '%s'", wsid_str);
234 IWvStream *s = WvStream::find_by_wsid(wsid);
238 return WvString::null;
242void WvStream::add_debugger_commands()
244 WvStreamsDebugger::add_command(
"streams", 0, debugger_streams_run_cb, 0);
245 WvStreamsDebugger::add_command(
"close", 0, debugger_close_run_cb, 0);
250 read_requires_writable(NULL),
251 write_requires_readable(NULL),
252 uses_continue_select(false),
253 personal_stack_size(131072),
254 alarm_was_ticking(false),
258 readcb(wv::bind(&
WvStream::legacy_callback, this)),
260 outbuf_delayed_flush(false),
266 alarm_time(wvtime_zero),
267 last_alarm_check(wvtime_zero)
269 TRACE(
"Creating wvstream %p\n",
this);
271 static bool first =
true;
275 WvStream::add_debugger_commands();
280 wsid_map =
new map<WSID, WvStream*>;
281 WSID first_wsid_tried = next_wsid_to_try;
284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
287 }
while (next_wsid_to_try != first_wsid_tried);
288 my_wsid = next_wsid_to_try++;
289 bool inserted = wsid_map->insert(make_pair(my_wsid,
this)).second;
294 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
301IWvStream::IWvStream()
306IWvStream::~IWvStream()
313 TRACE(
"destroying %p\n",
this);
324 wsid_map->erase(my_wsid);
325 if (wsid_map->empty())
335 WvIStreamList::globallist.unlink(
this);
337 TRACE(
"done destroying %p\n",
this);
343 TRACE(
"flushing in wvstream...\n");
345 TRACE(
"(flushed)\n");
351 IWvStreamCallback cb = closecb;
364 setcallback(wv::bind(autoforward_callback, wv::ref(*
this), wv::ref(s)));
381 len = input.
read(buf,
sizeof(buf));
382 output.
write(buf, len);
408 alarm_time = wvtime_zero;
416#define TEST_CONTINUES_HARSHLY 0
417#if TEST_CONTINUES_HARSHLY
419# warning "Using WvCont for *all* streams for testing!"
464 size_t free = outbuf.free();
469 unsigned char *buf = tmp.
alloc(count);
470 size_t len =
read(buf, count);
480 size_t avail = inbuf.used();
483 const unsigned char *buf = inbuf.get(count);
484 size_t len =
write(buf, count);
485 inbuf.unget(count - len);
492 assert(!count || buf);
495 unsigned char *newbuf;
498 if (bufu < queue_min)
500 newbuf = inbuf.
alloc(queue_min - bufu);
502 i =
uread(newbuf, queue_min - bufu);
503 inbuf.
unalloc(queue_min - bufu - i);
508 if (bufu < queue_min)
516 bufu =
uread(buf, count);
523 memcpy(buf, inbuf.
get(bufu), bufu);
526 TRACE(
"read obj 0x%08x, bytes %d/%d\n", (
unsigned int)
this, bufu, count);
534 assert(!count || buf);
535 if (!
isok() || !buf || !count || stop_write)
return 0;
538 if (!outbuf_delayed_flush && !outbuf.
used())
540 wrote =
uwrite(buf, count);
542 buf = (
const unsigned char *)buf + wrote;
545 if (max_outbuf_size != 0)
547 size_t canbuffer = max_outbuf_size - outbuf.
used();
548 if (count > canbuffer)
553 outbuf.
put(buf, count);
592 return isok() &&
select(0,
true,
false,
false);
598 return !stop_write &&
isok() &&
select(0,
false,
true,
false);
605 assert(separator >= 0);
606 assert(separator <= 255);
612 timeout_time = msecadd(wvtime(), wait_msec);
624 if (inbuf.strchr(separator) > 0)
635 wait_msec = msecdiff(timeout_time, wvtime());
646 hasdata =
select(wait_msec,
true,
false);
655 unsigned char *buf = tmp.
alloc(readahead);
657 size_t len =
uread(buf, readahead);
659 inbuf.
put(tmp.
get(len), len);
666 if (!hasdata && wait_msec == 0)
674 i = inbuf.strchr(separator);
677 assert(eol && *eol == separator);
679 return const_cast<char*
>((
const char *)inbuf.
get(i));
684 inbuf.
alloc(1)[0] = 0;
685 return const_cast<char *
>((
const char *)inbuf.
get(inbuf.
used()));
693 assert(
false &&
"not implemented, come back later!");
703 read(buf,
sizeof(buf));
709 if (is_flushing)
return false;
711 TRACE(
"%p flush starts\n",
this);
714 want_to_flush =
true;
715 bool done = flush_internal(msec_timeout)
716 && flush_outbuf(msec_timeout);
719 TRACE(
"flush stops (%d)\n", done);
726 return want_to_flush;
730bool WvStream::flush_outbuf(time_t msec_timeout)
732 TRACE(
"%p flush_outbuf starts (isok=%d)\n",
this,
isok());
733 bool outbuf_was_used = outbuf.
used();
738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
744 WvTime stoptime = msecadd(wvtime(), msec_timeout);
747 while (outbuf_was_used &&
isok())
753 size_t real =
uwrite(outbuf.
get(attempt), attempt);
758 if (
isok() && real < attempt)
760 TRACE(
"flush_outbuf: unget %d-%d\n", attempt, real);
761 assert(outbuf.
ungettable() >= attempt - real);
762 outbuf.
unget(attempt - real);
771 if (msec_timeout >= 0
772 && (stoptime < wvtime() || !
select(msec_timeout,
false,
true)))
775 outbuf_was_used = outbuf.
used();
779 if (autoclose_time &&
isok())
781 time_t now = time(NULL);
782 TRACE(
"Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783 this, now - autoclose_time, outbuf.
used());
784 if ((flush_internal(0) && !outbuf.
used()) || now > autoclose_time)
791 TRACE(
"flush_outbuf: after autoclose chunk\n");
792 if (outbuf_delayed_flush && !outbuf_was_used)
793 want_to_flush =
false;
795 TRACE(
"flush_outbuf: now isok=%d\n",
isok());
798 if (outbuf_was_used && !
isok())
802 TRACE(
"flush_outbuf stops\n");
804 return !outbuf_was_used;
808bool WvStream::flush_internal(time_t msec_timeout)
815int WvStream::getrfd()
const
821int WvStream::getwfd()
const
829 time_t now = time(NULL);
830 autoclose_time = now + (msec_timeout + 999) / 1000;
832 TRACE(
"Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833 this, outbuf.
used(), autoclose_time - now);
850 if (!
isok() || (!si.inherit_request && alarmleft == 0))
856 if (!si.inherit_request)
858 si.wants.readable |=
static_cast<bool>(readcb);
859 si.wants.writable |=
static_cast<bool>(writecb);
860 si.wants.isexception |=
static_cast<bool>(exceptcb);
864 if (si.wants.readable && inbuf.
used() && inbuf.
used() >= queue_min)
870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
871 si.msec_timeout = alarmleft + 10;
877 if (!si.inherit_request)
879 si.wants.readable |=
static_cast<bool>(readcb);
880 si.wants.writable |=
static_cast<bool>(writecb);
881 si.wants.isexception |=
static_cast<bool>(exceptcb);
894 if ((si.wants.readable || (!si.inherit_request && readcb))
895 && inbuf.
used() && inbuf.
used() >= queue_min)
901void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
902 bool readable,
bool writable,
bool isexcept,
bool forceable)
910 si.wants.readable =
static_cast<bool>(readcb);
911 si.wants.writable =
static_cast<bool>(writecb);
912 si.wants.isexception =
static_cast<bool>(exceptcb);
916 si.wants.readable = readable;
917 si.wants.writable = writable;
918 si.wants.isexception = isexcept;
922 si.msec_timeout = msec_timeout;
923 si.inherit_request = ! forceable;
924 si.global_sure =
false;
929 if (globalstream && forceable && (globalstream !=
this))
933 s->
xpre_select(si, SelectRequest(
false,
false,
false));
939int WvStream::_do_select(SelectInfo &si)
943 tv.tv_sec = si.msec_timeout / 1000;
944 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
949 FD_SET(fakefd, &si.except);
953 int sel =
::select(si.max_fd+1, &si.read, &si.write, &si.except,
954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
962 && errno != EAGAIN && errno != EINTR
972 TRACE(
"select() returned %d\n", sel);
977bool WvStream::_process_selectinfo(SelectInfo &si,
bool forceable)
984 wvstime_sync_forward();
987 if (globalstream && forceable && (globalstream !=
this))
991 si.global_sure = s->
xpost_select(si, SelectRequest(
false,
false,
false))
999bool WvStream::_select(time_t msec_timeout,
bool readable,
bool writable,
1000 bool isexcept,
bool forceable)
1003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
1006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
1010 int sel = _do_select(si);
1012 sure = _process_selectinfo(si, forceable);
1013 if (si.global_sure && globalstream && forceable && (globalstream !=
this))
1023 static_cast<bool>(exceptcb));
1030 readcb = wv::bind(&WvStream::legacy_callback,
this);
1032 writecb = wv::bind(&WvStream::legacy_callback,
this);
1034 exceptcb = wv::bind(&WvStream::legacy_callback,
this);
1051 if (msec_timeout >= 0)
1052 alarm_time = msecadd(wvstime(), msec_timeout);
1054 alarm_time = wvtime_zero;
1060 if (alarm_time.tv_sec)
1065 if (now < last_alarm_check)
1069 if (msecdiff(last_alarm_check, now) > 200)
1070 fprintf(stderr,
" ************* TIME WENT BACKWARDS! "
1071 "(%ld:%ld %ld:%ld)\n",
1072 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
1073 now.tv_sec, now.tv_usec);
1075 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
1078 last_alarm_check = now;
1080 time_t remaining = msecdiff(alarm_time, now);
1097 if (msec_timeout >= 0)
1098 alarm(msec_timeout);
1100 alarm(msec_timeout);
1110 TRACE(
"hello-%p\n",
this);
1112 static_cast<bool>(writecb),
static_cast<bool>(exceptcb));
1131 callfunc = _callfunc;
1136void WvStream::legacy_callback()
1146 IWvStreamCallback tmp = readcb;
1156 IWvStreamCallback tmp = writecb;
1166 IWvStreamCallback tmp = exceptcb;
1176 IWvStreamCallback tmp = closecb;
1193 tmp.
merge(unreadbuf, count);
1200IWvStream *WvStream::find_by_wsid(WSID wsid)
1206 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
1208 if (it != wsid_map->end())
1209 retval = it->second;
The basic interface which is included by all other XPLC interfaces and objects.
virtual unsigned int addRef()=0
Indicate you are using this object.
virtual unsigned int release()=0
Indicate that you are finished using this object.
Base class for different address types, each of which will have the ability to convert itself to/from...
void merge(Buffer &inbuf, size_t count)
Efficiently moves count bytes from the specified buffer into this one.
size_t optgettable() const
Returns the optimal maximum number of elements in the buffer currently available for reading without ...
T * mutablepeek(int offset, size_t count)
Returns a non-const pointer into the buffer at the specified offset to the specified number of elemen...
size_t ungettable() const
Returns the maximum number of elements that may be ungotten at this time.
const T * get(size_t count)
Reads exactly the specified number of elements and returns a pointer to a storage location owned by t...
void put(const T *data, size_t count)
Writes the specified number of elements from the specified storage location into the buffer at its ta...
void unget(size_t count)
Ungets exactly the specified number of elements by returning them to the buffer for subsequent reads.
void unalloc(size_t count)
Unallocates exactly the specified number of elements by removing them from the buffer and releasing t...
T * alloc(size_t count)
Allocates exactly the specified number of elements and returns a pointer to an UNINITIALIZED storage ...
void zap()
Clears the buffer.
size_t used() const
Returns the number of elements in the buffer currently available for reading.
WvCont provides "continuations", which are apparently also known as semi-coroutines.
static void * yield(void *ret=0)
"return" from the current callback, giving value 'ret' to the person who called us.
virtual bool isok() const
By default, returns true if geterr() == 0.
virtual void seterr(int _errnum)
Set the errnum variable – we have an error.
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
bool xpost_select(SelectInfo &si, const SelectRequest &r)
Like post_select(), but still exists even if you override the other post_select() in a subclass.
void flush_then_close(int msec_timeout)
flush the output buffer automatically as select() is called.
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is define...
IWvStreamCallback setclosecallback(IWvStreamCallback _callback)
Sets a callback to be invoked on close().
void queuemin(size_t count)
force read() to not return any bytes unless 'count' bytes can be read at once.
IWvStream::SelectRequest get_select_request()
Use get_select_request() to save the current state of the selection state of this stream.
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
virtual bool flush(time_t msec_timeout)
flush the output buffer, if we can do it without delaying more than msec_timeout milliseconds at a ti...
char * blocking_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of getline() that allows you to block for more data to arrive.
virtual bool isreadable()
Returns true if the stream is readable.
virtual bool isok() const
return true if the stream is actually usable right now
char * continue_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of blocking_getline() that uses continue_select to avoid blocking other streams.
void undo_force_select(bool readable, bool writable, bool isexception=false)
Undo a previous force_select().
void autoforward(WvStream &s)
set the callback function for this stream to an internal routine that auto-forwards all incoming stre...
virtual size_t uread(void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by read().
virtual void nowrite()
Shuts down the writing side of the stream.
void alarm(time_t msec_timeout)
Set an alarm.
void setcallback(IWvStreamCallback _callfunc)
define the callback function for this stream, called whenever the callback() member is run,...
void force_select(bool readable, bool writable, bool isexception=false)
Use force_select() to force one or more particular modes (readable, writable, or isexception) to true...
WvStream()
Basic constructor for just a do-nothing WvStream.
virtual size_t uwrite(const void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by write().
bool uses_continue_select
If this is set, enables the use of continue_select().
bool stop_read
True if noread()/nowrite()/close() have been called, respectively.
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
void * _callwrap(void *)
A wrapper that's compatible with WvCont, but calls the "real" callback.
virtual bool should_flush()
Returns true if we want to flush the output buffer right now.
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
bool select(time_t msec_timeout)
Return true if any of the requested features are true on the stream.
WvStream * read_requires_writable
If this is set, select() doesn't return true for read unless the given stream also returns true for w...
virtual void maybe_autoclose()
Auto-close the stream if the time is right.
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
virtual const WvAddr * src() const
get the remote address from which the last data block was received.
void xpre_select(SelectInfo &si, const SelectRequest &r)
Like pre_select(), but still exists even if you override the other pre_select() in a subclass.
IWvStreamCallback setexceptcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is in exception state.
void terminate_continue_select()
you MUST run this from your destructor if you use continue_select(), or very weird things will happen...
void noautoforward()
Stops autoforwarding.
void _callback()
Actually call the registered callfunc and execute().
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
virtual bool iswritable()
Returns true if the stream is writable (without using the outbuf).
IWvStreamCallback setreadcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is readable.
size_t personal_stack_size
Specifies the stack size to reserve for continue_select().
void drain()
drain the input buffer (read and discard data until select(0) returns false)
bool continue_select(time_t msec_timeout)
return to the caller from execute(), but don't really return exactly; this uses WvCont::yield() to re...
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
IWvStreamCallback setwritecallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is writable.
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
virtual void noread()
Shuts down the reading side of the stream.
virtual void callback()
if the stream has a callback function defined, call it now.
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
This is a WvList of WvStrings, and is a really handy way to parse strings.
WvString popstr()
get the first string in the list, or an empty string if the list is empty.
WvString is an implementation of a simple and efficient printable-string class.
Based on (and interchangeable with) struct timeval.
#define UUID_MAP_END
Marks the end of an interface map.
#define UUID_MAP_BEGIN(component)
Start the interface map for "component".
#define UUID_MAP_ENTRY(iface)
Add an entry to an interface map.
the data structure used by pre_select()/post_select() and internally by select().
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...