From 1ecc028d3a4f9594ffd4ffe9f5ef6d4cec502884 Mon Sep 17 00:00:00 2001 From: Ohki Nozomu Date: Fri, 1 Dec 2023 18:36:57 +0900 Subject: [PATCH] Change compression/decompression timing (#106) --- internal/agent/agent.go | 25 +++++++++++++------------ internal/hub/server.go | 35 ++++++++++++++++++----------------- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 82d6eba..4248dec 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -313,6 +313,11 @@ func (s *server) sendResponseData(responseData *data.HTTPResponseData, requestID if err != nil { return err } + + if s.commonConfig.Networking.Compress == "zstd" && s.encoder != nil { + b = s.encoder.EncodeAll(b, nil) + } + responsePacket := data.HTTPResponsePacket{ RequestId: requestID, HttpResponseData: b, @@ -444,6 +449,14 @@ func Start(c AgentConfig) { return } + if requestPacket.Compress == "zstd" && s.decoder != nil { + requestPacket.HttpRequestData, err = s.decoder.DecodeAll(requestPacket.HttpRequestData, nil) + if err != nil { + s.logger.Error("Error decoding request body", zap.Error(err)) + return + } + } + httpRequestData, err := data.DeserializeHTTPRequestData(requestPacket.HttpRequestData, s.commonConfig.Networking.Format, s.bucket) if err != nil { s.logger.Error("Error deserializing request data", zap.Error(err)) @@ -486,14 +499,6 @@ func Start(c AgentConfig) { } var err error - if processChPayload.requestPacket.Compress == "zstd" && s.decoder != nil { - processChPayload.httpRequestData.Body.Body, err = s.decoder.DecodeAll(processChPayload.httpRequestData.Body.Body, nil) - if err != nil { - s.logger.Error("Error decoding request body", zap.Error(err)) - return - } - } - httpResponse, statusCode, responseHeader, err := sendHTTP1Request(s.proxyHost, processChPayload.httpRequestData) if err != nil { s.logger.Error("Error sending HTTP request", zap.Error(err)) @@ -513,10 +518,6 @@ func Start(c AgentConfig) { return } } else { - if s.commonConfig.Networking.Compress == "zstd" && s.encoder != nil { - httpResponse = s.encoder.EncodeAll(httpResponse, nil) - } - if s.commonConfig.Networking.LargeDataPolicy == "split" && len(httpResponse) > s.commonConfig.Split.ChunkBytes { err = s.sendSplitData(processChPayload.requestPacket.RequestId, httpResponse, statusCode, responseHeader) if err != nil { diff --git a/internal/hub/server.go b/internal/hub/server.go index d455145..6c3382a 100644 --- a/internal/hub/server.go +++ b/internal/hub/server.go @@ -270,6 +270,10 @@ func (s *server) sendSplitData(r *http.Request, uuid, agentID string) error { return err } + if s.encoder != nil { + b = s.encoder.EncodeAll(b, nil) + } + requestPacket := data.HTTPRequestPacket{ RequestId: uuid, HttpRequestData: b, @@ -279,6 +283,7 @@ func (s *server) sendSplitData(r *http.Request, uuid, agentID string) error { if err != nil { return err } + requestTopic := topics.RequestTopic(agentID) _, err = s.client.Publish(context.Background(), &paho.Publish{ Payload: requestPayload, @@ -295,10 +300,6 @@ func (s *server) sendSplitData(r *http.Request, uuid, agentID string) error { return err } - if s.encoder != nil { - bodyBytes = s.encoder.EncodeAll(bodyBytes, nil) - } - err = split.Split(uuid, bodyBytes, s.commonConfig.Split.ChunkBytes, s.commonConfig.Networking.Format, callbackFn) if err != nil { return err @@ -313,10 +314,6 @@ func (s *server) sendUnsplitData(r *http.Request, uuid, agentID, objectName stri return err } - if s.encoder != nil { - bodyBytes = s.encoder.EncodeAll(bodyBytes, nil) - } - var body data.HTTPBody if s.commonConfig.Networking.LargeDataPolicy == "storage_relay" && r.ContentLength > int64(s.commonConfig.StorageRelay.ThresholdBytes) { body = data.HTTPBody{ @@ -342,6 +339,10 @@ func (s *server) sendUnsplitData(r *http.Request, uuid, agentID, objectName stri return err } + if s.encoder != nil { + b = s.encoder.EncodeAll(b, nil) + } + requestPacket := data.HTTPRequestPacket{ RequestId: uuid, HttpRequestData: b, @@ -543,6 +544,15 @@ func (s *server) startHTTP1(c HubConfig) { return } + if httpResponsePacket.Compress == "zstd" && s.decoder != nil { + var err error + httpResponsePacket.HttpResponseData, err = s.decoder.DecodeAll(httpResponsePacket.HttpResponseData, nil) + if err != nil { + s.logger.Info("Error decompressing message: " + err.Error()) + return + } + } + httpResponseData, err := data.DeserializeHTTPResponseData(httpResponsePacket.GetHttpResponseData(), s.commonConfig.Networking.Format, s.bucket) if err != nil { s.logger.Info("Error deserializing HTTP response data: " + err.Error()) @@ -579,15 +589,6 @@ func (s *server) startHTTP1(c HubConfig) { go func() { s.logger.Debug("Emitting message to bus") - if busChPayload.responsePacket.Compress == "zstd" && s.decoder != nil { - var err error - busChPayload.httpResponseData.Body.Body, err = s.decoder.DecodeAll(busChPayload.httpResponseData.Body.Body, nil) - if err != nil { - s.logger.Info("Error decompressing message: " + err.Error()) - return - } - } - err := s.bus.Emit(context.Background(), busChPayload.responsePacket.RequestId, busChPayload.httpResponseData) if err != nil { if strings.Contains(err.Error(), fmt.Sprintf("bus: topic(%s) not found", busChPayload.responsePacket.RequestId)) {