Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: advanced split options [BREAKING] #311

Merged
merged 14 commits into from
Nov 7, 2024
9 changes: 6 additions & 3 deletions transport/split/stream_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ import (
"github.com/Jigsaw-Code/outline-sdk/transport"
)

// splitDialer is a [transport.StreamDialer] that implements the split strategy.
// Use [NewStreamDialer] to create new instances.
type splitDialer struct {
dialer transport.StreamDialer
splitPoint int64
options []Option
}

var _ transport.StreamDialer = (*splitDialer)(nil)

// NewStreamDialer creates a [transport.StreamDialer] that splits the outgoing stream after writing "prefixBytes" bytes
// using [SplitWriter].
func NewStreamDialer(dialer transport.StreamDialer, prefixBytes int64) (transport.StreamDialer, error) {
// using the split writer. You can specify multiple sequences with the [AddSplitSequence] option.
func NewStreamDialer(dialer transport.StreamDialer, prefixBytes int64, options ...Option) (transport.StreamDialer, error) {
if dialer == nil {
return nil, errors.New("argument dialer must not be nil")
}
Expand All @@ -43,5 +46,5 @@ func (d *splitDialer) DialStream(ctx context.Context, remoteAddr string) (transp
if err != nil {
return nil, err
}
return transport.WrapConn(innerConn, innerConn, NewWriter(innerConn, d.splitPoint)), nil
return transport.WrapConn(innerConn, innerConn, NewWriter(innerConn, d.splitPoint, d.options...)), nil
}
65 changes: 52 additions & 13 deletions transport/split/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ import (
"io"
)

type repeatedSplit struct {
count int
bytes int64
}

type splitWriter struct {
writer io.Writer
prefixBytes int64
writer io.Writer
nextSplitBytes int64
remainingSplits []repeatedSplit
}

var _ io.Writer = (*splitWriter)(nil)
Expand All @@ -36,32 +42,65 @@ var _ io.ReaderFrom = (*splitWriterReaderFrom)(nil)
// A write will end right after byte index prefixBytes - 1, before a write starting at byte index prefixBytes.
// For example, if you have a write of [0123456789] and prefixBytes = 3, you will get writes [012] and [3456789].
// If the input writer is a [io.ReaderFrom], the output writer will be too.
func NewWriter(writer io.Writer, prefixBytes int64) io.Writer {
sw := &splitWriter{writer, prefixBytes}
if rf, ok := writer.(io.ReaderFrom); ok {
return &splitWriterReaderFrom{sw, rf}
// It's possible to enable multiple splits with the [AddSplitSequence] option, which adds count splits every skipBytes bytes.
// Example:
// prefixBytes = 1, AddSplitSequence(count=2, bytes=6)
// Array of [0 1 3 2 4 5 6 7 8 9 10 11 12 13 14 15 16 ...] will become
// [0] [1 2 3 4 5 6] [7 8 9 10 11 12] [13 14 15 16 ...]
func NewWriter(writer io.Writer, prefixBytes int64, options ...Option) io.Writer {
sw := &splitWriter{writer: writer, nextSplitBytes: prefixBytes, remainingSplits: []repeatedSplit{}}
for _, option := range options {
option(sw)
}
if len(sw.remainingSplits) == 0 {
// TODO(fortuna): Support ReaderFrom for repeat split.
if rf, ok := writer.(io.ReaderFrom); ok {
return &splitWriterReaderFrom{sw, rf}
}
}
return sw
}

type Option func(w *splitWriter)

// AddSplitSequence will add count splits, each of skipBytes length.
func AddSplitSequence(count int, skipBytes int64) Option {
fortuna marked this conversation as resolved.
Show resolved Hide resolved
return func(w *splitWriter) {
if count > 0 {
w.remainingSplits = append(w.remainingSplits, repeatedSplit{count: count, bytes: skipBytes})
}
}
}

func (w *splitWriterReaderFrom) ReadFrom(source io.Reader) (int64, error) {
reader := io.MultiReader(io.LimitReader(source, w.prefixBytes), source)
reader := io.MultiReader(io.LimitReader(source, w.nextSplitBytes), source)
fortuna marked this conversation as resolved.
Show resolved Hide resolved
written, err := w.rf.ReadFrom(reader)
w.prefixBytes -= written
w.nextSplitBytes -= written
return written, err
}

func (w *splitWriter) Write(data []byte) (written int, err error) {
if 0 < w.prefixBytes && w.prefixBytes < int64(len(data)) {
written, err = w.writer.Write(data[:w.prefixBytes])
w.prefixBytes -= int64(written)
for 0 < w.nextSplitBytes && w.nextSplitBytes < int64(len(data)) {
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
dataToSend := data[:w.nextSplitBytes]
n, err := w.writer.Write(dataToSend)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we discovered during the experiments: it's not enough to just split data into multiple writes.

OS sometimes unites the writes anyways.

smth like https://github.com/hufrea/byedpi/blob/75671fa11c57aa458e87d876efc48b40985ed78f/desync.c#L88 or just a small Sleep is required to make it reliable.

Maybe there's a better way (somehow make the OS buffer sizes smaller?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were the experiments using Go? I have not seen that happen. I'd appreciate a way to reproduce it before we can add and wait.

In Go each TCPConn.Write will correspond to one write system call. With the NoDelay enabled (the default), it ends up being one write consistently. Make sure you don't have TCP_CORK enabled.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Were the experiments using Go?

yes

With the NoDelay enabled (the default), it ends up being one write consistently. Make sure you don't have TCP_CORK enabled.

I'll try to run this on my machine (I don't remember the details about the test we did IRL)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For TCP sockets, Go will try to send packets ASAP without delays: https://pkg.go.dev/net#TCPConn.SetNoDelay

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing about NoDelay: Documentation says "meaning that data is sent as soon as possible after a Write".

We don't have any guarantees that "as soon is possible" is before the next write. It might happen later.

I took 4801a03, manually configured it to spit 999 times per 1 byte and see the following picture when running fetch:

00:18:59.021904 IP nyanpc.55852 > opennet.ru.http: Flags [S], seq 500399592, win 64240, options [mss 1460,sackOK,TS val 1345057543 ecr 0,nop,wscale 7], length 0
00:18:59.069457 IP opennet.ru.http > nyanpc.55852: Flags [S.], seq 4096694502, ack 500399593, win 28960, options [mss 1452,sackOK,TS val 4132463171 ecr 1345057543,nop,wscale 7], length 0
00:18:59.069476 IP nyanpc.55852 > opennet.ru.http: Flags [.], ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 0
00:18:59.069708 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 1:2, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 1: HTTP
00:18:59.069712 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 2:4, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 2: HTTP
00:18:59.069720 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 4:20, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 16: HTTP
00:18:59.069770 IP nyanpc.55852 > opennet.ru.http: Flags [P.], seq 20:92, ack 1, win 502, options [nop,nop,TS val 1345057590 ecr 4132463171], length 72: HTTP
00:18:59.117256 IP opennet.ru.http > nyanpc.55852: Flags [.], ack 2, win 227, options [nop,nop,TS val 4132463219 ecr 1345057590], length 0

The first packet is len 1 (expected), but then it's 2, 16, 72.

If I add time.Sleep(time.Millisecond) after the write I see what I expect to see:

00:23:20.956161 IP nyanpc.35648 > opennet.ru.http: Flags [S], seq 3537089940, win 64240, options [mss 1460,sackOK,TS val 1345319477 ecr 0,nop,wscale 7], length 0
00:23:21.003374 IP opennet.ru.http > nyanpc.35648: Flags [S.], seq 1632357611, ack 3537089941, win 28960, options [mss 1452,sackOK,TS val 4132725105 ecr 1345319477,nop,wscale 7], length 0
00:23:21.003389 IP nyanpc.35648 > opennet.ru.http: Flags [.], ack 1, win 502, options [nop,nop,TS val 1345319524 ecr 4132725105], length 0
00:23:21.003585 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 1:2, ack 1, win 502, options [nop,nop,TS val 1345319524 ecr 4132725105], length 1: HTTP
00:23:21.004670 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 2:3, ack 1, win 502, options [nop,nop,TS val 1345319525 ecr 4132725105], length 1: HTTP
00:23:21.005743 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 3:4, ack 1, win 502, options [nop,nop,TS val 1345319526 ecr 4132725105], length 1: HTTP
00:23:21.006810 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 4:5, ack 1, win 502, options [nop,nop,TS val 1345319527 ecr 4132725105], length 1: HTTP
00:23:21.007874 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 5:6, ack 1, win 502, options [nop,nop,TS val 1345319529 ecr 4132725105], length 1: HTTP
00:23:21.008934 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 6:7, ack 1, win 502, options [nop,nop,TS val 1345319530 ecr 4132725105], length 1: HTTP
00:23:21.010007 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 7:8, ack 1, win 502, options [nop,nop,TS val 1345319531 ecr 4132725105], length 1: HTTP
00:23:21.011070 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 8:9, ack 1, win 502, options [nop,nop,TS val 1345319532 ecr 4132725105], length 1: HTTP
00:23:21.012130 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 9:10, ack 1, win 502, options [nop,nop,TS val 1345319533 ecr 4132725105], length 1: HTTP
00:23:21.013168 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 10:11, ack 1, win 502, options [nop,nop,TS val 1345319534 ecr 4132725105], length 1: HTTP
00:23:21.051078 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 2, win 227, options [nop,nop,TS val 4132725152 ecr 1345319524], length 0
00:23:21.051084 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 11:46, ack 1, win 502, options [nop,nop,TS val 1345319572 ecr 4132725152], length 35: HTTP
00:23:21.051205 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 46:47, ack 1, win 502, options [nop,nop,TS val 1345319572 ecr 4132725152], length 1: HTTP
00:23:21.051745 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 3, win 227, options [nop,nop,TS val 4132725153 ecr 1345319525], length 0
00:23:21.052274 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 47:48, ack 1, win 502, options [nop,nop,TS val 1345319573 ecr 4132725153], length 1: HTTP
00:23:21.052719 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 4, win 227, options [nop,nop,TS val 4132725154 ecr 1345319526], length 0
00:23:21.053341 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 48:49, ack 1, win 502, options [nop,nop,TS val 1345319574 ecr 4132725154], length 1: HTTP
00:23:21.053974 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 5, win 227, options [nop,nop,TS val 4132725155 ecr 1345319527], length 0
00:23:21.054405 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 49:50, ack 1, win 502, options [nop,nop,TS val 1345319575 ecr 4132725155], length 1: HTTP
00:23:21.055214 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 6, win 227, options [nop,nop,TS val 4132725157 ecr 1345319529], length 0
00:23:21.055464 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 50:51, ack 1, win 502, options [nop,nop,TS val 1345319576 ecr 4132725157], length 1: HTTP
00:23:21.056222 IP opennet.ru.http > nyanpc.35648: Flags [.], ack 7, win 227, options [nop,nop,TS val 4132725158 ecr 1345319530], length 0
00:23:21.056497 IP nyanpc.35648 > opennet.ru.http: Flags [P.], seq 51:52, ack 1, win 502, options [nop,nop,TS val 1345319577 ecr 4132725158], length 1: HTTP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added 100us sleep. I wonder if that's enough or if you need a full millisecond.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea why you are getting 35 there?

No idea, likely some kind of congestion control kicking it.

I added 100us sleep. I wonder if that's enough or if you need a full millisecond.

If congestion control is causing this, relying on delays won't be good enough for mobile network with high latencies. Need some experimentation, and maybe putting the delay into config.

Porting https://github.com/hufrea/byedpi/blob/75671fa11c57aa458e87d876efc48b40985ed78f/desync.c#L88 for android should also be possible, but tricky. I wonder if this option is also supported on ios, but there's a solution for windows.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://pkg.go.dev/net#TCPConn.SetWriteBuffer

hmm, is this what we need?

setting write buffer to a relatively low value when doing splits, then maybe restoring it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@derlaft What operating system are you using?

I get it working fine on macOS without the sleep:
image

FYI, I pushed the config changes. Can you try it again with the new code?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

linux

I get it working fine on macOS without the sleep:

looks like mac stack is simpler in that aspect (which is good news, means we won't likely have to do anything for ios as well?)

FYI, I pushed the config changes. Can you try it again with the new code?

sure, will do that today in the evening

written += n
w.nextSplitBytes -= int64(n)
if err != nil {
return written, err
}
data = data[written:]
data = data[n:]

// Split done. Update nextSplitBytes.
if len(w.remainingSplits) > 0 {
fortuna marked this conversation as resolved.
Show resolved Hide resolved
w.nextSplitBytes = w.remainingSplits[0].bytes
w.remainingSplits[0].count -= 1
if w.remainingSplits[0].count == 0 {
w.remainingSplits = w.remainingSplits[1:]
}
}
}
n, err := w.writer.Write(data)
written += n
w.prefixBytes -= int64(n)
w.nextSplitBytes -= int64(n)
return written, err
}
24 changes: 24 additions & 0 deletions transport/split/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,30 @@ func TestWrite_Compound(t *testing.T) {
require.Equal(t, [][]byte{[]byte("R"), []byte("equ"), []byte("est")}, innerWriter.writes)
}

func TestWrite_RepeatNumber3_SkipBytes5(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 1, AddSplitSequence(3, 5))
n, err := splitWriter.Write([]byte("RequestRequestRequest."))
require.NoError(t, err)
require.Equal(t, 7*3+1, n)
require.Equal(t, [][]byte{
[]byte("R"), // prefix
[]byte("eques"), // split 1
[]byte("tRequ"), // split 2
[]byte("estRe"), // split 3
[]byte("quest."), // tail
}, innerWriter.writes)
}

func TestWrite_RepeatNumber3_SkipBytes0(t *testing.T) {
var innerWriter collectWrites
splitWriter := NewWriter(&innerWriter, 1, AddSplitSequence(0, 3))
n, err := splitWriter.Write([]byte("Request"))
require.NoError(t, err)
require.Equal(t, 7, n)
require.Equal(t, [][]byte{[]byte("R"), []byte("equest")}, innerWriter.writes)
}

// collectReader is a [io.Reader] that appends each Read from the Reader to the reads slice.
type collectReader struct {
io.Reader
Expand Down
Loading