Skip to content

Commit

Permalink
fix: builtin transformer should keep the keys (#2047)
Browse files (browse the repository at this point in the history)
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Sep 9, 2024
1 parent 24c6553 commit ba40b15
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {
log := logging.FromContext(ctx)
resultMsg, err := e.apply(datum.Value(), datum.EventTime())
resultMsg, err := e.apply(datum.Value(), datum.EventTime(), keys)
if err != nil {
log.Warnf("event time extractor got an error: %v, skip updating event time...", err)
}
Expand All @@ -66,10 +66,10 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

// apply compiles the payload to extract the new event time. If there is any error during extraction,
// we pass on the original input event time. Otherwise, we assign the new event time to the message.
func (e eventTimeExtractor) apply(payload []byte, et time.Time) (sourcetransformer.Message, error) {
func (e eventTimeExtractor) apply(payload []byte, et time.Time, keys []string) (sourcetransformer.Message, error) {
timeStr, err := expr.EvalStr(e.expression, payload)
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
}

var newEventTime time.Time
Expand All @@ -80,8 +80,8 @@ func (e eventTimeExtractor) apply(payload []byte, et time.Time) (sourcetransform
newEventTime, err = dateparse.ParseStrict(timeStr)
}
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
} else {
return sourcetransformer.NewMessage(payload, newEventTime), nil
return sourcetransformer.NewMessage(payload, newEventTime).WithKeys(keys), nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/stretchr/testify/assert"
)

var _keys = []string{"test-key"}

type testDatum struct {
value []byte
eventTime time.Time
Expand Down Expand Up @@ -74,7 +76,7 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
Expand All @@ -86,6 +88,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("Json expression valid, assign a new event time to the message - format specified", func(t *testing.T) {
Expand All @@ -94,7 +98,7 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
Expand All @@ -106,6 +110,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("Time string not matching user-provided format, pass on the message without assigning new event time", func(t *testing.T) {
Expand All @@ -114,9 +120,9 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
// Handler receives format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched.
// Handler receives a format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched.
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -126,6 +132,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.Equal(t, testInputEventTime, result.Items()[0].EventTime())
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("Cannot compile json expression, pass on the message without assigning new event time", func(t *testing.T) {
Expand All @@ -135,7 +143,7 @@ func TestEventTimeExtractor(t *testing.T) {

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -146,6 +154,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("The time string is in epoch format with a granularity of seconds, assign a new event time to the message", func(t *testing.T) {
Expand All @@ -154,9 +164,9 @@ func TestEventTimeExtractor(t *testing.T) {
assert.NoError(t, err)

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
// Handler receives format as time.ANSIC but in the message, we use time.RFC3339. Format is not matched.
// The time string in the message is an epoch timestamp with a granularity of seconds, not a formatted date.
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "1673239888"},{"id": 2, "name": "numa", "time": "1673239888"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -168,6 +178,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("The time string is in epoch format with a granularity of milliseconds, assign a new event time to the message", func(t *testing.T) {
Expand All @@ -177,7 +189,7 @@ func TestEventTimeExtractor(t *testing.T) {

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "1673239888123"},{"id": 2, "name": "numa", "time": "1673239888123"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -189,6 +201,8 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("The time string is ambiguous, pass on the message without assigning new event time", func(t *testing.T) {
Expand All @@ -199,7 +213,7 @@ func TestEventTimeExtractor(t *testing.T) {
testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
// 04/08/2014 is ambiguous because it could be mm/dd/yyyy or dd/mm/yyyy.
testJsonMsg := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "04/08/2014 22:05"},{"id": 2, "name": "numa", "time": "04/08/2014 22:05"}]}`
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
Expand All @@ -211,5 +225,7 @@ func TestEventTimeExtractor(t *testing.T) {
assert.True(t, expected.Equal(result.Items()[0].EventTime()))
// Verify the payload remains unchanged.
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// Verify the keys remain unchanged.
assert.Equal(t, _keys, result.Items()[0].Keys())
})
}
10 changes: 5 additions & 5 deletions pkg/sources/transformer/builtin/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,31 @@ type filter struct {
}

func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error) {
expr, existing := args["expression"]
exp, existing := args["expression"]
if !existing {
return nil, fmt.Errorf(`missing "expression"`)
}
f := filter{
expression: expr,
expression: exp,
}

return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {
log := logging.FromContext(ctx)
resultMsg, err := f.apply(datum.EventTime(), datum.Value())
resultMsg, err := f.apply(datum.EventTime(), datum.Value(), keys)
if err != nil {
log.Errorf("Filter map function apply got an error: %v", err)
}
return sourcetransformer.MessagesBuilder().Append(resultMsg)
}, nil
}

func (f filter) apply(et time.Time, msg []byte) (sourcetransformer.Message, error) {
func (f filter) apply(et time.Time, msg []byte, keys []string) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(f.expression, msg)
if err != nil {
return sourcetransformer.MessageToDrop(et), err
}
if result {
return sourcetransformer.NewMessage(msg, et), nil
return sourcetransformer.NewMessage(msg, et).WithKeys(keys), nil
}
return sourcetransformer.MessageToDrop(et), nil
}
10 changes: 7 additions & 3 deletions pkg/sources/transformer/builtin/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -86,6 +87,7 @@ func TestExpression(t *testing.T) {
watermark: time.Time{},
})
assert.Equal(t, jsonMsg, string(result.Items()[0].Value()))
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("invalid expression", func(t *testing.T) {
Expand All @@ -99,7 +101,7 @@ func TestExpression(t *testing.T) {
eventTime: time.Time{},
watermark: time.Time{},
})
assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("Json expression invalid", func(t *testing.T) {
Expand All @@ -113,7 +115,7 @@ func TestExpression(t *testing.T) {
eventTime: time.Time{},
watermark: time.Time{},
})
assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("String expression invalid", func(t *testing.T) {
Expand All @@ -127,7 +129,7 @@ func TestExpression(t *testing.T) {
eventTime: time.Time{},
watermark: time.Time{},
})
assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("base64 expression valid", func(t *testing.T) {
Expand All @@ -142,6 +144,7 @@ func TestExpression(t *testing.T) {
watermark: time.Time{},
})
assert.Equal(t, base64Msg, string(result.Items()[0].Value()))
assert.Equal(t, _keys, result.Items()[0].Keys())
})

t.Run("event time unchanged", func(t *testing.T) {
Expand All @@ -157,5 +160,6 @@ func TestExpression(t *testing.T) {
watermark: time.Time{},
})
assert.Equal(t, testEventTime, result.Items()[0].EventTime())
assert.Equal(t, _keys, result.Items()[0].Keys())
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

return func(ctx context.Context, keys []string, datum sourcetransformer.Datum) sourcetransformer.Messages {
log := logging.FromContext(ctx)
resultMsg, err := e.apply(datum.EventTime(), datum.Value())
resultMsg, err := e.apply(datum.EventTime(), datum.Value(), keys)
if err != nil {
log.Errorf("Filter or event time extractor got an error: %v", err)
}
Expand All @@ -68,15 +68,15 @@ func New(args map[string]string) (sourcetransformer.SourceTransformFunc, error)

}

func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Message, error) {
func (e expressions) apply(et time.Time, payload []byte, keys []string) (sourcetransformer.Message, error) {
result, err := expr.EvalBool(e.filterExpr, payload)
if err != nil {
return sourcetransformer.MessageToDrop(et), err
}
if result {
timeStr, err := expr.EvalStr(e.eventTimeExpr, payload)
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
}
var newEventTime time.Time
time.Local, _ = time.LoadLocation("UTC")
Expand All @@ -86,9 +86,9 @@ func (e expressions) apply(et time.Time, payload []byte) (sourcetransformer.Mess
newEventTime, err = dateparse.ParseStrict(timeStr)
}
if err != nil {
return sourcetransformer.NewMessage(payload, et), err
return sourcetransformer.NewMessage(payload, et).WithKeys(keys), err
} else {
return sourcetransformer.NewMessage(payload, newEventTime), nil
return sourcetransformer.NewMessage(payload, newEventTime).WithKeys(keys), nil
}
}
return sourcetransformer.MessageToDrop(et), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"testing"
"time"

"github.com/numaproj/numaflow-go/pkg/sourcetransformer"
"github.com/stretchr/testify/assert"
)

var _keys = []string{"test-key"}

type testDatum struct {
value []byte
eventTime time.Time
Expand Down Expand Up @@ -66,7 +69,6 @@ var (
)

func TestFilterEventTime(t *testing.T) {

t.Run("Missing both expressions, return error", func(t *testing.T) {
_, err := New(map[string]string{})
assert.Error(t, err)
Expand All @@ -89,14 +91,16 @@ func TestFilterEventTime(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 2", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.RFC3339})
assert.NoError(t, err)

result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
})

// check that messsage has not changed
// check that message has not changed
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// check that keys have not changed
assert.Equal(t, _keys, result.Items()[0].Keys())

// check that event time has changed
time.Local, _ = time.LoadLocation("UTC")
Expand All @@ -108,28 +112,31 @@ func TestFilterEventTime(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 3", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.RFC3339})
assert.NoError(t, err)

result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: time.Time{},
watermark: time.Time{},
})

assert.Equal(t, "", string(result.Items()[0].Value()))
assert.Equal(t, sourcetransformer.MessageToDrop(time.Time{}), result.Items()[0])
})

t.Run("Valid JSON expression for filter, incorrect format to eventTime", func(t *testing.T) {
handle, err := New(map[string]string{"filterExpr": "int(json(payload).item[1].id) == 2", "eventTimeExpr": "json(payload).item[1].time", "eventTimeFormat": time.ANSIC})
assert.NoError(t, err)

testInputEventTime := time.Date(2022, 1, 4, 2, 3, 4, 5, time.UTC)
result := handle(context.Background(), []string{"test-key"}, &testDatum{
result := handle(context.Background(), _keys, &testDatum{
value: []byte(testJsonMsg),
eventTime: testInputEventTime,
watermark: time.Time{},
})

// check that message event time has not changed
assert.Equal(t, testInputEventTime, result.Items()[0].EventTime())
// check that message has not changed
assert.Equal(t, testJsonMsg, string(result.Items()[0].Value()))
// check that keys have not changed
assert.Equal(t, _keys, result.Items()[0].Keys())
})

}

0 comments on commit ba40b15

Please sign in to comment.