Update packet format (#119)
ohkinozomu authored Dec 7, 2023
1 parent e0b4115 commit 21654a2
Showing 10 changed files with 264 additions and 325 deletions.
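Taken together, the change replaces the precomputed chunk count in the split-transfer packets with an end-of-stream flag. A rough Go-style summary of the before/after fields, inferred from the code in this diff (the actual message lives in pkg/data/data.proto, which is among the changed files but not shown here):

// Inferred from the fields used in this diff, not the generated protobuf code.

// Before: the sender computed the number of chunks up front from the response size.
type HTTPBodyChunkOld struct {
	RequestId string
	Total     int32 // total number of chunks for this request
	Sequence  int32
	Data      []byte
}

// After: the sender streams chunks and flags the final one; the receiver
// derives the total from the Sequence of the IsLast chunk.
type HTTPBodyChunkNew struct {
	RequestId string
	IsLast    bool
	Sequence  int32
	Data      []byte
}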
3 changes: 0 additions & 3 deletions Makefile
@@ -1,6 +1,3 @@
proto:
protoc -I=./pkg/data --go_out=./pkg/data --go_opt=paths=source_relative ./pkg/data/data.proto

vtproto:
protoc -I=./pkg/data \
--go_out=./pkg/data \
131 changes: 64 additions & 67 deletions internal/agent/agent.go
@@ -6,9 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
@@ -277,73 +275,78 @@ func sendHTTP1Request(proxyHost string, data *data.HTTPRequestData) (*http.Respo
return response, nil
}

func getResponseSize(r *http.Response) (int32, error) {
if r.ContentLength > 0 {
return int32(r.ContentLength), nil
} else if r.TransferEncoding != nil && len(r.TransferEncoding) > 0 && r.TransferEncoding[0] == "chunked" {
b, err := io.ReadAll(r.Body)
if err != nil {
return 0, err
}
return int32(len(b)), nil
} else {
return 0, fmt.Errorf("unknown response size")
func (s *server) sendOnce(requestID string, isLast bool, sequence int32, d []byte, r http.Response) error {
httpBodyChunk := data.HTTPBodyChunk{
RequestId: requestID,
IsLast: isLast,
Sequence: int32(sequence),
Data: d,
}
}

func (s *server) sendSplitData(requestID string, httpResponse *http.Response) error {
buffer := make([]byte, s.commonConfig.Split.ChunkBytes)

total, err := getResponseSize(httpResponse)
b, err := data.SerializeHTTPBodyChunk(&httpBodyChunk, s.commonConfig.Networking.Format)
if err != nil {
return err
}
s.logger.Debug("Total request size based on getRequestSize: " + fmt.Sprintf("%d", total))

totalChunks := int32(math.Ceil(float64(total) / float64(s.commonConfig.Split.ChunkBytes)))
s.logger.Debug("Total chunks: " + fmt.Sprintf("%d", totalChunks))
body := data.HTTPBody{
Body: b,
Type: "split",
}
protoHeaders := data.HTTPHeaderToProtoHeaders(r.Header)
responseData := data.HTTPResponseData{
Body: &body,
StatusCode: int32(r.StatusCode),
Headers: &protoHeaders,
}
sendErr := s.sendResponseData(&responseData, requestID)
if sendErr != nil {
return sendErr
}
return nil
}

func (s *server) sendSplitData(requestID string, httpResponse *http.Response) error {
buffer := make([]byte, s.commonConfig.Split.ChunkBytes)
sequence := 1
isLast := false
previousData := []byte{}
for {
n, readErr := io.ReadFull(httpResponse.Body, buffer)
if n > 0 {
s.logger.Debug("Sending chunk " + fmt.Sprintf("%d", sequence))
s.logger.Debug("Chunk size: " + fmt.Sprintf("%d", n))
httpBodyChunk := data.HTTPBodyChunk{
RequestId: requestID,
Total: totalChunks,
Sequence: int32(sequence + 1),
Data: buffer[:n],
}
b, err := data.SerializeHTTPBodyChunk(&httpBodyChunk, s.commonConfig.Networking.Format)
if err != nil {
return err
}
body := data.HTTPBody{
Body: b,
Type: "split",
}
protoHeaders := data.HTTPHeaderToProtoHeaders(httpResponse.Header)
responseData := data.HTTPResponseData{
Body: &body,
StatusCode: int32(httpResponse.StatusCode),
Headers: &protoHeaders,
}

sendErr := s.sendResponseData(&responseData, requestID)
if sendErr != nil {
return sendErr
}
sequence++
if n > 0 {
s.logger.Debug(string(buffer[:n]))
previousData = make([]byte, n)
copy(previousData, buffer[:n])
}

if readErr != nil {
if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
s.logger.Debug("Finished sending chunks")
isLast = true
if len(previousData) > 0 {
err := s.sendOnce(requestID, isLast, int32(sequence), previousData, *httpResponse)
if err != nil {
return err
}
}
// The request is smaller than a chunk and only one packet is sent.
if len(previousData) == 0 {
err := s.sendOnce(requestID, isLast, int32(sequence), buffer[:n], *httpResponse)
if err != nil {
return err
}
}
break
} else {
s.logger.Error("Error reading request body", zap.Error(readErr))
return readErr
}
} else {
if len(previousData) > 0 {
err := s.sendOnce(requestID, false, int32(sequence), previousData, *httpResponse)
if err != nil {
return err
}
}
return readErr
}
sequence++
}

return nil
@@ -375,7 +378,7 @@ func (s *server) sendResponseData(responseData *data.HTTPResponseData, requestID
s.logger.Debug("Publishing response")
_, err = s.client.Publish(context.Background(), &paho.Publish{
Topic: responseTopic,
QoS: 0,
QoS: 1,
Payload: responsePayload,
})
if err != nil {
@@ -428,17 +431,15 @@ func (s *server) sendUnsplitData(requestID string, httpResponse *http.Response)
return nil
}

func (s *server) handleErr(requestID string, header http.Header, err error) {
func (s *server) handleErr(requestID string, err error) {
s.logger.Error("Error sending HTTP request", zap.Error(err))
protoHeaders := data.HTTPHeaderToProtoHeaders(header)
// For now, not apply the storage relay to the error
responseData := data.HTTPResponseData{
Body: &data.HTTPBody{
Body: []byte(err.Error()),
Type: "data",
},
StatusCode: http.StatusInternalServerError,
Headers: &protoHeaders,
}
err = s.sendResponseData(&responseData, requestID)
if err != nil {
@@ -544,12 +545,14 @@ func Start(c AgentConfig) {
}()
case mergeChPayload := <-s.mergeCh:
go func() {
s.logger.Debug("Merging message")
combined, completed, err := split.Merge(s.merger, mergeChPayload.httpRequestData.Body.Body, s.commonConfig.Networking.Format)
if err != nil {
s.logger.Info("Error merging message: " + err.Error())
return
}
if completed {
s.logger.Debug("combined: " + string(combined))
mergeChPayload.httpRequestData.Body.Body = combined
s.processCh <- processChPayload(mergeChPayload)
s.merger.DeleteChunk(mergeChPayload.requestPacket.RequestId)
@@ -567,27 +570,21 @@ func Start(c AgentConfig) {
var err error
httpResponse, err := sendHTTP1Request(s.proxyHost, processChPayload.httpRequestData)
if err != nil {
s.handleErr(processChPayload.requestPacket.RequestId, httpResponse.Header, err)
s.handleErr(processChPayload.requestPacket.RequestId, err)
return
}
defer httpResponse.Body.Close()

responseSize, err := getResponseSize(httpResponse)
if err != nil {
s.handleErr(processChPayload.requestPacket.RequestId, httpResponse.Header, err)
return
}
s.logger.Debug("Response size: " + fmt.Sprintf("%d", responseSize))
if s.commonConfig.Networking.LargeDataPolicy == "split" && int(responseSize) > s.commonConfig.Split.ChunkBytes {
if s.commonConfig.Networking.LargeDataPolicy == "split" {
err = s.sendSplitData(processChPayload.requestPacket.RequestId, httpResponse)
if err != nil {
s.handleErr(processChPayload.requestPacket.RequestId, httpResponse.Header, err)
s.handleErr(processChPayload.requestPacket.RequestId, err)
return
}
} else {
err = s.sendUnsplitData(processChPayload.requestPacket.RequestId, httpResponse)
if err != nil {
s.handleErr(processChPayload.requestPacket.RequestId, httpResponse.Header, err)
s.handleErr(processChPayload.requestPacket.RequestId, err)
return
}
}
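The new sendSplitData no longer needs the response size up front: it reads fixed-size chunks, holds each one back for one iteration, and marks whatever is still buffered at EOF as the final packet. A self-contained sketch of that hold-back pattern, assuming nothing from the repository (splitBody, chunk, and emit are illustrative names, and error handling is reduced to the essentials):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// chunk mirrors the fields the diff passes to sendOnce; the real code wraps
// them in data.HTTPBodyChunk before serializing.
type chunk struct {
	sequence int32
	isLast   bool
	data     []byte
}

// splitBody reads fixed-size chunks and holds each one back until the next
// read succeeds, so the final chunk can be flagged with isLast instead of
// relying on a precomputed total (the field this commit removes).
func splitBody(r io.Reader, chunkBytes int, emit func(chunk) error) error {
	buffer := make([]byte, chunkBytes)
	var previous []byte
	sequence := int32(1)
	for {
		n, readErr := io.ReadFull(r, buffer)
		if n > 0 {
			if previous != nil {
				// A newer chunk exists, so the buffered one is not the last.
				if err := emit(chunk{sequence: sequence, isLast: false, data: previous}); err != nil {
					return err
				}
				sequence++
			}
			previous = append([]byte(nil), buffer[:n]...)
		}
		if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
			// Whatever is still buffered is the final chunk.
			return emit(chunk{sequence: sequence, isLast: true, data: previous})
		}
		if readErr != nil {
			return readErr
		}
	}
}

func main() {
	body := bytes.NewReader([]byte("0123456789abcdef"))
	_ = splitBody(body, 7, func(c chunk) error {
		fmt.Printf("seq=%d last=%v %q\n", c.sequence, c.isLast, c.data)
		return nil
	})
}

Sending one chunk behind the read is what lets IsLast be set without knowing Content-Length or buffering the whole body, which is why the commit can drop getResponseSize entirely.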
47 changes: 33 additions & 14 deletions internal/common/split/merger.go
@@ -1,32 +1,48 @@
package split

import (
"log"
"sort"
"sync"

"github.com/ohkinozomu/fuyuu-router/pkg/data"
)

type chunkData struct {
total int
data map[int][]byte
}

type Merger struct {
chunks map[string]map[int][]byte
chunks map[string]chunkData
mu sync.Mutex
}

func NewMerger() *Merger {
return &Merger{
chunks: make(map[string]map[int][]byte),
chunks: make(map[string]chunkData),
mu: sync.Mutex{},
}
}

func (m *Merger) AddChunk(chunk *data.HTTPBodyChunk) {
// Avoid concurrent map writes
m.mu.Lock()
defer m.mu.Unlock()
if _, exists := m.chunks[chunk.RequestId]; !exists {
m.chunks[chunk.RequestId] = make(map[int][]byte)

c, exists := m.chunks[chunk.RequestId]
if !exists {
c = chunkData{
data: make(map[int][]byte),
}
}

c.data[int(chunk.Sequence)] = chunk.Data

if chunk.IsLast {
c.total = int(chunk.Sequence)
}
m.chunks[chunk.RequestId][int(chunk.Sequence)] = chunk.Data

m.chunks[chunk.RequestId] = c
}

func (m *Merger) DeleteChunk(requestId string) {
@@ -37,33 +53,36 @@ func (m *Merger) DeleteChunk(requestId string) {
}

func (m *Merger) IsComplete(chunk *data.HTTPBodyChunk) bool {
return len(m.chunks[chunk.RequestId]) == int(chunk.Total)
chunkData, exists := m.chunks[chunk.RequestId]
log.Println(chunkData.total, len(chunkData.data))
return exists && len(chunkData.data) == chunkData.total
}

func (m *Merger) GetCombinedData(chunk *data.HTTPBodyChunk) []byte {
// Avoid concurrent map read and map write
m.mu.Lock()
defer m.mu.Unlock()

if !m.IsComplete(chunk) {
chunkData, exists := m.chunks[chunk.RequestId]
if !exists || len(chunkData.data) != chunkData.total {
return nil
}

sequences := make([]int, 0, len(m.chunks[chunk.RequestId]))
for seq := range m.chunks[chunk.RequestId] {
sequences := make([]int, 0, len(chunkData.data))
for seq := range chunkData.data {
sequences = append(sequences, seq)
}
sort.Ints(sequences)

totalSize := 0
for _, seq := range sequences {
totalSize += len(m.chunks[chunk.RequestId][seq])
totalSize += len(chunkData.data[seq])
}
combinedData := make([]byte, totalSize)

currentIndex := 0
for _, seq := range sequences {
copy(combinedData[currentIndex:], m.chunks[chunk.RequestId][seq])
currentIndex += len(m.chunks[chunk.RequestId][seq])
copy(combinedData[currentIndex:], chunkData.data[seq])
currentIndex += len(chunkData.data[seq])
}

return combinedData
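On the receiving side, the total is only learned when the IsLast chunk arrives: AddChunk stores each piece under its Sequence, IsLast pins total to that Sequence, and IsComplete and GetCombinedData compare the stored count against it. A usage sketch against this Merger API (illustrative values; it assumes the snippet lives inside the fuyuu-router module, since internal/common/split cannot be imported from outside it):

package main

import (
	"fmt"

	"github.com/ohkinozomu/fuyuu-router/internal/common/split"
	"github.com/ohkinozomu/fuyuu-router/pkg/data"
)

func main() {
	merger := split.NewMerger()

	chunks := []*data.HTTPBodyChunk{
		{RequestId: "req-1", Sequence: 1, IsLast: false, Data: []byte("hello ")},
		{RequestId: "req-1", Sequence: 2, IsLast: true, Data: []byte("world")},
	}

	for _, c := range chunks {
		merger.AddChunk(c)
		if merger.IsComplete(c) {
			// total became known when the IsLast chunk arrived (total = its Sequence),
			// so completeness is simply "stored chunks == total".
			fmt.Println(string(merger.GetCombinedData(c))) // hello world
			merger.DeleteChunk(c.RequestId)
		}
	}
}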
19 changes: 12 additions & 7 deletions internal/common/split/merger_test.go
@@ -5,6 +5,7 @@ import (
"testing"

"github.com/ohkinozomu/fuyuu-router/pkg/data"
"github.com/stretchr/testify/assert"
)

func TestNewMerger(t *testing.T) {
@@ -22,14 +23,12 @@ func TestAddChunkAndIsComplete(t *testing.T) {
chunk := &data.HTTPBodyChunk{
RequestId: "test",
Sequence: 1,
Total: 2,
IsLast: false,
Data: []byte("part1"),
}

merger.AddChunk(chunk)
if !reflect.DeepEqual(merger.chunks[chunk.RequestId][int(chunk.Sequence)], chunk.Data) {
t.Errorf("AddChunk did not add the chunk data correctly")
}
assert.Equal(t, merger.chunks[chunk.RequestId].data[int(chunk.Sequence)], chunk.Data)

if merger.IsComplete(chunk) {
t.Error("IsComplete should return false when the total number of chunks has not been reached")
@@ -38,28 +37,34 @@
chunk2 := &data.HTTPBodyChunk{
RequestId: "test",
Sequence: 2,
Total: 2,
IsLast: true,
Data: []byte("part2"),
}

merger.AddChunk(chunk2)
assert.Equal(t, merger.chunks[chunk2.RequestId].data[int(chunk2.Sequence)], chunk2.Data)

if !merger.IsComplete(chunk2) {
t.Error("IsComplete should return true when all chunks have been added")
}

if merger.chunks[chunk2.RequestId].total != int(chunk2.Sequence) {
t.Errorf("total was not set correctly when IsLast is true")
}
}

func TestGetCombinedData(t *testing.T) {
merger := NewMerger()
chunk1 := &data.HTTPBodyChunk{
RequestId: "test",
Sequence: 1,
Total: 2,
IsLast: false,
Data: []byte("part1"),
}
chunk2 := &data.HTTPBodyChunk{
RequestId: "test",
Sequence: 2,
Total: 2,
IsLast: true,
Data: []byte("part2"),
}

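Because total comes from the IsLast chunk rather than a field on every packet, completion no longer depends on arrival order. A sketch of one additional case, not part of this commit, written against the same API and helpers the tests above already use:

// Not part of this commit: the IsLast chunk arrives before its predecessor.
func TestOutOfOrderChunks(t *testing.T) {
	merger := NewMerger()

	last := &data.HTTPBodyChunk{
		RequestId: "test",
		Sequence:  2,
		IsLast:    true,
		Data:      []byte("part2"),
	}
	first := &data.HTTPBodyChunk{
		RequestId: "test",
		Sequence:  1,
		IsLast:    false,
		Data:      []byte("part1"),
	}

	// The IsLast chunk arrives first and fixes total = 2.
	merger.AddChunk(last)
	if merger.IsComplete(last) {
		t.Error("IsComplete should return false while chunk 1 is still missing")
	}

	merger.AddChunk(first)
	if !merger.IsComplete(first) {
		t.Error("IsComplete should return true once both chunks are present")
	}

	assert.Equal(t, []byte("part1part2"), merger.GetCombinedData(first))
}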
