Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
[reworked-core-abstractions] updated docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethen Pociask committed Nov 6, 2023
1 parent a6976b9 commit 554282f
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 36 deletions.
8 changes: 4 additions & 4 deletions docs/architecture/etl.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ flowchart TD;

All process types also use an `ingressHandler` struct for ingesting active transit data from upstream ETL processes.

### Process UUID (CUUID)
### Process ID

All processes have a UUID that stores critical identification data. Process IDs are used by higher order abstractions to:
All processes have an ID that stores critical identification data. Process IDs are used by higher order abstractions to:

* Represent a process DAG
* Understand when process duplicates occur in the system
* Understand when duplicate processes are generated

Process UUID's constitute of both a randomly generated `UUID` and a deterministic `PID`. This is done to ensure uniqueness of each process instance while also ensuring collision based properties so that processes can be reused when viable.
Process ID's constitute of both a randomly generated `UUID` and a deterministic `PID`. This is done to ensure uniqueness of each process instance while also ensuring collision based properties so that processes can be reused when viable.

A `ProcIdentifier` is encoded using the following four byte sequence:

Expand Down
20 changes: 10 additions & 10 deletions internal/etl/analysis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ import (

func Test_Mergable(t *testing.T) {
var tests = []struct {
name string
description string
construct func() etl.Analyzer
testLogic func(t *testing.T, a etl.Analyzer)
name string
description string
construction func() etl.Analyzer
test func(t *testing.T, a etl.Analyzer)
}{
{
name: "Successful Path Merge",
description: "Mergable function should return true if paths are mergable",
construct: func() etl.Analyzer {
construction: func() etl.Analyzer {
r := registry.New()
return etl.NewAnalyzer(r)
},
testLogic: func(t *testing.T, a etl.Analyzer) {
test: func(t *testing.T, a etl.Analyzer) {
// Setup test paths
mockOracle, err := mocks.NewReader(context.Background(), core.BlockHeader)
assert.NoError(t, err)
Expand All @@ -52,11 +52,11 @@ func Test_Mergable(t *testing.T) {
{
name: "Failure Path Merge",
description: "Mergable function should return false when PID's do not match",
construct: func() etl.Analyzer {
construction: func() etl.Analyzer {
r := registry.New()
return etl.NewAnalyzer(r)
},
testLogic: func(t *testing.T, a etl.Analyzer) {
test: func(t *testing.T, a etl.Analyzer) {
// Setup test paths
reader, err := mocks.NewReader(context.Background(), core.BlockHeader)
assert.NoError(t, err)
Expand All @@ -83,8 +83,8 @@ func Test_Mergable(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
a := test.construct()
test.testLogic(t, a)
a := test.construction()
test.test(t, a)
})
}

Expand Down
6 changes: 3 additions & 3 deletions internal/etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

type ETL interface {
CreateProcess(cc *core.ClientConfig, cUUID core.ProcessID, PathID core.PathID,
CreateProcess(cc *core.ClientConfig, id core.ProcessID, PathID core.PathID,
dt *core.DataTopic) (process.Process, error)
GetStateKey(rt core.TopicType) (*core.StateKey, bool, error)
GetBlockHeight(id core.PathID) (*big.Int, error)
Expand Down Expand Up @@ -210,14 +210,14 @@ func (etl *etl) getMergePath(id core.PathID, path Path) (core.PathID, error) {
return core.PathID{}, nil
}

func (etl *etl) CreateProcess(cc *core.ClientConfig, cUUID core.ProcessID, pathID core.PathID,
func (etl *etl) CreateProcess(cc *core.ClientConfig, id core.ProcessID, pathID core.PathID,
dt *core.DataTopic) (process.Process, error) {
logging.WithContext(etl.ctx).Debug("constructing process",
zap.String("type", dt.ProcessType.String()),
zap.String("register_type", dt.DataType.String()))

// embed options to avoid constructor boilerplate
opts := []process.Option{process.WithID(cUUID), process.WithPathID(pathID)}
opts := []process.Option{process.WithID(id), process.WithPathID(pathID)}

if dt.Stateful() {
// Propagate state key to process so that it can be used
Expand Down
10 changes: 5 additions & 5 deletions internal/etl/etl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,24 @@ func Test_Graph(t *testing.T) {

constructionLogic: etl.NewGraph,
testLogic: func(t *testing.T, g *etl.Graph) {
cUUID := core.MakeProcessID(69, 69, 69, 69)
id := core.MakeProcessID(69, 69, 69, 69)

process, err := mocks.NewSubscriber(context.Background(), core.BlockHeader, core.BlockHeader)
assert.NoError(t, err)

err = g.Add(cUUID, process)
err = g.Add(id, process)
assert.NoError(t, err, "Process addition should resolve to Nil")

actualProcess, err := g.GetProcess(cUUID)
actualProcess, err := g.GetProcess(id)
assert.NoError(t, err, "Process retrieval should resolve to Nil")

assert.Equal(t, process, actualProcess)

edges := g.Edges()

assert.Contains(t, edges, cUUID)
assert.Contains(t, edges, id)

assert.Len(t, edges[cUUID], 0, "No edges should exist yet")
assert.Len(t, edges[id], 0, "No edges should exist yet")

},
},
Expand Down
10 changes: 5 additions & 5 deletions internal/etl/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (graph *Graph) GetProcess(id core.ProcessID) (process.Process, error) {
return graph.edgeMap[id].p, nil
}

return nil, fmt.Errorf(cUUIDNotFoundErr, id)
return nil, fmt.Errorf(procNotFoundErr, id)
}

/*
Expand All @@ -59,12 +59,12 @@ NOTE - There is no check to ensure that a cyclic edge is being added, meaning
func (graph *Graph) Subscribe(from, to core.ProcessID) error {
fromNode, found := graph.edgeMap[from]
if !found {
return fmt.Errorf(cUUIDNotFoundErr, from.String())
return fmt.Errorf(procNotFoundErr, from.String())
}

toNode, found := graph.edgeMap[to]
if !found {
return fmt.Errorf(cUUIDNotFoundErr, to.String())
return fmt.Errorf(procNotFoundErr, to.String())
}

if _, exists := fromNode.edges[toNode.p.ID()]; exists {
Expand Down Expand Up @@ -93,8 +93,8 @@ func (graph *Graph) RemoveEdge(_, _ core.ProcessID) error {
}

// TODO(#23): Manager DAG process Removal Support
// removeprocess ... Removes a process from the graph
func (graph *Graph) Removeprocess(_ core.ProcessID) error {
// Remove ... Removes a process from the graph
func (graph *Graph) Remove(_ core.ProcessID) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/etl/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestETL(t *testing.T) {
},

testLogic: func(t *testing.T, etl ETL) {
cUUID := core.MakeProcessID(1, 1, 1, 1)
id := core.MakeProcessID(1, 1, 1, 1)

register, err := registry.New().GetDataTopic(core.BlockHeader)

Expand All @@ -48,10 +48,10 @@ func TestETL(t *testing.T) {
cc := &core.ClientConfig{
Network: core.Layer1,
}
p, err := etl.CreateProcess(cc, cUUID, core.PathID{}, register)
p, err := etl.CreateProcess(cc, id, core.PathID{}, register)
assert.NoError(t, err)

assert.Equal(t, p.ID(), cUUID)
assert.Equal(t, p.ID(), id)
assert.Equal(t, p.Type(), register.ProcessType)
assert.Equal(t, p.EmitType(), register.DataType)

Expand Down
3 changes: 0 additions & 3 deletions internal/etl/registry/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/zap"
)

// LogSubscription ...
type LogSubscription struct {
PathID core.PathID
SK *core.StateKey
Expand All @@ -28,7 +27,6 @@ type LogSubscription struct {
ss state.Store
}

// NewLogSubscript ...
func NewLogSubscript(ctx context.Context, n core.Network) (*LogSubscription, error) {
client, err := client.FromNetwork(ctx, n)
if err != nil {
Expand All @@ -47,7 +45,6 @@ func NewLogSubscript(ctx context.Context, n core.Network) (*LogSubscription, err
return sub, nil
}

// NewLogSubscriber ... Initializer
func NewLogSubscriber(ctx context.Context, cfg *core.ClientConfig,
opts ...process.Option) (process.Process, error) {
s, err := NewLogSubscript(ctx, cfg.Network)
Expand Down
6 changes: 3 additions & 3 deletions internal/etl/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ const (
uuidNotFoundErr = "could not find matching UUID for path entry"

// ProcessGraph error constants
cUUIDNotFoundErr = "process with ID %s does not exist within dag"
procExistsErr = "process with ID %s already exists in dag"
edgeExistsErr = "edge already exists from (%s) to (%s) in dag"
procNotFoundErr = "process with ID %s does not exist within dag"
procExistsErr = "process with ID %s already exists in dag"
edgeExistsErr = "edge already exists from (%s) to (%s) in dag"

emptyPathError = "path must contain at least one process"
// Manager error constants
Expand Down

0 comments on commit 554282f

Please sign in to comment.