stream: Add exec_and_send function

This allows us to execute a function right before a leaves the send queue
and is really only useful for NTP as far as I can tell.
websockets
Andri Yngvason 2023-04-08 13:02:43 +00:00
parent 19172140ba
commit 0cdbf6a602
5 changed files with 94 additions and 5 deletions

View File

@ -50,10 +50,12 @@ struct stream;
typedef void (*stream_event_fn)(struct stream*, enum stream_event); typedef void (*stream_event_fn)(struct stream*, enum stream_event);
typedef void (*stream_req_fn)(void*, enum stream_req_status); typedef void (*stream_req_fn)(void*, enum stream_req_status);
typedef struct rcbuf* (*stream_exec_fn)(struct stream*, void* userdata);
struct stream_req { struct stream_req {
struct rcbuf* payload; struct rcbuf* payload;
stream_req_fn on_done; stream_req_fn on_done;
stream_exec_fn exec;
void* userdata; void* userdata;
TAILQ_ENTRY(stream_req) link; TAILQ_ENTRY(stream_req) link;
}; };
@ -67,6 +69,7 @@ struct stream_impl {
int (*send)(struct stream*, struct rcbuf* payload, int (*send)(struct stream*, struct rcbuf* payload,
stream_req_fn on_done, void* userdata); stream_req_fn on_done, void* userdata);
int (*send_first)(struct stream*, struct rcbuf* payload); int (*send_first)(struct stream*, struct rcbuf* payload);
void (*exec_and_send)(struct stream*, stream_exec_fn, void* userdata);
}; };
struct stream { struct stream {
@ -105,6 +108,9 @@ int stream_send(struct stream* self, struct rcbuf* payload,
stream_req_fn on_done, void* userdata); stream_req_fn on_done, void* userdata);
int stream_send_first(struct stream* self, struct rcbuf* payload); int stream_send_first(struct stream* self, struct rcbuf* payload);
// Queue a pure function to be executed when time comes to send it.
void stream_exec_and_send(struct stream* self, stream_exec_fn, void* userdata);
#ifdef ENABLE_TLS #ifdef ENABLE_TLS
int stream_upgrade_to_tls(struct stream* self, void* context); int stream_upgrade_to_tls(struct stream* self, void* context);
#endif #endif

View File

@ -24,6 +24,10 @@ void stream_req__finish(struct stream_req* req, enum stream_req_status status)
if (req->on_done) if (req->on_done)
req->on_done(req->userdata, status); req->on_done(req->userdata, status);
// exec userdata is heap allocated
if (req->exec && req->userdata)
free(req->userdata);
rcbuf_unref(req->payload); rcbuf_unref(req->payload);
free(req); free(req);
} }

View File

@ -69,6 +69,12 @@ static int stream_tcp__flush(struct stream* self)
struct stream_req* req; struct stream_req* req;
TAILQ_FOREACH(req, &self->send_queue, link) { TAILQ_FOREACH(req, &self->send_queue, link) {
if (req->exec) {
if (req->payload)
rcbuf_unref(req->payload);
req->payload = req->exec(self, req->userdata);
}
iov[n_msgs].iov_base = req->payload->payload; iov[n_msgs].iov_base = req->payload->payload;
iov[n_msgs].iov_len = req->payload->size; iov[n_msgs].iov_len = req->payload->size;
@ -108,6 +114,11 @@ static int stream_tcp__flush(struct stream* self)
TAILQ_REMOVE(&self->send_queue, req, link); TAILQ_REMOVE(&self->send_queue, req, link);
stream_req__finish(req, STREAM_REQ_DONE); stream_req__finish(req, STREAM_REQ_DONE);
} else { } else {
if (req->exec) {
free(req->userdata);
req->userdata = NULL;
req->exec = NULL;
}
char* p = req->payload->payload; char* p = req->payload->payload;
size_t s = req->payload->size; size_t s = req->payload->size;
memmove(p, p + s + bytes_left, -bytes_left); memmove(p, p + s + bytes_left, -bytes_left);
@ -213,12 +224,31 @@ static int stream_tcp_send_first(struct stream* self, struct rcbuf* payload)
return stream_tcp__flush(self); return stream_tcp__flush(self);
} }
static void stream_tcp_exec_and_send(struct stream* self,
stream_exec_fn exec_fn, void* userdata)
{
if (self->state == STREAM_STATE_CLOSED)
return;
struct stream_req* req = calloc(1, sizeof(*req));
if (!req)
return;
req->exec = exec_fn;
req->userdata = userdata;
TAILQ_INSERT_TAIL(&self->send_queue, req, link);
stream_tcp__flush(self);
}
static struct stream_impl impl = { static struct stream_impl impl = {
.close = stream_tcp_close, .close = stream_tcp_close,
.destroy = stream_tcp_destroy, .destroy = stream_tcp_destroy,
.read = stream_tcp_read, .read = stream_tcp_read,
.send = stream_tcp_send, .send = stream_tcp_send,
.send_first = stream_tcp_send_first, .send_first = stream_tcp_send_first,
.exec_and_send = stream_tcp_exec_and_send,
}; };
struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata) struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata)

