Skip to content

Commit

Permalink
Create sampling templates when creating sampling store (#5349)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Resolves #5333

## Description of the changes
- Create sampling templates when creating sampling store since doing so
is intended.

## How was this change tested?
- CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: JaeguKim <[email protected]>
  • Loading branch information
JaeguKim authored Apr 20, 2024
1 parent 466e105 commit 7f99a88
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 84 deletions.
55 changes: 35 additions & 20 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package es

import (
"context"
"errors"
"flag"
"fmt"
Expand Down Expand Up @@ -266,23 +267,6 @@ func createSpanWriter(
return nil, err
}

mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: cfg.NumShards,
Replicas: cfg.NumReplicas,
EsVersion: cfg.Version,
IndexPrefix: cfg.IndexPrefix,
UseILM: cfg.UseILM,
PrioritySpanTemplate: cfg.PrioritySpanTemplate,
PriorityServiceTemplate: cfg.PriorityServiceTemplate,
PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate,
}

spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()
if err != nil {
return nil, err
}

writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: clientFn,
IndexPrefix: cfg.IndexPrefix,
Expand All @@ -299,27 +283,58 @@ func createSpanWriter(

// Creating a template here would conflict with the one created for ILM resulting to no index rollover
if cfg.CreateIndexTemplates && !cfg.UseILM {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix)
mappingBuilder := mappingBuilderFromConfig(cfg)
spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()
if err != nil {
return nil, err
}
if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix); err != nil {
return nil, err
}
}
return writer, nil
}

func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
store := esSampleStore.NewSamplingStore(esSampleStore.SamplingStoreParams{
params := esSampleStore.SamplingStoreParams{
Client: f.getPrimaryClient,
Logger: f.logger,
IndexPrefix: f.primaryConfig.IndexPrefix,
IndexDateLayout: f.primaryConfig.IndexDateLayoutSampling,
IndexRolloverFrequency: f.primaryConfig.GetIndexRolloverFrequencySamplingDuration(),
Lookback: f.primaryConfig.AdaptiveSamplingLookback,
MaxDocCount: f.primaryConfig.MaxDocCount,
})
}
store := esSampleStore.NewSamplingStore(params)

if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM {
mappingBuilder := mappingBuilderFromConfig(f.primaryConfig)
samplingMapping, err := mappingBuilder.GetSamplingMappings()
if err != nil {
return nil, err
}
if _, err := f.getPrimaryClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil {
return nil, fmt.Errorf("failed to create template: %w", err)
}
}

return store, nil
}

func mappingBuilderFromConfig(cfg *config.Configuration) mappings.MappingBuilder {
return mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: cfg.NumShards,
Replicas: cfg.NumReplicas,
EsVersion: cfg.Version,
IndexPrefix: cfg.IndexPrefix,
UseILM: cfg.UseILM,
PrioritySpanTemplate: cfg.PrioritySpanTemplate,
PriorityServiceTemplate: cfg.PriorityServiceTemplate,
PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate,
}
}

func createDependencyReader(
clientFn func() es.Client,
cfg *config.Configuration,
Expand Down
5 changes: 5 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,14 @@ func TestCreateTemplateError(t *testing.T) {
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close()

w, err := f.CreateSpanWriter()
assert.Nil(t, w)
require.Error(t, err, "template-error")

s, err := f.CreateSamplingStore(1)
assert.Nil(t, s)
require.Error(t, err, "template-error")
}

func TestILMDisableTemplateCreation(t *testing.T) {
Expand Down
11 changes: 3 additions & 8 deletions plugin/storage/es/samplingstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
samplingIndex = "jaeger-sampling-"
samplingIndex = "jaeger-sampling"
throughputType = "throughput-sampling"
probabilitiesType = "probabilities-sampling"
indexPrefixSeparator = "-"
Expand Down Expand Up @@ -59,7 +59,7 @@ func NewSamplingStore(p SamplingStoreParams) *SamplingStore {
return &SamplingStore{
client: p.Client,
logger: p.Logger,
samplingIndexPrefix: p.prefixIndexName(),
samplingIndexPrefix: p.PrefixedIndexName() + indexPrefixSeparator,
indexDateLayout: p.IndexDateLayout,
maxDocCount: p.MaxDocCount,
indexRolloverFrequency: p.IndexRolloverFrequency,
Expand Down Expand Up @@ -161,11 +161,6 @@ func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time,
}).Add()
}

func (s *SamplingStore) CreateTemplates(samplingTemplate string) error {
_, err := s.client().CreateTemplate("jaeger-sampling").Body(samplingTemplate).Do(context.Background())
return err
}

func getLatestIndices(indexPrefix, indexDateLayout string, clientFn es.Client, rollover time.Duration, maxDuration time.Duration) ([]string, error) {
ctx := context.Background()
now := time.Now().UTC()
Expand Down Expand Up @@ -200,7 +195,7 @@ func getReadIndices(indexName, indexDateLayout string, startTime time.Time, endT
return indices
}

func (p *SamplingStoreParams) prefixIndexName() string {
func (p *SamplingStoreParams) PrefixedIndexName() string {
if p.IndexPrefix != "" {
return p.IndexPrefix + indexPrefixSeparator + samplingIndex
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/samplingstore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestNewIndexPrefix(t *testing.T) {
IndexDateLayout: "2006-01-02",
MaxDocCount: defaultMaxDocCount,
})
assert.Equal(t, test.expected+samplingIndex, r.samplingIndexPrefix)
assert.Equal(t, test.expected+samplingIndex+indexPrefixSeparator, r.samplingIndexPrefix)
})
}
}
Expand Down
57 changes: 12 additions & 45 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@ import (
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/config"
estemplate "github.com/jaegertracing/jaeger/pkg/es"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/es/mappings"
"github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore"
"github.com/jaegertracing/jaeger/storage/dependencystore"
)

