MFC r267228:
Split RPC pool threads into number of smaller semi-isolated groups. Old design with unified thread pool was good from the point of thread utilization. But single pool-wide mutex became huge congestion point for systems with many CPUs. To reduce the congestion create several thread groups within a pool (one group for every 6 CPUs and 12 threads), each group with own mutex. Each connection during its registration is assigned to one of the groups in round-robin fashion. File affinify code may still move requests between the groups, but otherwise groups are self-contained.
This commit is contained in:
parent
1d8eae9d0a
commit
23b32162ce
337
sys/rpc/svc.c
337
sys/rpc/svc.c
@ -56,6 +56,7 @@ __FBSDID("$FreeBSD$");
|
|||||||
#include <sys/queue.h>
|
#include <sys/queue.h>
|
||||||
#include <sys/socketvar.h>
|
#include <sys/socketvar.h>
|
||||||
#include <sys/systm.h>
|
#include <sys/systm.h>
|
||||||
|
#include <sys/smp.h>
|
||||||
#include <sys/sx.h>
|
#include <sys/sx.h>
|
||||||
#include <sys/ucred.h>
|
#include <sys/ucred.h>
|
||||||
|
|
||||||
@ -70,7 +71,7 @@ __FBSDID("$FreeBSD$");
|
|||||||
|
|
||||||
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
|
static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
|
||||||
char *);
|
char *);
|
||||||
static void svc_new_thread(SVCPOOL *pool);
|
static void svc_new_thread(SVCGROUP *grp);
|
||||||
static void xprt_unregister_locked(SVCXPRT *xprt);
|
static void xprt_unregister_locked(SVCXPRT *xprt);
|
||||||
static void svc_change_space_used(SVCPOOL *pool, int delta);
|
static void svc_change_space_used(SVCPOOL *pool, int delta);
|
||||||
static bool_t svc_request_space_available(SVCPOOL *pool);
|
static bool_t svc_request_space_available(SVCPOOL *pool);
|
||||||
@ -79,11 +80,14 @@ static bool_t svc_request_space_available(SVCPOOL *pool);
|
|||||||
|
|
||||||
static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
|
static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
|
||||||
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
|
static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
|
||||||
|
static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
|
||||||
|
|
||||||
SVCPOOL*
|
SVCPOOL*
|
||||||
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
|
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool;
|
SVCPOOL *pool;
|
||||||
|
SVCGROUP *grp;
|
||||||
|
int g;
|
||||||
|
|
||||||
pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
|
pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
|
||||||
|
|
||||||
@ -91,15 +95,22 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
|
|||||||
pool->sp_name = name;
|
pool->sp_name = name;
|
||||||
pool->sp_state = SVCPOOL_INIT;
|
pool->sp_state = SVCPOOL_INIT;
|
||||||
pool->sp_proc = NULL;
|
pool->sp_proc = NULL;
|
||||||
TAILQ_INIT(&pool->sp_xlist);
|
|
||||||
TAILQ_INIT(&pool->sp_active);
|
|
||||||
TAILQ_INIT(&pool->sp_callouts);
|
TAILQ_INIT(&pool->sp_callouts);
|
||||||
TAILQ_INIT(&pool->sp_lcallouts);
|
TAILQ_INIT(&pool->sp_lcallouts);
|
||||||
LIST_INIT(&pool->sp_threads);
|
|
||||||
LIST_INIT(&pool->sp_idlethreads);
|
|
||||||
pool->sp_minthreads = 1;
|
pool->sp_minthreads = 1;
|
||||||
pool->sp_maxthreads = 1;
|
pool->sp_maxthreads = 1;
|
||||||
pool->sp_threadcount = 0;
|
pool->sp_groupcount = 1;
|
||||||
|
for (g = 0; g < SVC_MAXGROUPS; g++) {
|
||||||
|
grp = &pool->sp_groups[g];
|
||||||
|
mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
|
||||||
|
grp->sg_pool = pool;
|
||||||
|
grp->sg_state = SVCPOOL_ACTIVE;
|
||||||
|
TAILQ_INIT(&grp->sg_xlist);
|
||||||
|
TAILQ_INIT(&grp->sg_active);
|
||||||
|
LIST_INIT(&grp->sg_idlethreads);
|
||||||
|
grp->sg_minthreads = 1;
|
||||||
|
grp->sg_maxthreads = 1;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Don't use more than a quarter of mbuf clusters or more than
|
* Don't use more than a quarter of mbuf clusters or more than
|
||||||
@ -114,12 +125,19 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
|
|||||||
if (sysctl_base) {
|
if (sysctl_base) {
|
||||||
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
||||||
"minthreads", CTLTYPE_INT | CTLFLAG_RW,
|
"minthreads", CTLTYPE_INT | CTLFLAG_RW,
|
||||||
pool, 0, svcpool_minthread_sysctl, "I", "");
|
pool, 0, svcpool_minthread_sysctl, "I",
|
||||||
|
"Minimal number of threads");
|
||||||
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
||||||
"maxthreads", CTLTYPE_INT | CTLFLAG_RW,
|
"maxthreads", CTLTYPE_INT | CTLFLAG_RW,
|
||||||
pool, 0, svcpool_maxthread_sysctl, "I", "");
|
pool, 0, svcpool_maxthread_sysctl, "I",
|
||||||
|
"Maximal number of threads");
|
||||||
|
SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
||||||
|
"threads", CTLTYPE_INT | CTLFLAG_RD,
|
||||||
|
pool, 0, svcpool_threads_sysctl, "I",
|
||||||
|
"Current number of threads");
|
||||||
SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
||||||
"threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
|
"groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
|
||||||
|
"Number of thread groups");
|
||||||
|
|
||||||
SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
|
||||||
"request_space_used", CTLFLAG_RD,
|
"request_space_used", CTLFLAG_RD,
|
||||||
@ -158,20 +176,29 @@ svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
|
|||||||
void
|
void
|
||||||
svcpool_destroy(SVCPOOL *pool)
|
svcpool_destroy(SVCPOOL *pool)
|
||||||
{
|
{
|
||||||
|
SVCGROUP *grp;
|
||||||
SVCXPRT *xprt, *nxprt;
|
SVCXPRT *xprt, *nxprt;
|
||||||
struct svc_callout *s;
|
struct svc_callout *s;
|
||||||
struct svc_loss_callout *sl;
|
struct svc_loss_callout *sl;
|
||||||
struct svcxprt_list cleanup;
|
struct svcxprt_list cleanup;
|
||||||
|
int g;
|
||||||
|
|
||||||
TAILQ_INIT(&cleanup);
|
TAILQ_INIT(&cleanup);
|
||||||
mtx_lock(&pool->sp_lock);
|
|
||||||
|
|
||||||
while (TAILQ_FIRST(&pool->sp_xlist)) {
|
for (g = 0; g < SVC_MAXGROUPS; g++) {
|
||||||
xprt = TAILQ_FIRST(&pool->sp_xlist);
|
grp = &pool->sp_groups[g];
|
||||||
|
mtx_lock(&grp->sg_lock);
|
||||||
|
while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
|
||||||
xprt_unregister_locked(xprt);
|
xprt_unregister_locked(xprt);
|
||||||
TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
|
TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
|
||||||
}
|
}
|
||||||
|
mtx_unlock(&grp->sg_lock);
|
||||||
|
}
|
||||||
|
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
|
||||||
|
SVC_RELEASE(xprt);
|
||||||
|
}
|
||||||
|
|
||||||
|
mtx_lock(&pool->sp_lock);
|
||||||
while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
|
while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&pool->sp_lock);
|
||||||
svc_unreg(pool, s->sc_prog, s->sc_vers);
|
svc_unreg(pool, s->sc_prog, s->sc_vers);
|
||||||
@ -184,10 +211,10 @@ svcpool_destroy(SVCPOOL *pool)
|
|||||||
}
|
}
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&pool->sp_lock);
|
||||||
|
|
||||||
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
|
for (g = 0; g < SVC_MAXGROUPS; g++) {
|
||||||
SVC_RELEASE(xprt);
|
grp = &pool->sp_groups[g];
|
||||||
|
mtx_destroy(&grp->sg_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
mtx_destroy(&pool->sp_lock);
|
mtx_destroy(&pool->sp_lock);
|
||||||
|
|
||||||
if (pool->sp_rcache)
|
if (pool->sp_rcache)
|
||||||
@ -197,14 +224,23 @@ svcpool_destroy(SVCPOOL *pool)
|
|||||||
free(pool, M_RPC);
|
free(pool, M_RPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool_t
|
/*
|
||||||
svcpool_active(SVCPOOL *pool)
|
* Sysctl handler to get the present thread count on a pool
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
|
||||||
{
|
{
|
||||||
enum svcpool_state state = pool->sp_state;
|
SVCPOOL *pool;
|
||||||
|
int threads, error, g;
|
||||||
|
|
||||||
if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
|
pool = oidp->oid_arg1;
|
||||||
return (FALSE);
|
threads = 0;
|
||||||
return (TRUE);
|
mtx_lock(&pool->sp_lock);
|
||||||
|
for (g = 0; g < pool->sp_groupcount; g++)
|
||||||
|
threads += pool->sp_groups[g].sg_threadcount;
|
||||||
|
mtx_unlock(&pool->sp_lock);
|
||||||
|
error = sysctl_handle_int(oidp, &threads, 0, req);
|
||||||
|
return (error);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -214,7 +250,7 @@ static int
|
|||||||
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
|
svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool;
|
SVCPOOL *pool;
|
||||||
int newminthreads, error, n;
|
int newminthreads, error, g;
|
||||||
|
|
||||||
pool = oidp->oid_arg1;
|
pool = oidp->oid_arg1;
|
||||||
newminthreads = pool->sp_minthreads;
|
newminthreads = pool->sp_minthreads;
|
||||||
@ -223,21 +259,11 @@ svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
|
|||||||
if (newminthreads > pool->sp_maxthreads)
|
if (newminthreads > pool->sp_maxthreads)
|
||||||
return (EINVAL);
|
return (EINVAL);
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&pool->sp_lock);
|
||||||
if (newminthreads > pool->sp_minthreads
|
|
||||||
&& svcpool_active(pool)) {
|
|
||||||
/*
|
|
||||||
* If the pool is running and we are
|
|
||||||
* increasing, create some more threads now.
|
|
||||||
*/
|
|
||||||
n = newminthreads - pool->sp_threadcount;
|
|
||||||
if (n > 0) {
|
|
||||||
mtx_unlock(&pool->sp_lock);
|
|
||||||
while (n--)
|
|
||||||
svc_new_thread(pool);
|
|
||||||
mtx_lock(&pool->sp_lock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pool->sp_minthreads = newminthreads;
|
pool->sp_minthreads = newminthreads;
|
||||||
|
for (g = 0; g < pool->sp_groupcount; g++) {
|
||||||
|
pool->sp_groups[g].sg_minthreads = max(1,
|
||||||
|
pool->sp_minthreads / pool->sp_groupcount);
|
||||||
|
}
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&pool->sp_lock);
|
||||||
}
|
}
|
||||||
return (error);
|
return (error);
|
||||||
@ -250,8 +276,7 @@ static int
|
|||||||
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
|
svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool;
|
SVCPOOL *pool;
|
||||||
SVCTHREAD *st;
|
int newmaxthreads, error, g;
|
||||||
int newmaxthreads, error;
|
|
||||||
|
|
||||||
pool = oidp->oid_arg1;
|
pool = oidp->oid_arg1;
|
||||||
newmaxthreads = pool->sp_maxthreads;
|
newmaxthreads = pool->sp_maxthreads;
|
||||||
@ -260,17 +285,11 @@ svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
|
|||||||
if (newmaxthreads < pool->sp_minthreads)
|
if (newmaxthreads < pool->sp_minthreads)
|
||||||
return (EINVAL);
|
return (EINVAL);
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&pool->sp_lock);
|
||||||
if (newmaxthreads < pool->sp_maxthreads
|
|
||||||
&& svcpool_active(pool)) {
|
|
||||||
/*
|
|
||||||
* If the pool is running and we are
|
|
||||||
* decreasing, wake up some idle threads to
|
|
||||||
* encourage them to exit.
|
|
||||||
*/
|
|
||||||
LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
|
|
||||||
cv_signal(&st->st_cond);
|
|
||||||
}
|
|
||||||
pool->sp_maxthreads = newmaxthreads;
|
pool->sp_maxthreads = newmaxthreads;
|
||||||
|
for (g = 0; g < pool->sp_groupcount; g++) {
|
||||||
|
pool->sp_groups[g].sg_maxthreads = max(1,
|
||||||
|
pool->sp_maxthreads / pool->sp_groupcount);
|
||||||
|
}
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&pool->sp_lock);
|
||||||
}
|
}
|
||||||
return (error);
|
return (error);
|
||||||
@ -283,13 +302,17 @@ void
|
|||||||
xprt_register(SVCXPRT *xprt)
|
xprt_register(SVCXPRT *xprt)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool = xprt->xp_pool;
|
SVCPOOL *pool = xprt->xp_pool;
|
||||||
|
SVCGROUP *grp;
|
||||||
|
int g;
|
||||||
|
|
||||||
SVC_ACQUIRE(xprt);
|
SVC_ACQUIRE(xprt);
|
||||||
mtx_lock(&pool->sp_lock);
|
g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
|
||||||
|
xprt->xp_group = grp = &pool->sp_groups[g];
|
||||||
|
mtx_lock(&grp->sg_lock);
|
||||||
xprt->xp_registered = TRUE;
|
xprt->xp_registered = TRUE;
|
||||||
xprt->xp_active = FALSE;
|
xprt->xp_active = FALSE;
|
||||||
TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
|
TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -300,29 +323,29 @@ xprt_register(SVCXPRT *xprt)
|
|||||||
static void
|
static void
|
||||||
xprt_unregister_locked(SVCXPRT *xprt)
|
xprt_unregister_locked(SVCXPRT *xprt)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool = xprt->xp_pool;
|
SVCGROUP *grp = xprt->xp_group;
|
||||||
|
|
||||||
mtx_assert(&pool->sp_lock, MA_OWNED);
|
mtx_assert(&grp->sg_lock, MA_OWNED);
|
||||||
KASSERT(xprt->xp_registered == TRUE,
|
KASSERT(xprt->xp_registered == TRUE,
|
||||||
("xprt_unregister_locked: not registered"));
|
("xprt_unregister_locked: not registered"));
|
||||||
xprt_inactive_locked(xprt);
|
xprt_inactive_locked(xprt);
|
||||||
TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
|
TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
|
||||||
xprt->xp_registered = FALSE;
|
xprt->xp_registered = FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
xprt_unregister(SVCXPRT *xprt)
|
xprt_unregister(SVCXPRT *xprt)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool = xprt->xp_pool;
|
SVCGROUP *grp = xprt->xp_group;
|
||||||
|
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
if (xprt->xp_registered == FALSE) {
|
if (xprt->xp_registered == FALSE) {
|
||||||
/* Already unregistered by another thread */
|
/* Already unregistered by another thread */
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
xprt_unregister_locked(xprt);
|
xprt_unregister_locked(xprt);
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
|
|
||||||
SVC_RELEASE(xprt);
|
SVC_RELEASE(xprt);
|
||||||
}
|
}
|
||||||
@ -333,11 +356,11 @@ xprt_unregister(SVCXPRT *xprt)
|
|||||||
static int
|
static int
|
||||||
xprt_assignthread(SVCXPRT *xprt)
|
xprt_assignthread(SVCXPRT *xprt)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool = xprt->xp_pool;
|
SVCGROUP *grp = xprt->xp_group;
|
||||||
SVCTHREAD *st;
|
SVCTHREAD *st;
|
||||||
|
|
||||||
mtx_assert(&pool->sp_lock, MA_OWNED);
|
mtx_assert(&grp->sg_lock, MA_OWNED);
|
||||||
st = LIST_FIRST(&pool->sp_idlethreads);
|
st = LIST_FIRST(&grp->sg_idlethreads);
|
||||||
if (st) {
|
if (st) {
|
||||||
LIST_REMOVE(st, st_ilink);
|
LIST_REMOVE(st, st_ilink);
|
||||||
SVC_ACQUIRE(xprt);
|
SVC_ACQUIRE(xprt);
|
||||||
@ -354,10 +377,10 @@ xprt_assignthread(SVCXPRT *xprt)
|
|||||||
* from a socket upcall). Don't create more
|
* from a socket upcall). Don't create more
|
||||||
* than one thread per second.
|
* than one thread per second.
|
||||||
*/
|
*/
|
||||||
if (pool->sp_state == SVCPOOL_ACTIVE
|
if (grp->sg_state == SVCPOOL_ACTIVE
|
||||||
&& pool->sp_lastcreatetime < time_uptime
|
&& grp->sg_lastcreatetime < time_uptime
|
||||||
&& pool->sp_threadcount < pool->sp_maxthreads) {
|
&& grp->sg_threadcount < grp->sg_maxthreads) {
|
||||||
pool->sp_state = SVCPOOL_THREADWANTED;
|
grp->sg_state = SVCPOOL_THREADWANTED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (FALSE);
|
return (FALSE);
|
||||||
@ -366,40 +389,40 @@ xprt_assignthread(SVCXPRT *xprt)
|
|||||||
void
|
void
|
||||||
xprt_active(SVCXPRT *xprt)
|
xprt_active(SVCXPRT *xprt)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool = xprt->xp_pool;
|
SVCGROUP *grp = xprt->xp_group;
|
||||||
|
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
|
|
||||||
if (!xprt->xp_registered) {
|
if (!xprt->xp_registered) {
|
||||||
/*
|
/*
|
||||||
* Race with xprt_unregister - we lose.
|
* Race with xprt_unregister - we lose.
|
||||||
*/
|
*/
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!xprt->xp_active) {
|
if (!xprt->xp_active) {
|
||||||
xprt->xp_active = TRUE;
|
xprt->xp_active = TRUE;
|
||||||
if (xprt->xp_thread == NULL) {
|
if (xprt->xp_thread == NULL) {
|
||||||
if (!svc_request_space_available(pool) ||
|
if (!svc_request_space_available(xprt->xp_pool) ||
|
||||||
!xprt_assignthread(xprt))
|
!xprt_assignthread(xprt))
|
||||||
TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
|
TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
|
||||||
xp_alink);
|
xp_alink);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
xprt_inactive_locked(SVCXPRT *xprt)
|
xprt_inactive_locked(SVCXPRT *xprt)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool = xprt->xp_pool;
|
SVCGROUP *grp = xprt->xp_group;
|
||||||
|
|
||||||
mtx_assert(&pool->sp_lock, MA_OWNED);
|
mtx_assert(&grp->sg_lock, MA_OWNED);
|
||||||
if (xprt->xp_active) {
|
if (xprt->xp_active) {
|
||||||
if (xprt->xp_thread == NULL)
|
if (xprt->xp_thread == NULL)
|
||||||
TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
|
TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
|
||||||
xprt->xp_active = FALSE;
|
xprt->xp_active = FALSE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -407,11 +430,11 @@ xprt_inactive_locked(SVCXPRT *xprt)
|
|||||||
void
|
void
|
||||||
xprt_inactive(SVCXPRT *xprt)
|
xprt_inactive(SVCXPRT *xprt)
|
||||||
{
|
{
|
||||||
SVCPOOL *pool = xprt->xp_pool;
|
SVCGROUP *grp = xprt->xp_group;
|
||||||
|
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
xprt_inactive_locked(xprt);
|
xprt_inactive_locked(xprt);
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -991,14 +1014,14 @@ svc_executereq(struct svc_req *rqstp)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
svc_checkidle(SVCPOOL *pool)
|
svc_checkidle(SVCGROUP *grp)
|
||||||
{
|
{
|
||||||
SVCXPRT *xprt, *nxprt;
|
SVCXPRT *xprt, *nxprt;
|
||||||
time_t timo;
|
time_t timo;
|
||||||
struct svcxprt_list cleanup;
|
struct svcxprt_list cleanup;
|
||||||
|
|
||||||
TAILQ_INIT(&cleanup);
|
TAILQ_INIT(&cleanup);
|
||||||
TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
|
TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
|
||||||
/*
|
/*
|
||||||
* Only some transports have idle timers. Don't time
|
* Only some transports have idle timers. Don't time
|
||||||
* something out which is just waking up.
|
* something out which is just waking up.
|
||||||
@ -1013,27 +1036,31 @@ svc_checkidle(SVCPOOL *pool)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
|
TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
|
||||||
SVC_RELEASE(xprt);
|
SVC_RELEASE(xprt);
|
||||||
}
|
}
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
svc_assign_waiting_sockets(SVCPOOL *pool)
|
svc_assign_waiting_sockets(SVCPOOL *pool)
|
||||||
{
|
{
|
||||||
|
SVCGROUP *grp;
|
||||||
SVCXPRT *xprt;
|
SVCXPRT *xprt;
|
||||||
|
int g;
|
||||||
|
|
||||||
mtx_lock(&pool->sp_lock);
|
for (g = 0; g < pool->sp_groupcount; g++) {
|
||||||
while ((xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
|
grp = &pool->sp_groups[g];
|
||||||
|
mtx_lock(&grp->sg_lock);
|
||||||
|
while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
|
||||||
if (xprt_assignthread(xprt))
|
if (xprt_assignthread(xprt))
|
||||||
TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
|
TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
|
||||||
else
|
else
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -1067,8 +1094,9 @@ svc_request_space_available(SVCPOOL *pool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
|
||||||
{
|
{
|
||||||
|
SVCPOOL *pool = grp->sg_pool;
|
||||||
SVCTHREAD *st, *stpref;
|
SVCTHREAD *st, *stpref;
|
||||||
SVCXPRT *xprt;
|
SVCXPRT *xprt;
|
||||||
enum xprt_stat stat;
|
enum xprt_stat stat;
|
||||||
@ -1083,35 +1111,34 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
STAILQ_INIT(&st->st_reqs);
|
STAILQ_INIT(&st->st_reqs);
|
||||||
cv_init(&st->st_cond, "rpcsvc");
|
cv_init(&st->st_cond, "rpcsvc");
|
||||||
|
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we are a new thread which was spawned to cope with
|
* If we are a new thread which was spawned to cope with
|
||||||
* increased load, set the state back to SVCPOOL_ACTIVE.
|
* increased load, set the state back to SVCPOOL_ACTIVE.
|
||||||
*/
|
*/
|
||||||
if (pool->sp_state == SVCPOOL_THREADSTARTING)
|
if (grp->sg_state == SVCPOOL_THREADSTARTING)
|
||||||
pool->sp_state = SVCPOOL_ACTIVE;
|
grp->sg_state = SVCPOOL_ACTIVE;
|
||||||
|
|
||||||
while (pool->sp_state != SVCPOOL_CLOSING) {
|
while (grp->sg_state != SVCPOOL_CLOSING) {
|
||||||
/*
|
/*
|
||||||
* Create new thread if requested.
|
* Create new thread if requested.
|
||||||
*/
|
*/
|
||||||
if (pool->sp_state == SVCPOOL_THREADWANTED) {
|
if (grp->sg_state == SVCPOOL_THREADWANTED) {
|
||||||
pool->sp_state = SVCPOOL_THREADSTARTING;
|
grp->sg_state = SVCPOOL_THREADSTARTING;
|
||||||
pool->sp_lastcreatetime = time_uptime;
|
grp->sg_lastcreatetime = time_uptime;
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
svc_new_thread(pool);
|
svc_new_thread(grp);
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check for idle transports once per second.
|
* Check for idle transports once per second.
|
||||||
*/
|
*/
|
||||||
if (time_uptime > pool->sp_lastidlecheck) {
|
if (time_uptime > grp->sg_lastidlecheck) {
|
||||||
pool->sp_lastidlecheck = time_uptime;
|
grp->sg_lastidlecheck = time_uptime;
|
||||||
svc_checkidle(pool);
|
svc_checkidle(grp);
|
||||||
}
|
}
|
||||||
|
|
||||||
xprt = st->st_xprt;
|
xprt = st->st_xprt;
|
||||||
@ -1119,7 +1146,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
/*
|
/*
|
||||||
* Enforce maxthreads count.
|
* Enforce maxthreads count.
|
||||||
*/
|
*/
|
||||||
if (pool->sp_threadcount > pool->sp_maxthreads)
|
if (grp->sg_threadcount > grp->sg_maxthreads)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1128,22 +1155,22 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
* by a thread.
|
* by a thread.
|
||||||
*/
|
*/
|
||||||
if (svc_request_space_available(pool) &&
|
if (svc_request_space_available(pool) &&
|
||||||
(xprt = TAILQ_FIRST(&pool->sp_active)) != NULL) {
|
(xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
|
||||||
TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
|
TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
|
||||||
SVC_ACQUIRE(xprt);
|
SVC_ACQUIRE(xprt);
|
||||||
xprt->xp_thread = st;
|
xprt->xp_thread = st;
|
||||||
st->st_xprt = xprt;
|
st->st_xprt = xprt;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
|
LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
|
||||||
if (ismaster || (!ismaster &&
|
if (ismaster || (!ismaster &&
|
||||||
pool->sp_threadcount > pool->sp_minthreads))
|
grp->sg_threadcount > grp->sg_minthreads))
|
||||||
error = cv_timedwait_sig(&st->st_cond,
|
error = cv_timedwait_sig(&st->st_cond,
|
||||||
&pool->sp_lock, 5 * hz);
|
&grp->sg_lock, 5 * hz);
|
||||||
else
|
else
|
||||||
error = cv_wait_sig(&st->st_cond,
|
error = cv_wait_sig(&st->st_cond,
|
||||||
&pool->sp_lock);
|
&grp->sg_lock);
|
||||||
if (st->st_xprt == NULL)
|
if (st->st_xprt == NULL)
|
||||||
LIST_REMOVE(st, st_ilink);
|
LIST_REMOVE(st, st_ilink);
|
||||||
|
|
||||||
@ -1152,19 +1179,19 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
*/
|
*/
|
||||||
if (error == EWOULDBLOCK) {
|
if (error == EWOULDBLOCK) {
|
||||||
if (!ismaster
|
if (!ismaster
|
||||||
&& (pool->sp_threadcount
|
&& (grp->sg_threadcount
|
||||||
> pool->sp_minthreads)
|
> grp->sg_minthreads)
|
||||||
&& !st->st_xprt)
|
&& !st->st_xprt)
|
||||||
break;
|
break;
|
||||||
} else if (error) {
|
} else if (error) {
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
svc_exit(pool);
|
svc_exit(pool);
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Drain the transport socket and queue up any RPCs.
|
* Drain the transport socket and queue up any RPCs.
|
||||||
@ -1196,7 +1223,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while (rqstp == NULL && stat == XPRT_MOREREQS
|
} while (rqstp == NULL && stat == XPRT_MOREREQS
|
||||||
&& pool->sp_state != SVCPOOL_CLOSING);
|
&& grp->sg_state != SVCPOOL_CLOSING);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Move this transport to the end of the active list to
|
* Move this transport to the end of the active list to
|
||||||
@ -1204,16 +1231,16 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
* If this was the last queued request, svc_getreq will end
|
* If this was the last queued request, svc_getreq will end
|
||||||
* up calling xprt_inactive to remove from the active list.
|
* up calling xprt_inactive to remove from the active list.
|
||||||
*/
|
*/
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
xprt->xp_thread = NULL;
|
xprt->xp_thread = NULL;
|
||||||
st->st_xprt = NULL;
|
st->st_xprt = NULL;
|
||||||
if (xprt->xp_active) {
|
if (xprt->xp_active) {
|
||||||
if (!svc_request_space_available(pool) ||
|
if (!svc_request_space_available(pool) ||
|
||||||
!xprt_assignthread(xprt))
|
!xprt_assignthread(xprt))
|
||||||
TAILQ_INSERT_TAIL(&pool->sp_active,
|
TAILQ_INSERT_TAIL(&grp->sg_active,
|
||||||
xprt, xp_alink);
|
xprt, xp_alink);
|
||||||
}
|
}
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
SVC_RELEASE(xprt);
|
SVC_RELEASE(xprt);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1230,7 +1257,7 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
}
|
}
|
||||||
mtx_unlock(&st->st_lock);
|
mtx_unlock(&st->st_lock);
|
||||||
svc_change_space_used(pool, -sz);
|
svc_change_space_used(pool, -sz);
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (st->st_xprt) {
|
if (st->st_xprt) {
|
||||||
@ -1238,46 +1265,43 @@ svc_run_internal(SVCPOOL *pool, bool_t ismaster)
|
|||||||
st->st_xprt = NULL;
|
st->st_xprt = NULL;
|
||||||
SVC_RELEASE(xprt);
|
SVC_RELEASE(xprt);
|
||||||
}
|
}
|
||||||
|
|
||||||
KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
|
KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
|
||||||
LIST_REMOVE(st, st_link);
|
|
||||||
pool->sp_threadcount--;
|
|
||||||
|
|
||||||
mtx_unlock(&pool->sp_lock);
|
|
||||||
|
|
||||||
mtx_destroy(&st->st_lock);
|
mtx_destroy(&st->st_lock);
|
||||||
cv_destroy(&st->st_cond);
|
cv_destroy(&st->st_cond);
|
||||||
mem_free(st, sizeof(*st));
|
mem_free(st, sizeof(*st));
|
||||||
|
|
||||||
|
grp->sg_threadcount--;
|
||||||
if (!ismaster)
|
if (!ismaster)
|
||||||
wakeup(pool);
|
wakeup(grp);
|
||||||
|
mtx_unlock(&grp->sg_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
svc_thread_start(void *arg)
|
svc_thread_start(void *arg)
|
||||||
{
|
{
|
||||||
|
|
||||||
svc_run_internal((SVCPOOL *) arg, FALSE);
|
svc_run_internal((SVCGROUP *) arg, FALSE);
|
||||||
kthread_exit();
|
kthread_exit();
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
svc_new_thread(SVCPOOL *pool)
|
svc_new_thread(SVCGROUP *grp)
|
||||||
{
|
{
|
||||||
|
SVCPOOL *pool = grp->sg_pool;
|
||||||
struct thread *td;
|
struct thread *td;
|
||||||
|
|
||||||
pool->sp_threadcount++;
|
grp->sg_threadcount++;
|
||||||
kthread_add(svc_thread_start, pool,
|
kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
|
||||||
pool->sp_proc, &td, 0, 0,
|
|
||||||
"%s: service", pool->sp_name);
|
"%s: service", pool->sp_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
svc_run(SVCPOOL *pool)
|
svc_run(SVCPOOL *pool)
|
||||||
{
|
{
|
||||||
int i;
|
int g, i;
|
||||||
struct proc *p;
|
struct proc *p;
|
||||||
struct thread *td;
|
struct thread *td;
|
||||||
|
SVCGROUP *grp;
|
||||||
|
|
||||||
p = curproc;
|
p = curproc;
|
||||||
td = curthread;
|
td = curthread;
|
||||||
@ -1285,35 +1309,56 @@ svc_run(SVCPOOL *pool)
|
|||||||
"%s: master", pool->sp_name);
|
"%s: master", pool->sp_name);
|
||||||
pool->sp_state = SVCPOOL_ACTIVE;
|
pool->sp_state = SVCPOOL_ACTIVE;
|
||||||
pool->sp_proc = p;
|
pool->sp_proc = p;
|
||||||
pool->sp_lastcreatetime = time_uptime;
|
|
||||||
pool->sp_threadcount = 1;
|
|
||||||
|
|
||||||
for (i = 1; i < pool->sp_minthreads; i++) {
|
/* Choose group count based on number of threads and CPUs. */
|
||||||
svc_new_thread(pool);
|
pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
|
||||||
|
min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
|
||||||
|
for (g = 0; g < pool->sp_groupcount; g++) {
|
||||||
|
grp = &pool->sp_groups[g];
|
||||||
|
grp->sg_minthreads = max(1,
|
||||||
|
pool->sp_minthreads / pool->sp_groupcount);
|
||||||
|
grp->sg_maxthreads = max(1,
|
||||||
|
pool->sp_maxthreads / pool->sp_groupcount);
|
||||||
|
grp->sg_lastcreatetime = time_uptime;
|
||||||
}
|
}
|
||||||
|
|
||||||
svc_run_internal(pool, TRUE);
|
/* Starting threads */
|
||||||
|
for (g = 0; g < pool->sp_groupcount; g++) {
|
||||||
|
grp = &pool->sp_groups[g];
|
||||||
|
for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
|
||||||
|
svc_new_thread(grp);
|
||||||
|
}
|
||||||
|
pool->sp_groups[0].sg_threadcount++;
|
||||||
|
svc_run_internal(&pool->sp_groups[0], TRUE);
|
||||||
|
|
||||||
mtx_lock(&pool->sp_lock);
|
/* Waiting for threads to stop. */
|
||||||
while (pool->sp_threadcount > 0)
|
for (g = 0; g < pool->sp_groupcount; g++) {
|
||||||
msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
|
grp = &pool->sp_groups[g];
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
|
while (grp->sg_threadcount > 0)
|
||||||
|
msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
|
||||||
|
mtx_unlock(&grp->sg_lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
svc_exit(SVCPOOL *pool)
|
svc_exit(SVCPOOL *pool)
|
||||||
{
|
{
|
||||||
|
SVCGROUP *grp;
|
||||||
SVCTHREAD *st;
|
SVCTHREAD *st;
|
||||||
|
int g;
|
||||||
|
|
||||||
mtx_lock(&pool->sp_lock);
|
|
||||||
|
|
||||||
if (pool->sp_state != SVCPOOL_CLOSING) {
|
|
||||||
pool->sp_state = SVCPOOL_CLOSING;
|
pool->sp_state = SVCPOOL_CLOSING;
|
||||||
LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
|
for (g = 0; g < pool->sp_groupcount; g++) {
|
||||||
|
grp = &pool->sp_groups[g];
|
||||||
|
mtx_lock(&grp->sg_lock);
|
||||||
|
if (grp->sg_state != SVCPOOL_CLOSING) {
|
||||||
|
grp->sg_state = SVCPOOL_CLOSING;
|
||||||
|
LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
|
||||||
cv_signal(&st->st_cond);
|
cv_signal(&st->st_cond);
|
||||||
}
|
}
|
||||||
|
mtx_unlock(&grp->sg_lock);
|
||||||
mtx_unlock(&pool->sp_lock);
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool_t
|
bool_t
|
||||||
|
@ -137,6 +137,7 @@ struct xp_ops2 {
|
|||||||
|
|
||||||
#ifdef _KERNEL
|
#ifdef _KERNEL
|
||||||
struct __rpc_svcpool;
|
struct __rpc_svcpool;
|
||||||
|
struct __rpc_svcgroup;
|
||||||
struct __rpc_svcthread;
|
struct __rpc_svcthread;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
@ -150,6 +151,7 @@ typedef struct __rpc_svcxprt {
|
|||||||
volatile u_int xp_refs;
|
volatile u_int xp_refs;
|
||||||
struct sx xp_lock;
|
struct sx xp_lock;
|
||||||
struct __rpc_svcpool *xp_pool; /* owning pool (see below) */
|
struct __rpc_svcpool *xp_pool; /* owning pool (see below) */
|
||||||
|
struct __rpc_svcgroup *xp_group; /* owning group (see below) */
|
||||||
TAILQ_ENTRY(__rpc_svcxprt) xp_link;
|
TAILQ_ENTRY(__rpc_svcxprt) xp_link;
|
||||||
TAILQ_ENTRY(__rpc_svcxprt) xp_alink;
|
TAILQ_ENTRY(__rpc_svcxprt) xp_alink;
|
||||||
bool_t xp_registered; /* xprt_register has been called */
|
bool_t xp_registered; /* xprt_register has been called */
|
||||||
@ -245,8 +247,6 @@ struct svc_loss_callout {
|
|||||||
};
|
};
|
||||||
TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
|
TAILQ_HEAD(svc_loss_callout_list, svc_loss_callout);
|
||||||
|
|
||||||
struct __rpc_svcthread;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Service request
|
* Service request
|
||||||
*/
|
*/
|
||||||
@ -296,7 +296,6 @@ typedef struct __rpc_svcthread {
|
|||||||
SVCXPRT *st_xprt; /* transport we are processing */
|
SVCXPRT *st_xprt; /* transport we are processing */
|
||||||
struct svc_reqlist st_reqs; /* RPC requests to execute */
|
struct svc_reqlist st_reqs; /* RPC requests to execute */
|
||||||
struct cv st_cond; /* sleeping for work */
|
struct cv st_cond; /* sleeping for work */
|
||||||
LIST_ENTRY(__rpc_svcthread) st_link; /* all threads list */
|
|
||||||
LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */
|
LIST_ENTRY(__rpc_svcthread) st_ilink; /* idle threads list */
|
||||||
LIST_ENTRY(__rpc_svcthread) st_alink; /* application thread list */
|
LIST_ENTRY(__rpc_svcthread) st_alink; /* application thread list */
|
||||||
int st_p2; /* application workspace */
|
int st_p2; /* application workspace */
|
||||||
@ -304,6 +303,36 @@ typedef struct __rpc_svcthread {
|
|||||||
} SVCTHREAD;
|
} SVCTHREAD;
|
||||||
LIST_HEAD(svcthread_list, __rpc_svcthread);
|
LIST_HEAD(svcthread_list, __rpc_svcthread);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* A thread group contain all information needed to assign subset of
|
||||||
|
* transports to subset of threads. On systems with many CPUs and many
|
||||||
|
* threads that allows to reduce lock congestion and improve performance.
|
||||||
|
* Hundreds of threads on dozens of CPUs sharing the single pool lock do
|
||||||
|
* not scale well otherwise.
|
||||||
|
*/
|
||||||
|
TAILQ_HEAD(svcxprt_list, __rpc_svcxprt);
|
||||||
|
enum svcpool_state {
|
||||||
|
SVCPOOL_INIT, /* svc_run not called yet */
|
||||||
|
SVCPOOL_ACTIVE, /* normal running state */
|
||||||
|
SVCPOOL_THREADWANTED, /* new service thread requested */
|
||||||
|
SVCPOOL_THREADSTARTING, /* new service thread started */
|
||||||
|
SVCPOOL_CLOSING /* svc_exit called */
|
||||||
|
};
|
||||||
|
typedef struct __rpc_svcgroup {
|
||||||
|
struct mtx_padalign sg_lock; /* protect the thread/req lists */
|
||||||
|
struct __rpc_svcpool *sg_pool;
|
||||||
|
enum svcpool_state sg_state; /* current pool state */
|
||||||
|
struct svcxprt_list sg_xlist; /* all transports in the group */
|
||||||
|
struct svcxprt_list sg_active; /* transports needing service */
|
||||||
|
struct svcthread_list sg_idlethreads; /* idle service threads */
|
||||||
|
|
||||||
|
int sg_minthreads; /* minimum service thread count */
|
||||||
|
int sg_maxthreads; /* maximum service thread count */
|
||||||
|
int sg_threadcount; /* current service thread count */
|
||||||
|
time_t sg_lastcreatetime; /* when we last started a thread */
|
||||||
|
time_t sg_lastidlecheck; /* when we last checked idle transports */
|
||||||
|
} SVCGROUP;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* In the kernel, we can't use global variables to store lists of
|
* In the kernel, we can't use global variables to store lists of
|
||||||
* transports etc. since otherwise we could not have two unrelated RPC
|
* transports etc. since otherwise we could not have two unrelated RPC
|
||||||
@ -316,32 +345,18 @@ LIST_HEAD(svcthread_list, __rpc_svcthread);
|
|||||||
* this to support something similar to the Solaris multi-threaded RPC
|
* this to support something similar to the Solaris multi-threaded RPC
|
||||||
* server.
|
* server.
|
||||||
*/
|
*/
|
||||||
TAILQ_HEAD(svcxprt_list, __rpc_svcxprt);
|
|
||||||
enum svcpool_state {
|
|
||||||
SVCPOOL_INIT, /* svc_run not called yet */
|
|
||||||
SVCPOOL_ACTIVE, /* normal running state */
|
|
||||||
SVCPOOL_THREADWANTED, /* new service thread requested */
|
|
||||||
SVCPOOL_THREADSTARTING, /* new service thread started */
|
|
||||||
SVCPOOL_CLOSING /* svc_exit called */
|
|
||||||
};
|
|
||||||
typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *);
|
typedef SVCTHREAD *pool_assign_fn(SVCTHREAD *, struct svc_req *);
|
||||||
typedef void pool_done_fn(SVCTHREAD *, struct svc_req *);
|
typedef void pool_done_fn(SVCTHREAD *, struct svc_req *);
|
||||||
|
#define SVC_MAXGROUPS 16
|
||||||
typedef struct __rpc_svcpool {
|
typedef struct __rpc_svcpool {
|
||||||
struct mtx_padalign sp_lock; /* protect the transport lists */
|
struct mtx_padalign sp_lock; /* protect the transport lists */
|
||||||
const char *sp_name; /* pool name (e.g. "nfsd", "NLM" */
|
const char *sp_name; /* pool name (e.g. "nfsd", "NLM" */
|
||||||
enum svcpool_state sp_state; /* current pool state */
|
enum svcpool_state sp_state; /* current pool state */
|
||||||
struct proc *sp_proc; /* process which is in svc_run */
|
struct proc *sp_proc; /* process which is in svc_run */
|
||||||
struct svcxprt_list sp_xlist; /* all transports in the pool */
|
|
||||||
struct svcxprt_list sp_active; /* transports needing service */
|
|
||||||
struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
|
struct svc_callout_list sp_callouts; /* (prog,vers)->dispatch list */
|
||||||
struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
|
struct svc_loss_callout_list sp_lcallouts; /* loss->dispatch list */
|
||||||
struct svcthread_list sp_threads; /* service threads */
|
|
||||||
struct svcthread_list sp_idlethreads; /* idle service threads */
|
|
||||||
int sp_minthreads; /* minimum service thread count */
|
int sp_minthreads; /* minimum service thread count */
|
||||||
int sp_maxthreads; /* maximum service thread count */
|
int sp_maxthreads; /* maximum service thread count */
|
||||||
int sp_threadcount; /* current service thread count */
|
|
||||||
time_t sp_lastcreatetime; /* when we last started a thread */
|
|
||||||
time_t sp_lastidlecheck; /* when we last checked idle transports */
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Hooks to allow an application to control request to thread
|
* Hooks to allow an application to control request to thread
|
||||||
@ -364,6 +379,10 @@ typedef struct __rpc_svcpool {
|
|||||||
|
|
||||||
struct replay_cache *sp_rcache; /* optional replay cache */
|
struct replay_cache *sp_rcache; /* optional replay cache */
|
||||||
struct sysctl_ctx_list sp_sysctl;
|
struct sysctl_ctx_list sp_sysctl;
|
||||||
|
|
||||||
|
int sp_groupcount; /* Number of groups in the pool. */
|
||||||
|
int sp_nextgroup; /* Next group to assign port. */
|
||||||
|
SVCGROUP sp_groups[SVC_MAXGROUPS]; /* Thread/port groups. */
|
||||||
} SVCPOOL;
|
} SVCPOOL;
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
@ -86,7 +86,8 @@ svc_create(
|
|||||||
rpcvers_t versnum, /* Version number */
|
rpcvers_t versnum, /* Version number */
|
||||||
const char *nettype) /* Networktype token */
|
const char *nettype) /* Networktype token */
|
||||||
{
|
{
|
||||||
int num = 0;
|
int g, num = 0;
|
||||||
|
SVCGROUP *grp;
|
||||||
SVCXPRT *xprt;
|
SVCXPRT *xprt;
|
||||||
struct netconfig *nconf;
|
struct netconfig *nconf;
|
||||||
void *handle;
|
void *handle;
|
||||||
@ -96,11 +97,14 @@ svc_create(
|
|||||||
return (0);
|
return (0);
|
||||||
}
|
}
|
||||||
while ((nconf = __rpc_getconf(handle)) != NULL) {
|
while ((nconf = __rpc_getconf(handle)) != NULL) {
|
||||||
mtx_lock(&pool->sp_lock);
|
for (g = 0; g < SVC_MAXGROUPS; g++) {
|
||||||
TAILQ_FOREACH(xprt, &pool->sp_xlist, xp_link) {
|
grp = &pool->sp_groups[g];
|
||||||
if (strcmp(xprt->xp_netid, nconf->nc_netid) == 0) {
|
mtx_lock(&grp->sg_lock);
|
||||||
|
TAILQ_FOREACH(xprt, &grp->sg_xlist, xp_link) {
|
||||||
|
if (strcmp(xprt->xp_netid, nconf->nc_netid))
|
||||||
|
continue;
|
||||||
/* Found an old one, use it */
|
/* Found an old one, use it */
|
||||||
mtx_unlock(&pool->sp_lock);
|
mtx_unlock(&grp->sg_lock);
|
||||||
(void) rpcb_unset(prognum, versnum, nconf);
|
(void) rpcb_unset(prognum, versnum, nconf);
|
||||||
if (svc_reg(xprt, prognum, versnum,
|
if (svc_reg(xprt, prognum, versnum,
|
||||||
dispatch, nconf) == FALSE) {
|
dispatch, nconf) == FALSE) {
|
||||||
@ -108,15 +112,15 @@ svc_create(
|
|||||||
"svc_create: could not register prog %u vers %u on %s\n",
|
"svc_create: could not register prog %u vers %u on %s\n",
|
||||||
(unsigned)prognum, (unsigned)versnum,
|
(unsigned)prognum, (unsigned)versnum,
|
||||||
nconf->nc_netid);
|
nconf->nc_netid);
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
} else {
|
} else {
|
||||||
num++;
|
num++;
|
||||||
mtx_lock(&pool->sp_lock);
|
mtx_lock(&grp->sg_lock);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
mtx_unlock(&grp->sg_lock);
|
||||||
}
|
}
|
||||||
mtx_unlock(&pool->sp_lock);
|
|
||||||
if (xprt == NULL) {
|
if (xprt == NULL) {
|
||||||
/* It was not found. Now create a new one */
|
/* It was not found. Now create a new one */
|
||||||
xprt = svc_tp_create(pool, dispatch, prognum, versnum,
|
xprt = svc_tp_create(pool, dispatch, prognum, versnum,
|
||||||
|
Loading…
Reference in New Issue
Block a user