Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

fix - Removed balance component, changed ETL abstractions, & redid block syncing #180

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
L1_RPC_ENDPOINT=
L2_RPC_ENDPOINT=

# Oracle Geth Block Poll Intervals (ms)
# Chain
L1_POLL_INTERVAL=5000
L2_POLL_INTERVAL=5000

Expand Down
25 changes: 12 additions & 13 deletions docs/architecture/etl.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ A component refers to a graph node within the ETL system. Every component perfor
Currently, there are three total component types:

1. `Pipe` - Used to perform local arbitrary computations _(e.g. Extracting L1Withdrawal transactions from a block)_
2. `Oracle` - Used to poll and collect data from some third-party source _(e.g. Querying real-time account balance amounts from an op-geth execution client)_
2. `Reader` - Used to poll and collect data from some third-party source _(e.g. Querying real-time account balance amounts from an op-geth execution client)_
3. `Aggregator` - Used to synchronize events between asynchronous data sources _(e.g. Synchronizing L1/L2 blocks to understand real-time changes in bridging TVL)_

### Inter-Connectivity
Expand Down Expand Up @@ -97,20 +97,19 @@ Once input data processing has been completed, the output data is then submitted
* Generating opcode traces for some EVM transaction
* Parsing emitted events from a transaction

### Oracle
### Reader

Oracles are responsible for collecting data from some external third party _(e.g. L1 geth node, L2 rollup node, etc.)_. As of now, oracle's are configurable through the use of a standard `OracleDefinition` interface that allows developers to write arbitrary oracle logic.
Oracles are responsible for collecting data from some external third party _(e.g. L1 geth node, L2 rollup node, etc.)_. As of now, reader's are configurable through the use of a standard `OracleDefinition` interface that allows developers to write arbitrary reader logic.
The following key interface functions are supported/enforced:

* `ReadRoutine` - Routine used for reading/polling real-time data for some arbitrarily configured data source
* `BackTestRoutine` - _Optional_ routine used for sequentially backtesting from some starting to ending block heights.

Unlike other components, `Oracles` actually employ _2 go routines_ to safely operate. This is because the definition routines are run as a separate go routine with a communication channel to the actual `Oracle` event loop. This is visualized below:
Unlike other components, `Oracles` actually employ _2 go routines_ to safely operate. This is because the definition routines are run as a separate go routine with a communication channel to the actual `Reader` event loop. This is visualized below:

{% raw %}
<div class="mermaid">
graph LR;
subgraph A[Oracle]
subgraph A[Reader]
B[eventLoop]-->|channel|ODefRoutine;
B[eventLoop]-->|context|ODefRoutine;
B-->B;
Expand Down Expand Up @@ -185,7 +184,7 @@ A registry submodule is used to store all ETL data register definitions that pro

## Addressing

Some component's require knowledge of a specific address to properly function. For example, an oracle that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems.
Some component's require knowledge of a specific address to properly function. For example, an reader that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems.

Shown below is how the ETL and Risk Engine interact with the shared state store using a `BalanceOracle` component as an example:

Expand All @@ -210,22 +209,22 @@ graph LR;
GETH --> |"{4} []balance"|BO

