Skip to content

Commit

Permalink
Expose the monitoring port depending on the configuration (#1950)
Browse files Browse the repository at this point in the history
* Create the monitoring service using the port specified by the user

Signed-off-by: Israel Blancas <[email protected]>

* Add missing changelog

Signed-off-by: Israel Blancas <[email protected]>

---------

Signed-off-by: Israel Blancas <[email protected]>
  • Loading branch information
iblancasa committed Jul 24, 2023
1 parent 55d4ed9 commit 9e7fe16
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 40 deletions.
16 changes: 16 additions & 0 deletions .chloggen/1931-allow-customize-monitoring-port.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: operator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: When an user specifies the monitoring port for their collector in the configuration, the monitoring service uses that port.

# One or more tracking issues related to the change
issues: [1931]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
37 changes: 37 additions & 0 deletions pkg/collector/adapters/config_to_ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package adapters

import (
"errors"
"net"
"sort"
"strconv"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/pkg/collector/parser"
Expand Down Expand Up @@ -97,3 +100,37 @@ func ConfigToReceiverPorts(logger logr.Logger, config map[interface{}]interface{

return ports, nil
}

// ConfigToMetricsPort gets the port number for the metrics endpoint from the collector config if it has been set.
func ConfigToMetricsPort(logger logr.Logger, config map[interface{}]interface{}) (int32, error) {
// we don't need to unmarshal the whole config, just follow the keys down to
// the metrics address.
type metricsCfg struct {
Address string
}
type telemetryCfg struct {
Metrics metricsCfg
}
type serviceCfg struct {
Telemetry telemetryCfg
}
type cfg struct {
Service serviceCfg
}
var cOut cfg
err := mapstructure.Decode(config, &cOut)
if err != nil {
return 0, err
}

_, port, err := net.SplitHostPort(cOut.Service.Telemetry.Metrics.Address)
if err != nil {
return 0, err
}
i64, err := strconv.ParseInt(port, 10, 32)
if err != nil {
return 0, err
}

return int32(i64), nil
}
67 changes: 67 additions & 0 deletions pkg/collector/adapters/config_to_ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,73 @@ func TestParserFailed(t *testing.T) {
assert.True(t, mockParserCalled)
}

func TestConfigToMetricsPort(t *testing.T) {
t.Run("custom port specified", func(t *testing.T) {
config := map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{
"metrics": map[interface{}]interface{}{
"address": "0.0.0.0:9090",
},
},
},
}

port, err := adapters.ConfigToMetricsPort(logger, config)
assert.NoError(t, err)
assert.Equal(t, int32(9090), port)
})

for _, tt := range []struct {
desc string
config map[interface{}]interface{}
}{
{
"bad address",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{
"metrics": map[interface{}]interface{}{
"address": "0.0.0.0",
},
},
},
},
},
{
"missing address",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{
"metrics": map[interface{}]interface{}{
"level": "detailed",
},
},
},
},
},
{
"missing metrics",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{
"telemetry": map[interface{}]interface{}{},
},
},
},
{
"missing telemetry",
map[interface{}]interface{}{
"service": map[interface{}]interface{}{},
},
},
} {
t.Run(tt.desc, func(t *testing.T) {
_, err := adapters.ConfigToMetricsPort(logger, tt.config)
assert.Error(t, err)
})
}
}

type mockParser struct {
portsFunc func() ([]corev1.ServicePort, error)
}
Expand Down
39 changes: 1 addition & 38 deletions pkg/collector/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ package collector

import (
"fmt"
"net"
"sort"
"strconv"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation"

Expand Down Expand Up @@ -176,7 +173,7 @@ func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.C
}
}

metricsPort, err := getMetricsPort(c)
metricsPort, err := adapters.ConfigToMetricsPort(logger, c)
if err != nil {
logger.Info("couldn't determine metrics port from configuration, using 8888 default value", "error", err)
metricsPort = 8888
Expand All @@ -189,40 +186,6 @@ func getConfigContainerPorts(logger logr.Logger, cfg string) map[string]corev1.C
return ports
}

// getMetricsPort gets the port number for the metrics endpoint from the collector config if it has been set.
func getMetricsPort(c map[interface{}]interface{}) (int32, error) {
// we don't need to unmarshal the whole config, just follow the keys down to
// the metrics address.
type metricsCfg struct {
Address string
}
type telemetryCfg struct {
Metrics metricsCfg
}
type serviceCfg struct {
Telemetry telemetryCfg
}
type cfg struct {
Service serviceCfg
}
var cOut cfg
err := mapstructure.Decode(c, &cOut)
if err != nil {
return 0, err
}

_, port, err := net.SplitHostPort(cOut.Service.Telemetry.Metrics.Address)
if err != nil {
return 0, err
}
i64, err := strconv.ParseInt(port, 10, 32)
if err != nil {
return 0, err
}

return int32(i64), nil
}

func portMapToList(portMap map[string]corev1.ContainerPort) []corev1.ContainerPort {
ports := make([]corev1.ContainerPort, 0, len(portMap))
for _, p := range portMap {
Expand Down
14 changes: 13 additions & 1 deletion pkg/collector/reconcile/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ func monitoringService(ctx context.Context, params Params) *corev1.Service {
name := naming.MonitoringService(params.Instance)
labels := collector.Labels(params.Instance, name, []string{})

c, err := adapters.ConfigFromString(params.Instance.Spec.Config)
if err != nil {
params.Log.Error(err, "couldn't extract the configuration")
return nil
}

metricsPort, err := adapters.ConfigToMetricsPort(params.Log, c)
if err != nil {
params.Log.V(2).Info("couldn't determine metrics port from configuration, using 8888 default value", "error", err)
metricsPort = 8888
}

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -196,7 +208,7 @@ func monitoringService(ctx context.Context, params Params) *corev1.Service {
ClusterIP: "",
Ports: []corev1.ServicePort{{
Name: "monitoring",
Port: 8888,
Port: metricsPort,
}},
},
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/collector/reconcile/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,29 @@ func TestHeadlessService(t *testing.T) {
}

func TestMonitoringService(t *testing.T) {
t.Run("returned service should expose monitoring port", func(t *testing.T) {
t.Run("returned service should expose monitoring port in the default port", func(t *testing.T) {
expected := []v1.ServicePort{{
Name: "monitoring",
Port: 8888,
}}
actual := monitoringService(context.Background(), params())
assert.Equal(t, expected, actual.Spec.Ports)
})

t.Run("returned the service in a custom port", func(t *testing.T) {
expected := []v1.ServicePort{{
Name: "monitoring",
Port: 9090,
}}
params := params()
params.Instance.Spec.Config = `service:
telemetry:
metrics:
level: detailed
address: 0.0.0.0:9090`
actual := monitoringService(context.Background(), params)
assert.NotNil(t, actual)
assert.Equal(t, expected, actual.Spec.Ports)
})
}

Expand Down

0 comments on commit 9e7fe16

Please sign in to comment.