Skip to content

Commit

Permalink
feat(compression): Add the option of configuring compression levels
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Sep 9, 2024
1 parent 6928951 commit 0500415
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 37 deletions.
25 changes: 25 additions & 0 deletions .chloggen/configure-compression-levels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added support for configuring compression levels.

# One or more tracking issues or pull requests related to the change
issues: [10467]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
61 changes: 60 additions & 1 deletion config/configcompression/compressiontype.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

package configcompression // import "go.opentelemetry.io/collector/config/configcompression"

import "fmt"
import (
"fmt"
"strconv"
"strings"

"github.com/klauspost/compress/zlib"
)

// Type represents a compression method
type Type string
Expand Down Expand Up @@ -39,3 +45,56 @@ func (ct *Type) UnmarshalText(in []byte) error {
return fmt.Errorf("unsupported compression type %q", typ)

}

// IsZstd returns true if the compression type is zstd.
func (ct *Type) IsZstd() bool {
parts := strings.Split(string(*ct), "/")
if parts[0] == string(TypeZstd) {
return true
}
return false
}

// IsGzip returns true if the compression type is gzip and the specified compression level is valid.
func (ct *Type) IsGzip() bool {
parts := strings.Split(string(*ct), "/")
if parts[0] == string(TypeGzip) {
if len(parts) > 1 {
levelStr, err := strconv.Atoi(parts[1])
if err != nil {
return false
}
if levelStr == zlib.BestSpeed ||
levelStr == zlib.BestCompression ||
levelStr == zlib.DefaultCompression ||
levelStr == zlib.HuffmanOnly ||
levelStr == zlib.NoCompression {
return true
}
}
return true
}
return false
}

// IsZlib returns true if the compression type is zlib and the specified compression level is valid.
func (ct *Type) IsZlib() bool {
parts := strings.Split(string(*ct), "/")
if parts[0] == string(TypeZlib) || parts[0] == string(TypeDeflate) {
if len(parts) > 1 {
levelStr, err := strconv.Atoi(parts[1])
if err != nil {
return false
}
if levelStr == zlib.BestSpeed ||
levelStr == zlib.BestCompression ||
levelStr == zlib.DefaultCompression ||
levelStr == zlib.HuffmanOnly ||
levelStr == zlib.NoCompression {
return true
}
}
return true
}
return false
}
1 change: 1 addition & 0 deletions config/configcompression/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configcompression
go 1.22.0

require (
github.com/klauspost/compress v1.17.9
github.com/stretchr/testify v1.9.0
go.uber.org/goleak v1.3.0
)
Expand Down
2 changes: 2 additions & 0 deletions config/configcompression/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions config/confighttp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@ README](../configtls/README.md).
- `compression`: Compression type to use among `gzip`, `zstd`, `snappy`, `zlib`, and `deflate`.
- look at the documentation for the server-side of the communication.
- `none` will be treated as uncompressed, and any other inputs will cause an error.
- Compression levels can now be configured as part of the compression type like below. Not specifying any compression level will result in the default.
- `gzip`
- NoCompression: `gzip/0`
- BestSpeed: `gzip/1`
- BestCompression: `gzip/9`
- DefaultCompression: `gzip/-1`
- `zlib`
- NoCompression: `zlib/0`
- BestSpeed: `zlib/1`
- BestCompression: `zlib/9`
- DefaultCompression: `zlib/-1`
- `deflate`
- NoCompression: `deflate/0`
- BestSpeed: `deflate/1`
- BestCompression: `deflate/9`
- DefaultCompression: `deflate/-1`
- `zstd`
- SpeedFastest: `zstd/1`
- SpeedDefault: `zstd/3`
- SpeedBetterCompression: `zstd/6`
- SpeedBestCompression: `zstd/11`
- [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport)
- [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
- [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport)
Expand Down
11 changes: 8 additions & 3 deletions config/confighttp/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type compressRoundTripper struct {
compressor *compressor
}

type CompressionOptions struct {
compressionType configcompression.Type
compressionLevel int
}

var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
Expand Down Expand Up @@ -72,14 +77,14 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro
},
}

