Skip to content
This repository has been archived by the owner on Apr 3, 2021. It is now read-only.

Remove intermediate buffering and solve the Read blocking issue #150

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ type TCPConn interface {
Sent(len uint16) error

// Receive will be called when data arrives from TUN.
Receive(data []byte) error
Receive() (<-chan []byte, error)

ReceiveDone(int)

// Err will be called when a fatal error has occurred on the connection.
// The corresponding pcb is already freed when this callback is called
Expand Down
32 changes: 21 additions & 11 deletions core/tcp_callback_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,7 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err
}
}

var buf []byte
var totlen = int(p.tot_len)
if p.tot_len == p.len {
buf = (*[1 << 30]byte)(unsafe.Pointer(p.payload))[:totlen:totlen]
} else {
buf = NewBytes(totlen)
defer FreeBytes(buf)
C.pbuf_copy_partial(p, unsafe.Pointer(&buf[0]), p.tot_len, 0)
}

rerr := conn.(TCPConn).Receive(buf[:totlen])
readCh, rerr := conn.(TCPConn).Receive()
if rerr != nil {
switch rerr.(*lwipError).Code {
case LWIP_ERR_ABRT:
Expand All @@ -109,6 +99,26 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err
}
}

select {
case buf, ok := <-readCh:
if !ok {
C.tcp_recved(tpcb, p.tot_len)
C.tcp_shutdown(tpcb, 1, 0)
return C.ERR_OK
}
if len(buf) < int(p.tot_len) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break if the app passes a small buffer.

Instead of failing, why don't you copy as much as the buffer can hold, calling C.tcp_recved(tpcb, len(buf))?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling tcp_recved only cause lwip to advertise a larger TCP window size, there is no way to partly receive a pbuf passed to tcpRecvFn, you either accept a whole pbuf and return ERR_OK, or reject it and return some other errors such as ERR_CONN, lwip would hold the pbuf for the next tcpRecvFn call in the latter case.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry this new behavior would break any client that passes a buffer smaller than the TCP MSS, which seems to be the size of the pbufs. Actually it's worse: it will break if the buffer is shorter than the pbuf chain, since you're using the p.tot_len, not p.len.

I see you have to consume a full pbuf in order to return OK. In that case you could iterate until a full pbuf is read, then return OK.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the blocking issue is re-introduced.

conn.(TCPConn).ReceiveDone(-1)
shouldFreePbuf = false
return C.ERR_CONN
}
C.pbuf_copy_partial(p, unsafe.Pointer(&buf[0]), p.tot_len, 0)
conn.(TCPConn).ReceiveDone(int(p.tot_len))
C.tcp_recved(tpcb, p.tot_len)
default:
shouldFreePbuf = false
return C.ERR_CONN
}

return C.ERR_OK
}

Expand Down
65 changes: 33 additions & 32 deletions core/tcp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ type tcpConn struct {
connKey uint32
canWrite *sync.Cond // Condition variable to implement TCP backpressure.
state tcpConnState
sndPipeReader *io.PipeReader
sndPipeWriter *io.PipeWriter
readCh chan []byte
readDoneCh chan int
closeOnce sync.Once
readCloseOnce sync.Once
closeErr error
}

Expand All @@ -86,18 +87,17 @@ func newTCPConn(pcb *C.struct_tcp_pcb, handler TCPConnHandler) (TCPConn, error)
setTCPErrCallback(pcb)
setTCPPollCallback(pcb, C.u8_t(TCP_POLL_INTERVAL))

