From 0cdbf6a602caf99802439b5653128ba15a7566d7 Mon Sep 17 00:00:00 2001 From: Andri Yngvason Date: Sat, 8 Apr 2023 13:02:43 +0000 Subject: [PATCH] stream: Add exec_and_send function This allows us to execute a function right before a leaves the send queue and is really only useful for NTP as far as I can tell. --- include/stream.h | 6 ++++++ src/stream-common.c | 4 ++++ src/stream-tcp.c | 30 +++++++++++++++++++++++++++ src/stream-ws.c | 49 ++++++++++++++++++++++++++++++++++++++++----- src/stream.c | 10 +++++++++ 5 files changed, 94 insertions(+), 5 deletions(-) diff --git a/include/stream.h b/include/stream.h index c5e0dbf..6b1f8cf 100644 --- a/include/stream.h +++ b/include/stream.h @@ -50,10 +50,12 @@ struct stream; typedef void (*stream_event_fn)(struct stream*, enum stream_event); typedef void (*stream_req_fn)(void*, enum stream_req_status); +typedef struct rcbuf* (*stream_exec_fn)(struct stream*, void* userdata); struct stream_req { struct rcbuf* payload; stream_req_fn on_done; + stream_exec_fn exec; void* userdata; TAILQ_ENTRY(stream_req) link; }; @@ -67,6 +69,7 @@ struct stream_impl { int (*send)(struct stream*, struct rcbuf* payload, stream_req_fn on_done, void* userdata); int (*send_first)(struct stream*, struct rcbuf* payload); + void (*exec_and_send)(struct stream*, stream_exec_fn, void* userdata); }; struct stream { @@ -105,6 +108,9 @@ int stream_send(struct stream* self, struct rcbuf* payload, stream_req_fn on_done, void* userdata); int stream_send_first(struct stream* self, struct rcbuf* payload); +// Queue a pure function to be executed when time comes to send it. +void stream_exec_and_send(struct stream* self, stream_exec_fn, void* userdata); + #ifdef ENABLE_TLS int stream_upgrade_to_tls(struct stream* self, void* context); #endif diff --git a/src/stream-common.c b/src/stream-common.c index 63f17c0..9140c1d 100644 --- a/src/stream-common.c +++ b/src/stream-common.c @@ -24,6 +24,10 @@ void stream_req__finish(struct stream_req* req, enum stream_req_status status) if (req->on_done) req->on_done(req->userdata, status); + // exec userdata is heap allocated + if (req->exec && req->userdata) + free(req->userdata); + rcbuf_unref(req->payload); free(req); } diff --git a/src/stream-tcp.c b/src/stream-tcp.c index a6c2c42..a25d437 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -69,6 +69,12 @@ static int stream_tcp__flush(struct stream* self) struct stream_req* req; TAILQ_FOREACH(req, &self->send_queue, link) { + if (req->exec) { + if (req->payload) + rcbuf_unref(req->payload); + req->payload = req->exec(self, req->userdata); + } + iov[n_msgs].iov_base = req->payload->payload; iov[n_msgs].iov_len = req->payload->size; @@ -108,6 +114,11 @@ static int stream_tcp__flush(struct stream* self) TAILQ_REMOVE(&self->send_queue, req, link); stream_req__finish(req, STREAM_REQ_DONE); } else { + if (req->exec) { + free(req->userdata); + req->userdata = NULL; + req->exec = NULL; + } char* p = req->payload->payload; size_t s = req->payload->size; memmove(p, p + s + bytes_left, -bytes_left); @@ -213,12 +224,31 @@ static int stream_tcp_send_first(struct stream* self, struct rcbuf* payload) return stream_tcp__flush(self); } +static void stream_tcp_exec_and_send(struct stream* self, + stream_exec_fn exec_fn, void* userdata) +{ + if (self->state == STREAM_STATE_CLOSED) + return; + + struct stream_req* req = calloc(1, sizeof(*req)); + if (!req) + return; + + req->exec = exec_fn; + req->userdata = userdata; + + TAILQ_INSERT_TAIL(&self->send_queue, req, link); + + stream_tcp__flush(self); +} + static struct stream_impl impl = { .close = stream_tcp_close, .destroy = stream_tcp_destroy, .read = stream_tcp_read, .send = stream_tcp_send, .send_first = stream_tcp_send_first, + .exec_and_send = stream_tcp_exec_and_send, }; struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata) diff --git a/src/stream-ws.c b/src/stream-ws.c index 2944354..d0091c3 100644 --- a/src/stream-ws.c +++ b/src/stream-ws.c @@ -17,6 +17,7 @@ #include "stream.h" #include "stream-common.h" #include "websocket.h" +#include "vec.h" #include "neatvnc.h" #include @@ -30,6 +31,11 @@ enum stream_ws_state { STREAM_WS_STATE_READY, }; +struct stream_ws_exec_ctx { + stream_exec_fn exec; + void* userdata; +}; + struct stream_ws { struct stream base; enum stream_ws_state ws_state; @@ -155,11 +161,6 @@ static ssize_t stream_ws_read_frame(struct stream_ws* ws, void* dst, return 0; } - nvnc_trace("Got frame header: opcode=%s, header-len: %zu, payload-len: %zu, read-buffer-len: %zu", - ws_opcode_name(ws->header.opcode), - ws->header.header_length, ws->header.payload_length, - ws->read_index); - if (ws->header.opcode != WS_OPCODE_CONT) { ws->current_opcode = ws->header.opcode; } @@ -250,6 +251,43 @@ static int stream_ws_send(struct stream* self, struct rcbuf* payload, return stream_send(ws->tcp_stream, payload, on_done, userdata); } +static struct rcbuf* stream_ws_chained_exec(struct stream* tcp_stream, + void* userdata) +{ + struct stream_ws_exec_ctx* ctx = userdata; + struct stream_ws* ws = tcp_stream->userdata; + + struct rcbuf* buf = ctx->exec(&ws->base, ctx->userdata); + + struct vec out; + vec_init(&out, WS_HEADER_MIN_SIZE + buf->size + 1); + + struct ws_frame_header head = { + .fin = true, + .opcode = WS_OPCODE_BIN, + .payload_length = buf->size, + }; + int head_len = ws_write_frame_header(out.data, &head); + out.len += head_len; + + vec_append(&out, buf->payload, buf->size); + return rcbuf_new(out.data, out.len); +} + +static void stream_ws_exec_and_send(struct stream* self, stream_exec_fn exec, + void* userdata) +{ + struct stream_ws* ws = (struct stream_ws*)self; + + struct stream_ws_exec_ctx* ctx = calloc(1, sizeof(*ctx)); + assert(ctx); + + ctx->exec = exec; + ctx->userdata = userdata; + + stream_exec_and_send(ws->tcp_stream, stream_ws_chained_exec, ctx); +} + static void stream_ws_event(struct stream* self, enum stream_event event) { struct stream_ws* ws = self->userdata; @@ -266,6 +304,7 @@ static struct stream_impl impl = { .destroy = stream_ws_destroy, .read = stream_ws_read, .send = stream_ws_send, + .exec_and_send = stream_ws_exec_and_send, }; struct stream* stream_ws_new(int fd, stream_event_fn on_event, void* userdata) diff --git a/src/stream.c b/src/stream.c index ba9cd56..457035c 100644 --- a/src/stream.c +++ b/src/stream.c @@ -55,3 +55,13 @@ ssize_t stream_read(struct stream* self, void* dst, size_t size) assert(self->impl && self->impl->read); return self->impl->read(self, dst, size); } + +void stream_exec_and_send(struct stream* self, stream_exec_fn exec_fn, + void* userdata) +{ + assert(self->impl); + if (self->impl->exec_and_send) + self->impl->exec_and_send(self, exec_fn, userdata); + else + stream_send(self, exec_fn(self, userdata), NULL, NULL); +}