Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support flattened data_stream.* fields #3465

Merged
merged 11 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ steps:
key: "serverless-integration-tests"
env:
TEST_INTEG_AUTH_ESS_REGION: us-east-1
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
artifact_paths:
- "build/TEST-**"
- "build/diagnostics/*"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Support flattened data_stream.* fields in input configuration

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: >-
An input configuration supports flattened fields, however the
'data_stream' field was not being correctly decoded when
flattened. This commit fixes this issue.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3465

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3191
71 changes: 71 additions & 0 deletions pkg/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {

return gatheredPaths
}

func TestFlattenedDataStream(t *testing.T) {
expectedNamespace := "test-namespace"
expectedType := "test-type"
expectedDataset := "test-dataset"

policy := map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"enabled": true,
},
},
"inputs": []any{
map[string]any{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
"data_stream.type": expectedType,
"data_stream.dataset": expectedDataset,
"data_stream": map[string]any{
"namespace": expectedNamespace,
},
},
},
}
runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
if err != nil {
t.Fatalf("cannot load runtime specs: %s", err)
}

result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
if err != nil {
t.Fatalf("cannot convert policy to component: %s", err)
}

if len(result) != 1 {
t.Fatalf("expecting result to have one element, got %d", len(result))
}

if len(result[0].Units) != 2 {
t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result))
}
Comment on lines +2396 to +2406
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can be more compact here

Suggested change
if err != nil {
t.Fatalf("cannot convert policy to component: %s", err)
}
if len(result) != 1 {
t.Fatalf("expecting result to have one element, got %d", len(result))
}
if len(result[0].Units) != 2 {
t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result))
}
require.NoErrorf(t, err, "cannot convert policy to component: %s", err)
require.Len(t, result, 1, "unexpected result length")
require.Len(t, result[0].Units, 2, "unexpected number of units for result[0]")


// We do not make assumptions about ordering.
// Get the input Unit
var dataStream *proto.DataStream
for _, unit := range result[0].Units {
if unit.Err != nil {
t.Fatalf("unit.Err: %s", unit.Err)
}
if unit.Type == client.UnitTypeInput {
dataStream = unit.Config.DataStream
break
}
}

if dataStream == nil {
t.Fatal("DataStream cannot be nil")
}

if dataStream.Dataset != expectedDataset {
t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
}
if dataStream.Type != expectedType {
t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
}
if dataStream.Namespace != expectedNamespace {
t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
}
Comment on lines +2408 to +2433
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be substituted with an assert.ElementsMatch() if we are able to predictably build DataStream objects (since we are building them from protobuf, that's not always a given)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Protobuf types do not play well with testify, as far as I know they don't play well with any comparison package that is not built specifically for protobuf.

I rather keep the code like that and not have to worry about protobuf internal fields.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should probably be a reason not to propagate protobuf objects across layers if they make writing tests more tedious than it needs to be

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that, but it's not too bad either, github.com/protocolbuffers/protobuf-go and github.com/google/go-cmp comparing and getting a diff is not too bad. E.g:

diff := cmp.Diff(expected.Config, actual.Config, protocmp.Transform())

}
80 changes: 80 additions & 0 deletions pkg/component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent/pkg/limits"
)

Expand Down Expand Up @@ -100,9 +101,88 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
return nil, err
}

if err := updateDataStreamsFromSource(result); err != nil {
return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
}

return result, nil
}

func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) {
if ds == nil {
ds = &proto.DataStream{}
}

cfg, err := config.NewConfigFrom(source.AsMap())
if err != nil {
return nil, fmt.Errorf("cannot generate config from source field: %w", err)
}

// Create a temporary struct to unpack the configuration.
// Unpack correctly handles any flattened fields like
// data_stream.type. So all we need to do is to call Unpack,
// ensure the DataStream does not have a different value,
// them merge them both.
tmp := struct {
belimawr marked this conversation as resolved.
Show resolved Hide resolved
DataStream struct {
Dataset string `config:"dataset" yaml:"dataset"`
Type string `config:"type" yaml:"type"`
Namespace string `config:"namespace" yaml:"namespace"`
} `config:"data_stream" yaml:"data_stream"`
}{}

if err := cfg.Unpack(&tmp); err != nil {
return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
}

if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") {
return nil, errors.New("duplicated key 'datastream.dataset'")
}

