Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Issue 6 fix #18

Merged
merged 24 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ linters-settings:
gocognit:
# Minimal code complexity to report.
# Default: 30
min-complexity: 20
min-complexity: 30

gocritic:
# Settings passed to gocritic.
Expand Down
10 changes: 6 additions & 4 deletions cmd/block_read_poc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func main() {

cfg := config.NewConfig("config.env")
l1OracleCfg := &config.OracleConfig{
RPCEndpoint: cfg.L1RpcEndpoint,
StartHeight: nil,
EndHeight: nil}
RPCEndpoint: cfg.L1RpcEndpoint,
StartHeight: nil,
NumOfRetries: cfg.NumOfRetries,
EndHeight: nil}

// 1. Configure blackhole tx pipe component
createRegister, err := registry.GetRegister(registry.ContractCreateTX)
Expand Down Expand Up @@ -71,7 +72,8 @@ func main() {
}
}()

l1Oracle, err := init(appCtx, pipeline.LiveOracle, l1OracleCfg)
ethClient := pipeline.EthClient{}
l1Oracle, err := init(appCtx, pipeline.LiveOracle, l1OracleCfg, &ethClient)
if err != nil {
panic(err)
}
Expand Down
30 changes: 30 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,50 @@ require (
)

require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
370 changes: 369 additions & 1 deletion go.sum

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions internal/conduit/models/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ const (
Pipe ComponentType = 1
Conveyor ComponentType = 2
)

type Timeouts int

const (
EthClientTimeout Timeouts = 20 // in seconds
)
38 changes: 38 additions & 0 deletions internal/conduit/pipeline/eth_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pipeline
anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved

anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved
import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

type EthClient struct {
client *ethclient.Client
}

type EthClientInterface interface {
DialContext(ctx context.Context, rawURL string) error
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
}

func (ec *EthClient) DialContext(ctx context.Context, rawURL string) error {
client, err := ethclient.DialContext(ctx, rawURL)

if err != nil {
return err
}

ec.client = client
return nil
}

func (ec *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
return ec.client.HeaderByNumber(ctx, number)
}

func (ec *EthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return ec.client.BlockByNumber(ctx, number)
}
21 changes: 16 additions & 5 deletions internal/conduit/pipeline/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package pipeline
import (
"context"
"log"
"math/big"
"sync"

"github.com/base-org/pessimism/internal/conduit/models"
)

// OracleDefinition ... Provides a generalized interface for developers to bind their own functionality to
type OracleDefinition interface {
ConfigureRoutine() error
BackTestRoutine(ctx context.Context, componentChan chan models.TransitData) error
BackTestRoutine(ctx context.Context, componentChan chan models.TransitData, startHeight *big.Int,
endHeight *big.Int) error
ReadRoutine(ctx context.Context, componentChan chan models.TransitData) error
}

Expand All @@ -21,8 +24,9 @@ type OracleOption = func(*Oracle)
type Oracle struct {
ctx context.Context

od OracleDefinition
ot OracleType
od OracleDefinition
ot OracleType
waitGroup *sync.WaitGroup

*OutputRouter
}
Expand All @@ -44,6 +48,7 @@ func NewOracle(ctx context.Context, ot OracleType,
ctx: ctx,
od: od,
ot: ot,
waitGroup: &sync.WaitGroup{},
OutputRouter: router,
}

Expand All @@ -58,14 +63,21 @@ func NewOracle(ctx context.Context, ot OracleType,
return o, nil
}

anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved
func (o *Oracle) Close() {
log.Printf("Waiting for oracle goroutines to be done.")
o.waitGroup.Wait()
log.Printf("Oracle goroutines have exited.")
}
anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved

// EventLoop ... Component loop that actively waits and transits register data
// from a channel that the definition's read routine writes to
func (o *Oracle) EventLoop() error {
oracleChannel := make(chan models.TransitData)

// Spawn read routine process
// TODO - Consider higher order concurrency injection; ie waitgroup, routine management
o.waitGroup.Add(1)
go func() {
defer o.waitGroup.Done()
if err := o.od.ReadRoutine(o.ctx, oracleChannel); err != nil {
log.Printf("Received error from read routine %s", err.Error())
}
Expand All @@ -78,7 +90,6 @@ func (o *Oracle) EventLoop() error {

case <-o.ctx.Done():
close(oracleChannel)

return nil
}
}
Expand Down
3 changes: 3 additions & 0 deletions internal/conduit/pipeline/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func (p *Pipe) Type() models.ComponentType {
return models.Pipe
}

func (p *Pipe) Close() {
}

// EventLoop ... Driver loop for component that actively subscribes
// to an input channel where transit data is read, transformed, and transitte
// to downstream components
Expand Down
1 change: 1 addition & 0 deletions internal/conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type Component interface {
// EventLoop ... Component driver function; spun up as separate go routine
EventLoop() error
Type() models.ComponentType
Close()
}
10 changes: 5 additions & 5 deletions internal/conduit/pipeline/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func WithDirective(componentID int, outChan chan models.TransitData) RouterOptio
}

// OutputRouter ... Used as a lookup for components to know where to send output data to
// Adding and removing directives in the equivalent of adding an edge between two nodes using standard graph theory
// Adding and removing directives is the equivalent of adding an edge between two nodes using standard graph theory
type OutputRouter struct {
outChans map[int]chan models.TransitData
}
Expand All @@ -35,17 +35,17 @@ func NewOutputRouter(opts ...RouterOption) (*OutputRouter, error) {
return router, nil
}

// TransitOutput ... Sends single piece of transitData to all innner mapping value channels
// TransitOutput ... Sends single piece of transitData to all inner mapping value channels
func (router *OutputRouter) TransitOutput(data models.TransitData) {
// NOTE - Consider introducing a fail safe timeout to ensure that freezing on clogged chanel buffers is recognized
// NOTE - Consider introducing a fail-safe timeout to ensure that freezing on clogged chanel buffers is recognized
for _, channel := range router.outChans {
channel <- data
}
}

// TransitOutput ... Sends slice of transitData to all innner mapping value channels
// TransitOutputs ... Sends slice of transitData to all inner mapping value channels
func (router *OutputRouter) TransitOutputs(dataSlice []models.TransitData) {
// NOTE - Consider introducing a fail safe timeout to ensure that freezing on clogged chanel buffers is recognized
// NOTE - Consider introducing a fail-safe timeout to ensure that freezing on clogged chanel buffers is recognized
for _, data := range dataSlice {
router.TransitOutput(data)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/conduit/pipeline/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const (
// Generalized component constructor types
type (
// OracleConstructor ... Type declaration that a registry oracle component constructor must adhere to
OracleConstructor = func(ctx context.Context, ot OracleType, cfg *config.OracleConfig) (Component, error)
OracleConstructor = func(ctx context.Context, ot OracleType, cfg *config.OracleConfig,
clientNew EthClientInterface) (Component, error)
anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved

// PipeConstructorFunc ... Type declaration that a registry pipe component constructor must adhere to
PipeConstructorFunc = func(ctx context.Context, inputChan chan models.TransitData) (Component, error)
Expand Down
Loading