Skip to content

Commit

Permalink
test: refactor scapy/ut for reuse and fix trex/ut interaction
Browse files Browse the repository at this point in the history
  • Loading branch information
choppsv1 committed Oct 26, 2022
1 parent 0e8d3a7 commit 6ae85ba
Show file tree
Hide file tree
Showing 13 changed files with 701 additions and 485 deletions.
19 changes: 17 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[tool.pylint.master_control]
init-hook='import sys; sys.path += ["tests"]'

[tool.pylint.messages_control]
disable = [
"broad-except",
Expand All @@ -24,10 +27,22 @@ max-line-length = 88

addopts = "-s"
asyncio_mode = "auto"
norecursedirs = "tests/manual tests/podman-trex-extract tests/trex*"
norecursedirs = "tests/external_libs tests/manual tests/podman-trex-extract tests/trex*"

# The order of these really matters, and there's serious voodoo, it has to do
# with trex's use of it's own earlier scapy. It's not enough for utpkt to be
# before stress though, it has to be at the top (or somewhere more than just
# above stress..) who knows...; this works.
testpaths = [
"tests",
"tests/utpkt",
"tests/config",
"tests/console",
"tests/errors",
"tests/phynic",
"tests/simplenet",
"tests/verify",
# need to put trex stuff last so normal scapy is not messed up
"tests/stress",
]

log_level = "INFO"
Expand Down
2 changes: 1 addition & 1 deletion scripts/extract-trex.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,5 @@ for symdir in trex trex_stl_lib; do
set +x
done
set -x
ln -fs $tdir/external_libs $TESTSDIR/trex-external-libs
ln -fs $tdir/external_libs $TESTSDIR/
set +x
112 changes: 107 additions & 5 deletions tests/common/iptfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
# with this program; see the file COPYING; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
"IPTFS Scapy Functionality"
import logging
import socket
import struct
from functools import partial

from common.scapy import ppp
from scapy.compat import orb, raw
from scapy.config import conf
from scapy.data import IP_PROTOS
Expand Down Expand Up @@ -58,6 +60,8 @@ def sizeof(x):

# This was way too hard to figure out. :)
class AllPadField(StrLenField):
"A field for representing all-pad content"

def getfield(self, pkt, s):
slen = len(s)
return s[slen:], self.m2i(pkt, s[:slen])
Expand All @@ -67,6 +71,7 @@ def i2repr(self, pkt, x):


class IPTFSPad(Packet):
"An IPTFS pad packet"
name = "IPTFSPad"
fields_desc = [XByteField("zerotype", 0), AllPadField("pad", "")]

Expand All @@ -78,6 +83,7 @@ def fraglen_from(*args):


class IPTFSFrag(Packet):
"An IPTFS Pcaket"
__slots__ = ["fraglen"]

fields_desc = [StrLenField("frag", None, length_from=fraglen_from)]
Expand All @@ -94,15 +100,15 @@ def default_payload_class(self, payload):


class IPTFSEndFrag(IPTFSFrag):
pass
"IPTFS packet ending a fragment"


class IPTFSIPFrag(IPTFSFrag):
pass
"IPTFS packet containing a fragment"


class IPTFSIPv6Frag(IPTFSFrag):
pass
"IPTFS packet containing an IPv6 fragment"


def get_frag_class_and_len(data):
Expand Down Expand Up @@ -191,6 +197,7 @@ def _iptfs_decap_pkt_with_frags(ppkt, pkts, curpkt, data):


class IPTFSWithFrags(Packet):
"An IPTFS packet which handles fragments"

__slots__ = ["offset", "prevpkts"]

Expand Down Expand Up @@ -239,6 +246,7 @@ def iptfs_decap_pkt_nofrag(ppkt, pkts, curpkt, data):


class IPTFS(Packet):
"An IPTFS Packet"
name = "IPTFS"
fields_desc = [
FlagsField("flags", 0, 8, ["r0", "r1", "r2", "r3", "r4", "ECN", "CC", "V"]),
Expand All @@ -249,6 +257,7 @@ class IPTFS(Packet):


class IPTFSHeader(Packet):
"An IPTFS Header"
name = "IPTFSHeader"
fields_desc = [
FlagsField("flags", 0, 8, ["r0", "r1", "r2", "r3", "r4", "ECN", "CC", "V"]),
Expand Down Expand Up @@ -496,6 +505,93 @@ def gen_encrypt_pkts(sa, sw_intf, src, dst, count=1, payload_size=54):
]


def verify_encrypted(src, dst, sa, expected_count, rxs):
decrypt_pkts = []
for rx in rxs:
# self.assert_packet_checksums_valid(rx)
assert len(rx) - len(Ether()) == rx[IP].len
dpkts = sa.decrypt(rx[IP]).packets
dpkts = [x for x in dpkts if not isinstance(x, IPTFSPad)]
decrypt_pkts += dpkts

for decrypt_pkt in dpkts:
try:
assert decrypt_pkt.src == src
assert decrypt_pkt.dst == dst
except:
logging.debug(ppp("Unexpected packet:", rx))
try:
logging.debug(ppp("Decrypted packet:", decrypt_pkt))
except Exception: # pylint: disable=W0703
pass
raise

assert len(decrypt_pkts) == expected_count
# pkts = reassemble4(decrypt_pkts)
# for pkt in pkts:
# self.assert_packet_checksums_valid(pkt)


def verify_encrypted_with_frags(self, src, dst, sa, rxs, cmprxs):
dpkts_pcap = []
oldrxs = []

for rx in rxs:
self.assert_packet_checksums_valid(rx)
assert len(rx) - len(Ether()) == rx[IP].len
dpkts_pcap += sa.decrypt_iptfs_pkt(rx[IP], oldrxs)
oldrxs.append(rx)

# logging.info("XXXYYY: decrypted packets: {}".format(
# len(dpkts_pcap)))

# for x in dpkts_pcap:
# try:
# # ix = IPTFS(x)
# ix = x
# logging.info("XXXYYY: decrypted pkt:")
# logging.info("dump: {}".format(ix.show(dump=True)))
# except Exception as expkt:
# logging.info("XXXYYY: decrypted pkt: ex: {}".format(
# str(expkt)))
# logging.info(
# "XXXYYY: decrypted pkt: len {} dump: {}".format(
# len(x), x.show(dump=True)))

# Join fragments into real packets and drop padding return list of
# real packets.
dpkts = decap_frag_stream(dpkts_pcap)

for decrypt_pkt in dpkts:
# logging.info("XXXYYY: pktlen {} pkt: {}".format(
# len(decrypt_pkt), decrypt_pkt.show(dump=True)))
try:
assert decrypt_pkt.src == src
assert decrypt_pkt.dst == dst
except:
logging.debug(ppp("Unexpected packet:", decrypt_pkt))
try:
logging.debug(ppp("Decrypted packet:", decrypt_pkt))
except Exception: # pylint: disable=W0703
pass
raise

# logging.info("XXXYYY: dpkts count {} cmprxs count {}".format(
# len(dpkts), len(cmprxs)))

assert len(dpkts) == len(cmprxs)
# pkts = reassemble4(decrypt_pkts)
# for pkt in pkts:
# self.assert_packet_checksums_valid(pkt)


def verify_decrypted(self, src, dst, rxs):
for rx in rxs:
assert rx[IP].src == src
assert rx[IP].dst == dst
self.assert_packet_checksums_valid(rx)


class SecurityAssociation(ipsec.SecurityAssociation):
"""
This class is responsible of "encryption" and "decryption" of IPsec IPTFS packets.
Expand Down Expand Up @@ -574,7 +670,10 @@ def encrypt_esp_raw(self, payload, seq_num=None, iv=None, esn_en=None, esn=None)
esp = self.crypt_algo.encrypt(
self, esp, self.crypt_key, esn_en=esn_en or self.esn_en, esn=esn or self.esn
)
self.auth_algo.sign(esp, self.auth_key, high_seq_num)
try:
self.auth_algo.sign(esp, self.auth_key, high_seq_num)
except TypeError:
self.auth_algo.sign(esp, self.auth_key)

if ip_header.version == 4:
ip_header.len = len(ip_header) + len(esp)
Expand Down Expand Up @@ -608,7 +707,10 @@ def _decrypt_esp(

if verify:
self.check_spi(pkt)
self.auth_algo.verify(encrypted, self.auth_key, high_seq_num)
try:
self.auth_algo.verify(encrypted, self.auth_key, high_seq_num)
except TypeError:
self.auth_algo.verify(encrypted, self.auth_key)

esp = self.crypt_algo.decrypt(
self,
Expand Down
Loading

0 comments on commit 6ae85ba

Please sign in to comment.