QUIC: introduced explicit stream states.

This allows to eliminate the usage of stream connection event flags for tracking
stream state.
This commit is contained in:
Roman Arutyunyan 2022-01-31 09:46:02 +03:00
parent 6850f6e935
commit 6e7c3ad42c
3 changed files with 117 additions and 75 deletions

View File

@ -28,6 +28,26 @@
#define NGX_QUIC_STREAM_UNIDIRECTIONAL 0x02
typedef enum {
NGX_QUIC_STREAM_SEND_READY = 0,
NGX_QUIC_STREAM_SEND_SEND,
NGX_QUIC_STREAM_SEND_DATA_SENT,
NGX_QUIC_STREAM_SEND_DATA_RECVD,
NGX_QUIC_STREAM_SEND_RESET_SENT,
NGX_QUIC_STREAM_SEND_RESET_RECVD
} ngx_quic_stream_send_state_e;
typedef enum {
NGX_QUIC_STREAM_RECV_RECV = 0,
NGX_QUIC_STREAM_RECV_SIZE_KNOWN,
NGX_QUIC_STREAM_RECV_DATA_RECVD,
NGX_QUIC_STREAM_RECV_DATA_READ,
NGX_QUIC_STREAM_RECV_RESET_RECVD,
NGX_QUIC_STREAM_RECV_RESET_READ
} ngx_quic_stream_recv_state_e;
typedef struct {
ngx_ssl_t *ssl;
@ -66,6 +86,8 @@ struct ngx_quic_stream_s {
ngx_chain_t *in;
ngx_chain_t *out;
ngx_uint_t cancelable; /* unsigned cancelable:1; */
ngx_quic_stream_send_state_e send_state;
ngx_quic_stream_recv_state_e recv_state;
};

View File

@ -617,10 +617,13 @@ ngx_quic_resend_frames(ngx_connection_t *c, ngx_quic_send_ctx_t *ctx)
case NGX_QUIC_FT_STREAM:
qs = ngx_quic_find_stream(&qc->streams.tree, f->u.stream.stream_id);
if (qs && qs->connection->write->error) {
/* RESET_STREAM was sent */
ngx_quic_free_frame(c, f);
break;
if (qs) {
if (qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
|| qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
{
ngx_quic_free_frame(c, f);
break;
}
}
/* fall through */

View File

@ -192,12 +192,13 @@ ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
{
qs = (ngx_quic_stream_t *) node;
qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
rev = qs->connection->read;
rev->error = 1;
rev->ready = 1;
wev = qs->connection->write;
wev->error = 1;
wev->ready = 1;
ngx_post_event(rev, &ngx_posted_events);
@ -221,19 +222,22 @@ ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc)
ngx_int_t
ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
{
ngx_event_t *wev;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
wev = c->write;
qs = c->quic;
if (wev->error) {
if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD
|| qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT
|| qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD)
{
return NGX_OK;
}
qs = c->quic;
qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@ -250,9 +254,6 @@ ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
ngx_quic_queue_frame(qc, frame);
wev->error = 1;
wev->ready = 1;
return NGX_OK;
}
@ -260,27 +261,15 @@ ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err)
ngx_int_t
ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
{
ngx_quic_stream_t *qs;
qs = c->quic;
if (how == NGX_RDWR_SHUTDOWN || how == NGX_WRITE_SHUTDOWN) {
if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED)
|| (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
{
if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
return NGX_ERROR;
}
if (ngx_quic_shutdown_stream_send(c) != NGX_OK) {
return NGX_ERROR;
}
}
if (how == NGX_RDWR_SHUTDOWN || how == NGX_READ_SHUTDOWN) {
if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
|| (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
{
if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
return NGX_ERROR;
}
if (ngx_quic_shutdown_stream_recv(c) != NGX_OK) {
return NGX_ERROR;
}
}
@ -291,19 +280,21 @@ ngx_quic_shutdown_stream(ngx_connection_t *c, int how)
static ngx_int_t
ngx_quic_shutdown_stream_send(ngx_connection_t *c)
{
ngx_event_t *wev;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
wev = c->write;
qs = c->quic;
if (wev->error) {
if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
&& qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
{
return NGX_OK;
}
qs = c->quic;
qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@ -327,8 +318,6 @@ ngx_quic_shutdown_stream_send(ngx_connection_t *c)
ngx_quic_queue_frame(qc, frame);
wev->error = 1;
return NGX_OK;
}
@ -336,19 +325,19 @@ ngx_quic_shutdown_stream_send(ngx_connection_t *c)
static ngx_int_t
ngx_quic_shutdown_stream_recv(ngx_connection_t *c)
{
ngx_event_t *rev;
ngx_connection_t *pc;
ngx_quic_frame_t *frame;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
rev = c->read;
qs = c->quic;
if (rev->pending_eof || rev->error) {
if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
&& qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
{
return NGX_OK;
}
qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@ -371,8 +360,6 @@ ngx_quic_shutdown_stream_recv(ngx_connection_t *c)
ngx_quic_queue_frame(qc, frame);
rev->error = 1;
return NGX_OK;
}
@ -690,9 +677,13 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
if (id & NGX_QUIC_STREAM_UNIDIRECTIONAL) {
if (id & NGX_QUIC_STREAM_SERVER_INITIATED) {
qs->send_max_data = qc->ctp.initial_max_stream_data_uni;
qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
qs->send_state = NGX_QUIC_STREAM_SEND_READY;
} else {
qs->recv_max_data = qc->tp.initial_max_stream_data_uni;
qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
qs->send_state = NGX_QUIC_STREAM_SEND_DATA_RECVD;
}
} else {
@ -704,6 +695,9 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
qs->send_max_data = qc->ctp.initial_max_stream_data_bidi_local;
qs->recv_max_data = qc->tp.initial_max_stream_data_bidi_remote;
}
qs->recv_state = NGX_QUIC_STREAM_RECV_RECV;
qs->send_state = NGX_QUIC_STREAM_SEND_READY;
}
qs->recv_window = qs->recv_max_data;
@ -744,25 +738,19 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
pc = qs->parent;
rev = c->read;
if (rev->error) {
if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
|| qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
{
qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_READ;
rev->error = 1;
return NGX_ERROR;
}
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv eof:%d buf:%uz",
qs->id, rev->pending_eof, size);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv buf:%uz", qs->id, size);
if (qs->in == NULL || qs->in->buf->sync) {
rev->ready = 0;
if (qs->recv_offset == qs->final_size) {
rev->eof = 1;
return 0;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv() not ready", qs->id);
return NGX_AGAIN;
if (size == 0) {
return 0;
}
in = ngx_quic_read_chain(pc, &qs->in, size);
@ -780,8 +768,23 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_quic_free_chain(pc, in);
if (qs->in == NULL) {
rev->ready = rev->pending_eof;
if (len == 0) {
rev->ready = 0;
if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN
&& qs->recv_offset == qs->final_size)
{
qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ;
}
if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_READ) {
rev->eof = 1;
return 0;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv() not ready", qs->id);
return NGX_AGAIN;
}
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
@ -839,10 +842,15 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
qc = ngx_quic_get_connection(pc);
wev = c->write;
if (wev->error) {
if (qs->send_state != NGX_QUIC_STREAM_SEND_READY
&& qs->send_state != NGX_QUIC_STREAM_SEND_SEND)
{
wev->error = 1;
return NGX_CHAIN_ERROR;
}
qs->send_state = NGX_QUIC_STREAM_SEND_SEND;
flow = ngx_quic_max_stream_flow(c);
if (flow == 0) {
wev->ready = 0;
@ -1051,9 +1059,9 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
sc = qs->connection;
rev = sc->read;
if (rev->error) {
if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV
&& qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN)
{
return NGX_OK;
}
@ -1086,8 +1094,8 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
return NGX_ERROR;
}
rev->pending_eof = 1;
qs->final_size = last;
qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN;
}
if (ngx_quic_write_chain(c, &qs->in, frame->data, f->length,
@ -1098,6 +1106,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
}
if (f->offset == qs->recv_offset) {
rev = sc->read;
rev->ready = 1;
if (rev->active) {
@ -1273,11 +1282,15 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
return NGX_OK;
}
sc = qs->connection;
if (qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_RECVD
|| qs->recv_state == NGX_QUIC_STREAM_RECV_RESET_READ)
{
return NGX_OK;
}
rev = sc->read;
rev->error = 1;
rev->ready = 1;
qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD;
sc = qs->connection;
if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
return NGX_ERROR;
@ -1299,6 +1312,9 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
return NGX_ERROR;
}
rev = sc->read;
rev->ready = 1;
if (rev->active) {
ngx_post_event(rev, &ngx_posted_events);
}
@ -1341,6 +1357,7 @@ ngx_quic_handle_stop_sending_frame(ngx_connection_t *c,
wev = qs->connection->write;
if (wev->active) {
wev->ready = 1;
ngx_post_event(wev, &ngx_posted_events);
}
@ -1413,11 +1430,9 @@ static ngx_int_t
ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
{
uint64_t len;
ngx_event_t *rev;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
rev = c->read;
qs = c->quic;
qc = ngx_quic_get_connection(qs->parent);
@ -1434,7 +1449,9 @@ ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
qs->recv_last += len;
if (!rev->error && qs->recv_last > qs->recv_max_data) {
if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV
&& qs->recv_last > qs->recv_max_data)
{
qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
return NGX_ERROR;
}
@ -1454,12 +1471,10 @@ static ngx_int_t
ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
{
uint64_t len;
ngx_event_t *rev;
ngx_connection_t *pc;
ngx_quic_stream_t *qs;
ngx_quic_connection_t *qc;
rev = c->read;
qs = c->quic;
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
@ -1475,9 +1490,7 @@ ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
qs->recv_offset += len;
if (!rev->pending_eof && !rev->error
&& qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2)
{
if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) {
if (ngx_quic_update_max_stream_data(c) != NGX_OK) {
return NGX_ERROR;
}
@ -1510,6 +1523,10 @@ ngx_quic_update_max_stream_data(ngx_connection_t *c)
pc = qs->parent;
qc = ngx_quic_get_connection(pc);
if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV) {
return NGX_OK;
}
recv_max_data = qs->recv_offset + qs->recv_window;
if (qs->recv_max_data == recv_max_data) {