stream: Add a cork to pause sending

websockets
Andri Yngvason 2023-04-07 10:50:10 +00:00
parent 979d10ce62
commit e385a98238
3 changed files with 30 additions and 0 deletions

View File

@ -21,6 +21,7 @@
#include "rcbuf.h" #include "rcbuf.h"
#include <stdint.h> #include <stdint.h>
#include <stdbool.h>
#ifdef ENABLE_TLS #ifdef ENABLE_TLS
#include <gnutls/gnutls.h> #include <gnutls/gnutls.h>
@ -65,6 +66,7 @@ struct stream_impl {
ssize_t (*read)(struct stream*, void* dst, size_t size); ssize_t (*read)(struct stream*, void* dst, size_t size);
int (*send)(struct stream*, struct rcbuf* payload, int (*send)(struct stream*, struct rcbuf* payload,
stream_req_fn on_done, void* userdata); stream_req_fn on_done, void* userdata);
int (*send_first)(struct stream*, struct rcbuf* payload);
}; };
struct stream { struct stream {
@ -85,6 +87,8 @@ struct stream {
uint32_t bytes_sent; uint32_t bytes_sent;
uint32_t bytes_received; uint32_t bytes_received;
bool cork;
}; };
struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata); struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata);
@ -95,6 +99,7 @@ int stream_write(struct stream* self, const void* payload, size_t len,
stream_req_fn on_done, void* userdata); stream_req_fn on_done, void* userdata);
int stream_send(struct stream* self, struct rcbuf* payload, int stream_send(struct stream* self, struct rcbuf* payload,
stream_req_fn on_done, void* userdata); stream_req_fn on_done, void* userdata);
int stream_send_first(struct stream* self, struct rcbuf* payload);
#ifdef ENABLE_TLS #ifdef ENABLE_TLS
int stream_upgrade_to_tls(struct stream* self, void* context); int stream_upgrade_to_tls(struct stream* self, void* context);

View File

@ -60,6 +60,9 @@ static void stream_tcp_destroy(struct stream* self)
static int stream_tcp__flush(struct stream* self) static int stream_tcp__flush(struct stream* self)
{ {
if (self->cork)
return 0;
static struct iovec iov[IOV_MAX]; static struct iovec iov[IOV_MAX];
size_t n_msgs = 0; size_t n_msgs = 0;
ssize_t bytes_sent; ssize_t bytes_sent;
@ -195,11 +198,27 @@ static int stream_tcp_send(struct stream* self, struct rcbuf* payload,
return stream_tcp__flush(self); return stream_tcp__flush(self);
} }
static int stream_tcp_send_first(struct stream* self, struct rcbuf* payload)
{
if (self->state == STREAM_STATE_CLOSED)
return -1;
struct stream_req* req = calloc(1, sizeof(*req));
if (!req)
return -1;
req->payload = payload;
TAILQ_INSERT_HEAD(&self->send_queue, req, link);
return stream_tcp__flush(self);
}
static struct stream_impl impl = { static struct stream_impl impl = {
.close = stream_tcp_close, .close = stream_tcp_close,
.destroy = stream_tcp_destroy, .destroy = stream_tcp_destroy,
.read = stream_tcp_read, .read = stream_tcp_read,
.send = stream_tcp_send, .send = stream_tcp_send,
.send_first = stream_tcp_send_first,
}; };
struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata) struct stream* stream_new(int fd, stream_event_fn on_event, void* userdata)

View File

@ -37,6 +37,12 @@ int stream_send(struct stream* self, struct rcbuf* payload,
return self->impl->send(self, payload, on_done, userdata); return self->impl->send(self, payload, on_done, userdata);
} }
int stream_send_first(struct stream* self, struct rcbuf* payload)
{
assert(self->impl && self->impl->send);
return self->impl->send_first(self, payload);
}
int stream_write(struct stream* self, const void* payload, size_t len, int stream_write(struct stream* self, const void* payload, size_t len,
stream_req_fn on_done, void* userdata) stream_req_fn on_done, void* userdata)
{ {