diff --git a/.chloggen/migrate-to-use.yaml b/.chloggen/migrate-to-use.yaml new file mode 100755 index 0000000000..5569f09a72 --- /dev/null +++ b/.chloggen/migrate-to-use.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action) +component: collector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Improves the performance of port and configuration parsing in the operator + +# One or more tracking issues related to the change +issues: [2603] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index dbe9c43efd..3dd2350ae8 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -24,18 +24,24 @@ import ( "strconv" "strings" + "github.com/go-logr/logr" "gopkg.in/yaml.v3" + corev1 "k8s.io/api/core/v1" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/components/exporters" + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" ) -type ComponentType int +type ComponentKind int const ( - ComponentTypeReceiver ComponentType = iota - ComponentTypeExporter - ComponentTypeProcessor + KindReceiver ComponentKind = iota + KindExporter + KindProcessor ) -func (c ComponentType) String() string { +func (c ComponentKind) String() string { return [...]string{"receiver", "exporter", "processor"}[c] } @@ -95,24 +101,24 @@ type Pipeline struct { } // GetEnabledComponents constructs a list of enabled components by component type. -func (c *Config) GetEnabledComponents() map[ComponentType]map[string]interface{} { - toReturn := map[ComponentType]map[string]interface{}{ - ComponentTypeReceiver: {}, - ComponentTypeProcessor: {}, - ComponentTypeExporter: {}, +func (c *Config) GetEnabledComponents() map[ComponentKind]map[string]interface{} { + toReturn := map[ComponentKind]map[string]interface{}{ + KindReceiver: {}, + KindProcessor: {}, + KindExporter: {}, } for _, pipeline := range c.Service.Pipelines { if pipeline == nil { continue } for _, componentId := range pipeline.Receivers { - toReturn[ComponentTypeReceiver][componentId] = struct{}{} + toReturn[KindReceiver][componentId] = struct{}{} } for _, componentId := range pipeline.Exporters { - toReturn[ComponentTypeExporter][componentId] = struct{}{} + toReturn[KindExporter][componentId] = struct{}{} } for _, componentId := range pipeline.Processors { - toReturn[ComponentTypeProcessor][componentId] = struct{}{} + toReturn[KindProcessor][componentId] = struct{}{} } } return toReturn @@ -133,6 +139,53 @@ type Config struct { Service Service `json:"service" yaml:"service"` } +// getPortsForComponentKinds gets the ports for the given ComponentKind(s). +func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) ([]corev1.ServicePort, error) { + var ports []corev1.ServicePort + enabledComponents := c.GetEnabledComponents() + for _, componentKind := range componentKinds { + var retriever components.ParserRetriever + var cfg AnyConfig + switch componentKind { + case KindReceiver: + retriever = receivers.ReceiverFor + cfg = c.Receivers + case KindExporter: + retriever = exporters.ParserFor + cfg = c.Exporters + case KindProcessor: + break + } + for componentName := range enabledComponents[componentKind] { + // TODO: Clean up the naming here and make it simpler to use a retriever. + parser := retriever(componentName) + if parsedPorts, err := parser.Ports(logger, componentName, cfg.Object[componentName]); err != nil { + return nil, err + } else { + ports = append(ports, parsedPorts...) + } + } + } + + sort.Slice(ports, func(i, j int) bool { + return ports[i].Name < ports[j].Name + }) + + return ports, nil +} + +func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) { + return c.getPortsForComponentKinds(logger, KindReceiver) +} + +func (c *Config) GetExporterPorts(logger logr.Logger) ([]corev1.ServicePort, error) { + return c.getPortsForComponentKinds(logger, KindExporter) +} + +func (c *Config) GetAllPorts(logger logr.Logger) ([]corev1.ServicePort, error) { + return c.getPortsForComponentKinds(logger, KindReceiver, KindExporter) +} + // Yaml encodes the current object and returns it as a string. func (c *Config) Yaml() (string, error) { var buf bytes.Buffer diff --git a/apis/v1beta1/config_test.go b/apis/v1beta1/config_test.go index c1ad6d59fe..117ad5a414 100644 --- a/apis/v1beta1/config_test.go +++ b/apis/v1beta1/config_test.go @@ -21,9 +21,13 @@ import ( "strings" "testing" + "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" go_yaml "gopkg.in/yaml.v3" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "sigs.k8s.io/yaml" ) @@ -284,19 +288,19 @@ func TestConfig_GetEnabledComponents(t *testing.T) { tests := []struct { name string file string - want map[ComponentType]map[string]interface{} + want map[ComponentKind]map[string]interface{} }{ { name: "connectors", file: "testdata/otelcol-connectors.yaml", - want: map[ComponentType]map[string]interface{}{ - ComponentTypeReceiver: { + want: map[ComponentKind]map[string]interface{}{ + KindReceiver: { "foo": struct{}{}, "count": struct{}{}, }, - ComponentTypeProcessor: {}, - ComponentTypeExporter: { + KindProcessor: {}, + KindExporter: { "bar": struct{}{}, "count": struct{}{}, }, @@ -305,16 +309,16 @@ func TestConfig_GetEnabledComponents(t *testing.T) { { name: "couchbase", file: "testdata/otelcol-couchbase.yaml", - want: map[ComponentType]map[string]interface{}{ - ComponentTypeReceiver: { + want: map[ComponentKind]map[string]interface{}{ + KindReceiver: { "prometheus/couchbase": struct{}{}, }, - ComponentTypeProcessor: { + KindProcessor: { "filter/couchbase": struct{}{}, "metricstransform/couchbase": struct{}{}, "transform/couchbase": struct{}{}, }, - ComponentTypeExporter: { + KindExporter: { "prometheus": struct{}{}, }, }, @@ -322,14 +326,14 @@ func TestConfig_GetEnabledComponents(t *testing.T) { { name: "demo", file: "testdata/otelcol-demo.yaml", - want: map[ComponentType]map[string]interface{}{ - ComponentTypeReceiver: { + want: map[ComponentKind]map[string]interface{}{ + KindReceiver: { "otlp": struct{}{}, }, - ComponentTypeProcessor: { + KindProcessor: { "batch": struct{}{}, }, - ComponentTypeExporter: { + KindExporter: { "debug": struct{}{}, "zipkin": struct{}{}, "otlp": struct{}{}, @@ -340,12 +344,12 @@ func TestConfig_GetEnabledComponents(t *testing.T) { { name: "extensions", file: "testdata/otelcol-extensions.yaml", - want: map[ComponentType]map[string]interface{}{ - ComponentTypeReceiver: { + want: map[ComponentKind]map[string]interface{}{ + KindReceiver: { "otlp": struct{}{}, }, - ComponentTypeProcessor: {}, - ComponentTypeExporter: { + KindProcessor: {}, + KindExporter: { "otlp/auth": struct{}{}, }, }, @@ -353,12 +357,12 @@ func TestConfig_GetEnabledComponents(t *testing.T) { { name: "filelog", file: "testdata/otelcol-filelog.yaml", - want: map[ComponentType]map[string]interface{}{ - ComponentTypeReceiver: { + want: map[ComponentKind]map[string]interface{}{ + KindReceiver: { "filelog": struct{}{}, }, - ComponentTypeProcessor: {}, - ComponentTypeExporter: { + KindProcessor: {}, + KindExporter: { "debug": struct{}{}, }, }, @@ -366,10 +370,10 @@ func TestConfig_GetEnabledComponents(t *testing.T) { { name: "null", file: "testdata/otelcol-null-values.yaml", - want: map[ComponentType]map[string]interface{}{ - ComponentTypeReceiver: {}, - ComponentTypeProcessor: {}, - ComponentTypeExporter: {}, + want: map[ComponentKind]map[string]interface{}{ + KindReceiver: {}, + KindProcessor: {}, + KindExporter: {}, }, }, } @@ -385,3 +389,160 @@ func TestConfig_GetEnabledComponents(t *testing.T) { }) } } + +func TestConfig_GetReceiverPorts(t *testing.T) { + tests := []struct { + name string + file string + want []v1.ServicePort + wantErr bool + }{ + + { + name: "connectors", + file: "testdata/otelcol-connectors.yaml", + want: nil, + wantErr: true, + }, + { + name: "couchbase", + file: "testdata/otelcol-couchbase.yaml", + want: nil, // Couchbase uses a prometheus scraper, no ports should be opened + }, + { + name: "demo", + file: "testdata/otelcol-demo.yaml", + want: []v1.ServicePort{ + { + Name: "otlp-grpc", + Protocol: "", + AppProtocol: ptr.To("grpc"), + Port: 4317, + TargetPort: intstr.FromInt32(4317), + }, + }, + }, + { + name: "extensions", + file: "testdata/otelcol-extensions.yaml", + want: []v1.ServicePort{ + { + Name: "otlp-grpc", + Protocol: "", + AppProtocol: ptr.To("grpc"), + Port: 4317, + TargetPort: intstr.FromInt32(4317), + }, + }, + }, + { + name: "filelog", + file: "testdata/otelcol-filelog.yaml", + want: nil, + }, + { + name: "null", + file: "testdata/otelcol-null-values.yaml", + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + collectorYaml, err := os.ReadFile(tt.file) + require.NoError(t, err) + + c := &Config{} + err = go_yaml.Unmarshal(collectorYaml, c) + require.NoError(t, err) + ports, err := c.GetReceiverPorts(logr.Discard()) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + assert.Equalf(t, tt.want, ports, "GetReceiverPorts()") + }) + } +} + +func TestConfig_GetExporterPorts(t *testing.T) { + tests := []struct { + name string + file string + want []v1.ServicePort + wantErr bool + }{ + + { + name: "connectors", + file: "testdata/otelcol-connectors.yaml", + want: nil, + wantErr: false, + }, + { + name: "couchbase", + file: "testdata/otelcol-couchbase.yaml", + want: []v1.ServicePort{ + { + Name: "prometheus", + Port: 9123, + }, + }, + }, + { + name: "demo", + file: "testdata/otelcol-demo.yaml", + want: []v1.ServicePort{ + { + Name: "prometheus", + Port: 8889, + }, + { + Name: "otlp", + Port: 4317, + }, + { + Name: "zipkin", + Port: 9411, + }, + }, + }, + { + name: "extensions", + file: "testdata/otelcol-extensions.yaml", + want: []v1.ServicePort{ + { + Name: "otlp-auth", + Port: 4317, + }, + }, + }, + { + name: "filelog", + file: "testdata/otelcol-filelog.yaml", + want: nil, + }, + { + name: "null", + file: "testdata/otelcol-null-values.yaml", + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + collectorYaml, err := os.ReadFile(tt.file) + require.NoError(t, err) + + c := &Config{} + err = go_yaml.Unmarshal(collectorYaml, c) + require.NoError(t, err) + ports, err := c.GetExporterPorts(logr.Discard()) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + assert.ElementsMatchf(t, tt.want, ports, "GetReceiverPorts()") + }) + } +} diff --git a/apis/v1beta1/metrics.go b/apis/v1beta1/metrics.go index 395306059d..3c0e27825f 100644 --- a/apis/v1beta1/metrics.go +++ b/apis/v1beta1/metrics.go @@ -44,7 +44,7 @@ const ( ) // TODO: Refactor this logic, centralize it. See: https://github.com/open-telemetry/opentelemetry-operator/issues/2603 -type components struct { +type componentDefinitions struct { receivers []string processors []string exporters []string @@ -193,9 +193,9 @@ func extractElements(elements map[string]interface{}) []string { return items } -func getComponentsFromConfig(yamlContent Config) *components { +func getComponentsFromConfig(yamlContent Config) *componentDefinitions { - info := &components{ + info := &componentDefinitions{ receivers: extractElements(yamlContent.Receivers.Object), exporters: extractElements(yamlContent.Exporters.Object), } diff --git a/internal/components/component.go b/internal/components/component.go index e704c39e25..1cdaf1dfea 100644 --- a/internal/components/component.go +++ b/internal/components/component.go @@ -81,22 +81,25 @@ func PortFromEndpoint(endpoint string) (int32, error) { port, err = strconv.ParseInt(cleanedPortStr, 10, 32) if err != nil { - return 0, err + return UnsetPort, err } } if port == 0 { - return 0, PortNotFoundErr + return UnsetPort, PortNotFoundErr } return int32(port), err } +type ParserRetriever func(string) ComponentPortParser + type ComponentPortParser interface { - // Ports returns the service ports parsed based on the exporter's configuration - Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) + // Ports returns the service ports parsed based on the component's configuration where name is the component's name + // of the form "name" or "type/name" + Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) - // ParserType returns the name of this parser + // ParserType returns the type of this parser ParserType() string // ParserName is an internal name for the parser @@ -113,3 +116,16 @@ func ConstructServicePort(current *corev1.ServicePort, port int32) corev1.Servic Protocol: current.Protocol, } } + +func GetPortsForConfig(logger logr.Logger, config map[string]interface{}, retriever ParserRetriever) ([]corev1.ServicePort, error) { + var ports []corev1.ServicePort + for componentName, componentDef := range config { + parser := retriever(componentName) + if parsedPorts, err := parser.Ports(logger, componentName, componentDef); err != nil { + return nil, err + } else { + ports = append(ports, parsedPorts...) + } + } + return ports, nil +} diff --git a/internal/components/component_test.go b/internal/components/component_test.go index 4671e98087..1e9e5a21b6 100644 --- a/internal/components/component_test.go +++ b/internal/components/component_test.go @@ -15,11 +15,16 @@ package components_test import ( + "fmt" "testing" + "github.com/go-logr/logr" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/components/receivers" ) func TestComponentType(t *testing.T) { @@ -65,3 +70,68 @@ func TestReceiverParsePortFromEndpoint(t *testing.T) { }) } } + +func TestGetPortsForConfig(t *testing.T) { + type args struct { + config map[string]interface{} + retriever components.ParserRetriever + } + tests := []struct { + name string + args args + want []corev1.ServicePort + wantErr assert.ErrorAssertionFunc + }{ + { + name: "nothing", + args: args{ + config: nil, + retriever: receivers.ReceiverFor, + }, + want: nil, + wantErr: assert.NoError, + }, + { + name: "bad config", + args: args{ + config: map[string]interface{}{ + "test": "garbage", + }, + retriever: receivers.ReceiverFor, + }, + want: nil, + wantErr: assert.Error, + }, + { + name: "receivers", + args: args{ + config: map[string]interface{}{ + "otlp": map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + }, + retriever: receivers.ReceiverFor, + }, + want: []corev1.ServicePort{ + { + Name: "otlp-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &components.GrpcProtocol, + }, + }, + wantErr: assert.NoError, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := components.GetPortsForConfig(logr.Discard(), tt.args.config, tt.args.retriever) + if !tt.wantErr(t, err, fmt.Sprintf("GetPortsForConfig(%v)", tt.args.config)) { + return + } + assert.Equalf(t, tt.want, got, "GetPortsForConfig(%v)", tt.args.config) + }) + } +} diff --git a/internal/components/exporters/helpers.go b/internal/components/exporters/helpers.go new file mode 100644 index 0000000000..644477cb5d --- /dev/null +++ b/internal/components/exporters/helpers.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporters + +import ( + "github.com/open-telemetry/opentelemetry-operator/internal/components" +) + +// registry holds a record of all known receiver parsers. +var registry = make(map[string]components.ComponentPortParser) + +// Register adds a new parser builder to the list of known builders. +func Register(name string, p components.ComponentPortParser) { + registry[name] = p +} + +// IsRegistered checks whether a parser is registered with the given name. +func IsRegistered(name string) bool { + _, ok := registry[name] + return ok +} + +// ParserFor returns a parser builder for the given exporter name. +func ParserFor(name string) components.ComponentPortParser { + if parser, ok := registry[components.ComponentType(name)]; ok { + return parser + } + // We want the default for exporters to fail silently. + return components.NewSilentSinglePortParser(components.ComponentType(name), components.UnsetPort) +} + +var ( + componentParsers = []components.ComponentPortParser{ + components.NewSinglePortParser("prometheus", 8888), + } +) + +func init() { + for _, parser := range componentParsers { + Register(parser.ParserType(), parser) + } +} diff --git a/internal/components/exporters/helpers_test.go b/internal/components/exporters/helpers_test.go new file mode 100644 index 0000000000..5e449cf74f --- /dev/null +++ b/internal/components/exporters/helpers_test.go @@ -0,0 +1,112 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporters_test + +import ( + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-operator/internal/components" + "github.com/open-telemetry/opentelemetry-operator/internal/components/exporters" + "github.com/open-telemetry/opentelemetry-operator/internal/naming" +) + +func TestParserForReturns(t *testing.T) { + const testComponentName = "test" + parser := exporters.ParserFor(testComponentName) + assert.Equal(t, "test", parser.ParserType()) + assert.Equal(t, "__test", parser.ParserName()) + ports, err := parser.Ports(logr.Discard(), testComponentName, map[string]interface{}{ + "endpoint": "localhost:9000", + }) + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.Equal(t, ports[0].Port, int32(9000)) +} + +func TestCanRegister(t *testing.T) { + const testComponentName = "test" + exporters.Register(testComponentName, components.NewSinglePortParser(testComponentName, 9000)) + assert.True(t, exporters.IsRegistered(testComponentName)) + parser := exporters.ParserFor(testComponentName) + assert.Equal(t, "test", parser.ParserType()) + assert.Equal(t, "__test", parser.ParserName()) + ports, err := parser.Ports(logr.Discard(), testComponentName, map[string]interface{}{}) + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.Equal(t, ports[0].Port, int32(9000)) +} + +func TestExporterComponentParsers(t *testing.T) { + for _, tt := range []struct { + exporterName string + parserName string + defaultPort int + }{ + {"prometheus", "__prometheus", 8888}, + } { + t.Run(tt.exporterName, func(t *testing.T) { + t.Run("is registered", func(t *testing.T) { + assert.True(t, exporters.IsRegistered(tt.exporterName)) + }) + t.Run("bad config errors", func(t *testing.T) { + // prepare + parser := exporters.ParserFor(tt.exporterName) + + // test throwing in pure junk + _, err := parser.Ports(logr.Discard(), tt.exporterName, func() {}) + + // verify + assert.ErrorContains(t, err, "expected a map, got ") + }) + + t.Run("assigns the expected port", func(t *testing.T) { + // prepare + parser := exporters.ParserFor(tt.exporterName) + + // test + ports, err := parser.Ports(logr.Discard(), tt.exporterName, map[string]interface{}{}) + + if tt.defaultPort == 0 { + assert.Len(t, ports, 0) + return + } + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, tt.defaultPort, ports[0].Port) + assert.Equal(t, naming.PortName(tt.exporterName, int32(tt.defaultPort)), ports[0].Name) + }) + + t.Run("allows port to be overridden", func(t *testing.T) { + // prepare + parser := exporters.ParserFor(tt.exporterName) + + // test + ports, err := parser.Ports(logr.Discard(), tt.exporterName, map[string]interface{}{ + "endpoint": "0.0.0.0:65535", + }) + + // verify + assert.NoError(t, err) + assert.Len(t, ports, 1) + assert.EqualValues(t, 65535, ports[0].Port) + assert.Equal(t, naming.PortName(tt.exporterName, int32(tt.defaultPort)), ports[0].Name) + }) + }) + } +} diff --git a/internal/components/multi_endpoint.go b/internal/components/multi_endpoint.go index 304d92d521..e089465f5d 100644 --- a/internal/components/multi_endpoint.go +++ b/internal/components/multi_endpoint.go @@ -42,7 +42,7 @@ type MultiPortReceiver struct { portMappings map[string]*corev1.ServicePort } -func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { +func (m *MultiPortReceiver) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) { multiProtoEndpointCfg := &MultiProtocolEndpointConfig{} if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil { return nil, err @@ -53,7 +53,7 @@ func (m *MultiPortReceiver) Ports(logger logr.Logger, config interface{}) ([]cor port := defaultSvc.Port if ec != nil { port = ec.GetPortNumOrDefault(logger, port) - defaultSvc.Name = naming.PortName(fmt.Sprintf("%s-%s", m.name, protocol), port) + defaultSvc.Name = naming.PortName(fmt.Sprintf("%s-%s", name, protocol), port) } ports = append(ports, ConstructServicePort(defaultSvc, port)) } else { diff --git a/internal/components/multi_endpoint_test.go b/internal/components/multi_endpoint_test.go index 8009b8e9f3..f3829d82e9 100644 --- a/internal/components/multi_endpoint_test.go +++ b/internal/components/multi_endpoint_test.go @@ -319,7 +319,7 @@ func TestMultiPortReceiver_Ports(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := components.NewMultiPortReceiver(tt.fields.name, tt.fields.opts...) - got, err := m.Ports(logr.Discard(), tt.args.config) + got, err := m.Ports(logr.Discard(), tt.fields.name, tt.args.config) if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { return } diff --git a/internal/components/receivers/helpers.go b/internal/components/receivers/helpers.go index 2848b36514..220558fdfd 100644 --- a/internal/components/receivers/helpers.go +++ b/internal/components/receivers/helpers.go @@ -30,12 +30,12 @@ func Register(name string, p components.ComponentPortParser) { // IsRegistered checks whether a parser is registered with the given name. func IsRegistered(name string) bool { - _, ok := registry[name] + _, ok := registry[components.ComponentType(name)] return ok } -// BuilderFor returns a parser builder for the given exporter name. -func BuilderFor(name string) components.ComponentPortParser { +// ReceiverFor returns a parser builder for the given exporter name. +func ReceiverFor(name string) components.ComponentPortParser { if parser, ok := registry[components.ComponentType(name)]; ok { return parser } @@ -136,6 +136,7 @@ var ( NewScraperParser("haproxy"), NewScraperParser("flinkmetrics"), NewScraperParser("couchdb"), + NewScraperParser("filelog"), } ) diff --git a/internal/components/receivers/multi_endpoint_receiver_test.go b/internal/components/receivers/multi_endpoint_receiver_test.go index dde04b763f..ff6990378d 100644 --- a/internal/components/receivers/multi_endpoint_receiver_test.go +++ b/internal/components/receivers/multi_endpoint_receiver_test.go @@ -186,6 +186,72 @@ func TestMultiEndpointReceiverParsers(t *testing.T) { }, }, }, + { + receiverName: "otlp/test", + parserName: "__otlp", + cases: []testCase{ + { + name: "minimal config", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-test-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + }, + }, + { + name: "grpc overridden", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "0.0.0.0:1234", + }, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-test-grpc", + Port: 1234, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + }, + }, + { + name: "all defaults", + config: map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{}, + "http": map[string]interface{}{}, + }, + }, + expectedErr: nil, + expectedSvc: []corev1.ServicePort{ + { + Name: "otlp-test-grpc", + Port: 4317, + TargetPort: intstr.FromInt32(4317), + AppProtocol: &grpc, + }, + { + Name: "otlp-test-http", + Port: 4318, + TargetPort: intstr.FromInt32(4318), + AppProtocol: &http, + }, + }, + }, + }, + }, { receiverName: "loki", parserName: "__loki", @@ -326,26 +392,26 @@ func TestMultiEndpointReceiverParsers(t *testing.T) { }) t.Run("is found by name", func(t *testing.T) { - p := receivers.BuilderFor(tt.receiverName) + p := receivers.ReceiverFor(tt.receiverName) assert.Equal(t, tt.parserName, p.ParserName()) }) t.Run("bad config errors", func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test - _, err := parser.Ports(logger, []interface{}{"junk"}) + _, err := parser.Ports(logger, tt.receiverName, []interface{}{"junk"}) // verify assert.ErrorContains(t, err, "expected a map, got 'slice'") }) t.Run("good config, unknown protocol", func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test - _, err := parser.Ports(logger, map[string]interface{}{ + _, err := parser.Ports(logger, tt.receiverName, map[string]interface{}{ "protocols": map[string]interface{}{ "garbage": map[string]interface{}{}, }, @@ -357,10 +423,10 @@ func TestMultiEndpointReceiverParsers(t *testing.T) { for _, kase := range tt.cases { t.Run(kase.name, func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test - ports, err := parser.Ports(logger, kase.config) + ports, err := parser.Ports(logger, tt.receiverName, kase.config) if kase.expectedErr != nil { assert.EqualError(t, err, kase.expectedErr.Error()) return diff --git a/internal/components/receivers/scraper.go b/internal/components/receivers/scraper.go index 8f01e95c3a..01f0ad53d9 100644 --- a/internal/components/receivers/scraper.go +++ b/internal/components/receivers/scraper.go @@ -31,7 +31,7 @@ type ScraperParser struct { componentType string } -func (s *ScraperParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { +func (s *ScraperParser) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) { return nil, nil } diff --git a/internal/components/receivers/scraper_test.go b/internal/components/receivers/scraper_test.go index 3456cbc6ff..59ac2eec1f 100644 --- a/internal/components/receivers/scraper_test.go +++ b/internal/components/receivers/scraper_test.go @@ -62,7 +62,7 @@ func TestScraperParsers(t *testing.T) { t.Run(tt.receiverName, func(t *testing.T) { t.Run("builds successfully", func(t *testing.T) { // test - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // verify assert.Equal(t, tt.parserName, parser.ParserName()) @@ -70,10 +70,10 @@ func TestScraperParsers(t *testing.T) { t.Run("default is nothing", func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test - ports, err := parser.Ports(logger, map[string]interface{}{}) + ports, err := parser.Ports(logger, tt.receiverName, map[string]interface{}{}) // verify assert.NoError(t, err) @@ -82,10 +82,10 @@ func TestScraperParsers(t *testing.T) { t.Run("always returns nothing", func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test - ports, err := parser.Ports(logger, map[string]interface{}{ + ports, err := parser.Ports(logger, tt.receiverName, map[string]interface{}{ "endpoint": "0.0.0.0:65535", }) diff --git a/internal/components/receivers/single_endpoint_receiver_test.go b/internal/components/receivers/single_endpoint_receiver_test.go index f06353ca90..00993ca06d 100644 --- a/internal/components/receivers/single_endpoint_receiver_test.go +++ b/internal/components/receivers/single_endpoint_receiver_test.go @@ -30,10 +30,10 @@ var logger = logf.Log.WithName("unit-tests") func TestParseEndpoint(t *testing.T) { // prepare // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser - parser := receivers.BuilderFor("myreceiver") + parser := receivers.ReceiverFor("myreceiver") // test - ports, err := parser.Ports(logger, map[string]interface{}{ + ports, err := parser.Ports(logger, "myreceiver", map[string]interface{}{ "endpoint": "0.0.0.0:1234", }) @@ -46,10 +46,10 @@ func TestParseEndpoint(t *testing.T) { func TestFailedToParseEndpoint(t *testing.T) { // prepare // there's no parser registered to handle "myreceiver", so, it falls back to the generic parser - parser := receivers.BuilderFor("myreceiver") + parser := receivers.ReceiverFor("myreceiver") // test - ports, err := parser.Ports(logger, map[string]interface{}{ + ports, err := parser.Ports(logger, "myreceiver", map[string]interface{}{ "endpoint": "0.0.0.0", }) @@ -86,17 +86,17 @@ func TestDownstreamParsers(t *testing.T) { t.Run(tt.receiverName, func(t *testing.T) { t.Run("builds successfully", func(t *testing.T) { // test - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // verify assert.Equal(t, tt.parserName, parser.ParserName()) }) t.Run("bad config errors", func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test throwing in pure junk - _, err := parser.Ports(logger, func() {}) + _, err := parser.Ports(logger, tt.receiverName, func() {}) // verify assert.ErrorContains(t, err, "expected a map, got 'func'") @@ -104,10 +104,10 @@ func TestDownstreamParsers(t *testing.T) { t.Run("assigns the expected port", func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test - ports, err := parser.Ports(logger, map[string]interface{}{}) + ports, err := parser.Ports(logger, tt.receiverName, map[string]interface{}{}) if tt.defaultPort == 0 { assert.Len(t, ports, 0) @@ -122,17 +122,17 @@ func TestDownstreamParsers(t *testing.T) { t.Run("allows port to be overridden", func(t *testing.T) { // prepare - parser := receivers.BuilderFor(tt.receiverName) + parser := receivers.ReceiverFor(tt.receiverName) // test var ports []corev1.ServicePort var err error if tt.listenAddrParser { - ports, err = parser.Ports(logger, map[string]interface{}{ + ports, err = parser.Ports(logger, tt.receiverName, map[string]interface{}{ "listen_address": "0.0.0.0:65535", }) } else { - ports, err = parser.Ports(logger, map[string]interface{}{ + ports, err = parser.Ports(logger, tt.receiverName, map[string]interface{}{ "endpoint": "0.0.0.0:65535", }) } diff --git a/internal/components/single_endpoint.go b/internal/components/single_endpoint.go index f7de2b7aaa..5ec498f612 100644 --- a/internal/components/single_endpoint.go +++ b/internal/components/single_endpoint.go @@ -44,13 +44,15 @@ func (g *SingleEndpointConfig) GetPortNumOrDefault(logger logr.Logger, p int32) return num } +// GetPortNum attempts to get the port for the given config. If it cannot, the UnsetPort and the given missingPortError +// are returned. func (g *SingleEndpointConfig) GetPortNum() (int32, error) { if len(g.Endpoint) > 0 { return PortFromEndpoint(g.Endpoint) } else if len(g.ListenAddress) > 0 { return PortFromEndpoint(g.ListenAddress) } - return 0, PortNotFoundErr + return UnsetPort, PortNotFoundErr } // SingleEndpointParser is a special parser for a generic receiver that has an endpoint or listen_address in its @@ -59,20 +61,26 @@ type SingleEndpointParser struct { name string svcPort *corev1.ServicePort + + // failSilently allows the parser to prevent the propagation of failure if the parser fails to set a port. + failSilently bool } -func (s *SingleEndpointParser) Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error) { +func (s *SingleEndpointParser) Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error) { singleEndpointConfig := &SingleEndpointConfig{} if err := mapstructure.Decode(config, singleEndpointConfig); err != nil { return nil, err } if _, err := singleEndpointConfig.GetPortNum(); err != nil && s.svcPort.Port == UnsetPort { logger.WithValues("receiver", s.name).Error(err, "couldn't parse the endpoint's port and no default port set") + if s.failSilently { + err = nil + } return []corev1.ServicePort{}, err } port := singleEndpointConfig.GetPortNumOrDefault(logger, s.svcPort.Port) - s.svcPort.Name = naming.PortName(s.name, port) + s.svcPort.Name = naming.PortName(name, port) return []corev1.ServicePort{ConstructServicePort(s.svcPort, port)}, nil } @@ -94,3 +102,15 @@ func NewSinglePortParser(name string, port int32, opts ...PortBuilderOption) *Si } return &SingleEndpointParser{name: name, svcPort: servicePort} } + +// NewSilentSinglePortParser returns a SingleEndpointParser that errors silently on failure to find a port. +func NewSilentSinglePortParser(name string, port int32, opts ...PortBuilderOption) *SingleEndpointParser { + servicePort := &corev1.ServicePort{ + Name: naming.PortName(name, port), + Port: port, + } + for _, opt := range opts { + opt(servicePort) + } + return &SingleEndpointParser{name: name, svcPort: servicePort, failSilently: true} +} diff --git a/internal/components/single_endpoint_test.go b/internal/components/single_endpoint_test.go index b0efdb1c90..11d9121f54 100644 --- a/internal/components/single_endpoint_test.go +++ b/internal/components/single_endpoint_test.go @@ -284,7 +284,117 @@ func TestSingleEndpointParser_Ports(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := components.NewSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) - got, err := s.Ports(logr.Discard(), tt.args.config) + got, err := s.Ports(logr.Discard(), tt.fields.name, tt.args.config) + if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { + return + } + assert.ElementsMatchf(t, tt.want, got, "Ports(%v)", tt.args.config) + }) + } +} + +func TestNewSilentSinglePortParser_Ports(t *testing.T) { + type fields struct { + name string + port int32 + opts []components.PortBuilderOption + } + type args struct { + config interface{} + } + tests := []struct { + name string + fields fields + args args + want []corev1.ServicePort + wantErr assert.ErrorAssertionFunc + }{ + { + name: "ValidConfigWithPort", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: map[string]interface{}{ + "port": 8080, + }, + }, + want: []corev1.ServicePort{ + {Name: "testparser", Port: 8080}, + }, + wantErr: assert.NoError, + }, + { + name: "ValidConfigWithDefaultPort", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: map[string]interface{}{}, + }, + want: []corev1.ServicePort{ + {Name: "testparser", Port: 8080}, + }, + wantErr: assert.NoError, + }, + { + name: "ConfigWithFixins", + fields: fields{ + name: "testparser", + port: 8080, + opts: []components.PortBuilderOption{ + components.WithTargetPort(4317), + components.WithProtocol(corev1.ProtocolTCP), + components.WithAppProtocol(&components.GrpcProtocol), + }, + }, + args: args{ + config: map[string]interface{}{}, + }, + want: []corev1.ServicePort{ + { + Name: "testparser", + Port: 8080, + TargetPort: intstr.FromInt32(4317), + Protocol: corev1.ProtocolTCP, + AppProtocol: &components.GrpcProtocol, + }, + }, + wantErr: assert.NoError, + }, + { + name: "InvalidConfigMissingPort", + fields: fields{ + name: "testparser", + port: 0, + }, + args: args{ + config: map[string]interface{}{ + "endpoint": "garbageeeee", + }, + }, + want: nil, + wantErr: assert.NoError, + }, + { + name: "ErrorParsingConfig", + fields: fields{ + name: "testparser", + port: 8080, + }, + args: args{ + config: "invalid config", + }, + want: nil, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := components.NewSilentSinglePortParser(tt.fields.name, tt.fields.port, tt.fields.opts...) + got, err := s.Ports(logr.Discard(), tt.fields.name, tt.args.config) if !tt.wantErr(t, err, fmt.Sprintf("Ports(%v)", tt.args.config)) { return } diff --git a/internal/manifests/collector/adapters/config_to_ports.go b/internal/manifests/collector/adapters/config_to_ports.go index cb960c423b..444eb0f9fe 100644 --- a/internal/manifests/collector/adapters/config_to_ports.go +++ b/internal/manifests/collector/adapters/config_to_ports.go @@ -14,18 +14,6 @@ package adapters -import ( - "fmt" - "sort" - - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser" - exporterParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/exporter" - receiverParser "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver" -) - type ComponentType int const ( @@ -37,105 +25,3 @@ const ( func (c ComponentType) String() string { return [...]string{"receiver", "exporter", "processor"}[c] } - -// ConfigToComponentPorts converts the incoming configuration object into a set of service ports required by the exporters. -func ConfigToComponentPorts(logger logr.Logger, cType ComponentType, config map[interface{}]interface{}) ([]corev1.ServicePort, error) { - // now, we gather which ports we might need to open - // for that, we get all the exporters and check their `endpoint` properties, - // extracting the port from it. The port name has to be a "DNS_LABEL", so, we try to make it follow the pattern: - // ${instance.Name}-${exporter.name}-${exporter.qualifier} - // the exporter-name is typically the node name from the exporters map - // the exporter-qualifier is what comes after the slash in the exporter name, but typically nil - // examples: - // ```yaml - // components: - // componentexample: - // endpoint: 0.0.0.0:12345 - // componentexample/settings: - // endpoint: 0.0.0.0:12346 - // in this case, we have 2 ports, named: "componentexample" and "componentexample-settings" - componentsProperty, ok := config[fmt.Sprintf("%ss", cType.String())] - if !ok { - return nil, fmt.Errorf("no %ss available as part of the configuration", cType) - } - - components, ok := componentsProperty.(map[interface{}]interface{}) - if !ok { - return nil, fmt.Errorf("%ss doesn't contain valid components", cType.String()) - } - - compEnabled := getEnabledComponents(config, cType) - - if compEnabled == nil { - return nil, fmt.Errorf("no enabled %ss available as part of the configuration", cType) - } - - ports := []corev1.ServicePort{} - for key, val := range components { - // This check will pass only the enabled components, - // then only the related ports will be opened. - if !compEnabled[key] { - continue - } - exporter, ok := val.(map[interface{}]interface{}) - if !ok { - logger.V(2).Info("component doesn't seem to be a map of properties", cType.String(), key) - exporter = map[interface{}]interface{}{} - } - - cmptName := key.(string) - var cmptParser parser.ComponentPortParser - var err error - switch cType { - case ComponentTypeExporter: - cmptParser, err = exporterParser.For(logger, cmptName, exporter) - case ComponentTypeReceiver: - cmptParser, err = receiverParser.For(logger, cmptName, exporter) - case ComponentTypeProcessor: - logger.V(4).Info("processors don't provide a way to enable associated ports", "name", key) - } - - if err != nil { - logger.V(2).Info("no parser found for", "component", cmptName) - continue - } - - exprtPorts, err := cmptParser.Ports() - if err != nil { - logger.Error(err, "parser for '%s' has returned an error: %w", cmptName, err) - continue - } - - if len(exprtPorts) > 0 { - ports = append(ports, exprtPorts...) - } - } - - sort.Slice(ports, func(i, j int) bool { - return ports[i].Name < ports[j].Name - }) - - return ports, nil -} - -func ConfigToPorts(logger logr.Logger, config map[interface{}]interface{}) ([]corev1.ServicePort, error) { - ports, err := ConfigToComponentPorts(logger, ComponentTypeReceiver, config) - if err != nil { - logger.Error(err, "there was a problem while getting the ports from the receivers") - return nil, err - } - - exporterPorts, err := ConfigToComponentPorts(logger, ComponentTypeExporter, config) - if err != nil { - logger.Error(err, "there was a problem while getting the ports from the exporters") - return nil, err - } - - ports = append(ports, exporterPorts...) - - sort.Slice(ports, func(i, j int) bool { - return ports[i].Name < ports[j].Name - }) - - return ports, nil -} diff --git a/internal/manifests/collector/adapters/config_to_ports_test.go b/internal/manifests/collector/adapters/config_to_ports_test.go deleted file mode 100644 index a53a695e3b..0000000000 --- a/internal/manifests/collector/adapters/config_to_ports_test.go +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package adapters_test - -import ( - "errors" - "testing" - - "github.com/go-logr/logr" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/intstr" - logf "sigs.k8s.io/controller-runtime/pkg/log" - - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters" - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser" - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/parser/receiver" -) - -var logger = logf.Log.WithName("unit-tests") - -var portConfigStr = `receivers: - examplereceiver: - endpoint: "0.0.0.0:12345" - examplereceiver/settings: - endpoint: "0.0.0.0:12346" - examplereceiver/invalid-ignored: - endpoint: "0.0.0.0" - examplereceiver/invalid-not-number: - endpoint: "0.0.0.0:not-number" - examplereceiver/without-endpoint: - notendpoint: "0.0.0.0:12347" - jaeger: - protocols: - grpc: - thrift_compact: - thrift_binary: - endpoint: 0.0.0.0:6833 - jaeger/custom: - protocols: - thrift_http: - endpoint: 0.0.0.0:15268 - otlp: - protocols: - grpc: - http: - otlp/2: - protocols: - grpc: - endpoint: 0.0.0.0:55555 - zipkin: - zipkin/2: - endpoint: 0.0.0.0:33333 -service: - pipelines: - metrics: - receivers: [examplereceiver, examplereceiver/settings] - exporters: [debug] - metrics/1: - receivers: [jaeger, jaeger/custom] - exporters: [debug] - metrics/2: - receivers: [otlp, otlp/2, zipkin] - exporters: [debug] -` - -func TestExtractPortsFromConfig(t *testing.T) { - // prepare - config, err := adapters.ConfigFromString(portConfigStr) - require.NoError(t, err) - require.NotEmpty(t, config) - - // test - ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config) - assert.NoError(t, err) - assert.Len(t, ports, 10) - - // verify - httpAppProtocol := "http" - grpcAppProtocol := "grpc" - targetPortZero := intstr.IntOrString{Type: 0, IntVal: 0, StrVal: ""} - targetPort4317 := intstr.IntOrString{Type: 0, IntVal: 4317, StrVal: ""} - targetPort4318 := intstr.IntOrString{Type: 0, IntVal: 4318, StrVal: ""} - - expectedPorts := []corev1.ServicePort{ - {Name: "examplereceiver", Port: 12345}, - {Name: "port-12346", Port: 12346}, - {Name: "port-15268", AppProtocol: &httpAppProtocol, Protocol: "TCP", Port: 15268, TargetPort: targetPortZero}, - {Name: "jaeger-grpc", AppProtocol: &grpcAppProtocol, Protocol: "TCP", Port: 14250}, - {Name: "port-6833", Protocol: "UDP", Port: 6833}, - {Name: "port-6831", Protocol: "UDP", Port: 6831}, - {Name: "otlp-2-grpc", AppProtocol: &grpcAppProtocol, Protocol: "TCP", Port: 55555}, - {Name: "otlp-grpc", AppProtocol: &grpcAppProtocol, Port: 4317, TargetPort: targetPort4317}, - {Name: "otlp-http", AppProtocol: &httpAppProtocol, Port: 4318, TargetPort: targetPort4318}, - {Name: "zipkin", AppProtocol: &httpAppProtocol, Protocol: "TCP", Port: 9411}, - } - assert.ElementsMatch(t, expectedPorts, ports) -} - -func TestNoPortsParsed(t *testing.T) { - for _, tt := range []struct { - expected error - desc string - configStr string - }{ - { - expected: errors.New("no receivers available as part of the configuration"), - desc: "empty", - configStr: "", - }, - { - expected: errors.New("receivers doesn't contain valid components"), - desc: "not a map", - configStr: "receivers: some-string", - }, - } { - t.Run(tt.desc, func(t *testing.T) { - // prepare - config, err := adapters.ConfigFromString(tt.configStr) - require.NoError(t, err) - - // test - ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config) - - // verify - assert.Nil(t, ports) - assert.Equal(t, tt.expected, err) - }) - } -} - -func TestInvalidReceivers(t *testing.T) { - for _, tt := range []struct { - desc string - configStr string - }{ - { - "receiver isn't a map", - "receivers:\n some-receiver: string\nservice:\n pipelines:\n metrics:\n receivers: [some-receiver]", - }, - { - "receiver's endpoint isn't string", - "receivers:\n some-receiver:\n endpoint: 123\nservice:\n pipelines:\n metrics:\n receivers: [some-receiver]", - }, - } { - t.Run(tt.desc, func(t *testing.T) { - // prepare - config, err := adapters.ConfigFromString(tt.configStr) - require.NoError(t, err) - - // test - ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config) - - // verify - assert.NoError(t, err) - assert.Len(t, ports, 0) - }) - } -} - -func TestParserFailed(t *testing.T) { - // prepare - mockParserCalled := false - mockParser := &mockParser{ - portsFunc: func() ([]corev1.ServicePort, error) { - mockParserCalled = true - return nil, errors.New("mocked error") - }, - } - receiver.Register("mock", func(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser { - return mockParser - }) - - config := map[interface{}]interface{}{ - "receivers": map[interface{}]interface{}{ - "mock": map[string]interface{}{}, - }, - "service": map[interface{}]interface{}{ - "pipelines": map[interface{}]interface{}{ - "metrics": map[interface{}]interface{}{ - "receivers": []interface{}{"mock"}, - }, - }, - }, - } - - // test - ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, config) - - // verify - assert.Len(t, ports, 0) - assert.NoError(t, err) - assert.True(t, mockParserCalled) -} - -type mockParser struct { - portsFunc func() ([]corev1.ServicePort, error) -} - -func (m *mockParser) Ports() ([]corev1.ServicePort, error) { - if m.portsFunc != nil { - return m.portsFunc() - } - - return nil, nil -} - -func (m *mockParser) ParserName() string { - return "__mock-adapters" -} diff --git a/internal/manifests/collector/container.go b/internal/manifests/collector/container.go index e0ed0babb7..de53314ffb 100644 --- a/internal/manifests/collector/container.go +++ b/internal/manifests/collector/container.go @@ -50,7 +50,7 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1beta1.OpenTeleme } // build container ports from service ports - ports, err := getConfigContainerPorts(logger, configYaml, otelcol.Spec.Config) + ports, err := getConfigContainerPorts(logger, otelcol.Spec.Config) if err != nil { logger.Error(err, "container ports config") } @@ -204,14 +204,9 @@ func Container(cfg config.Config, logger logr.Logger, otelcol v1beta1.OpenTeleme } } -func getConfigContainerPorts(logger logr.Logger, cfgYaml string, conf v1beta1.Config) (map[string]corev1.ContainerPort, error) { +func getConfigContainerPorts(logger logr.Logger, conf v1beta1.Config) (map[string]corev1.ContainerPort, error) { ports := map[string]corev1.ContainerPort{} - c, err := adapters.ConfigFromString(cfgYaml) - if err != nil { - logger.Error(err, "couldn't extract the configuration") - return ports, err - } - ps, err := adapters.ConfigToPorts(logger, c) + ps, err := conf.GetAllPorts(logger) if err != nil { return ports, err } diff --git a/internal/manifests/collector/ingress.go b/internal/manifests/collector/ingress.go index e646a5446d..aaddac4181 100644 --- a/internal/manifests/collector/ingress.go +++ b/internal/manifests/collector/ingress.go @@ -24,7 +24,6 @@ import ( "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/internal/manifests" - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters" "github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) @@ -136,17 +135,7 @@ func createSubdomainIngressRules(otelcol string, hostname string, ports []corev1 } func servicePortsFromCfg(logger logr.Logger, otelcol v1beta1.OpenTelemetryCollector) ([]corev1.ServicePort, error) { - out, err := otelcol.Spec.Config.Yaml() - if err != nil { - return nil, err - } - configFromString, err := adapters.ConfigFromString(out) - if err != nil { - logger.Error(err, "couldn't extract the configuration from the context") - return nil, err - } - - ports, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeReceiver, configFromString) + ports, err := otelcol.Spec.Config.GetReceiverPorts(logger) if err != nil { logger.Error(err, "couldn't build the ingress for this instance") return nil, err diff --git a/internal/manifests/collector/podmonitor.go b/internal/manifests/collector/podmonitor.go index 761f7d307c..74e8961cf4 100644 --- a/internal/manifests/collector/podmonitor.go +++ b/internal/manifests/collector/podmonitor.go @@ -24,7 +24,6 @@ import ( "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/internal/autodetect/prometheus" "github.com/open-telemetry/opentelemetry-operator/internal/manifests" - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters" "github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) @@ -66,18 +65,7 @@ func PodMonitor(params manifests.Params) (*monitoringv1.PodMonitor, error) { } func metricsEndpointsFromConfig(logger logr.Logger, otelcol v1beta1.OpenTelemetryCollector) []monitoringv1.PodMetricsEndpoint { - // TODO: https://github.com/open-telemetry/opentelemetry-operator/issues/2603 - cfgStr, err := otelcol.Spec.Config.Yaml() - if err != nil { - logger.V(2).Error(err, "Error while marshaling to YAML") - return []monitoringv1.PodMetricsEndpoint{} - } - config, err := adapters.ConfigFromString(cfgStr) - if err != nil { - logger.V(2).Error(err, "Error while parsing the configuration") - return []monitoringv1.PodMetricsEndpoint{} - } - exporterPorts, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeExporter, config) + exporterPorts, err := otelcol.Spec.Config.GetExporterPorts(logger) if err != nil { logger.Error(err, "couldn't build endpoints to podMonitors from configuration") return []monitoringv1.PodMetricsEndpoint{} diff --git a/internal/manifests/collector/service.go b/internal/manifests/collector/service.go index d66e4bfe99..2cf7616732 100644 --- a/internal/manifests/collector/service.go +++ b/internal/manifests/collector/service.go @@ -24,7 +24,6 @@ import ( "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/internal/manifests" - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters" "github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) @@ -107,18 +106,7 @@ func Service(params manifests.Params) (*corev1.Service, error) { labels := manifestutils.Labels(params.OtelCol.ObjectMeta, name, params.OtelCol.Spec.Image, ComponentOpenTelemetryCollector, []string{}) labels[serviceTypeLabel] = BaseServiceType.String() - out, err := params.OtelCol.Spec.Config.Yaml() - if err != nil { - return nil, err - } - - configFromString, err := adapters.ConfigFromString(out) - if err != nil { - params.Log.Error(err, "couldn't extract the configuration from the context") - return nil, err - } - - ports, err := adapters.ConfigToPorts(params.Log, configFromString) + ports, err := params.OtelCol.Spec.Config.GetAllPorts(params.Log) if err != nil { return nil, err } diff --git a/internal/manifests/collector/servicemonitor.go b/internal/manifests/collector/servicemonitor.go index 2c1088f44c..c3950dce3b 100644 --- a/internal/manifests/collector/servicemonitor.go +++ b/internal/manifests/collector/servicemonitor.go @@ -25,7 +25,6 @@ import ( "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/internal/autodetect/prometheus" "github.com/open-telemetry/opentelemetry-operator/internal/manifests" - "github.com/open-telemetry/opentelemetry-operator/internal/manifests/collector/adapters" "github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils" "github.com/open-telemetry/opentelemetry-operator/internal/naming" ) @@ -105,19 +104,7 @@ func shouldCreateServiceMonitor(params manifests.Params) bool { } func endpointsFromConfig(logger logr.Logger, otelcol v1beta1.OpenTelemetryCollector) []monitoringv1.Endpoint { - // TODO: https://github.com/open-telemetry/opentelemetry-operator/issues/2603 - cfgStr, err := otelcol.Spec.Config.Yaml() - if err != nil { - logger.V(2).Error(err, "Error while marshaling to YAML") - return []monitoringv1.Endpoint{} - } - c, err := adapters.ConfigFromString(cfgStr) - if err != nil { - logger.V(2).Error(err, "Error while parsing the configuration") - return []monitoringv1.Endpoint{} - } - - exporterPorts, err := adapters.ConfigToComponentPorts(logger, adapters.ComponentTypeExporter, c) + exporterPorts, err := otelcol.Spec.Config.GetExporterPorts(logger) if err != nil { logger.Error(err, "couldn't build service monitors from configuration") return []monitoringv1.Endpoint{}