net/mlx5: add reference counter on DPDK Tx queues

Use the same design for DPDK queue as for Verbs queue for symmetry, this
also helps in fixing some issues like the DPDK release queue API which
is not expected to fail.  With such design, the queue is released when
the reference counters reaches 0.

Signed-off-by: Nelio Laranjeiro <nelio.laranjeiro@6wind.com>
Acked-by: Yongseok Koh <yskoh@mellanox.com>
This commit is contained in:
Nélio Laranjeiro 2017-10-09 16:44:48 +02:00 committed by Ferruh Yigit
parent faf2667fe8
commit 6e78005a9b
6 changed files with 404 additions and 262 deletions

View File

@ -225,17 +225,8 @@ mlx5_dev_close(struct rte_eth_dev *dev)
if (priv->txqs != NULL) {
/* XXX race condition if mlx5_tx_burst() is still running. */
usleep(1000);
for (i = 0; (i != priv->txqs_n); ++i) {
struct mlx5_txq_data *txq = (*priv->txqs)[i];
struct mlx5_txq_ctrl *txq_ctrl;
if (txq == NULL)
continue;
txq_ctrl = container_of(txq, struct mlx5_txq_ctrl, txq);
(*priv->txqs)[i] = NULL;
mlx5_txq_cleanup(txq_ctrl);
rte_free(txq_ctrl);
}
for (i = 0; (i != priv->txqs_n); ++i)
mlx5_priv_txq_release(priv, i);
priv->txqs_n = 0;
priv->txqs = NULL;
}
@ -259,6 +250,9 @@ mlx5_dev_close(struct rte_eth_dev *dev)
ret = mlx5_priv_txq_ibv_verify(priv);
if (ret)
WARN("%p: some Verbs Tx queue still remain", (void *)priv);
ret = mlx5_priv_txq_verify(priv);
if (ret)
WARN("%p: some Tx Queues still remain", (void *)priv);
ret = priv_flow_verify(priv);
if (ret)
WARN("%p: some flows still remain", (void *)priv);

View File

