Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sampling] Fix merging of per-operation strategies into service strategies without them #5277

Merged
merged 16 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"probabilisticSampling": {
"samplingRate": 1
},
"operationSampling": {
"defaultSamplingProbability": 1,
"perOperationStrategies": [
{
"operation": "/health",
"probabilisticSampling": {
"samplingRate": 0.1
}
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"strategyType": 1,
"rateLimitingSampling": {
"maxTracesPerSecond": 3
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"probabilisticSampling": {
"samplingRate": 1
},
"operationSampling": {
"defaultSamplingProbability": 1,
"perOperationStrategies": [
{
"operation": "/health",
"probabilisticSampling": {
"samplingRate": 0.1
}
}
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"strategyType": 1,
"rateLimitingSampling": {
"maxTracesPerSecond": 3
},
"operationSampling": {
"defaultSamplingProbability": 0.2,
"perOperationStrategies": [
{
"operation": "/health",
"probabilisticSampling": {
"samplingRate": 0.1
}
}
]
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
{
"service_strategies": [
{
"service": "ServiceA",
"type": "probabilistic",
"param": 1.0
},
{
"service": "ServiceB",
"type": "ratelimiting",
"param": 3
}
],
"default_strategy": {
"type": "probabilistic",
"param": 0.2,
"operation_strategies": [
{
"operation": "/health",
"type": "probabilistic",
"param": 0.0
}
]
}
}
{
"service_strategies": [
{
"service": "ServiceA",
"type": "probabilistic",
"param": 1.0
},
{
"service": "ServiceB",
"type": "ratelimiting",
"param": 3
}
],
"default_strategy": {
"type": "probabilistic",
"param": 0.2,
"operation_strategies": [
{
"operation": "/health",
"type": "probabilistic",
"param": 0.1
}
]
}
}
7 changes: 7 additions & 0 deletions plugin/sampling/strategystore/static/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
// samplingStrategiesFile contains the name of CLI option for config file.
samplingStrategiesFile = "sampling.strategies-file"
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
samplingStrategiesBugfix5270 = "sampling.strategies.bugfix-5270"
)

// Options holds configuration for the static sampling strategy store.
Expand All @@ -33,17 +34,23 @@ type Options struct {
StrategiesFile string
// ReloadInterval is the time interval to check and reload sampling strategies file
ReloadInterval time.Duration
// Flag for enabling possibly breaking change which includes default operations level
// strategies when calculating Ratelimiting type service level strategy
// more information https://github.com/jaegertracing/jaeger/issues/5270
IncludeDefaultOpStrategies bool
}

// AddFlags adds flags for Options
func AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no reloading")
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
flagSet.Bool(samplingStrategiesBugfix5270, false, "Include default operation level strategies for Ratesampling type service level strategy. Cf. https://github.com/jaegertracing/jaeger/issues/5270")
}

// InitFromViper initializes Options with properties from viper
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
opts.IncludeDefaultOpStrategies = v.GetBool(samplingStrategiesBugfix5270)
return opts
}
63 changes: 56 additions & 7 deletions plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
storedStrategies atomic.Value // holds *storedStrategies

cancelFunc context.CancelFunc

options Options
}

type storedStrategies struct {
Expand All @@ -58,20 +60,32 @@
h := &strategyStore{
logger: logger,
cancelFunc: cancelFunc,
options: options,
}
h.storedStrategies.Store(defaultStrategies())

if options.StrategiesFile == "" {
h.parseStrategies(nil)
h.logger.Info("No sampling strategies source provided, using defaults")
return h, nil
}

loadFn := h.samplingStrategyLoader(options.StrategiesFile)
strategies, err := loadStrategies(loadFn)
if err != nil {
return nil, err
} else if strategies == nil {
h.logger.Info("No sampling strategies found or URL is unavailable, using defaults")
return h, nil
}

if !h.options.IncludeDefaultOpStrategies {
h.logger.Warn("Default operations level strategies will not be included for Ratelimiting service strategies." +
"This behavior will be changed in future releases. " +
"Cf. https://github.com/jaegertracing/jaeger/issues/5270")
h.parseStrategies_deprecated(strategies)
} else {
h.parseStrategies(strategies)
}
h.parseStrategies(strategies)

if options.ReloadInterval > 0 {
go h.autoUpdateStrategies(ctx, options.ReloadInterval, loadFn)
Expand Down Expand Up @@ -206,11 +220,7 @@
return strategies, nil
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
if strategies == nil {
h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults")
return
}
func (h *strategyStore) parseStrategies_deprecated(strategies *strategies) {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
newStore := defaultStrategies()
if strategies.DefaultStrategy != nil {
newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
Expand Down Expand Up @@ -249,6 +259,45 @@
h.storedStrategies.Store(newStore)
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
newStore := defaultStrategies()
if strategies.DefaultStrategy != nil {
newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
}

for _, s := range strategies.ServiceStrategies {
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s)

// Config for this service may not have per-operation strategies,
// but if the default strategy has them they should still apply.

if newStore.defaultStrategy.OperationSampling == nil {
// Default strategy doens't have them either, nothing to do.
continue
}

opS := newStore.serviceStrategies[s.Service].OperationSampling
if opS == nil {

// Service does not have its own per-operation rules, so copy (by value) from the default strategy.
newOpS := *newStore.defaultStrategy.OperationSampling

// If the service's own default is probabilistic, then its sampling rate should take precedence.
if newStore.serviceStrategies[s.Service].ProbabilisticSampling != nil {
newOpS.DefaultSamplingProbability = newStore.serviceStrategies[s.Service].ProbabilisticSampling.SamplingRate
}
newStore.serviceStrategies[s.Service].OperationSampling = &newOpS
continue
}

// If the service did have its own per-operation strategies, then merge them with the default ones.
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
opS.PerOperationStrategies,
newStore.defaultStrategy.OperationSampling.PerOperationStrategies)

Check warning on line 296 in plugin/sampling/strategystore/static/strategy_store.go

View check run for this annotation

Codecov / codecov/patch

plugin/sampling/strategystore/static/strategy_store.go#L294-L296

Added lines #L294 - L296 were not covered by tests
}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
h.storedStrategies.Store(newStore)
}

// mergePerOperationSamplingStrategies merges two operation strategies a and b, where a takes precedence over b.
func mergePerOperationSamplingStrategies(
a, b []*api_v2.OperationSamplingStrategy,
Expand Down
99 changes: 78 additions & 21 deletions plugin/sampling/strategystore/static/strategy_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package static

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
Expand All @@ -34,26 +36,33 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

const snapshotLocation = "./fixtures/"

// Snapshots can be regenerated via:
//
// REGENERATE_SNAPSHOTS=true go test -v ./plugin/sampling/strategystore/static/strategy_store_test.go
var regenerateSnapshots = os.Getenv("REGENERATE_SNAPSHOTS") == "true"

// strategiesJSON returns the strategy with
// a given probability.
func strategiesJSON(probability float32) string {
strategy := fmt.Sprintf(`
{
"default_strategy": {
"type": "probabilistic",
"param": 0.5
},
"service_strategies": [
{
"service": "foo",
"type": "probabilistic",
"param": %.1f
"param": 0.5
},
{
"service": "bar",
"type": "ratelimiting",
"param": 5
}
"service_strategies": [
{
"service": "foo",
"type": "probabilistic",
"param": %.1f
},
{
"service": "bar",
"type": "ratelimiting",
"param": 5
}
]
}
`,
Expand Down Expand Up @@ -107,7 +116,7 @@ func TestStrategyStoreWithFile(t *testing.T) {
logger, buf := testutils.NewLogger()
store, err := NewStrategyStore(Options{}, logger)
require.NoError(t, err)
assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults")
assert.Contains(t, buf.String(), "No sampling strategies source provided, using defaults")
s, err := store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.001), *s)
Expand All @@ -134,7 +143,7 @@ func TestStrategyStoreWithURL(t *testing.T) {
mockServer, _ := mockStrategyServer(t)
store, err := NewStrategyStore(Options{StrategiesFile: mockServer.URL + "/service-unavailable"}, logger)
require.NoError(t, err)
assert.Contains(t, buf.String(), "No sampling strategies provided or URL is unavailable, using defaults")
assert.Contains(t, buf.String(), "No sampling strategies found or URL is unavailable, using defaults")
s, err := store.GetSamplingStrategy(context.Background(), "foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.001), *s)
Expand Down Expand Up @@ -465,18 +474,66 @@ func TestAutoUpdateStrategyErrors(t *testing.T) {
}

func TestServiceNoPerOperationStrategies(t *testing.T) {
store, err := NewStrategyStore(Options{StrategiesFile: "fixtures/service_no_per_operation.json"}, zap.NewNop())
// given setup of strategy store with no specific per operation sampling strategies
// and option "sampling.strategies.bugfix-5270=true"
store, err := NewStrategyStore(Options{
StrategiesFile: "fixtures/service_no_per_operation.json",
IncludeDefaultOpStrategies: true,
}, zap.NewNop())
require.NoError(t, err)

s, err := store.GetSamplingStrategy(context.Background(), "ServiceA")
require.NoError(t, err)
assert.Equal(t, 1.0, s.OperationSampling.DefaultSamplingProbability)
for _, service := range []string{"ServiceA", "ServiceB"} {
t.Run(service, func(t *testing.T) {
strategy, err := store.GetSamplingStrategy(context.Background(), service)
require.NoError(t, err)
strategyJson, err := json.MarshalIndent(strategy, "", " ")
require.NoError(t, err)

testName := strings.ReplaceAll(t.Name(), "/", "_")
snapshotFile := filepath.Join(snapshotLocation, testName+".json")
expectedServiceResponse, err := os.ReadFile(snapshotFile)
require.NoError(t, err)

assert.Equal(t, string(expectedServiceResponse), string(strategyJson),
"comparing against stored snapshot. Use REGENERATE_SNAPSHOTS=true to rebuild snapshots.")

s, err = store.GetSamplingStrategy(context.Background(), "ServiceB")
if regenerateSnapshots {
os.WriteFile(snapshotFile, strategyJson, 0o644)
}
})
}
}

func TestServiceNoPerOperationStrategiesDeprecatedBehavior(t *testing.T) {
// test case to be removed along with removal of strategy_store.parseStrategies_deprecated,
// see https://github.com/jaegertracing/jaeger/issues/5270 for more details

// given setup of strategy store with no specific per operation sampling strategies
store, err := NewStrategyStore(Options{
StrategiesFile: "fixtures/service_no_per_operation.json",
}, zap.NewNop())
require.NoError(t, err)

expected := makeResponse(api_v2.SamplingStrategyType_RATE_LIMITING, 3)
assert.Equal(t, *expected.RateLimitingSampling, *s.RateLimitingSampling)
for _, service := range []string{"ServiceA", "ServiceB"} {
t.Run(service, func(t *testing.T) {
strategy, err := store.GetSamplingStrategy(context.Background(), service)
require.NoError(t, err)
strategyJson, err := json.MarshalIndent(strategy, "", " ")
require.NoError(t, err)

testName := strings.ReplaceAll(t.Name(), "/", "_")
snapshotFile := filepath.Join(snapshotLocation, testName+".json")
expectedServiceResponse, err := os.ReadFile(snapshotFile)
require.NoError(t, err)

assert.Equal(t, string(expectedServiceResponse), string(strategyJson),
"comparing against stored snapshot. Use REGENERATE_SNAPSHOTS=true to rebuild snapshots.")

if regenerateSnapshots {
os.WriteFile(snapshotFile, strategyJson, 0o644)
}
})
}
}

func TestSamplingStrategyLoader(t *testing.T) {
Expand Down
Loading