examples/ipsec-secgw: add scapy based tests

Add new unittest-like mechanism which uses scapy to craft custom
packets and a set of assertions to check how ipsec-secgw example
application is processing them. Python3 with scapy module is
required by pkttest.sh to run test scripts.

A new mechanism is used to test IPv6 transport mode traffic with
header extensions (trs_ipv6opts.py).

Fix incomplete test log problem by disabling buffering of ipsec-secgw
standard output with stdbuf application.

Signed-off-by: Marcin Smoczynski <marcinx.smoczynski@intel.com>
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Acked-by: Akhil Goyal <akhil.goyal@nxp.com>
Tested-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
This commit is contained in:
Marcin Smoczynski 2019-06-24 15:40:00 +02:00 committed by Akhil Goyal
parent 1e05895e35
commit 9a18283a54
6 changed files with 519 additions and 85 deletions

View File

@ -1,22 +1,10 @@
#! /bin/bash
#check that env vars are properly defined
#check SGW_PATH
if [[ -z "${SGW_PATH}" || ! -x ${SGW_PATH} ]]; then
echo "SGW_PATH is invalid"
exit 127
fi
#check ETH_DEV
if [[ -z "${ETH_DEV}" ]]; then
echo "ETH_DEV is invalid"
exit 127
fi
#setup SGW_LCORE
SGW_LCORE=${SGW_LCORE:-0}
#check that REMOTE_HOST is reachable
ssh ${REMOTE_HOST} echo
st=$?
@ -47,14 +35,6 @@ LOCAL_IPV6=fd12:3456:789a:0031:0000:0000:0000:0092
DPDK_PATH=${RTE_SDK:-${PWD}}
DPDK_BUILD=${RTE_TARGET:-x86_64-native-linux-gcc}
SGW_OUT_FILE=./ipsec-secgw.out1
SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4 ${ETH_DEV}"
SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
SGW_CFG_FILE=$(mktemp)
# configure local host/ifaces
config_local_iface()
{
@ -126,37 +106,7 @@ config6_iface()
config6_remote_iface
}
#start ipsec-secgw
secgw_start()
{
SGW_EXEC_FILE=$(mktemp)
cat <<EOF > ${SGW_EXEC_FILE}
${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
--vdev="net_tap0,mac=fixed" \
-- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
${SGW_OUT_FILE} 2>&1 &
p=\$!
echo \$p
EOF
cat ${SGW_EXEC_FILE}
SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
# wait till ipsec-secgw start properly
i=0
st=1
while [[ $i -ne 10 && st -ne 0 ]]; do
sleep 1
ifconfig ${LOCAL_IFACE}
st=$?
let i++
done
}
#stop ipsec-secgw and cleanup
secgw_stop()
{
kill ${SGW_PID}
rm -f ${SGW_EXEC_FILE}
rm -f ${SGW_CFG_FILE}
}
# secgw application parameters setup
SGW_PORT_CFG="--vdev=\"net_tap0,mac=fixed\" ${ETH_DEV}"
SGW_WAIT_DEV="${LOCAL_IFACE}"
. ${DIR}/common_defs_secgw.sh

View File

@ -0,0 +1,65 @@
#!/bin/bash
# check required parameters
SGW_REQ_VARS="SGW_PATH SGW_PORT_CFG SGW_WAIT_DEV"
for reqvar in ${SGW_REQ_VARS}
do
if [[ -z "${!reqvar}" ]]; then
echo "Required parameter ${reqvar} is empty"
exit 127
fi
done
# check if SGW_PATH point to an executable
if [[ ! -x ${SGW_PATH} ]]; then
echo "${SGW_PATH} is not executable"
exit 127
fi
# setup SGW_LCORE
SGW_LCORE=${SGW_LCORE:-0}
# setup config and output filenames
SGW_OUT_FILE=./ipsec-secgw.out1
SGW_CFG_FILE=$(mktemp)
# setup secgw parameters
SGW_CMD_EAL_PRM="--lcores=${SGW_LCORE} -n 4"
SGW_CMD_CFG="(0,0,${SGW_LCORE}),(1,0,${SGW_LCORE})"
SGW_CMD_PRM="-p 0x3 -u 1 -P --config=\"${SGW_CMD_CFG}\""
# start ipsec-secgw
secgw_start()
{
SGW_EXEC_FILE=$(mktemp)
cat <<EOF > ${SGW_EXEC_FILE}
stdbuf -o0 ${SGW_PATH} ${SGW_CMD_EAL_PRM} ${CRYPTO_DEV} \
${SGW_PORT_CFG} ${SGW_EAL_XPRM} \
-- ${SGW_CMD_PRM} ${SGW_CMD_XPRM} -f ${SGW_CFG_FILE} > \
${SGW_OUT_FILE} 2>&1 &
p=\$!
echo \$p
EOF
cat ${SGW_EXEC_FILE}
cat ${SGW_CFG_FILE}
SGW_PID=`/bin/bash -x ${SGW_EXEC_FILE}`
# wait till ipsec-secgw start properly
i=0
st=1
while [[ $i -ne 10 && $st -ne 0 ]]; do
sleep 1
ifconfig ${SGW_WAIT_DEV}
st=$?
let i++
done
}
# stop ipsec-secgw and cleanup
secgw_stop()
{
kill ${SGW_PID}
rm -f ${SGW_EXEC_FILE}
rm -f ${SGW_CFG_FILE}
}

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
import fcntl
import pkg_resources
import socket
import struct
import sys
import unittest
if sys.version_info < (3, 0):
print("Python3 is required to run this script")
sys.exit(1)
try:
from scapy.all import Ether
except ImportError:
print("Scapy module is required")
sys.exit(1)
PKTTEST_REQ = [
"scapy==2.4.3rc1",
]
def assert_requirements(req):
"""
assert requirement is met
req can hold a string or a list of strings
"""
try:
pkg_resources.require(req)
except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict) as e:
print("Requirement assertion: " + str(e))
sys.exit(1)
TAP_UNPROTECTED = "dtap1"
TAP_PROTECTED = "dtap0"
class Interface(object):
ETH_P_ALL = 3
MAX_PACKET_SIZE = 1280
IOCTL_GET_INFO = 0x8927
SOCKET_TIMEOUT = 0.5
def __init__(self, ifname):
self.name = ifname
# create and bind socket to specified interface
self.s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(Interface.ETH_P_ALL))
self.s.settimeout(Interface.SOCKET_TIMEOUT)
self.s.bind((self.name, 0, socket.PACKET_OTHERHOST))
# get interface MAC address
info = fcntl.ioctl(self.s.fileno(), Interface.IOCTL_GET_INFO, struct.pack('256s', bytes(ifname[:15], encoding='ascii')))
self.mac = ':'.join(['%02x' % i for i in info[18:24]])
def __del__(self):
self.s.close()
def send_l3packet(self, pkt, mac):
e = Ether(src=self.mac, dst=mac)
self.send_packet(e/pkt)
def send_packet(self, pkt):
self.send_bytes(bytes(pkt))
def send_bytes(self, bytedata):
self.s.send(bytedata)
def recv_packet(self):
return Ether(self.recv_bytes())
def recv_bytes(self):
return self.s.recv(Interface.MAX_PACKET_SIZE)
def get_mac(self):
return self.mac
class PacketXfer(object):
def __init__(self, protected_iface=TAP_PROTECTED, unprotected_iface=TAP_UNPROTECTED):
self.protected_port = Interface(protected_iface)
self.unprotected_port = Interface(unprotected_iface)
def send_to_protected_port(self, pkt, remote_mac=None):
if remote_mac is None:
remote_mac = self.unprotected_port.get_mac()
self.protected_port.send_l3packet(pkt, remote_mac)
def send_to_unprotected_port(self, pkt, remote_mac=None):
if remote_mac is None:
remote_mac = self.protected_port.get_mac()
self.unprotected_port.send_l3packet(pkt, remote_mac)
def xfer_unprotected(self, pkt):
self.send_to_unprotected_port(pkt)
return self.protected_port.recv_packet()
def xfer_protected(self, pkt):
self.send_to_protected_port(pkt)
return self.unprotected_port.recv_packet()
def pkttest():
if len(sys.argv) == 1:
sys.exit(unittest.main(verbosity=2))
elif len(sys.argv) == 2:
if sys.argv[1] == "config":
module = __import__('__main__')
try:
print(module.config())
except AttributeError:
sys.stderr.write("Cannot find \"config()\" in a test")
sys.exit(1)
else:
sys.exit(1)
if __name__ == "__main__":
if len(sys.argv) == 2 and sys.argv[1] == "check_reqs":
assert_requirements(PKTTEST_REQ)
else:
print("Usage: " + sys.argv[0] + " check_reqs")

