From 58625c0a3144ed66213ea536d8cc3eb824f06512 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Thu, 17 Oct 2024 10:16:04 -0400 Subject: [PATCH] Functional tests. --- functional_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/functional_test.go b/functional_test.go index bd6df9f..3e52a6c 100644 --- a/functional_test.go +++ b/functional_test.go @@ -2096,6 +2096,73 @@ func TestGlobalBehavior(t *testing.T) { }) } +func TestEventChannel(t *testing.T) { + eventChannel := make(chan guber.HitEvent) + defer close(eventChannel) + var counter int + hits := make(map[string]int64) + var mu sync.Mutex + sem := make(chan struct{}) + go func() { + for e := range eventChannel { + mu.Lock() + counter++ + key := e.Request.Name + "|" + e.Request.UniqueKey + hits[key] += e.Request.Hits + mu.Unlock() + sem <- struct{}{} + } + }() + + // Spawn specialized Gubernator cluster with EventChannel enabled. + peers := []guber.PeerInfo{ + {GRPCAddress: "127.0.0.1:10000", HTTPAddress: "127.0.0.1:10001", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:10002", HTTPAddress: "127.0.0.1:10003", DataCenter: cluster.DataCenterNone}, + {GRPCAddress: "127.0.0.1:10004", HTTPAddress: "127.0.0.1:10005", DataCenter: cluster.DataCenterNone}, + } + err := cluster.StartWith(peers, cluster.WithEventChannel(eventChannel)) + require.NoError(t, err) + defer cluster.Stop() + + client, err := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil) + require.NoError(t, err) + sendHit := func(key string, behavior guber.Behavior) { + ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10) + defer cancel() + _, err = client.GetRateLimits(ctx, &guber.GetRateLimitsReq{ + Requests: []*guber.RateLimitReq{ + { + Name: "test", + UniqueKey: key, + Algorithm: guber.Algorithm_TOKEN_BUCKET, + Behavior: behavior, + Duration: guber.Minute * 3, + Hits: 2, + Limit: 1000, + }, + }, + }) + require.NoError(t, err) + select { + case <-sem: + case <-time.After(3 * time.Second): + t.Fatal("Timeout waiting for EventChannel handler") + } + } + + // Send hits using all peering behaviors. + const iterations = 3 + sendHit("foobar0", guber.Behavior_BATCHING) + sendHit("foobar1", guber.Behavior_NO_BATCHING) + sendHit("foobar2", guber.Behavior_GLOBAL) + + assert.Equal(t, iterations, counter) + for i := 0; i < iterations; i++ { + expectedKey := fmt.Sprintf("test|foobar%d", i) + assert.Equal(t, int64(2), hits[expectedKey], "i=%d", i) + } +} + // Request metrics and parse into map. // Optionally pass names to filter metrics by name. func getMetrics(HTTPAddr string, names ...string) (map[string]*model.Sample, error) {