if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") {
return nil, errors.New("duplicated key 'datastream.type'")
}

if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") {
return nil, errors.New("duplicated key 'datastream.namespace'")
}

ret := &proto.DataStream{
Dataset: valueOrDefault(tmp.DataStream.Dataset, ds.Dataset),
Type: valueOrDefault(tmp.DataStream.Type, ds.Type),
Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace),
Source: ds.GetSource(),
}

return ret, nil
}

// valueOrDefault returns b if a is an empty string
func valueOrDefault(a, b string) string {
if a == "" {
return b
}
return a
}

func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error {
var err error
unitConfig.DataStream, err = deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource())
if err != nil {
return fmt.Errorf("could not parse data_stream from input: %w", err)
}

for i, stream := range unitConfig.Streams {
stream.DataStream, err = deDotDataStream(stream.GetDataStream(), stream.GetSource())
if err != nil {
return fmt.Errorf("could not parse data_stream from stream [%d]: %w",
i, err)
}
}

return nil
}

func setSource(val interface{}, cfg map[string]interface{}) error {
// find the source field on the val
resVal := reflect.ValueOf(val).Elem()
Expand Down
9 changes: 8 additions & 1 deletion pkg/component/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"errors"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -197,7 +199,12 @@ func TestExpectedConfig(t *testing.T) {
assert.Equal(t, err.Error(), scenario.Err.Error())
} else {
require.NoError(t, err)
assert.EqualValues(t, scenario.Expected, observed)
// protocmp.Transform ensures we do not compare any internal
// protobuf fields
if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) {
t.Errorf("mismatch (-want +got) \n%s",
cmp.Diff(scenario.Expected, observed, protocmp.Transform()))
}
}
})
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,14 @@ func (f *Fixture) RunBeat(ctx context.Context) error {
// Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started
// with `WithAllowErrors()` then `Run` will exit early and return the logged error.
//
// If no `states` are provided then the Elastic Agent runs until the context is or the timeout specified with WithRunLength is reached.
// If no `states` are provided then the Elastic Agent runs until the context is cancelled.
//
// The Elastic-Agent is started agent in test mode (--testing-mode) this mode
// expects the initial configuration (full YAML config) via gRPC.
// This configuration should be passed in the State.Configure field.
//
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
// when `Run` is called.
func (f *Fixture) Run(ctx context.Context, states ...State) error {
if f.binaryName != "elastic-agent" {
return errors.New("Run() can only be used with elastic-agent, use RunBeat()")
Expand Down
64 changes: 59 additions & 5 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor

}

// GetLogsForDatastream returns any logs associated with the datastream
func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatastreamWithContext(context.Background(), client, index)
// GetLogsForDataset returns any logs associated with the datastream
func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatasetWithContext(context.Background(), client, index)
}

// GetLogsForAgentID returns any logs associated with the agent ID
Expand Down Expand Up @@ -478,8 +478,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents,
return handleDocsResponse(res)
}

// GetLogsForDatastreamWithContext returns any logs associated with the datastream
func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
// GetLogsForDatasetWithContext returns any logs associated with the datastream
func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
indexQuery := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
Expand Down Expand Up @@ -536,6 +536,60 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
return handleDocsResponse(res)
}

// GetLogsForDatastream returns any logs associated with the datastream
func GetLogsForDatastream(
ctx context.Context,
client elastictransport.Interface,
dsType, dataset, namespace string) (Documents, error) {

query := map[string]any{
"_source": []string{"message"},
"query": map[string]any{
"bool": map[string]any{
"must": []any{
map[string]any{
"match": map[string]any{
"data_stream.dataset": dataset,
},
},
map[string]any{
"match": map[string]any{
"data_stream.namespace": namespace,
},
},
map[string]any{
"match": map[string]any{
"data_stream.type": dsType,
},
},
},
},
},
}

var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

es := esapi.New(client)
res, err := es.Search(
es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)),
es.Search.WithExpandWildcards("all"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
es.Search.WithContext(ctx),
)
if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
}

return handleDocsResponse(res)
}

// handleDocsResponse converts the esapi.Response into Documents,
// it closes the response.Body after reading
func handleDocsResponse(res *esapi.Response) (Documents, error) {
resultBuf, err := handleResponseRaw(res)
if err != nil {
Expand Down
Loading
Loading