func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) {
encoder, err := newCompressor(compressionType)
func newCompressRoundTripper(rt http.RoundTripper, compressionopts CompressionOptions) (*compressRoundTripper, error) {
encoder, err := newCompressor(compressionopts)
if err != nil {
return nil, err
}
return &compressRoundTripper{
rt: rt,
compressionType: compressionType,
compressionType: compressionopts.compressionType,
compressor: encoder,
}, nil
}
Expand Down
33 changes: 22 additions & 11 deletions config/confighttp/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func TestHTTPClientCompression(t *testing.T) {
compressedSnappyBody := compressSnappy(t, testBody)
compressedZstdBody := compressZstd(t, testBody)

const (
GzipLevel configcompression.Type = "gzip/1"
ZlibLevel configcompression.Type = "zlib/1"
DeflateLevel configcompression.Type = "deflate/1"
ZstdLevel configcompression.Type = "zstd/11"
)

tests := []struct {
name string
encoding configcompression.Type
Expand All @@ -52,19 +59,19 @@ func TestHTTPClientCompression(t *testing.T) {
},
{
name: "ValidGzip",
encoding: configcompression.TypeGzip,
encoding: GzipLevel,
reqBody: compressedGzipBody.Bytes(),
shouldError: false,
},
{
name: "ValidZlib",
encoding: configcompression.TypeZlib,
encoding: ZlibLevel,
reqBody: compressedZlibBody.Bytes(),
shouldError: false,
},
{
name: "ValidDeflate",
encoding: configcompression.TypeDeflate,
encoding: DeflateLevel,
reqBody: compressedDeflateBody.Bytes(),
shouldError: false,
},
Expand All @@ -76,7 +83,7 @@ func TestHTTPClientCompression(t *testing.T) {
},
{
name: "ValidZstd",
encoding: configcompression.TypeZstd,
encoding: ZstdLevel,
reqBody: compressedZstdBody.Bytes(),
shouldError: false,
},
Expand Down Expand Up @@ -288,7 +295,8 @@ func TestHTTPContentCompressionRequestWithNilBody(t *testing.T) {
require.NoError(t, err, "failed to create request to test handler")

client := http.Client{}
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip)
compression := CompressionOptions{configcompression.TypeGzip, gzip.BestSpeed}
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, compression)
require.NoError(t, err)
res, err := client.Do(req)
require.NoError(t, err)
Expand Down Expand Up @@ -319,7 +327,8 @@ func TestHTTPContentCompressionCopyError(t *testing.T) {
require.NoError(t, err)

client := http.Client{}
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip)
compression := CompressionOptions{configcompression.TypeGzip, gzip.DefaultCompression}
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, compression)
require.NoError(t, err)
_, err = client.Do(req)
require.Error(t, err)
Expand All @@ -341,9 +350,9 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) {

req, err := http.NewRequest(http.MethodGet, server.URL, &closeFailBody{Buffer: bytes.NewBuffer([]byte("blank"))})
require.NoError(t, err)

compression := CompressionOptions{configcompression.TypeGzip, gzip.DefaultCompression}
client := http.Client{}
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, configcompression.TypeGzip)
client.Transport, err = newCompressRoundTripper(http.DefaultTransport, compression)
require.NoError(t, err)
_, err = client.Do(req)
require.Error(t, err)
Expand Down Expand Up @@ -376,7 +385,7 @@ func TestOverrideCompressionList(t *testing.T) {

func compressGzip(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
gw, _ := gzip.NewWriterLevel(&buf, gzip.BestSpeed)
_, err := gw.Write(body)
require.NoError(t, err)
require.NoError(t, gw.Close())
Expand All @@ -385,7 +394,7 @@ func compressGzip(t testing.TB, body []byte) *bytes.Buffer {

func compressZlib(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
zw := zlib.NewWriter(&buf)
zw, _ := zlib.NewWriterLevel(&buf, zlib.BestSpeed)
_, err := zw.Write(body)
require.NoError(t, err)
require.NoError(t, zw.Close())
Expand All @@ -403,7 +412,9 @@ func compressSnappy(t testing.TB, body []byte) *bytes.Buffer {

func compressZstd(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
zw, _ := zstd.NewWriter(&buf)
compression := zstd.SpeedFastest
encoderLevel := zstd.WithEncoderLevel(compression)
zw, _ := zstd.NewWriter(&buf, encoderLevel)
_, err := zw.Write(body)
require.NoError(t, err)
require.NoError(t, zw.Close())
Expand Down
32 changes: 13 additions & 19 deletions config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,30 @@ type writeCloserReset interface {
Reset(w io.Writer)
}

var (
_ writeCloserReset = (*gzip.Writer)(nil)
gZipPool = &compressor{pool: sync.Pool{New: func() any { return gzip.NewWriter(nil) }}}
_ writeCloserReset = (*snappy.Writer)(nil)
snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}}
_ writeCloserReset = (*zstd.Encoder)(nil)
// Concurrency 1 disables async decoding via goroutines. This is useful to reduce memory usage and isn't a bottleneck for compression using sync.Pool.
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}}
_ writeCloserReset = (*zlib.Writer)(nil)
zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}}
)

