fusefs: give priority to FUSE_INTERRUPT operations
When interrupting a FUSE operation, send the FUSE_INTERRUPT op to the daemon ASAP, ahead of other unrelated operations. PR: 236530 Sponsored by: The FreeBSD Foundation
This commit is contained in:
parent
7b8de92822
commit
747b1535ae
@ -295,7 +295,7 @@ fuse_internal_fsync(struct vnode *vp,
|
|||||||
} else {
|
} else {
|
||||||
fuse_insert_callback(fdi.tick,
|
fuse_insert_callback(fdi.tick,
|
||||||
fuse_internal_fsync_callback);
|
fuse_internal_fsync_callback);
|
||||||
fuse_insert_message(fdi.tick);
|
fuse_insert_message(fdi.tick, false);
|
||||||
}
|
}
|
||||||
if (err == ENOSYS) {
|
if (err == ENOSYS) {
|
||||||
/* ENOSYS means "success, and don't call again" */
|
/* ENOSYS means "success, and don't call again" */
|
||||||
@ -593,7 +593,7 @@ fuse_internal_forget_send(struct mount *mp,
|
|||||||
ffi = fdi.indata;
|
ffi = fdi.indata;
|
||||||
ffi->nlookup = nlookup;
|
ffi->nlookup = nlookup;
|
||||||
|
|
||||||
fuse_insert_message(fdi.tick);
|
fuse_insert_message(fdi.tick, false);
|
||||||
fdisp_destroy(&fdi);
|
fdisp_destroy(&fdi);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -736,7 +736,7 @@ fuse_internal_send_init(struct fuse_data *data, struct thread *td)
|
|||||||
fiii->flags = FUSE_POSIX_LOCKS;
|
fiii->flags = FUSE_POSIX_LOCKS;
|
||||||
|
|
||||||
fuse_insert_callback(fdi.tick, fuse_internal_init_callback);
|
fuse_insert_callback(fdi.tick, fuse_internal_init_callback);
|
||||||
fuse_insert_message(fdi.tick);
|
fuse_insert_message(fdi.tick, false);
|
||||||
fdisp_destroy(&fdi);
|
fdisp_destroy(&fdi);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -238,7 +238,8 @@ fuse_interrupt_send(struct fuse_ticket *otick, int err)
|
|||||||
fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
|
fuse_insert_callback(fdi.tick, fuse_interrupt_callback);
|
||||||
|
|
||||||
otick->irq_unique = fdi.tick->tk_unique;
|
otick->irq_unique = fdi.tick->tk_unique;
|
||||||
fuse_insert_message(fdi.tick);
|
/* Interrupt ops should be delivered ASAP */
|
||||||
|
fuse_insert_message(fdi.tick, true);
|
||||||
fdisp_destroy(&fdi);
|
fdisp_destroy(&fdi);
|
||||||
} else {
|
} else {
|
||||||
/* This ticket has already been interrupted */
|
/* This ticket has already been interrupted */
|
||||||
@ -660,8 +661,14 @@ fuse_insert_callback(struct fuse_ticket *ftick, fuse_handler_t * handler)
|
|||||||
fuse_lck_mtx_unlock(ftick->tk_data->aw_mtx);
|
fuse_lck_mtx_unlock(ftick->tk_data->aw_mtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Insert a new upgoing ticket into the message queue
|
||||||
|
*
|
||||||
|
* If urgent is true, insert at the front of the queue. Otherwise, insert in
|
||||||
|
* FIFO order.
|
||||||
|
*/
|
||||||
void
|
void
|
||||||
fuse_insert_message(struct fuse_ticket *ftick)
|
fuse_insert_message(struct fuse_ticket *ftick, bool urgent)
|
||||||
{
|
{
|
||||||
if (ftick->tk_flag & FT_DIRTY) {
|
if (ftick->tk_flag & FT_DIRTY) {
|
||||||
panic("FUSE: ticket reused without being refreshed");
|
panic("FUSE: ticket reused without being refreshed");
|
||||||
@ -672,7 +679,10 @@ fuse_insert_message(struct fuse_ticket *ftick)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
fuse_lck_mtx_lock(ftick->tk_data->ms_mtx);
|
fuse_lck_mtx_lock(ftick->tk_data->ms_mtx);
|
||||||
fuse_ms_push(ftick);
|
if (urgent)
|
||||||
|
fuse_ms_push_head(ftick);
|
||||||
|
else
|
||||||
|
fuse_ms_push(ftick);
|
||||||
wakeup_one(ftick->tk_data);
|
wakeup_one(ftick->tk_data);
|
||||||
selwakeuppri(&ftick->tk_data->ks_rsel, PZERO + 1);
|
selwakeuppri(&ftick->tk_data->ks_rsel, PZERO + 1);
|
||||||
fuse_lck_mtx_unlock(ftick->tk_data->ms_mtx);
|
fuse_lck_mtx_unlock(ftick->tk_data->ms_mtx);
|
||||||
@ -972,7 +982,7 @@ fdisp_wait_answ(struct fuse_dispatcher *fdip)
|
|||||||
|
|
||||||
fdip->answ_stat = 0;
|
fdip->answ_stat = 0;
|
||||||
fuse_insert_callback(fdip->tick, fuse_standard_handler);
|
fuse_insert_callback(fdip->tick, fuse_standard_handler);
|
||||||
fuse_insert_message(fdip->tick);
|
fuse_insert_message(fdip->tick, false);
|
||||||
|
|
||||||
if ((err = fticket_wait_answer(fdip->tick))) {
|
if ((err = fticket_wait_answer(fdip->tick))) {
|
||||||
fuse_lck_mtx_lock(fdip->tick->tk_aw_mtx);
|
fuse_lck_mtx_lock(fdip->tick->tk_aw_mtx);
|
||||||
|
@ -285,6 +285,7 @@ fsess_opt_brokenio(struct mount *mp)
|
|||||||
return (fuse_fix_broken_io || (data->dataflags & FSESS_BROKENIO));
|
return (fuse_fix_broken_io || (data->dataflags & FSESS_BROKENIO));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Insert a new upgoing message */
|
||||||
static inline void
|
static inline void
|
||||||
fuse_ms_push(struct fuse_ticket *ftick)
|
fuse_ms_push(struct fuse_ticket *ftick)
|
||||||
{
|
{
|
||||||
@ -293,6 +294,15 @@ fuse_ms_push(struct fuse_ticket *ftick)
|
|||||||
STAILQ_INSERT_TAIL(&ftick->tk_data->ms_head, ftick, tk_ms_link);
|
STAILQ_INSERT_TAIL(&ftick->tk_data->ms_head, ftick, tk_ms_link);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Insert a new upgoing message to the front of the queue */
|
||||||
|
static inline void
|
||||||
|
fuse_ms_push_head(struct fuse_ticket *ftick)
|
||||||
|
{
|
||||||
|
mtx_assert(&ftick->tk_data->ms_mtx, MA_OWNED);
|
||||||
|
refcount_acquire(&ftick->tk_refcount);
|
||||||
|
STAILQ_INSERT_HEAD(&ftick->tk_data->ms_head, ftick, tk_ms_link);
|
||||||
|
}
|
||||||
|
|
||||||
static inline struct fuse_ticket *
|
static inline struct fuse_ticket *
|
||||||
fuse_ms_pop(struct fuse_data *data)
|
fuse_ms_pop(struct fuse_data *data)
|
||||||
{
|
{
|
||||||
@ -345,7 +355,7 @@ fuse_aw_pop(struct fuse_data *data)
|
|||||||
struct fuse_ticket *fuse_ticket_fetch(struct fuse_data *data);
|
struct fuse_ticket *fuse_ticket_fetch(struct fuse_data *data);
|
||||||
int fuse_ticket_drop(struct fuse_ticket *ftick);
|
int fuse_ticket_drop(struct fuse_ticket *ftick);
|
||||||
void fuse_insert_callback(struct fuse_ticket *ftick, fuse_handler_t *handler);
|
void fuse_insert_callback(struct fuse_ticket *ftick, fuse_handler_t *handler);
|
||||||
void fuse_insert_message(struct fuse_ticket *ftick);
|
void fuse_insert_message(struct fuse_ticket *ftick, bool irq);
|
||||||
|
|
||||||
static inline bool
|
static inline bool
|
||||||
fuse_libabi_geq(struct fuse_data *data, uint32_t abi_maj, uint32_t abi_min)
|
fuse_libabi_geq(struct fuse_data *data, uint32_t abi_maj, uint32_t abi_min)
|
||||||
|
@ -592,7 +592,7 @@ fuse_vnop_create(struct vop_create_args *ap)
|
|||||||
fri->fh = fh_id;
|
fri->fh = fh_id;
|
||||||
fri->flags = flags;
|
fri->flags = flags;
|
||||||
fuse_insert_callback(fdip->tick, fuse_internal_forget_callback);
|
fuse_insert_callback(fdip->tick, fuse_internal_forget_callback);
|
||||||
fuse_insert_message(fdip->tick);
|
fuse_insert_message(fdip->tick, false);
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
ASSERT_VOP_ELOCKED(*vpp, "fuse_vnop_create");
|
ASSERT_VOP_ELOCKED(*vpp, "fuse_vnop_create");
|
||||||
|
@ -46,6 +46,8 @@ using namespace testing;
|
|||||||
/* Initial size of files used by these tests */
|
/* Initial size of files used by these tests */
|
||||||
const off_t FILESIZE = 1000;
|
const off_t FILESIZE = 1000;
|
||||||
|
|
||||||
|
static sem_t *signaled_semaphore;
|
||||||
|
|
||||||
/* Don't do anything; all we care about is that the syscall gets interrupted */
|
/* Don't do anything; all we care about is that the syscall gets interrupted */
|
||||||
void sigusr2_handler(int __unused sig) {
|
void sigusr2_handler(int __unused sig) {
|
||||||
if (verbosity > 1) {
|
if (verbosity > 1) {
|
||||||
@ -63,6 +65,8 @@ void* killer(void* target) {
|
|||||||
if (verbosity > 1)
|
if (verbosity > 1)
|
||||||
printf("Signalling! thread %p\n", target);
|
printf("Signalling! thread %p\n", target);
|
||||||
pthread_kill((pthread_t)target, SIGUSR2);
|
pthread_kill((pthread_t)target, SIGUSR2);
|
||||||
|
if (signaled_semaphore != NULL)
|
||||||
|
sem_post(signaled_semaphore);
|
||||||
|
|
||||||
return(NULL);
|
return(NULL);
|
||||||
}
|
}
|
||||||
@ -112,13 +116,18 @@ void expect_write(uint64_t ino, uint64_t *write_unique)
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
void setup_interruptor(pthread_t self)
|
void setup_interruptor(pthread_t target)
|
||||||
{
|
{
|
||||||
ASSERT_NE(SIG_ERR, signal(SIGUSR2, sigusr2_handler)) << strerror(errno);
|
ASSERT_NE(SIG_ERR, signal(SIGUSR2, sigusr2_handler)) << strerror(errno);
|
||||||
ASSERT_EQ(0, pthread_create(&m_child, NULL, killer, (void*)self))
|
ASSERT_EQ(0, pthread_create(&m_child, NULL, killer, (void*)target))
|
||||||
<< strerror(errno);
|
<< strerror(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void SetUp() {
|
||||||
|
signaled_semaphore = NULL;
|
||||||
|
FuseTest::SetUp();
|
||||||
|
}
|
||||||
|
|
||||||
void TearDown() {
|
void TearDown() {
|
||||||
struct sigaction sa;
|
struct sigaction sa;
|
||||||
|
|
||||||
@ -626,6 +635,98 @@ TEST_F(Interrupt, in_progress_read)
|
|||||||
/* Deliberately leak fd. close(2) will be tested in release.cc */
|
/* Deliberately leak fd. close(2) will be tested in release.cc */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* FUSE_INTERRUPT operations should take priority over other pending ops */
|
||||||
|
TEST_F(Interrupt, priority)
|
||||||
|
{
|
||||||
|
const char FULLPATH0[] = "mountpoint/some_file.txt";
|
||||||
|
const char RELPATH0[] = "some_file.txt";
|
||||||
|
const char FULLPATH1[] = "mountpoint/other_file.txt";
|
||||||
|
const char RELPATH1[] = "other_file.txt";
|
||||||
|
const char *CONTENTS = "ijklmnop";
|
||||||
|
Sequence seq;
|
||||||
|
ssize_t bufsize = strlen(CONTENTS);
|
||||||
|
uint64_t ino0 = 42, ino1 = 43;
|
||||||
|
int fd0, fd1;
|
||||||
|
uint64_t write_unique;
|
||||||
|
pthread_t self, th0;
|
||||||
|
sem_t sem0, sem1;
|
||||||
|
|
||||||
|
ASSERT_EQ(0, sem_init(&sem0, 0, 0)) << strerror(errno);
|
||||||
|
ASSERT_EQ(0, sem_init(&sem1, 0, 0)) << strerror(errno);
|
||||||
|
self = pthread_self();
|
||||||
|
|
||||||
|
expect_lookup(RELPATH0, ino0);
|
||||||
|
expect_open(ino0, 0, 1);
|
||||||
|
expect_lookup(RELPATH1, ino1);
|
||||||
|
expect_open(ino1, 0, 1);
|
||||||
|
EXPECT_CALL(*m_mock, process(
|
||||||
|
ResultOf([=](auto in) {
|
||||||
|
return (in->header.opcode == FUSE_WRITE &&
|
||||||
|
in->header.nodeid == ino0);
|
||||||
|
}, Eq(true)),
|
||||||
|
_)
|
||||||
|
).InSequence(seq)
|
||||||
|
.WillOnce(Invoke(ReturnImmediate([&](auto in, auto out) {
|
||||||
|
write_unique = in->header.unique;
|
||||||
|
|
||||||
|
/* Let the next write proceed */
|
||||||
|
sem_post(&sem1);
|
||||||
|
|
||||||
|
/* Pause the daemon thread so it won't read the next op */
|
||||||
|
sem_wait(&sem0);
|
||||||
|
|
||||||
|
/* Finally, interrupt the original op */
|
||||||
|
out->header.error = -EINTR;
|
||||||
|
out->header.unique = write_unique;
|
||||||
|
out->header.len = sizeof(out->header);
|
||||||
|
})));
|
||||||
|
/*
|
||||||
|
* FUSE_INTERRUPT should be received before the second FUSE_WRITE, even
|
||||||
|
* though it was generated later
|
||||||
|
*/
|
||||||
|
EXPECT_CALL(*m_mock, process(
|
||||||
|
ResultOf([&](auto in) {
|
||||||
|
return (in->header.opcode == FUSE_INTERRUPT &&
|
||||||
|
in->body.interrupt.unique == write_unique);
|
||||||
|
}, Eq(true)),
|
||||||
|
_)
|
||||||
|
).InSequence(seq)
|
||||||
|
.WillOnce(Invoke(ReturnErrno(EAGAIN)));
|
||||||
|
EXPECT_CALL(*m_mock, process(
|
||||||
|
ResultOf([&](auto in) {
|
||||||
|
return (in->header.opcode == FUSE_WRITE &&
|
||||||
|
in->header.nodeid == ino1);
|
||||||
|
}, Eq(true)),
|
||||||
|
_)
|
||||||
|
).InSequence(seq)
|
||||||
|
.WillOnce(Invoke(ReturnImmediate([=](auto in , auto out) {
|
||||||
|
SET_OUT_HEADER_LEN(out, write);
|
||||||
|
out->body.write.size = in->body.write.size;
|
||||||
|
})));
|
||||||
|
|
||||||
|
fd0 = open(FULLPATH0, O_WRONLY);
|
||||||
|
ASSERT_LE(0, fd0) << strerror(errno);
|
||||||
|
fd1 = open(FULLPATH1, O_WRONLY);
|
||||||
|
ASSERT_LE(0, fd1) << strerror(errno);
|
||||||
|
|
||||||
|
/* Use a separate thread for the first write */
|
||||||
|
ASSERT_EQ(0, pthread_create(&th0, NULL, write0, (void*)(intptr_t)fd0))
|
||||||
|
<< strerror(errno);
|
||||||
|
|
||||||
|
signaled_semaphore = &sem0;
|
||||||
|
|
||||||
|
sem_wait(&sem1); /* Sequence the two writes */
|
||||||
|
setup_interruptor(th0);
|
||||||
|
ASSERT_EQ(bufsize, write(fd1, CONTENTS, bufsize)) << strerror(errno);
|
||||||
|
|
||||||
|
/* Wait awhile to make sure the signal generates no FUSE_INTERRUPT */
|
||||||
|
usleep(250'000);
|
||||||
|
|
||||||
|
pthread_join(th0, NULL);
|
||||||
|
sem_destroy(&sem1);
|
||||||
|
sem_destroy(&sem0);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the FUSE filesystem receives the FUSE_INTERRUPT operation before
|
* If the FUSE filesystem receives the FUSE_INTERRUPT operation before
|
||||||
* processing the original, then it should wait for "some timeout" for the
|
* processing the original, then it should wait for "some timeout" for the
|
||||||
|
Loading…
x
Reference in New Issue
Block a user