@ -148,6 +148,7 @@ struct priv {
TAILQ_HEAD(mlx5_flows, rte_flow) flows; /* RTE Flow rules. */
LIST_HEAD(mr, mlx5_mr) mr; /* Memory region. */
LIST_HEAD(rxqibv, mlx5_rxq_ibv) rxqsibv; /* Verbs Rx queues. */
LIST_HEAD(txq, mlx5_txq_ctrl) txqsctrl; /* DPDK Tx queues. */
LIST_HEAD(txqibv, mlx5_txq_ibv) txqsibv; /* Verbs Tx queues. */
uint32_t link_speed_capa; /* Link speed capabilities. */
struct mlx5_xstats_ctrl xstats_ctrl; /* Extended stats control. */

View File

@ -111,6 +111,60 @@ static int mlx5_check_mempool(struct rte_mempool *mp, uintptr_t *start,
return data.ret;
}
/**
* Register a Memory Region (MR) <-> Memory Pool (MP) association in
* txq->mp2mr[]. If mp2mr[] is full, remove an entry first.
*
* This function should only be called by txq_mp2mr().
*
* @param priv
* Pointer to private structure.
* @param txq
* Pointer to TX queue structure.
* @param[in] mp
* Memory Pool for which a Memory Region lkey must be returned.
* @param idx
* Index of the next available entry.
*
* @return
* mr on success, NULL on failure.
*/
struct mlx5_mr*
priv_txq_mp2mr_reg(struct priv *priv, struct mlx5_txq_data *txq,
struct rte_mempool *mp, unsigned int idx)
{
struct mlx5_txq_ctrl *txq_ctrl =
container_of(txq, struct mlx5_txq_ctrl, txq);
struct mlx5_mr *mr;
/* Add a new entry, register MR first. */
DEBUG("%p: discovered new memory pool \"%s\" (%p)",
(void *)txq_ctrl, mp->name, (void *)mp);
mr = priv_mr_get(priv, mp);
if (mr == NULL)
mr = priv_mr_new(priv, mp);
if (unlikely(mr == NULL)) {
DEBUG("%p: unable to configure MR, ibv_reg_mr() failed.",
(void *)txq_ctrl);
return NULL;
}
if (unlikely(idx == RTE_DIM(txq->mp2mr))) {
/* Table is full, remove oldest entry. */
DEBUG("%p: MR <-> MP table full, dropping oldest entry.",
(void *)txq_ctrl);
--idx;
priv_mr_release(priv, txq->mp2mr[0]);
memmove(&txq->mp2mr[0], &txq->mp2mr[1],
(sizeof(txq->mp2mr) - sizeof(txq->mp2mr[0])));
}
/* Store the new entry. */
txq_ctrl->txq.mp2mr[idx] = mr;
DEBUG("%p: new MR lkey for MP \"%s\" (%p): 0x%08" PRIu32,
(void *)txq_ctrl, mp->name, (void *)mp,
txq_ctrl->txq.mp2mr[idx]->lkey);
return mr;
}
/**
* Register a Memory Region (MR) <-> Memory Pool (MP) association in
* txq->mp2mr[]. If mp2mr[] is full, remove an entry first.
@ -135,35 +189,13 @@ mlx5_txq_mp2mr_reg(struct mlx5_txq_data *txq, struct rte_mempool *mp,
container_of(txq, struct mlx5_txq_ctrl, txq);
struct mlx5_mr *mr;
/* Add a new entry, register MR first. */
DEBUG("%p: discovered new memory pool \"%s\" (%p)",
(void *)txq_ctrl, mp->name, (void *)mp);
mr = priv_mr_get(txq_ctrl->priv, mp);
if (mr == NULL)
mr = priv_mr_new(txq_ctrl->priv, mp);
if (unlikely(mr == NULL)) {
DEBUG("%p: unable to configure MR, ibv_reg_mr() failed.",
(void *)txq_ctrl);
return NULL;
}
if (unlikely(idx == RTE_DIM(txq->mp2mr))) {
/* Table is full, remove oldest entry. */
DEBUG("%p: MR <-> MP table full, dropping oldest entry.",
(void *)txq_ctrl);
--idx;
priv_mr_release(txq_ctrl->priv, txq->mp2mr[0]);
memmove(&txq->mp2mr[0], &txq->mp2mr[1],
(sizeof(txq->mp2mr) - sizeof(txq->mp2mr[0])));
}
/* Store the new entry. */
txq_ctrl->txq.mp2mr[idx] = mr;
DEBUG("%p: new MR lkey for MP \"%s\" (%p): 0x%08" PRIu32,
(void *)txq_ctrl, mp->name, (void *)mp,
txq_ctrl->txq.mp2mr[idx]->lkey);
priv_lock(txq_ctrl->priv);
mr = priv_txq_mp2mr_reg(txq_ctrl->priv, txq, mp, idx);
priv_unlock(txq_ctrl->priv);
return mr;
}
struct txq_mp2mr_mbuf_check_data {
struct mlx5_mp2mr_mbuf_check_data {
int ret;
};
@ -185,7 +217,7 @@ static void
txq_mp2mr_mbuf_check(struct rte_mempool *mp, void *arg, void *obj,
uint32_t index __rte_unused)
{
struct txq_mp2mr_mbuf_check_data *data = arg;
struct mlx5_mp2mr_mbuf_check_data *data = arg;
struct rte_mbuf *buf = obj;
/*
@ -206,35 +238,24 @@ txq_mp2mr_mbuf_check(struct rte_mempool *mp, void *arg, void *obj,
* Pointer to TX queue structure.
*/
void
mlx5_txq_mp2mr_iter(struct rte_mempool *mp, void *arg)
mlx5_mp2mr_iter(struct rte_mempool *mp, void *arg)
{
struct mlx5_txq_ctrl *txq_ctrl = arg;
struct txq_mp2mr_mbuf_check_data data = {
struct priv *priv = (struct priv *)arg;
struct mlx5_mp2mr_mbuf_check_data data = {
.ret = 0,
};
uintptr_t start;
uintptr_t end;
unsigned int i;
struct mlx5_mr *mr;
/* Register mempool only if the first element looks like a mbuf. */
if (rte_mempool_obj_iter(mp, txq_mp2mr_mbuf_check, &data) == 0 ||
data.ret == -1)
return;
if (mlx5_check_mempool(mp, &start, &end) != 0) {
ERROR("mempool %p: not virtually contiguous",
(void *)mp);
mr = priv_mr_get(priv, mp);
if (mr) {
priv_mr_release(priv, mr);
return;
}
for (i = 0; (i != RTE_DIM(txq_ctrl->txq.mp2mr)); ++i) {
if (unlikely(txq_ctrl->txq.mp2mr[i] == NULL)) {
/* Unknown MP, add a new MR for it. */
break;
}
if (start >= (uintptr_t)txq_ctrl->txq.mp2mr[i]->start &&
end <= (uintptr_t)txq_ctrl->txq.mp2mr[i]->end)
return;
}
mlx5_txq_mp2mr_reg(&txq_ctrl->txq, mp, i);
priv_mr_new(priv, mp);
}
/**

View File

@ -297,6 +297,8 @@ struct mlx5_txq_ibv {
/* TX queue control descriptor. */
struct mlx5_txq_ctrl {
LIST_ENTRY(mlx5_txq_ctrl) next; /* Pointer to the next element. */
rte_atomic32_t refcnt; /* Reference counter. */
struct priv *priv; /* Back pointer to private data. */
unsigned int socket; /* CPU socket ID for allocations. */
unsigned int max_inline_data; /* Max inline data. */
@ -336,9 +338,6 @@ int mlx5_priv_rxq_ibv_verify(struct priv *);
/* mlx5_txq.c */
void mlx5_txq_cleanup(struct mlx5_txq_ctrl *);
int mlx5_txq_ctrl_setup(struct rte_eth_dev *, struct mlx5_txq_ctrl *, uint16_t,
unsigned int, const struct rte_eth_txconf *);
int mlx5_tx_queue_setup(struct rte_eth_dev *, uint16_t, uint16_t, unsigned int,
const struct rte_eth_txconf *);
void mlx5_tx_queue_release(void *);
@ -348,6 +347,14 @@ struct mlx5_txq_ibv *mlx5_priv_txq_ibv_get(struct priv *, uint16_t);
int mlx5_priv_txq_ibv_release(struct priv *, struct mlx5_txq_ibv *);
int mlx5_priv_txq_ibv_releasable(struct priv *, struct mlx5_txq_ibv *);
int mlx5_priv_txq_ibv_verify(struct priv *);
struct mlx5_txq_ctrl *mlx5_priv_txq_new(struct priv *, uint16_t,
uint16_t, unsigned int,
const struct rte_eth_txconf *);
struct mlx5_txq_ctrl *mlx5_priv_txq_get(struct priv *, uint16_t);
int mlx5_priv_txq_release(struct priv *, uint16_t);
int mlx5_priv_txq_releasable(struct priv *, uint16_t);
int mlx5_priv_txq_verify(struct priv *);
void txq_alloc_elts(struct mlx5_txq_ctrl *);
/* mlx5_rxtx.c */
@ -375,7 +382,9 @@ uint16_t mlx5_rx_burst_vec(void *, struct rte_mbuf **, uint16_t);
/* mlx5_mr.c */
void mlx5_txq_mp2mr_iter(struct rte_mempool *, void *);
void mlx5_mp2mr_iter(struct rte_mempool *, void *);
struct mlx5_mr *priv_txq_mp2mr_reg(struct priv *priv, struct mlx5_txq_data *,
struct rte_mempool *, unsigned int);
struct mlx5_mr *mlx5_txq_mp2mr_reg(struct mlx5_txq_data *, struct rte_mempool *,
unsigned int);

View File

@ -41,6 +41,44 @@
#include "mlx5_rxtx.h"
#include "mlx5_utils.h"
static void
priv_txq_stop(struct priv *priv)
{
unsigned int i;
for (i = 0; i != priv->txqs_n; ++i)
mlx5_priv_txq_release(priv, i);
}
static int
priv_txq_start(struct priv *priv)
{
unsigned int i;
int ret = 0;
/* Add memory regions to Tx queues. */
for (i = 0; i != priv->txqs_n; ++i) {
unsigned int idx = 0;
struct mlx5_mr *mr;
struct mlx5_txq_ctrl *txq_ctrl = mlx5_priv_txq_get(priv, i);
if (!txq_ctrl)
continue;
LIST_FOREACH(mr, &priv->mr, next)
priv_txq_mp2mr_reg(priv, &txq_ctrl->txq, mr->mp, idx++);
txq_alloc_elts(txq_ctrl);
txq_ctrl->ibv = mlx5_priv_txq_ibv_new(priv, i);
if (!txq_ctrl->ibv) {
ret = ENOMEM;
goto error;
}
}
return -ret;
error:
priv_txq_stop(priv);
return -ret;
}
/**
* DPDK callback to start the device.
*
@ -56,6 +94,7 @@ int
mlx5_dev_start(struct rte_eth_dev *dev)
{
struct priv *priv = dev->data->dev_private;
struct mlx5_mr *mr = NULL;
int err;
if (mlx5_is_secondary())
@ -63,9 +102,17 @@ mlx5_dev_start(struct rte_eth_dev *dev)
priv_lock(priv);
/* Update Rx/Tx callback. */
priv_dev_select_tx_function(priv, dev);
priv_dev_select_rx_function(priv, dev);
DEBUG("%p: allocating and configuring hash RX queues", (void *)dev);
rte_mempool_walk(mlx5_mp2mr_iter, priv);
err = priv_txq_start(priv);
if (err) {
ERROR("%p: TXQ allocation failed: %s",
(void *)dev, strerror(err));
goto error;
}
/* Update send callback. */
priv_dev_select_tx_function(priv, dev);
err = priv_create_hash_rxqs(priv);
if (!err)
err = priv_rehash_flows(priv);
@ -94,10 +141,13 @@ mlx5_dev_start(struct rte_eth_dev *dev)
return 0;
error:
/* Rollback. */
LIST_FOREACH(mr, &priv->mr, next)
priv_mr_release(priv, mr);
priv_special_flow_disable_all(priv);
priv_mac_addrs_disable(priv);
priv_destroy_hash_rxqs(priv);
priv_flow_stop(priv);
priv_txq_stop(priv);
priv_unlock(priv);
return -err;
}
@ -114,6 +164,7 @@ void
mlx5_dev_stop(struct rte_eth_dev *dev)
{
struct priv *priv = dev->data->dev_private;
struct mlx5_mr *mr;
if (mlx5_is_secondary())
return;
@ -131,6 +182,10 @@ mlx5_dev_stop(struct rte_eth_dev *dev)
priv_destroy_hash_rxqs(priv);
priv_flow_stop(priv);
priv_rx_intr_vec_disable(priv);
priv_txq_stop(priv);
LIST_FOREACH(mr, &priv->mr, next) {
priv_mr_release(priv, mr);
}
priv_dev_interrupt_handler_uninstall(priv, dev);
priv_unlock(priv);
}

View File

@ -65,12 +65,11 @@
*
* @param txq_ctrl
* Pointer to TX queue structure.
* @param elts_n
* Number of elements to allocate.
*/
static void
txq_alloc_elts(struct mlx5_txq_ctrl *txq_ctrl, unsigned int elts_n)
void
txq_alloc_elts(struct mlx5_txq_ctrl *txq_ctrl)
{
const unsigned int elts_n = 1 << txq_ctrl->txq.elts_n;
unsigned int i;
for (i = 0; (i != elts_n); ++i)
@ -116,152 +115,6 @@ txq_free_elts(struct mlx5_txq_ctrl *txq_ctrl)
}
}
/**
* Clean up a TX queue.
*
* Destroy objects, free allocated memory and reset the structure for reuse.
*
* @param txq_ctrl
* Pointer to TX queue structure.
*/
void
mlx5_txq_cleanup(struct mlx5_txq_ctrl *txq_ctrl)
{
size_t i;
DEBUG("cleaning up %p", (void *)txq_ctrl);
txq_free_elts(txq_ctrl);
for (i = 0; (i != RTE_DIM(txq_ctrl->txq.mp2mr)); ++i)
if (txq_ctrl->txq.mp2mr[i])
priv_mr_release(txq_ctrl->priv, txq_ctrl->txq.mp2mr[i]);
if (txq_ctrl->ibv)
mlx5_priv_txq_ibv_release(txq_ctrl->priv, txq_ctrl->ibv);
memset(txq_ctrl, 0, sizeof(*txq_ctrl));
}
/**
* Configure a TX queue.
*
* @param dev
* Pointer to Ethernet device structure.
* @param txq_ctrl
* Pointer to TX queue structure.
* @param desc
* Number of descriptors to configure in queue.
* @param socket
* NUMA socket on which memory must be allocated.
* @param[in] conf
* Thresholds parameters.
*
* @return
* 0 on success, errno value on failure.
*/
int
mlx5_txq_ctrl_setup(struct rte_eth_dev *dev, struct mlx5_txq_ctrl *txq_ctrl,
uint16_t desc, unsigned int socket,
const struct rte_eth_txconf *conf)
{
struct priv *priv = mlx5_get_priv(dev);
struct mlx5_txq_ctrl tmpl = {
.priv = priv,
.socket = socket,
};
const unsigned int max_tso_inline = ((MLX5_MAX_TSO_HEADER +
(RTE_CACHE_LINE_SIZE - 1)) /
RTE_CACHE_LINE_SIZE);
if (mlx5_getenv_int("MLX5_ENABLE_CQE_COMPRESSION")) {
ERROR("MLX5_ENABLE_CQE_COMPRESSION must never be set");
return ENOTSUP;
}
tmpl.txq.flags = conf->txq_flags;
assert(desc > MLX5_TX_COMP_THRESH);
tmpl.txq.elts_n = log2above(desc);
if (priv->mps == MLX5_MPW_ENHANCED)
tmpl.txq.mpw_hdr_dseg = priv->mpw_hdr_dseg;
/* MRs will be registered in mp2mr[] later. */
DEBUG("priv->device_attr.max_qp_wr is %d",
priv->device_attr.orig_attr.max_qp_wr);
DEBUG("priv->device_attr.max_sge is %d",
priv->device_attr.orig_attr.max_sge);
if (priv->txq_inline && (priv->txqs_n >= priv->txqs_inline)) {
unsigned int ds_cnt;
tmpl.txq.max_inline =
((priv->txq_inline + (RTE_CACHE_LINE_SIZE - 1)) /
RTE_CACHE_LINE_SIZE);
tmpl.txq.inline_en = 1;
/* TSO and MPS can't be enabled concurrently. */
assert(!priv->tso || !priv->mps);
if (priv->mps == MLX5_MPW_ENHANCED) {
tmpl.txq.inline_max_packet_sz =
priv->inline_max_packet_sz;
/* To minimize the size of data set, avoid requesting
* too large WQ.
*/
tmpl.max_inline_data =
((RTE_MIN(priv->txq_inline,
priv->inline_max_packet_sz) +
(RTE_CACHE_LINE_SIZE - 1)) /
RTE_CACHE_LINE_SIZE) * RTE_CACHE_LINE_SIZE;
} else if (priv->tso) {
int inline_diff = tmpl.txq.max_inline - max_tso_inline;
/*
* Adjust inline value as Verbs aggregates
* tso_inline and txq_inline fields.
*/
tmpl.max_inline_data = inline_diff > 0 ?
inline_diff *
RTE_CACHE_LINE_SIZE :
0;
} else {
tmpl.max_inline_data =
tmpl.txq.max_inline * RTE_CACHE_LINE_SIZE;
}
/*
* Check if the inline size is too large in a way which
* can make the WQE DS to overflow.
* Considering in calculation:
* WQE CTRL (1 DS)
* WQE ETH (1 DS)
* Inline part (N DS)
*/
ds_cnt = 2 + (tmpl.max_inline_data / MLX5_WQE_DWORD_SIZE);
if (ds_cnt > MLX5_DSEG_MAX) {
unsigned int max_inline = (MLX5_DSEG_MAX - 2) *
MLX5_WQE_DWORD_SIZE;
max_inline = max_inline - (max_inline %
RTE_CACHE_LINE_SIZE);
WARN("txq inline is too large (%d) setting it to "
"the maximum possible: %d\n",
priv->txq_inline, max_inline);
tmpl.txq.max_inline = max_inline / RTE_CACHE_LINE_SIZE;
}
}
if (priv->tso) {
tmpl.max_tso_header = max_tso_inline * RTE_CACHE_LINE_SIZE;
tmpl.txq.max_inline = RTE_MAX(tmpl.txq.max_inline,
max_tso_inline);
tmpl.txq.tso_en = 1;
}
if (priv->tunnel_en)
tmpl.txq.tunnel_en = 1;
tmpl.txq.elts =
(struct rte_mbuf *(*)[1 << tmpl.txq.elts_n])
((uintptr_t)txq_ctrl + sizeof(*txq_ctrl));
txq_alloc_elts(&tmpl, desc);
/* Clean up txq in case we're reinitializing it. */
DEBUG("%p: cleaning-up old txq just in case", (void *)txq_ctrl);
mlx5_txq_cleanup(txq_ctrl);
*txq_ctrl = tmpl;
DEBUG("%p: txq updated with %p", (void *)txq_ctrl, (void *)&tmpl);
/* Pre-register known mempools. */
rte_mempool_walk(mlx5_txq_mp2mr_iter, txq_ctrl);
return 0;
}
/**
* DPDK callback to configure a TX queue.
*
@ -287,7 +140,7 @@ mlx5_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
struct mlx5_txq_data *txq = (*priv->txqs)[idx];
struct mlx5_txq_ctrl *txq_ctrl =
container_of(txq, struct mlx5_txq_ctrl, txq);
int ret;
int ret = 0;
if (mlx5_is_secondary())
return -E_RTE_SECONDARY;
@ -314,57 +167,23 @@ mlx5_tx_queue_setup(struct rte_eth_dev *dev, uint16_t idx, uint16_t desc,
priv_unlock(priv);
return -EOVERFLOW;
}
if (txq != NULL) {
DEBUG("%p: reusing already allocated queue index %u (%p)",
(void *)dev, idx, (void *)txq);
if (dev->data->dev_started) {
priv_unlock(priv);
return -EEXIST;
}
(*priv->txqs)[idx] = NULL;
mlx5_txq_cleanup(txq_ctrl);
/* Resize if txq size is changed. */
if (txq_ctrl->txq.elts_n != log2above(desc)) {
txq_ctrl = rte_realloc(txq_ctrl,
sizeof(*txq_ctrl) +
desc * sizeof(struct rte_mbuf *),
RTE_CACHE_LINE_SIZE);
if (!txq_ctrl) {
ERROR("%p: unable to reallocate queue index %u",
(void *)dev, idx);
priv_unlock(priv);
return -ENOMEM;
}
}
} else {
txq_ctrl =
rte_calloc_socket("TXQ", 1,
sizeof(*txq_ctrl) +
desc * sizeof(struct rte_mbuf *),
0, socket);
if (txq_ctrl == NULL) {
ERROR("%p: unable to allocate queue index %u",
(void *)dev, idx);
priv_unlock(priv);
return -ENOMEM;
}
}
ret = mlx5_txq_ctrl_setup(dev, txq_ctrl, desc, socket, conf);
if (ret) {
rte_free(txq_ctrl);
if (!mlx5_priv_txq_releasable(priv, idx)) {
ret = EBUSY;
ERROR("%p: unable to release queue index %u",
(void *)dev, idx);
goto out;
}
mlx5_priv_txq_release(priv, idx);
txq_ctrl = mlx5_priv_txq_new(priv, idx, desc, socket, conf);
if (!txq_ctrl) {
ERROR("%p: unable to allocate queue index %u",
(void *)dev, idx);
ret = ENOMEM;
goto out;
}
txq_ctrl->txq.stats.idx = idx;
DEBUG("%p: adding TX queue %p to list",
(void *)dev, (void *)txq_ctrl);
(*priv->txqs)[idx] = &txq_ctrl->txq;
txq_ctrl->ibv = mlx5_priv_txq_ibv_new(priv, idx);
if (!txq_ctrl->ibv) {
ret = EAGAIN;
goto out;
}
/* Update send callback. */
priv_dev_select_tx_function(priv, priv->dev);
out:
priv_unlock(priv);
return -ret;
@ -396,11 +215,9 @@ mlx5_tx_queue_release(void *dpdk_txq)
if ((*priv->txqs)[i] == txq) {
DEBUG("%p: removing TX queue %p from list",
(void *)priv->dev, (void *)txq_ctrl);
(*priv->txqs)[i] = NULL;
mlx5_priv_txq_release(priv, i);
break;
}
mlx5_txq_cleanup(txq_ctrl);
rte_free(txq_ctrl);
priv_unlock(priv);
}
@ -719,3 +536,248 @@ mlx5_priv_txq_ibv_verify(struct priv *priv)
}
return ret;
}
/**
* Create a DPDK Tx queue.
*
* @param priv
* Pointer to private structure.
* @param idx
* TX queue index.
* @param desc
* Number of descriptors to configure in queue.
* @param socket
* NUMA socket on which memory must be allocated.
* @param[in] conf
* Thresholds parameters.
*
* @return
* A DPDK queue object on success.
*/
struct mlx5_txq_ctrl*
mlx5_priv_txq_new(struct priv *priv, uint16_t idx, uint16_t desc,
unsigned int socket,
const struct rte_eth_txconf *conf)
{
const unsigned int max_tso_inline =
((MLX5_MAX_TSO_HEADER + (RTE_CACHE_LINE_SIZE - 1)) /
RTE_CACHE_LINE_SIZE);
struct mlx5_txq_ctrl *tmpl;
tmpl = rte_calloc_socket("TXQ", 1,
sizeof(*tmpl) +
desc * sizeof(struct rte_mbuf *),
0, socket);
if (!tmpl)
return NULL;
assert(desc > MLX5_TX_COMP_THRESH);
tmpl->txq.flags = conf->txq_flags;
tmpl->priv = priv;
tmpl->txq.elts_n = log2above(desc);
if (priv->mps == MLX5_MPW_ENHANCED)
tmpl->txq.mpw_hdr_dseg = priv->mpw_hdr_dseg;
/* MRs will be registered in mp2mr[] later. */
DEBUG("priv->device_attr.max_qp_wr is %d",
priv->device_attr.orig_attr.max_qp_wr);
DEBUG("priv->device_attr.max_sge is %d",
priv->device_attr.orig_attr.max_sge);
if (priv->txq_inline && (priv->txqs_n >= priv->txqs_inline)) {
unsigned int ds_cnt;
tmpl->txq.max_inline =
((priv->txq_inline + (RTE_CACHE_LINE_SIZE - 1)) /
RTE_CACHE_LINE_SIZE);
tmpl->txq.inline_en = 1;
/* TSO and MPS can't be enabled concurrently. */
assert(!priv->tso || !priv->mps);
if (priv->mps == MLX5_MPW_ENHANCED) {
tmpl->txq.inline_max_packet_sz =
priv->inline_max_packet_sz;
/* To minimize the size of data set, avoid requesting
* too large WQ.
*/
tmpl->max_inline_data =
((RTE_MIN(priv->txq_inline,
priv->inline_max_packet_sz) +
(RTE_CACHE_LINE_SIZE - 1)) /
RTE_CACHE_LINE_SIZE) * RTE_CACHE_LINE_SIZE;
} else if (priv->tso) {
int inline_diff = tmpl->txq.max_inline - max_tso_inline;
/*
* Adjust inline value as Verbs aggregates
* tso_inline and txq_inline fields.
*/
tmpl->max_inline_data = inline_diff > 0 ?
inline_diff *
RTE_CACHE_LINE_SIZE :
0;
} else {
tmpl->max_inline_data =
tmpl->txq.max_inline * RTE_CACHE_LINE_SIZE;
}
/*
* Check if the inline size is too large in a way which
* can make the WQE DS to overflow.
* Considering in calculation:
* WQE CTRL (1 DS)
* WQE ETH (1 DS)
* Inline part (N DS)
*/
ds_cnt = 2 + (tmpl->txq.max_inline / MLX5_WQE_DWORD_SIZE);
if (ds_cnt > MLX5_DSEG_MAX) {
unsigned int max_inline = (MLX5_DSEG_MAX - 2) *
MLX5_WQE_DWORD_SIZE;
max_inline = max_inline - (max_inline %
RTE_CACHE_LINE_SIZE);
WARN("txq inline is too large (%d) setting it to "
"the maximum possible: %d\n",
priv->txq_inline, max_inline);
tmpl->txq.max_inline = max_inline / RTE_CACHE_LINE_SIZE;
}
}
if (priv->tso) {
tmpl->max_tso_header = max_tso_inline * RTE_CACHE_LINE_SIZE;
tmpl->txq.max_inline = RTE_MAX(tmpl->txq.max_inline,
max_tso_inline);
tmpl->txq.tso_en = 1;
}
if (priv->tunnel_en)
tmpl->txq.tunnel_en = 1;
tmpl->txq.elts =
(struct rte_mbuf *(*)[1 << tmpl->txq.elts_n])(tmpl + 1);
tmpl->txq.stats.idx = idx;
rte_atomic32_inc(&tmpl->refcnt);
DEBUG("%p: Tx queue %p: refcnt %d", (void *)priv,
(void *)tmpl, rte_atomic32_read(&tmpl->refcnt));
LIST_INSERT_HEAD(&priv->txqsctrl, tmpl, next);
return tmpl;
}
/**
* Get a Tx queue.
*
* @param priv
* Pointer to private structure.
* @param idx
* TX queue index.
*
* @return
* A pointer to the queue if it exists.
*/
struct mlx5_txq_ctrl*
mlx5_priv_txq_get(struct priv *priv, uint16_t idx)
{
struct mlx5_txq_ctrl *ctrl = NULL;
if ((*priv->txqs)[idx]) {
ctrl = container_of((*priv->txqs)[idx], struct mlx5_txq_ctrl,
txq);
unsigned int i;
mlx5_priv_txq_ibv_get(priv, idx);
for (i = 0; i != MLX5_PMD_TX_MP_CACHE; ++i) {
struct mlx5_mr *mr = NULL;
(void)mr;
if (ctrl->txq.mp2mr[i]) {
mr = priv_mr_get(priv, ctrl->txq.mp2mr[i]->mp);
assert(mr);
}
}
rte_atomic32_inc(&ctrl->refcnt);
DEBUG("%p: Tx queue %p: refcnt %d", (void *)priv,
(void *)ctrl, rte_atomic32_read(&ctrl->refcnt));
}
return ctrl;
}
/**
* Release a Tx queue.
*
* @param priv
* Pointer to private structure.
* @param idx
* TX queue index.
*
* @return
* 0 on success, errno on failure.
*/
int
mlx5_priv_txq_release(struct priv *priv, uint16_t idx)
{
unsigned int i;
struct mlx5_txq_ctrl *txq;
if (!(*priv->txqs)[idx])
return 0;
txq = container_of((*priv->txqs)[idx], struct mlx5_txq_ctrl, txq);
DEBUG("%p: Tx queue %p: refcnt %d", (void *)priv,
(void *)txq, rte_atomic32_read(&txq->refcnt));
if (txq->ibv) {
int ret;
ret = mlx5_priv_txq_ibv_release(priv, txq->ibv);
if (!ret)
txq->ibv = NULL;
}
for (i = 0; i != MLX5_PMD_TX_MP_CACHE; ++i) {
if (txq->txq.mp2mr[i]) {
priv_mr_release(priv, txq->txq.mp2mr[i]);
txq->txq.mp2mr[i] = NULL;
}
}
if (rte_atomic32_dec_and_test(&txq->refcnt)) {
txq_free_elts(txq);
LIST_REMOVE(txq, next);
rte_free(txq);
(*priv->txqs)[idx] = NULL;
return 0;
}
return EBUSY;
}
/**
* Verify if the queue can be released.
*
* @param priv
* Pointer to private structure.
* @param idx
* TX queue index.
*
* @return
* 1 if the queue can be released.
*/
int
mlx5_priv_txq_releasable(struct priv *priv, uint16_t idx)
{
struct mlx5_txq_ctrl *txq;
if (!(*priv->txqs)[idx])
return -1;
txq = container_of((*priv->txqs)[idx], struct mlx5_txq_ctrl, txq);
return (rte_atomic32_read(&txq->refcnt) == 1);
}
/**
* Verify the Tx Queue list is empty
*
* @param priv
* Pointer to private structure.
*
* @return the number of object not released.
*/
int
mlx5_priv_txq_verify(struct priv *priv)
{
struct mlx5_txq_ctrl *txq;
int ret = 0;
LIST_FOREACH(txq, &priv->txqsctrl, next) {
DEBUG("%p: Tx Queue %p still referenced", (void *)priv,
(void *)txq);
++ret;
}
return ret;
}