134e17798c
Approved by: trasz MFC after: 1 month Sponsored by: Conclusive Engineering (development), vStack.com (funding)
423 lines
12 KiB
C
423 lines
12 KiB
C
/*
|
|
* Copyright 2016 Jakub Klama <jceel@FreeBSD.org>
|
|
* All rights reserved
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted providing that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
|
|
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
|
|
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
|
|
* STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
|
|
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*
|
|
*/
|
|
|
|
#include <errno.h>
|
|
#include <stdlib.h>
|
|
#include <pthread.h>
|
|
#if defined(__FreeBSD__)
|
|
#include <pthread_np.h>
|
|
#endif
|
|
#include <sys/queue.h>
|
|
#include "lib9p.h"
|
|
#include "threadpool.h"
|
|
|
|
static void l9p_threadpool_rflush(struct l9p_threadpool *tp,
|
|
struct l9p_request *req);
|
|
|
|
static void *
|
|
l9p_responder(void *arg)
|
|
{
|
|
struct l9p_threadpool *tp;
|
|
struct l9p_worker *worker = arg;
|
|
struct l9p_request *req;
|
|
|
|
tp = worker->ltw_tp;
|
|
for (;;) {
|
|
/* get next reply to send */
|
|
pthread_mutex_lock(&tp->ltp_mtx);
|
|
while (STAILQ_EMPTY(&tp->ltp_replyq) && !worker->ltw_exiting)
|
|
pthread_cond_wait(&tp->ltp_reply_cv, &tp->ltp_mtx);
|
|
if (worker->ltw_exiting) {
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
break;
|
|
}
|
|
|
|
/* off reply queue */
|
|
req = STAILQ_FIRST(&tp->ltp_replyq);
|
|
STAILQ_REMOVE_HEAD(&tp->ltp_replyq, lr_worklink);
|
|
|
|
/* request is now in final glide path, can't be Tflush-ed */
|
|
req->lr_workstate = L9P_WS_REPLYING;
|
|
|
|
/* any flushers waiting for this request can go now */
|
|
if (req->lr_flushstate != L9P_FLUSH_NONE)
|
|
l9p_threadpool_rflush(tp, req);
|
|
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
|
|
/* send response */
|
|
l9p_respond(req, false, true);
|
|
}
|
|
return (NULL);
|
|
}
|
|
|
|
static void *
|
|
l9p_worker(void *arg)
|
|
{
|
|
struct l9p_threadpool *tp;
|
|
struct l9p_worker *worker = arg;
|
|
struct l9p_request *req;
|
|
|
|
tp = worker->ltw_tp;
|
|
pthread_mutex_lock(&tp->ltp_mtx);
|
|
for (;;) {
|
|
while (STAILQ_EMPTY(&tp->ltp_workq) && !worker->ltw_exiting)
|
|
pthread_cond_wait(&tp->ltp_work_cv, &tp->ltp_mtx);
|
|
if (worker->ltw_exiting)
|
|
break;
|
|
|
|
/* off work queue; now work-in-progress, by us */
|
|
req = STAILQ_FIRST(&tp->ltp_workq);
|
|
STAILQ_REMOVE_HEAD(&tp->ltp_workq, lr_worklink);
|
|
req->lr_workstate = L9P_WS_INPROGRESS;
|
|
req->lr_worker = worker;
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
|
|
/* actually try the request */
|
|
req->lr_error = l9p_dispatch_request(req);
|
|
|
|
/* move to responder queue, updating work-state */
|
|
pthread_mutex_lock(&tp->ltp_mtx);
|
|
req->lr_workstate = L9P_WS_RESPQUEUED;
|
|
req->lr_worker = NULL;
|
|
STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
|
|
|
|
/* signal the responder */
|
|
pthread_cond_signal(&tp->ltp_reply_cv);
|
|
}
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
return (NULL);
|
|
}
|
|
|
|
/*
|
|
* Just before finally replying to a request that got touched by
|
|
* a Tflush request, we enqueue its flushers (requests of type
|
|
* Tflush, which are now on the flushee's lr_flushq) onto the
|
|
* response queue.
|
|
*/
|
|
static void
|
|
l9p_threadpool_rflush(struct l9p_threadpool *tp, struct l9p_request *req)
|
|
{
|
|
struct l9p_request *flusher;
|
|
|
|
/*
|
|
* https://swtch.com/plan9port/man/man9/flush.html says:
|
|
*
|
|
* "Should multiple Tflushes be received for a pending
|
|
* request, they must be answered in order. A Rflush for
|
|
* any of the multiple Tflushes implies an answer for all
|
|
* previous ones. Therefore, should a server receive a
|
|
* request and then multiple flushes for that request, it
|
|
* need respond only to the last flush." This means
|
|
* we could march through the queue of flushers here,
|
|
* marking all but the last one as "to be dropped" rather
|
|
* than "to be replied-to".
|
|
*
|
|
* However, we'll leave that for later, if ever -- it
|
|
* should be harmless to respond to each, in order.
|
|
*/
|
|
STAILQ_FOREACH(flusher, &req->lr_flushq, lr_flushlink) {
|
|
flusher->lr_workstate = L9P_WS_RESPQUEUED;
|
|
#ifdef notdef
|
|
if (not the last) {
|
|
flusher->lr_flushstate = L9P_FLUSH_NOT_RUN;
|
|
/* or, flusher->lr_drop = true ? */
|
|
}
|
|
#endif
|
|
STAILQ_INSERT_TAIL(&tp->ltp_replyq, flusher, lr_worklink);
|
|
}
|
|
}
|
|
|
|
int
|
|
l9p_threadpool_init(struct l9p_threadpool *tp, int size)
|
|
{
|
|
struct l9p_worker *worker;
|
|
#if defined(__FreeBSD__)
|
|
char threadname[16];
|
|
#endif
|
|
int error;
|
|
int i, nworkers, nresponders;
|
|
|
|
if (size <= 0)
|
|
return (EINVAL);
|
|
error = pthread_mutex_init(&tp->ltp_mtx, NULL);
|
|
if (error)
|
|
return (error);
|
|
error = pthread_cond_init(&tp->ltp_work_cv, NULL);
|
|
if (error)
|
|
goto fail_work_cv;
|
|
error = pthread_cond_init(&tp->ltp_reply_cv, NULL);
|
|
if (error)
|
|
goto fail_reply_cv;
|
|
|
|
STAILQ_INIT(&tp->ltp_workq);
|
|
STAILQ_INIT(&tp->ltp_replyq);
|
|
LIST_INIT(&tp->ltp_workers);
|
|
|
|
nresponders = 0;
|
|
nworkers = 0;
|
|
for (i = 0; i <= size; i++) {
|
|
worker = calloc(1, sizeof(struct l9p_worker));
|
|
worker->ltw_tp = tp;
|
|
worker->ltw_responder = i == 0;
|
|
error = pthread_create(&worker->ltw_thread, NULL,
|
|
worker->ltw_responder ? l9p_responder : l9p_worker,
|
|
(void *)worker);
|
|
if (error) {
|
|
free(worker);
|
|
break;
|
|
}
|
|
if (worker->ltw_responder)
|
|
nresponders++;
|
|
else
|
|
nworkers++;
|
|
|
|
#if defined(__FreeBSD__)
|
|
if (worker->ltw_responder) {
|
|
pthread_set_name_np(worker->ltw_thread, "9p-responder");
|
|
} else {
|
|
sprintf(threadname, "9p-worker:%d", i - 1);
|
|
pthread_set_name_np(worker->ltw_thread, threadname);
|
|
}
|
|
#endif
|
|
|
|
LIST_INSERT_HEAD(&tp->ltp_workers, worker, ltw_link);
|
|
}
|
|
if (nresponders == 0 || nworkers == 0) {
|
|
/* need the one responder, and at least one worker */
|
|
l9p_threadpool_shutdown(tp);
|
|
return (error);
|
|
}
|
|
return (0);
|
|
|
|
/*
|
|
* We could avoid these labels by having multiple destroy
|
|
* paths (one for each error case), or by having booleans
|
|
* for which variables were initialized. Neither is very
|
|
* appealing...
|
|
*/
|
|
fail_reply_cv:
|
|
pthread_cond_destroy(&tp->ltp_work_cv);
|
|
fail_work_cv:
|
|
pthread_mutex_destroy(&tp->ltp_mtx);
|
|
|
|
return (error);
|
|
}
|
|
|
|
/*
|
|
* Run a request, usually by queueing it.
|
|
*/
|
|
void
|
|
l9p_threadpool_run(struct l9p_threadpool *tp, struct l9p_request *req)
|
|
{
|
|
|
|
/*
|
|
* Flush requests must be handled specially, since they
|
|
* can cancel / kill off regular requests. (But we can
|
|
* run them through the regular dispatch mechanism.)
|
|
*/
|
|
if (req->lr_req.hdr.type == L9P_TFLUSH) {
|
|
/* not on a work queue yet so we can touch state */
|
|
req->lr_workstate = L9P_WS_IMMEDIATE;
|
|
(void) l9p_dispatch_request(req);
|
|
} else {
|
|
pthread_mutex_lock(&tp->ltp_mtx);
|
|
req->lr_workstate = L9P_WS_NOTSTARTED;
|
|
STAILQ_INSERT_TAIL(&tp->ltp_workq, req, lr_worklink);
|
|
pthread_cond_signal(&tp->ltp_work_cv);
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Run a Tflush request. Called via l9p_dispatch_request() since
|
|
* it has some debug code in it, but not called from worker thread.
|
|
*/
|
|
int
|
|
l9p_threadpool_tflush(struct l9p_request *req)
|
|
{
|
|
struct l9p_connection *conn;
|
|
struct l9p_threadpool *tp;
|
|
struct l9p_request *flushee;
|
|
uint16_t oldtag;
|
|
enum l9p_flushstate nstate;
|
|
|
|
/*
|
|
* Find what we're supposed to flush (the flushee, as it were).
|
|
*/
|
|
req->lr_error = 0; /* Tflush always succeeds */
|
|
conn = req->lr_conn;
|
|
tp = &conn->lc_tp;
|
|
oldtag = req->lr_req.tflush.oldtag;
|
|
ht_wrlock(&conn->lc_requests);
|
|
flushee = ht_find_locked(&conn->lc_requests, oldtag);
|
|
if (flushee == NULL) {
|
|
/*
|
|
* Nothing to flush! The old request must have
|
|
* been done and gone already. Just queue this
|
|
* Tflush for a success reply.
|
|
*/
|
|
ht_unlock(&conn->lc_requests);
|
|
pthread_mutex_lock(&tp->ltp_mtx);
|
|
goto done;
|
|
}
|
|
|
|
/*
|
|
* Found the original request. We'll need to inspect its
|
|
* work-state to figure out what to do.
|
|
*/
|
|
pthread_mutex_lock(&tp->ltp_mtx);
|
|
ht_unlock(&conn->lc_requests);
|
|
|
|
switch (flushee->lr_workstate) {
|
|
|
|
case L9P_WS_NOTSTARTED:
|
|
/*
|
|
* Flushee is on work queue, but not yet being
|
|
* handled by a worker.
|
|
*
|
|
* The documentation -- see
|
|
* http://ericvh.github.io/9p-rfc/rfc9p2000.html
|
|
* https://swtch.com/plan9port/man/man9/flush.html
|
|
* -- says that "the server should answer the
|
|
* flush message immediately". However, Linux
|
|
* sends flush requests for operations that
|
|
* must finish, such as Tclunk, and it's not
|
|
* possible to *answer* the flush request until
|
|
* it has been handled (if necessary) or aborted
|
|
* (if allowed).
|
|
*
|
|
* We therefore now just the original request
|
|
* and let the request-handler do whatever is
|
|
* appropriate. NOTE: we could have a table of
|
|
* "requests that can be aborted without being
|
|
* run" vs "requests that must be run to be
|
|
* aborted", but for now that seems like an
|
|
* unnecessary complication.
|
|
*/
|
|
nstate = L9P_FLUSH_REQUESTED_PRE_START;
|
|
break;
|
|
|
|
case L9P_WS_IMMEDIATE:
|
|
/*
|
|
* This state only applies to Tflush requests, and
|
|
* flushing a Tflush is illegal. But we'll do nothing
|
|
* special here, which will make us act like a flush
|
|
* request for the flushee that arrived too late to
|
|
* do anything about the flushee.
|
|
*/
|
|
nstate = L9P_FLUSH_REQUESTED_POST_START;
|
|
break;
|
|
|
|
case L9P_WS_INPROGRESS:
|
|
/*
|
|
* Worker thread flushee->lr_worker is working on it.
|
|
* Kick it to get it out of blocking system calls.
|
|
* (This requires that it carefully set up some
|
|
* signal handlers, and may be FreeBSD-dependent,
|
|
* it probably cannot be handled this way on MacOS.)
|
|
*/
|
|
#ifdef notyet
|
|
pthread_kill(...);
|
|
#endif
|
|
nstate = L9P_FLUSH_REQUESTED_POST_START;
|
|
break;
|
|
|
|
case L9P_WS_RESPQUEUED:
|
|
/*
|
|
* The flushee is already in the response queue.
|
|
* We'll just mark it as having had some flush
|
|
* action applied.
|
|
*/
|
|
nstate = L9P_FLUSH_TOOLATE;
|
|
break;
|
|
|
|
case L9P_WS_REPLYING:
|
|
/*
|
|
* Although we found the flushee, it's too late to
|
|
* make us depend on it: it's already heading out
|
|
* the door as a reply.
|
|
*
|
|
* We don't want to do anything to the flushee.
|
|
* Instead, we want to work the same way as if
|
|
* we had never found the tag.
|
|
*/
|
|
goto done;
|
|
}
|
|
|
|
/*
|
|
* Now add us to the list of Tflush-es that are waiting
|
|
* for the flushee (creating the list if needed, i.e., if
|
|
* this is the first Tflush for the flushee). We (req)
|
|
* will get queued for reply later, when the responder
|
|
* processes the flushee and calls l9p_threadpool_rflush().
|
|
*/
|
|
if (flushee->lr_flushstate == L9P_FLUSH_NONE)
|
|
STAILQ_INIT(&flushee->lr_flushq);
|
|
flushee->lr_flushstate = nstate;
|
|
STAILQ_INSERT_TAIL(&flushee->lr_flushq, req, lr_flushlink);
|
|
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
|
|
return (0);
|
|
|
|
done:
|
|
/*
|
|
* This immediate op is ready to be replied-to now, so just
|
|
* stick it onto the reply queue.
|
|
*/
|
|
req->lr_workstate = L9P_WS_RESPQUEUED;
|
|
STAILQ_INSERT_TAIL(&tp->ltp_replyq, req, lr_worklink);
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
pthread_cond_signal(&tp->ltp_reply_cv);
|
|
return (0);
|
|
}
|
|
|
|
int
|
|
l9p_threadpool_shutdown(struct l9p_threadpool *tp)
|
|
{
|
|
struct l9p_worker *worker, *tmp;
|
|
|
|
LIST_FOREACH_SAFE(worker, &tp->ltp_workers, ltw_link, tmp) {
|
|
pthread_mutex_lock(&tp->ltp_mtx);
|
|
worker->ltw_exiting = true;
|
|
if (worker->ltw_responder)
|
|
pthread_cond_signal(&tp->ltp_reply_cv);
|
|
else
|
|
pthread_cond_broadcast(&tp->ltp_work_cv);
|
|
pthread_mutex_unlock(&tp->ltp_mtx);
|
|
pthread_join(worker->ltw_thread, NULL);
|
|
LIST_REMOVE(worker, ltw_link);
|
|
free(worker);
|
|
}
|
|
pthread_cond_destroy(&tp->ltp_reply_cv);
|
|
pthread_cond_destroy(&tp->ltp_work_cv);
|
|
pthread_mutex_destroy(&tp->ltp_mtx);
|
|
|
|
return (0);
|
|
}
|