Skip to content

Commit

Permalink
Disable concurrency in zstd and add Benchmark tests for it (open-tele…
Browse files Browse the repository at this point in the history
…metry#9749)

**Description:** zstd benchmark tests added
The goal of this PR is to disable concurrency in zstd compression to
reduce its memory footprint and avoid a known issue with goroutine
leaks. Please see - klauspost/compress#264

**Link to tracking Issue:**
open-telemetry#8216

**Testing:** Benchmark test results below
```
BenchmarkCompression/zstdWithConcurrency/compress-10         	   21392	     55855 ns/op	187732.88 MB/s	 2329164 B/op	      28 allocs/op
BenchmarkCompression/zstdNoConcurrency/compress-10           	   29526	     39902 ns/op	262787.42 MB/s	 1758988 B/op	      15 allocs/op
input => 10.00 MB
```
  • Loading branch information
rnishtala-sumo authored Apr 12, 2024
1 parent b1e075f commit 7a8954f
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 3 deletions.
25 changes: 25 additions & 0 deletions .chloggen/bnchmk-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Disable concurrency in zstd compression

# One or more tracking issues or pull requests related to the change
issues: [8216]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 4 additions & 3 deletions config/confighttp/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ var (
_ writeCloserReset = (*snappy.Writer)(nil)
snappyPool = &compressor{pool: sync.Pool{New: func() any { return snappy.NewBufferedWriter(nil) }}}
_ writeCloserReset = (*zstd.Encoder)(nil)
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil); return zw }}}
_ writeCloserReset = (*zlib.Writer)(nil)
zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}}
// Concurrency 1 disables async decoding via goroutines. This is useful to reduce memory usage and isn't a bottleneck for compression using sync.Pool.
zStdPool = &compressor{pool: sync.Pool{New: func() any { zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)); return zw }}}
_ writeCloserReset = (*zlib.Writer)(nil)
zLibPool = &compressor{pool: sync.Pool{New: func() any { return zlib.NewWriter(nil) }}}
)

type compressor struct {
Expand Down
96 changes: 96 additions & 0 deletions config/confighttp/compressor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// This file contains helper functions regarding compression/decompression for confighttp.

package confighttp // import "go.opentelemetry.io/collector/config/confighttp"

import (
"bytes"
"fmt"
"io"
"strings"
"testing"

"github.com/klauspost/compress/zstd"

"go.opentelemetry.io/collector/config/configcompression"
)

func BenchmarkCompression(b *testing.B) {
benchmarks := []struct {
codec configcompression.Type
name string
function func(*testing.B, configcompression.Type, *bytes.Buffer, []byte)
}{
{
codec: configcompression.TypeZstd,
name: "zstdWithConcurrency",
function: benchmarkCompression,
},
{
codec: configcompression.TypeZstd,
name: "zstdNoConcurrency",
function: benchmarkCompressionNoConcurrency,
},
}
payload := make([]byte, 10<<20)
buffer := bytes.Buffer{}
buffer.Grow(len(payload))

ts := &bytes.Buffer{}
defer func() {
fmt.Printf("input => %.2f MB\n", float64(len(payload))/(1024*1024))
fmt.Println(ts)
}()

for i := range benchmarks {
benchmark := &benchmarks[i]
b.Run(fmt.Sprint(benchmark.name), func(b *testing.B) {
benchmark.function(b, benchmark.codec, &buffer, payload)
})

}
}

func benchmarkCompression(b *testing.B, _ configcompression.Type, buf *bytes.Buffer, payload []byte) {
// Concurrency Enabled

b.Run("compress", func(b *testing.B) {
stringReader := strings.NewReader(string(payload))
stringReadCloser := io.NopCloser(stringReader)
var enc io.Writer
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(payload)))
for i := 0; i < b.N; i++ {
enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(5))
enc.(writeCloserReset).Reset(buf)
_, copyErr := io.Copy(enc, stringReadCloser)
if copyErr != nil {
b.Fatal(copyErr)
}
}
})
}

func benchmarkCompressionNoConcurrency(b *testing.B, _ configcompression.Type, buf *bytes.Buffer, payload []byte) {
// Concurrency Disabled

b.Run("compress", func(b *testing.B) {
stringReader := strings.NewReader(string(payload))
stringReadCloser := io.NopCloser(stringReader)
var enc io.Writer
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(payload)))
for i := 0; i < b.N; i++ {
enc, _ = zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
enc.(writeCloserReset).Reset(buf)
_, copyErr := io.Copy(enc, stringReadCloser)
if copyErr != nil {
b.Fatal(copyErr)
}
}
})
}

0 comments on commit 7a8954f

Please sign in to comment.