Skip to content

Commit

Permalink
Fixed flaky cert test for ASB Queue binding
Browse files Browse the repository at this point in the history
Also added prefix to all sent messages so tests don't fail if there are multiple concurrent runners (hopefully)

Signed-off-by: ItalyPaleAle <[email protected]>
  • Loading branch information
ItalyPaleAle committed Aug 3, 2023
1 parent 7fcfd9a commit 4a84a01
Showing 1 changed file with 91 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ limitations under the License.
package servicebusqueue_test

import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"io"
"testing"
"time"

Expand Down Expand Up @@ -49,6 +53,15 @@ const (
numMessages = 100
)

var testprefix string

func init() {
// Generate a random test prefix
rnd := make([]byte, 7)
io.ReadFull(rand.Reader, rnd)
testprefix = base64.RawURLEncoding.EncodeToString(rnd)
}

func TestServiceBusQueue(t *testing.T) {
messagesFor1 := watcher.NewOrdered()
messagesFor2 := watcher.NewOrdered()
Expand All @@ -67,11 +80,11 @@ func TestServiceBusQueue(t *testing.T) {
msgsFor1 := make([]string, numMessages/2)
msgsFor2 := make([]string, numMessages/2)
for i := 0; i < numMessages/2; i++ {
msgsFor1[i] = fmt.Sprintf("sb-binding-1: Message %03d", i)
msgsFor1[i] = fmt.Sprintf("%s: sb-binding-1: Message %03d", testprefix, i)
}

for i := numMessages / 2; i < numMessages; i++ {
msgsFor2[i-(numMessages/2)] = fmt.Sprintf("sb-binding-2: Message %03d", i)
msgsFor2[i-(numMessages/2)] = fmt.Sprintf("%s: sb-binding-2: Message %03d", testprefix, i)
}

messagesFor1.ExpectStrings(msgsFor1...)
Expand Down Expand Up @@ -108,11 +121,19 @@ func TestServiceBusQueue(t *testing.T) {
// Setup the input binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if !bytes.HasPrefix(in.Data, []byte(testprefix)) {
return []byte("{}"), nil
}

messagesFor1.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("sb-binding-2", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if !bytes.HasPrefix(in.Data, []byte(testprefix)) {
return []byte("{}"), nil
}

messagesFor2.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
Expand All @@ -128,7 +149,7 @@ func TestServiceBusQueue(t *testing.T) {
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/standard"),
embedded.WithResourcesPath("./components/standard"),
componentRuntimeOptions(),
)).
// Block the standard AMPQ ports.
Expand All @@ -151,23 +172,38 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) {

ctx.Logf("Sending messages for expiration.")
for i := 0; i < numMessages; i++ {
msg := fmt.Sprintf("Expiring message %d", i)
msg := fmt.Sprintf("%s: Expiring message %d", testprefix, i)

metadata := make(map[string]string)

// Send to the queue with TTL.
queueTTLReq := &daprClient.InvokeBindingRequest{Name: "queuettl", Operation: "create", Data: []byte(msg), Metadata: metadata}
queueTTLReq := &daprClient.InvokeBindingRequest{
Name: "queuettl",
Operation: "create",
Data: []byte(msg),
Metadata: metadata,
}
err := client.InvokeOutputBinding(ctx, queueTTLReq)
require.NoError(ctx, err, "error publishing message")

// Send message with TTL.
messageTTLReq := &daprClient.InvokeBindingRequest{Name: "messagettl", Operation: "create", Data: []byte(msg), Metadata: metadata}
messageTTLReq := &daprClient.InvokeBindingRequest{
Name: "messagettl",
Operation: "create",
Data: []byte(msg),
Metadata: metadata,
}
messageTTLReq.Metadata["ttlInSeconds"] = "10"
err = client.InvokeOutputBinding(ctx, messageTTLReq)
require.NoError(ctx, err, "error publishing message")

// Send message with TTL to ensure it overwrites Queue TTL.
mixedTTLReq := &daprClient.InvokeBindingRequest{Name: "mixedttl", Operation: "create", Data: []byte(msg), Metadata: metadata}
mixedTTLReq := &daprClient.InvokeBindingRequest{
Name: "mixedttl",
Operation: "create",
Data: []byte(msg),
Metadata: metadata,
}
mixedTTLReq.Metadata["ttlInSeconds"] = "10"
err = client.InvokeOutputBinding(ctx, mixedTTLReq)
require.NoError(ctx, err, "error publishing message")
Expand All @@ -182,16 +218,28 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) {
// Setup the input binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("queuettl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if !bytes.HasPrefix(in.Data, []byte(testprefix)) {
return []byte("{}"), nil
}

ctx.Logf("Oh no! Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("messagettl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if !bytes.HasPrefix(in.Data, []byte(testprefix)) {
return []byte("{}"), nil
}

ctx.Logf("Oh no! Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("mixedttl", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if !bytes.HasPrefix(in.Data, []byte(testprefix)) {
return []byte("{}"), nil
}

ctx.Logf("Oh no! Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
Expand All @@ -207,7 +255,7 @@ func TestAzureServiceBusQueuesTTLs(t *testing.T) {
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/ttl"),
embedded.WithResourcesPath("./components/ttl"),
componentRuntimeOptions(),
)).
Step("send ttl messages", sendTTLMessages).
Expand Down Expand Up @@ -242,7 +290,7 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
// that will satisfy the test.
msgs := make([]string, numMessages/2)
for i := 0; i < numMessages/2; i++ {
msgs[i] = fmt.Sprintf("Message %03d", i)
msgs[i] = fmt.Sprintf("%s: Message %03d", testprefix, i)
}

messages.ExpectStrings(msgs...)
Expand All @@ -252,7 +300,11 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)

req := &daprClient.InvokeBindingRequest{Name: "retry-binding", Operation: "create", Data: []byte(msg)}
req := &daprClient.InvokeBindingRequest{
Name: "retry-binding",
Operation: "create",
Data: []byte(msg),
}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
Expand All @@ -271,6 +323,10 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
// Setup the input binding endpoint
err = multierr.Combine(err,
s.AddBindingInvocationHandler("retry-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if !bytes.HasPrefix(in.Data, []byte(testprefix)) {
return []byte("{}"), nil
}

if err := sim(); err != nil {
ctx.Logf("Failing message: %s", string(in.Data))
return nil, err
Expand All @@ -291,7 +347,7 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/retry"),
embedded.WithResourcesPath("./components/retry"),
componentRuntimeOptions(),
)).
Step("send and wait", test).
Expand All @@ -312,10 +368,17 @@ func TestServiceBusQueueMetadata(t *testing.T) {

// Send events that the application above will observe.
ctx.Log("Invoking binding!")
req := &daprClient.InvokeBindingRequest{Name: "sb-binding-1", Operation: "create", Data: []byte("test msg"), Metadata: map[string]string{"Testmetadata": "Some Metadata"}}
req := &daprClient.InvokeBindingRequest{
Name: "sb-binding-1",
Operation: "create",
Data: []byte(testprefix + ": test msg"),
Metadata: map[string]string{"Testmetadata": "Some Metadata"},
}
err = client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")

messages.ExpectStrings(string(req.Data))

// Do the messages we observed match what we expect?
messages.Assert(ctx, time.Minute)

Expand All @@ -327,11 +390,15 @@ func TestServiceBusQueueMetadata(t *testing.T) {
// Setup the input binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if !bytes.HasPrefix(in.Data, []byte(testprefix)) {
return []byte("{}"), nil
}

messages.Observe(string(in.Data))
ctx.Logf("Got message: %s - %+v", string(in.Data), in.Metadata)
require.NotEmpty(t, in.Metadata)
require.Contains(t, in.Metadata, "Testmetadata")
require.Equal(t, "Some Metadata", in.Metadata["Testmetadata"])
ctx.Logf("Got message: %s - %#v", string(in.Data), in.Metadata)
require.NotEmptyf(t, in.Metadata, "Data: %s - Metadata: %#v", in.Data, in.Metadata)
require.Containsf(t, in.Metadata, "Testmetadata", "Data: %s - Metadata: %#v", in.Data, in.Metadata)
require.Equalf(t, "Some+Metadata", in.Metadata["Testmetadata"], "Data: %s - Metadata: %#v", in.Data, in.Metadata) // + because the message is encoded for HTTP headers

return []byte("{}"), nil
}))
Expand All @@ -346,7 +413,7 @@ func TestServiceBusQueueMetadata(t *testing.T) {
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/standard"),
embedded.WithResourcesPath("./components/standard"),
componentRuntimeOptions(),
)).
Step("send and wait", test).
Expand All @@ -364,7 +431,12 @@ func TestServiceBusQueueDisableEntityManagement(t *testing.T) {

// Send events that the application above will observe.
ctx.Log("Invoking binding!")
req := &daprClient.InvokeBindingRequest{Name: "mgmt-binding", Operation: "create", Data: []byte("test msg"), Metadata: map[string]string{"TestMetadata": "Some Metadata"}}
req := &daprClient.InvokeBindingRequest{
Name: "mgmt-binding",
Operation: "create",
Data: []byte(testprefix + ": test msg"),
Metadata: map[string]string{"TestMetadata": "Some Metadata"},
}
err = client.InvokeOutputBinding(ctx, req)
require.Error(ctx, err, "error publishing message")
return nil
Expand All @@ -376,7 +448,7 @@ func TestServiceBusQueueDisableEntityManagement(t *testing.T) {
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/disable_entity_mgmt"),
embedded.WithResourcesPath("./components/disable_entity_mgmt"),
componentRuntimeOptions(),
)).
Step("send and wait", testWithExpectedFailure).
Expand Down

0 comments on commit 4a84a01

Please sign in to comment.