Skip to content

Commit

Permalink
change RxTx methods for Read and Write
Browse files Browse the repository at this point in the history
  • Loading branch information
soypat committed Dec 6, 2023
1 parent 0a36f9d commit dd6ee8a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
35 changes: 26 additions & 9 deletions stacks/socket_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log/slog"
"net"
"net/netip"
"runtime"
"time"

"github.com/soypat/seqs"
Expand Down Expand Up @@ -87,36 +88,52 @@ func (sock *TCPSocket) FlushOutputBuffer() error {
return nil
}

func (sock *TCPSocket) Send(b []byte) error {
// Write writes argument data to the socket's output buffer which is queued to be sent.
func (sock *TCPSocket) Write(b []byte) (int, error) {
if sock.abortErr != nil {
return sock.abortErr
return 0, sock.abortErr
}
state := sock.State()
if state.IsClosing() || state.IsClosed() {
return net.ErrClosed
return 0, net.ErrClosed
}
if len(b) == 0 {
return nil
return 0, nil
}
err := sock.stack.FlagPendingTCP(sock.localPort)
if err != nil {
return err
return 0, err
}
_, err = sock.tx.Write(b)
n, err := sock.tx.Write(b)
if err != nil {
return err
return 0, err
}
return nil
return n, nil
}

// Read reads data from the socket's input buffer. If the buffer is empty,
// Read will block until data is available.
func (sock *TCPSocket) Read(b []byte) (int, error) {
return sock.ReadDeadline(b, time.Time{})
}

func (sock *TCPSocket) Recv(b []byte) (int, error) {
// BufferedInput returns the number of bytes in the socket's input buffer.
func (sock *TCPSocket) BufferedInput() int { return sock.rx.Buffered() }

// Read reads data from the socket's input buffer. If the buffer is empty
// it will wait until the deadline is met or data is available.
func (sock *TCPSocket) ReadDeadline(b []byte, deadline time.Time) (int, error) {
if sock.abortErr != nil {
return 0, sock.abortErr
}
state := sock.State()
if state.IsClosed() || state.IsClosing() {
return 0, net.ErrClosed
}
noDeadline := deadline.IsZero()
for sock.rx.Buffered() == 0 && sock.State() == seqs.StateEstablished && (noDeadline || time.Until(deadline) > 0) {
runtime.Gosched()
}
n, err := sock.rx.Read(b)
return n, err
}
Expand Down
7 changes: 4 additions & 3 deletions stacks/stacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/soypat/seqs"
"github.com/soypat/seqs/eth"
Expand Down Expand Up @@ -613,8 +614,8 @@ func createPortStacks(t *testing.T, n int) (Stacks []*stacks.PortStack) {
func socketReadAllString(s *stacks.TCPSocket) string {
var str strings.Builder
var buf [1024]byte
for {
n, err := s.Recv(buf[:])
for s.BufferedInput() > 0 {
n, err := s.ReadDeadline(buf[:], time.Time{})
str.Write(buf[:n])
if n == 0 || err != nil {
break
Expand All @@ -624,7 +625,7 @@ func socketReadAllString(s *stacks.TCPSocket) string {
}

func socketSendString(s *stacks.TCPSocket, str string) {
err := s.Send([]byte(str))
_, err := s.Write([]byte(str))
if err != nil {
panic(err)
}
Expand Down

0 comments on commit dd6ee8a

Please sign in to comment.