|
|
|
@ -3,6 +3,7 @@
|
|
|
|
|
# SPDX-License-Identifier: BSD-2-Clause
|
|
|
|
|
#
|
|
|
|
|
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
|
|
|
|
|
# Copyright (c) 2023 Kajetan Staszkiewicz <vegeta@tuxpowered.net>
|
|
|
|
|
#
|
|
|
|
|
# Redistribution and use in source and binary forms, with or without
|
|
|
|
|
# modification, are permitted provided that the following conditions
|
|
|
|
@ -29,333 +30,505 @@
|
|
|
|
|
import argparse
|
|
|
|
|
import logging
|
|
|
|
|
logging.getLogger("scapy").setLevel(logging.CRITICAL)
|
|
|
|
|
import math
|
|
|
|
|
import scapy.all as sp
|
|
|
|
|
import socket
|
|
|
|
|
import sys
|
|
|
|
|
|
|
|
|
|
from copy import copy
|
|
|
|
|
from sniffer import Sniffer
|
|
|
|
|
|
|
|
|
|
logging.basicConfig(format='%(message)s')
|
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')
|
|
|
|
|
|
|
|
|
|
dup_found = 0
|
|
|
|
|
def build_payload(l):
|
|
|
|
|
pl = len(PAYLOAD_MAGIC)
|
|
|
|
|
ret = PAYLOAD_MAGIC * math.floor(l/pl)
|
|
|
|
|
ret += PAYLOAD_MAGIC[0:(l % pl)]
|
|
|
|
|
return ret
|
|
|
|
|
|
|
|
|
|
def check_dup(args, packet):
|
|
|
|
|
"""
|
|
|
|
|
Verify that this is an ICMP packet, and that we only see one
|
|
|
|
|
"""
|
|
|
|
|
global dup_found
|
|
|
|
|
|
|
|
|
|
icmp = packet.getlayer(sp.ICMP)
|
|
|
|
|
if not icmp:
|
|
|
|
|
return False
|
|
|
|
|
def prepare_ipv6(dst_address, send_params):
|
|
|
|
|
src_address = send_params.get('src_address')
|
|
|
|
|
hlim = send_params.get('hlim')
|
|
|
|
|
tc = send_params.get('tc')
|
|
|
|
|
ip6 = sp.IPv6(dst=dst_address)
|
|
|
|
|
if src_address:
|
|
|
|
|
ip6.src = src_address
|
|
|
|
|
if hlim:
|
|
|
|
|
ip6.hlim = hlim
|
|
|
|
|
if tc:
|
|
|
|
|
ip6.tc = tc
|
|
|
|
|
return ip6
|
|
|
|
|
|
|
|
|
|
raw = packet.getlayer(sp.Raw)
|
|
|
|
|
if not raw:
|
|
|
|
|
return False
|
|
|
|
|
if raw.load != PAYLOAD_MAGIC:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
dup_found = dup_found + 1
|
|
|
|
|
return False
|
|
|
|
|
def prepare_ipv4(dst_address, send_params):
|
|
|
|
|
src_address = send_params.get('src_address')
|
|
|
|
|
flags = send_params.get('flags')
|
|
|
|
|
tos = send_params.get('tc')
|
|
|
|
|
ttl = send_params.get('hlim')
|
|
|
|
|
ip = sp.IP(dst=dst_address)
|
|
|
|
|
if src_address:
|
|
|
|
|
ip.src = src_address
|
|
|
|
|
if flags:
|
|
|
|
|
ip.flags = flags
|
|
|
|
|
if tos:
|
|
|
|
|
ip.tos = tos
|
|
|
|
|
if ttl:
|
|
|
|
|
ip.ttl = ttl
|
|
|
|
|
return ip
|
|
|
|
|
|
|
|
|
|
def check_ping_request(args, packet):
|
|
|
|
|
if args.ip6:
|
|
|
|
|
return check_ping6_request(args, packet)
|
|
|
|
|
else:
|
|
|
|
|
return check_ping4_request(args, packet)
|
|
|
|
|
|
|
|
|
|
def check_ping4_request(args, packet):
|
|
|
|
|
"""
|
|
|
|
|
Verify that the packet matches what we'd have sent
|
|
|
|
|
"""
|
|
|
|
|
dst_ip = args.to[0]
|
|
|
|
|
def send_icmp_ping(dst_address, sendif, send_params):
|
|
|
|
|
send_length = send_params['length']
|
|
|
|
|
ether = sp.Ether()
|
|
|
|
|
if ':' in dst_address:
|
|
|
|
|
ip6 = prepare_ipv6(dst_address, send_params)
|
|
|
|
|
icmp = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(send_length)))
|
|
|
|
|
req = ether / ip6 / icmp
|
|
|
|
|
else:
|
|
|
|
|
ip = prepare_ipv4(dst_address, send_params)
|
|
|
|
|
icmp = sp.ICMP(type='echo-request')
|
|
|
|
|
raw = sp.raw(build_payload(send_length))
|
|
|
|
|
req = ether / ip / icmp / raw
|
|
|
|
|
sp.sendp(req, sendif, verbose=False)
|
|
|
|
|
|
|
|
|
|
ip = packet.getlayer(sp.IP)
|
|
|
|
|
if not ip:
|
|
|
|
|
return False
|
|
|
|
|
if ip.dst != dst_ip:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
icmp = packet.getlayer(sp.ICMP)
|
|
|
|
|
if not icmp:
|
|
|
|
|
return False
|
|
|
|
|
if sp.icmptypes[icmp.type] != 'echo-request':
|
|
|
|
|
return False
|
|
|
|
|
def send_tcp_syn(dst_address, sendif, send_params):
|
|
|
|
|
tcpopt_unaligned = send_params.get('tcpopt_unaligned')
|
|
|
|
|
seq = send_params.get('seq')
|
|
|
|
|
mss = send_params.get('mss')
|
|
|
|
|
ether = sp.Ether()
|
|
|
|
|
opts=[('Timestamp', (1, 1)), ('MSS', mss if mss else 1280)]
|
|
|
|
|
if tcpopt_unaligned:
|
|
|
|
|
opts = [('NOP', 0 )] + opts
|
|
|
|
|
if ':' in dst_address:
|
|
|
|
|
ip = prepare_ipv6(dst_address, send_params)
|
|
|
|
|
else:
|
|
|
|
|
ip = prepare_ipv4(dst_address, send_params)
|
|
|
|
|
tcp = sp.TCP(dport=666, flags='S', options=opts, seq=seq)
|
|
|
|
|
req = ether / ip / tcp
|
|
|
|
|
sp.sendp(req, iface=sendif, verbose=False)
|
|
|
|
|
|
|
|
|
|
raw = packet.getlayer(sp.Raw)
|
|
|
|
|
if not raw:
|
|
|
|
|
return False
|
|
|
|
|
if raw.load != PAYLOAD_MAGIC:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Wait to check expectations until we've established this is the packet we
|
|
|
|
|
# sent.
|
|
|
|
|
if args.expect_tos:
|
|
|
|
|
if ip.tos != int(args.expect_tos[0]):
|
|
|
|
|
print("Unexpected ToS value %d, expected %d" \
|
|
|
|
|
% (ip.tos, int(args.expect_tos[0])))
|
|
|
|
|
return False
|
|
|
|
|
def send_ping(dst_address, sendif, ping_type, send_params):
|
|
|
|
|
if ping_type == 'icmp':
|
|
|
|
|
send_icmp_ping(dst_address, sendif, send_params)
|
|
|
|
|
elif ping_type == 'tcpsyn':
|
|
|
|
|
send_tcp_syn(dst_address, sendif, send_params)
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unspported ping type')
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def check_ping6_request(args, packet):
|
|
|
|
|
"""
|
|
|
|
|
Verify that the packet matches what we'd have sent
|
|
|
|
|
"""
|
|
|
|
|
dst_ip = args.to[0]
|
|
|
|
|
def check_ipv4(expect_params, packet):
|
|
|
|
|
src_address = expect_params.get('src_address')
|
|
|
|
|
dst_address = expect_params.get('dst_address')
|
|
|
|
|
flags = expect_params.get('flags')
|
|
|
|
|
tos = expect_params.get('tc')
|
|
|
|
|
ttl = expect_params.get('hlim')
|
|
|
|
|
ip = packet.getlayer(sp.IP)
|
|
|
|
|
if not ip:
|
|
|
|
|
LOGGER.debug('Packet is not IPv4!')
|
|
|
|
|
return False
|
|
|
|
|
if src_address and ip.src != src_address:
|
|
|
|
|
LOGGER.debug('Source IPv4 address does not match!')
|
|
|
|
|
return False
|
|
|
|
|
if dst_address and ip.dst != dst_address:
|
|
|
|
|
LOGGER.debug('Destination IPv4 address does not match!')
|
|
|
|
|
return False
|
|
|
|
|
chksum = ip.chksum
|
|
|
|
|
ip.chksum = None
|
|
|
|
|
new_chksum = sp.IP(sp.raw(ip)).chksum
|
|
|
|
|
if chksum != new_chksum:
|
|
|
|
|
LOGGER.debug(f'Expected IP checksum {new_chksum} but found {chksum}')
|
|
|
|
|
return False
|
|
|
|
|
if flags and ip.flags != flags:
|
|
|
|
|
LOGGER.debug(f'Wrong IP flags value {ip.flags}, expected {flags}')
|
|
|
|
|
return False
|
|
|
|
|
if tos and ip.tos != tos:
|
|
|
|
|
LOGGER.debug(f'Wrong ToS value {ip.tos}, expected {tos}')
|
|
|
|
|
return False
|
|
|
|
|
if ttl and ip.ttl != ttl:
|
|
|
|
|
LOGGER.debug(f'Wrong TTL value {ip.ttl}, expected {ttl}')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
ip = packet.getlayer(sp.IPv6)
|
|
|
|
|
if not ip:
|
|
|
|
|
return False
|
|
|
|
|
if ip.dst != dst_ip:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
icmp = packet.getlayer(sp.ICMPv6EchoRequest)
|
|
|
|
|
if not icmp:
|
|
|
|
|
return False
|
|
|
|
|
if icmp.data != PAYLOAD_MAGIC:
|
|
|
|
|
return False
|
|
|
|
|
def check_ipv6(expect_params, packet):
|
|
|
|
|
src_address = expect_params.get('src_address')
|
|
|
|
|
dst_address = expect_params.get('dst_address')
|
|
|
|
|
flags = expect_params.get('flags')
|
|
|
|
|
hlim = expect_params.get('hlim')
|
|
|
|
|
tc = expect_params.get('tc')
|
|
|
|
|
ip6 = packet.getlayer(sp.IPv6)
|
|
|
|
|
if not ip6:
|
|
|
|
|
LOGGER.debug('Packet is not IPv6!')
|
|
|
|
|
return False
|
|
|
|
|
if src_address and ip6.src != src_address:
|
|
|
|
|
LOGGER.debug('Source IPv6 address does not match!')
|
|
|
|
|
return False
|
|
|
|
|
if dst_address and ip6.dst != dst_address:
|
|
|
|
|
LOGGER.debug('Destination IPv6 address does not match!')
|
|
|
|
|
return False
|
|
|
|
|
# IPv6 has no IP-level checksum.
|
|
|
|
|
if flags:
|
|
|
|
|
raise Exception("There's no fragmentation flags in IPv6")
|
|
|
|
|
if hlim and ip6.hlim != hlim:
|
|
|
|
|
LOGGER.debug(f'Wrong Hop Limit value {ip6.hlim}, expected {hlim}')
|
|
|
|
|
return False
|
|
|
|
|
if tc and ip6.tc != tc:
|
|
|
|
|
LOGGER.debug(f'Wrong TC value {ip6.tc}, expected {tc}')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
# Wait to check expectations until we've established this is the packet we
|
|
|
|
|
# sent.
|
|
|
|
|
if args.expect_tc:
|
|
|
|
|
if ip.tc != int(args.expect_tc[0]):
|
|
|
|
|
print("Unexpected traffic class value %d, expected %d" \
|
|
|
|
|
% (ip.tc, int(args.expect_tc[0])))
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
def check_ping_4(expect_params, packet):
|
|
|
|
|
expect_length = expect_params['length']
|
|
|
|
|
if not check_ipv4(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
icmp = packet.getlayer(sp.ICMP)
|
|
|
|
|
if not icmp:
|
|
|
|
|
LOGGER.debug('Packet is not IPv4 ICMP!')
|
|
|
|
|
return False
|
|
|
|
|
raw = packet.getlayer(sp.Raw)
|
|
|
|
|
if not raw:
|
|
|
|
|
LOGGER.debug('Packet contains no payload!')
|
|
|
|
|
return False
|
|
|
|
|
if raw.load != build_payload(expect_length):
|
|
|
|
|
LOGGER.debug('Payload magic does not match!')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def check_ping_reply(args, packet):
|
|
|
|
|
if args.ip6:
|
|
|
|
|
return check_ping6_reply(args, packet)
|
|
|
|
|
else:
|
|
|
|
|
return check_ping4_reply(args, packet)
|
|
|
|
|
|
|
|
|
|
def check_ping4_reply(args, packet):
|
|
|
|
|
"""
|
|
|
|
|
Check that this is a reply to the ping request we sent
|
|
|
|
|
"""
|
|
|
|
|
dst_ip = args.to[0]
|
|
|
|
|
def check_ping_request_4(expect_params, packet):
|
|
|
|
|
if not check_ping_4(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
icmp = packet.getlayer(sp.ICMP)
|
|
|
|
|
if sp.icmptypes[icmp.type] != 'echo-request':
|
|
|
|
|
LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
ip = packet.getlayer(sp.IP)
|
|
|
|
|
if not ip:
|
|
|
|
|
return False
|
|
|
|
|
if ip.src != dst_ip:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
icmp = packet.getlayer(sp.ICMP)
|
|
|
|
|
if not icmp:
|
|
|
|
|
return False
|
|
|
|
|
if sp.icmptypes[icmp.type] != 'echo-reply':
|
|
|
|
|
return False
|
|
|
|
|
def check_ping_reply_4(expect_params, packet):
|
|
|
|
|
if not check_ping_4(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
icmp = packet.getlayer(sp.ICMP)
|
|
|
|
|
if sp.icmptypes[icmp.type] != 'echo-reply':
|
|
|
|
|
LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
raw = packet.getlayer(sp.Raw)
|
|
|
|
|
if not raw:
|
|
|
|
|
return False
|
|
|
|
|
if raw.load != PAYLOAD_MAGIC:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
if args.expect_tos:
|
|
|
|
|
if ip.tos != int(args.expect_tos[0]):
|
|
|
|
|
print("Unexpected ToS value %d, expected %d" \
|
|
|
|
|
% (ip.tos, int(args.expect_tos[0])))
|
|
|
|
|
return False
|
|
|
|
|
def check_ping_request_6(expect_params, packet):
|
|
|
|
|
expect_length = expect_params['length']
|
|
|
|
|
if not check_ipv6(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
icmp = packet.getlayer(sp.ICMPv6EchoRequest)
|
|
|
|
|
if not icmp:
|
|
|
|
|
LOGGER.debug('Packet is not IPv6 ICMP Echo Request!')
|
|
|
|
|
return False
|
|
|
|
|
if icmp.data != build_payload(expect_length):
|
|
|
|
|
LOGGER.debug('Payload magic does not match!')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def check_ping6_reply(args, packet):
|
|
|
|
|
"""
|
|
|
|
|
Check that this is a reply to the ping request we sent
|
|
|
|
|
"""
|
|
|
|
|
dst_ip = args.to[0]
|
|
|
|
|
def check_ping_reply_6(expect_params, packet):
|
|
|
|
|
expect_length = expect_params['length']
|
|
|
|
|
if not check_ipv6(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
icmp = packet.getlayer(sp.ICMPv6EchoReply)
|
|
|
|
|
if not icmp:
|
|
|
|
|
LOGGER.debug('Packet is not IPv6 ICMP Echo Reply!')
|
|
|
|
|
return False
|
|
|
|
|
if icmp.data != build_payload(expect_length):
|
|
|
|
|
LOGGER.debug('Payload magic does not match!')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
ip = packet.getlayer(sp.IPv6)
|
|
|
|
|
if not ip:
|
|
|
|
|
return False
|
|
|
|
|
if ip.src != dst_ip:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
icmp = packet.getlayer(sp.ICMPv6EchoReply)
|
|
|
|
|
if not icmp:
|
|
|
|
|
print("No echo reply!")
|
|
|
|
|
return False
|
|
|
|
|
def check_ping_request(expect_params, packet):
|
|
|
|
|
src_address = expect_params.get('src_address')
|
|
|
|
|
dst_address = expect_params.get('dst_address')
|
|
|
|
|
if not (src_address or dst_address):
|
|
|
|
|
raise Exception('Source or destination address must be given to match the ping request!')
|
|
|
|
|
if (
|
|
|
|
|
(src_address and ':' in src_address) or
|
|
|
|
|
(dst_address and ':' in dst_address)
|
|
|
|
|
):
|
|
|
|
|
return check_ping_request_6(expect_params, packet)
|
|
|
|
|
else:
|
|
|
|
|
return check_ping_request_4(expect_params, packet)
|
|
|
|
|
|
|
|
|
|
if icmp.data != PAYLOAD_MAGIC:
|
|
|
|
|
print("data mismatch")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
if args.expect_tc:
|
|
|
|
|
if ip.tc != int(args.expect_tc[0]):
|
|
|
|
|
print("Unexpected traffic class value %d, expected %d" \
|
|
|
|
|
% (ip.tc, int(args.expect_tc[0])))
|
|
|
|
|
return False
|
|
|
|
|
def check_ping_reply(expect_params, packet):
|
|
|
|
|
src_address = expect_params.get('src_address')
|
|
|
|
|
dst_address = expect_params.get('dst_address')
|
|
|
|
|
if not (src_address or dst_address):
|
|
|
|
|
raise Exception('Source or destination address must be given to match the ping reply!')
|
|
|
|
|
if (
|
|
|
|
|
(src_address and ':' in src_address) or
|
|
|
|
|
(dst_address and ':' in dst_address)
|
|
|
|
|
):
|
|
|
|
|
return check_ping_reply_6(expect_params, packet)
|
|
|
|
|
else:
|
|
|
|
|
return check_ping_reply_4(expect_params, packet)
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def ping(send_if, dst_ip, args):
|
|
|
|
|
ether = sp.Ether()
|
|
|
|
|
ip = sp.IP(dst=dst_ip)
|
|
|
|
|
icmp = sp.ICMP(type='echo-request')
|
|
|
|
|
raw = sp.raw(PAYLOAD_MAGIC)
|
|
|
|
|
def check_tcp(expect_params, packet):
|
|
|
|
|
tcp_flags = expect_params.get('tcp_flags')
|
|
|
|
|
mss = expect_params.get('mss')
|
|
|
|
|
seq = expect_params.get('seq')
|
|
|
|
|
tcp = packet.getlayer(sp.TCP)
|
|
|
|
|
if not tcp:
|
|
|
|
|
LOGGER.debug('Packet is not TCP!')
|
|
|
|
|
return False
|
|
|
|
|
chksum = tcp.chksum
|
|
|
|
|
tcp.chksum = None
|
|
|
|
|
newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
|
|
|
|
|
new_chksum = newpacket[sp.TCP].chksum
|
|
|
|
|
if chksum != new_chksum:
|
|
|
|
|
LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
|
|
|
|
|
return False
|
|
|
|
|
if tcp_flags and tcp.flags != tcp_flags:
|
|
|
|
|
LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
|
|
|
|
|
return False
|
|
|
|
|
if seq:
|
|
|
|
|
if tcp_flags == 'S':
|
|
|
|
|
tcp_seq = tcp.seq
|
|
|
|
|
elif tcp_flags == 'SA':
|
|
|
|
|
tcp_seq = tcp.ack - 1
|
|
|
|
|
if seq != tcp_seq:
|
|
|
|
|
LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
|
|
|
|
|
return False
|
|
|
|
|
if mss:
|
|
|
|
|
for option in tcp.options:
|
|
|
|
|
if option[0] == 'MSS':
|
|
|
|
|
if option[1] != mss:
|
|
|
|
|
LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
if args.send_tos:
|
|
|
|
|
ip.tos = int(args.send_tos[0])
|
|
|
|
|
|
|
|
|
|
if args.fromaddr:
|
|
|
|
|
ip.src = args.fromaddr[0]
|
|
|
|
|
def check_tcp_syn_request_4(expect_params, packet):
|
|
|
|
|
if not check_ipv4(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
req = ether / ip / icmp / raw
|
|
|
|
|
sp.sendp(req, iface=send_if, verbose=False)
|
|
|
|
|
|
|
|
|
|
def ping6(send_if, dst_ip, args):
|
|
|
|
|
ether = sp.Ether()
|
|
|
|
|
ip6 = sp.IPv6(dst=dst_ip)
|
|
|
|
|
icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))
|
|
|
|
|
def check_tcp_syn_reply_4(expect_params, packet):
|
|
|
|
|
if not check_ipv4(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
if args.send_tc:
|
|
|
|
|
ip6.tc = int(args.send_tc[0])
|
|
|
|
|
|
|
|
|
|
if args.fromaddr:
|
|
|
|
|
ip6.src = args.fromaddr[0]
|
|
|
|
|
def check_tcp_syn_request_6(expect_params, packet):
|
|
|
|
|
if not check_ipv6(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
if not check_tcp(expect_params | {'tcp_flags': 'S'}, packet):
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
req = ether / ip6 / icmp
|
|
|
|
|
sp.sendp(req, iface=send_if, verbose=False)
|
|
|
|
|
|
|
|
|
|
def check_tcpsyn(args, packet):
|
|
|
|
|
dst_ip = args.to[0]
|
|
|
|
|
def check_tcp_syn_reply_6(expect_params, packet):
|
|
|
|
|
if not check_ipv6(expect_params, packet):
|
|
|
|
|
return False
|
|
|
|
|
if not check_tcp(expect_params | {'tcp_flags': 'SA'}, packet):
|
|
|
|
|
return False
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
ip = packet.getlayer(sp.IP)
|
|
|
|
|
if not ip:
|
|
|
|
|
return False
|
|
|
|
|
if ip.dst != dst_ip:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
tcp = packet.getlayer(sp.TCP)
|
|
|
|
|
if not tcp:
|
|
|
|
|
return False
|
|
|
|
|
def check_tcp_syn_request(expect_params, packet):
|
|
|
|
|
src_address = expect_params.get('src_address')
|
|
|
|
|
dst_address = expect_params.get('dst_address')
|
|
|
|
|
if not (src_address or dst_address):
|
|
|
|
|
raise Exception('Source or destination address must be given to match the tcp syn request!')
|
|
|
|
|
if (
|
|
|
|
|
(src_address and ':' in src_address) or
|
|
|
|
|
(dst_address and ':' in dst_address)
|
|
|
|
|
):
|
|
|
|
|
return check_tcp_syn_request_6(expect_params, packet)
|
|
|
|
|
else:
|
|
|
|
|
return check_tcp_syn_request_4(expect_params, packet)
|
|
|
|
|
|
|
|
|
|
# Verify IP checksum
|
|
|
|
|
chksum = ip.chksum
|
|
|
|
|
ip.chksum = None
|
|
|
|
|
new_chksum = sp.IP(sp.raw(ip)).chksum
|
|
|
|
|
if chksum != new_chksum:
|
|
|
|
|
print("Expected IP checksum %x but found %x\n" % (new_cshkum, chksum))
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Verify TCP checksum
|
|
|
|
|
chksum = tcp.chksum
|
|
|
|
|
packet_raw = sp.raw(packet)
|
|
|
|
|
tcp.chksum = None
|
|
|
|
|
newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
|
|
|
|
|
new_chksum = newpacket[sp.TCP].chksum
|
|
|
|
|
if chksum != new_chksum:
|
|
|
|
|
print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
|
|
|
|
|
return False
|
|
|
|
|
def check_tcp_syn_reply(expect_params, packet):
|
|
|
|
|
src_address = expect_params.get('src_address')
|
|
|
|
|
dst_address = expect_params.get('dst_address')
|
|
|
|
|
if not (src_address or dst_address):
|
|
|
|
|
raise Exception('Source or destination address must be given to match the tcp syn reply!')
|
|
|
|
|
if (
|
|
|
|
|
(src_address and ':' in src_address) or
|
|
|
|
|
(dst_address and ':' in dst_address)
|
|
|
|
|
):
|
|
|
|
|
return check_tcp_syn_reply_6(expect_params, packet)
|
|
|
|
|
else:
|
|
|
|
|
return check_tcp_syn_reply_4(expect_params, packet)
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def tcpsyn(send_if, dst_ip, args):
|
|
|
|
|
opts=[('Timestamp', (1, 1)), ('MSS', 1280)]
|
|
|
|
|
def setup_sniffer(recvif, ping_type, sniff_type, expect_params):
|
|
|
|
|
if ping_type == 'icmp' and sniff_type == 'request':
|
|
|
|
|
checkfn = check_ping_request
|
|
|
|
|
elif ping_type == 'icmp' and sniff_type == 'reply':
|
|
|
|
|
checkfn = check_ping_reply
|
|
|
|
|
elif ping_type == 'tcpsyn' and sniff_type == 'request':
|
|
|
|
|
checkfn = check_tcp_syn_request
|
|
|
|
|
elif ping_type == 'tcpsyn' and sniff_type == 'reply':
|
|
|
|
|
checkfn = check_tcp_syn_reply
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Unspported ping or sniff type')
|
|
|
|
|
|
|
|
|
|
if args.tcpopt_unaligned:
|
|
|
|
|
opts = [('NOP', 0 )] + opts
|
|
|
|
|
return Sniffer(expect_params, checkfn, recvif)
|
|
|
|
|
|
|
|
|
|
ether = sp.Ether()
|
|
|
|
|
ip = sp.IP(dst=dst_ip)
|
|
|
|
|
tcp = sp.TCP(dport=666, flags='S', options=opts)
|
|
|
|
|
|
|
|
|
|
req = ether / ip / tcp
|
|
|
|
|
sp.sendp(req, iface=send_if, verbose=False)
|
|
|
|
|
def parse_args():
|
|
|
|
|
parser = argparse.ArgumentParser("pft_ping.py",
|
|
|
|
|
description="Ping test tool")
|
|
|
|
|
|
|
|
|
|
# Parameters of sent ping request
|
|
|
|
|
parser.add_argument('--sendif', nargs=1,
|
|
|
|
|
required=True,
|
|
|
|
|
help='The interface through which the packet(s) will be sent')
|
|
|
|
|
parser.add_argument('--to', nargs=1,
|
|
|
|
|
required=True,
|
|
|
|
|
help='The destination IP address for the ping request')
|
|
|
|
|
parser.add_argument('--ping-type',
|
|
|
|
|
choices=('icmp', 'tcpsyn'),
|
|
|
|
|
help='Type of ping: ICMP (default) or TCP SYN',
|
|
|
|
|
default='icmp')
|
|
|
|
|
parser.add_argument('--fromaddr', nargs=1,
|
|
|
|
|
help='The source IP address for the ping request')
|
|
|
|
|
|
|
|
|
|
# Where to look for packets to analyze.
|
|
|
|
|
# The '+' format is ugly as it mixes positional with optional syntax.
|
|
|
|
|
# But we have no positional parameters so I guess it's fine to use it.
|
|
|
|
|
parser.add_argument('--recvif', nargs='+',
|
|
|
|
|
help='The interfaces on which to expect the ping request')
|
|
|
|
|
parser.add_argument('--replyif', nargs='+',
|
|
|
|
|
help='The interfaces which to expect the ping response')
|
|
|
|
|
|
|
|
|
|
# Packet settings
|
|
|
|
|
parser_send = parser.add_argument_group('Values set in transmitted packets')
|
|
|
|
|
parser_send.add_argument('--send-flags', nargs=1, type=str,
|
|
|
|
|
help='IPv4 fragmentation flags')
|
|
|
|
|
parser_send.add_argument('--send-hlim', nargs=1, type=int,
|
|
|
|
|
help='IPv6 Hop Limit or IPv4 Time To Live')
|
|
|
|
|
parser_send.add_argument('--send-mss', nargs=1, type=int,
|
|
|
|
|
help='TCP Maximum Segment Size')
|
|
|
|
|
parser_send.add_argument('--send-seq', nargs=1, type=int,
|
|
|
|
|
help='TCP sequence number')
|
|
|
|
|
parser_send.add_argument('--send-length', nargs=1, type=int,
|
|
|
|
|
default=[len(PAYLOAD_MAGIC)], help='ICMP Echo Request payload size')
|
|
|
|
|
parser_send.add_argument('--send-tc', nargs=1, type=int,
|
|
|
|
|
help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
|
|
|
|
|
parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
|
|
|
|
|
help='Include unaligned TCP options')
|
|
|
|
|
|
|
|
|
|
# Expectations
|
|
|
|
|
parser_expect = parser.add_argument_group('Values expected in sniffed packets')
|
|
|
|
|
parser_expect.add_argument('--expect-flags', nargs=1, type=str,
|
|
|
|
|
help='IPv4 fragmentation flags')
|
|
|
|
|
parser_expect.add_argument('--expect-hlim', nargs=1, type=int,
|
|
|
|
|
help='IPv6 Hop Limit or IPv4 Time To Live')
|
|
|
|
|
parser_expect.add_argument('--expect-mss', nargs=1, type=int,
|
|
|
|
|
help='TCP Maximum Segment Size')
|
|
|
|
|
parser_send.add_argument('--expect-seq', nargs=1, type=int,
|
|
|
|
|
help='TCP sequence number')
|
|
|
|
|
parser_expect.add_argument('--expect-tc', nargs=1, type=int,
|
|
|
|
|
help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
|
|
|
|
|
|
|
|
|
|
parser.add_argument('-v', '--verbose', action='store_true',
|
|
|
|
|
help=('Enable verbose logging. Apart of potentially useful information '
|
|
|
|
|
'you might see warnings from parsing packets like NDP or other '
|
|
|
|
|
'packets not related to the test being run. Use only when '
|
|
|
|
|
'developing because real tests expect empty stderr and stdout.'))
|
|
|
|
|
|
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
parser = argparse.ArgumentParser("pft_ping.py",
|
|
|
|
|
description="Ping test tool")
|
|
|
|
|
parser.add_argument('--sendif', nargs=1,
|
|
|
|
|
required=True,
|
|
|
|
|
help='The interface through which the packet(s) will be sent')
|
|
|
|
|
parser.add_argument('--recvif', nargs=1,
|
|
|
|
|
help='The interface on which to expect the ICMP echo request')
|
|
|
|
|
parser.add_argument('--replyif', nargs=1,
|
|
|
|
|
help='The interface on which to expect the ICMP echo response')
|
|
|
|
|
parser.add_argument('--checkdup', nargs=1,
|
|
|
|
|
help='The interface on which to expect the duplicated ICMP packets')
|
|
|
|
|
parser.add_argument('--ip6', action='store_true',
|
|
|
|
|
help='Use IPv6')
|
|
|
|
|
parser.add_argument('--to', nargs=1,
|
|
|
|
|
required=True,
|
|
|
|
|
help='The destination IP address for the ICMP echo request')
|
|
|
|
|
parser.add_argument('--fromaddr', nargs=1,
|
|
|
|
|
help='The source IP address for the ICMP echo request')
|
|
|
|
|
args = parse_args()
|
|
|
|
|
|
|
|
|
|
# TCP options
|
|
|
|
|
parser.add_argument('--tcpsyn', action='store_true',
|
|
|
|
|
help='Send a TCP SYN packet')
|
|
|
|
|
parser.add_argument('--tcpopt_unaligned', action='store_true',
|
|
|
|
|
help='Include unaligned TCP options')
|
|
|
|
|
if args.verbose:
|
|
|
|
|
LOGGER.setLevel(logging.DEBUG)
|
|
|
|
|
|
|
|
|
|
# Packet settings
|
|
|
|
|
parser.add_argument('--send-tos', nargs=1,
|
|
|
|
|
help='Set the ToS value for the transmitted packet')
|
|
|
|
|
parser.add_argument('--send-tc', nargs=1,
|
|
|
|
|
help='Set the traffic class value for the transmitted packet')
|
|
|
|
|
# Dig out real values of program arguments
|
|
|
|
|
send_if = args.sendif[0]
|
|
|
|
|
reply_ifs = args.replyif
|
|
|
|
|
recv_ifs = args.recvif
|
|
|
|
|
dst_address = args.to[0]
|
|
|
|
|
|
|
|
|
|
# Expectations
|
|
|
|
|
parser.add_argument('--expect-tos', nargs=1,
|
|
|
|
|
help='The expected ToS value in the received packet')
|
|
|
|
|
parser.add_argument('--expect-tc', nargs=1,
|
|
|
|
|
help='The expected traffic class value in the received packet')
|
|
|
|
|
# Standardize parameters which have nargs=1.
|
|
|
|
|
send_params = {}
|
|
|
|
|
expect_params = {}
|
|
|
|
|
for param_name in ('flags', 'hlim', 'length', 'mss', 'seq', 'tc'):
|
|
|
|
|
param_arg = vars(args).get(f'send_{param_name}')
|
|
|
|
|
send_params[param_name] = param_arg[0] if param_arg else None
|
|
|
|
|
param_arg = vars(args).get(f'expect_{param_name}')
|
|
|
|
|
expect_params[param_name] = param_arg[0] if param_arg else None
|
|
|
|
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
expect_params['length'] = send_params['length']
|
|
|
|
|
send_params['tcpopt_unaligned'] = args.send_tcpopt_unaligned
|
|
|
|
|
send_params['src_address'] = args.fromaddr[0] if args.fromaddr else None
|
|
|
|
|
|
|
|
|
|
# We may not have a default route. Tell scapy where to start looking for routes
|
|
|
|
|
sp.conf.iface6 = args.sendif[0]
|
|
|
|
|
# We may not have a default route. Tell scapy where to start looking for routes
|
|
|
|
|
sp.conf.iface6 = send_if
|
|
|
|
|
|
|
|
|
|
sniffer = None
|
|
|
|
|
if not args.recvif is None:
|
|
|
|
|
checkfn=check_ping_request
|
|
|
|
|
if args.tcpsyn:
|
|
|
|
|
checkfn=check_tcpsyn
|
|
|
|
|
# Configuration sanity checking.
|
|
|
|
|
if not (reply_ifs or recv_ifs):
|
|
|
|
|
raise Exception('With no reply or recv interface specified no traffic '
|
|
|
|
|
'can be sniffed and verified!'
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
sniffer = Sniffer(args, checkfn, args.recvif[0])
|
|
|
|
|
sniffers = []
|
|
|
|
|
|
|
|
|
|
replysniffer = None
|
|
|
|
|
if not args.replyif is None:
|
|
|
|
|
checkfn=check_ping_reply
|
|
|
|
|
replysniffer = Sniffer(args, checkfn, args.replyif[0])
|
|
|
|
|
if recv_ifs:
|
|
|
|
|
sniffer_params = copy(expect_params)
|
|
|
|
|
sniffer_params['src_address'] = None
|
|
|
|
|
sniffer_params['dst_address'] = dst_address
|
|
|
|
|
for iface in recv_ifs:
|
|
|
|
|
LOGGER.debug(f'Installing receive sniffer on {iface}')
|
|
|
|
|
sniffers.append(
|
|
|
|
|
setup_sniffer(iface, args.ping_type, 'request', sniffer_params,
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
dupsniffer = None
|
|
|
|
|
if args.checkdup is not None:
|
|
|
|
|
dupsniffer = Sniffer(args, check_dup, args.checkdup[0])
|
|
|
|
|
if reply_ifs:
|
|
|
|
|
sniffer_params = copy(expect_params)
|
|
|
|
|
sniffer_params['src_address'] = dst_address
|
|
|
|
|
sniffer_params['dst_address'] = None
|
|
|
|
|
for iface in reply_ifs:
|
|
|
|
|
LOGGER.debug(f'Installing reply sniffer on {iface}')
|
|
|
|
|
sniffers.append(
|
|
|
|
|
setup_sniffer(iface, args.ping_type, 'reply', sniffer_params,
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
if args.tcpsyn:
|
|
|
|
|
tcpsyn(args.sendif[0], args.to[0], args)
|
|
|
|
|
else:
|
|
|
|
|
if args.ip6:
|
|
|
|
|
ping6(args.sendif[0], args.to[0], args)
|
|
|
|
|
else:
|
|
|
|
|
ping(args.sendif[0], args.to[0], args)
|
|
|
|
|
LOGGER.debug(f'Installed {len(sniffers)} sniffers')
|
|
|
|
|
|
|
|
|
|
if dupsniffer:
|
|
|
|
|
dupsniffer.join()
|
|
|
|
|
if dup_found != 1:
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
send_ping(dst_address, send_if, args.ping_type, send_params)
|
|
|
|
|
|
|
|
|
|
if sniffer:
|
|
|
|
|
sniffer.join()
|
|
|
|
|
err = 0
|
|
|
|
|
sniffer_num = 0
|
|
|
|
|
for sniffer in sniffers:
|
|
|
|
|
sniffer.join()
|
|
|
|
|
if sniffer.correctPackets == 1:
|
|
|
|
|
LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
|
|
|
|
|
else:
|
|
|
|
|
# Set a bit in err for each failed sniffer.
|
|
|
|
|
err |= 1<<sniffer_num
|
|
|
|
|
if sniffer.correctPackets > 1:
|
|
|
|
|
LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
|
|
|
|
|
else:
|
|
|
|
|
LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')
|
|
|
|
|
sniffer_num += 1
|
|
|
|
|
|
|
|
|
|
if sniffer.correctPackets:
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
else:
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
if replysniffer:
|
|
|
|
|
replysniffer.join()
|
|
|
|
|
|
|
|
|
|
if replysniffer.correctPackets:
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
else:
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
main()
|
|
|
|
|
sys.exit(main())
|
|
|
|
|