Skip to content

Commit

Permalink
WIP: Passed metric storage to the query service
Browse files Browse the repository at this point in the history
Signed-off-by: FlamingSaint <[email protected]>
  • Loading branch information
FlamingSaint committed Jul 14, 2024
1 parent 865d2ee commit c6abf8b
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func createMetricsQueryService(
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
) (querysvc.MetricsQueryService, error) {
if err := metricsReaderFactory.Initialize(logger); err != nil {
if err := metricsReaderFactory.InitializeMetricsFactory(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

Expand Down
12 changes: 10 additions & 2 deletions cmd/jaeger/internal/exporters/storageexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import (
)

type mockStorageExt struct {
name string
factory *factoryMocks.Factory
name string
factory *factoryMocks.Factory
metricsFactory *factoryMocks.MetricsFactory
}

func (*mockStorageExt) Start(context.Context, component.Host) error {
Expand All @@ -58,6 +59,13 @@ func (m *mockStorageExt) Factory(name string) (storage.Factory, bool) {
return nil, false
}

func (m *mockStorageExt) MetricsFactory(name string) (storage.MetricsFactory, bool) {
if m.name == name {
return m.metricsFactory, true
}
return nil, false
}

func TestExporterConfigError(t *testing.T) {
config := createDefaultConfig().(*Config)
err := config.Validate()
Expand Down
32 changes: 29 additions & 3 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)

var (
Expand Down Expand Up @@ -69,7 +69,14 @@ func (s *server) Start(_ context.Context, host component.Host) error {
return err
}
qs := querysvc.NewQueryService(spanReader, depReader, opts)
metricsQueryService, _ := disabled.NewMetricsReader()

var mqs querysvc.MetricsQueryService
mqs, err = s.addMetricStorage(&opts, host)
if err != nil {
return err
}

// metricsQueryService, _ := disabled.NewMetricsReader()
tm := tenancy.NewManager(&s.config.Tenancy)

// TODO OTel-collector does not initialize the tracer currently
Expand All @@ -87,7 +94,7 @@ func (s *server) Start(_ context.Context, host component.Host) error {
// TODO propagate healthcheck updates up to the collector's runtime
healthcheck.New(),
qs,
metricsQueryService,
mqs,
s.makeQueryOptions(),
tm,
s.jtracer,
Expand Down Expand Up @@ -120,6 +127,25 @@ func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host comp
return nil
}

func (s *server) addMetricStorage(opts *querysvc.QueryServiceOptions, host component.Host) (metricsstore.Reader, error) {
if s.config.MetricStorage == "" {
s.logger.Info("Metric storage not configured")
return nil, nil
}

mf, err := jaegerstorage.GetMetricsFactory(s.config.MetricStorage, host)
if err != nil {
return nil, fmt.Errorf("cannot find metrics storage factory: %w", err)
}

metricsReader, ok := opts.InitMetricStorage(mf, s.logger)
if !ok {
s.logger.Info("Metric storage not initialized")
return nil, nil
}
return metricsReader, nil
}

func (s *server) makeQueryOptions() *queryApp.QueryOptions {
return &queryApp.QueryOptions{
QueryOptionsBase: s.config.QueryOptionsBase,
Expand Down
23 changes: 23 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/metricsstore"
metricsstoremocks "github.com/jaegertracing/jaeger/storage/metricsstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)
Expand Down Expand Up @@ -51,13 +53,27 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (ff fakeFactory) CreateMetricsReader() (metricsstore.Reader, error) {
if ff.name == "need-span-writer-error" {
return nil, fmt.Errorf("test-error")
}
return &metricsstoremocks.Reader{}, nil
}

func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error {
if ff.name == "need-initialize-error" {
return fmt.Errorf("test-error")
}
return nil
}

func (ff fakeFactory) InitializeMetricsFactory(*zap.Logger) error {
if ff.name == "need-initialize-error" {
return fmt.Errorf("test-error")
}
return nil
}

type fakeStorageExt struct{}

var _ jaegerstorage.Extension = (*fakeStorageExt)(nil)
Expand All @@ -69,6 +85,13 @@ func (fakeStorageExt) Factory(name string) (storage.Factory, bool) {
return fakeFactory{name: name}, true
}

func (fakeStorageExt) MetricsFactory(name string) (storage.MetricsFactory, bool) {
if name == "need-factory-error" {
return nil, false
}
return fakeFactory{name: name}, true
}

func (fakeStorageExt) Start(context.Context, component.Host) error {
return nil
}
Expand Down
31 changes: 31 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var _ Extension = (*storageExt)(nil)
type Extension interface {
extension.Extension
Factory(name string) (storage.Factory, bool)
MetricsFactory(name string) (storage.MetricsFactory, bool)
}

type storageExt struct {
Expand Down Expand Up @@ -63,6 +64,31 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error
return f, nil
}

// GetMetricsFactory locates the extension in Host and retrieves a metrics factory from it with the given name.
func GetMetricsFactory(name string, host component.Host) (storage.MetricsFactory, error) {
var comp component.Component
for id, ext := range host.GetExtensions() {
if id.Type() == componentType {
comp = ext
break
}
}
if comp == nil {
return nil, fmt.Errorf(
"cannot find extension '%s' (make sure it's defined earlier in the config)",
componentType,
)
}
mf, ok := comp.(Extension).MetricsFactory(name)
if !ok {
return nil, fmt.Errorf(
"cannot find metric storage '%s' declared by '%s' extension",
name, componentType,
)
}
return mf, nil
}

func GetStorageFactoryV2(name string, host component.Host) (spanstore.Factory, error) {
f, err := GetStorageFactory(name, host)
if err != nil {
Expand Down Expand Up @@ -141,3 +167,8 @@ func (s *storageExt) Factory(name string) (storage.Factory, bool) {
f, ok := s.factories[name]
return f, ok
}

func (s *storageExt) MetricsFactory(name string) (storage.MetricsFactory, bool) {
mf, ok := s.metricsFactories[name]
return mf, ok
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import (
var _ jaegerstorage.Extension = (*mockStorageExt)(nil)

type mockStorageExt struct {
name string
factory *factoryMocks.Factory
name string
factory *factoryMocks.Factory
metricsFactory *factoryMocks.MetricsFactory
}

func (*mockStorageExt) Start(context.Context, component.Host) error {
Expand All @@ -48,6 +49,13 @@ func (m *mockStorageExt) Factory(name string) (storage.Factory, bool) {
return nil, false
}

func (m *mockStorageExt) MetricsFactory(name string) (storage.MetricsFactory, bool) {
if m.name == name {
return m.metricsFactory, true
}
return nil, false
}

type receiverTest struct {
storageName string
receiveName string
Expand Down
12 changes: 10 additions & 2 deletions cmd/jaeger/internal/integration/storagecleaner/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ func (f *PurgerFactory) Purge(_ context.Context) error {
}

type mockStorageExt struct {
name string
factory storage.Factory
name string
factory storage.Factory
metricsFactory storage.MetricsFactory
}

func (*mockStorageExt) Start(context.Context, component.Host) error {
Expand All @@ -55,6 +56,13 @@ func (m *mockStorageExt) Factory(name string) (storage.Factory, bool) {
return nil, false
}

func (m *mockStorageExt) MetricsFactory(name string) (storage.MetricsFactory, bool) {
if m.name == name {
return m.metricsFactory, true
}
return nil, false
}

func TestStorageCleanerExtension(t *testing.T) {
tests := []struct {
name string
Expand Down
23 changes: 23 additions & 0 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -38,12 +39,14 @@ const (
type QueryServiceOptions struct {
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
MetricsReader metricsstore.Reader
Adjuster adjuster.Adjuster
}

// StorageCapabilities is a feature flag for query service
type StorageCapabilities struct {
ArchiveStorage bool `json:"archiveStorage"`
// TODO: Maybe add metrics Storage here
// SupportRegex bool
// SupportTagFilter bool
}
Expand Down Expand Up @@ -166,6 +169,26 @@ func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Facto
return true
}

// InitMetricStorage tries to initialize metric storage reader if storage factory supports them.
func (opts *QueryServiceOptions) InitMetricStorage(metricsFactory storage.MetricsFactory, logger *zap.Logger) (metricsstore.Reader, bool) {
// metricsFactory = metricsFactory.(storage.MetricsFactory)
// if !ok {
// logger.Info("Metric storage not supported by the factory")
// return false
// }
reader, err := metricsFactory.CreateMetricsReader()
if errors.Is(err, storage.ErrMetricStorageNotConfigured) || errors.Is(err, storage.ErrMetricStorageNotSupported) {
logger.Info("Metric storage not created", zap.String("reason", err.Error()))
return nil, false
}
if err != nil {
logger.Error("Cannot init metric storage reader", zap.Error(err))
return nil, false
}
opts.MetricsReader = reader
return reader, true
}

// hasArchiveStorage returns true if archive storage reader/writer are initialized.
func (opts *QueryServiceOptions) hasArchiveStorage() bool {
return opts.ArchiveSpanReader != nil && opts.ArchiveSpanWriter != nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func createMetricsQueryService(
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
) (querysvc.MetricsQueryService, error) {
if err := metricsReaderFactory.Initialize(logger); err != nil {
if err := metricsReaderFactory.InitializeMetricsFactory(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/metrics/disabled/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (*Factory) AddFlags(_ *flag.FlagSet) {}
func (*Factory) InitFromViper(_ *viper.Viper, _ *zap.Logger) {}

// Initialize implements storage.MetricsFactory.
func (*Factory) Initialize(_ *zap.Logger) error {
func (*Factory) InitializeMetricsFactory(_ *zap.Logger) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions plugin/metrics/disabled/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ var _ storage.MetricsFactory = new(Factory)

func TestPrometheusFactory(t *testing.T) {
f := NewFactory()
require.NoError(t, f.Initialize(zap.NewNop()))
require.NoError(t, f.InitializeMetricsFactory(zap.NewNop()))

err := f.Initialize(nil)
err := f.InitializeMetricsFactory(nil)
require.NoError(t, err)

f.AddFlags(nil)
Expand Down
4 changes: 2 additions & 2 deletions plugin/metrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (*Factory) getFactoryOfType(factoryType string) (storage.MetricsFactory, er
}

// Initialize implements storage.MetricsFactory.
func (f *Factory) Initialize(logger *zap.Logger) error {
func (f *Factory) InitializeMetricsFactory(logger *zap.Logger) error {
for _, factory := range f.factories {
factory.Initialize(logger)
factory.InitializeMetricsFactory(logger)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/metrics/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCreateMetricsReader(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, f)

require.NoError(t, f.Initialize(zap.NewNop()))
require.NoError(t, f.InitializeMetricsFactory(zap.NewNop()))

reader, err := f.CreateMetricsReader()
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions plugin/metrics/prometheus/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
}

// Initialize implements storage.MetricsFactory.
func (f *Factory) Initialize(logger *zap.Logger) error {
func (f *Factory) InitializeMetricsFactory(logger *zap.Logger) error {
f.logger = logger
return nil
}
Expand All @@ -80,7 +80,7 @@ func NewFactoryWithConfig(
f.options = &Options{
Configuration: cfg,
}
err := f.Initialize(logger)
err := f.InitializeMetricsFactory(logger)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/metrics/prometheus/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var _ storage.MetricsFactory = new(Factory)

func TestPrometheusFactory(t *testing.T) {
f := NewFactory()
require.NoError(t, f.Initialize(zap.NewNop()))
require.NoError(t, f.InitializeMetricsFactory(zap.NewNop()))
assert.NotNil(t, f.logger)

listener, err := net.Listen("tcp", "localhost:")
Expand Down
10 changes: 9 additions & 1 deletion storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ type ArchiveFactory interface {
CreateArchiveSpanWriter() (spanstore.Writer, error)
}

var (
// ErrMetricStorageNotConfigured can be returned by the MetricsFactory when the metric storage is not configured.
ErrMetricStorageNotConfigured = errors.New("Metric storage not configured")

// ErrMetricStorageNotSupported can be returned by the MetricsFactory when the metric storage is not supported by the backend.
ErrMetricStorageNotSupported = errors.New("Metric storage not supported")
)

// MetricsFactory defines an interface for a factory that can create implementations of different metrics storage components.
// Implementations are also encouraged to implement plugin.Configurable interface.
//
Expand All @@ -92,7 +100,7 @@ type ArchiveFactory interface {
type MetricsFactory interface {
// Initialize performs internal initialization of the factory, such as opening connections to the backend store.
// It is called after all configuration of the factory itself has been done.
Initialize(logger *zap.Logger) error
InitializeMetricsFactory(logger *zap.Logger) error // Tests will fail as MetricsFactory and Factory Interface require Initialize but with different param's

// CreateMetricsReader creates a metricsstore.Reader.
CreateMetricsReader() (metricsstore.Reader, error)
Expand Down
Loading

0 comments on commit c6abf8b

Please sign in to comment.