diff --git a/sys/fs/fuse/fuse_device.c b/sys/fs/fuse/fuse_device.c index 63c41edd1a19..97c10b2fdd28 100644 --- a/sys/fs/fuse/fuse_device.c +++ b/sys/fs/fuse/fuse_device.c @@ -205,11 +205,8 @@ fuse_device_filt_read(struct knote *kn, long hint) kn->kn_data = 1; ready = 1; } else if (STAILQ_FIRST(&data->ms_head)) { - /* - * There is at least one event to read. - * TODO: keep a counter of the number of events to read - */ - kn->kn_data = 1; + MPASS(data->ms_count >= 1); + kn->kn_data = data->ms_count; ready = 1; } else { ready = 0; diff --git a/sys/fs/fuse/fuse_ipc.c b/sys/fs/fuse/fuse_ipc.c index 4bbb6aa9d196..3c3aa82e6da5 100644 --- a/sys/fs/fuse/fuse_ipc.c +++ b/sys/fs/fuse/fuse_ipc.c @@ -204,6 +204,7 @@ fuse_interrupt_send(struct fuse_ticket *otick, int err) if (tick == otick) { STAILQ_REMOVE(&otick->tk_data->ms_head, tick, fuse_ticket, tk_ms_link); + otick->tk_data->ms_count--; otick->tk_ms_link.stqe_next = NULL; fuse_lck_mtx_unlock(data->ms_mtx); @@ -586,6 +587,7 @@ fdata_alloc(struct cdev *fdev, struct ucred *cred) data->fdev = fdev; mtx_init(&data->ms_mtx, "fuse message list mutex", NULL, MTX_DEF); STAILQ_INIT(&data->ms_head); + data->ms_count = 0; knlist_init_mtx(&data->ks_rsel.si_note, &data->ms_mtx); mtx_init(&data->aw_mtx, "fuse answer list mutex", NULL, MTX_DEF); TAILQ_INIT(&data->aw_head); diff --git a/sys/fs/fuse/fuse_ipc.h b/sys/fs/fuse/fuse_ipc.h index d323ded260e3..ff7ca0a2df61 100644 --- a/sys/fs/fuse/fuse_ipc.h +++ b/sys/fs/fuse/fuse_ipc.h @@ -180,6 +180,7 @@ struct fuse_data { struct mtx ms_mtx; STAILQ_HEAD(, fuse_ticket) ms_head; + int ms_count; struct mtx aw_mtx; TAILQ_HEAD(, fuse_ticket) aw_head; @@ -290,6 +291,7 @@ fuse_ms_push(struct fuse_ticket *ftick) mtx_assert(&ftick->tk_data->ms_mtx, MA_OWNED); refcount_acquire(&ftick->tk_refcount); STAILQ_INSERT_TAIL(&ftick->tk_data->ms_head, ftick, tk_ms_link); + ftick->tk_data->ms_count++; } /* Insert a new upgoing message to the front of the queue */ @@ -299,6 +301,7 @@ fuse_ms_push_head(struct fuse_ticket *ftick) mtx_assert(&ftick->tk_data->ms_mtx, MA_OWNED); refcount_acquire(&ftick->tk_refcount); STAILQ_INSERT_HEAD(&ftick->tk_data->ms_head, ftick, tk_ms_link); + ftick->tk_data->ms_count++; } static inline struct fuse_ticket * @@ -310,7 +313,9 @@ fuse_ms_pop(struct fuse_data *data) if ((ftick = STAILQ_FIRST(&data->ms_head))) { STAILQ_REMOVE_HEAD(&data->ms_head, tk_ms_link); + data->ms_count--; #ifdef INVARIANTS + MPASS(data->ms_count >= 0); ftick->tk_ms_link.stqe_next = NULL; #endif } diff --git a/tests/sys/fs/fusefs/dev_fuse_poll.cc b/tests/sys/fs/fusefs/dev_fuse_poll.cc index e9085e4f42b0..626f533ca9e0 100644 --- a/tests/sys/fs/fusefs/dev_fuse_poll.cc +++ b/tests/sys/fs/fusefs/dev_fuse_poll.cc @@ -34,6 +34,7 @@ extern "C" { #include +#include #include } @@ -71,6 +72,13 @@ class DevFusePoll: public FuseTest, public WithParamInterface { } }; +class Kqueue: public FuseTest { + virtual void SetUp() { + m_pm = KQ; + FuseTest::SetUp(); + } +}; + TEST_P(DevFusePoll, access) { expect_access(1, X_OK, 0); @@ -91,3 +99,126 @@ TEST_P(DevFusePoll, destroy) INSTANTIATE_TEST_CASE_P(PM, DevFusePoll, ::testing::Values("BLOCKING", "KQ", "POLL", "SELECT")); + +static void* statter(void* arg) { + const char *name; + struct stat sb; + + name = (const char*)arg; + stat(name, &sb); + return 0; +} + +/* + * A kevent's data field should contain the number of operations available to + * be immediately rea. + */ +TEST_F(Kqueue, data) +{ + pthread_t th0, th1, th2; + sem_t sem0, sem1; + int nready0, nready1, nready2; + uint64_t foo_ino = 42; + uint64_t bar_ino = 43; + uint64_t baz_ino = 44; + + ASSERT_EQ(0, sem_init(&sem0, 0, 0)) << strerror(errno); + ASSERT_EQ(0, sem_init(&sem1, 0, 0)) << strerror(errno); + + EXPECT_LOOKUP(1, "foo") + .WillOnce(Invoke(ReturnImmediate([=](auto in __unused, auto out) { + SET_OUT_HEADER_LEN(out, entry); + out->body.entry.entry_valid = UINT64_MAX; + out->body.entry.attr.mode = S_IFREG | 0644; + out->body.entry.nodeid = foo_ino; + }))); + EXPECT_LOOKUP(1, "bar") + .WillOnce(Invoke(ReturnImmediate([=](auto in __unused, auto out) { + SET_OUT_HEADER_LEN(out, entry); + out->body.entry.entry_valid = UINT64_MAX; + out->body.entry.attr.mode = S_IFREG | 0644; + out->body.entry.nodeid = bar_ino; + }))); + EXPECT_LOOKUP(1, "baz") + .WillOnce(Invoke(ReturnImmediate([=](auto in __unused, auto out) { + SET_OUT_HEADER_LEN(out, entry); + out->body.entry.entry_valid = UINT64_MAX; + out->body.entry.attr.mode = S_IFREG | 0644; + out->body.entry.nodeid = baz_ino; + }))); + + EXPECT_CALL(*m_mock, process( + ResultOf([=](auto in) { + return (in->header.opcode == FUSE_GETATTR && + in->header.nodeid == foo_ino); + }, Eq(true)), + _) + ) + .WillOnce(Invoke(ReturnImmediate([&](auto in, auto out) { + nready0 = m_mock->m_nready; + + sem_post(&sem0); + // Block the daemon so we can accumulate a few more ops + sem_wait(&sem1); + + out->header.unique = in->header.unique; + out->header.error = -EIO; + out->header.len = sizeof(out->header); + }))); + + EXPECT_CALL(*m_mock, process( + ResultOf([=](auto in) { + return (in->header.opcode == FUSE_GETATTR && + in->header.nodeid == bar_ino); + }, Eq(true)), + _) + ) + .WillOnce(Invoke(ReturnImmediate([&](auto in, auto out) { + nready1 = m_mock->m_nready; + out->header.unique = in->header.unique; + out->header.error = -EIO; + out->header.len = sizeof(out->header); + }))); + EXPECT_CALL(*m_mock, process( + ResultOf([=](auto in) { + return (in->header.opcode == FUSE_GETATTR && + in->header.nodeid == baz_ino); + }, Eq(true)), + _) + ) + .WillOnce(Invoke(ReturnImmediate([&](auto in, auto out) { + nready2 = m_mock->m_nready; + out->header.unique = in->header.unique; + out->header.error = -EIO; + out->header.len = sizeof(out->header); + }))); + + /* + * Create cached lookup entries for these files. It seems that only + * one thread at a time can be in VOP_LOOKUP for a given directory + */ + access("mountpoint/foo", F_OK); + access("mountpoint/bar", F_OK); + access("mountpoint/baz", F_OK); + ASSERT_EQ(0, pthread_create(&th0, NULL, statter, + (void*)"mountpoint/foo")) << strerror(errno); + EXPECT_EQ(0, sem_wait(&sem0)) << strerror(errno); + ASSERT_EQ(0, pthread_create(&th1, NULL, statter, + (void*)"mountpoint/bar")) << strerror(errno); + ASSERT_EQ(0, pthread_create(&th2, NULL, statter, + (void*)"mountpoint/baz")) << strerror(errno); + + nap(); // Allow th1 and th2 to send their ops to the daemon + EXPECT_EQ(0, sem_post(&sem1)) << strerror(errno); + + pthread_join(th0, NULL); + pthread_join(th1, NULL); + pthread_join(th2, NULL); + + EXPECT_EQ(1, nready0); + EXPECT_EQ(2, nready1); + EXPECT_EQ(1, nready2); + + sem_destroy(&sem0); + sem_destroy(&sem1); +} diff --git a/tests/sys/fs/fusefs/mockfs.cc b/tests/sys/fs/fusefs/mockfs.cc index 73fd5507cb43..053fff399e59 100644 --- a/tests/sys/fs/fusefs/mockfs.cc +++ b/tests/sys/fs/fusefs/mockfs.cc @@ -294,6 +294,7 @@ MockFS::MockFS(int max_readahead, bool allow_other, bool default_permissions, m_daemon_id = NULL; m_maxreadahead = max_readahead; + m_nready = -1; m_pm = pm; m_quit = false; if (m_pm == KQ) @@ -521,6 +522,7 @@ void MockFS::read_request(mockfs_buf_in *in) { FAIL() << strerror(events[0].data); else if (events[0].flags & EV_EOF) FAIL() << strerror(events[0].fflags); + m_nready = events[0].data; break; case POLL: fds[0].fd = m_fuse_fd; diff --git a/tests/sys/fs/fusefs/mockfs.hh b/tests/sys/fs/fusefs/mockfs.hh index ad02d55bb90e..b1ccc770f91a 100644 --- a/tests/sys/fs/fusefs/mockfs.hh +++ b/tests/sys/fs/fusefs/mockfs.hh @@ -163,6 +163,14 @@ ProcessMockerT ReturnImmediate( std::function f); +/* How the daemon should check /dev/fuse for readiness */ +enum poll_method { + BLOCKING, + SELECT, + POLL, + KQ +}; + /* * Fake FUSE filesystem * @@ -183,12 +191,17 @@ class MockFS { /* file descriptor of /dev/fuse control device */ int m_fuse_fd; + int m_kq; + /* The max_readahead filesystem option */ uint32_t m_maxreadahead; /* pid of the test process */ pid_t m_pid; + /* Method the daemon should use for I/O to and from /dev/fuse */ + enum poll_method m_pm; + /* Initialize a session after mounting */ void init(uint32_t flags); @@ -205,6 +218,9 @@ class MockFS { /* Read, but do not process, a single request from the kernel */ void read_request(mockfs_buf_in*); + /* Write a single response back to the kernel */ + void write_response(mockfs_buf_out *out); + public: /* pid of child process, for two-process test cases */ pid_t m_child_pid; @@ -212,13 +228,19 @@ class MockFS { /* Maximum size of a FUSE_WRITE write */ uint32_t m_max_write; + /* + * Number of events that were available from /dev/fuse after the last + * kevent call. Only valid when m_pm = KQ. + */ + int m_nready; + /* Tell the daemon to shut down ASAP */ bool m_quit; /* Create a new mockfs and mount it to a tempdir */ MockFS(int max_readahead, bool allow_other, bool default_permissions, bool push_symlinks_in, bool ro, - uint32_t flags); + enum poll_method pm, uint32_t flags); virtual ~MockFS(); /* Kill the filesystem daemon without unmounting the filesystem */