Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Support flattened data_stream.* fields" #3743

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ steps:
key: "serverless-integration-tests"
env:
TEST_INTEG_AUTH_ESS_REGION: us-east-1
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
artifact_paths:
- "build/TEST-**"
- "build/diagnostics/*"
Expand Down

This file was deleted.

71 changes: 0 additions & 71 deletions pkg/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,74 +2361,3 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {

return gatheredPaths
}

// TestFlattenedDataStream verifies that flattened data_stream.* keys
// (e.g. "data_stream.type") and the nested "data_stream" map in an input
// config are merged into the unit's proto.DataStream when the policy is
// converted to components.
func TestFlattenedDataStream(t *testing.T) {
	expectedNamespace := "test-namespace"
	expectedType := "test-type"
	expectedDataset := "test-dataset"

	// Mix flattened keys (type/dataset) with the nested form (namespace)
	// to exercise the de-dotting logic in both shapes at once.
	policy := map[string]any{
		"outputs": map[string]any{
			"default": map[string]any{
				"type":    "elasticsearch",
				"enabled": true,
			},
		},
		"inputs": []any{
			map[string]any{
				"type":                "filestream",
				"id":                  "filestream-0",
				"enabled":             true,
				"data_stream.type":    expectedType,
				"data_stream.dataset": expectedDataset,
				"data_stream": map[string]any{
					"namespace": expectedNamespace,
				},
			},
		},
	}
	runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
	if err != nil {
		t.Fatalf("cannot load runtime specs: %s", err)
	}

	result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
	if err != nil {
		t.Fatalf("cannot convert policy to component: %s", err)
	}

	if len(result) != 1 {
		t.Fatalf("expecting result to have one element, got %d", len(result))
	}

	// Fix: report the length that was actually checked (Units), not the
	// length of the outer result slice.
	if len(result[0].Units) != 2 {
		t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result[0].Units))
	}

	// We do not make assumptions about ordering.
	// Get the input Unit
	var dataStream *proto.DataStream
	for _, unit := range result[0].Units {
		if unit.Err != nil {
			t.Fatalf("unit.Err: %s", unit.Err)
		}
		if unit.Type == client.UnitTypeInput {
			dataStream = unit.Config.DataStream
			break
		}
	}

	if dataStream == nil {
		t.Fatal("DataStream cannot be nil")
	}

	if dataStream.Dataset != expectedDataset {
		t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
	}
	if dataStream.Type != expectedType {
		t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
	}
	if dataStream.Namespace != expectedNamespace {
		t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
	}
}
80 changes: 0 additions & 80 deletions pkg/component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent/pkg/limits"
)

Expand Down Expand Up @@ -101,88 +100,9 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
return nil, err
}

if err := updateDataStreamsFromSource(result); err != nil {
return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
}

return result, nil
}

// deDotDataStream merges flattened data_stream.* keys found in source
// (e.g. "data_stream.type") into ds. If the same field is set both on ds
// and in the flattened source with different, non-empty values, an error
// is returned; otherwise the non-empty value wins.
func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) {
	if ds == nil {
		ds = &proto.DataStream{}
	}

	cfg, err := config.NewConfigFrom(source.AsMap())
	if err != nil {
		return nil, fmt.Errorf("cannot generate config from source field: %w", err)
	}

	// Create a temporary struct to unpack the configuration.
	// Unpack correctly handles any flattened fields like
	// data_stream.type. So all we need to do is to call Unpack,
	// ensure the DataStream does not have a different value,
	// then merge them both.
	tmp := struct {
		DataStream struct {
			Dataset   string `config:"dataset" yaml:"dataset"`
			Type      string `config:"type" yaml:"type"`
			Namespace string `config:"namespace" yaml:"namespace"`
		} `config:"data_stream" yaml:"data_stream"`
	}{}

	if err := cfg.Unpack(&tmp); err != nil {
		return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
	}

	// Fix: error messages now name the real config key prefix
	// "data_stream", not "datastream".
	if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") {
		return nil, errors.New("duplicated key 'data_stream.dataset'")
	}

	if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") {
		return nil, errors.New("duplicated key 'data_stream.type'")
	}

	if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") {
		return nil, errors.New("duplicated key 'data_stream.namespace'")
	}

	ret := &proto.DataStream{
		Dataset:   valueOrDefault(tmp.DataStream.Dataset, ds.Dataset),
		Type:      valueOrDefault(tmp.DataStream.Type, ds.Type),
		Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace),
		Source:    ds.GetSource(),
	}

	return ret, nil
}

// valueOrDefault returns a unless it is the empty string, in which case
// it falls back to b.
func valueOrDefault(a, b string) string {
	if a != "" {
		return a
	}
	return b
}

