Thread pools implementation.

This commit is contained in:
Valentin Bartenev 2015-03-14 17:37:07 +03:00
parent fcb4524be7
commit 065a0f544a
26 changed files with 1112 additions and 2 deletions

1
auto/configure vendored
View File

@ -58,6 +58,7 @@ if [ "$NGX_PLATFORM" != win32 ]; then
. auto/unix
fi
. auto/threads
. auto/modules
. auto/lib/conf

View File

@ -432,6 +432,12 @@ fi
modules="$CORE_MODULES $EVENT_MODULES"
# thread pool module should be initialized after events
if [ $USE_THREADS = YES ]; then
modules="$modules $THREAD_POOL_MODULE"
fi
if [ $USE_OPENSSL = YES ]; then
modules="$modules $OPENSSL_MODULE"
CORE_DEPS="$CORE_DEPS $OPENSSL_DEPS"

View File

@ -190,6 +190,8 @@ do
--without-poll_module) EVENT_POLL=NONE ;;
--with-aio_module) EVENT_AIO=YES ;;
--with-threads) USE_THREADS=YES ;;
--with-file-aio) NGX_FILE_AIO=YES ;;
--with-ipv6) NGX_IPV6=YES ;;
@ -351,6 +353,8 @@ cat << END
--with-poll_module enable poll module
--without-poll_module disable poll module
--with-threads enable thread pool support
--with-file-aio enable file AIO support
--with-ipv6 enable IPv6 support

View File

@ -193,6 +193,13 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
POSIX_DEPS=src/os/unix/ngx_posix_config.h
THREAD_POOL_MODULE=ngx_thread_pool_module
THREAD_POOL_DEPS=src/core/ngx_thread_pool.h
THREAD_POOL_SRCS="src/core/ngx_thread_pool.c
src/os/unix/ngx_thread_cond.c
src/os/unix/ngx_thread_mutex.c
src/os/unix/ngx_thread_id.c"
FREEBSD_DEPS="src/os/unix/ngx_freebsd_config.h src/os/unix/ngx_freebsd.h"
FREEBSD_SRCS=src/os/unix/ngx_freebsd_init.c
FREEBSD_SENDFILE_SRCS=src/os/unix/ngx_freebsd_sendfile_chain.c

View File

@ -7,6 +7,10 @@ echo
echo "Configuration summary"
if [ $USE_THREADS = YES ]; then
echo " + using threads"
fi
if [ $USE_PCRE = DISABLED ]; then
echo " + PCRE library is disabled"

20
auto/threads Normal file
View File

@ -0,0 +1,20 @@
# Copyright (C) Nginx, Inc.
if [ $USE_THREADS = YES ]; then
if [ "$NGX_PLATFORM" = win32 ]; then
cat << END
$0: --with-threads is not supported on Windows
END
exit 1
fi
have=NGX_THREADS . auto/have
CORE_DEPS="$CORE_DEPS $THREAD_POOL_DEPS"
CORE_SRCS="$CORE_SRCS $THREAD_POOL_SRCS"
CORE_LIBS="$CORE_LIBS -lpthread"
fi

View File

@ -22,6 +22,10 @@ typedef struct ngx_event_s ngx_event_t;
typedef struct ngx_event_aio_s ngx_event_aio_t;
typedef struct ngx_connection_s ngx_connection_t;
#if (NGX_THREADS)
typedef struct ngx_thread_task_s ngx_thread_task_t;
#endif
typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);
typedef void (*ngx_connection_handler_pt)(ngx_connection_t *c);

631
src/core/ngx_thread_pool.c Normal file
View File

