QUIC: stream lingering.

Now ngx_quic_stream_t is decoupled from ngx_connection_t in a way that it
can persist after connection is closed by application.  During this period,
server is expecting stream final size from client for correct flow control.
Also, buffered output is sent to client as more flow control credit is granted.
This commit is contained in:
Roman Arutyunyan 2022-02-05 12:54:54 +03:00
parent 6e67500606
commit 28919d3e59
6 changed files with 272 additions and 193 deletions

View File

@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_quic_conf_t *conf,
ctp->active_connection_id_limit = 2;
ngx_queue_init(&qc->streams.uninitialized);
ngx_queue_init(&qc->streams.free);
qc->streams.recv_max_data = qc->tp.initial_max_data;
qc->streams.recv_window = qc->streams.recv_max_data;

View File

@ -78,12 +78,14 @@ struct ngx_quic_stream_s {
uint64_t id;
uint64_t acked;
uint64_t send_max_data;
uint64_t send_offset;
uint64_t send_final_size;
uint64_t recv_max_data;
uint64_t recv_offset;
uint64_t recv_window;
uint64_t recv_last;
uint64_t recv_size;
uint64_t final_size;
uint64_t recv_final_size;
ngx_chain_t *in;
ngx_chain_t *out;
ngx_uint_t cancelable; /* unsigned cancelable:1; */

View File

@ -114,13 +114,16 @@ struct ngx_quic_socket_s {
typedef struct {
ngx_rbtree_t tree;
ngx_rbtree_node_t sentinel;
ngx_queue_t uninitialized;
ngx_queue_t free;
uint64_t sent;
uint64_t recv_offset;
uint64_t recv_window;
uint64_t recv_last;
uint64_t recv_max_data;
uint64_t send_offset;
uint64_t send_max_data;
uint64_t server_max_streams_uni;

View File

@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c, ngx_quic_frame_t *f, size_t len)
return NGX_ERROR;
}
if (f->type == NGX_QUIC_FT_STREAM) {
f->u.stream.fin = 0;
}
ngx_queue_insert_after(&f->queue, &nf->queue);
return NGX_OK;

View File

@ -13,6 +13,8 @@
#define NGX_QUIC_STREAM_GONE (void *) -1
static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs,
ngx_uint_t err);
static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c);
static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c);
static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id);
@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf,
size_t size);
static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
ngx_chain_t *in, off_t limit);
static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs);
static void ngx_quic_stream_cleanup_handler(void *data);
static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c);
static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs);
static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last);
static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last);
static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs);
static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c);
static void ngx_quic_set_event(ngx_event_t *ev);
@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
ns = 0;
#endif
for (node = ngx_rbtree_min(tree->root, tree->sentinel);
node;
node = ngx_rbtree_next(tree, node))
{
node = ngx_rbtree_min(tree->root, tree->sentinel);
while (node) {
qs = (ngx_quic_stream_t *) node;
node = ngx_rbtree_next(tree, node);
qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
if (qs->connection == NULL) {
ngx_quic_close_stream(qs);
continue;
}
ngx_quic_set_event(qs->connection->read);
ngx_quic_set_event(qs->connection->write);
@ -212,14 +220,18 @@ ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
ngx_int_t
ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
{
return ngx_quic_do_reset_stream(c->quic, err);
}
static ngx_int_t
ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err)
{
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
qs = c->quic;
if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
|| qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
|| qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
}
qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
qs->send_final_size = qs->send_offset;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL reset", qs->id);
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_ERROR;
@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
frame->type = NGX_QUIC_FT_RESET_STREAM;
frame->u.reset_stream.id = qs->id;
frame->u.reset_stream.error_code = err;
frame->u.reset_stream.final_size = c->sent;
frame->u.reset_stream.final_size = qs->send_offset;
ngx_quic_queue_frame(qc, frame);
ngx_quic_free_chain(pc, qs->out);
qs->out = NULL;
return NGX_OK;
}
@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
static ngx_int_t
ngx_quic_shutdown_stream_send(ngx_connection_t *c)
{
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_quic_stream_t *qs;
qs = c->quic;
@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connection_t *c)
return NGX_OK;
}
qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
qs->send_final_size = c->sent;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
"quic stream id:0x%xL send shutdown", qs->id);
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_STREAM;
frame->u.stream.off = 1;
frame->u.stream.len = 1;
frame->u.stream.fin = 1;
frame->u.stream.stream_id = qs->id;
frame->u.stream.offset = c->sent;
frame->u.stream.length = 0;
ngx_quic_queue_frame(qc, frame);
return NGX_OK;
return ngx_quic_stream_flush(qs);
}
@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connection_t *c)
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL recv shutdown", qs->id);
frame->level = ssl_encryption_application;
@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
{
ngx_log_t *log;
ngx_pool_t *pool;
ngx_queue_t *q;
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
ngx_pool_cleanup_t *cln;
@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
qc = ngx_quic_get_connection(c);
pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
if (pool == NULL) {
return NULL;
if (!ngx_queue_empty(&qc->streams.free)) {
q = ngx_queue_head(&qc->streams.free);
qs = ngx_queue_data(q, ngx_quic_stream_t, queue);
ngx_queue_remove(&qs->queue);
} else {
/*
* the number of streams is limited by transport
* parameters and application requirements
*/
qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t));
if (qs == NULL) {
return NULL;
}
}
qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t));
if (qs == NULL) {
ngx_destroy_pool(pool);
return NULL;
}
ngx_memzero(qs, sizeof(ngx_quic_stream_t));
qs->node.key = id;
qs->parent = c;
qs->id = id;
qs->final_size = (uint64_t) -1;
qs->send_final_size = (uint64_t) -1;
qs->recv_final_size = (uint64_t) -1;
pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log);
if (pool == NULL) {
ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
return NULL;
}
log = ngx_palloc(pool, sizeof(ngx_log_t));
if (log == NULL) {
ngx_destroy_pool(pool);
ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
return NULL;
}
@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
sc = ngx_get_connection(c->fd, log);
if (sc == NULL) {
ngx_destroy_pool(pool);
ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
return NULL;
}
@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
if (cln == NULL) {
ngx_close_connection(sc);
ngx_destroy_pool(pool);
ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
return NULL;
}
@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
return NGX_ERROR;
}
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL recv buf:%uz", qs->id, size);
if (size == 0) {
@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
rev->ready = 0;
if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD
&& qs->recv_offset == qs->final_size)
&& qs->recv_offset == qs->recv_final_size)
{
qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
}
@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv len:%z", qs->id, len);
if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) {
return NGX_ERROR;
}
@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
off_t flow;
size_t n;
ngx_event_t *wev;
ngx_chain_t *out;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
flow = ngx_quic_max_stream_flow(c);
flow = qs->acked + qc->conf->stream_buffer_size - c->sent;
if (flow == 0) {
wev->ready = 0;
return in;
@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
limit = flow;
}
in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n);
in = ngx_quic_write_chain(pc, &qs->out, in, limit,
c->sent - qs->send_offset, &n);
if (in == NGX_CHAIN_ERROR) {
return NGX_CHAIN_ERROR;
}
out = ngx_quic_read_chain(pc, &qs->out, n);
if (out == NGX_CHAIN_ERROR) {
return NGX_CHAIN_ERROR;
}
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_CHAIN_ERROR;
}
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_STREAM;
frame->data = out;
frame->u.stream.off = 1;
frame->u.stream.len = 1;
frame->u.stream.fin = 0;
frame->u.stream.stream_id = qs->id;
frame->u.stream.offset = c->sent;
frame->u.stream.length = n;
c->sent += n;
qc->streams.sent += n;
ngx_quic_queue_frame(qc, frame);
if (flow == (off_t) n) {
wev->ready = 0;
}
@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send_chain sent:%uz", n);
if (ngx_quic_stream_flush(qs) != NGX_OK) {
return NGX_CHAIN_ERROR;
}
return in;
}
static size_t
ngx_quic_max_stream_flow(ngx_connection_t *c)
static ngx_int_t
ngx_quic_stream_flush(ngx_quic_stream_t *qs)
{
size_t size;
uint64_t sent, unacked;
ngx_quic_stream_t *qs;
off_t limit;
size_t len;
ngx_uint_t last;
ngx_chain_t *out, *cl;
ngx_quic_frame_t *frame;
ngx_connection_t *pc;
ngx_quic_connection_t *qc;
qs = c->quic;
qc = ngx_quic_get_connection(qs->parent);
if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) {
return NGX_OK;
}
size = qc->conf->stream_buffer_size;
sent = c->sent;
unacked = sent - qs->acked;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
if (qc->streams.send_max_data == 0) {
qc->streams.send_max_data = qc->ctp.initial_max_data;
}
if (unacked >= size) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow hit buffer size");
return 0;
limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset,
qs->send_max_data - qs->send_offset);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL flush limit:%O", qs->id, limit);
out = ngx_quic_read_chain(pc, &qs->out, limit);
if (out == NGX_CHAIN_ERROR) {
return NGX_ERROR;
}
size -= unacked;
len = 0;
last = 0;
if (qc->streams.sent >= qc->streams.send_max_data) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow hit MAX_DATA");
return 0;
for (cl = out; cl; cl = cl->next) {
len += cl->buf->last - cl->buf->pos;
}
if (qc->streams.sent + size > qc->streams.send_max_data) {
size = qc->streams.send_max_data - qc->streams.sent;
if (qs->send_final_size != (uint64_t) -1
&& qs->send_final_size == qs->send_offset + len)
{
qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
last = 1;
}
if (sent >= qs->send_max_data) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow hit MAX_STREAM_DATA");
return 0;
if (len == 0 && !last) {
return NGX_OK;
}
if (sent + size > qs->send_max_data) {
size = qs->send_max_data - sent;
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic send flow:%uz", size);
frame->level = ssl_encryption_application;
frame->type = NGX_QUIC_FT_STREAM;
frame->data = out;
return size;
frame->u.stream.off = 1;
frame->u.stream.len = 1;
frame->u.stream.fin = last;
frame->u.stream.stream_id = qs->id;
frame->u.stream.offset = qs->send_offset;
frame->u.stream.length = len;
ngx_quic_queue_frame(qc, frame);
qs->send_offset += len;
qc->streams.send_offset += len;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL flush len:%uz last:%ui",
qs->id, len, last);
if (qs->connection == NULL) {
return ngx_quic_close_stream(qs);
}
return NGX_OK;
}
@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *data)
{
ngx_connection_t *c = data;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_quic_stream_t *qs;
qs = c->quic;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0,
"quic stream id:0x%xL cleanup", qs->id);
if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) {
ngx_quic_close_connection(c, NGX_ERROR);
return;
}
qs->connection = NULL;
if (ngx_quic_close_stream(qs) != NGX_OK) {
ngx_quic_close_connection(c, NGX_ERROR);
return;
}
}
static ngx_int_t
ngx_quic_close_stream(ngx_quic_stream_t *qs)
{
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_connection_t *qc;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL cleanup", qs->id);
if (!qc->closing) {
/* make sure everything is sent and final size is received */
if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
|| qs->send_state == NGX_QUIC_STREAM_SEND_READY
|| qs->send_state == NGX_QUIC_STREAM_SEND_SEND)
{
return NGX_OK;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL close", qs->id);
ngx_rbtree_delete(&qc->streams.tree, &qs->node);
ngx_quic_free_chain(pc, qs->in);
ngx_quic_free_chain(pc, qs->out);
ngx_rbtree_delete(&qc->streams.tree, &qs->node);
ngx_queue_insert_tail(&qc->streams.free, &qs->queue);
if (qc->closing) {
/* schedule handler call to continue ngx_quic_close_connection() */
ngx_post_event(pc->read, &ngx_posted_events);
return;
return NGX_OK;
}
if (qc->error) {
goto done;
}
(void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN);
(void) ngx_quic_update_flow(c, qs->recv_last);
if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) {
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {
goto done;
return NGX_ERROR;
}
frame->level = ssl_encryption_application;
@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *data)
ngx_quic_queue_frame(qc, frame);
}
done:
(void) ngx_quic_output(pc);
if (qc->shutdown) {
ngx_post_event(pc->read, &ngx_posted_events);
}
return NGX_OK;
}
@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
{
size_t size;
uint64_t last;
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
ngx_quic_stream_frame_t *f;
@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
return NGX_OK;
}
sc = qs->connection;
if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
&& qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
{
return NGX_OK;
}
if (ngx_quic_control_flow(sc, last) != NGX_OK) {
if (ngx_quic_control_flow(qs, last) != NGX_OK) {
return NGX_ERROR;
}
if (qs->final_size != (uint64_t) -1 && last > qs->final_size) {
if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR;
}
@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
}
if (f->fin) {
if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last)
{
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR;
}
@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
return NGX_ERROR;
}
qs->final_size = last;
qs->recv_final_size = last;
qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
}
@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
qs->recv_size += size;
if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
&& qs->recv_size == qs->final_size)
&& qs->recv_size == qs->recv_final_size)
{
qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD;
}
if (qs->connection == NULL) {
return ngx_quic_close_stream(qs);
}
if (f->offset == qs->recv_offset) {
ngx_quic_set_event(sc->read);
ngx_quic_set_event(qs->connection->read);
}
return NGX_OK;
@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_connection_t *c,
return NGX_OK;
}
if (tree->root != tree->sentinel
&& qc->streams.sent >= qc->streams.send_max_data)
if (tree->root == tree->sentinel
|| qc->streams.send_offset < qc->streams.send_max_data)
{
for (node = ngx_rbtree_min(tree->root, tree->sentinel);
node;
node = ngx_rbtree_next(tree, node))
{
qs = (ngx_quic_stream_t *) node;
ngx_quic_set_event(qs->connection->write);
}
/* not blocked on MAX_DATA */
qc->streams.send_max_data = f->max_data;
return NGX_OK;
}
qc->streams.send_max_data = f->max_data;
node = ngx_rbtree_min(tree->root, tree->sentinel);
while (node && qc->streams.send_offset < qc->streams.send_max_data) {
qs = (ngx_quic_stream_t *) node;
node = ngx_rbtree_next(tree, node);
if (ngx_quic_stream_flush(qs) != NGX_OK) {
return NGX_ERROR;
}
}
return NGX_OK;
}
@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
return NGX_OK;
}
return ngx_quic_update_max_stream_data(qs->connection);
return ngx_quic_update_max_stream_data(qs);
}
@ -1197,7 +1258,6 @@ ngx_int_t
ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f)
{
uint64_t sent;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c,
return NGX_OK;
}
sent = qs->connection->sent;
if (sent >= qs->send_max_data) {
ngx_quic_set_event(qs->connection->write);
if (qs->send_offset < qs->send_max_data) {
/* not blocked on MAX_STREAM_DATA */
qs->send_max_data = f->limit;
return NGX_OK;
}
qs->send_max_data = f->limit;
return NGX_OK;
return ngx_quic_stream_flush(qs);
}
@ -1240,7 +1300,6 @@ ngx_int_t
ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
{
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
sc = qs->connection;
if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) {
return NGX_ERROR;
}
if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
if (qs->recv_final_size != (uint64_t) -1
&& qs->recv_final_size != f->final_size)
{
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR;
}
@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
return NGX_ERROR;
}
qs->final_size = f->final_size;
qs->recv_final_size = f->final_size;
if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) {
return NGX_ERROR;
}
if (qs->connection == NULL) {
return ngx_quic_close_stream(qs);
}
ngx_quic_set_event(qs->connection->read);
return NGX_OK;
@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
return NGX_OK;
}
if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) {
if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) {
return NGX_ERROR;
}
if (qs->connection == NULL) {
return ngx_quic_close_stream(qs);
}
ngx_quic_set_event(qs->connection->write);
return NGX_OK;
@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
return;
}
sent = qs->connection->sent;
unacked = sent - qs->acked;
if (unacked >= qc->conf->stream_buffer_size) {
ngx_quic_set_event(qs->connection->write);
if (qs->connection == NULL) {
qs->acked += f->u.stream.length;
return;
}
sent = qs->connection->sent;
unacked = sent - qs->acked;
qs->acked += f->u.stream.length;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0,
"quic stream ack len:%uL acked:%uL unacked:%uL",
f->u.stream.length, qs->acked, sent - qs->acked);
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL",
qs->id, f->u.stream.length, qs->acked, sent - qs->acked);
if (unacked != qc->conf->stream_buffer_size) {
/* not blocked on buffer size */
return;
}
ngx_quic_set_event(qs->connection->write);
}
static ngx_int_t
ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last)
{
uint64_t len;
ngx_quic_stream_t *qs;
ngx_connection_t *pc;
ngx_quic_connection_t *qc;
qs = c->quic;
qc = ngx_quic_get_connection(qs->parent);
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
if (last <= qs->recv_last) {
return NGX_OK;
@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
len = last - qs->recv_last;
ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic flow control msd:%uL/%uL md:%uL/%uL",
last, qs->recv_max_data, qc->streams.recv_last + len,
ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL",
qs->id, last, qs->recv_max_data, qc->streams.recv_last + len,
qc->streams.recv_max_data);
qs->recv_last += len;
@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
static ngx_int_t
ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last)
{
uint64_t len;
ngx_connection_t *pc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
len = last - qs->recv_offset;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic flow update %uL", last);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL flow update %uL", qs->id, last);
qs->recv_offset += len;
if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
if (ngx_quic_update_max_stream_data(qs) != NGX_OK) {
return NGX_ERROR;
}
}
@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
static ngx_int_t
ngx_quic_update_max_stream_data(ngx_connection_t *c)
ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs)
{
uint64_t recv_max_data;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_connection_t *c)
qs->recv_max_data = recv_max_data;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic flow update msd:%uL", qs->recv_max_data);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0,
"quic stream id:0x%xL flow update msd:%uL",
qs->id, qs->recv_max_data);
frame = ngx_quic_alloc_frame(pc);
if (frame == NULL) {

View File

@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_event_t *wev)
}
/* XXX async & buffered stream writes */
ngx_connection_t *
ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id)
{