From 3f81294d6f3888870bec9e591329b4e299a0878a Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 19 Sep 2023 15:35:05 +0200 Subject: [PATCH] Support flattened data_stream.* fields An input configuration supports flattened fields, however the 'data_stream' field was not being correctly decoded when flattened. This commit fixes this issue. --- ...upport-flattened-data_stream.*-fields.yaml | 35 ++ pkg/component/component_test.go | 71 ++++ pkg/component/config.go | 42 ++ testing/integration/datastreams_test.go | 387 ++++++++++++++++++ 4 files changed, 535 insertions(+) create mode 100644 changelog/fragments/1695389490-Support-flattened-data_stream.*-fields.yaml create mode 100644 testing/integration/datastreams_test.go diff --git a/changelog/fragments/1695389490-Support-flattened-data_stream.*-fields.yaml b/changelog/fragments/1695389490-Support-flattened-data_stream.*-fields.yaml new file mode 100644 index 00000000000..2e04a62793f --- /dev/null +++ b/changelog/fragments/1695389490-Support-flattened-data_stream.*-fields.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Support flattened data_stream.* fields + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: >- + An input configuration supports flattened fields, however the + 'data_stream' field was not being correctly decoded when + flattened. This commit fixes this issue. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/3465 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/3191 diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 424f3a93147..00c4d1c63cb 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string { return gatheredPaths } + +func TestFlattenedDataStream(t *testing.T) { + expectedNamespace := "test-namespace" + expectedType := "test-type" + expectedDataset := "test-dataset" + + policy := map[string]any{ + "outputs": map[string]any{ + "default": map[string]any{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []any{ + map[string]any{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + "data_stream.type": expectedType, + "data_stream.dataset": expectedDataset, + "data_stream": map[string]any{ + "namespace": expectedNamespace, + }, + }, + }, + } + runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck()) + if err != nil { + t.Fatalf("cannot load runtime specs: %s", err) + } + + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + if err != nil { + t.Fatalf("cannot convert policy to component: %s", err) + } + + if len(result) != 1 { + t.Fatalf("expecting result to have one element, got %d", len(result)) + } + + if len(result[0].Units) != 2 { + t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result)) + } + + // We do not make assumptions about ordering. + // Get the input Unit + var dataStream *proto.DataStream + for _, unit := range result[0].Units { + if unit.Err != nil { + t.Fatalf("unit.Err: %s", unit.Err) + } + if unit.Type == client.UnitTypeInput { + dataStream = unit.Config.DataStream + break + } + } + + if dataStream == nil { + t.Fatal("DataStream cannot be nil") + } + + if dataStream.Dataset != expectedDataset { + t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset) + } + if dataStream.Type != expectedType { + t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type) + } + if dataStream.Namespace != expectedNamespace { + t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace) + } +} diff --git a/pkg/component/config.go b/pkg/component/config.go index a0c75d00e32..fb59a21b48a 100644 --- a/pkg/component/config.go +++ b/pkg/component/config.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent/pkg/limits" ) @@ -100,9 +101,50 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro return nil, err } + if err := dedotDataStream(result); err != nil { + return nil, fmt.Errorf("could not dedot 'data_stream': %w", err) + } + return result, nil } +func dedotDataStream(unitConfig *proto.UnitExpectedConfig) error { + tmp := struct { + DataStream struct { + Dataset string `config:"dataset" yaml:"dataset"` + Type string `config:"type" yaml:"type"` + Namespace string `config:"namespace" yaml:"namespace"` + } `config:"data_stream" yaml:"data_stream"` + }{} + + cfg, err := config.NewConfigFrom(unitConfig.GetSource().AsMap()) + if err != nil { + return fmt.Errorf("cannot create new config from Source: %w", err) + } + + if err := cfg.Unpack(&tmp); err != nil { + return fmt.Errorf("cannot unpack config: %w", err) + } + + // merge returns b if a is an empty string + merge := func(a, b string) string { + if a == "" { + return b + } + return a + } + + if unitConfig.DataStream == nil { + unitConfig.DataStream = &proto.DataStream{} + } + + unitConfig.DataStream.Dataset = merge(unitConfig.DataStream.Dataset, tmp.DataStream.Dataset) + unitConfig.DataStream.Namespace = merge(unitConfig.DataStream.Namespace, tmp.DataStream.Namespace) + unitConfig.DataStream.Type = merge(unitConfig.DataStream.Type, tmp.DataStream.Type) + + return nil +} + func setSource(val interface{}, cfg map[string]interface{}) error { // find the source field on the val resVal := reflect.ValueOf(val).Elem() diff --git a/testing/integration/datastreams_test.go b/testing/integration/datastreams_test.go new file mode 100644 index 00000000000..fd802c71825 --- /dev/null +++ b/testing/integration/datastreams_test.go @@ -0,0 +1,387 @@ +//go:build integration + +package integration + +import ( + "bytes" + "context" + "fmt" + "net/http" + "net/http/httputil" + "os" + "path/filepath" + "testing" + "text/template" + "time" + + "github.com/elastic/elastic-agent-libs/kibana" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/stretchr/testify/require" +) + +func TestFoo(t *testing.T) { + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "log.log") + generateLogFile(t, logFilePath) + + fmt.Println("================================================== ELASTICSEARCH_HOST: ", os.Getenv("ELASTICSEARCH_HOST")) + fmt.Println("================================================== ELASTICSEARCH_USERNAME: ", os.Getenv("ELASTICSEARCH_USERNAME")) + fmt.Println("================================================== ELASTICSEARCH_PASSWORD: ", os.Getenv("ELASTICSEARCH_PASSWORD")) + + info := define.Require(t, define.Requirements{ + Local: false, + Stack: &define.Stack{}, + Sudo: true, + }) + + agentFixture, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + createPolicyReq := kibana.AgentPolicy{ + Name: "test-" + t.Name() + "--" + time.Now().Format(time.RFC3339Nano), + Namespace: info.Namespace, + Description: "Test policy for " + t.Name(), + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + IsProtected: false, + } + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: true, + } + + policy, err := tools.InstallAgentWithPolicy(t, ctx, installOpts, agentFixture, info.KibanaClient, createPolicyReq) + require.NoError(t, err, "Could not install Elastic-Agent with Policy") + + // t.Cleanup(func() { + // t.Log("Un-enrolling Elastic Agent...") + // assert.NoError(t, tools.UnEnrollAgent(info.KibanaClient, policy.ID)) + // }) + + // Make sure the templated value is actually valid JSON before making the API request. + // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. + tmpl, err := template.New("foo").Parse(policyJSON) + if err != nil { + t.Fatalf("cannot parse template: %s", err) + } + + pkgPolicyBuf := bytes.Buffer{} + tmpl.Execute(&pkgPolicyBuf, plolicyVars{ + ID: "foo", + Name: "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339), + PolicyID: policy.ID, + LogFilePath: logFilePath, + }) + + t.Log("POST /api/fleet/package_policies") + resp, err := info.KibanaClient.Connection.Send(http.MethodPost, "/api/fleet/package_policies", nil, nil, &pkgPolicyBuf) + if err != nil { + t.Fatalf("could not execute request to Kibana/Fleet: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("received a non 200-OK when adding package to policy. "+ + "Status code: %d", resp.StatusCode) + respDump, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("could not dump response: %s", err) + } + fmt.Println("==================================") + fmt.Println(string(respDump)) + fmt.Println("==================================") + t.FailNow() + } + + agentClient := agentFixture.Client() + if err := agentClient.Connect(context.TODO()); err != nil { + t.Fatalf("could not connect to Elastic-Agent: %s", err) + } + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AgentClient==nil?", agentClient == nil) + if agentClient != nil { + state, err := agentClient.State(context.TODO()) + if err != nil { + t.Fatalf("failed to get Agent state: %s", err) + } + fmt.Println("==================================================") + fmt.Println("State: ", state.State.String()) + for _, c := range state.Components { + fmt.Println(c.ID, c.State.String(), c.Message) + } + } + + for i := 0; i < 60; i++ { + fmt.Println("================================================== ", i) + state, err := agentClient.State(context.TODO()) + if err != nil { + t.Fatalf("failed to get Agent state: %s", err) + } + fmt.Println("==================================================") + fmt.Println("State: ", state.State.String()) + for _, c := range state.Components { + fmt.Println(c.ID, c.State.String(), c.Message) + } + + time.Sleep(time.Second) + } + // t.Fail() +} + +type plolicyVars struct { + ID string + Name string + PolicyID string + LogFilePath string + ESHost string + ESPassword string + ESUsername string +} + +// generateLogFile generates a log file by appending the current +// time to it every second. +func generateLogFile(t *testing.T, fullPath string) { + t.Helper() + f, err := os.Create(fullPath) + if err != nil { + t.Fatalf("could not create file '%s: %s", fullPath, err) + } + + go func() { + t.Helper() + ticker := time.NewTicker(time.Second) + t.Cleanup(ticker.Stop) + + done := make(chan struct{}) + t.Cleanup(func() { close(done) }) + + defer func() { + if err := f.Close(); err != nil { + t.Errorf("could not close log file '%s': %s", fullPath, err) + } + }() + + for { + select { + case <-done: + return + case now := <-ticker.C: + _, err := fmt.Fprintln(f, t.Name(), t.TempDir(), now.Format(time.RFC3339)) + if err != nil { + // The Go compiler does not allow me to call t.Fatalf from a non-test + // goroutine, so just log it instead + t.Errorf("could not write data to log file '%s': %s", fullPath, err) + return + } + // make sure log lines are synced as quickly as possible + if err := f.Sync(); err != nil { + t.Errorf("could not sync file '%s': %s", fullPath, err) + } + } + } + }() +} + +var policyJSON = ` +{ + "policy_id": "{{.PolicyID}}", + "package": { + "name": "log", + "version": "2.2.0" + }, + "name": "{{.Name}}", + "namespace": "default", + "inputs": { + "logs-logfile": { + "enabled": true, + "streams": { + "log.logs": { + "enabled": true, + "vars": { + "paths": [ + "{{.LogFilePath}}" + ], + "data_stream.dataset": "generic", + "tags": [], + "custom": "" + } + } + } + } + } +}` + +// { +// "id": "{{.ID}}", +// "name": "{{.Name}}", +// "policy_id": "{{.PolicyID}}", +// "package": { +// "name": "log", +// "version": "2.2.0" +// }, +// "vars": {}, +// "description": "", +// "namespace": "default", +// "force": true, +// "inputs": [ +// { +// "enabled": true, +// "type": "filestream", +// "streams": [ +// { +// "enabled": true, +// "data_stream": { +// "dataset": "generic", +// "type": "logs" +// }, +// "vars": { +// "paths": "/tmp/foo.log", +// "tags": [] +// } +// } +// ] +// } +// ] +// } +// var policyJSON = ` +// { +// "id": "{{.ID}}", +// "name": "{{.Name}}", +// "namespace": "default", +// "package": { +// "name": "endpoint", +// "version": "{{.Version}}" +// }, +// "policy_id": "{{.PolicyID}}", +// "vars": {}, +// "inputs": [ +// { +// "enabled": true, +// "id": "filestream-agent-input-id", +// "name": "filestream", +// "type": "filestream", +// "streams": [ +// { +// "id": "filestream-test-id", +// "paths": [ +// "/tmp/log.log" +// ] +// } +// ] +// } +// ] +// } +// ` + +var standalonePolicy = ` +outputs: + default: + type: elasticsearch + hosts: + - "{{.ESHost}}:443" + username: "{{.ESUsername}}" + password: "{{.ESPassword}}" + +inputs: + - type: filestream + id: elastic-agent-input-id + streams: + - id: filestream-input-id-1 + data_stream: + dataset: generic + data_stream.namespace: "flattened-namespace" + data_stream.type: "logs" + paths: + - {{.LogFilePath}} + +agent.monitoring: + enabled: true + logs: true + metrics: true +` + +func TestBar(t *testing.T) { + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "log.log") + generateLogFile(t, logFilePath) + + // fmt.Println("================================================== ELASTICSEARCH_HOST: ", os.Getenv("ELASTICSEARCH_HOST")) + // fmt.Println("================================================== ELASTICSEARCH_USERNAME: ", os.Getenv("ELASTICSEARCH_USERNAME")) + // fmt.Println("================================================== ELASTICSEARCH_PASSWORD: ", os.Getenv("ELASTICSEARCH_PASSWORD")) + + info := define.Require(t, define.Requirements{ + Local: false, + Stack: &define.Stack{}, + Sudo: true, + }) + fmt.Println(info) + agentFixture, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + installationLogs, err := tools.InstallStandaloneAgent(agentFixture) + if err != nil { + fmt.Println("==================================================") + fmt.Println(string(installationLogs)) + fmt.Println("==================================================") + t.Fatalf("could not install Standalone Elastic-Agent: %s", err) + } + + // Make sure the templated value is actually valid JSON before making the API request. + // Using json.Unmarshal will give us the actual syntax error, calling json.Valid() would not. + tmpl, err := template.New("standalone-policy").Parse(standalonePolicy) + if err != nil { + t.Fatalf("cannot parse template: %s", err) + } + + // The environment variables are set by the test runner. + // If you're manually running the tests (go test) then you + // will have to manually set them + pkgPolicyBuf := bytes.Buffer{} + tmpl.Execute(&pkgPolicyBuf, plolicyVars{ + LogFilePath: logFilePath, + ESHost: os.Getenv("ELASTICSEARCH_HOST"), + ESUsername: os.Getenv("ELASTICSEARCH_USERNAME"), + ESPassword: os.Getenv("ELASTICSEARCH_PASSWORD"), + }) + + if err := agentFixture.Configure(context.TODO(), pkgPolicyBuf.Bytes()); err != nil { + t.Fatalf("could not configure standalone Elastic-Agent: %s", err) + } + + agentClient := agentFixture.Client() + if err := agentClient.Connect(context.TODO()); err != nil { + t.Fatalf("could not connect to Elastic-Agent: %s", err) + } + + if agentClient != nil { + state, err := agentClient.State(context.TODO()) + if err != nil { + t.Fatalf("failed to get Agent state: %s", err) + } + fmt.Println("==================================================") + fmt.Println("State: ", state.State.String()) + for _, c := range state.Components { + fmt.Println(c.ID, c.State.String(), c.Message) + } + } + + for i := 0; i < 60; i++ { + fmt.Println("================================================== ", i) + state, err := agentClient.State(context.TODO()) + if err != nil { + t.Fatalf("failed to get Agent state: %s", err) + } + fmt.Println("==================================================") + fmt.Println("State: ", state.State.String()) + for _, c := range state.Components { + fmt.Println(c.ID, c.State.String(), c.Message) + } + + time.Sleep(time.Second) + } + t.Fatal("let's keep the agent installed") +}