Skip to content

Commit

Permalink
Entities support prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Oct 2, 2024
1 parent 69ff46b commit d15292c
Show file tree
Hide file tree
Showing 158 changed files with 7,395 additions and 1,012 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ genproto: genproto-cleanup
# Call a sub-make to ensure OPENTELEMETRY_PROTO_FILES is populated
$(MAKE) genproto_sub
$(MAKE) fmt
$(MAKE) genproto-cleanup
# $(MAKE) genproto-cleanup

genproto_sub:
@echo Generating code for the following files:
Expand Down Expand Up @@ -234,8 +234,8 @@ genproto_sub:
cp -R $(PROTO_INTERMEDIATE_DIR)/$(PROTO_PACKAGE)/* $(PROTO_TARGET_GEN_DIR)/
rm -rf $(PROTO_INTERMEDIATE_DIR)/go.opentelemetry.io

@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/*
@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/.* > /dev/null 2>&1 || true
#@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/*
#@rm -rf $(OPENTELEMETRY_PROTO_SRC_DIR)/.* > /dev/null 2>&1 || true

# Generate structs, functions and tests for pdata package. Must be used after any changes
# to proto and after running `make genproto`
Expand Down
11 changes: 11 additions & 0 deletions cmd/mdatagen/internal/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ func (t telemetry) Levels() map[string]interface{} {
return levels
}

type Entity struct {
// Type of the entity.
Type string `mapstructure:"type"`
// Identifying attributes of the entity.
IDAttributes []AttributeName `mapstructure:"id_attributes"`
// Descriptive attributes of the entity.
DescriptiveAttributes []AttributeName `mapstructure:"descriptive_attributes"`
}

type Metadata struct {
// Type of the component.
Type string `mapstructure:"type"`
Expand All @@ -282,6 +291,8 @@ type Metadata struct {
SemConvVersion string `mapstructure:"sem_conv_version"`
// ResourceAttributes that can be emitted by the component.
ResourceAttributes map[AttributeName]Attribute `mapstructure:"resource_attributes"`
// Entities associated with the emitted resource attributes.
Entities []Entity `mapstructure:"entities"`
// Attributes emitted by one or more metrics.
Attributes map[AttributeName]Attribute `mapstructure:"attributes"`
// Metrics that can be emitted by the component.
Expand Down
16 changes: 16 additions & 0 deletions cmd/mdatagen/internal/templates/resource.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ func (rb *ResourceBuilder) Set{{ $name.Render }}(val {{ $attr.Type.Primitive }})
// Emit returns the built resource and resets the internal builder state.
func (rb *ResourceBuilder) Emit() pcommon.Resource {
r := rb.res
{{- range $entity := .Entities }}
{{- range $attr := .IDAttributes }}
_, found{{ $attr.Render }} := r.Attributes().Get("{{ $attr }}")
{{- end }}
if {{ range $i, $attr := .IDAttributes }}{{ if $i }}&& {{ end }}found{{ $attr.Render }} {{ end }} {
ref := pcommon.NewResourceEntityRef()
ref.SetType("{{ $entity.Type }}")
ref.IdAttrKeys().Append({{ range $i, $attr := .IDAttributes }}{{ if $i }}, {{ end }}"{{ $attr }}"{{ end }})
{{- range $attr := .DescriptiveAttributes }}
if _, ok := r.Attributes().Get("{{ $attr }}"); ok {
ref.DescrAttrKeys().Append("{{ $attr }}")
}
{{- end }}
ref.CopyTo(r.Entities().AppendEmpty())
}
{{- end }}
rb.res = pcommon.NewResource()
return r
}
15 changes: 15 additions & 0 deletions cmd/mdatagen/internal/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func (md *Metadata) Validate() error {
if err := md.validateMetrics(); err != nil {
errs = errors.Join(errs, err)
}
if err := md.validateEntities(); err != nil {
errs = errors.Join(errs, err)

Check warning on line 29 in cmd/mdatagen/internal/validate.go

View check run for this annotation

Codecov / codecov/patch

cmd/mdatagen/internal/validate.go#L29

Added line #L29 was not covered by tests
}
return errs
}

Expand Down Expand Up @@ -143,6 +146,18 @@ func (md *Metadata) validateMetrics() error {
return errs
}

func (md *Metadata) validateEntities() error {
var errs error
for _, entity := range md.Entities {
for _, attr := range append(entity.IDAttributes, entity.DescriptiveAttributes...) {
if _, ok := md.ResourceAttributes[attr]; !ok {
errs = errors.Join(errs, fmt.Errorf("undefined resource attribute: %v", attr))

Check warning on line 154 in cmd/mdatagen/internal/validate.go

View check run for this annotation

Codecov / codecov/patch

cmd/mdatagen/internal/validate.go#L152-L154

Added lines #L152 - L154 were not covered by tests
}
}
}
return errs
}

func (m *Metric) validate() error {
var errs error
if m.Description == "" {
Expand Down
6 changes: 6 additions & 0 deletions consumer/consumertest/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/pdata/pentity"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
Expand All @@ -34,13 +35,17 @@ type Consumer interface {
// ConsumeProfiles to implement the consumerprofiles.Profiles.
ConsumeProfiles(context.Context, pprofile.Profiles) error

// ConsumeEntities to implement the consumer.Entities.
ConsumeEntities(context.Context, pentity.Entities) error

unexported()
}

var _ consumer.Logs = (Consumer)(nil)
var _ consumer.Metrics = (Consumer)(nil)
var _ consumer.Traces = (Consumer)(nil)
var _ consumerprofiles.Profiles = (Consumer)(nil)
var _ consumer.Entities = (Consumer)(nil)

type nonMutatingConsumer struct{}

Expand All @@ -55,6 +60,7 @@ type baseConsumer struct {
consumer.ConsumeMetricsFunc
consumer.ConsumeLogsFunc
consumerprofiles.ConsumeProfilesFunc
consumer.ConsumeEntitiesFunc
}

func (bc baseConsumer) unexported() {}
2 changes: 2 additions & 0 deletions consumer/consumertest/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package consumertest // import "go.opentelemetry.io/collector/consumer/consumert
import (
"context"

"go.opentelemetry.io/collector/pdata/pentity"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
Expand All @@ -19,5 +20,6 @@ func NewNop() Consumer {
ConsumeMetricsFunc: func(context.Context, pmetric.Metrics) error { return nil },
ConsumeLogsFunc: func(context.Context, plog.Logs) error { return nil },
ConsumeProfilesFunc: func(context.Context, pprofile.Profiles) error { return nil },
ConsumeEntitiesFunc: func(context.Context, pentity.Entities) error { return nil },

Check warning on line 23 in consumer/consumertest/nop.go

View check run for this annotation

Codecov / codecov/patch

consumer/consumertest/nop.go#L23

Added line #L23 was not covered by tests
}
}
43 changes: 43 additions & 0 deletions consumer/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package consumer // import "go.opentelemetry.io/collector/consumer"

import (
"context"

"go.opentelemetry.io/collector/consumer/internal"
"go.opentelemetry.io/collector/pdata/pentity"
)

// Entities is an interface that receives pentity.Entities, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Entities interface {
internal.BaseConsumer
// ConsumeEntities receives pentity.Entities for consumption.
ConsumeEntities(ctx context.Context, td pentity.Entities) error
}

// ConsumeEntitiesFunc is a helper function that is similar to ConsumeEntities.
type ConsumeEntitiesFunc func(ctx context.Context, td pentity.Entities) error

// ConsumeEntities calls f(ctx, td).
func (f ConsumeEntitiesFunc) ConsumeEntities(ctx context.Context, td pentity.Entities) error {
return f(ctx, td)

Check warning on line 26 in consumer/entities.go

View check run for this annotation

Codecov / codecov/patch

consumer/entities.go#L25-L26

Added lines #L25 - L26 were not covered by tests
}

type baseEntities struct {
*internal.BaseImpl
ConsumeEntitiesFunc
}

// NewEntities returns a Entities configured with the provided options.
func NewEntities(consume ConsumeEntitiesFunc, options ...Option) (Entities, error) {
if consume == nil {
return nil, errNilFunc

Check warning on line 37 in consumer/entities.go

View check run for this annotation

Codecov / codecov/patch

consumer/entities.go#L35-L37

Added lines #L35 - L37 were not covered by tests
}
return &baseEntities{
BaseImpl: internal.NewBaseImpl(options...),
ConsumeEntitiesFunc: consume,
}, nil

Check warning on line 42 in consumer/entities.go

View check run for this annotation

Codecov / codecov/patch

consumer/entities.go#L39-L42

Added lines #L39 - L42 were not covered by tests
}
39 changes: 39 additions & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ type Logs interface {
consumer.Logs
}

// Entities is an exporter that can consume entities.
type Entities interface {
component.Component
consumer.Entities
}

// Settings configures exporter creators.
type Settings struct {
// ID returns the ID of the component that will be created.
Expand Down Expand Up @@ -72,6 +78,14 @@ type Factory interface {
// LogsExporterStability gets the stability level of the LogsExporter.
LogsExporterStability() component.StabilityLevel

// CreateEntitiesExporter creates an EntitiesExporter based on the config.
// If the exporter type does not support entities,
// this function returns the error [pipeline.ErrSignalNotSupported].
CreateEntitiesExporter(ctx context.Context, set Settings, cfg component.Config) (Entities, error)

// EntityExporterStability gets the stability level of the EntityExporter.
EntityExporterStability() component.StabilityLevel

unexportedFactoryFunc()
}

Expand Down Expand Up @@ -123,6 +137,17 @@ func (f CreateLogsFunc) CreateLogsExporter(ctx context.Context, set Settings, cf
return f(ctx, set, cfg)
}

// CreateEntitiesFunc is the equivalent of Factory.CreateEntities.
type CreateEntitiesFunc func(context.Context, Settings, component.Config) (Entities, error)

// CreateEntitiesExporter implements Factory.CreateEntities.
func (f CreateEntitiesFunc) CreateEntitiesExporter(ctx context.Context, set Settings, cfg component.Config) (Entities, error) {
if f == nil {
return nil, pipeline.ErrSignalNotSupported
}
return f(ctx, set, cfg)

Check warning on line 148 in exporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporter.go#L148

Added line #L148 was not covered by tests
}

type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
Expand All @@ -132,6 +157,8 @@ type factory struct {
metricsStabilityLevel component.StabilityLevel
CreateLogsFunc
logsStabilityLevel component.StabilityLevel
CreateEntitiesFunc
entitiesStabilityLevel component.StabilityLevel
}

func (f *factory) Type() component.Type {
Expand All @@ -152,6 +179,10 @@ func (f *factory) LogsExporterStability() component.StabilityLevel {
return f.logsStabilityLevel
}

func (f *factory) EntityExporterStability() component.StabilityLevel {
return f.entitiesStabilityLevel

Check warning on line 183 in exporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporter.go#L182-L183

Added lines #L182 - L183 were not covered by tests
}

// WithTraces overrides the default "error not supported" implementation for CreateTracesExporter and the default "undefined" stability level.
func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
Expand All @@ -176,6 +207,14 @@ func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOpt
})
}

// WithEntities overrides the default "error not supported" implementation for CreateEntitiesExporter and the default "undefined" stability level.
func WithEntities(createEntities CreateEntitiesFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {
o.entitiesStabilityLevel = sl
o.CreateEntitiesFunc = createEntities
})

Check warning on line 215 in exporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporter.go#L211-L215

Added lines #L211 - L215 were not covered by tests
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down
4 changes: 3 additions & 1 deletion exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ func TestNewFactory(t *testing.T) {
_, err = factory.CreateMetricsExporter(context.Background(), Settings{}, &defaultCfg)
require.Error(t, err)
_, err = factory.CreateLogsExporter(context.Background(), Settings{}, &defaultCfg)
assert.Error(t, err)
require.Error(t, err)
_, err = factory.CreateEntitiesExporter(context.Background(), Settings{}, &defaultCfg)
require.Error(t, err)
}

func TestNewFactoryWithOptions(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions exporter/internal/entities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/internal"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)

// Profiles is an exporter that can consume profiles.
type Entities interface {
component.Component
consumer.Entities
}
16 changes: 16 additions & 0 deletions exporter/internal/otlptext/databuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,22 @@ func (b *dataBuffer) logExemplars(description string, se pmetric.ExemplarSlice)
}
}

func (b *dataBuffer) logResourceEntities(rers pcommon.ResourceEntityRefSlice) {
if rers.Len() == 0 {
return
}

b.logEntry("Entities:")
for i := 0; i < rers.Len(); i++ {
rer := rers.At(i)
b.logEntry("Entity Ref #%d", i)
b.logEntry(" -> Entity Type: %s", rer.Type())
b.logEntry(" -> SchemaURL: %s", rer.SchemaUrl())
b.logEntry(" -> Identifying Attributes: %s", strings.Join(rer.IdAttrKeys().AsRaw(), ", "))
b.logEntry(" -> Descriptive Attributes: %s", strings.Join(rer.DescrAttrKeys().AsRaw(), ", "))

Check warning on line 295 in exporter/internal/otlptext/databuffer.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/otlptext/databuffer.go#L288-L295

Added lines #L288 - L295 were not covered by tests
}
}

func valueToString(v pcommon.Value) string {
return fmt.Sprintf("%s(%s)", v.Type().String(), v.AsString())
}
2 changes: 1 addition & 1 deletion exporter/internal/otlptext/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (textLogsMarshaler) MarshalLogs(ld plog.Logs) ([]byte, error) {
buf.logEntry("ResourceLog #%d", i)
rl := rls.At(i)
buf.logEntry("Resource SchemaURL: %s", rl.SchemaUrl())
buf.logAttributes("Resource attributes", rl.Resource().Attributes())
marshalResource(rl.Resource(), &buf)
ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
buf.logEntry("ScopeLogs #%d", j)
Expand Down
2 changes: 1 addition & 1 deletion exporter/internal/otlptext/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (textMetricsMarshaler) MarshalMetrics(md pmetric.Metrics) ([]byte, error) {
buf.logEntry("ResourceMetrics #%d", i)
rm := rms.At(i)
buf.logEntry("Resource SchemaURL: %s", rm.SchemaUrl())
buf.logAttributes("Resource attributes", rm.Resource().Attributes())
marshalResource(rm.Resource(), &buf)
ilms := rm.ScopeMetrics()
for j := 0; j < ilms.Len(); j++ {
buf.logEntry("ScopeMetrics #%d", j)
Expand Down
13 changes: 13 additions & 0 deletions exporter/internal/otlptext/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlptext // import "go.opentelemetry.io/collector/exporter/internal/otlptext"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
)

func marshalResource(res pcommon.Resource, buf *dataBuffer) {
buf.logAttributes("Resource attributes", res.Attributes())
buf.logResourceEntities(res.Entities())
}
2 changes: 1 addition & 1 deletion exporter/internal/otlptext/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (textTracesMarshaler) MarshalTraces(td ptrace.Traces) ([]byte, error) {
buf.logEntry("ResourceSpans #%d", i)
rs := rss.At(i)
buf.logEntry("Resource SchemaURL: %s", rs.SchemaUrl())
buf.logAttributes("Resource attributes", rs.Resource().Attributes())
marshalResource(rs.Resource(), &buf)
ilss := rs.ScopeSpans()
for j := 0; j < ilss.Len(); j++ {
buf.logEntry("ScopeSpans #%d", j)
Expand Down
Loading

0 comments on commit d15292c

Please sign in to comment.