From eaf3eea65a5c93f2fefb2ac7983e94f24b77f3b1 Mon Sep 17 00:00:00 2001 From: Raj Nishtala Date: Tue, 8 Oct 2024 23:15:59 -0400 Subject: [PATCH] Adding support for lz4 compression --- config/configcompression/compressiontype.go | 2 ++ config/confighttp/compressor.go | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index 1ad0302f264..2abf6609a24 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -48,6 +48,7 @@ func (ct *TypeWithLevel) UnmarshalText(in []byte) error { return fmt.Errorf("invalid compression level: %q", parts[1]) } if compressionTyp == TypeSnappy || + compressionTyp == TypeLz4 || compressionTyp == typeNone || compressionTyp == typeEmpty { return fmt.Errorf("compression level is not supported for %q", compressionTyp) @@ -58,6 +59,7 @@ func (ct *TypeWithLevel) UnmarshalText(in []byte) error { (compressionTyp == TypeZlib && isValidLevel(level)) || (compressionTyp == TypeDeflate && isValidLevel(level)) || compressionTyp == TypeSnappy || + compressionTyp == TypeLz4 || compressionTyp == TypeZstd || compressionTyp == typeNone || compressionTyp == typeEmpty { diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 1c4f6854b08..57ffc865a13 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -13,6 +13,7 @@ import ( "github.com/golang/snappy" "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4/v4" "go.opentelemetry.io/collector/config/configcompression" ) @@ -31,10 +32,12 @@ var ( snappyCompressor = &compressor{} zstdCompressor = &compressor{} zlibCompressor = &compressor{} + lz4Compressor = &compressor{} _ writeCloserReset = (*gzip.Writer)(nil) _ writeCloserReset = (*snappy.Writer)(nil) _ writeCloserReset = (*zstd.Encoder)(nil) _ writeCloserReset = (*zlib.Writer)(nil) + _ writeCloserReset = (*lz4.Writer)(nil) ) // writerFactory defines writer field in CompressRoundTripper. @@ -67,6 +70,12 @@ func newCompressor(compressionType configcompression.TypeWithLevel) (*compressor return zlibCompressor, nil } return zlibCompressor, nil + case configcompression.TypeLz4: + if lz4Compressor.pool.Get() == nil { + lz4Compressor.pool = sync.Pool{New: func() any { lz := lz4.NewWriter(nil); _ = lz.Apply(lz4.ConcurrencyOption(1)); return lz }} + return lz4Compressor, nil + } + return lz4Compressor, nil } return nil, errors.New("unsupported compression type") }