Skip to content

Commit

Permalink
GroupBy dynamic distribution function type change (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah authored Nov 5, 2020
1 parent c016903 commit 2c86562
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
8 changes: 5 additions & 3 deletions doc/groupbydynamic.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@

Divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.

`GroupByDyDynamic` differs from [GroupBy](groupby.md) in the sense that it does not require to pass the set length.
`GroupByDyDynamic` differs from [GroupBy](groupby.md) for two reasons:
* We don't need to pass a fixed set length.
* The distribution function is a `func(rxgo.Item) string` instead of a `func(rxgo.Item) int`. The rationale is because of possible collisions. For example, if our distribution function produces 128-bit UUIDs, there is a collision risk if such a UUID has to be casted into an int.

![](http://reactivex.io/documentation/operators/images/groupBy.c.png)

## Example

```go
count := 3
observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) int {
return item.V.(int) % count
observable := rxgo.Range(0, 10).GroupByDynamic(func(item rxgo.Item) string {
return strconv.Itoa(item.V.(int) % count)
}, rxgo.WithBufferedChannel(10))

for i := range observable.Observe() {
Expand Down
2 changes: 1 addition & 1 deletion observable.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Observable interface {
FlatMap(apply ItemToObservable, opts ...Option) Observable
ForEach(nextFunc NextFunc, errFunc ErrFunc, completedFunc CompletedFunc, opts ...Option) Disposed
GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
GroupByDynamic(distribution func(Item) int, opts ...Option) Observable
GroupByDynamic(distribution func(Item) string, opts ...Option) Observable
IgnoreElements(opts ...Option) Observable
Join(joiner Func2, right Observable, timeExtractor func(interface{}) time.Time, window Duration, opts ...Option) Observable
Last(opts ...Option) OptionalSingle
Expand Down
6 changes: 3 additions & 3 deletions observable_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,15 +1353,15 @@ func (o *ObservableImpl) GroupBy(length int, distribution func(Item) int, opts .
type GroupedObservable struct {
Observable
// Key is the distribution key
Key int
Key string
}

// GroupByDynamic divides an Observable into a dynamic set of Observables that each emit GroupedObservable from the original Observable, organized by key.
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) int, opts ...Option) Observable {
func (o *ObservableImpl) GroupByDynamic(distribution func(Item) string, opts ...Option) Observable {
option := parseOptions(opts...)
next := option.buildChannel()
ctx := option.buildContext()
chs := make(map[int]chan Item)
chs := make(map[string]chan Item)

go func() {
observe := o.Observe(opts...)
Expand Down
15 changes: 8 additions & 7 deletions observable_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -960,11 +961,11 @@ func Test_Observable_GroupByDynamic(t *testing.T) {
count := 3
max := 10

obs := Range(0, max).GroupByDynamic(func(item Item) int {
obs := Range(0, max).GroupByDynamic(func(item Item) string {
if item.V == 10 {
return 10
return "10"
}
return item.V.(int) % count
return strconv.Itoa(item.V.(int) % count)
}, WithBufferedChannel(max))
s, err := obs.ToSlice(0)
if err != nil {
Expand All @@ -975,13 +976,13 @@ func Test_Observable_GroupByDynamic(t *testing.T) {
}

Assert(ctx, t, s[0].(GroupedObservable), HasItems(0, 3, 6, 9), HasNoError())
assert.Equal(t, 0, s[0].(GroupedObservable).Key)
assert.Equal(t, "0", s[0].(GroupedObservable).Key)
Assert(ctx, t, s[1].(GroupedObservable), HasItems(1, 4, 7), HasNoError())
assert.Equal(t, 1, s[1].(GroupedObservable).Key)
assert.Equal(t, "1", s[1].(GroupedObservable).Key)
Assert(ctx, t, s[2].(GroupedObservable), HasItems(2, 5, 8), HasNoError())
assert.Equal(t, 2, s[2].(GroupedObservable).Key)
assert.Equal(t, "2", s[2].(GroupedObservable).Key)
Assert(ctx, t, s[3].(GroupedObservable), HasItems(10), HasNoError())
assert.Equal(t, 10, s[3].(GroupedObservable).Key)
assert.Equal(t, "10", s[3].(GroupedObservable).Key)
}

func joinTest(ctx context.Context, t *testing.T, left, right []interface{}, window Duration, expected []int64) {
Expand Down

0 comments on commit 2c86562

Please sign in to comment.