Skip to content

Commit

Permalink
refactor tests: incorporate TCP buffer size into tests
Browse files Browse the repository at this point in the history
  • Loading branch information
soypat committed Dec 30, 2023
1 parent ea4eef3 commit 6a26e9d
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 45 deletions.
59 changes: 59 additions & 0 deletions internal/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package internal

import "time"

type BackoffFlags uint8

const (
BackoffHasPriority BackoffFlags = 1 << iota
BackoffCriticalPath
)

func NewBackoff(priority BackoffFlags) Backoff {
if priority&BackoffCriticalPath != 0 {
return Backoff{
maxWait: uint32(1 * time.Millisecond),
}
}
return Backoff{
maxWait: uint32(time.Second) >> (priority & BackoffHasPriority),
}
}

// A Backoff with a non-zero MaxWait is ready for use.
type Backoff struct {
// wait defines the amount of time that Miss will wait on next call.
wait uint32
// Maximum allowable value for Wait.
maxWait uint32
// startWait is the value that Wait takes after a call to Hit.
startWait uint32
// expMinusOne is the shift performed on Wait minus one, so the zero value performs a shift of 1.
expMinusOne uint32
}

// Hit sets eb.Wait to the StartWait value.
func (eb *Backoff) Hit() {
if eb.maxWait == 0 {
panic("MaxWait cannot be zero")
}
eb.wait = eb.startWait
}

// Miss sleeps for eb.Wait and increases eb.Wait exponentially.
func (eb *Backoff) Miss() {
const k = 1
wait := eb.wait
maxWait := eb.maxWait
exp := eb.expMinusOne + 1
if maxWait == 0 {
panic("MaxWait cannot be zero")
}
time.Sleep(time.Duration(wait))
wait |= k
wait <<= exp
if wait > maxWait {
wait = maxWait
}
eb.wait = wait
}
9 changes: 9 additions & 0 deletions stacks/port_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,15 @@ func prand16(seed uint16) uint16 {
return seed
}

// prand32 generates a pseudo random number from a seed.
func prand32[T ~uint32](seed T) T {
/* Algorithm "xor" from p. 4 of Marsaglia, "Xorshift RNGs" */
seed ^= seed << 13
seed ^= seed >> 17
seed ^= seed << 5
return seed
}

// ParseTCPPacket is a convenience function for generating a pkt TCP packet
//
// Deprecated: This function is guaranteed to disappear in the future. Used only in tests.
Expand Down
86 changes: 55 additions & 31 deletions stacks/stacks_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stacks_test

