Stream: filters.
This commit is contained in:
parent
f03438cca0
commit
bb3fcf3e3f
|
@ -973,7 +973,8 @@ if [ $STREAM != NO ]; then
|
|||
ngx_stream_core_module \
|
||||
ngx_stream_log_module \
|
||||
ngx_stream_proxy_module \
|
||||
ngx_stream_upstream_module"
|
||||
ngx_stream_upstream_module \
|
||||
ngx_stream_write_filter_module"
|
||||
ngx_module_incs="src/stream"
|
||||
ngx_module_deps="src/stream/ngx_stream.h \
|
||||
src/stream/ngx_stream_variables.h \
|
||||
|
@ -988,7 +989,8 @@ if [ $STREAM != NO ]; then
|
|||
src/stream/ngx_stream_log_module.c \
|
||||
src/stream/ngx_stream_proxy_module.c \
|
||||
src/stream/ngx_stream_upstream.c \
|
||||
src/stream/ngx_stream_upstream_round_robin.c"
|
||||
src/stream/ngx_stream_upstream_round_robin.c \
|
||||
src/stream/ngx_stream_write_filter_module.c"
|
||||
|
||||
. auto/module
|
||||
|
||||
|
|
|
@ -167,6 +167,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
|
|||
src/os/unix/ngx_send.c \
|
||||
src/os/unix/ngx_writev_chain.c \
|
||||
src/os/unix/ngx_udp_send.c \
|
||||
src/os/unix/ngx_udp_sendmsg_chain.c \
|
||||
src/os/unix/ngx_channel.c \
|
||||
src/os/unix/ngx_shmem.c \
|
||||
src/os/unix/ngx_process.c \
|
||||
|
|
|
@ -93,6 +93,8 @@ ngx_os_io_t ngx_iocp_io = {
|
|||
NULL,
|
||||
ngx_udp_overlapped_wsarecv,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
ngx_overlapped_wsasend_chain,
|
||||
0
|
||||
};
|
||||
|
|
|
@ -430,6 +430,7 @@ extern ngx_os_io_t ngx_io;
|
|||
#define ngx_send ngx_io.send
|
||||
#define ngx_send_chain ngx_io.send_chain
|
||||
#define ngx_udp_send ngx_io.udp_send
|
||||
#define ngx_udp_send_chain ngx_io.udp_send_chain
|
||||
|
||||
|
||||
#define NGX_EVENT_MODULE 0x544E5645 /* "EVNT" */
|
||||
|
|
|
@ -467,6 +467,7 @@ ngx_event_recvmsg(ngx_event_t *ev)
|
|||
*log = ls->log;
|
||||
|
||||
c->send = ngx_udp_send;
|
||||
c->send_chain = ngx_udp_send_chain;
|
||||
|
||||
c->log = log;
|
||||
c->pool->log = log;
|
||||
|
|
|
@ -166,6 +166,7 @@ ngx_event_connect_peer(ngx_peer_connection_t *pc)
|
|||
} else { /* type == SOCK_DGRAM */
|
||||
c->recv = ngx_udp_recv;
|
||||
c->send = ngx_send;
|
||||
c->send_chain = ngx_udp_send_chain;
|
||||
}
|
||||
|
||||
c->log_error = pc->log_error;
|
||||
|
|
|
@ -24,6 +24,7 @@ static ngx_os_io_t ngx_darwin_io = {
|
|||
ngx_udp_unix_recv,
|
||||
ngx_unix_send,
|
||||
ngx_udp_unix_send,
|
||||
ngx_udp_unix_sendmsg_chain,
|
||||
#if (NGX_HAVE_SENDFILE)
|
||||
ngx_darwin_sendfile_chain,
|
||||
NGX_IO_SENDFILE
|
||||
|
|
|
@ -33,6 +33,7 @@ static ngx_os_io_t ngx_freebsd_io = {
|
|||
ngx_udp_unix_recv,
|
||||
ngx_unix_send,
|
||||
ngx_udp_unix_send,
|
||||
ngx_udp_unix_sendmsg_chain,
|
||||
#if (NGX_HAVE_SENDFILE)
|
||||
ngx_freebsd_sendfile_chain,
|
||||
NGX_IO_SENDFILE
|
||||
|
|
|
@ -19,6 +19,7 @@ static ngx_os_io_t ngx_linux_io = {
|
|||
ngx_udp_unix_recv,
|
||||
ngx_unix_send,
|
||||
ngx_udp_unix_send,
|
||||
ngx_udp_unix_sendmsg_chain,
|
||||
#if (NGX_HAVE_SENDFILE)
|
||||
ngx_linux_sendfile_chain,
|
||||
NGX_IO_SENDFILE
|
||||
|
|
|
@ -29,6 +29,7 @@ typedef struct {
|
|||
ngx_recv_pt udp_recv;
|
||||
ngx_send_pt send;
|
||||
ngx_send_pt udp_send;
|
||||
ngx_send_chain_pt udp_send_chain;
|
||||
ngx_send_chain_pt send_chain;
|
||||
ngx_uint_t flags;
|
||||
} ngx_os_io_t;
|
||||
|
@ -49,6 +50,8 @@ ssize_t ngx_unix_send(ngx_connection_t *c, u_char *buf, size_t size);
|
|||
ngx_chain_t *ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in,
|
||||
off_t limit);
|
||||
ssize_t ngx_udp_unix_send(ngx_connection_t *c, u_char *buf, size_t size);
|
||||
ngx_chain_t *ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in,
|
||||
off_t limit);
|
||||
|
||||
|
||||
#if (IOV_MAX > 64)
|
||||
|
|
|
@ -25,6 +25,7 @@ ngx_os_io_t ngx_os_io = {
|
|||
ngx_udp_unix_recv,
|
||||
ngx_unix_send,
|
||||
ngx_udp_unix_send,
|
||||
ngx_udp_unix_sendmsg_chain,
|
||||
ngx_writev_chain,
|
||||
0
|
||||
};
|
||||
|
|
|
@ -20,6 +20,7 @@ static ngx_os_io_t ngx_solaris_io = {
|
|||
ngx_udp_unix_recv,
|
||||
ngx_unix_send,
|
||||
ngx_udp_unix_send,
|
||||
ngx_udp_unix_sendmsg_chain,
|
||||
#if (NGX_HAVE_SENDFILE)
|
||||
ngx_solaris_sendfilev_chain,
|
||||
NGX_IO_SENDFILE
|
||||
|
|
|
@ -0,0 +1,245 @@
|
|||
|
||||
/*
|
||||
* Copyright (C) Igor Sysoev
|
||||
* Copyright (C) Nginx, Inc.
|
||||
*/
|
||||
|
||||
|
||||
#include <ngx_config.h>
|
||||
#include <ngx_core.h>
|
||||
#include <ngx_event.h>
|
||||
|
||||
|
||||
static ngx_chain_t *ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec,
|
||||
ngx_chain_t *in, ngx_log_t *log);
|
||||
static ssize_t ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec);
|
||||
|
||||
|
||||
ngx_chain_t *
|
||||
ngx_udp_unix_sendmsg_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
|
||||
{
|
||||
ssize_t n;
|
||||
off_t send;
|
||||
ngx_chain_t *cl;
|
||||
ngx_event_t *wev;
|
||||
ngx_iovec_t vec;
|
||||
struct iovec iovs[NGX_IOVS_PREALLOCATE];
|
||||
|
||||
wev = c->write;
|
||||
|
||||
if (!wev->ready) {
|
||||
return in;
|
||||
}
|
||||
|
||||
#if (NGX_HAVE_KQUEUE)
|
||||
|
||||
if ((ngx_event_flags & NGX_USE_KQUEUE_EVENT) && wev->pending_eof) {
|
||||
(void) ngx_connection_error(c, wev->kq_errno,
|
||||
"kevent() reported about an closed connection");
|
||||
wev->error = 1;
|
||||
return NGX_CHAIN_ERROR;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/* the maximum limit size is the maximum size_t value - the page size */
|
||||
|
||||
if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) {
|
||||
limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize;
|
||||
}
|
||||
|
||||
send = 0;
|
||||
|
||||
vec.iovs = iovs;
|
||||
vec.nalloc = NGX_IOVS_PREALLOCATE;
|
||||
|
||||
for ( ;; ) {
|
||||
|
||||
/* create the iovec and coalesce the neighbouring bufs */
|
||||
|
||||
cl = ngx_udp_output_chain_to_iovec(&vec, in, c->log);
|
||||
|
||||
if (cl == NGX_CHAIN_ERROR) {
|
||||
return NGX_CHAIN_ERROR;
|
||||
}
|
||||
|
||||
if (cl && cl->buf->in_file) {
|
||||
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
|
||||
"file buf in sendmsg "
|
||||
"t:%d r:%d f:%d %p %p-%p %p %O-%O",
|
||||
cl->buf->temporary,
|
||||
cl->buf->recycled,
|
||||
cl->buf->in_file,
|
||||
cl->buf->start,
|
||||
cl->buf->pos,
|
||||
cl->buf->last,
|
||||
cl->buf->file,
|
||||
cl->buf->file_pos,
|
||||
cl->buf->file_last);
|
||||
|
||||
ngx_debug_point();
|
||||
|
||||
return NGX_CHAIN_ERROR;
|
||||
}
|
||||
|
||||
if (cl == in) {
|
||||
return in;
|
||||
}
|
||||
|
||||
send += vec.size;
|
||||
|
||||
n = ngx_sendmsg(c, &vec);
|
||||
|
||||
if (n == NGX_ERROR) {
|
||||
return NGX_CHAIN_ERROR;
|
||||
}
|
||||
|
||||
if (n == NGX_AGAIN) {
|
||||
wev->ready = 0;
|
||||
return in;
|
||||
}
|
||||
|
||||
c->sent += n;
|
||||
|
||||
in = ngx_chain_update_sent(in, n);
|
||||
|
||||
if (send >= limit || in == NULL) {
|
||||
return in;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static ngx_chain_t *
|
||||
ngx_udp_output_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *in, ngx_log_t *log)
|
||||
{
|
||||
size_t total, size;
|
||||
u_char *prev;
|
||||
ngx_uint_t n, flush;
|
||||
ngx_chain_t *cl;
|
||||
struct iovec *iov;
|
||||
|
||||
cl = in;
|
||||
iov = NULL;
|
||||
prev = NULL;
|
||||
total = 0;
|
||||
n = 0;
|
||||
flush = 0;
|
||||
|
||||
for ( /* void */ ; in && !flush; in = in->next) {
|
||||
|
||||
if (in->buf->flush || in->buf->last_buf) {
|
||||
flush = 1;
|
||||
}
|
||||
|
||||
if (ngx_buf_special(in->buf)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (in->buf->in_file) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!ngx_buf_in_memory(in->buf)) {
|
||||
ngx_log_error(NGX_LOG_ALERT, log, 0,
|
||||
"bad buf in output chain "
|
||||
"t:%d r:%d f:%d %p %p-%p %p %O-%O",
|
||||
in->buf->temporary,
|
||||
in->buf->recycled,
|
||||
in->buf->in_file,
|
||||
in->buf->start,
|
||||
in->buf->pos,
|
||||
in->buf->last,
|
||||
in->buf->file,
|
||||
in->buf->file_pos,
|
||||
in->buf->file_last);
|
||||
|
||||
ngx_debug_point();
|
||||
|
||||
return NGX_CHAIN_ERROR;
|
||||
}
|
||||
|
||||
size = in->buf->last - in->buf->pos;
|
||||
|
||||
if (prev == in->buf->pos) {
|
||||
iov->iov_len += size;
|
||||
|
||||
} else {
|
||||
if (n == vec->nalloc) {
|
||||
ngx_log_error(NGX_LOG_ALERT, log, 0,
|
||||
"too many parts in a datagram");
|
||||
return NGX_CHAIN_ERROR;
|
||||
}
|
||||
|
||||
iov = &vec->iovs[n++];
|
||||
|
||||
iov->iov_base = (void *) in->buf->pos;
|
||||
iov->iov_len = size;
|
||||
}
|
||||
|
||||
prev = in->buf->pos + size;
|
||||
total += size;
|
||||
}
|
||||
|
||||
if (!flush) {
|
||||
#if (NGX_SUPPRESS_WARN)
|
||||
vec->size = 0;
|
||||
vec->count = 0;
|
||||
#endif
|
||||
return cl;
|
||||
}
|
||||
|
||||
vec->count = n;
|
||||
vec->size = total;
|
||||
|
||||
return in;
|
||||
}
|
||||
|
||||
|
||||
static ssize_t
|
||||
ngx_sendmsg(ngx_connection_t *c, ngx_iovec_t *vec)
|
||||
{
|
||||
ssize_t n;
|
||||
ngx_err_t err;
|
||||
struct msghdr msg;
|
||||
|
||||
ngx_memzero(&msg, sizeof(struct msghdr));
|
||||
|
||||
if (c->socklen) {
|
||||
msg.msg_name = c->sockaddr;
|
||||
msg.msg_namelen = c->socklen;
|
||||
}
|
||||
|
||||
msg.msg_iov = vec->iovs;
|
||||
msg.msg_iovlen = vec->count;
|
||||
|
||||
eintr:
|
||||
|
||||
n = sendmsg(c->fd, &msg, 0);
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"sendmsg: %z of %uz", n, vec->size);
|
||||
|
||||
if (n == -1) {
|
||||
err = ngx_errno;
|
||||
|
||||
switch (err) {
|
||||
case NGX_EAGAIN:
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
|
||||
"sendmsg() not ready");
|
||||
return NGX_AGAIN;
|
||||
|
||||
case NGX_EINTR:
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err,
|
||||
"sendmsg() was interrupted");
|
||||
goto eintr;
|
||||
|
||||
default:
|
||||
c->write->error = 1;
|
||||
ngx_connection_error(c, err, "sendmsg() failed");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
|
@ -28,6 +28,8 @@ typedef struct {
|
|||
ngx_recv_chain_pt recv_chain;
|
||||
ngx_recv_pt udp_recv;
|
||||
ngx_send_pt send;
|
||||
ngx_send_pt udp_send;
|
||||
ngx_send_chain_pt udp_send_chain;
|
||||
ngx_send_chain_pt send_chain;
|
||||
ngx_uint_t flags;
|
||||
} ngx_os_io_t;
|
||||
|
|
|
@ -25,6 +25,8 @@ ngx_os_io_t ngx_os_io = {
|
|||
ngx_wsarecv_chain,
|
||||
ngx_udp_wsarecv,
|
||||
ngx_wsasend,
|
||||
NULL,
|
||||
NULL,
|
||||
ngx_wsasend_chain,
|
||||
0
|
||||
};
|
||||
|
|
|
@ -27,6 +27,9 @@ static ngx_int_t ngx_stream_cmp_conf_addrs(const void *one, const void *two);
|
|||
ngx_uint_t ngx_stream_max_module;
|
||||
|
||||
|
||||
ngx_stream_filter_pt ngx_stream_top_filter;
|
||||
|
||||
|
||||
static ngx_command_t ngx_stream_commands[] = {
|
||||
|
||||
{ ngx_string("stream"),
|
||||
|
|
|
@ -243,6 +243,9 @@ typedef struct {
|
|||
NULL)
|
||||
|
||||
|
||||
#define NGX_STREAM_WRITE_BUFFERED 0x10
|
||||
|
||||
|
||||
void ngx_stream_init_connection(ngx_connection_t *c);
|
||||
void ngx_stream_finalize_session(ngx_stream_session_t *s, ngx_uint_t rc);
|
||||
|
||||
|
@ -252,4 +255,11 @@ extern ngx_uint_t ngx_stream_max_module;
|
|||
extern ngx_module_t ngx_stream_core_module;
|
||||
|
||||
|
||||
typedef ngx_int_t (*ngx_stream_filter_pt)(ngx_stream_session_t *s,
|
||||
ngx_chain_t *chain, ngx_uint_t from_upstream);
|
||||
|
||||
|
||||
extern ngx_stream_filter_pt ngx_stream_top_filter;
|
||||
|
||||
|
||||
#endif /* _NGX_STREAM_H_INCLUDED_ */
|
||||
|
|
|
@ -134,6 +134,10 @@ ngx_stream_init_connection(ngx_connection_t *c)
|
|||
s->ssl = addr_conf->ssl;
|
||||
#endif
|
||||
|
||||
if (c->buffer) {
|
||||
s->received += c->buffer->last - c->buffer->pos;
|
||||
}
|
||||
|
||||
s->connection = c;
|
||||
c->data = s;
|
||||
|
||||
|
|
|
@ -84,10 +84,10 @@ static char *ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd,
|
|||
void *conf);
|
||||
static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd,
|
||||
void *conf);
|
||||
static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
|
||||
|
||||
#if (NGX_STREAM_SSL)
|
||||
|
||||
static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
|
||||
static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf,
|
||||
ngx_command_t *cmd, void *conf);
|
||||
static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s);
|
||||
|
@ -385,8 +385,6 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
|
|||
}
|
||||
|
||||
u->peer.type = c->type;
|
||||
|
||||
u->proxy_protocol = pscf->proxy_protocol;
|
||||
u->start_sec = ngx_time();
|
||||
|
||||
c->write->handler = ngx_stream_proxy_downstream_handler;
|
||||
|
@ -411,28 +409,6 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
|
|||
u->downstream_buf.pos = p;
|
||||
u->downstream_buf.last = p;
|
||||
|
||||
if (u->proxy_protocol
|
||||
#if (NGX_STREAM_SSL)
|
||||
&& pscf->ssl == NULL
|
||||
#endif
|
||||
&& pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER)
|
||||
{
|
||||
/* optimization for a typical case */
|
||||
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
|
||||
"stream proxy send PROXY protocol header");
|
||||
|
||||
p = ngx_proxy_protocol_write(c, u->downstream_buf.last,
|
||||
u->downstream_buf.end);
|
||||
if (p == NULL) {
|
||||
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
u->downstream_buf.last = p;
|
||||
u->proxy_protocol = 0;
|
||||
}
|
||||
|
||||
if (c->read->ready) {
|
||||
ngx_post_event(c->read, &ngx_posted_events);
|
||||
}
|
||||
|
@ -682,8 +658,13 @@ ngx_stream_proxy_connect(ngx_stream_session_t *s)
|
|||
|
||||
c->log->action = "connecting to upstream";
|
||||
|
||||
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
|
||||
|
||||
u = s->upstream;
|
||||
|
||||
u->connected = 0;
|
||||
u->proxy_protocol = pscf->proxy_protocol;
|
||||
|
||||
if (u->state) {
|
||||
u->state->response_time = ngx_current_msec - u->state->response_time;
|
||||
}
|
||||
|
@ -740,8 +721,6 @@ ngx_stream_proxy_connect(ngx_stream_session_t *s)
|
|||
pc->read->handler = ngx_stream_proxy_connect_handler;
|
||||
pc->write->handler = ngx_stream_proxy_connect_handler;
|
||||
|
||||
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
|
||||
|
||||
ngx_add_timer(pc->write, pscf->connect_timeout);
|
||||
}
|
||||
|
||||
|
@ -751,6 +730,7 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
|
|||
{
|
||||
int tcp_nodelay;
|
||||
u_char *p;
|
||||
ngx_chain_t *cl;
|
||||
ngx_connection_t *c, *pc;
|
||||
ngx_log_handler_pt handler;
|
||||
ngx_stream_upstream_t *u;
|
||||
|
@ -782,21 +762,26 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
|
|||
pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
|
||||
}
|
||||
|
||||
if (u->proxy_protocol) {
|
||||
if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
|
||||
return;
|
||||
}
|
||||
|
||||
u->proxy_protocol = 0;
|
||||
}
|
||||
|
||||
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
|
||||
|
||||
#if (NGX_STREAM_SSL)
|
||||
if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
|
||||
ngx_stream_proxy_ssl_init_connection(s);
|
||||
return;
|
||||
|
||||
if (pc->type == SOCK_STREAM && pscf->ssl) {
|
||||
|
||||
if (u->proxy_protocol) {
|
||||
if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
|
||||
return;
|
||||
}
|
||||
|
||||
u->proxy_protocol = 0;
|
||||
}
|
||||
|
||||
if (pc->ssl == NULL) {
|
||||
ngx_stream_proxy_ssl_init_connection(s);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
c = s->connection;
|
||||
|
@ -838,14 +823,66 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
|
|||
u->upstream_buf.last = p;
|
||||
}
|
||||
|
||||
if (c->type == SOCK_DGRAM) {
|
||||
s->received = c->buffer->last - c->buffer->pos;
|
||||
u->downstream_buf = *c->buffer;
|
||||
if (c->buffer && c->buffer->pos < c->buffer->last) {
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
|
||||
"stream proxy add preread buffer: %uz",
|
||||
c->buffer->last - c->buffer->pos);
|
||||
|
||||
if (pscf->responses == 0) {
|
||||
pc->read->ready = 0;
|
||||
pc->read->eof = 1;
|
||||
cl = ngx_chain_get_free_buf(c->pool, &u->free);
|
||||
if (cl == NULL) {
|
||||
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
*cl->buf = *c->buffer;
|
||||
|
||||
cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
|
||||
cl->buf->flush = 1;
|
||||
cl->buf->last_buf = (c->type == SOCK_DGRAM);
|
||||
|
||||
cl->next = u->upstream_out;
|
||||
u->upstream_out = cl;
|
||||
}
|
||||
|
||||
if (u->proxy_protocol) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
|
||||
"stream proxy add PROXY protocol header");
|
||||
|
||||
cl = ngx_chain_get_free_buf(c->pool, &u->free);
|
||||
if (cl == NULL) {
|
||||
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
|
||||
if (p == NULL) {
|
||||
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
cl->buf->pos = p;
|
||||
|
||||
p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
|
||||
if (p == NULL) {
|
||||
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
cl->buf->last = p;
|
||||
cl->buf->temporary = 1;
|
||||
cl->buf->flush = 0;
|
||||
cl->buf->last_buf = 0;
|
||||
cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
|
||||
|
||||
cl->next = u->upstream_out;
|
||||
u->upstream_out = cl;
|
||||
|
||||
u->proxy_protocol = 0;
|
||||
}
|
||||
|
||||
if (c->type == SOCK_DGRAM && pscf->responses == 0) {
|
||||
pc->read->ready = 0;
|
||||
pc->read->eof = 1;
|
||||
}
|
||||
|
||||
u->connected = 1;
|
||||
|
@ -861,6 +898,8 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
|
|||
}
|
||||
|
||||
|
||||
#if (NGX_STREAM_SSL)
|
||||
|
||||
static ngx_int_t
|
||||
ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
|
||||
{
|
||||
|
@ -931,8 +970,6 @@ ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
|
|||
}
|
||||
|
||||
|
||||
#if (NGX_STREAM_SSL)
|
||||
|
||||
static char *
|
||||
ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
|
||||
void *conf)
|
||||
|
@ -1412,8 +1449,10 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
|||
size_t size, limit_rate;
|
||||
ssize_t n;
|
||||
ngx_buf_t *b;
|
||||
ngx_int_t rc;
|
||||
ngx_uint_t flags;
|
||||
ngx_msec_t delay;
|
||||
ngx_chain_t *cl, **ll, **out, **busy;
|
||||
ngx_connection_t *c, *pc, *src, *dst;
|
||||
ngx_log_handler_pt handler;
|
||||
ngx_stream_upstream_t *u;
|
||||
|
@ -1447,6 +1486,8 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
|||
b = &u->upstream_buf;
|
||||
limit_rate = pscf->download_rate;
|
||||
received = &u->received;
|
||||
out = &u->downstream_out;
|
||||
busy = &u->downstream_busy;
|
||||
|
||||
} else {
|
||||
src = c;
|
||||
|
@ -1454,24 +1495,18 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
|||
b = &u->downstream_buf;
|
||||
limit_rate = pscf->upload_rate;
|
||||
received = &s->received;
|
||||
out = &u->upstream_out;
|
||||
busy = &u->upstream_busy;
|
||||
}
|
||||
|
||||
for ( ;; ) {
|
||||
|
||||
if (do_write) {
|
||||
if (do_write && dst) {
|
||||
|
||||
size = b->last - b->pos;
|
||||
if (*out || *busy || dst->buffered) {
|
||||
rc = ngx_stream_top_filter(s, *out, from_upstream);
|
||||
|
||||
if (size && dst && dst->write->ready) {
|
||||
|
||||
n = dst->send(dst, b->pos, size);
|
||||
|
||||
if (n == NGX_AGAIN && dst->shared) {
|
||||
/* cannot wait on a shared socket */
|
||||
n = NGX_ERROR;
|
||||
}
|
||||
|
||||
if (n == NGX_ERROR) {
|
||||
if (rc == NGX_ERROR) {
|
||||
if (c->type == SOCK_DGRAM && !from_upstream) {
|
||||
ngx_stream_proxy_next_upstream(s);
|
||||
return;
|
||||
|
@ -1481,13 +1516,12 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
|||
return;
|
||||
}
|
||||
|
||||
if (n > 0) {
|
||||
b->pos += n;
|
||||
ngx_chain_update_chains(c->pool, &u->free, busy, out,
|
||||
(ngx_buf_tag_t) &ngx_stream_proxy_module);
|
||||
|
||||
if (b->pos == b->last) {
|
||||
b->pos = b->start;
|
||||
b->last = b->start;
|
||||
}
|
||||
if (*busy == NULL) {
|
||||
b->pos = b->start;
|
||||
b->last = b->start;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1514,11 +1548,21 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
|||
|
||||
n = src->recv(src, b->last, size);
|
||||
|
||||
if (n == NGX_AGAIN || n == 0) {
|
||||
if (n == NGX_AGAIN) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (n > 0) {
|
||||
if (n == NGX_ERROR) {
|
||||
if (c->type == SOCK_DGRAM && u->received == 0) {
|
||||
ngx_stream_proxy_next_upstream(s);
|
||||
return;
|
||||
}
|
||||
|
||||
src->read->eof = 1;
|
||||
n = 0;
|
||||
}
|
||||
|
||||
if (n >= 0) {
|
||||
if (limit_rate) {
|
||||
delay = (ngx_msec_t) (n * 1000 / limit_rate);
|
||||
|
||||
|
@ -1541,27 +1585,37 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
|
|||
src->read->eof = 1;
|
||||
}
|
||||
|
||||
for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
|
||||
|
||||
cl = ngx_chain_get_free_buf(c->pool, &u->free);
|
||||
if (cl == NULL) {
|
||||
ngx_stream_proxy_finalize(s,
|
||||
NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
*ll = cl;
|
||||
|
||||
cl->buf->pos = b->last;
|
||||
cl->buf->last = b->last + n;
|
||||
cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
|
||||
|
||||
cl->buf->temporary = (n ? 1 : 0);
|
||||
cl->buf->last_buf = src->read->eof;
|
||||
cl->buf->flush = 1;
|
||||
|
||||
*received += n;
|
||||
b->last += n;
|
||||
do_write = 1;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (n == NGX_ERROR) {
|
||||
if (c->type == SOCK_DGRAM && u->received == 0) {
|
||||
ngx_stream_proxy_next_upstream(s);
|
||||
return;
|
||||
}
|
||||
|
||||
src->read->eof = 1;
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
|
||||
if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
|
||||
handler = c->log->handler;
|
||||
c->log->handler = NULL;
|
||||
|
||||
|
@ -1614,6 +1668,14 @@ ngx_stream_proxy_next_upstream(ngx_stream_session_t *s)
|
|||
"stream proxy next upstream");
|
||||
|
||||
u = s->upstream;
|
||||
pc = u->peer.connection;
|
||||
|
||||
if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) {
|
||||
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
|
||||
"pending buffers on next upstream");
|
||||
ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
if (u->peer.sockaddr) {
|
||||
u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
|
||||
|
@ -1632,8 +1694,6 @@ ngx_stream_proxy_next_upstream(ngx_stream_session_t *s)
|
|||
return;
|
||||
}
|
||||
|
||||
pc = u->peer.connection;
|
||||
|
||||
if (pc) {
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
|
||||
"close proxy upstream connection: %d", pc->fd);
|
||||
|
|
|
@ -11,12 +11,12 @@
|
|||
|
||||
|
||||
typedef struct {
|
||||
ngx_stream_complex_value_t text;
|
||||
ngx_stream_complex_value_t text;
|
||||
} ngx_stream_return_srv_conf_t;
|
||||
|
||||
|
||||
typedef struct {
|
||||
ngx_buf_t buf;
|
||||
ngx_chain_t *out;
|
||||
} ngx_stream_return_ctx_t;
|
||||
|
||||
|
||||
|
@ -72,6 +72,7 @@ static void
|
|||
ngx_stream_return_handler(ngx_stream_session_t *s)
|
||||
{
|
||||
ngx_str_t text;
|
||||
ngx_buf_t *b;
|
||||
ngx_connection_t *c;
|
||||
ngx_stream_return_ctx_t *ctx;
|
||||
ngx_stream_return_srv_conf_t *rscf;
|
||||
|
@ -103,8 +104,25 @@ ngx_stream_return_handler(ngx_stream_session_t *s)
|
|||
|
||||
ngx_stream_set_ctx(s, ctx, ngx_stream_return_module);
|
||||
|
||||
ctx->buf.pos = text.data;
|
||||
ctx->buf.last = text.data + text.len;
|
||||
b = ngx_calloc_buf(c->pool);
|
||||
if (b == NULL) {
|
||||
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
b->memory = 1;
|
||||
b->pos = text.data;
|
||||
b->last = text.data + text.len;
|
||||
b->last_buf = 1;
|
||||
|
||||
ctx->out = ngx_alloc_chain_link(c->pool);
|
||||
if (ctx->out == NULL) {
|
||||
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
ctx->out->buf = b;
|
||||
ctx->out->next = NULL;
|
||||
|
||||
c->write->handler = ngx_stream_return_write_handler;
|
||||
|
||||
|
@ -115,8 +133,6 @@ ngx_stream_return_handler(ngx_stream_session_t *s)
|
|||
static void
|
||||
ngx_stream_return_write_handler(ngx_event_t *ev)
|
||||
{
|
||||
ssize_t n;
|
||||
ngx_buf_t *b;
|
||||
ngx_connection_t *c;
|
||||
ngx_stream_session_t *s;
|
||||
ngx_stream_return_ctx_t *ctx;
|
||||
|
@ -130,25 +146,20 @@ ngx_stream_return_write_handler(ngx_event_t *ev)
|
|||
return;
|
||||
}
|
||||
|
||||
if (ev->ready) {
|
||||
ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module);
|
||||
ctx = ngx_stream_get_module_ctx(s, ngx_stream_return_module);
|
||||
|
||||
b = &ctx->buf;
|
||||
if (ngx_stream_top_filter(s, ctx->out, 1) == NGX_ERROR) {
|
||||
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
n = c->send(c, b->pos, b->last - b->pos);
|
||||
if (n == NGX_ERROR) {
|
||||
ngx_stream_finalize_session(s, NGX_STREAM_OK);
|
||||
return;
|
||||
}
|
||||
ctx->out = NULL;
|
||||
|
||||
if (n > 0) {
|
||||
b->pos += n;
|
||||
|
||||
if (b->pos == b->last) {
|
||||
ngx_stream_finalize_session(s, NGX_STREAM_OK);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (!c->buffered) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
|
||||
"stream return done sending");
|
||||
ngx_stream_finalize_session(s, NGX_STREAM_OK);
|
||||
return;
|
||||
}
|
||||
|
||||
if (ngx_handle_write_event(ev, 0) != NGX_OK) {
|
||||
|
|
|
@ -106,14 +106,24 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
ngx_peer_connection_t peer;
|
||||
|
||||
ngx_buf_t downstream_buf;
|
||||
ngx_buf_t upstream_buf;
|
||||
|
||||
ngx_chain_t *free;
|
||||
ngx_chain_t *upstream_out;
|
||||
ngx_chain_t *upstream_busy;
|
||||
ngx_chain_t *downstream_out;
|
||||
ngx_chain_t *downstream_busy;
|
||||
|
||||
off_t received;
|
||||
time_t start_sec;
|
||||
ngx_uint_t responses;
|
||||
|
||||
#if (NGX_STREAM_SSL)
|
||||
ngx_str_t ssl_name;
|
||||
#endif
|
||||
|
||||
ngx_stream_upstream_resolved_t *resolved;
|
||||
ngx_stream_upstream_state_t *state;
|
||||
unsigned connected:1;
|
||||
|
|
|
@ -0,0 +1,273 @@
|
|||
|
||||
/*
|
||||
* Copyright (C) Igor Sysoev
|
||||
* Copyright (C) Nginx, Inc.
|
||||
*/
|
||||
|
||||
|
||||
#include <ngx_config.h>
|
||||
#include <ngx_core.h>
|
||||
#include <ngx_stream.h>
|
||||
|
||||
|
||||
typedef struct {
|
||||
ngx_chain_t *from_upstream;
|
||||
ngx_chain_t *from_downstream;
|
||||
} ngx_stream_write_filter_ctx_t;
|
||||
|
||||
|
||||
static ngx_int_t ngx_stream_write_filter(ngx_stream_session_t *s,
|
||||
ngx_chain_t *in, ngx_uint_t from_upstream);
|
||||
static ngx_int_t ngx_stream_write_filter_init(ngx_conf_t *cf);
|
||||
|
||||
|
||||
static ngx_stream_module_t ngx_stream_write_filter_module_ctx = {
|
||||
NULL, /* preconfiguration */
|
||||
ngx_stream_write_filter_init, /* postconfiguration */
|
||||
|
||||
NULL, /* create main configuration */
|
||||
NULL, /* init main configuration */
|
||||
|
||||
NULL, /* create server configuration */
|
||||
NULL /* merge server configuration */
|
||||
};
|
||||
|
||||
|
||||
ngx_module_t ngx_stream_write_filter_module = {
|
||||
NGX_MODULE_V1,
|
||||
&ngx_stream_write_filter_module_ctx, /* module context */
|
||||
NULL, /* module directives */
|
||||
NGX_STREAM_MODULE, /* module type */
|
||||
NULL, /* init master */
|
||||
NULL, /* init module */
|
||||
NULL, /* init process */
|
||||
NULL, /* init thread */
|
||||
NULL, /* exit thread */
|
||||
NULL, /* exit process */
|
||||
NULL, /* exit master */
|
||||
NGX_MODULE_V1_PADDING
|
||||
};
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_stream_write_filter(ngx_stream_session_t *s, ngx_chain_t *in,
|
||||
ngx_uint_t from_upstream)
|
||||
{
|
||||
off_t size;
|
||||
ngx_uint_t last, flush, sync;
|
||||
ngx_chain_t *cl, *ln, **ll, **out, *chain;
|
||||
ngx_connection_t *c;
|
||||
ngx_stream_write_filter_ctx_t *ctx;
|
||||
|
||||
ctx = ngx_stream_get_module_ctx(s, ngx_stream_write_filter_module);
|
||||
|
||||
if (ctx == NULL) {
|
||||
ctx = ngx_pcalloc(s->connection->pool,
|
||||
sizeof(ngx_stream_write_filter_ctx_t));
|
||||
if (ctx == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_stream_set_ctx(s, ctx, ngx_stream_write_filter_module);
|
||||
}
|
||||
|
||||
if (from_upstream) {
|
||||
c = s->connection;
|
||||
out = &ctx->from_upstream;
|
||||
|
||||
} else {
|
||||
c = s->upstream->peer.connection;
|
||||
out = &ctx->from_downstream;
|
||||
}
|
||||
|
||||
if (c->error) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
size = 0;
|
||||
flush = 0;
|
||||
sync = 0;
|
||||
last = 0;
|
||||
ll = out;
|
||||
|
||||
/* find the size, the flush point and the last link of the saved chain */
|
||||
|
||||
for (cl = *out; cl; cl = cl->next) {
|
||||
ll = &cl->next;
|
||||
|
||||
ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"write old buf t:%d f:%d %p, pos %p, size: %z "
|
||||
"file: %O, size: %O",
|
||||
cl->buf->temporary, cl->buf->in_file,
|
||||
cl->buf->start, cl->buf->pos,
|
||||
cl->buf->last - cl->buf->pos,
|
||||
cl->buf->file_pos,
|
||||
cl->buf->file_last - cl->buf->file_pos);
|
||||
|
||||
#if 1
|
||||
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
|
||||
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
|
||||
"zero size buf in writer "
|
||||
"t:%d r:%d f:%d %p %p-%p %p %O-%O",
|
||||
cl->buf->temporary,
|
||||
cl->buf->recycled,
|
||||
cl->buf->in_file,
|
||||
cl->buf->start,
|
||||
cl->buf->pos,
|
||||
cl->buf->last,
|
||||
cl->buf->file,
|
||||
cl->buf->file_pos,
|
||||
cl->buf->file_last);
|
||||
|
||||
ngx_debug_point();
|
||||
return NGX_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
size += ngx_buf_size(cl->buf);
|
||||
|
||||
if (cl->buf->flush || cl->buf->recycled) {
|
||||
flush = 1;
|
||||
}
|
||||
|
||||
if (cl->buf->sync) {
|
||||
sync = 1;
|
||||
}
|
||||
|
||||
if (cl->buf->last_buf) {
|
||||
last = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/* add the new chain to the existent one */
|
||||
|
||||
for (ln = in; ln; ln = ln->next) {
|
||||
cl = ngx_alloc_chain_link(c->pool);
|
||||
if (cl == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
cl->buf = ln->buf;
|
||||
*ll = cl;
|
||||
ll = &cl->next;
|
||||
|
||||
ngx_log_debug7(NGX_LOG_DEBUG_EVENT, c->log, 0,
|
||||
"write new buf t:%d f:%d %p, pos %p, size: %z "
|
||||
"file: %O, size: %O",
|
||||
cl->buf->temporary, cl->buf->in_file,
|
||||
cl->buf->start, cl->buf->pos,
|
||||
cl->buf->last - cl->buf->pos,
|
||||
cl->buf->file_pos,
|
||||
cl->buf->file_last - cl->buf->file_pos);
|
||||
|
||||
#if 1
|
||||
if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
|
||||
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
|
||||
"zero size buf in writer "
|
||||
"t:%d r:%d f:%d %p %p-%p %p %O-%O",
|
||||
cl->buf->temporary,
|
||||
cl->buf->recycled,
|
||||
cl->buf->in_file,
|
||||
cl->buf->start,
|
||||
cl->buf->pos,
|
||||
cl->buf->last,
|
||||
cl->buf->file,
|
||||
cl->buf->file_pos,
|
||||
cl->buf->file_last);
|
||||
|
||||
ngx_debug_point();
|
||||
return NGX_ERROR;
|
||||
}
|
||||
#endif
|
||||
|
||||
size += ngx_buf_size(cl->buf);
|
||||
|
||||
if (cl->buf->flush || cl->buf->recycled) {
|
||||
flush = 1;
|
||||
}
|
||||
|
||||
if (cl->buf->sync) {
|
||||
sync = 1;
|
||||
}
|
||||
|
||||
if (cl->buf->last_buf) {
|
||||
last = 1;
|
||||
}
|
||||
}
|
||||
|
||||
*ll = NULL;
|
||||
|
||||
ngx_log_debug3(NGX_LOG_DEBUG_STREAM, c->log, 0,
|
||||
"stream write filter: l:%ui f:%ui s:%O", last, flush, size);
|
||||
|
||||
if (size == 0
|
||||
&& !(c->buffered & NGX_LOWLEVEL_BUFFERED)
|
||||
&& !(last && c->need_last_buf))
|
||||
{
|
||||
if (last || flush || sync) {
|
||||
for (cl = *out; cl; /* void */) {
|
||||
ln = cl;
|
||||
cl = cl->next;
|
||||
ngx_free_chain(c->pool, ln);
|
||||
}
|
||||
|
||||
*out = NULL;
|
||||
c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
|
||||
"the stream output chain is empty");
|
||||
|
||||
ngx_debug_point();
|
||||
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
chain = c->send_chain(c, *out, 0);
|
||||
|
||||
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
|
||||
"stream write filter %p", chain);
|
||||
|
||||
if (chain == NGX_CHAIN_ERROR) {
|
||||
c->error = 1;
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
for (cl = *out; cl && cl != chain; /* void */) {
|
||||
ln = cl;
|
||||
cl = cl->next;
|
||||
ngx_free_chain(c->pool, ln);
|
||||
}
|
||||
|
||||
*out = chain;
|
||||
|
||||
if (chain) {
|
||||
if (c->shared) {
|
||||
ngx_log_error(NGX_LOG_ALERT, c->log, 0,
|
||||
"shared connection is busy");
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
c->buffered |= NGX_STREAM_WRITE_BUFFERED;
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
c->buffered &= ~NGX_STREAM_WRITE_BUFFERED;
|
||||
|
||||
if (c->buffered & NGX_LOWLEVEL_BUFFERED) {
|
||||
return NGX_AGAIN;
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_stream_write_filter_init(ngx_conf_t *cf)
|
||||
{
|
||||
ngx_stream_top_filter = ngx_stream_write_filter;
|
||||
|
||||
return NGX_OK;
|
||||
}
|
Loading…
Reference in New Issue