examples/l3fwd: improve grouping by destination port

Latest changes introduced a small degradation for the corner case
when each input packet is destined to the different port.
For the test-case when 1 core manages 4 ports and packet stream looks like:
IPV4_DSTPORT0, IPV4_DSTPORT1, IPV4_DSTPORT3, IPV4_DSTPORT4, IPV4_DSTPORT0, ...
non-optimised code path outperforms optimised one by 2-3%.
These changes supposed to close that gap.
From my testing: now for the case described above optimised code path
produces same numbers as non-optimised one.
For other test-cases numbers remain about the same.

Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
This commit is contained in:
Konstantin Ananyev 2014-07-22 17:04:47 +01:00 committed by Thomas Monjalon
parent 030df0102c
commit 7e7ab81422

View File

@ -1271,6 +1271,168 @@ processx4_step3(struct rte_mbuf *pkt[FWDSTEP], uint16_t dst_port[FWDSTEP])
&dst_port[3], pkt[3]->ol_flags);
}
/*
* We group consecutive packets with the same destionation port into one burst.
* To avoid extra latency this is done together with some other packet
* processing, but after we made a final decision about packet's destination.
* To do this we maintain:
* pnum - array of number of consecutive packets with the same dest port for
* each packet in the input burst.
* lp - pointer to the last updated element in the pnum.
* dlp - dest port value lp corresponds to.
*/
#define GRPSZ (1 << FWDSTEP)
#define GRPMSK (GRPSZ - 1)
#define GROUP_PORT_STEP(dlp, dcp, lp, pn, idx) do { \
if (likely((dlp) == (dcp)[(idx)])) { \
(lp)[0]++; \
} else { \
(dlp) = (dcp)[idx]; \
(lp) = (pn) + (idx); \
(lp)[0] = 1; \
} \
} while (0)
/*
* Group consecutive packets with the same destination port in bursts of 4.
* Suppose we have array of destionation ports:
* dst_port[] = {a, b, c, d,, e, ... }
* dp1 should contain: <a, b, c, d>, dp2: <b, c, d, e>.
* We doing 4 comparisions at once and the result is 4 bit mask.
* This mask is used as an index into prebuild array of pnum values.
*/
static inline uint16_t *
port_groupx4(uint16_t pn[FWDSTEP + 1], uint16_t *lp, __m128i dp1, __m128i dp2)
{
static const struct {
uint64_t pnum; /* prebuild 4 values for pnum[]. */
int32_t idx; /* index for new last updated elemnet. */
uint16_t lpv; /* add value to the last updated element. */
} gptbl[GRPSZ] = {
{
/* 0: a != b, b != c, c != d, d != e */
.pnum = UINT64_C(0x0001000100010001),
.idx = 4,
.lpv = 0,
},
{
/* 1: a == b, b != c, c != d, d != e */
.pnum = UINT64_C(0x0001000100010002),
.idx = 4,
.lpv = 1,
},
{
/* 2: a != b, b == c, c != d, d != e */
.pnum = UINT64_C(0x0001000100020001),
.idx = 4,
.lpv = 0,
},
{
/* 3: a == b, b == c, c != d, d != e */
.pnum = UINT64_C(0x0001000100020003),
.idx = 4,
.lpv = 2,
},
{
/* 4: a != b, b != c, c == d, d != e */
.pnum = UINT64_C(0x0001000200010001),
.idx = 4,
.lpv = 0,
},
{
/* 5: a == b, b != c, c == d, d != e */
.pnum = UINT64_C(0x0001000200010002),
.idx = 4,
.lpv = 1,
},
{
/* 6: a != b, b == c, c == d, d != e */
.pnum = UINT64_C(0x0001000200030001),
.idx = 4,
.lpv = 0,
},
{
/* 7: a == b, b == c, c == d, d != e */
.pnum = UINT64_C(0x0001000200030004),
.idx = 4,
.lpv = 3,
},
{
/* 8: a != b, b != c, c != d, d == e */
.pnum = UINT64_C(0x0002000100010001),
.idx = 3,
.lpv = 0,
},
{
/* 9: a == b, b != c, c != d, d == e */
.pnum = UINT64_C(0x0002000100010002),
.idx = 3,
.lpv = 1,
},
{
/* 0xa: a != b, b == c, c != d, d == e */
.pnum = UINT64_C(0x0002000100020001),
.idx = 3,
.lpv = 0,
},
{
/* 0xb: a == b, b == c, c != d, d == e */
.pnum = UINT64_C(0x0002000100020003),
.idx = 3,
.lpv = 2,
},
{
/* 0xc: a != b, b != c, c == d, d == e */
.pnum = UINT64_C(0x0002000300010001),
.idx = 2,
.lpv = 0,
},
{
/* 0xd: a == b, b != c, c == d, d == e */
.pnum = UINT64_C(0x0002000300010002),
.idx = 2,
.lpv = 1,
},
{
/* 0xe: a != b, b == c, c == d, d == e */
.pnum = UINT64_C(0x0002000300040001),
.idx = 1,
.lpv = 0,
},
{
/* 0xf: a == b, b == c, c == d, d == e */
.pnum = UINT64_C(0x0002000300040005),
.idx = 0,
.lpv = 4,
},
};
union {
uint16_t u16[FWDSTEP + 1];
uint64_t u64;
} *pnum = (void *)pn;
int32_t v;
dp1 = _mm_cmpeq_epi16(dp1, dp2);
dp1 = _mm_unpacklo_epi16(dp1, dp1);
v = _mm_movemask_ps((__m128)dp1);
/* update last port counter. */
lp[0] += gptbl[v].lpv;
/* if dest port value has changed. */
if (v != GRPMSK) {
lp = pnum->u16 + gptbl[v].idx;
lp[0] = 1;
pnum->u64 = gptbl[v].pnum;
}
return lp;
}
#endif /* APP_LOOKUP_METHOD */
/* main processing loop */
@ -1289,9 +1451,12 @@ main_loop(__attribute__((unused)) void *dummy)
#if ((APP_LOOKUP_METHOD == APP_LOOKUP_LPM) && \
(ENABLE_MULTI_BUFFER_OPTIMIZE == 1))
int32_t k;
uint16_t dlp;
uint16_t *lp;
uint16_t dst_port[MAX_PKT_BURST];
__m128i dip[MAX_PKT_BURST / FWDSTEP];
uint32_t flag[MAX_PKT_BURST / FWDSTEP];
uint16_t pnum[MAX_PKT_BURST + 1];
#endif
prev_tsc = 0;
@ -1402,9 +1567,61 @@ main_loop(__attribute__((unused)) void *dummy)
&pkts_burst[j], &dst_port[j]);
}
/*
* Finish packet processing and group consecutive
* packets with the same destination port.
*/
k = RTE_ALIGN_FLOOR(nb_rx, FWDSTEP);
for (j = 0; j != k; j += FWDSTEP) {
processx4_step3(&pkts_burst[j], &dst_port[j]);
if (k != 0) {
__m128i dp1, dp2;
lp = pnum;
lp[0] = 1;
processx4_step3(pkts_burst, dst_port);
/* dp1: <d[0], d[1], d[2], d[3], ... > */
dp1 = _mm_loadu_si128((__m128i *)dst_port);
for (j = FWDSTEP; j != k; j += FWDSTEP) {
processx4_step3(&pkts_burst[j],
&dst_port[j]);
/*
* dp2:
* <d[j-3], d[j-2], d[j-1], d[j], ... >
*/
dp2 = _mm_loadu_si128((__m128i *)
&dst_port[j - FWDSTEP + 1]);
lp = port_groupx4(&pnum[j - FWDSTEP],
lp, dp1, dp2);
/*
* dp1:
* <d[j], d[j+1], d[j+2], d[j+3], ... >
*/
dp1 = _mm_srli_si128(dp2,
(FWDSTEP - 1) *
sizeof(dst_port[0]));
}
/*
* dp2: <d[j-3], d[j-2], d[j-1], d[j-1], ... >
*/
dp2 = _mm_shufflelo_epi16(dp1, 0xf9);
lp = port_groupx4(&pnum[j - FWDSTEP], lp,
dp1, dp2);
/*
* remove values added by the last repeated
* dst port.
*/
lp[0]--;
dlp = dst_port[j - 1];
} else {
/* set dlp and lp to the never used values. */
dlp = BAD_PORT - 1;
lp = pnum + MAX_PKT_BURST;
}
/* Process up to last 3 packets one by one. */
@ -1412,39 +1629,41 @@ main_loop(__attribute__((unused)) void *dummy)
case 3:
process_packet(qconf, pkts_burst[j],
dst_port + j, portid);
GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
j++;
case 2:
process_packet(qconf, pkts_burst[j],
dst_port + j, portid);
GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
j++;
case 1:
process_packet(qconf, pkts_burst[j],
dst_port + j, portid);
GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
j++;
}
/*
* Send packets out, through destination port.
* Try to group packets with the same destination port.
* Consecuteve pacekts with the same destination port
* are already grouped together.
* If destination port for the packet equals BAD_PORT,
* then free the packet without sending it out.
*/
for (j = 0; j < nb_rx; j = k) {
for (j = 0; j < nb_rx; j += k) {
uint16_t cn, pn = dst_port[j];
int32_t m;
uint16_t pn;
k = j;
do {
cn = dst_port[k];
} while (cn != BAD_PORT && pn == cn &&
++k < nb_rx);
pn = dst_port[j];
k = pnum[j];
send_packetsx4(qconf, pn, pkts_burst + j,
k - j);
if (cn == BAD_PORT) {
rte_pktmbuf_free(pkts_burst[k]);
k += 1;
if (likely(pn != BAD_PORT)) {
send_packetsx4(qconf, pn,
pkts_burst + j, k);
} else {
for (m = j; m != j + k; m++)
rte_pktmbuf_free(pkts_burst[m]);
}
}