import (
"cmp"
"encoding/hex"
"errors"
"log/slog"
Expand All @@ -19,7 +20,7 @@ const (
testingLargeNetworkSize = 2 // Minimum=2
exchangesToEstablish = 3
exchangesToClose = 3
defaultTCPBufferSize = 2048
defaultMTU = 2048

defaultTestDuplexMessages = 128

Expand All @@ -32,7 +33,7 @@ func TestDHCP(t *testing.T) {
const networkSize = testingLargeNetworkSize // How many distinct IP/MAC addresses on network.
requestedIP := netip.AddrFrom4([4]byte{192, 168, 1, 69})
siaddr := netip.AddrFrom4([4]byte{192, 168, 1, 1})
Stacks := createPortStacks(t, networkSize)
Stacks := createPortStacks(t, networkSize, defaultMTU)

clientStack := Stacks[0]
serverStack := Stacks[1]
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestDHCP(t *testing.T) {

func TestARP(t *testing.T) {
const networkSize = testingLargeNetworkSize // How many distinct IP/MAC addresses on network.
stacks := createPortStacks(t, networkSize)
stacks := createPortStacks(t, networkSize, 512)

sender := stacks[0]
target := stacks[1]
Expand Down Expand Up @@ -132,7 +133,8 @@ func TestARP(t *testing.T) {
}

func TestTCPEstablish(t *testing.T) {
client, server := createTCPClientServerPair(t)
const bufSizes = 32
client, server := createTCPClientServerPair(t, bufSizes, bufSizes)
// 3 way handshake needs 3 exchanges to complete.
egr := NewExchanger(client.PortStack(), server.PortStack())
wantStates := makeWantStatesHelper(t, client, server)
Expand Down Expand Up @@ -164,8 +166,9 @@ func TestTCPEstablish(t *testing.T) {
}

func TestTCPSendReceive_simplex(t *testing.T) {
const bufSizes = 32
// Create Client+Server and establish TCP connection between them.
client, server := createTCPClientServerPair(t)
client, server := createTCPClientServerPair(t, bufSizes, bufSizes)
egr := NewExchanger(client.PortStack(), server.PortStack())
egr.DoExchanges(t, exchangesToEstablish)
wantStates := makeWantStatesHelper(t, client, server)
Expand All @@ -184,8 +187,9 @@ func TestTCPSendReceive_simplex(t *testing.T) {
}

func TestTCPSendReceive_duplex_single(t *testing.T) {
const bufSizes = 32
// Create Client+Server and establish TCP connection between them.
client, server := createTCPClientServerPair(t)
client, server := createTCPClientServerPair(t, bufSizes, bufSizes)
cstack, sstack := client.PortStack(), server.PortStack()
egr := NewExchanger(cstack, sstack)
egr.DoExchanges(t, exchangesToEstablish)
Expand Down Expand Up @@ -215,7 +219,7 @@ func TestTCPSendReceive_duplex_single(t *testing.T) {

func TestTCPSendReceive_duplex(t *testing.T) {
// Create Client+Server and establish TCP connection between them.
client, server := createTCPClientServerPair(t)
client, server := createTCPClientServerPair(t, 32, 32)
egr := NewExchanger(client.PortStack(), server.PortStack())
egr.DoExchanges(t, exchangesToEstablish)
if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished {
Expand All @@ -227,8 +231,9 @@ func TestTCPSendReceive_duplex(t *testing.T) {
}

func TestTCPClose_noPendingData(t *testing.T) {
const bufSizes = 32
// Create Client+Server and establish TCP connection between them.
client, server := createTCPClientServerPair(t)
client, server := createTCPClientServerPair(t, bufSizes, bufSizes)
egr := NewExchanger(client.PortStack(), server.PortStack())
egr.DoExchanges(t, exchangesToEstablish)
if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished {
Expand Down Expand Up @@ -300,7 +305,8 @@ func TestTCPSocketOpenOfClosedPort(t *testing.T) {
// Create Client+Server and establish TCP connection between them.
const newPortoffset = 1
const newISS = 1337
client, server := createTCPClientServerPair(t)
const bufSizes = 512
client, server := createTCPClientServerPair(t, bufSizes, bufSizes)
cstack, sstack := client.PortStack(), server.PortStack()

egr := NewExchanger(cstack, sstack)
Expand Down Expand Up @@ -395,7 +401,7 @@ func TestPortStackTCPDecoding(t *testing.T) {
ehdr := eth.DecodeEthernetHeader(data)
ps := stacks.NewPortStack(stacks.PortStackConfig{
MaxOpenPortsTCP: 1,
MTU: 2048,
MTU: defaultMTU,
MAC: ehdr.Destination,
})
sock, err := stacks.NewTCPConn(ps, stacks.TCPConnConfig{})
Expand All @@ -414,7 +420,8 @@ func TestPortStackTCPDecoding(t *testing.T) {
}

func TestListener(t *testing.T) {
client, listener := createTCPClientListenerPair(t)
const bufSizes = 2048
client, listener := createTCPClientListenerPair(t, bufSizes, bufSizes, 1)
egr := NewExchanger(client.PortStack(), listener.PortStack())
// Establish the connection on one port.
exdone, _ := egr.DoExchanges(t, exchangesToEstablish)
Expand Down Expand Up @@ -458,7 +465,7 @@ func TestListener(t *testing.T) {
type Exchanger struct {
Stacks []*stacks.PortStack
pipesN []int
pipes [][2048]byte
pipes [][]byte
segments []seqs.Segment
ex int
loglevel slog.Level
Expand All @@ -468,9 +475,20 @@ func NewExchanger(stacks ...*stacks.PortStack) *Exchanger {
egr := &Exchanger{
Stacks: stacks,
pipesN: make([]int, len(stacks)),
pipes: make([][2048]byte, len(stacks)),
pipes: make([][]byte, len(stacks)),
ex: -1,
}
n := 0
for i := range stacks {
n += int(stacks[i].MTU())
}
buf := make([]byte, n)
n = 0
for i := range stacks {
end := n + int(stacks[i].MTU())
egr.pipes[i] = buf[n:end]
n = end
}
return egr
}

Expand All @@ -496,7 +514,6 @@ func (egr *Exchanger) getPayload(istack int) []byte {

func (egr *Exchanger) zeroPayload(istack int) {
egr.pipesN[istack] = 0
egr.pipes[istack] = [2048]byte{}
}

// auxbuf returns an unused buffer for temporary use. Do not hold references to this buffer during calls to HandleTx.
Expand All @@ -506,7 +523,7 @@ func (egr *Exchanger) auxbuf() []byte {
return egr.pipes[istack][:]
}
}
return make([]byte, 2048)
return make([]byte, defaultMTU)
}

func (egr *Exchanger) HandleTx(t *testing.T) (pkts, bytesSent int) {
Expand Down Expand Up @@ -580,22 +597,22 @@ func isDroppedPacket(err error) bool {
return err != nil && (errors.Is(err, stacks.ErrDroppedPacket) || strings.HasPrefix(err.Error(), "drop"))
}

func createTCPClientListenerPair(t *testing.T) (client *stacks.TCPConn, listener *stacks.TCPListener) {
func createTCPClientListenerPair(t *testing.T, clientSizes, listenerSizes, maxListenerConns uint16) (client *stacks.TCPConn, listener *stacks.TCPListener) {
t.Helper()
const (
clientPort = 1025
serverPort = 80
)
Stacks := createPortStacks(t, 2)
Stacks := createPortStacks(t, 2, defaultMTU)
clientStack := Stacks[0]
listenerStack := Stacks[1]

// Configure listener (server).
listenerAddr := netip.AddrPortFrom(listenerStack.Addr(), serverPort)
listener, err := stacks.NewTCPListener(listenerStack, stacks.TCPListenerConfig{
ConnTxBufSize: defaultTCPBufferSize,
MaxConnections: 1,
ConnRxBufSize: defaultTCPBufferSize,
ConnTxBufSize: listenerSizes,
MaxConnections: maxListenerConns,
ConnRxBufSize: listenerSizes,
})
if err != nil {
t.Fatal(err)
Expand All @@ -604,26 +621,26 @@ func createTCPClientListenerPair(t *testing.T) (client *stacks.TCPConn, listener
if err != nil {
t.Fatal(err)
}
client = newTCPDialer(t, clientStack, clientPort, listenerAddr, listenerStack.HardwareAddr6())
client = newTCPDialer(t, clientStack, clientPort, clientSizes, listenerAddr, listenerStack.HardwareAddr6())
return client, listener
}

func createTCPClientServerPair(t *testing.T) (client, server *stacks.TCPConn) {
func createTCPClientServerPair(t *testing.T, clientSizes, serverSizes uint16) (client, server *stacks.TCPConn) {
t.Helper()
const (
clientPort = 1025
serverPort = 80
)
Stacks := createPortStacks(t, 2)
Stacks := createPortStacks(t, 2, defaultMTU)
clientStack := Stacks[0]
serverStack := Stacks[1]

// Configure server
serverIP := netip.AddrPortFrom(serverStack.Addr(), serverPort)

serverTCP, err := stacks.NewTCPConn(serverStack, stacks.TCPConnConfig{
TxBufSize: defaultTCPBufferSize,
RxBufSize: defaultTCPBufferSize,
TxBufSize: serverSizes,
RxBufSize: serverSizes,
})
if err != nil {
t.Fatal(err)
Expand All @@ -632,16 +649,16 @@ func createTCPClientServerPair(t *testing.T) (client, server *stacks.TCPConn) {
if err != nil {
t.Fatal(err)
}
clientTCP := newTCPDialer(t, clientStack, clientPort, serverIP, serverStack.HardwareAddr6())
clientTCP := newTCPDialer(t, clientStack, clientPort, clientSizes, serverIP, serverStack.HardwareAddr6())
return clientTCP, serverTCP
}

func newTCPDialer(t *testing.T, localstack *stacks.PortStack, localPort uint16, remoteAddr netip.AddrPort, remoteMAC [6]byte) *stacks.TCPConn {
func newTCPDialer(t *testing.T, localstack *stacks.PortStack, localPort, bufSizes uint16, remoteAddr netip.AddrPort, remoteMAC [6]byte) *stacks.TCPConn {
t.Helper()
// Configure client.
clientTCP, err := stacks.NewTCPConn(localstack, stacks.TCPConnConfig{
TxBufSize: defaultTCPBufferSize,
RxBufSize: defaultTCPBufferSize,
TxBufSize: bufSizes,
RxBufSize: bufSizes,
})
if err != nil {
t.Fatal(err)
Expand All @@ -653,7 +670,7 @@ func newTCPDialer(t *testing.T, localstack *stacks.PortStack, localPort uint16,
return clientTCP
}

func createPortStacks(t *testing.T, n int) (Stacks []*stacks.PortStack) {
func createPortStacks(t *testing.T, n int, mtu uint16) (Stacks []*stacks.PortStack) {
t.Helper()
if n > math.MaxUint16 {
t.Fatal("too many stacks")
Expand All @@ -666,7 +683,7 @@ func createPortStacks(t *testing.T, n int) (Stacks []*stacks.PortStack) {
MAC: MAC,
MaxOpenPortsTCP: 1,
MaxOpenPortsUDP: 1,
MTU: 2048,
MTU: mtu,
})
Stack.SetAddr(ip)
Stacks = append(Stacks, Stack)
Expand Down Expand Up @@ -754,3 +771,10 @@ func makeWantStatesHelper(t *testing.T, client, server *stacks.TCPConn) func(cs,
}
}
}

func max[T cmp.Ordered](a, b T) T {
if a > b {
return a
}
return b
}
Loading

0 comments on commit 6a26e9d

Please sign in to comment.