Expand All @@ -63,10 +59,9 @@ const (
type ESStorageIntegration struct {
StorageIntegration

client *elastic.Client
v8Client *elasticsearch8.Client
bulkProcessor *elastic.BulkProcessor
logger *zap.Logger
client *elastic.Client
v8Client *elasticsearch8.Client
logger *zap.Logger
}

func (s *ESStorageIntegration) getVersion() (uint, error) {
Expand Down Expand Up @@ -102,7 +97,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool)
require.NoError(t, err)

s.initSpanstore(t, allTagsAsFields)
s.initSamplingStore(t)

s.CleanUp = func(t *testing.T) {
s.esCleanUp(t, allTagsAsFields)
Expand All @@ -120,48 +114,18 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) {
s.initSpanstore(t, allTagsAsFields)
}

func (s *ESStorageIntegration) initSamplingStore(t *testing.T) {
client := s.getEsClient(t)
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: estemplate.TextTemplateBuilder{},
Shards: 5,
Replicas: 1,
EsVersion: client.GetVersion(),
IndexPrefix: indexPrefix,
UseILM: false,
}
clientFn := func() estemplate.Client { return client }
samplingstore := samplingstore.NewSamplingStore(
samplingstore.SamplingStoreParams{
Client: clientFn,
Logger: s.logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
MaxDocCount: defaultMaxDocCount,
})
sampleMapping, err := mappingBuilder.GetSamplingMappings()
require.NoError(t, err)
err = samplingstore.CreateTemplates(sampleMapping)
require.NoError(t, err)
s.SamplingStore = samplingstore
}

func (s *ESStorageIntegration) getEsClient(t *testing.T) eswrapper.ClientWrapper {
bp, err := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background())
require.NoError(t, err)
s.bulkProcessor = bp
esVersion, err := s.getVersion()
require.NoError(t, err)
return eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client)
}

func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory {
s.logger = zaptest.NewLogger(t)
f := es.NewFactory()
v, command := config.Viperize(f.AddFlags)
args := []string{
fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es.num-shards=%v", 5),
fmt.Sprintf("--es.num-replicas=%v", 1),
fmt.Sprintf("--es.index-prefix=%v", indexPrefix),
fmt.Sprintf("--es.use-ilm=%v", false),
fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es.bulk.actions=%v", 1),
fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond),
"--es-archive.enabled=true",
fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix),
Expand Down Expand Up @@ -193,6 +157,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
s.DependencyReader, err = f.CreateDependencyReader()
require.NoError(t, err)
s.DependencyWriter = s.DependencyReader.(dependencystore.Writer)

s.SamplingStore, err = f.CreateSamplingStore(1)
require.NoError(t, err)
}

func healthCheck() error {
Expand Down
14 changes: 5 additions & 9 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,16 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.Tr
traces, err = s.SpanReader.FindTraces(context.Background(), query)
require.NoError(t, err)
if len(expected) != len(traces) {
t.Logf("FindTraces: expected: %d, actual: %d", len(expected), len(traces))
t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces))
return false
}
if spanCount(expected) != spanCount(traces) {
t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces))
return false
}
return true
})
require.True(t, found)
tracesMatch(t, traces, expected)
return traces
}

Expand Down Expand Up @@ -433,13 +436,6 @@ func correctTime(json []byte) []byte {
return []byte(retString)
}

func tracesMatch(t *testing.T, actual []*model.Trace, expected []*model.Trace) bool {
if !assert.Equal(t, len(expected), len(actual), "Expecting certain number of traces") {
return false
}
return assert.Equal(t, spanCount(expected), spanCount(actual), "Expecting certain number of spans")
}

func spanCount(traces []*model.Trace) int {
var count int
for _, trace := range traces {
Expand Down
2 changes: 1 addition & 1 deletion storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/storage/dependencystore"
metricsstore "github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down

0 comments on commit 7f99a88

Please sign in to comment.