Skip to content

Commit

Permalink
Entities support prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Sep 27, 2024
1 parent 431fd11 commit 4b08f48
Show file tree
Hide file tree
Showing 153 changed files with 7,343 additions and 1,010 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ genproto: genproto-cleanup
# Call a sub-make to ensure OPENTELEMETRY_PROTO_FILES is populated
$(MAKE) genproto_sub
$(MAKE) fmt
$(MAKE) genproto-cleanup
# $(MAKE) genproto-cleanup

genproto_sub:
@echo Generating code for the following files:
Expand Down Expand Up @@ -234,8 +234,8 @@ genproto_sub:
cp -R $(PROTO_INTERMEDIATE_DIR)/$(PROTO_PACKAGE)/* $(PROTO_TARGET_GEN_DIR)/
rm -rf $(PROTO_INTERMEDIATE_DIR)/go.opentelemetry.io

@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/*
@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/.* > /dev/null 2>&1 || true
#@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/*
#@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/.* > /dev/null 2>&1 || true

# Generate structs, functions and tests for pdata package. Must be used after any changes
# to proto and after running `make genproto`
Expand Down
43 changes: 43 additions & 0 deletions consumer/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumer // import "go.opentelemetry.io/collector/consumer"

import (
"context"

"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/pentity"
)

// Entities is an interface that receives pentity.Entities, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Entities interface {
internal.BaseConsumer
// ConsumeEntities receives pentity.Entities for consumption.
ConsumeEntities(ctx context.Context, td pentity.Entities) error
}

// ConsumeEntitiesFunc is a helper function that is similar to ConsumeEntities.
type ConsumeEntitiesFunc func(ctx context.Context, td pentity.Entities) error

// ConsumeEntities calls f(ctx, td).
func (f ConsumeEntitiesFunc) ConsumeEntities(ctx context.Context, td pentity.Entities) error {
return f(ctx, td)
}

type baseEntities struct {
*internal.BaseImpl
ConsumeEntitiesFunc
}

// NewEntities returns a Entities configured with the provided options.
func NewEntities(consume ConsumeEntitiesFunc, options ...Option) (Entities, error) {
if consume == nil {
return nil, errNilFunc
}
return &baseEntities{
BaseImpl: internal.NewBaseImpl(options...),
ConsumeEntitiesFunc: consume,
}, nil
}
15 changes: 15 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/internal"
)

Expand All @@ -19,6 +20,12 @@ type Metrics = internal.Metrics
// Logs is an exporter that can consume logs.
type Logs = internal.Logs

// Entities is an exporter that can consume entities.
type Entities interface {
component.Component
consumer.Entities
}

// Settings configures exporter creators.
type Settings = internal.Settings

Expand All @@ -40,6 +47,9 @@ type CreateMetricsFunc = internal.CreateMetricsFunc
// CreateLogsFunc is the equivalent of Factory.CreateLogs.
type CreateLogsFunc = internal.CreateLogsFunc

// CreateEntitiesFunc is the equivalent of Factory.CreateEntities.
type CreateEntitiesFunc = internal.CreateEntitiesFunc

// WithTraces overrides the default "error not supported" implementation for CreateTracesExporter and the default "undefined" stability level.
func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return internal.WithTraces(createTraces, sl)
Expand All @@ -55,6 +65,11 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt
return internal.WithLogs(createLogs, sl)
}

// WithEntities overrides the default "error not supported" implementation for CreateEntitiesExporter and the default "undefined" stability level.
func WithEntities(createEntities CreateEntitiesFunc, sl component.StabilityLevel) FactoryOption {
return internal.WithEntities(createEntities, sl)
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
return internal.NewFactory(cfgType, createDefaultConfig, options...)
Expand Down
15 changes: 15 additions & 0 deletions exporter/internal/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/internal"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)

// Profiles is an exporter that can consume profiles.
type Entities interface {
component.Component
consumer.Entities
}
33 changes: 33 additions & 0 deletions exporter/internal/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ type Factory interface {
// ProfilesExporterStability gets the stability level of the ProfilesExporter.
ProfilesExporterStability() component.StabilityLevel

// CreateEntitiesExporter creates a EntitiesExporter based on this config.
// If the exporter type does not support entities,
// this function returns the error [pipeline.ErrSignalNotSupported].
CreateEntitiesExporter(ctx context.Context, set Settings, cfg component.Config) (Entities, error)

// EntitiesExporterStability gets the stability level of the EntitiesExporter.
EntitiesExporterStability() component.StabilityLevel

unexportedFactoryFunc()
}

Expand Down Expand Up @@ -110,6 +118,17 @@ func (f CreateProfilesFunc) CreateProfilesExporter(ctx context.Context, set Sett
return f(ctx, set, cfg)
}

// CreateEntitiesFunc is the equivalent of Factory.CreateEntities.
type CreateEntitiesFunc func(context.Context, Settings, component.Config) (Entities, error)

// CreateEntitiesExporter implements Factory.CreateEntitiesExporter().
func (f CreateEntitiesFunc) CreateEntitiesExporter(ctx context.Context, set Settings, cfg component.Config) (Entities, error) {
if f == nil {
return nil, pipeline.ErrSignalNotSupported
}
return f(ctx, set, cfg)
}

type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
Expand All @@ -121,6 +140,8 @@ type factory struct {
logsStabilityLevel component.StabilityLevel
CreateProfilesFunc
profilesStabilityLevel component.StabilityLevel
CreateEntitiesFunc
entitiesStabilityLevel component.StabilityLevel
}

func (f *factory) Type() component.Type {
Expand All @@ -145,6 +166,10 @@ func (f *factory) ProfilesExporterStability() component.StabilityLevel {
return f.profilesStabilityLevel
}

func (f *factory) EntitiesExporterStability() component.StabilityLevel {
return f.entitiesStabilityLevel
}

// WithTraces overrides the default "error not supported" implementation for CreateTracesExporter and the default "undefined" stability level.
func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand Down Expand Up @@ -177,6 +202,14 @@ func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel
})
}

// WithEntities overrides the default "error not supported" implementation for CreateEntitiesExporter and the default "undefined" stability level.
func WithEntities(createEntities CreateEntitiesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.entitiesStabilityLevel = sl
o.CreateEntitiesFunc = createEntities
})
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down
15 changes: 15 additions & 0 deletions exporter/internal/otlptext/databuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,21 @@ func (b *dataBuffer) logExemplars(description string, se pmetric.ExemplarSlice)
}
}

func (b *dataBuffer) logResourceEntities(rers pcommon.ResourceEntityRefSlice) {
if rers.Len() == 0 {
return
}

b.logEntry("Entities:")
for i := 0; i < rers.Len(); i++ {
rer := rers.At(i)
b.logEntry("Entity Ref #%d", i)
b.logEntry(" -> Entity Type: %s", rer.Type())
b.logEntry(" -> Identifying Attributes: %s", strings.Join(rer.IdAttrKeys().AsRaw(), ", "))
b.logEntry(" -> Descriptive Attributes: %s", strings.Join(rer.DescrAttrKeys().AsRaw(), ", "))
}
}

func valueToString(v pcommon.Value) string {
return fmt.Sprintf("%s(%s)", v.Type().String(), v.AsString())
}
2 changes: 1 addition & 1 deletion exporter/internal/otlptext/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (textLogsMarshaler) MarshalLogs(ld plog.Logs) ([]byte, error) {
buf.logEntry("ResourceLog #%d", i)
rl := rls.At(i)
buf.logEntry("Resource SchemaURL: %s", rl.SchemaUrl())
buf.logAttributes("Resource attributes", rl.Resource().Attributes())
marshalResource(rl.Resource(), &buf)
ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
buf.logEntry("ScopeLogs #%d", j)
Expand Down
2 changes: 1 addition & 1 deletion exporter/internal/otlptext/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (textMetricsMarshaler) MarshalMetrics(md pmetric.Metrics) ([]byte, error) {
buf.logEntry("ResourceMetrics #%d", i)
rm := rms.At(i)
buf.logEntry("Resource SchemaURL: %s", rm.SchemaUrl())
buf.logAttributes("Resource attributes", rm.Resource().Attributes())
marshalResource(rm.Resource(), &buf)
ilms := rm.ScopeMetrics()
for j := 0; j < ilms.Len(); j++ {
buf.logEntry("ScopeMetrics #%d", j)
Expand Down
13 changes: 13 additions & 0 deletions exporter/internal/otlptext/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlptext // import "go.opentelemetry.io/collector/exporter/internal/otlptext"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
)

func marshalResource(res pcommon.Resource, buf *dataBuffer) {
buf.logAttributes("Resource attributes", res.Attributes())
buf.logResourceEntities(res.Entities())
}
2 changes: 1 addition & 1 deletion exporter/internal/otlptext/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (textTracesMarshaler) MarshalTraces(td ptrace.Traces) ([]byte, error) {
buf.logEntry("ResourceSpans #%d", i)
rs := rss.At(i)
buf.logEntry("Resource SchemaURL: %s", rs.SchemaUrl())
buf.logAttributes("Resource attributes", rs.Resource().Attributes())
marshalResource(rs.Resource(), &buf)
ilss := rs.ScopeSpans()
for j := 0; j < ilss.Len(); j++ {
buf.logEntry("ScopeSpans #%d", j)
Expand Down
Loading

0 comments on commit 4b08f48

Please sign in to comment.