Use an optimised, complexity-safe sorting routine instead of the
kernel's "qsort()".

The kernel's "qsort()" routine can in the worst case perform O(N*N)
comparisons before the input array is sorted. It can also recurse a
significant amount of times using up the kernel's interrupt thread
stack.

The custom sorting routine takes advantage of the fact that the sorting
key is only 64 bits. Based on the set and cleared bits in the sorting key it
partitions the array until it is sorted. This process has a recursion
limit of 64 times, due to the number of set and cleared bits which can
occur. Compiled with -O2 the sorting routine was measured to use
64 bytes of stack. Multiplying this by 64 gives a maximum stack
consumption of 4096 bytes for AMD64. The same bound applies to the
execution time: the array to be sorted will not be traversed more
than 64 times.

When serving roughly 80Gb/s with 80K TCP connections, the old method
consisting of "qsort()" and "tcp_lro_mbuf_compare_header()" used 1.4%
CPU, while the new "tcp_lro_sort()" used 1.1% for LRO related sorting
as measured by Intel Vtune. The testing was done using a sysctl to
toggle between "qsort()" and "tcp_lro_sort()".

Differential Revision:	https://reviews.freebsd.org/D6472
Sponsored by:	Mellanox Technologies
Tested by:	Netflix
Reviewed by:	gallatin, rrs, sephe, transport
This commit is contained in:
Hans Petter Selasky 2016-05-26 11:10:31 +00:00
parent 9a5e3bff0d
commit fc271df341
Notes: svn2git 2020-12-20 02:59:44 +00:00
svn path=/head/; revision=300731
2 changed files with 112 additions and 40 deletions

View File

@ -111,9 +111,9 @@ tcp_lro_init_args(struct lro_ctrl *lc, struct ifnet *ifp,
LIST_INIT(&lc->lro_active);
/* compute size to allocate */
size = (lro_mbufs * sizeof(struct mbuf *)) +
size = (lro_mbufs * sizeof(struct lro_mbuf_sort)) +
(lro_entries * sizeof(*le));
lc->lro_mbuf_data = (struct mbuf **)
lc->lro_mbuf_data = (struct lro_mbuf_sort *)
malloc(size, M_LRO, M_NOWAIT | M_ZERO);
/* check for out of memory */
@ -149,7 +149,7 @@ tcp_lro_free(struct lro_ctrl *lc)
/* free mbuf array, if any */
for (x = 0; x != lc->lro_mbuf_count; x++)
m_freem(lc->lro_mbuf_data[x]);
m_freem(lc->lro_mbuf_data[x].mb);
lc->lro_mbuf_count = 0;
/* free allocated memory, if any */
@ -365,32 +365,102 @@ tcp_lro_flush(struct lro_ctrl *lc, struct lro_entry *le)
LIST_INSERT_HEAD(&lc->lro_free, le, next);
}
#ifdef HAVE_INLINE_FLSLL
/* Isolate the most significant set bit using the builtin flsll(). */
#define	tcp_lro_msb_64(x) (1ULL << (flsll(x) - 1))
#else
/*
 * Return "x" with only its most significant set bit retained, or 0
 * when "x" is 0.  Portable fallback used when flsll() is not an
 * inline builtin.
 */
static inline uint64_t
tcp_lro_msb_64(uint64_t x)
{
	/* smear the highest set bit into all lower bit positions */
	x |= (x >> 1);
	x |= (x >> 2);
	x |= (x >> 4);
	x |= (x >> 8);
	x |= (x >> 16);
	x |= (x >> 32);
	/* clear everything below the topmost set bit */
	return (x & ~(x >> 1));
}
#endif
/*
 * The tcp_lro_sort() routine is comparable to qsort(), except it has
 * a worst case complexity limit of O(MIN(N,64)*N), where N is the
 * number of elements to sort and 64 is the number of sequence bits
 * available.  The algorithm is bit-slicing the 64-bit sequence number,
 * sorting one bit at a time from the most significant bit until the
 * least significant one, skipping the constant bits.  Recursion depth
 * is bounded by the 64 sequence bits; the "ones" partition is handled
 * iteratively to keep stack usage low.
 */
