From 030d9ea15dff4af3ff360cc9e2f1cb4f7e6c93a5 Mon Sep 17 00:00:00 2001 From: Ohki Nozomu Date: Wed, 15 Nov 2023 23:24:53 +0900 Subject: [PATCH] Refactoring (#45) --- cmd/agent.go | 12 +- cmd/hub.go | 7 +- e2e/basic/test.go | 9 +- e2e/protobuf/test.go | 9 +- e2e/zstd/test.go | 9 +- example/agent.yaml | 6 +- example/hub.yaml | 4 +- internal/agent/agent.go | 242 ++++++++++++++++++++-------------------- internal/common/util.go | 10 +- internal/hub/server.go | 5 +- pkg/data/merger_test.go | 72 ++++++++++++ pkg/data/serde.go | 72 +++++++----- 12 files changed, 269 insertions(+), 188 deletions(-) create mode 100644 pkg/data/merger_test.go diff --git a/cmd/agent.go b/cmd/agent.go index d4286db..e5877ab 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -81,11 +81,12 @@ var agentCmd = &cobra.Command{ logger.Info("profiling enabled") go func() { var registry ncp.Registry - if config.Profiling.Registry == "cloudprofiler" { + switch config.Profiling.Registry { + case "cloudprofiler": registry = ncp.CLOUD_PROFILER - } else if config.Profiling.Registry == "pyroscope" { + case "pyroscope": registry = ncp.PYROSCOPE - } else { + default: logger.Fatal("Unknown profiling registry") } c := ncp.Config{ @@ -109,11 +110,10 @@ var agentCmd = &cobra.Command{ labelMap := make(map[string]string) for _, label := range labels { parts := strings.SplitN(label, "=", 2) - if len(parts) == 2 { - labelMap[parts[0]] = parts[1] - } else { + if len(parts) != 2 { logger.Fatal("Invalid label format") } + labelMap[parts[0]] = parts[1] } c := agent.AgentConfig{ diff --git a/cmd/hub.go b/cmd/hub.go index b619c0f..5b1a140 100644 --- a/cmd/hub.go +++ b/cmd/hub.go @@ -73,11 +73,12 @@ var hubCmd = &cobra.Command{ logger.Info("profiling enabled") go func() { var registry ncp.Registry - if config.Profiling.Registry == "cloudprofiler" { + switch config.Profiling.Registry { + case "cloudprofiler": registry = ncp.CLOUD_PROFILER - } else if config.Profiling.Registry == "pyroscope" { + case "pyroscope": registry = ncp.PYROSCOPE - } else { + default: logger.Fatal("Unknown profiling registry") } c := ncp.Config{ diff --git a/e2e/basic/test.go b/e2e/basic/test.go index a0b7478..4cafd27 100644 --- a/e2e/basic/test.go +++ b/e2e/basic/test.go @@ -35,7 +35,7 @@ func main() { go func(i int) { defer wg.Done() - jsonData := map[string]interface{}{ + jsonData := map[string]any{ "message": fmt.Sprintf("This is request %d", i), } jsonBytes, err := json.Marshal(jsonData) @@ -47,7 +47,7 @@ func main() { response, err := exec.Command("kubectl", "exec", "-n", namespace, podName, "--", "curl", "-X", "POST", "-H", "FuyuuRouter-IDs: agent01", "-H", contentType, "-d", string(jsonBytes), routerURL, "-s").Output() - var responseJSON map[string]interface{} + var responseJSON map[string]any if err := json.Unmarshal(response, &responseJSON); err != nil { errors <- fmt.Sprintf("Failed to unmarshal JSON response: %s\nResponse: %s", err, response) return @@ -70,10 +70,9 @@ func main() { errorCount++ } - if errorCount == 0 { - fmt.Println("All requests returned the correct response.") - } else { + if errorCount != 0 { fmt.Printf("Some requests did not return the correct response. Errors: %d\n", errorCount) os.Exit(1) } + fmt.Println("All requests returned the correct response.") } diff --git a/e2e/protobuf/test.go b/e2e/protobuf/test.go index a0b7478..4cafd27 100644 --- a/e2e/protobuf/test.go +++ b/e2e/protobuf/test.go @@ -35,7 +35,7 @@ func main() { go func(i int) { defer wg.Done() - jsonData := map[string]interface{}{ + jsonData := map[string]any{ "message": fmt.Sprintf("This is request %d", i), } jsonBytes, err := json.Marshal(jsonData) @@ -47,7 +47,7 @@ func main() { response, err := exec.Command("kubectl", "exec", "-n", namespace, podName, "--", "curl", "-X", "POST", "-H", "FuyuuRouter-IDs: agent01", "-H", contentType, "-d", string(jsonBytes), routerURL, "-s").Output() - var responseJSON map[string]interface{} + var responseJSON map[string]any if err := json.Unmarshal(response, &responseJSON); err != nil { errors <- fmt.Sprintf("Failed to unmarshal JSON response: %s\nResponse: %s", err, response) return @@ -70,10 +70,9 @@ func main() { errorCount++ } - if errorCount == 0 { - fmt.Println("All requests returned the correct response.") - } else { + if errorCount != 0 { fmt.Printf("Some requests did not return the correct response. Errors: %d\n", errorCount) os.Exit(1) } + fmt.Println("All requests returned the correct response.") } diff --git a/e2e/zstd/test.go b/e2e/zstd/test.go index a0b7478..4cafd27 100644 --- a/e2e/zstd/test.go +++ b/e2e/zstd/test.go @@ -35,7 +35,7 @@ func main() { go func(i int) { defer wg.Done() - jsonData := map[string]interface{}{ + jsonData := map[string]any{ "message": fmt.Sprintf("This is request %d", i), } jsonBytes, err := json.Marshal(jsonData) @@ -47,7 +47,7 @@ func main() { response, err := exec.Command("kubectl", "exec", "-n", namespace, podName, "--", "curl", "-X", "POST", "-H", "FuyuuRouter-IDs: agent01", "-H", contentType, "-d", string(jsonBytes), routerURL, "-s").Output() - var responseJSON map[string]interface{} + var responseJSON map[string]any if err := json.Unmarshal(response, &responseJSON); err != nil { errors <- fmt.Sprintf("Failed to unmarshal JSON response: %s\nResponse: %s", err, response) return @@ -70,10 +70,9 @@ func main() { errorCount++ } - if errorCount == 0 { - fmt.Println("All requests returned the correct response.") - } else { + if errorCount != 0 { fmt.Printf("Some requests did not return the correct response. Errors: %d\n", errorCount) os.Exit(1) } + fmt.Println("All requests returned the correct response.") } diff --git a/example/agent.yaml b/example/agent.yaml index 8fff83d..3046dd5 100644 --- a/example/agent.yaml +++ b/example/agent.yaml @@ -15,7 +15,7 @@ spec: spec: containers: - name: fuyuu-router-agent - image: ohkinozomu/fuyuu-router:6932d5a4388f2080d7d1e20b6ffb9fe24e1c4303 + image: ohkinozomu/fuyuu-router:0.0.2 volumeMounts: - name: fuyuu-router-agent-config mountPath: /app/config @@ -43,4 +43,6 @@ metadata: namespace: fuyuu-router data: config.toml: | - # TODO \ No newline at end of file + [profiling] + registry = "pyroscope" + server_address = "http://pyroscope.pyroscope:4040" \ No newline at end of file diff --git a/example/hub.yaml b/example/hub.yaml index e405708..44079c9 100644 --- a/example/hub.yaml +++ b/example/hub.yaml @@ -15,7 +15,7 @@ spec: spec: containers: - name: fuyuu-router-hub - image: ohkinozomu/fuyuu-router:6932d5a4388f2080d7d1e20b6ffb9fe24e1c4303 + image: ohkinozomu/fuyuu-router:0.0.2 volumeMounts: - name: fuyuu-router-hub-config mountPath: /app/config @@ -37,4 +37,4 @@ data: config.toml: | [profiling] registry = "pyroscope" - server_address = "pyroscope.pyroscope:4040" \ No newline at end of file + server_address = "http://pyroscope.pyroscope:4040" \ No newline at end of file diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 831a80e..854c130 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -147,19 +147,21 @@ func Start(c AgentConfig) { var terminatePayload []byte var err error - if c.CommonConfigV2.Networking.Format == "json" { + switch c.CommonConfigV2.Networking.Format { + case "json": terminatePayload, err = json.Marshal(&teminatePacket) if err != nil { c.Logger.Fatal(err.Error()) } - } else if c.CommonConfigV2.Networking.Format == "protobuf" { + case "protobuf": terminatePayload, err = proto.Marshal(&teminatePacket) if err != nil { c.Logger.Fatal(err.Error()) } - } else { + default: c.Logger.Fatal("Unknown format: " + c.CommonConfigV2.Networking.Format) } + if err != nil { c.Logger.Fatal(err.Error()) } @@ -181,17 +183,18 @@ func Start(c AgentConfig) { } var launchPayload []byte - if c.CommonConfigV2.Networking.Format == "json" { + switch c.CommonConfigV2.Networking.Format { + case "json": launchPayload, err = json.Marshal(&launchPacket) if err != nil { c.Logger.Fatal(err.Error()) } - } else if c.CommonConfigV2.Networking.Format == "protobuf" { + case "protobuf": launchPayload, err = proto.Marshal(&launchPacket) if err != nil { c.Logger.Fatal(err.Error()) } - } else { + default: c.Logger.Fatal("Unknown format: " + c.CommonConfigV2.Networking.Format) } @@ -259,10 +262,7 @@ func Start(c AgentConfig) { combined := s.merger.GetCombinedData(chunk) s.logger.Debug("Combined data") mergeChPayload.httpRequestData.Body.Body = combined - processChPayload := processChPayload{ - requestPacket: mergeChPayload.requestPacket, - httpRequestData: mergeChPayload.httpRequestData, - } + processChPayload := processChPayload(mergeChPayload) s.logger.Debug("Sending to processCh") s.processCh <- processChPayload s.logger.Debug("Sent to processCh") @@ -272,132 +272,132 @@ func Start(c AgentConfig) { s.logger.Debug("Processing request") var responsePacket data.HTTPResponsePacket - if s.protocol == "http1" { - var responseData data.HTTPResponseData - var objectName string - httpResponse, statusCode, responseHeader, err := sendHTTP1Request(s.proxyHost, processChPayload.httpRequestData) - if err != nil { - s.logger.Error("Error sending HTTP request", zap.Error(err)) - protoHeaders := data.HTTPHeaderToProtoHeaders(responseHeader) - // For now, not apply the storage relay to the error - responseData = data.HTTPResponseData{ - Body: &data.HTTPBody{ - Body: []byte(err.Error()), - Type: "data", - }, - StatusCode: http.StatusInternalServerError, - Headers: &protoHeaders, - } - } else { - httpResponseBytes := []byte(httpResponse) - if s.commonConfig.Networking.LargeDataPolicy == "split" && len(httpResponseBytes) > s.commonConfig.Split.ChunkBytes { - var chunks [][]byte - for i := 0; i < len(httpResponseBytes); i += s.commonConfig.Split.ChunkBytes { - end := i + s.commonConfig.Split.ChunkBytes - if end > len(httpResponseBytes) { - end = len(httpResponseBytes) - } - chunks = append(chunks, httpResponseBytes[i:end]) + if s.protocol != "http1" { + s.logger.Error("Unknown protocol: " + s.protocol) + return + } + + var responseData data.HTTPResponseData + var objectName string + httpResponse, statusCode, responseHeader, err := sendHTTP1Request(s.proxyHost, processChPayload.httpRequestData) + if err != nil { + s.logger.Error("Error sending HTTP request", zap.Error(err)) + protoHeaders := data.HTTPHeaderToProtoHeaders(responseHeader) + // For now, not apply the storage relay to the error + responseData = data.HTTPResponseData{ + Body: &data.HTTPBody{ + Body: []byte(err.Error()), + Type: "data", + }, + StatusCode: http.StatusInternalServerError, + Headers: &protoHeaders, + } + } else { + httpResponseBytes := []byte(httpResponse) + if s.commonConfig.Networking.LargeDataPolicy == "split" && len(httpResponseBytes) > s.commonConfig.Split.ChunkBytes { + var chunks [][]byte + for i := 0; i < len(httpResponseBytes); i += s.commonConfig.Split.ChunkBytes { + end := i + s.commonConfig.Split.ChunkBytes + if end > len(httpResponseBytes) { + end = len(httpResponseBytes) } + chunks = append(chunks, httpResponseBytes[i:end]) + } - for sequence, c := range chunks { - httpBodyChunk := data.HTTPBodyChunk{ - RequestId: processChPayload.requestPacket.RequestId, - Total: int32(len(chunks)), - Sequence: int32(sequence + 1), - Data: c, - } - b, err := data.SerializeHTTPBodyChunk(&httpBodyChunk, s.commonConfig.Networking.Format) - if err != nil { - s.logger.Error("Error serializing HTTP body chunk", zap.Error(err)) - return - } - - body := data.HTTPBody{ - Body: b, - Type: "split", - } - protoHeaders := data.HTTPHeaderToProtoHeaders(responseHeader) - responseData = data.HTTPResponseData{ - Body: &body, - StatusCode: int32(statusCode), - Headers: &protoHeaders, - } - - b, err = data.SerializeHTTPResponseData(&responseData, s.commonConfig.Networking.Format, s.encoder) - if err != nil { - s.logger.Error("Error serializing response data", zap.Error(err)) - return - } - responsePacket = data.HTTPResponsePacket{ - RequestId: processChPayload.requestPacket.RequestId, - HttpResponseData: b, - Compress: s.commonConfig.Networking.Compress, - } - - responseTopic := topics.ResponseTopic(s.id, processChPayload.requestPacket.RequestId) - - responsePayload, err := data.SerializeResponsePacket(&responsePacket, s.commonConfig.Networking.Format) - if err != nil { - s.logger.Error("Error serializing response packet", zap.Error(err)) - return - } - - _, err = s.client.Publish(context.Background(), &paho.Publish{ - Topic: responseTopic, - QoS: 0, - Payload: responsePayload, - }) - if err != nil { - s.logger.Error("Error publishing response", zap.Error(err)) - return - } + for sequence, c := range chunks { + httpBodyChunk := data.HTTPBodyChunk{ + RequestId: processChPayload.requestPacket.RequestId, + Total: int32(len(chunks)), + Sequence: int32(sequence + 1), + Data: c, } - } else { - if s.commonConfig.Networking.LargeDataPolicy == "storage_relay" && len(httpResponse) > s.commonConfig.StorageRelay.ThresholdBytes { - objectName = s.id + "/" + processChPayload.requestPacket.RequestId + "/response" - err := s.bucket.Upload(context.Background(), objectName, strings.NewReader(httpResponse)) - if err != nil { - s.logger.Error("Error uploading object to object storage", zap.Error(err)) - return - } + b, err := data.SerializeHTTPBodyChunk(&httpBodyChunk, s.commonConfig.Networking.Format) + if err != nil { + s.logger.Error("Error serializing HTTP body chunk", zap.Error(err)) + return } - protoHeaders := data.HTTPHeaderToProtoHeaders(responseHeader) - var body data.HTTPBody - if s.commonConfig.Networking.LargeDataPolicy == "storage_relay" && len(httpResponse) > s.commonConfig.StorageRelay.ThresholdBytes { - body = data.HTTPBody{ - Body: []byte(objectName), - Type: "storage_relay", - } - } else { - body = data.HTTPBody{ - Body: []byte(httpResponse), - Type: "data", - } + body := data.HTTPBody{ + Body: b, + Type: "split", } - + protoHeaders := data.HTTPHeaderToProtoHeaders(responseHeader) responseData = data.HTTPResponseData{ Body: &body, StatusCode: int32(statusCode), Headers: &protoHeaders, } + + b, err = data.SerializeHTTPResponseData(&responseData, s.commonConfig.Networking.Format, s.encoder) + if err != nil { + s.logger.Error("Error serializing response data", zap.Error(err)) + return + } + responsePacket = data.HTTPResponsePacket{ + RequestId: processChPayload.requestPacket.RequestId, + HttpResponseData: b, + Compress: s.commonConfig.Networking.Compress, + } + + responseTopic := topics.ResponseTopic(s.id, processChPayload.requestPacket.RequestId) + + responsePayload, err := data.SerializeResponsePacket(&responsePacket, s.commonConfig.Networking.Format) + if err != nil { + s.logger.Error("Error serializing response packet", zap.Error(err)) + return + } + + _, err = s.client.Publish(context.Background(), &paho.Publish{ + Topic: responseTopic, + QoS: 0, + Payload: responsePayload, + }) + if err != nil { + s.logger.Error("Error publishing response", zap.Error(err)) + return + } + } + } else { + if s.commonConfig.Networking.LargeDataPolicy == "storage_relay" && len(httpResponse) > s.commonConfig.StorageRelay.ThresholdBytes { + objectName = s.id + "/" + processChPayload.requestPacket.RequestId + "/response" + err := s.bucket.Upload(context.Background(), objectName, strings.NewReader(httpResponse)) + if err != nil { + s.logger.Error("Error uploading object to object storage", zap.Error(err)) + return + } + } + protoHeaders := data.HTTPHeaderToProtoHeaders(responseHeader) + + var body data.HTTPBody + if s.commonConfig.Networking.LargeDataPolicy == "storage_relay" && len(httpResponse) > s.commonConfig.StorageRelay.ThresholdBytes { + body = data.HTTPBody{ + Body: []byte(objectName), + Type: "storage_relay", + } + } else { + body = data.HTTPBody{ + Body: []byte(httpResponse), + Type: "data", + } + } + + responseData = data.HTTPResponseData{ + Body: &body, + StatusCode: int32(statusCode), + Headers: &protoHeaders, } } - b, err := data.SerializeHTTPResponseData(&responseData, s.commonConfig.Networking.Format, s.encoder) - if err != nil { - s.logger.Error("Error serializing response data", zap.Error(err)) - return - } - responsePacket = data.HTTPResponsePacket{ - RequestId: processChPayload.requestPacket.RequestId, - HttpResponseData: b, - Compress: s.commonConfig.Networking.Compress, - } - } else { - s.logger.Error("Unknown protocol: " + s.protocol) + } + b, err := data.SerializeHTTPResponseData(&responseData, s.commonConfig.Networking.Format, s.encoder) + if err != nil { + s.logger.Error("Error serializing response data", zap.Error(err)) return } + responsePacket = data.HTTPResponsePacket{ + RequestId: processChPayload.requestPacket.RequestId, + HttpResponseData: b, + Compress: s.commonConfig.Networking.Compress, + } responseTopic := topics.ResponseTopic(s.id, processChPayload.requestPacket.RequestId) diff --git a/internal/common/util.go b/internal/common/util.go index fac137c..180d388 100644 --- a/internal/common/util.go +++ b/internal/common/util.go @@ -14,19 +14,19 @@ func NewZapToBadgerAdapter(logger *zap.Logger) *zapToBadgerAdapter { return &zapToBadgerAdapter{logger: logger} } -func (adapter *zapToBadgerAdapter) Errorf(format string, args ...interface{}) { +func (adapter *zapToBadgerAdapter) Errorf(format string, args ...any) { adapter.logger.Sugar().Errorf(format, args...) } -func (adapter *zapToBadgerAdapter) Warningf(format string, args ...interface{}) { +func (adapter *zapToBadgerAdapter) Warningf(format string, args ...any) { adapter.logger.Sugar().Warnf(format, args...) } -func (adapter *zapToBadgerAdapter) Infof(format string, args ...interface{}) { +func (adapter *zapToBadgerAdapter) Infof(format string, args ...any) { adapter.logger.Sugar().Infof(format, args...) } -func (adapter *zapToBadgerAdapter) Debugf(format string, args ...interface{}) { +func (adapter *zapToBadgerAdapter) Debugf(format string, args ...any) { adapter.logger.Sugar().Debugf(format, args...) } @@ -38,7 +38,7 @@ func NewZapToGoKitAdapter(logger *zap.Logger) *zapToGoKitAdapter { return &zapToGoKitAdapter{logger: logger} } -func (adapter *zapToGoKitAdapter) Log(keyvals ...interface{}) error { +func (adapter *zapToGoKitAdapter) Log(keyvals ...any) error { if len(keyvals)%2 != 0 { return fmt.Errorf("keyvals must come in pairs") } diff --git a/internal/hub/server.go b/internal/hub/server.go index 306d57f..bd54766 100644 --- a/internal/hub/server.go +++ b/internal/hub/server.go @@ -379,11 +379,10 @@ func Start(c HubConfig) { c.Logger.Fatal("Error connecting to MQTT broker: " + err.Error()) } - if c.Protocol == "http1" { - s.startHTTP1(c) - } else { + if c.Protocol != "http1" { c.Logger.Fatal("Unknown protocol: " + c.Protocol) } + s.startHTTP1(c) } func (s *server) startHTTP1(c HubConfig) { diff --git a/pkg/data/merger_test.go b/pkg/data/merger_test.go new file mode 100644 index 0000000..05d40e2 --- /dev/null +++ b/pkg/data/merger_test.go @@ -0,0 +1,72 @@ +package data + +import ( + "reflect" + "testing" +) + +func TestNewMerger(t *testing.T) { + merger := NewMerger() + if merger == nil { + t.Error("NewMerger returned nil") + } + if merger.chunks == nil { + t.Error("NewMerger did not initialize chunks map") + } +} + +func TestAddChunkAndIsComplete(t *testing.T) { + merger := NewMerger() + chunk := &HTTPBodyChunk{ + RequestId: "test", + Sequence: 1, + Total: 2, + Data: []byte("part1"), + } + + merger.AddChunk(chunk) + if !reflect.DeepEqual(merger.chunks[chunk.RequestId][int(chunk.Sequence)], chunk.Data) { + t.Errorf("AddChunk did not add the chunk data correctly") + } + + if merger.IsComplete(chunk) { + t.Error("IsComplete should return false when the total number of chunks has not been reached") + } + + chunk2 := &HTTPBodyChunk{ + RequestId: "test", + Sequence: 2, + Total: 2, + Data: []byte("part2"), + } + + merger.AddChunk(chunk2) + if !merger.IsComplete(chunk2) { + t.Error("IsComplete should return true when all chunks have been added") + } +} + +func TestGetCombinedData(t *testing.T) { + merger := NewMerger() + chunk1 := &HTTPBodyChunk{ + RequestId: "test", + Sequence: 1, + Total: 2, + Data: []byte("part1"), + } + chunk2 := &HTTPBodyChunk{ + RequestId: "test", + Sequence: 2, + Total: 2, + Data: []byte("part2"), + } + + merger.AddChunk(chunk1) + merger.AddChunk(chunk2) + + combined := merger.GetCombinedData(chunk1) + expected := []byte("part1part2") + if !reflect.DeepEqual(combined, expected) { + t.Errorf("GetCombinedData returned %v, expected %v", combined, expected) + } +} diff --git a/pkg/data/serde.go b/pkg/data/serde.go index 39b4993..668ada4 100644 --- a/pkg/data/serde.go +++ b/pkg/data/serde.go @@ -25,11 +25,12 @@ func HTTPHeaderToProtoHeaders(httpHeader http.Header) HTTPHeaders { func SerializeRequestPacket(packet *HTTPRequestPacket, format string) ([]byte, error) { var err error var payload []byte - if format == "json" { + switch format { + case "json": payload, err = json.Marshal(packet) - } else if format == "protobuf" { + case "protobuf": payload, err = proto.Marshal(packet) - } else { + default: return nil, fmt.Errorf("unknown format: %s", format) } if err != nil { @@ -42,12 +43,13 @@ func SerializeRequestPacket(packet *HTTPRequestPacket, format string) ([]byte, e func DeserializeRequestPacket(payload []byte, format string) (*HTTPRequestPacket, error) { var err error requestPacket := HTTPRequestPacket{} - if format == "json" { + switch format { + case "json": err = json.Unmarshal(payload, &requestPacket) - } else if format == "protobuf" { + case "protobuf": err = proto.Unmarshal(payload, &requestPacket) - } else { - return nil, err + default: + return nil, fmt.Errorf("unknown format: %s", format) } return &requestPacket, err } @@ -55,11 +57,12 @@ func DeserializeRequestPacket(payload []byte, format string) (*HTTPRequestPacket func SerializeResponsePacket(responsePacket *HTTPResponsePacket, format string) ([]byte, error) { var err error var responsePayload []byte - if format == "json" { + switch format { + case "json": responsePayload, err = json.Marshal(responsePacket) - } else if format == "protobuf" { + case "protobuf": responsePayload, err = proto.Marshal(responsePacket) - } else { + default: return nil, fmt.Errorf("unknown format: %s", format) } if err != nil { @@ -72,11 +75,12 @@ func SerializeResponsePacket(responsePacket *HTTPResponsePacket, format string) func DeserializeResponsePacket(payload []byte, format string) (*HTTPResponsePacket, error) { var err error responsePacket := HTTPResponsePacket{} - if format == "json" { + switch format { + case "json": err = json.Unmarshal(payload, &responsePacket) - } else if format == "protobuf" { + case "protobuf": err = proto.Unmarshal(payload, &responsePacket) - } else { + default: return nil, fmt.Errorf("unknown format: %s", format) } return &responsePacket, err @@ -85,17 +89,18 @@ func DeserializeResponsePacket(payload []byte, format string) (*HTTPResponsePack func SerializeHTTPRequestData(httpRequestData *HTTPRequestData, format string, encoder *zstd.Encoder) ([]byte, error) { var b []byte var err error - if format == "json" { + switch format { + case "json": b, err = json.Marshal(httpRequestData) if err != nil { return nil, err } - } else if format == "protobuf" { + case "protobuf": b, err = proto.Marshal(httpRequestData) if err != nil { return nil, err } - } else { + default: return nil, fmt.Errorf("unknown format: %s", format) } if encoder != nil { @@ -114,15 +119,16 @@ func DeserializeHTTPRequestData(b []byte, compress string, format string, decode } var httpRequestData HTTPRequestData - if format == "json" { + switch format { + case "json": if err := json.Unmarshal(b, &httpRequestData); err != nil { return nil, fmt.Errorf("error unmarshalling message: %v", err) } - } else if format == "protobuf" { + case "protobuf": if err := proto.Unmarshal(b, &httpRequestData); err != nil { return nil, fmt.Errorf("error unmarshalling message: %v", err) } - } else { + default: return nil, fmt.Errorf("unknown format: %v", format) } @@ -150,17 +156,18 @@ func DeserializeHTTPRequestData(b []byte, compress string, format string, decode func SerializeHTTPResponseData(httpResponseData *HTTPResponseData, format string, encoder *zstd.Encoder) ([]byte, error) { var b []byte var err error - if format == "json" { + switch format { + case "json": b, err = json.Marshal(httpResponseData) if err != nil { return nil, err } - } else if format == "protobuf" { + case "protobuf": b, err = proto.Marshal(httpResponseData) if err != nil { return nil, err } - } else { + default: return nil, fmt.Errorf("unknown format: %s", format) } if encoder != nil { @@ -178,15 +185,16 @@ func DeserializeHTTPResponseData(b []byte, compress string, format string, decod } } var httpResponseData HTTPResponseData - if format == "json" { + switch format { + case "json": if err := json.Unmarshal(b, &httpResponseData); err != nil { return nil, fmt.Errorf("error unmarshalling message: %v", err) } - } else if format == "protobuf" { + case "protobuf": if err := proto.Unmarshal(b, &httpResponseData); err != nil { return nil, fmt.Errorf("error unmarshalling message: %v", err) } - } else { + default: return nil, fmt.Errorf("unknown format: %v", format) } @@ -214,11 +222,12 @@ func DeserializeHTTPResponseData(b []byte, compress string, format string, decod func SerializeHTTPBodyChunk(httpBodyChunk *HTTPBodyChunk, format string) ([]byte, error) { var err error var b []byte - if format == "json" { + switch format { + case "json": b, err = json.Marshal(httpBodyChunk) - } else if format == "protobuf" { + case "protobuf": b, err = proto.Marshal(httpBodyChunk) - } else { + default: return nil, fmt.Errorf("unknown format: %s", format) } if err != nil { @@ -231,11 +240,12 @@ func SerializeHTTPBodyChunk(httpBodyChunk *HTTPBodyChunk, format string) ([]byte func DeserializeHTTPBodyChunk(payload []byte, format string) (*HTTPBodyChunk, error) { var err error httpBodyChunk := HTTPBodyChunk{} - if format == "json" { + switch format { + case "json": err = json.Unmarshal(payload, &httpBodyChunk) - } else if format == "protobuf" { + case "protobuf": err = proto.Unmarshal(payload, &httpBodyChunk) - } else { + default: return nil, fmt.Errorf("unknown format: %s", format) } return &httpBodyChunk, err