Skip to content

Commit

Permalink
Merge branch 'main' into grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
danish9039 authored Apr 6, 2024
2 parents fd5594c + 8a4748c commit de648af
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 104 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.97.0
github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.97.0
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.6.0
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.52.2
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.8.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.52.2 h1:LW8Vk7BccEdONfrJBDffQGRtpSzi5CQaRZGtboOO2ck=
github.com/prometheus/common v0.52.2/go.mod h1:lrWtQx+iDfn2mbH5GUzlH9TSHyfZpHkSiG1W7y3sF2Q=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
Expand Down
137 changes: 41 additions & 96 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package integration
import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
Expand All @@ -28,19 +29,18 @@ import (
"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/config"
estemplate "github.com/jaegertracing/jaeger/pkg/es"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/es/mappings"
"github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore"
"github.com/jaegertracing/jaeger/storage/dependencystore"
)

const (
Expand All @@ -56,6 +56,8 @@ const (
spanTemplateName = "jaeger-span"
serviceTemplateName = "jaeger-service"
dependenciesTemplateName = "jaeger-dependencies"
primaryNamespace = "es"
archiveNamespace = "es-archive"
)

type ESStorageIntegration struct {
Expand All @@ -67,20 +69,6 @@ type ESStorageIntegration struct {
logger *zap.Logger
}

func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func() {
if err := tp.Shutdown(context.Background()); err != nil {
s.logger.Error("failed to close tracer", zap.Error(err))
}
}
return tp, exporter, closer
}

func (s *ESStorageIntegration) getVersion() (uint, error) {
pingResult, _, err := s.client.Ping(queryURL).Do(context.Background())
if err != nil {
Expand All @@ -99,7 +87,7 @@ func (s *ESStorageIntegration) getVersion() (uint, error) {
return uint(esVersion), nil
}

func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) error {
func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetSniff(false))
Expand All @@ -124,7 +112,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool)
// TODO: remove this flag after ES support returning spanKind when get operations
s.GetOperationsMissingSpanKind = true
s.SkipArchiveTest = false
return nil
}

func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) {
Expand Down Expand Up @@ -168,86 +155,44 @@ func (s *ESStorageIntegration) getEsClient(t *testing.T) eswrapper.ClientWrapper
return eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client)
}

func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) error {
client := s.getEsClient(t)
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: estemplate.TextTemplateBuilder{},
Shards: 5,
Replicas: 1,
EsVersion: client.GetVersion(),
IndexPrefix: indexPrefix,
UseILM: false,
func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory {
s.logger = zaptest.NewLogger(t)
f := es.NewFactory()
v, command := config.Viperize(f.AddFlags)
args := []string{
fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es.index-prefix=%v", indexPrefix),
"--es-archive.enabled=true",
fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix),
}
spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()
require.NoError(t, err)
clientFn := func() estemplate.Client { return client }
require.NoError(t, command.ParseFlags(args))
f.InitFromViper(v, s.logger)
require.NoError(t, f.Initialize(metrics.NullFactory, s.logger))

// Initializing Span Reader and Writer
w := spanstore.NewSpanWriter(
spanstore.SpanWriterParams{
Client: clientFn,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
AllTagsAsFields: allTagsAsFields,
TagDotReplacement: tagKeyDeDotChar,
Archive: false,
})
err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix)
require.NoError(t, err)
tracer, _, closer := s.tracerProvider()
defer closer()
s.SpanWriter = w
s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{
Client: clientFn,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
MaxSpanAge: maxSpanAge,
TagDotReplacement: tagKeyDeDotChar,
MaxDocCount: defaultMaxDocCount,
Tracer: tracer.Tracer("test"),
Archive: false,
})

// Initializing Archive Span Reader and Writer
s.ArchiveSpanWriter = spanstore.NewSpanWriter(
spanstore.SpanWriterParams{
Client: clientFn,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
AllTagsAsFields: allTagsAsFields,
TagDotReplacement: tagKeyDeDotChar,
Archive: true,
})
s.ArchiveSpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{
Client: clientFn,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
MaxSpanAge: maxSpanAge,
TagDotReplacement: tagKeyDeDotChar,
MaxDocCount: defaultMaxDocCount,
Tracer: tracer.Tracer("test"),
Archive: true,
})

dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{
Client: clientFn,
Logger: s.logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
MaxDocCount: defaultMaxDocCount,
})
// TODO ideally we need to close the factory once the test is finished
// but because esCleanup calls initialize() we get a panic later
// t.Cleanup(func() {
// require.NoError(t, f.Close())
// })
return f
}

depMapping, err := mappingBuilder.GetDependenciesMappings()
func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) {
f := s.initializeESFactory(t, allTagsAsFields)
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
err = dependencyStore.CreateTemplates(depMapping)
s.SpanReader, err = f.CreateSpanReader()
require.NoError(t, err)
s.DependencyReader = dependencyStore
s.DependencyWriter = dependencyStore
return nil
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)

s.DependencyReader, err = f.CreateDependencyReader()
require.NoError(t, err)
s.DependencyWriter = s.DependencyReader.(dependencystore.Writer)
}

func (s *ESStorageIntegration) esRefresh(t *testing.T) {
Expand Down
20 changes: 15 additions & 5 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,22 @@ func (s *StorageIntegration) testGetDependencies(t *testing.T) {

require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected))
s.refresh(t)
actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
require.NoError(t, err)
sort.Slice(actual, func(i, j int) bool {
return actual[i].Parent < actual[j].Parent

var actual []model.DependencyLink
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
require.NoError(t, err)
sort.Slice(actual, func(i, j int) bool {
return actual[i].Parent < actual[j].Parent
})
return assert.ObjectsAreEqualValues(expected, actual)
})
assert.EqualValues(t, expected, actual)

if !assert.True(t, found) {
t.Log("\t Expected:", expected)
t.Log("\t Actual :", actual)
}
}

// === Sampling Store Integration Tests ===
Expand Down

0 comments on commit de648af

Please sign in to comment.