diff --git a/internal/backoff.go b/internal/backoff.go new file mode 100644 index 0000000..ae58d97 --- /dev/null +++ b/internal/backoff.go @@ -0,0 +1,59 @@ +package internal + +import "time" + +type BackoffFlags uint8 + +const ( + BackoffHasPriority BackoffFlags = 1 << iota + BackoffCriticalPath +) + +func NewBackoff(priority BackoffFlags) Backoff { + if priority&BackoffCriticalPath != 0 { + return Backoff{ + maxWait: uint32(1 * time.Millisecond), + } + } + return Backoff{ + maxWait: uint32(time.Second) >> (priority & BackoffHasPriority), + } +} + +// A Backoff with a non-zero MaxWait is ready for use. +type Backoff struct { + // wait defines the amount of time that Miss will wait on next call. + wait uint32 + // Maximum allowable value for Wait. + maxWait uint32 + // startWait is the value that Wait takes after a call to Hit. + startWait uint32 + // expMinusOne is the shift performed on Wait minus one, so the zero value performs a shift of 1. + expMinusOne uint32 +} + +// Hit sets eb.Wait to the StartWait value. +func (eb *Backoff) Hit() { + if eb.maxWait == 0 { + panic("MaxWait cannot be zero") + } + eb.wait = eb.startWait +} + +// Miss sleeps for eb.Wait and increases eb.Wait exponentially. +func (eb *Backoff) Miss() { + const k = 1 + wait := eb.wait + maxWait := eb.maxWait + exp := eb.expMinusOne + 1 + if maxWait == 0 { + panic("MaxWait cannot be zero") + } + time.Sleep(time.Duration(wait)) + wait |= k + wait <<= exp + if wait > maxWait { + wait = maxWait + } + eb.wait = wait +} diff --git a/stacks/port_tcp.go b/stacks/port_tcp.go index 6e1b196..c221612 100644 --- a/stacks/port_tcp.go +++ b/stacks/port_tcp.go @@ -202,6 +202,15 @@ func prand16(seed uint16) uint16 { return seed } +// prand32 generates a pseudo random number from a seed. +func prand32[T ~uint32](seed T) T { + /* Algorithm "xor" from p. 4 of Marsaglia, "Xorshift RNGs" */ + seed ^= seed << 13 + seed ^= seed >> 17 + seed ^= seed << 5 + return seed +} + // ParseTCPPacket is a convenience function for generating a pkt TCP packet // // Deprecated: This function is guaranteed to disappear in the future. Used only in tests. diff --git a/stacks/stacks_test.go b/stacks/stacks_test.go index 8043375..04a15a9 100644 --- a/stacks/stacks_test.go +++ b/stacks/stacks_test.go @@ -1,6 +1,7 @@ package stacks_test import ( + "cmp" "encoding/hex" "errors" "log/slog" @@ -19,7 +20,7 @@ const ( testingLargeNetworkSize = 2 // Minimum=2 exchangesToEstablish = 3 exchangesToClose = 3 - defaultTCPBufferSize = 2048 + defaultMTU = 2048 defaultTestDuplexMessages = 128 @@ -32,7 +33,7 @@ func TestDHCP(t *testing.T) { const networkSize = testingLargeNetworkSize // How many distinct IP/MAC addresses on network. requestedIP := netip.AddrFrom4([4]byte{192, 168, 1, 69}) siaddr := netip.AddrFrom4([4]byte{192, 168, 1, 1}) - Stacks := createPortStacks(t, networkSize) + Stacks := createPortStacks(t, networkSize, defaultMTU) clientStack := Stacks[0] serverStack := Stacks[1] @@ -92,7 +93,7 @@ func TestDHCP(t *testing.T) { func TestARP(t *testing.T) { const networkSize = testingLargeNetworkSize // How many distinct IP/MAC addresses on network. - stacks := createPortStacks(t, networkSize) + stacks := createPortStacks(t, networkSize, 512) sender := stacks[0] target := stacks[1] @@ -132,7 +133,8 @@ func TestARP(t *testing.T) { } func TestTCPEstablish(t *testing.T) { - client, server := createTCPClientServerPair(t) + const bufSizes = 32 + client, server := createTCPClientServerPair(t, bufSizes, bufSizes) // 3 way handshake needs 3 exchanges to complete. egr := NewExchanger(client.PortStack(), server.PortStack()) wantStates := makeWantStatesHelper(t, client, server) @@ -164,8 +166,9 @@ func TestTCPEstablish(t *testing.T) { } func TestTCPSendReceive_simplex(t *testing.T) { + const bufSizes = 32 // Create Client+Server and establish TCP connection between them. - client, server := createTCPClientServerPair(t) + client, server := createTCPClientServerPair(t, bufSizes, bufSizes) egr := NewExchanger(client.PortStack(), server.PortStack()) egr.DoExchanges(t, exchangesToEstablish) wantStates := makeWantStatesHelper(t, client, server) @@ -184,8 +187,9 @@ func TestTCPSendReceive_simplex(t *testing.T) { } func TestTCPSendReceive_duplex_single(t *testing.T) { + const bufSizes = 32 // Create Client+Server and establish TCP connection between them. - client, server := createTCPClientServerPair(t) + client, server := createTCPClientServerPair(t, bufSizes, bufSizes) cstack, sstack := client.PortStack(), server.PortStack() egr := NewExchanger(cstack, sstack) egr.DoExchanges(t, exchangesToEstablish) @@ -215,7 +219,7 @@ func TestTCPSendReceive_duplex_single(t *testing.T) { func TestTCPSendReceive_duplex(t *testing.T) { // Create Client+Server and establish TCP connection between them. - client, server := createTCPClientServerPair(t) + client, server := createTCPClientServerPair(t, 32, 32) egr := NewExchanger(client.PortStack(), server.PortStack()) egr.DoExchanges(t, exchangesToEstablish) if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished { @@ -227,8 +231,9 @@ func TestTCPSendReceive_duplex(t *testing.T) { } func TestTCPClose_noPendingData(t *testing.T) { + const bufSizes = 32 // Create Client+Server and establish TCP connection between them. - client, server := createTCPClientServerPair(t) + client, server := createTCPClientServerPair(t, bufSizes, bufSizes) egr := NewExchanger(client.PortStack(), server.PortStack()) egr.DoExchanges(t, exchangesToEstablish) if client.State() != seqs.StateEstablished || server.State() != seqs.StateEstablished { @@ -300,7 +305,8 @@ func TestTCPSocketOpenOfClosedPort(t *testing.T) { // Create Client+Server and establish TCP connection between them. const newPortoffset = 1 const newISS = 1337 - client, server := createTCPClientServerPair(t) + const bufSizes = 512 + client, server := createTCPClientServerPair(t, bufSizes, bufSizes) cstack, sstack := client.PortStack(), server.PortStack() egr := NewExchanger(cstack, sstack) @@ -395,7 +401,7 @@ func TestPortStackTCPDecoding(t *testing.T) { ehdr := eth.DecodeEthernetHeader(data) ps := stacks.NewPortStack(stacks.PortStackConfig{ MaxOpenPortsTCP: 1, - MTU: 2048, + MTU: defaultMTU, MAC: ehdr.Destination, }) sock, err := stacks.NewTCPConn(ps, stacks.TCPConnConfig{}) @@ -414,7 +420,8 @@ func TestPortStackTCPDecoding(t *testing.T) { } func TestListener(t *testing.T) { - client, listener := createTCPClientListenerPair(t) + const bufSizes = 2048 + client, listener := createTCPClientListenerPair(t, bufSizes, bufSizes, 1) egr := NewExchanger(client.PortStack(), listener.PortStack()) // Establish the connection on one port. exdone, _ := egr.DoExchanges(t, exchangesToEstablish) @@ -458,7 +465,7 @@ func TestListener(t *testing.T) { type Exchanger struct { Stacks []*stacks.PortStack pipesN []int - pipes [][2048]byte + pipes [][]byte segments []seqs.Segment ex int loglevel slog.Level @@ -468,9 +475,20 @@ func NewExchanger(stacks ...*stacks.PortStack) *Exchanger { egr := &Exchanger{ Stacks: stacks, pipesN: make([]int, len(stacks)), - pipes: make([][2048]byte, len(stacks)), + pipes: make([][]byte, len(stacks)), ex: -1, } + n := 0 + for i := range stacks { + n += int(stacks[i].MTU()) + } + buf := make([]byte, n) + n = 0 + for i := range stacks { + end := n + int(stacks[i].MTU()) + egr.pipes[i] = buf[n:end] + n = end + } return egr } @@ -496,7 +514,6 @@ func (egr *Exchanger) getPayload(istack int) []byte { func (egr *Exchanger) zeroPayload(istack int) { egr.pipesN[istack] = 0 - egr.pipes[istack] = [2048]byte{} } // auxbuf returns an unused buffer for temporary use. Do not hold references to this buffer during calls to HandleTx. @@ -506,7 +523,7 @@ func (egr *Exchanger) auxbuf() []byte { return egr.pipes[istack][:] } } - return make([]byte, 2048) + return make([]byte, defaultMTU) } func (egr *Exchanger) HandleTx(t *testing.T) (pkts, bytesSent int) { @@ -580,22 +597,22 @@ func isDroppedPacket(err error) bool { return err != nil && (errors.Is(err, stacks.ErrDroppedPacket) || strings.HasPrefix(err.Error(), "drop")) } -func createTCPClientListenerPair(t *testing.T) (client *stacks.TCPConn, listener *stacks.TCPListener) { +func createTCPClientListenerPair(t *testing.T, clientSizes, listenerSizes, maxListenerConns uint16) (client *stacks.TCPConn, listener *stacks.TCPListener) { t.Helper() const ( clientPort = 1025 serverPort = 80 ) - Stacks := createPortStacks(t, 2) + Stacks := createPortStacks(t, 2, defaultMTU) clientStack := Stacks[0] listenerStack := Stacks[1] // Configure listener (server). listenerAddr := netip.AddrPortFrom(listenerStack.Addr(), serverPort) listener, err := stacks.NewTCPListener(listenerStack, stacks.TCPListenerConfig{ - ConnTxBufSize: defaultTCPBufferSize, - MaxConnections: 1, - ConnRxBufSize: defaultTCPBufferSize, + ConnTxBufSize: listenerSizes, + MaxConnections: maxListenerConns, + ConnRxBufSize: listenerSizes, }) if err != nil { t.Fatal(err) @@ -604,17 +621,17 @@ func createTCPClientListenerPair(t *testing.T) (client *stacks.TCPConn, listener if err != nil { t.Fatal(err) } - client = newTCPDialer(t, clientStack, clientPort, listenerAddr, listenerStack.HardwareAddr6()) + client = newTCPDialer(t, clientStack, clientPort, clientSizes, listenerAddr, listenerStack.HardwareAddr6()) return client, listener } -func createTCPClientServerPair(t *testing.T) (client, server *stacks.TCPConn) { +func createTCPClientServerPair(t *testing.T, clientSizes, serverSizes uint16) (client, server *stacks.TCPConn) { t.Helper() const ( clientPort = 1025 serverPort = 80 ) - Stacks := createPortStacks(t, 2) + Stacks := createPortStacks(t, 2, defaultMTU) clientStack := Stacks[0] serverStack := Stacks[1] @@ -622,8 +639,8 @@ func createTCPClientServerPair(t *testing.T) (client, server *stacks.TCPConn) { serverIP := netip.AddrPortFrom(serverStack.Addr(), serverPort) serverTCP, err := stacks.NewTCPConn(serverStack, stacks.TCPConnConfig{ - TxBufSize: defaultTCPBufferSize, - RxBufSize: defaultTCPBufferSize, + TxBufSize: serverSizes, + RxBufSize: serverSizes, }) if err != nil { t.Fatal(err) @@ -632,16 +649,16 @@ func createTCPClientServerPair(t *testing.T) (client, server *stacks.TCPConn) { if err != nil { t.Fatal(err) } - clientTCP := newTCPDialer(t, clientStack, clientPort, serverIP, serverStack.HardwareAddr6()) + clientTCP := newTCPDialer(t, clientStack, clientPort, clientSizes, serverIP, serverStack.HardwareAddr6()) return clientTCP, serverTCP } -func newTCPDialer(t *testing.T, localstack *stacks.PortStack, localPort uint16, remoteAddr netip.AddrPort, remoteMAC [6]byte) *stacks.TCPConn { +func newTCPDialer(t *testing.T, localstack *stacks.PortStack, localPort, bufSizes uint16, remoteAddr netip.AddrPort, remoteMAC [6]byte) *stacks.TCPConn { t.Helper() // Configure client. clientTCP, err := stacks.NewTCPConn(localstack, stacks.TCPConnConfig{ - TxBufSize: defaultTCPBufferSize, - RxBufSize: defaultTCPBufferSize, + TxBufSize: bufSizes, + RxBufSize: bufSizes, }) if err != nil { t.Fatal(err) @@ -653,7 +670,7 @@ func newTCPDialer(t *testing.T, localstack *stacks.PortStack, localPort uint16, return clientTCP } -func createPortStacks(t *testing.T, n int) (Stacks []*stacks.PortStack) { +func createPortStacks(t *testing.T, n int, mtu uint16) (Stacks []*stacks.PortStack) { t.Helper() if n > math.MaxUint16 { t.Fatal("too many stacks") @@ -666,7 +683,7 @@ func createPortStacks(t *testing.T, n int) (Stacks []*stacks.PortStack) { MAC: MAC, MaxOpenPortsTCP: 1, MaxOpenPortsUDP: 1, - MTU: 2048, + MTU: mtu, }) Stack.SetAddr(ip) Stacks = append(Stacks, Stack) @@ -754,3 +771,10 @@ func makeWantStatesHelper(t *testing.T, client, server *stacks.TCPConn) func(cs, } } } + +func max[T cmp.Ordered](a, b T) T { + if a > b { + return a + } + return b +} diff --git a/stacks/tcpconn.go b/stacks/tcpconn.go index 6aa07ad..1e5431e 100644 --- a/stacks/tcpconn.go +++ b/stacks/tcpconn.go @@ -62,16 +62,17 @@ func NewTCPConn(stack *PortStack, cfg TCPConnConfig) (*TCPConn, error) { if cfg.TxBufSize == 0 { cfg.TxBufSize = defaultSocketSize } - sock := makeTCPConn(stack, cfg) + buf := make([]byte, cfg.RxBufSize+cfg.TxBufSize) + sock := makeTCPConn(stack, buf[:cfg.TxBufSize], buf[cfg.TxBufSize:cfg.TxBufSize+cfg.RxBufSize]) sock.trace("NewTCPConn:end") return &sock, nil } -func makeTCPConn(stack *PortStack, cfg TCPConnConfig) TCPConn { +func makeTCPConn(stack *PortStack, tx, rx []byte) TCPConn { return TCPConn{ stack: stack, - tx: ring{buf: make([]byte, cfg.TxBufSize)}, - rx: ring{buf: make([]byte, cfg.RxBufSize)}, + tx: ring{buf: tx}, + rx: ring{buf: rx}, } } @@ -129,6 +130,7 @@ func (sock *TCPConn) Write(b []byte) (n int, _ error) { return 0, err } plen := len(b) + backoff := internal.NewBackoff(internal.BackoffHasPriority) for { if sock.abortErr != nil { return n, sock.abortErr @@ -140,7 +142,14 @@ func (sock *TCPConn) Write(b []byte) (n int, _ error) { b = b[ngot:] if n == plen { return n, nil + } else if ngot > 0 { + backoff.Hit() + runtime.Gosched() + } else { + backoff.Miss() } + + sock.trace("TCPConn.Write:insuf-buf", slog.Int("missing", plen-n)) if sock.deadlineExceeded(sock.wdead) { return n, os.ErrDeadlineExceeded } @@ -148,7 +157,6 @@ func (sock *TCPConn) Write(b []byte) (n int, _ error) { if err != nil { return n, err } - runtime.Gosched() } } @@ -161,6 +169,7 @@ func (sock *TCPConn) Read(b []byte) (int, error) { } sock.trace("TCPConn.Read:start") connid := sock.connid + backoff := internal.NewBackoff(internal.BackoffHasPriority) for sock.rx.Buffered() == 0 && sock.State() == seqs.StateEstablished { if sock.abortErr != nil { return 0, sock.abortErr @@ -170,7 +179,7 @@ func (sock *TCPConn) Read(b []byte) (int, error) { if sock.deadlineExceeded(sock.rdead) { return 0, os.ErrDeadlineExceeded } - runtime.Gosched() + backoff.Miss() } n, err := sock.rx.Read(b) return n, err diff --git a/stacks/tcplistener.go b/stacks/tcplistener.go index df49596..a9bea17 100644 --- a/stacks/tcplistener.go +++ b/stacks/tcplistener.go @@ -6,7 +6,6 @@ import ( "log/slog" "net" "net/netip" - "runtime" "github.com/soypat/seqs" "github.com/soypat/seqs/internal" @@ -41,12 +40,14 @@ func NewTCPListener(stack *PortStack, cfg TCPListenerConfig) (*TCPListener, erro conns: make([]TCPConn, cfg.MaxConnections), used: make([]bool, cfg.MaxConnections), } - cfgconn := TCPConnConfig{ - TxBufSize: cfg.ConnTxBufSize, - RxBufSize: cfg.ConnRxBufSize, - } + txlen := int(cfg.ConnTxBufSize) + rxlen := int(cfg.ConnRxBufSize) + buf := make([]byte, int(cfg.MaxConnections)*(txlen+rxlen)) for i := range l.conns { - l.conns[i] = makeTCPConn(stack, cfgconn) + offset := i * (txlen + rxlen) + tx := buf[offset : offset+txlen] + rx := buf[offset+txlen : offset+txlen+rxlen] + l.conns[i] = makeTCPConn(stack, tx, rx) } return l, nil } @@ -55,6 +56,7 @@ func NewTCPListener(stack *PortStack, cfg TCPListenerConfig) (*TCPListener, erro // It implements the [net.Listener] interface. func (l *TCPListener) Accept() (net.Conn, error) { connid := l.connid + backoff := internal.NewBackoff(internal.BackoffCriticalPath) for l.isOpen() && connid == l.connid { for i := range l.conns { conn := &l.conns[i] @@ -64,7 +66,7 @@ func (l *TCPListener) Accept() (net.Conn, error) { l.used[i] = true return conn, nil } - runtime.Gosched() + backoff.Miss() } return nil, net.ErrClosed } @@ -174,11 +176,11 @@ func (l *TCPListener) abort() { } func (l *TCPListener) freeConnForReuse(idx int) { + l.iss = prand32(l.iss) conn := &l.conns[idx] l.info("lst:freeConnForReuse", slog.Uint64("lport", uint64(conn.localPort)), slog.Uint64("rport", uint64(conn.remote.Port()))) conn.abort() conn.open(seqs.StateListen, l.port, l.iss, [6]byte{}, netip.AddrPort{}) - l.iss += 3237 l.used[idx] = false }