diff --git a/.chloggen/1931-allow-customize-monitoring-port.yaml b/.chloggen/1931-allow-customize-monitoring-port.yaml new file mode 100755 index 0000000000..3f214cf716 --- /dev/null +++ b/.chloggen/1931-allow-customize-monitoring-port.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action) +component: operator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: When an user specifies the monitoring port for their collector in the configuration, the monitoring service uses that port. + +# One or more tracking issues related to the change +issues: [1931] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/pkg/collector/adapters/config_to_ports.go b/pkg/collector/adapters/config_to_ports.go index c897bec832..55a165a14a 100644 --- a/pkg/collector/adapters/config_to_ports.go +++ b/pkg/collector/adapters/config_to_ports.go @@ -16,9 +16,12 @@ package adapters import ( "errors" + "net" "sort" + "strconv" "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" "github.com/open-telemetry/opentelemetry-operator/pkg/collector/parser" @@ -97,3 +100,37 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{ return ports, nil } + +// ConfigToMetricsPort gets the port number for the metrics endpoint from the collector config if it has been set. +func ConfigToMetricsPort(logger logr.Logger, config map[interface{}]interface{}) (int32, error) { + // we don't need to unmarshal the whole config, just follow the keys down to + // the metrics address. + type metricsCfg struct { + Address string + } + type telemetryCfg struct { + Metrics metricsCfg + } + type serviceCfg struct { + Telemetry telemetryCfg + } + type cfg struct { + Service serviceCfg + } + var cOut cfg + err := mapstructure.Decode(config, &cOut) + if err != nil { + return 0, err + } + + _, port, err := net.SplitHostPort(cOut.Service.Telemetry.Metrics.Address) + if err != nil { + return 0, err + } + i64, err := strconv.ParseInt(port, 10, 32) + if err != nil { + return 0, err + } + + return int32(i64), nil +} diff --git a/pkg/collector/adapters/config_to_ports_test.go b/pkg/collector/adapters/config_to_ports_test.go index fbc6989552..09ead0cd86 100644 --- a/pkg/collector/adapters/config_to_ports_test.go +++ b/pkg/collector/adapters/config_to_ports_test.go @@ -206,6 +206,73 @@ func TestParserFailed(t *testing.T) { assert.True(t, mockParserCalled) } +func TestConfigToMetricsPort(t *testing.T) { + t.Run("custom port specified", func(t *testing.T) { + config := map[interface{}]interface{}{ + "service": map[interface{}]interface{}{ + "telemetry": map[interface{}]interface{}{ + "metrics": map[interface{}]interface{}{ + "address": "0.0.0.0:9090", + }, + }, + }, + } + + port, err := adapters.ConfigToMetricsPort(logger, config) + assert.NoError(t, err) + assert.Equal(t, int32(9090), port) + }) + + for _, tt := range []struct { + desc string + config map[interface{}]interface{} + }{ + { + "bad address", + map[interface{}]interface{}{ + "service": map[interface{}]interface{}{ + "telemetry": map[interface{}]interface{}{ + "metrics": map[interface{}]interface{}{ + "address": "0.0.0.0", + }, + }, + }, + }, + }, + { + "missing address", + map[interface{}]interface{}{ + "service": map[interface{}]interface{}{ + "telemetry": map[interface{}]interface{}{ + "metrics": map[interface{}]interface{}{ + "level": "detailed", + }, + }, + }, + }, + }, + { + "missing metrics", + map[interface{}]interface{}{ + "service": map[interface{}]interface{}{ + "telemetry": map[interface{}]interface{}{}, + }, + }, + }, + { + "missing telemetry", + map[interface{}]interface{}{ + "service": map[interface{}]interface{}{}, + }, + }, + } { + t.Run(tt.desc, func(t *testing.T) { + _, err := adapters.ConfigToMetricsPort(logger, tt.config) + assert.Error(t, err) + }) + } +} + type mockParser struct { portsFunc func() ([]corev1.ServicePort, error) } diff --git a/pkg/collector/container.go b/pkg/collector/container.go index 1562022093..716913b0b0 100644 --- a/pkg/collector/container.go +++ b/pkg/collector/container.go @@ -16,12 +16,9 @@ package collector import ( "fmt" - "net" "sort" - "strconv" "github.com/go-logr/logr" - "github.com/mitchellh/mapstructure" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/validation" @@ -176,7 +173,7 @@ func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.C } } - metricsPort, err := getMetricsPort(c) + metricsPort, err := adapters.ConfigToMetricsPort(logger, c) if err != nil { logger.Info("couldn't determine metrics port from configuration, using 8888 default value", "error", err) metricsPort = 8888 @@ -189,40 +186,6 @@ func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.C return ports } -// getMetricsPort gets the port number for the metrics endpoint from the collector config if it has been set. -func getMetricsPort(c map[interface{}]interface{}) (int32, error) { - // we don't need to unmarshal the whole config, just follow the keys down to - // the metrics address. - type metricsCfg struct { - Address string - } - type telemetryCfg struct { - Metrics metricsCfg - } - type serviceCfg struct { - Telemetry telemetryCfg - } - type cfg struct { - Service serviceCfg - } - var cOut cfg - err := mapstructure.Decode(c, &cOut) - if err != nil { - return 0, err - } - - _, port, err := net.SplitHostPort(cOut.Service.Telemetry.Metrics.Address) - if err != nil { - return 0, err - } - i64, err := strconv.ParseInt(port, 10, 32) - if err != nil { - return 0, err - } - - return int32(i64), nil -} - func portMapToList(portMap map[string]corev1.ContainerPort) []corev1.ContainerPort { ports := make([]corev1.ContainerPort, 0, len(portMap)) for _, p := range portMap { diff --git a/pkg/collector/reconcile/service.go b/pkg/collector/reconcile/service.go index ac471993ec..715f892d56 100644 --- a/pkg/collector/reconcile/service.go +++ b/pkg/collector/reconcile/service.go @@ -184,6 +184,18 @@ func monitoringService(ctx context.Context, params Params) *corev1.Service { name := naming.MonitoringService(params.Instance) labels := collector.Labels(params.Instance, name, []string{}) + c, err := adapters.ConfigFromString(params.Instance.Spec.Config) + if err != nil { + params.Log.Error(err, "couldn't extract the configuration") + return nil + } + + metricsPort, err := adapters.ConfigToMetricsPort(params.Log, c) + if err != nil { + params.Log.V(2).Info("couldn't determine metrics port from configuration, using 8888 default value", "error", err) + metricsPort = 8888 + } + return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -196,7 +208,7 @@ func monitoringService(ctx context.Context, params Params) *corev1.Service { ClusterIP: "", Ports: []corev1.ServicePort{{ Name: "monitoring", - Port: 8888, + Port: metricsPort, }}, }, } diff --git a/pkg/collector/reconcile/service_test.go b/pkg/collector/reconcile/service_test.go index 17cac54159..18753982b9 100644 --- a/pkg/collector/reconcile/service_test.go +++ b/pkg/collector/reconcile/service_test.go @@ -235,14 +235,29 @@ func TestHeadlessService(t *testing.T) { } func TestMonitoringService(t *testing.T) { - t.Run("returned service should expose monitoring port", func(t *testing.T) { + t.Run("returned service should expose monitoring port in the default port", func(t *testing.T) { expected := []v1.ServicePort{{ Name: "monitoring", Port: 8888, }} actual := monitoringService(context.Background(), params()) assert.Equal(t, expected, actual.Spec.Ports) + }) + t.Run("returned the service in a custom port", func(t *testing.T) { + expected := []v1.ServicePort{{ + Name: "monitoring", + Port: 9090, + }} + params := params() + params.Instance.Spec.Config = `service: + telemetry: + metrics: + level: detailed + address: 0.0.0.0:9090` + actual := monitoringService(context.Background(), params) + assert.NotNil(t, actual) + assert.Equal(t, expected, actual.Spec.Ports) }) }