thread: Message passing now handled internally within a thread
Move the message ring and the polling into the thread itself. Change-Id: I5bc53c68bb1314fd69ed80cc98c5b86ce8be880f Signed-off-by: Ben Walker <benjamin.walker@intel.com> Reviewed-on: https://review.gerrithub.io/423768 Tested-by: SPDK CI Jenkins <sys_sgci@intel.com> Chandler-Test-Pool: SPDK Automated Test System <sys_sgsw@intel.com> Reviewed-by: Darek Stojaczyk <dariusz.stojaczyk@intel.com> Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
parent
0949668118
commit
d761ddbf57
@ -181,6 +181,7 @@ void spdk_thread_lib_fini(void);
|
||||
* \param msg_fn A function that may be called from any thread and is passed a function
|
||||
* pointer (spdk_thread_fn) that must be called on the same thread that spdk_allocate_thread
|
||||
* was called from.
|
||||
* DEPRECATED. Only used in tests. Pass NULL for this parameter.
|
||||
* \param start_poller_fn Function to be called to start a poller for the thread.
|
||||
* DEPRECATED. Only used in tests. Pass NULL for this parameter.
|
||||
* \param stop_poller_fn Function to be called to stop a poller for the thread.
|
||||
@ -208,14 +209,16 @@ struct spdk_thread *spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
|
||||
void spdk_free_thread(void);
|
||||
|
||||
/**
|
||||
* Perform one iteration worth of processing on the thread. This currently only
|
||||
* executes pollers.
|
||||
* Perform one iteration worth of processing on the thread. This includes
|
||||
* both expired and continuous pollers as well as messages.
|
||||
*
|
||||
* \param thread The thread to process
|
||||
* \param max_msgs The maximum number of messages that will be processed.
|
||||
* Use 0 to process the default number of messages (8).
|
||||
*
|
||||
* \return 1 if work was done. 0 if no work was done. -1 if unknown.
|
||||
*/
|
||||
int spdk_thread_poll(struct spdk_thread *thread);
|
||||
int spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs);
|
||||
|
||||
/**
|
||||
* Return the number of ticks until the next timed poller
|
||||
|
@ -173,27 +173,6 @@ _spdk_event_queue_run_batch(struct spdk_reactor *reactor)
|
||||
return count;
|
||||
}
|
||||
|
||||
static void
|
||||
_spdk_reactor_msg_passed(void *arg1, void *arg2)
|
||||
{
|
||||
spdk_thread_fn fn = arg1;
|
||||
|
||||
fn(arg2);
|
||||
}
|
||||
|
||||
static void
|
||||
_spdk_reactor_send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
|
||||
{
|
||||
struct spdk_event *event;
|
||||
struct spdk_reactor *reactor;
|
||||
|
||||
reactor = thread_ctx;
|
||||
|
||||
event = spdk_event_allocate(reactor->lcore, _spdk_reactor_msg_passed, fn, ctx);
|
||||
|
||||
spdk_event_call(event);
|
||||
}
|
||||
|
||||
static int
|
||||
get_rusage(void *arg)
|
||||
{
|
||||
@ -329,7 +308,7 @@ _spdk_reactor_run(void *arg)
|
||||
char thread_name[32];
|
||||
|
||||
snprintf(thread_name, sizeof(thread_name), "reactor_%u", reactor->lcore);
|
||||
thread = spdk_allocate_thread(_spdk_reactor_send_msg, NULL, NULL, reactor, thread_name);
|
||||
thread = spdk_allocate_thread(NULL, NULL, NULL, NULL, thread_name);
|
||||
if (!thread) {
|
||||
return -1;
|
||||
}
|
||||
@ -354,7 +333,7 @@ _spdk_reactor_run(void *arg)
|
||||
took_action = true;
|
||||
}
|
||||
|
||||
rc = spdk_thread_poll(thread);
|
||||
rc = spdk_thread_poll(thread, 0);
|
||||
if (rc != 0) {
|
||||
now = spdk_get_ticks();
|
||||
spdk_reactor_add_tsc_stats(reactor, rc, now);
|
||||
|
@ -49,6 +49,8 @@
|
||||
#include <pthread_np.h>
|
||||
#endif
|
||||
|
||||
#define SPDK_MSG_BATCH_SIZE 8
|
||||
|
||||
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
struct io_device {
|
||||
@ -69,6 +71,13 @@ struct io_device {
|
||||
|
||||
static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
|
||||
|
||||
struct spdk_msg {
|
||||
spdk_thread_fn fn;
|
||||
void *arg;
|
||||
};
|
||||
|
||||
static struct spdk_mempool *g_spdk_msg_mempool = NULL;
|
||||
|
||||
enum spdk_poller_state {
|
||||
/* The poller is registered with a thread but not currently executing its fn. */
|
||||
SPDK_POLLER_STATE_WAITING,
|
||||
@ -114,6 +123,8 @@ struct spdk_thread {
|
||||
* Contains pollers running on this thread with a periodic timer.
|
||||
*/
|
||||
TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;
|
||||
|
||||
struct spdk_ring *messages;
|
||||
};
|
||||
|
||||
static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
|
||||
@ -149,9 +160,40 @@ _set_thread_name(const char *thread_name)
|
||||
#endif
|
||||
}
|
||||
|
||||
static size_t
|
||||
_spdk_thread_lib_get_max_msg_cnt(uint8_t socket_count)
|
||||
{
|
||||
size_t cnt;
|
||||
|
||||
/* Try to make message ring fill at most 2MB of memory,
|
||||
* as some ring implementations may require physical address
|
||||
* contingency. We don't want to introduce a requirement of
|
||||
* at least 2 physically contiguous 2MB hugepages.
|
||||
*/
|
||||
cnt = spdk_min(262144 / socket_count, 262144 / 2);
|
||||
/* Take into account one extra element required by
|
||||
* some ring implementations.
|
||||
*/
|
||||
cnt -= 1;
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int
|
||||
spdk_thread_lib_init(void)
|
||||
{
|
||||
char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
|
||||
|
||||
snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
|
||||
g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
|
||||
_spdk_thread_lib_get_max_msg_cnt(1),
|
||||
sizeof(struct spdk_msg),
|
||||
SPDK_MEMPOOL_DEFAULT_CACHE_SIZE,
|
||||
SPDK_ENV_SOCKET_ID_ANY);
|
||||
|
||||
if (!g_spdk_msg_mempool) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -163,6 +205,10 @@ spdk_thread_lib_fini(void)
|
||||
TAILQ_FOREACH(dev, &g_io_devices, tailq) {
|
||||
SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
|
||||
}
|
||||
|
||||
if (g_spdk_msg_mempool) {
|
||||
spdk_mempool_free(g_spdk_msg_mempool);
|
||||
}
|
||||
}
|
||||
|
||||
struct spdk_thread *
|
||||
@ -206,6 +252,14 @@ spdk_allocate_thread(spdk_thread_pass_msg msg_fn,
|
||||
TAILQ_INIT(&thread->active_pollers);
|
||||
TAILQ_INIT(&thread->timer_pollers);
|
||||
|
||||
thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
|
||||
if (!thread->messages) {
|
||||
SPDK_ERRLOG("Unable to allocate memory for message ring\n");
|
||||
free(thread);
|
||||
pthread_mutex_unlock(&g_devlist_mutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
g_thread_count++;
|
||||
if (name) {
|
||||
_set_thread_name(name);
|
||||
@ -247,11 +301,54 @@ spdk_free_thread(void)
|
||||
g_thread_count--;
|
||||
TAILQ_REMOVE(&g_threads, thread, tailq);
|
||||
free(thread->name);
|
||||
|
||||
if (thread->messages) {
|
||||
spdk_ring_free(thread->messages);
|
||||
}
|
||||
|
||||
free(thread);
|
||||
|
||||
pthread_mutex_unlock(&g_devlist_mutex);
|
||||
}
|
||||
|
||||
static inline uint32_t
|
||||
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
|
||||
{
|
||||
unsigned count, i;
|
||||
void *messages[SPDK_MSG_BATCH_SIZE];
|
||||
|
||||
#ifdef DEBUG
|
||||
/*
|
||||
* spdk_ring_dequeue() fills messages and returns how many entries it wrote,
|
||||
* so we will never actually read uninitialized data from events, but just to be sure
|
||||
* (and to silence a static analyzer false positive), initialize the array to NULL pointers.
|
||||
*/
|
||||
memset(messages, 0, sizeof(messages));
|
||||
#endif
|
||||
|
||||
if (max_msgs > 0) {
|
||||
max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
|
||||
} else {
|
||||
max_msgs = SPDK_MSG_BATCH_SIZE;
|
||||
}
|
||||
|
||||
count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
|
||||
if (count == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (i = 0; i < count; i++) {
|
||||
struct spdk_msg *msg = messages[i];
|
||||
|
||||
assert(msg != NULL);
|
||||
msg->fn(msg->arg);
|
||||
}
|
||||
|
||||
spdk_mempool_put_bulk(g_spdk_msg_mempool, messages, count);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
static void
|
||||
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
|
||||
{
|
||||
@ -275,16 +372,24 @@ _spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller
|
||||
}
|
||||
|
||||
int
|
||||
spdk_thread_poll(struct spdk_thread *thread)
|
||||
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs)
|
||||
{
|
||||
uint32_t msg_count;
|
||||
struct spdk_poller *poller;
|
||||
int rc = 0;
|
||||
|
||||
msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
|
||||
if (msg_count) {
|
||||
rc = 1;
|
||||
}
|
||||
|
||||
poller = TAILQ_FIRST(&thread->active_pollers);
|
||||
if (poller) {
|
||||
int poller_rc;
|
||||
|
||||
TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
|
||||
poller->state = SPDK_POLLER_STATE_RUNNING;
|
||||
rc = poller->fn(poller->arg);
|
||||
poller_rc = poller->fn(poller->arg);
|
||||
if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
|
||||
free(poller);
|
||||
} else {
|
||||
@ -297,6 +402,10 @@ spdk_thread_poll(struct spdk_thread *thread)
|
||||
SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (poller_rc > rc) {
|
||||
rc = poller_rc;
|
||||
}
|
||||
}
|
||||
|
||||
poller = TAILQ_FIRST(&thread->timer_pollers);
|
||||
@ -382,9 +491,35 @@ spdk_thread_get_name(const struct spdk_thread *thread)
|
||||
void
|
||||
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_thread_fn fn, void *ctx)
|
||||
{
|
||||
thread->msg_fn(fn, ctx, thread->thread_ctx);
|
||||
}
|
||||
struct spdk_msg *msg;
|
||||
int rc;
|
||||
|
||||
if (!thread) {
|
||||
assert(false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (thread->msg_fn) {
|
||||
thread->msg_fn(fn, ctx, thread->thread_ctx);
|
||||
return;
|
||||
}
|
||||
|
||||
msg = spdk_mempool_get(g_spdk_msg_mempool);
|
||||
if (!msg) {
|
||||
assert(false);
|
||||
return;
|
||||
}
|
||||
|
||||
msg->fn = fn;
|
||||
msg->arg = ctx;
|
||||
|
||||
rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1);
|
||||
if (rc != 1) {
|
||||
assert(false);
|
||||
spdk_mempool_put(g_spdk_msg_mempool, msg);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
struct spdk_poller *
|
||||
spdk_poller_register(spdk_poller_fn fn,
|
||||
|
Loading…
x
Reference in New Issue
Block a user