From 11b67b956bc403d5c1815af6a6b7404e001435e9 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Tue, 25 Nov 2014 21:10:35 +0100 Subject: MAJOR: session: implement a wait-queue for sessions who need a buffer When a session_alloc_buffers() fails to allocate one or two buffers, it subscribes the session to buffer_wq, and waits for another session to release buffers. It's then removed from the queue and woken up with TASK_WAKE_RES, and can attempt its allocation again. We decide to try to wake as many waiters as we release buffers so that if we release 2 and two waiters need only once, they both have their chance. We must never come to the situation where we don't wake enough tasks up. It's common to release buffers after the completion of an I/O callback, which can happen even if the I/O could not be performed due to half a failure on memory allocation. In this situation, we don't want to move out of the wait queue the session that was just added, otherwise it will never get any buffer. Thus, we only force ourselves out of the queue when freeing the session. Note: at the moment, since session_alloc_buffers() is not used, no task is subscribed to the wait queue. --- include/proto/session.h | 2 ++ include/types/session.h | 1 + src/session.c | 66 +++++++++++++++++++++++++++++++++++++++++-------- src/stream_interface.c | 2 ++ 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/include/proto/session.h b/include/proto/session.h index 907e32d..6cec6de 100644 --- a/include/proto/session.h +++ b/include/proto/session.h @@ -30,6 +30,7 @@ extern struct pool_head *pool2_session; extern struct list sessions; +extern struct list buffer_wq; extern struct data_cb sess_conn_cb; @@ -53,6 +54,7 @@ int parse_track_counters(char **args, int *arg, /* Update the session's backend and server time stats */ void session_update_time_stats(struct session *s); +void session_offer_buffers(int count); int session_alloc_buffers(struct session *s); void session_release_buffers(struct session *s); int session_alloc_one_buffer(struct session *s, struct buffer **buf); diff --git a/include/types/session.h b/include/types/session.h index f17aff4..1f3ba48 100644 --- a/include/types/session.h +++ b/include/types/session.h @@ -123,6 +123,7 @@ struct session { struct list list; /* position in global sessions list */ struct list by_srv; /* position in server session list */ struct list back_refs; /* list of users tracking this session */ + struct list buffer_wait; /* position in the list of sessions waiting for a buffer */ struct { struct stksess *ts; diff --git a/src/session.c b/src/session.c index f7644f5..e8e1e9c 100644 --- a/src/session.c +++ b/src/session.c @@ -51,6 +51,9 @@ struct pool_head *pool2_session; struct list sessions; +/* list of sessions waiting for at least one buffer */ +struct list buffer_wq = LIST_HEAD_INIT(buffer_wq); + static int conn_session_complete(struct connection *conn); static int conn_session_update(struct connection *conn); static struct task *expire_mini_session(struct task *t); @@ -409,6 +412,7 @@ int session_complete(struct session *s) /* OK, we're keeping the session, so let's properly initialize the session */ LIST_ADDQ(&sessions, &s->list); LIST_INIT(&s->back_refs); + LIST_INIT(&s->buffer_wait); s->flags |= SN_INITIALIZED; s->unique_id = NULL; @@ -618,8 +622,16 @@ static void session_free(struct session *s) if (s->rep->pipe) put_pipe(s->rep->pipe); - b_free(&s->req->buf); - b_free(&s->rep->buf); + /* We may still be present in the buffer wait queue */ + if (!LIST_ISEMPTY(&s->buffer_wait)) { + LIST_DEL(&s->buffer_wait); + LIST_INIT(&s->buffer_wait); + } + + b_drop(&s->req->buf); + b_drop(&s->rep->buf); + if (!LIST_ISEMPTY(&buffer_wq)) + session_offer_buffers(1); pool_free2(pool2_channel, s->req); pool_free2(pool2_channel, s->rep); @@ -688,9 +700,8 @@ int session_alloc_one_buffer(struct session *s, struct buffer **buf) if (b) return 1; - /* FIXME: normally we're supposed to subscribe to a list of waiters - * for buffers. We release what we failed to allocate. - */ + if (LIST_ISEMPTY(&s->buffer_wait)) + LIST_ADDQ(&buffer_wq, &s->buffer_wait); return 0; } @@ -703,28 +714,63 @@ int session_alloc_buffers(struct session *s) { int ret; + if (!LIST_ISEMPTY(&s->buffer_wait)) { + LIST_DEL(&s->buffer_wait); + LIST_INIT(&s->buffer_wait); + } + ret = b_alloc_pair(&s->req->buf, &s->rep->buf); if (ret) return ret; - /* FIXME: normally we're supposed to subscribe to a list of waiters - * for buffers. We release what we failed to allocate. - */ + session_release_buffers(s); + LIST_ADDQ(&buffer_wq, &s->buffer_wait); return 0; } /* releases unused buffers after processing. Typically used at the end of the - * update() functions. + * update() functions. It will try to wake up as many tasks as the number of + * buffers that it releases. In practice, most often sessions are blocked on + * a single buffer, so it makes sense to try to wake two up when two buffers + * are released at once. */ void session_release_buffers(struct session *s) { + int release_count = 0; + + release_count = !!s->req->buf->size + !!s->rep->buf->size; + if (s->req->buf->size && buffer_empty(s->req->buf)) b_free(&s->req->buf); if (s->rep->buf->size && buffer_empty(s->rep->buf)) b_free(&s->rep->buf); - /* FIXME: normally we want to wake up pending tasks */ + /* if we're certain to have at least 1 buffer available, and there is + * someone waiting, we can wake up a waiter and offer them. + */ + if (release_count >= 1 && !LIST_ISEMPTY(&buffer_wq)) + session_offer_buffers(release_count); +} + +/* run across the list of pending sessions waiting for a buffer and wake + * one up if buffers are available. + */ +void session_offer_buffers(int count) +{ + struct session *sess, *bak; + + list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) { + if (sess->task->state & TASK_RUNNING) + continue; + + LIST_DEL(&sess->buffer_wait); + LIST_INIT(&sess->buffer_wait); + + task_wakeup(sess->task, TASK_WOKEN_RES); + if (--count <= 0) + break; + } } /* perform minimal intializations, report 0 in case of error, 1 if OK. */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 9f7e979..5d07125 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -30,6 +31,7 @@ #include #include #include +#include #include #include -- 1.7.12.1