diff --git a/pyproject.toml b/pyproject.toml index a0e1a2b..1d692de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ +[tool.pylint.master_control] +init-hook='import sys; sys.path += ["tests"]' + [tool.pylint.messages_control] disable = [ "broad-except", @@ -24,10 +27,22 @@ max-line-length = 88 addopts = "-s" asyncio_mode = "auto" -norecursedirs = "tests/manual tests/podman-trex-extract tests/trex*" +norecursedirs = "tests/external_libs tests/manual tests/podman-trex-extract tests/trex*" +# The order of these really matters, and there's serious voodoo, it has to do +# with trex's use of it's own earlier scapy. It's not enough for utpkt to be +# before stress though, it has to be at the top (or somewhere more than just +# above stress..) who knows...; this works. testpaths = [ - "tests", + "tests/utpkt", + "tests/config", + "tests/console", + "tests/errors", + "tests/phynic", + "tests/simplenet", + "tests/verify", + # need to put trex stuff last so normal scapy is not messed up + "tests/stress", ] log_level = "INFO" diff --git a/scripts/extract-trex.sh b/scripts/extract-trex.sh index ee77686..7f78604 100755 --- a/scripts/extract-trex.sh +++ b/scripts/extract-trex.sh @@ -87,5 +87,5 @@ for symdir in trex trex_stl_lib; do set +x done set -x -ln -fs $tdir/external_libs $TESTSDIR/trex-external-libs +ln -fs $tdir/external_libs $TESTSDIR/ set +x diff --git a/tests/common/iptfs.py b/tests/common/iptfs.py index 8db0823..9a814fd 100644 --- a/tests/common/iptfs.py +++ b/tests/common/iptfs.py @@ -18,11 +18,13 @@ # with this program; see the file COPYING; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # +"IPTFS Scapy Functionality" import logging import socket import struct from functools import partial +from common.scapy import ppp from scapy.compat import orb, raw from scapy.config import conf from scapy.data import IP_PROTOS @@ -58,6 +60,8 @@ def sizeof(x): # This was way too hard to figure out. :) class AllPadField(StrLenField): + "A field for representing all-pad content" + def getfield(self, pkt, s): slen = len(s) return s[slen:], self.m2i(pkt, s[:slen]) @@ -67,6 +71,7 @@ def i2repr(self, pkt, x): class IPTFSPad(Packet): + "An IPTFS pad packet" name = "IPTFSPad" fields_desc = [XByteField("zerotype", 0), AllPadField("pad", "")] @@ -78,6 +83,7 @@ def fraglen_from(*args): class IPTFSFrag(Packet): + "An IPTFS Pcaket" __slots__ = ["fraglen"] fields_desc = [StrLenField("frag", None, length_from=fraglen_from)] @@ -94,15 +100,15 @@ def default_payload_class(self, payload): class IPTFSEndFrag(IPTFSFrag): - pass + "IPTFS packet ending a fragment" class IPTFSIPFrag(IPTFSFrag): - pass + "IPTFS packet containing a fragment" class IPTFSIPv6Frag(IPTFSFrag): - pass + "IPTFS packet containing an IPv6 fragment" def get_frag_class_and_len(data): @@ -191,6 +197,7 @@ def _iptfs_decap_pkt_with_frags(ppkt, pkts, curpkt, data): class IPTFSWithFrags(Packet): + "An IPTFS packet which handles fragments" __slots__ = ["offset", "prevpkts"] @@ -239,6 +246,7 @@ def iptfs_decap_pkt_nofrag(ppkt, pkts, curpkt, data): class IPTFS(Packet): + "An IPTFS Packet" name = "IPTFS" fields_desc = [ FlagsField("flags", 0, 8, ["r0", "r1", "r2", "r3", "r4", "ECN", "CC", "V"]), @@ -249,6 +257,7 @@ class IPTFS(Packet): class IPTFSHeader(Packet): + "An IPTFS Header" name = "IPTFSHeader" fields_desc = [ FlagsField("flags", 0, 8, ["r0", "r1", "r2", "r3", "r4", "ECN", "CC", "V"]), @@ -496,6 +505,93 @@ def gen_encrypt_pkts(sa, sw_intf, src, dst, count=1, payload_size=54): ] +def verify_encrypted(src, dst, sa, expected_count, rxs): + decrypt_pkts = [] + for rx in rxs: + # self.assert_packet_checksums_valid(rx) + assert len(rx) - len(Ether()) == rx[IP].len + dpkts = sa.decrypt(rx[IP]).packets + dpkts = [x for x in dpkts if not isinstance(x, IPTFSPad)] + decrypt_pkts += dpkts + + for decrypt_pkt in dpkts: + try: + assert decrypt_pkt.src == src + assert decrypt_pkt.dst == dst + except: + logging.debug(ppp("Unexpected packet:", rx)) + try: + logging.debug(ppp("Decrypted packet:", decrypt_pkt)) + except Exception: # pylint: disable=W0703 + pass + raise + + assert len(decrypt_pkts) == expected_count + # pkts = reassemble4(decrypt_pkts) + # for pkt in pkts: + # self.assert_packet_checksums_valid(pkt) + + +def verify_encrypted_with_frags(self, src, dst, sa, rxs, cmprxs): + dpkts_pcap = [] + oldrxs = [] + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + assert len(rx) - len(Ether()) == rx[IP].len + dpkts_pcap += sa.decrypt_iptfs_pkt(rx[IP], oldrxs) + oldrxs.append(rx) + + # logging.info("XXXYYY: decrypted packets: {}".format( + # len(dpkts_pcap))) + + # for x in dpkts_pcap: + # try: + # # ix = IPTFS(x) + # ix = x + # logging.info("XXXYYY: decrypted pkt:") + # logging.info("dump: {}".format(ix.show(dump=True))) + # except Exception as expkt: + # logging.info("XXXYYY: decrypted pkt: ex: {}".format( + # str(expkt))) + # logging.info( + # "XXXYYY: decrypted pkt: len {} dump: {}".format( + # len(x), x.show(dump=True))) + + # Join fragments into real packets and drop padding return list of + # real packets. + dpkts = decap_frag_stream(dpkts_pcap) + + for decrypt_pkt in dpkts: + # logging.info("XXXYYY: pktlen {} pkt: {}".format( + # len(decrypt_pkt), decrypt_pkt.show(dump=True))) + try: + assert decrypt_pkt.src == src + assert decrypt_pkt.dst == dst + except: + logging.debug(ppp("Unexpected packet:", decrypt_pkt)) + try: + logging.debug(ppp("Decrypted packet:", decrypt_pkt)) + except Exception: # pylint: disable=W0703 + pass + raise + + # logging.info("XXXYYY: dpkts count {} cmprxs count {}".format( + # len(dpkts), len(cmprxs))) + + assert len(dpkts) == len(cmprxs) + # pkts = reassemble4(decrypt_pkts) + # for pkt in pkts: + # self.assert_packet_checksums_valid(pkt) + + +def verify_decrypted(self, src, dst, rxs): + for rx in rxs: + assert rx[IP].src == src + assert rx[IP].dst == dst + self.assert_packet_checksums_valid(rx) + + class SecurityAssociation(ipsec.SecurityAssociation): """ This class is responsible of "encryption" and "decryption" of IPsec IPTFS packets. @@ -574,7 +670,10 @@ def encrypt_esp_raw(self, payload, seq_num=None, iv=None, esn_en=None, esn=None) esp = self.crypt_algo.encrypt( self, esp, self.crypt_key, esn_en=esn_en or self.esn_en, esn=esn or self.esn ) - self.auth_algo.sign(esp, self.auth_key, high_seq_num) + try: + self.auth_algo.sign(esp, self.auth_key, high_seq_num) + except TypeError: + self.auth_algo.sign(esp, self.auth_key) if ip_header.version == 4: ip_header.len = len(ip_header) + len(esp) @@ -608,7 +707,10 @@ def _decrypt_esp( if verify: self.check_spi(pkt) - self.auth_algo.verify(encrypted, self.auth_key, high_seq_num) + try: + self.auth_algo.verify(encrypted, self.auth_key, high_seq_num) + except TypeError: + self.auth_algo.verify(encrypted, self.auth_key) esp = self.crypt_algo.decrypt( self, diff --git a/tests/common/scapy.py b/tests/common/scapy.py index 9e6c7d5..2a3e667 100644 --- a/tests/common/scapy.py +++ b/tests/common/scapy.py @@ -21,13 +21,21 @@ "Classes and functions for using scapy in tests." import logging +import time +# from common.util import Timeout, chunkit +from common.util import get_intf_stats from scapy.arch import get_if_addr, get_if_hwaddr from scapy.config import conf from scapy.layers.inet import ICMP, IP +from scapy.layers.ipsec import ESP from scapy.layers.l2 import ARP, Ether from scapy.packet import Raw -from scapy.sendrecv import sendp, srp +from scapy.sendrecv import AsyncSniffer, sendp, srp + + +def ppp(headline, pkt): + return f"{headline}: {pkt.show(dump=True)}" class Interface: @@ -109,3 +117,188 @@ def gen_ippkts( # pylint: disable=W0221 # wrap around psize = start + (psize - end) return pkts + + +async def gen_pkts( + unet, + sa, + ping="10.0.0.1", + mtu=1500, + psize=0, + pstep=0, + pmax=0, + count=0, + wrap=False, +): + inner_ip_overhead = len(IP() / ICMP(seq=1)) + + psize = max(psize, inner_ip_overhead) + if pstep: + if pmax: + pmaxsize = max(pmax, psize) + else: + pmaxsize = mtu - sa.get_ipsec_overhead() + logging.info("setting pmaxsize to %s", pmaxsize) + + if count: + pcount = count + else: + # Walk spread one time + pcount = (pmaxsize - psize + 1 + pstep - 1) // pstep + if wrap: + pcount *= wrap + else: + pcount = count if count else 100 + pmaxsize = None + + logging.info( + "GENERATING from %s to %s count %s step %s", psize, pmaxsize, pcount, pstep + ) + opkts = gen_ippkts( + unet.tun_if.local_addr, + ping, + payload_size=psize - inner_ip_overhead, + payload_spread=pmaxsize - inner_ip_overhead if pmaxsize else pmaxsize, + inc=pstep, + count=pcount, + ) + maxsz = max(len(x) for x in opkts) + logging.info("GENERATED %s inner packets max size %s", len(opkts), maxsz) + return opkts + + +# XXX There's a few hard coded values in here that probably need cleaning up +def send_recv_esp_pkts( + osa, encpkts, iface, chunksize=30, faster=False, process_recv_pkts=None +): + del chunksize + + rxs, txs, rxerr, txerr = get_intf_stats(iface) + assert max(rxerr) == 0, f"rxerr not 0, is {max(rxerr)}" + assert max(txerr) == 0, f"txerr not 0, is {max(txerr)}" + + # def get_esp_pkts(pkts): + # rawpkts = (x.answer for x in pkts) + # pkts = [x[IP] for x in rawpkts if x.haslayer(ESP)] + # logging.info("RECEIVED %s ipsec packets", len(pkts)) + # return pkts + + def process_esp_pkts(esppkts, nchunk): + idx = 0 + pkts = [] + try: + for idx, esppkt in enumerate(esppkts): + pkts.append(osa.decrypt(esppkt)) + except Exception as error: + logging.error( + "Exception decrypt recv ESP pkts index %s chunk %s: %s\n", + idx, + nchunk, + error, + exc_info=True, + ) + raise + return pkts + + net0sniffer = AsyncSniffer(iface="net0", promisc=1, filter="icmp[0] == 0") + net0sniffer.start() + + net1sniffer = AsyncSniffer( + iface=iface, + # prn=lambda x: print("-"), + promisc=1, + # filter=f"ip proto esp and ip[((ip[0]&0x0f)<<2):4]=={osa.spi}", + filter="dst host 10.0.1.3", + ) + net1sniffer.start() + + # This sleep seems required or the sniffer misses initial packets!? + time.sleep(1) + + logging.info("SENDING %s ipsec/iptfs packets", len(encpkts)) + + outer_pkts = [] + decpkts = [] + + # Really we want to check for kvm + if faster or len(encpkts) <= 20: + sendp(encpkts, iface=iface, inter=0.001) + else: + sendp(encpkts, iface=iface, inter=0.01) + + # nchunk = 0 + # for chunk in chunkit(encpkts, chunksize): + # # logging.info("SENDING gratiutous arp on %s", tun_if.name) + # # tun_if.send_gratuitous_arp() + # logging.info( + # "sending chunk %s with %s ipsec/iptfs packets", nchunk, len(chunk) + # ) + + # nchunk += 1 + # timeout = 10 + # timeo = Timeout(timeout) + # pkts = srp( + # chunk, + # verbose=0, + # timeout=timeout, + # promisc=1, + # nofilter=1, + # iface=iface, + # inter=0.05, + # chainCC=True, # pass up ^C + # ) + # logging.info("srp returns %s", pkts) + + # _esppkts = get_esp_pkts(pkts[0]) + # outer_pkts.extend(_esppkts) + # if len(_esppkts) == 0 and timeo is not None: + # if timeo.is_expired(): + # logging.info("Ending chunking loop as no packets received (timeout)") + # raise TimeoutError() + # logging.info("Ending chunking loop as no packets received (break)") + # raise KeyboardInterrupt() + # _decpkts = process_esp_pkts(_esppkts, nchunk) + # decpkts.extend(_decpkts) + + # # If we arrive here w/o exceptions (from timeout or break) + # # let's take another second to see if we have anymore packets coming. + # timeout = 1 + # logging.info("Waiting %ss for final packets", timeout) + # pkts = sniff(timeout=timeout, promisc=1, nofilter=1, iface=iface) + # logging.info("Final sniff returns %s", pkts) + + # XXX improve this, sleep 2 seconds for things to flush + time.sleep(2) + + net0results = net0sniffer.stop() + # net0results = [] + net1results = net1sniffer.stop() + + # _esppkts = get_esp_pkts(pkts) + pkts = [x[IP] for x in net1results if x.haslayer(ESP)] + # XXX should use iface ip local addr + _esppkts = [x for x in pkts if x.src != "10.0.1.3"] + logging.info("RECEIVED %s ipsec packets", len(_esppkts)) + + outer_pkts.extend(_esppkts) + if _esppkts: + _decpkts = process_esp_pkts(_esppkts, -1) + decpkts.extend(_decpkts) + + if process_recv_pkts: + inner_pkts, other_inner_pkts = process_recv_pkts(decpkts) + else: + inner_pkts, other_inner_pkts = decpkts, [] + + nrxs, ntxs, rxerr, txerr = get_intf_stats(iface) + assert max(rxerr) == 0, f"rxerr not 0, is {max(rxerr)}" + assert max(txerr) == 0, f"txerr not 0, is {max(txerr)}" + logging.info("STATS for %s: RX %s TX %s", iface, nrxs - rxs, ntxs - txs) + + logging.info( + "DECAP %s inner ICMP replies and %s other pkts from %s ipsec pkts", + len(inner_pkts), + len(other_inner_pkts), + len(outer_pkts), + ) + return inner_pkts, outer_pkts, net0results diff --git a/tests/common/send-icmp.py b/tests/common/send-icmp.py index 027f946..d9b6549 100755 --- a/tests/common/send-icmp.py +++ b/tests/common/send-icmp.py @@ -28,7 +28,7 @@ import struct import time -from common.testutil import convert_number, line_rate_to_ip_pps +from common.util import convert_number, line_rate_to_ip_pps ICMP_ECHO = 8 ICMP_ECHO_CODE = 0 diff --git a/tests/common/send-recv.py b/tests/common/send-recv.py index 38656ae..3ddeabb 100755 --- a/tests/common/send-recv.py +++ b/tests/common/send-recv.py @@ -24,7 +24,7 @@ import logging import socket -from testutil import convert_number, get_human_readable +from common.util import convert_number, get_human_readable BLOCKSIZE = 2**20 diff --git a/tests/common/testutil.py b/tests/common/trexutil.py similarity index 71% rename from tests/common/testutil.py rename to tests/common/trexutil.py index 07b7d11..4e373a9 100644 --- a/tests/common/testutil.py +++ b/tests/common/trexutil.py @@ -18,13 +18,15 @@ # with this program; see the file COPYING; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # +"Utility functions for use with trex" import asyncio import datetime import logging import pprint import time from dataclasses import dataclass -from subprocess import check_output + +from common import util logger = logging.getLogger(__name__) @@ -59,233 +61,6 @@ class Args: user_packet_size: int = 0 -class Timeout: - """An object to passively monitor for timeouts.""" - - def __init__(self, delta): - self.started_on = datetime.datetime.now() - self.expires_on = self.started_on + datetime.timedelta(seconds=delta) - - def elapsed(self): - elapsed = datetime.datetime.now() - self.started_on - return elapsed.total_seconds() - - def is_expired(self): - return datetime.datetime.now() > self.expires_on - - -def chunkit(lst, chunk): - for i in range(0, len(lst), chunk): - yield lst[i : i + chunk] - - -def get_intf_stats(intf): - try: - output = check_output(f"ip -s link show {intf}", shell=True, text=True).strip() - lines = output.split("\n") - rxstats = [int(x) for x in lines[3].strip().split()] - txstats = [int(x) for x in lines[5].strip().split()] - return rxstats[1], txstats[1], rxstats[2:-1], txstats[2:-1] - except Exception as error: - logging.error("Got error getting stats: %s", error) - raise - - -def get_human_readable(v): - for suffix in ["", "K", "M", "G"]: - if v < 1000.0: - return "%3.03f%s" % (v, suffix) - v /= 1000 - return "%3.1f%s" % (v, "T") - - -def convert_number(value): - """Convert a number value with a possible suffix to an integer. - - >>> convert_number("100k") == 100 * 1024 - True - >>> convert_number("100M") == 100 * 1000 * 1000 - True - >>> convert_number("100Gi") == 100 * 1024 * 1024 * 1024 - True - >>> convert_number("55") == 55 - True - """ - if value is None: - return None - rate = str(value) - base = 1000 - if rate[-1] == "i": - base = 1024 - rate = rate[:-1] - suffix = "KMGTPEZY" - index = suffix.find(rate[-1]) - if index == -1: - base = 1024 - index = suffix.lower().find(rate[-1]) - if index != -1: - rate = rate[:-1] - return int(rate) * base ** (index + 1) - - -def line_rate_to_ip_pps(l1_rate, ipmtu): - """Convert an L1 ethernet rate to number of IP packets of ipmtu size per second.""" - # Each IP packet requires 8b l1-preamble 14b l2-hdr 4b l2-crc and 12b l1-gap - # The frame not including the preamble and inter frame gap must be at least 64b - # 46b + 14 + 4 == 64 - emtu = 8 + max(64, 14 + ipmtu + 4) + 12 - return float(l1_rate) / (emtu * 8) - - -def ipsec_overhead(gcm, user_pkt_size=None, ipv6=False, udp=False): - """Get the IPSEC payload size given a target IPTFS packet size""" - # IPsec/ESP packets are aligned to 4 byte boundary. - # target_mtu = target_mtu - (target_mtu % 4) - if ipv6: - # 40 - IP header, 8 ESP Header, 2 ESP Footer - o = 40 + 8 + 2 - else: - # 20 - IP header, 8 ESP Header, 2 ESP Footer - o = 20 + 8 + 2 - if user_pkt_size: - # User + Footer must align to 4 byte boundary - over = (user_pkt_size + 2) % 4 - if over: - o += 4 - over - if udp: - o += 8 - if gcm: - o += 8 + 16 # IV + ICV = 1440 - return o - - -def iptfs_payload_size(target_mtu, gcm, cc=False, ipv6=False, udp=False): - """Get the IPTFS payload size given a target IPTFS packet size""" - # IPsec/ESP packets are aligned to 4 byte boundary. - # target_mtu = target_mtu - (target_mtu % 4) - assert target_mtu % 4 == 0 - iptfs_hdr_size = 4 if not cc else 24 - return target_mtu - ipsec_overhead(gcm, None, ipv6, udp) - iptfs_hdr_size - - -def iptfs_payload_rate(l1_rate, target_mtu, gcm, cc=False, ipv6=False, udp=False): - ps = iptfs_payload_size(target_mtu, gcm, cc, ipv6, udp) - return line_rate_to_ip_pps(l1_rate, target_mtu) * ps - - -def line_rate_to_iptfs_encap_pps( - l1_rate, ipmtu, iptfs_mtu, gcm, cc=False, ipv6=False, udp=False -): - """Convert an l1 line rate to number of inner IP packets per second for a given - IP MTU using (or not) GCM encryption - """ - rate = iptfs_payload_rate(l1_rate, iptfs_mtu, gcm, cc, ipv6, udp) - input_pps = rate / ipmtu - return input_pps - # XXX this max should be based on the *physical* line not on the rate we've - # chosen. - # max_pps = line_rate_to_ip_pps(l1_rate, ipmtu) - # return min(max_pps, input_pps) - - -def line_rate_to_etfs_encap_pps( - tunnel_line_rate, - uf_ip_size, # size of IP frame in user packets - tunnel_etfs_mtu, # size of ethernet payload (== etfs encap framesize) - macsec_enabled, -): # true/false - del macsec_enabled - - uf_eth_size = uf_ip_size + 14 - - # - # Calculate ratio of user frames to tunnel frames. In ETFS - # this number is not exact because fragments have a six-octet - # header whereas full-frames have a two-octet header, but we - # should be able to get reasonably close. - # - # Consider two cases (maybe they will reduce to the same formula): - # - # 1. Small user frames. Multiple full user frames fit into a - # single tunnel frame. - # - # A full user frame takes up 2 + uf_eth_size, so the number - # of full frames that fit is: - # - # NF = int(tunnel_etfs_mtu / (2 + uf_eth_size)) - # - # The remainder is likely to be filled with two fragments, one - # at the head of the tunnel frame and one at the tail. We assume - # a uniform distribution of head fragment lengths (i.e., there is - # an arbitrary shift of the contents with respect to the tunnel - # frame). - # - # The number of actual full user frames in a tunnel packet will - # be either NF or NF-1, with a probability depending almost - # linearly on the size of the remainder. We will simplify for - # now and assume that if the remainder is greater than half the - # size of (UF+2), the actual number of full frames is NF, otherwise - # it will be NF-1. - # - # The number of fragments will usually be two. I think the edge - # cases are improbable enough to ignore for this calculation. - # - # 2. Large user frames. Tunnel frames contain either one or two - # fragments. I think this case applies any time NF is 0. - # - - NF = tunnel_etfs_mtu // (2 + uf_eth_size) - - if NF > 0: - # remainder = tunnel_etfs_mtu - (NF * (2 + uf_eth_size)) - # if remainder > (2 + uf_eth_size) / 2: - # full_frame_count = NF - # else: - # full_frame_count = NF - 1 - - full_frame_headers_per_tunnel_frame = NF - fragment_headers_per_tunnel_frame = 2 - - else: - full_frame_headers_per_tunnel_frame = 0 - fragment_headers_per_tunnel_frame = 2 - - payload = ( - tunnel_etfs_mtu - - (2 * full_frame_headers_per_tunnel_frame) - - (6 * fragment_headers_per_tunnel_frame) - ) - - tunnel_packet_rate = line_rate_to_ip_pps(tunnel_line_rate, tunnel_etfs_mtu - 14) - - tunnel_payload_byte_rate = tunnel_packet_rate * payload - - payload_pps = tunnel_payload_byte_rate / uf_eth_size - - return payload_pps - - -def line_rate_to_pps(args, l1_rate, ipmtu, iptfs_mtu): - """Convert an l1 line rate to number of packets per second for a given - IP MTU using (or not) GCM encryption - """ - - gcm = not args.null - if args.forward_only: - pps = line_rate_to_ip_pps(l1_rate, ipmtu) - elif args.dont_use_ipsec: - ip_ohead = 20 if not args.encap_ipv6 else 40 - pps = line_rate_to_ip_pps(l1_rate, ipmtu + ip_ohead) - elif args.dont_use_tfs: - ipsec_ohead = ipsec_overhead(gcm, ipmtu, args.encap_ipv6, args.encap_udp) - pps = line_rate_to_ip_pps(l1_rate, ipmtu + ipsec_ohead) - else: - pps = line_rate_to_iptfs_encap_pps( - l1_rate, ipmtu, iptfs_mtu, gcm, args.cc, args.encap_ipv6, args.encap_udp - ) - return pps - - def get_max_client_rate(c): if not c: return None @@ -316,7 +91,7 @@ def mps(x): pps_sum = sum(x["pps"] for x in imix_table) avg_ipsize = sum(mps(x["size"]) * x["pps"] for x in imix_table) / pps_sum - pps = line_rate_to_pps(args, l1_rate, avg_ipsize, iptfs_mtu) + pps = util.line_rate_to_pps(args, l1_rate, avg_ipsize, iptfs_mtu) if percentage: pps *= percentage / 100 @@ -345,13 +120,13 @@ def get_udp_spread_table(args, c): spread_count = (args.user_packet_size + 1) - minpkt avg_ipsize = sum(range(minpkt, args.user_packet_size + 1)) / spread_count - pps = line_rate_to_pps(args, args.rate, avg_ipsize, args.iptfs_packet_size) + pps = util.line_rate_to_pps(args, args.rate, avg_ipsize, args.iptfs_packet_size) if args.percentage: pps = pps * (args.percentage / 100) # if c: # max_speed = get_max_client_rate(c) - # max_pps = line_rate_to_ip_pps(max_speed, avg_ipsize) + # max_pps = util.line_rate_to_ip_pps(max_speed, avg_ipsize) # if pps > max_pps: # max_speed_float = max_speed / 1e9 # capacity = 100 * max_pps / pps @@ -374,14 +149,14 @@ def get_udp_spread_table(args, c): def get_imix_table(args, c, max_imix_size=1500): if args.user_packet_size: ipsize = args.user_packet_size - pps = line_rate_to_pps(args, args.rate, ipsize, args.iptfs_packet_size) + pps = util.line_rate_to_pps(args, args.rate, ipsize, args.iptfs_packet_size) if args.percentage: pps = pps * (args.percentage / 100) capacity = 0 if c: max_speed = get_max_client_rate(c) - max_pps = line_rate_to_ip_pps(max_speed, ipsize) + max_pps = util.line_rate_to_ip_pps(max_speed, ipsize) if max_speed > 1e9: max_speed_float = f"{max_speed / 1e9}Gbps" elif max_speed > 1e6: @@ -408,7 +183,7 @@ def get_imix_table(args, c, max_imix_size=1500): } ] desc = ( - f"static IP size {ipsize}@{get_human_readable(pps)}pps" + f"static IP size {ipsize}@{util.get_human_readable(pps)}pps" f" {capacity:.1f}% of capacity" ) avg_ipsize = ipsize @@ -456,10 +231,10 @@ def get_imix_table(args, c, max_imix_size=1500): capacity = 0 if c: max_speed = get_max_client_rate(c) - max_pps = line_rate_to_ip_pps(max_speed, avg_ipsize) + max_pps = util.line_rate_to_ip_pps(max_speed, avg_ipsize) capacity = 100 * pps / max_pps desc = ( - f"imix (avg: {avg_ipsize})@{get_human_readable(pps)}pps " + f"imix (avg: {avg_ipsize})@{util.get_human_readable(pps)}pps " f" {capacity:.1f}% of capacity" ) diff --git a/tests/common/util.py b/tests/common/util.py new file mode 100644 index 0000000..70f18a0 --- /dev/null +++ b/tests/common/util.py @@ -0,0 +1,279 @@ +# -*- coding: utf-8 eval: (blacken-mode 1) -*- +# +# October 26 2022, Christian Hopps +# +# Copyright (c) 2022, LabN Consulting, L.L.C. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; see the file COPYING; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +"General purpose utility functions" +from datetime import datetime, timedelta +import re +import logging + +from subprocess import check_output + +def myreadline(f): + buf = "" + while True: + # logging.info("READING 1 CHAR") + c = f.read(1) + if not c: + return buf if buf else None + buf += c + # logging.info("READ CHAR: '%s'", c) + if c == "\n": + return buf + + +def wait_output(p, regex, timeout=120): + retry_until = datetime.now() + timedelta(seconds=timeout) + regex = re.compile(regex) + while datetime.now() < retry_until: + line = myreadline(p.stdout) + if not line: + assert None, f"EOF waiting for '{regex}'" + line = line.rstrip() + if line: + logging.info("GOT LINE: '%s'", line) + m = regex.search(line) + if m: + return m + assert None, f"Failed to get output withint {timeout}s" +class Timeout: + """An object to passively monitor for timeouts.""" + + def __init__(self, delta): + self.started_on = datetime.datetime.now() + self.expires_on = self.started_on + datetime.timedelta(seconds=delta) + + def elapsed(self): + elapsed = datetime.datetime.now() - self.started_on + return elapsed.total_seconds() + + def is_expired(self): + return datetime.datetime.now() > self.expires_on + + +def chunkit(lst, chunk): + for i in range(0, len(lst), chunk): + yield lst[i : i + chunk] + + +def get_intf_stats(intf): + try: + output = check_output(f"ip -s link show {intf}", shell=True, text=True).strip() + lines = output.split("\n") + rxstats = [int(x) for x in lines[3].strip().split()] + txstats = [int(x) for x in lines[5].strip().split()] + return rxstats[1], txstats[1], rxstats[2:-1], txstats[2:-1] + except Exception as error: + logging.error("Got error getting stats: %s", error) + raise + + +def get_human_readable(v): + for suffix in ["", "K", "M", "G"]: + if v < 1000.0: + return "%3.03f%s" % (v, suffix) + v /= 1000 + return "%3.1f%s" % (v, "T") + + +def convert_number(value): + """Convert a number value with a possible suffix to an integer. + + >>> convert_number("100k") == 100 * 1024 + True + >>> convert_number("100M") == 100 * 1000 * 1000 + True + >>> convert_number("100Gi") == 100 * 1024 * 1024 * 1024 + True + >>> convert_number("55") == 55 + True + """ + if value is None: + return None + rate = str(value) + base = 1000 + if rate[-1] == "i": + base = 1024 + rate = rate[:-1] + suffix = "KMGTPEZY" + index = suffix.find(rate[-1]) + if index == -1: + base = 1024 + index = suffix.lower().find(rate[-1]) + if index != -1: + rate = rate[:-1] + return int(rate) * base ** (index + 1) + + +def line_rate_to_ip_pps(l1_rate, ipmtu): + """Convert an L1 ethernet rate to number of IP packets of ipmtu size per second.""" + # Each IP packet requires 8b l1-preamble 14b l2-hdr 4b l2-crc and 12b l1-gap + # The frame not including the preamble and inter frame gap must be at least 64b + # 46b + 14 + 4 == 64 + emtu = 8 + max(64, 14 + ipmtu + 4) + 12 + return float(l1_rate) / (emtu * 8) + + +def ipsec_overhead(gcm, user_pkt_size=None, ipv6=False, udp=False): + """Get the IPSEC payload size given a target IPTFS packet size""" + # IPsec/ESP packets are aligned to 4 byte boundary. + # target_mtu = target_mtu - (target_mtu % 4) + if ipv6: + # 40 - IP header, 8 ESP Header, 2 ESP Footer + o = 40 + 8 + 2 + else: + # 20 - IP header, 8 ESP Header, 2 ESP Footer + o = 20 + 8 + 2 + if user_pkt_size: + # User + Footer must align to 4 byte boundary + over = (user_pkt_size + 2) % 4 + if over: + o += 4 - over + if udp: + o += 8 + if gcm: + o += 8 + 16 # IV + ICV = 1440 + return o + + +def iptfs_payload_size(target_mtu, gcm, cc=False, ipv6=False, udp=False): + """Get the IPTFS payload size given a target IPTFS packet size""" + # IPsec/ESP packets are aligned to 4 byte boundary. + # target_mtu = target_mtu - (target_mtu % 4) + assert target_mtu % 4 == 0 + iptfs_hdr_size = 4 if not cc else 24 + return target_mtu - ipsec_overhead(gcm, None, ipv6, udp) - iptfs_hdr_size + + +def iptfs_payload_rate(l1_rate, target_mtu, gcm, cc=False, ipv6=False, udp=False): + ps = iptfs_payload_size(target_mtu, gcm, cc, ipv6, udp) + return line_rate_to_ip_pps(l1_rate, target_mtu) * ps + + +def line_rate_to_iptfs_encap_pps( + l1_rate, ipmtu, iptfs_mtu, gcm, cc=False, ipv6=False, udp=False +): + """Convert an l1 line rate to number of inner IP packets per second for a given + IP MTU using (or not) GCM encryption + """ + rate = iptfs_payload_rate(l1_rate, iptfs_mtu, gcm, cc, ipv6, udp) + input_pps = rate / ipmtu + return input_pps + # XXX this max should be based on the *physical* line not on the rate we've + # chosen. + # max_pps = line_rate_to_ip_pps(l1_rate, ipmtu) + # return min(max_pps, input_pps) + + +def line_rate_to_etfs_encap_pps( + tunnel_line_rate, + uf_ip_size, # size of IP frame in user packets + tunnel_etfs_mtu, # size of ethernet payload (== etfs encap framesize) + macsec_enabled, +): # true/false + del macsec_enabled + + uf_eth_size = uf_ip_size + 14 + + # + # Calculate ratio of user frames to tunnel frames. In ETFS + # this number is not exact because fragments have a six-octet + # header whereas full-frames have a two-octet header, but we + # should be able to get reasonably close. + # + # Consider two cases (maybe they will reduce to the same formula): + # + # 1. Small user frames. Multiple full user frames fit into a + # single tunnel frame. + # + # A full user frame takes up 2 + uf_eth_size, so the number + # of full frames that fit is: + # + # NF = int(tunnel_etfs_mtu / (2 + uf_eth_size)) + # + # The remainder is likely to be filled with two fragments, one + # at the head of the tunnel frame and one at the tail. We assume + # a uniform distribution of head fragment lengths (i.e., there is + # an arbitrary shift of the contents with respect to the tunnel + # frame). + # + # The number of actual full user frames in a tunnel packet will + # be either NF or NF-1, with a probability depending almost + # linearly on the size of the remainder. We will simplify for + # now and assume that if the remainder is greater than half the + # size of (UF+2), the actual number of full frames is NF, otherwise + # it will be NF-1. + # + # The number of fragments will usually be two. I think the edge + # cases are improbable enough to ignore for this calculation. + # + # 2. Large user frames. Tunnel frames contain either one or two + # fragments. I think this case applies any time NF is 0. + # + + NF = tunnel_etfs_mtu // (2 + uf_eth_size) + + if NF > 0: + # remainder = tunnel_etfs_mtu - (NF * (2 + uf_eth_size)) + # if remainder > (2 + uf_eth_size) / 2: + # full_frame_count = NF + # else: + # full_frame_count = NF - 1 + + full_frame_headers_per_tunnel_frame = NF + fragment_headers_per_tunnel_frame = 2 + + else: + full_frame_headers_per_tunnel_frame = 0 + fragment_headers_per_tunnel_frame = 2 + + payload = ( + tunnel_etfs_mtu + - (2 * full_frame_headers_per_tunnel_frame) + - (6 * fragment_headers_per_tunnel_frame) + ) + + tunnel_packet_rate = line_rate_to_ip_pps(tunnel_line_rate, tunnel_etfs_mtu - 14) + + tunnel_payload_byte_rate = tunnel_packet_rate * payload + + payload_pps = tunnel_payload_byte_rate / uf_eth_size + + return payload_pps + + +def line_rate_to_pps(args, l1_rate, ipmtu, iptfs_mtu): + """Convert an l1 line rate to number of packets per second for a given + IP MTU using (or not) GCM encryption + """ + + gcm = not args.null + if args.forward_only: + pps = line_rate_to_ip_pps(l1_rate, ipmtu) + elif args.dont_use_ipsec: + ip_ohead = 20 if not args.encap_ipv6 else 40 + pps = line_rate_to_ip_pps(l1_rate, ipmtu + ip_ohead) + elif args.dont_use_tfs: + ipsec_ohead = ipsec_overhead(gcm, ipmtu, args.encap_ipv6, args.encap_udp) + pps = line_rate_to_ip_pps(l1_rate, ipmtu + ipsec_ohead) + else: + pps = line_rate_to_iptfs_encap_pps( + l1_rate, ipmtu, iptfs_mtu, gcm, args.cc, args.encap_ipv6, args.encap_udp + ) + return pps diff --git a/tests/conftest.py b/tests/conftest.py index eed4e0c..e7ef076 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,8 +18,14 @@ # with this program; see the file COPYING; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # -"""Fixtures and other utilities imported from munet for testing.""" # pylint: disable=wildcard-import,unused-wildcard-import +# pylint: disable=wrong-import-position +"""Fixtures and other utilities imported from munet for testing.""" + +import glob +import os +import sys + from munet.testing.fixtures import * # noqa from munet.testing.hooks import * # noqa from munet.testing.hooks import pytest_addoption as _pytest_addoption @@ -32,3 +38,19 @@ def pytest_addoption(parser): # pylint: disable=E0102 help="Enable the physical interface based tests", ) return _pytest_addoption(parser) + + +# This still doesn't work and +# def pytest_collection_finish(session): +# found = False +# for item in session.items: +# if "stress" in str(item.path): +# logging.warning("Found stress test '%s', modifying sys.path", item.path) +# found = True +# break +# if found: +# # So gross.. but trex plays stupid games with embedded pkgs and path +# SRCDIR = os.path.dirname(os.path.abspath(__file__)) +# trexlib = os.path.join(SRCDIR, "external_libs") +# scapydir = glob.glob(trexlib + "/scapy*")[0] +# sys.path[0:0] = [scapydir] diff --git a/tests/stress/stress.py b/tests/stress/stress.py index 02582d8..4d85ce4 100644 --- a/tests/stress/stress.py +++ b/tests/stress/stress.py @@ -18,7 +18,9 @@ # with this program; see the file COPYING; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # +# pylint: disable=wrong-import-position "Shared functionality between virtual and physical stress tests." + import glob import logging import os @@ -29,11 +31,11 @@ # So gross.. but trex plays stupid games with embedded pkgs and path SRCDIR = os.path.dirname(os.path.abspath(__file__)) -trexlib = os.path.join(os.path.dirname(SRCDIR), "trex-external-libs") +trexlib = os.path.join(os.path.dirname(SRCDIR), "external_libs") scapydir = glob.glob(trexlib + "/scapy*")[0] sys.path[0:0] = [scapydir] -from common import testutil, trexlib +from common import trexlib, trexutil from common.config import setup_policy_tun, toggle_ipv6 from trex_stl_lib.api import STLClient @@ -145,7 +147,7 @@ async def _test_policy_small_pkt(unet, rate, mode="iptfs", duration=10, connecti # await async_pause_test("after policy setup") - args = testutil.Args( + args = trexutil.Args( rate=rate, user_packet_size=40, duration=duration, connections=connections ) @@ -167,9 +169,9 @@ def get_streams( ) dutlist = [] - imix_table, pps, avg_ipsize, imix_desc = testutil.get_imix_table(args, c) + imix_table, pps, avg_ipsize, imix_desc = trexutil.get_imix_table(args, c) logging.info("pps: %s av_ipsize: %s desc: %s", pps, avg_ipsize, imix_desc) - trex_stats, vstats, _ = await testutil.run_trex_cont_test( + trex_stats, vstats, _ = await trexutil.run_trex_cont_test( args, c, dutlist, @@ -179,14 +181,14 @@ def get_streams( # extended_stats=True) ) c.disconnect() - testutil.finish_test(__name__, args, dutlist, True, trex_stats, vstats) + trexutil.finish_test(__name__, args, dutlist, True, trex_stats, vstats) # await async_cli(unet) async def _test_policy_imix(unet, rate, mode="iptfs", duration=10, connections=1): await setup_policy_tun(unet, ipsec_intf="eth1", mode=mode, trex=True) - args = testutil.Args( + args = trexutil.Args( rate=rate, old_imix=True, duration=duration, connections=connections ) @@ -208,11 +210,11 @@ def get_streams( ) dutlist = [] - imix_table, pps, avg_ipsize, imix_desc = testutil.get_imix_table( + imix_table, pps, avg_ipsize, imix_desc = trexutil.get_imix_table( args, c, max_imix_size=1400 ) logging.info("pps: %s av_ipsize: %s desc: %s", pps, avg_ipsize, imix_desc) - trex_stats, vstats, _ = await testutil.run_trex_cont_test( + trex_stats, vstats, _ = await trexutil.run_trex_cont_test( args, c, dutlist, @@ -222,7 +224,7 @@ def get_streams( # extended_stats=True) ) c.disconnect() - testutil.finish_test(__name__, args, dutlist, True, trex_stats, vstats) + trexutil.finish_test(__name__, args, dutlist, True, trex_stats, vstats) # await async_cli(unet) diff --git a/tests/stress/test_stress.py b/tests/stress/test_stress.py index 6ebdd2e..bc541ac 100644 --- a/tests/stress/test_stress.py +++ b/tests/stress/test_stress.py @@ -23,14 +23,13 @@ import os import pytest +from munet.testing.fixtures import _unet_impl from stress import ( _network_up, _test_policy_imix, _test_policy_small_pkt, convert_number, ) -from munet.testing.fixtures import _unet_impl - # All tests are coroutines pytestmark = pytest.mark.asyncio @@ -84,8 +83,8 @@ async def test_net_up(unet): async def test_policy_small_pkt(unet): - await _test_policy_small_pkt(unet, convert_number("20M")) + await _test_policy_small_pkt(unet, convert_number("8M")) async def test_policy_imix(unet): - await _test_policy_imix(unet, convert_number("20M")) + await _test_policy_imix(unet, convert_number("40M")) diff --git a/tests/utpkt/genpkt.py b/tests/utpkt/genpkt.py index d29f536..3561a30 100755 --- a/tests/utpkt/genpkt.py +++ b/tests/utpkt/genpkt.py @@ -27,7 +27,7 @@ from common import iptfs from common.config import create_scapy_sa_pair from common.scapy import Interface, gen_ippkts -from common.testutil import chunkit, get_intf_stats +from common.util import chunkit, get_intf_stats from scapy.config import conf from scapy.layers.inet import ICMP, IP from scapy.layers.ipsec import ESP diff --git a/tests/utpkt/test_utpkt.py b/tests/utpkt/test_utpkt.py index 080f28c..4f1223e 100644 --- a/tests/utpkt/test_utpkt.py +++ b/tests/utpkt/test_utpkt.py @@ -19,23 +19,21 @@ # with this program; see the file COPYING; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # +# pylint: disable=wrong-import-position +"Unit tests utilizign scapy" +import glob import logging import os -import re import subprocess -import time -from datetime import datetime, timedelta +import sys import pytest -from common import iptfs +from common import iptfs, util from common.config import create_scapy_sa_pair, setup_policy_tun, toggle_ipv6 -from common.scapy import Interface, gen_ippkts -from common.testutil import Timeout, chunkit, get_intf_stats +from common.scapy import Interface, gen_pkts, send_recv_esp_pkts from munet.base import comm_error from scapy.config import conf -from scapy.layers.inet import ICMP, IP -from scapy.layers.ipsec import ESP -from scapy.sendrecv import AsyncSniffer, sendp, sniff, srp +from scapy.layers.inet import ICMP # from munet.cli import async_cli @@ -98,236 +96,67 @@ async def test_net_up(unet): logging.debug(r1repl.cmd_raises("ping -w1 -i.2 -c1 10.0.0.1")) -def myreadline(f): - buf = "" - while True: - # logging.info("READING 1 CHAR") - c = f.read(1) - if not c: - return buf if buf else None - buf += c - # logging.info("READ CHAR: '%s'", c) - if c == "\n": - return buf - - -def _wait_output(p, regex, timeout=120): - retry_until = datetime.now() + timedelta(seconds=timeout) - regex = re.compile(regex) - while datetime.now() < retry_until: - line = myreadline(p.stdout) - if not line: - assert None, f"EOF waiting for '{regex}'" - line = line.rstrip() - if line: - logging.info("GOT LINE: '%s'", line) - m = regex.search(line) - if m: - return m - assert None, f"Failed to get output withint {timeout}s" +def decrypt_decap_iptfs_pkts(osa, encpkts): + """Decrypt a list of packets and then process resulting IPTFS stream""" + idx = 0 + pkts = [] + try: + for idx, esppkt in enumerate(encpkts): + pkts.append(osa.decrypt(esppkt)) + except Exception as error: + logging.error( + "Exception decrypt recv ESP pkts index %s: %s\n", + idx, + error, + exc_info=True, + ) + raise + return iptfs.decap_frag_stream(pkts) def send_recv_iptfs_pkts(osa, encpkts, iface, chunksize=30, faster=False): + def process_pkts(decpkts): + _pkts = iptfs.decap_frag_stream(decpkts) + # Greb echo replies. + inner_pkts = [x for x in _pkts if x.haslayer(ICMP) and x[ICMP].type == 0] + other_inner_pkts = [ + x for x in _pkts if not x.haslayer(ICMP) or x[ICMP].type != 0 + ] + return inner_pkts, other_inner_pkts - rxs, txs, rxerr, txerr = get_intf_stats(iface) - assert max(rxerr) == 0, f"rxerr not 0, is {max(rxerr)}" - assert max(txerr) == 0, f"txerr not 0, is {max(txerr)}" - - def get_esp_pkts(pkts): - rawpkts = (x.answer for x in pkts) - pkts = [x[IP] for x in rawpkts if x.haslayer(ESP)] - logging.info("RECEIVED %s ipsec packets", len(pkts)) - return pkts - - def process_esp_pkts(esppkts, nchunk): - idx = 0 - pkts = [] - try: - for idx, esppkt in enumerate(esppkts): - pkts.append(osa.decrypt(esppkt)) - except Exception as error: - logging.error( - "Exception decrypt recv ESP pkts index %s chunk %s: %s\n", - idx, - nchunk, - error, - exc_info=True, - ) - raise - return pkts - - net0sniffer = AsyncSniffer(iface="net0", promisc=1, filter="icmp[0] == 0") - net0sniffer.start() - - net1sniffer = AsyncSniffer( - iface=iface, - # prn=lambda x: print("-"), - promisc=1, - # filter=f"ip proto esp and ip[((ip[0]&0x0f)<<2):4]=={osa.spi}", - filter="dst host 10.0.1.3", - ) - net1sniffer.start() - - # This sleep seems required or the sniffer misses initial packets!? - time.sleep(1) - - logging.info("SENDING %s ipsec/iptfs packets", len(encpkts)) - - outer_pkts = [] - decpkts = [] - - # Really we want to check for kvm - if faster or len(encpkts) <= 20: - sendp(encpkts, iface=iface, inter=0.001) - else: - sendp(encpkts, iface=iface, inter=0.01) - - # nchunk = 0 - # for chunk in chunkit(encpkts, chunksize): - # # logging.info("SENDING gratiutous arp on %s", tun_if.name) - # # tun_if.send_gratuitous_arp() - # logging.info("sending chunk %s with %s ipsec/iptfs packets", nchunk, len(chunk)) - - # nchunk += 1 - # timeout = 10 - # timeo = Timeout(timeout) - # pkts = srp( - # chunk, - # verbose=0, - # timeout=timeout, - # promisc=1, - # nofilter=1, - # iface=iface, - # inter=0.05, - # chainCC=True, # pass up ^C - # ) - # logging.info("srp returns %s", pkts) - - # _esppkts = get_esp_pkts(pkts[0]) - # outer_pkts.extend(_esppkts) - # if len(_esppkts) == 0 and timeo is not None: - # if timeo.is_expired(): - # logging.info("Ending chunking loop as no packets received (timeout)") - # raise TimeoutError() - # logging.info("Ending chunking loop as no packets received (break)") - # raise KeyboardInterrupt() - # _decpkts = process_esp_pkts(_esppkts, nchunk) - # decpkts.extend(_decpkts) - - # # If we arrive here w/o exceptions (from timeout or break) let's take another second - # # to see if we have anymore packets coming. - # timeout = 1 - # logging.info("Waiting %ss for final packets", timeout) - # pkts = sniff(timeout=timeout, promisc=1, nofilter=1, iface=iface) - # logging.info("Final sniff returns %s", pkts) - - # XXX improve this, sleep 2 seconds for things to flush - time.sleep(2) - - net0results = net0sniffer.stop() - # net0results = [] - net1results = net1sniffer.stop() - - # _esppkts = get_esp_pkts(pkts) - pkts = [x[IP] for x in net1results if x.haslayer(ESP)] - # XXX should use iface ip local addr - _esppkts = [x for x in pkts if x.src != "10.0.1.3"] - logging.info("RECEIVED %s ipsec packets", len(_esppkts)) - - outer_pkts.extend(_esppkts) - if _esppkts: - _decpkts = process_esp_pkts(_esppkts, -1) - decpkts.extend(_decpkts) - - _pkts = iptfs.decap_frag_stream(decpkts) - - # Greb echo replies. - inner_pkts = [x for x in _pkts if x.haslayer(ICMP) and x[ICMP].type == 0] - other_inner_pkts = [x for x in _pkts if not x.haslayer(ICMP) or x[ICMP].type != 0] - - nrxs, ntxs, rxerr, txerr = get_intf_stats(iface) - assert max(rxerr) == 0, f"rxerr not 0, is {max(rxerr)}" - assert max(txerr) == 0, f"txerr not 0, is {max(txerr)}" - logging.info("STATS for %s: RX %s TX %s", iface, nrxs - rxs, ntxs - txs) - - logging.info( - "DECAP %s inner ICMP replies and %s other pkts from %s ipsec pkts", - len(inner_pkts), - len(other_inner_pkts), - len(outer_pkts), - ) - return inner_pkts, outer_pkts, net0results + return send_recv_esp_pkts(osa, encpkts, iface, chunksize, faster, process_pkts) async def gen_pkt_test( unet, - astepf, ping="10.0.0.1", mtu=1500, df=False, - psize=0, - pstep=0, - pmax=0, - count=0, - wrap=False, iface="net1", nofail=False, + **kwargs, ): osa, sa = create_scapy_sa_pair( mtu=mtu, addr1=unet.tun_if.remote_addr, addr2=unet.tun_if.local_addr ) - inner_ip_overhead = len(IP() / ICMP(seq=1)) - - psize = max(psize, inner_ip_overhead) - if pstep: - if pmax: - pmaxsize = max(pmax, psize) - else: - pmaxsize = mtu - sa.get_ipsec_overhead() - logging.info("setting pmaxsize to %s", pmaxsize) - - if count: - pcount = count - else: - # Walk spread one time - pcount = (pmaxsize - psize + 1 + pstep - 1) // pstep - if wrap: - pcount *= wrap - else: - pcount = count if count else 100 - pmaxsize = None - - logging.info( - "GENERATING from %s to %s count %s step %s", psize, pmaxsize, pcount, pstep - ) - opkts = gen_ippkts( - unet.tun_if.local_addr, - ping, - payload_size=psize - inner_ip_overhead, - payload_spread=pmaxsize - inner_ip_overhead if pmaxsize else pmaxsize, - inc=pstep, - count=pcount, - ) - maxsz = max(len(x) for x in opkts) - logging.info("GENERATED %s inner packets max size %s", len(opkts), maxsz) + # + # Create encrypted packet stream with fragmentation + # + opkts = await gen_pkts(unet, sa, mtu=mtu, ping=ping, **kwargs) encpkts = iptfs.gen_encrypt_pktstream_pkts(sa, mtu, opkts, dontfrag=df) encpkts = unet.tun_if.prep_pkts(encpkts) + # + # Send and receive pkts + # r1 = unet.hosts["r1"] - output = r1.conrepl.cmd_nostatus("ip -s link show eth2") - logging.info("r1 eth2:\n%s", output) - output = r1.conrepl.cmd_nostatus("ip -s link show eth1") - logging.info("r1 eth1:\n%s", output) - is_kvm = r1.is_kvm if hasattr(r1, "is_kvm") else False pkts, _, net0pkts = send_recv_iptfs_pkts(osa, encpkts, iface, faster=is_kvm) - output = r1.conrepl.cmd_nostatus("ip -s link show eth2") - logging.info("r1 eth2:\n%s", output) - output = r1.conrepl.cmd_nostatus("ip -s link show eth1") - logging.info("r1 eth1:\n%s", output) - + # + # Analyze results + # nnet0pkts = len(net0pkts) npkts = len(pkts) nopkts = len(opkts) @@ -358,15 +187,15 @@ async def _gen_pkt_test(unet, astepf, expected=None, **kwargs): stderr=subprocess.STDOUT, ) try: - _ = _wait_output(p, "STARTING") + _ = util.wait_output(p, "STARTING") - m = _wait_output(p, r"DECAP (\d+) inner packets") + m = util.wait_output(p, r"DECAP (\d+) inner packets") ndecap = int(m.group(1)) assert ( ndecap == expected ), f"Wrong number ({ndecap}, expected {expected}) return IP packets" - _ = _wait_output(p, "FINISH") + _ = util.wait_output(p, "FINISH") except Exception: if p: @@ -384,15 +213,15 @@ async def _gen_pkt_test(unet, astepf, expected=None, **kwargs): async def test_spread_recv_frag(unet, astepf): await setup_policy_tun(unet, r1only=True, iptfs_opts="dont-frag") await astepf("Prior to gen_pkt_test") - await gen_pkt_test(unet, astepf, psize=0, pstep=1) - # await gen_pkt_test(unet, astepf, psize=1400, pmax=1438, pstep=1) + await gen_pkt_test(unet, psize=0, pstep=1) + # await gen_pkt_test(unet, psize=1400, pmax=1438, pstep=1) async def test_spread_recv_frag_toobig_reply(unet, astepf): await setup_policy_tun(unet, r1only=True, iptfs_opts="dont-frag") await astepf("Prior to gen_pkt_test") npkts, nopkts, nnet0pkts = await gen_pkt_test( - unet, astepf, psize=1442, pmax=1443, pstep=1, nofail=True + unet, psize=1442, pmax=1443, pstep=1, nofail=True ) # one echo reply is too big assert npkts == 1 and nnet0pkts == 2 and nopkts == 2 @@ -401,10 +230,10 @@ async def test_spread_recv_frag_toobig_reply(unet, astepf): async def test_recv_frag(unet, astepf): await setup_policy_tun(unet, r1only=True, iptfs_opts="dont-frag") await astepf("Prior to gen_pkt_test") - await gen_pkt_test(unet, astepf, psize=411, mtu=500, pstep=1, count=2) + await gen_pkt_test(unet, psize=411, mtu=500, pstep=1, count=2) async def test_small_pkt_agg(unet, astepf): await setup_policy_tun(unet, r1only=True, iptfs_opts="dont-frag") await astepf("Prior to gen_pkt_test") - await gen_pkt_test(unet, astepf, count=80) + await gen_pkt_test(unet, count=80)