Skip to content

Commit

Permalink
Merge pull request #131 from eos175/master
Browse files Browse the repository at this point in the history
minor fix closed
  • Loading branch information
AmarnathCJD authored Aug 7, 2024
2 parents cd5ff73 + d9d8acf commit 1fca87a
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions internal/transport/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,34 @@ type Reader struct {
ctx context.Context
data chan []byte

sizeWant chan int
sizeRead chan int

err error
r io.Reader
}

func (c *Reader) begin() {
defer func() {
close(c.data)
close(c.sizeRead)
}()

for {
select {
case sizeWant := <-c.sizeWant:
buf := make([]byte, sizeWant)
case buf := <-c.data:
n, err := io.ReadFull(c.r, buf)
if err != nil {
c.err = err
close(c.data)
return
}
if n != sizeWant {
panic("read " + strconv.Itoa(n) + ", want " + strconv.Itoa(sizeWant))

// if len(buf) != n, err = ErrUnexpectedEOF, this will never happen
if false {
panic("read " + strconv.Itoa(n) + ", want " + strconv.Itoa(len(buf)))
}
c.data <- buf

c.sizeRead <- n
case <-c.ctx.Done():
close(c.data)
close(c.sizeWant)
return
}
}
Expand All @@ -54,25 +58,20 @@ func isClosed(ch <-chan int) bool {
}

func (c *Reader) Read(p []byte) (int, error) {
if isClosed(c.sizeWant) {
return 0, io.EOF
}

select {
case <-c.ctx.Done():
return 0, c.ctx.Err()
case c.sizeWant <- len(p):
case c.data <- p:
}

select {
case <-c.ctx.Done():
return 0, c.ctx.Err()
case d, ok := <-c.data:
case n, ok := <-c.sizeRead:
if !ok {
return 0, c.err
}
copy(p, d)
return len(d), nil
return n, nil
}
}

Expand All @@ -95,7 +94,7 @@ func NewReader(ctx context.Context, r io.Reader) *Reader {
r: r,
ctx: ctx,
data: make(chan []byte),
sizeWant: make(chan int),
sizeRead: make(chan int),
}
go c.begin()
return c
Expand Down

0 comments on commit 1fca87a

Please sign in to comment.