Skip to content

Commit

Permalink
feat: add pipe.Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroara committed Aug 2, 2023
1 parent 75c29fa commit e1a6a4d
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
47 changes: 47 additions & 0 deletions pipe/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package pipe

import (
"context"

"github.com/hiroara/carbo/task"
)

// A Pipe task that makes a fixed size of batches.
type BatchOp[S any] struct {
size int
}

// Create a batch operator with the passed size.
func Batch[S any](size int) *BatchOp[S] {
return &BatchOp[S]{size: size}
}

// Convert the batch operator as a Pipe.
func (op *BatchOp[S]) AsPipe(opts ...task.Option) Pipe[S, []S] {
return FromFn(op.run, opts...)
}

// Convert the batch operator as a task.
func (op *BatchOp[S]) AsTask() task.Task[S, []S] {
return op.AsPipe().AsTask()
}

func (op *BatchOp[S]) run(ctx context.Context, in <-chan S, out chan<- []S) error {
b := make([]S, 0, op.size)
for el := range in {
b = append(b, el)
if len(b) < op.size {
continue
}
if err := task.Emit(ctx, out, b); err != nil {
return err
}
b = make([]S, 0, op.size)
}
if len(b) > 0 {
if err := task.Emit(ctx, out, b); err != nil {
return err
}
}
return nil
}
33 changes: 33 additions & 0 deletions pipe/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pipe_test

import (
"context"
"testing"

"github.com/hiroara/carbo/pipe"

Check failure on line 7 in pipe/batch_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/hiroara/carbo/) -s blank -s dot (gci)
"github.com/hiroara/carbo/taskfn"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Check failure on line 10 in pipe/batch_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default -s prefix(github.com/hiroara/carbo/) -s blank -s dot (gci)
)

func TestBatch(t *testing.T) {
t.Parallel()

t.Run("Mod=0", func(t *testing.T) {
t.Parallel()

batch := taskfn.SliceToSlice(pipe.Batch[string](2).AsTask())
result, err := batch(context.Background(), []string{"a", "b", "c", "d"})
require.NoError(t, err)
assert.Equal(t, [][]string{{"a", "b"}, {"c", "d"}}, result)
})

t.Run("Mod=1", func(t *testing.T) {
t.Parallel()

batch := taskfn.SliceToSlice(pipe.Batch[string](2).AsTask())
result, err := batch(context.Background(), []string{"a", "b", "c", "d", "e"})
require.NoError(t, err)
assert.Equal(t, [][]string{{"a", "b"}, {"c", "d"}, {"e"}}, result)
})
}

0 comments on commit e1a6a4d

Please sign in to comment.