diff --git a/pipe/batch.go b/pipe/batch.go new file mode 100644 index 0000000..5cfa0ac --- /dev/null +++ b/pipe/batch.go @@ -0,0 +1,47 @@ +package pipe + +import ( + "context" + + "github.com/hiroara/carbo/task" +) + +// A Pipe task that makes a fixed size of batches. +type BatchOp[S any] struct { + size int +} + +// Create a batch operator with the passed size. +func Batch[S any](size int) *BatchOp[S] { + return &BatchOp[S]{size: size} +} + +// Convert the batch operator as a Pipe. +func (op *BatchOp[S]) AsPipe(opts ...task.Option) Pipe[S, []S] { + return FromFn(op.run, opts...) +} + +// Convert the batch operator as a task. +func (op *BatchOp[S]) AsTask() task.Task[S, []S] { + return op.AsPipe().AsTask() +} + +func (op *BatchOp[S]) run(ctx context.Context, in <-chan S, out chan<- []S) error { + b := make([]S, 0, op.size) + for el := range in { + b = append(b, el) + if len(b) < op.size { + continue + } + if err := task.Emit(ctx, out, b); err != nil { + return err + } + b = make([]S, 0, op.size) + } + if len(b) > 0 { + if err := task.Emit(ctx, out, b); err != nil { + return err + } + } + return nil +} diff --git a/pipe/batch_test.go b/pipe/batch_test.go new file mode 100644 index 0000000..95179db --- /dev/null +++ b/pipe/batch_test.go @@ -0,0 +1,33 @@ +package pipe_test + +import ( + "context" + "testing" + + "github.com/hiroara/carbo/pipe" + "github.com/hiroara/carbo/taskfn" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBatch(t *testing.T) { + t.Parallel() + + t.Run("Mod=0", func(t *testing.T) { + t.Parallel() + + batch := taskfn.SliceToSlice(pipe.Batch[string](2).AsTask()) + result, err := batch(context.Background(), []string{"a", "b", "c", "d"}) + require.NoError(t, err) + assert.Equal(t, [][]string{{"a", "b"}, {"c", "d"}}, result) + }) + + t.Run("Mod=1", func(t *testing.T) { + t.Parallel() + + batch := taskfn.SliceToSlice(pipe.Batch[string](2).AsTask()) + result, err := batch(context.Background(), []string{"a", "b", "c", "d", "e"}) + require.NoError(t, err) + assert.Equal(t, [][]string{{"a", "b"}, {"c", "d"}, {"e"}}, result) + }) +}