Skip to content

Commit

Permalink
Better documentation, small improvements, mage check
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Oct 11, 2023
1 parent 69d061b commit 2842714
Showing 1 changed file with 89 additions and 50 deletions.
139 changes: 89 additions & 50 deletions testing/integration/datastreams_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration

package integration
Expand All @@ -17,26 +21,19 @@ import (
"text/template"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/kibana"
"github.com/elastic/elastic-agent/pkg/control/v2/client"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools"
"github.com/elastic/elastic-agent/pkg/testing/tools/estools"
"github.com/elastic/elastic-agent/version"
"github.com/stretchr/testify/require"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
)

func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
dsType := "logs"
dsNamespace := strings.ToLower(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64()))
dsDataset := strings.ToLower(fmt.Sprintf("%s-dataset", t.Name()))
numEvents := uint64(60)

tempDir := t.TempDir()
logFilePath := filepath.Join(tempDir, "log.log")
generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents)

info := define.Require(t, define.Requirements{
Local: false,
Stack: &define.Stack{
Expand All @@ -45,6 +42,15 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
Sudo: true,
})

dsType := "logs"
dsNamespace := strings.ToLower(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64()))
dsDataset := strings.ToLower(fmt.Sprintf("%s-dataset", t.Name()))
numEvents := 60

tempDir := t.TempDir()
logFilePath := filepath.Join(tempDir, "log.log")
generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents)

agentFixture, err := define.NewFixture(t, define.Version())
if err != nil {
t.Fatalf("could not create new fixture: %s", err)
Expand All @@ -53,6 +59,10 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 1. Create a policy in Fleet with monitoring enabled.
// To ensure there are no conflicts with previous test runs against
// the same ESS stack, we add the current time at the end of the policy
// name. This policy does not contain any integration.
createPolicyReq := kibana.AgentPolicy{
Name: t.Name() + "--" + time.Now().Format(time.RFC3339Nano),
Namespace: info.Namespace,
Expand All @@ -68,6 +78,8 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
Force: true,
}

// 2. Install the Elastic-Agent with the policy that
// was just created.
policy, err := tools.InstallAgentWithPolicy(ctx,
t,
installOpts,
Expand All @@ -78,11 +90,14 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
t.Fatalf("could not install Elastic-AGent with Policy: %s", err)
}

// 3. Prepare a request to add an integration to the policy
tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON)
if err != nil {
t.Fatalf("cannot parse template: %s", err)
}

