kq domain + queue cache

Charlie Root 2019-04-01 21:35:09 -04:00
parent eb525254bc
commit b2e5289a2d
5 changed files with 838 additions and 195 deletions

File diff suppressed because it is too large

sys/kern/kern_thr.c

@@ -335,7 +335,7 @@ kern_thr_exit(struct thread *td)
 	 * Release the event queues
 	 */
 	if (td->td_kevq_thred != NULL)
-		kevq_thred_drain(td->td_kevq_thred);
+		kevq_thred_drain(td->td_kevq_thred, td);
 
 	/*
 	 * If all of the threads in a process call this routine to

sys/sys/event.h

@@ -224,7 +224,7 @@ SLIST_HEAD(klist, knote);
 struct kqueue;
 TAILQ_HEAD(kqlist, kqueue);
 struct kevq;
-SLIST_HEAD(kevqlist, kevq);
+LIST_HEAD(kevqlist, kevq);
 
 struct knlist {
 	struct klist	kl_list;
@@ -275,9 +275,15 @@ struct filterops {
 	void	(*f_touch)(struct knote *kn, struct kevent *kev, u_long type);
 };
 
-/* The ioctl to set multithreaded mode
+/*
+ * The ioctl to set multithreaded mode
  */
-#define FKQMULTI _IO('f', 89)
+#define FKQMULTI _IOW('f', 89, int)
+
+/*
+ * KQ scheduler flags
+ */
+#define KQ_SCHED_QUEUE 0x1	/* make kq affinitize the knote depending on the CPU it's scheduled on */
 
 /*
 * An in-flux knote cannot be dropped from its kq while the kq is
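
With this change, FKQMULTI carries an int of scheduler flags instead of taking no argument. A minimal userspace sketch of the new calling convention, mirroring the test update further down (error handling as in the tests):

	int kqfd = kqueue();
	int flags = KQ_SCHED_QUEUE;	/* or 0 to keep the default multithreaded behavior */

	if (ioctl(kqfd, FKQMULTI, &flags) == -1)
		err(1, "ioctl");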
@@ -294,8 +300,11 @@ struct knote {
 	struct knlist	*kn_knlist;	/* f_attach populated */
 	TAILQ_ENTRY(knote)	kn_tqe;
 	struct kqueue	*kn_kq;		/* which kqueue we are on */
-	struct kevq	*kn_org_kevq;	/* the kevq that registered the knote */
 	struct kevq	*kn_kevq;	/* the kevq the knote is on */
+	/* used by the scheduler */
+	struct kevq	*kn_org_kevq;	/* the kevq that registered the knote */
+	struct kqdom	*kn_kqd;	/* the kqdomain the knote belongs to */
+	/* end scheduler */
 	struct kevent	kn_kevent;
 	void		*kn_hook;
 	int		kn_hookid;
@@ -338,7 +347,7 @@ struct kevent_copyops {
 struct kevq_thred {
 	u_long		kevq_hashmask;	/* hash mask for kevqs */
 	struct kevqlist	*kevq_hash;	/* hash table for kevqs */
-	TAILQ_HEAD(, kevq)	kevq_tq;
+	struct kevqlist	kevq_list;
 	struct mtx	lock;	/* the lock for the kevq */
 };
@@ -374,7 +383,7 @@ int	kqfd_register(int fd, struct kevent *kev, struct thread *p,
 int	kqueue_add_filteropts(int filt, struct filterops *filtops);
 int	kqueue_del_filteropts(int filt);
-void	kevq_thred_drain(struct kevq_thred *kevq_th);
+void	kevq_thred_drain(struct kevq_thred *kevq_th, struct thread *td);
 
 #else /* !_KERNEL */
 #include <sys/cdefs.h>

sys/sys/eventvar.h

@@ -41,10 +41,12 @@
 #define KQEXTENT	256	/* linear growth by this amount */
 
 struct kevq {
-	SLIST_ENTRY(kevq)	kevq_th_e;	/* entry into kevq_thred's hashtable */
-	TAILQ_ENTRY(kevq)	kq_e;		/* entry into kqueue's list */
-	TAILQ_ENTRY(kevq)	kevq_th_tqe;	/* entry into kevq_thred's TAILQ */
-	struct kqueue		*kq;		/* the kq that the kevq belongs to */
+	LIST_ENTRY(kevq)	kevq_th_e;	/* entry into kevq_thred's hashtable */
+	LIST_ENTRY(kevq)	kqd_e;		/* entry into kqdomain */
+	LIST_ENTRY(kevq)	kq_e;		/* entry into kq */
+	LIST_ENTRY(kevq)	kevq_th_tqe;	/* entry into kevq_thred's kevq_list */
+	struct kqueue		*kq;		/* the kq that the kevq belongs to */
+	struct kqdom		*kevq_kqd;	/* the kq domain the kevq is on */
 	struct kevq_thred	*kevq_th;	/* the thread that the kevq belongs to */
 	struct mtx		lock;		/* the lock for the kevq */
 	TAILQ_HEAD(, knote)	kn_head;	/* list of pending knotes */
@@ -54,6 +56,26 @@ struct kevq {
 #define KEVQ_RDY	0x04
 	int		kevq_state;
 	int		kevq_refcnt;
+
+	/* used by the scheduler */
+	struct timespec	kevq_avg_lat;
+	struct timespec	kevq_last_kev;
+	int		kevq_last_nkev;
 };
+
+/* TODO: assumes that threads don't get rescheduled across cores */
+struct kqdom {
+	struct mtx	kqd_lock;
+	TAILQ_ENTRY(kqdom)	child_e;
+	struct kqdom	*parent;
+	int		id;
+	struct timespec	kqd_avg_lat;
+	cpuset_t	cpu_mask;
+	int		num_children;
+	int		num_kevq;
+	TAILQ_HEAD(, kqdom)	children;
+	struct kevqlist	kqd_kevqlist;	/* list of kevqs on the kqdomain, only set for leaf domains */
+	struct kevq	*kqd_ckevq;
+};
 
 struct kqueue {
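
The kern_event.c code that builds and walks this kqdom tree is in the suppressed diff above, so only the layout is visible here. As a hypothetical sketch (kqdom_find_leaf is an illustrative name, not from this commit): assuming each domain's cpu_mask covers the union of its children's masks, locating the leaf domain for the CPU a knote was scheduled on is a straightforward descent, with the knote presumably cached on that leaf via kn_kqd:

	/* hypothetical helper, for illustration only; locking elided */
	static struct kqdom *
	kqdom_find_leaf(struct kqdom *kqd, int cpu)
	{
		struct kqdom *child;

		while (kqd->num_children > 0) {
			TAILQ_FOREACH(child, &kqd->children, child_e) {
				if (CPU_ISSET(cpu, &child->cpu_mask))
					break;
			}
			if (child == NULL)
				return (NULL);	/* cpu not covered by this subtree */
			kqd = child;
		}
		return (kqd);
	}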
@@ -75,10 +97,15 @@ struct kqueue {
 	struct klist	*kq_knlist;	/* list of knotes */
 	u_long		kq_knhashmask;	/* size of knhash */
 	struct klist	*kq_knhash;	/* hash table for knotes */
-	TAILQ_HEAD(, kevq)	kq_kevqlist;	/* list of kevqs interested in the kqueue */
-	struct kevq	*kq_ckevq;	/* current kevq for multithreaded kqueue */
+	struct kevq	*kq_kevq;	/* the kevq for the kq; always created, acts as the buffer queue in multithreaded mode */
 	struct task	kq_task;
 	struct ucred	*kq_cred;
+
+	/* scheduling stuff */
+	struct kevqlist	kq_kevqlist;	/* list of kevqs for fall-back round robin */
+	struct kqdom	*kq_kqd;	/* root domain */
+	struct kevq	*kq_ckevq;	/* current kevq for multithreaded kqueue, used for round robin */
+	int		kq_sched_flags;	/* scheduler flags for the kq */
 };
 
 #endif /* !_SYS_EVENTVAR_H_ */
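
kq_kevqlist and kq_ckevq back the fall-back round-robin placement mentioned in the comments above; the actual implementation lives in the suppressed kern_event.c diff. A minimal sketch of the idea, using only the fields declared here (hypothetical helper, locking elided):

	static struct kevq *
	kevq_pick_rr(struct kqueue *kq)
	{
		struct kevq *kevq;

		/* advance the cursor, wrapping to the list head */
		kevq = (kq->kq_ckevq == NULL) ? NULL : LIST_NEXT(kq->kq_ckevq, kq_e);
		if (kevq == NULL)
			kevq = LIST_FIRST(&kq->kq_kevqlist);
		kq->kq_ckevq = kevq;
		return (kevq);	/* NULL if no kevq is attached yet */
	}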

read_m.c

@@ -21,14 +21,27 @@
 #include <sys/ioctl.h>
 #include <semaphore.h>
 #include <pthread.h>
+#include <sys/types.h>
+#include <sys/sysctl.h>
+#include <pthread_np.h>
 
-#define FKQMULTI _IO('f', 89)
-#define TEST_DEBUG
+/*
+ * The ioctl to set multithreaded mode
+ */
+#define FKQMULTI _IOW('f', 89, int)
+
+/*
+ * KQ scheduler flags
+ */
+#define KQ_SCHED_QUEUE 0x1	/* make kq affinitize the knote depending on the CPU it's scheduled on */
+
+//#define TEST_DEBUG
 
 struct thread_info {
 	pthread_t thrd;
 	int can_crash;
 	pthread_mutex_t lock;
+	int group_id;
 	int evcnt;
 	int tid;
 };
@@ -37,8 +50,8 @@ struct thread_info {
 /*
  * Read test
  */
-#define THREAD_CNT (32)
-#define PACKET_CNT (1000)
+#define THREAD_CNT (16)
+#define PACKET_CNT (1600)
 
 int g_kqfd;
 int g_sockfd[2];
@@ -98,7 +111,9 @@
 	}
 }
 
-/* for multi threaded read */
+/***************************
+ * Read test
+ ***************************/
 static void*
 test_socket_read_thrd(void* args)
 {
@@ -127,6 +142,11 @@ test_socket_read_thrd(void* args)
 		sem_post(&g_sem_driver);
 	}
 
+#ifdef TEST_DEBUG
+	printf("READ_M: thread %d exiting\n", info->tid);
+#endif
+
+	sem_post(&g_sem_driver);
 	pthread_exit(0);
 }
@@ -184,14 +204,19 @@
 #ifdef TEST_DEBUG
 	printf("READ_M: finished testing, system shutting down...\n");
 #endif
-	for(int i = 0; i < PACKET_CNT; i++) {
+	for(int i = 0; i < THREAD_CNT; i++) {
 		socket_push(g_sockfd[1], 'e');
 		sem_wait(&g_sem_driver);
 	}
 
+	for (int i = 0; i < THREAD_CNT; i++) {
+		pthread_join(g_thrd_info[i].thrd, NULL);
+	}
+
 #ifdef TEST_DEBUG
 	printf("READ_M: clearing kevent...\n");
 #endif
 	EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]);
 	error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
@@ -202,7 +227,205 @@ test_socket_read(void)
 #endif
 		err(1, "kevent_delete");
 	}
+
+#ifdef TEST_DEBUG
+	printf("READ_M: closing sockets...\n");
+#endif
 	close(g_sockfd[0]);
 	close(g_sockfd[1]);
 	success();
 }
+
+/***************************
+ * Queue test
+ ***************************/
+
+#define THREAD_QUEUE_CNT (4)
+#define PACKET_QUEUE_CNT (1000)
+
+static int
+get_ncpu()
+{
+	int mib[4];
+	int numcpu;
+	size_t len = sizeof(numcpu);
+
+	mib[0] = CTL_HW;
+	mib[1] = HW_NCPU;
+	sysctl(mib, 2, &numcpu, &len, NULL, 0);
+
+	if (numcpu < 1)
+	{
+		err(1, "< 1 cpu detected");
+	}
+
+	return numcpu;
+}
+
+static void*
+test_socket_queue_thrd(void* args)
+{
+	struct thread_info *info = (struct thread_info *) args;
+	char dat;
+	struct kevent *ret;
+
+	while (1) {
+#ifdef TEST_DEBUG
+		printf("READ_M: thread %d waiting for events\n", info->tid);
+#endif
+		ret = kevent_get(g_kqfd);
+#ifdef TEST_DEBUG
+		printf("READ_M: thread %d woke up\n", info->tid);
+#endif
+
+		dat = socket_pop(ret->ident);
+		free(ret);
+
+		if (dat == 'e')
+			break;
+
+		info->evcnt++;
+
+		/* signal the driver */
+		sem_post(&g_sem_driver);
+	}
+
+#ifdef TEST_DEBUG
+	printf("READ_M: thread %d exiting\n", info->tid);
+#endif
+	sem_post(&g_sem_driver);
+	pthread_exit(0);
+}
+
+static void
+test_socket_queue(void)
+{
+	int error = 0;
+	const char *test_id = "[Multi][Queue]kevent(EVFILT_READ)";
+
+	test_begin(test_id);
+
+	if (socketpair(AF_UNIX, SOCK_STREAM, 0, &g_sockfd[0]) < 0)
+		err(1, "kevent_read socket");
+
+	struct kevent kev;
+	EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &g_sockfd[0]);
+
+	sem_init(&g_sem_driver, 0, 0);
+
+	error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
+	if (error == -1) {
+#ifdef TEST_DEBUG
+		printf("READ_M: kevent add failed with %d\n", errno);
+#endif
+		err(1, "kevent_add");
+	}
+
+	cpuset_t cpuset;
+	int ncpu = get_ncpu();
+	int tid = 0;
+#ifdef TEST_DEBUG
+	printf("READ_M: detected %d cores...\n", ncpu);
+#endif
+
+	struct thread_info **group = malloc(sizeof(struct thread_info*) * ncpu);
+	for (int i = 0; i < ncpu; i++) {
+		group[i] = malloc(sizeof(struct thread_info) * THREAD_QUEUE_CNT);
+		for (int j = 0; j < THREAD_QUEUE_CNT; j++) {
+			group[i][j].tid = tid;
+			tid++;
+			group[i][j].evcnt = 0;
+			group[i][j].group_id = i;
+			pthread_create(&group[i][j].thrd, NULL, test_socket_queue_thrd, &group[i][j]);
+			CPU_ZERO(&cpuset);
+			CPU_SET(i, &cpuset);
+			if (pthread_setaffinity_np(group[i][j].thrd, sizeof(cpuset_t), &cpuset) < 0) {
+				err(1, "thread_affinity");
+			}
+#ifdef TEST_DEBUG
+			printf("READ_M: created and affinitized thread %d to core group %d\n", group[i][j].tid, i);
+#endif
+		}
+	}
+
+#ifdef TEST_DEBUG
+	printf("READ_M: waiting for threads to wait on KQ...\n");
+#endif
+
+	sleep(3);
+
+	int affinity_group = -1;
+	for(int k = 1; k <= PACKET_QUEUE_CNT; k++) {
+#ifdef TEST_DEBUG
+		printf("READ_M: processing packet %d\n", k);
+#endif
+		socket_push(g_sockfd[1], '.');
+		/* wait for thread events */
+		sem_wait(&g_sem_driver);
+
+		/* only one group should ever receive events; ideally we would keep
+		   a table that remembers each knote's affinity */
+		for(int i = 0; i < ncpu; i++) {
+			int sum = 0;
+			for (int j = 0; j < THREAD_QUEUE_CNT; j++) {
+				sum += group[i][j].evcnt;
+			}
+			if (sum != 0 && affinity_group == -1) {
+				affinity_group = i;
+			}
+#ifdef TEST_DEBUG
+			printf("READ_M: group %d sum %d, affinity group: %d\n", i, sum, affinity_group);
+#endif
+			if (i == affinity_group) {
+				if (sum != k) {
+					err(1, "affinity group sum != k");
+				}
+			} else {
+				if (sum != 0) {
+					err(1, "non-affinity group sum != 0");
+				}
+			}
+		}
+	}
+
+#ifdef TEST_DEBUG
+	printf("READ_M: finished testing, system shutting down...\n");
+#endif
+	for(int i = 0; i < THREAD_QUEUE_CNT * ncpu; i++) {
+		socket_push(g_sockfd[1], 'e');
+		sem_wait(&g_sem_driver);
+	}
+
+	for (int i = 0; i < ncpu; i++) {
+		for (int j = 0; j < THREAD_QUEUE_CNT; j++) {
+			pthread_join(group[i][j].thrd, NULL);
+		}
+		free(group[i]);
+	}
+	free(group);
+
+#ifdef TEST_DEBUG
+	printf("READ_M: clearing kevent...\n");
+#endif
+	EV_SET(&kev, g_sockfd[0], EVFILT_READ, EV_DELETE, 0, 0, &g_sockfd[0]);
+	error = kevent(g_kqfd, &kev, 1, NULL, 0, NULL);
+	if (error == -1) {
+#ifdef TEST_DEBUG
+		printf("READ_M: kevent delete failed with %d\n", errno);
+#endif
+		err(1, "kevent_delete");
+	}
+
+#ifdef TEST_DEBUG
+	printf("READ_M: closing sockets...\n");
+#endif
+	close(g_sockfd[0]);
+	close(g_sockfd[1]);
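
The queue test, like the read test above it, relies on the suite's kevent_get() helper, which is defined outside this diff. Judging from its use (the returned event is freed by the caller), it presumably blocks for a single event and returns it heap-allocated, roughly:

	/* assumed shape of the suite's kevent_get() helper; not part of this diff */
	struct kevent *
	kevent_get(int kqfd)
	{
		struct kevent *kev = malloc(sizeof(struct kevent));

		if (kev == NULL)
			err(1, "malloc");
		if (kevent(kqfd, NULL, 0, kev, 1, NULL) == -1)
			err(1, "kevent");
		return (kev);
	}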
@@ -210,17 +433,16 @@ test_socket_read(void)
 }
 
-/*
+/***************************
  * Brutal test
- */
+ ***************************/
 
 #define THREAD_BRUTE_CNT (32)
 #define SOCK_BRUTE_CNT (64)
 #define PACKET_BRUTE_CNT (10000)
 #define THREAD_EXIT_PROB (50)
 #define RAND_SLEEP (29)
-#define RAND_SEND_SLEEP (13)
+#define RAND_SEND_SLEEP (7)
 
 int brute_sockfd[SOCK_BRUTE_CNT][2];
@@ -365,11 +587,13 @@ test_socket_brutal()
 	success();
 }
 
 void
 test_evfilt_read_m()
 {
+	int flags = 0;
 	g_kqfd = kqueue();
-	int error = ioctl(g_kqfd, FKQMULTI);
+	int error = ioctl(g_kqfd, FKQMULTI, &flags);
 	if (error == -1) {
 		err(1, "ioctl");
 	}
@@ -378,4 +602,14 @@ test_evfilt_read_m()
 	test_socket_brutal();
 
 	close(g_kqfd);
+
+	/* test scheduler */
+	flags = KQ_SCHED_QUEUE;
+	g_kqfd = kqueue();
+	error = ioctl(g_kqfd, FKQMULTI, &flags);
+
+	test_socket_queue();
+	test_socket_brutal();
+
+	close(g_kqfd);
 }