Skip to content

Commit

Permalink
wip(std): experiment with reduce and math operators as reducers
Browse files Browse the repository at this point in the history
  • Loading branch information
emil14 committed Sep 21, 2024
1 parent 4b83b68 commit 4c8d743
Show file tree
Hide file tree
Showing 17 changed files with 132 additions and 24 deletions.
2 changes: 1 addition & 1 deletion e2e/add_nums_verbose/main/main.neva
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ flow Main(start any) (stop any) {
(1 -> sequencer:port[0]),
(2 -> sequencer:port[1])
]
sequencer:seq -> adder:seq
sequencer:data -> adder:data
adder:res -> println:data
println:sig -> :stop
}
45 changes: 45 additions & 0 deletions examples/reduce_list/e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package test

import (
"context"
"os"
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test(t *testing.T) {
err := os.Chdir("..")
require.NoError(t, err)

wd, err := os.Getwd()
require.NoError(t, err)
defer os.Chdir(wd)

for i := 0; i < 100; i++ {
cmd := exec.Command("neva", "run", "filter_list")

// Set a timeout for the command
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cmd = exec.CommandContext(ctx, cmd.Path, cmd.Args[1:]...)

out, err := cmd.CombinedOutput()
if err != nil {
if ctx.Err() == context.DeadlineExceeded {
t.Fatal("Command timed out after 5 seconds")
}
require.NoError(t, err)
}

require.Equal(
t,
"55\n",
string(out),
)

require.Equal(t, 0, cmd.ProcessState.ExitCode())
}
}
9 changes: 9 additions & 0 deletions examples/reduce_list/main.neva
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const lst list<int> = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

flow Main(start) (stop) {
Iter, Reduce{AddReducer}, Println
---
:start -> (
$lst -> iter -> reduce -> println -> :stop
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ flow Main(start any) (stop any) {
#extern(int IntAdder, float FloatAdder, string StringAdder)
pub flow Add<
T int | float | string
>(seq stream<T>) (res T)
>(data stream<T>) (res T)
2 changes: 1 addition & 1 deletion internal/runtime/funcs/arr_port_to_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (arrayPortToStream) Create(
return nil, errors.New("missing array inport 'port'")
}

seqOut, err := io.Out.Single("seq")
seqOut, err := io.Out.Single("data")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/funcs/int_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (intAdd) Create(
io runtime.IO,
_ runtime.Msg,
) (func(ctx context.Context), error) {
seqIn, err := io.In.Single("seq")
seqIn, err := io.In.Single("data")
if err != nil {
return nil, err
}
Expand Down
48 changes: 48 additions & 0 deletions internal/runtime/funcs/int_add_reducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package funcs

import (
"context"

"github.com/nevalang/neva/internal/runtime"
)

type intAddReducer struct{}

func (intAddReducer) Create(
io runtime.IO,
_ runtime.Msg,
) (func(ctx context.Context), error) {
firstIn, err := io.In.Single("first")
if err != nil {
return nil, err
}

secondIn, err := io.In.Single("second")
if err != nil {
return nil, err
}

resOut, err := io.Out.Single("res")
if err != nil {
return nil, err
}

return func(ctx context.Context) {
for {
firstMsg, ok := firstIn.Receive(ctx)
if !ok {
return
}

secondMsg, ok := secondIn.Receive(ctx)
if !ok {
return
}

resMsg := runtime.NewIntMsg(firstMsg.Int() + secondMsg.Int())
if !resOut.Send(ctx, resMsg) {
return
}
}
}, nil
}
2 changes: 1 addition & 1 deletion internal/runtime/funcs/int_mul.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type intMul struct{}

func (intMul) Create(io runtime.IO, _ runtime.Msg) (func(ctx context.Context), error) {
seqIn, err := io.In.Single("seq")
seqIn, err := io.In.Single("data")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/funcs/int_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type intSub struct{}

func (intSub) Create(io runtime.IO, _ runtime.Msg) (func(ctx context.Context), error) {
seqIn, err := io.In.Single("seq")
seqIn, err := io.In.Single("data")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/funcs/list_to_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (c listToStream) Create(
return nil, err
}

seqOut, err := io.Out.Single("seq")
seqOut, err := io.Out.Single("data")
if err != nil {
return nil, err
}
Expand Down
15 changes: 8 additions & 7 deletions internal/runtime/funcs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ func CreatorRegistry() map[string]runtime.FuncCreator {
"field": readStructField{},

// math
"int_add": intAdd{},
"int_sub": intSub{},
"int_mul": intMul{},
"int_div": intDiv{},
"float_div": floatDiv{},
"int_decr": intDecr{},
"int_mod": intMod{},
"int_add": intAdd{},
"int_add_reducer": intAddReducer{},
"int_sub": intSub{},
"int_mul": intMul{},
"int_div": intDiv{},
"float_div": floatDiv{},
"int_decr": intDecr{},
"int_mod": intMod{},

// strconv
"parse_int": parseInt{},
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/funcs/stream_product.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (streamProduct) Create(
return nil, err
}

seqOut, err := io.Out.Single("seq")
seqOut, err := io.Out.Single("data")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/funcs/stream_to_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func (s streamToList) Create(
io runtime.IO,
_ runtime.Msg,
) (func(ctx context.Context), error) {
seqIn, err := io.In.Single("seq")
seqIn, err := io.In.Single("data")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/funcs/stream_zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (streamZip) Create(
return nil, err
}

seqOut, err := io.Out.Single("seq")
seqOut, err := io.Out.Single("data")
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions std/builtin/collections.neva
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub flow Len<T list<any> | map<any> | string>(data T) (res int)

// List receives stream and sends list with all elements from the stream.
#extern(stream_to_list)
pub flow List<T>(seq stream<T>) (res list<T>)
pub flow List<T>(data stream<T>) (res list<T>)

// Push creates new list with appended element.
#extern(list_push)
Expand All @@ -21,4 +21,4 @@ pub flow Sort<T int | float | string>(data list<T>) (res T)
pub flow Slice<T string | list<any>>(data T, from int, to int) (res T, err error)

#extern(list_to_stream)
pub flow Iter<T>(data list<T>) (seq stream<T>)
pub flow Iter<T>(data list<T>) (data stream<T>)
11 changes: 8 additions & 3 deletions std/builtin/math.neva
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#extern(int int_add, float float_add, string string_add)
pub flow Add<T int | float | string>(seq stream<T>) (res T)
pub flow Add<T int | float | string>(data stream<T>) (res T)

#extern(int int_sub, float float_sub)
pub flow Sub<T int | float >(seq stream<T>) (res T)
pub flow Sub<T int | float >(data stream<T>) (res T)

#extern(int int_mul, float float_mul)
pub flow Mul<T int | float >(seq stream<T>) (res T)
pub flow Mul<T int | float >(data stream<T>) (res T)

#extern(int int_div, float float_div)
pub flow Div<T int | float >(x T, y T) (res T, err error)
Expand All @@ -15,3 +15,8 @@ pub flow Decr<T int | float>(data T) (res T)

#extern(int_mod)
pub flow Mod(num int, den int) (res int, err error)

// === Reducer Interface Experiment ===

#extern(int int_add_reducer, float float_add_reducer, string string_add_reducer)
pub flow AddReducer<T int | float | string>(acc T, el T) (res T)
4 changes: 2 additions & 2 deletions std/streams/streams.neva
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub type ProductResult<T, R> struct {
}

#extern(stream_product)
pub flow Product<T, R>(first stream<T>, second stream<R>) (seq stream<ProductResult<T, R>>)
pub flow Product<T, R>(first stream<T>, second stream<R>) (data stream<ProductResult<T, R>>)

pub type ZipResult<T, R> struct {
first T
Expand All @@ -15,4 +15,4 @@ pub type ZipResult<T, R> struct {
// where first is from the first stream and second is from the second stream.
// Stops when either stream is exhausted.
#extern(stream_zip)
pub flow Zip<T, R>(first stream<T>, second stream<R>) (seq stream<ZipResult<T, R>>)
pub flow Zip<T, R>(first stream<T>, second stream<R>) (data stream<ZipResult<T, R>>)

0 comments on commit 4c8d743

Please sign in to comment.