diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index 3be3a37af5d..e34fb2ff7d6 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -1,9 +1,9 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp, jaeger, zipkin] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: @@ -25,6 +25,16 @@ extensions: memstore_archive: max_traces: 100000 + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + sampling_store: memstore + initial_sampling_probability: 0.1 + http: + grpc: + receivers: otlp: protocols: @@ -42,6 +52,9 @@ receivers: processors: batch: + # Adaptive Sampling Processor is required to support adaptive sampling. + # It expects remote_sampling extension with `adaptive:` config to be enabled. + adaptive_sampling: exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/internal/components.go b/cmd/jaeger/internal/components.go index f187e731c07..7a615785b4d 100644 --- a/cmd/jaeger/internal/components.go +++ b/cmd/jaeger/internal/components.go @@ -30,7 +30,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/processors/adaptivesampling" ) type builders struct { @@ -63,7 +65,7 @@ func (b builders) build() (otelcol.Factories, error) { jaegerquery.NewFactory(), jaegerstorage.NewFactory(), storagecleaner.NewFactory(), - // TODO add adaptive sampling + 
remotesampling.NewFactory(), ) if err != nil { return otelcol.Factories{}, err @@ -101,7 +103,7 @@ func (b builders) build() (otelcol.Factories, error) { batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), // add-ons - // TODO add adaptive sampling + adaptivesampling.NewFactory(), ) if err != nil { return otelcol.Factories{}, err diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index a93f8eb178d..8342e8e2070 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -57,7 +57,7 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error f, ok := comp.(Extension).Factory(name) if !ok { return nil, fmt.Errorf( - "cannot find storage '%s' declared by '%s' extension", + "cannot find definition of storage '%s' in the configuration for extension '%s'", name, componentType, ) } diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index 4d42ff4289f..5a9bd7c4a9a 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -83,7 +83,7 @@ func TestStorageFactoryBadHostError(t *testing.T) { func TestStorageFactoryBadNameError(t *testing.T) { host := storageHost{t: t, ext: startStorageExtension(t, "foo")} _, err := GetStorageFactory("bar", host) - require.ErrorContains(t, err, "cannot find storage 'bar'") + require.ErrorContains(t, err, "cannot find definition of storage 'bar'") } func TestStorageFactoryBadShutdownError(t *testing.T) { diff --git a/cmd/jaeger/internal/extension/remotesampling/config.go b/cmd/jaeger/internal/extension/remotesampling/config.go new file mode 100644 index 00000000000..edff4707e26 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/config.go @@ -0,0 +1,91 @@ +// Copyright (c) 
2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "errors" + + "github.com/asaskevich/govalidator" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/confmap" + + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" +) + +var ( + errNoProvider = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'") + errMultipleProviders = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'") +) + +var ( + _ component.Config = (*Config)(nil) + _ component.ConfigValidator = (*Config)(nil) + _ confmap.Unmarshaler = (*Config)(nil) +) + +type Config struct { + File *FileConfig `mapstructure:"file"` + Adaptive *AdaptiveConfig `mapstructure:"adaptive"` + HTTP *confighttp.ServerConfig `mapstructure:"http"` + GRPC *configgrpc.ServerConfig `mapstructure:"grpc"` +} + +type FileConfig struct { + // Path specifies a local file as the source of sampling strategies. + Path string `valid:"required" mapstructure:"path"` +} + +type AdaptiveConfig struct { + // SamplingStore is the name of the storage defined in the jaegerstorage extension. + SamplingStore string `valid:"required" mapstructure:"sampling_store"` + + adaptive.Options `mapstructure:",squash"` +} + +// Unmarshal is a custom unmarshaler that allows the factory to provide default values +// for nested configs (like GRPC endpoint) yet still reset the pointers to nil if the +// config did not contain the corresponding sections. +// This is a workaround for the lack of optional fields support in OTEL confmap. 
+// Issue: https://github.com/open-telemetry/opentelemetry-collector/issues/10266 +func (cfg *Config) Unmarshal(conf *confmap.Conf) error { + // first load the config normally + err := conf.Unmarshal(cfg) + if err != nil { + return err + } + + // use string names of fields to see if they are set in the confmap + if !conf.IsSet("file") { + cfg.File = nil + } + + if !conf.IsSet("adaptive") { + cfg.Adaptive = nil + } + + if !conf.IsSet("grpc") { + cfg.GRPC = nil + } + + if !conf.IsSet("http") { + cfg.HTTP = nil + } + + return nil +} + +func (cfg *Config) Validate() error { + if cfg.File == nil && cfg.Adaptive == nil { + return errNoProvider + } + + if cfg.File != nil && cfg.Adaptive != nil { + return errMultipleProviders + } + + _, err := govalidator.ValidateStruct(cfg) + return err +} diff --git a/cmd/jaeger/internal/extension/remotesampling/config_test.go b/cmd/jaeger/internal/extension/remotesampling/config_test.go new file mode 100644 index 00000000000..c5071423c19 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/config_test.go @@ -0,0 +1,134 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func Test_Validate(t *testing.T) { + tests := []struct { + name string + config *Config + expectedErr string + }{ + { + name: "No provider specified", + config: &Config{}, + expectedErr: "no sampling strategy provider specified, expecting 'adaptive' or 'file'", + }, + { + name: "Both providers specified", + config: &Config{ + File: &FileConfig{Path: "test-path"}, + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "only one sampling strategy provider can be specified, 'adaptive' or 'file'", + }, + { + name: "Only File provider specified", + config: &Config{ + File: &FileConfig{Path: "test-path"}, + }, + expectedErr: "", + }, + { + name: "Only Adaptive provider specified", + config: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "", + }, + { + name: "Invalid File provider", + config: &Config{ + File: &FileConfig{Path: ""}, + }, + expectedErr: "File.Path: non zero value required", + }, + { + name: "Invalid Adaptive provider", + config: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: ""}, + }, + expectedErr: "Adaptive.SamplingStore: non zero value required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.expectedErr == "" { + require.NoError(t, err) + } else { + assert.Equal(t, tt.expectedErr, err.Error()) + } + }) + } +} + +func Test_Unmarshal(t *testing.T) { + tests := []struct { + name string + input map[string]any + expectedCfg *Config + expectedErr string + }{ + { + name: "Valid config with File", + input: map[string]any{ + "file": map[string]any{ + "path": "test-path", + }, + }, + expectedCfg: &Config{ + File: &FileConfig{Path: "test-path"}, + }, + expectedErr: "", + }, + { + name: "Valid config with Adaptive", 
+ input: map[string]any{ + "adaptive": map[string]any{ + "sampling_store": "test-store", + }, + }, + expectedCfg: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "", + }, + { + name: "Empty config", + input: map[string]any{}, + expectedCfg: &Config{}, + expectedErr: "", + }, + { + name: "Invalid config", + input: map[string]any{ + "foo": "bar", + }, + expectedErr: "invalid keys: foo", // sensitive to lib implementation + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.input) + var cfg Config + err := cfg.Unmarshal(conf) + if tt.expectedErr == "" { + require.NoError(t, err) + assert.Equal(t, tt.expectedCfg, &cfg) + } else { + assert.ErrorContains(t, err, tt.expectedErr) + } + }) + } +} diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go new file mode 100644 index 00000000000..58552d0eaab --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -0,0 +1,296 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" + "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/samplingstore" +) + +var _ extension.Extension = (*rsExtension)(nil) + +// type Extension interface { +// extension.Extension +// // rs *rsExtension +// } + +const defaultResourceName = "sampling_store_leader" + +type rsExtension struct { + cfg *Config + telemetry component.TelemetrySettings + httpServer *http.Server + grpcServer *grpc.Server + strategyProvider samplingstrategy.Provider // TODO we should rename this to Provider, not "store" + adaptiveStore samplingstore.Store + distLock *leaderelection.DistributedElectionParticipant + shutdownWG sync.WaitGroup +} + +func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtension { + return &rsExtension{ + cfg: cfg, + telemetry: telemetry, + } +} + +// AdaptiveSamplingComponents is a struct that holds the components needed for adaptive sampling. 
+type AdaptiveSamplingComponents struct { + SamplingStore samplingstore.Store + DistLock *leaderelection.DistributedElectionParticipant + Options *adaptive.Options +} + +// GetAdaptiveSamplingComponents locates the `remotesampling` extension in Host +// and returns the sampling store and a leader/follower implementation, provided +// that the extension is configured with adaptive sampling (vs. file-based config). +func GetAdaptiveSamplingComponents(host component.Host) (*AdaptiveSamplingComponents, error) { + var comp component.Component + var compID component.ID + for id, ext := range host.GetExtensions() { + if id.Type() == ComponentType { + comp = ext + compID = id + break + } + } + if comp == nil { + return nil, fmt.Errorf( + "cannot find extension '%s' (make sure it's defined earlier in the config)", + ComponentType, + ) + } + ext, ok := comp.(*rsExtension) + if !ok { + return nil, fmt.Errorf("extension '%s' is not of type '%s'", compID, ComponentType) + } + if ext.adaptiveStore == nil || ext.distLock == nil { + return nil, fmt.Errorf("extension '%s' is not configured for adaptive sampling", compID) + } + return &AdaptiveSamplingComponents{ + SamplingStore: ext.adaptiveStore, + DistLock: ext.distLock, + Options: &ext.cfg.Adaptive.Options, + }, nil +} + +func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { + if ext.cfg.File != nil { + ext.telemetry.Logger.Info( + "Starting file-based sampling strategy provider", + zap.String("path", ext.cfg.File.Path), + ) + if err := ext.startFileBasedStrategyProvider(ctx); err != nil { + return err + } + } + + if ext.cfg.Adaptive != nil { + ext.telemetry.Logger.Info( + "Starting adaptive sampling strategy provider", + zap.String("sampling_store", ext.cfg.Adaptive.SamplingStore), + ) + if err := ext.startAdaptiveStrategyProvider(host); err != nil { + return err + } + } + + if ext.cfg.HTTP != nil { + if err := ext.startHTTPServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling 
http server: %w", err) + } + } + + if ext.cfg.GRPC != nil { + if err := ext.startGRPCServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling gRPC server: %w", err) + } + } + + return nil +} + +func (ext *rsExtension) Shutdown(ctx context.Context) error { + var errs []error + + if ext.httpServer != nil { + if err := ext.httpServer.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to stop the sampling HTTP server: %w", err)) + } + } + + if ext.grpcServer != nil { + ext.grpcServer.GracefulStop() + } + + if ext.distLock != nil { + if err := ext.distLock.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop the distributed lock: %w", err)) + } + } + + if ext.strategyProvider != nil { + if err := ext.strategyProvider.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop strategy provider: %w", err)) + } + } + return errors.Join(errs...) +} + +func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error { + opts := static.Options{ + StrategiesFile: ext.cfg.File.Path, + } + + // contextcheck linter complains about next line that context is not passed. 
+ //nolint + provider, err := static.NewProvider(opts, ext.telemetry.Logger) + if err != nil { + return fmt.Errorf("failed to create the local file strategy store: %w", err) + } + + ext.strategyProvider = provider + return nil +} + +func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error { + storageName := ext.cfg.Adaptive.SamplingStore + f, err := jaegerstorage.GetStorageFactory(storageName, host) + if err != nil { + return fmt.Errorf("cannot find storage factory: %w", err) + } + + storeFactory, ok := f.(storage.SamplingStoreFactory) + if !ok { + return fmt.Errorf("storage '%s' does not support sampling store", storageName) + } + + store, err := storeFactory.CreateSamplingStore(ext.cfg.Adaptive.AggregationBuckets) + if err != nil { + return fmt.Errorf("failed to create the sampling store: %w", err) + } + ext.adaptiveStore = store + + { + lock, err := storeFactory.CreateLock() + if err != nil { + return fmt.Errorf("failed to create the distributed lock: %w", err) + } + + ep := leaderelection.NewElectionParticipant(lock, defaultResourceName, + leaderelection.ElectionParticipantOptions{ + LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval, + Logger: ext.telemetry.Logger, + }) + if err := ep.Start(); err != nil { + return fmt.Errorf("failed to start the leader election participant: %w", err) + } + ext.distLock = ep + } + + provider := adaptive.NewProvider(ext.cfg.Adaptive.Options, ext.telemetry.Logger, ext.distLock, store) + if err := provider.Start(); err != nil { + return fmt.Errorf("failed to start the adaptive strategy store: %w", err) + } + ext.strategyProvider = provider + return nil +} + +func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host) error { + handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ + ConfigManager: &clientcfghttp.ConfigManager{ + SamplingProvider: ext.strategyProvider, 
+ }, + MetricsFactory: metrics.NullFactory, + BasePath: "/api", // TODO is /api correct? + }) + httpMux := http.NewServeMux() + handler.RegisterRoutesWithHTTP(httpMux) + + var err error + if ext.httpServer, err = ext.cfg.HTTP.ToServer(ctx, host, ext.telemetry, httpMux); err != nil { + return err + } + + ext.telemetry.Logger.Info( + "Starting remote sampling HTTP server", + zap.String("endpoint", ext.cfg.HTTP.Endpoint), + ) + var hln net.Listener + if hln, err = ext.cfg.HTTP.ToListener(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + + err := ext.httpServer.Serve(hln) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() + + return nil +} + +func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host) error { + var err error + if ext.grpcServer, err = ext.cfg.GRPC.ToServer(ctx, host, ext.telemetry); err != nil { + return err + } + + api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyProvider)) + + healthServer := health.NewServer() // support health checks on the gRPC server + healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer) + + ext.telemetry.Logger.Info( + "Starting remote sampling GRPC server", + zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint), + ) + var gln net.Listener + if gln, err = ext.cfg.GRPC.NetAddr.Listen(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + if err := ext.grpcServer.Serve(gln); err != nil && !errors.Is(err, grpc.ErrServerStopped) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() + + return nil +} + +func (*rsExtension) Dependencies() []component.ID { + return []component.ID{jaegerstorage.ID} +} diff --git 
a/cmd/jaeger/internal/extension/remotesampling/extension_test.go b/cmd/jaeger/internal/extension/remotesampling/extension_test.go new file mode 100644 index 00000000000..de8cb300e24 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension_test.go @@ -0,0 +1,252 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "io" + "net/http" + "path/filepath" + "testing" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/extension" + noopmetric "go.opentelemetry.io/otel/metric/noop" + nooptrace "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +func makeStorageExtension(t *testing.T, memstoreName string) component.Host { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + TracerProvider: nooptrace.NewTracerProvider(), + MeterProvider: noopmetric.NewMeterProvider(), + } + extensionFactory := jaegerstorage.NewFactory() + storageExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: telemetrySettings, + }, + &jaegerstorage.Config{Backends: map[string]jaegerstorage.Backend{ + memstoreName: 
{Memory: &memory.Configuration{MaxTraces: 10000}}, + }}, + ) + require.NoError(t, err) + + host := storagetest.NewStorageHost() + host.WithExtension(jaegerstorage.ID, storageExtension) + + err = storageExtension.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, storageExtension.Shutdown(context.Background())) }) + return host +} + +func makeRemoteSamplingExtension(t *testing.T, cfg component.Config) component.Host { + extensionFactory := NewFactory() + samplingExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.L(), + TracerProvider: nooptrace.NewTracerProvider(), + }, + }, + cfg, + ) + require.NoError(t, err) + host := storagetest.NewStorageHost().WithExtension(ID, samplingExtension) + storageHost := makeStorageExtension(t, "foobar") + + require.NoError(t, samplingExtension.Start(context.Background(), storageHost)) + t.Cleanup(func() { require.NoError(t, samplingExtension.Shutdown(context.Background())) }) + return host +} + +func TestStartFileBasedProvider(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + cfg.Adaptive = nil + cfg.HTTP = nil + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + require.NoError(t, ext.Shutdown(context.Background())) +} + +func TestStartHTTP(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + cfg.Adaptive = nil + cfg.HTTP = &confighttp.ServerConfig{ + 
Endpoint: "0.0.0.0:12345", + } + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + + resp, err := http.Get("http://0.0.0.0:12345/api/sampling?service=foo") + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedResponse := `{ + "probabilisticSampling": { + "samplingRate": 0.8 + }, + "strategyType": 0 + }` + require.JSONEq(t, expectedResponse, string(body)) + + require.NoError(t, ext.Shutdown(context.Background())) +} + +func TestStartGRPC(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + cfg.Adaptive = nil + cfg.HTTP = nil + cfg.GRPC = &configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: "0.0.0.0:12346", + Transport: "tcp", + }, + } + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + + conn, err := grpc.NewClient("0.0.0.0:12346", grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer conn.Close() + + c := api_v2.NewSamplingManagerClient(conn) + response, err := c.GetSamplingStrategy(context.Background(), &api_v2.SamplingStrategyParameters{ServiceName: "foo"}) + require.NoError(t, err) + require.Equal(t, 0.8, response.ProbabilisticSampling.SamplingRate) + + require.NoError(t, ext.Shutdown(context.Background())) +} + +func 
TestStartAdaptiveProvider(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File = nil + cfg.Adaptive.SamplingStore = "foobar" + cfg.HTTP = nil + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + require.NoError(t, ext.Shutdown(context.Background())) +} + +func TestStartAdaptiveStrategyProviderErrors(t *testing.T) { + host := storagetest.NewStorageHost() + ext := &rsExtension{ + cfg: &Config{ + Adaptive: &AdaptiveConfig{ + SamplingStore: "foobar", + }, + }, + } + err := ext.startAdaptiveStrategyProvider(host) + require.ErrorContains(t, err, "cannot find storage factory") +} + +func TestGetAdaptiveSamplingComponents(t *testing.T) { + // Success case + host := makeRemoteSamplingExtension(t, &Config{ + Adaptive: &AdaptiveConfig{ + SamplingStore: "foobar", + Options: adaptive.Options{ + FollowerLeaseRefreshInterval: 1, + LeaderLeaseRefreshInterval: 1, + AggregationBuckets: 1, + }, + }, + }) + + comps, err := GetAdaptiveSamplingComponents(host) + require.NoError(t, err) + assert.NotNil(t, comps.DistLock) + assert.NotNil(t, comps.SamplingStore) + assert.Equal(t, time.Duration(1), comps.Options.FollowerLeaseRefreshInterval) + assert.Equal(t, time.Duration(1), comps.Options.LeaderLeaseRefreshInterval) + assert.Equal(t, 1, comps.Options.AggregationBuckets) +} + +type wrongExtension struct{} + +func (*wrongExtension) Start(context.Context, component.Host) error { return nil } +func (*wrongExtension) Shutdown(context.Context) error { return nil } + +func TestGetAdaptiveSamplingComponentsErrors(t *testing.T) { + host := makeRemoteSamplingExtension(t, &Config{}) + _, err := GetAdaptiveSamplingComponents(host) + require.ErrorContains(t, err, 
"extension 'remote_sampling' is not configured for adaptive sampling") + + h1 := storagetest.NewStorageHost() + _, err = GetAdaptiveSamplingComponents(h1) + require.ErrorContains(t, err, "cannot find extension") + + h2 := h1.WithExtension(ID, &wrongExtension{}) + _, err = GetAdaptiveSamplingComponents(h2) + require.ErrorContains(t, err, "is not of type") +} + +func TestDependencies(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + assert.Equal(t, []component.ID{jaegerstorage.ID}, ext.(*rsExtension).Dependencies()) +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go new file mode 100644 index 00000000000..e3a1c9a5756 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -0,0 +1,57 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/extension" + + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/ports" +) + +// ComponentType is the name of this extension in configuration. +var ComponentType = component.MustNewType("remote_sampling") + +var ID = component.NewID(ComponentType) + +// NewFactory creates a factory for the jaeger remote sampling extension. 
+func NewFactory() extension.Factory { + return extension.NewFactory( + ComponentType, + createDefaultConfig, + createExtension, + component.StabilityLevelBeta, + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + HTTP: &confighttp.ServerConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorHTTP + 100), + }, + GRPC: &configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorGRPC + 100), + Transport: confignet.TransportTypeTCP, + }, + }, + File: &FileConfig{ + Path: "", // path needs to be specified + }, + Adaptive: &AdaptiveConfig{ + SamplingStore: "", // storage name needs to be specified + Options: adaptive.DefaultOptions(), + }, + } +} + +func createExtension(_ context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) { + return newExtension(cfg.(*Config), set.TelemetrySettings), nil +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory_test.go b/cmd/jaeger/internal/extension/remotesampling/factory_test.go new file mode 100644 index 00000000000..a6de996e02e --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/factory_test.go @@ -0,0 +1,28 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension/extensiontest" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg, "failed to create default config") + require.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + f := NewFactory() + r, err := f.CreateExtension(context.Background(), extensiontest.NewNopSettings(), cfg) + require.NoError(t, err) + assert.NotNil(t, r) +} diff --git a/cmd/jaeger/internal/extension/remotesampling/package_test.go b/cmd/jaeger/internal/extension/remotesampling/package_test.go new file mode 100644 index 00000000000..5bd9ea71735 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/config.go b/cmd/jaeger/internal/processors/adaptivesampling/config.go new file mode 100644 index 00000000000..86f3a30c8d9 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/config.go @@ -0,0 +1,18 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "go.opentelemetry.io/collector/component" +) + +var _ component.ConfigValidator = (*Config)(nil) + +type Config struct { + // all configuration for the processor is in the remotesampling extension +} + +func (*Config) Validate() error { + return nil +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory.go b/cmd/jaeger/internal/processors/adaptivesampling/factory.go new file mode 100644 index 00000000000..601dcd60317 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory.go @@ -0,0 +1,48 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +// componentType is the name of this processor in configuration. +var componentType = component.MustNewType("adaptive_sampling") + +// NewFactory creates a factory for the jaeger adaptive sampling processor. 
+func NewFactory() processor.Factory { + return processor.NewFactory( + componentType, + createDefaultConfig, + processor.WithTraces(createTracesProcessor, component.StabilityLevelBeta), + ) +} + +func createDefaultConfig() component.Config { + return &Config{} +} + +func createTracesProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (processor.Traces, error) { + oCfg := cfg.(*Config) + sp := newTraceProcessor(*oCfg, set.TelemetrySettings) + return processorhelper.NewTracesProcessor( + ctx, + set, + cfg, + nextConsumer, + sp.processTraces, + processorhelper.WithStart(sp.start), + processorhelper.WithShutdown(sp.close), + ) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go b/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go new file mode 100644 index 00000000000..85e95b19cd0 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go @@ -0,0 +1,39 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processortest" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg, "failed to create default config") + require.NoError(t, componenttest.CheckConfigStruct(cfg)) + require.NoError(t, cfg.Validate()) +} + +func TestCreateTracesProcessor(t *testing.T) { + ctx := context.Background() + cfg := createDefaultConfig().(*Config) + + nextConsumer := consumertest.NewNop() + set := processortest.NewNopSettings() + + tracesProcessor, err := createTracesProcessor(ctx, set, cfg, nextConsumer) + require.NoError(t, err) + assert.NotNil(t, tracesProcessor) +} + +func TestFactoryType(t *testing.T) { + factory := NewFactory() + assert.Equal(t, componentType, factory.Type()) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/package_test.go b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go new file mode 100644 index 00000000000..10d464704eb --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go new file mode 100644 index 00000000000..9573b2257a3 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -0,0 +1,82 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "fmt" + + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/internal/metrics/otelmetrics" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" +) + +type traceProcessor struct { + config *Config + aggregator samplingstrategy.Aggregator + telset component.TelemetrySettings +} + +func newTraceProcessor(cfg Config, telset component.TelemetrySettings) *traceProcessor { + return &traceProcessor{ + config: &cfg, + telset: telset, + } +} + +func (tp *traceProcessor) start(_ context.Context, host component.Host) error { + parts, err := remotesampling.GetAdaptiveSamplingComponents(host) + if err != nil { + return fmt.Errorf( + "cannot load adaptive sampling components from `%s` extension: %w", + remotesampling.ComponentType, err) + } + + agg, err := adaptive.NewAggregator( + *parts.Options, + tp.telset.Logger, + otelmetrics.NewFactory(tp.telset.MeterProvider), + parts.DistLock, + parts.SamplingStore, + ) + if err != nil { + return fmt.Errorf("failed to create the adaptive sampling aggregator: %w", err) + } + + agg.Start() + tp.aggregator = agg + + return nil +} + +func (tp *traceProcessor) close(context.Context) error { + if tp.aggregator != nil { + if err := tp.aggregator.Close(); err != nil { + return fmt.Errorf("failed to stop the adaptive sampling aggregator : %w", err) + } + } + return nil +} + +func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { + batches, err := otlp2jaeger.ProtoFromTraces(td) + if err != nil { + return td, fmt.Errorf("cannot transform OTLP traces to Jaeger 
format: %w", err) + } + + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + span.Process = batch.Process + } + adaptive.RecordThroughput(tp.aggregator, span, tp.telset.Logger) + } + } + return td, nil +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go b/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go new file mode 100644 index 00000000000..3c8561dc7d4 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go @@ -0,0 +1,171 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "errors" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pdata/ptrace" + noopmetric "go.opentelemetry.io/otel/metric/noop" + nooptrace "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +type samplingHost struct { + t *testing.T + samplingExtension component.Component +} + +func (host samplingHost) GetExtensions() map[component.ID]component.Component { + return map[component.ID]component.Component{ + remotesampling.ID: host.samplingExtension, + } +} + +func (host samplingHost) ReportFatalError(err error) { + host.t.Fatal(err) +} + +func (samplingHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } +func (samplingHost) GetExporters() 
map[component.DataType]map[component.ID]component.Component { + return nil +} + +func makeStorageExtension(t *testing.T, memstoreName string) component.Host { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t), + TracerProvider: nooptrace.NewTracerProvider(), + MeterProvider: noopmetric.NewMeterProvider(), + } + extensionFactory := jaegerstorage.NewFactory() + storageExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: telemetrySettings, + }, + &jaegerstorage.Config{Backends: map[string]jaegerstorage.Backend{ + memstoreName: {Memory: &memory.Configuration{MaxTraces: 10000}}, + }}, + ) + require.NoError(t, err) + + host := storagetest.NewStorageHost() + host.WithExtension(jaegerstorage.ID, storageExtension) + + err = storageExtension.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, storageExtension.Shutdown(context.Background())) }) + return host +} + +var _ component.Config = (*Config)(nil) + +func makeRemoteSamplingExtension(t *testing.T, cfg component.Config) samplingHost { + extensionFactory := remotesampling.NewFactory() + samplingExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.L(), + TracerProvider: nooptrace.NewTracerProvider(), + }, + }, + cfg, + ) + require.NoError(t, err) + host := samplingHost{t: t, samplingExtension: samplingExtension} + storageHost := makeStorageExtension(t, "foobar") + + err = samplingExtension.Start(context.Background(), storageHost) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, samplingExtension.Shutdown(context.Background())) }) + return host +} + +func TestNewTraceProcessor(t *testing.T) { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + } + config, ok := 
createDefaultConfig().(*Config) + require.True(t, ok) + newTraceProcessor := newTraceProcessor(*config, telemetrySettings) + require.NotNil(t, newTraceProcessor) +} + +func TestTraceProcessor(t *testing.T) { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + MeterProvider: noopmetric.NewMeterProvider(), + } + config := createDefaultConfig().(*Config) + traceProcessor := newTraceProcessor(*config, telemetrySettings) + + rsCfg := &remotesampling.Config{ + Adaptive: &remotesampling.AdaptiveConfig{ + SamplingStore: "foobar", + Options: adaptive.DefaultOptions(), + }, + } + host := makeRemoteSamplingExtension(t, rsCfg) + + rsCfg.Adaptive.Options.AggregationBuckets = 0 + err := traceProcessor.start(context.Background(), host) + require.ErrorContains(t, err, "AggregationBuckets must be greater than 0") + + rsCfg.Adaptive.Options = adaptive.DefaultOptions() + require.NoError(t, traceProcessor.start(context.Background(), host)) + + twww := makeTracesOneSpan() + trace, err := traceProcessor.processTraces(context.Background(), twww) + require.NoError(t, err) + require.NotNil(t, trace) + + err = traceProcessor.close(context.Background()) + require.NoError(t, err) +} + +func makeTracesOneSpan() ptrace.Traces { + traces := ptrace.NewTraces() + rSpans := traces.ResourceSpans().AppendEmpty() + sSpans := rSpans.ScopeSpans().AppendEmpty() + span := sSpans.Spans().AppendEmpty() + span.SetName("test") + return traces +} + +func TestGetAdaptiveSamplingComponentsError(t *testing.T) { + host := &samplingHost{} + processor := &traceProcessor{} + err := processor.start(context.Background(), host) + require.ErrorContains(t, err, "cannot load adaptive sampling components") +} + +// aggregator that returns error from Close() +type notClosingAgg struct{} + +func (*notClosingAgg) Close() error { return errors.New("not closing") } + +func (*notClosingAgg) HandleRootSpan(*model.Span, *zap.Logger) {} +func (*notClosingAgg) 
RecordThroughput(string, string, model.SamplerType, float64) {} +func (*notClosingAgg) Start() {} + +func TestTraceProcessorCloseError(t *testing.T) { + processor := &traceProcessor{ + aggregator: &notClosingAgg{}, + } + require.ErrorContains(t, processor.close(context.Background()), "not closing") +} diff --git a/cmd/jaeger/sampling-strategies.json b/cmd/jaeger/sampling-strategies.json new file mode 100644 index 00000000000..6928e6d0436 --- /dev/null +++ b/cmd/jaeger/sampling-strategies.json @@ -0,0 +1,18 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.1 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8 + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 1 + } + ] +} diff --git a/go.mod b/go.mod index f639e9c8953..c35791e1352 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,11 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require ( + go.opentelemetry.io/collector/pdata/pprofile v0.104.0 // indirect + go.opentelemetry.io/collector/pdata/testdata v0.104.0 // indirect +) + require ( github.com/IBM/sarama v1.43.2 // indirect github.com/aws/aws-sdk-go v1.53.11 // indirect @@ -195,7 +200,7 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.104.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.11.0 // indirect - go.opentelemetry.io/collector/config/confignet v0.104.0 // indirect + go.opentelemetry.io/collector/config/confignet v0.104.0 go.opentelemetry.io/collector/config/configopaque v1.11.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.104.0 // indirect go.opentelemetry.io/collector/config/internal v0.104.0 // indirect diff --git a/pkg/clientcfg/clientcfghttp/handler.go b/pkg/clientcfg/clientcfghttp/handler.go index ab2cef08c5d..7bc22be9a4e 100644 --- a/pkg/clientcfg/clientcfghttp/handler.go +++ b/pkg/clientcfg/clientcfghttp/handler.go @@ -109,6 +109,17 @@ func (h *HTTPHandler) RegisterRoutes(router *mux.Router) { 
}).Methods(http.MethodGet) } +// RegisterRoutesWithHTTP registers the legacy sampling handler with a standard http.ServeMux. +func (h *HTTPHandler) RegisterRoutesWithHTTP(router *http.ServeMux) { + prefix := h.params.BasePath + router.HandleFunc( + prefix+"/", + func(w http.ResponseWriter, r *http.Request) { + h.serveSamplingHTTP(w, r, h.encodeThriftLegacy) + }, + ) +} + func (h *HTTPHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) { services := r.URL.Query()["service"] if len(services) != 1 { diff --git a/pkg/clientcfg/clientcfghttp/handler_test.go b/pkg/clientcfg/clientcfghttp/handler_test.go index 6afc30d61b2..723047c1ed3 100644 --- a/pkg/clientcfg/clientcfghttp/handler_test.go +++ b/pkg/clientcfg/clientcfghttp/handler_test.go @@ -47,6 +47,7 @@ func withServer( basePath string, mockSamplingResponse *api_v2.SamplingStrategyResponse, mockBaggageResponse []*baggage.BaggageRestriction, + withGorilla bool, testFn func(server *testServer), ) { metricsFactory := metricstest.NewFactory(0) @@ -62,9 +63,18 @@ func withServer( BasePath: basePath, LegacySamplingEndpoint: true, }) - r := mux.NewRouter() - handler.RegisterRoutes(r) - server := httptest.NewServer(r) + + var server *httptest.Server + if withGorilla { + r := mux.NewRouter() + handler.RegisterRoutes(r) + server = httptest.NewServer(r) + } else { + mux := http.NewServeMux() + handler.RegisterRoutesWithHTTP(mux) + server = httptest.NewServer(mux) + } + defer server.Close() testFn(&testServer{ metricsFactory: metricsFactory, @@ -76,15 +86,17 @@ func withServer( } func TestHTTPHandler(t *testing.T) { + testGorillaHTTPHandler(t, "") testHTTPHandler(t, "") } func TestHTTPHandlerWithBasePath(t *testing.T) { + testGorillaHTTPHandler(t, "/foo") testHTTPHandler(t, "/foo") } -func testHTTPHandler(t *testing.T, basePath string) { - withServer(basePath, rateLimiting(42), restrictions("luggage", 10), func(ts *testServer) { +func testGorillaHTTPHandler(t *testing.T, basePath string) { + withServer(basePath, 
rateLimiting(42), restrictions("luggage", 10), true, func(ts *testServer) { tests := []struct { endpoint string expOutput string @@ -146,6 +158,49 @@ func testHTTPHandler(t *testing.T, basePath string) { }) } +func testHTTPHandler(t *testing.T, basePath string) { + withServer(basePath, rateLimiting(42), restrictions("luggage", 10), false, func(ts *testServer) { + tests := []struct { + endpoint string + expOutput string + }{ + { + endpoint: "/", + expOutput: `{"strategyType":1,"rateLimitingSampling":{"maxTracesPerSecond":42}}`, + }, + } + for _, test := range tests { + t.Run("endpoint="+test.endpoint, func(t *testing.T) { + resp, err := http.Get(ts.server.URL + basePath + test.endpoint + "?service=Y") + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + err = resp.Body.Close() + require.NoError(t, err) + assert.Equal(t, test.expOutput, string(body)) + if test.endpoint == "/" { + objResp := &tSampling092.SamplingStrategyResponse{} + require.NoError(t, json.Unmarshal(body, objResp)) + assert.EqualValues(t, + ts.samplingProvider.samplingResponse.GetStrategyType(), + objResp.GetStrategyType()) + assert.EqualValues(t, + ts.samplingProvider.samplingResponse.GetRateLimitingSampling().GetMaxTracesPerSecond(), + objResp.GetRateLimitingSampling().GetMaxTracesPerSecond()) + } else { + objResp, err := p2json.SamplingStrategyResponseFromJSON(body) + require.NoError(t, err) + assert.EqualValues(t, ts.samplingProvider.samplingResponse, objResp) + } + }) + } + + // handler must emit metrics + ts.metricsFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{Name: "http-server.requests", Tags: map[string]string{"type": "sampling-legacy"}, Value: 1}) + }) +} + func TestHTTPHandlerErrors(t *testing.T) { testCases := []struct { description string @@ -215,61 +270,67 @@ func TestHTTPHandlerErrors(t *testing.T) { for _, tc := range testCases { testCase := tc // capture loop var 
t.Run(testCase.description, func(t *testing.T) { - withServer("", testCase.mockSamplingResponse, testCase.mockBaggageResponse, func(ts *testServer) { - resp, err := http.Get(ts.server.URL + testCase.url) - require.NoError(t, err) - assert.Equal(t, testCase.statusCode, resp.StatusCode) - if testCase.body != "" { - body, err := io.ReadAll(resp.Body) + for _, withGorilla := range []bool{true, false} { + withServer("", testCase.mockSamplingResponse, testCase.mockBaggageResponse, withGorilla, func(ts *testServer) { + resp, err := http.Get(ts.server.URL + testCase.url) require.NoError(t, err) - assert.Equal(t, testCase.body, string(body)) - } + assert.Equal(t, testCase.statusCode, resp.StatusCode) + if testCase.body != "" { + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, testCase.body, string(body)) + } - if len(testCase.metrics) > 0 { - ts.metricsFactory.AssertCounterMetrics(t, testCase.metrics...) - } - }) + if len(testCase.metrics) > 0 { + ts.metricsFactory.AssertCounterMetrics(t, testCase.metrics...) 
+ } + }) + } }) } t.Run("failure to write a response", func(t *testing.T) { - withServer("", probabilistic(0.001), restrictions("luggage", 10), func(ts *testServer) { - handler := ts.handler + for _, withGorilla := range []bool{true, false} { + withServer("", probabilistic(0.001), restrictions("luggage", 10), withGorilla, func(ts *testServer) { + handler := ts.handler - req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil) - w := &mockWriter{header: make(http.Header)} - handler.serveSamplingHTTP(w, req, handler.encodeThriftLegacy) + req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil) + w := &mockWriter{header: make(http.Header)} + handler.serveSamplingHTTP(w, req, handler.encodeThriftLegacy) - ts.metricsFactory.AssertCounterMetrics(t, - metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 1}) + ts.metricsFactory.AssertCounterMetrics(t, + metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 1}) - req = httptest.NewRequest("GET", "http://localhost:80/baggageRestrictions?service=X", nil) - handler.serveBaggageHTTP(w, req) + req = httptest.NewRequest("GET", "http://localhost:80/baggageRestrictions?service=X", nil) + handler.serveBaggageHTTP(w, req) - ts.metricsFactory.AssertCounterMetrics(t, - metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 2}) - }) + ts.metricsFactory.AssertCounterMetrics(t, + metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 2}) + }) + } }) } func TestEncodeErrors(t *testing.T) { - withServer("", nil, nil, func(server *testServer) { - _, err := server.handler.encodeThriftLegacy(&api_v2.SamplingStrategyResponse{ - StrategyType: -1, - }) - require.Error(t, err) - assert.Contains(t, err.Error(), "ConvertSamplingResponseFromDomain 
failed") - server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ - {Name: "http-server.errors", Tags: map[string]string{"source": "thrift", "status": "5xx"}, Value: 1}, - }...) + for _, withGorilla := range []bool{true, false} { + withServer("", nil, nil, withGorilla, func(server *testServer) { + _, err := server.handler.encodeThriftLegacy(&api_v2.SamplingStrategyResponse{ + StrategyType: -1, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "ConvertSamplingResponseFromDomain failed") + server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ + {Name: "http-server.errors", Tags: map[string]string{"source": "thrift", "status": "5xx"}, Value: 1}, + }...) - _, err = server.handler.encodeProto(nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "SamplingStrategyResponseToJSON failed") - server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ - {Name: "http-server.errors", Tags: map[string]string{"source": "proto", "status": "5xx"}, Value: 1}, - }...) - }) + _, err = server.handler.encodeProto(nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "SamplingStrategyResponseToJSON failed") + server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ + {Name: "http-server.errors", Tags: map[string]string{"source": "proto", "status": "5xx"}, Value: 1}, + }...) 
+ }) + } } func rateLimiting(rate int32) *api_v2.SamplingStrategyResponse { diff --git a/plugin/sampling/strategyprovider/adaptive/aggregator.go b/plugin/sampling/strategyprovider/adaptive/aggregator.go index 97b8e69399c..5fe4fcd59dd 100644 --- a/plugin/sampling/strategyprovider/adaptive/aggregator.go +++ b/plugin/sampling/strategyprovider/adaptive/aggregator.go @@ -129,6 +129,23 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa } } +func RecordThroughput(agg samplingstrategy.Aggregator, span *span_model.Span, logger *zap.Logger) { + // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, + // we can be sure that only a root span will have sampler tags. + if span.ParentSpanID() != span_model.NewSpanID(0) { + return + } + service := span.Process.ServiceName + if service == "" || span.OperationName == "" { + return + } + samplerType, samplerParam := span.GetSamplerParams(logger) + if samplerType == span_model.SamplerTypeUnrecognized { + return + } + agg.RecordThroughput(service, span.OperationName, samplerType, samplerParam) +} + func (a *aggregator) Start() { a.postAggregator.Start() diff --git a/plugin/sampling/strategyprovider/adaptive/aggregator_test.go b/plugin/sampling/strategyprovider/adaptive/aggregator_test.go index bd34c3cb016..9e4247635d4 100644 --- a/plugin/sampling/strategyprovider/adaptive/aggregator_test.go +++ b/plugin/sampling/strategyprovider/adaptive/aggregator_test.go @@ -155,3 +155,44 @@ func TestRecordThroughput(t *testing.T) { a.HandleRootSpan(span, logger) assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) } + +func TestRecordThroughputFunc(t *testing.T) { + metricsFactory := metricstest.NewFactory(0) + mockStorage := &mocks.Store{} + mockEP := &epmocks.ElectionParticipant{} + logger := zap.NewNop() + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, + } + + a, err := 
NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) + + // Testing non-root span + span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} + RecordThroughput(a, span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name but no operation + span.References = []model.SpanRef{} + span.Process = &model.Process{ + ServiceName: "A", + } + RecordThroughput(a, span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name and operation but no probabilistic sampling tags + span.OperationName = "GET" + RecordThroughput(a, span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name, operation, and probabilistic sampling tags + span.Tags = model.KeyValues{ + model.String("sampler.type", "probabilistic"), + model.String("sampler.param", "0.001"), + } + RecordThroughput(a, span, logger) + assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) +} diff --git a/plugin/sampling/strategyprovider/adaptive/options.go b/plugin/sampling/strategyprovider/adaptive/options.go index 64288632dbf..4eb13361443 100644 --- a/plugin/sampling/strategyprovider/adaptive/options.go +++ b/plugin/sampling/strategyprovider/adaptive/options.go @@ -53,7 +53,7 @@ const ( type Options struct { // TargetSamplesPerSecond is the global target rate of samples per operation. // TODO implement manual overrides per service/operation. - TargetSamplesPerSecond float64 + TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"` // DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target) // throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation) @@ -62,23 +62,23 @@ type Options struct { // in the PID Controller terminology) to the sampler in the application. 
// // Increase this to reduce the amount of fluctuation in the calculated probabilities. - DeltaTolerance float64 + DeltaTolerance float64 `mapstructure:"delta_tolerance"` // CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute, // new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth // of aggregated throughput data. - CalculationInterval time.Duration + CalculationInterval time.Duration `mapstructure:"calculation_interval"` // AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if // the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the // AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for // all operations. // TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice. - AggregationBuckets int + AggregationBuckets int `mapstructure:"aggregation_buckets"` // BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS, // ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS. - BucketsForCalculation int + BucketsForCalculation int `mapstructure:"calculation_buckets"` // Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time @@ -88,29 +88,45 @@ type Options struct { // during any 1 minute interval, the clients will be fetching new probabilities in a uniformly // distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can // guarantee that all clients can use the latest calculated probabilities for at least 1 minute. 
- Delay time.Duration + Delay time.Duration `mapstructure:"calculation_delay"` // InitialSamplingProbability is the initial sampling probability for all new operations. - InitialSamplingProbability float64 + InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"` // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling // probability will be in the range [MinSamplingProbability, 1.0]. - MinSamplingProbability float64 + MinSamplingProbability float64 `mapstructure:"min_sampling_probability"` // MinSamplesPerSecond determines the min number of traces that are sampled per second. // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do // its best to sample at least one trace a minute for an operation. This is useful for low QPS operations // that may never be sampled by the probabilistic sampler. - MinSamplesPerSecond float64 + MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"` // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval // to reduce lock thrashing. - LeaderLeaseRefreshInterval time.Duration + LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"` // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower // (ie. failed to gain the leader lock). 
- FollowerLeaseRefreshInterval time.Duration + FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"` +} + +func DefaultOptions() Options { + return Options{ + TargetSamplesPerSecond: defaultTargetSamplesPerSecond, + DeltaTolerance: defaultDeltaTolerance, + BucketsForCalculation: defaultBucketsForCalculation, + CalculationInterval: defaultCalculationInterval, + AggregationBuckets: defaultAggregationBuckets, + Delay: defaultDelay, + InitialSamplingProbability: defaultInitialSamplingProbability, + MinSamplingProbability: defaultMinSamplingProbability, + MinSamplesPerSecond: defaultMinSamplesPerSecond, + LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, + } } // AddFlags adds flags for Options diff --git a/plugin/sampling/strategyprovider/adaptive/options_test.go b/plugin/sampling/strategyprovider/adaptive/options_test.go index 1ab64c0589c..e441ad1472b 100644 --- a/plugin/sampling/strategyprovider/adaptive/options_test.go +++ b/plugin/sampling/strategyprovider/adaptive/options_test.go @@ -43,3 +43,18 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, time.Duration(5000000000), opts.LeaderLeaseRefreshInterval) assert.Equal(t, time.Duration(60000000000), opts.FollowerLeaseRefreshInterval) } + +func TestDefaultOptions(t *testing.T) { + options := DefaultOptions() + assert.Equal(t, float64(defaultTargetSamplesPerSecond), options.TargetSamplesPerSecond) + assert.Equal(t, defaultDeltaTolerance, options.DeltaTolerance) + assert.Equal(t, defaultBucketsForCalculation, options.BucketsForCalculation) + assert.Equal(t, defaultCalculationInterval, options.CalculationInterval) + assert.Equal(t, defaultAggregationBuckets, options.AggregationBuckets) + assert.Equal(t, defaultDelay, options.Delay) + assert.Equal(t, defaultInitialSamplingProbability, options.InitialSamplingProbability) + assert.Equal(t, defaultMinSamplingProbability, 
options.MinSamplingProbability) + assert.Equal(t, defaultMinSamplesPerSecond, options.MinSamplesPerSecond) + assert.Equal(t, defaultLeaderLeaseRefreshInterval, options.LeaderLeaseRefreshInterval) + assert.Equal(t, defaultFollowerLeaseRefreshInterval, options.FollowerLeaseRefreshInterval) +}