From 843137c39e9553cb8318ccdcd0ae4842304b370a Mon Sep 17 00:00:00 2001
From: vmaffione <vmaffione@FreeBSD.org>
Date: Wed, 23 Jan 2019 14:51:36 +0000
Subject: [PATCH] netmap: improvements to the netmap kloop (CSB mode)

Changelist:
- Add the proper memory barriers in the kloop ring processing
  functions.
- Fix memory barrier usage in the user helpers
  (nm_sync_kloop_appl_write, nm_sync_kloop_appl_read).
- Fix the nm_kr_txempty() helper to look at rhead rather than rcur.
  This is important since the kloop can read a value of rcur which is
  ahead of the value of rhead (see the explanation in
  nm_sync_kloop_appl_write).
- Remove the obsolete ptnetmap_guest_write_kring_csb() and
  ptnetmap_guest_read_kring_csb(), and update if_ptnet(4) to use the
  new nm_sync_kloop_appl_write() and nm_sync_kloop_appl_read() helpers.
- Prepare the arguments for netmap_sync_kloop_[tr]x_ring() in advance,
  to make the kloop faster.
- Provide kernel and user implementations for nm_ldld_barrier() and
  nm_stld_barrier().

MFC after:	2 weeks
---
 sys/dev/netmap/if_ptnet.c     |  16 +++-
 sys/dev/netmap/netmap_kern.h  |  53 +-----------
 sys/dev/netmap/netmap_kloop.c | 146 ++++++++++++++++++++++------------
 sys/net/netmap.h              |  33 ++++++--
 4 files changed, 139 insertions(+), 109 deletions(-)

diff --git a/sys/dev/netmap/if_ptnet.c b/sys/dev/netmap/if_ptnet.c
index c362018b95af..02f91ee3d888 100644
--- a/sys/dev/netmap/if_ptnet.c
+++ b/sys/dev/netmap/if_ptnet.c
@@ -1688,7 +1688,7 @@ ptnet_ring_update(struct ptnet_queue *pq, struct netmap_kring *kring,
 	/* Mimic nm_txsync_prologue/nm_rxsync_prologue. */
 	kring->rcur = kring->rhead = head;
 
-	ptnetmap_guest_write_kring_csb(atok, kring->rcur, kring->rhead);
+	nm_sync_kloop_appl_write(atok, kring->rcur, kring->rhead);
 
 	/* Kick the host if needed. */
 	if (NM_ACCESS_ONCE(ktoa->kern_need_kick)) {
@@ -1764,7 +1764,12 @@ ptnet_drain_transmit_queue(struct ptnet_queue *pq, unsigned int budget,
 				 * the host. */
 				atok->appl_need_kick = 1;
 
-				/* Double-check. */
+				/* Double check. We need a full barrier to
+				 * prevent the store to atok->appl_need_kick
+				 * from being reordered with the loads from
+				 * ktoa->hwcur and ktoa->hwtail (store-load
+				 * barrier). */
+				nm_stld_barrier();
 				ptnet_sync_tail(ktoa, kring);
 				if (likely(PTNET_TX_NOSPACE(head, kring, minspace))) {
@@ -2046,7 +2051,12 @@ ptnet_rx_eof(struct ptnet_queue *pq, unsigned int budget, bool may_resched)
 				 * last interrupt. */
 				atok->appl_need_kick = 1;
 
-				/* Double-check. */
+				/* Double check for more completed RX slots.
+				 * We need a full barrier to prevent the store
+				 * to atok->appl_need_kick from being reordered
+				 * with the loads from ktoa->hwcur and
+				 * ktoa->hwtail (store-load barrier). */
+				nm_stld_barrier();
 				ptnet_sync_tail(ktoa, kring);
 				if (likely(head == ring->tail)) {
 					break;
diff --git a/sys/dev/netmap/netmap_kern.h b/sys/dev/netmap/netmap_kern.h
index e30fef343732..e9b83a23532b 100644
--- a/sys/dev/netmap/netmap_kern.h
+++ b/sys/dev/netmap/netmap_kern.h
@@ -1159,7 +1159,7 @@ nm_kr_rxspace(struct netmap_kring *k)
 static inline int
 nm_kr_txempty(struct netmap_kring *kring)
 {
-	return kring->rcur == kring->nr_hwtail;
+	return kring->rhead == kring->nr_hwtail;
 }
 
 /* True if no more completed slots in the rx ring, only valid after
@@ -2245,61 +2245,14 @@ int ptnet_nm_krings_create(struct netmap_adapter *na);
 void ptnet_nm_krings_delete(struct netmap_adapter *na);
 void ptnet_nm_dtor(struct netmap_adapter *na);
 
-/* Guest driver: Write kring pointers (cur, head) to the CSB.
- * This routine is coupled with ptnetmap_host_read_kring_csb(). */
-static inline void
-ptnetmap_guest_write_kring_csb(struct nm_csb_atok *atok, uint32_t cur,
-			       uint32_t head)
-{
-	/*
-	 * We need to write cur and head to the CSB but we cannot do it atomically.
-	 * There is no way we can prevent the host from reading the updated value
-	 * of one of the two and the old value of the other. However, if we make
-	 * sure that the host never reads a value of head more recent than the
-	 * value of cur we are safe. We can allow the host to read a value of cur
-	 * more recent than the value of head, since in the netmap ring cur can be
-	 * ahead of head and cur cannot wrap around head because it must be behind
-	 * tail. Inverting the order of writes below could instead result into the
-	 * host to think head went ahead of cur, which would cause the sync
-	 * prologue to fail.
-	 *
-	 * The following memory barrier scheme is used to make this happen:
-	 *
-	 *          Guest              Host
-	 *
-	 *          STORE(cur)         LOAD(head)
-	 *          mb() <-----------> mb()
-	 *          STORE(head)        LOAD(cur)
-	 */
-	atok->cur = cur;
-	nm_stst_barrier();
-	atok->head = head;
-}
-
-/* Guest driver: Read kring pointers (hwcur, hwtail) from the CSB.
- * This routine is coupled with ptnetmap_host_write_kring_csb(). */
-static inline void
-ptnetmap_guest_read_kring_csb(struct nm_csb_ktoa *ktoa,
-			      struct netmap_kring *kring)
-{
-	/*
-	 * We place a memory barrier to make sure that the update of hwtail never
-	 * overtakes the update of hwcur.
-	 * (see explanation in ptnetmap_host_write_kring_csb).
-	 */
-	kring->nr_hwtail = ktoa->hwtail;
-	nm_stst_barrier();
-	kring->nr_hwcur = ktoa->hwcur;
-}
-
-/* Helper function wrapping ptnetmap_guest_read_kring_csb(). */
+/* Helper function wrapping nm_sync_kloop_appl_read(). */
 static inline void
 ptnet_sync_tail(struct nm_csb_ktoa *ktoa, struct netmap_kring *kring)
 {
 	struct netmap_ring *ring = kring->ring;
 
 	/* Update hwcur and hwtail as known by the host. */
-	ptnetmap_guest_read_kring_csb(ktoa, kring);
+	nm_sync_kloop_appl_read(ktoa, &kring->nr_hwtail, &kring->nr_hwcur);
 
 	/* nm_sync_finalize */
 	ring->tail = kring->rtail = kring->nr_hwtail;
diff --git a/sys/dev/netmap/netmap_kloop.c b/sys/dev/netmap/netmap_kloop.c
index 11ec7091467f..5a6243e16cc1 100644
--- a/sys/dev/netmap/netmap_kloop.c
+++ b/sys/dev/netmap/netmap_kloop.c
@@ -66,8 +66,12 @@ static inline void
 sync_kloop_kernel_write(struct nm_csb_ktoa __user *ptr, uint32_t hwcur,
 		uint32_t hwtail)
 {
+	/* Issue a first store-store barrier to make sure writes to the
+	 * netmap ring are not reordered after the updates of ktoa->hwcur
+	 * and ktoa->hwtail. */
+	nm_stst_barrier();
+
 	/*
-	 * The same scheme used in ptnetmap_guest_write_kring_csb() applies here.
+	 * The same scheme used in nm_sync_kloop_appl_write() applies here.
 	 * We allow the application to read a value of hwcur more recent than the value
 	 * of hwtail, since this would anyway result in a consistent view of the
 	 * ring state (and hwcur can never wraparound hwtail, since hwcur must be
@@ -75,11 +79,11 @@ sync_kloop_kernel_write(struct nm_csb_ktoa __user *ptr, uint32_t hwcur,
 	 *
 	 * The following memory barrier scheme is used to make this happen:
 	 *
-	 *          Application          Kernel
+	 *          Application            Kernel
 	 *
-	 *          STORE(hwcur)         LOAD(hwtail)
-	 *          mb() <-------------> mb()
-	 *          STORE(hwtail)        LOAD(hwcur)
+	 *          STORE(hwcur)           LOAD(hwtail)
+	 *          wmb() <--------------> rmb()
+	 *          STORE(hwtail)          LOAD(hwcur)
 	 */
 	CSB_WRITE(ptr, hwcur, hwcur);
 	nm_stst_barrier();
@@ -96,12 +100,16 @@ sync_kloop_kernel_read(struct nm_csb_atok __user *ptr,
 	/*
 	 * We place a memory barrier to make sure that the update of head never
 	 * overtakes the update of cur.
-	 * (see explanation in ptnetmap_guest_write_kring_csb).
+	 * (see explanation in sync_kloop_kernel_write).
 	 */
 	CSB_READ(ptr, head, shadow_ring->head);
-	nm_stst_barrier();
+	nm_ldld_barrier();
 	CSB_READ(ptr, cur, shadow_ring->cur);
 	CSB_READ(ptr, sync_flags, shadow_ring->flags);
+
+	/* Make sure that the loads from atok->head and atok->cur are not
+	 * delayed past the subsequent loads from the netmap ring. */
+	nm_ldld_barrier();
 }
 
 /* Enable or disable application --> kernel kicks. */
@@ -127,10 +135,10 @@ csb_atok_intr_enabled(struct nm_csb_atok __user *csb_atok)
 static inline void
 sync_kloop_kring_dump(const char *title, const struct netmap_kring *kring)
 {
-	nm_prinf("%s - name: %s hwcur: %d hwtail: %d "
-		"rhead: %d rcur: %d rtail: %d",
-		title, kring->name, kring->nr_hwcur, kring->nr_hwtail,
-		kring->rhead, kring->rcur, kring->rtail);
+	nm_prinf("%s, kring %s, hwcur %d, rhead %d, "
+		"rcur %d, rtail %d, hwtail %d",
+		title, kring->name, kring->nr_hwcur, kring->rhead,
+		kring->rcur, kring->rtail, kring->nr_hwtail);
 }
 
 struct sync_kloop_ring_args {
@@ -240,7 +248,8 @@ netmap_sync_kloop_tx_ring(const struct sync_kloop_ring_args *a)
 		 */
 		/* Reenable notifications. */
 		csb_ktoa_kick_enable(csb_ktoa, 1);
-		/* Doublecheck. */
+		/* Double check, with a store-load memory barrier. */
+		nm_stld_barrier();
 		sync_kloop_kernel_read(csb_atok, &shadow_ring, num_slots);
 		if (shadow_ring.head != kring->rhead) {
 			/* We won the race condition, there are more packets to
@@ -358,7 +367,8 @@ netmap_sync_kloop_rx_ring(const struct sync_kloop_ring_args *a)
 		 */
 		/* Reenable notifications. */
 		csb_ktoa_kick_enable(csb_ktoa, 1);
-		/* Doublecheck. */
+		/* Double check, with a store-load memory barrier. */
+		nm_stld_barrier();
 		sync_kloop_kernel_read(csb_atok, &shadow_ring, num_slots);
 		if (!sync_kloop_norxslots(kring, shadow_ring.head)) {
 			/* We won the race condition, more slots are available. Disable
@@ -439,6 +449,7 @@ netmap_sync_kloop(struct netmap_priv_d *priv, struct nmreq_header *hdr)
 	struct sync_kloop_poll_ctx *poll_ctx = NULL;
 #endif  /* SYNC_KLOOP_POLL */
 	int num_rx_rings, num_tx_rings, num_rings;
+	struct sync_kloop_ring_args *args = NULL;
 	uint32_t sleep_us = req->sleep_us;
 	struct nm_csb_atok* csb_atok_base;
 	struct nm_csb_ktoa* csb_ktoa_base;
@@ -488,6 +499,12 @@ netmap_sync_kloop(struct netmap_priv_d *priv, struct nmreq_header *hdr)
 	num_tx_rings = priv->np_qlast[NR_TX] - priv->np_qfirst[NR_TX];
 	num_rings = num_tx_rings + num_rx_rings;
 
+	args = nm_os_malloc(num_rings * sizeof(args[0]));
+	if (!args) {
+		err = ENOMEM;
+		goto out;
+	}
+
 	/* Validate notification options. */
 	opt = nmreq_findoption((struct nmreq_option *)(uintptr_t)hdr->nr_options,
 				NETMAP_REQ_OPT_SYNC_KLOOP_EVENTFDS);
@@ -558,8 +575,8 @@ netmap_sync_kloop(struct netmap_priv_d *priv, struct nmreq_header *hdr)
 			si[NR_TX] = nm_si_user(priv, NR_TX) ? &na->si[NR_TX] :
 				&na->tx_rings[priv->np_qfirst[NR_TX]]->si;
 			NMG_UNLOCK();
-			poll_wait(priv->np_filp, si[NR_RX], &poll_ctx->wait_table);
 			poll_wait(priv->np_filp, si[NR_TX], &poll_ctx->wait_table);
+			poll_wait(priv->np_filp, si[NR_RX], &poll_ctx->wait_table);
 		}
 #else   /* SYNC_KLOOP_POLL */
 		opt->nro_status = EOPNOTSUPP;
@@ -567,6 +584,31 @@ netmap_sync_kloop(struct netmap_priv_d *priv, struct nmreq_header *hdr)
 #endif  /* SYNC_KLOOP_POLL */
 	}
 
+	/* Prepare the arguments for netmap_sync_kloop_tx_ring()
+	 * and netmap_sync_kloop_rx_ring(). */
+	for (i = 0; i < num_tx_rings; i++) {
+		struct sync_kloop_ring_args *a = args + i;
+
+		a->kring = NMR(na, NR_TX)[i + priv->np_qfirst[NR_TX]];
+		a->csb_atok = csb_atok_base + i;
+		a->csb_ktoa = csb_ktoa_base + i;
+#ifdef SYNC_KLOOP_POLL
+		if (poll_ctx)
+			a->irq_ctx = poll_ctx->entries[i].irq_ctx;
+#endif /* SYNC_KLOOP_POLL */
+	}
+	for (i = 0; i < num_rx_rings; i++) {
+		struct sync_kloop_ring_args *a = args + num_tx_rings + i;
+
+		a->kring = NMR(na, NR_RX)[i + priv->np_qfirst[NR_RX]];
+		a->csb_atok = csb_atok_base + num_tx_rings + i;
+		a->csb_ktoa = csb_ktoa_base + num_tx_rings + i;
+#ifdef SYNC_KLOOP_POLL
+		if (poll_ctx)
+			a->irq_ctx = poll_ctx->entries[num_tx_rings + i].irq_ctx;
+#endif /* SYNC_KLOOP_POLL */
+	}
+
 	/* Main loop. */
 	for (;;) {
 		if (unlikely(NM_ACCESS_ONCE(priv->np_kloop_state) & NM_SYNC_KLOOP_STOPPING)) {
@@ -574,47 +616,40 @@ netmap_sync_kloop(struct netmap_priv_d *priv, struct nmreq_header *hdr)
 		}
 
 #ifdef SYNC_KLOOP_POLL
-		if (poll_ctx)
-			__set_current_state(TASK_INTERRUPTIBLE);
+		if (poll_ctx) {
+			/* It is important to set the task state as
+			 * interruptible before processing any TX/RX ring,
+			 * so that if a notification on ring Y comes after
+			 * we have processed ring Y, but before we call
+			 * schedule(), we don't miss it. This is true because
+			 * the wake up function will change the task state,
+			 * and therefore the schedule_timeout() call below
+			 * will observe the change. */
+			set_current_state(TASK_INTERRUPTIBLE);
+		}
#endif  /* SYNC_KLOOP_POLL */
 
 		/* Process all the TX rings bound to this file descriptor. */
 		for (i = 0; i < num_tx_rings; i++) {
-			struct sync_kloop_ring_args a = {
-				.kring = NMR(na, NR_TX)[i + priv->np_qfirst[NR_TX]],
-				.csb_atok = csb_atok_base + i,
-				.csb_ktoa = csb_ktoa_base + i,
-			};
+			struct sync_kloop_ring_args *a = args + i;
 
-#ifdef SYNC_KLOOP_POLL
-			if (poll_ctx)
-				a.irq_ctx = poll_ctx->entries[i].irq_ctx;
-#endif /* SYNC_KLOOP_POLL */
-			if (unlikely(nm_kr_tryget(a.kring, 1, NULL))) {
+			if (unlikely(nm_kr_tryget(a->kring, 1, NULL))) {
 				continue;
 			}
-			netmap_sync_kloop_tx_ring(&a);
-			nm_kr_put(a.kring);
+			netmap_sync_kloop_tx_ring(a);
+			nm_kr_put(a->kring);
 		}
 
 		/* Process all the RX rings bound to this file descriptor. */
 		for (i = 0; i < num_rx_rings; i++) {
-			struct sync_kloop_ring_args a = {
-				.kring = NMR(na, NR_RX)[i + priv->np_qfirst[NR_RX]],
-				.csb_atok = csb_atok_base + num_tx_rings + i,
-				.csb_ktoa = csb_ktoa_base + num_tx_rings + i,
-			};
+			struct sync_kloop_ring_args *a = args + num_tx_rings + i;
 
-#ifdef SYNC_KLOOP_POLL
-			if (poll_ctx)
-				a.irq_ctx = poll_ctx->entries[num_tx_rings + i].irq_ctx;
-#endif /* SYNC_KLOOP_POLL */
-
-			if (unlikely(nm_kr_tryget(a.kring, 1, NULL))) {
+			if (unlikely(nm_kr_tryget(a->kring, 1, NULL))) {
 				continue;
 			}
-			netmap_sync_kloop_rx_ring(&a);
-			nm_kr_put(a.kring);
+			netmap_sync_kloop_rx_ring(a);
+			nm_kr_put(a->kring);
 		}
 
 #ifdef SYNC_KLOOP_POLL
@@ -622,7 +657,7 @@ netmap_sync_kloop(struct netmap_priv_d *priv, struct nmreq_header *hdr)
 		if (poll_ctx) {
 			/* If a poll context is present, yield to the scheduler
 			 * waiting for a notification to come either from
 			 * netmap or the application. */
-			schedule_timeout_interruptible(msecs_to_jiffies(1000));
+			schedule_timeout(msecs_to_jiffies(20000));
 		} else
 #endif /* SYNC_KLOOP_POLL */
 		{
@@ -657,6 +692,11 @@ netmap_sync_kloop(struct netmap_priv_d *priv, struct nmreq_header *hdr)
 	}
 #endif /* SYNC_KLOOP_POLL */
 
+	if (args) {
+		nm_os_free(args);
+		args = NULL;
+	}
+
 	/* Reset the kloop state. */
 	NMG_LOCK();
 	priv->np_kloop_state = 0;
@@ -719,7 +759,7 @@ netmap_pt_guest_txsync(struct nm_csb_atok *atok, struct nm_csb_ktoa *ktoa,
 	 * packets.
 	 */
 	kring->nr_hwcur = ktoa->hwcur;
-	ptnetmap_guest_write_kring_csb(atok, kring->rcur, kring->rhead);
+	nm_sync_kloop_appl_write(atok, kring->rcur, kring->rhead);
 
 	/* Ask for a kick from a guest to the host if needed. */
 	if (((kring->rhead != kring->nr_hwcur || nm_kr_txempty(kring))
@@ -733,7 +773,8 @@ netmap_pt_guest_txsync(struct nm_csb_atok *atok, struct nm_csb_ktoa *ktoa,
 	 * Second part: reclaim buffers for completed transmissions.
 	 */
 	if (nm_kr_txempty(kring) || (flags & NAF_FORCE_RECLAIM)) {
-		ptnetmap_guest_read_kring_csb(ktoa, kring);
+		nm_sync_kloop_appl_read(ktoa, &kring->nr_hwtail,
+					&kring->nr_hwcur);
 	}
 
 	/*
@@ -744,8 +785,10 @@ netmap_pt_guest_txsync(struct nm_csb_atok *atok, struct nm_csb_ktoa *ktoa,
 	if (nm_kr_txempty(kring) && !(kring->nr_kflags & NKR_NOINTR)) {
 		/* Reenable notifications. */
 		atok->appl_need_kick = 1;
-		/* Double check */
-		ptnetmap_guest_read_kring_csb(ktoa, kring);
+		/* Double check, with a store-load memory barrier. */
+		nm_stld_barrier();
+		nm_sync_kloop_appl_read(ktoa, &kring->nr_hwtail,
+					&kring->nr_hwcur);
 		/* If there is new free space, disable notifications */
 		if (unlikely(!nm_kr_txempty(kring))) {
 			atok->appl_need_kick = 0;
@@ -784,7 +827,7 @@ netmap_pt_guest_rxsync(struct nm_csb_atok *atok, struct nm_csb_ktoa *ktoa,
 	 * hwtail to the hwtail known from the host (read from the CSB).
 	 * This also updates the kring hwcur.
 	 */
-	ptnetmap_guest_read_kring_csb(ktoa, kring);
+	nm_sync_kloop_appl_read(ktoa, &kring->nr_hwtail, &kring->nr_hwcur);
 	kring->nr_kflags &= ~NKR_PENDINTR;
 
 	/*
@@ -792,8 +835,7 @@ netmap_pt_guest_rxsync(struct nm_csb_atok *atok, struct nm_csb_ktoa *ktoa,
 	 * released, by updating cur and head in the CSB.
 	 */
 	if (kring->rhead != kring->nr_hwcur) {
-		ptnetmap_guest_write_kring_csb(atok, kring->rcur,
-					       kring->rhead);
+		nm_sync_kloop_appl_write(atok, kring->rcur, kring->rhead);
 
 		/* Ask for a kick from the guest to the host if needed. */
 		if (NM_ACCESS_ONCE(ktoa->kern_need_kick)) {
 			atok->sync_flags = flags;
@@ -809,8 +851,10 @@ netmap_pt_guest_rxsync(struct nm_csb_atok *atok, struct nm_csb_ktoa *ktoa,
 	if (nm_kr_rxempty(kring) && !(kring->nr_kflags & NKR_NOINTR)) {
 		/* Reenable notifications. */
 		atok->appl_need_kick = 1;
-		/* Double check */
-		ptnetmap_guest_read_kring_csb(ktoa, kring);
+		/* Double check, with a store-load memory barrier. */
+		nm_stld_barrier();
+		nm_sync_kloop_appl_read(ktoa, &kring->nr_hwtail,
+					&kring->nr_hwcur);
 		/* If there are new slots, disable notifications. */
 		if (!nm_kr_rxempty(kring)) {
 			atok->appl_need_kick = 0;
diff --git a/sys/net/netmap.h b/sys/net/netmap.h
index f3688fc9e919..098d369b07d6 100644
--- a/sys/net/netmap.h
+++ b/sys/net/netmap.h
@@ -769,6 +769,8 @@ struct nm_csb_ktoa {
 
 #ifdef __KERNEL__
 #define nm_stst_barrier smp_wmb
+#define nm_ldld_barrier smp_rmb
+#define nm_stld_barrier smp_mb
 #else  /* !__KERNEL__ */
 static inline void nm_stst_barrier(void)
 {
@@ -777,18 +779,31 @@ static inline void nm_stst_barrier(void)
 	 * which is fine for us. */
 	__atomic_thread_fence(__ATOMIC_RELEASE);
 }
+static inline void nm_ldld_barrier(void)
+{
+	/* A memory barrier with acquire semantic has the combined
+	 * effect of a load-load barrier and a load-store barrier,
+	 * which is fine for us. */
+	__atomic_thread_fence(__ATOMIC_ACQUIRE);
+}
 #endif  /* !__KERNEL__ */
 
 #elif defined(__FreeBSD__)
 
 #ifdef _KERNEL
 #define nm_stst_barrier	atomic_thread_fence_rel
+#define nm_ldld_barrier	atomic_thread_fence_acq
+#define nm_stld_barrier	atomic_thread_fence_seq_cst
 #else  /* !_KERNEL */
 #include <stdatomic.h>
 static inline void nm_stst_barrier(void)
 {
 	atomic_thread_fence(memory_order_release);
 }
+static inline void nm_ldld_barrier(void)
+{
+	atomic_thread_fence(memory_order_acquire);
+}
 #endif /* !_KERNEL */
 
 #else  /* !__linux__ && !__FreeBSD__ */
@@ -801,6 +816,10 @@ static inline void
 nm_sync_kloop_appl_write(struct nm_csb_atok *atok, uint32_t cur,
 			 uint32_t head)
 {
+	/* Issue a first store-store barrier to make sure writes to the
+	 * netmap ring are not reordered after the updates of atok->cur
+	 * and atok->head. */
+	nm_stst_barrier();
+
 	/*
 	 * We need to write cur and head to the CSB but we cannot do it atomically.
 	 * There is no way we can prevent the host from reading the updated value
@@ -815,11 +834,11 @@ nm_sync_kloop_appl_write(struct nm_csb_atok *atok, uint32_t cur,
 	 *
 	 * The following memory barrier scheme is used to make this happen:
 	 *
-	 *          Guest              Host
+	 *          Guest                Host
 	 *
-	 *          STORE(cur)         LOAD(head)
-	 *          mb() <-----------> mb()
-	 *          STORE(head)        LOAD(cur)
+	 *          STORE(cur)           LOAD(head)
+	 *          wmb() <------------> rmb()
+	 *          STORE(head)          LOAD(cur)
 	 *
 	 */
 	atok->cur = cur;
@@ -839,8 +858,12 @@ nm_sync_kloop_appl_read(struct nm_csb_ktoa *ktoa, uint32_t *hwtail,
 	 * (see explanation in sync_kloop_kernel_write).
 	 */
 	*hwtail = ktoa->hwtail;
-	nm_stst_barrier();
+	nm_ldld_barrier();
 	*hwcur = ktoa->hwcur;
+
+	/* Make sure that the loads from ktoa->hwtail and ktoa->hwcur are not
+	 * delayed past the subsequent loads from the netmap ring. */
+	nm_ldld_barrier();
 }
 
 /*
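
--

Appendix (editor's illustration, not part of the patch): the barrier pairing used
throughout this change can be condensed into a small stand-alone C sketch.
Everything below is hypothetical demo code: struct demo_atok/demo_ktoa and the
demo_* functions are simplified stand-ins for nm_csb_atok/nm_csb_ktoa and for the
helpers touched by this patch, mirroring only their barrier placement. C11
<stdatomic.h> fences stand in for nm_stst_barrier() (release/wmb),
nm_ldld_barrier() (acquire/rmb) and nm_stld_barrier() (seq_cst/mb).

#include <stdatomic.h>
#include <stdint.h>

struct demo_atok { uint32_t head, cur, appl_need_kick; };
struct demo_ktoa { uint32_t hwcur, hwtail, kern_need_kick; };

/* Application side, mirroring nm_sync_kloop_appl_write(): publish cur/head
 * so that head is never observed ahead of cur, and neither is observed
 * before the ring slot writes that precede this call. */
static void
demo_appl_write(struct demo_atok *atok, uint32_t cur, uint32_t head)
{
	atomic_thread_fence(memory_order_release); /* stst: ring vs. cur/head */
	atok->cur = cur;
	atomic_thread_fence(memory_order_release); /* stst: cur before head */
	atok->head = head;
}

/* Application side, mirroring nm_sync_kloop_appl_read(): read hwtail/hwcur
 * so that hwtail is never observed ahead of hwcur, and both are read
 * before any subsequent load from the ring slots. */
static void
demo_appl_read(struct demo_ktoa *ktoa, uint32_t *hwtail, uint32_t *hwcur)
{
	*hwtail = ktoa->hwtail;
	atomic_thread_fence(memory_order_acquire); /* ldld: hwtail before hwcur */
	*hwcur = ktoa->hwcur;
	atomic_thread_fence(memory_order_acquire); /* ldld: pointers before slots */
}

/* The "double check" pattern from the if_ptnet(4) and
 * netmap_pt_guest_[tr]xsync() hunks above: reenable notifications, then
 * re-read hwtail across a store-load barrier to close the race with the
 * kernel publishing new completions. Returns nonzero when it is safe to
 * stop and wait for a kick. */
static int
demo_stop_and_double_check(struct demo_atok *atok, struct demo_ktoa *ktoa,
			   uint32_t head)
{
	atok->appl_need_kick = 1;
	atomic_thread_fence(memory_order_seq_cst); /* stld barrier */
	if (head != ktoa->hwtail) {
		/* We won the race: more completed slots, keep running. */
		atok->appl_need_kick = 0;
		return 0;
	}
	return 1;
}

Note that, like the real code, the sketch uses plain loads and stores around
the fences; a strictly conforming C11 program would access the shared fields
with relaxed atomics, but the barrier placement is the point here.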