Simple asynchronous data pipeline written in Go with support for concurrent tasks at each stage.
go get -v -u github.com/caffix/pipeline
The pipeline processes data provided by the input source through multiple stages, and the results are finally consumed by the output sink. All steps of the pipeline can execute concurrently to maximize throughput. The pipeline can also be executed with buffering between each step to reduce the impact of one stage taking longer than the others. Any error returned from a task being executed will terminate the pipeline. If a task returns nil data, the data is marked as processed and will not continue to the following stage.
The pipeline Data implements the Clone and MarkAsProcessed methods, which perform a deep copy and mark the data to prevent further movement down the pipeline, respectively. Below is a simple pipeline Data implementation:
type stringData struct {
	processed bool
	val       string
}

// Clone implements the pipeline Data interface.
func (s *stringData) Clone() pipeline.Data { return &stringData{val: s.val} }

// MarkAsProcessed implements the pipeline Data interface.
func (s *stringData) MarkAsProcessed() { s.processed = true }

// String implements the Stringer interface.
func (s *stringData) String() string { return s.val }
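The deep-copy requirement matters most when the data holds reference types such as slices or maps. The sketch below illustrates this with only the standard library: the `Data` interface here is a local stand-in that mirrors the two methods described above, and `tagData` is a hypothetical type invented for the example. Neither is taken from the pipeline package.

```go
package main

import "fmt"

// Data is a local stand-in mirroring the two methods described above.
// It is an assumption for this sketch, not the library's definition.
type Data interface {
	Clone() Data
	MarkAsProcessed()
}

// tagData is a hypothetical payload that holds a slice, which a shallow
// copy would share with the original.
type tagData struct {
	processed bool
	tags      []string
}

// Clone copies the slice contents so the clone and the original do not
// share backing memory.
func (t *tagData) Clone() Data {
	return &tagData{tags: append([]string(nil), t.tags...)}
}

// MarkAsProcessed flags the data so it moves no further down the pipeline.
func (t *tagData) MarkAsProcessed() { t.processed = true }

// cloneIsIndependent reports whether mutating a clone leaves the
// original untouched.
func cloneIsIndependent() bool {
	orig := &tagData{tags: []string{"a", "b"}}
	c := orig.Clone().(*tagData)
	c.tags[0] = "changed"
	c.MarkAsProcessed()
	return orig.tags[0] == "a" && !orig.processed
}

func main() {
	fmt.Println(cloneIsIndependent())
}
```

A string field, as in stringData, is safe to copy by plain assignment because Go strings are immutable; the explicit copy is only needed for fields that can be mutated through a shared reference.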
The InputSource is an iterator that feeds the pipeline with data. Once the Next method returns false, the pipeline prevents the following stage from receiving data and begins an avalanche effect, stopping each stage and eventually terminating the pipeline. Below is a simple input source:
type stringSource []pipeline.Data

// The methods use pointer receivers so that Data can advance the slice.
var source = &stringSource{
	&stringData{val: "one"},
	&stringData{val: "two"},
	&stringData{val: "three"},
}

// Next implements the pipeline InputSource interface.
func (s *stringSource) Next(context.Context) bool { return len(*s) > 0 }

// Data implements the pipeline InputSource interface.
func (s *stringSource) Data() pipeline.Data {
	d := (*s)[0]
	*s = (*s)[1:] // drop the element that was just returned
	return d
}

// Error implements the pipeline InputSource interface.
func (s *stringSource) Error() error { return nil }
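The iterator contract described above (call Next, then Data, and check Error once Next returns false) can be sketched with the standard library alone. In the example below, `InputSource` is a local stand-in that uses `string` in place of pipeline.Data, and `sliceSource` and `drain` are hypothetical names for this illustration; how the pipeline package drives a source internally may differ.

```go
package main

import (
	"context"
	"fmt"
)

// InputSource is a local stand-in for the interface described above,
// with string in place of pipeline.Data (an assumption for this sketch).
type InputSource interface {
	Next(context.Context) bool
	Data() string
	Error() error
}

// sliceSource is a hypothetical source backed by a slice of strings.
type sliceSource struct {
	items []string
}

// Next reports whether another item is available and the context is live.
func (s *sliceSource) Next(ctx context.Context) bool {
	return ctx.Err() == nil && len(s.items) > 0
}

// Data returns the current item and advances the source.
func (s *sliceSource) Data() string {
	d := s.items[0]
	s.items = s.items[1:]
	return d
}

// Error reports any failure seen while iterating; this source has none.
func (s *sliceSource) Error() error { return nil }

// drain pulls items until Next reports false, then surfaces the source error.
func drain(ctx context.Context, src InputSource) ([]string, error) {
	var out []string
	for src.Next(ctx) {
		out = append(out, src.Data())
	}
	return out, src.Error()
}

// collect runs drain over a three-item source.
func collect() []string {
	out, _ := drain(context.Background(), &sliceSource{items: []string{"one", "two", "three"}})
	return out
}

func main() {
	fmt.Println(collect())
}
```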
The OutputSink serves as a final landing spot for the data after successfully traversing the entire pipeline. All data reaching the output sink is automatically marked as processed. Below is a simple output sink:
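type stringSink []string

// Consume implements the pipeline OutputSink interface.
// The pointer receiver lets the appended values persist in the sink.
func (s *stringSink) Consume(ctx context.Context, data pipeline.Data) error {
	sd, ok := data.(*stringData)
	if !ok {
		return fmt.Errorf("unexpected data type %T", data)
	}
	*s = append(*s, sd.String())
	return nil
}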
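One detail worth checking in any slice-backed sink is the receiver type: `append` may return a new slice header, so a method with a value receiver updates only its own copy and the caller never sees the collected items. The sketch below contrasts the two. Both sink types and the simplified `Consume(string)` signature are hypothetical and exist only for this comparison.

```go
package main

import "fmt"

// valueSink appends through a value receiver, so the append only
// changes the method's local copy of the slice header.
type valueSink []string

// Consume loses the appended value when the method returns.
func (s valueSink) Consume(v string) { s = append(s, v) }

// pointerSink appends through a pointer receiver, so the caller's
// slice is updated in place.
type pointerSink []string

// Consume stores the value in the caller's slice.
func (s *pointerSink) Consume(v string) { *s = append(*s, v) }

// sinkLengths feeds the same three values to both sinks and returns
// how many items each one retained.
func sinkLengths() (int, int) {
	var vs valueSink
	var ps pointerSink
	for _, v := range []string{"one", "two", "three"} {
		vs.Consume(v)
		ps.Consume(v)
	}
	return len(vs), len(ps)
}

func main() {
	v, p := sinkLengths()
	fmt.Println(v, p) // prints "0 3"
}
```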
The pipeline steps are executed in sequential order by instances of Stage. The execution strategies implemented are FIFO, FixedPool, DynamicPool, Broadcast, and Parallel:
FIFO - Executes the single Task
FixedPool - Executes a fixed number of instances of the one specified Task
DynamicPool - Executes a dynamic number of instances of the one specified Task
Broadcast - Executes several unique Task instances concurrently, moving Data ASAP
Parallel - Executes several unique Task instances concurrently, passing through the original Data only once all the tasks complete successfully
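To make the idea behind a fixed pool concrete, here is a standard-library sketch in which a fixed number of goroutines run the same task over a shared input channel. It illustrates the general worker-pool technique only; `fixedPool` and `upperAll` are hypothetical names, and the package's own FixedPool stage may be built differently.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// fixedPool runs n copies of task over the inputs and returns the
// results sorted, since workers finish in no guaranteed order.
func fixedPool(n int, inputs []string, task func(string) string) []string {
	in := make(chan string)
	out := make(chan string)

	// Start exactly n workers, each reading from the shared input channel.
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- task(v)
			}
		}()
	}

	// Feed the inputs, then signal the workers that no more are coming.
	go func() {
		for _, v := range inputs {
			in <- v
		}
		close(in)
	}()

	// Close the output channel once every worker has returned.
	go func() {
		wg.Wait()
		close(out)
	}()

	var results []string
	for v := range out {
		results = append(results, v)
	}
	sort.Strings(results)
	return results
}

// upperAll upper-cases three strings using two workers.
func upperAll() []string {
	return fixedPool(2, []string{"one", "two", "three"}, strings.ToUpper)
}

func main() {
	fmt.Println(upperAll())
}
```

Because two workers share the input, completion order varies between runs; the sort exists only to make the returned slice deterministic.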
The stage execution strategies can be combined to form desired pipelines. A Stage requires at least one Task to be executed at the step it represents in the pipeline. Each Task returns Data and an error. If the data returned is nil, it will not be sent to the following Stage. If the error is non-nil, the entire pipeline will be terminated. This allows users of the pipeline to have complete control over how failures impact the overall pipeline execution. A Task implements the Process method.
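The two return-value rules (nil data is dropped, a non-nil error stops everything) can be modeled in a few lines. The sketch below is a sequential, standard-library approximation: `item`, `task`, `runStage`, and `errBad` are hypothetical names for this example, and the real stages run concurrently rather than in a simple loop.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical stand-in for pipeline data.
type item struct{ val string }

// task mirrors the shape described above: it returns data and an error.
type task func(*item) (*item, error)

// errBad is the failure used by the demo task.
var errBad = errors.New("bad input")

// runStage applies t to each item in order. Nil results are dropped,
// and the first non-nil error stops processing.
func runStage(items []*item, t task) ([]string, error) {
	var passed []string
	for _, it := range items {
		out, err := t(it)
		if err != nil {
			return passed, err // a failure terminates the whole run
		}
		if out == nil {
			continue // nil data does not reach the next stage
		}
		passed = append(passed, out.val)
	}
	return passed, nil
}

// filterDemo drops "two" and fails on "bad", so "never" is not reached.
func filterDemo() ([]string, error) {
	t := func(it *item) (*item, error) {
		switch it.val {
		case "two":
			return nil, nil
		case "bad":
			return nil, errBad
		}
		return it, nil
	}
	items := []*item{{val: "one"}, {val: "two"}, {val: "three"}, {val: "bad"}, {val: "never"}}
	return runStage(items, t)
}

func main() {
	passed, err := filterDemo()
	fmt.Println(passed, err) // prints "[one three] bad input"
}
```

Returning nil data is therefore the way to filter an item without failing the run, while returning an error is reserved for conditions that should stop all work.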
// TaskFunc is a function type whose Process method calls the function itself.
task := pipeline.TaskFunc(func(ctx context.Context, data pipeline.Data, tp pipeline.TaskParams) (pipeline.Data, error) {
	var val int
	s := data.(*stringData)
	switch s.String() {
	case "one":
		val = 1
	case "two":
		val = 2
	case "three":
		val = 3
	}
	// Update the concrete value; the interface type has no val field.
	s.val = fmt.Sprintf("%s - %d", s.String(), val)
	return data, nil
})
stage := pipeline.FIFO("", task)
The Pipeline continues executing until all the Data from the input source is processed, an error occurs, or the provided Context expires. At a minimum, the pipeline requires an input source, a pass-through stage, and the output sink.
sink := new(stringSink)
p := pipeline.NewPipeline(stage)
if err := p.Execute(context.TODO(), source, sink); err != nil {
	fmt.Printf("Error executing the pipeline: %v\n", err)
}
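The third stop condition, an expired Context, follows the usual Go pattern of checking the context between units of work. The sketch below shows that pattern with the standard library only; `process` and `cancelAfterFirst` are hypothetical helpers, and the way the pipeline package itself reports cancellation is not covered here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// process handles items one at a time and stops as soon as the context
// is cancelled or expires, returning how many items were handled.
func process(ctx context.Context, items []string, handle func(string)) (int, error) {
	n := 0
	for _, it := range items {
		if err := ctx.Err(); err != nil {
			return n, err
		}
		handle(it)
		n++
	}
	return n, nil
}

// cancelAfterFirst cancels the context while handling the first item,
// so the remaining two items are never handled.
func cancelAfterFirst() (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	n, err := process(ctx, []string{"one", "two", "three"}, func(string) { cancel() })
	return n, errors.Is(err, context.Canceled)
}

func main() {
	n, cancelled := cancelAfterFirst()
	fmt.Println(n, cancelled) // prints "1 true"
}
```

In practice the same effect is usually obtained by passing a context created with context.WithTimeout or context.WithCancel to Execute in place of context.TODO().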
Some additional features would bring value to this data pipeline implementation:
No logging is built into this pipeline implementation, and it could be quite useful to have.
It would be helpful to have the ability to monitor stage and task performance, such as how long each takes to execute, the number of Data instances processed, and the number of successes and failures.
This pipeline implementation is very abstract, which allows users to perform nearly any set of steps. Currently, users must implement their own tasks. Some tasks are very common, and the project could build in support for them, for example, executing a script pulled from a Git repo.
As the implementation becomes more complex, it could be helpful to support configuration files and reduce the effort necessary to build a pipeline. For example, the configuration file could specify when the output of tasks should be sent to alternative stages.
While the current execution strategies work for many use cases, there could be opportunities to develop additional stage types that ease pipeline development.
This program is free software: you can redistribute it and/or modify it under the terms of the Apache license.