From 5497453ea8cb9203bb3ed5c73919bbcd7ab56593 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Sat, 30 Mar 2024 18:19:16 +0530 Subject: [PATCH] Intialized ES factory Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 130 +++++++++++------- 1 file changed, 78 insertions(+), 52 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 7a92224d906..afaeb6d69f3 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -29,15 +29,13 @@ import ( "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" estemplate "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" @@ -66,21 +64,22 @@ type ESStorageIntegration struct { v8Client *elasticsearch8.Client bulkProcessor *elastic.BulkProcessor logger *zap.Logger + factory *es.Factory } -func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { - exporter := tracetest.NewInMemoryExporter() - tp := sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.AlwaysSample()), - sdktrace.WithSyncer(exporter), - ) - closer := func() { - if err := tp.Shutdown(context.Background()); err != nil { - s.logger.Error("failed to close tracer", zap.Error(err)) - } - } - return tp, exporter, closer -} +// func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { +// exporter := tracetest.NewInMemoryExporter() +// tp := sdktrace.NewTracerProvider( +// sdktrace.WithSampler(sdktrace.AlwaysSample()), +// sdktrace.WithSyncer(exporter), +// ) +// closer := func() { +// if err := tp.Shutdown(context.Background()); err != nil { +// s.logger.Error("failed to close tracer", zap.Error(err)) +// } +// } +// return tp, exporter, closer +// } func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) @@ -101,12 +100,38 @@ func (s *ESStorageIntegration) getVersion() (uint, error) { } func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) error { + // Intialize ES Factory + s.factory = es.NewFactory() + err := s.factory.Initialize(metrics.NullFactory, zap.NewNop()) + if err != nil { + return err + } + + // Create Span Writer and Reader + s.SpanWriter, err = s.factory.CreateSpanWriter() + if err != nil { + return err + } + s.SpanReader, err = s.factory.CreateSpanReader() + if err != nil { + return err + } + + // Create Archive Span Writer and Reader + s.ArchiveSpanWriter, err = s.factory.CreateArchiveSpanWriter() + if err != nil { + return err + } + s.ArchiveSpanReader, err = s.factory.CreateArchiveSpanReader() + if err != nil { + return err + } + rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), elastic.SetSniff(false)) require.NoError(t, err) s.logger, _ = testutils.NewLogger() - s.client = rawClient s.v8Client, err = elasticsearch8.NewClient(elasticsearch8.Config{ Addresses: []string{queryURL}, @@ -185,6 +210,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) clientFn := func() estemplate.Client { return client } // Initializing Span Reader and Writer + w := spanstore.NewSpanWriter( spanstore.SpanWriterParams{ Client: clientFn, @@ -197,43 +223,43 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) }) err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix) require.NoError(t, err) - tracer, _, closer := s.tracerProvider() - defer closer() - s.SpanWriter = w - s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - Client: clientFn, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, - IndexPrefix: indexPrefix, - MaxSpanAge: maxSpanAge, - TagDotReplacement: tagKeyDeDotChar, - MaxDocCount: defaultMaxDocCount, - Tracer: tracer.Tracer("test"), - Archive: false, - }) + // tracer, _, closer := s.tracerProvider() + // defer closer() + // s.SpanWriter = w + // s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ + // Client: clientFn, + // Logger: s.logger, + // MetricsFactory: metrics.NullFactory, + // IndexPrefix: indexPrefix, + // MaxSpanAge: maxSpanAge, + // TagDotReplacement: tagKeyDeDotChar, + // MaxDocCount: defaultMaxDocCount, + // Tracer: tracer.Tracer("test"), + // Archive: false, + // }) // Initializing Archive Span Reader and Writer - s.ArchiveSpanWriter = spanstore.NewSpanWriter( - spanstore.SpanWriterParams{ - Client: clientFn, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, - IndexPrefix: indexPrefix, - AllTagsAsFields: allTagsAsFields, - TagDotReplacement: tagKeyDeDotChar, - Archive: true, - }) - s.ArchiveSpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - Client: clientFn, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, - IndexPrefix: indexPrefix, - MaxSpanAge: maxSpanAge, - TagDotReplacement: tagKeyDeDotChar, - MaxDocCount: defaultMaxDocCount, - Tracer: tracer.Tracer("test"), - Archive: true, - }) + // s.ArchiveSpanWriter = spanstore.NewSpanWriter( + // spanstore.SpanWriterParams{ + // Client: clientFn, + // Logger: s.logger, + // MetricsFactory: metrics.NullFactory, + // IndexPrefix: indexPrefix, + // AllTagsAsFields: allTagsAsFields, + // TagDotReplacement: tagKeyDeDotChar, + // Archive: true, + // }) + // s.ArchiveSpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ + // Client: clientFn, + // Logger: s.logger, + // MetricsFactory: metrics.NullFactory, + // IndexPrefix: indexPrefix, + // MaxSpanAge: maxSpanAge, + // TagDotReplacement: tagKeyDeDotChar, + // MaxDocCount: defaultMaxDocCount, + // Tracer: tracer.Tracer("test"), + // Archive: true, + // }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: clientFn,