type compressor struct {
pool sync.Pool
}

// writerFactory defines writer field in CompressRoundTripper.
// The validity of input is already checked when NewCompressRoundTripper was called in confighttp,
func newCompressor(compressionType configcompression.Type) (*compressor, error) {
switch compressionType {
func newCompressor(compressionopts CompressionOptions) (*compressor, error) {
switch compressionopts.compressionType {
case configcompression.TypeGzip:
return gZipPool, nil
var _ writeCloserReset = (*gzip.Writer)(nil)
return &compressor{pool: sync.Pool{New: func() any { w, _ := gzip.NewWriterLevel(nil, compressionopts.compressionLevel); return w }}}, nil
case configcompression.TypeSnappy:
return snappyPool, nil
var _ writeCloserReset = (*snappy.Writer)(nil)
return &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}}, nil
case configcompression.TypeZstd:
return zStdPool, nil
var _ writeCloserReset = (*zstd.Encoder)(nil)
compression := zstd.EncoderLevelFromZstd(compressionopts.compressionLevel)
encoderLevel := zstd.WithEncoderLevel(compression)
return &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), encoderLevel); return zw }}}, nil
case configcompression.TypeZlib, configcompression.TypeDeflate:
return zLibPool, nil
var _ writeCloserReset = (*zlib.Writer)(nil)
return &compressor{pool: sync.Pool{New: func() any { w, _ := zlib.NewWriterLevel(nil, compressionopts.compressionLevel); return w }}}, nil
}
return nil, errors.New("unsupported compression type, ")
return nil, errors.New("unsupported compression type")
}

func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error {
Expand Down
34 changes: 31 additions & 3 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"net/http"
"net/http/cookiejar"
"net/url"
"strconv"
"strings"
"time"

"github.com/rs/cors"
Expand Down Expand Up @@ -132,6 +134,21 @@ func NewDefaultClientConfig() ClientConfig {
}
}

func setCompression(compressionField configcompression.Type) (compressionType configcompression.Type, compressionLevel int) {
parts := strings.Split(string(compressionField), "/")

// Set compression type
compressionLevel = 1
compressionType = configcompression.Type(parts[0])
if len(parts) > 1 {
levelStr := parts[1]
if level, err := strconv.Atoi(levelStr); err == nil {
compressionLevel = level
}
}
return compressionType, compressionLevel
}

// ToClient creates an HTTP client.
func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, settings component.TelemetrySettings) (*http.Client, error) {
tlsCfg, err := hcs.TLSSetting.LoadTLSConfig(ctx)
Expand Down Expand Up @@ -217,9 +234,20 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett
// Compress the body using specified compression methods if non-empty string is provided.
// Supporting gzip, zlib, deflate, snappy, and zstd; none is treated as uncompressed.
if hcs.Compression.IsCompressed() {
clientTransport, err = newCompressRoundTripper(clientTransport, hcs.Compression)
if err != nil {
return nil, err
if hcs.Compression.IsZstd() || hcs.Compression.IsGzip() || hcs.Compression.IsZlib() {
compressionType, compressionLevel := setCompression(hcs.Compression)
compression := CompressionOptions{compressionType, compressionLevel}
clientTransport, err = newCompressRoundTripper(clientTransport, compression)
if err != nil {
return nil, err
}
} else {
// Use the default if the compression level is not specified.
compressionopts := CompressionOptions{hcs.Compression, -1}
clientTransport, err = newCompressRoundTripper(clientTransport, compressionopts)
if err != nil {
return nil, err
}
}
}

Expand Down

0 comments on commit 0500415

Please sign in to comment.