diff options
-rw-r--r-- | include/uv_link_t.h | 43 | ||||
-rw-r--r-- | src/uv_link_observer_t.c | 123 | ||||
-rw-r--r-- | src/uv_link_source_t.c | 11 | ||||
-rw-r--r-- | src/uv_link_t.c | 57 | ||||
-rw-r--r-- | uv_link_t.gyp | 1 |
5 files changed, 224 insertions, 11 deletions
diff --git a/include/uv_link_t.h b/include/uv_link_t.h index 45a179c..ced26ad 100644 --- a/include/uv_link_t.h +++ b/include/uv_link_t.h @@ -5,6 +5,7 @@ typedef struct uv_link_s uv_link_t; typedef struct uv_link_source_s uv_link_source_t; +typedef struct uv_link_observer_s uv_link_observer_t; typedef void (*uv_link_alloc_cb)(uv_link_t* link, size_t suggested_size, @@ -38,24 +39,52 @@ struct uv_link_s { int (*shutdown)(uv_link_t* link, uv_link_shutdown_cb cb); }; -struct uv_link_source_s { - uv_link_t link; - - uv_stream_t* stream; -}; - UV_EXTERN int uv_link_init(uv_loop_t* loop, uv_link_t* link); UV_EXTERN void uv_link_close(uv_link_t* link); -UV_EXTERN int uv_link_chain(uv_link_t* from, uv_link_t* to); +UV_EXTERN int uv_link_chain(uv_link_t* from, + uv_link_t* to, + uv_link_alloc_cb alloc_cb, + uv_link_read_cb read_cb); UV_EXTERN int uv_link_unchain(uv_link_t* from, uv_link_t* to); +/* Invoke read_cb and alloc_cb with proper link */ +UV_EXTERN void uv_link_invoke_alloc_cb(uv_link_t* link, + size_t suggested_size, + uv_buf_t* buf); +UV_EXTERN void uv_link_invoke_read_cb(uv_link_t* link, + ssize_t nread, + const uv_buf_t* buf); + /* Source */ +struct uv_link_source_s { + uv_link_t link; + + uv_stream_t* stream; +}; + /* NOTE: uses `stream->data` field */ UV_EXTERN int uv_link_source_init(uv_loop_t* loop, uv_link_source_t* source, uv_stream_t* stream); UV_EXTERN void uv_link_source_close(uv_link_source_t* source); +/* Observer */ + +struct uv_link_observer_s { + uv_link_t link; + uv_link_t* target; + + /* This will be called, even if the ones in `link` will be overwritten */ + void (*read_cb)(uv_link_observer_t* observer, + ssize_t nread, + const uv_buf_t* buf); +}; + +UV_EXTERN int uv_link_observer_init(uv_loop_t* loop, + uv_link_observer_t* observer, + uv_link_t* target); +UV_EXTERN int uv_link_observer_close(uv_link_observer_t* observer); + #endif /* INCLUDE_UV_LINK_H_ */ diff --git a/src/uv_link_observer_t.c b/src/uv_link_observer_t.c new file mode 100644 index 0000000..35e6b03 --- /dev/null +++ b/src/uv_link_observer_t.c @@ -0,0 +1,123 @@ +#include <stdlib.h> +#include <string.h> + +#include "common.h" + + +static int uv_link_observer_read_start(uv_link_t* link) { + uv_link_observer_t* observer; + + observer = container_of(link, uv_link_observer_t, link); + + return observer->target->read_start(observer->target); +} + + +static int uv_link_observer_read_stop(uv_link_t* link) { + uv_link_observer_t* observer; + + observer = container_of(link, uv_link_observer_t, link); + + return observer->target->read_stop(observer->target); +} + + +static int uv_link_observer_write(uv_link_t* link, + const uv_buf_t bufs[], + unsigned int nbufs, + uv_stream_t* send_handle, + uv_link_write_cb cb) { + uv_link_observer_t* observer; + + observer = container_of(link, uv_link_observer_t, link); + + return observer->target->write(observer->target, bufs, nbufs, send_handle, cb); +} + + +static int uv_link_observer_try_write(uv_link_t* link, + const uv_buf_t bufs[], + unsigned int nbufs) { + uv_link_observer_t* observer; + + observer = container_of(link, uv_link_observer_t, link); + + return observer->target->try_write(observer->target, bufs, nbufs); +} + + +static int uv_link_observer_shutdown(uv_link_t* link, uv_link_shutdown_cb cb) { + uv_link_observer_t* observer; + + observer = container_of(link, uv_link_observer_t, link); + + return observer->target->shutdown(observer->target, cb); +} + + +static void uv_link_observer_alloc_cb(uv_link_t* link, + size_t suggested_size, + uv_buf_t* buf) { + return uv_link_invoke_alloc_cb(link, suggested_size, buf); +} + + +static void uv_link_observer_read_cb(uv_link_t* link, + ssize_t nread, + const uv_buf_t* buf) { + uv_link_observer_t* observer; + + observer = container_of(link, uv_link_observer_t, link); + + if (observer->read_cb != NULL) + observer->read_cb(observer, nread, buf); + + return uv_link_invoke_read_cb(link, nread, buf); +} + + +int uv_link_observer_init(uv_loop_t* loop, + uv_link_observer_t* observer, + uv_link_t* target) { + int err; + uv_link_t* l; + + memset(observer, 0, sizeof(*observer)); + + err = uv_link_init(loop, &observer->link); + if (err != 0) + return err; + + observer->target = target; + + l = &observer->link; + l->read_start = uv_link_observer_read_start; + l->read_stop = uv_link_observer_read_stop; + l->write = uv_link_observer_write; + l->try_write = uv_link_observer_try_write; + l->shutdown = uv_link_observer_shutdown; + + err = uv_link_chain(target, l, uv_link_observer_alloc_cb, + uv_link_observer_read_cb); + if (err != 0) { + uv_link_close(&observer->link); + return err; + } + + return 0; +} + + +int uv_link_observer_close(uv_link_observer_t* observer) { + int err; + + err = uv_link_unchain(observer->target, &observer->link); + if (err != 0) + return err; + + uv_link_close(&observer->link); + + observer->target = NULL; + + return 0; +} diff --git a/src/uv_link_source_t.c b/src/uv_link_source_t.c index c58e8ec..ff2af73 100644 --- a/src/uv_link_source_t.c +++ b/src/uv_link_source_t.c @@ -1,4 +1,5 @@ #include <stdlib.h> +#include <string.h> #include "common.h" @@ -25,7 +26,7 @@ static void uv_link_source_wrap_alloc_cb(uv_handle_t* handle, source = handle->data; - source->link.alloc_cb(&source->link, suggested_size, buf); + uv_link_invoke_alloc_cb(&source->link, suggested_size, buf); } @@ -36,7 +37,7 @@ static void uv_link_source_wrap_read_cb(uv_stream_t* stream, source = stream->data; - source->link.read_cb(&source->link, nread, buf); + uv_link_invoke_read_cb(&source->link, nread, buf); } @@ -110,7 +111,7 @@ static void uv_link_source_wrap_shutdown_cb(uv_shutdown_t* req, int status) { } -static int uv_link_shutdown(uv_link_t* link, uv_link_shutdown_cb cb) { +static int uv_link_source_shutdown(uv_link_t* link, uv_link_shutdown_cb cb) { uv_link_source_t* source; uv_link_source_shutdown_t* req; @@ -134,6 +135,8 @@ int uv_link_source_init(uv_loop_t* loop, int err; uv_link_t* l; + memset(source, 0, sizeof(*source)); + err = uv_link_init(loop, &source->link); if (err != 0) return err; @@ -146,7 +149,7 @@ int uv_link_source_init(uv_loop_t* loop, l->read_stop = uv_link_source_read_stop; l->write = uv_link_source_write; l->try_write = uv_link_source_try_write; - l->shutdown = uv_link_shutdown; + l->shutdown = uv_link_source_shutdown; return 0; } diff --git a/src/uv_link_t.c b/src/uv_link_t.c index 891640c..e9f88b5 100644 --- a/src/uv_link_t.c +++ b/src/uv_link_t.c @@ -10,3 +10,60 @@ int uv_link_init(uv_loop_t* loop, uv_link_t* link) { void uv_link_close(uv_link_t* link) { } + + +int uv_link_chain(uv_link_t* from, + uv_link_t* to, + uv_link_alloc_cb alloc_cb, + uv_link_read_cb read_cb) { + if (from->child != NULL || to->parent != NULL) + return -1; + + from->child = to; + to->parent = from; + + from->alloc_cb = alloc_cb; + from->read_cb = read_cb; + + return 0; +} + + +int uv_link_unchain(uv_link_t* from, uv_link_t* to) { + if (from->child != to || to->parent != from) + return -1; + + from->child = NULL; + to->parent = NULL; + + from->alloc_cb = NULL; + from->read_cb = NULL; + + return from->read_stop(from); +} + + +void uv_link_invoke_alloc_cb(uv_link_t* link, + size_t suggested_size, + uv_buf_t* buf) { + uv_link_t* target; + + target = link; + if (link->child != NULL) + target = link->child; + + link->alloc_cb(target, suggested_size, buf); +} + + +void uv_link_invoke_read_cb(uv_link_t* link, + ssize_t nread, + const uv_buf_t* buf) { + uv_link_t* target; + + target = link; + if (link->child != NULL) + target = link->child; + + link->read_cb(target, nread, buf); +} diff --git a/uv_link_t.gyp b/uv_link_t.gyp index 7e881e0..f089c42 100644 --- a/uv_link_t.gyp +++ b/uv_link_t.gyp @@ -16,6 +16,7 @@ "sources": [ "src/uv_link_t.c", + "src/uv_link_observer_t.c", "src/uv_link_source_t.c", ], }], |