static void
tcp_lro_sort(struct lro_mbuf_sort *parray, uint32_t size)
{
	struct lro_mbuf_sort temp;
	uint64_t ones;
	uint64_t zeros;
	uint32_t x;
	uint32_t y;

repeat:
	/* for small arrays bubble sort is faster */
	if (size <= 12) {
		for (x = 0; x != size; x++) {
			for (y = x + 1; y != size; y++) {
				if (parray[x].seq > parray[y].seq) {
					/* swap entries */
					temp = parray[x];
					parray[x] = parray[y];
					parray[y] = temp;
				}
			}
		}
		return;
	}

	/* compute sequence bits which are constant */
	ones = 0;
	zeros = 0;
	for (x = 0; x != size; x++) {
		ones |= parray[x].seq;
		zeros |= ~parray[x].seq;
	}

	/* compute bits which are not constant into "ones" */
	ones &= zeros;
	if (ones == 0)
		return;		/* all keys are equal - already sorted */

	/* pick the most significant bit which is not constant */
	ones = tcp_lro_msb_64(ones);

	/*
	 * Move entries having cleared sequence bits to the beginning
	 * of the array:
	 */
	for (x = y = 0; y != size; y++) {
		/* skip set bits */
		if (parray[y].seq & ones)
			continue;
		/* swap entries */
		temp = parray[x];
		parray[x] = parray[y];
		parray[y] = temp;
		x++;
	}

	/* the chosen bit is non-constant, so both partitions are non-empty */
	KASSERT(x != 0 && x != size, ("Memory is corrupted\n"));

	/* sort zeros */
	tcp_lro_sort(parray, x);

	/* sort ones - iterate instead of recursing (tail call) */
	parray += x;
	size -= x;
	goto repeat;
}
void
tcp_lro_flush_all(struct lro_ctrl *lc)
{
uint32_t hashtype;
uint32_t flowid;
uint64_t seq;
uint64_t nseq;
unsigned x;
/* check if no mbufs to flush */
@ -398,30 +468,27 @@ tcp_lro_flush_all(struct lro_ctrl *lc)
goto done;
/* sort all mbufs according to stream */
qsort(lc->lro_mbuf_data, lc->lro_mbuf_count, sizeof(struct mbuf *),
&tcp_lro_mbuf_compare_header);
tcp_lro_sort(lc->lro_mbuf_data, lc->lro_mbuf_count);
/* input data into LRO engine, stream by stream */
flowid = 0;
hashtype = M_HASHTYPE_NONE;
seq = 0;
for (x = 0; x != lc->lro_mbuf_count; x++) {
struct mbuf *mb;
mb = lc->lro_mbuf_data[x];
/* get mbuf */
mb = lc->lro_mbuf_data[x].mb;
/* get sequence number, masking away the packet index */
nseq = lc->lro_mbuf_data[x].seq & (-1ULL << 24);
/* check for new stream */
if (mb->m_pkthdr.flowid != flowid ||
M_HASHTYPE_GET(mb) != hashtype) {
flowid = mb->m_pkthdr.flowid;
hashtype = M_HASHTYPE_GET(mb);
if (seq != nseq) {
seq = nseq;
/* flush active streams */
tcp_lro_rx_done(lc);
}
#ifdef TCP_LRO_RESET_SEQUENCE
/* reset sequence number */
TCP_LRO_SEQUENCE(mb) = 0;
#endif
/* add packet to LRO engine */
if (tcp_lro_rx(lc, mb, 0) != 0) {
/* input packet to network layer */
@ -799,11 +866,14 @@ tcp_lro_queue_mbuf(struct lro_ctrl *lc, struct mbuf *mb)
if (__predict_false(lc->lro_mbuf_count == lc->lro_mbuf_max))
tcp_lro_flush_all(lc);
/* store sequence number */
TCP_LRO_SEQUENCE(mb) = lc->lro_mbuf_count;
/* create sequence number */
lc->lro_mbuf_data[lc->lro_mbuf_count].seq =
(((uint64_t)M_HASHTYPE_GET(mb)) << 56) |
(((uint64_t)mb->m_pkthdr.flowid) << 24) |
((uint64_t)lc->lro_mbuf_count);
/* enter mbuf */
lc->lro_mbuf_data[lc->lro_mbuf_count++] = mb;
lc->lro_mbuf_data[lc->lro_mbuf_count++].mb = mb;
}
/* end */

View File

@ -38,9 +38,6 @@
#define TCP_LRO_ENTRIES 8
#endif
#define TCP_LRO_SEQUENCE(mb) \
(mb)->m_pkthdr.PH_loc.thirtytwo[0]
struct lro_entry {
LIST_ENTRY(lro_entry) next;
struct mbuf *m_head;
@ -80,10 +77,15 @@ LIST_HEAD(lro_head, lro_entry);
#define source_ip6 lesource.s_ip6
#define dest_ip6 ledest.d_ip6
/*
 * Element of the LRO mbuf queue; sorted by "seq" before the queued
 * mbufs are fed into the LRO engine.
 */
struct lro_mbuf_sort {
uint64_t seq;	/* sort key: hash type (top 8 bits), flowid, queue index (low 24 bits) */
struct mbuf *mb;	/* the queued mbuf this entry refers to */
};
/* NB: This is part of driver structs. */
struct lro_ctrl {
struct ifnet *ifp;
struct mbuf **lro_mbuf_data;
struct lro_mbuf_sort *lro_mbuf_data;
uint64_t lro_queued;
uint64_t lro_flushed;
uint64_t lro_bad_csum;