// The time here ensures there are no conflicts with the integration name
// in Fleet.
agentPolicyBuffer := bytes.Buffer{}
err = tmpl.Execute(&agentPolicyBuffer, plolicyVars{
Name: "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339),
Expand All @@ -95,6 +110,8 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
t.Fatalf("could not render template: %s", err)
}

// 4. Call Kibana to create the policy.
// Docs: https://www.elastic.co/guide/en/fleet/current/fleet-api-docs.html#create-integration-policy-api
resp, err := info.KibanaClient.Connection.Send(
http.MethodPost,
"/api/fleet/package_policies",
Expand All @@ -105,6 +122,8 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
t.Fatalf("could not execute request to Kibana/Fleet: %s", err)
}
if resp.StatusCode != http.StatusOK {
// On error dump the whole request response so we can easily spot
// what went wrong.
t.Errorf("received a non 200-OK when adding package to policy. "+
"Status code: %d", resp.StatusCode)
respDump, err := httputil.DumpResponse(resp, true)
Expand All @@ -116,34 +135,15 @@ func TestFlattenedDatastreamFleetPolicy(t *testing.T) {
t.FailNow()
}

ensureDocumentsInES := func() bool {
docs, err := estools.GetLogsForDatastream(
context.Background(), info.ESClient, dsType, dsDataset, dsNamespace)
if err != nil {
t.Logf("error quering ES, will retry later: %s", err)
}

if docs.Hits.Total.Value == int(numEvents) {
return true
}

return false
}

require.Eventually(t, ensureDocumentsInES, 120*time.Second, time.Second,
require.Eventually(
t,
ensureDocumentsInES(t, context.TODO(), info.ESClient, dsType, dsDataset, dsNamespace, int(numEvents)),
120*time.Second,
time.Second,
"could not get all expected documents form ES")
}

func TestFlattenedDatastreamStandalone(t *testing.T) {
dsType := "logs"
dsNamespace := fmt.Sprintf("%s-namespace-%d", t.Name(), rand.Uint64())
dsDataset := fmt.Sprintf("%s-dataset", t.Name())
numEvents := uint64(60)

tempDir := t.TempDir()
logFilePath := filepath.Join(tempDir, "log.log")
generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents)

info := define.Require(t, define.Requirements{
Local: false,
Stack: &define.Stack{
Expand All @@ -152,6 +152,15 @@ func TestFlattenedDatastreamStandalone(t *testing.T) {
Sudo: true,
})

dsType := "logs"
dsNamespace := fmt.Sprintf("%s-namespace-%d", t.Name(), rand.Uint64())
dsDataset := fmt.Sprintf("%s-dataset", t.Name())
numEvents := 60

tempDir := t.TempDir()
logFilePath := filepath.Join(tempDir, "log.log")
generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents)

agentFixture, err := define.NewFixture(t,
define.Version(), atesting.WithAllowErrors())
if err != nil {
Expand All @@ -178,10 +187,12 @@ func TestFlattenedDatastreamStandalone(t *testing.T) {
ESPassword: os.Getenv("ELASTICSEARCH_PASSWORD"),
})

// 1. The first thing to do is to prepare the fixture.
if err := agentFixture.Prepare(context.Background()); err != nil {
t.Fatalf("cannot prepare Elastic-Agent: %s", err)
}

// 2. Create a context with cancel to easily stop the Elastic-Agent
runCtx, cancelAgentRunCtx := context.WithCancel(context.Background())
go func() {
// make sure the test does not hang forever
Expand All @@ -190,6 +201,16 @@ func TestFlattenedDatastreamStandalone(t *testing.T) {
cancelAgentRunCtx()
}()

// 3. Define the "desired state". Here we define the desired state
// for Elastic-Agent and its components. Once this state is reached the
// `After` hook is called, the actual test code goes there. Anything that needs
// to be done after the Elastic-Agent is running goes there. In this case we only
// need to assert the documents are correctly ingested in ES.
//
// `Configure` contains the raw YAML policy for the Elastic-Agent. Because `agentFixture.Run` starts
// the Elastic-Agent in test mode (`--testing-mode`), it will ignore the `elastic-agent.yaml`
// and wait to receive the full configuration via gRPC, hence there is no need to call the
// `agent.Fixture.Configure` method.
state := atesting.State{
Configure: renderedPolicy.String(),
AgentState: atesting.NewClientState(client.Healthy),
Expand All @@ -214,40 +235,58 @@ func TestFlattenedDatastreamStandalone(t *testing.T) {
},
},
After: func() error {
ensureDocumentsInES := func() bool {
docs, err := estools.GetLogsForDatastream(context.Background(), info.ESClient, dsType, dsDataset, dsNamespace)
if err != nil {
t.Logf("error quering ES, will retry later: %s", err)
}

if docs.Hits.Total.Value == 60 {
return true
}

return false
}

require.Eventually(
t,
ensureDocumentsInES,
ensureDocumentsInES(t, runCtx, info.ESClient, dsType, dsDataset, dsNamespace, numEvents),
2*time.Minute, time.Second,
"did not find all expected documents")
cancelAgentRunCtx()
return nil
},
}

// 4. Start the Elastic-Agent. `agentFixture.Run` will block until
// the Elastic-Agent exits or `runCtx` is cancelled.
if err := agentFixture.Run(runCtx, state); err != nil {
if !errors.Is(err, context.Canceled) {
t.Errorf("error running Elastic-Agent: %s", err)
}
}
}

// ensureDocumentsInES asserts the documents were ingested into the correct
// datastream
func ensureDocumentsInES(
t *testing.T,
ctx context.Context,
esClient elastictransport.Interface,
dsType, dsDataset, dsNamespace string,
numEvents int,
) func() bool {

f := func() bool {
t.Helper()

docs, err := estools.GetLogsForDatastream(ctx, esClient, dsType, dsDataset, dsNamespace)
if err != nil {
t.Logf("error quering ES, will retry later: %s", err)
}

if docs.Hits.Total.Value == int(numEvents) {
return true
}

return false

}

return f
}

// generateLogFile generates a log file by appending new lines every tick
// the lines are composed by the test name and the current time in RFC3339Nano
// This function spans a new goroutine and does not block
func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events uint64) {
func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events int) {
t.Helper()
f, err := os.Create(fullPath)
if err != nil {
Expand All @@ -268,7 +307,7 @@ func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events u
}
}()

i := uint64(0)
i := 0
for {
select {
case <-done:
Expand Down

0 comments on commit 2842714

Please sign in to comment.