Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Issue 6 fix #18

Merged
merged 24 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ linters-settings:
gocognit:
# Minimal code complexity to report.
# Default: 30
min-complexity: 20
min-complexity: 30

gocritic:
# Settings passed to gocritic.
Expand Down
4 changes: 3 additions & 1 deletion cmd/block_read_poc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"github.com/base-org/pessimism/internal/client"
"github.com/base-org/pessimism/internal/conduit/models"
"github.com/base-org/pessimism/internal/conduit/pipeline"
"github.com/base-org/pessimism/internal/conduit/registry"
Expand Down Expand Up @@ -83,7 +84,8 @@ func main() {
}
}()

l1Oracle, err := init(appCtx, pipeline.LiveOracle, l1OracleCfg)
ethClient := client.EthClient{}
l1Oracle, err := init(appCtx, pipeline.LiveOracle, l1OracleCfg, &ethClient)
if err != nil {
logger.Fatal("error initializing oracle", zap.Error(err))
}
Expand Down
35 changes: 31 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,61 @@ go 1.19

require (
github.com/ethereum/go-ethereum v1.11.4
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/joho/godotenv v1.5.1
github.com/stretchr/testify v1.8.2
go.uber.org/zap v1.24.0
)

require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-stack/stack v1.8.1 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215 // indirect
google.golang.org/grpc v1.29.1 // indirect
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84 // indirect
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
289 changes: 285 additions & 4 deletions go.sum

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions internal/client/eth_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package client

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved
type EthClient struct {
client *ethclient.Client
}

type EthClientInterface interface {
DialContext(ctx context.Context, rawURL string) error
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
}

func (ec *EthClient) DialContext(ctx context.Context, rawURL string) error {
client, err := ethclient.DialContext(ctx, rawURL)

if err != nil {
return err
}

ec.client = client
return nil
}

func (ec *EthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
return ec.client.HeaderByNumber(ctx, number)
}

func (ec *EthClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return ec.client.BlockByNumber(ctx, number)
}
13 changes: 13 additions & 0 deletions internal/conduit/models/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,16 @@ const (
Pipe ComponentType = 1
Conveyor ComponentType = 2
)

type FetchType int

const (
FetchHeader FetchType = 0
FetchBlock FetchType = 1
)

anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved
type Timeouts int

const (
EthClientTimeout Timeouts = 20 // in seconds
)
26 changes: 19 additions & 7 deletions internal/conduit/pipeline/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package pipeline