// updateDataStreamsFromSource de-dots the flattened data_stream.* keys
// present in the raw source config into the typed DataStream field, first
// for the unit config itself and then for each of its streams.
func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error {
	ds, err := deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource())
	if err != nil {
		return fmt.Errorf("could not parse data_stream from input: %w", err)
	}
	unitConfig.DataStream = ds

	for i := range unitConfig.Streams {
		stream := unitConfig.Streams[i]
		streamDS, err := deDotDataStream(stream.GetDataStream(), stream.GetSource())
		if err != nil {
			return fmt.Errorf("could not parse data_stream from stream [%d]: %w",
				i, err)
		}
		stream.DataStream = streamDS
	}

	return nil
}

func setSource(val interface{}, cfg map[string]interface{}) error {
// find the source field on the val
resVal := reflect.ValueOf(val).Elem()
Expand Down
9 changes: 1 addition & 8 deletions pkg/component/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
"errors"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -199,12 +197,7 @@ func TestExpectedConfig(t *testing.T) {
assert.Equal(t, err.Error(), scenario.Err.Error())
} else {
require.NoError(t, err)
// protocmp.Transform ensures we do not compare any internal
// protobuf fields
if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) {
t.Errorf("mismatch (-want +got) \n%s",
cmp.Diff(scenario.Expected, observed, protocmp.Transform()))
}
assert.EqualValues(t, scenario.Expected, observed)
}
})
}
Expand Down
9 changes: 1 addition & 8 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,7 @@ func (f *Fixture) RunBeat(ctx context.Context) error {
// Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started
// with `WithAllowErrors()` then `Run` will exit early and return the logged error.
//
// If no `states` are provided then the Elastic Agent runs until the context is cancelled.
//
// The Elastic Agent is started in test mode (--testing-mode); this mode
// expects the initial configuration (full YAML config) via gRPC.
// This configuration should be passed in the State.Configure field.
//
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
// when `Run` is called.
// If no `states` are provided then the Elastic Agent runs until the context is cancelled or the timeout specified with WithRunLength is reached.
func (f *Fixture) Run(ctx context.Context, states ...State) error {
if f.binaryName != "elastic-agent" {
return errors.New("Run() can only be used with elastic-agent, use RunBeat()")
Expand Down
64 changes: 5 additions & 59 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor

}

// GetLogsForDataset returns any logs associated with the datastream
func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatasetWithContext(context.Background(), client, index)
// GetLogsForDatastream returns any logs associated with the datastream
func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatastreamWithContext(context.Background(), client, index)
}

// GetLogsForAgentID returns any logs associated with the agent ID
Expand Down Expand Up @@ -478,8 +478,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents,
return handleDocsResponse(res)
}

// GetLogsForDatasetWithContext returns any logs associated with the datastream
func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
// GetLogsForDatastreamWithContext returns any logs associated with the datastream
func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
indexQuery := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
Expand Down Expand Up @@ -536,60 +536,6 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
return handleDocsResponse(res)
}

// GetLogsForDatastream returns any logs associated with the datastream
// identified by the (type, dataset, namespace) triple. Only the "message"
// field of each document is fetched.
func GetLogsForDatastream(
	ctx context.Context,
	client elastictransport.Interface,
	dsType, dataset, namespace string) (Documents, error) {

	// Each data_stream.* field becomes one "match" clause; all three must
	// match. The order is fixed so the generated query is deterministic.
	fields := []struct{ name, value string }{
		{"data_stream.dataset", dataset},
		{"data_stream.namespace", namespace},
		{"data_stream.type", dsType},
	}
	must := make([]any, 0, len(fields))
	for _, f := range fields {
		must = append(must, map[string]any{
			"match": map[string]any{f.name: f.value},
		})
	}

	searchQuery := map[string]any{
		"_source": []string{"message"},
		"query": map[string]any{
			"bool": map[string]any{
				"must": must,
			},
		},
	}

	var body bytes.Buffer
	if err := json.NewEncoder(&body).Encode(searchQuery); err != nil {
		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
	}

	es := esapi.New(client)
	res, err := es.Search(
		// Backing indices of a data stream are named ".ds-<type>...".
		es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)),
		es.Search.WithExpandWildcards("all"),
		es.Search.WithBody(&body),
		es.Search.WithTrackTotalHits(true),
		es.Search.WithPretty(),
		es.Search.WithContext(ctx),
	)
	if err != nil {
		return Documents{}, fmt.Errorf("error performing ES search: %w", err)
	}

	return handleDocsResponse(res)
}

// handleDocsResponse converts the esapi.Response into Documents,
// it closes the response.Body after reading
func handleDocsResponse(res *esapi.Response) (Documents, error) {
resultBuf, err := handleResponseRaw(res)
if err != nil {
Expand Down
Loading
Loading