diff --git a/pkg/sources/transformer/builtin/event_time/event_time_extractor.go b/pkg/sources/transformer/builtin/event_time/event_time_extractor.go index ff143f2c3d..c844c518db 100644 --- a/pkg/sources/transformer/builtin/event_time/event_time_extractor.go +++ b/pkg/sources/transformer/builtin/event_time/event_time_extractor.go @@ -56,7 +56,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages { log := logging.FromContext(ctx) - resultMsg, err := e.apply(datum.Value(), datum.EventTime()) + resultMsg, err := e.apply(datum.Value(), datum.EventTime(), keys) if err != nil { log.Warnf("event time extractor got an error: %v, skip updating event time...", err) } @@ -66,10 +66,10 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) // apply compiles the payload to extract the new event time. If there is any error during extraction, // we pass on the original input event time. Otherwise, we assign the new event time to the message. -func (e eventTimeExtractor) apply(payload []byte, et time.Time) (sourcetransformer.Message, error) { +func (e eventTimeExtractor) apply(payload []byte, et time.Time, keys []string) (sourcetransformer.Message, error) { timeStr, err := expr.EvalStr(e.expression, payload) if err != nil { - return sourcetransformer.NewMessage(payload, et), err + return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err } var newEventTime time.Time @@ -80,8 +80,8 @@ func (e eventTimeExtractor) apply(payload []byte, et time.Time) (sourcetransform newEventTime, err = dateparse.ParseStrict(timeStr) } if err != nil { - return sourcetransformer.NewMessage(payload, et), err + return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err } else { - return sourcetransformer.NewMessage(payload, newEventTime), nil + return sourcetransformer.NewMessage(payload, newEventTime).WithKeys(keys), nil } } diff --git a/pkg/sources/transformer/builtin/event_time/event_time_extractor_test.go b/pkg/sources/transformer/builtin/event_time/event_time_extractor_test.go index 22fc6b631e..d14e37beed 100644 --- a/pkg/sources/transformer/builtin/event_time/event_time_extractor_test.go +++ b/pkg/sources/transformer/builtin/event_time/event_time_extractor_test.go @@ -24,6 +24,8 @@ import ( "github.com/stretchr/testify/assert" ) +var _keys = []string{"test-key"} + type testDatum struct { value []byte eventTime time.Time @@ -74,7 +76,7 @@ func TestEventTimeExtractor(t *testing.T) { assert.NoError(t, err) testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}` - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: time.Time{}, watermark: time.Time{}, @@ -86,6 +88,8 @@ func TestEventTimeExtractor(t *testing.T) { assert.True(t, expected.Equal(result.Items()[0].EventTime())) // Verify the payload remains unchanged. assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // Verify the keys remain unchanged. + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("Json expression valid, assign a new event time to the message - format specified", func(t *testing.T) { @@ -94,7 +98,7 @@ func TestEventTimeExtractor(t *testing.T) { assert.NoError(t, err) testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}` - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: time.Time{}, watermark: time.Time{}, @@ -106,6 +110,8 @@ func TestEventTimeExtractor(t *testing.T) { assert.True(t, expected.Equal(result.Items()[0].EventTime())) // Verify the payload remains unchanged. assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // Verify the keys remain unchanged. + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("Time string not matching user-provided format, pass on the message without assigning new event time", func(t *testing.T) { @@ -114,9 +120,9 @@ func TestEventTimeExtractor(t *testing.T) { assert.NoError(t, err) testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC) - // Handler receives format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched. + // Handler receives a format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched. testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}` - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: testInputEventTime, watermark: time.Time{}, @@ -126,6 +132,8 @@ func TestEventTimeExtractor(t *testing.T) { assert.Equal(t, testInputEventTime, result.Items()[0].EventTime()) // Verify the payload remains unchanged. assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // Verify the keys remain unchanged. + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("Cannot compile json expression, pass on the message without assigning new event time", func(t *testing.T) { @@ -135,7 +143,7 @@ func TestEventTimeExtractor(t *testing.T) { testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC) testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}` - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: testInputEventTime, watermark: time.Time{}, @@ -146,6 +154,8 @@ func TestEventTimeExtractor(t *testing.T) { assert.True(t, expected.Equal(result.Items()[0].EventTime())) // Verify the payload remains unchanged. assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // Verify the keys remain unchanged. + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("The time string is in epoch format with a granularity of seconds, assign a new event time to the message", func(t *testing.T) { @@ -154,9 +164,9 @@ func TestEventTimeExtractor(t *testing.T) { assert.NoError(t, err) testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC) - // Handler receives format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched. + // Handler receives a format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched. testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "1673239888"},{"id": 2, "name": "numa", "time": "1673239888"}]}` - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: testInputEventTime, watermark: time.Time{}, @@ -168,6 +178,8 @@ func TestEventTimeExtractor(t *testing.T) { assert.True(t, expected.Equal(result.Items()[0].EventTime())) // Verify the payload remains unchanged. assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // Verify the keys remain unchanged. + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("The time string is in epoch format with a granularity of milliseconds, assign a new event time to the message", func(t *testing.T) { @@ -177,7 +189,7 @@ func TestEventTimeExtractor(t *testing.T) { testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC) testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "1673239888123"},{"id": 2, "name": "numa", "time": "1673239888123"}]}` - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: testInputEventTime, watermark: time.Time{}, @@ -189,6 +201,8 @@ func TestEventTimeExtractor(t *testing.T) { assert.True(t, expected.Equal(result.Items()[0].EventTime())) // Verify the payload remains unchanged. assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // Verify the keys remain unchanged. + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("The time string is ambiguous, pass on the message without assigning new event time", func(t *testing.T) { @@ -199,7 +213,7 @@ func TestEventTimeExtractor(t *testing.T) { testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC) // 04/08/2014 is ambiguous because it could be mm/dd/yyyy or dd/mm/yyyy. testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "04/08/2014 22:05"},{"id": 2, "name": "numa", "time": "04/08/2014 22:05"}]}` - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: testInputEventTime, watermark: time.Time{}, @@ -211,5 +225,7 @@ func TestEventTimeExtractor(t *testing.T) { assert.True(t, expected.Equal(result.Items()[0].EventTime())) // Verify the payload remains unchanged. assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // Verify the keys remain unchanged. + assert.Equal(t, _keys, result.Items()[0].Keys()) }) } diff --git a/pkg/sources/transformer/builtin/filter/filter.go b/pkg/sources/transformer/builtin/filter/filter.go index 08f47795f2..c30b99017f 100644 --- a/pkg/sources/transformer/builtin/filter/filter.go +++ b/pkg/sources/transformer/builtin/filter/filter.go @@ -32,17 +32,17 @@ type filter struct { } func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) { - expr, existing := args["expression"] + exp, existing := args["expression"] if !existing { return nil, fmt.Errorf(`missing "expression"`) } f := filter{ - expression: expr, + expression: exp, } return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages { log := logging.FromContext(ctx) - resultMsg, err := f.apply(datum.EventTime(), datum.Value()) + resultMsg, err := f.apply(datum.EventTime(), datum.Value(), keys) if err != nil { log.Errorf("Filter map function apply got an error: %v", err) } @@ -50,13 +50,13 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) }, nil } -func (f filter) apply(et time.Time, msg []byte) (sourcetransformer.Message, error) { +func (f filter) apply(et time.Time, msg []byte, keys []string) (sourcetransformer.Message, error) { result, err := expr.EvalBool(f.expression, msg) if err != nil { return sourcetransformer.MessageToDrop(et), err } if result { - return sourcetransformer.NewMessage(msg, et), nil + return sourcetransformer.NewMessage(msg, et).WithKeys(keys), nil } return sourcetransformer.MessageToDrop(et), nil } diff --git a/pkg/sources/transformer/builtin/filter/filter_test.go b/pkg/sources/transformer/builtin/filter/filter_test.go index 8ac227bdc2..fb0d7a5768 100644 --- a/pkg/sources/transformer/builtin/filter/filter_test.go +++ b/pkg/sources/transformer/builtin/filter/filter_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/numaproj/numaflow-go/pkg/sourcetransformer" "github.com/stretchr/testify/assert" ) @@ -86,6 +87,7 @@ func TestExpression(t *testing.T) { watermark: time.Time{}, }) assert.Equal(t, jsonMsg, string(result.Items()[0].Value())) + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("invalid expression", func(t *testing.T) { @@ -99,7 +101,7 @@ func TestExpression(t *testing.T) { eventTime: time.Time{}, watermark: time.Time{}, }) - assert.Equal(t, "", string(result.Items()[0].Value())) + assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0]) }) t.Run("Json expression invalid", func(t *testing.T) { @@ -113,7 +115,7 @@ func TestExpression(t *testing.T) { eventTime: time.Time{}, watermark: time.Time{}, }) - assert.Equal(t, "", string(result.Items()[0].Value())) + assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0]) }) t.Run("String expression invalid", func(t *testing.T) { @@ -127,7 +129,7 @@ func TestExpression(t *testing.T) { eventTime: time.Time{}, watermark: time.Time{}, }) - assert.Equal(t, "", string(result.Items()[0].Value())) + assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0]) }) t.Run("base64 expression valid", func(t *testing.T) { @@ -142,6 +144,7 @@ func TestExpression(t *testing.T) { watermark: time.Time{}, }) assert.Equal(t, base64Msg, string(result.Items()[0].Value())) + assert.Equal(t, _keys, result.Items()[0].Keys()) }) t.Run("event time unchanged", func(t *testing.T) { @@ -157,5 +160,6 @@ func TestExpression(t *testing.T) { watermark: time.Time{}, }) assert.Equal(t, testEventTime, result.Items()[0].EventTime()) + assert.Equal(t, _keys, result.Items()[0].Keys()) }) } diff --git a/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go b/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go index 112c5de25c..bb1515cb1e 100644 --- a/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go +++ b/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter.go @@ -59,7 +59,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages { log := logging.FromContext(ctx) - resultMsg, err := e.apply(datum.EventTime(), datum.Value()) + resultMsg, err := e.apply(datum.EventTime(), datum.Value(), keys) if err != nil { log.Errorf("Filter or event time extractor got an error: %v", err) } @@ -68,7 +68,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) } -func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Message, error) { +func (e expressions) apply(et time.Time, payload []byte, keys []string) (sourcetransformer.Message, error) { result, err := expr.EvalBool(e.filterExpr, payload) if err != nil { return sourcetransformer.MessageToDrop(et), err @@ -76,7 +76,7 @@ func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Mess if result { timeStr, err := expr.EvalStr(e.eventTimeExpr, payload) if err != nil { - return sourcetransformer.NewMessage(payload, et), err + return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err } var newEventTime time.Time time.Local, _ = time.LoadLocation("UTC") @@ -86,9 +86,9 @@ func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Mess newEventTime, err = dateparse.ParseStrict(timeStr) } if err != nil { - return sourcetransformer.NewMessage(payload, et), err + return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err } else { - return sourcetransformer.NewMessage(payload, newEventTime), nil + return sourcetransformer.NewMessage(payload, newEventTime).WithKeys(keys), nil } } return sourcetransformer.MessageToDrop(et), nil diff --git a/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter_test.go b/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter_test.go index 02631c25cd..a178ce251b 100644 --- a/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter_test.go +++ b/pkg/sources/transformer/builtin/time_extraction_filter/time_extraction_filter_test.go @@ -21,9 +21,12 @@ import ( "testing" "time" + "github.com/numaproj/numaflow-go/pkg/sourcetransformer" "github.com/stretchr/testify/assert" ) +var _keys = []string{"test-key"} + type testDatum struct { value []byte eventTime time.Time @@ -66,7 +69,6 @@ var ( ) func TestFilterEventTime(t *testing.T) { - t.Run("Missing both expressions, return error", func(t *testing.T) { _, err := New(map[string]string{}) assert.Error(t, err) @@ -89,14 +91,16 @@ func TestFilterEventTime(t *testing.T) { handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 2", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.RFC3339}) assert.NoError(t, err) - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: time.Time{}, watermark: time.Time{}, }) - // check that messsage has not changed + // check that message has not changed assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // check that keys have not changed + assert.Equal(t, _keys, result.Items()[0].Keys()) // check that event time has changed time.Local, _ = time.LoadLocation("UTC") @@ -108,13 +112,13 @@ func TestFilterEventTime(t *testing.T) { handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 3", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.RFC3339}) assert.NoError(t, err) - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: time.Time{}, watermark: time.Time{}, }) - assert.Equal(t, "", string(result.Items()[0].Value())) + assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0]) }) t.Run("Valid JSON expression for filter, incorrect format to eventTime", func(t *testing.T) { @@ -122,14 +126,17 @@ func TestFilterEventTime(t *testing.T) { assert.NoError(t, err) testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC) - result := handle(context.Background(), []string{"test-key"}, &testDatum{ + result := handle(context.Background(), _keys, &testDatum{ value: []byte(testJsonMsg), eventTime: testInputEventTime, watermark: time.Time{}, }) + // check that message event time has not changed assert.Equal(t, testInputEventTime, result.Items()[0].EventTime()) + // check that message has not changed assert.Equal(t, testJsonMsg, string(result.Items()[0].Value())) + // check that keys have not been added + assert.Equal(t, _keys, result.Items()[0].Keys()) }) - }