pipeReader, pipeWriter := io.Pipe()
conn := &tcpConn{
pcb: pcb,
handler: handler,
localAddr: ParseTCPAddr(ipAddrNTOA(pcb.remote_ip), uint16(pcb.remote_port)),
remoteAddr: ParseTCPAddr(ipAddrNTOA(pcb.local_ip), uint16(pcb.local_port)),
connKeyArg: connKeyArg,
connKey: connKey,
canWrite: sync.NewCond(&sync.Mutex{}),
state: tcpNewConn,
sndPipeReader: pipeReader,
sndPipeWriter: pipeWriter,
pcb: pcb,
handler: handler,
localAddr: ParseTCPAddr(ipAddrNTOA(pcb.remote_ip), uint16(pcb.remote_port)),
remoteAddr: ParseTCPAddr(ipAddrNTOA(pcb.local_ip), uint16(pcb.local_port)),
connKeyArg: connKeyArg,
connKey: connKey,
canWrite: sync.NewCond(&sync.Mutex{}),
state: tcpNewConn,
readCh: make(chan []byte, 1),
readDoneCh: make(chan int, 1),
}

// Associate conn with key and save to the global map.
Expand Down Expand Up @@ -180,16 +180,15 @@ func (conn *tcpConn) receiveCheck() error {
return nil
}

func (conn *tcpConn) Receive(data []byte) error {
func (conn *tcpConn) Receive() (<-chan []byte, error) {
if err := conn.receiveCheck(); err != nil {
return err
}
n, err := conn.sndPipeWriter.Write(data)
if err != nil {
return NewLWIPError(LWIP_ERR_CLSD)
return nil, err
}
C.tcp_recved(conn.pcb, C.u16_t(n))
return NewLWIPError(LWIP_ERR_OK)
return conn.readCh, nil
}

func (conn *tcpConn) ReceiveDone(n int) {
conn.readDoneCh <- n
}

func (conn *tcpConn) Read(data []byte) (int, error) {
Expand All @@ -204,12 +203,12 @@ func (conn *tcpConn) Read(data []byte) (int, error) {
}
conn.Unlock()

// Handler should get EOF.
n, err := conn.sndPipeReader.Read(data)
if err == io.ErrClosedPipe {
err = io.EOF
conn.readCh <- data
n := <-conn.readDoneCh
if n == -1 {
return 0, errors.New("insufficient read buffer")
}
return n, err
return n, nil
}

// writeInternal enqueues data to snd_buf, and treats ERR_MEM returned by tcp_write not an error,
Expand Down Expand Up @@ -312,7 +311,8 @@ func (conn *tcpConn) CloseWrite() error {
}

func (conn *tcpConn) CloseRead() error {
return conn.sndPipeReader.Close()
conn.readCloseOnce.Do(conn.closeReadCh)
return nil
}

func (conn *tcpConn) Sent(len uint16) error {
Expand Down Expand Up @@ -386,6 +386,11 @@ func (conn *tcpConn) close() {
}
}

func (conn *tcpConn) closeReadCh() {
close(conn.readCh)
close(conn.readDoneCh)
}

func (conn *tcpConn) setLocalClosed() error {
conn.Lock()
defer conn.Unlock()
Expand All @@ -394,9 +399,6 @@ func (conn *tcpConn) setLocalClosed() error {
return nil
}

// Causes the read half of the pipe returns.
conn.sndPipeWriter.Close()

if conn.state == tcpWriteClosed {
conn.state = tcpClosing
} else {
Expand Down Expand Up @@ -464,8 +466,7 @@ func (conn *tcpConn) release() {
freeConnKeyArg(conn.connKeyArg)
tcpConns.Delete(conn.connKey)
}
conn.sndPipeWriter.Close()
conn.sndPipeReader.Close()
conn.readCloseOnce.Do(conn.closeReadCh)
conn.state = tcpClosed
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ go 1.13

require (
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b
golang.org/x/net v0.0.0-20191021144547-ec77196f6094
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
)
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b h1:+y4hCMc/WKsDbAPsOQZgBSaSZ26uh2afyaWeVg/3s/c=
github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b/go.mod h1:P5HUIBuIWKbyjl083/loAegFkfbFNx5i2qEP4CNbm7E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191021144547-ec77196f6094 h1:5O4U9trLjNpuhpynaDsqwCk+Tw6seqJz1EbqbnzHrc8=
golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=