Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
[reworked-core-abstractions] removed all references of pipeline & com…
Browse files Browse the repository at this point in the history
…ponent
  • Loading branch information
Ethen Pociask committed Nov 5, 2023
1 parent e3dc581 commit 7ec4f79
Show file tree
Hide file tree
Showing 50 changed files with 262 additions and 269 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ To use the template, run the following command(s):
docker run -p 8080:8080 -p 7300:7300 --env-file=config.env -it -v ${PWD}/genesis.json:/app/genesis.json ghcr.io/base-org/pessimism:latest
```

**Note**: If you want to bootstrap the application and run specific heuristics/pipelines upon start, update config.env `BOOTSTRAP_PATH` value to the location of your genesis.json file then run
**Note**: If you want to bootstrap the application and run specific heuristics/paths upon start, update config.env `BOOTSTRAP_PATH` value to the location of your genesis.json file then run

### Building and Running New Images

Expand Down
2 changes: 1 addition & 1 deletion config.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ ENABLE_METRICS=1 # 0 to disable, 1 to enable
METRICS_READ_HEADER_TIMEOUT=60

# Concurrency Management
MAX_PIPELINE_COUNT=10
MAX_PATH_COUNT=10
6 changes: 3 additions & 3 deletions docs/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ GEM
octokit (~> 4.0)
public_suffix (>= 3.0, < 5.0)
typhoeus (~> 1.3)
html-pipeline (2.14.3)
html-path (2.14.3)
activesupport (>= 2)
nokogiri (>= 1.4)
http_parser.rb (0.8.0)
Expand Down Expand Up @@ -125,7 +125,7 @@ GEM
jekyll-include-cache (0.2.1)
jekyll (>= 3.7, < 5.0)
jekyll-mentions (1.6.0)
html-pipeline (~> 2.3)
html-path (~> 2.3)
jekyll (>= 3.7, < 5.0)
jekyll-optional-front-matter (0.3.2)
jekyll (>= 3.0, < 5.0)
Expand Down Expand Up @@ -194,7 +194,7 @@ GEM
listen (~> 3.0)
jemoji (0.12.0)
gemoji (~> 3.0)
html-pipeline (~> 2.2)
html-path (~> 2.2)
jekyll (>= 3.0, < 5.0)
kramdown (2.3.2)
rexml
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture/api.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The API can be customly configured using environment variables stored in a `conf

### Processs

The Pessimism API is broken down into the following constituent components:
The Pessimism API is broken down into the following constituent processs:

- `handlers`: The handlers package contains the HTTP handlers for the API. Each handler is responsible for handling a specific endpoint and is responsible for parsing the request, calling the appropriate service method, and renders a response.
- `service`: The service package contains the business logic for the API. The service is responsible for handling calls to the core Pessimism subsystems and is responsible for validating incoming requests.
Expand Down
6 changes: 3 additions & 3 deletions docs/architecture/architecture.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ permalink: /architecture

There are *three subsystems* that drive Pessimism’s architecture:

1. [ETL](./etl.markdown) - Modularized data extraction system for retrieving and processing external chain data in the form of a DAG known as the Pipeline DAG
2. [Risk Engine](./engine.markdown) - Logical execution platform that runs a set of heuristics on the data funneled from the Pipeline DAG
1. [ETL](./etl.markdown) - Modularized data extraction system for retrieving and processing external chain data in the form of a DAG known as the Path DAG
2. [Risk Engine](./engine.markdown) - Logical execution platform that runs a set of heuristics on the data funneled from the Path DAG
3. [Alerting](./alerting.markdown) - Alerting system that is used to notify users of heuristic failures

These systems will be accessible by a client through the use of a JSON-RPC API that has unilateral access to all three primary subsystems.
Expand All @@ -23,7 +23,7 @@ The API will be supported to allow Pessimism users via client to:
## Diagram

The following diagram illustrates the core interaction flow between the three primary subsystems, API, and external data sources:
![high level component diagram](../assets/images/high_level_diagram.png)
![high level process diagram](../assets/images/high_level_diagram.png)

## Shared State