@ -0,0 +1,631 @@
/*
* Copyright (C) Nginx, Inc.
* Copyright (C) Valentin V. Bartenev
* Copyright (C) Ruslan Ermilov
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_thread_pool.h>
typedef struct {
ngx_array_t pools;
} ngx_thread_pool_conf_t;
typedef struct {
ngx_thread_mutex_t mtx;
ngx_uint_t count;
ngx_thread_task_t *first;
ngx_thread_task_t **last;
} ngx_thread_pool_queue_t;
struct ngx_thread_pool_s {
ngx_thread_cond_t cond;
ngx_thread_pool_queue_t queue;
ngx_log_t *log;
ngx_pool_t *pool;
ngx_str_t name;
ngx_uint_t threads;
ngx_uint_t max_queue;
u_char *file;
ngx_uint_t line;
};
static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
ngx_pool_t *pool);
static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue,
ngx_log_t *log);
static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue,
ngx_log_t *log);
static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
static void *ngx_thread_pool_cycle(void *data);
static void ngx_thread_pool_handler(ngx_event_t *ev);
static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle);
static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf);
static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle);
static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle);
static ngx_command_t ngx_thread_pool_commands[] = {
{ ngx_string("thread_pool"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23,
ngx_thread_pool,
0,
0,
NULL },
ngx_null_command
};
static ngx_core_module_t ngx_thread_pool_module_ctx = {
ngx_string("thread_pool"),
ngx_thread_pool_create_conf,
ngx_thread_pool_init_conf
};
ngx_module_t ngx_thread_pool_module = {
NGX_MODULE_V1,
&ngx_thread_pool_module_ctx, /* module context */
ngx_thread_pool_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
ngx_thread_pool_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
ngx_thread_pool_exit_worker, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_str_t ngx_thread_pool_default = ngx_string("default");
static ngx_uint_t ngx_thread_pool_task_id;
static ngx_thread_pool_queue_t ngx_thread_pool_done;
static ngx_int_t
ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
{
int err;
pthread_t tid;
ngx_uint_t n;
pthread_attr_t attr;
if (ngx_notify == NULL) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"the configured event method cannot be used with thread pools");
return NGX_ERROR;
}
if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) {
return NGX_ERROR;
}
if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
(void) ngx_thread_pool_queue_destroy(&tp->queue, log);
return NGX_ERROR;
}
tp->log = log;
tp->pool = pool;
err = pthread_attr_init(&attr);
if (err) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"pthread_attr_init() failed");
return NGX_ERROR;
}
#if 0
err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
if (err) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"pthread_attr_setstacksize() failed");
return NGX_ERROR;
}
#endif
for (n = 0; n < tp->threads; n++) {
err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
if (err) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"pthread_create() failed");
return NGX_ERROR;
}
}
(void) pthread_attr_destroy(&attr);
return NGX_OK;
}
static ngx_int_t
ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
{
queue->count = 0;
queue->first = NULL;
queue->last = &queue->first;
return ngx_thread_mutex_create(&queue->mtx, log);
}
static ngx_int_t
ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
{
return ngx_thread_mutex_destroy(&queue->mtx, log);
}
static void
ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
{
/* TODO: exit threads */
(void) ngx_thread_cond_destroy(&tp->cond, tp->log);
(void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log);
}
ngx_thread_task_t *
ngx_thread_task_alloc(ngx_pool_t *pool, size_t size)
{
ngx_thread_task_t *task;
task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size);
if (task == NULL) {
return NULL;
}
task->ctx = task + 1;
return task;
}
ngx_int_t
ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
{
if (task->event.active) {
ngx_log_error(NGX_LOG_ALERT, tp->log, 0,
"task #%ui already active", task->id);
return NGX_ERROR;
}
if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
return NGX_ERROR;
}
if (tp->queue.count >= tp->max_queue) {
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
ngx_log_error(NGX_LOG_ERR, tp->log, 0,
"thread pool \"%V\" queue overflow: %ui tasks waiting",
&tp->name, tp->queue.count);
return NGX_ERROR;
}
task->event.active = 1;
task->id = ngx_thread_pool_task_id++;
task->next = NULL;
if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
return NGX_ERROR;
}
*tp->queue.last = task;
tp->queue.last = &task->next;
tp->queue.count++;
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
"task #%ui added to thread pool \"%V\"",
task->id, &tp->name);
return NGX_OK;
}
static void *
ngx_thread_pool_cycle(void *data)
{
ngx_thread_pool_t *tp = data;
int err;
sigset_t set;
ngx_thread_task_t *task;
#if 0
ngx_time_update();
#endif
ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0,
"thread in pool \"%V\" started", &tp->name);
sigfillset(&set);
sigdelset(&set, SIGILL);
sigdelset(&set, SIGFPE);
sigdelset(&set, SIGSEGV);
sigdelset(&set, SIGBUS);
err = pthread_sigmask(SIG_BLOCK, &set, NULL);
if (err) {
ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed");
return NULL;
}
for ( ;; ) {
if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
return NULL;
}
while (tp->queue.count == 0) {
if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log)
!= NGX_OK)
{
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
return NULL;
}
}
tp->queue.count--;
task = tp->queue.first;
tp->queue.first = task->next;
if (tp->queue.first == NULL) {
tp->queue.last = &tp->queue.first;
}
if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) {
return NULL;
}
#if 0
ngx_time_update();
#endif
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
"run task #%ui in thread pool \"%V\"",
task->id, &tp->name);
task->handler(task->ctx, tp->log);
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
"complete task #%ui in thread pool \"%V\"",
task->id, &tp->name);
task->next = NULL;
if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, tp->log)
!= NGX_OK)
{
return NULL;
}
*ngx_thread_pool_done.last = task;
ngx_thread_pool_done.last = &task->next;
if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, tp->log)
!= NGX_OK)
{
return NULL;
}
(void) ngx_notify(ngx_thread_pool_handler);
}
}
static void
ngx_thread_pool_handler(ngx_event_t *ev)
{
ngx_event_t *event;
ngx_thread_task_t *task;
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler");
if (ngx_thread_mutex_lock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) {
return;
}
task = ngx_thread_pool_done.first;
ngx_thread_pool_done.first = NULL;
ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
if (ngx_thread_mutex_unlock(&ngx_thread_pool_done.mtx, ev->log) != NGX_OK) {
return;
}
while (task) {
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
"run completion handler for task #%ui", task->id);
event = &task->event;
task = task->next;
event->complete = 1;
event->active = 0;
event->handler(event);
}
}
static void *
ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
{
ngx_thread_pool_conf_t *tcf;
tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
if (tcf == NULL) {
return NULL;
}
if (ngx_array_init(&tcf->pools, cycle->pool, 4,
sizeof(ngx_thread_pool_t *))
!= NGX_OK)
{
return NULL;
}
return tcf;
}
static char *
ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
{
ngx_thread_pool_conf_t *tcf = conf;
ngx_uint_t i;
ngx_thread_pool_t **tpp;
tpp = tcf->pools.elts;
for (i = 0; i < tcf->pools.nelts; i++) {
if (tpp[i]->threads) {
continue;
}
if (tpp[i]->name.len == ngx_thread_pool_default.len
&& ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data,
ngx_thread_pool_default.len)
== 0)
{
tpp[i]->threads = 32;
tpp[i]->max_queue = 65536;
continue;
}
ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
"unknown thread pool \"%V\" in %s:%ui",
&tpp[i]->name, tpp[i]->file, tpp[i]->line);
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
static char *
ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_str_t *value;
ngx_uint_t i;
ngx_thread_pool_t *tp;
value = cf->args->elts;
tp = ngx_thread_pool_add(cf, &value[1]);
if (tp == NULL) {
return NGX_CONF_ERROR;
}
if (tp->threads) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"duplicate thread pool \"%V\"", &tp->name);
return NGX_CONF_ERROR;
}
tp->max_queue = 65536;
for (i = 2; i < cf->args->nelts; i++) {
if (ngx_strncmp(value[i].data, "threads=", 8) == 0) {
tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8);
if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid threads value \"%V\"", &value[i]);
return NGX_CONF_ERROR;
}
continue;
}
if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10);
if (tp->max_queue == (ngx_uint_t) NGX_ERROR) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid max_queue value \"%V\"", &value[i]);
return NGX_CONF_ERROR;
}
continue;
}
}
if (tp->threads == 0) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"\"%V\" must have \"threads\" parameter",
&cmd->name);
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
ngx_thread_pool_t *
ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name)
{
ngx_thread_pool_t *tp, **tpp;
ngx_thread_pool_conf_t *tcf;
if (name == NULL) {
name = &ngx_thread_pool_default;
}
tp = ngx_thread_pool_get(cf->cycle, name);
if (tp) {
return tp;
}
tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t));
if (tp == NULL) {
return NULL;
}
tp->name = *name;
tp->file = cf->conf_file->file.name.data;
tp->line = cf->conf_file->line;
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
ngx_thread_pool_module);
tpp = ngx_array_push(&tcf->pools);
if (tpp == NULL) {
return NULL;
}
*tpp = tp;
return tp;
}
ngx_thread_pool_t *
ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name)
{
ngx_uint_t i;
ngx_thread_pool_t **tpp;
ngx_thread_pool_conf_t *tcf;
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_thread_pool_module);
tpp = tcf->pools.elts;
for (i = 0; i < tcf->pools.nelts; i++) {
if (tpp[i]->name.len == name->len
&& ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0)
{
return tpp[i];
}
}
return NULL;
}
static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
{
ngx_uint_t i;
ngx_thread_pool_t **tpp;
ngx_thread_pool_conf_t *tcf;
if (ngx_process != NGX_PROCESS_WORKER
&& ngx_process != NGX_PROCESS_SINGLE)
{
return NGX_OK;
}
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_thread_pool_module);
if (tcf == NULL) {
return NGX_OK;
}
if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log)
!= NGX_OK)
{
return NGX_ERROR;
}
tpp = tcf->pools.elts;
for (i = 0; i < tcf->pools.nelts; i++) {
if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
return NGX_ERROR;
}
}
return NGX_OK;
}
static void
ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
{
ngx_uint_t i;
ngx_thread_pool_t **tpp;
ngx_thread_pool_conf_t *tcf;
if (ngx_process != NGX_PROCESS_WORKER
&& ngx_process != NGX_PROCESS_SINGLE)
{
return;
}
tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_thread_pool_module);
if (tcf == NULL) {
return;
}
tpp = tcf->pools.elts;
for (i = 0; i < tcf->pools.nelts; i++) {
ngx_thread_pool_destroy(tpp[i]);
}
(void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log);
}

