pf tests: Accelerate tests

Make the tests run slightly faster by having pft_ping.py end the capture
of packets as soon as it sees the expected packet, rather than
continuing to sniff.

MFC after:	2 weeks
This commit is contained in:
kp 2019-03-07 11:09:29 +00:00
parent 29e838d661
commit c225ffd5c2

View File

@ -8,26 +8,38 @@
PAYLOAD_MAGIC = 0x42c0ffee PAYLOAD_MAGIC = 0x42c0ffee
class Sniffer(threading.Thread): class Sniffer(threading.Thread):
def __init__(self, recvif): def __init__(self, args, check_function):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self._recvif = recvif self._args = args
self._recvif = args.recvif[0]
self._check_function = check_function
self.foundCorrectPacket = False
self.start() self.start()
def _checkPacket(self, packet):
ret = self._check_function(self._args, packet)
if ret:
self.foundCorrectPacket = True
return ret
def run(self): def run(self):
self.packets = sp.sniff(iface=self._recvif, timeout=3) self.packets = sp.sniff(iface=self._recvif,
stop_filter=self._checkPacket, timeout=3)
def check_ping_request(packet, dst_ip, args): def check_ping_request(args, packet):
if args.ip6: if args.ip6:
return check_ping6_request(packet, dst_ip, args) return check_ping6_request(args, packet)
else: else:
return check_ping4_request(packet, dst_ip, args) return check_ping4_request(args, packet)
def check_ping4_request(packet, dst_ip, args): def check_ping4_request(args, packet):
""" """
Verify that the packet matches what we'd have sent Verify that the packet matches what we'd have sent
""" """
dst_ip = args.to[0]
ip = packet.getlayer(sp.IP) ip = packet.getlayer(sp.IP)
if not ip: if not ip:
return False return False
@ -54,13 +66,14 @@ def check_ping4_request(packet, dst_ip, args):
% (ip.tos, args.expect_tos[0]) % (ip.tos, args.expect_tos[0])
return False return False
return True return True
def check_ping6_request(packet, dst_ip, args): def check_ping6_request(args, packet):
""" """
Verify that the packet matches what we'd have sent Verify that the packet matches what we'd have sent
""" """
dst_ip = args.to[0]
ip = packet.getlayer(sp.IPv6) ip = packet.getlayer(sp.IPv6)
if not ip: if not ip:
return False return False
@ -124,7 +137,7 @@ def main():
sniffer = None sniffer = None
if not args.recvif is None: if not args.recvif is None:
sniffer = Sniffer(args.recvif[0]) sniffer = Sniffer(args, check_ping_request)
if args.ip6: if args.ip6:
ping6(args.sendif[0], args.to[0], args) ping6(args.sendif[0], args.to[0], args)
@ -134,12 +147,10 @@ def main():
if sniffer: if sniffer:
sniffer.join() sniffer.join()
for packet in sniffer.packets: if sniffer.foundCorrectPacket:
if check_ping_request(packet, args.to[0], args): sys.exit(0)
sys.exit(0) else:
sys.exit(1)
# We did not get the packet we expected
sys.exit(1)
if __name__ == '__main__': if __name__ == '__main__':
main() main()