BO("Balance
Oracle") --> |"{1} Get(PUUID)"|state
Reader") --> |"{1} Get(PUUID)"|state
BO -."eventLoop()".-> BO

state --> |"{2} []address"|BO
end
</div>
{% endraw %}

### Geth Block Oracle Register
### Geth Block Reader Register

A `GethBlock` register refers to a block output extracted from a go-ethereum node. This register is used for creating `Oracle` components that poll and extract block data from a go-ethereum node in real-time.
A `BlockHeader` register refers to a block output extracted from a go-ethereum node. This register is used for creating `Reader` components that poll and extract block data from a go-ethereum node in real-time.

### Geth Account Balance Oracle Register
### Geth Account Balance Reader Register

An `AccountBalance` register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating `Oracle` components that poll and extract native ETH balance data for some state persisted addresses from a go-ethereum node in real-time.
Unlike, the `GethBlock` register, this register requires knowledge of an address set that's shared with the risk engine to properly function and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable.
An `AccountBalance` register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating `Reader` components that poll and extract native ETH balance data for some state persisted addresses from a go-ethereum node in real-time.
Unlike, the `BlockHeader` register, this register requires knowledge of an address set that's shared with the risk engine to properly function and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable.

## Managed ETL

Expand Down
2 changes: 1 addition & 1 deletion docs/heuristics.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ curl --location --request POST 'http://localhost:8080/v0/heuristic' \

**NOTE:** This heuristic requires an active RPC connection to both L1 and L2 networks. Furthermore, the Pessimism implementation of fault-detector assumes that a submitted L2 output on L1 will correspond to a canonical block on L2.

The hardcoded `fault_detector` heuristic scans for active `OutputProposed` events on an L1 Output Oracle contract. Once an event is detected, the heuristic implementation proceeds to reconstruct a local state output for the corresponding L2 block. If there is a mismatch between the L1 output and the local state output, the heuristic alerts.
The hardcoded `fault_detector` heuristic scans for active `OutputProposed` events on an L1 Output Reader contract. Once an event is detected, the heuristic implementation proceeds to reconstruct a local state output for the corresponding L2 block. If there is a mismatch between the L1 output and the local state output, the heuristic alerts.

### Parameters

Expand Down
37 changes: 23 additions & 14 deletions e2e/alerting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ func TestMultiDirectiveRouting(t *testing.T) {
// balance enforcement heuristic session on L2 network with a cooldown.
func TestCoolDown(t *testing.T) {

ts := e2e.CreateL2TestSuite(t)
ts := e2e.CreateSysTestSuite(t)
defer ts.Close()

alice := ts.L2Cfg.Secrets.Addresses().Alice
bob := ts.L2Cfg.Secrets.Addresses().Bob
alice := ts.Cfg.Secrets.Addresses().Alice
bob := ts.Cfg.Secrets.Addresses().Bob

alertMsg := "one baby to another says:"
// Deploy a balance enforcement heuristic session for Alice using a cooldown.
_, err := ts.App.BootStrap([]*models.SessionRequestParams{{
ids, err := ts.App.BootStrap([]*models.SessionRequestParams{{
Network: core.Layer2.String(),
PType: core.Live.String(),
HeuristicType: core.BalanceEnforcement.String(),
Expand All @@ -120,21 +120,21 @@ func TestCoolDown(t *testing.T) {
require.NoError(t, err, "Failed to bootstrap balance enforcement heuristic session")

// Get Alice's balance.
aliceAmt, err := ts.L2Geth.L2Client.BalanceAt(context.Background(), alice, nil)
aliceAmt, err := ts.L2Client.BalanceAt(context.Background(), alice, nil)
require.NoError(t, err, "Failed to get Alice's balance")

// Determine the gas cost of the transaction.
gasAmt := 1_000_001
bigAmt := big.NewInt(1_000_001)
gasPrice := big.NewInt(int64(ts.L2Cfg.DeployConfig.L2GenesisBlockGasLimit))
gasPrice := big.NewInt(int64(ts.Cfg.DeployConfig.L2GenesisBlockGasLimit))

gasCost := gasPrice.Mul(gasPrice, bigAmt)

signer := types.LatestSigner(ts.L2Geth.L2ChainConfig)
signer := types.LatestSigner(ts.Sys.L2GenesisCfg.Config)

// Create a transaction from Alice to Bob that will drain almost all of Alice's ETH.
drainAliceTx := types.MustSignNewTx(ts.L2Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(ts.L2Cfg.DeployConfig.L2ChainID)),
drainAliceTx := types.MustSignNewTx(ts.Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(ts.Cfg.DeployConfig.L2ChainID)),
Nonce: 0,
GasTipCap: big.NewInt(100),
GasFeeCap: big.NewInt(100000),
Expand All @@ -145,12 +145,21 @@ func TestCoolDown(t *testing.T) {
Data: nil,
})

// Send the transaction to drain Alice's account of almost all ETH.
_, err = ts.L2Geth.AddL2Block(context.Background(), drainAliceTx)
require.NoError(t, err, "Failed to create L2 block with transaction")
err = ts.L2Client.SendTransaction(context.Background(), drainAliceTx)
require.NoError(t, err)

receipt, err := wait.ForReceipt(context.Background(), ts.L2Client, drainAliceTx.Hash(), types.ReceiptStatusSuccessful)
require.NoError(t, err)

require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
pUUID := ids[0].PUUID
height, err := ts.Subsystems.PipelineHeight(pUUID)
if err != nil {
return false, err
}

// Wait for Pessimism to process the balance change and send a notification to the mocked Slack server.
time.Sleep(2 * time.Second)
return height != nil && height.Uint64() > receipt.BlockNumber.Uint64(), nil
}))

// Check that the balance enforcement was triggered using the mocked server cache.
posts := ts.TestSlackSvr.SlackAlerts()
Expand Down
61 changes: 42 additions & 19 deletions e2e/heuristic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import (
// balance enforcement heuristic session on L2 network.
func TestBalanceEnforcement(t *testing.T) {

ts := e2e.CreateL2TestSuite(t)
ts := e2e.CreateSysTestSuite(t)
defer ts.Close()

alice := ts.L2Cfg.Secrets.Addresses().Alice
bob := ts.L2Cfg.Secrets.Addresses().Bob
alice := ts.Cfg.Secrets.Addresses().Alice
bob := ts.Cfg.Secrets.Addresses().Bob

alertMsg := "one baby to another says:"
// Deploy a balance enforcement heuristic session for Alice.
_, err := ts.App.BootStrap([]*models.SessionRequestParams{{
ids, err := ts.App.BootStrap([]*models.SessionRequestParams{{
Network: core.Layer2.String(),
PType: core.Live.String(),
HeuristicType: core.BalanceEnforcement.String(),
Expand All @@ -58,21 +58,21 @@ func TestBalanceEnforcement(t *testing.T) {
require.NoError(t, err, "Failed to bootstrap balance enforcement heuristic session")

// Get Alice's balance.
aliceAmt, err := ts.L2Geth.L2Client.BalanceAt(context.Background(), alice, nil)
aliceAmt, err := ts.L2Client.BalanceAt(context.Background(), alice, nil)
require.NoError(t, err, "Failed to get Alice's balance")

// Determine the gas cost of the transaction.
gasAmt := 1_000_001
bigAmt := big.NewInt(1_000_001)
gasPrice := big.NewInt(int64(ts.L2Cfg.DeployConfig.L2GenesisBlockGasLimit))
gasPrice := big.NewInt(int64(ts.Cfg.DeployConfig.L2GenesisBlockGasLimit))

gasCost := gasPrice.Mul(gasPrice, bigAmt)

signer := types.LatestSigner(ts.L2Geth.L2ChainConfig)
signer := types.LatestSigner(ts.Sys.L2GenesisCfg.Config)

// Create a transaction from Alice to Bob that will drain almost all of Alice's ETH.
drainAliceTx := types.MustSignNewTx(ts.L2Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(ts.L2Cfg.DeployConfig.L2ChainID)),
drainAliceTx := types.MustSignNewTx(ts.Cfg.Secrets.Alice, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(ts.Cfg.DeployConfig.L2ChainID)),
Nonce: 0,
GasTipCap: big.NewInt(100),
GasFeeCap: big.NewInt(100000),
Expand All @@ -86,11 +86,23 @@ func TestBalanceEnforcement(t *testing.T) {
require.Equal(t, len(ts.TestPagerDutyServer.PagerDutyAlerts()), 0, "No alerts should be sent before the transaction is sent")

// Send the transaction to drain Alice's account of almost all ETH.
_, err = ts.L2Geth.AddL2Block(context.Background(), drainAliceTx)

err = ts.L2Client.SendTransaction(context.Background(), drainAliceTx)
require.NoError(t, err)

receipt, err := wait.ForReceipt(context.Background(), ts.L2Client, drainAliceTx.Hash(), types.ReceiptStatusSuccessful)
require.NoError(t, err, "Failed to create L2 block with transaction")

// Wait for Pessimism to process the balance change and send a notification to the mocked Slack server.
time.Sleep(1 * time.Second)
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
pUUID := ids[0].PUUID
height, err := ts.Subsystems.PipelineHeight(pUUID)
if err != nil {
return false, err
}

return height != nil && height.Uint64() > receipt.BlockNumber.Uint64(), nil
}))

// Check that the balance enforcement was triggered using the mocked server cache.
pdMsgs := ts.TestPagerDutyServer.PagerDutyAlerts()
Expand All @@ -100,12 +112,12 @@ func TestBalanceEnforcement(t *testing.T) {
assert.Contains(t, pdMsgs[0].Payload.Summary, "balance_enforcement", "Balance enforcement alert was not sent")

// Get Bobs's balance.
bobAmt, err := ts.L2Geth.L2Client.BalanceAt(context.Background(), bob, nil)
bobAmt, err := ts.L2Client.BalanceAt(context.Background(), bob, nil)
require.NoError(t, err, "Failed to get Alice's balance")

// Create a transaction to send the ETH back to Alice.
drainBobTx := types.MustSignNewTx(ts.L2Cfg.Secrets.Bob, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(ts.L2Cfg.DeployConfig.L2ChainID)),
drainBobTx := types.MustSignNewTx(ts.Cfg.Secrets.Bob, signer, &types.DynamicFeeTx{
ChainID: big.NewInt(int64(ts.Cfg.DeployConfig.L2ChainID)),
Nonce: 0,
GasTipCap: big.NewInt(100),
GasFeeCap: big.NewInt(100000),
Expand All @@ -116,11 +128,22 @@ func TestBalanceEnforcement(t *testing.T) {
})

// Send the transaction to re-disperse the ETH from Bob back to Alice.
_, err = ts.L2Geth.AddL2Block(context.Background(), drainBobTx)
require.NoError(t, err, "Failed to create L2 block with transaction")
err = ts.L2Client.SendTransaction(context.Background(), drainBobTx)
require.NoError(t, err)

// Wait for Pessimism to process the balance change.
time.Sleep(1 * time.Second)
receipt, err = wait.ForReceipt(context.Background(), ts.L2Client, drainBobTx.Hash(), types.ReceiptStatusSuccessful)
require.NoError(t, err)

// Wait for Pessimism to process the balance change and send a notification to the mocked Slack server.
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
pUUID := ids[0].PUUID
height, err := ts.Subsystems.PipelineHeight(pUUID)
if err != nil {
return false, err
}

return height != nil && height.Uint64() > receipt.BlockNumber.Uint64(), nil
}))

// Empty the mocked PagerDuty server cache.
ts.TestPagerDutyServer.ClearAlerts()
Expand All @@ -129,7 +152,7 @@ func TestBalanceEnforcement(t *testing.T) {
time.Sleep(1 * time.Second)

// Ensure that no new alerts were sent.
assert.Equal(t, len(ts.TestPagerDutyServer.Payloads), 0, "No alerts should be sent after the transaction is sent")
assert.Equal(t, 0, len(ts.TestPagerDutyServer.Payloads))
}

// TestContractEvent ... Tests the E2E flow of a single
Expand Down
Loading
Loading