diff --git a/tests/certification/bindings/cron/cron_test.go b/tests/certification/bindings/cron/cron_test.go index 6c13814689..f0c1aa933f 100644 --- a/tests/certification/bindings/cron/cron_test.go +++ b/tests/certification/bindings/cron/cron_test.go @@ -11,16 +11,22 @@ See the License for the specific language governing permissions and limitations under the License. */ +//nolint:dupword package cron_test import ( "context" + "errors" "fmt" + goruntime "runtime" + "sync/atomic" "testing" "time" - "github.com/benbjohnson/clock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/utils/clock" + clocktesting "k8s.io/utils/clock/testing" "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/bindings/cron" @@ -39,82 +45,81 @@ import ( ) type cronTest struct { - cronName string // name of the cron binding - schedule string // cron schedule - expectedTriggerCount int // expected number of triggers within the deadline - timeoutToObserveTriggers time.Duration // time to add to the mock clock to observe triggers - clk *clock.Mock // mock clock + cronName string // name of the cron binding + schedule string // cron schedule + expectedTriggerCount int64 // expected number of triggers within the deadline + step time.Duration // duration to advance the mock clock every time + testDuration time.Duration // test duration (in the mock clock) + clk *clocktesting.FakeClock // mock clock } // starting time for the mock clock var startTime = time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC) // Test cron triggers with different schedules -// -//nolint:dupword func TestCronBindingTrigger(t *testing.T) { - appName := "cronapp" - sidecarName := "cron-sidecar" + const appName = "cronapp" + const sidecarName = "cron-sidecar" testMatrix := []cronTest{ { - cronName: "cron1s", - schedule: "@every 1s", // Test macro cron format - expectedTriggerCount: 10, - timeoutToObserveTriggers: time.Second * 10, + cronName: "cron1s", + schedule: "@every 1s", // Test macro cron format + expectedTriggerCount: 10, + step: time.Second / 5, + testDuration: time.Second * 10, }, { - cronName: "cron3s", - schedule: "*/3 * * * * *", // Test non-standard crontab format - expectedTriggerCount: 10, - timeoutToObserveTriggers: time.Second * 30, + cronName: "cron3s", + schedule: "*/3 * * * * *", // Test non-standard crontab format + expectedTriggerCount: 10, + step: time.Second, + testDuration: time.Second * 30, }, { - cronName: "cron15m", - schedule: "*/15 * * * *", // Test standard crontab format - expectedTriggerCount: 12, - timeoutToObserveTriggers: time.Hour * 3, + cronName: "cron15m", + schedule: "*/15 * * * *", // Test standard crontab format + expectedTriggerCount: 12, + step: 30 * time.Second, + testDuration: time.Hour * 3, }, { - cronName: "cron6h", - schedule: "0 0 */6 ? * *", // Test quartz cron format - expectedTriggerCount: 12, - timeoutToObserveTriggers: time.Hour * 24 * 3, - }, - { - cronName: "cronMonthly", - schedule: "0 0 1 * *", // Test standard cron format - expectedTriggerCount: 10, - timeoutToObserveTriggers: time.Hour * 24 * 31 * 10, // Add 10 months to the mock clock + cronName: "cron6h", + schedule: "0 0 */6 ? * *", // Test quartz cron format + expectedTriggerCount: 12, + step: time.Minute, + testDuration: time.Hour * 24 * 3, }, } for _, cronTest := range testMatrix { - cronTest.clk = clock.NewMock() - cronTest.clk.Set(startTime) - - ports, _ := dapr_testing.GetFreePorts(3) - grpcPort := ports[0] - httpPort := ports[1] - appPort := ports[2] - - // total times cron is triggered - observedTriggerCount := 0 - - flow.New(t, "test cron trigger with different schedules"). - Step(app.Run(appName, fmt.Sprintf(":%d", appPort), appWithTriggerCounter(t, cronTest.clk, cronTest.cronName, &observedTriggerCount))). - Step(sidecar.Run(sidecarName, - embedded.WithComponentsPath("./components"), - embedded.WithDaprGRPCPort(grpcPort), - embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), - embedded.WithDaprHTTPPort(httpPort), - componentRuntimeOptions(cronTest.clk), - )). - Step("advance the clock time", addTimeToMockClock(cronTest.clk, cronTest.timeoutToObserveTriggers)). - Step("assert cron triggered within deadline", assertTriggerCount(t, cronTest.expectedTriggerCount, &observedTriggerCount)). - Step("stop sidecar", sidecar.Stop(sidecarName)). - Step("stop app", app.Stop(appName)). - Run() + t.Run(cronTest.cronName, func(t *testing.T) { + cronTest.clk = clocktesting.NewFakeClock(startTime) + + ports, _ := dapr_testing.GetFreePorts(3) + grpcPort := ports[0] + httpPort := ports[1] + appPort := ports[2] + + testFn, triggeredCh := testerFn(cronTest.clk, cronTest.testDuration, cronTest.expectedTriggerCount, cronTest.step) + + flow.New(t, "test cron trigger with different schedules"). + Step(app.Run(appName, + fmt.Sprintf(":%d", appPort), + appWithTriggerCounter(t, cronTest.clk, cronTest.cronName, triggeredCh), + )). + Step(sidecar.Run(sidecarName, + embedded.WithResourcesPath("./components"), + embedded.WithDaprGRPCPort(grpcPort), + embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), + embedded.WithDaprHTTPPort(httpPort), + componentRuntimeOptions(cronTest.clk), + )). + Step("run test", testFn). + Step("stop sidecar", sidecar.Stop(sidecarName)). + Step("stop app", app.Stop(appName)). + Run() + }) } } @@ -125,31 +130,32 @@ func TestCronBindingsWithSameRoute(t *testing.T) { httpPort := ports[1] appPort := ports[2] - cronName := "cron" - appName := "cronapp" - sidecarName := "cron-sidecar" + const cronName = "cron" + const appName = "cronapp" + const sidecarName = "cron-sidecar" // check if cron triggers 20 times within 15 seconds (15 times from @every 1s binding and 5 times from @every 3s binding) - expectedTriggerCount := 20 - // total times cron is triggered - observedTriggerCount := 0 + const expectedTriggerCount = 20 // total time for all triggers to be observed - timeoutToObserveTriggers := time.Second * 15 + const testDuration = time.Second * 15 - clk := clock.NewMock() - clk.Set(startTime) + clk := clocktesting.NewFakeClock(startTime) + + testFn, triggeredCh := testerFn(clk, testDuration, expectedTriggerCount, time.Second/2) flow.New(t, "test cron bindings with different schedules and same route"). - Step(app.Run(appName, fmt.Sprintf(":%d", appPort), appWithTriggerCounter(t, clk, cronName, &observedTriggerCount))). + Step(app.Run(appName, + fmt.Sprintf(":%d", appPort), + appWithTriggerCounter(t, clk, cronName, triggeredCh), + )). Step(sidecar.Run(sidecarName, - embedded.WithComponentsPath("./components_sameroute"), + embedded.WithResourcesPath("./components_sameroute"), embedded.WithDaprGRPCPort(grpcPort), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprHTTPPort(httpPort), componentRuntimeOptions(clk), )). - Step("advance the clock time", addTimeToMockClock(clk, timeoutToObserveTriggers)). - Step("assert cron triggered within deadline", assertTriggerCount(t, expectedTriggerCount, &observedTriggerCount)). + Step("run test", testFn). Step("stop sidecar", sidecar.Stop(sidecarName)). Step("stop app", app.Stop(appName)). Run() @@ -162,39 +168,61 @@ func TestCronBindingWithAppRestart(t *testing.T) { httpPort := ports[1] appPort := ports[2] - cronName := "cron3s" - appName := "cronapp3s" - sidecarName := "cron-sidecar" + const cronName = "cron3s" + const appName = "cronapp3s" + const sidecarName = "cron-sidecar" // check if cron triggers 5 times within 15 seconds - expectedTriggerCount := 5 - // total times cron is triggered - observedTriggerCount := 0 + const expectedTriggerCount = 5 // total time for all triggers to be observed - timeoutToObserveTriggers := time.Second * 15 + const testDuration = time.Second * 15 // allow cron to trigger once before stopping the app - waitBeforeAppStop := time.Second * 5 + const waitBeforeAppStop = time.Second * 5 // wait for some time after the app has stopped, before restarting the app - waitBeforeAppRestart := time.Second * 5 + const waitBeforeAppRestart = time.Second * 5 - clk := clock.NewMock() - clk.Set(startTime) + clk := clocktesting.NewFakeClock(startTime) + + observedTriggerCountCh := make(chan int64) + counterFn, triggeredCh := counterFn(clk, testDuration, time.Second/2) flow.New(t, "test cron trigger schedule @every3s with app restart"). - Step(app.Run(appName, fmt.Sprintf(":%d", appPort), appWithTriggerCounter(t, clk, cronName, &observedTriggerCount))). + Step(app.Run(appName, + fmt.Sprintf(":%d", appPort), + appWithTriggerCounter(t, clk, cronName, triggeredCh), + )). Step(sidecar.Run(sidecarName, - embedded.WithComponentsPath("./components"), + embedded.WithResourcesPath("./components"), embedded.WithDaprGRPCPort(grpcPort), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprHTTPPort(httpPort), componentRuntimeOptions(clk), )). - Step("allow cron to trigger once", addTimeToMockClock(clk, waitBeforeAppStop), flow.Sleep(waitBeforeAppStop)). + Step("start counting executions", func(ctx flow.Context) error { + go func() { + observedTriggerCountCh <- counterFn() + }() + return nil + }). + Step("allow cron to trigger once", flow.Sleep(waitBeforeAppStop)). Step("stop app", app.Stop(appName)). Step("wait before app restart", flow.Sleep(waitBeforeAppRestart)). - Step(app.Run(appName, fmt.Sprintf(":%d", appPort), appWithTriggerCounter(t, clk, cronName, &observedTriggerCount))). - Step("advance the clock time", addTimeToMockClock(clk, timeoutToObserveTriggers)). - Step("assert cron triggered within deadline", assertTriggerCount(t, expectedTriggerCount, &observedTriggerCount)). + Step(app.Run(appName, + fmt.Sprintf(":%d", appPort), + appWithTriggerCounter(t, clk, cronName, triggeredCh), + )). + Step("assert cron triggered within deadline", func(ctx flow.Context) error { + // Assert number of executions + // Allow up to 2 less or extra trigger to account for additional timeout(@schedule interval of cron trigger) provided in the tests or if unable to observe up to 2 triggers during app or sidecar restart + assert.InDelta(ctx.T, float64(expectedTriggerCount), float64(<-observedTriggerCountCh), 2) + + switch { + case ctx.T.Failed(): + return errors.New("test failed") + default: + return nil + } + }). Step("stop sidecar", sidecar.Stop(sidecarName)). Step("stop app", app.Stop(appName)). Run() @@ -207,56 +235,136 @@ func TestCronBindingWithSidecarRestart(t *testing.T) { httpPort := ports[1] appPort := ports[2] - cronName := "cron3s" - appName := "cronapp3s" - sidecarName := "cron-sidecar" + const cronName = "cron3s" + const appName = "cronapp3s" + const sidecarName = "cron-sidecar" // check if cron triggers 5 times within 15 seconds - expectedTriggerCount := 5 - // total times cron is triggered - observedTriggerCount := 0 + const expectedTriggerCount = 5 // total time for all triggers to be observed - timeoutToObserveTriggers := time.Second * 15 + const testDuration = time.Second * 15 // allow cron to trigger once before stopping the sidecar - waitBeforeSidecarStop := time.Second * 5 + const waitBeforeSidecarStop = time.Second * 5 // wait for some time after the app has stopped, before restarting the sidecar - waitBeforeSidecarRestart := time.Second * 5 + const waitBeforeSidecarRestart = time.Second * 5 + + clk := clocktesting.NewFakeClock(startTime) - clk := clock.NewMock() - clk.Set(startTime) + observedTriggerCountCh := make(chan int64) + counterFn, triggeredCh := counterFn(clk, testDuration, time.Second/2) flow.New(t, "test cron trigger schedule @every 3s with sidecar restart"). - Step(app.Run(appName, fmt.Sprintf(":%d", appPort), appWithTriggerCounter(t, clk, cronName, &observedTriggerCount))). + Step(app.Run(appName, + fmt.Sprintf(":%d", appPort), + appWithTriggerCounter(t, clk, cronName, triggeredCh), + )). Step(sidecar.Run(sidecarName, - embedded.WithComponentsPath("./components"), + embedded.WithResourcesPath("./components"), embedded.WithDaprGRPCPort(grpcPort), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprHTTPPort(httpPort), componentRuntimeOptions(clk), )). - Step("allow cron to trigger once", addTimeToMockClock(clk, waitBeforeSidecarStop), flow.Sleep(waitBeforeSidecarStop)). + Step("start counting executions", func(ctx flow.Context) error { + go func() { + observedTriggerCountCh <- counterFn() + }() + return nil + }). + Step("allow cron to trigger once", flow.Sleep(waitBeforeSidecarStop)). Step("stop sidecar", sidecar.Stop(sidecarName)). Step("wait before sidecar restart", flow.Sleep(waitBeforeSidecarRestart)). Step(sidecar.Run(sidecarName, - embedded.WithComponentsPath("./components"), + embedded.WithResourcesPath("./components"), embedded.WithDaprGRPCPort(grpcPort), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprHTTPPort(httpPort), componentRuntimeOptions(clk), )). - Step("advance the clock time", addTimeToMockClock(clk, timeoutToObserveTriggers)). - Step("assert cron triggered within deadline", assertTriggerCount(t, expectedTriggerCount, &observedTriggerCount)). + Step("assert cron triggered within deadline", func(ctx flow.Context) error { + // Assert number of executions + // Allow up to 2 less or extra trigger to account for additional timeout(@schedule interval of cron trigger) provided in the tests or if unable to observe up to 2 triggers during app or sidecar restart + assert.InDelta(ctx.T, float64(expectedTriggerCount), float64(<-observedTriggerCountCh), 2) + + switch { + case ctx.T.Failed(): + return errors.New("test failed") + default: + return nil + } + }). Step("stop sidecar", sidecar.Stop(sidecarName)). Step("stop app", app.Stop(appName)). Run() } -func appWithTriggerCounter(t *testing.T, clk *clock.Mock, cronName string, triggeredCount *int) func(ctx flow.Context, s common.Service) error { +func counterFn(clk *clocktesting.FakeClock, testDuration time.Duration, step time.Duration) (func() int64, chan struct{}) { + triggeredCh := make(chan struct{}) + + return func() int64 { + // In background advance the clock and count the executions + observedTriggerCount := atomic.Int64{} + doneCh := make(chan struct{}) + go func() { + for range triggeredCh { + observedTriggerCount.Add(1) + } + close(doneCh) + }() + go func() { + end := clk.Now().Add(testDuration) + // We can't use Before because we want to count the equal time too + for !clk.Now().After(end) { + clk.Step(step) + + // Sleep on the wall clock to allow goroutines to advance + goruntime.Gosched() + time.Sleep(time.Millisecond / 2) + } + + // Close triggeredCh + close(triggeredCh) + }() + + // Sleep for 1s to allow goroutines to catch up if needed + goruntime.Gosched() + time.Sleep(time.Second) + + // Wait for test to end + <-doneCh + + return observedTriggerCount.Load() + }, triggeredCh +} + +func testerFn(clk *clocktesting.FakeClock, testDuration time.Duration, expectedTriggerCount int64, step time.Duration) (func(ctx flow.Context) error, chan struct{}) { + counter, triggeredCh := counterFn(clk, testDuration, step) + + return func(ctx flow.Context) error { + t := ctx.T + + // Count executions + // This call blocks until the test duration + observedTriggerCount := counter() + + // Assert number of executions + assert.Equal(t, expectedTriggerCount, observedTriggerCount) + + switch { + case t.Failed(): + return errors.New("test failed") + default: + return nil + } + }, triggeredCh +} + +func appWithTriggerCounter(t *testing.T, clk clock.Clock, cronName string, triggeredCh chan<- struct{}) func(ctx flow.Context, s common.Service) error { return func(ctx flow.Context, s common.Service) error { // Setup the input binding endpoint err := s.AddBindingInvocationHandler(cronName, func(_ context.Context, in *common.BindingEvent) ([]byte, error) { ctx.Logf("Cron triggered at %s", clk.Now().String()) - (*triggeredCount)++ + triggeredCh <- struct{}{} return []byte("{}"), nil }) require.NoError(t, err) @@ -264,25 +372,6 @@ func appWithTriggerCounter(t *testing.T, clk *clock.Mock, cronName string, trigg } } -func addTimeToMockClock(clk *clock.Mock, timeToAdd time.Duration) func(ctx flow.Context) error { - return func(ctx flow.Context) error { - clk.Add(timeToAdd) - // Wait for 1 second after adding time to mock clock to allow cron goroutine to run - time.Sleep(time.Second) - return nil - } -} - -func assertTriggerCount(t *testing.T, expectedTriggerCount int, observedTriggerCount *int) func(ctx flow.Context) error { - return func(ctx flow.Context) error { - // allow up to 1 less or extra trigger to account for additional timeout(@schedule interval of cron trigger) provided in the tests or if unable to observe up to 1 trigger during app or sidecar restart - if !(*observedTriggerCount >= expectedTriggerCount-1 && *observedTriggerCount <= expectedTriggerCount+1) { - t.Errorf("expected %d triggers, got %d", expectedTriggerCount, *observedTriggerCount) - } - return nil - } -} - func componentRuntimeOptions(clk clock.Clock) []runtime.Option { log := logger.NewLogger("dapr.components") diff --git a/tests/certification/go.mod b/tests/certification/go.mod index 3ee941b9a8..c8b1c48e5f 100644 --- a/tests/certification/go.mod +++ b/tests/certification/go.mod @@ -12,7 +12,6 @@ require ( github.com/apache/pulsar-client-go v0.11.0 github.com/apache/thrift v0.13.0 github.com/aws/aws-sdk-go v1.44.315 - github.com/benbjohnson/clock v1.3.5 github.com/cenkalti/backoff/v4 v4.2.1 github.com/cloudwego/kitex v0.5.0 github.com/cloudwego/kitex-examples v0.1.1 @@ -37,6 +36,7 @@ require ( go.uber.org/multierr v1.11.0 go.uber.org/ratelimit v0.3.0 golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b + k8s.io/utils v0.0.0-20230726121419-3b25d923346b modernc.org/sqlite v1.24.0 ) @@ -77,6 +77,7 @@ require ( github.com/apache/dubbo-getty v1.4.9-0.20220610060150-8af010f3f3dc // indirect github.com/ardielle/ardielle-go v1.5.2 // indirect github.com/armon/go-metrics v0.4.1 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect github.com/bradfitz/gomemcache v0.0.0-20230611145640-acc696258285 // indirect @@ -298,7 +299,6 @@ require ( k8s.io/component-base v0.26.3 // indirect k8s.io/klog/v2 v2.80.1 // indirect k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect - k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect lukechampine.com/uint128 v1.3.0 // indirect modernc.org/cc/v3 v3.40.0 // indirect modernc.org/ccgo/v3 v3.16.13 // indirect