Introduce new per-thread lock to protect the list of requests.

This allows to slightly simplify svc_run_internal() code: if we processed
all the requests in a queue, then we know that new one will not appear.

MFC after:	2 weeks
This commit is contained in:
Alexander Motin 2014-06-08 09:40:26 +00:00
parent 45b4fb0449
commit b776fb2d67
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=267221
3 changed files with 57 additions and 85 deletions

View File

@ -288,11 +288,7 @@ fha_hash_entry_add_op(struct fha_hash_entry *fhe, int locktype, int count)
* Get the service thread currently associated with the fhe that is
* appropriate to handle this operation.
*/
SVCTHREAD *
fha_hash_entry_choose_thread(struct fha_params *softc,
struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread);
SVCTHREAD *
static SVCTHREAD *
fha_hash_entry_choose_thread(struct fha_params *softc,
struct fha_hash_entry *fhe, struct fha_info *i, SVCTHREAD *this_thread)
{
@ -428,13 +424,13 @@ fha_assign(SVCTHREAD *this_thread, struct svc_req *req,
* Grab the pool lock here to not let chosen thread go away before
* the new request inserted to its queue while we drop fhe lock.
*/
mtx_lock(&(*softc->pool)->sp_lock);
mtx_lock(&thread->st_lock);
mtx_unlock(fhe->mtx);
return (thread);
thist:
req->rq_p1 = NULL;
mtx_lock(&(*softc->pool)->sp_lock);
mtx_lock(&this_thread->st_lock);
return (this_thread);
}

View File

@ -1070,7 +1070,6 @@ svc_request_space_available(SVCPOOL *pool)
static void
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
{
struct svc_reqlist reqs;
SVCTHREAD *st, *stpref;
SVCXPRT *xprt;
enum xprt_stat stat;
@ -1079,11 +1078,11 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
int error;
st = mem_alloc(sizeof(*st));
mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
st->st_pool = pool;
st->st_xprt = NULL;
STAILQ_INIT(&st->st_reqs);
cv_init(&st->st_cond, "rpcsvc");
STAILQ_INIT(&reqs);
mtx_lock(&pool->sp_lock);
LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
@ -1117,7 +1116,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
xprt = st->st_xprt;
if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
if (!xprt) {
/*
* Enforce maxthreads count.
*/
@ -1159,8 +1158,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
if (!ismaster
&& (pool->sp_threadcount
> pool->sp_minthreads)
&& !st->st_xprt
&& STAILQ_EMPTY(&st->st_reqs))
&& !st->st_xprt)
break;
} else if (error) {
mtx_unlock(&pool->sp_lock);
@ -1170,93 +1168,69 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
}
continue;
}
mtx_unlock(&pool->sp_lock);
if (xprt) {
/*
* Drain the transport socket and queue up any
* RPCs.
*/
xprt->xp_lastactive = time_uptime;
do {
if (!svc_request_space_available(pool))
break;
mtx_unlock(&pool->sp_lock);
rqstp = NULL;
stat = svc_getreq(xprt, &rqstp);
if (rqstp) {
svc_change_space_used(pool, rqstp->rq_size);
/*
* See if the application has
* a preference for some other
* thread.
*/
stpref = st;
if (pool->sp_assign)
stpref = pool->sp_assign(st,
rqstp);
else
mtx_lock(&pool->sp_lock);
rqstp->rq_thread = stpref;
/*
* Drain the transport socket and queue up any RPCs.
*/
xprt->xp_lastactive = time_uptime;
do {
if (!svc_request_space_available(pool))
break;
rqstp = NULL;
stat = svc_getreq(xprt, &rqstp);
if (rqstp) {
svc_change_space_used(pool, rqstp->rq_size);
/*
* See if the application has a preference
* for some other thread.
*/
if (pool->sp_assign) {
stpref = pool->sp_assign(st, rqstp);
STAILQ_INSERT_TAIL(&stpref->st_reqs,
rqstp, rq_link);
/*
* If we assigned the request
* to another thread, make
* sure its awake and continue
* reading from the
* socket. Otherwise, try to
* find some other thread to
* read from the socket and
* execute the request
* immediately.
*/
if (stpref == st)
break;
if (stpref->st_idle) {
LIST_REMOVE(stpref, st_ilink);
stpref->st_idle = FALSE;
cv_signal(&stpref->st_cond);
}
mtx_unlock(&stpref->st_lock);
rqstp->rq_thread = stpref;
if (stpref != st)
rqstp = NULL;
} else
mtx_lock(&pool->sp_lock);
} while (stat == XPRT_MOREREQS
&& pool->sp_state != SVCPOOL_CLOSING);
/*
* Move this transport to the end of the
* active list to ensure fairness when
* multiple transports are active. If this was
* the last queued request, svc_getreq will
* end up calling xprt_inactive to remove from
* the active list.
*/
xprt->xp_thread = NULL;
st->st_xprt = NULL;
if (xprt->xp_active) {
if (!svc_request_space_available(pool) ||
!xprt_assignthread(xprt))
TAILQ_INSERT_TAIL(&pool->sp_active,
xprt, xp_alink);
STAILQ_INSERT_TAIL(&st->st_reqs,
rqstp, rq_link);
}
STAILQ_CONCAT(&reqs, &st->st_reqs);
mtx_unlock(&pool->sp_lock);
SVC_RELEASE(xprt);
} else {
STAILQ_CONCAT(&reqs, &st->st_reqs);
mtx_unlock(&pool->sp_lock);
} while (rqstp == NULL && stat == XPRT_MOREREQS
&& pool->sp_state != SVCPOOL_CLOSING);
/*
* Move this transport to the end of the active list to
* ensure fairness when multiple transports are active.
* If this was the last queued request, svc_getreq will end
* up calling xprt_inactive to remove from the active list.
*/
mtx_lock(&pool->sp_lock);
xprt->xp_thread = NULL;
st->st_xprt = NULL;
if (xprt->xp_active) {
if (!svc_request_space_available(pool) ||
!xprt_assignthread(xprt))
TAILQ_INSERT_TAIL(&pool->sp_active,
xprt, xp_alink);
}
mtx_unlock(&pool->sp_lock);
SVC_RELEASE(xprt);
/*
* Execute what we have queued.
*/
sz = 0;
while ((rqstp = STAILQ_FIRST(&reqs)) != NULL) {
STAILQ_REMOVE_HEAD(&reqs, rq_link);
mtx_lock(&st->st_lock);
while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
mtx_unlock(&st->st_lock);
sz += rqstp->rq_size;
svc_executereq(rqstp);
mtx_lock(&st->st_lock);
}
mtx_unlock(&st->st_lock);
svc_change_space_used(pool, -sz);
mtx_lock(&pool->sp_lock);
}
@ -1273,6 +1247,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
mtx_unlock(&pool->sp_lock);
mtx_destroy(&st->st_lock);
cv_destroy(&st->st_cond);
mem_free(st, sizeof(*st));

View File

@ -291,6 +291,7 @@ STAILQ_HEAD(svc_reqlist, svc_req);
* thread to read and execute pending RPCs.
*/
typedef struct __rpc_svcthread {
struct mtx_padalign st_lock; /* protects st_reqs field */
struct __rpc_svcpool *st_pool;
SVCXPRT *st_xprt; /* transport we are processing */
struct svc_reqlist st_reqs; /* RPC requests to execute */