import (
"context"
"math/big"
"sync"

"github.com/base-org/pessimism/internal/conduit/models"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
Expand All @@ -10,8 +12,9 @@ import (

// OracleDefinition ... Provides a generalized interface for developers to bind their own functionality to
type OracleDefinition interface {
ConfigureRoutine(ctx context.Context) error
BackTestRoutine(ctx context.Context, componentChan chan models.TransitData) error
ConfigureRoutine() error
BackTestRoutine(ctx context.Context, componentChan chan models.TransitData, startHeight *big.Int,
endHeight *big.Int) error
ReadRoutine(ctx context.Context, componentChan chan models.TransitData) error
}

Expand All @@ -22,8 +25,9 @@ type OracleOption = func(*Oracle)
type Oracle struct {
ctx context.Context

od OracleDefinition
ot OracleType
od OracleDefinition
ot OracleType
waitGroup *sync.WaitGroup

*OutputRouter
}
Expand All @@ -45,29 +49,38 @@ func NewOracle(ctx context.Context, ot OracleType,
ctx: ctx,
od: od,
ot: ot,
waitGroup: &sync.WaitGroup{},
OutputRouter: router,
}

for _, opt := range opts {
opt(o)
}

if cfgErr := od.ConfigureRoutine(ctx); cfgErr != nil {
if cfgErr := od.ConfigureRoutine(); cfgErr != nil {
return nil, cfgErr
}

return o, nil
}

anupsv-cb marked this conversation as resolved.
Show resolved Hide resolved
func (o *Oracle) Close() {
log := ctxzap.Extract(o.ctx)
log.Info("Waiting for oracle goroutines to be done.")
o.waitGroup.Wait()
log.Info("Oracle goroutines have exited.")
}

// EventLoop ... Component loop that actively waits and transits register data
// from a channel that the definition's read routine writes to
func (o *Oracle) EventLoop() error {
log := ctxzap.Extract(o.ctx)
oracleChannel := make(chan models.TransitData)

// Spawn read routine process
// TODO - Consider higher order concurrency injection; ie waitgroup, routine management
o.waitGroup.Add(1)
go func() {
defer o.waitGroup.Done()
if err := o.od.ReadRoutine(o.ctx, oracleChannel); err != nil {
log.Error("Received error from read routine", zap.Error(err))
}
Expand All @@ -80,7 +93,6 @@ func (o *Oracle) EventLoop() error {

case <-o.ctx.Done():
close(oracleChannel)

return nil
}
}
Expand Down
3 changes: 3 additions & 0 deletions internal/conduit/pipeline/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (p *Pipe) Type() models.ComponentType {
return models.Pipe
}

func (p *Pipe) Close() {
}

// EventLoop ... Driver loop for component that actively subscribes
// to an input channel where transit data is read, transformed, and transitte
// to downstream components
Expand Down
1 change: 1 addition & 0 deletions internal/conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type Component interface {
// EventLoop ... Component driver function; spun up as separate go routine
EventLoop() error
Type() models.ComponentType
Close()
}
10 changes: 5 additions & 5 deletions internal/conduit/pipeline/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func WithDirective(componentID int, outChan chan models.TransitData) RouterOptio
}

// OutputRouter ... Used as a lookup for components to know where to send output data to
// Adding and removing directives in the equivalent of adding an edge between two nodes using standard graph theory
// Adding and removing directives is the equivalent of adding an edge between two nodes using standard graph theory
type OutputRouter struct {
outChans map[int]chan models.TransitData
}
Expand All @@ -35,17 +35,17 @@ func NewOutputRouter(opts ...RouterOption) (*OutputRouter, error) {
return router, nil
}

// TransitOutput ... Sends single piece of transitData to all innner mapping value channels
// TransitOutput ... Sends single piece of transitData to all inner mapping value channels
func (router *OutputRouter) TransitOutput(data models.TransitData) {
// NOTE - Consider introducing a fail safe timeout to ensure that freezing on clogged chanel buffers is recognized
// NOTE - Consider introducing a fail-safe timeout to ensure that freezing on clogged chanel buffers is recognized
for _, channel := range router.outChans {
channel <- data
}
}

// TransitOutput ... Sends slice of transitData to all innner mapping value channels
// TransitOutputs ... Sends slice of transitData to all inner mapping value channels
func (router *OutputRouter) TransitOutputs(dataSlice []models.TransitData) {
// NOTE - Consider introducing a fail safe timeout to ensure that freezing on clogged chanel buffers is recognized
// NOTE - Consider introducing a fail-safe timeout to ensure that freezing on clogged chanel buffers is recognized
for _, data := range dataSlice {
router.TransitOutput(data)
}
Expand Down
4 changes: 3 additions & 1 deletion internal/conduit/pipeline/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pipeline
import (
"context"

"github.com/base-org/pessimism/internal/client"
"github.com/base-org/pessimism/internal/conduit/models"
"github.com/base-org/pessimism/internal/config"
)
Expand All @@ -26,7 +27,8 @@ const (
// Generalized component constructor types
type (
// OracleConstructor ... Type declaration that a registry oracle component constructor must adhere to
OracleConstructor = func(ctx context.Context, ot OracleType, cfg *config.OracleConfig) (Component, error)
OracleConstructor = func(ctx context.Context, ot OracleType, cfg *config.OracleConfig,
client client.EthClientInterface) (Component, error)

// PipeConstructorFunc ... Type declaration that a registry pipe component constructor must adhere to
PipeConstructorFunc = func(ctx context.Context, inputChan chan models.TransitData) (Component, error)
Expand Down
Loading