View File

@ -0,0 +1,65 @@
#!/bin/bash
DIR=$(dirname $0)
if [ $(id -u) -ne 0 ]; then
echo "Run as root"
exit 1
fi
# check python requirements
python3 ${DIR}/pkttest.py check_reqs
if [ $? -ne 0 ]; then
echo "Requirements for Python not met, exiting"
exit 1
fi
# secgw application parameters setup
CRYPTO_DEV="--vdev=crypto_null0"
SGW_PORT_CFG="--vdev=net_tap0,mac=fixed --vdev=net_tap1,mac=fixed"
SGW_EAL_XPRM="--no-pci"
SGW_CMD_XPRM=-l
SGW_WAIT_DEV="dtap0"
. ${DIR}/common_defs_secgw.sh
echo "Running tests: $*"
for testcase in $*
do
# check test file presence
testfile="${DIR}/${testcase}.py"
if [ ! -f ${testfile} ]; then
echo "Invalid test ${testcase}"
continue
fi
# prepare test config
python3 ${testfile} config > ${SGW_CFG_FILE}
if [ $? -ne 0 ]; then
rm -f ${SGW_CFG_FILE}
echo "Cannot get secgw configuration for test ${testcase}"
exit 1
fi
# start the application
secgw_start
# setup interfaces
ifconfig dtap0 up
ifconfig dtap1 up
# run the test
echo "Running test case: ${testcase}"
python3 ${testfile}
st=$?
# stop the application
secgw_stop
# report test result and exit on failure
if [ $st -eq 0 ]; then
echo "Test case ${testcase} succeeded"
else
echo "Test case ${testcase} failed!"
exit $st
fi
done