Expand Down
2 changes: 1 addition & 1 deletion docs/architecture/engine.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ A `SessionPID` is encoded using the following 3 byte array sequence:
```
0 1 2 3
|-----------|-----------|-----------|
network pipeline heuristic
network path heuristic
type type type
```

Expand Down
135 changes: 67 additions & 68 deletions docs/architecture/etl.markdown

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions docs/telemetry.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ which can be pasted directly below to keep current system metric documentation u
| METRIC | DESCRIPTION | LABELS | TYPE |
|-------------------------------------------|--------------------------------------------------------|----------------------------------------|---------|
| pessimism_up | 1 if the service is up | | gauge |
| pessimism_heuristics_active_heuristics | Number of active heuristics | heuristic,network,pipeline | gauge |
| pessimism_etl_active_pipelines | Number of active pipelines | pipeline,network | gauge |
| pessimism_heuristics_active_heuristics | Number of active heuristics | heuristic,network,path | gauge |
| pessimism_etl_active_paths | Number of active paths | path,network | gauge |
| pessimism_heuristics_heuristic_runs_total | Number of times a specific heuristic has been run | network,heuristic | counter |
| pessimism_alerts_generated_total | Number of total alerts generated for a given heuristic | network,heuristic,pipeline,destination | counter |
| pessimism_alerts_generated_total | Number of total alerts generated for a given heuristic | network,heuristic,path,destination | counter |
| pessimism_node_errors_total | Number of node errors caught | node | counter |
| pessimism_block_latency | Millisecond latency of block processing | network | gauge |
| pessimism_pipeline_latency | Millisecond latency of pipeline processing | PathID | gauge |
| pessimism_path_latency | Millisecond latency of path processing | PathID | gauge |
| pessimism_heuristic_execution_time | Nanosecond time of heuristic execution | heuristic | gauge |
| pessimism_heuristic_errors_total | Number of errors generated by heuristic executions | heuristic | counter |
4 changes: 2 additions & 2 deletions e2e/alerting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestMultiDirectiveRouting(t *testing.T) {
// and PagerDuty servers.
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
PathID := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(PathID)
height, err := ts.Subsystems.PathHeight(PathID)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestCoolDown(t *testing.T) {

require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
PathID := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(PathID)
height, err := ts.Subsystems.PathHeight(PathID)
if err != nil {
return false, err
}
Expand Down
14 changes: 7 additions & 7 deletions e2e/heuristic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBalanceEnforcement(t *testing.T) {
// Wait for Pessimism to process the balance change and send a notification to the mocked Slack server.
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
id := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(id)
height, err := ts.Subsystems.PathHeight(id)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestBalanceEnforcement(t *testing.T) {
// Wait for Pessimism to process the balance change and send a notification to the mocked Slack server.
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
id := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(id)
height, err := ts.Subsystems.PathHeight(id)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestContractEvent(t *testing.T) {
// Wait for Pessimism to process the newly emitted event and send a notification to the mocked Slack server.
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
id := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(id)
height, err := ts.Subsystems.PathHeight(id)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestWithdrawalSafetyAllInvariants(t *testing.T) {
// Wait for Pessimism to process the proven withdrawal and send a notification to the mocked Slack server.
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
id := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(id)
height, err := ts.Subsystems.PathHeight(id)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestWithdrawalSafetyAllInvariants(t *testing.T) {
// // Wait for Pessimism to process the finalized withdrawal and send a notification to the mocked Slack server.
// require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
// id := ids[0].PathID
// height, err := ts.Subsystems.PipelineHeight(id)
// height, err := ts.Subsystems.PathHeight(id)
// if err != nil {
// return false, err
// }
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestWithdrawalSafetyNoInvariants(t *testing.T) {
// Wait for Pessimism to process the proven withdrawal and send a notification to the mocked Slack server.
require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
id := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(id)
height, err := ts.Subsystems.PathHeight(id)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestFaultDetector(t *testing.T) {

require.NoError(t, wait.For(context.Background(), 500*time.Millisecond, func() (bool, error) {
id := ids[0].PathID
height, err := ts.Subsystems.PipelineHeight(id)
height, err := ts.Subsystems.PathHeight(id)
if err != nil {
return false, err
}
Expand Down
8 changes: 4 additions & 4 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func CreateSysTestSuite(t *testing.T) *SysTestSuite {
func DefaultTestConfig() *config.Config {
l1PollInterval := 900
l2PollInterval := 300
maxPipelines := 10
maxPaths := 10
workerCount := 4

return &config.Config{
Expand All @@ -179,9 +179,9 @@ func DefaultTestConfig() *config.Config {
Port: 0,
},
SystemConfig: &subsystem.Config{
MaxPipelineCount: maxPipelines,
L2PollInterval: l2PollInterval,
L1PollInterval: l1PollInterval,
MaxPathCount: maxPaths,
L2PollInterval: l2PollInterval,
L1PollInterval: l1PollInterval,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/api/models/heuristic.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (hrp *SessionRequestParams) AlertPolicy() *core.AlertPolicy {
return hrp.AlertingParams
}

// NewPathCfg ... Generates a pipeline config using the request params
// NewPathCfg ... Generates a path config using the request params
func (hrp *SessionRequestParams) NewPathCfg(pollInterval time.Duration,
regType core.TopicType) *core.PathConfig {
return &core.PathConfig{
Expand Down
2 changes: 1 addition & 1 deletion internal/api/service/heuristic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (svc *PessimismService) ProcessHeuristicRequest(ir *models.SessionRequestBo

// RunHeuristicSession ... Runs a heuristic session provided
func (svc *PessimismService) RunHeuristicSession(params *models.SessionRequestParams) (core.UUID, error) {
pConfig, err := svc.m.BuildPipelineCfg(params)
pConfig, err := svc.m.BuildPathCfg(params)
if err != nil {
return core.UUID{}, err
}
Expand Down
10 changes: 5 additions & 5 deletions internal/api/service/heuristic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Test_RunHeuristicSession(t *testing.T) {
ts := createTestSuite(ctrl)

ts.mockSub.EXPECT().
BuildPipelineCfg(&defaultBody.Params).
BuildPathCfg(&defaultBody.Params).
Return(nil, nil).
Times(1)

Expand All @@ -71,12 +71,12 @@ func Test_RunHeuristicSession(t *testing.T) {
},
},
{
name: "Failure when building pipeline config",
name: "Failure when building path config",
constructionLogic: func() *testSuite {
ts := createTestSuite(ctrl)

ts.mockSub.EXPECT().
BuildPipelineCfg(&defaultBody.Params).
BuildPathCfg(&defaultBody.Params).
Return(nil, testErr()).
Times(1)
return ts
Expand All @@ -96,7 +96,7 @@ func Test_RunHeuristicSession(t *testing.T) {
ts := createTestSuite(ctrl)

ts.mockSub.EXPECT().
BuildPipelineCfg(&defaultBody.Params).
BuildPathCfg(&defaultBody.Params).
Return(nil, nil).
Times(1)

Expand All @@ -122,7 +122,7 @@ func Test_RunHeuristicSession(t *testing.T) {
ts := createTestSuite(ctrl)

ts.mockSub.EXPECT().
BuildPipelineCfg(&defaultBody.Params).
BuildPathCfg(&defaultBody.Params).
Return(nil, nil).
Times(1)

Expand Down
2 changes: 1 addition & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (a *Application) BootStrap(sessions []*BootSession) ([]core.SessionID, erro
ids := make([]core.SessionID, 0, len(sessions))

for _, session := range sessions {
pConfig, err := a.Subsystems.BuildPipelineCfg(session)
pConfig, err := a.Subsystems.BuildPathCfg(session)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ func NewConfig(fileName core.FilePath) *Config {
},

SystemConfig: &subsystem.Config{
MaxPipelineCount: getEnvInt("MAX_PIPELINE_COUNT"),
L1PollInterval: getEnvInt("L1_POLL_INTERVAL"),
L2PollInterval: getEnvInt("L2_POLL_INTERVAL"),
MaxPathCount: getEnvInt("MAX_PATH_COUNT"),
L1PollInterval: getEnvInt("L1_POLL_INTERVAL"),
L2PollInterval: getEnvInt("L2_POLL_INTERVAL"),
},
}

Expand Down
4 changes: 0 additions & 4 deletions internal/core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ func RetryStrategy() *retry.ExponentialStrategy {
return &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
}

// ClientConfig ... Configuration passed through to an reader component constructor
type ClientConfig struct {
Network Network
PollInterval time.Duration
Expand All @@ -20,7 +19,6 @@ type ClientConfig struct {
EndHeight *big.Int
}

// SessionConfig ... Configuration passed through to a session constructor
type SessionConfig struct {
Network Network
PT PathType
Expand All @@ -29,15 +27,13 @@ type SessionConfig struct {
Params *SessionParams
}

// PathConfig ... Configuration passed through to a pipeline constructor
type PathConfig struct {
Network Network
DataType TopicType
PathType PathType
ClientConfig *ClientConfig
}

// Backfill ... Returns true if the reader is configured to backfill
func (oc *ClientConfig) Backfill() bool {
return oc.StartHeight != nil
}
2 changes: 1 addition & 1 deletion internal/core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
Clients
)

// Network ... Represents the network for which a pipeline's reader
// Network ... Represents the network for which a path's reader
// is subscribed to.
type Network uint8

Expand Down
2 changes: 1 addition & 1 deletion internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type HeuristicInput struct {
}

// ExecInputRelay ... Represents a inter-subsystem
// relay used to bind final ETL pipeline outputs to risk engine inputs
// relay used to bind final ETL path outputs to risk engine inputs
type ExecInputRelay struct {
PathID PathID
outChan chan HeuristicInput
Expand Down
8 changes: 4 additions & 4 deletions internal/core/etl.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package core

// ProcessType ... Denotes the ETL component type
// ProcessType ... Denotes the ETL process type
type ProcessType uint8

const (
Read ProcessType = iota + 1
Subscribe
)

// String ... Converts the component type to a string
// String ... Converts the process type to a string
func (ct ProcessType) String() string {
switch ct {
case Read:
Expand All @@ -28,7 +28,7 @@ const (
Live PathType = iota + 1
)

// StringToPathType ... Converts a string to a pipeline type
// StringToPathType ... Converts a string to a path type
func StringToPathType(stringType string) PathType {
switch stringType {

Check failure on line 33 in internal/core/etl.go

View workflow job for this annotation

GitHub Actions / lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)

Expand All @@ -40,7 +40,7 @@ func StringToPathType(stringType string) PathType {
return PathType(0)
}

// String ... Converts the pipeline type to a string
// String ... Converts the path type to a string
func (pt PathType) String() string {
switch pt {

Check failure on line 45 in internal/core/etl.go

View workflow job for this annotation

GitHub Actions / lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)

Expand Down
6 changes: 3 additions & 3 deletions internal/core/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (id UUID) ShortString() string {
type ProcIdentifier [4]byte

// Represents a non-deterministic ID that's assigned to
// every uniquely constructed ETL component
// every uniquely constructed ETL process
type ProcessID struct {
ID ProcIdentifier
UUID UUID
Expand Down Expand Up @@ -82,7 +82,7 @@ func MakeProcessID(pt PathType, ct ProcessType, tt TopicType, n Network) Process
}
}

// MakePathID ... Constructs a pipeline PID sequence & random UUID
// MakePathID ... Constructs a path PID sequence & random UUID
func MakePathID(pt PathType, proc1, proc2 ProcessID) PathID {
id1, id2 := proc1.ID, proc2.ID

Expand All @@ -104,7 +104,7 @@ func MakePathID(pt PathType, proc1, proc2 ProcessID) PathID {
}
}

// String ... Returns string representation of a component PID
// String ... Returns string representation of a process PID
func (pid ProcIdentifier) String() string {
return fmt.Sprintf("%s:%s:%s:%s",
Network(pid[0]).String(),
Expand Down
Loading

0 comments on commit 7ec4f79

Please sign in to comment.