From 368271c9f3bd2ff0882be2bfb94ddaeb0461cbdf Mon Sep 17 00:00:00 2001 From: Elliot Dobson Date: Thu, 7 Dec 2023 06:45:30 +1300 Subject: [PATCH] lambda-promtail: cloudwatch: add '__aws_log_type' label (#11335) **What this PR does / why we need it**: Adds the `__aws_log_type` label to AWS CloudWatch logs in `lambda-promtail`. **Which issue(s) this PR fixes**: N/A **Special notes for your reviewer**: AWS S3 & Kinesis log types already have this label. The `lambda-promtail` documentation [here](https://github.com/grafana/loki/blob/main/docs/sources/send-data/lambda-promtail/_index.md?plain=1#L154) suggests that CloudWatch logs has this label added as well, but in practice it does not AFAICT. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --- tools/lambda-promtail/lambda-promtail/cw.go | 1 + .../lambda-promtail/cw_test.go | 60 +++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 tools/lambda-promtail/lambda-promtail/cw_test.go diff --git a/tools/lambda-promtail/lambda-promtail/cw.go b/tools/lambda-promtail/lambda-promtail/cw.go index 895cd66c8f450..1ad6bf34878ed 100644 --- a/tools/lambda-promtail/lambda-promtail/cw.go +++ b/tools/lambda-promtail/lambda-promtail/cw.go @@ -18,6 +18,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent) } labels := model.LabelSet{ + model.LabelName("__aws_log_type"): model.LabelValue("cloudwatch"), model.LabelName("__aws_cloudwatch_log_group"): model.LabelValue(data.LogGroup), model.LabelName("__aws_cloudwatch_owner"): model.LabelValue(data.Owner), } diff --git a/tools/lambda-promtail/lambda-promtail/cw_test.go b/tools/lambda-promtail/lambda-promtail/cw_test.go new file mode 100644 index 0000000000000..9ad5a907c7711 --- /dev/null +++ b/tools/lambda-promtail/lambda-promtail/cw_test.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "testing" + + "github.com/aws/aws-lambda-go/events" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" +) + +func Test_parseCWEvent(t *testing.T) { + tests := []struct { + name string + b *batch + expectedStream string + keepStream bool + }{ + { + name: "cloudwatch", + b: &batch{ + streams: map[string]*logproto.Stream{}, + }, + expectedStream: `{__aws_cloudwatch_log_group="testLogGroup", __aws_cloudwatch_owner="123456789123", __aws_log_type="cloudwatch"}`, + keepStream: false, + }, + { + name: "cloudwatch_keepStream", + b: &batch{ + streams: map[string]*logproto.Stream{}, + }, + expectedStream: `{__aws_cloudwatch_log_group="testLogGroup", __aws_cloudwatch_log_stream="testLogStream", __aws_cloudwatch_owner="123456789123", __aws_log_type="cloudwatch"}`, + keepStream: true, + }, + } + + for _, tt := range tests { + // Docs: https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchlogs.html + // Example CloudWatchLogEvent copied from https://github.com/aws/aws-lambda-go/blob/main/events/cloudwatch_logs_test.go + cwevent := &events.CloudwatchLogsEvent{ + AWSLogs: events.CloudwatchLogsRawData{ + Data: "H4sIAAAAAAAAAHWPwQqCQBCGX0Xm7EFtK+smZBEUgXoLCdMhFtKV3akI8d0bLYmibvPPN3wz00CJxmQnTO41whwWQRIctmEcB6sQbFC3CjW3XW8kxpOpP+OC22d1Wml1qZkQGtoMsScxaczKN3plG8zlaHIta5KqWsozoTYw3/djzwhpLwivWFGHGpAFe7DL68JlBUk+l7KSN7tCOEJ4M3/qOI49vMHj+zCKdlFqLaU2ZHV2a4Ct/an0/ivdX8oYc1UVX860fQDQiMdxRQEAAA==", + }, + } + + t.Run(tt.name, func(t *testing.T) { + batchSize = 131072 // Set large enough we don't send to promtail + keepStream = tt.keepStream + err := parseCWEvent(context.Background(), tt.b, cwevent) + if err != nil { + t.Error(err) + } + require.Len(t, tt.b.streams, 1) + stream, ok := tt.b.streams[tt.expectedStream] + require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream) + require.NotNil(t, stream) + }) + } +}