108
examples/ipsec-secgw/test/run_test.sh Normal file → Executable file
View File

@ -17,6 +17,17 @@
# naming convention:
# 'old' means that ipsec-secgw will run in legacy (non-librte_ipsec mode)
# 'tun/trs' refer to tunnel/transport mode respectively
usage()
{
echo "Usage:"
echo -e "\t$0 -[46p]"
echo -e "\t\t-4 Perform Linux IPv4 network tests"
echo -e "\t\t-6 Perform Linux IPv6 network tests"
echo -e "\t\t-p Perform packet validation tests"
echo -e "\t\t-h Display this help"
}
LINUX_TEST="tun_aescbc_sha1 \
tun_aescbc_sha1_esn \
tun_aescbc_sha1_esn_atom \
@ -50,47 +61,82 @@ trs_3descbc_sha1_old \
trs_3descbc_sha1_esn \
trs_3descbc_sha1_esn_atom"
DIR=`dirname $0`
PKT_TESTS="trs_ipv6opts"
DIR=$(dirname $0)
# get input options
st=0
run4=0
run6=0
while [[ ${st} -eq 0 ]]; do
getopts ":46" opt
st=$?
if [[ "${opt}" == "4" ]]; then
run4=1
elif [[ "${opt}" == "6" ]]; then
run6=1
fi
runpkt=0
while getopts ":46ph" opt
do
case $opt in
4)
run4=1
;;
6)
run6=1
;;
p)
runpkt=1
;;
h)
usage
exit 0
;;
?)
echo "Invalid option"
usage
exit 127
;;
esac
done
if [[ ${run4} -eq 0 && ${run6} -eq 0 ]]; then
# no test suite has been selected
if [[ ${run4} -eq 0 && ${run6} -eq 0 && ${runpkt} -eq 0 ]]; then
usage
exit 127
fi
for i in ${LINUX_TEST}; do
# perform packet processing validation tests
st=0
if [ $runpkt -eq 1 ]; then
echo "Performing packet validation tests"
/bin/bash ${DIR}/pkttest.sh ${PKT_TESTS}
st=$?
echo "starting test ${i}"
st4=0
if [[ ${run4} -ne 0 ]]; then
/bin/bash ${DIR}/linux_test4.sh ${i}
st4=$?
echo "test4 ${i} finished with status ${st4}"
echo "pkttests finished with status ${st}"
if [[ ${st} -ne 0 ]]; then
echo "ERROR pkttests FAILED"
exit ${st}
fi
fi
st6=0
if [[ ${run6} -ne 0 ]]; then
/bin/bash ${DIR}/linux_test6.sh ${i}
st6=$?
echo "test6 ${i} finished with status ${st6}"
fi
# perform network tests
if [[ ${run4} -eq 1 || ${run6} -eq 1 ]]; then
for i in ${LINUX_TEST}; do
let "st = st4 + st6"
if [[ $st -ne 0 ]]; then
echo "ERROR test ${i} FAILED"
exit $st
fi
done
echo "starting test ${i}"
st4=0
if [[ ${run4} -ne 0 ]]; then
/bin/bash ${DIR}/linux_test4.sh ${i}
st4=$?
echo "test4 ${i} finished with status ${st4}"
fi
st6=0
if [[ ${run6} -ne 0 ]]; then
/bin/bash ${DIR}/linux_test6.sh ${i}
st6=$?
echo "test6 ${i} finished with status ${st6}"
fi
let "st = st4 + st6"
if [[ $st -ne 0 ]]; then
echo "ERROR test ${i} FAILED"
exit $st
fi
done
fi

