lib/thread: thread_poll() polls until the exiting thread is exited
Extract _spdk_thread_exit() from spdk_thread_exit() and _spdk_thread_poll() calls _spdk_thread_exit() if the thread is in the exiting state. spdk_thread_exit() changes to move the state to the exiting state. The spdk_thread_poll() loop will end after the thread moves to the exited state because the caller of spdk_thread_poll() will check if the thread is in the exited state, and break the loop if true. If the user does not call spdk_thread_exit() explicitly, the reactor has to terminate all existing threads at its shutdown. In this case, multiple threads may have some dependency to release I/O channels or unregister pollers. So the reactor has the large two loops, the first loop calls spdk_thread_exit() on all threads, the second loop calls spdk_thread_destroy() if exited or spdk_thread_poll() otherwise for each thread until all threads are destroyed. Besides, change the return value of spdk_thread_exit() to return always 0. Keep it for ABI compatibility. Change ERRLOG to INFOLOG for _spdk_thread_exit() because it is called repeatedly now. Remove the check of I/O reference count from _spdk_thread_exit() because _free_thread() cannot free I/O channel. Refine the unit test accordingly. Fixes issue #1288. Signed-off-by: Shuhei Matsumoto <shuhei.matsumoto.xt@hitachi.com> Change-Id: Iee5fb984a96bfac53110fe991dd994ded31dffa4 Reviewed-on: https://review.spdk.io/gerrit/c/spdk/spdk/+/1423 Tested-by: SPDK CI Jenkins <sys_sgci@intel.com> Reviewed-by: Aleksey Marchuk <alexeymar@mellanox.com> Reviewed-by: Jim Harris <james.r.harris@intel.com>
This commit is contained in:
parent
282b1505e5
commit
e9aec6746a
@ -267,13 +267,15 @@ void spdk_set_thread(struct spdk_thread *thread);
|
||||
* spdk_poller_register(), and spdk_get_io_channel() calls. May only be called
|
||||
* within an spdk poller or message.
|
||||
*
|
||||
* All I/O channel references associated with the thread must be released using
|
||||
* spdk_put_io_channel(), and all active pollers associated with the thread must
|
||||
* be unregistered using spdk_poller_unregister(), prior to calling this function.
|
||||
* All I/O channel references associated with the thread must be released
|
||||
* using spdk_put_io_channel(), and all active pollers associated with the thread
|
||||
* should be unregistered using spdk_poller_unregister(), prior to calling
|
||||
* this function. This function will complete these processing. The completion can
|
||||
* be queried by spdk_thread_is_exited().
|
||||
*
|
||||
* \param thread The thread to destroy.
|
||||
*
|
||||
* \return 0 on success, negated errno on failure.
|
||||
* \return always 0. (return value was deprecated but keep it for ABI compatibility.)
|
||||
*/
|
||||
int spdk_thread_exit(struct spdk_thread *thread);
|
||||
|
||||
|
@ -366,7 +366,6 @@ _spdk_reactor_run(void *arg)
|
||||
struct spdk_thread *thread;
|
||||
struct spdk_lw_thread *lw_thread, *tmp;
|
||||
char thread_name[32];
|
||||
int rc __attribute__((unused));
|
||||
|
||||
SPDK_NOTICELOG("Reactor started on core %u\n", reactor->lcore);
|
||||
|
||||
@ -386,18 +385,25 @@ _spdk_reactor_run(void *arg)
|
||||
}
|
||||
}
|
||||
|
||||
TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
|
||||
TAILQ_FOREACH(lw_thread, &reactor->threads, link) {
|
||||
thread = spdk_thread_get_from_ctx(lw_thread);
|
||||
TAILQ_REMOVE(&reactor->threads, lw_thread, link);
|
||||
assert(reactor->thread_count > 0);
|
||||
reactor->thread_count--;
|
||||
spdk_set_thread(thread);
|
||||
rc = spdk_thread_exit(thread);
|
||||
assert(rc == 0);
|
||||
while (!spdk_thread_is_exited(thread)) {
|
||||
spdk_thread_poll(thread, 0, 0);
|
||||
spdk_thread_exit(thread);
|
||||
}
|
||||
|
||||
while (!TAILQ_EMPTY(&reactor->threads)) {
|
||||
TAILQ_FOREACH_SAFE(lw_thread, &reactor->threads, link, tmp) {
|
||||
thread = spdk_thread_get_from_ctx(lw_thread);
|
||||
spdk_set_thread(thread);
|
||||
if (spdk_thread_is_exited(thread)) {
|
||||
TAILQ_REMOVE(&reactor->threads, lw_thread, link);
|
||||
assert(reactor->thread_count > 0);
|
||||
reactor->thread_count--;
|
||||
spdk_thread_destroy(thread);
|
||||
} else {
|
||||
spdk_thread_poll(thread, 0, 0);
|
||||
}
|
||||
}
|
||||
spdk_thread_destroy(thread);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -316,7 +316,7 @@ spdk_set_thread(struct spdk_thread *thread)
|
||||
tls_thread = thread;
|
||||
}
|
||||
|
||||
static int
|
||||
static void
|
||||
_spdk_thread_exit(struct spdk_thread *thread)
|
||||
{
|
||||
struct spdk_poller *poller;
|
||||
@ -324,36 +324,37 @@ _spdk_thread_exit(struct spdk_thread *thread)
|
||||
|
||||
TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
|
||||
if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
|
||||
SPDK_ERRLOG("thread %s still has active poller %s\n",
|
||||
thread->name, poller->name);
|
||||
return -EBUSY;
|
||||
SPDK_INFOLOG(SPDK_LOG_THREAD,
|
||||
"thread %s still has active poller %s\n",
|
||||
thread->name, poller->name);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
|
||||
if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
|
||||
SPDK_ERRLOG("thread %s still has active timed poller %s\n",
|
||||
thread->name, poller->name);
|
||||
return -EBUSY;
|
||||
SPDK_INFOLOG(SPDK_LOG_THREAD,
|
||||
"thread %s still has active timed poller %s\n",
|
||||
thread->name, poller->name);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
|
||||
SPDK_ERRLOG("thread %s still has paused poller %s\n",
|
||||
thread->name, poller->name);
|
||||
return -EBUSY;
|
||||
SPDK_INFOLOG(SPDK_LOG_THREAD,
|
||||
"thread %s still has paused poller %s\n",
|
||||
thread->name, poller->name);
|
||||
return;
|
||||
}
|
||||
|
||||
TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
|
||||
if (ch->ref != 0) {
|
||||
SPDK_ERRLOG("thread %s still has active channel for io_device %s\n",
|
||||
thread->name, ch->dev->name);
|
||||
return -EBUSY;
|
||||
}
|
||||
SPDK_INFOLOG(SPDK_LOG_THREAD,
|
||||
"thread %s still has channel for io_device %s\n",
|
||||
thread->name, ch->dev->name);
|
||||
return;
|
||||
}
|
||||
|
||||
thread->state = SPDK_THREAD_STATE_EXITED;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int
|
||||
@ -370,7 +371,8 @@ spdk_thread_exit(struct spdk_thread *thread)
|
||||
return 0;
|
||||
}
|
||||
|
||||
return _spdk_thread_exit(thread);
|
||||
thread->state = SPDK_THREAD_STATE_EXITING;
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool
|
||||
@ -661,6 +663,10 @@ spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
|
||||
|
||||
rc = _spdk_thread_poll(thread, max_msgs, now);
|
||||
|
||||
if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
|
||||
_spdk_thread_exit(thread);
|
||||
}
|
||||
|
||||
_spdk_thread_update_stats(thread, spdk_get_ticks(), now, rc);
|
||||
|
||||
tls_thread = orig_thread;
|
||||
|
@ -590,10 +590,7 @@ vhost_parse_core_mask(const char *mask, struct spdk_cpuset *cpumask)
|
||||
static void
|
||||
vhost_dev_thread_exit(void *arg1)
|
||||
{
|
||||
int rc __attribute__((unused));
|
||||
|
||||
rc = spdk_thread_exit(spdk_get_thread());
|
||||
assert(rc == 0);
|
||||
spdk_thread_exit(spdk_get_thread());
|
||||
}
|
||||
|
||||
int
|
||||
|
@ -102,20 +102,33 @@ allocate_threads(int num_threads)
|
||||
void
|
||||
free_threads(void)
|
||||
{
|
||||
uint32_t i;
|
||||
uint32_t i, num_threads;
|
||||
struct spdk_thread *thread;
|
||||
int rc __attribute__((unused));
|
||||
|
||||
for (i = 0; i < g_ut_num_threads; i++) {
|
||||
set_thread(i);
|
||||
thread = g_ut_threads[i].thread;
|
||||
rc = spdk_thread_exit(thread);
|
||||
assert(rc == 0);
|
||||
while (!spdk_thread_is_exited(thread)) {
|
||||
spdk_thread_poll(thread, 0, 0);
|
||||
spdk_thread_exit(thread);
|
||||
}
|
||||
|
||||
num_threads = g_ut_num_threads;
|
||||
|
||||
while (num_threads != 0) {
|
||||
for (i = 0; i < g_ut_num_threads; i++) {
|
||||
set_thread(i);
|
||||
thread = g_ut_threads[i].thread;
|
||||
if (thread == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (spdk_thread_is_exited(thread)) {
|
||||
g_ut_threads[i].thread = NULL;
|
||||
num_threads--;
|
||||
spdk_thread_destroy(thread);
|
||||
} else {
|
||||
spdk_thread_poll(thread, 0, 0);
|
||||
}
|
||||
}
|
||||
spdk_thread_destroy(thread);
|
||||
g_ut_threads[i].thread = NULL;
|
||||
}
|
||||
|
||||
g_ut_num_threads = 0;
|
||||
|
@ -805,41 +805,45 @@ thread_exit(void)
|
||||
{
|
||||
struct spdk_thread *thread;
|
||||
struct spdk_io_channel *ch;
|
||||
struct spdk_poller *poller;
|
||||
struct spdk_poller *poller1, *poller2;
|
||||
void *ctx;
|
||||
bool done1 = false, done2 = false, poller_run = false;
|
||||
bool done1 = false, done2 = false, poller1_run = false, poller2_run = false;
|
||||
int rc __attribute__((unused));
|
||||
|
||||
allocate_threads(6);
|
||||
allocate_threads(3);
|
||||
|
||||
/* Test all pending messages are reaped for the thread marked as exited. */
|
||||
/* Test if all pending messages are reaped for the exiting thread, and the
|
||||
* thread moves to the exited state.
|
||||
*/
|
||||
set_thread(0);
|
||||
thread = spdk_get_thread();
|
||||
|
||||
/* Sending message to thread 0 will be accepted. */
|
||||
set_thread(1);
|
||||
rc = spdk_thread_send_msg(thread, send_msg_cb, &done1);
|
||||
CU_ASSERT(rc == 0);
|
||||
CU_ASSERT(!done1);
|
||||
|
||||
/* Mark thread 0 as exited. */
|
||||
set_thread(0);
|
||||
/* Move thread 0 to the exiting state. */
|
||||
spdk_thread_exit(thread);
|
||||
|
||||
/* Sending message to thread 0 will be rejected. */
|
||||
set_thread(1);
|
||||
rc = spdk_thread_send_msg(thread, send_msg_cb, &done2);
|
||||
CU_ASSERT(rc == -EIO);
|
||||
CU_ASSERT(spdk_thread_is_exited(thread) == false);
|
||||
|
||||
/* Thread 0 will reap pending message. */
|
||||
/* Sending message to thread 0 will be still accepted. */
|
||||
rc = spdk_thread_send_msg(thread, send_msg_cb, &done2);
|
||||
CU_ASSERT(rc == 0);
|
||||
|
||||
/* Thread 0 will reap pending messages. */
|
||||
poll_thread(0);
|
||||
CU_ASSERT(done1 == true);
|
||||
CU_ASSERT(done2 == false);
|
||||
CU_ASSERT(done2 == true);
|
||||
|
||||
/* Test releasing I/O channel is reaped even after the thread is marked
|
||||
* as exited.
|
||||
/* Thread 0 will move to the exited state. */
|
||||
CU_ASSERT(spdk_thread_is_exited(thread) == true);
|
||||
|
||||
/* Test releasing I/O channel is reaped even after the thread moves to
|
||||
* the exiting state
|
||||
*/
|
||||
set_thread(2);
|
||||
set_thread(1);
|
||||
|
||||
spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL);
|
||||
|
||||
@ -857,76 +861,63 @@ thread_exit(void)
|
||||
thread = spdk_get_thread();
|
||||
spdk_thread_exit(thread);
|
||||
|
||||
/* Thread will not be able to get I/O channel after it is marked as exited. */
|
||||
ch = spdk_get_io_channel(&g_device1);
|
||||
CU_ASSERT(ch == NULL);
|
||||
|
||||
poll_threads();
|
||||
CU_ASSERT(g_destroy_cb_calls == 1);
|
||||
|
||||
spdk_io_device_unregister(&g_device1, NULL);
|
||||
poll_threads();
|
||||
|
||||
/* Test 2nd spdk_thread_exit() call is ignored. */
|
||||
set_thread(3);
|
||||
|
||||
thread = spdk_get_thread();
|
||||
|
||||
CU_ASSERT(spdk_thread_exit(thread) == 0);
|
||||
CU_ASSERT(spdk_thread_exit(thread) == 0);
|
||||
|
||||
/* Test if spdk_thread_exit() fails when there is any registered poller,
|
||||
* and if no poller is executed after the thread is marked as exited.
|
||||
/* Thread 1 will not move to the exited state yet because I/O channel release
|
||||
* does not complete yet.
|
||||
*/
|
||||
set_thread(4);
|
||||
thread = spdk_get_thread();
|
||||
|
||||
poller = spdk_poller_register(poller_run_done, &poller_run, 0);
|
||||
CU_ASSERT(poller != NULL);
|
||||
|
||||
CU_ASSERT(spdk_thread_exit(thread) == -EBUSY);
|
||||
|
||||
spdk_poller_pause(poller);
|
||||
|
||||
CU_ASSERT(spdk_thread_exit(thread) == -EBUSY);
|
||||
|
||||
poll_threads();
|
||||
|
||||
CU_ASSERT(spdk_thread_exit(thread) == -EBUSY);
|
||||
|
||||
spdk_poller_unregister(&poller);
|
||||
|
||||
CU_ASSERT(spdk_thread_exit(thread) == 0);
|
||||
|
||||
poll_threads();
|
||||
|
||||
CU_ASSERT(poller_run == false);
|
||||
|
||||
/* Test if spdk_thread_exit() fails when there is any active I/O channel. */
|
||||
set_thread(5);
|
||||
thread = spdk_get_thread();
|
||||
|
||||
spdk_io_device_register(&g_device1, create_cb_1, destroy_cb_1, sizeof(g_ctx1), NULL);
|
||||
CU_ASSERT(spdk_thread_is_exited(thread) == false);
|
||||
|
||||
/* Thread 1 will be able to get the another reference of I/O channel
|
||||
* even after the thread moves to the exiting state.
|
||||
*/
|
||||
g_create_cb_calls = 0;
|
||||
ch = spdk_get_io_channel(&g_device1);
|
||||
CU_ASSERT(g_create_cb_calls == 1);
|
||||
CU_ASSERT(ch != NULL);
|
||||
|
||||
CU_ASSERT(spdk_thread_exit(thread) == -EBUSY);
|
||||
CU_ASSERT(g_create_cb_calls == 0);
|
||||
SPDK_CU_ASSERT_FATAL(ch != NULL);
|
||||
|
||||
ctx = spdk_io_channel_get_ctx(ch);
|
||||
CU_ASSERT(*(uint64_t *)ctx == g_ctx1);
|
||||
|
||||
g_destroy_cb_calls = 0;
|
||||
spdk_put_io_channel(ch);
|
||||
CU_ASSERT(g_destroy_cb_calls == 0);
|
||||
|
||||
CU_ASSERT(spdk_thread_exit(thread) == 0);
|
||||
|
||||
poll_threads();
|
||||
CU_ASSERT(g_destroy_cb_calls == 1);
|
||||
|
||||
spdk_io_device_unregister(&g_device1, NULL);
|
||||
/* Thread 1 will move to the exited state after I/O channel is released.
|
||||
* are released.
|
||||
*/
|
||||
CU_ASSERT(spdk_thread_is_exited(thread) == true);
|
||||
|
||||
CU_ASSERT(TAILQ_EMPTY(&thread->io_channels));
|
||||
spdk_io_device_unregister(&g_device1, NULL);
|
||||
poll_threads();
|
||||
|
||||
/* Test if unregistering poller is reaped for the exiting thread, and the
|
||||
* thread moves to the exited thread.
|
||||
*/
|
||||
set_thread(2);
|
||||
thread = spdk_get_thread();
|
||||
|
||||
poller1 = spdk_poller_register(poller_run_done, &poller1_run, 0);
|
||||
CU_ASSERT(poller1 != NULL);
|
||||
|
||||
spdk_poller_unregister(&poller1);
|
||||
|
||||
spdk_thread_exit(thread);
|
||||
|
||||
poller2 = spdk_poller_register(poller_run_done, &poller2_run, 0);
|
||||
|
||||
poll_threads();
|
||||
|
||||
CU_ASSERT(poller1_run == false);
|
||||
CU_ASSERT(poller2_run == true);
|
||||
|
||||
CU_ASSERT(spdk_thread_is_exited(thread) == false);
|
||||
|
||||
spdk_poller_unregister(&poller2);
|
||||
|
||||
poll_threads();
|
||||
|
||||
CU_ASSERT(spdk_thread_is_exited(thread) == true);
|
||||
|
||||
free_threads();
|
||||
}
|
||||
@ -1012,6 +1003,8 @@ thread_update_stats(void)
|
||||
|
||||
spdk_poller_unregister(&poller);
|
||||
|
||||
MOCK_CLEAR(spdk_get_ticks);
|
||||
|
||||
free_threads();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user