event/dlb2: add delayed token pop logic

The code in this commit was inadvertently omitted when the
dlb2 code base was split into discrete patches for upstream
submission.

Signed-off-by: Timothy McDaniel <timothy.mcdaniel@intel.com>
Reviewed-by: Mike Ximing Chen <mike.ximing.chen@intel.com>
2 changed files with 201 additions and 117 deletions
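For context: in DELAYED_POP mode the port does not return CQ tokens at
dequeue time. Instead it counts FORWARD and RELEASE operations and emits a
token-pop QE once a per-port threshold is reached. A minimal sketch of that
bookkeeping, with simplified types and a hypothetical emit_token_pop()
helper standing in for the driver's dlb2_construct_token_pop_qe(); this is
an illustration of the scheme, not the actual implementation:

#include <stdint.h>
#include <stdio.h>

/* Simplified per-port state; the real fields live in struct dlb2_port. */
struct port_sketch {
	int16_t token_pop_thresh;   /* set to the CQ dequeue depth */
	int issued_releases;        /* FORWARD/RELEASE ops since the last pop */
};

/* Hypothetical stand-in for dlb2_construct_token_pop_qe(). */
static void emit_token_pop(struct port_sketch *p)
{
	printf("pop QE inserted (threshold %d)\n", p->token_pop_thresh);
}

/* Called once per FORWARD or RELEASE op. As in the patch, the
 * threshold test runs before the op is written, and the pop QE
 * takes the op's slot in the four-QE cache line.
 */
static void track_release(struct port_sketch *p)
{
	if (p->issued_releases >= p->token_pop_thresh - 1) {
		emit_token_pop(p);
		/* Reset for the next batch; may briefly go to -1,
		 * matching the driver's subtraction.
		 */
		p->issued_releases -= p->token_pop_thresh;
		return;
	}
	p->issued_releases++;
}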


@@ -1082,6 +1082,25 @@ error_exit:
return ret;
}
static inline uint16_t
dlb2_event_enqueue_delayed(void *event_port,
const struct rte_event events[]);
static inline uint16_t
dlb2_event_enqueue_burst_delayed(void *event_port,
const struct rte_event events[],
uint16_t num);
static inline uint16_t
dlb2_event_enqueue_new_burst_delayed(void *event_port,
const struct rte_event events[],
uint16_t num);
static inline uint16_t
dlb2_event_enqueue_forward_burst_delayed(void *event_port,
const struct rte_event events[],
uint16_t num);
static int
dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
struct dlb2_eventdev_port *ev_port,
@@ -1198,6 +1217,20 @@ dlb2_hw_create_ldb_port(struct dlb2_eventdev *dlb2,
qm_port->dequeue_depth = dequeue_depth;
qm_port->token_pop_thresh = dequeue_depth;
/* The default enqueue functions do not include delayed-pop support for
* performance reasons.
*/
if (qm_port->token_pop_mode == DELAYED_POP) {
dlb2->event_dev->enqueue = dlb2_event_enqueue_delayed;
dlb2->event_dev->enqueue_burst =
dlb2_event_enqueue_burst_delayed;
dlb2->event_dev->enqueue_new_burst =
dlb2_event_enqueue_new_burst_delayed;
dlb2->event_dev->enqueue_forward_burst =
dlb2_event_enqueue_forward_burst_delayed;
}
qm_port->owed_tokens = 0;
qm_port->issued_releases = 0;
@@ -2427,11 +2460,6 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
case 3:
case 2:
case 1:
/* At least one QE will be valid, so only zero out three */
qe[1].cmd_byte = 0;
qe[2].cmd_byte = 0;
qe[3].cmd_byte = 0;
for (i = 0; i < num; i++) {
qe[i].cmd_byte =
cmd_byte_map[qm_port->is_directed][ev[i].op];
@@ -2452,6 +2480,8 @@ dlb2_event_build_hcws(struct dlb2_port *qm_port,
qe[i].u.event_type.sub = ev[i].sub_event_type;
}
break;
case 0:
break;
}
}
@@ -2578,29 +2608,57 @@ op_check:
}
static inline uint16_t
dlb2_event_enqueue_burst(void *event_port,
const struct rte_event events[],
uint16_t num)
__dlb2_event_enqueue_burst(void *event_port,
const struct rte_event events[],
uint16_t num,
bool use_delayed)
{
struct dlb2_eventdev_port *ev_port = event_port;
struct dlb2_port *qm_port = &ev_port->qm_port;
struct process_local_port_data *port_data;
int i, cnt;
int i;
RTE_ASSERT(ev_port->enq_configured);
RTE_ASSERT(events != NULL);
cnt = 0;
i = 0;
port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
for (i = 0; i < num; i += DLB2_NUM_QES_PER_CACHE_LINE) {
while (i < num) {
uint8_t sched_types[DLB2_NUM_QES_PER_CACHE_LINE];
uint8_t queue_ids[DLB2_NUM_QES_PER_CACHE_LINE];
int pop_offs = 0;
int j = 0;
memset(qm_port->qe4,
0,
DLB2_NUM_QES_PER_CACHE_LINE *
sizeof(struct dlb2_enqueue_qe));
for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
const struct rte_event *ev = &events[i + j];
int16_t thresh = qm_port->token_pop_thresh;
if (use_delayed &&
qm_port->token_pop_mode == DELAYED_POP &&
(ev->op == RTE_EVENT_OP_FORWARD ||
ev->op == RTE_EVENT_OP_RELEASE) &&
qm_port->issued_releases >= thresh - 1) {
/* Insert the token pop QE and break out. This
* may result in a partial HCW, but that is
* simpler than supporting arbitrary QE
* insertion.
*/
dlb2_construct_token_pop_qe(qm_port, j);
/* Reset the releases for the next QE batch */
qm_port->issued_releases -= thresh;
pop_offs = 1;
j++;
break;
}
if (dlb2_event_enqueue_prep(ev_port, qm_port, ev,
&sched_types[j],
@@ -2611,38 +2669,52 @@ dlb2_event_enqueue_burst(void *event_port,
if (j == 0)
break;
dlb2_event_build_hcws(qm_port, &events[i], j,
dlb2_event_build_hcws(qm_port, &events[i], j - pop_offs,
sched_types, queue_ids);
if (qm_port->token_pop_mode == DELAYED_POP && j < 4 &&
qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
dlb2_construct_token_pop_qe(qm_port, j);
/* Reset the releases counter for the next QE batch */
qm_port->issued_releases -= qm_port->token_pop_thresh;
}
dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
cnt += j;
/* Don't include the token pop QE in the enqueue count */
i += j - pop_offs;
if (j < DLB2_NUM_QES_PER_CACHE_LINE)
/* Don't interpret j < DLB2_NUM_QES_PER_CACHE_LINE as out-of-credits
* if pop_offs != 0
*/
if (j < DLB2_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
break;
}
if (qm_port->token_pop_mode == DELAYED_POP &&
qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
qm_port->issued_releases -= qm_port->token_pop_thresh;
}
return cnt;
return i;
}
static uint16_t
dlb2_event_enqueue_burst(void *event_port,
const struct rte_event events[],
uint16_t num)
{
return __dlb2_event_enqueue_burst(event_port, events, num, false);
}
static uint16_t
dlb2_event_enqueue_burst_delayed(void *event_port,
const struct rte_event events[],
uint16_t num)
{
return __dlb2_event_enqueue_burst(event_port, events, num, true);
}
static inline uint16_t
dlb2_event_enqueue(void *event_port,
const struct rte_event events[])
{
return dlb2_event_enqueue_burst(event_port, events, 1);
return __dlb2_event_enqueue_burst(event_port, events, 1, false);
}
static inline uint16_t
dlb2_event_enqueue_delayed(void *event_port,
const struct rte_event events[])
{
return __dlb2_event_enqueue_burst(event_port, events, 1, true);
}
static uint16_t
@@ -2650,7 +2722,15 @@ dlb2_event_enqueue_new_burst(void *event_port,
const struct rte_event events[],
uint16_t num)
{
return dlb2_event_enqueue_burst(event_port, events, num);
return __dlb2_event_enqueue_burst(event_port, events, num, false);
}
static uint16_t
dlb2_event_enqueue_new_burst_delayed(void *event_port,
const struct rte_event events[],
uint16_t num)
{
return __dlb2_event_enqueue_burst(event_port, events, num, true);
}
static uint16_t
@@ -2658,7 +2738,93 @@ dlb2_event_enqueue_forward_burst(void *event_port,
const struct rte_event events[],
uint16_t num)
{
return dlb2_event_enqueue_burst(event_port, events, num);
return __dlb2_event_enqueue_burst(event_port, events, num, false);
}
static uint16_t
dlb2_event_enqueue_forward_burst_delayed(void *event_port,
const struct rte_event events[],
uint16_t num)
{
return __dlb2_event_enqueue_burst(event_port, events, num, true);
}
static void
dlb2_event_release(struct dlb2_eventdev *dlb2,
uint8_t port_id,
int n)
{
struct process_local_port_data *port_data;
struct dlb2_eventdev_port *ev_port;
struct dlb2_port *qm_port;
int i;
if (port_id >= dlb2->num_ports) {
DLB2_LOG_ERR("Invalid port id %d in dlb2-event_release\n",
port_id);
rte_errno = -EINVAL;
return;
}
ev_port = &dlb2->ev_ports[port_id];
qm_port = &ev_port->qm_port;
port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
i = 0;
if (qm_port->is_directed) {
i = n;
goto sw_credit_update;
}
while (i < n) {
int pop_offs = 0;
int j = 0;
/* Zero-out QEs */
qm_port->qe4[0].cmd_byte = 0;
qm_port->qe4[1].cmd_byte = 0;
qm_port->qe4[2].cmd_byte = 0;
qm_port->qe4[3].cmd_byte = 0;
for (; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
int16_t thresh = qm_port->token_pop_thresh;
if (qm_port->token_pop_mode == DELAYED_POP &&
qm_port->issued_releases >= thresh - 1) {
/* Insert the token pop QE */
dlb2_construct_token_pop_qe(qm_port, j);
/* Reset the releases for the next QE batch */
qm_port->issued_releases -= thresh;
pop_offs = 1;
j++;
break;
}
qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
qm_port->issued_releases++;
}
dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
/* Don't include the token pop QE in the release count */
i += j - pop_offs;
}
sw_credit_update:
/* each release returns one credit */
if (!ev_port->outstanding_releases) {
DLB2_LOG_ERR("%s: Outstanding releases underflowed.\n",
__func__);
return;
}
ev_port->outstanding_releases -= i;
ev_port->inflight_credits += i;
/* Replenish s/w credits if enough releases are performed */
dlb2_replenish_sw_credits(dlb2, ev_port);
}
static inline void
@@ -3067,86 +3233,6 @@ dlb2_inc_cq_idx(struct dlb2_port *qm_port, int cnt)
qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
}
static int
dlb2_event_release(struct dlb2_eventdev *dlb2,
uint8_t port_id,
int n)
{
struct process_local_port_data *port_data;
struct dlb2_eventdev_port *ev_port;
struct dlb2_port *qm_port;
int i, cnt;
if (port_id > dlb2->num_ports) {
DLB2_LOG_ERR("Invalid port id %d in dlb2-event_release\n",
port_id);
rte_errno = -EINVAL;
return rte_errno;
}
ev_port = &dlb2->ev_ports[port_id];
qm_port = &ev_port->qm_port;
port_data = &dlb2_port[qm_port->id][PORT_TYPE(qm_port)];
cnt = 0;
if (qm_port->is_directed) {
cnt = n;
goto sw_credit_update;
}
for (i = 0; i < n; i += DLB2_NUM_QES_PER_CACHE_LINE) {
int j;
/* Zero-out QEs */
qm_port->qe4[0].cmd_byte = 0;
qm_port->qe4[1].cmd_byte = 0;
qm_port->qe4[2].cmd_byte = 0;
qm_port->qe4[3].cmd_byte = 0;
for (j = 0; j < DLB2_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++)
qm_port->qe4[j].cmd_byte = DLB2_COMP_CMD_BYTE;
qm_port->issued_releases += j;
if (j == 0)
break;
if (qm_port->token_pop_mode == DELAYED_POP && j < 4 &&
qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
dlb2_construct_token_pop_qe(qm_port, j);
/* Reset the releases counter for the next QE batch */
qm_port->issued_releases -= qm_port->token_pop_thresh;
}
dlb2_hw_do_enqueue(qm_port, i == 0, port_data);
cnt += j;
}
if (qm_port->token_pop_mode == DELAYED_POP &&
qm_port->issued_releases >= qm_port->token_pop_thresh - 1) {
dlb2_consume_qe_immediate(qm_port, qm_port->owed_tokens);
qm_port->issued_releases -= qm_port->token_pop_thresh;
}
sw_credit_update:
/* each release returns one credit */
if (!ev_port->outstanding_releases) {
DLB2_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
rte_errno = -ENOTRECOVERABLE;
return rte_errno;
}
ev_port->outstanding_releases -= cnt;
ev_port->inflight_credits += cnt;
/* Replenish s/w credits if enough releases are performed */
dlb2_replenish_sw_credits(dlb2, ev_port);
return 0;
}
static inline int16_t
dlb2_hw_dequeue_sparse(struct dlb2_eventdev *dlb2,
struct dlb2_eventdev_port *ev_port,
@@ -3367,8 +3453,7 @@ dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
uint16_t out_rels = ev_port->outstanding_releases;
if (dlb2_event_release(dlb2, ev_port->id, out_rels))
return 0; /* rte_errno is set */
dlb2_event_release(dlb2, ev_port->id, out_rels);
DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
}
@@ -3405,8 +3490,7 @@ dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
uint16_t out_rels = ev_port->outstanding_releases;
if (dlb2_event_release(dlb2, ev_port->id, out_rels))
return 0; /* rte_errno is set */
dlb2_event_release(dlb2, ev_port->id, out_rels);
DLB2_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
}

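A note on the structure introduced above: rather than testing
token_pop_mode per event on the hot path, the patch compiles two
specializations of a single inline worker (__dlb2_event_enqueue_burst) and
installs the matching set of enqueue function pointers when the port is
created. A minimal sketch of the pattern, using hypothetical names rather
than the driver's symbols:

#include <stdbool.h>
#include <stdint.h>

struct event;	/* stand-in for struct rte_event */

/* One inline worker; "delayed" is a compile-time constant at each call
 * site below, so the compiler can fold the branch away entirely.
 */
static inline uint16_t
enqueue_worker(void *port, const struct event ev[], uint16_t n,
	       bool delayed)
{
	uint16_t i;

	for (i = 0; i < n; i++) {
		if (delayed) {
			/* token-pop accounting would run here */
		}
		/* ... build and write the QE ... */
	}
	(void)port;
	(void)ev;
	return i;
}

/* Thin wrappers: these are the symbols the enqueue function pointers
 * reference, selected once at configuration time.
 */
static uint16_t
enqueue_fast(void *port, const struct event ev[], uint16_t n)
{
	return enqueue_worker(port, ev, n, false);
}

static uint16_t
enqueue_delayed(void *port, const struct event ev[], uint16_t n)
{
	return enqueue_worker(port, ev, n, true);
}

This is why dlb2_hw_create_ldb_port swaps event_dev->enqueue and the burst
variants only when token_pop_mode == DELAYED_POP: ports in other modes keep
the leaner specialization.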

@@ -1320,7 +1320,7 @@ test_delayed_pop(void)
}
}
/* Dequeue dequeue_depth events but only release dequeue_depth - 2.
/* Dequeue dequeue_depth events but only release dequeue_depth - 1.
* Delayed pop won't perform the pop and no more events will be
* scheduled.
*/
@@ -1336,7 +1336,7 @@ test_delayed_pop(void)
ev.op = RTE_EVENT_OP_RELEASE;
for (i = 0; i < port_conf.dequeue_depth - 2; i++) {
for (i = 0; i < port_conf.dequeue_depth - 1; i++) {
if (rte_event_enqueue_burst(evdev, 0, &ev, 1) != 1) {
printf("%d: RELEASE enqueue expected to succeed\n",
__LINE__);
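For reference, the boundary the updated selftest exercises, assuming
token_pop_thresh was left at its default of dequeue_depth (as set at port
creation in this patch) and the release counter starts the batch at zero:

	release k (k = 1 .. dequeue_depth - 1):
		sees issued_releases == k - 1; k - 1 >= dequeue_depth - 1 is false
	after dequeue_depth - 1 releases:
		issued_releases == dequeue_depth - 1, no pop QE issued yet
	a dequeue_depth-th release:
		would see dequeue_depth - 1 >= dequeue_depth - 1 -> pop QE inserted

Releasing only dequeue_depth - 2 events, as the test previously did,
stopped one release short of this boundary; dequeue_depth - 1 releases sit
exactly at the last count that must not trigger the pop.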