Skip to content

Commit

Permalink
Wire up for eventhandler integration in the static to flow converter (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
erikbaranowski authored Jan 4, 2024
1 parent 900a035 commit cce5b03
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 6 deletions.
7 changes: 5 additions & 2 deletions converter/internal/staticconvert/internal/build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import (
app_agent_receiver_v2 "github.com/grafana/agent/pkg/integrations/v2/app_agent_receiver"
blackbox_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/blackbox_exporter"
common_v2 "github.com/grafana/agent/pkg/integrations/v2/common"
"github.com/grafana/agent/pkg/integrations/v2/metricsutils"
eventhandler_v2 "github.com/grafana/agent/pkg/integrations/v2/eventhandler"
metricsutils_v2 "github.com/grafana/agent/pkg/integrations/v2/metricsutils"
snmp_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/snmp_exporter"
vmware_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/vmware_exporter"
"github.com/grafana/agent/pkg/integrations/windows_exporter"
Expand Down Expand Up @@ -229,13 +230,15 @@ func (b *IntegrationsConfigBuilder) appendV2Integrations() {
case *blackbox_exporter_v2.Config:
exports = b.appendBlackboxExporterV2(itg)
commonConfig = itg.Common
case *eventhandler_v2.Config:
b.appendEventHandlerV2(itg)
case *snmp_exporter_v2.Config:
exports = b.appendSnmpExporterV2(itg)
commonConfig = itg.Common
case *vmware_exporter_v2.Config:
exports = b.appendVmwareExporterV2(itg)
commonConfig = itg.Common
case *metricsutils.ConfigShim:
case *metricsutils_v2.ConfigShim:
commonConfig = itg.Common
switch v1_itg := itg.Orig.(type) {
case *azure_exporter.Config:
Expand Down
98 changes: 98 additions & 0 deletions converter/internal/staticconvert/internal/build/eventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package build

import (
"fmt"

"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/relabel"
"github.com/grafana/agent/component/loki/source/kubernetes_events"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
eventhandler_v2 "github.com/grafana/agent/pkg/integrations/v2/eventhandler"
"github.com/grafana/river/scanner"
)

func (b *IntegrationsConfigBuilder) appendEventHandlerV2(config *eventhandler_v2.Config) {
compLabel, err := scanner.SanitizeIdentifier(b.formatJobName(config.Name(), nil))
if err != nil {
b.diags.Add(diag.SeverityLevelCritical, fmt.Sprintf("failed to sanitize job name: %s", err))
}

b.diags.AddAll(common.ValidateSupported(common.NotDeepEquals, config.SendTimeout, eventhandler_v2.DefaultConfig.SendTimeout, "eventhandler send_timeout", "this field is not configurable in flow mode"))
b.diags.AddAll(common.ValidateSupported(common.NotDeepEquals, config.CachePath, eventhandler_v2.DefaultConfig.CachePath, "eventhandler cache_path", "this field is not configurable in flow mode"))
b.diags.AddAll(common.ValidateSupported(common.NotDeepEquals, config.InformerResync, eventhandler_v2.DefaultConfig.InformerResync, "eventhandler informer_resync", "this field is not configurable in flow mode"))
b.diags.AddAll(common.ValidateSupported(common.NotDeepEquals, config.FlushInterval, eventhandler_v2.DefaultConfig.FlushInterval, "eventhandler flush_interval", "this field is not configurable in flow mode"))

receiver := getLogsReceiver(config)
if len(config.ExtraLabels) > 0 {
receiver = b.injectExtraLabels(config, receiver, compLabel)
}

args := toEventHandlerV2(config, receiver)

b.f.Body().AppendBlock(common.NewBlockWithOverride(
[]string{"loki", "source", "kubernetes_events"},
compLabel,
args,
))
}

func (b *IntegrationsConfigBuilder) injectExtraLabels(config *eventhandler_v2.Config, receiver common.ConvertLogsReceiver, compLabel string) common.ConvertLogsReceiver {
var relabelConfigs []*flow_relabel.Config
for _, extraLabel := range config.ExtraLabels {
defaultConfig := flow_relabel.DefaultRelabelConfig
relabelConfig := &defaultConfig
relabelConfig.SourceLabels = []string{"__address__"}
relabelConfig.TargetLabel = extraLabel.Name
relabelConfig.Replacement = extraLabel.Value

relabelConfigs = append(relabelConfigs, relabelConfig)
}

relabelArgs := relabel.Arguments{
ForwardTo: []loki.LogsReceiver{receiver},
RelabelConfigs: relabelConfigs,
MaxCacheSize: relabel.DefaultArguments.MaxCacheSize,
}

b.f.Body().AppendBlock(common.NewBlockWithOverride(
[]string{"loki", "relabel"},
compLabel,
relabelArgs,
))

return common.ConvertLogsReceiver{
Expr: fmt.Sprintf("loki.relabel.%s.receiver", compLabel),
}
}

func getLogsReceiver(config *eventhandler_v2.Config) common.ConvertLogsReceiver {
logsReceiver := common.ConvertLogsReceiver{}
if config.LogsInstance != "" {
compLabel, err := scanner.SanitizeIdentifier("logs_" + config.LogsInstance)
if err != nil {
panic(fmt.Errorf("failed to sanitize job name: %s", err))
}

logsReceiver.Expr = fmt.Sprintf("loki.write.%s.receiver", compLabel)
}

return logsReceiver
}

func toEventHandlerV2(config *eventhandler_v2.Config, receiver common.ConvertLogsReceiver) *kubernetes_events.Arguments {
defaultOverrides := kubernetes_events.DefaultArguments
defaultOverrides.Client.KubeConfig = config.KubeconfigPath
if config.Namespace != "" {
defaultOverrides.Namespaces = []string{config.Namespace}
}

return &kubernetes_events.Arguments{
ForwardTo: []loki.LogsReceiver{receiver},
JobName: kubernetes_events.DefaultArguments.JobName,
Namespaces: defaultOverrides.Namespaces,
LogFormat: config.LogFormat,
Client: defaultOverrides.Client,
}
}
20 changes: 20 additions & 0 deletions converter/internal/staticconvert/testdata-v2/integrations_v2.river
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@ logging {
format = "json"
}

loki.relabel "integrations_eventhandler" {
forward_to = [loki.write.logs_log_config.receiver]

rule {
source_labels = ["__address__"]
target_label = "test_label"
replacement = "test_label_value"
}

rule {
source_labels = ["__address__"]
target_label = "test_label_2"
replacement = "test_label_value_2"
}
}

loki.source.kubernetes_events "integrations_eventhandler" {
forward_to = [loki.relabel.integrations_eventhandler.receiver]
}

prometheus.exporter.azure "integrations_azure1" {
subscriptions = ["subId"]
resource_type = "Microsoft.Dashboard/grafana"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ integrations:
elasticsearch_configs:
- autoscrape:
metrics_instance: "default"
eventhandler:
cache_path: "./.eventcache/eventhandler.cache"
logs_instance: "log_config"
extra_labels:
test_label: test_label_value
test_label_2: test_label_value_2
gcp_configs:
- project_ids:
- <project_id>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
(Error) The converter does not support converting the provided eventhandler send_timeout config: this field is not configurable in flow mode
(Error) The converter does not support converting the provided eventhandler cache_path config: this field is not configurable in flow mode
(Error) The converter does not support converting the provided eventhandler informer_resync config: this field is not configurable in flow mode
(Error) The converter does not support converting the provided eventhandler flush_interval config: this field is not configurable in flow mode
(Warning) Please review your agent command line flags and ensure they are set in your Flow mode config file where necessary.
(Error) The converter does not support converting the provided eventhandler integration.
(Error) The converter does not support converting the provided app_agent_receiver traces_instance config.
11 changes: 11 additions & 0 deletions converter/internal/staticconvert/testdata-v2/unsupported.river
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ prometheus.remote_write "metrics_default" {
}
}

loki.write "logs_log_config" {
endpoint {
url = "http://localhost/loki/api/v1/push"
}
external_labels = {}
}

loki.source.kubernetes_events "integrations_eventhandler" {
forward_to = [loki.write.logs_log_config.receiver]
}

faro.receiver "integrations_app_agent_receiver" {
extra_log_labels = {}

Expand Down
13 changes: 12 additions & 1 deletion converter/internal/staticconvert/testdata-v2/unsupported.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ metrics:
configs:
- name: default

logs:
positions_directory: /path
configs:
- name: log_config
clients:
- url: http://localhost/loki/api/v1/push

integrations:
app_agent_receiver_configs:
- instance: "default"
Expand All @@ -14,4 +21,8 @@ integrations:
host: "localhost"
port: 55678
eventhandler:
cache_path: "/etc/eventhandler/eventhandler.cache"
cache_path: "/etc/eventhandler/not_default.cache"
logs_instance: "log_config"
send_timeout: 30
informer_resync: 30
flush_interval: 30
6 changes: 4 additions & 2 deletions converter/internal/staticconvert/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
apache_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/apache_http"
app_agent_receiver_v2 "github.com/grafana/agent/pkg/integrations/v2/app_agent_receiver"
blackbox_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/blackbox_exporter"
"github.com/grafana/agent/pkg/integrations/v2/metricsutils"
eventhandler_v2 "github.com/grafana/agent/pkg/integrations/v2/eventhandler"
metricsutils_v2 "github.com/grafana/agent/pkg/integrations/v2/metricsutils"
snmp_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/snmp_exporter"
vmware_exporter_v2 "github.com/grafana/agent/pkg/integrations/v2/vmware_exporter"
"github.com/grafana/agent/pkg/integrations/windows_exporter"
Expand Down Expand Up @@ -171,9 +172,10 @@ func validateIntegrationsV2(integrationsConfig *v2.SubsystemOptions) diag.Diagno
case *app_agent_receiver_v2.Config:
diags.AddAll(common.ValidateSupported(common.NotEquals, itg.TracesInstance, "", "app_agent_receiver traces_instance", ""))
case *blackbox_exporter_v2.Config:
case *eventhandler_v2.Config:
case *snmp_exporter_v2.Config:
case *vmware_exporter_v2.Config:
case *metricsutils.ConfigShim:
case *metricsutils_v2.ConfigShim:
switch v1_itg := itg.Orig.(type) {
case *azure_exporter.Config:
case *cadvisor.Config:
Expand Down

0 comments on commit cce5b03

Please sign in to comment.