diff --git a/cmd/main.go b/cmd/main.go index b93a084e..597bc4ee 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -113,7 +113,7 @@ func RunPessimism(_ *cli.Context) error { } // fetchBootSessions ... Loads the bootstrap file -func fetchBootSessions(path string) ([]app.BootSession, error) { +func fetchBootSessions(path string) ([]*app.BootSession, error) { if !strings.HasSuffix(path, extJSON) { return nil, fmt.Errorf("invalid bootstrap file format; expected %s", extJSON) } @@ -123,7 +123,7 @@ func fetchBootSessions(path string) ([]app.BootSession, error) { return nil, err } - data := []app.BootSession{} + data := []*app.BootSession{} err = json.Unmarshal(file, &data) if err != nil { diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go index f4cbf497..3c21867e 100644 --- a/e2e/e2e_test.go +++ b/e2e/e2e_test.go @@ -39,13 +39,13 @@ func Test_Balance_Enforcement(t *testing.T) { bob := ts.L2Cfg.Secrets.Addresses().Bob // Deploy a balance enforcement invariant session for Alice. - err := ts.App.BootStrap([]models.InvRequestParams{{ - Network: "layer2", - PType: "live", - InvType: "balance_enforcement", + err := ts.App.BootStrap([]*models.InvRequestParams{{ + Network: core.Layer2.String(), + PType: core.Live.String(), + InvType: core.BalanceEnforcement.String(), StartHeight: nil, EndHeight: nil, - AlertingDest: "slack", + AlertingDest: core.Slack.String(), SessionParams: map[string]interface{}{ "address": alice.String(), "lower": 3, // i.e. alert if balance is less than 3 ETH @@ -141,7 +141,7 @@ func Test_Contract_Event(t *testing.T) { updateSig := "ConfigUpdate(uint256,uint8,bytes)" // Deploy a contract event invariant session for the L1 system config addresss. - err := ts.App.BootStrap([]models.InvRequestParams{{ + err := ts.App.BootStrap([]*models.InvRequestParams{{ Network: core.Layer1.String(), PType: core.Live.String(), InvType: core.ContractEvent.String(), @@ -245,7 +245,7 @@ func Test_Withdrawal_Enforcement(t *testing.T) { // We use two invariants here; one configured with a dummy L1 message passer // and one configured with the real L1->L2 message passer contract. This allows us to // ensure that an alert is only produced using faulty message passer. - err = ts.App.BootStrap([]models.InvRequestParams{{ + err = ts.App.BootStrap([]*models.InvRequestParams{{ // This is the one that should produce an alert Network: core.Layer1.String(), PType: core.Live.String(), diff --git a/internal/alert/manager.go b/internal/alert/manager.go index 75618831..4e8de52a 100644 --- a/internal/alert/manager.go +++ b/internal/alert/manager.go @@ -14,7 +14,7 @@ import ( // Manager ... Interface for alert manager type Manager interface { - AddInvariantSession(core.SUUID, core.AlertDestination) error + AddSession(core.SUUID, core.AlertDestination) error Transit() chan core.Alert core.Subsystem @@ -52,8 +52,8 @@ func NewManager(ctx context.Context, sc client.SlackClient) Manager { return am } -// AddInvariantSession ... Adds an invariant session to the alert manager store -func (am *alertManager) AddInvariantSession(sUUID core.SUUID, alertDestination core.AlertDestination) error { +// AddSession ... Adds an invariant session to the alert manager store +func (am *alertManager) AddSession(sUUID core.SUUID, alertDestination core.AlertDestination) error { return am.store.AddAlertDestination(sUUID, alertDestination) } diff --git a/internal/alert/store.go b/internal/alert/store.go index 1d5c9b2c..79c5c436 100644 --- a/internal/alert/store.go +++ b/internal/alert/store.go @@ -21,7 +21,7 @@ type store struct { invariantstore map[core.SUUID]core.AlertDestination } -// Newstore ... Initializer +// NewStore ... Initializer func NewStore() Store { return &store{ invariantstore: make(map[core.SUUID]core.AlertDestination), diff --git a/internal/api/models/health.go b/internal/api/models/health.go index 2c146c50..7bf170f1 100644 --- a/internal/api/models/health.go +++ b/internal/api/models/health.go @@ -5,7 +5,6 @@ import ( ) // HealthCheck ... Returns health status of server -// Currently just returns True type HealthCheck struct { Timestamp time.Time Healthy bool diff --git a/internal/api/models/invariant.go b/internal/api/models/invariant.go index 4f297e32..106dc849 100644 --- a/internal/api/models/invariant.go +++ b/internal/api/models/invariant.go @@ -53,6 +53,7 @@ type InvRequestParams struct { AlertingDest string `json:"alert_destination"` } +// Params ... Returns the invariant session params func (irp *InvRequestParams) Params() *core.InvSessionParams { isp := core.NewSessionParams() @@ -73,7 +74,7 @@ func (irp *InvRequestParams) NetworkType() core.Network { return core.StringToNetwork(irp.Network) } -// PiplineType ... Returns the pipeline type +// PipelineType ... Returns the pipeline type func (irp *InvRequestParams) PiplineType() core.PipelineType { return core.StringToPipelineType(irp.PType) } @@ -115,6 +116,13 @@ type InvRequestBody struct { Params InvRequestParams `json:"params"` } +func (irb *InvRequestBody) Clone() *InvRequestBody { + return &InvRequestBody{ + Method: irb.Method, + Params: irb.Params, + } +} + // MethodType ... Returns the invariant method type func (irb *InvRequestBody) MethodType() InvariantMethod { return StringToInvariantMethod(irb.Method) diff --git a/internal/api/service/health_test.go b/internal/api/service/health_test.go index 91de6c02..9114be8b 100644 --- a/internal/api/service/health_test.go +++ b/internal/api/service/health_test.go @@ -16,18 +16,18 @@ func Test_GetHealth(t *testing.T) { description string function string - constructionLogic func() testSuite - testLogic func(*testing.T, testSuite) + constructionLogic func() *testSuite + testLogic func(*testing.T, *testSuite) }{ { name: "Get Health Success", description: "", function: "ProcessInvariantRequest", - constructionLogic: func() testSuite { + constructionLogic: func() *testSuite { ts := createTestSuite(ctrl) - ts.mockEthClientInterface.EXPECT(). + ts.mockClient.EXPECT(). HeaderByNumber(gomock.Any(), gomock.Any()). Return(nil, nil). Times(2) @@ -35,7 +35,7 @@ func Test_GetHealth(t *testing.T) { return ts }, - testLogic: func(t *testing.T, ts testSuite) { + testLogic: func(t *testing.T, ts *testSuite) { hc := ts.apiSvc.CheckHealth() assert.True(t, hc.Healthy) @@ -49,10 +49,10 @@ func Test_GetHealth(t *testing.T) { description: "Emulates unhealthy rpc endpoints", function: "ProcessInvariantRequest", - constructionLogic: func() testSuite { + constructionLogic: func() *testSuite { ts := createTestSuite(ctrl) - ts.mockEthClientInterface.EXPECT(). + ts.mockClient.EXPECT(). HeaderByNumber(gomock.Any(), gomock.Any()). Return(nil, testErr1()). Times(2) @@ -60,7 +60,7 @@ func Test_GetHealth(t *testing.T) { return ts }, - testLogic: func(t *testing.T, ts testSuite) { + testLogic: func(t *testing.T, ts *testSuite) { hc := ts.apiSvc.CheckHealth() assert.False(t, hc.Healthy) assert.False(t, hc.ChainConnectionStatus.IsL2Healthy) diff --git a/internal/api/service/invariant.go b/internal/api/service/invariant.go index 45e75329..b524d247 100644 --- a/internal/api/service/invariant.go +++ b/internal/api/service/invariant.go @@ -17,7 +17,6 @@ func (svc *PessimismService) ProcessInvariantRequest(ir *models.InvRequestBody) // runInvariantSession ... Runs an invariant session provided func (svc *PessimismService) RunInvariantSession(params *models.InvRequestParams) (core.SUUID, error) { - pConfig, err := svc.m.BuildPipelineCfg(params) if err != nil { return core.NilSUUID(), err @@ -25,7 +24,12 @@ func (svc *PessimismService) RunInvariantSession(params *models.InvRequestParams sConfig := params.SessionConfig() - sUUID, err := svc.m.RunInvSession(pConfig, sConfig) + deployCfg, err := svc.m.BuildDeployCfg(pConfig, sConfig) + if err != nil { + return core.NilSUUID(), err + } + + sUUID, err := svc.m.RunInvSession(deployCfg) if err != nil { return core.NilSUUID(), err } diff --git a/internal/api/service/invariant_test.go b/internal/api/service/invariant_test.go index f5d8d7d1..550af9ea 100644 --- a/internal/api/service/invariant_test.go +++ b/internal/api/service/invariant_test.go @@ -6,245 +6,156 @@ import ( "github.com/base-org/pessimism/internal/api/models" "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/engine/invariant" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) -func Test_ProcessInvariantRequest(t *testing.T) { +func testErr() error { + return fmt.Errorf("test error") +} + +func Test_RunInvariantSession(t *testing.T) { + testSUUID := core.MakeSUUID(1, 1, 1) + ctrl := gomock.NewController(t) - defaultRequestBody := func() *models.InvRequestBody { - return &models.InvRequestBody{ - Method: "run", + testCfg := &invariant.DeployConfig{} - Params: models.InvRequestParams{ - Network: "layer1", - PType: "live", - InvType: "contract_event", + defaultBody := &models.InvRequestBody{ + Method: "run", - StartHeight: nil, - EndHeight: nil, + Params: models.InvRequestParams{ + Network: "layer1", + PType: "live", + InvType: "contract_event", - SessionParams: nil, - }, - } + StartHeight: nil, + EndHeight: nil, + + SessionParams: nil, + }, } var tests = []struct { - name string - description string - function string + name string - constructionLogic func() testSuite - testLogic func(*testing.T, testSuite) + constructionLogic func() *testSuite + testLogic func(*testing.T, *testSuite) }{ { - name: "Get Invariant Failure", - description: "When ProcessInvariantRequest is called provided an invalid invariant, an error should be returned", - function: "ProcessInvariantRequest", - - constructionLogic: func() testSuite { - - return createTestSuite(ctrl) - }, - - testLogic: func(t *testing.T, ts testSuite) { - - testParams := defaultRequestBody() - testParams.Params.InvType = "bleh" - - _, err := ts.apiSvc.ProcessInvariantRequest(testParams) - - assert.Error(t, err) - assert.Contains(t, err.Error(), "could not find implementation for type") - - }, - }, - { - name: "Create Pipeline Failure", - description: "When ProcessInvariantRequest is called that results in etl failure, an error should be returned", - function: "ProcessInvariantRequest", - - constructionLogic: func() testSuite { - + name: "Successful invariant session deployment", + constructionLogic: func() *testSuite { ts := createTestSuite(ctrl) - ts.mockEtlMan.EXPECT(). - CreateDataPipeline(gomock.Any()). - Return(core.NilPUUID(), false, testErr1()). + ts.mockSub.EXPECT(). + BuildPipelineCfg(&defaultBody.Params). + Return(nil, nil). Times(1) - return ts - }, - - testLogic: func(t *testing.T, ts testSuite) { - - _, err := ts.apiSvc.ProcessInvariantRequest(defaultRequestBody()) - - assert.Error(t, err) - assert.Equal(t, err.Error(), testErr1().Error()) - - }, - }, - { - name: "Deploy to Risk Engine Failure", - description: "When ProcessInvariantRequest is called that results in a etl registry fetch failure, an error should be returned", - function: "ProcessInvariantRequest", - - constructionLogic: func() testSuite { - ts := createTestSuite(ctrl) - - ts.mockEtlMan.EXPECT(). - CreateDataPipeline(gomock.Any()). - Return(core.NilPUUID(), false, nil). + ts.mockSub.EXPECT(). + BuildDeployCfg(gomock.Any(), gomock.Any()). + Return(testCfg, nil). Times(1) - ts.mockEtlMan.EXPECT(). - GetStateKey(gomock.Any()). - Return(nil, testErr1()). + ts.mockSub.EXPECT(). + RunInvSession(testCfg). + Return(testSUUID, nil). Times(1) return ts }, - testLogic: func(t *testing.T, ts testSuite) { + testLogic: func(t *testing.T, ts *testSuite) { - _, err := ts.apiSvc.ProcessInvariantRequest(defaultRequestBody()) - - assert.Error(t, err) - assert.Equal(t, err.Error(), testErr1().Error()) + testParams := defaultBody.Clone() + actualSUUID, err := ts.apiSvc.ProcessInvariantRequest(testParams) + assert.NoError(t, err) + assert.Equal(t, testSUUID, actualSUUID) }, }, { - name: "Deploy to Risk Engine Failure", - description: "When ProcessInvariantRequest is called that results in a risk engine deploy failure, an error should be returned", - function: "ProcessInvariantRequest", - - constructionLogic: func() testSuite { + name: "Failure when building pipeline config", + constructionLogic: func() *testSuite { ts := createTestSuite(ctrl) - ts.mockEtlMan.EXPECT(). - CreateDataPipeline(gomock.Any()). - Return(core.NilPUUID(), false, nil). - Times(1) - - ts.mockEtlMan.EXPECT(). - GetStateKey(gomock.Any()). - Return(&core.DataRegister{}, nil). + ts.mockSub.EXPECT(). + BuildPipelineCfg(&defaultBody.Params). + Return(nil, testErr()). Times(1) - - ts.mockEngineMan.EXPECT(). - DeployInvariantSession(gomock.Any()). - Return(core.NilSUUID(), testErr2()). - Times(1) - return ts }, - testLogic: func(t *testing.T, ts testSuite) { - - _, err := ts.apiSvc.ProcessInvariantRequest(defaultRequestBody()) + testLogic: func(t *testing.T, ts *testSuite) { + testParams := defaultBody.Clone() + actualSUUID, err := ts.apiSvc.ProcessInvariantRequest(testParams) assert.Error(t, err) - assert.Equal(t, err.Error(), testErr2().Error()) - + assert.Equal(t, core.NilSUUID(), actualSUUID) }, }, { - name: "Run ETL Pipeline Failure", - description: "When ProcessInvariantRequest is called that results in a pipeline that fails to run, an error should be returned", - function: "ProcessInvariantRequest", - - constructionLogic: func() testSuite { + name: "Failure when building deploy config", + constructionLogic: func() *testSuite { ts := createTestSuite(ctrl) - ts.mockAlertMan.EXPECT(). - AddInvariantSession(gomock.Any(), gomock.Any()). - Return(nil). - Times(1) - - ts.mockEtlMan.EXPECT(). - CreateDataPipeline(gomock.Any()). - Return(core.NilPUUID(), false, nil). + ts.mockSub.EXPECT(). + BuildPipelineCfg(&defaultBody.Params). + Return(nil, nil). Times(1) - ts.mockEtlMan.EXPECT(). - GetStateKey(gomock.Any()). - Return(&core.DataRegister{}, nil). - Times(1) - - ts.mockEngineMan.EXPECT(). - DeployInvariantSession(gomock.Any()). - Return(core.NilSUUID(), nil). - Times(1) - - ts.mockEtlMan.EXPECT(). - RunPipeline(core.NilPUUID()). - Return(testErr3()). + ts.mockSub.EXPECT(). + BuildDeployCfg(gomock.Any(), gomock.Any()). + Return(nil, testErr()). Times(1) return ts }, - testLogic: func(t *testing.T, ts testSuite) { - - _, err := ts.apiSvc.ProcessInvariantRequest(defaultRequestBody()) + testLogic: func(t *testing.T, ts *testSuite) { + testParams := defaultBody.Clone() + actualSUUID, err := ts.apiSvc.ProcessInvariantRequest(testParams) assert.Error(t, err) - assert.Equal(t, err.Error(), testErr3().Error()) - + assert.Equal(t, core.NilSUUID(), actualSUUID) }, }, { - name: "Successful Sesion Creation", - description: "When ProcessInvariantRequest is called that results in a pipeline that succeeds to run, an invariant UUID should be returned", - function: "ProcessInvariantRequest", - - constructionLogic: func() testSuite { + name: "Failure when running invariant session", + constructionLogic: func() *testSuite { ts := createTestSuite(ctrl) - ts.mockEtlMan.EXPECT(). - CreateDataPipeline(gomock.Any()). - Return(core.NilPUUID(), false, nil). + ts.mockSub.EXPECT(). + BuildPipelineCfg(&defaultBody.Params). + Return(nil, nil). Times(1) - ts.mockEtlMan.EXPECT(). - GetStateKey(gomock.Any()). - Return(&core.DataRegister{}, nil). + ts.mockSub.EXPECT(). + BuildDeployCfg(gomock.Any(), gomock.Any()). + Return(testCfg, nil). Times(1) - ts.mockEngineMan.EXPECT(). - DeployInvariantSession(gomock.Any()). - Return(testSUUID1(), nil). - Times(1) - - ts.mockEtlMan.EXPECT(). - RunPipeline(core.NilPUUID()). - Return(nil). - Times(1) - - ts.mockAlertMan.EXPECT(). - AddInvariantSession(gomock.Any(), gomock.Any()). - Return(nil). + ts.mockSub.EXPECT(). + RunInvSession(testCfg). + Return(core.NilSUUID(), testErr()). Times(1) return ts }, - testLogic: func(t *testing.T, ts testSuite) { - - sUUUID, err := ts.apiSvc.ProcessInvariantRequest(defaultRequestBody()) - - assert.NoError(t, err) - assert.Equal(t, testSUUID1().PID.String(), sUUUID.PID.String()) + testLogic: func(t *testing.T, ts *testSuite) { + testParams := defaultBody.Clone() + actualSUUID, err := ts.apiSvc.ProcessInvariantRequest(testParams) + assert.Error(t, err) + assert.Equal(t, core.NilSUUID(), actualSUUID) }, }, } for i, tc := range tests { - t.Run(fmt.Sprintf("%d-%s-%s", i, tc.name, tc.function), func(t *testing.T) { + t.Run(fmt.Sprintf("%d-%s", i, tc.name), func(t *testing.T) { testMeta := tc.constructionLogic() tc.testLogic(t, testMeta) }) diff --git a/internal/api/service/service.go b/internal/api/service/service.go index f03bc3e9..84553cfd 100644 --- a/internal/api/service/service.go +++ b/internal/api/service/service.go @@ -22,8 +22,7 @@ type Service interface { // PessimismService ... API service type PessimismService struct { ctx context.Context - - m subsystem.Manager + m subsystem.Manager } // New ... Initializer diff --git a/internal/api/service/service_test.go b/internal/api/service/service_test.go index b040aba7..43fe134f 100644 --- a/internal/api/service/service_test.go +++ b/internal/api/service/service_test.go @@ -5,7 +5,6 @@ import ( "fmt" svc "github.com/base-org/pessimism/internal/api/service" - "github.com/base-org/pessimism/internal/subsystem" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/mocks" @@ -19,55 +18,29 @@ const ( ) type testSuite struct { - mockAlertMan *mocks.AlertManager - mockEngineMan *mocks.EngineManager - mockEtlMan *mocks.EtlManager - mockEthClientInterface *mocks.MockEthClientInterface - - apiSvc svc.Service - mockCtrl *gomock.Controller + apiSvc svc.Service + mockClient *mocks.MockEthClientInterface + mockSub *mocks.SubManager + mockCtrl *gomock.Controller } func testErr1() error { return fmt.Errorf(testErrMsg1) } -func testErr2() error { - return fmt.Errorf(testErrMsg2) -} -func testErr3() error { - return fmt.Errorf(testErrMsg3) -} - -func testSUUID1() core.SUUID { - return core.MakeSUUID(1, 1, 1) -} -func createTestSuite(ctrl *gomock.Controller) testSuite { - engineManager := mocks.NewEngineManager(ctrl) - etlManager := mocks.NewEtlManager(ctrl) - alertManager := mocks.NewAlertManager(ctrl) +func createTestSuite(ctrl *gomock.Controller) *testSuite { + sysMock := mocks.NewSubManager(ctrl) ethClient := mocks.NewMockEthClientInterface(ctrl) - - // NOTE - These tests should be migrated to the subsystem manager package - // TODO(#76): No Subsystem Manager Tests - ctx := context.Background() ctx = context.WithValue(ctx, core.L1Client, ethClient) ctx = context.WithValue(ctx, core.L2Client, ethClient) - cfg := &subsystem.Config{} - - m := subsystem.NewManager(ctx, cfg, etlManager, engineManager, alertManager) - - service := svc.New(ctx, m) - return testSuite{ - - mockAlertMan: alertManager, - mockEngineMan: engineManager, - mockEtlMan: etlManager, - mockEthClientInterface: ethClient, - apiSvc: service, - mockCtrl: ctrl, + service := svc.New(ctx, sysMock) + return &testSuite{ + apiSvc: service, + mockClient: ethClient, + mockSub: sysMock, + mockCtrl: ctrl, } } diff --git a/internal/app/app.go b/internal/app/app.go index 0527c758..7fd20975 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -74,18 +74,23 @@ func (a *Application) End() <-chan os.Signal { } // BootStrap ... Bootstraps the application -func (a *Application) BootStrap(sessions []BootSession) error { +func (a *Application) BootStrap(sessions []*BootSession) error { logger := logging.WithContext(a.ctx) for _, session := range sessions { - pConfig, err := a.sub.BuildPipelineCfg(&session) + pConfig, err := a.sub.BuildPipelineCfg(session) if err != nil { return err } sConfig := session.SessionConfig() - sUUID, err := a.sub.RunInvSession(pConfig, sConfig) + deployCfg, err := a.sub.BuildDeployCfg(pConfig, sConfig) + if err != nil { + return err + } + + sUUID, err := a.sub.RunInvSession(deployCfg) if err != nil { return err } diff --git a/internal/client/eth_client.go b/internal/client/eth_client.go index 233ab0f2..7949e168 100644 --- a/internal/client/eth_client.go +++ b/internal/client/eth_client.go @@ -36,7 +36,8 @@ type EthClientInterface interface { BalanceAt(ctx context.Context, account common.Address, number *big.Int) (*big.Int, error) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) - SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) + SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, + ch chan<- types.Log) (ethereum.Subscription, error) } // FromContext ... Retrieves ethClient from context @@ -64,36 +65,38 @@ func NewEthClient(ctx context.Context, rawURL string) (EthClientInterface, error return &EthClient{client}, nil } -// HeaderByNumber ... Wraps go-ethereum node headerByNumber RPC call +// HeaderByNumber ... Wraps go-ethereum headerByNumber client method call func (ec *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { return ec.client.HeaderByNumber(ctx, number) } -// BlockByNumber ... Wraps go-ethereum node blockByNumber RPC call +// BlockByNumber ... Wraps go-ethereum blockByNumber client method call func (ec *EthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { return ec.client.BlockByNumber(ctx, number) } -// BalanceAt ... Wraps go-ethereum node balanceAt RPC call +// BalanceAt ... Wraps go-ethereum balanceAt client method call func (ec *EthClient) BalanceAt(ctx context.Context, account common.Address, number *big.Int) (*big.Int, error) { return ec.client.BalanceAt(ctx, account, number) } -// FilterLogs ... Wraps go-ethereum node balanceAt RPC call +// FilterLogs ... Wraps go-ethereum balanceAt client method call func (ec *EthClient) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { return ec.client.FilterLogs(ctx, query) } -// CallContract ... Wraps go-ethereum node callContract RPC call +// CallContract ... Wraps go-ethereum callContract client method call func (ec *EthClient) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { return ec.client.CallContract(ctx, msg, blockNumber) } -// CodeAt ... Wraps go-ethereum node codeAt RPC call +// CodeAt ... Wraps go-ethereum codeAt client method call func (ec *EthClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { return ec.client.CodeAt(ctx, account, blockNumber) } -func (ec *EthClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { +// SubscribeFilterLogs ... Wraps go-ethereum subscribeFilterLogs client method call +func (ec *EthClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, + ch chan<- types.Log) (ethereum.Subscription, error) { return ec.client.SubscribeFilterLogs(ctx, query, ch) } diff --git a/internal/config/config.go b/internal/config/config.go index 4f9e2a2c..7a9fbcc5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -58,7 +58,6 @@ func NewConfig(fileName FilePath) *Config { SlackURL: getEnvStrWithDefault("SLACK_URL", ""), SystemConfig: &subsystem.Config{ - L1PollInterval: getEnvInt("L1_POLL_INTERVAL"), L2PollInterval: getEnvInt("L2_POLL_INTERVAL"), }, diff --git a/internal/core/constants.go b/internal/core/constants.go index 31bf3a4e..a0c7b020 100644 --- a/internal/core/constants.go +++ b/internal/core/constants.go @@ -19,8 +19,6 @@ type Network uint8 const ( Layer1 Network = iota + 1 Layer2 - - UnknownNetwork ) const ( @@ -35,9 +33,10 @@ func (n Network) String() string { case Layer2: return "layer2" - } - return UnknownType + default: + return UnknownType + } } // StringToNetwork ... Converts a string to a network @@ -48,11 +47,20 @@ func StringToNetwork(stringType string) Network { case "layer2": return Layer2 - } - return UnknownNetwork + default: + return Network(0) + } } +type ChainSubscription uint8 + +const ( + OnlyLayer1 ChainSubscription = iota + 1 + OnlyLayer2 + BothNetworks +) + type FetchType int const ( diff --git a/internal/core/core.go b/internal/core/core.go index 10f0a930..5169730c 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -102,8 +102,8 @@ type InvSessionParams struct { } // Bytes ... Returns a marshalled byte array of the invariant session params -func (isp *InvSessionParams) Bytes() []byte { - bytes, _ := json.Marshal(isp.params) +func (sp *InvSessionParams) Bytes() []byte { + bytes, _ := json.Marshal(sp.params) return bytes } @@ -126,18 +126,18 @@ func (sp *InvSessionParams) Value(key string) (any, error) { } // Address ... Returns the address from the invariant session params -func (sp *InvSessionParams) Address() string { +func (sp *InvSessionParams) Address() common.Address { rawAddr, found := sp.params[AddressKey] if !found { - return "" + return common.Address{0} } addr, success := rawAddr.(string) if !success { - return "" + return common.Address{0} } - return addr + return common.HexToAddress(addr) } // SetValue ... Sets a value in the invariant session params @@ -181,6 +181,6 @@ type Subsystem interface { } const ( - L1Portal = "l1_portal_address" - L2ToL1MessgPasser = "l2_to_l1_address" + L1Portal = "l1_portal_address" //#nosec G101: False positive, this isn't a credential + L2ToL1MessgPasser = "l2_to_l1_address" //#nosec G101: False positive, this isn't a credential ) diff --git a/internal/engine/addressing.go b/internal/engine/addressing.go index 3abe82a8..c25bbcc6 100644 --- a/internal/engine/addressing.go +++ b/internal/engine/addressing.go @@ -33,23 +33,27 @@ func (am *addressingMap) GetSUUIDsByPair(address common.Address, pUUID core.PUUI // Insert ... Inserts a new entry into the addressing map func (am *addressingMap) Insert(addr common.Address, pUUID core.PUUID, sUUID core.SUUID) error { + // 1. Check if address exists; create nested entry & return if not if _, found := am.m[addr]; !found { am.m[addr] = make(map[core.PUUID][]core.SUUID) am.m[addr][pUUID] = []core.SUUID{sUUID} return nil } + // 2. Check if pipeline UUID exists; create entry & return if not if _, found := am.m[addr][pUUID]; !found { am.m[addr][pUUID] = []core.SUUID{sUUID} return nil } + // 3. Ensure that entry doesn't already exist for _, entry := range am.m[addr][pUUID] { if entry == sUUID { return fmt.Errorf("entry already exists") } } + // 4. Append entry and return am.m[addr][pUUID] = append(am.m[addr][pUUID], sUUID) return nil } diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 610bd2b7..5a24a2fe 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -9,10 +9,12 @@ import ( "go.uber.org/zap" ) +// Type ... Risk engine execution type type Type int const ( - HardCoded Type = iota + HardCoded Type = iota + 1 + // NOTE: Dynamic invariant support is not implemented Dynamic ) @@ -24,7 +26,7 @@ type RiskEngine interface { } // hardCodedEngine ... Hard coded execution engine -// IE: native application code for invariant implementation +// IE: native hardcoded application code for invariant implementation type hardCodedEngine struct { // TODO: Add any engine specific fields here } diff --git a/internal/engine/engine_test.go b/internal/engine/engine_test.go new file mode 100644 index 00000000..7cc34a0c --- /dev/null +++ b/internal/engine/engine_test.go @@ -0,0 +1,84 @@ +package engine_test + +import ( + "context" + "fmt" + "testing" + + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/engine" + "github.com/base-org/pessimism/internal/mocks" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +type testSuite struct { + ctrl *gomock.Controller + re engine.RiskEngine + mockInv *mocks.MockInvariant +} + +func createTestSuite(t *testing.T) *testSuite { + ctrl := gomock.NewController(t) + + return &testSuite{ + ctrl: ctrl, + re: engine.NewHardCodedEngine(), + mockInv: mocks.NewMockInvariant(ctrl), + } +} + +func testErr() error { + return fmt.Errorf("test error") +} + +func Test_HardCodedEngine(t *testing.T) { + var tests = []struct { + name string + test func(t *testing.T, ts *testSuite) + }{ + { + name: "Invalidation Failure From Error", + test: func(t *testing.T, ts *testSuite) { + td := core.TransitData{} + + ts.mockInv.EXPECT().Invalidate(td). + Return(nil, false, testErr()).Times(1) + + ts.mockInv.EXPECT().SUUID(). + Return(core.NilSUUID()).Times(1) + + outcome, invalid := ts.re.Execute(context.Background(), td, ts.mockInv) + assert.Nil(t, outcome) + assert.False(t, invalid) + + }}, + { + name: "Successful Invalidation", + test: func(t *testing.T, ts *testSuite) { + td := core.TransitData{} + + expectedOut := &core.InvalOutcome{ + Message: "20 inch blade on the Impala", + } + + ts.mockInv.EXPECT().Invalidate(td). + Return(expectedOut, true, nil).Times(1) + + ts.mockInv.EXPECT().SUUID(). + Return(core.NilSUUID()).Times(1) + + outcome, invalid := ts.re.Execute(context.Background(), td, ts.mockInv) + assert.NotNil(t, outcome) + assert.True(t, invalid) + assert.Equal(t, expectedOut, outcome) + }}, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d-%s", i, test.name), func(t *testing.T) { + ts := createTestSuite(t) + test.test(t, ts) + }) + } +} diff --git a/internal/engine/invariant/config.go b/internal/engine/invariant/config.go index 339b93be..4fe6ea75 100644 --- a/internal/engine/invariant/config.go +++ b/internal/engine/invariant/config.go @@ -7,8 +7,12 @@ type DeployConfig struct { Stateful bool StateKey *core.StateKey - Network core.Network - PUUID core.PUUID + Network core.Network + PUUID core.PUUID + Reuse bool + InvType core.InvariantType InvParams *core.InvSessionParams + + AlertDest core.AlertDestination } diff --git a/internal/engine/invariant/invariant.go b/internal/engine/invariant/invariant.go index 5b024275..5c22c400 100644 --- a/internal/engine/invariant/invariant.go +++ b/internal/engine/invariant/invariant.go @@ -1,3 +1,5 @@ +//go:generate mockgen -package mocks --destination ../../mocks/invariant.go . Invariant + package invariant import ( @@ -60,6 +62,7 @@ func (bi *BaseInvariant) Invalidate(core.TransitData) (*core.InvalOutcome, bool, return nil, false, nil } +// SetSUUID ... Sets the invariant session UUID func (bi *BaseInvariant) SetSUUID(sUUID core.SUUID) { bi.sUUID = sUUID } diff --git a/internal/engine/manager.go b/internal/engine/manager.go index 9abd4562..9ae6caea 100644 --- a/internal/engine/manager.go +++ b/internal/engine/manager.go @@ -13,20 +13,19 @@ import ( "github.com/base-org/pessimism/internal/metrics" "github.com/base-org/pessimism/internal/state" - "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" ) // Manager ... Engine manager interface type Manager interface { - core.Subsystem - GetInputType(invType core.InvariantType) (core.RegisterType, error) Transit() chan core.InvariantInput // TODO( ) : Session deletion logic DeleteInvariantSession(core.SUUID) (core.SUUID, error) DeployInvariantSession(cfg *invariant.DeployConfig) (core.SUUID, error) + + core.Subsystem } /* @@ -94,7 +93,7 @@ func (em *engineManager) updateSharedState(invParams *core.InvSessionParams, } // Use accessor method to insert entry into state store - err = state.InsertUnique(em.ctx, sk, invParams.Address()) + err = state.InsertUnique(em.ctx, sk, invParams.Address().String()) if err != nil { return err } @@ -110,7 +109,7 @@ func (em *engineManager) updateSharedState(invParams *core.InvSessionParams, innerKey := &core.StateKey{ Nesting: false, Prefix: sk.Prefix, - ID: invParams.Address(), + ID: invParams.Address().String(), PUUID: &pUUID, } @@ -123,7 +122,7 @@ func (em *engineManager) updateSharedState(invParams *core.InvSessionParams, logging.WithContext(em.ctx).Debug("Setting to state store", zap.String(core.PUUIDKey, pUUID.String()), - zap.String(core.AddrKey, invParams.Address())) + zap.String(core.AddrKey, invParams.Address().String())) return nil } @@ -132,21 +131,23 @@ func (em *engineManager) updateSharedState(invParams *core.InvSessionParams, func (em *engineManager) DeployInvariantSession(cfg *invariant.DeployConfig) (core.SUUID, error) { reg, exists := em.invTable[cfg.InvType] if !exists { - return core.NilSUUID(), fmt.Errorf("Invariant type %s not found", cfg.InvType) + return core.NilSUUID(), fmt.Errorf("invariant type %s not found", cfg.InvType) } - if reg.Preprocess != nil { + if reg.Preprocess != nil { // Preprocess invariant params err := reg.Preprocess(cfg.InvParams) if err != nil { return core.NilSUUID(), err } } + // Build invariant instance using constructor function from register definition inv, err := reg.Constructor(em.ctx, cfg.InvParams) if err != nil { return core.NilSUUID(), err } + // Generate session UUID and set it to the invariant sUUID := core.MakeSUUID(cfg.Network, cfg.PUUID.PipelineType(), cfg.InvType) inv.SetSUUID(sUUID) @@ -155,10 +156,9 @@ func (em *engineManager) DeployInvariantSession(cfg *invariant.DeployConfig) (co return core.NilSUUID(), err } + // Shared subsytem state management if cfg.Stateful { - gethAddr := common.HexToAddress(cfg.InvParams.Address()) - - err = em.addresser.Insert(gethAddr, cfg.PUUID, sUUID) + err = em.addresser.Insert(cfg.InvParams.Address(), cfg.PUUID, sUUID) if err != nil { return core.NilSUUID(), err } @@ -170,7 +170,6 @@ func (em *engineManager) DeployInvariantSession(cfg *invariant.DeployConfig) (co } em.metrics.IncActiveInvariants() - return sUUID, nil } @@ -197,7 +196,7 @@ func (em *engineManager) EventLoop() error { func (em *engineManager) GetInputType(invType core.InvariantType) (core.RegisterType, error) { val, exists := em.invTable[invType] if !exists { - return 0, fmt.Errorf("Invariant type %s not found", invType) + return 0, fmt.Errorf("invariant type %s not found", invType) } return val.InputType, nil @@ -231,7 +230,7 @@ func (em *engineManager) executeAddressInvariants(ctx context.Context, data core } for _, sUUID := range ids { - inv, err := em.store.GetInvSessionByUUID(sUUID) + inv, err := em.store.GetInstanceByUUID(sUUID) if err != nil { logger.Error("Could not session by invariant sUUID", zap.Error(err), @@ -247,16 +246,16 @@ func (em *engineManager) executeAddressInvariants(ctx context.Context, data core func (em *engineManager) executeNonAddressInvariants(ctx context.Context, data core.InvariantInput) { logger := logging.WithContext(ctx) - // Fetch all invariants associated with the pipeline - sUUIDs, err := em.store.GetInvSessionsForPipeline(data.PUUID) + // Fetch all session UUIDs associated with the pipeline + sUUIDs, err := em.store.GetSUUIDsByPUUID(data.PUUID) if err != nil { logger.Error("Could not fetch invariants for pipeline", zap.Error(err), zap.String(core.PUUIDKey, data.PUUID.String())) } - // Fetch all invariants by SUUIDs - invs, err := em.store.GetInvariantsByUUIDs(sUUIDs...) + // Fetch all invariants for a slice of SUUIDs + invs, err := em.store.GetInstancesByUUIDs(sUUIDs) if err != nil { logger.Error("Could not fetch invariants for pipeline", zap.Error(err), @@ -273,9 +272,10 @@ func (em *engineManager) executeInvariant(ctx context.Context, data core.Invaria logger := logging.WithContext(ctx) // Execute invariant using risk engine and return alert if invalidation occurs - outcome, invalid := em.engine.Execute(ctx, data.Input, inv) + outcome, invalidated := em.engine.Execute(ctx, data.Input, inv) - if invalid { // Alert + if invalidated { + // Generate & send alert alert := core.Alert{ Timestamp: outcome.TimeStamp, SUUID: inv.SUUID(), diff --git a/internal/engine/registry/balance.go b/internal/engine/registry/balance.go index 254a2666..f8aec89d 100644 --- a/internal/engine/registry/balance.go +++ b/internal/engine/registry/balance.go @@ -20,7 +20,6 @@ type BalanceInvConfig struct { // UnmarshalToBalanceInvConfig ... Converts a general config to a balance invariant config func UnmarshalToBalanceInvConfig(cfg *core.InvSessionParams) (*BalanceInvConfig, error) { - invConfg := BalanceInvConfig{} err := json.Unmarshal(cfg.Bytes(), &invConfg) if err != nil { @@ -49,10 +48,8 @@ const reportMsg = ` // NewBalanceInvariant ... Initializer func NewBalanceInvariant(cfg *BalanceInvConfig) (invariant.Invariant, error) { - return &BalanceInvariant{ - cfg: cfg, - + cfg: cfg, Invariant: invariant.NewBaseInvariant(core.AccountBalance), }, nil } @@ -63,12 +60,12 @@ func (bi *BalanceInvariant) Invalidate(td core.TransitData) (*core.InvalOutcome, logging.NoContext().Debug("Checking invalidation for balance invariant", zap.String("data", fmt.Sprintf("%v", td))) if td.Type != bi.InputType() { - return nil, false, fmt.Errorf("invalid type supplied") + return nil, false, fmt.Errorf(invalidInTypeErr, bi.InputType(), td.Type) } balance, ok := td.Value.(float64) if !ok { - return nil, false, fmt.Errorf("could not cast transit data value to float type") + return nil, false, fmt.Errorf(couldNotCastErr, "float64") } invalidated := false diff --git a/internal/engine/registry/contract_event.go b/internal/engine/registry/contract_event.go index 8fbbf3fd..6ecac140 100644 --- a/internal/engine/registry/contract_event.go +++ b/internal/engine/registry/contract_event.go @@ -48,7 +48,7 @@ const eventReportMsg = ` ` // NewEventInvariant ... Initializer -func NewEventInvariant(cfg *EventInvConfig) (invariant.Invariant, error) { +func NewEventInvariant(cfg *EventInvConfig) invariant.Invariant { var sigs []common.Hash for _, sig := range cfg.Sigs { @@ -60,27 +60,26 @@ func NewEventInvariant(cfg *EventInvConfig) (invariant.Invariant, error) { sigs: sigs, Invariant: invariant.NewBaseInvariant(core.EventLog), - }, nil + } } // Invalidate ... Checks if the balance is within the bounds // specified in the config func (ei *EventInvariant) Invalidate(td core.TransitData) (*core.InvalOutcome, bool, error) { if td.Type != ei.InputType() { - return nil, false, fmt.Errorf("invalid type supplied") + return nil, false, fmt.Errorf(invalidInTypeErr, td.Type.String(), ei.InputType().String()) } if td.Address.String() != ei.cfg.Address { - return nil, false, fmt.Errorf("invalid address supplied") + return nil, false, fmt.Errorf(invalidAddrErr, ei.cfg.Address, td.Address.String()) } log, success := td.Value.(types.Log) if !success { - return nil, false, fmt.Errorf("could not convert transit data to log") + return nil, false, fmt.Errorf(couldNotCastErr, "types.Log") } - var invalidated = false - + invalidated := false for _, sig := range ei.sigs { if log.Topics[0] == sig { invalidated = true diff --git a/internal/engine/registry/contract_event_test.go b/internal/engine/registry/contract_event_test.go index d6d816a4..fb0922ac 100644 --- a/internal/engine/registry/contract_event_test.go +++ b/internal/engine/registry/contract_event_test.go @@ -19,14 +19,13 @@ func Test_Event_Log_Invariant(t *testing.T) { { name: "Successful Invalidation", function: func(t *testing.T, cfg *registry.EventInvConfig) { - ei, err := registry.NewEventInvariant( + ei := registry.NewEventInvariant( ®istry.EventInvConfig{ Address: "0x0000000000000000000000000000000000000420", ContractName: "0x69", Sigs: []string{"0x420"}, }) - assert.NoError(t, err) hash := crypto.Keccak256Hash([]byte("0x420")) td := core.TransitData{ @@ -47,7 +46,7 @@ func Test_Event_Log_Invariant(t *testing.T) { { name: "Error Invalidation Due to Mismatched Addresses", function: func(t *testing.T, cfg *registry.EventInvConfig) { - ei, err := registry.NewEventInvariant( + ei := registry.NewEventInvariant( ®istry.EventInvConfig{ Address: "0x0000000000000000000000000000000000000420", ContractName: "0x69", @@ -74,14 +73,13 @@ func Test_Event_Log_Invariant(t *testing.T) { { name: "No Invalidation Due to Missing Signature", function: func(t *testing.T, cfg *registry.EventInvConfig) { - ei, err := registry.NewEventInvariant( + ei := registry.NewEventInvariant( ®istry.EventInvConfig{ Address: "0x0000000000000000000000000000000000000420", ContractName: "0x69", Sigs: []string{"0x424"}, }) - assert.NoError(t, err) hash := crypto.Keccak256Hash([]byte("0x420")) td := core.TransitData{ diff --git a/internal/engine/registry/errors.go b/internal/engine/registry/errors.go new file mode 100644 index 00000000..1b71606a --- /dev/null +++ b/internal/engine/registry/errors.go @@ -0,0 +1,7 @@ +package registry + +const ( + invalidInTypeErr = "invalid input type provided for invariant. expected %s, got %s" + invalidAddrErr = "invalid address provided for invariant. expected %s, got %s" + couldNotCastErr = "could not cast transit data value to %s type" +) diff --git a/internal/engine/registry/registry.go b/internal/engine/registry/registry.go index 1417c63b..f3a25fe4 100644 --- a/internal/engine/registry/registry.go +++ b/internal/engine/registry/registry.go @@ -6,6 +6,7 @@ import ( "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/engine/invariant" + "github.com/ethereum/go-ethereum/common" ) // InvariantTable ... Invariant table @@ -14,7 +15,7 @@ type InvariantTable map[core.InvariantType]*InvRegister // InvRegister ... Invariant register struct type InvRegister struct { Preprocess func(*core.InvSessionParams) error - MultiChain bool + Policy core.ChainSubscription InputType core.RegisterType Constructor func(ctx context.Context, isp *core.InvSessionParams) (invariant.Invariant, error) } @@ -24,19 +25,19 @@ func NewInvariantTable() InvariantTable { tbl := map[core.InvariantType]*InvRegister{ core.BalanceEnforcement: { Preprocess: addressPreprocess, - MultiChain: false, + Policy: core.BothNetworks, InputType: core.AccountBalance, Constructor: constructBalanceInv, }, core.ContractEvent: { Preprocess: eventPreprocess, - MultiChain: false, + Policy: core.BothNetworks, InputType: core.EventLog, Constructor: constructEventInv, }, core.WithdrawalEnforcement: { Preprocess: preprocWithdrwlEnforce, - MultiChain: true, + Policy: core.OnlyLayer1, InputType: core.EventLog, Constructor: constructWithdrawlEnforceInv, }, @@ -45,6 +46,7 @@ func NewInvariantTable() InvariantTable { return tbl } +// constructWithdrawlEnforceInv ... Constructs a withdrawal enforcement invariant func constructWithdrawlEnforceInv(ctx context.Context, isp *core.InvSessionParams) (invariant.Invariant, error) { cfg, err := UnmarshalToWthdrawlEnforceCfg(isp) if err != nil { @@ -54,15 +56,17 @@ func constructWithdrawlEnforceInv(ctx context.Context, isp *core.InvSessionParam return NewWthdrawlEnforceInv(ctx, cfg) } +// constructEventInv ... Constructs an event invariant instance func constructEventInv(_ context.Context, isp *core.InvSessionParams) (invariant.Invariant, error) { cfg, err := UnmarshalToEventInvConfig(isp) if err != nil { return nil, err } - return NewEventInvariant(cfg) + return NewEventInvariant(cfg), nil } +// constructBalanceInv ... Constructs a balance invariant instance func constructBalanceInv(_ context.Context, isp *core.InvSessionParams) (invariant.Invariant, error) { cfg, err := UnmarshalToBalanceInvConfig(isp) if err != nil { @@ -87,7 +91,8 @@ func eventPreprocess(cfg *core.InvSessionParams) error { // NewBalanceInvariant ... Ensures that an address exists in the session params func addressPreprocess(cfg *core.InvSessionParams) error { - if cfg.Address() == "" { + nilAddr := common.Address{0} + if cfg.Address() == nilAddr { return fmt.Errorf("address not found") } @@ -96,7 +101,7 @@ func addressPreprocess(cfg *core.InvSessionParams) error { // preprocWithdrwlEnforce ... Ensures that the l2 to l1 message passer exists // and performs a "hack" operation to set the address key as the l2tol1MessagePasser -// address for upstream ETL components (ie. event log) to know which address to +// address for upstream ETL components (ie. event log) to know which L1 address to // query for events func preprocWithdrwlEnforce(cfg *core.InvSessionParams) error { l1Portal, err := cfg.Value(core.L1Portal) @@ -104,11 +109,12 @@ func preprocWithdrwlEnforce(cfg *core.InvSessionParams) error { return err } - // Configure the session to subscribe to events from the L1Portal contract + // Configure the session to inform the ETL to subscribe + // to withdrawal proof events from the L1Portal contract cfg.SetValue(core.AddrKey, l1Portal) if len(cfg.NestedArgs()) != 0 { - return fmt.Errorf("No nested args should be present") + return fmt.Errorf("no nested args should be present") } cfg.SetNestedArg("WithdrawalProven(bytes32,address,address)") diff --git a/internal/engine/registry/withdrawal_enforce.go b/internal/engine/registry/withdrawal_enforce.go index c1c1b061..06b4268e 100644 --- a/internal/engine/registry/withdrawal_enforce.go +++ b/internal/engine/registry/withdrawal_enforce.go @@ -56,7 +56,6 @@ type WthdrawlEnforceInv struct { // NewWthdrawlEnforceInv ... Initializer func NewWthdrawlEnforceInv(ctx context.Context, cfg *WthdrawlEnforceCfg) (invariant.Invariant, error) { - l2Client, err := client.FromContext(ctx, core.Layer2) if err != nil { return nil, err @@ -101,12 +100,12 @@ func (wi *WthdrawlEnforceInv) Invalidate(td core.TransitData) (*core.InvalOutcom } if td.Address.String() != wi.cfg.L1PortalAddress { - return nil, false, fmt.Errorf("invalid address supplied") + return nil, false, fmt.Errorf(invalidAddrErr, td.Address.String(), wi.cfg.L1PortalAddress) } log, success := td.Value.(types.Log) if !success { - return nil, false, fmt.Errorf("could not convert transit data to log") + return nil, false, fmt.Errorf(couldNotCastErr, "types.Log") } provenWithdrawl, err := wi.l1PortalFilter.ParseWithdrawalProven(log) diff --git a/internal/engine/store.go b/internal/engine/store.go index 8366f2f2..a1090057 100644 --- a/internal/engine/store.go +++ b/internal/engine/store.go @@ -10,31 +10,31 @@ import ( // SessionStore ... type SessionStore interface { AddInvSession(sUUID core.SUUID, pID core.PUUID, inv invariant.Invariant) error - GetInvSessionByUUID(sUUID core.SUUID) (invariant.Invariant, error) - GetInvariantsByUUIDs(sUUIDs ...core.SUUID) ([]invariant.Invariant, error) - GetInvSessionsForPipeline(pUUID core.PUUID) ([]core.SUUID, error) + GetInstanceByUUID(sUUID core.SUUID) (invariant.Invariant, error) + GetInstancesByUUIDs(sUUIDs []core.SUUID) ([]invariant.Invariant, error) + GetSUUIDsByPUUID(pUUID core.PUUID) ([]core.SUUID, error) } // sessionStore ... type sessionStore struct { - sessionPipelineMap map[core.PUUID][]core.SUUID - invSessionMap map[core.SUUID]invariant.Invariant // no duplicates + idMap map[core.PUUID][]core.SUUID + instanceMap map[core.SUUID]invariant.Invariant // no duplicates } // NewSessionStore ... Initializer func NewSessionStore() SessionStore { return &sessionStore{ - invSessionMap: make(map[core.SUUID]invariant.Invariant), - sessionPipelineMap: make(map[core.PUUID][]core.SUUID), + instanceMap: make(map[core.SUUID]invariant.Invariant), + idMap: make(map[core.PUUID][]core.SUUID), } } -// GetInvariantsByUUIDs ... Fetches in-order all invariants associated with a set of session UUIDs -func (ss *sessionStore) GetInvariantsByUUIDs(sUUIDs ...core.SUUID) ([]invariant.Invariant, error) { +// GetInstancesByUUIDs ... Fetches in-order all invariants associated with a set of session UUIDs +func (ss *sessionStore) GetInstancesByUUIDs(sUUIDs []core.SUUID) ([]invariant.Invariant, error) { invariants := make([]invariant.Invariant, len(sUUIDs)) for i, uuid := range sUUIDs { - session, err := ss.GetInvSessionByUUID(uuid) + session, err := ss.GetInstanceByUUID(uuid) if err != nil { return nil, err } @@ -45,17 +45,17 @@ func (ss *sessionStore) GetInvariantsByUUIDs(sUUIDs ...core.SUUID) ([]invariant. return invariants, nil } -// GetInvSessionByUUID .... Fetches invariant session by UUID -func (ss *sessionStore) GetInvSessionByUUID(sUUID core.SUUID) (invariant.Invariant, error) { - if entry, found := ss.invSessionMap[sUUID]; found { +// GetInstanceByUUID .... Fetches invariant session by SUUID +func (ss *sessionStore) GetInstanceByUUID(sUUID core.SUUID) (invariant.Invariant, error) { + if entry, found := ss.instanceMap[sUUID]; found { return entry, nil } return nil, fmt.Errorf("invariant UUID doesn't exists in store inv mapping") } -// GetInvSessionsForPipeline ... Returns all invariant session ids associated with pipeline -func (ss *sessionStore) GetInvSessionsForPipeline(pUUID core.PUUID) ([]core.SUUID, error) { - if sessionIDs, found := ss.sessionPipelineMap[pUUID]; found { +// GetSUUIDsByPUUID ... Returns all invariant session ids associated with pipeline +func (ss *sessionStore) GetSUUIDsByPUUID(pUUID core.PUUID) ([]core.SUUID, error) { + if sessionIDs, found := ss.idMap[pUUID]; found { return sessionIDs, nil } return nil, fmt.Errorf("pipeline UUID doesn't exists in store inv mapping") @@ -64,15 +64,16 @@ func (ss *sessionStore) GetInvSessionsForPipeline(pUUID core.PUUID) ([]core.SUUI // AddInvSession ... Adds an invariant session to the store func (ss *sessionStore) AddInvSession(sUUID core.SUUID, pUUID core.PUUID, inv invariant.Invariant) error { - if _, found := ss.invSessionMap[sUUID]; found { + if _, found := ss.instanceMap[sUUID]; found { return fmt.Errorf("invariant UUID already exists in store pid mapping") } - if _, found := ss.sessionPipelineMap[pUUID]; !found { // - ss.sessionPipelineMap[pUUID] = make([]core.SUUID, 0) + if _, found := ss.idMap[pUUID]; !found { + ss.idMap[pUUID] = make([]core.SUUID, 0) } - ss.invSessionMap[sUUID] = inv - ss.sessionPipelineMap[pUUID] = append(ss.sessionPipelineMap[pUUID], sUUID) + + ss.instanceMap[sUUID] = inv + ss.idMap[pUUID] = append(ss.idMap[pUUID], sUUID) return nil } diff --git a/internal/engine/store_test.go b/internal/engine/store_test.go index 39514b8c..5cfeeb12 100644 --- a/internal/engine/store_test.go +++ b/internal/engine/store_test.go @@ -1,18 +1,150 @@ package engine_test import ( + "fmt" "testing" "github.com/base-org/pessimism/internal/core" "github.com/base-org/pessimism/internal/engine" + "github.com/base-org/pessimism/internal/engine/invariant" "github.com/stretchr/testify/assert" ) func TestSessionStore(t *testing.T) { - // Setup - ss := engine.NewSessionStore() + sUUID1 := core.MakeSUUID(core.Layer1, core.Live, core.InvariantType(0)) + sUUID2 := core.MakeSUUID(core.Layer2, core.Live, core.InvariantType(0)) + pUUID1 := core.NilPUUID() - // Test GetInvSessionByUUID - _, err := ss.GetInvSessionByUUID(core.NilSUUID()) - assert.Error(t, err, "should error") + var tests = []struct { + name string + function string + constructor func() engine.SessionStore + testFunc func(t *testing.T, ss engine.SessionStore) + }{ + { + name: "Successful Retrieval", + constructor: func() engine.SessionStore { + ss := engine.NewSessionStore() + + inv := invariant.NewBaseInvariant(core.RegisterType(0)) + inv.SetSUUID(sUUID1) + + _ = ss.AddInvSession(sUUID1, pUUID1, inv) + + return ss + }, + testFunc: func(t *testing.T, ss engine.SessionStore) { + // Ensure that the invariant is retrievable + inv, err := ss.GetInstanceByUUID(sUUID1) + assert.NoError(t, err) + assert.Equal(t, inv.SUUID(), sUUID1) + + // Ensure that pipeline UUIDs are retrievable + sUUIDs, err := ss.GetSUUIDsByPUUID(pUUID1) + assert.NoError(t, err) + assert.Equal(t, sUUIDs, []core.SUUID{sUUID1}) + }, + }, + { + name: "Successful Retrieval with Multiple Invariants", + constructor: func() engine.SessionStore { + ss := engine.NewSessionStore() + + inv := invariant.NewBaseInvariant(core.RegisterType(0)) + inv.SetSUUID(sUUID1) + + _ = ss.AddInvSession(sUUID1, pUUID1, inv) + + inv2 := invariant.NewBaseInvariant(core.RegisterType(0)) + inv2.SetSUUID(sUUID2) + + _ = ss.AddInvSession(sUUID2, pUUID1, inv2) + + return ss + }, + testFunc: func(t *testing.T, ss engine.SessionStore) { + // Ensure that the first inserted invariant is retrievable + inv, err := ss.GetInstanceByUUID(sUUID1) + assert.NoError(t, err) + assert.Equal(t, inv.SUUID(), sUUID1) + + // Ensure that the second inserted invariant is retrievable + inv2, err := ss.GetInstanceByUUID(sUUID2) + assert.NoError(t, err) + assert.Equal(t, inv2.SUUID(), sUUID2) + + // Ensure that pipeline UUIDs are retrievable + sUUIDs, err := ss.GetSUUIDsByPUUID(pUUID1) + assert.NoError(t, err) + assert.Equal(t, sUUIDs, []core.SUUID{sUUID1, sUUID2}) + + // Ensure that both invariants are retrievable at once + invs, err := ss.GetInstancesByUUIDs([]core.SUUID{sUUID1, sUUID2}) + assert.NoError(t, err) + assert.Equal(t, invs, []invariant.Invariant{inv, inv2}) + }, + }, + { + name: "Successful Retrieval", + constructor: func() engine.SessionStore { + ss := engine.NewSessionStore() + + inv := invariant.NewBaseInvariant(core.RegisterType(0)) + inv.SetSUUID(sUUID1) + + _ = ss.AddInvSession(sUUID1, pUUID1, inv) + return ss + }, + testFunc: func(t *testing.T, ss engine.SessionStore) { + // Ensure that the invariant is retrievable + inv, err := ss.GetInstanceByUUID(sUUID1) + assert.NoError(t, err) + assert.Equal(t, inv.SUUID(), sUUID1) + + // Ensure that pipeline UUIDs are retrievable + sUUIDs, err := ss.GetSUUIDsByPUUID(pUUID1) + assert.NoError(t, err) + assert.Equal(t, sUUIDs, []core.SUUID{sUUID1}) + }, + }, + { + name: "Failed Retrieval", + constructor: func() engine.SessionStore { + ss := engine.NewSessionStore() + + inv := invariant.NewBaseInvariant(core.RegisterType(0)) + _ = ss.AddInvSession(sUUID1, pUUID1, inv) + + return ss + }, + testFunc: func(t *testing.T, ss engine.SessionStore) { + inv, err := ss.GetInstanceByUUID(sUUID2) + assert.Nil(t, inv) + assert.Error(t, err) + }, + }, + { + name: "Failed Add with Duplicate SUUIDs", + constructor: func() engine.SessionStore { + ss := engine.NewSessionStore() + + inv := invariant.NewBaseInvariant(core.RegisterType(0)) + _ = ss.AddInvSession(sUUID1, pUUID1, inv) + + return ss + }, + testFunc: func(t *testing.T, ss engine.SessionStore) { + // Ensure that only one suuid can exist in the store + err := ss.AddInvSession(sUUID1, pUUID1, invariant.NewBaseInvariant(core.RegisterType(0))) + assert.Error(t, err) + }, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d-%s", i, test.name), func(t *testing.T) { + ss := test.constructor() + test.testFunc(t, ss) + }) + } } diff --git a/internal/etl/registry/pipe/event_log.go b/internal/etl/registry/pipe/event_log.go index 8f628d10..a4733a59 100644 --- a/internal/etl/registry/pipe/event_log.go +++ b/internal/etl/registry/pipe/event_log.go @@ -24,6 +24,7 @@ type EventDefinition struct { sk *core.StateKey pUUID core.PUUID cfg *core.ClientConfig + ss state.Store } // ConfigureRoutine ... Sets up the pipe client connection and persists puuid to definition state @@ -40,9 +41,15 @@ func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, return nil, err } + stateStore, err := state.FromContext(ctx) + if err != nil { + return nil, err + } + ed := &EventDefinition{ cfg: cfg, client: client, + ss: stateStore, } p, err := component.NewPipe(ctx, ed, core.GethBlock, core.EventLog, opts...) @@ -54,27 +61,11 @@ func NewEventParserPipe(ctx context.Context, cfg *core.ClientConfig, return p, nil } -// contractEvents ... Struct to hold the contract address and the event signatures -type contractEvents struct { - address common.Address - sigs []common.Hash -} - -// HasSignature ... Checks if the event has the signature -func (ce *contractEvents) HasSignature(sig common.Hash) bool { - for _, s := range ce.sigs { - if s == sig { - return true - } - } - - return false -} - // getEventsToMonitor ... Gets the smart contract events to monitor from the state store -func (ed *EventDefinition) getEventsToMonitor(ctx context.Context, - addresses []string, ss state.Store) ([]contractEvents, error) { - var events []contractEvents +func (ed *EventDefinition) getTopics(ctx context.Context, + addresses []string, ss state.Store) [][]common.Hash { + events := make([]common.Hash, 0) + for _, address := range addresses { innerKey := &core.StateKey{ Nesting: false, @@ -86,55 +77,46 @@ func (ed *EventDefinition) getEventsToMonitor(ctx context.Context, sigs, err := ss.GetSlice(ctx, innerKey) if err != nil { logging.WithContext(ctx).Error(err.Error()) - return []contractEvents{}, err } - var parsedSigs []common.Hash for _, sig := range sigs { - parsedSigs = append(parsedSigs, crypto.Keccak256Hash([]byte(sig))) + events = append(events, crypto.Keccak256Hash([]byte(sig))) } - - logging.WithContext(ctx).Debug(fmt.Sprintf("Address: %s, Sigs: %v", address, parsedSigs)) - events = append(events, contractEvents{ - address: common.HexToAddress(address), - sigs: parsedSigs, - }) } - return events, nil + topics := make([][]common.Hash, 1) + topics[0] = events + + return topics } // Transform ... Gets the events from the block, filters them and // returns them if they are in the list of events to monitor func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ([]core.TransitData, error) { + // 1. Convert arbitrary transit data to go-ethereum compatible block type block, success := td.Value.(types.Block) if !success { return []core.TransitData{}, fmt.Errorf("could not convert to block") } - stateStore, err := state.FromContext(ctx) - if err != nil { - return []core.TransitData{}, err - } - + // 2. Fetch the addresess and events to monitor for logging.NoContext().Debug("Getting addresess", zap.String(core.PUUIDKey, ed.pUUID.String())) - addresses, err := stateStore.GetSlice(ctx, ed.sk) - if err != nil { - return []core.TransitData{}, err - } - - eventsToMonitor, err := ed.getEventsToMonitor(ctx, addresses, stateStore) + addresses, err := ed.ss.GetSlice(ctx, ed.sk) if err != nil { return []core.TransitData{}, err } + topics := ed.getTopics(ctx, addresses, ed.ss) hash := block.Header().Hash() + // 3. Construct and execute a filter query on the provided block + // to get the relevant logs query := ethereum.FilterQuery{ BlockHash: &hash, Addresses: pess_common.SliceToAddresses(addresses), + Topics: topics, } logs, err := ed.client.FilterLogs(context.Background(), query) @@ -142,19 +124,16 @@ func (ed *EventDefinition) Transform(ctx context.Context, td core.TransitData) ( return []core.TransitData{}, err } + // 4. See if there are any logs to process if len(logs) == 0 { return []core.TransitData{}, nil } + // 5. Convert the logs to transit data and return them result := make([]core.TransitData, 0) for _, log := range logs { - for _, event := range eventsToMonitor { - // Check if event is in the list of events to monitor - if event.address == log.Address && event.HasSignature(log.Topics[0]) { - result = append(result, - core.NewTransitData(core.EventLog, log, core.WithAddress(log.Address))) - } - } + result = append(result, + core.NewTransitData(core.EventLog, log, core.WithAddress(log.Address))) } return result, nil diff --git a/internal/mocks/alert_manager.go b/internal/mocks/alert_manager.go index 787196a8..0266fcf0 100644 --- a/internal/mocks/alert_manager.go +++ b/internal/mocks/alert_manager.go @@ -34,18 +34,18 @@ func (m *AlertManager) EXPECT() *AlertManagerMockRecorder { return m.recorder } -// AddInvariantSession mocks base method. -func (m *AlertManager) AddInvariantSession(arg0 core.SUUID, arg1 core.AlertDestination) error { +// AddSession mocks base method. +func (m *AlertManager) AddSession(arg0 core.SUUID, arg1 core.AlertDestination) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddInvariantSession", arg0, arg1) + ret := m.ctrl.Call(m, "AddSession", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// AddInvariantSession indicates an expected call of AddInvariantSession. -func (mr *AlertManagerMockRecorder) AddInvariantSession(arg0, arg1 interface{}) *gomock.Call { +// AddSession indicates an expected call of AddSession. +func (mr *AlertManagerMockRecorder) AddSession(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddInvariantSession", reflect.TypeOf((*AlertManager)(nil).AddInvariantSession), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSession", reflect.TypeOf((*AlertManager)(nil).AddSession), arg0, arg1) } // EventLoop mocks base method. diff --git a/internal/mocks/invariant.go b/internal/mocks/invariant.go new file mode 100644 index 00000000..3c27c21f --- /dev/null +++ b/internal/mocks/invariant.go @@ -0,0 +1,91 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/base-org/pessimism/internal/engine/invariant (interfaces: Invariant) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + core "github.com/base-org/pessimism/internal/core" + gomock "github.com/golang/mock/gomock" +) + +// MockInvariant is a mock of Invariant interface. +type MockInvariant struct { + ctrl *gomock.Controller + recorder *MockInvariantMockRecorder +} + +// MockInvariantMockRecorder is the mock recorder for MockInvariant. +type MockInvariantMockRecorder struct { + mock *MockInvariant +} + +// NewMockInvariant creates a new mock instance. +func NewMockInvariant(ctrl *gomock.Controller) *MockInvariant { + mock := &MockInvariant{ctrl: ctrl} + mock.recorder = &MockInvariantMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInvariant) EXPECT() *MockInvariantMockRecorder { + return m.recorder +} + +// InputType mocks base method. +func (m *MockInvariant) InputType() core.RegisterType { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InputType") + ret0, _ := ret[0].(core.RegisterType) + return ret0 +} + +// InputType indicates an expected call of InputType. +func (mr *MockInvariantMockRecorder) InputType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InputType", reflect.TypeOf((*MockInvariant)(nil).InputType)) +} + +// Invalidate mocks base method. +func (m *MockInvariant) Invalidate(arg0 core.TransitData) (*core.InvalOutcome, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Invalidate", arg0) + ret0, _ := ret[0].(*core.InvalOutcome) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// Invalidate indicates an expected call of Invalidate. +func (mr *MockInvariantMockRecorder) Invalidate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Invalidate", reflect.TypeOf((*MockInvariant)(nil).Invalidate), arg0) +} + +// SUUID mocks base method. +func (m *MockInvariant) SUUID() core.SUUID { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SUUID") + ret0, _ := ret[0].(core.SUUID) + return ret0 +} + +// SUUID indicates an expected call of SUUID. +func (mr *MockInvariantMockRecorder) SUUID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SUUID", reflect.TypeOf((*MockInvariant)(nil).SUUID)) +} + +// SetSUUID mocks base method. +func (m *MockInvariant) SetSUUID(arg0 core.SUUID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetSUUID", arg0) +} + +// SetSUUID indicates an expected call of SetSUUID. +func (mr *MockInvariantMockRecorder) SetSUUID(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSUUID", reflect.TypeOf((*MockInvariant)(nil).SetSUUID), arg0) +} diff --git a/internal/mocks/subsystem.go b/internal/mocks/subsystem.go new file mode 100644 index 00000000..c7b3297d --- /dev/null +++ b/internal/mocks/subsystem.go @@ -0,0 +1,109 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/base-org/pessimism/internal/subsystem (interfaces: Manager) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + models "github.com/base-org/pessimism/internal/api/models" + core "github.com/base-org/pessimism/internal/core" + invariant "github.com/base-org/pessimism/internal/engine/invariant" + gomock "github.com/golang/mock/gomock" +) + +// SubManager is a mock of Manager interface. +type SubManager struct { + ctrl *gomock.Controller + recorder *SubManagerMockRecorder +} + +// SubManagerMockRecorder is the mock recorder for SubManager. +type SubManagerMockRecorder struct { + mock *SubManager +} + +// NewSubManager creates a new mock instance. +func NewSubManager(ctrl *gomock.Controller) *SubManager { + mock := &SubManager{ctrl: ctrl} + mock.recorder = &SubManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *SubManager) EXPECT() *SubManagerMockRecorder { + return m.recorder +} + +// BuildDeployCfg mocks base method. +func (m *SubManager) BuildDeployCfg(arg0 *core.PipelineConfig, arg1 *core.SessionConfig) (*invariant.DeployConfig, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BuildDeployCfg", arg0, arg1) + ret0, _ := ret[0].(*invariant.DeployConfig) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BuildDeployCfg indicates an expected call of BuildDeployCfg. +func (mr *SubManagerMockRecorder) BuildDeployCfg(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildDeployCfg", reflect.TypeOf((*SubManager)(nil).BuildDeployCfg), arg0, arg1) +} + +// BuildPipelineCfg mocks base method. +func (m *SubManager) BuildPipelineCfg(arg0 *models.InvRequestParams) (*core.PipelineConfig, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BuildPipelineCfg", arg0) + ret0, _ := ret[0].(*core.PipelineConfig) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BuildPipelineCfg indicates an expected call of BuildPipelineCfg. +func (mr *SubManagerMockRecorder) BuildPipelineCfg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BuildPipelineCfg", reflect.TypeOf((*SubManager)(nil).BuildPipelineCfg), arg0) +} + +// RunInvSession mocks base method. +func (m *SubManager) RunInvSession(arg0 *invariant.DeployConfig) (core.SUUID, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RunInvSession", arg0) + ret0, _ := ret[0].(core.SUUID) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RunInvSession indicates an expected call of RunInvSession. +func (mr *SubManagerMockRecorder) RunInvSession(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunInvSession", reflect.TypeOf((*SubManager)(nil).RunInvSession), arg0) +} + +// Shutdown mocks base method. +func (m *SubManager) Shutdown() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Shutdown") + ret0, _ := ret[0].(error) + return ret0 +} + +// Shutdown indicates an expected call of Shutdown. +func (mr *SubManagerMockRecorder) Shutdown() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*SubManager)(nil).Shutdown)) +} + +// StartEventRoutines mocks base method. +func (m *SubManager) StartEventRoutines(arg0 context.Context) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "StartEventRoutines", arg0) +} + +// StartEventRoutines indicates an expected call of StartEventRoutines. +func (mr *SubManagerMockRecorder) StartEventRoutines(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartEventRoutines", reflect.TypeOf((*SubManager)(nil).StartEventRoutines), arg0) +} diff --git a/internal/state/memory.go b/internal/state/memory.go index d4efc6d3..f0520edf 100644 --- a/internal/state/memory.go +++ b/internal/state/memory.go @@ -78,29 +78,3 @@ func (ss *stateStore) Remove(_ context.Context, key *core.StateKey) error { delete(ss.sliceStore, key.String()) return nil } - -// GetNestedSubset ... Fetches a subset of a nested slice provided a nested -// key/value pair (ie. filters the state object into a subset object that -// contains only the values that match the nested key/value pair) -func (ss *stateStore) GetNestedSubset(_ context.Context, - key *core.StateKey) (map[string][]string, error) { - ss.RLock() - defer ss.RUnlock() - - values, exists := ss.sliceStore[key.String()] - if !exists { - return map[string][]string{}, fmt.Errorf(notFoundError, key) - } - - var nestedMap = make(map[string][]string, 0) - for _, val := range values { - if _, exists := ss.sliceStore[val]; !exists { - return map[string][]string{}, fmt.Errorf(notFoundError, val) - } - - nestedValues := ss.sliceStore[val] - nestedMap[val] = nestedValues - } - - return nestedMap, nil -} diff --git a/internal/state/memory_test.go b/internal/state/memory_test.go index 72d8c0ce..8f4867c4 100644 --- a/internal/state/memory_test.go +++ b/internal/state/memory_test.go @@ -14,9 +14,7 @@ func Test_MemState(t *testing.T) { testKey := &core.StateKey{false, 1, "test", nil} testValue := "0xabc" - testValue2 := "0xdef" - innerTestKey := &core.StateKey{false, 1, "best", nil} var tests = []struct { name string description string @@ -68,30 +66,6 @@ func Test_MemState(t *testing.T) { assert.NoError(t, err, "should not error") }, }, - { - name: "Test_GetNestedSubset_Success", - description: "Test get nested subset", - function: "GetNestedSubset", - construction: func() state.Store { - ss := state.NewMemState() - _, err := ss.SetSlice(context.Background(), testKey, innerTestKey.String()) - if err != nil { - panic(err) - } - - _, err = ss.SetSlice(context.Background(), innerTestKey, testValue2) - if err != nil { - panic(err) - } - return ss - }, - testLogic: func(t *testing.T, ss state.Store) { - subGraph, err := ss.GetNestedSubset(context.Background(), testKey) - assert.NoError(t, err, "should not error") - - assert.Contains(t, subGraph, innerTestKey.String(), "should contain inner key") - }, - }, } // TODO - Consider making generic test helpers for this diff --git a/internal/state/state.go b/internal/state/state.go index 099e2e37..299a50d8 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -11,7 +11,6 @@ import ( // TODO() - Add optional redis store implementation type Store interface { GetSlice(context.Context, *core.StateKey) ([]string, error) - GetNestedSubset(ctx context.Context, key *core.StateKey) (map[string][]string, error) SetSlice(context.Context, *core.StateKey, string) (string, error) Remove(context.Context, *core.StateKey) error diff --git a/internal/subsystem/manager.go b/internal/subsystem/manager.go index bd971539..76a6a520 100644 --- a/internal/subsystem/manager.go +++ b/internal/subsystem/manager.go @@ -1,3 +1,5 @@ +//go:generate mockgen -package mocks --destination ../mocks/subsystem.go --mock_names Manager=SubManager . Manager + package subsystem import ( @@ -38,8 +40,10 @@ func (cfg *Config) GetPollInterval(n core.Network) (time.Duration, error) { // Manager ... Subsystem manager interface type Manager interface { + BuildDeployCfg(pConfig *core.PipelineConfig, sConfig *core.SessionConfig) (*invariant.DeployConfig, error) BuildPipelineCfg(params *models.InvRequestParams) (*core.PipelineConfig, error) - RunInvSession(pConfig *core.PipelineConfig, sConfig *core.SessionConfig) (core.SUUID, error) + RunInvSession(cfg *invariant.DeployConfig) (core.SUUID, error) + // Orchestration StartEventRoutines(ctx context.Context) Shutdown() error } @@ -71,16 +75,18 @@ func NewManager(ctx context.Context, cfg *Config, etl pipeline.Manager, eng engi } // Shutdown ... Shuts down all subsystems in primary data flow order -// Ie. ETL -> Engine -> Alert func (m *manager) Shutdown() error { + // 1. Shutdown ETL subsystem if err := m.etl.Shutdown(); err != nil { return err } + // 2. Shutdown Engine subsystem if err := m.eng.Shutdown(); err != nil { return err } + // 3. Shutdown Alert subsystem return m.alrt.Shutdown() } @@ -116,48 +122,63 @@ func (m *manager) StartEventRoutines(ctx context.Context) { }() } -func (m *manager) RunInvSession(pConfig *core.PipelineConfig, sConfig *core.SessionConfig) (core.SUUID, error) { - logger := logging.WithContext(m.ctx) - +// BuildDeployCfg ... Builds a deploy config provided a pipeline & session config +func (m *manager) BuildDeployCfg(pConfig *core.PipelineConfig, + sConfig *core.SessionConfig) (*invariant.DeployConfig, error) { + // 1. Fetch state key using risk engine input register type sk, stateful, err := m.etl.GetStateKey(pConfig.DataType) if err != nil { - return core.NilSUUID(), err + return nil, err } + // 2. Create data pipeline pUUID, reuse, err := m.etl.CreateDataPipeline(pConfig) + if err != nil { + return nil, err + } - logger.Info("Created etl pipeline", zap.String(core.PUUIDKey, pUUID.String())) + logging.WithContext(m.ctx). + Info("Created etl pipeline", zap.String(core.PUUIDKey, pUUID.String())) - deployCfg := &invariant.DeployConfig{ + // 3. Create a deploy config + return &invariant.DeployConfig{ PUUID: pUUID, + Reuse: reuse, InvType: sConfig.Type, InvParams: sConfig.Params, Network: pConfig.Network, Stateful: stateful, StateKey: sk, - } + AlertDest: sConfig.AlertDest, + }, nil +} - sUUID, err := m.eng.DeployInvariantSession(deployCfg) +// RunInvSession ... Runs an invariant session +func (m *manager) RunInvSession(cfg *invariant.DeployConfig) (core.SUUID, error) { + // 1. Deploy invariant session to risk engine + sUUID, err := m.eng.DeployInvariantSession(cfg) if err != nil { return core.NilSUUID(), err } - logger.Info("Deployed invariant session to risk engine", zap.String(core.SUUIDKey, sUUID.String())) + logging.WithContext(m.ctx). + Info("Deployed invariant session to risk engine", zap.String(core.SUUIDKey, sUUID.String())) - err = m.alrt.AddInvariantSession(sUUID, sConfig.AlertDest) + // 2. Add session to alert manager + err = m.alrt.AddSession(sUUID, cfg.AlertDest) if err != nil { return core.NilSUUID(), err } - if reuse { // If the pipeline was reused, we don't need to run it again + // 3. Run pipeline if not reused + if cfg.Reuse { return sUUID, nil } - if err = m.etl.RunPipeline(pUUID); err != nil { // Spinup pipeline components + if err = m.etl.RunPipeline(cfg.PUUID); err != nil { // Spinup pipeline components return core.NilSUUID(), err } return sUUID, nil - } // BuildPipelineCfg ... Builds a pipeline config provided a set of invariant request params @@ -179,10 +200,8 @@ func (m *manager) BuildPipelineCfg(params *models.InvRequestParams) (*core.Pipel ClientConfig: &core.ClientConfig{ Network: params.NetworkType(), PollInterval: pollInterval, - NumOfRetries: 3, StartHeight: params.StartHeight, EndHeight: params.EndHeight, }, }, nil - } diff --git a/internal/subsystem/manager_test.go b/internal/subsystem/manager_test.go index 99416fa0..ce79d0b4 100644 --- a/internal/subsystem/manager_test.go +++ b/internal/subsystem/manager_test.go @@ -1,3 +1,224 @@ package subsystem_test -// TODO(#76) : No Subsystem Manager Tests +import ( + "context" + "fmt" + "testing" + + "github.com/base-org/pessimism/internal/core" + "github.com/base-org/pessimism/internal/engine/invariant" + "github.com/base-org/pessimism/internal/mocks" + "github.com/base-org/pessimism/internal/subsystem" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func testErr() error { + return fmt.Errorf("test error") +} + +type testSuite struct { + subsys subsystem.Manager + + mockEtl *mocks.EtlManager + mockEng *mocks.EngineManager + mockAlrt *mocks.AlertManager + mockCtrl *gomock.Controller +} + +func createTestSuite(t *testing.T) *testSuite { + ctrl := gomock.NewController(t) + + etlMock := mocks.NewEtlManager(ctrl) + engMock := mocks.NewEngineManager(ctrl) + alrtMock := mocks.NewAlertManager(ctrl) + cfg := &subsystem.Config{} + + subsys := subsystem.NewManager(context.Background(), cfg, etlMock, engMock, alrtMock) + + return &testSuite{ + subsys: subsys, + mockEtl: etlMock, + mockEng: engMock, + mockAlrt: alrtMock, + mockCtrl: ctrl, + } +} + +func Test_BuildDeployCfg(t *testing.T) { + pConfig := &core.PipelineConfig{ + Network: core.Layer1, + DataType: core.GethBlock, + PipelineType: core.Live, + ClientConfig: nil, + } + + sConfig := &core.SessionConfig{ + Network: core.Layer1, + PT: core.Live, + Type: core.BalanceEnforcement, + Params: nil, + } + + var tests = []struct { + name string + constructor func(t *testing.T) *testSuite + testLogic func(t *testing.T, ts *testSuite) + }{ + { + name: "Failure when fetching state key", + constructor: func(t *testing.T) *testSuite { + ts := createTestSuite(t) + + ts.mockEtl.EXPECT().GetStateKey(pConfig.DataType). + Return(nil, false, testErr()). + Times(1) + + return ts + }, + testLogic: func(t *testing.T, ts *testSuite) { + actualCfg, err := ts.subsys.BuildDeployCfg(pConfig, sConfig) + assert.Error(t, err) + assert.Nil(t, actualCfg) + }, + }, + { + name: "Failure when creating data pipeline", + constructor: func(t *testing.T) *testSuite { + ts := createTestSuite(t) + + ts.mockEtl.EXPECT().GetStateKey(pConfig.DataType). + Return(nil, false, nil). + Times(1) + + ts.mockEtl.EXPECT().CreateDataPipeline(gomock.Any()). + Return(core.NilPUUID(), false, testErr()). + Times(1) + + return ts + }, + testLogic: func(t *testing.T, ts *testSuite) { + actualCfg, err := ts.subsys.BuildDeployCfg(pConfig, sConfig) + assert.Error(t, err) + assert.Nil(t, actualCfg) + }, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d-%s", i, test.name), func(t *testing.T) { + ts := test.constructor(t) + test.testLogic(t, ts) + }) + } +} + +func Test_RunInvSession(t *testing.T) { + testSUUID := core.MakeSUUID(1, 1, 1) + testCfg := &invariant.DeployConfig{ + Stateful: false, + StateKey: nil, + Network: core.Layer1, + PUUID: core.NilPUUID(), + Reuse: false, + + InvType: core.BalanceEnforcement, + InvParams: nil, + AlertDest: core.Slack, + } + + var tests = []struct { + name string + constructor func(t *testing.T) *testSuite + testLogic func(t *testing.T, ts *testSuite) + }{ + { + name: "Failure when deploying invariant session", + constructor: func(t *testing.T) *testSuite { + ts := createTestSuite(t) + ts.mockEng.EXPECT().DeployInvariantSession(testCfg). + Return(core.NilSUUID(), testErr()). + Times(1) + + return ts + }, + testLogic: func(t *testing.T, ts *testSuite) { + actualSUUID, err := ts.subsys.RunInvSession(testCfg) + assert.Error(t, err) + assert.Equal(t, core.NilSUUID(), actualSUUID) + }, + }, + { + name: "Failure when adding invariant session to alerting system", + constructor: func(t *testing.T) *testSuite { + ts := createTestSuite(t) + ts.mockEng.EXPECT().DeployInvariantSession(testCfg). + Return(testSUUID, nil). + Times(1) + + ts.mockAlrt.EXPECT().AddSession(testSUUID, testCfg.AlertDest). + Return(testErr()). + Times(1) + + return ts + }, + testLogic: func(t *testing.T, ts *testSuite) { + actualSUUID, err := ts.subsys.RunInvSession(testCfg) + assert.Error(t, err) + assert.Equal(t, core.NilSUUID(), actualSUUID) + }, + }, + { + name: "Success with no reuse", + constructor: func(t *testing.T) *testSuite { + ts := createTestSuite(t) + ts.mockEng.EXPECT().DeployInvariantSession(testCfg). + Return(testSUUID, nil). + Times(1) + + ts.mockAlrt.EXPECT().AddSession(testSUUID, testCfg.AlertDest). + Return(nil). + Times(1) + + ts.mockEtl.EXPECT().RunPipeline(testCfg.PUUID). + Return(nil). + Times(1) + + return ts + }, + testLogic: func(t *testing.T, ts *testSuite) { + actualSUUID, err := ts.subsys.RunInvSession(testCfg) + assert.NoError(t, err) + assert.Equal(t, testSUUID, actualSUUID) + }, + }, + { + name: "Success with reuse", + constructor: func(t *testing.T) *testSuite { + ts := createTestSuite(t) + ts.mockEng.EXPECT().DeployInvariantSession(testCfg). + Return(testSUUID, nil). + Times(1) + + ts.mockAlrt.EXPECT().AddSession(testSUUID, testCfg.AlertDest). + Return(nil). + Times(1) + + return ts + }, + testLogic: func(t *testing.T, ts *testSuite) { + testCfg.Reuse = true + actualSUUID, err := ts.subsys.RunInvSession(testCfg) + assert.NoError(t, err) + assert.Equal(t, testSUUID, actualSUUID) + }, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d-%s", i, test.name), func(t *testing.T) { + ts := test.constructor(t) + test.testLogic(t, ts) + }) + } +}