diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c index bcfa17bab2..2ecb365ba8 100644 --- a/drivers/event/dsw/dsw_evdev.c +++ b/drivers/event/dsw/dsw_evdev.c @@ -20,6 +20,7 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id, struct dsw_evdev *dsw = dsw_pmd_priv(dev); struct dsw_port *port; struct rte_event_ring *in_ring; + struct rte_ring *ctl_in_ring; char ring_name[RTE_RING_NAMESIZE]; port = &dsw->ports[port_id]; @@ -42,13 +43,29 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id, if (in_ring == NULL) return -ENOMEM; + snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u", + dev->data->dev_id, port_id); + + ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE, + dev->data->socket_id, + RING_F_SC_DEQ|RING_F_EXACT_SZ); + + if (ctl_in_ring == NULL) { + rte_event_ring_free(in_ring); + return -ENOMEM; + } + port->in_ring = in_ring; + port->ctl_in_ring = ctl_in_ring; rte_atomic16_init(&port->load); port->load_update_interval = (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S; + port->migration_interval = + (DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S; + dev->data->ports[port_id] = port; return 0; @@ -72,6 +89,7 @@ dsw_port_release(void *p) struct dsw_port *port = p; rte_event_ring_free(port->in_ring); + rte_ring_free(port->ctl_in_ring); } static int @@ -272,6 +290,14 @@ dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len, flush(dev_id, buf[i], flush_arg); } +static void +dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port, + eventdev_stop_flush_t flush, void *flush_arg) +{ + dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len, + flush, flush_arg); +} + static void dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port, eventdev_stop_flush_t flush, void *flush_arg) @@ -308,6 +334,7 @@ dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port = &dsw->ports[port_id]; dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg); + dsw_port_drain_paused(dev_id, port, flush, flush_arg); dsw_port_drain_in_ring(dev_id, port, flush, flush_arg); } } diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h index a5399dda5e..783c418bf1 100644 --- a/drivers/event/dsw/dsw_evdev.h +++ b/drivers/event/dsw/dsw_evdev.h @@ -73,7 +73,37 @@ #define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4) #define DSW_OLD_LOAD_WEIGHT (1) +/* The minimum time (in us) between two flow migrations. What puts an + * upper limit on the actual migration rate is primarily the pace in + * which the ports send and receive control messages, which in turn is + * largely a function of how much cycles are spent the processing of + * an event burst. + */ #define DSW_MIGRATION_INTERVAL (1000) +#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70)) +#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95)) + +#define DSW_MAX_EVENTS_RECORDED (128) + +/* Only one outstanding migration per port is allowed */ +#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS) + +/* Enough room for paus request/confirm and unpaus request/confirm for + * all possible senders. + */ +#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4) + +struct dsw_queue_flow { + uint8_t queue_id; + uint16_t flow_hash; +}; + +enum dsw_migration_state { + DSW_MIGRATION_STATE_IDLE, + DSW_MIGRATION_STATE_PAUSING, + DSW_MIGRATION_STATE_FORWARDING, + DSW_MIGRATION_STATE_UNPAUSING +}; struct dsw_port { uint16_t id; @@ -98,6 +128,7 @@ struct dsw_port { uint16_t ops_since_bg_task; + /* most recent 'background' processing */ uint64_t last_bg; /* For port load measurement. */ @@ -108,11 +139,46 @@ struct dsw_port { uint64_t busy_cycles; uint64_t total_busy_cycles; + /* For the ctl interface and flow migration mechanism. */ + uint64_t next_migration; + uint64_t migration_interval; + enum dsw_migration_state migration_state; + + uint64_t migration_start; + uint64_t migrations; + uint64_t migration_latency; + + uint8_t migration_target_port_id; + struct dsw_queue_flow migration_target_qf; + uint8_t cfm_cnt; + + uint16_t paused_flows_len; + struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS]; + + /* In a very contrived worst case all inflight events can be + * laying around paused here. + */ + uint16_t paused_events_len; + struct rte_event paused_events[DSW_MAX_EVENTS]; + + uint16_t seen_events_len; + uint16_t seen_events_idx; + struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED]; + uint16_t out_buffer_len[DSW_MAX_PORTS]; struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER]; + uint16_t in_buffer_len; + uint16_t in_buffer_start; + /* This buffer may contain events that were read up from the + * in_ring during the flow migration process. + */ + struct rte_event in_buffer[DSW_MAX_EVENTS]; + struct rte_event_ring *in_ring __rte_cache_aligned; + struct rte_ring *ctl_in_ring __rte_cache_aligned; + /* Estimate of current port load. */ rte_atomic16_t load __rte_cache_aligned; } __rte_cache_aligned; @@ -137,6 +203,20 @@ struct dsw_evdev { rte_atomic32_t credits_on_loan __rte_cache_aligned; }; +#define DSW_CTL_PAUS_REQ (0) +#define DSW_CTL_UNPAUS_REQ (1) +#define DSW_CTL_CFM (2) + +/* sizeof(struct dsw_ctl_msg) must be equal or less than + * sizeof(void *), to fit on the control ring. + */ +struct dsw_ctl_msg { + uint8_t type:2; + uint8_t originating_port_id:6; + uint8_t queue_id; + uint16_t flow_hash; +} __rte_packed; + uint16_t dsw_event_enqueue(void *port, const struct rte_event *event); uint16_t dsw_event_enqueue_burst(void *port, const struct rte_event events[], diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c index f326147c98..f0347592d8 100644 --- a/drivers/event/dsw/dsw_event.c +++ b/drivers/event/dsw/dsw_event.c @@ -5,9 +5,11 @@ #include "dsw_evdev.h" #include +#include #include #include +#include #include static bool @@ -140,6 +142,269 @@ dsw_port_consider_load_update(struct dsw_port *port, uint64_t now) dsw_port_load_update(port, now); } +static void +dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg) +{ + void *raw_msg; + + memcpy(&raw_msg, msg, sizeof(*msg)); + + /* there's always room on the ring */ + while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0) + rte_pause(); +} + +static int +dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg) +{ + void *raw_msg; + int rc; + + rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg); + + if (rc == 0) + memcpy(msg, &raw_msg, sizeof(*msg)); + + return rc; +} + +static void +dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port, + uint8_t type, uint8_t queue_id, uint16_t flow_hash) +{ + uint16_t port_id; + struct dsw_ctl_msg msg = { + .type = type, + .originating_port_id = source_port->id, + .queue_id = queue_id, + .flow_hash = flow_hash + }; + + for (port_id = 0; port_id < dsw->num_ports; port_id++) + if (port_id != source_port->id) + dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg); +} + +static bool +dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id, + uint16_t flow_hash) +{ + uint16_t i; + + for (i = 0; i < port->paused_flows_len; i++) { + struct dsw_queue_flow *qf = &port->paused_flows[i]; + if (qf->queue_id == queue_id && + qf->flow_hash == flow_hash) + return true; + } + return false; +} + +static void +dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) { + .queue_id = queue_id, + .flow_hash = paused_flow_hash + }; + port->paused_flows_len++; +} + +static void +dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + uint16_t i; + + for (i = 0; i < port->paused_flows_len; i++) { + struct dsw_queue_flow *qf = &port->paused_flows[i]; + + if (qf->queue_id == queue_id && + qf->flow_hash == paused_flow_hash) { + uint16_t last_idx = port->paused_flows_len-1; + if (i != last_idx) + port->paused_flows[i] = + port->paused_flows[last_idx]; + port->paused_flows_len--; + break; + } + } +} + +static void +dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port); + +static void +dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port, + uint8_t originating_port_id, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + struct dsw_ctl_msg cfm = { + .type = DSW_CTL_CFM, + .originating_port_id = port->id, + .queue_id = queue_id, + .flow_hash = paused_flow_hash + }; + + DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n", + queue_id, paused_flow_hash); + + /* There might be already-scheduled events belonging to the + * paused flow in the output buffers. + */ + dsw_port_flush_out_buffers(dsw, port); + + dsw_port_add_paused_flow(port, queue_id, paused_flow_hash); + + /* Make sure any stores to the original port's in_ring is seen + * before the ctl message. + */ + rte_smp_wmb(); + + dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); +} + +static void +dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids, + uint8_t exclude_port_id, int16_t *port_loads, + uint8_t *target_port_id, int16_t *target_load) +{ + int16_t candidate_port_id = -1; + int16_t candidate_load = DSW_MAX_LOAD; + uint16_t i; + + for (i = 0; i < num_port_ids; i++) { + uint8_t port_id = port_ids[i]; + if (port_id != exclude_port_id) { + int16_t load = port_loads[port_id]; + if (candidate_port_id == -1 || + load < candidate_load) { + candidate_port_id = port_id; + candidate_load = load; + } + } + } + *target_port_id = candidate_port_id; + *target_load = candidate_load; +} + +struct dsw_queue_flow_burst { + struct dsw_queue_flow queue_flow; + uint16_t count; +}; + +static inline int +dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b) +{ + const struct dsw_queue_flow_burst *burst_a = v_burst_a; + const struct dsw_queue_flow_burst *burst_b = v_burst_b; + + int a_count = burst_a->count; + int b_count = burst_b->count; + + return a_count - b_count; +} + +#define DSW_QF_TO_INT(_qf) \ + ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash))) + +static inline int +dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b) +{ + const struct dsw_queue_flow *qf_a = v_qf_a; + const struct dsw_queue_flow *qf_b = v_qf_b; + + return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b); +} + +static uint16_t +dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len, + struct dsw_queue_flow_burst *bursts) +{ + uint16_t i; + struct dsw_queue_flow_burst *current_burst = NULL; + uint16_t num_bursts = 0; + + /* We don't need the stable property, and the list is likely + * large enough for qsort() to outperform dsw_stable_sort(), + * so we use qsort() here. + */ + qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf); + + /* arrange the (now-consecutive) events into bursts */ + for (i = 0; i < qfs_len; i++) { + if (i == 0 || + dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) { + current_burst = &bursts[num_bursts]; + current_burst->queue_flow = qfs[i]; + current_burst->count = 0; + num_bursts++; + } + current_burst->count++; + } + + qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst); + + return num_bursts; +} + +static bool +dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads, + int16_t load_limit) +{ + bool below_limit = false; + uint16_t i; + + for (i = 0; i < dsw->num_ports; i++) { + int16_t load = rte_atomic16_read(&dsw->ports[i].load); + if (load < load_limit) + below_limit = true; + port_loads[i] = load; + } + return below_limit; +} + +static bool +dsw_select_migration_target(struct dsw_evdev *dsw, + struct dsw_port *source_port, + struct dsw_queue_flow_burst *bursts, + uint16_t num_bursts, int16_t *port_loads, + int16_t max_load, struct dsw_queue_flow *target_qf, + uint8_t *target_port_id) +{ + uint16_t source_load = port_loads[source_port->id]; + uint16_t i; + + for (i = 0; i < num_bursts; i++) { + struct dsw_queue_flow *qf = &bursts[i].queue_flow; + + if (dsw_port_is_flow_paused(source_port, qf->queue_id, + qf->flow_hash)) + continue; + + struct dsw_queue *queue = &dsw->queues[qf->queue_id]; + int16_t target_load; + + dsw_find_lowest_load_port(queue->serving_ports, + queue->num_serving_ports, + source_port->id, port_loads, + target_port_id, &target_load); + + if (target_load < source_load && + target_load < max_load) { + *target_qf = *qf; + return true; + } + } + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, " + "no target port found with load less than %d.\n", + num_bursts, DSW_LOAD_TO_PERCENT(max_load)); + + return false; +} + static uint8_t dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash) { @@ -197,6 +462,14 @@ dsw_port_get_parallel_flow_id(struct dsw_port *port) return flow_id; } +static void +dsw_port_buffer_paused(struct dsw_port *port, + const struct rte_event *paused_event) +{ + port->paused_events[port->paused_events_len] = *paused_event; + port->paused_events_len++; +} + static void dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port, uint8_t dest_port_id, const struct rte_event *event) @@ -256,11 +529,401 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port, flow_hash = dsw_flow_id_hash(event->flow_id); + if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id, + flow_hash))) { + dsw_port_buffer_paused(source_port, event); + return; + } + dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash); dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event); } +static void +dsw_port_flush_paused_events(struct dsw_evdev *dsw, + struct dsw_port *source_port, + uint8_t queue_id, uint16_t paused_flow_hash) +{ + uint16_t paused_events_len = source_port->paused_events_len; + struct rte_event paused_events[paused_events_len]; + uint8_t dest_port_id; + uint16_t i; + + if (paused_events_len == 0) + return; + + if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash)) + return; + + rte_memcpy(paused_events, source_port->paused_events, + paused_events_len * sizeof(struct rte_event)); + + source_port->paused_events_len = 0; + + dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash); + + for (i = 0; i < paused_events_len; i++) { + struct rte_event *event = &paused_events[i]; + uint16_t flow_hash; + + flow_hash = dsw_flow_id_hash(event->flow_id); + + if (event->queue_id == queue_id && + flow_hash == paused_flow_hash) + dsw_port_buffer_non_paused(dsw, source_port, + dest_port_id, event); + else + dsw_port_buffer_paused(source_port, event); + } +} + +static void +dsw_port_migration_stats(struct dsw_port *port) +{ + uint64_t migration_latency; + + migration_latency = (rte_get_timer_cycles() - port->migration_start); + port->migration_latency += migration_latency; + port->migrations++; +} + +static void +dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port) +{ + uint8_t queue_id = port->migration_target_qf.queue_id; + uint16_t flow_hash = port->migration_target_qf.flow_hash; + + port->migration_state = DSW_MIGRATION_STATE_IDLE; + port->seen_events_len = 0; + + dsw_port_migration_stats(port); + + if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) { + dsw_port_remove_paused_flow(port, queue_id, flow_hash); + dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash); + } + + DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id " + "%d flow_hash %d.\n", queue_id, flow_hash); +} + +static void +dsw_port_consider_migration(struct dsw_evdev *dsw, + struct dsw_port *source_port, + uint64_t now) +{ + bool any_port_below_limit; + struct dsw_queue_flow *seen_events = source_port->seen_events; + uint16_t seen_events_len = source_port->seen_events_len; + struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED]; + uint16_t num_bursts; + int16_t source_port_load; + int16_t port_loads[dsw->num_ports]; + + if (now < source_port->next_migration) + return; + + if (dsw->num_ports == 1) + return; + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n"); + + /* Randomize interval to avoid having all threads considering + * migration at the same in point in time, which might lead to + * all choosing the same target port. + */ + source_port->next_migration = now + + source_port->migration_interval / 2 + + rte_rand() % source_port->migration_interval; + + if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, + "Migration already in progress.\n"); + return; + } + + /* For simplicity, avoid migration in the unlikely case there + * is still events to consume in the in_buffer (from the last + * migration). + */ + if (source_port->in_buffer_len > 0) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still " + "events in the input buffer.\n"); + return; + } + + source_port_load = rte_atomic16_read(&source_port->load); + if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, + "Load %d is below threshold level %d.\n", + DSW_LOAD_TO_PERCENT(source_port_load), + DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)); + return; + } + + /* Avoid starting any expensive operations (sorting etc), in + * case of a scenario with all ports above the load limit. + */ + any_port_below_limit = + dsw_retrieve_port_loads(dsw, port_loads, + DSW_MAX_TARGET_LOAD_FOR_MIGRATION); + if (!any_port_below_limit) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, + "Candidate target ports are all too highly " + "loaded.\n"); + return; + } + + /* Sort flows into 'bursts' to allow attempting to migrating + * small (but still active) flows first - this it to avoid + * having large flows moving around the worker cores too much + * (to avoid cache misses, among other things). Of course, the + * number of recorded events (queue+flow ids) are limited, and + * provides only a snapshot, so only so many conclusions can + * be drawn from this data. + */ + num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len, + bursts); + /* For non-big-little systems, there's no point in moving the + * only (known) flow. + */ + if (num_bursts < 2) { + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow " + "queue_id %d flow_hash %d has been seen.\n", + bursts[0].queue_flow.queue_id, + bursts[0].queue_flow.flow_hash); + return; + } + + /* The strategy is to first try to find a flow to move to a + * port with low load (below the migration-attempt + * threshold). If that fails, we try to find a port which is + * below the max threshold, and also less loaded than this + * port is. + */ + if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts, + port_loads, + DSW_MIN_SOURCE_LOAD_FOR_MIGRATION, + &source_port->migration_target_qf, + &source_port->migration_target_port_id) + && + !dsw_select_migration_target(dsw, source_port, bursts, num_bursts, + port_loads, + DSW_MAX_TARGET_LOAD_FOR_MIGRATION, + &source_port->migration_target_qf, + &source_port->migration_target_port_id)) + return; + + DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d " + "flow_hash %d from port %d to port %d.\n", + source_port->migration_target_qf.queue_id, + source_port->migration_target_qf.flow_hash, + source_port->id, source_port->migration_target_port_id); + + /* We have a winner. */ + + source_port->migration_state = DSW_MIGRATION_STATE_PAUSING; + source_port->migration_start = rte_get_timer_cycles(); + + /* No need to go through the whole pause procedure for + * parallel queues, since atomic/ordered semantics need not to + * be maintained. + */ + + if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type + == RTE_SCHED_TYPE_PARALLEL) { + uint8_t queue_id = source_port->migration_target_qf.queue_id; + uint16_t flow_hash = source_port->migration_target_qf.flow_hash; + uint8_t dest_port_id = source_port->migration_target_port_id; + + /* Single byte-sized stores are always atomic. */ + dsw->queues[queue_id].flow_to_port_map[flow_hash] = + dest_port_id; + rte_smp_wmb(); + + dsw_port_end_migration(dsw, source_port); + + return; + } + + /* There might be 'loopback' events already scheduled in the + * output buffers. + */ + dsw_port_flush_out_buffers(dsw, source_port); + + dsw_port_add_paused_flow(source_port, + source_port->migration_target_qf.queue_id, + source_port->migration_target_qf.flow_hash); + + dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ, + source_port->migration_target_qf.queue_id, + source_port->migration_target_qf.flow_hash); + source_port->cfm_cnt = 0; +} + +static void +dsw_port_flush_paused_events(struct dsw_evdev *dsw, + struct dsw_port *source_port, + uint8_t queue_id, uint16_t paused_flow_hash); + +static void +dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port, + uint8_t originating_port_id, uint8_t queue_id, + uint16_t paused_flow_hash) +{ + struct dsw_ctl_msg cfm = { + .type = DSW_CTL_CFM, + .originating_port_id = port->id, + .queue_id = queue_id, + .flow_hash = paused_flow_hash + }; + + DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n", + queue_id, paused_flow_hash); + + dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash); + + rte_smp_rmb(); + + dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm); + + dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash); +} + +#define FORWARD_BURST_SIZE (32) + +static void +dsw_port_forward_migrated_flow(struct dsw_port *source_port, + struct rte_event_ring *dest_ring, + uint8_t queue_id, + uint16_t flow_hash) +{ + uint16_t events_left; + + /* Control ring message should been seen before the ring count + * is read on the port's in_ring. + */ + rte_smp_rmb(); + + events_left = rte_event_ring_count(source_port->in_ring); + + while (events_left > 0) { + uint16_t in_burst_size = + RTE_MIN(FORWARD_BURST_SIZE, events_left); + struct rte_event in_burst[in_burst_size]; + uint16_t in_len; + uint16_t i; + + in_len = rte_event_ring_dequeue_burst(source_port->in_ring, + in_burst, + in_burst_size, NULL); + /* No need to care about bursting forwarded events (to + * the destination port's in_ring), since migration + * doesn't happen very often, and also the majority of + * the dequeued events will likely *not* be forwarded. + */ + for (i = 0; i < in_len; i++) { + struct rte_event *e = &in_burst[i]; + if (e->queue_id == queue_id && + dsw_flow_id_hash(e->flow_id) == flow_hash) { + while (rte_event_ring_enqueue_burst(dest_ring, + e, 1, + NULL) != 1) + rte_pause(); + } else { + uint16_t last_idx = source_port->in_buffer_len; + source_port->in_buffer[last_idx] = *e; + source_port->in_buffer_len++; + } + } + + events_left -= in_len; + } +} + +static void +dsw_port_move_migrating_flow(struct dsw_evdev *dsw, + struct dsw_port *source_port) +{ + uint8_t queue_id = source_port->migration_target_qf.queue_id; + uint16_t flow_hash = source_port->migration_target_qf.flow_hash; + uint8_t dest_port_id = source_port->migration_target_port_id; + struct dsw_port *dest_port = &dsw->ports[dest_port_id]; + + dsw_port_flush_out_buffers(dsw, source_port); + + rte_smp_wmb(); + + dsw->queues[queue_id].flow_to_port_map[flow_hash] = + dest_port_id; + + dsw_port_forward_migrated_flow(source_port, dest_port->in_ring, + queue_id, flow_hash); + + /* Flow table update and migration destination port's enqueues + * must be seen before the control message. + */ + rte_smp_wmb(); + + dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id, + flow_hash); + source_port->cfm_cnt = 0; + source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING; +} + +static void +dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port) +{ + port->cfm_cnt++; + + if (port->cfm_cnt == (dsw->num_ports-1)) { + switch (port->migration_state) { + case DSW_MIGRATION_STATE_PAUSING: + DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding " + "migration state.\n"); + port->migration_state = DSW_MIGRATION_STATE_FORWARDING; + break; + case DSW_MIGRATION_STATE_UNPAUSING: + dsw_port_end_migration(dsw, port); + break; + default: + RTE_ASSERT(0); + break; + } + } +} + +static void +dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port) +{ + struct dsw_ctl_msg msg; + + /* So any table loads happens before the ring dequeue, in the + * case of a 'paus' message. + */ + rte_smp_rmb(); + + if (dsw_port_ctl_dequeue(port, &msg) == 0) { + switch (msg.type) { + case DSW_CTL_PAUS_REQ: + dsw_port_handle_pause_flow(dsw, port, + msg.originating_port_id, + msg.queue_id, msg.flow_hash); + break; + case DSW_CTL_UNPAUS_REQ: + dsw_port_handle_unpause_flow(dsw, port, + msg.originating_port_id, + msg.queue_id, + msg.flow_hash); + break; + case DSW_CTL_CFM: + dsw_port_handle_confirm(dsw, port); + break; + } + } +} + static void dsw_port_note_op(struct dsw_port *port, uint16_t num_events) { @@ -270,12 +933,24 @@ dsw_port_note_op(struct dsw_port *port, uint16_t num_events) port->ops_since_bg_task += (num_events+1); } -static void -dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port); - static void dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port) { + if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING && + port->pending_releases == 0)) + dsw_port_move_migrating_flow(dsw, port); + + /* Polling the control ring is relatively inexpensive, and + * polling it often helps bringing down migration latency, so + * do this for every iteration. + */ + dsw_port_ctl_process(dsw, port); + + /* To avoid considering migration and flushing output buffers + * on every dequeue/enqueue call, the scheduler only performs + * such 'background' tasks every nth + * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation. + */ if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) { uint64_t now; @@ -290,6 +965,8 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port) dsw_port_consider_load_update(port, now); + dsw_port_consider_migration(dsw, port, now); + port->ops_since_bg_task = 0; } } @@ -339,7 +1016,6 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[], */ if (unlikely(events_len == 0)) { dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK); - dsw_port_flush_out_buffers(dsw, source_port); return 0; } @@ -423,10 +1099,52 @@ dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait) return dsw_event_dequeue_burst(port, events, 1, wait); } +static void +dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events, + uint16_t num) +{ + uint16_t i; + + for (i = 0; i < num; i++) { + uint16_t l_idx = port->seen_events_idx; + struct dsw_queue_flow *qf = &port->seen_events[l_idx]; + struct rte_event *event = &events[i]; + qf->queue_id = event->queue_id; + qf->flow_hash = dsw_flow_id_hash(event->flow_id); + + port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED; + } + + if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED)) + port->seen_events_len = + RTE_MIN(port->seen_events_len + num, + DSW_MAX_EVENTS_RECORDED); +} + static uint16_t dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events, uint16_t num) { + struct dsw_port *source_port = port; + struct dsw_evdev *dsw = source_port->dsw; + + dsw_port_ctl_process(dsw, source_port); + + if (unlikely(port->in_buffer_len > 0)) { + uint16_t dequeued = RTE_MIN(num, port->in_buffer_len); + + rte_memcpy(events, &port->in_buffer[port->in_buffer_start], + dequeued * sizeof(struct rte_event)); + + port->in_buffer_start += dequeued; + port->in_buffer_len -= dequeued; + + if (port->in_buffer_len == 0) + port->in_buffer_start = 0; + + return dequeued; + } + return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL); } @@ -458,6 +1176,15 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num, dequeued); dsw_port_return_credits(dsw, source_port, dequeued); + + /* One potential optimization one might think of is to + * add a migration state (prior to 'pausing'), and + * only record seen events when the port is in this + * state (and transit to 'pausing' when enough events + * have been gathered). However, that schema doesn't + * seem to improve performance. + */ + dsw_port_record_seen_events(port, events, dequeued); } /* XXX: Assuming the port can't produce any more work, * consider flushing the output buffer, on dequeued ==