Skip to content

Commit

Permalink
fix TestBuiltinEventTimeExtractor (numaproj#885)
Browse files Browse the repository at this point in the history
  • Loading branch information
yhl25 committed Aug 1, 2023
1 parent d3b3a92 commit 5073f1c
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions test/e2e-suite-1/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Expand All @@ -20,6 +18,7 @@ package e2e

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -221,6 +220,7 @@ func (s *FunctionalSuite) TestBuiltinEventTimeExtractor() {
w := s.Given().Pipeline("@testdata/extract-event-time-from-payload.yaml").
When().
CreatePipelineAndWait()
currentTime := time.Now().UnixMilli()
defer w.DeletePipelineAndWait()
pipelineName := "extract-event-time"

Expand All @@ -237,23 +237,28 @@ func (s *FunctionalSuite) TestBuiltinEventTimeExtractor() {
_ = client.Close()
}()

// In this test, we send a message with event time being now, apply event time extractor and verify from log that the message event time gets updated.
testMsgOne := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-01-18T21:54:42.123Z"}]}`
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgOne)))
w.Expect().VertexPodLogContains("out", fmt.Sprintf("EventTime - %d", time.Date(2021, 1, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli()), PodLogCheckOptionWithCount(1))

// Verify watermark is generated based on the new event time.
testMsgTwo := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-02-18T21:54:42.123Z"}]}`
testMsgThree := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-03-18T21:54:42.123Z"}]}`
testMsgFour := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-04-18T21:54:42.123Z"}]}`
testMsgFive := `{"test": 21, "item": [{"id": 1, "name": "numa", "time": "2022-02-18T21:54:42.123Z"},{"id": 2, "name": "numa", "time": "2021-05-18T21:54:42.123Z"}]}`
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgTwo)))
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgThree)))
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgFour)))
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgFive)))
done := make(chan struct{})
go func() {
startTime := time.Date(2021, 1, 18, 21, 54, 42, 123000000, time.UTC)
for {
select {
case <-done:
return
case <-ctx.Done():
return
default:
testMsg := generateTestMsg("numa", startTime)
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsg)))
startTime = startTime.Add(1 * time.Minute)
time.Sleep(100 * time.Millisecond)
}
}
}()
// In this test, we send a message with event time being now, apply event time extractor and verify from log that the message event time gets updated.
w.Expect().VertexPodLogContains("out", fmt.Sprintf("EventTime - %d", time.Date(2021, 1, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli()), PodLogCheckOptionWithCount(1))

wmLoop:
for {
Expand All @@ -272,14 +277,16 @@ wmLoop:
if err != nil {
assert.Fail(s.T(), err.Error())
}
// Watermark propagation can delay, we consider the test as passed as long as the retrieved watermark matches one of the assigned event times.
assert.True(s.T(), edgeWM == time.Date(2021, 5, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli() || edgeWM == time.Date(2021, 4, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli() || edgeWM == time.Date(2021, 3, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli() || edgeWM == time.Date(2021, 2, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli() || edgeWM == time.Date(2021, 1, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli())
println(edgeWM)
// Watermark propagation can delay, we consider the test as passed as long as the retrieved watermark is greater than the event time of the first message
// and less than the current time.
assert.True(s.T(), edgeWM >= time.Date(2021, 1, 18, 21, 54, 42, 123000000, time.UTC).UnixMilli() && edgeWM < currentTime)
break wmLoop
}
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte(testMsgFive)))
time.Sleep(time.Second)
}
}
done <- struct{}{}
}

func (s *FunctionalSuite) TestConditionalForwarding() {
Expand Down Expand Up @@ -387,3 +394,24 @@ func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClie
func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}

type Item struct {
ID int `json:"id"`
Name string `json:"name"`
Time time.Time `json:"time"`
}

type TestMsg struct {
Test int `json:"test"`
Item []Item `json:"item"`
}

func generateTestMsg(msg string, t time.Time) string {
items := []Item{
{ID: 1, Name: msg, Time: t},
{ID: 2, Name: msg, Time: t},
}
testMsg := TestMsg{Test: 21, Item: items}
jsonBytes, _ := json.Marshal(testMsg)
return string(jsonBytes)
}

0 comments on commit 5073f1c

Please sign in to comment.