View File

@ -0,0 +1,181 @@
#!/usr/bin/env python3
from scapy.all import *
import unittest
import pkttest
SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
def config():
return """
sp ipv6 out esp protect 5 pri 1 \\
src {0} \\
dst {1} \\
sport 0:65535 dport 0:65535
sp ipv6 in esp protect 6 pri 1 \\
src {1} \\
dst {0} \\
sport 0:65535 dport 0:65535
sa out 5 cipher_algo null auth_algo null mode transport
sa in 6 cipher_algo null auth_algo null mode transport
rt ipv6 dst {0} port 1
rt ipv6 dst {1} port 0
""".format(SRC_NET, DST_NET)
class TestTransportWithIPv6Ext(unittest.TestCase):
# There is a bug in the IPsec Scapy implementation
# which causes invalid packet reconstruction after
# successful decryption. This method is a workaround.
@staticmethod
def decrypt(pkt, sa):
esp = pkt[ESP]
# decrypt dummy packet with no extensions
d = sa.decrypt(IPv6()/esp)
# fix 'next header' in the preceding header of the original
# packet and remove ESP
pkt[ESP].underlayer.nh = d[IPv6].nh
pkt[ESP].underlayer.remove_payload()
# combine L3 header with decrypted payload
npkt = pkt/d[IPv6].payload
# fix length
npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
return npkt
def setUp(self):
self.px = pkttest.PacketXfer()
self.outb_sa = SecurityAssociation(ESP, spi=5)
self.inb_sa = SecurityAssociation(ESP, spi=6)
def test_outb_ipv6_noopt(self):
pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
# send and check response
resp = self.px.xfer_unprotected(pkt)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
self.assertEqual(resp[ESP].spi, 5)
# decrypt response, check packet after decryption
d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
self.assertEqual(d[UDP].sport, 123)
self.assertEqual(d[UDP].dport, 456)
self.assertEqual(bytes(d[UDP].payload), b'abc')
def test_outb_ipv6_opt(self):
hoptions = []
hoptions.append(RouterAlert(value=2))
hoptions.append(Jumbo(jumboplen=5000))
hoptions.append(Pad1())
doptions = []
doptions.append(HAO(hoa="1234::4321"))
pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
pkt /= IPv6ExtHdrHopByHop(options=hoptions)
pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
pkt /= IPv6ExtHdrDestOpt(options=doptions)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
# send and check response
resp = self.px.xfer_unprotected(pkt)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
# check extensions
self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
# check ESP
self.assertEqual(resp[ESP].spi, 5)
# decrypt response, check packet after decryption
d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
# check UDP
self.assertEqual(d[UDP].sport, 123)
self.assertEqual(d[UDP].dport, 456)
self.assertEqual(bytes(d[UDP].payload), b'abc')
def test_inb_ipv6_noopt(self):
# encrypt and send raw UDP packet
pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
e = self.inb_sa.encrypt(pkt)
# send and check response
resp = self.px.xfer_protected(e)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
# check UDP packet
self.assertEqual(resp[UDP].sport, 123)
self.assertEqual(resp[UDP].dport, 456)
self.assertEqual(bytes(resp[UDP].payload), b'abc')
def test_inb_ipv6_opt(self):
hoptions = []
hoptions.append(RouterAlert(value=2))
hoptions.append(Jumbo(jumboplen=5000))
hoptions.append(Pad1())
doptions = []
doptions.append(HAO(hoa="1234::4321"))
# prepare packet with options
pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
pkt /= IPv6ExtHdrHopByHop(options=hoptions)
pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
pkt /= IPv6ExtHdrDestOpt(options=doptions)
pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
e = self.inb_sa.encrypt(pkt)
# self encrypted packet and check response
resp = self.px.xfer_protected(e)
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
# check UDP
self.assertEqual(resp[UDP].sport, 123)
self.assertEqual(resp[UDP].dport, 456)
self.assertEqual(bytes(resp[UDP].payload), b'abc')
def test_inb_ipv6_frag(self):
# prepare ESP payload
pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc")
e = self.inb_sa.encrypt(pkt)
# craft and send inbound packet
e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
resp = self.px.xfer_protected(e)
# check response
self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
# check UDP
self.assertEqual(resp[UDP].sport, 123)
self.assertEqual(resp[UDP].dport, 456)
self.assertEqual(bytes(resp[UDP].payload), b'abc')
pkttest.pkttest()