Skip to content

Commit

Permalink
Compress before splitting
Browse files Browse the repository at this point in the history
  • Loading branch information
ohkinozomu committed Nov 16, 2023
1 parent 78e2ca7 commit 32b28c7
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 28 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,6 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand Down
28 changes: 15 additions & 13 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"net/http"
"os"
"strings"
"time"

"github.com/eclipse/paho.golang/paho"
Expand Down Expand Up @@ -105,14 +104,14 @@ func newServer(c AgentConfig) server {
}
}

func sendHTTP1Request(proxyHost string, data *data.HTTPRequestData) (string, int, http.Header, error) {
func sendHTTP1Request(proxyHost string, data *data.HTTPRequestData) ([]byte, int, http.Header, error) {
var responseHeader http.Header

url := "http://" + proxyHost + data.Path
body := bytes.NewBuffer(data.Body.Body)
req, err := http.NewRequest(data.Method, url, body)
if err != nil {
return "", http.StatusInternalServerError, responseHeader, err
return nil, http.StatusInternalServerError, responseHeader, err
}
for key, values := range data.Headers.GetHeaders() {
for _, value := range values.GetValues() {
Expand All @@ -122,15 +121,15 @@ func sendHTTP1Request(proxyHost string, data *data.HTTPRequestData) (string, int
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return "", http.StatusInternalServerError, responseHeader, err
return nil, http.StatusInternalServerError, responseHeader, err
}
defer resp.Body.Close()
responseBody := new(bytes.Buffer)
_, err = responseBody.ReadFrom(resp.Body)
if err != nil {
return "", resp.StatusCode, responseHeader, err
return nil, resp.StatusCode, responseHeader, err
}
return responseBody.String(), resp.StatusCode, resp.Header, nil
return responseBody.Bytes(), resp.StatusCode, resp.Header, nil
}

func Start(c AgentConfig) {
Expand Down Expand Up @@ -293,15 +292,18 @@ func Start(c AgentConfig) {
Headers: &protoHeaders,
}
} else {
httpResponseBytes := []byte(httpResponse)
if s.commonConfig.Networking.LargeDataPolicy == "split" && len(httpResponseBytes) > s.commonConfig.Split.ChunkBytes {
if s.commonConfig.Networking.Compress == "zstd" && s.encoder != nil {
httpResponse = s.encoder.EncodeAll(httpResponse, nil)
}

if s.commonConfig.Networking.LargeDataPolicy == "split" && len(httpResponse) > s.commonConfig.Split.ChunkBytes {
var chunks [][]byte
for i := 0; i < len(httpResponseBytes); i += s.commonConfig.Split.ChunkBytes {
for i := 0; i < len(httpResponse); i += s.commonConfig.Split.ChunkBytes {
end := i + s.commonConfig.Split.ChunkBytes
if end > len(httpResponseBytes) {
end = len(httpResponseBytes)
if end > len(httpResponse) {
end = len(httpResponse)
}
chunks = append(chunks, httpResponseBytes[i:end])
chunks = append(chunks, httpResponse[i:end])
}

for sequence, c := range chunks {
Expand Down Expand Up @@ -360,7 +362,7 @@ func Start(c AgentConfig) {
} else {
if s.commonConfig.Networking.LargeDataPolicy == "storage_relay" && len(httpResponse) > s.commonConfig.StorageRelay.ThresholdBytes {
objectName = s.id + "/" + processChPayload.requestPacket.RequestId + "/response"
err := s.bucket.Upload(context.Background(), objectName, strings.NewReader(httpResponse))
err := s.bucket.Upload(context.Background(), objectName, bytes.NewReader(httpResponse))
if err != nil {
s.logger.Error("Error uploading object to object storage", zap.Error(err))
return
Expand Down
9 changes: 7 additions & 2 deletions internal/hub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (s *server) handleRequest(w http.ResponseWriter, r *http.Request) {
bodyBytes := make([]byte, r.ContentLength)
r.Body.Read(bodyBytes)

if s.encoder != nil {
bodyBytes = s.encoder.EncodeAll(bodyBytes, nil)
}

dataHeaders := data.HTTPHeaderToProtoHeaders(r.Header)

var body data.HTTPBody
Expand Down Expand Up @@ -229,14 +233,15 @@ func (s *server) handleRequest(w http.ResponseWriter, r *http.Request) {
Body: b,
Type: "split",
}

requestData := data.HTTPRequestData{
Method: r.Method,
Path: r.URL.Path,
Headers: &dataHeaders,
Body: &body,
}

b, err = data.SerializeHTTPRequestData(&requestData, s.commonConfig.Networking.Format, s.encoder)
b, err = data.SerializeHTTPRequestData(&requestData, s.commonConfig.Networking.Format)
if err != nil {
s.logger.Error("Error serializing request data", zap.Error(err))
return
Expand Down Expand Up @@ -284,7 +289,7 @@ func (s *server) handleRequest(w http.ResponseWriter, r *http.Request) {
Body: &body,
}

b, err := data.SerializeHTTPRequestData(&requestData, s.commonConfig.Networking.Format, s.encoder)
b, err := data.SerializeHTTPRequestData(&requestData, s.commonConfig.Networking.Format)
if err != nil {
s.logger.Error("Error serializing request data", zap.Error(err))
return
Expand Down
8 changes: 1 addition & 7 deletions pkg/data/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func DeserializeResponsePacket(payload []byte, format string) (*HTTPResponsePack
return &responsePacket, err
}

func SerializeHTTPRequestData(httpRequestData *HTTPRequestData, format string, encoder *zstd.Encoder) ([]byte, error) {
func SerializeHTTPRequestData(httpRequestData *HTTPRequestData, format string) ([]byte, error) {
var b []byte
var err error
switch format {
Expand All @@ -103,9 +103,6 @@ func SerializeHTTPRequestData(httpRequestData *HTTPRequestData, format string, e
default:
return nil, fmt.Errorf("unknown format: %s", format)
}
if encoder != nil {
b = encoder.EncodeAll(b, nil)
}
return b, nil
}

Expand Down Expand Up @@ -170,9 +167,6 @@ func SerializeHTTPResponseData(httpResponseData *HTTPResponseData, format string
default:
return nil, fmt.Errorf("unknown format: %s", format)
}
if encoder != nil {
b = encoder.EncodeAll(b, nil)
}
return b, nil
}

Expand Down
26 changes: 22 additions & 4 deletions pkg/data/serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ func TestSerializedRequestPacket(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.format, func(t *testing.T) {
body := []byte("test")

if testCase.compress == "zstd" && testCase.encoder != nil {
body = testCase.encoder.EncodeAll(body, nil)
}

httpRequestData := HTTPRequestData{
Method: "GET",
Path: "/",
Expand All @@ -66,11 +72,11 @@ func TestSerializedRequestPacket(t *testing.T) {
},
},
Body: &HTTPBody{
Body: []byte("test"),
Body: body,
Type: "data",
},
}
b, err := SerializeHTTPRequestData(&httpRequestData, testCase.format, testCase.encoder)
b, err := SerializeHTTPRequestData(&httpRequestData, testCase.format)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -151,11 +157,17 @@ func TestSerializedResponsePacket(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.format, func(t *testing.T) {
body := []byte("test")

if testCase.compress == "zstd" && testCase.encoder != nil {
body = testCase.encoder.EncodeAll(body, nil)
}

httpReponseData := HTTPResponseData{
StatusCode: 200,
Headers: &headers,
Body: &HTTPBody{
Body: []byte("test"),
Body: body,
Type: "data",
},
}
Expand Down Expand Up @@ -238,10 +250,16 @@ func TestHTTPResponseDataSerialize(t *testing.T) {
"Content-Length": {"31"},
})

body := []byte("test")

if testCase.compress == "zstd" && testCase.encoder != nil {
body = testCase.encoder.EncodeAll(body, nil)
}

httpResponseData := HTTPResponseData{
StatusCode: 200,
Headers: &headers,
Body: &HTTPBody{Body: []byte("test"), Type: "data"},
Body: &HTTPBody{Body: body, Type: "data"},
}

serializedResponseData, err := SerializeHTTPResponseData(&httpResponseData, testCase.format, testCase.encoder)
Expand Down

0 comments on commit 32b28c7

Please sign in to comment.