Skip to content

Commit

Permalink
Change compression/decompression timing (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
ohkinozomu authored Dec 1, 2023
1 parent 7b11252 commit 1ecc028
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 29 deletions.
25 changes: 13 additions & 12 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ func (s *server) sendResponseData(responseData *data.HTTPResponseData, requestID
if err != nil {
return err
}

if s.commonConfig.Networking.Compress == "zstd" && s.encoder != nil {
b = s.encoder.EncodeAll(b, nil)
}

responsePacket := data.HTTPResponsePacket{
RequestId: requestID,
HttpResponseData: b,
Expand Down Expand Up @@ -444,6 +449,14 @@ func Start(c AgentConfig) {
return
}

if requestPacket.Compress == "zstd" && s.decoder != nil {
requestPacket.HttpRequestData, err = s.decoder.DecodeAll(requestPacket.HttpRequestData, nil)
if err != nil {
s.logger.Error("Error decoding request body", zap.Error(err))
return
}
}

httpRequestData, err := data.DeserializeHTTPRequestData(requestPacket.HttpRequestData, s.commonConfig.Networking.Format, s.bucket)
if err != nil {
s.logger.Error("Error deserializing request data", zap.Error(err))
Expand Down Expand Up @@ -486,14 +499,6 @@ func Start(c AgentConfig) {
}

var err error
if processChPayload.requestPacket.Compress == "zstd" && s.decoder != nil {
processChPayload.httpRequestData.Body.Body, err = s.decoder.DecodeAll(processChPayload.httpRequestData.Body.Body, nil)
if err != nil {
s.logger.Error("Error decoding request body", zap.Error(err))
return
}
}

httpResponse, statusCode, responseHeader, err := sendHTTP1Request(s.proxyHost, processChPayload.httpRequestData)
if err != nil {
s.logger.Error("Error sending HTTP request", zap.Error(err))
Expand All @@ -513,10 +518,6 @@ func Start(c AgentConfig) {
return
}
} else {
if s.commonConfig.Networking.Compress == "zstd" && s.encoder != nil {
httpResponse = s.encoder.EncodeAll(httpResponse, nil)
}

if s.commonConfig.Networking.LargeDataPolicy == "split" && len(httpResponse) > s.commonConfig.Split.ChunkBytes {
err = s.sendSplitData(processChPayload.requestPacket.RequestId, httpResponse, statusCode, responseHeader)
if err != nil {
Expand Down
35 changes: 18 additions & 17 deletions internal/hub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ func (s *server) sendSplitData(r *http.Request, uuid, agentID string) error {
return err
}

if s.encoder != nil {
b = s.encoder.EncodeAll(b, nil)
}

requestPacket := data.HTTPRequestPacket{
RequestId: uuid,
HttpRequestData: b,
Expand All @@ -279,6 +283,7 @@ func (s *server) sendSplitData(r *http.Request, uuid, agentID string) error {
if err != nil {
return err
}

requestTopic := topics.RequestTopic(agentID)
_, err = s.client.Publish(context.Background(), &paho.Publish{
Payload: requestPayload,
Expand All @@ -295,10 +300,6 @@ func (s *server) sendSplitData(r *http.Request, uuid, agentID string) error {
return err
}

if s.encoder != nil {
bodyBytes = s.encoder.EncodeAll(bodyBytes, nil)
}

err = split.Split(uuid, bodyBytes, s.commonConfig.Split.ChunkBytes, s.commonConfig.Networking.Format, callbackFn)
if err != nil {
return err
Expand All @@ -313,10 +314,6 @@ func (s *server) sendUnsplitData(r *http.Request, uuid, agentID, objectName stri
return err
}

if s.encoder != nil {
bodyBytes = s.encoder.EncodeAll(bodyBytes, nil)
}

var body data.HTTPBody
if s.commonConfig.Networking.LargeDataPolicy == "storage_relay" && r.ContentLength > int64(s.commonConfig.StorageRelay.ThresholdBytes) {
body = data.HTTPBody{
Expand All @@ -342,6 +339,10 @@ func (s *server) sendUnsplitData(r *http.Request, uuid, agentID, objectName stri
return err
}

if s.encoder != nil {
b = s.encoder.EncodeAll(b, nil)
}

requestPacket := data.HTTPRequestPacket{
RequestId: uuid,
HttpRequestData: b,
Expand Down Expand Up @@ -543,6 +544,15 @@ func (s *server) startHTTP1(c HubConfig) {
return
}

if httpResponsePacket.Compress == "zstd" && s.decoder != nil {
var err error
httpResponsePacket.HttpResponseData, err = s.decoder.DecodeAll(httpResponsePacket.HttpResponseData, nil)
if err != nil {
s.logger.Info("Error decompressing message: " + err.Error())
return
}
}

httpResponseData, err := data.DeserializeHTTPResponseData(httpResponsePacket.GetHttpResponseData(), s.commonConfig.Networking.Format, s.bucket)
if err != nil {
s.logger.Info("Error deserializing HTTP response data: " + err.Error())
Expand Down Expand Up @@ -579,15 +589,6 @@ func (s *server) startHTTP1(c HubConfig) {
go func() {
s.logger.Debug("Emitting message to bus")

if busChPayload.responsePacket.Compress == "zstd" && s.decoder != nil {
var err error
busChPayload.httpResponseData.Body.Body, err = s.decoder.DecodeAll(busChPayload.httpResponseData.Body.Body, nil)
if err != nil {
s.logger.Info("Error decompressing message: " + err.Error())
return
}
}

err := s.bus.Emit(context.Background(), busChPayload.responsePacket.RequestId, busChPayload.httpResponseData)
if err != nil {
if strings.Contains(err.Error(), fmt.Sprintf("bus: topic(%s) not found", busChPayload.responsePacket.RequestId)) {
Expand Down

0 comments on commit 1ecc028

Please sign in to comment.