From 7f99a8844d326e2a40ff8b296bd36cb84d7dc148 Mon Sep 17 00:00:00 2001 From: Jaegoo Kim Date: Sat, 20 Apr 2024 12:12:44 +0900 Subject: [PATCH] Create sampling templates when creating sampling store (#5349) ## Which problem is this PR solving? - Resolves #5333 ## Description of the changes - Create sampling templates when creating sampling store since doing so is intended. ## How was this change tested? - CI ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [ ] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: JaeguKim --- plugin/storage/es/factory.go | 55 +++++++++++------- plugin/storage/es/factory_test.go | 5 ++ plugin/storage/es/samplingstore/storage.go | 11 +--- .../storage/es/samplingstore/storage_test.go | 2 +- .../storage/integration/elasticsearch_test.go | 57 ++++--------------- plugin/storage/integration/integration.go | 14 ++--- storage/factory.go | 2 +- 7 files changed, 62 insertions(+), 84 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 8a6866c74ae..e95be48177d 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -16,6 +16,7 @@ package es import ( + "context" "errors" "flag" "fmt" @@ -266,23 +267,6 @@ func createSpanWriter( return nil, err } - mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: es.TextTemplateBuilder{}, - Shards: cfg.NumShards, - Replicas: cfg.NumReplicas, - EsVersion: cfg.Version, - IndexPrefix: cfg.IndexPrefix, - UseILM: cfg.UseILM, - PrioritySpanTemplate: cfg.PrioritySpanTemplate, - PriorityServiceTemplate: cfg.PriorityServiceTemplate, - PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate, - } - - spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() - if err != nil { - return nil, err - } - writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: clientFn, IndexPrefix: cfg.IndexPrefix, @@ -299,16 +283,20 @@ func createSpanWriter( // Creating a template here would conflict with the one created for ILM resulting to no index rollover if cfg.CreateIndexTemplates && !cfg.UseILM { - err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix) + mappingBuilder := mappingBuilderFromConfig(cfg) + spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() if err != nil { return nil, err } + if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix); err != nil { + return nil, err + } } return writer, nil } func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { - store := esSampleStore.NewSamplingStore(esSampleStore.SamplingStoreParams{ + params := esSampleStore.SamplingStoreParams{ Client: f.getPrimaryClient, Logger: f.logger, IndexPrefix: f.primaryConfig.IndexPrefix, @@ -316,10 +304,37 @@ func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, erro IndexRolloverFrequency: f.primaryConfig.GetIndexRolloverFrequencySamplingDuration(), Lookback: f.primaryConfig.AdaptiveSamplingLookback, MaxDocCount: f.primaryConfig.MaxDocCount, - }) + } + store := esSampleStore.NewSamplingStore(params) + + if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM { + mappingBuilder := mappingBuilderFromConfig(f.primaryConfig) + samplingMapping, err := mappingBuilder.GetSamplingMappings() + if err != nil { + return nil, err + } + if _, err := f.getPrimaryClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil { + return nil, fmt.Errorf("failed to create template: %w", err) + } + } + return store, nil } +func mappingBuilderFromConfig(cfg *config.Configuration) mappings.MappingBuilder { + return mappings.MappingBuilder{ + TemplateBuilder: es.TextTemplateBuilder{}, + Shards: cfg.NumShards, + Replicas: cfg.NumReplicas, + EsVersion: cfg.Version, + IndexPrefix: cfg.IndexPrefix, + UseILM: cfg.UseILM, + PrioritySpanTemplate: cfg.PrioritySpanTemplate, + PriorityServiceTemplate: cfg.PriorityServiceTemplate, + PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate, + } +} + func createDependencyReader( clientFn func() es.Client, cfg *config.Configuration, diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 0e424d699b9..e072b5bf19b 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -212,9 +212,14 @@ func TestCreateTemplateError(t *testing.T) { err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) defer f.Close() + w, err := f.CreateSpanWriter() assert.Nil(t, w) require.Error(t, err, "template-error") + + s, err := f.CreateSamplingStore(1) + assert.Nil(t, s) + require.Error(t, err, "template-error") } func TestILMDisableTemplateCreation(t *testing.T) { diff --git a/plugin/storage/es/samplingstore/storage.go b/plugin/storage/es/samplingstore/storage.go index c27eb34b126..9e0cf8750d3 100644 --- a/plugin/storage/es/samplingstore/storage.go +++ b/plugin/storage/es/samplingstore/storage.go @@ -29,7 +29,7 @@ import ( ) const ( - samplingIndex = "jaeger-sampling-" + samplingIndex = "jaeger-sampling" throughputType = "throughput-sampling" probabilitiesType = "probabilities-sampling" indexPrefixSeparator = "-" @@ -59,7 +59,7 @@ func NewSamplingStore(p SamplingStoreParams) *SamplingStore { return &SamplingStore{ client: p.Client, logger: p.Logger, - samplingIndexPrefix: p.prefixIndexName(), + samplingIndexPrefix: p.PrefixedIndexName() + indexPrefixSeparator, indexDateLayout: p.IndexDateLayout, maxDocCount: p.MaxDocCount, indexRolloverFrequency: p.IndexRolloverFrequency, @@ -161,11 +161,6 @@ func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time, }).Add() } -func (s *SamplingStore) CreateTemplates(samplingTemplate string) error { - _, err := s.client().CreateTemplate("jaeger-sampling").Body(samplingTemplate).Do(context.Background()) - return err -} - func getLatestIndices(indexPrefix, indexDateLayout string, clientFn es.Client, rollover time.Duration, maxDuration time.Duration) ([]string, error) { ctx := context.Background() now := time.Now().UTC() @@ -200,7 +195,7 @@ func getReadIndices(indexName, indexDateLayout string, startTime time.Time, endT return indices } -func (p *SamplingStoreParams) prefixIndexName() string { +func (p *SamplingStoreParams) PrefixedIndexName() string { if p.IndexPrefix != "" { return p.IndexPrefix + indexPrefixSeparator + samplingIndex } diff --git a/plugin/storage/es/samplingstore/storage_test.go b/plugin/storage/es/samplingstore/storage_test.go index 7950cfde7dc..f0d171119c0 100644 --- a/plugin/storage/es/samplingstore/storage_test.go +++ b/plugin/storage/es/samplingstore/storage_test.go @@ -88,7 +88,7 @@ func TestNewIndexPrefix(t *testing.T) { IndexDateLayout: "2006-01-02", MaxDocCount: defaultMaxDocCount, }) - assert.Equal(t, test.expected+samplingIndex, r.samplingIndexPrefix) + assert.Equal(t, test.expected+samplingIndex+indexPrefixSeparator, r.samplingIndexPrefix) }) } } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index f40d5790fc3..2f0e1cc2fe1 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -33,13 +33,9 @@ import ( "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/pkg/config" - estemplate "github.com/jaegertracing/jaeger/pkg/es" - eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" - "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" - "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" "github.com/jaegertracing/jaeger/storage/dependencystore" ) @@ -63,10 +59,9 @@ const ( type ESStorageIntegration struct { StorageIntegration - client *elastic.Client - v8Client *elasticsearch8.Client - bulkProcessor *elastic.BulkProcessor - logger *zap.Logger + client *elastic.Client + v8Client *elasticsearch8.Client + logger *zap.Logger } func (s *ESStorageIntegration) getVersion() (uint, error) { @@ -102,7 +97,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) require.NoError(t, err) s.initSpanstore(t, allTagsAsFields) - s.initSamplingStore(t) s.CleanUp = func(t *testing.T) { s.esCleanUp(t, allTagsAsFields) @@ -120,48 +114,18 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) { s.initSpanstore(t, allTagsAsFields) } -func (s *ESStorageIntegration) initSamplingStore(t *testing.T) { - client := s.getEsClient(t) - mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: estemplate.TextTemplateBuilder{}, - Shards: 5, - Replicas: 1, - EsVersion: client.GetVersion(), - IndexPrefix: indexPrefix, - UseILM: false, - } - clientFn := func() estemplate.Client { return client } - samplingstore := samplingstore.NewSamplingStore( - samplingstore.SamplingStoreParams{ - Client: clientFn, - Logger: s.logger, - IndexPrefix: indexPrefix, - IndexDateLayout: indexDateLayout, - MaxDocCount: defaultMaxDocCount, - }) - sampleMapping, err := mappingBuilder.GetSamplingMappings() - require.NoError(t, err) - err = samplingstore.CreateTemplates(sampleMapping) - require.NoError(t, err) - s.SamplingStore = samplingstore -} - -func (s *ESStorageIntegration) getEsClient(t *testing.T) eswrapper.ClientWrapper { - bp, err := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) - require.NoError(t, err) - s.bulkProcessor = bp - esVersion, err := s.getVersion() - require.NoError(t, err) - return eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client) -} - func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory { s.logger = zaptest.NewLogger(t) f := es.NewFactory() v, command := config.Viperize(f.AddFlags) args := []string{ - fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es.num-shards=%v", 5), + fmt.Sprintf("--es.num-replicas=%v", 1), fmt.Sprintf("--es.index-prefix=%v", indexPrefix), + fmt.Sprintf("--es.use-ilm=%v", false), + fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es.bulk.actions=%v", 1), + fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond), "--es-archive.enabled=true", fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), @@ -193,6 +157,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) s.DependencyReader, err = f.CreateDependencyReader() require.NoError(t, err) s.DependencyWriter = s.DependencyReader.(dependencystore.Writer) + + s.SamplingStore, err = f.CreateSamplingStore(1) + require.NoError(t, err) } func healthCheck() error { diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 211a2e4e963..b8654e13a1e 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -316,13 +316,16 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.Tr traces, err = s.SpanReader.FindTraces(context.Background(), query) require.NoError(t, err) if len(expected) != len(traces) { - t.Logf("FindTraces: expected: %d, actual: %d", len(expected), len(traces)) + t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces)) + return false + } + if spanCount(expected) != spanCount(traces) { + t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces)) return false } return true }) require.True(t, found) - tracesMatch(t, traces, expected) return traces } @@ -433,13 +436,6 @@ func correctTime(json []byte) []byte { return []byte(retString) } -func tracesMatch(t *testing.T, actual []*model.Trace, expected []*model.Trace) bool { - if !assert.Equal(t, len(expected), len(actual), "Expecting certain number of traces") { - return false - } - return assert.Equal(t, spanCount(expected), spanCount(actual), "Expecting certain number of spans") -} - func spanCount(traces []*model.Trace) int { var count int for _, trace := range traces { diff --git a/storage/factory.go b/storage/factory.go index f6d3394840b..933d0dce27d 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -23,7 +23,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/distributedlock" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/storage/dependencystore" - metricsstore "github.com/jaegertracing/jaeger/storage/metricsstore" + "github.com/jaegertracing/jaeger/storage/metricsstore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" )