From e385a982383f31e57b93dd4c4e18ce241f383b6b Mon Sep 17 00:00:00 2001 From: Andri Yngvason Date: Fri, 7 Apr 2023 10:50:10 +0000 Subject: [PATCH] stream: Add a cork to pause sending --- include/stream.h | 5 +++++ src/stream-tcp.c | 19 +++++++++++++++++++ src/stream.c | 6 ++++++ 3 files changed, 30 insertions(+) diff --git a/include/stream.h b/include/stream.h index f166745..f8ee239 100644 --- a/include/stream.h +++ b/include/stream.h @@ -21,6 +21,7 @@ #include "rcbuf.h" #include +#include #ifdef ENABLE_TLS #include @@ -65,6 +66,7 @@ struct stream_impl { ssize_t (*read)(struct stream*, void* dst, size_t size); int (*send)(struct stream*, struct rcbuf* payload, stream_req_fn on_done, void* userdata); + int (*send_first)(struct stream*, struct rcbuf* payload); }; struct stream { @@ -85,6 +87,8 @@ struct stream { uint32_t bytes_sent; uint32_t bytes_received; + + bool cork; }; struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata); @@ -95,6 +99,7 @@ int stream_write(struct stream* self, const void* payload, size_t len, stream_req_fn on_done, void* userdata); int stream_send(struct stream* self, struct rcbuf* payload, stream_req_fn on_done, void* userdata); +int stream_send_first(struct stream* self, struct rcbuf* payload); #ifdef ENABLE_TLS int stream_upgrade_to_tls(struct stream* self, void* context); diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 7cff31f..a6c2c42 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -60,6 +60,9 @@ static void stream_tcp_destroy(struct stream* self) static int stream_tcp__flush(struct stream* self) { + if (self->cork) + return 0; + static struct iovec iov[IOV_MAX]; size_t n_msgs = 0; ssize_t bytes_sent; @@ -195,11 +198,27 @@ static int stream_tcp_send(struct stream* self, struct rcbuf* payload, return stream_tcp__flush(self); } +static int stream_tcp_send_first(struct stream* self, struct rcbuf* payload) +{ + if (self->state == STREAM_STATE_CLOSED) + return -1; + + struct stream_req* req = calloc(1, sizeof(*req)); + if (!req) + return -1; + + req->payload = payload; + TAILQ_INSERT_HEAD(&self->send_queue, req, link); + + return stream_tcp__flush(self); +} + static struct stream_impl impl = { .close = stream_tcp_close, .destroy = stream_tcp_destroy, .read = stream_tcp_read, .send = stream_tcp_send, + .send_first = stream_tcp_send_first, }; struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata) diff --git a/src/stream.c b/src/stream.c index 7f762e1..ba9cd56 100644 --- a/src/stream.c +++ b/src/stream.c @@ -37,6 +37,12 @@ int stream_send(struct stream* self, struct rcbuf* payload, return self->impl->send(self, payload, on_done, userdata); } +int stream_send_first(struct stream* self, struct rcbuf* payload) +{ + assert(self->impl && self->impl->send); + return self->impl->send_first(self, payload); +} + int stream_write(struct stream* self, const void* payload, size_t len, stream_req_fn on_done, void* userdata) {