Skip to content

Commit

Permalink
Adding support for lz4 compression
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Oct 11, 2024
1 parent e4adf36 commit eaf3eea
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 0 deletions.
2 changes: 2 additions & 0 deletions config/configcompression/compressiontype.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (ct *TypeWithLevel) UnmarshalText(in []byte) error {
return fmt.Errorf("invalid compression level: %q", parts[1])
}
if compressionTyp == TypeSnappy ||
compressionTyp == TypeLz4 ||
compressionTyp == typeNone ||
compressionTyp == typeEmpty {
return fmt.Errorf("compression level is not supported for %q", compressionTyp)
Expand All @@ -58,6 +59,7 @@ func (ct *TypeWithLevel) UnmarshalText(in []byte) error {
(compressionTyp == TypeZlib && isValidLevel(level)) ||
(compressionTyp == TypeDeflate && isValidLevel(level)) ||
compressionTyp == TypeSnappy ||
compressionTyp == TypeLz4 ||
compressionTyp == TypeZstd ||
compressionTyp == typeNone ||
compressionTyp == typeEmpty {
Expand Down
9 changes: 9 additions & 0 deletions config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"

"go.opentelemetry.io/collector/config/configcompression"
)
Expand All @@ -31,10 +32,12 @@ var (
snappyCompressor = &compressor{}
zstdCompressor = &compressor{}
zlibCompressor = &compressor{}
lz4Compressor = &compressor{}
_ writeCloserReset = (*gzip.Writer)(nil)
_ writeCloserReset = (*snappy.Writer)(nil)
_ writeCloserReset = (*zstd.Encoder)(nil)
_ writeCloserReset = (*zlib.Writer)(nil)
_ writeCloserReset = (*lz4.Writer)(nil)
)

// writerFactory defines writer field in CompressRoundTripper.
Expand Down Expand Up @@ -67,6 +70,12 @@ func newCompressor(compressionType configcompression.TypeWithLevel) (*compressor
return zlibCompressor, nil
}
return zlibCompressor, nil
case configcompression.TypeLz4:
if lz4Compressor.pool.Get() == nil {
lz4Compressor.pool = sync.Pool{New: func() any { lz := lz4.NewWriter(nil); _ = lz.Apply(lz4.ConcurrencyOption(1)); return lz }}
return lz4Compressor, nil
}
return lz4Compressor, nil
}
return nil, errors.New("unsupported compression type")
}
Expand Down

0 comments on commit eaf3eea

Please sign in to comment.