-
Notifications
You must be signed in to change notification settings - Fork 7
/
node.go
155 lines (130 loc) · 3.97 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package eventlogger
import (
"context"
"fmt"
)
// NodeType enumerates the kinds of Node known to the system. A Node reports
// its kind through the Type method of the Node interface.
type NodeType int
// The zero value of NodeType is left unnamed (it is assigned to the blank
// identifier), so the named node types below start at 1.
const (
_ NodeType = iota
NodeTypeFilter
NodeTypeFormatter
NodeTypeSink
NodeTypeFormatterFilter // A node that formats and then filters the events based on the new format.
)
// Node is a single element of a graph. An implementation processes events,
// can be asked to reopen itself, and reports which NodeType it is.
type Node interface {
// Process does something with the Event: filter, redaction,
// marshalling, persisting. It returns an Event and an error.
// NOTE(review): the meaning of a nil returned Event is not visible in
// this file — confirm against the callers of Process.
Process(ctx context.Context, e *Event) (*Event, error)
// Reopen is used to re-read any config stored externally
// and to close and reopen files, e.g. for log rotation.
Reopen() error
// Type describes the type of the node. This is mostly just used to
// validate that pipelines are sensibly arranged, e.g. ending with a sink.
Type() NodeType
}
// A NodeController is used by a Broker to attempt additional control of a
// given Node beyond what the Node interface offers. For instance, it closes
// the Node when the Node supports being closed via the Closer interface.
type NodeController struct {
n Node // the Node being controlled
}
// NewNodeController returns a NodeController wrapping n. n should be the
// original value registered with the broker, or have an Unwrap method
// returning the original Node (see the NodeUnwrapper interface).
//
// If n implements any of the following optional methods, the NodeController
// will call them as appropriate/needed:
//
//	Close(ctx context.Context) error
func NewNodeController(n Node) *NodeController {
	// n is intentionally not checked for nil: the caller must ensure it is
	// valid, and the docs above make that clear.
	return &NodeController{n: n}
}
// NodeUnwrapper is implemented by a Node that wraps another Node. Unwrap
// returns the wrapped value, which lets NodeController.Close work its way
// towards the original Node (see NewNodeController docs).
type NodeUnwrapper interface {
Unwrap() Node
}
// Closer is an optional interface for a Node that can be closed.
// NodeController.Close calls Close with the context it was given and returns
// whatever error Close reports.
type Closer interface {
Close(ctx context.Context) error
}
// Close closes the controlled Node if it implements the Closer interface.
// A Node that is not itself a Closer but implements NodeUnwrapper is
// unwrapped, repeatedly if necessary, until a Closer is found; the result of
// that Closer's Close is returned. If the chain ends at a Node that is
// neither a Closer nor a NodeUnwrapper, Close returns nil.
func (nc *NodeController) Close(ctx context.Context) error {
	current := nc.n
	for {
		// Closer takes precedence: a Node that is both a Closer and a
		// NodeUnwrapper is closed directly rather than unwrapped.
		if closer, ok := current.(Closer); ok {
			return closer.Close(ctx)
		}

		wrapper, ok := current.(NodeUnwrapper)
		if !ok {
			// Nothing left to close or unwrap.
			return nil
		}
		current = wrapper.Unwrap()
	}
}
// linkedNode is one element of the linked structure built by linkNodes and
// linkNodesAndSinks: a Node, the ID paired with it, and the nodes that
// follow it.
type linkedNode struct {
node Node // the Node held at this position
nodeID NodeID // the ID paired with node
next []*linkedNode // successors: nil at the end of a chain, one entry inside a chain, one per sink where sinks fan out
}
// linkNodes is a convenience function that chains Nodes into a singly linked
// list, pairing nodes[i] with ids[i]. It returns the head of the list.
//
// An error is returned when nodes is empty, when ids is empty, or when the
// two slices differ in length.
func linkNodes(nodes []Node, ids []NodeID) (*linkedNode, error) {
	if len(nodes) == 0 {
		return nil, fmt.Errorf("no nodes given")
	}
	if len(ids) == 0 {
		return nil, fmt.Errorf("no IDs given")
	}
	if len(nodes) != len(ids) {
		return nil, fmt.Errorf("number of nodes does not match number of IDs")
	}

	head := &linkedNode{node: nodes[0], nodeID: ids[0]}
	tail := head
	for idx := 1; idx < len(nodes); idx++ {
		link := &linkedNode{node: nodes[idx], nodeID: ids[idx]}
		// Inside the chain every node has exactly one successor.
		tail.next = []*linkedNode{link}
		tail = link
	}
	return head, nil
}
// linkNodesAndSinks is a convenience function that connects the inner Nodes
// together into a linked list (via linkNodes) and then attaches every sink to
// the last inner node as a set of fan-out leaves.
//
// nodeIDs pairs index-for-index with inner, and sinkIDs with sinks. An error
// and a nil list are returned when linkNodes rejects inner/nodeIDs or when
// sinks and sinkIDs differ in length.
func linkNodesAndSinks(inner, sinks []Node, nodeIDs, sinkIDs []NodeID) (*linkedNode, error) {
	root, err := linkNodes(inner, nodeIDs)
	if err != nil {
		return nil, err
	}

	// Apply the same length check linkNodes applies to its own arguments.
	// Without it, a sinkIDs shorter than sinks indexes out of range below,
	// and a longer one is silently ignored.
	if len(sinks) != len(sinkIDs) {
		return nil, fmt.Errorf("number of sinks does not match number of sink IDs")
	}

	// Walk to the tail of the chain. This is inefficient but since it's only
	// used in setup we don't care.
	cur := root
	for len(cur.next) > 0 {
		cur = cur.next[0]
	}

	// Every sink becomes a direct successor of the last inner node.
	for i, s := range sinks {
		cur.next = append(cur.next, &linkedNode{node: s, nodeID: sinkIDs[i]})
	}
	return root, nil
}
// flatten walks the nodes reachable from l and returns the set of node IDs it
// encountered. The walk is depth-first, using an explicit stack.
//
// Nodes are de-duplicated by ID: once an ID has been recorded, any later node
// carrying the same ID is skipped and its successors are not followed from
// there. This also stops the walk from looping if the links form a cycle.
func (l *linkedNode) flatten() map[NodeID]struct{} {
	seen := make(map[NodeID]struct{})
	pending := []*linkedNode{l}

	for len(pending) != 0 {
		// Pop the most recently pushed node.
		last := len(pending) - 1
		current := pending[last]
		pending = pending[:last]

		if _, visited := seen[current.nodeID]; !visited {
			seen[current.nodeID] = struct{}{}
			pending = append(pending, current.next...)
		}
	}
	return seen
}