event/opdl: fix atomic queue race condition
If an application links one atomic queue to multiple ports, and each worker core updates the flow_id, there is a chance of hitting a race condition that leads to the same event being processed twice. This patch solves the problem and eliminates the race condition.

Fixes: 4236ce9bf5bf ("event/opdl: add OPDL ring infrastructure library")
Cc: stable@dpdk.org

Signed-off-by: Liang Ma <liang.j.ma@intel.com>
Signed-off-by: Peter Mccarthy <peter.mccarthy@intel.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
parent 4deeb214ac
commit b770f952de
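The trigger scenario in the commit message above can be sketched as follows. This is a hypothetical worker loop, not code from this patch: two worker ports are linked to the same atomic queue, and each worker rewrites flow_id before forwarding; compute_new_flow() is an invented placeholder. Before this fix, the atomic claim scan matched slots on ev->flow_id alone, so once one instance rewrote the flow id, another instance scanning the same slots could match and consume the same event again.

    #include <rte_eventdev.h>

    /* Hypothetical helper: derive a new flow id for the next stage. */
    static uint32_t compute_new_flow(const struct rte_event *ev);

    /* Each worker core runs this loop on its own port; both ports
     * are linked to the same atomic queue. */
    static void
    worker_loop(uint8_t dev_id, uint8_t port_id, uint8_t next_queue_id)
    {
            struct rte_event ev;

            for (;;) {
                    if (rte_event_dequeue_burst(dev_id, port_id,
                                    &ev, 1, 0) == 0)
                            continue;

                    /* Updating flow_id here is what exposed the race:
                     * the OPDL claim scan keyed on this field while
                     * another instance was scanning the same slots. */
                    ev.flow_id = compute_new_flow(&ev);
                    ev.queue_id = next_queue_id;
                    ev.op = RTE_EVENT_OP_FORWARD;

                    while (rte_event_enqueue_burst(dev_id, port_id,
                                    &ev, 1) == 0)
                            ;
            }
    }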
drivers/event/opdl/opdl_evdev_init.c
@@ -733,6 +733,9 @@ initialise_all_other_ports(struct rte_eventdev *dev)
 			queue->ports[queue->nb_ports] = port;
+			port->instance_id = queue->nb_ports;
 			queue->nb_ports++;
+			opdl_stage_set_queue_id(stage_inst,
+					port->queue_id);
 
 		} else if (queue->q_pos == OPDL_Q_POS_END) {
 
 			/* tx port */
drivers/event/opdl/opdl_ring.c
@@ -25,7 +25,10 @@
 
 #define OPDL_NAME_SIZE 64
 
-#define OPDL_EVENT_MASK (0xFFFF0000000FFFFFULL)
+#define OPDL_EVENT_MASK (0x00000000000FFFFFULL)
+#define OPDL_FLOWID_MASK (0xFFFFF)
+#define OPDL_OPA_MASK (0xFF)
+#define OPDL_OPA_OFFSET (0x38)
 
 int opdl_logtype_driver;
 
@@ -86,7 +89,6 @@ struct opdl_stage {
 	 */
 	uint32_t available_seq;
 	uint32_t head;  /* Current head for single-thread operation */
-	uint32_t shadow_head;  /* Shadow head for single-thread operation */
 	uint32_t nb_instance;  /* Number of instances */
 	uint32_t instance_id;  /* ID of this stage instance */
 	uint16_t num_claimed;  /* Number of slots claimed */
@@ -102,6 +104,9 @@ struct opdl_stage {
 	/* For managing disclaims in multi-threaded processing stages */
 	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
 					       __rte_cache_aligned;
+	uint32_t shadow_head;  /* Shadow head for single-thread operation */
+	uint32_t queue_id;     /* ID of Queue which is assigned to this stage */
+	uint32_t pos;          /* Atomic scan position */
 } __rte_cache_aligned;
 
 /* Context for opdl_ring */
@@ -494,6 +499,9 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
 {
 	uint32_t i = 0, j = 0, offset;
+	uint32_t opa_id = 0;
+	uint32_t flow_id = 0;
+	uint64_t event = 0;
 	void *get_slots;
 	struct rte_event *ev;
 	RTE_SET_USED(seq);
@@ -520,7 +528,17 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 
 	for (j = 0; j < num_entries; j++) {
 		ev = (struct rte_event *)get_slot(t, s->head+j);
-		if ((ev->flow_id%s->nb_instance) == s->instance_id) {
+
+		event = __atomic_load_n(&(ev->event),
+				__ATOMIC_ACQUIRE);
+
+		opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+		flow_id = OPDL_FLOWID_MASK & event;
+
+		if (opa_id >= s->queue_id)
+			continue;
+
+		if ((flow_id % s->nb_instance) == s->instance_id) {
 			memcpy(entries_offset, ev, t->slot_size);
 			entries_offset += t->slot_size;
 			i++;
@@ -531,6 +549,7 @@ opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
 	s->head += num_entries;
 	s->num_claimed = num_entries;
 	s->num_event = i;
+	s->pos = 0;
 
 	/* automatically disclaim entries if number of rte_events is zero */
 	if (unlikely(i == 0))
@@ -953,21 +972,26 @@ opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
 }
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic)
 {
-	uint32_t i = 0, j = 0, offset;
+	uint32_t i = 0, offset;
 	struct opdl_ring *t = s->t;
 	struct rte_event *ev_orig = NULL;
 	bool ev_updated = false;
-	uint64_t ev_temp = 0;
+	uint64_t ev_temp = 0;
+	uint64_t ev_update = 0;
+
+	uint32_t opa_id = 0;
+	uint32_t flow_id = 0;
+	uint64_t event = 0;
 
 	if (index > s->num_event) {
 		PMD_DRV_LOG(ERR, "index is overflow");
 		return ev_updated;
 	}
 
-	ev_temp = ev->event&OPDL_EVENT_MASK;
+	ev_temp = ev->event & OPDL_EVENT_MASK;
 
 	if (!atomic) {
 		offset = opdl_first_entry_id(s->seq, s->nb_instance,
@@ -984,27 +1008,39 @@ opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
 		}
 
 	} else {
-		for (i = 0; i < s->num_claimed; i++) {
+		for (i = s->pos; i < s->num_claimed; i++) {
 			ev_orig = (struct rte_event *)
 				get_slot(t, s->shadow_head+i);
 
-			if ((ev_orig->flow_id%s->nb_instance) ==
-					s->instance_id) {
+			event = __atomic_load_n(&(ev_orig->event),
+					__ATOMIC_ACQUIRE);
 
-				if (j == index) {
-					if ((ev_orig->event&OPDL_EVENT_MASK) !=
-							ev_temp) {
-						ev_orig->event = ev->event;
-						ev_updated = true;
-					}
-					if (ev_orig->u64 != ev->u64) {
-						ev_orig->u64 = ev->u64;
-						ev_updated = true;
-					}
+			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
+			flow_id = OPDL_FLOWID_MASK & event;
 
-					break;
+			if (opa_id >= s->queue_id)
+				continue;
+
+			if ((flow_id % s->nb_instance) == s->instance_id) {
+				ev_update = s->queue_id;
+				ev_update = (ev_update << OPDL_OPA_OFFSET)
+					| ev->event;
+
+				s->pos = i + 1;
+
+				if ((event & OPDL_EVENT_MASK) !=
+						ev_temp) {
+					__atomic_store_n(&(ev_orig->event),
+							ev_update,
+							__ATOMIC_RELEASE);
+					ev_updated = true;
 				}
-				j++;
+				if (ev_orig->u64 != ev->u64) {
+					ev_orig->u64 = ev->u64;
+					ev_updated = true;
+				}
+
+				break;
 			}
 		}
@@ -1049,11 +1085,7 @@ check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
 			return -EINVAL;
 		}
 	}
-	if (num_deps > t->num_stages) {
-		PMD_DRV_LOG(ERR, "num_deps (%u) > number stages (%u)",
-				num_deps, t->num_stages);
-		return -EINVAL;
-	}
+
 	return 0;
 }
@@ -1153,6 +1185,13 @@ opdl_stage_get_opdl_ring(const struct opdl_stage *s)
 	return s->t;
 }
 
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id)
+{
+	s->queue_id = queue_id;
+}
+
 void
 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
 {
drivers/event/opdl/opdl_ring.h
@@ -518,6 +518,20 @@ opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries);
 struct opdl_stage *
 opdl_stage_create(struct opdl_ring *t, bool threadsafe);
 
+
+/**
+ * Set the internal queue id for each stage instance.
+ *
+ * @param s
+ *   The pointer of stage instance.
+ *
+ * @param queue_id
+ *   The value of internal queue id.
+ */
+void
+opdl_stage_set_queue_id(struct opdl_stage *s,
+		uint32_t queue_id);
+
 /**
  * Prints information on opdl_ring instance and all its stages
  *
@@ -590,7 +604,7 @@ opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe);
  */
 
 bool
-opdl_ring_cas_slot(const struct opdl_stage *s, const struct rte_event *ev,
+opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
 		uint32_t index, bool atomic);
 
 #ifdef __cplusplus
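Taken together, the fix stamps the id of the claiming queue into the top byte of the 64-bit event word and re-checks it under acquire/release ordering, so a slot whose flow_id was rewritten by another instance is no longer matched twice. Below is a minimal standalone sketch of that encoding, using the masks added in opdl_ring.c; the two helper functions are illustrative only, not part of the driver.

    #include <stdbool.h>
    #include <stdint.h>

    #define OPDL_EVENT_MASK (0x00000000000FFFFFULL)
    #define OPDL_FLOWID_MASK (0xFFFFF)  /* flow id: low 20 bits */
    #define OPDL_OPA_MASK (0xFF)        /* claiming-queue tag */
    #define OPDL_OPA_OFFSET (0x38)      /* tag lives in bits 56-63 */

    /* Illustrative reader: may this stage instance claim the slot?
     * The acquire load pairs with the release store below, so a tag
     * written by another core is observed before the slot is reused. */
    static bool
    may_claim(const uint64_t *slot, uint32_t queue_id,
                    uint32_t nb_instance, uint32_t instance_id)
    {
            uint64_t event = __atomic_load_n(slot, __ATOMIC_ACQUIRE);
            uint32_t opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
            uint32_t flow_id = OPDL_FLOWID_MASK & event;

            if (opa_id >= queue_id)
                    return false; /* already claimed for this queue */
            return (flow_id % nb_instance) == instance_id;
    }

    /* Illustrative writer: stamp our queue id into the top byte and
     * publish the updated event word with release semantics. */
    static void
    publish_claim(uint64_t *slot, uint64_t new_event, uint32_t queue_id)
    {
            uint64_t ev_update =
                    ((uint64_t)queue_id << OPDL_OPA_OFFSET) | new_event;
            __atomic_store_n(slot, ev_update, __ATOMIC_RELEASE);
    }

The scan position s->pos added to struct opdl_stage serves the same goal from another angle: a resumed claim scan starts after the last slot this instance claimed, so the same slot is never revisited within one pass.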