From dd6ee8a482d1331e865d62c4111d96b7caeeee22 Mon Sep 17 00:00:00 2001 From: soypat Date: Tue, 5 Dec 2023 21:58:33 -0300 Subject: [PATCH] change RxTx methods for Read and Write --- stacks/socket_tcp.go | 35 ++++++++++++++++++++++++++--------- stacks/stacks_test.go | 7 ++++--- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/stacks/socket_tcp.go b/stacks/socket_tcp.go index 1b9759d..3a64c4b 100644 --- a/stacks/socket_tcp.go +++ b/stacks/socket_tcp.go @@ -6,6 +6,7 @@ import ( "log/slog" "net" "net/netip" + "runtime" "time" "github.com/soypat/seqs" @@ -87,29 +88,41 @@ func (sock *TCPSocket) FlushOutputBuffer() error { return nil } -func (sock *TCPSocket) Send(b []byte) error { +// Write writes argument data to the socket's output buffer which is queued to be sent. +func (sock *TCPSocket) Write(b []byte) (int, error) { if sock.abortErr != nil { - return sock.abortErr + return 0, sock.abortErr } state := sock.State() if state.IsClosing() || state.IsClosed() { - return net.ErrClosed + return 0, net.ErrClosed } if len(b) == 0 { - return nil + return 0, nil } err := sock.stack.FlagPendingTCP(sock.localPort) if err != nil { - return err + return 0, err } - _, err = sock.tx.Write(b) + n, err := sock.tx.Write(b) if err != nil { - return err + return 0, err } - return nil + return n, nil +} + +// Read reads data from the socket's input buffer. If the buffer is empty, +// Read will block until data is available. +func (sock *TCPSocket) Read(b []byte) (int, error) { + return sock.ReadDeadline(b, time.Time{}) } -func (sock *TCPSocket) Recv(b []byte) (int, error) { +// BufferedInput returns the number of bytes in the socket's input buffer. +func (sock *TCPSocket) BufferedInput() int { return sock.rx.Buffered() } + +// Read reads data from the socket's input buffer. If the buffer is empty +// it will wait until the deadline is met or data is available. +func (sock *TCPSocket) ReadDeadline(b []byte, deadline time.Time) (int, error) { if sock.abortErr != nil { return 0, sock.abortErr } @@ -117,6 +130,10 @@ func (sock *TCPSocket) Recv(b []byte) (int, error) { if state.IsClosed() || state.IsClosing() { return 0, net.ErrClosed } + noDeadline := deadline.IsZero() + for sock.rx.Buffered() == 0 && sock.State() == seqs.StateEstablished && (noDeadline || time.Until(deadline) > 0) { + runtime.Gosched() + } n, err := sock.rx.Read(b) return n, err } diff --git a/stacks/stacks_test.go b/stacks/stacks_test.go index 272c127..e66a9e1 100644 --- a/stacks/stacks_test.go +++ b/stacks/stacks_test.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/soypat/seqs" "github.com/soypat/seqs/eth" @@ -613,8 +614,8 @@ func createPortStacks(t *testing.T, n int) (Stacks []*stacks.PortStack) { func socketReadAllString(s *stacks.TCPSocket) string { var str strings.Builder var buf [1024]byte - for { - n, err := s.Recv(buf[:]) + for s.BufferedInput() > 0 { + n, err := s.ReadDeadline(buf[:], time.Time{}) str.Write(buf[:n]) if n == 0 || err != nil { break @@ -624,7 +625,7 @@ func socketReadAllString(s *stacks.TCPSocket) string { } func socketSendString(s *stacks.TCPSocket, str string) { - err := s.Send([]byte(str)) + _, err := s.Write([]byte(str)) if err != nil { panic(err) }