From da565c34a94f71cca495a57f144450979df2ce59 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 22 Sep 2023 18:25:00 +0200 Subject: [PATCH] Support flattened data_stream.* fields An input configuration supports flattened fields, however the 'data_stream' field was not being correctly decoded when flattened. This commit fixes this issue. Some small additions and refactoring are also implemented in the integration test framework as well as some more detailed documentation. --- ...Support-flattened-data_stream.-fields.yaml | 35 ++ pkg/component/component_test.go | 71 ++++ pkg/component/config.go | 83 ++++ pkg/component/config_test.go | 9 +- pkg/testing/fixture.go | 7 + pkg/testing/tools/estools/elasticsearch.go | 65 +++- testing/integration/datastreams_test.go | 356 ++++++++++++++++++ testing/integration/monitoring_logs_test.go | 2 +- 8 files changed, 621 insertions(+), 7 deletions(-) create mode 100644 changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml create mode 100644 testing/integration/datastreams_test.go diff --git a/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml new file mode 100644 index 00000000000..2e04a62793f --- /dev/null +++ b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Support flattened data_stream.* fields + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: >- + An input configuration supports flattened fields, however the + 'data_stream' field was not being correctly decoded when + flattened. This commit fixes this issue. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/3465 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/3191 diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 424f3a93147..00c4d1c63cb 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string { return gatheredPaths } + +func TestFlattenedDataStream(t *testing.T) { + expectedNamespace := "test-namespace" + expectedType := "test-type" + expectedDataset := "test-dataset" + + policy := map[string]any{ + "outputs": map[string]any{ + "default": map[string]any{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []any{ + map[string]any{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + "data_stream.type": expectedType, + "data_stream.dataset": expectedDataset, + "data_stream": map[string]any{ + "namespace": expectedNamespace, + }, + }, + }, + } + runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck()) + if err != nil { + t.Fatalf("cannot load runtime specs: %s", err) + } + + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + if err != nil { + t.Fatalf("cannot convert policy to component: %s", err) + } + + if len(result) != 1 { + t.Fatalf("expecting result to have one element, got %d", len(result)) + } + + if len(result[0].Units) != 2 { + t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result)) + } + + // We do not make assumptions about ordering. + // Get the input Unit + var dataStream *proto.DataStream + for _, unit := range result[0].Units { + if unit.Err != nil { + t.Fatalf("unit.Err: %s", unit.Err) + } + if unit.Type == client.UnitTypeInput { + dataStream = unit.Config.DataStream + break + } + } + + if dataStream == nil { + t.Fatal("DataStream cannot be nil") + } + + if dataStream.Dataset != expectedDataset { + t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset) + } + if dataStream.Type != expectedType { + t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type) + } + if dataStream.Namespace != expectedNamespace { + t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace) + } +} diff --git a/pkg/component/config.go b/pkg/component/config.go index a0c75d00e32..50b5e590e6b 100644 --- a/pkg/component/config.go +++ b/pkg/component/config.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent/pkg/limits" ) @@ -100,9 +101,91 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro return nil, err } + if err := updateDataStreamsFromSource(result); err != nil { + return nil, fmt.Errorf("could not dedot 'data_stream': %w", err) + } + return result, nil } +// dataStreamAndSource is a generic way to represent proto mesages +// that contain a source field and a datastream field. +type dataStreamAndSource interface { + GetDataStream() *proto.DataStream + GetSource() *structpb.Struct +} + +func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { + ds := raw.GetDataStream() + if ds == nil { + ds = &proto.DataStream{} + } + + tmp := struct { + DataStream struct { + Dataset string `config:"dataset" yaml:"dataset"` + Type string `config:"type" yaml:"type"` + Namespace string `config:"namespace" yaml:"namespace"` + } `config:"data_stream" yaml:"data_stream"` + }{} + + cfg, err := config.NewConfigFrom(raw.GetSource().AsMap()) + if err != nil { + return nil, fmt.Errorf("cannot generate config from source field: %w", err) + } + + if err := cfg.Unpack(&tmp); err != nil { + return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) + } + + if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") { + return nil, errors.New("duplicated key 'datastream.dataset'") + } + + if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") { + return nil, errors.New("duplicated key 'datastream.type'") + } + + if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") { + return nil, errors.New("duplicated key 'datastream.namespace'") + } + + ret := &proto.DataStream{ + Dataset: merge(tmp.DataStream.Dataset, ds.Dataset), + Type: merge(tmp.DataStream.Type, ds.Type), + Namespace: merge(tmp.DataStream.Namespace, ds.Namespace), + Source: raw.GetDataStream().GetSource(), + } + + return ret, nil +} + +// merge returns b if a is an empty string +func merge(a, b string) string { + if a == "" { + return b + } + return a +} + +func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error { + var err error + unitConfig.DataStream, err = deDotDataStream(unitConfig) + if err != nil { + return fmt.Errorf("could not parse data_stream from input: %w", err) + } + + for i, stream := range unitConfig.Streams { + stream.DataStream, err = deDotDataStream(stream) + if err != nil { + return fmt.Errorf("could not parse data_stream from stream [%d]: %w", + i, err) + } + } + + return nil +} + func setSource(val interface{}, cfg map[string]interface{}) error { // find the source field on the val resVal := reflect.ValueOf(val).Elem() diff --git a/pkg/component/config_test.go b/pkg/component/config_test.go index 64dcfe3a697..7cdef177829 100644 --- a/pkg/component/config_test.go +++ b/pkg/component/config_test.go @@ -8,8 +8,10 @@ import ( "errors" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -197,7 +199,12 @@ func TestExpectedConfig(t *testing.T) { assert.Equal(t, err.Error(), scenario.Err.Error()) } else { require.NoError(t, err) - assert.EqualValues(t, scenario.Expected, observed) + // protocmp.Transform ensures we do not compare any internal + // protobuf fields + if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) { + t.Errorf("mismatch (-want +got) \n%s", + cmp.Diff(scenario.Expected, observed, protocmp.Transform())) + } } }) } diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index f713a581f80..99035fc8d50 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -242,6 +242,13 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error { // with `WithAllowErrors()` then `Run` will exit early and return the logged error. // // If no `states` are provided then the Elastic Agent runs until the context is cancelled. +// +// The Elastic-Agent is started agent in test mode (--testing-mode) this mode +// expects the initial configuration (full YAML config) via gRPC. +// This configuration should be passed in the State.Configure field. +// +// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored +// when `Run` is called. func (f *Fixture) Run(ctx context.Context, states ...State) error { if f.installed { return errors.New("fixture is installed; cannot be run") diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index 8cd6e126597..ea78373e1b7 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -230,9 +230,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor return handleDocsResponse(res) } -// GetLogsForDatastream returns any logs associated with the datastream -func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) { - return GetLogsForDatastreamWithContext(context.Background(), client, index) +// GetLogsForDataset returns any logs associated with the datastream +func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) { + return GetLogsForDatasetWithContext(context.Background(), client, index) } // GetLogsForAgentID returns any logs associated with the agent ID @@ -270,8 +270,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, return handleDocsResponse(res) } -// GetLogsForDatastreamWithContext returns any logs associated with the datastream -func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { +// GetLogsForDatasetWithContext returns any logs associated with the datastream +func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ @@ -302,7 +302,62 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor return handleDocsResponse(res) } +// GetLogsForDatastream returns any logs associated with the datastream +func GetLogsForDatastream( + ctx context.Context, + client elastictransport.Interface, + dsType, dataset, namespace string) (Documents, error) { + + query := map[string]any{ + "_source": []string{"message"}, + "query": map[string]any{ + "bool": map[string]any{ + "must": []any{ + map[string]any{ + "match": map[string]any{ + "data_stream.dataset": dataset, + }, + }, + map[string]any{ + "match": map[string]any{ + "data_stream.namespace": namespace, + }, + }, + map[string]any{ + "match": map[string]any{ + "data_stream.type": dsType, + }, + }, + }, + }, + }, + } + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(query); err != nil { + return Documents{}, fmt.Errorf("error creating ES query: %w", err) + } + + es := esapi.New(client) + res, err := es.Search( + es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)), + es.Search.WithExpandWildcards("all"), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + es.Search.WithContext(ctx), + ) + if err != nil { + return Documents{}, fmt.Errorf("error performing ES search: %w", err) + } + + return handleDocsResponse(res) +} + +// handleDocsResponse converts the esapi.Response into Documents, +// it closes the response.Body after reading func handleDocsResponse(res *esapi.Response) (Documents, error) { + defer res.Body.Close() if res.StatusCode >= 300 || res.StatusCode < 200 { return Documents{}, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) } diff --git a/testing/integration/datastreams_test.go b/testing/integration/datastreams_test.go new file mode 100644 index 00000000000..8775f3d3a64 --- /dev/null +++ b/testing/integration/datastreams_test.go @@ -0,0 +1,356 @@ +//go:build integration + +package integration + +import ( + "bytes" + "context" + "errors" + "fmt" + "math/rand" + "net/http" + "net/http/httputil" + "os" + "path/filepath" + "strings" + "testing" + "text/template" + "time" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" + "github.com/stretchr/testify/require" +) + +func TestFlattenedDatastreamFleetPolicy(t *testing.T) { + dsType := "logs" + dsNamespace := strings.ToLower(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64())) + dsDataset := strings.ToLower(fmt.Sprintf("%s-dataset", t.Name())) + numEvents := uint64(60) + + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "log.log") + generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents) + + info := define.Require(t, define.Requirements{ + Local: false, + Stack: &define.Stack{}, + Sudo: true, + }) + + agentFixture, err := define.NewFixture(t, define.Version()) + if err != nil { + t.Fatalf("could not create new fixture: %s", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + createPolicyReq := kibana.AgentPolicy{ + Name: t.Name() + "--" + time.Now().Format(time.RFC3339Nano), + Namespace: info.Namespace, + Description: "Test policy for " + t.Name(), + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + IsProtected: false, + } + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: true, + } + + policy, err := tools.InstallAgentWithPolicy(ctx, + t, + installOpts, + agentFixture, + info.KibanaClient, + createPolicyReq) + if err != nil { + t.Fatalf("could not install Elastic-AGent with Policy: %s", err) + } + + tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON) + if err != nil { + t.Fatalf("cannot parse template: %s", err) + } + + agentPolicyBuffer := bytes.Buffer{} + err = tmpl.Execute(&agentPolicyBuffer, plolicyVars{ + Name: "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339), + PolicyID: policy.ID, + LogFilePath: logFilePath, + Namespace: dsNamespace, + Dataset: dsDataset, + }) + if err != nil { + t.Fatalf("could not render template: %s", err) + } + + resp, err := info.KibanaClient.Connection.Send( + http.MethodPost, + "/api/fleet/package_policies", + nil, + nil, + &agentPolicyBuffer) + if err != nil { + t.Fatalf("could not execute request to Kibana/Fleet: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("received a non 200-OK when adding package to policy. "+ + "Status code: %d", resp.StatusCode) + respDump, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("could not dump error response from Kibana: %s", err) + } + t.Log("Kibana error response") + t.Log(string(respDump)) + t.FailNow() + } + + ensureDocumentsInES := func() bool { + docs, err := estools.GetLogsForDatastream( + context.Background(), info.ESClient, dsType, dsDataset, dsNamespace) + if err != nil { + t.Logf("error quering ES, will retry later: %s", err) + } + + if docs.Hits.Total.Value == int(numEvents) { + return true + } + + return false + } + + require.Eventually(t, ensureDocumentsInES, 120*time.Second, time.Second, + "could not get all expected documents form ES") +} + +func TestFlattenedDatastreamStandalone(t *testing.T) { + dsType := "logs" + dsNamespace := fmt.Sprintf("%s-namespace-%d", t.Name(), rand.Uint64()) + dsDataset := fmt.Sprintf("%s-dataset", t.Name()) + numEvents := uint64(60) + + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "log.log") + generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents) + + info := define.Require(t, define.Requirements{ + Local: false, + Stack: &define.Stack{}, + Sudo: true, + }) + + agentFixture, err := define.NewFixture(t, + define.Version(), atesting.WithAllowErrors()) + if err != nil { + t.Fatalf("could not create new fixture: %s", err) + } + + tmpl, err := template.New("standalone-policy").Parse(standalonePolicy) + if err != nil { + t.Fatalf("cannot parse template: %s", err) + } + + // The environment variables are set by the test runner. + // If you're manually running the tests (go test) then you + // will have to manually set them + renderedPolicy := bytes.Buffer{} + tmpl.Execute(&renderedPolicy, plolicyVars{ + LogFilePath: logFilePath, + Dataset: dsDataset, + Namespace: dsNamespace, + Type: dsType, + + ESHost: os.Getenv("ELASTICSEARCH_HOST"), + ESUsername: os.Getenv("ELASTICSEARCH_USERNAME"), + ESPassword: os.Getenv("ELASTICSEARCH_PASSWORD"), + }) + + if err := agentFixture.Prepare(context.Background()); err != nil { + t.Fatalf("cannot prepare Elastic-Agent: %s", err) + } + + runCtx, cancelAgentRunCtx := context.WithCancel(context.Background()) + go func() { + // make sure the test does not hang forever + time.Sleep(30 * time.Second) + t.Error("'test timeout': cancelling run context, the Elastic-Agent will exit") + cancelAgentRunCtx() + }() + + state := atesting.State{ + Configure: renderedPolicy.String(), + AgentState: atesting.NewClientState(client.Healthy), + Components: map[string]atesting.ComponentState{ + "filestream-default": { + State: atesting.NewClientState(client.Healthy), + Units: map[atesting.ComponentUnitKey]atesting.ComponentUnitState{ + { + UnitType: client.UnitTypeInput, + UnitID: "filestream-default-elastic-agent-input-id", + }: { + State: atesting.NewClientState(client.Healthy), + }, + + { + UnitType: client.UnitTypeOutput, + UnitID: "filestream-default", + }: { + State: atesting.NewClientState(client.Healthy), + }, + }, + }, + }, + After: func() error { + ensureDocumentsInES := func() bool { + docs, err := estools.GetLogsForDatastream(context.Background(), info.ESClient, dsType, dsDataset, dsNamespace) + if err != nil { + t.Logf("error quering ES, will retry later: %s", err) + } + + if docs.Hits.Total.Value == 60 { + return true + } + + return false + } + + require.Eventually( + t, + ensureDocumentsInES, + 2*time.Minute, time.Second, + "did not find all expected documents") + cancelAgentRunCtx() + return nil + }, + } + + if err := agentFixture.Run(runCtx, state); err != nil { + if !errors.Is(err, context.Canceled) { + t.Errorf("error running Elastic-Agent: %s", err) + } + } +} + +// generateLogFile generates a log file by appending new lines every tick +// the lines are composed by the test name and the current time in RFC3339Nano +// This function spans a new goroutine and does not block +func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events uint64) { + t.Helper() + f, err := os.Create(fullPath) + if err != nil { + t.Fatalf("could not create file '%s: %s", fullPath, err) + } + + go func() { + t.Helper() + ticker := time.NewTicker(tick) + t.Cleanup(ticker.Stop) + + done := make(chan struct{}) + t.Cleanup(func() { close(done) }) + + defer func() { + if err := f.Close(); err != nil { + t.Errorf("could not close log file '%s': %s", fullPath, err) + } + }() + + i := uint64(0) + for { + select { + case <-done: + return + case now := <-ticker.C: + i++ + _, err := fmt.Fprintln(f, t.Name(), "Iteration: ", i, now.Format(time.RFC3339Nano)) + if err != nil { + // The Go compiler does not allow me to call t.Fatalf from a non-test + // goroutine, t.Errorf is our only option + t.Errorf("could not write data to log file '%s': %s", fullPath, err) + return + } + // make sure log lines are synced as quickly as possible + if err := f.Sync(); err != nil { + t.Errorf("could not sync file '%s': %s", fullPath, err) + } + if i == events { + return + } + } + } + }() +} + +type plolicyVars struct { + Name string + PolicyID string + LogFilePath string + ESHost string + ESPassword string + ESUsername string + Namespace string + Dataset string + Type string +} + +var policyJSON = ` +{ + "policy_id": "{{.PolicyID}}", + "package": { + "name": "log", + "version": "2.3.0" + }, + "name": "{{.Name}}", + "namespace": "{{.Namespace}}", + "inputs": { + "logs-logfile": { + "enabled": true, + "streams": { + "log.logs": { + "enabled": true, + "vars": { + "paths": [ + "{{.LogFilePath}}" + ], + "data_stream.dataset": "{{.Dataset}}" + } + } + } + } + } +}` + +var standalonePolicy = ` +outputs: + default: + type: elasticsearch + hosts: + - "{{.ESHost}}:443" + username: "{{.ESUsername}}" + password: "{{.ESPassword}}" + +inputs: + - type: filestream + id: elastic-agent-input-id + streams: + - id: filestream-input-id-1 + data_stream: + dataset: "{{.Dataset}}" + data_stream.namespace: "{{.Namespace}}" + data_stream.type: "{{.Type}}" + paths: + - {{.LogFilePath}} + +agent.monitoring: + enabled: true + logs: true + metrics: true +` diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go index 97836c7ff3f..9e9ee99e41e 100644 --- a/testing/integration/monitoring_logs_test.go +++ b/testing/integration/monitoring_logs_test.go @@ -85,7 +85,7 @@ func TestMonitoringLogsShipped(t *testing.T) { // Stage 3: Make sure metricbeat logs are populated t.Log("Making sure metricbeat logs are populated") docs := findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat") + return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") }) require.NotZero(t, len(docs.Hits.Hits)) t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))