about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--include/uv_link_t.h43
-rw-r--r--src/uv_link_observer_t.c123
-rw-r--r--src/uv_link_source_t.c11
-rw-r--r--src/uv_link_t.c57
-rw-r--r--uv_link_t.gyp1
5 files changed, 224 insertions, 11 deletions
diff --git a/include/uv_link_t.h b/include/uv_link_t.h
index 45a179c..ced26ad 100644
--- a/include/uv_link_t.h
+++ b/include/uv_link_t.h
@@ -5,6 +5,7 @@
 
 typedef struct uv_link_s uv_link_t;
 typedef struct uv_link_source_s uv_link_source_t;
+typedef struct uv_link_observer_s uv_link_observer_t;
 
 typedef void (*uv_link_alloc_cb)(uv_link_t* link,
                                  size_t suggested_size,
@@ -38,24 +39,52 @@ struct uv_link_s {
   int (*shutdown)(uv_link_t* link, uv_link_shutdown_cb cb);
 };
 
-struct uv_link_source_s {
-  uv_link_t link;
-
-  uv_stream_t* stream;
-};
-
 UV_EXTERN int uv_link_init(uv_loop_t* loop, uv_link_t* link);
 UV_EXTERN void uv_link_close(uv_link_t* link);
 
-UV_EXTERN int uv_link_chain(uv_link_t* from, uv_link_t* to);
+UV_EXTERN int uv_link_chain(uv_link_t* from,
+                            uv_link_t* to,
+                            uv_link_alloc_cb alloc_cb,
+                            uv_link_read_cb read_cb);
 UV_EXTERN int uv_link_unchain(uv_link_t* from, uv_link_t* to);
 
+/* Invoke read_cb and alloc_cb with proper link */
+UV_EXTERN void uv_link_invoke_alloc_cb(uv_link_t* link,
+                                       size_t suggested_size,
+                                       uv_buf_t* buf);
+UV_EXTERN void uv_link_invoke_read_cb(uv_link_t* link,
+                                      ssize_t nread,
+                                      const uv_buf_t* buf);
+
 /* Source */
 
+struct uv_link_source_s {
+  uv_link_t link;
+
+  uv_stream_t* stream;
+};
+
 /* NOTE: uses `stream->data` field */
 UV_EXTERN int uv_link_source_init(uv_loop_t* loop,
                                   uv_link_source_t* source,
                                   uv_stream_t* stream);
 UV_EXTERN void uv_link_source_close(uv_link_source_t* source);
 
+/* Observer */
+
+struct uv_link_observer_s {
+  uv_link_t link;
+  uv_link_t* target;
+
+  /* This will be called, even if the ones in `link` will be overwritten */
+  void (*read_cb)(uv_link_observer_t* observer,
+                  ssize_t nread,
+                  const uv_buf_t* buf);
+};
+
+UV_EXTERN int uv_link_observer_init(uv_loop_t* loop,
+                                    uv_link_observer_t* observer,
+                                    uv_link_t* target);
+UV_EXTERN int uv_link_observer_close(uv_link_observer_t* observer);
+
 #endif  /* INCLUDE_UV_LINK_H_ */
diff --git a/src/uv_link_observer_t.c b/src/uv_link_observer_t.c
new file mode 100644
index 0000000..35e6b03
--- /dev/null
+++ b/src/uv_link_observer_t.c
@@ -0,0 +1,123 @@
+#include <stdlib.h>
+#include <string.h>
+
+#include "common.h"
+
+
+static int uv_link_observer_read_start(uv_link_t* link) {
+  uv_link_observer_t* observer;
+
+  observer = container_of(link, uv_link_observer_t, link);
+
+  return observer->target->read_start(observer->target);
+}
+
+
+static int uv_link_observer_read_stop(uv_link_t* link) {
+  uv_link_observer_t* observer;
+
+  observer = container_of(link, uv_link_observer_t, link);
+
+  return observer->target->read_stop(observer->target);
+}
+
+
+static int uv_link_observer_write(uv_link_t* link,
+                                  const uv_buf_t bufs[],
+                                  unsigned int nbufs,
+                                  uv_stream_t* send_handle,
+                                  uv_link_write_cb cb) {
+  uv_link_observer_t* observer;
+
+  observer = container_of(link, uv_link_observer_t, link);
+
+  return observer->target->write(observer->target, bufs, nbufs, send_handle, cb);
+}
+
+
+static int uv_link_observer_try_write(uv_link_t* link,
+                                      const uv_buf_t bufs[],
+                                      unsigned int nbufs) {
+  uv_link_observer_t* observer;
+
+  observer = container_of(link, uv_link_observer_t, link);
+
+  return observer->target->try_write(observer->target, bufs, nbufs);
+}
+
+
+static int uv_link_observer_shutdown(uv_link_t* link, uv_link_shutdown_cb cb) {
+  uv_link_observer_t* observer;
+
+  observer = container_of(link, uv_link_observer_t, link);
+
+  return observer->target->shutdown(observer->target, cb);
+}
+
+
+static void uv_link_observer_alloc_cb(uv_link_t* link,
+                                      size_t suggested_size,
+                                      uv_buf_t* buf) {
+  return uv_link_invoke_alloc_cb(link, suggested_size, buf);
+}
+
+
+static void uv_link_observer_read_cb(uv_link_t* link,
+                                     ssize_t nread,
+                                     const uv_buf_t* buf) {
+  uv_link_observer_t* observer;
+
+  observer = container_of(link, uv_link_observer_t, link);
+
+  if (observer->read_cb != NULL)
+    observer->read_cb(observer, nread, buf);
+
+  return uv_link_invoke_read_cb(link, nread, buf);
+}
+
+
+int uv_link_observer_init(uv_loop_t* loop,
+                          uv_link_observer_t* observer,
+                          uv_link_t* target) {
+  int err;
+  uv_link_t* l;
+
+  memset(observer, 0, sizeof(*observer));
+
+  err = uv_link_init(loop, &observer->link);
+  if (err != 0)
+    return err;
+
+  observer->target = target;
+
+  l = &observer->link;
+  l->read_start = uv_link_observer_read_start;
+  l->read_stop = uv_link_observer_read_stop;
+  l->write = uv_link_observer_write;
+  l->try_write = uv_link_observer_try_write;
+  l->shutdown = uv_link_observer_shutdown;
+
+  err = uv_link_chain(target, l, uv_link_observer_alloc_cb,
+                      uv_link_observer_read_cb);
+  if (err != 0) {
+    uv_link_close(&observer->link);
+    return err;
+  }
+
+  return 0;
+}
+
+
+int uv_link_observer_close(uv_link_observer_t* observer) {
+  int err;
+
+  err = uv_link_unchain(observer->target, &observer->link);
+  if (err != 0)
+    return err;
+
+  uv_link_close(&observer->link);
+
+  observer->target = NULL;
+
+  return 0;
+}
diff --git a/src/uv_link_source_t.c b/src/uv_link_source_t.c
index c58e8ec..ff2af73 100644
--- a/src/uv_link_source_t.c
+++ b/src/uv_link_source_t.c
@@ -1,4 +1,5 @@
 #include <stdlib.h>
+#include <string.h>
 
 #include "common.h"
 
@@ -25,7 +26,7 @@ static void uv_link_source_wrap_alloc_cb(uv_handle_t* handle,
 
   source = handle->data;
 
-  source->link.alloc_cb(&source->link, suggested_size, buf);
+  uv_link_invoke_alloc_cb(&source->link, suggested_size, buf);
 }
 
 
@@ -36,7 +37,7 @@ static void uv_link_source_wrap_read_cb(uv_stream_t* stream,
 
   source = stream->data;
 
-  source->link.read_cb(&source->link, nread, buf);
+  uv_link_invoke_read_cb(&source->link, nread, buf);
 }
 
 
@@ -110,7 +111,7 @@ static void uv_link_source_wrap_shutdown_cb(uv_shutdown_t* req, int status) {
 }
 
 
-static int uv_link_shutdown(uv_link_t* link, uv_link_shutdown_cb cb) {
+static int uv_link_source_shutdown(uv_link_t* link, uv_link_shutdown_cb cb) {
   uv_link_source_t* source;
   uv_link_source_shutdown_t* req;
 
@@ -134,6 +135,8 @@ int uv_link_source_init(uv_loop_t* loop,
   int err;
   uv_link_t* l;
 
+  memset(source, 0, sizeof(*source));
+
   err = uv_link_init(loop, &source->link);
   if (err != 0)
     return err;
@@ -146,7 +149,7 @@ int uv_link_source_init(uv_loop_t* loop,
   l->read_stop = uv_link_source_read_stop;
   l->write = uv_link_source_write;
   l->try_write = uv_link_source_try_write;
-  l->shutdown = uv_link_shutdown;
+  l->shutdown = uv_link_source_shutdown;
 
   return 0;
 }
diff --git a/src/uv_link_t.c b/src/uv_link_t.c
index 891640c..e9f88b5 100644
--- a/src/uv_link_t.c
+++ b/src/uv_link_t.c
@@ -10,3 +10,60 @@ int uv_link_init(uv_loop_t* loop, uv_link_t* link) {
 
 void uv_link_close(uv_link_t* link) {
 }
+
+
+int uv_link_chain(uv_link_t* from,
+                  uv_link_t* to,
+                  uv_link_alloc_cb alloc_cb,
+                  uv_link_read_cb read_cb) {
+  if (from->child != NULL || to->parent != NULL)
+    return -1;
+
+  from->child = to;
+  to->parent = from;
+
+  from->alloc_cb = alloc_cb;
+  from->read_cb = read_cb;
+
+  return 0;
+}
+
+
+int uv_link_unchain(uv_link_t* from, uv_link_t* to) {
+  if (from->child != to || to->parent != from)
+    return -1;
+
+  from->child = NULL;
+  to->parent = NULL;
+
+  from->alloc_cb = NULL;
+  from->read_cb = NULL;
+
+  return from->read_stop(from);
+}
+
+
+void uv_link_invoke_alloc_cb(uv_link_t* link,
+                             size_t suggested_size,
+                             uv_buf_t* buf) {
+  uv_link_t* target;
+
+  target = link;
+  if (link->child != NULL)
+    target = link->child;
+
+  link->alloc_cb(target, suggested_size, buf);
+}
+
+
+void uv_link_invoke_read_cb(uv_link_t* link,
+                            ssize_t nread,
+                            const uv_buf_t* buf) {
+  uv_link_t* target;
+
+  target = link;
+  if (link->child != NULL)
+    target = link->child;
+
+  link->read_cb(target, nread, buf);
+}
diff --git a/uv_link_t.gyp b/uv_link_t.gyp
index 7e881e0..f089c42 100644
--- a/uv_link_t.gyp
+++ b/uv_link_t.gyp
@@ -16,6 +16,7 @@
 
     "sources": [
       "src/uv_link_t.c",
+      "src/uv_link_observer_t.c",
       "src/uv_link_source_t.c",
     ],
   }],