From 554282f3b047155eb51c87499e647c7517d6932a Mon Sep 17 00:00:00 2001 From: Ethen Pociask Date: Mon, 6 Nov 2023 01:45:25 -0800 Subject: [PATCH] [reworked-core-abstractions] updated docs --- docs/architecture/etl.markdown | 8 ++++---- internal/etl/analysis_test.go | 20 +++++++++---------- internal/etl/etl.go | 6 +++--- internal/etl/etl_test.go | 10 +++++----- internal/etl/graph.go | 10 +++++----- internal/etl/manager_test.go | 6 +++--- ...ubscriber_test.go => subscription_test.go} | 0 internal/etl/registry/subscriptions.go | 3 --- internal/etl/types.go | 6 +++--- 9 files changed, 33 insertions(+), 36 deletions(-) rename internal/etl/registry/{subscriber_test.go => subscription_test.go} (100%) diff --git a/docs/architecture/etl.markdown b/docs/architecture/etl.markdown index 00c1215b..3c0f0042 100644 --- a/docs/architecture/etl.markdown +++ b/docs/architecture/etl.markdown @@ -56,14 +56,14 @@ flowchart TD; All process types also use an `ingressHandler` struct for ingesting active transit data from upstream ETL processes. -### Process UUID (CUUID) +### Process ID -All processes have a UUID that stores critical identification data. Process IDs are used by higher order abstractions to: +All processes have an ID that stores critical identification data. Process IDs are used by higher order abstractions to: * Represent a process DAG -* Understand when process duplicates occur in the system +* Understand when duplicate processes are generated -Process UUID's constitute of both a randomly generated `UUID` and a deterministic `PID`. This is done to ensure uniqueness of each process instance while also ensuring collision based properties so that processes can be reused when viable. +Process ID's constitute of both a randomly generated `UUID` and a deterministic `PID`. This is done to ensure uniqueness of each process instance while also ensuring collision based properties so that processes can be reused when viable. A `ProcIdentifier` is encoded using the following four byte sequence: diff --git a/internal/etl/analysis_test.go b/internal/etl/analysis_test.go index 35efaa11..2465ed51 100644 --- a/internal/etl/analysis_test.go +++ b/internal/etl/analysis_test.go @@ -14,19 +14,19 @@ import ( func Test_Mergable(t *testing.T) { var tests = []struct { - name string - description string - construct func() etl.Analyzer - testLogic func(t *testing.T, a etl.Analyzer) + name string + description string + construction func() etl.Analyzer + test func(t *testing.T, a etl.Analyzer) }{ { name: "Successful Path Merge", description: "Mergable function should return true if paths are mergable", - construct: func() etl.Analyzer { + construction: func() etl.Analyzer { r := registry.New() return etl.NewAnalyzer(r) }, - testLogic: func(t *testing.T, a etl.Analyzer) { + test: func(t *testing.T, a etl.Analyzer) { // Setup test paths mockOracle, err := mocks.NewReader(context.Background(), core.BlockHeader) assert.NoError(t, err) @@ -52,11 +52,11 @@ func Test_Mergable(t *testing.T) { { name: "Failure Path Merge", description: "Mergable function should return false when PID's do not match", - construct: func() etl.Analyzer { + construction: func() etl.Analyzer { r := registry.New() return etl.NewAnalyzer(r) }, - testLogic: func(t *testing.T, a etl.Analyzer) { + test: func(t *testing.T, a etl.Analyzer) { // Setup test paths reader, err := mocks.NewReader(context.Background(), core.BlockHeader) assert.NoError(t, err) @@ -83,8 +83,8 @@ func Test_Mergable(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - a := test.construct() - test.testLogic(t, a) + a := test.construction() + test.test(t, a) }) } diff --git a/internal/etl/etl.go b/internal/etl/etl.go index 408d3410..76523b5c 100644 --- a/internal/etl/etl.go +++ b/internal/etl/etl.go @@ -19,7 +19,7 @@ import ( ) type ETL interface { - CreateProcess(cc *core.ClientConfig, cUUID core.ProcessID, PathID core.PathID, + CreateProcess(cc *core.ClientConfig, id core.ProcessID, PathID core.PathID, dt *core.DataTopic) (process.Process, error) GetStateKey(rt core.TopicType) (*core.StateKey, bool, error) GetBlockHeight(id core.PathID) (*big.Int, error) @@ -210,14 +210,14 @@ func (etl *etl) getMergePath(id core.PathID, path Path) (core.PathID, error) { return core.PathID{}, nil } -func (etl *etl) CreateProcess(cc *core.ClientConfig, cUUID core.ProcessID, pathID core.PathID, +func (etl *etl) CreateProcess(cc *core.ClientConfig, id core.ProcessID, pathID core.PathID, dt *core.DataTopic) (process.Process, error) { logging.WithContext(etl.ctx).Debug("constructing process", zap.String("type", dt.ProcessType.String()), zap.String("register_type", dt.DataType.String())) // embed options to avoid constructor boilerplate - opts := []process.Option{process.WithID(cUUID), process.WithPathID(pathID)} + opts := []process.Option{process.WithID(id), process.WithPathID(pathID)} if dt.Stateful() { // Propagate state key to process so that it can be used diff --git a/internal/etl/etl_test.go b/internal/etl/etl_test.go index f443d877..a662a17a 100644 --- a/internal/etl/etl_test.go +++ b/internal/etl/etl_test.go @@ -34,24 +34,24 @@ func Test_Graph(t *testing.T) { constructionLogic: etl.NewGraph, testLogic: func(t *testing.T, g *etl.Graph) { - cUUID := core.MakeProcessID(69, 69, 69, 69) + id := core.MakeProcessID(69, 69, 69, 69) process, err := mocks.NewSubscriber(context.Background(), core.BlockHeader, core.BlockHeader) assert.NoError(t, err) - err = g.Add(cUUID, process) + err = g.Add(id, process) assert.NoError(t, err, "Process addition should resolve to Nil") - actualProcess, err := g.GetProcess(cUUID) + actualProcess, err := g.GetProcess(id) assert.NoError(t, err, "Process retrieval should resolve to Nil") assert.Equal(t, process, actualProcess) edges := g.Edges() - assert.Contains(t, edges, cUUID) + assert.Contains(t, edges, id) - assert.Len(t, edges[cUUID], 0, "No edges should exist yet") + assert.Len(t, edges[id], 0, "No edges should exist yet") }, }, diff --git a/internal/etl/graph.go b/internal/etl/graph.go index 83332461..98f3cafd 100644 --- a/internal/etl/graph.go +++ b/internal/etl/graph.go @@ -42,7 +42,7 @@ func (graph *Graph) GetProcess(id core.ProcessID) (process.Process, error) { return graph.edgeMap[id].p, nil } - return nil, fmt.Errorf(cUUIDNotFoundErr, id) + return nil, fmt.Errorf(procNotFoundErr, id) } /* @@ -59,12 +59,12 @@ NOTE - There is no check to ensure that a cyclic edge is being added, meaning func (graph *Graph) Subscribe(from, to core.ProcessID) error { fromNode, found := graph.edgeMap[from] if !found { - return fmt.Errorf(cUUIDNotFoundErr, from.String()) + return fmt.Errorf(procNotFoundErr, from.String()) } toNode, found := graph.edgeMap[to] if !found { - return fmt.Errorf(cUUIDNotFoundErr, to.String()) + return fmt.Errorf(procNotFoundErr, to.String()) } if _, exists := fromNode.edges[toNode.p.ID()]; exists { @@ -93,8 +93,8 @@ func (graph *Graph) RemoveEdge(_, _ core.ProcessID) error { } // TODO(#23): Manager DAG process Removal Support -// removeprocess ... Removes a process from the graph -func (graph *Graph) Removeprocess(_ core.ProcessID) error { +// Remove ... Removes a process from the graph +func (graph *Graph) Remove(_ core.ProcessID) error { return nil } diff --git a/internal/etl/manager_test.go b/internal/etl/manager_test.go index 8e8de9d2..87fa8c6f 100644 --- a/internal/etl/manager_test.go +++ b/internal/etl/manager_test.go @@ -39,7 +39,7 @@ func TestETL(t *testing.T) { }, testLogic: func(t *testing.T, etl ETL) { - cUUID := core.MakeProcessID(1, 1, 1, 1) + id := core.MakeProcessID(1, 1, 1, 1) register, err := registry.New().GetDataTopic(core.BlockHeader) @@ -48,10 +48,10 @@ func TestETL(t *testing.T) { cc := &core.ClientConfig{ Network: core.Layer1, } - p, err := etl.CreateProcess(cc, cUUID, core.PathID{}, register) + p, err := etl.CreateProcess(cc, id, core.PathID{}, register) assert.NoError(t, err) - assert.Equal(t, p.ID(), cUUID) + assert.Equal(t, p.ID(), id) assert.Equal(t, p.Type(), register.ProcessType) assert.Equal(t, p.EmitType(), register.DataType) diff --git a/internal/etl/registry/subscriber_test.go b/internal/etl/registry/subscription_test.go similarity index 100% rename from internal/etl/registry/subscriber_test.go rename to internal/etl/registry/subscription_test.go diff --git a/internal/etl/registry/subscriptions.go b/internal/etl/registry/subscriptions.go index 520d0f44..2220de31 100644 --- a/internal/etl/registry/subscriptions.go +++ b/internal/etl/registry/subscriptions.go @@ -19,7 +19,6 @@ import ( "go.uber.org/zap" ) -// LogSubscription ... type LogSubscription struct { PathID core.PathID SK *core.StateKey @@ -28,7 +27,6 @@ type LogSubscription struct { ss state.Store } -// NewLogSubscript ... func NewLogSubscript(ctx context.Context, n core.Network) (*LogSubscription, error) { client, err := client.FromNetwork(ctx, n) if err != nil { @@ -47,7 +45,6 @@ func NewLogSubscript(ctx context.Context, n core.Network) (*LogSubscription, err return sub, nil } -// NewLogSubscriber ... Initializer func NewLogSubscriber(ctx context.Context, cfg *core.ClientConfig, opts ...process.Option) (process.Process, error) { s, err := NewLogSubscript(ctx, cfg.Network) diff --git a/internal/etl/types.go b/internal/etl/types.go index 8734e2c5..62949696 100644 --- a/internal/etl/types.go +++ b/internal/etl/types.go @@ -34,9 +34,9 @@ const ( uuidNotFoundErr = "could not find matching UUID for path entry" // ProcessGraph error constants - cUUIDNotFoundErr = "process with ID %s does not exist within dag" - procExistsErr = "process with ID %s already exists in dag" - edgeExistsErr = "edge already exists from (%s) to (%s) in dag" + procNotFoundErr = "process with ID %s does not exist within dag" + procExistsErr = "process with ID %s already exists in dag" + edgeExistsErr = "edge already exists from (%s) to (%s) in dag" emptyPathError = "path must contain at least one process" // Manager error constants