From 05004158d1a3b9bfde48c9bcea58938e6344bd1a Mon Sep 17 00:00:00 2001 From: Raj Nishtala Date: Mon, 9 Sep 2024 12:45:53 -0400 Subject: [PATCH] feat(compression): Add the option of configuring compression levels --- .chloggen/configure-compression-levels.yaml | 25 +++++++++ config/configcompression/compressiontype.go | 61 ++++++++++++++++++++- config/configcompression/go.mod | 1 + config/configcompression/go.sum | 2 + config/confighttp/README.md | 21 +++++++ config/confighttp/compression.go | 11 +++- config/confighttp/compression_test.go | 33 +++++++---- config/confighttp/compressor.go | 32 +++++------ config/confighttp/confighttp.go | 34 +++++++++++- 9 files changed, 183 insertions(+), 37 deletions(-) create mode 100644 .chloggen/configure-compression-levels.yaml diff --git a/.chloggen/configure-compression-levels.yaml b/.chloggen/configure-compression-levels.yaml new file mode 100644 index 00000000000..1b88a1063fc --- /dev/null +++ b/.chloggen/configure-compression-levels.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confighttp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added support for configuring compression levels. + +# One or more tracking issues or pull requests related to the change +issues: [10467] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index 004e9558665..8b38d61fe94 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -3,7 +3,13 @@ package configcompression // import "go.opentelemetry.io/collector/config/configcompression" -import "fmt" +import ( + "fmt" + "strconv" + "strings" + + "github.com/klauspost/compress/zlib" +) // Type represents a compression method type Type string @@ -39,3 +45,56 @@ func (ct *Type) UnmarshalText(in []byte) error { return fmt.Errorf("unsupported compression type %q", typ) } + +// IsZstd returns true if the compression type is zstd. +func (ct *Type) IsZstd() bool { + parts := strings.Split(string(*ct), "/") + if parts[0] == string(TypeZstd) { + return true + } + return false +} + +// IsGzip returns true if the compression type is gzip and the specified compression level is valid. +func (ct *Type) IsGzip() bool { + parts := strings.Split(string(*ct), "/") + if parts[0] == string(TypeGzip) { + if len(parts) > 1 { + levelStr, err := strconv.Atoi(parts[1]) + if err != nil { + return false + } + if levelStr == zlib.BestSpeed || + levelStr == zlib.BestCompression || + levelStr == zlib.DefaultCompression || + levelStr == zlib.HuffmanOnly || + levelStr == zlib.NoCompression { + return true + } + } + return true + } + return false +} + +// IsZlib returns true if the compression type is zlib and the specified compression level is valid. +func (ct *Type) IsZlib() bool { + parts := strings.Split(string(*ct), "/") + if parts[0] == string(TypeZlib) || parts[0] == string(TypeDeflate) { + if len(parts) > 1 { + levelStr, err := strconv.Atoi(parts[1]) + if err != nil { + return false + } + if levelStr == zlib.BestSpeed || + levelStr == zlib.BestCompression || + levelStr == zlib.DefaultCompression || + levelStr == zlib.HuffmanOnly || + levelStr == zlib.NoCompression { + return true + } + } + return true + } + return false +} diff --git a/config/configcompression/go.mod b/config/configcompression/go.mod index 3e9b00c0d02..f22448aff2d 100644 --- a/config/configcompression/go.mod +++ b/config/configcompression/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configcompression go 1.22.0 require ( + github.com/klauspost/compress v1.17.9 github.com/stretchr/testify v1.9.0 go.uber.org/goleak v1.3.0 ) diff --git a/config/configcompression/go.sum b/config/configcompression/go.sum index bdd6d70ba4d..e884184d191 100644 --- a/config/configcompression/go.sum +++ b/config/configcompression/go.sum @@ -1,6 +1,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 24d2905d7af..070c7593f09 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -26,6 +26,27 @@ README](../configtls/README.md). - `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, and `deflate`. - look at the documentation for the server-side of the communication. - `none` will be treated as uncompressed, and any other inputs will cause an error. + - Compression levels can now be configured as part of the compression type like below. Not specifying any compression level will result in the default. + - `gzip` + - NoCompression: `gzip/0` + - BestSpeed: `gzip/1` + - BestCompression: `gzip/9` + - DefaultCompression: `gzip/-1` + - `zlib` + - NoCompression: `zlib/0` + - BestSpeed: `zlib/1` + - BestCompression: `zlib/9` + - DefaultCompression: `zlib/-1` + - `deflate` + - NoCompression: `deflate/0` + - BestSpeed: `deflate/1` + - BestCompression: `deflate/9` + - DefaultCompression: `deflate/-1` + - `zstd` + - SpeedFastest: `zstd/1` + - SpeedDefault: `zstd/3` + - SpeedBetterCompression: `zstd/6` + - SpeedBestCompression: `zstd/11` - [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport) - [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport) - [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 4498fefe864..202ab65d472 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -25,6 +25,11 @@ type compressRoundTripper struct { compressor *compressor } +type CompressionOptions struct { + compressionType configcompression.Type + compressionLevel int +} + var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ "": func(io.ReadCloser) (io.ReadCloser, error) { // Not a compressed payload. Nothing to do. @@ -72,14 +77,14 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro }, } -func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) { - encoder, err := newCompressor(compressionType) +func newCompressRoundTripper(rt http.RoundTripper, compressionopts CompressionOptions) (*compressRoundTripper, error) { + encoder, err := newCompressor(compressionopts) if err != nil { return nil, err } return &compressRoundTripper{ rt: rt, - compressionType: compressionType, + compressionType: compressionopts.compressionType, compressor: encoder, }, nil } diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index a4fcb013f4f..89fd471c04c 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -32,6 +32,13 @@ func TestHTTPClientCompression(t *testing.T) { compressedSnappyBody := compressSnappy(t, testBody) compressedZstdBody := compressZstd(t, testBody) + const ( + GzipLevel configcompression.Type = "gzip/1" + ZlibLevel configcompression.Type = "zlib/1" + DeflateLevel configcompression.Type = "deflate/1" + ZstdLevel configcompression.Type = "zstd/11" + ) + tests := []struct { name string encoding configcompression.Type @@ -52,19 +59,19 @@ func TestHTTPClientCompression(t *testing.T) { }, { name: "ValidGzip", - encoding: configcompression.TypeGzip, + encoding: GzipLevel, reqBody: compressedGzipBody.Bytes(), shouldError: false, }, { name: "ValidZlib", - encoding: configcompression.TypeZlib, + encoding: ZlibLevel, reqBody: compressedZlibBody.Bytes(), shouldError: false, }, { name: "ValidDeflate", - encoding: configcompression.TypeDeflate, + encoding: DeflateLevel, reqBody: compressedDeflateBody.Bytes(), shouldError: false, }, @@ -76,7 +83,7 @@ func TestHTTPClientCompression(t *testing.T) { }, { name: "ValidZstd", - encoding: configcompression.TypeZstd, + encoding: ZstdLevel, reqBody: compressedZstdBody.Bytes(), shouldError: false, }, @@ -288,7 +295,8 @@ func TestHTTPContentCompressionRequestWithNilBody(t *testing.T) { require.NoError(t, err, "failed to create request to test handler") client := http.Client{} - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compression := CompressionOptions{configcompression.TypeGzip, gzip.BestSpeed} + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, compression) require.NoError(t, err) res, err := client.Do(req) require.NoError(t, err) @@ -319,7 +327,8 @@ func TestHTTPContentCompressionCopyError(t *testing.T) { require.NoError(t, err) client := http.Client{} - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + compression := CompressionOptions{configcompression.TypeGzip, gzip.DefaultCompression} + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, compression) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) @@ -341,9 +350,9 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) { req, err := http.NewRequest(http.MethodGet, server.URL, &closeFailBody{Buffer: bytes.NewBuffer([]byte("blank"))}) require.NoError(t, err) - + compression := CompressionOptions{configcompression.TypeGzip, gzip.DefaultCompression} client := http.Client{} - client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip) + client.Transport, err = newCompressRoundTripper(http.DefaultTransport, compression) require.NoError(t, err) _, err = client.Do(req) require.Error(t, err) @@ -376,7 +385,7 @@ func TestOverrideCompressionList(t *testing.T) { func compressGzip(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - gw := gzip.NewWriter(&buf) + gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed) _, err := gw.Write(body) require.NoError(t, err) require.NoError(t, gw.Close()) @@ -385,7 +394,7 @@ func compressGzip(t testing.TB, body []byte) *bytes.Buffer { func compressZlib(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - zw := zlib.NewWriter(&buf) + zw, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed) _, err := zw.Write(body) require.NoError(t, err) require.NoError(t, zw.Close()) @@ -403,7 +412,9 @@ func compressSnappy(t testing.TB, body []byte) *bytes.Buffer { func compressZstd(t testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer - zw, _ := zstd.NewWriter(&buf) + compression := zstd.SpeedFastest + encoderLevel := zstd.WithEncoderLevel(compression) + zw, _ := zstd.NewWriter(&buf, encoderLevel) _, err := zw.Write(body) require.NoError(t, err) require.NoError(t, zw.Close()) diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 660fa83ce51..a93ff625bf6 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -22,36 +22,30 @@ type writeCloserReset interface { Reset(w io.Writer) } -var ( - _ writeCloserReset = (*gzip.Writer)(nil) - gZipPool = &compressor{pool: sync.Pool{New: func() any { return gzip.NewWriter(nil) }}} - _ writeCloserReset = (*snappy.Writer)(nil) - snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}} - _ writeCloserReset = (*zstd.Encoder)(nil) - // Concurrency 1 disables async decoding via goroutines. This is useful to reduce memory usage and isn't a bottleneck for compression using sync.Pool. - zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}} - _ writeCloserReset = (*zlib.Writer)(nil) - zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}} -) - type compressor struct { pool sync.Pool } // writerFactory defines writer field in CompressRoundTripper. // The validity of input is already checked when NewCompressRoundTripper was called in confighttp, -func newCompressor(compressionType configcompression.Type) (*compressor, error) { - switch compressionType { +func newCompressor(compressionopts CompressionOptions) (*compressor, error) { + switch compressionopts.compressionType { case configcompression.TypeGzip: - return gZipPool, nil + var _ writeCloserReset = (*gzip.Writer)(nil) + return &compressor{pool: sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, compressionopts.compressionLevel); return w }}}, nil case configcompression.TypeSnappy: - return snappyPool, nil + var _ writeCloserReset = (*snappy.Writer)(nil) + return &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}}, nil case configcompression.TypeZstd: - return zStdPool, nil + var _ writeCloserReset = (*zstd.Encoder)(nil) + compression := zstd.EncoderLevelFromZstd(compressionopts.compressionLevel) + encoderLevel := zstd.WithEncoderLevel(compression) + return &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), encoderLevel); return zw }}}, nil case configcompression.TypeZlib, configcompression.TypeDeflate: - return zLibPool, nil + var _ writeCloserReset = (*zlib.Writer)(nil) + return &compressor{pool: sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, compressionopts.compressionLevel); return w }}}, nil } - return nil, errors.New("unsupported compression type, ") + return nil, errors.New("unsupported compression type") } func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error { diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index ec7c3197de9..846728b815e 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -13,6 +13,8 @@ import ( "net/http" "net/http/cookiejar" "net/url" + "strconv" + "strings" "time" "github.com/rs/cors" @@ -132,6 +134,21 @@ func NewDefaultClientConfig() ClientConfig { } } +func setCompression(compressionField configcompression.Type) (compressionType configcompression.Type, compressionLevel int) { + parts := strings.Split(string(compressionField), "/") + + // Set compression type + compressionLevel = 1 + compressionType = configcompression.Type(parts[0]) + if len(parts) > 1 { + levelStr := parts[1] + if level, err := strconv.Atoi(levelStr); err == nil { + compressionLevel = level + } + } + return compressionType, compressionLevel +} + // ToClient creates an HTTP client. func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, settings component.TelemetrySettings) (*http.Client, error) { tlsCfg, err := hcs.TLSSetting.LoadTLSConfig(ctx) @@ -217,9 +234,20 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett // Compress the body using specified compression methods if non-empty string is provided. // Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed. if hcs.Compression.IsCompressed() { - clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression) - if err != nil { - return nil, err + if hcs.Compression.IsZstd() || hcs.Compression.IsGzip() || hcs.Compression.IsZlib() { + compressionType, compressionLevel := setCompression(hcs.Compression) + compression := CompressionOptions{compressionType, compressionLevel} + clientTransport, err = newCompressRoundTripper(clientTransport, compression) + if err != nil { + return nil, err + } + } else { + // Use the default if the compression level is not specified. + compressionopts := CompressionOptions{hcs.Compression, -1} + clientTransport, err = newCompressRoundTripper(clientTransport, compressionopts) + if err != nil { + return nil, err + } } }