Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDP Conn implemented #29

Merged
merged 6 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion eth/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func (ehdr EthernetHeader) AssertType() EtherType { return EtherType(ehdr.SizeOr
// Put marshals the ethernet frame onto buf. buf needs to be 14 bytes in length or Put panics.
func (ehdr *EthernetHeader) Put(buf []byte) {
_ = buf[13]

copy(buf[0:], ehdr.Destination[0:])
copy(buf[6:], ehdr.Source[0:])
binary.BigEndian.PutUint16(buf[12:14], ehdr.SizeOrEtherType)
Expand Down Expand Up @@ -256,7 +257,7 @@ func (ehdr *EthernetHeader) String() string {

// IHL returns the internet header length in 32bit words and is guaranteed to be within 0..15.
// Valid values for IHL are 5..15. When multiplied by 4 this yields number of bytes of the header, 20..60.
func (iphdr *IPv4Header) IHL() uint8 { return iphdr.VersionAndIHL & 0xf }
func (iphdr *IPv4Header) IHL() uint8 { return iphdr.VersionAndIHL & 0xf } //low four bits
func (iphdr *IPv4Header) Version() uint8 { return iphdr.VersionAndIHL >> 4 }
func (iphdr *IPv4Header) DSCP() uint8 { return iphdr.ToS >> 2 }
func (iphdr *IPv4Header) ECN() uint8 { return iphdr.ToS & 0b11 }
Expand Down
38 changes: 33 additions & 5 deletions stacks/port_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (
"github.com/soypat/seqs/eth"
)


// tcphandler represents a user provided function for handling incoming TCP packets on a port.
// Incoming data is sent inside the `pkt` TCPPacket argument when pkt.HasPacket returns true.
// Outgoing data is stored into the `response` byte slice. The function must return the number of
// Incoming data is passed in a 'pkt' to the recv function which is invoked whenever data arrives (by RecvEth)
// Outgoing data is written into the `dst` byte slice (from the tx ring buffer). The function must return the number of
// bytes written to `response` and an error.
//
// TCPConn provides an implemntation of this interface - note .send is ONLY called by HandleEth


nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
// See [PortStack] for information on how to use this function and other port handlers.
type itcphandler interface {
// note TCPConn is our implementation of this interface
type itcphandler interface {
send(dst []byte) (n int, err error)
recv(pkt *TCPPacket) error
// needsHandling() bool
Expand Down Expand Up @@ -51,7 +55,7 @@ func (port *tcpPort) HandleEth(dst []byte) (n int, err error) {
return n, err
}

// Open sets the UDP handler and opens the port.
// Open sets the TCP handler and opens the port.
func (port *tcpPort) Open(portNum uint16, handler itcphandler) {
if portNum == 0 || handler == nil {
panic("invalid port or nil handler" + strconv.Itoa(int(port.port)))
Expand Down Expand Up @@ -193,6 +197,30 @@ func (pkt *TCPPacket) CalculateHeaders(seg seqs.Segment, payload []byte) {
pkt.TCP.Checksum = pkt.TCP.CalculateChecksumIPv4(&pkt.IP, nil, payload)
}

func (pkt *UDPPacket) CalculateHeaders( payload []byte) {
nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
const ipLenInWords = 5
pkt.Eth.SizeOrEtherType = uint16(eth.EtherTypeIPv4)

// IPv4 frame.
pkt.IP.Protocol = 17 // UDP
pkt.IP.TTL = 64
pkt.IP.ID = prand16(pkt.IP.ID)
pkt.IP.VersionAndIHL = ipLenInWords // Sets IHL: No IP options. Version set automatically.
pkt.IP.TotalLength = 4*ipLenInWords + eth.SizeUDPHeader + uint16(len(payload))
// TODO(soypat): Document how to handle ToS. For now just use ToS used by other side.
pkt.IP.Flags = 0 // packet.IP.ToS = 0
pkt.IP.Checksum = pkt.IP.CalculateChecksum()

pkt.UDP = eth.UDPHeader{
SourcePort: pkt.UDP.SourcePort,
DestinationPort: pkt.UDP.DestinationPort,
Checksum : pkt.UDP.CalculateChecksumIPv4(&pkt.IP, payload),
Length: uint16(len(payload)+8),
}

}


// prand16 generates a pseudo random number from a seed.
func prand16(seed uint16) uint16 {
// 16bit Xorshift https://en.wikipedia.org/wiki/Xorshift
Expand Down
19 changes: 11 additions & 8 deletions stacks/port_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,44 @@ type iudphandler interface {
}

type udpPort struct {
ihandler iudphandler
port uint16
handler iudphandler
nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
port uint16
}

func (port udpPort) Port() uint16 { return port.port }

// IsPendingHandling returns true if there are packet(s) pending handling.
func (port *udpPort) IsPendingHandling() bool {
// return port.port != 0 && port.ihandler.isPendingHandling()
return port.port != 0 && port.ihandler.isPendingHandling()
return port.port != 0 && port.handler.isPendingHandling()
}

// HandleEth writes the socket's response into dst to be sent over an ethernet interface.
// HandleEth can return 0 bytes written and a nil error to indicate no action must be taken.
func (port *udpPort) HandleEth(dst []byte) (int, error) {
if port.ihandler == nil {

if port.handler == nil {
panic("nil udp handler on port " + strconv.Itoa(int(port.port)))
}
return port.ihandler.send(dst)

return port.handler.send(dst)
}

// Open sets the UDP handler and opens the port.
// This is effectively a constructor for the port NewUDPPort() - would be an alternative name
func (port *udpPort) Open(portNum uint16, h iudphandler) {
if portNum == 0 || h == nil {
panic("invalid port or nil handler" + strconv.Itoa(int(port.port)))
} else if port.port != 0 {
panic("port already open")
}
port.ihandler = h
port.handler = h
port.port = portNum
}

func (port *udpPort) Close() {
port.port = 0 // Port 0 flags the port is inactive.
port.ihandler = nil
port.handler = nil
}

// UDP socket can be forced to respond even if no packet has been received
Expand All @@ -70,7 +73,7 @@ func (pkt *UDPPacket) PutHeaders(b []byte) {
panic("short UDPPacket buffer")
}
if pkt.IP.IHL() != 5 {
panic("UDPPacket.PutHeaders expects no IP options")
panic("UDPPacket.PutHeaders expects no IP options " + strconv.Itoa(int(pkt.IP.IHL())))
}
pkt.Eth.Put(b)
pkt.IP.Put(b[eth.SizeEthernetHeader:])
Expand Down
70 changes: 37 additions & 33 deletions stacks/portstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ const (
arpOpWait = 0xffff
)

type socket interface {
Close()
IsPendingHandling() bool
HandleEth(dst []byte) (int, error)
}

var modernAge = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

type ethernethandler = func(ehdr *eth.EthernetHeader, ethPayload []byte) error
Expand Down Expand Up @@ -56,7 +62,7 @@ func NewPortStack(cfg PortStackConfig) *PortStack {

var ErrFlagPending = errors.New("seqs: pending data")

// PortStack implements partial TCP/UDP packet muxing to respective sockets with [PortStack.RcvEth].
// PortStack implements partial TCP/UDP packet muxing to respective sockets with [PortStack.RecvEth].
// This implementation limits itself basic header validation and port matching.
// Users of PortStack are expected to implement connection state, packet buffering and retransmission logic.
// - In the case of TCP this means implementing the TCP state machine.
Expand Down Expand Up @@ -167,7 +173,6 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
if len(payload) < eth.SizeEthernetHeader+eth.SizeIPv4Header {
return errPacketSmol
} else if len(payload) > int(ps.mtu) {
println("recv", payload, ps.mtu)
return errPacketExceedsMTU
}
ps.trace("Stack.RecvEth:start", slog.Int("plen", len(payload)))
Expand Down Expand Up @@ -267,8 +272,8 @@ func (ps *PortStack) RecvEth(ethernetFrame []byte) (err error) {
pkt.Eth = *ehdr
pkt.IP = ihdr // TODO(soypat): Don't ignore IP options.
pkt.UDP = uhdr
copy(pkt.payload[:], payload)
err = port.ihandler.recv(pkt)
copy(pkt.payload[:], payload) //copies the payload from the EtherNet frame into the UDP packet
err = port.handler.recv(pkt) //<-- where the magic happens - invoking recv(), passes the arrived packet so in can be placed in the RX ring buffer
if err == io.EOF {
// Special case; EOF is flag to close port
err = nil
Expand Down Expand Up @@ -376,10 +381,10 @@ func (ps *PortStack) HandleEth(dst []byte) (n int, err error) {
//
// If a handler returns any other error the port is closed.
func (ps *PortStack) handleEth(dst []byte) (n int, err error) {

switch {
case len(dst) < int(ps.mtu):
return 0, io.ErrShortBuffer

case !ps.IsPendingHandling():
nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
return 0, nil // No remaining packets to handle.
}
Expand All @@ -388,37 +393,10 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) {
return n, nil
}

type Socket interface {
Close()
IsPendingHandling() bool
HandleEth(dst []byte) (int, error)
}

handleSocket := func(dst []byte, sock Socket) (int, bool, error) {
if !sock.IsPendingHandling() {
return 0, false, nil // Nothing to handle, just skip.
}
// Socket has an unhandled packet.
n, err := sock.HandleEth(dst)
if err == ErrFlagPending {
// Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error.
return n, true, nil
}
if err != nil {
sock.Close()
if err == io.EOF {
// Special case: If error is EOF we don't return it to caller but we do write the packet if any.
err = nil
} else {
n = 0 // Clear n on unknown error and return error up the call stack.
}
}
return n, sock.IsPendingHandling(), err
}

isDebug := ps.isLogEnabled(slog.LevelDebug)
socketPending := false
if ps.pendingUDPv4 > 0 {

for i := range ps.portsUDP {
n, pending, err := handleSocket(dst, &ps.portsUDP[i])
if pending {
Expand All @@ -427,6 +405,7 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) {
if err != nil {
return 0, err
} else if n > 0 {

if isDebug {
ps.debug("UDP:send", slog.Int("plen", n))
}
Expand Down Expand Up @@ -462,6 +441,31 @@ func (ps *PortStack) handleEth(dst []byte) (n int, err error) {
return 0, nil // Nothing handled.
}

func handleSocket(dst []byte, sock socket) (int, bool, error) {

//note sock is a UDPport or TCPport - things that impliment the Sock interface

if !sock.IsPendingHandling() {
return 0, false, nil // Nothing to handle, just skip.
}
// Socket has an unhandled packet.
n, err := sock.HandleEth(dst)
if err == ErrFlagPending {
// Special case: Socket may have written data but needs future handling, flagged with the ErrFlagPending error.
return n, true, nil
}
if err != nil {
sock.Close()
if err == io.EOF {
// Special case: If error is EOF we don't return it to caller but we do write the packet if any.
err = nil
} else {
n = 0 // Clear n on unknown error and return error up the call stack.
}
}
return n, sock.IsPendingHandling(), err
}

nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
// IsPendingHandling checks if a call to HandleEth could possibly result in a packet being generated by the PortStack.
func (ps *PortStack) IsPendingHandling() bool {
return ps.pendingUDPv4 > 0 || ps.pendingTCPv4 > 0 || ps.arpClient.isPending()
Expand Down
5 changes: 4 additions & 1 deletion stacks/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ type ring struct {
end int
}

// Write writes b[] into the ring buffer at the current write position (r.end) - wrapping if needed, and increasing r.end (possibly wrapping)
func (r *ring) Write(b []byte) (int, error) {

nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
free := r.Free()
if len(b) > free {
return 0, errRingBufferFull
Expand Down Expand Up @@ -105,11 +107,12 @@ func (r *ring) onReadEnd() {
r.off = 0 // Wrap around.
}
if r.off == r.end {
r.Reset() // We read everything, reset.
r.Reset() // We have read everything, reset. (reduces split reads - can get the data in one chunk more often)
}
}

func max(a, b int) int {

if a > b {
return a
}
Expand Down
14 changes: 9 additions & 5 deletions stacks/tcpconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (

// TCPConn is a userspace implementation of a TCP connection intended for use with PortStack
// though is purposefully loosely coupled. It implements [net.Conn].

nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
type TCPConn struct {
stack *PortStack
rdead time.Time
Expand All @@ -39,7 +40,7 @@ type TCPConn struct {
// remote is the IP+port address of remote.
remote netip.AddrPort
localPort uint16
remoteMAC [6]byte
remoteMAC [6]byte //this is the local peer's MAC address (or often, that of the router/gateway for Internet traffic)
abortErr error
closing bool
// connid is a conenction counter that is incremented each time a new
Expand Down Expand Up @@ -160,8 +161,8 @@ func (sock *TCPConn) Write(b []byte) (n int, _ error) {
}
}

// Read reads data from the socket's input buffer. If the buffer is empty,
// Read will block until data is available.
// Read reads data from the socket's input (RX) (ring) buffer... populating b[]..
// If the rx buffer is empty, Read will block until data is available.
func (sock *TCPConn) Read(b []byte) (int, error) {
err := sock.checkPipeOpen()
if err != nil {
Expand Down Expand Up @@ -331,6 +332,7 @@ func (sock *TCPConn) checkPipeOpen() error {
return nil
}

// recv is called by the PortStack.RecvEth when a packet is received on the network interface, pkt is (a pointer to) the arrived packet.
func (sock *TCPConn) recv(pkt *TCPPacket) (err error) {
nickaxgit marked this conversation as resolved.
Show resolved Hide resolved
sock.trace("TCPConn.recv:start")
prevState := sock.scb.State()
Expand Down Expand Up @@ -380,6 +382,7 @@ func (sock *TCPConn) recv(pkt *TCPPacket) (err error) {
return err
}

// Send this handler is called by the underlying stack and populates response[] from the TX ring buffer, with data to be sent as a packet
func (sock *TCPConn) send(response []byte) (n int, err error) {
defer sock.trace("TCPConn.send:start")
if !sock.remote.IsValid() {
Expand Down Expand Up @@ -414,14 +417,15 @@ func (sock *TCPConn) send(response []byte) (n int, err error) {
var payload []byte
if available > 0 {
payload = response[sizeTCPNoOptions : sizeTCPNoOptions+seg.DATALEN]
//we are reading out of the TX ring buffer, data to encapsulate and send
n, err = sock.tx.Read(payload)
if err != nil && err != io.EOF || n != int(seg.DATALEN) {
panic("bug in handleUser") // This is a bug in ring buffer or a race condition.
panic("unexpected condition in seqs.TCPConn.send") // This is a bug in ring buffer or a race condition.
}
}
sock.setSrcDest(&sock.pkt)
sock.pkt.CalculateHeaders(seg, payload)
sock.pkt.PutHeaders(response)
sock.pkt.PutHeaders(response) //puts the headers in the response bytes array (around the payload)
if prevState != sock.scb.State() {
sock.info("TCP:tx-statechange", slog.Uint64("port", uint64(sock.localPort)), slog.String("old", prevState.String()), slog.String("new", sock.scb.State().String()), slog.String("txflags", seg.Flags.String()))
}
Expand Down
Loading