diff --git a/e2e/add_nums_verbose/main/main.neva b/e2e/add_nums_verbose/main/main.neva index 6365127f..6d05db0b 100644 --- a/e2e/add_nums_verbose/main/main.neva +++ b/e2e/add_nums_verbose/main/main.neva @@ -9,7 +9,7 @@ flow Main(start any) (stop any) { (1 -> sequencer:port[0]), (2 -> sequencer:port[1]) ] - sequencer:seq -> adder:seq + sequencer:data -> adder:data adder:res -> println:data println:sig -> :stop } \ No newline at end of file diff --git a/examples/reduce_list/e2e_test.go b/examples/reduce_list/e2e_test.go new file mode 100644 index 00000000..13e18ddf --- /dev/null +++ b/examples/reduce_list/e2e_test.go @@ -0,0 +1,45 @@ +package test + +import ( + "context" + "os" + "os/exec" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test(t *testing.T) { + err := os.Chdir("..") + require.NoError(t, err) + + wd, err := os.Getwd() + require.NoError(t, err) + defer os.Chdir(wd) + + for i := 0; i < 100; i++ { + cmd := exec.Command("neva", "run", "filter_list") + + // Set a timeout for the command + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cmd = exec.CommandContext(ctx, cmd.Path, cmd.Args[1:]...) + + out, err := cmd.CombinedOutput() + if err != nil { + if ctx.Err() == context.DeadlineExceeded { + t.Fatal("Command timed out after 5 seconds") + } + require.NoError(t, err) + } + + require.Equal( + t, + "55\n", + string(out), + ) + + require.Equal(t, 0, cmd.ProcessState.ExitCode()) + } +} diff --git a/examples/reduce_list/main.neva b/examples/reduce_list/main.neva new file mode 100644 index 00000000..fecc112a --- /dev/null +++ b/examples/reduce_list/main.neva @@ -0,0 +1,9 @@ +const lst list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + +flow Main(start) (stop) { + Iter, Reduce{AddReducer}, Println + --- + :start -> ( + $lst -> iter -> reduce -> println -> :stop + ) +} diff --git a/internal/compiler/parser/smoke_test/happypath/026_mixed.neva b/internal/compiler/parser/smoke_test/happypath/026_mixed.neva index eee6de66..b240a7d1 100644 --- a/internal/compiler/parser/smoke_test/happypath/026_mixed.neva +++ b/internal/compiler/parser/smoke_test/happypath/026_mixed.neva @@ -25,4 +25,4 @@ flow Main(start any) (stop any) { #extern(int IntAdder, float FloatAdder, string StringAdder) pub flow Add< T int | float | string ->(seq stream) (res T) \ No newline at end of file +>(data stream) (res T) \ No newline at end of file diff --git a/internal/runtime/funcs/arr_port_to_stream.go b/internal/runtime/funcs/arr_port_to_stream.go index 63c78f07..68a5ac79 100644 --- a/internal/runtime/funcs/arr_port_to_stream.go +++ b/internal/runtime/funcs/arr_port_to_stream.go @@ -18,7 +18,7 @@ func (arrayPortToStream) Create( return nil, errors.New("missing array inport 'port'") } - seqOut, err := io.Out.Single("seq") + seqOut, err := io.Out.Single("data") if err != nil { return nil, err } diff --git a/internal/runtime/funcs/int_add.go b/internal/runtime/funcs/int_add.go index ab6f3c93..f0bfa401 100644 --- a/internal/runtime/funcs/int_add.go +++ b/internal/runtime/funcs/int_add.go @@ -12,7 +12,7 @@ func (intAdd) Create( io runtime.IO, _ runtime.Msg, ) (func(ctx context.Context), error) { - seqIn, err := io.In.Single("seq") + seqIn, err := io.In.Single("data") if err != nil { return nil, err } diff --git a/internal/runtime/funcs/int_add_reducer.go b/internal/runtime/funcs/int_add_reducer.go new file mode 100644 index 00000000..ed2c0933 --- /dev/null +++ b/internal/runtime/funcs/int_add_reducer.go @@ -0,0 +1,48 @@ +package funcs + +import ( + "context" + + "github.com/nevalang/neva/internal/runtime" +) + +type intAddReducer struct{} + +func (intAddReducer) Create( + io runtime.IO, + _ runtime.Msg, +) (func(ctx context.Context), error) { + firstIn, err := io.In.Single("first") + if err != nil { + return nil, err + } + + secondIn, err := io.In.Single("second") + if err != nil { + return nil, err + } + + resOut, err := io.Out.Single("res") + if err != nil { + return nil, err + } + + return func(ctx context.Context) { + for { + firstMsg, ok := firstIn.Receive(ctx) + if !ok { + return + } + + secondMsg, ok := secondIn.Receive(ctx) + if !ok { + return + } + + resMsg := runtime.NewIntMsg(firstMsg.Int() + secondMsg.Int()) + if !resOut.Send(ctx, resMsg) { + return + } + } + }, nil +} diff --git a/internal/runtime/funcs/int_mul.go b/internal/runtime/funcs/int_mul.go index b1ef0b19..d42a5d1e 100644 --- a/internal/runtime/funcs/int_mul.go +++ b/internal/runtime/funcs/int_mul.go @@ -9,7 +9,7 @@ import ( type intMul struct{} func (intMul) Create(io runtime.IO, _ runtime.Msg) (func(ctx context.Context), error) { - seqIn, err := io.In.Single("seq") + seqIn, err := io.In.Single("data") if err != nil { return nil, err } diff --git a/internal/runtime/funcs/int_sub.go b/internal/runtime/funcs/int_sub.go index c306e8eb..10b8c2f8 100644 --- a/internal/runtime/funcs/int_sub.go +++ b/internal/runtime/funcs/int_sub.go @@ -9,7 +9,7 @@ import ( type intSub struct{} func (intSub) Create(io runtime.IO, _ runtime.Msg) (func(ctx context.Context), error) { - seqIn, err := io.In.Single("seq") + seqIn, err := io.In.Single("data") if err != nil { return nil, err } diff --git a/internal/runtime/funcs/list_to_stream.go b/internal/runtime/funcs/list_to_stream.go index 2ef7d41d..d634c01a 100644 --- a/internal/runtime/funcs/list_to_stream.go +++ b/internal/runtime/funcs/list_to_stream.go @@ -17,7 +17,7 @@ func (c listToStream) Create( return nil, err } - seqOut, err := io.Out.Single("seq") + seqOut, err := io.Out.Single("data") if err != nil { return nil, err } diff --git a/internal/runtime/funcs/registry.go b/internal/runtime/funcs/registry.go index 82e64ac8..87211e9d 100644 --- a/internal/runtime/funcs/registry.go +++ b/internal/runtime/funcs/registry.go @@ -52,13 +52,14 @@ func CreatorRegistry() map[string]runtime.FuncCreator { "field": readStructField{}, // math - "int_add": intAdd{}, - "int_sub": intSub{}, - "int_mul": intMul{}, - "int_div": intDiv{}, - "float_div": floatDiv{}, - "int_decr": intDecr{}, - "int_mod": intMod{}, + "int_add": intAdd{}, + "int_add_reducer": intAddReducer{}, + "int_sub": intSub{}, + "int_mul": intMul{}, + "int_div": intDiv{}, + "float_div": floatDiv{}, + "int_decr": intDecr{}, + "int_mod": intMod{}, // strconv "parse_int": parseInt{}, diff --git a/internal/runtime/funcs/stream_product.go b/internal/runtime/funcs/stream_product.go index fd0a26ef..b22113d2 100644 --- a/internal/runtime/funcs/stream_product.go +++ b/internal/runtime/funcs/stream_product.go @@ -22,7 +22,7 @@ func (streamProduct) Create( return nil, err } - seqOut, err := io.Out.Single("seq") + seqOut, err := io.Out.Single("data") if err != nil { return nil, err } diff --git a/internal/runtime/funcs/stream_to_list.go b/internal/runtime/funcs/stream_to_list.go index 90de67e7..adec7080 100644 --- a/internal/runtime/funcs/stream_to_list.go +++ b/internal/runtime/funcs/stream_to_list.go @@ -12,7 +12,7 @@ func (s streamToList) Create( io runtime.IO, _ runtime.Msg, ) (func(ctx context.Context), error) { - seqIn, err := io.In.Single("seq") + seqIn, err := io.In.Single("data") if err != nil { return nil, err } diff --git a/internal/runtime/funcs/stream_zip.go b/internal/runtime/funcs/stream_zip.go index d40d63d6..a5d6e4bf 100644 --- a/internal/runtime/funcs/stream_zip.go +++ b/internal/runtime/funcs/stream_zip.go @@ -22,7 +22,7 @@ func (streamZip) Create( return nil, err } - seqOut, err := io.Out.Single("seq") + seqOut, err := io.Out.Single("data") if err != nil { return nil, err } diff --git a/std/builtin/collections.neva b/std/builtin/collections.neva index 55658d79..0490b361 100644 --- a/std/builtin/collections.neva +++ b/std/builtin/collections.neva @@ -7,7 +7,7 @@ pub flow Len | map | string>(data T) (res int) // List receives stream and sends list with all elements from the stream. #extern(stream_to_list) -pub flow List(seq stream) (res list) +pub flow List(data stream) (res list) // Push creates new list with appended element. #extern(list_push) @@ -21,4 +21,4 @@ pub flow Sort(data list) (res T) pub flow Slice>(data T, from int, to int) (res T, err error) #extern(list_to_stream) -pub flow Iter(data list) (seq stream) +pub flow Iter(data list) (data stream) diff --git a/std/builtin/math.neva b/std/builtin/math.neva index 44d01a30..efdc5da7 100644 --- a/std/builtin/math.neva +++ b/std/builtin/math.neva @@ -1,11 +1,11 @@ #extern(int int_add, float float_add, string string_add) -pub flow Add(seq stream) (res T) +pub flow Add(data stream) (res T) #extern(int int_sub, float float_sub) -pub flow Sub(seq stream) (res T) +pub flow Sub(data stream) (res T) #extern(int int_mul, float float_mul) -pub flow Mul(seq stream) (res T) +pub flow Mul(data stream) (res T) #extern(int int_div, float float_div) pub flow Div(x T, y T) (res T, err error) @@ -15,3 +15,8 @@ pub flow Decr(data T) (res T) #extern(int_mod) pub flow Mod(num int, den int) (res int, err error) + +// === Reducer Interface Experiment === + +#extern(int int_add_reducer, float float_add_reducer, string string_add_reducer) +pub flow AddReducer(acc T, el T) (res T) \ No newline at end of file diff --git a/std/streams/streams.neva b/std/streams/streams.neva index 2648e1e0..53eb8d8e 100644 --- a/std/streams/streams.neva +++ b/std/streams/streams.neva @@ -4,7 +4,7 @@ pub type ProductResult struct { } #extern(stream_product) -pub flow Product(first stream, second stream) (seq stream>) +pub flow Product(first stream, second stream) (data stream>) pub type ZipResult struct { first T @@ -15,4 +15,4 @@ pub type ZipResult struct { // where first is from the first stream and second is from the second stream. // Stops when either stream is exhausted. #extern(stream_zip) -pub flow Zip(first stream, second stream) (seq stream>) \ No newline at end of file +pub flow Zip(first stream, second stream) (data stream>) \ No newline at end of file