diff --git a/drivers/crypto/scheduler/scheduler_failover.c b/drivers/crypto/scheduler/scheduler_failover.c index 2a0e29fa72..7fadcf66d0 100644 --- a/drivers/crypto/scheduler/scheduler_failover.c +++ b/drivers/crypto/scheduler/scheduler_failover.c @@ -16,18 +16,19 @@ struct fo_scheduler_qp_ctx { struct scheduler_worker primary_worker; struct scheduler_worker secondary_worker; + uint8_t primary_worker_index; + uint8_t secondary_worker_index; uint8_t deq_idx; }; static __rte_always_inline uint16_t failover_worker_enqueue(struct scheduler_worker *worker, - struct rte_crypto_op **ops, uint16_t nb_ops) + struct rte_crypto_op **ops, uint16_t nb_ops, uint8_t index) { - uint16_t i, processed_ops; + uint16_t processed_ops; - for (i = 0; i < nb_ops && i < 4; i++) - rte_prefetch0(ops[i]->sym->session); + scheduler_set_worker_session(ops, nb_ops, index); processed_ops = rte_cryptodev_enqueue_burst(worker->dev_id, worker->qp_id, ops, nb_ops); @@ -47,13 +48,14 @@ schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) return 0; enqueued_ops = failover_worker_enqueue(&qp_ctx->primary_worker, - ops, nb_ops); + ops, nb_ops, PRIMARY_WORKER_IDX); if (enqueued_ops < nb_ops) enqueued_ops += failover_worker_enqueue( &qp_ctx->secondary_worker, &ops[enqueued_ops], - nb_ops - enqueued_ops); + nb_ops - enqueued_ops, + SECONDARY_WORKER_IDX); return enqueued_ops; } @@ -94,7 +96,7 @@ schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) qp_ctx->deq_idx = (~qp_ctx->deq_idx) & WORKER_SWITCH_MASK; if (nb_deq_ops == nb_ops) - return nb_deq_ops; + goto retrieve_session; worker = workers[qp_ctx->deq_idx]; @@ -104,6 +106,9 @@ schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) worker->nb_inflight_cops -= nb_deq_ops2; } +retrieve_session: + scheduler_retrieve_session(ops, nb_deq_ops + nb_deq_ops2); + return nb_deq_ops + nb_deq_ops2; } diff --git a/drivers/crypto/scheduler/scheduler_multicore.c b/drivers/crypto/scheduler/scheduler_multicore.c index 900ab4049d..3dea850661 100644 --- a/drivers/crypto/scheduler/scheduler_multicore.c +++ b/drivers/crypto/scheduler/scheduler_multicore.c @@ -183,11 +183,19 @@ mc_scheduler_worker(struct rte_cryptodev *dev) while (!mc_ctx->stop_signal) { if (pending_enq_ops) { + scheduler_set_worker_session( + &enq_ops[pending_enq_ops_idx], pending_enq_ops, + worker_idx); processed_ops = rte_cryptodev_enqueue_burst(worker->dev_id, worker->qp_id, &enq_ops[pending_enq_ops_idx], pending_enq_ops); + if (processed_ops < pending_deq_ops) + scheduler_retrieve_session( + &enq_ops[pending_enq_ops_idx + + processed_ops], + pending_deq_ops - processed_ops); pending_enq_ops -= processed_ops; pending_enq_ops_idx += processed_ops; inflight_ops += processed_ops; @@ -195,9 +203,16 @@ mc_scheduler_worker(struct rte_cryptodev *dev) processed_ops = rte_ring_dequeue_burst(enq_ring, (void *)enq_ops, MC_SCHED_BUFFER_SIZE, NULL); if (processed_ops) { + scheduler_set_worker_session(enq_ops, + processed_ops, worker_idx); pending_enq_ops_idx = rte_cryptodev_enqueue_burst( worker->dev_id, worker->qp_id, enq_ops, processed_ops); + if (pending_enq_ops_idx < processed_ops) + scheduler_retrieve_session( + enq_ops + pending_enq_ops_idx, + processed_ops - + pending_enq_ops_idx); pending_enq_ops = processed_ops - pending_enq_ops_idx; inflight_ops += pending_enq_ops_idx; } @@ -214,6 +229,8 @@ mc_scheduler_worker(struct rte_cryptodev *dev) worker->dev_id, worker->qp_id, deq_ops, MC_SCHED_BUFFER_SIZE); if (processed_ops) { + scheduler_retrieve_session(deq_ops, + processed_ops); inflight_ops -= processed_ops; if (reordering_enabled) { uint16_t j; diff --git a/drivers/crypto/scheduler/scheduler_pkt_size_distr.c b/drivers/crypto/scheduler/scheduler_pkt_size_distr.c index 933a5c6978..9204f6f608 100644 --- a/drivers/crypto/scheduler/scheduler_pkt_size_distr.c +++ b/drivers/crypto/scheduler/scheduler_pkt_size_distr.c @@ -48,34 +48,54 @@ schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) }; struct psd_schedule_op *p_enq_op; uint16_t i, processed_ops_pri = 0, processed_ops_sec = 0; - uint32_t job_len; if (unlikely(nb_ops == 0)) return 0; for (i = 0; i < nb_ops && i < 4; i++) { rte_prefetch0(ops[i]->sym); - rte_prefetch0(ops[i]->sym->session); + rte_prefetch0((uint8_t *)ops[i]->sym->session + + sizeof(struct rte_cryptodev_sym_session)); } for (i = 0; (i < (nb_ops - 8)) && (nb_ops > 8); i += 4) { + struct scheduler_session_ctx *sess_ctx[4]; + uint8_t target[4]; + uint32_t job_len[4]; + rte_prefetch0(ops[i + 4]->sym); - rte_prefetch0(ops[i + 4]->sym->session); + rte_prefetch0((uint8_t *)ops[i + 4]->sym->session + + sizeof(struct rte_cryptodev_sym_session)); rte_prefetch0(ops[i + 5]->sym); - rte_prefetch0(ops[i + 5]->sym->session); + rte_prefetch0((uint8_t *)ops[i + 5]->sym->session + + sizeof(struct rte_cryptodev_sym_session)); rte_prefetch0(ops[i + 6]->sym); - rte_prefetch0(ops[i + 6]->sym->session); + rte_prefetch0((uint8_t *)ops[i + 6]->sym->session + + sizeof(struct rte_cryptodev_sym_session)); rte_prefetch0(ops[i + 7]->sym); - rte_prefetch0(ops[i + 7]->sym->session); + rte_prefetch0((uint8_t *)ops[i + 7]->sym->session + + sizeof(struct rte_cryptodev_sym_session)); + + sess_ctx[0] = (void *)ops[i]->sym->session->driver_priv_data; + sess_ctx[1] = + (void *)ops[i + 1]->sym->session->driver_priv_data; + sess_ctx[2] = + (void *)ops[i + 2]->sym->session->driver_priv_data; + sess_ctx[3] = + (void *)ops[i + 3]->sym->session->driver_priv_data; /* job_len is initialized as cipher data length, once * it is 0, equals to auth data length */ - job_len = ops[i]->sym->cipher.data.length; - job_len += (ops[i]->sym->cipher.data.length == 0) * + job_len[0] = ops[i]->sym->cipher.data.length; + job_len[0] += (ops[i]->sym->cipher.data.length == 0) * ops[i]->sym->auth.data.length; /* decide the target op based on the job length */ - p_enq_op = &enq_ops[!(job_len & psd_qp_ctx->threshold)]; + target[0] = !(job_len[0] & psd_qp_ctx->threshold); + if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + ops[i]->sym->session = + sess_ctx[0]->worker_sess[target[0]]; + p_enq_op = &enq_ops[target[0]]; /* stop schedule cops before the queue is full, this shall * prevent the failed enqueue @@ -89,10 +109,14 @@ schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i]; p_enq_op->pos++; - job_len = ops[i+1]->sym->cipher.data.length; - job_len += (ops[i+1]->sym->cipher.data.length == 0) * + job_len[1] = ops[i + 1]->sym->cipher.data.length; + job_len[1] += (ops[i + 1]->sym->cipher.data.length == 0) * ops[i+1]->sym->auth.data.length; - p_enq_op = &enq_ops[!(job_len & psd_qp_ctx->threshold)]; + target[1] = !(job_len[1] & psd_qp_ctx->threshold); + if (ops[i + 1]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + ops[i + 1]->sym->session = + sess_ctx[1]->worker_sess[target[1]]; + p_enq_op = &enq_ops[target[1]]; if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] == qp_ctx->max_nb_objs) { @@ -103,10 +127,14 @@ schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i+1]; p_enq_op->pos++; - job_len = ops[i+2]->sym->cipher.data.length; - job_len += (ops[i+2]->sym->cipher.data.length == 0) * - ops[i+2]->sym->auth.data.length; - p_enq_op = &enq_ops[!(job_len & psd_qp_ctx->threshold)]; + job_len[2] = ops[i + 2]->sym->cipher.data.length; + job_len[2] += (ops[i + 2]->sym->cipher.data.length == 0) * + ops[i + 2]->sym->auth.data.length; + target[2] = !(job_len[2] & psd_qp_ctx->threshold); + if (ops[i + 2]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + ops[i + 2]->sym->session = + sess_ctx[2]->worker_sess[target[2]]; + p_enq_op = &enq_ops[target[2]]; if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] == qp_ctx->max_nb_objs) { @@ -117,10 +145,14 @@ schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) sched_ops[p_enq_op->worker_idx][p_enq_op->pos] = ops[i+2]; p_enq_op->pos++; - job_len = ops[i+3]->sym->cipher.data.length; - job_len += (ops[i+3]->sym->cipher.data.length == 0) * - ops[i+3]->sym->auth.data.length; - p_enq_op = &enq_ops[!(job_len & psd_qp_ctx->threshold)]; + job_len[3] = ops[i + 3]->sym->cipher.data.length; + job_len[3] += (ops[i + 3]->sym->cipher.data.length == 0) * + ops[i + 3]->sym->auth.data.length; + target[3] = !(job_len[3] & psd_qp_ctx->threshold); + if (ops[i + 3]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + ops[i + 3]->sym->session = + sess_ctx[1]->worker_sess[target[3]]; + p_enq_op = &enq_ops[target[3]]; if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] == qp_ctx->max_nb_objs) { @@ -133,10 +165,18 @@ schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) } for (; i < nb_ops; i++) { + struct scheduler_session_ctx *sess_ctx = + (void *)ops[i]->sym->session->driver_priv_data; + uint32_t job_len; + uint8_t target; + job_len = ops[i]->sym->cipher.data.length; job_len += (ops[i]->sym->cipher.data.length == 0) * ops[i]->sym->auth.data.length; - p_enq_op = &enq_ops[!(job_len & psd_qp_ctx->threshold)]; + target = !(job_len & psd_qp_ctx->threshold); + if (ops[i]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + ops[i]->sym->session = sess_ctx->worker_sess[target]; + p_enq_op = &enq_ops[target]; if (p_enq_op->pos + in_flight_ops[p_enq_op->worker_idx] == qp_ctx->max_nb_objs) { @@ -199,6 +239,7 @@ schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) if (worker->nb_inflight_cops) { nb_deq_ops_pri = rte_cryptodev_dequeue_burst(worker->dev_id, worker->qp_id, ops, nb_ops); + scheduler_retrieve_session(ops, nb_deq_ops_pri); worker->nb_inflight_cops -= nb_deq_ops_pri; } @@ -213,6 +254,7 @@ schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) nb_deq_ops_sec = rte_cryptodev_dequeue_burst(worker->dev_id, worker->qp_id, &ops[nb_deq_ops_pri], nb_ops - nb_deq_ops_pri); + scheduler_retrieve_session(ops, nb_deq_ops_sec); worker->nb_inflight_cops -= nb_deq_ops_sec; if (!worker->nb_inflight_cops) diff --git a/drivers/crypto/scheduler/scheduler_pmd_ops.c b/drivers/crypto/scheduler/scheduler_pmd_ops.c index 03df424140..b3b05e08b7 100644 --- a/drivers/crypto/scheduler/scheduler_pmd_ops.c +++ b/drivers/crypto/scheduler/scheduler_pmd_ops.c @@ -9,6 +9,7 @@ #include #include #include +#include #include "scheduler_pmd_private.h" @@ -469,19 +470,113 @@ scheduler_pmd_sym_session_get_size(struct rte_cryptodev *dev __rte_unused) return max_priv_sess_size; } +struct scheduler_configured_sess_info { + uint8_t dev_id; + uint8_t driver_id; + struct rte_cryptodev_sym_session *sess; +}; + static int -scheduler_pmd_sym_session_configure(struct rte_cryptodev *dev __rte_unused, - struct rte_crypto_sym_xform *xform __rte_unused, - struct rte_cryptodev_sym_session *sess __rte_unused) +scheduler_pmd_sym_session_configure(struct rte_cryptodev *dev, + struct rte_crypto_sym_xform *xform, + struct rte_cryptodev_sym_session *sess) { + struct scheduler_ctx *sched_ctx = dev->data->dev_private; + struct rte_mempool *mp = rte_mempool_from_obj(sess); + struct scheduler_session_ctx *sess_ctx = (void *)sess->driver_priv_data; + struct scheduler_configured_sess_info configured_sess[ + RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS] = {{0}}; + uint32_t i, j, n_configured_sess = 0; + int ret = 0; + + if (mp == NULL) + return -EINVAL; + + for (i = 0; i < sched_ctx->nb_workers; i++) { + struct scheduler_worker *worker = &sched_ctx->workers[i]; + struct rte_cryptodev_sym_session *worker_sess; + uint8_t next_worker = 0; + + for (j = 0; j < n_configured_sess; j++) { + if (configured_sess[j].driver_id == + worker->driver_id) { + sess_ctx->worker_sess[i] = + configured_sess[j].sess; + next_worker = 1; + break; + } + } + if (next_worker) + continue; + + if (rte_mempool_avail_count(mp) == 0) { + ret = -ENOMEM; + goto error_exit; + } + + worker_sess = rte_cryptodev_sym_session_create(worker->dev_id, + xform, mp); + if (worker_sess == NULL) { + ret = -rte_errno; + goto error_exit; + } + + worker_sess->opaque_data = (uint64_t)sess; + sess_ctx->worker_sess[i] = worker_sess; + configured_sess[n_configured_sess].driver_id = + worker->driver_id; + configured_sess[n_configured_sess].dev_id = worker->dev_id; + configured_sess[n_configured_sess].sess = worker_sess; + n_configured_sess++; + } + return 0; +error_exit: + sess_ctx->ref_cnt = sched_ctx->ref_cnt; + for (i = 0; i < n_configured_sess; i++) + rte_cryptodev_sym_session_free(configured_sess[i].dev_id, + configured_sess[i].sess); + return ret; } /** Clear the memory of session so it doesn't leave key material behind */ static void -scheduler_pmd_sym_session_clear(struct rte_cryptodev *dev __rte_unused, - struct rte_cryptodev_sym_session *sess __rte_unused) -{} +scheduler_pmd_sym_session_clear(struct rte_cryptodev *dev, + struct rte_cryptodev_sym_session *sess) +{ + struct scheduler_ctx *sched_ctx = dev->data->dev_private; + struct scheduler_session_ctx *sess_ctx = (void *)sess->driver_priv_data; + struct scheduler_configured_sess_info deleted_sess[ + RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS] = {{0}}; + uint32_t i, j, n_deleted_sess = 0; + + if (sched_ctx->ref_cnt != sess_ctx->ref_cnt) { + CR_SCHED_LOG(WARNING, + "Worker updated between session creation/deletion. " + "The session may not be freed fully."); + } + + for (i = 0; i < sched_ctx->nb_workers; i++) { + struct scheduler_worker *worker = &sched_ctx->workers[i]; + uint8_t next_worker = 0; + + for (j = 0; j < n_deleted_sess; j++) { + if (deleted_sess[j].driver_id == worker->driver_id) { + sess_ctx->worker_sess[i] = NULL; + next_worker = 1; + break; + } + } + if (next_worker) + continue; + + rte_cryptodev_sym_session_free(worker->dev_id, + sess_ctx->worker_sess[i]); + + deleted_sess[n_deleted_sess++].driver_id = worker->driver_id; + sess_ctx->worker_sess[i] = NULL; + } +} static struct rte_cryptodev_ops scheduler_pmd_ops = { .dev_configure = scheduler_pmd_config, diff --git a/drivers/crypto/scheduler/scheduler_pmd_private.h b/drivers/crypto/scheduler/scheduler_pmd_private.h index 4d33b9ab44..0e508727a4 100644 --- a/drivers/crypto/scheduler/scheduler_pmd_private.h +++ b/drivers/crypto/scheduler/scheduler_pmd_private.h @@ -22,7 +22,6 @@ struct scheduler_worker { uint8_t dev_id; uint16_t qp_id; uint32_t nb_inflight_cops; - uint8_t driver_id; }; @@ -37,6 +36,8 @@ struct scheduler_ctx { struct scheduler_worker workers[RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS]; uint32_t nb_workers; + /* reference count when the workers are incremented/decremented */ + uint32_t ref_cnt; enum rte_cryptodev_scheduler_mode mode; @@ -61,6 +62,11 @@ struct scheduler_qp_ctx { struct rte_ring *order_ring; } __rte_cache_aligned; +struct scheduler_session_ctx { + uint32_t ref_cnt; + struct rte_cryptodev_sym_session *worker_sess[ + RTE_CRYPTODEV_SCHEDULER_MAX_NB_WORKERS]; +}; extern uint8_t cryptodev_scheduler_driver_id; @@ -101,6 +107,118 @@ scheduler_order_drain(struct rte_ring *order_ring, return nb_ops_to_deq; } +static __rte_always_inline void +scheduler_set_worker_session(struct rte_crypto_op **ops, uint16_t nb_ops, + uint8_t worker_index) +{ + struct rte_crypto_op **op = ops; + uint16_t n = nb_ops; + + if (n >= 4) { + rte_prefetch0(op[0]->sym->session); + rte_prefetch0(op[1]->sym->session); + rte_prefetch0(op[2]->sym->session); + rte_prefetch0(op[3]->sym->session); + } + + while (n >= 4) { + if (n >= 8) { + rte_prefetch0(op[4]->sym->session); + rte_prefetch0(op[5]->sym->session); + rte_prefetch0(op[6]->sym->session); + rte_prefetch0(op[7]->sym->session); + } + + if (op[0]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) { + struct scheduler_session_ctx *sess_ctx = + (void *)op[0]->sym->session->driver_priv_data; + op[0]->sym->session = + sess_ctx->worker_sess[worker_index]; + } + + if (op[1]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) { + struct scheduler_session_ctx *sess_ctx = + (void *)op[1]->sym->session->driver_priv_data; + op[1]->sym->session = + sess_ctx->worker_sess[worker_index]; + } + + if (op[2]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) { + struct scheduler_session_ctx *sess_ctx = + (void *)op[2]->sym->session->driver_priv_data; + op[2]->sym->session = + sess_ctx->worker_sess[worker_index]; + } + + if (op[3]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) { + struct scheduler_session_ctx *sess_ctx = + (void *)op[3]->sym->session->driver_priv_data; + op[3]->sym->session = + sess_ctx->worker_sess[worker_index]; + } + + op += 4; + n -= 4; + } + + while (n--) { + if (op[0]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) { + struct scheduler_session_ctx *sess_ctx = + (void *)op[0]->sym->session->driver_priv_data; + + op[0]->sym->session = + sess_ctx->worker_sess[worker_index]; + op++; + } + } +} + +static __rte_always_inline void +scheduler_retrieve_session(struct rte_crypto_op **ops, uint16_t nb_ops) +{ + uint16_t n = nb_ops; + struct rte_crypto_op **op = ops; + + if (n >= 4) { + rte_prefetch0(op[0]->sym->session); + rte_prefetch0(op[1]->sym->session); + rte_prefetch0(op[2]->sym->session); + rte_prefetch0(op[3]->sym->session); + } + + while (n >= 4) { + if (n >= 8) { + rte_prefetch0(op[4]->sym->session); + rte_prefetch0(op[5]->sym->session); + rte_prefetch0(op[6]->sym->session); + rte_prefetch0(op[7]->sym->session); + } + + if (op[0]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + op[0]->sym->session = + (void *)op[0]->sym->session->opaque_data; + if (op[1]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + op[1]->sym->session = + (void *)op[1]->sym->session->opaque_data; + if (op[2]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + op[2]->sym->session = + (void *)op[2]->sym->session->opaque_data; + if (op[3]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + op[3]->sym->session = + (void *)op[3]->sym->session->opaque_data; + + op += 4; + n -= 4; + } + + while (n--) { + if (op[0]->sess_type == RTE_CRYPTO_OP_WITH_SESSION) + op[0]->sym->session = + (void *)op[0]->sym->session->opaque_data; + op++; + } +} + /** device specific operations function pointer structure */ extern struct rte_cryptodev_ops *rte_crypto_scheduler_pmd_ops; diff --git a/drivers/crypto/scheduler/scheduler_roundrobin.c b/drivers/crypto/scheduler/scheduler_roundrobin.c index ace2dec2ec..ad3f8b842a 100644 --- a/drivers/crypto/scheduler/scheduler_roundrobin.c +++ b/drivers/crypto/scheduler/scheduler_roundrobin.c @@ -23,16 +23,17 @@ schedule_enqueue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) ((struct scheduler_qp_ctx *)qp)->private_qp_ctx; uint32_t worker_idx = rr_qp_ctx->last_enq_worker_idx; struct scheduler_worker *worker = &rr_qp_ctx->workers[worker_idx]; - uint16_t i, processed_ops; + uint16_t processed_ops; if (unlikely(nb_ops == 0)) return 0; - for (i = 0; i < nb_ops && i < 4; i++) - rte_prefetch0(ops[i]->sym->session); - + scheduler_set_worker_session(ops, nb_ops, worker_idx); processed_ops = rte_cryptodev_enqueue_burst(worker->dev_id, worker->qp_id, ops, nb_ops); + if (processed_ops < nb_ops) + scheduler_retrieve_session(ops + processed_ops, + nb_ops - processed_ops); worker->nb_inflight_cops += processed_ops; @@ -86,7 +87,7 @@ schedule_dequeue(void *qp, struct rte_crypto_op **ops, uint16_t nb_ops) nb_deq_ops = rte_cryptodev_dequeue_burst(worker->dev_id, worker->qp_id, ops, nb_ops); - + scheduler_retrieve_session(ops, nb_deq_ops); last_worker_idx += 1; last_worker_idx %= rr_qp_ctx->nb_workers;