From 346ec309c798a533ce715555e696a37a77ebc43b Mon Sep 17 00:00:00 2001 From: Adrian Smith Date: Thu, 22 Feb 2024 13:50:54 -0800 Subject: [PATCH] Publish SNS event on new alert (#200) --- .github/workflows/hygeine.yml | 4 +- Makefile | 6 +- config.env.template | 3 + docker-compose.yml | 15 ++++ docs/alert-routing.md | 19 +++- e2e/alerting_test.go | 26 +++++- e2e/heuristic_test.go | 10 +-- e2e/setup.go | 61 ++++++++++++- go.mod | 16 +++- go.sum | 32 +++++++ internal/alert/manager.go | 49 ++++++++--- internal/alert/manager_test.go | 38 +++++++- internal/alert/routing.go | 14 +++ internal/alert/routing_test.go | 4 + internal/client/alert.go | 21 +++-- internal/client/alert_test.go | 14 +-- internal/client/sns.go | 124 +++++++++++++++++++++++++++ internal/client/sns_test.go | 52 +++++++++++ internal/config/config.go | 4 + internal/core/constants.go | 3 + internal/metrics/metrics.go | 3 +- internal/mocks/mock_sns.go | 65 ++++++++++++++ internal/mocks/routing_directory.go | 26 ++++++ scripts/localstack-e2e-test-setup.sh | 10 +++ 24 files changed, 574 insertions(+), 45 deletions(-) create mode 100644 docker-compose.yml create mode 100644 internal/client/sns.go create mode 100644 internal/client/sns_test.go create mode 100644 internal/mocks/mock_sns.go create mode 100755 scripts/localstack-e2e-test-setup.sh diff --git a/.github/workflows/hygeine.yml b/.github/workflows/hygeine.yml index 4c743a73..689f0590 100644 --- a/.github/workflows/hygeine.yml +++ b/.github/workflows/hygeine.yml @@ -48,8 +48,8 @@ jobs: - name: Set up Node.js uses: actions/setup-node@v2 with: - node-version: '20' - + node-version: '20' + - name: Install markdownlint CLI run: npm install -g markdownlint-cli diff --git a/Makefile b/Makefile index cd68481a..e0d7d097 100644 --- a/Makefile +++ b/Makefile @@ -27,11 +27,13 @@ go-gen-mocks: .PHONY: test test: - @ go test ./internal/... -timeout $(TEST_LIMIT) + @go test ./internal/... -timeout $(TEST_LIMIT) .PHONY: test-e2e e2e-test: - @ go test ./e2e/... -timeout $(TEST_LIMIT) -deploy-config ../.devnet/devnetL1.json -parallel=4 -v + @docker compose up -d + @go test ./e2e/... -timeout $(TEST_LIMIT) -deploy-config ../.devnet/devnetL1.json -parallel=4 -v + @docker compose down .PHONY: lint lint: diff --git a/config.env.template b/config.env.template index 01ab5772..038c55a0 100644 --- a/config.env.template +++ b/config.env.template @@ -32,6 +32,9 @@ P0_PAGERDUTY_ALERT_EVENTS_URL= P1_PAGERDUTY_INTEGRATION_KEY= P1_PAGERDUTY_ALERT_EVENTS_URL= +SNS_TOPIC_ARN= +AWS_ENDPOINT= + # Metrics configurations METRICS_HOST=localhost METRICS_PORT=7300 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..2eb95cdb --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,15 @@ +version: "3.8" + +services: + localstack: + container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}" + image: localstack/localstack:3.1.0 + ports: + - "127.0.0.1:4566:4566" # LocalStack Gateway + - "127.0.0.1:4510-4559:4510-4559" # external services port range + environment: + # LocalStack configuration: https://docs.localstack.cloud/references/configuration/ + - DEBUG=${DEBUG:-0} + volumes: + - "/var/run/docker.sock:/var/run/docker.sock" + - "./scripts/localstack-e2e-test-setup.sh:/etc/localstack/init/ready.d/script.sh" \ No newline at end of file diff --git a/docs/alert-routing.md b/docs/alert-routing.md index 10917adb..e29a1f2e 100644 --- a/docs/alert-routing.md +++ b/docs/alert-routing.md @@ -32,10 +32,11 @@ a few examples on how you might want to configure your alert routing. Pessimism currently supports the following alert destinations: -| Name | Description | -|-----------|-------------------------------------| -| slack | Sends alerts to a Slack channel | -| pagerduty | Sends alerts to a PagerDuty service | +| Name | Description | +|-----------|---------------------------------------------------| +| slack | Sends alerts to a Slack channel | +| pagerduty | Sends alerts to a PagerDuty service | +| sns | Sends alerts to an SNS topic defined in .env file | ## Alert Severity @@ -47,6 +48,16 @@ Pessimism currently defines the following severities for alerts: | medium | Alerts that could be hazardous, but may not be completely destructive | | high | Alerts that require immediate attention and could result in a loss of funds | +## Publishing to an SNS Topic + +To publish alerts to an SNS topic, you must first create an SNS topic in the AWS +console. Once you have created the topic, you will need to add the ARN of the +topic to the `.env` file. Ensure that you have AWS_REGION, +`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` set in your environment if you are looking to publish messages to an SNS +topic. The ARN should be added to the `SNS_TOPIC_ARN` variable found in the `.env` file. +The AWS_ENDPOINT is optional and is primarily used for testing with localstack. +> Note: Currently, Pessimism only support one SNS topic to publish alerts to. + ## PagerDuty Severity Mapping PagerDuty supports the following severities: `critical`, `error`, `warning`, diff --git a/e2e/alerting_test.go b/e2e/alerting_test.go index 1df8f596..f9435b5c 100644 --- a/e2e/alerting_test.go +++ b/e2e/alerting_test.go @@ -3,6 +3,7 @@ package e2e_test import ( "context" "math/big" + "testing" "time" @@ -17,11 +18,18 @@ import ( "github.com/stretchr/testify/require" ) +const ( + // These are localstack specific Topic ARNs that are used to test the SNS integration. + MultiDirectiveTopicArn = "arn:aws:sns:us-east-1:000000000000:multi-directive-test-topic" + CoolDownTopicArn = "arn:aws:sns:us-east-1:000000000000:alert-cooldown-test-topic" +) + // TestMultiDirectiveRouting ... Tests the E2E flow of a contract event heuristic with high priority alerts all // necessary destinations func TestMultiDirectiveRouting(t *testing.T) { - ts := e2e.CreateSysTestSuite(t) + ts := e2e.CreateSysTestSuite(t, MultiDirectiveTopicArn) + defer ts.Close() updateSig := "ConfigUpdate(uint256,uint8,bytes)" alertMsg := "System config gas config updated" @@ -73,6 +81,12 @@ func TestMultiDirectiveRouting(t *testing.T) { return height != nil && height.Uint64() > receipt.BlockNumber.Uint64(), nil })) + snsMessages, err := e2e.GetSNSMessages(ts.AppCfg.AlertConfig.SNSConfig.Endpoint, "multi-directive-test-queue") + require.NoError(t, err) + + assert.Len(t, snsMessages.Messages, 1, "Incorrect number of SNS messages sent") + assert.Contains(t, *snsMessages.Messages[0].Body, "contract_event", "System contract event alert was not sent") + slackPosts := ts.TestSlackSvr.SlackAlerts() pdPosts := ts.TestPagerDutyServer.PagerDutyAlerts() @@ -90,7 +104,7 @@ func TestMultiDirectiveRouting(t *testing.T) { // balance enforcement heuristic session on L2 network with a cooldown. func TestCoolDown(t *testing.T) { - ts := e2e.CreateSysTestSuite(t) + ts := e2e.CreateSysTestSuite(t, CoolDownTopicArn) defer ts.Close() alice := ts.Cfg.Secrets.Addresses().Alice @@ -149,7 +163,7 @@ func TestCoolDown(t *testing.T) { receipt, err := wait.ForReceipt(context.Background(), ts.L2Client, drainAliceTx.Hash(), types.ReceiptStatusSuccessful) require.NoError(t, err) - require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) { + require.NoError(t, wait.For(context.Background(), 1000*time.Millisecond, func() (bool, error) { id := ids[0].PathID height, err := ts.Subsystems.PathHeight(id) if err != nil { @@ -162,6 +176,11 @@ func TestCoolDown(t *testing.T) { // Check that the balance enforcement was triggered using the mocked server cache. posts := ts.TestSlackSvr.SlackAlerts() + sqsMessages, err := e2e.GetSNSMessages(ts.AppCfg.AlertConfig.SNSConfig.Endpoint, "alert-cooldown-test-queue") + assert.NoError(t, err) + assert.Len(t, sqsMessages.Messages, 1, "Incorrect number of SNS messages sent") + assert.Contains(t, *sqsMessages.Messages[0].Body, "balance_enforcement", "Balance enforcement alert was not sent") + require.Equal(t, 1, len(posts), "No balance enforcement alert was sent") assert.Contains(t, posts[0].Text, "balance_enforcement", "Balance enforcement alert was not sent") assert.Contains(t, posts[0].Text, alertMsg) @@ -170,4 +189,5 @@ func TestCoolDown(t *testing.T) { time.Sleep(1 * time.Second) posts = ts.TestSlackSvr.SlackAlerts() assert.Equal(t, 1, len(posts), "No alerts should be sent after the transaction is sent") + } diff --git a/e2e/heuristic_test.go b/e2e/heuristic_test.go index 1930c061..6f075087 100644 --- a/e2e/heuristic_test.go +++ b/e2e/heuristic_test.go @@ -31,7 +31,7 @@ import ( // balance enforcement heuristic session on L2 network. func TestBalanceEnforcement(t *testing.T) { - ts := e2e.CreateSysTestSuite(t) + ts := e2e.CreateSysTestSuite(t, "") defer ts.Close() alice := ts.Cfg.Secrets.Addresses().Alice @@ -158,7 +158,7 @@ func TestBalanceEnforcement(t *testing.T) { // contract event heuristic session on L1 network. func TestContractEvent(t *testing.T) { - ts := e2e.CreateSysTestSuite(t) + ts := e2e.CreateSysTestSuite(t, "") defer ts.Close() // The string declaration of the event we want to listen for. @@ -226,7 +226,7 @@ func TestContractEvent(t *testing.T) { // safety heuristic session. This test ensures that an alert is produced in the event // of a highly suspicious withdrawal. func TestWithdrawalSafetyAllInvariants(t *testing.T) { - ts := e2e.CreateSysTestSuite(t) + ts := e2e.CreateSysTestSuite(t, "") defer ts.Close() opts, err := bind.NewKeyedTransactorWithChainID(ts.Cfg.Secrets.Alice, ts.Cfg.L2ChainIDBig()) @@ -357,7 +357,7 @@ func TestWithdrawalSafetyAllInvariants(t *testing.T) { // of a normal tx func TestWithdrawalSafetyNoInvariants(t *testing.T) { - ts := e2e.CreateSysTestSuite(t) + ts := e2e.CreateSysTestSuite(t, "") defer ts.Close() ids, err := ts.App.BootStrap([]*models.SessionRequestParams{ @@ -439,7 +439,7 @@ func TestWithdrawalSafetyNoInvariants(t *testing.T) { // TestFaultDetector ... Ensures that an alert is produced in the presence of a faulty L2Output root // on the L1 Optimism portal contract. func TestFaultDetector(t *testing.T) { - ts := e2e.CreateSysTestSuite(t) + ts := e2e.CreateSysTestSuite(t, "") defer ts.Close() // Generate transactor opts diff --git a/e2e/setup.go b/e2e/setup.go index 22a17ea7..abb76467 100644 --- a/e2e/setup.go +++ b/e2e/setup.go @@ -6,6 +6,9 @@ import ( "os" "testing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" "github.com/base-org/pessimism/internal/alert" "github.com/base-org/pessimism/internal/api/server" "github.com/base-org/pessimism/internal/app" @@ -19,11 +22,11 @@ import ( "github.com/base-org/pessimism/internal/state" "github.com/base-org/pessimism/internal/subsystem" ix_node "github.com/ethereum-optimism/optimism/indexer/node" - "github.com/golang/mock/gomock" - op_e2e "github.com/ethereum-optimism/optimism/op-e2e" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/golang/mock/gomock" + "go.uber.org/zap" ) // SysTestSuite ... Stores all the information needed to run an e2e system test @@ -51,7 +54,7 @@ type SysTestSuite struct { } // CreateSysTestSuite ... Creates a new SysTestSuite -func CreateSysTestSuite(t *testing.T) *SysTestSuite { +func CreateSysTestSuite(t *testing.T, topicArn string) *SysTestSuite { t.Log("Creating system test suite") ctx := context.Background() logging.New(core.Development) @@ -112,11 +115,14 @@ func CreateSysTestSuite(t *testing.T) *SysTestSuite { pagerdutyServer := NewTestPagerDutyServer("127.0.0.1", 0) + setAwsVars(t) + slackURL := fmt.Sprintf("http://127.0.0.1:%d", slackServer.Port) pagerdutyURL := fmt.Sprintf("http://127.0.0.1:%d", pagerdutyServer.Port) appCfg.AlertConfig.PagerdutyAlertEventsURL = pagerdutyURL appCfg.AlertConfig.RoutingParams = DefaultRoutingParams(core.StringFromEnv(slackURL)) + appCfg.AlertConfig.SNSConfig.TopicArn = topicArn pess, kill, err := app.NewPessimismApp(ctx, appCfg) if err != nil { @@ -165,6 +171,9 @@ func DefaultTestConfig() *config.Config { AlertConfig: &alert.Config{ PagerdutyAlertEventsURL: "", RoutingCfgPath: "", + SNSConfig: &client.SNSConfig{ + Endpoint: "http://localhost:4566", + }, }, EngineConfig: &engine.Config{ WorkerCount: workerCount, @@ -186,6 +195,52 @@ func DefaultTestConfig() *config.Config { } } +func setAwsVars(t *testing.T) { + awsEnvVariables := map[string]string{ + "AWS_REGION": "us-east-1", + "AWS_SECRET_ACCESS_KEY": "test", + "AWS_ACCESS_KEY_ID": "test", + } + for key, value := range awsEnvVariables { + if err := os.Setenv(key, value); err != nil { + t.Fatalf("Error setting %s environment variable: %s", key, err) + } + } +} + +func GetSNSMessages(endpoint string, queueName string) (*sqs.ReceiveMessageOutput, error) { + sess, err := session.NewSession(&aws.Config{ + Endpoint: aws.String(endpoint), + }) + if err != nil { + logging.NoContext().Error("failed to create AWS session", zap.Error(err)) + return nil, err + } + + svc := sqs.New(sess) + urlResult, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: aws.String(queueName), + }) + if err != nil { + return nil, err + } + + queueURL := urlResult.QueueUrl + msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{ + QueueUrl: queueURL, + MaxNumberOfMessages: aws.Int64(10), + WaitTimeSeconds: aws.Int64(5), + MessageAttributeNames: []*string{ + aws.String(sqs.QueueAttributeNameAll), + }, + }) + if err != nil { + return nil, err + } + + return msgResult, nil +} + // DefaultRoutingParams ... Returns a default routing params configuration for testing func DefaultRoutingParams(slackURL core.StringFromEnv) *core.AlertRoutingParams { return &core.AlertRoutingParams{ diff --git a/go.mod b/go.mod index 5ecaff99..f9669563 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/VictoriaMetrics/fastcache v1.10.0 // indirect github.com/ajg/form v1.5.1 // indirect + github.com/aws/aws-sdk-go v1.50.3 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.7.0 // indirect @@ -50,13 +51,17 @@ require ( github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/deepmap/oapi-codegen v1.8.2 // indirect + github.com/distribution/reference v0.5.0 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect + github.com/docker/docker v25.0.2+incompatible // indirect + github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 // indirect github.com/elastic/gosigar v0.14.2 // indirect github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.3 // indirect github.com/ethereum-optimism/superchain-registry/superchain v0.0.0-20231001123245-7b48d3818686 // indirect github.com/ethereum/c-kzg-4844 v0.3.1 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fjl/memsize v0.0.1 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -65,6 +70,8 @@ require ( github.com/getsentry/sentry-go v0.18.0 // indirect github.com/go-chi/chi/v5 v5.0.10 // indirect github.com/go-chi/docgen v1.2.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-stack/stack v1.8.1 // indirect @@ -74,7 +81,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.4.2 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect - github.com/google/go-cmp v0.5.9 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect github.com/gorilla/websocket v1.5.0 // indirect @@ -104,6 +111,7 @@ require ( github.com/jbenet/goprocess v0.1.4 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.16.7 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/koron/go-ssdp v0.0.4 // indirect @@ -148,6 +156,8 @@ require ( github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.12.0 // indirect github.com/onsi/gomega v1.28.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect @@ -178,6 +188,10 @@ require ( github.com/urfave/cli/v2 v2.25.7 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect + go.opentelemetry.io/otel v1.22.0 // indirect + go.opentelemetry.io/otel/metric v1.22.0 // indirect + go.opentelemetry.io/otel/trace v1.22.0 // indirect go.uber.org/dig v1.17.0 // indirect go.uber.org/fx v1.20.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 9a83c26a..5930da31 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKS github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/aws/aws-sdk-go v1.50.3 h1:NnXC/ukOakZbBwQcwAzkAXYEB4SbWboP9TFx9vvhIrE= +github.com/aws/aws-sdk-go v1.50.3/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -148,11 +150,17 @@ github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQ github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= +github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dlclark/regexp2 v1.7.0 h1:7lJfhqlPssTb1WQx4yvTHN0uElPEv52sbaECrAQxjAo= github.com/dlclark/regexp2 v1.7.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY= github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v25.0.2+incompatible h1:/OaKeauroa10K4Nqavw4zlhcDq/WBcPMc5DbjOGgozY= +github.com/docker/docker v25.0.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -184,6 +192,8 @@ github.com/ethereum/c-kzg-4844 v0.3.1 h1:sR65+68+WdnMKxseNWxSJuAv2tsUrihTpVBTfM/ github.com/ethereum/c-kzg-4844 v0.3.1/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fjl/memsize v0.0.1 h1:+zhkb+dhUgx0/e+M8sF0QqiouvMQUiKR+QYvdxIOKcQ= github.com/fjl/memsize v0.0.1/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= @@ -225,8 +235,13 @@ github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxI github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= @@ -296,6 +311,8 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -437,6 +454,9 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= @@ -640,6 +660,10 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= +github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg= github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= @@ -811,6 +835,14 @@ github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPR github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw= +go.opentelemetry.io/otel v1.22.0 h1:xS7Ku+7yTFvDfDraDIJVpw7XPyuHlB9MCiqqX5mcJ6Y= +go.opentelemetry.io/otel v1.22.0/go.mod h1:eoV4iAi3Ea8LkAEI9+GFT44O6T/D0GWAVFyZVCC6pMI= +go.opentelemetry.io/otel/metric v1.22.0 h1:lypMQnGyJYeuYPhOM/bgjbFM6WE44W1/T45er4d8Hhg= +go.opentelemetry.io/otel/metric v1.22.0/go.mod h1:evJGjVpZv0mQ5QBRJoBF64yMuOf4xCWdXjK8pzFvliY= +go.opentelemetry.io/otel/trace v1.22.0 h1:Hg6pPujv0XG9QaVbGOBVHunyuLcCC3jN7WEhPx83XD0= +go.opentelemetry.io/otel/trace v1.22.0/go.mod h1:RbbHXVqKES9QhzZq/fE5UnOSILqRt40a21sPw2He1xo= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/internal/alert/manager.go b/internal/alert/manager.go index 7f2b43f1..fe0b4ee2 100644 --- a/internal/alert/manager.go +++ b/internal/alert/manager.go @@ -23,10 +23,12 @@ type Manager interface { } // Config ... Alert manager configuration +// SNSConfig is not part of the RoutingParams as we only support publishing to one SNS client type Config struct { RoutingCfgPath string PagerdutyAlertEventsURL string RoutingParams *core.AlertRoutingParams + SNSConfig *client.SNSConfig } // alertManager ... Alert manager implementation @@ -52,12 +54,12 @@ func NewManager(ctx context.Context, cfg *Config, cm RoutingDirectory) Manager { ctx, cancel := context.WithCancel(ctx) + // NOTE - Consider adding support for additional sns configurations am := &alertManager{ - ctx: ctx, - cdHandler: NewCoolDownHandler(), - cfg: cfg, - cm: cm, - + ctx: ctx, + cdHandler: NewCoolDownHandler(), + cfg: cfg, + cm: cm, cancel: cancel, interpolator: new(Interpolator), store: NewStore(), @@ -90,8 +92,8 @@ func (am *alertManager) handleSlackPost(alert core.Alert, policy *core.AlertPoli // Create event trigger event := &client.AlertEventTrigger{ - Message: am.interpolator.SlackMessage(alert, policy.Msg), - Severity: alert.Sev, + Message: am.interpolator.SlackMessage(alert, policy.Msg), + Alert: alert, } for _, sc := range slackClients { @@ -120,9 +122,8 @@ func (am *alertManager) handlePagerDutyPost(alert core.Alert) error { } event := &client.AlertEventTrigger{ - Message: am.interpolator.PagerDutyMessage(alert), - DedupKey: alert.PathID, - Severity: alert.Sev, + Message: am.interpolator.PagerDutyMessage(alert), + Alert: alert, } for _, pdc := range pdClients { @@ -135,13 +136,35 @@ func (am *alertManager) handlePagerDutyPost(alert core.Alert) error { return fmt.Errorf("client %s could not post to pagerduty: %s", pdc.GetName(), resp.Message) } - am.logger.Debug("Successfully posted to ", zap.Any("resp", resp)) + am.logger.Debug("Successfully posted to PagerDuty", zap.Any("resp", resp)) am.metrics.RecordAlertGenerated(alert, core.PagerDuty, pdc.GetName()) } return nil } +func (am *alertManager) handleSNSPublish(alert core.Alert, policy *core.AlertPolicy) error { + event := &client.AlertEventTrigger{ + Message: am.interpolator.SlackMessage(alert, policy.Msg), + Alert: alert, + } + + c := am.cm.GetSNSClient() + + resp, err := c.PostEvent(am.ctx, event) + if err != nil { + return err + } + + if resp.Status != core.SuccessStatus { + return fmt.Errorf("client %s could not post to sns: %s", c.GetName(), resp.Message) + } + + am.logger.Debug("Successfully posted to SNS", zap.Any("resp", resp)) + am.metrics.RecordAlertGenerated(alert, core.SNS, c.GetName()) + return nil +} + // EventLoop ... Event loop for alert manager subsystem func (am *alertManager) EventLoop() error { ticker := time.NewTicker(time.Second * 1) @@ -202,6 +225,10 @@ func (am *alertManager) HandleAlert(alert core.Alert, policy *core.AlertPolicy) if err := am.handlePagerDutyPost(alert); err != nil { am.logger.Error("could not post to pagerduty", zap.Error(err)) } + + if err := am.handleSNSPublish(alert, policy); err != nil { + am.logger.Error("could not publish to sns", zap.Error(err)) + } } // Shutdown ... Shuts down the alert manager subsystem diff --git a/internal/alert/manager_test.go b/internal/alert/manager_test.go index db92e3d1..1c77c715 100644 --- a/internal/alert/manager_test.go +++ b/internal/alert/manager_test.go @@ -22,6 +22,9 @@ func TestEventLoop(t *testing.T) { AlertConfig: &alert.Config{ RoutingCfgPath: "test_data/alert-routing-test.yaml", PagerdutyAlertEventsURL: "test", + SNSConfig: &client.SNSConfig{ + TopicArn: "test", + }, }, } @@ -39,6 +42,7 @@ func TestEventLoop(t *testing.T) { description: "Test low sev alert sends to slack", test: func(t *testing.T) { cm := alert.NewRoutingDirectory(cfg.AlertConfig) + sns := mocks.NewMockSNSClient(c) am := alert.NewManager(ctx, cfg.AlertConfig, cm) go func() { @@ -51,6 +55,7 @@ func TestEventLoop(t *testing.T) { ingress := am.Transit() + cm.SetSNSClient(sns) cm.SetSlackClients([]client.SlackClient{mocks.NewMockSlackClient(c)}, core.LOW) alert := core.Alert{ @@ -76,6 +81,14 @@ func TestEventLoop(t *testing.T) { }, nil).Times(1) } + sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return( + &client.AlertAPIResponse{ + Message: "test", + Status: core.SuccessStatus, + }, nil).AnyTimes() + + sns.EXPECT().GetName().AnyTimes() + ingress <- alert time.Sleep(1 * time.Second) id := core.NewUUID() @@ -93,6 +106,7 @@ func TestEventLoop(t *testing.T) { description: "Test medium sev alert sends to just PagerDuty", test: func(t *testing.T) { cm := alert.NewRoutingDirectory(cfg.AlertConfig) + sns := mocks.NewMockSNSClient(c) am := alert.NewManager(ctx, cfg.AlertConfig, cm) go func() { @@ -106,6 +120,7 @@ func TestEventLoop(t *testing.T) { ingress := am.Transit() cm.SetPagerDutyClients([]client.PagerDutyClient{mocks.NewMockPagerDutyClient(c)}, core.MEDIUM) + cm.SetSNSClient(sns) alert := core.Alert{ Sev: core.MEDIUM, @@ -130,6 +145,14 @@ func TestEventLoop(t *testing.T) { }, nil).Times(1) } + sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return( + &client.AlertAPIResponse{ + Message: "test", + Status: core.SuccessStatus, + }, nil).AnyTimes() + + sns.EXPECT().GetName().AnyTimes() + ingress <- alert time.Sleep(1 * time.Second) id := core.UUID{} @@ -147,6 +170,7 @@ func TestEventLoop(t *testing.T) { description: "Test high sev alert sends to both slack and PagerDuty", test: func(t *testing.T) { cm := alert.NewRoutingDirectory(cfg.AlertConfig) + sns := mocks.NewMockSNSClient(c) am := alert.NewManager(ctx, cfg.AlertConfig, cm) go func() { @@ -161,6 +185,7 @@ func TestEventLoop(t *testing.T) { cm.SetSlackClients([]client.SlackClient{mocks.NewMockSlackClient(c), mocks.NewMockSlackClient(c)}, core.HIGH) cm.SetPagerDutyClients([]client.PagerDutyClient{mocks.NewMockPagerDutyClient(c), mocks.NewMockPagerDutyClient(c)}, core.HIGH) + cm.SetSNSClient(sns) alert := core.Alert{ Sev: core.HIGH, @@ -181,7 +206,7 @@ func TestEventLoop(t *testing.T) { &client.AlertAPIResponse{ Message: "test", Status: core.SuccessStatus, - }, nil).Times(1) + }, nil) } for _, cli := range cm.GetSlackClients(core.HIGH) { @@ -191,8 +216,17 @@ func TestEventLoop(t *testing.T) { &client.AlertAPIResponse{ Message: "test", Status: core.SuccessStatus, - }, nil).Times(1) + }, nil) } + + sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return( + &client.AlertAPIResponse{ + Message: "test", + Status: core.SuccessStatus, + }, nil).AnyTimes() + + sns.EXPECT().GetName().AnyTimes() + ingress <- alert time.Sleep(1 * time.Second) id := core.UUID{} diff --git a/internal/alert/routing.go b/internal/alert/routing.go index 748361e6..c1a26b3d 100644 --- a/internal/alert/routing.go +++ b/internal/alert/routing.go @@ -14,14 +14,18 @@ type RoutingDirectory interface { InitializeRouting(params *core.AlertRoutingParams) SetPagerDutyClients([]client.PagerDutyClient, core.Severity) SetSlackClients([]client.SlackClient, core.Severity) + GetSNSClient() client.SNSClient + SetSNSClient(client.SNSClient) } // routingDirectory ... Routing directory implementation // NOTE: This implementation works for now, but if we add more routing clients in the future, // we should consider refactoring this to be more generic +// Only one SNS client is needed in most cases. If we need to support multiple SNS clients, we can refactor this type routingDirectory struct { pagerDutyClients map[core.Severity][]client.PagerDutyClient slackClients map[core.Severity][]client.SlackClient + snsClient client.SNSClient cfg *Config } @@ -31,6 +35,7 @@ func NewRoutingDirectory(cfg *Config) RoutingDirectory { cfg: cfg, pagerDutyClients: make(map[core.Severity][]client.PagerDutyClient), slackClients: make(map[core.Severity][]client.SlackClient), + snsClient: nil, } } @@ -49,6 +54,14 @@ func (rd *routingDirectory) SetSlackClients(clients []client.SlackClient, sev co copy(rd.slackClients[sev][0:], clients) } +func (rd *routingDirectory) GetSNSClient() client.SNSClient { + return rd.snsClient +} + +func (rd *routingDirectory) SetSNSClient(client client.SNSClient) { + rd.snsClient = client +} + // SetPagerDutyClients ... Sets the pager duty clients for the given severity level func (rd *routingDirectory) SetPagerDutyClients(clients []client.PagerDutyClient, sev core.Severity) { copy(rd.pagerDutyClients[sev][0:], clients) @@ -56,6 +69,7 @@ func (rd *routingDirectory) SetPagerDutyClients(clients []client.PagerDutyClient // InitializeRouting ... Parses alert routing parameters for each severity level func (rd *routingDirectory) InitializeRouting(params *core.AlertRoutingParams) { + rd.snsClient = client.NewSNSClient(rd.cfg.SNSConfig, "sns") if params != nil { rd.paramsToRouteDirectory(params.AlertRoutes.Low, core.LOW) rd.paramsToRouteDirectory(params.AlertRoutes.Medium, core.MEDIUM) diff --git a/internal/alert/routing_test.go b/internal/alert/routing_test.go index 62fbadcb..ad1b7164 100644 --- a/internal/alert/routing_test.go +++ b/internal/alert/routing_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/base-org/pessimism/internal/alert" + "github.com/base-org/pessimism/internal/client" "github.com/base-org/pessimism/internal/config" "github.com/base-org/pessimism/internal/core" "github.com/stretchr/testify/assert" @@ -13,6 +14,9 @@ import ( func getCfg() *config.Config { return &config.Config{ AlertConfig: &alert.Config{ + SNSConfig: &client.SNSConfig{ + TopicArn: "test", + }, RoutingParams: &core.AlertRoutingParams{ AlertRoutes: &core.SeverityMap{ Low: &core.AlertClientCfg{ diff --git a/internal/client/alert.go b/internal/client/alert.go index e1a7f133..b19a0d21 100644 --- a/internal/client/alert.go +++ b/internal/client/alert.go @@ -16,9 +16,8 @@ type AlertClient interface { // AlertEventTrigger ... A standardized event trigger for alert clients type AlertEventTrigger struct { - Message string - Severity core.Severity - DedupKey core.PathID + Message string + Alert core.Alert } // AlertAPIResponse ... A standardized response for alert clients @@ -30,8 +29,20 @@ type AlertAPIResponse struct { // ToPagerdutyEvent ... Converts an AlertEventTrigger to a PagerDutyEventTrigger func (a *AlertEventTrigger) ToPagerdutyEvent() *PagerDutyEventTrigger { return &PagerDutyEventTrigger{ - DedupKey: a.DedupKey.String(), - Severity: a.Severity.ToPagerDutySev(), + DedupKey: a.Alert.PathID.String(), + Severity: a.Alert.Sev.ToPagerDutySev(), Message: a.Message, } } + +func (a *AlertEventTrigger) ToSNSMessagePayload() *SNSMessagePayload { + return &SNSMessagePayload{ + Network: a.Alert.Net.String(), + HeuristicType: a.Alert.HT.String(), + Severity: a.Alert.Sev.String(), + PathID: a.Alert.PathID.String(), + HeuristicID: a.Alert.HeuristicID.String(), + Timestamp: a.Alert.Timestamp, + Content: a.Message, + } +} diff --git a/internal/client/alert_test.go b/internal/client/alert_test.go index 3252f9b4..c9fba347 100644 --- a/internal/client/alert_test.go +++ b/internal/client/alert_test.go @@ -11,22 +11,24 @@ import ( func TestToPagerDutyEvent(t *testing.T) { alert := &client.AlertEventTrigger{ - Message: "test", - Severity: core.HIGH, - DedupKey: core.PathID{}, + Message: "test", + Alert: core.Alert{ + Sev: core.HIGH, + PathID: core.PathID{}, + }, } - sPathID := alert.DedupKey.String() + sPathID := alert.Alert.PathID.String() res := alert.ToPagerdutyEvent() assert.Equal(t, core.Critical, res.Severity) assert.Equal(t, "test", res.Message) assert.Equal(t, sPathID, res.DedupKey) - alert.Severity = core.MEDIUM + alert.Alert.Sev = core.MEDIUM res = alert.ToPagerdutyEvent() assert.Equal(t, core.Error, res.Severity) - alert.Severity = core.LOW + alert.Alert.Sev = core.LOW res = alert.ToPagerdutyEvent() assert.Equal(t, core.Warning, res.Severity) } diff --git a/internal/client/sns.go b/internal/client/sns.go new file mode 100644 index 00000000..0dc87ff3 --- /dev/null +++ b/internal/client/sns.go @@ -0,0 +1,124 @@ +//go:generate mockgen -package mocks --destination=../mocks/mock_sns.go . SNSClient + +package client + +import ( + "context" + "encoding/json" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sns" + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/logging" + + "go.uber.org/zap" +) + +// SNSClient ... An interface for SNS clients to implement +type SNSClient interface { + AlertClient +} + +// SNSConfig ... Configuration for SNS client +type SNSConfig struct { + TopicArn string + Endpoint string +} + +// SNSMessagePayload ... The json message payload published to SNS +type SNSMessagePayload struct { + Network string `json:"network"` + HeuristicType string `json:"heuristic_type"` + Severity string `json:"severity"` + PathID string `json:"path_id"` + HeuristicID string `json:"heuristic_id"` + Timestamp time.Time `json:"timestamp"` + Content string `json:"content"` +} + +// SNSMessage ... The SNS message structure. Required for SNS Publish API +type SNSMessage struct { + Default string `json:"default"` +} + +type snsClient struct { + svc *sns.SNS + name string + topicArn string +} + +// NewSNSClient ... Initializer +func NewSNSClient(cfg *SNSConfig, name string) SNSClient { + if cfg.TopicArn == "" { + logging.NoContext().Warn("No SNS topic ARN provided") + } + + // Initialize a session that the SDK will use to load configuration, + // credentials, and region. AWS_REGION, AWS_SECRET_ACCESS_KEY, and AWS_ACCESS_KEY_ID should be set in the + // environment's runtime + // Note: If session is to arbitrarily crash, there is a possibility that message publishing will fail + sess, err := session.NewSession(&aws.Config{ + Endpoint: aws.String(cfg.Endpoint), + }) + if err != nil { + logging.NoContext().Error("Failed to create AWS session", zap.Error(err)) + return nil + } + + return &snsClient{ + svc: sns.New(sess), + topicArn: cfg.TopicArn, + name: name, + } +} + +// Marshal ... Marshals the SNS message payload +func (p *SNSMessagePayload) Marshal() ([]byte, error) { + payloadBytes, err := json.Marshal(p) + if err != nil { + return nil, err + } + + msg := &SNSMessage{ + Default: string(payloadBytes), + } + + msgBytes, err := json.Marshal(msg) + if err != nil { + return nil, err + } + + return msgBytes, nil +} + +// PostEvent ... Publishes an event to an SNS topic ARN +func (sc snsClient) PostEvent(_ context.Context, event *AlertEventTrigger) (*AlertAPIResponse, error) { + msgPayload, err := event.ToSNSMessagePayload().Marshal() + if err != nil { + return nil, err + } + + // Publish a message to the topic + result, err := sc.svc.Publish(&sns.PublishInput{ + Message: aws.String(string(msgPayload)), + MessageStructure: aws.String("json"), + TopicArn: &sc.topicArn, + }) + if err != nil { + return &AlertAPIResponse{ + Status: core.FailureStatus, + Message: err.Error(), + }, err + } + + return &AlertAPIResponse{ + Status: core.SuccessStatus, + Message: *result.MessageId, + }, nil +} + +func (sc snsClient) GetName() string { + return sc.name +} diff --git a/internal/client/sns_test.go b/internal/client/sns_test.go new file mode 100644 index 00000000..4254824d --- /dev/null +++ b/internal/client/sns_test.go @@ -0,0 +1,52 @@ +package client + +import ( + "encoding/json" + "testing" + "time" + + "github.com/base-org/pessimism/internal/core" + "github.com/stretchr/testify/assert" +) + +func TestSNSMessagePayload_Marshal(t *testing.T) { + alert := core.Alert{ + Net: core.Layer1, + HT: core.BalanceEnforcement, + Sev: core.HIGH, + PathID: core.MakePathID(0, core.MakeProcessID(core.Live, 0, 0, 0), core.MakeProcessID(core.Live, 0, 0, 0)), + HeuristicID: core.UUID{}, + Timestamp: time.Time{}, + Content: "test", + } + + event := &AlertEventTrigger{ + Message: "test", + Alert: alert, + } + + payload, err := event.ToSNSMessagePayload().Marshal() + if err != nil { + t.Fatal(err) + } + + var snsPayload SNSMessage + err = json.Unmarshal(payload, &snsPayload) + if err != nil { + t.Fatal(err) + } + + var snsMsgPayload SNSMessagePayload + err = json.Unmarshal([]byte(snsPayload.Default), &snsMsgPayload) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, core.Layer1.String(), snsMsgPayload.Network) + assert.Equal(t, core.BalanceEnforcement.String(), snsMsgPayload.HeuristicType) + assert.Equal(t, core.HIGH.String(), snsMsgPayload.Severity) + assert.Equal(t, "test", snsMsgPayload.Content) + assert.Equal(t, alert.PathID.String(), snsMsgPayload.PathID) + assert.Equal(t, alert.HeuristicID.String(), snsMsgPayload.HeuristicID) + assert.Equal(t, alert.Timestamp, snsMsgPayload.Timestamp) +} diff --git a/internal/config/config.go b/internal/config/config.go index 064c75db..21007960 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -58,6 +58,10 @@ func NewConfig(fileName core.FilePath) *Config { RoutingCfgPath: getEnvStrWithDefault("ALERT_ROUTE_CFG_PATH", "alerts-routing.yaml"), PagerdutyAlertEventsURL: getEnvStrWithDefault("PAGERDUTY_ALERT_EVENTS_URL", ""), RoutingParams: nil, // This is populated after the config is created (see IngestAlertConfig) + SNSConfig: &client.SNSConfig{ + TopicArn: getEnvStrWithDefault("SNS_TOPIC_ARN", ""), + Endpoint: getEnvStrWithDefault("AWS_ENDPOINT", ""), + }, }, ClientConfig: &client.Config{ diff --git a/internal/core/constants.go b/internal/core/constants.go index edef5960..1fe6eb50 100644 --- a/internal/core/constants.go +++ b/internal/core/constants.go @@ -172,6 +172,7 @@ type AlertDestination uint8 const ( Slack AlertDestination = iota + 1 PagerDuty + SNS ThirdParty ) @@ -182,6 +183,8 @@ func (ad AlertDestination) String() string { return "slack" case PagerDuty: return "pager_duty" + case SNS: + return "sns" case ThirdParty: return "third_party" default: diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 3f64036d..dd86a40b 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -247,8 +247,9 @@ func (m *Metrics) RecordAlertGenerated(alert core.Alert, dest core.AlertDestinat net := alert.PathID.Network().String() h := alert.HT.String() sev := alert.Sev.String() + id := alert.PathID.String() - m.AlertsGenerated.WithLabelValues(net, h, sev, dest.String(), clientName).Inc() + m.AlertsGenerated.WithLabelValues(net, h, id, sev, dest.String(), clientName).Inc() } func (m *Metrics) RecordNodeError(n core.Network) { diff --git a/internal/mocks/mock_sns.go b/internal/mocks/mock_sns.go new file mode 100644 index 00000000..b94dc1e7 --- /dev/null +++ b/internal/mocks/mock_sns.go @@ -0,0 +1,65 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/base-org/pessimism/internal/client (interfaces: SNSClient) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + client "github.com/base-org/pessimism/internal/client" + gomock "github.com/golang/mock/gomock" +) + +// MockSNSClient is a mock of SNSClient interface. +type MockSNSClient struct { + ctrl *gomock.Controller + recorder *MockSNSClientMockRecorder +} + +// MockSNSClientMockRecorder is the mock recorder for MockSNSClient. +type MockSNSClientMockRecorder struct { + mock *MockSNSClient +} + +// NewMockSNSClient creates a new mock instance. +func NewMockSNSClient(ctrl *gomock.Controller) *MockSNSClient { + mock := &MockSNSClient{ctrl: ctrl} + mock.recorder = &MockSNSClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSNSClient) EXPECT() *MockSNSClientMockRecorder { + return m.recorder +} + +// GetName mocks base method. +func (m *MockSNSClient) GetName() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetName") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetName indicates an expected call of GetName. +func (mr *MockSNSClientMockRecorder) GetName() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetName", reflect.TypeOf((*MockSNSClient)(nil).GetName)) +} + +// PostEvent mocks base method. +func (m *MockSNSClient) PostEvent(arg0 context.Context, arg1 *client.AlertEventTrigger) (*client.AlertAPIResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PostEvent", arg0, arg1) + ret0, _ := ret[0].(*client.AlertAPIResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PostEvent indicates an expected call of PostEvent. +func (mr *MockSNSClientMockRecorder) PostEvent(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostEvent", reflect.TypeOf((*MockSNSClient)(nil).PostEvent), arg0, arg1) +} diff --git a/internal/mocks/routing_directory.go b/internal/mocks/routing_directory.go index 33a32e07..fd42c95e 100644 --- a/internal/mocks/routing_directory.go +++ b/internal/mocks/routing_directory.go @@ -49,6 +49,20 @@ func (mr *MockRoutingDirectoryMockRecorder) GetPagerDutyClients(arg0 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPagerDutyClients", reflect.TypeOf((*MockRoutingDirectory)(nil).GetPagerDutyClients), arg0) } +// GetSNSClient mocks base method. +func (m *MockRoutingDirectory) GetSNSClient() client.SNSClient { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSNSClient") + ret0, _ := ret[0].(client.SNSClient) + return ret0 +} + +// GetSNSClient indicates an expected call of GetSNSClient. +func (mr *MockRoutingDirectoryMockRecorder) GetSNSClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSNSClient", reflect.TypeOf((*MockRoutingDirectory)(nil).GetSNSClient)) +} + // GetSlackClients mocks base method. func (m *MockRoutingDirectory) GetSlackClients(arg0 core.Severity) []client.SlackClient { m.ctrl.T.Helper() @@ -87,6 +101,18 @@ func (mr *MockRoutingDirectoryMockRecorder) SetPagerDutyClients(arg0, arg1 inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPagerDutyClients", reflect.TypeOf((*MockRoutingDirectory)(nil).SetPagerDutyClients), arg0, arg1) } +// SetSNSClient mocks base method. +func (m *MockRoutingDirectory) SetSNSClient(arg0 client.SNSClient) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetSNSClient", arg0) +} + +// SetSNSClient indicates an expected call of SetSNSClient. +func (mr *MockRoutingDirectoryMockRecorder) SetSNSClient(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSNSClient", reflect.TypeOf((*MockRoutingDirectory)(nil).SetSNSClient), arg0) +} + // SetSlackClients mocks base method. func (m *MockRoutingDirectory) SetSlackClients(arg0 []client.SlackClient, arg1 core.Severity) { m.ctrl.T.Helper() diff --git a/scripts/localstack-e2e-test-setup.sh b/scripts/localstack-e2e-test-setup.sh new file mode 100755 index 00000000..da323052 --- /dev/null +++ b/scripts/localstack-e2e-test-setup.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +echo "Initializing localstack SNS topic..." + +awslocal sns create-topic --name multi-directive-test-topic +awslocal sns create-topic --name alert-cooldown-test-topic +awslocal sqs create-queue --queue-name multi-directive-test-queue +awslocal sqs create-queue --queue-name alert-cooldown-test-queue +awslocal sns subscribe --topic-arn "arn:aws:sns:us-east-1:000000000000:multi-directive-test-topic" --protocol sqs --notification-endpoint "arn:aws:sqs:us-east-1:000000000000:multi-directive-test-queue" +awslocal sns subscribe --topic-arn "arn:aws:sns:us-east-1:000000000000:alert-cooldown-test-topic" --protocol sqs --notification-endpoint "arn:aws:sqs:us-east-1:000000000000:alert-cooldown-test-queue" \ No newline at end of file