View File

@ -0,0 +1,36 @@
/*
* Copyright (C) Nginx, Inc.
* Copyright (C) Valentin V. Bartenev
*/
#ifndef _NGX_THREAD_POOL_H_INCLUDED_
#define _NGX_THREAD_POOL_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
struct ngx_thread_task_s {
ngx_thread_task_t *next;
ngx_uint_t id;
void *ctx;
void (*handler)(void *data, ngx_log_t *log);
ngx_event_t event;
};
typedef struct ngx_thread_pool_s ngx_thread_pool_t;
ngx_thread_pool_t *ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name);
ngx_thread_pool_t *ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name);
ngx_thread_task_t *ngx_thread_task_alloc(ngx_pool_t *pool, size_t size);
ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task);
#endif /* _NGX_THREAD_POOL_H_INCLUDED_ */

View File

@ -48,6 +48,7 @@ ngx_event_module_t ngx_aio_module_ctx = {
NULL, /* disable an event */
NULL, /* add an connection */
ngx_aio_del_connection, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_aio_process_events, /* process the events */
ngx_aio_init, /* init the events */

View File

@ -88,6 +88,7 @@ ngx_event_module_t ngx_devpoll_module_ctx = {
ngx_devpoll_del_event, /* disable an event */
NULL, /* add an connection */
NULL, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_devpoll_process_events, /* process the events */
ngx_devpoll_init, /* init the events */

View File

@ -164,6 +164,7 @@ ngx_event_module_t ngx_epoll_module_ctx = {
ngx_epoll_del_event, /* disable an event */
ngx_epoll_add_connection, /* add an connection */
ngx_epoll_del_connection, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_epoll_process_events, /* process the events */
ngx_epoll_init, /* init the events */

View File

@ -172,6 +172,7 @@ ngx_event_module_t ngx_eventport_module_ctx = {
ngx_eventport_del_event, /* disable an event */
NULL, /* add an connection */
NULL, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_eventport_process_events, /* process the events */
ngx_eventport_init, /* init the events */

View File

@ -64,6 +64,7 @@ ngx_event_module_t ngx_iocp_module_ctx = {
NULL, /* disable an event */
NULL, /* add an connection */
ngx_iocp_del_connection, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_iocp_process_events, /* process the events */
ngx_iocp_init, /* init the events */

View File

@ -89,6 +89,7 @@ ngx_event_module_t ngx_kqueue_module_ctx = {
ngx_kqueue_del_event, /* disable an event */
NULL, /* add an connection */
NULL, /* delete an connection */
NULL, /* trigger a notify */
ngx_kqueue_process_changes, /* process the changes */
ngx_kqueue_process_events, /* process the events */
ngx_kqueue_init, /* init the events */

View File

@ -39,6 +39,7 @@ ngx_event_module_t ngx_poll_module_ctx = {
ngx_poll_del_event, /* disable an event */
NULL, /* add an connection */
NULL, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_poll_process_events, /* process the events */
ngx_poll_init, /* init the events */

View File

@ -130,6 +130,7 @@ ngx_event_module_t ngx_rtsig_module_ctx = {
NULL, /* disable an event */
ngx_rtsig_add_connection, /* add an connection */
ngx_rtsig_del_connection, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_rtsig_process_events, /* process the events */
ngx_rtsig_init, /* init the events */

View File

@ -47,6 +47,7 @@ ngx_event_module_t ngx_select_module_ctx = {
ngx_select_del_event, /* disable an event */
NULL, /* add an connection */
NULL, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_select_process_events, /* process the events */
ngx_select_init, /* init the events */

View File

@ -48,6 +48,7 @@ ngx_event_module_t ngx_select_module_ctx = {
ngx_select_del_event, /* disable an event */
NULL, /* add an connection */
NULL, /* delete an connection */
NULL, /* trigger a notify */
NULL, /* process the changes */
ngx_select_process_events, /* process the events */
ngx_select_init, /* init the events */

View File

@ -178,7 +178,7 @@ ngx_event_module_t ngx_event_core_module_ctx = {
ngx_event_core_create_conf, /* create configuration */
ngx_event_core_init_conf, /* init configuration */
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
{ NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL }
};

View File

@ -200,6 +200,8 @@ typedef struct {
ngx_int_t (*add_conn)(ngx_connection_t *c);
ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
ngx_int_t (*notify)(ngx_event_handler_pt handler);
ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,
ngx_uint_t flags);
@ -422,6 +424,8 @@ extern ngx_event_actions_t ngx_event_actions;
#define ngx_add_conn ngx_event_actions.add_conn
#define ngx_del_conn ngx_event_actions.del_conn
#define ngx_notify ngx_event_actions.notify
#define ngx_add_timer ngx_event_add_timer
#define ngx_del_timer ngx_event_del_timer

View File

@ -93,11 +93,11 @@ extern ssize_t sendfile(int s, int fd, int32_t *offset, size_t size);
#endif
#if (NGX_HAVE_FILE_AIO)
#if (NGX_HAVE_SYS_EVENTFD_H)
#include <sys/eventfd.h>
#endif
#include <sys/syscall.h>
#if (NGX_HAVE_FILE_AIO)
#include <linux/aio_abi.h>
typedef struct iocb ngx_aiocb_t;
#endif

View File

@ -111,9 +111,61 @@ ngx_int_t ngx_cond_signal(ngx_cond_t *cv);
#define ngx_thread_volatile
#if (NGX_THREADS)
#include <pthread.h>
typedef pthread_mutex_t ngx_thread_mutex_t;
ngx_int_t ngx_thread_mutex_create(ngx_thread_mutex_t *mtx, ngx_log_t *log);
ngx_int_t ngx_thread_mutex_destroy(ngx_thread_mutex_t *mtx, ngx_log_t *log);
ngx_int_t ngx_thread_mutex_lock(ngx_thread_mutex_t *mtx, ngx_log_t *log);
ngx_int_t ngx_thread_mutex_unlock(ngx_thread_mutex_t *mtx, ngx_log_t *log);
typedef pthread_cond_t ngx_thread_cond_t;
ngx_int_t ngx_thread_cond_create(ngx_thread_cond_t *cond, ngx_log_t *log);
ngx_int_t ngx_thread_cond_destroy(ngx_thread_cond_t *cond, ngx_log_t *log);
ngx_int_t ngx_thread_cond_signal(ngx_thread_cond_t *cond, ngx_log_t *log);
ngx_int_t ngx_thread_cond_wait(ngx_thread_cond_t *cond, ngx_thread_mutex_t *mtx,
ngx_log_t *log);
#if (NGX_LINUX)
typedef pid_t ngx_tid_t;
#define NGX_TID_T_FMT "%P"
#elif (NGX_FREEBSD)
typedef uint32_t ngx_tid_t;
#define NGX_TID_T_FMT "%uD"
#elif (NGX_DARWIN)
typedef uint64_t ngx_tid_t;
#define NGX_TID_T_FMT "%uA"
#else
typedef uint64_t ngx_tid_t;
#define NGX_TID_T_FMT "%uA"
#endif
ngx_tid_t ngx_thread_tid(void);
#define ngx_log_tid ngx_thread_tid()
#else
#define ngx_log_tid 0
#define NGX_TID_T_FMT "%d"
#endif
#define ngx_mutex_trylock(m) NGX_OK
#define ngx_mutex_lock(m)
#define ngx_mutex_unlock(m)

View File

@ -0,0 +1,87 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
ngx_int_t
ngx_thread_cond_create(ngx_thread_cond_t *cond, ngx_log_t *log)
{
ngx_err_t err;
err = pthread_cond_init(cond, NULL);
if (err == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_cond_init(%p)", cond);
return NGX_OK;
}
ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_init() failed");
return NGX_ERROR;
}
ngx_int_t
ngx_thread_cond_destroy(ngx_thread_cond_t *cond, ngx_log_t *log)
{
ngx_err_t err;
err = pthread_cond_destroy(cond);
if (err == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_cond_destroy(%p)", cond);
return NGX_OK;
}
ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_destroy() failed");
return NGX_ERROR;
}
ngx_int_t
ngx_thread_cond_signal(ngx_thread_cond_t *cond, ngx_log_t *log)
{
ngx_err_t err;
err = pthread_cond_signal(cond);
if (err == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_cond_signal(%p)", cond);
return NGX_OK;
}
ngx_log_error(NGX_LOG_EMERG, log, err, "pthread_cond_signal() failed");
return NGX_ERROR;
}
ngx_int_t
ngx_thread_cond_wait(ngx_thread_cond_t *cond, ngx_thread_mutex_t *mtx,
ngx_log_t *log)
{
ngx_err_t err;
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_cond_wait(%p) enter", cond);
err = pthread_cond_wait(cond, mtx);
#if 0
ngx_time_update();
#endif
if (err == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_cond_wait(%p) exit", cond);
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_cond_wait() failed");
return NGX_ERROR;
}

View File

@ -0,0 +1,70 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_thread_pool.h>
#if (NGX_LINUX)
/*
* Linux thread id is a pid of thread created by clone(2),
* glibc does not provide a wrapper for gettid().
*/
ngx_tid_t
ngx_thread_tid(void)
{
return syscall(SYS_gettid);
}
#elif (NGX_FREEBSD) && (__FreeBSD_version >= 900031)
#include <pthread_np.h>
ngx_tid_t
ngx_thread_tid(void)
{
return pthread_getthreadid_np();
}
#elif (NGX_DARWIN)
/*
* MacOSX thread has two thread ids:
*
* 1) MacOSX 10.6 (Snow Leoprad) has pthread_threadid_np() returning
* an uint64_t value, which is obtained using the __thread_selfid()
* syscall. It is a number above 300,000.
*/
ngx_tid_t
ngx_thread_tid(void)
{
uint64_t tid;
(void) pthread_threadid_np(NULL, &tid);
return tid;
}
/*
* 2) Kernel thread mach_port_t returned by pthread_mach_thread_np().
* It is a number in range 100-100,000.
*
* return pthread_mach_thread_np(pthread_self());
*/
#else
ngx_tid_t
ngx_thread_tid(void)
{
return (uint64_t) (uintptr_t) pthread_self();
}
#endif

View File

@ -0,0 +1,174 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Nginx, Inc.
*/
#include <ngx_config.h>
#include <ngx_core.h>
/*
* All modern pthread mutex implementations try to acquire a lock
* atomically in userland before going to sleep in kernel. Some
* spins before the sleeping.
*
* In Solaris since version 8 all mutex types spin before sleeping.
* The default spin count is 1000. It can be overridden using
* _THREAD_ADAPTIVE_SPIN=100 environment variable.
*
* In MacOSX all mutex types spin to acquire a lock protecting a mutex's
* internals. If the mutex is busy, thread calls Mach semaphore_wait().
*
*
* PTHREAD_MUTEX_NORMAL lacks deadlock detection and is the fastest
* mutex type.
*
* Linux: No spinning. The internal name PTHREAD_MUTEX_TIMED_NP
* remains from the times when pthread_mutex_timedlock() was
* non-standard extension. Alias name: PTHREAD_MUTEX_FAST_NP.
* FreeBSD: No spinning.
*
*
* PTHREAD_MUTEX_ERRORCHECK is usually as fast as PTHREAD_MUTEX_NORMAL
* yet has lightweight deadlock detection.
*
* Linux: No spinning. The internal name: PTHREAD_MUTEX_ERRORCHECK_NP.
* FreeBSD: No spinning.
*
*
* PTHREAD_MUTEX_RECURSIVE allows recursive locking.
*
* Linux: No spinning. The internal name: PTHREAD_MUTEX_RECURSIVE_NP.
* FreeBSD: No spinning.
*
*
* PTHREAD_MUTEX_ADAPTIVE_NP spins on SMP systems before sleeping.
*
* Linux: No deadlock detection. Dynamically changes a spin count
* for each mutex from 10 to 100 based on spin count taken
* previously.
* FreeBSD: Deadlock detection. The default spin count is 2000.
* It can be overriden using LIBPTHREAD_SPINLOOPS environment
* variable or by pthread_mutex_setspinloops_np(). If a lock
* is still busy, sched_yield() can be called on both UP and
* SMP systems. The default yield loop count is zero, but
* it can be set by LIBPTHREAD_YIELDLOOPS environment
* variable or by pthread_mutex_setyieldloops_np().
* Solaris: No PTHREAD_MUTEX_ADAPTIVE_NP.
* MacOSX: No PTHREAD_MUTEX_ADAPTIVE_NP.
*
*
* PTHREAD_MUTEX_ELISION_NP is a Linux extension to elide locks using
* Intel Restricted Transactional Memory. It is the most suitable for
* rwlock pattern access because it allows simultaneous reads without lock.
* Supported since glibc 2.18.
*
*
* PTHREAD_MUTEX_DEFAULT is default mutex type.
*
* Linux: PTHREAD_MUTEX_NORMAL.
* FreeBSD: PTHREAD_MUTEX_ERRORCHECK.
* Solaris: PTHREAD_MUTEX_NORMAL.
* MacOSX: PTHREAD_MUTEX_NORMAL.
*/
ngx_int_t
ngx_thread_mutex_create(ngx_thread_mutex_t *mtx, ngx_log_t *log)
{
ngx_err_t err;
pthread_mutexattr_t attr;
err = pthread_mutexattr_init(&attr);
if (err != 0) {
ngx_log_error(NGX_LOG_EMERG, log, err,
"pthread_mutexattr_init() failed");
return NGX_ERROR;
}
err = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
if (err != 0) {
ngx_log_error(NGX_LOG_EMERG, log, err,
"pthread_mutexattr_settype"
"(PTHREAD_MUTEX_ERRORCHECK) failed");
return NGX_ERROR;
}
err = pthread_mutex_init(mtx, &attr);
if (err != 0) {
ngx_log_error(NGX_LOG_EMERG, log, err,
"pthread_mutex_init() failed");
return NGX_ERROR;
}
err = pthread_mutexattr_destroy(&attr);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"pthread_mutexattr_destroy() failed");
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_mutex_init(%p)", mtx);
return NGX_OK;
}
ngx_int_t
ngx_thread_mutex_destroy(ngx_thread_mutex_t *mtx, ngx_log_t *log)
{
ngx_err_t err;
err = pthread_mutex_destroy(mtx);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, log, err,
"pthread_mutex_destroy() failed");
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_mutex_destroy(%p)", mtx);
return NGX_OK;
}
ngx_int_t
ngx_thread_mutex_lock(ngx_thread_mutex_t *mtx, ngx_log_t *log)
{
ngx_err_t err;
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_mutex_lock(%p) enter", mtx);
err = pthread_mutex_lock(mtx);
if (err == 0) {
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_mutex_lock() failed");
return NGX_ERROR;
}
ngx_int_t
ngx_thread_mutex_unlock(ngx_thread_mutex_t *mtx, ngx_log_t *log)
{
ngx_err_t err;
err = pthread_mutex_unlock(mtx);
#if 0
ngx_time_update();
#endif
if (err == 0) {
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"pthread_mutex_unlock(%p) exit", mtx);
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_mutex_unlock() failed");
return NGX_ERROR;
}