View File

@ -17,6 +17,7 @@
#include "stream.h" #include "stream.h"
#include "stream-common.h" #include "stream-common.h"
#include "websocket.h" #include "websocket.h"
#include "vec.h"
#include "neatvnc.h" #include "neatvnc.h"
#include <assert.h> #include <assert.h>
@ -30,6 +31,11 @@ enum stream_ws_state {
STREAM_WS_STATE_READY, STREAM_WS_STATE_READY,
}; };
struct stream_ws_exec_ctx {
stream_exec_fn exec;
void* userdata;
};
struct stream_ws { struct stream_ws {
struct stream base; struct stream base;
enum stream_ws_state ws_state; enum stream_ws_state ws_state;
@ -155,11 +161,6 @@ static ssize_t stream_ws_read_frame(struct stream_ws* ws, void* dst,
return 0; return 0;
} }
nvnc_trace("Got frame header: opcode=%s, header-len: %zu, payload-len: %zu, read-buffer-len: %zu",
ws_opcode_name(ws->header.opcode),
ws->header.header_length, ws->header.payload_length,
ws->read_index);
if (ws->header.opcode != WS_OPCODE_CONT) { if (ws->header.opcode != WS_OPCODE_CONT) {
ws->current_opcode = ws->header.opcode; ws->current_opcode = ws->header.opcode;
} }
@ -250,6 +251,43 @@ static int stream_ws_send(struct stream* self, struct rcbuf* payload,
return stream_send(ws->tcp_stream, payload, on_done, userdata); return stream_send(ws->tcp_stream, payload, on_done, userdata);
} }
static struct rcbuf* stream_ws_chained_exec(struct stream* tcp_stream,
void* userdata)
{
struct stream_ws_exec_ctx* ctx = userdata;
struct stream_ws* ws = tcp_stream->userdata;
struct rcbuf* buf = ctx->exec(&ws->base, ctx->userdata);
struct vec out;
vec_init(&out, WS_HEADER_MIN_SIZE + buf->size + 1);
struct ws_frame_header head = {
.fin = true,
.opcode = WS_OPCODE_BIN,
.payload_length = buf->size,
};
int head_len = ws_write_frame_header(out.data, &head);
out.len += head_len;
vec_append(&out, buf->payload, buf->size);
return rcbuf_new(out.data, out.len);
}
static void stream_ws_exec_and_send(struct stream* self, stream_exec_fn exec,
void* userdata)
{
struct stream_ws* ws = (struct stream_ws*)self;
struct stream_ws_exec_ctx* ctx = calloc(1, sizeof(*ctx));
assert(ctx);
ctx->exec = exec;
ctx->userdata = userdata;
stream_exec_and_send(ws->tcp_stream, stream_ws_chained_exec, ctx);
}
static void stream_ws_event(struct stream* self, enum stream_event event) static void stream_ws_event(struct stream* self, enum stream_event event)
{ {
struct stream_ws* ws = self->userdata; struct stream_ws* ws = self->userdata;
@ -266,6 +304,7 @@ static struct stream_impl impl = {
.destroy = stream_ws_destroy, .destroy = stream_ws_destroy,
.read = stream_ws_read, .read = stream_ws_read,
.send = stream_ws_send, .send = stream_ws_send,
.exec_and_send = stream_ws_exec_and_send,
}; };
struct stream* stream_ws_new(int fd, stream_event_fn on_event, void* userdata) struct stream* stream_ws_new(int fd, stream_event_fn on_event, void* userdata)

View File

@ -55,3 +55,13 @@ ssize_t stream_read(struct stream* self, void* dst, size_t size)
assert(self->impl && self->impl->read); assert(self->impl && self->impl->read);
return self->impl->read(self, dst, size); return self->impl->read(self, dst, size);
} }
void stream_exec_and_send(struct stream* self, stream_exec_fn exec_fn,
void* userdata)
{
assert(self->impl);
if (self->impl->exec_and_send)
self->impl->exec_and_send(self, exec_fn, userdata);
else
stream_send(self, exec_fn(self, userdata), NULL, NULL);
}