From af5bacebf4877e0c10ba860737ce8a7ccb0ee02c Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Tue, 1 Aug 2023 19:38:33 +0100 Subject: [PATCH] Adds OIDC authentication support for pubsub Apache Pulsar (#3023) Signed-off-by: joshvanl Co-authored-by: Yaron Schneider --- .github/workflows/certification.yml | 2 +- .../authentication/oidc/clientcredentials.go | 213 +++++++++++ .../oidc/clientcredentials_test.go | 266 ++++++++++++++ pubsub/pulsar/metadata.go | 10 +- pubsub/pulsar/pulsar.go | 59 +++- .../{ => auth-none}/consumer_five/pulsar.yaml | 0 .../{ => auth-none}/consumer_four/pulsar.yaml | 0 .../{ => auth-none}/consumer_one/pulsar.yml | 0 .../{ => auth-none}/consumer_six/pulsar.yaml | 0 .../{ => auth-none}/consumer_three/pulsar.yml | 0 .../{ => auth-none}/consumer_two/pulsar.yml | 0 .../auth-oidc/consumer_five/pulsar.yml.tmpl | 34 ++ .../auth-oidc/consumer_four/pulsar.yml.tmpl | 30 ++ .../auth-oidc/consumer_one/pulsar.yml.tmpl | 28 ++ .../auth-oidc/consumer_six/pulsar.yaml.tmpl | 34 ++ .../auth-oidc/consumer_three/pulsar.yml.tmpl | 28 ++ .../auth-oidc/consumer_two/pulsar.yml.tmpl | 28 ++ .../pubsub/pulsar/config/.gitignore | 1 + .../docker-compose_auth-mock-oidc-server.yaml | 14 + .../docker-compose_auth-none.yaml} | 2 +- .../config/docker-compose_auth-oidc.yaml.tmpl | 96 +++++ .../pulsar/config/pulsar_auth-oidc.conf | 39 ++ .../pubsub/pulsar/pulsar_test.go | 334 ++++++++++++++---- 23 files changed, 1132 insertions(+), 86 deletions(-) create mode 100644 internal/authentication/oidc/clientcredentials.go create mode 100644 internal/authentication/oidc/clientcredentials_test.go rename tests/certification/pubsub/pulsar/components/{ => auth-none}/consumer_five/pulsar.yaml (100%) rename tests/certification/pubsub/pulsar/components/{ => auth-none}/consumer_four/pulsar.yaml (100%) rename tests/certification/pubsub/pulsar/components/{ => auth-none}/consumer_one/pulsar.yml (100%) rename tests/certification/pubsub/pulsar/components/{ => auth-none}/consumer_six/pulsar.yaml (100%) rename tests/certification/pubsub/pulsar/components/{ => auth-none}/consumer_three/pulsar.yml (100%) rename tests/certification/pubsub/pulsar/components/{ => auth-none}/consumer_two/pulsar.yml (100%) create mode 100644 tests/certification/pubsub/pulsar/components/auth-oidc/consumer_five/pulsar.yml.tmpl create mode 100644 tests/certification/pubsub/pulsar/components/auth-oidc/consumer_four/pulsar.yml.tmpl create mode 100644 tests/certification/pubsub/pulsar/components/auth-oidc/consumer_one/pulsar.yml.tmpl create mode 100644 tests/certification/pubsub/pulsar/components/auth-oidc/consumer_six/pulsar.yaml.tmpl create mode 100644 tests/certification/pubsub/pulsar/components/auth-oidc/consumer_three/pulsar.yml.tmpl create mode 100644 tests/certification/pubsub/pulsar/components/auth-oidc/consumer_two/pulsar.yml.tmpl create mode 100644 tests/certification/pubsub/pulsar/config/.gitignore create mode 100644 tests/certification/pubsub/pulsar/config/docker-compose_auth-mock-oidc-server.yaml rename tests/certification/pubsub/pulsar/{docker-compose.yml => config/docker-compose_auth-none.yaml} (97%) create mode 100644 tests/certification/pubsub/pulsar/config/docker-compose_auth-oidc.yaml.tmpl create mode 100644 tests/certification/pubsub/pulsar/config/pulsar_auth-oidc.conf diff --git a/.github/workflows/certification.yml b/.github/workflows/certification.yml index dfa2fe39ba..5482b986a1 100644 --- a/.github/workflows/certification.yml +++ b/.github/workflows/certification.yml @@ -258,7 +258,7 @@ jobs: set +e gotestsum --jsonfile ${{ 
env.TEST_OUTPUT_FILE_PREFIX }}_certification.json \ --junitfile ${{ env.TEST_OUTPUT_FILE_PREFIX }}_certification.xml --format standard-quiet -- \ - -coverprofile=cover.out -covermode=set -tags=certtests -coverpkg=${{ matrix.source-pkg }} + -coverprofile=cover.out -covermode=set -tags=certtests -timeout=30m -coverpkg=${{ matrix.source-pkg }} status=$? echo "Completed certification tests for ${{ matrix.component }} ... " if test $status -ne 0; then diff --git a/internal/authentication/oidc/clientcredentials.go b/internal/authentication/oidc/clientcredentials.go new file mode 100644 index 0000000000..902f78b561 --- /dev/null +++ b/internal/authentication/oidc/clientcredentials.go @@ -0,0 +1,213 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package oidc + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net/http" + "net/url" + "sync" + "sync/atomic" + "time" + + "golang.org/x/oauth2" + ccreds "golang.org/x/oauth2/clientcredentials" + "k8s.io/utils/clock" + + "github.com/dapr/kit/logger" +) + +const ( + oidcScopeOpenID = "openid" +) + +type ClientCredentialsOptions struct { + Logger logger.Logger + TokenURL string + ClientID string + ClientSecret string + Scopes []string + Audiences []string + CAPEM []byte +} + +// ClientCredentials is an OAuth2 Token Source that uses the client_credentials +// grant type to fetch a token. 
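+// The token is fetched once at construction time, kept fresh in the background
+// by Run, and exposed through Token, whose func() (string, error) signature is
+// what pulsar.NewAuthenticationTokenFromSupplier expects.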
+type ClientCredentials struct { + log logger.Logger + currentToken *oauth2.Token + httpClient *http.Client + fetchTokenFn func(context.Context) (*oauth2.Token, error) + + lock sync.RWMutex + wg sync.WaitGroup + closeCh chan struct{} + closed atomic.Bool + clock clock.Clock +} + +func NewClientCredentials(ctx context.Context, opts ClientCredentialsOptions) (*ClientCredentials, error) { + conf, httpClient, err := toConfig(opts) + if err != nil { + return nil, err + } + + token, err := conf.Token(context.WithValue(ctx, oauth2.HTTPClient, httpClient)) + if err != nil { + return nil, fmt.Errorf("error fetching initial oidc client_credentials token: %w", err) + } + + opts.Logger.Info("Fetched initial oidc client_credentials token") + + return &ClientCredentials{ + log: opts.Logger, + currentToken: token, + httpClient: httpClient, + closeCh: make(chan struct{}), + clock: clock.RealClock{}, + fetchTokenFn: conf.Token, + }, nil +} + +func (c *ClientCredentials) Run(ctx context.Context) { + c.log.Info("Running oidc client_credentials token renewer") + renewDuration := c.tokenRenewDuration() + + c.wg.Add(1) + go func() { + defer func() { + c.log.Info("Stopped oidc client_credentials token renewer") + c.wg.Done() + }() + + for { + select { + case <-c.closeCh: + return + case <-ctx.Done(): + return + case <-c.clock.After(renewDuration): + } + + c.log.Debug("Renewing client credentials token") + + token, err := c.fetchTokenFn(context.WithValue(ctx, oauth2.HTTPClient, c.httpClient)) + if err != nil { + c.log.Errorf("Error fetching renewed oidc client_credentials token, retrying in 30 seconds: %s", err) + renewDuration = time.Second * 30 + continue + } + + c.lock.Lock() + c.currentToken = token + c.lock.Unlock() + renewDuration = c.tokenRenewDuration() + } + }() +} + +func toConfig(opts ClientCredentialsOptions) (*ccreds.Config, *http.Client, error) { + scopes := opts.Scopes + if len(scopes) == 0 { + // If no scopes are provided, then the default is to use the 'openid' scope + // since that is always required for OIDC so implicitly add it. + scopes = []string{oidcScopeOpenID} + } + + var oidcScopeFound bool + for _, scope := range scopes { + if scope == oidcScopeOpenID { + oidcScopeFound = true + break + } + } + if !oidcScopeFound { + return nil, nil, fmt.Errorf("oidc client_credentials token source requires the %q scope", oidcScopeOpenID) + } + + tokenURL, err := url.Parse(opts.TokenURL) + if err != nil { + return nil, nil, fmt.Errorf("error parsing token URL: %w", err) + } + if tokenURL.Scheme != "https" { + return nil, nil, fmt.Errorf("OIDC token provider URL requires 'https' scheme: %q", tokenURL) + } + + conf := &ccreds.Config{ + ClientID: opts.ClientID, + ClientSecret: opts.ClientSecret, + TokenURL: opts.TokenURL, + Scopes: scopes, + } + + if len(opts.Audiences) == 0 { + return nil, nil, errors.New("oidc client_credentials token source requires at least one audience") + } + + conf.EndpointParams = url.Values{"audience": opts.Audiences} + + // If caPool is nil, then the Go TLS library will use the system's root CA. 
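+	// Conversely, when CAPEM is supplied the pool built below replaces the
+	// system roots for the token endpoint, so the PEM must include the CA that
+	// signed the issuer's serving certificate.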
+ var caPool *x509.CertPool + if len(opts.CAPEM) > 0 { + caPool = x509.NewCertPool() + if !caPool.AppendCertsFromPEM(opts.CAPEM) { + return nil, nil, errors.New("failed to parse CA PEM") + } + } + + return conf, &http.Client{ + Timeout: time.Second * 30, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: caPool, + }, + }, + }, nil +} + +func (c *ClientCredentials) Close() { + defer c.wg.Wait() + + if c.closed.CompareAndSwap(false, true) { + close(c.closeCh) + } +} + +func (c *ClientCredentials) Token() (string, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + if c.closed.Load() { + return "", errors.New("client_credentials token source is closed") + } + + if !c.currentToken.Valid() { + return "", errors.New("client_credentials token source is invalid") + } + + return c.currentToken.AccessToken, nil +} + +// tokenRenewTime returns the duration when the token should be renewed, which is +// half of the token's lifetime. +func (c *ClientCredentials) tokenRenewDuration() time.Duration { + c.lock.RLock() + defer c.lock.RUnlock() + return c.currentToken.Expiry.Sub(c.clock.Now()) / 2 +} diff --git a/internal/authentication/oidc/clientcredentials_test.go b/internal/authentication/oidc/clientcredentials_test.go new file mode 100644 index 0000000000..8a090037ac --- /dev/null +++ b/internal/authentication/oidc/clientcredentials_test.go @@ -0,0 +1,266 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package oidc + +import ( + "context" + "errors" + "net/url" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/oauth2" + ccreds "golang.org/x/oauth2/clientcredentials" + "k8s.io/utils/clock" + clocktesting "k8s.io/utils/clock/testing" + + "github.com/dapr/kit/logger" +) + +func TestRun(t *testing.T) { + var lock sync.Mutex + clock := clocktesting.NewFakeClock(time.Now()) + called := make(chan struct{}, 1) + var retErr error = nil + + fetchTokenFn := func(context.Context) (*oauth2.Token, error) { + lock.Lock() + defer lock.Unlock() + called <- struct{}{} + return &oauth2.Token{ + Expiry: clock.Now().Add(time.Minute), + }, retErr + } + + t.Run("should return when context is cancelled", func(t *testing.T) { + c := &ClientCredentials{ + log: logger.NewLogger("test"), + clock: clock, + fetchTokenFn: fetchTokenFn, + closeCh: make(chan struct{}), + currentToken: &oauth2.Token{ + Expiry: clock.Now(), + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + + c.Run(ctx) + cancel() + + select { + case <-called: + t.Fatal("should not have called fetchTokenFn") + default: + } + }) + + t.Run("should return when closed", func(t *testing.T) { + c := &ClientCredentials{ + log: logger.NewLogger("test"), + clock: clock, + fetchTokenFn: fetchTokenFn, + closeCh: make(chan struct{}), + currentToken: &oauth2.Token{ + Expiry: clock.Now(), + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c.Run(ctx) + + // Should be able to close multiple times. 
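+		// Close flips the closed flag with CompareAndSwap, so repeated calls
+		// only close closeCh once and the second call is a no-op.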
+ c.Close() + c.Close() + + select { + case <-called: + t.Fatal("should not have called fetchTokenFn") + case <-c.closeCh: + case <-time.After(time.Second * 5): + t.Fatal("should have closed run") + } + }) + + t.Run("should renew token when ready for renewal", func(t *testing.T) { + c := &ClientCredentials{ + log: logger.NewLogger("test"), + clock: clock, + fetchTokenFn: fetchTokenFn, + closeCh: make(chan struct{}), + currentToken: &oauth2.Token{Expiry: clock.Now().Add(time.Minute * 2)}, + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + c.Run(ctx) + + assert.Eventually(t, clock.HasWaiters, time.Second*5, time.Millisecond*10) + clock.Step(time.Minute * 1) + + select { + case <-called: + case <-time.After(time.Second * 5): + t.Fatal("should have called") + } + }) + + t.Run("should call renew again after 30 seconds when it fails", func(t *testing.T) { + c := &ClientCredentials{ + log: logger.NewLogger("test"), + clock: clock, + fetchTokenFn: fetchTokenFn, + closeCh: make(chan struct{}), + currentToken: &oauth2.Token{ + Expiry: clock.Now(), + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c.Run(ctx) + + assert.Eventually(t, clock.HasWaiters, time.Second*5, time.Millisecond*10) + clock.Step(time.Minute * 1) + + select { + case <-called: + case <-time.After(time.Second * 5): + t.Fatal("should have called") + } + + lock.Lock() + retErr = errors.New("test error") + lock.Unlock() + + assert.Eventually(t, clock.HasWaiters, time.Second*5, time.Millisecond*10) + clock.Step(time.Minute * 1) + + select { + case <-called: + case <-time.After(time.Second * 5): + t.Fatal("should have called") + } + + assert.Eventually(t, clock.HasWaiters, time.Second*5, time.Millisecond*10) + clock.Step(time.Second * 30) + + select { + case <-called: + case <-time.After(time.Second * 5): + t.Fatal("should have called") + } + + c.Close() + + select { + case <-c.closeCh: + case <-time.After(time.Second * 5): + t.Fatal("should have closed run") + } + }) +} + +func Test_tokenRenewDuration(t *testing.T) { + c := &ClientCredentials{ + clock: clock.RealClock{}, + currentToken: &oauth2.Token{ + Expiry: time.Now(), + }, + } + assert.InDelta(t, c.tokenRenewDuration(), time.Duration(0), float64(time.Second*5)) + + c = &ClientCredentials{ + clock: clock.RealClock{}, + currentToken: &oauth2.Token{ + Expiry: time.Now().Add(time.Hour), + }, + } + assert.InDelta(t, c.tokenRenewDuration(), time.Minute*30, float64(time.Second*5)) +} + +func Test_toConfig(t *testing.T) { + tests := map[string]struct { + opts ClientCredentialsOptions + expConfig *ccreds.Config + expErr bool + }{ + "openid not in scopes should error": { + opts: ClientCredentialsOptions{ + TokenURL: "https://localhost:8080", + ClientID: "client-id", + ClientSecret: "client-secret", + Scopes: []string{"profile"}, + Audiences: []string{"audience"}, + }, + expErr: true, + }, + "non-https endpoint should error": { + opts: ClientCredentialsOptions{ + TokenURL: "http://localhost:8080", + ClientID: "client-id", + ClientSecret: "client-secret", + Audiences: []string{"audience"}, + }, + expErr: true, + }, + "bad CA certificate should error": { + opts: ClientCredentialsOptions{ + TokenURL: "https://localhost:8080", + ClientID: "client-id", + ClientSecret: "client-secret", + Audiences: []string{"audience"}, + CAPEM: []byte("ca-pem"), + }, + expErr: true, + }, + "no audiences should error": { + opts: ClientCredentialsOptions{ + TokenURL: "https://localhost:8080", + ClientID: "client-id", + ClientSecret: 
"client-secret", + }, + expErr: true, + }, + "should default scope": { + opts: ClientCredentialsOptions{ + TokenURL: "https://localhost:8080", + ClientID: "client-id", + ClientSecret: "client-secret", + Audiences: []string{"audience"}, + }, + expConfig: &ccreds.Config{ + ClientID: "client-id", + ClientSecret: "client-secret", + TokenURL: "https://localhost:8080", + Scopes: []string{"openid"}, + EndpointParams: url.Values{"audience": []string{"audience"}}, + }, + expErr: false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + config, _, err := toConfig(test.opts) + assert.Equalf(t, test.expErr, err != nil, "%v", err) + assert.Equal(t, test.expConfig, config) + }) + } +} diff --git a/pubsub/pulsar/metadata.go b/pubsub/pulsar/metadata.go index 5bddedb6d6..aae2e20bb2 100644 --- a/pubsub/pulsar/metadata.go +++ b/pubsub/pulsar/metadata.go @@ -26,12 +26,20 @@ type pulsarMetadata struct { Tenant string `mapstructure:"tenant"` Namespace string `mapstructure:"namespace"` Persistent bool `mapstructure:"persistent"` - Token string `mapstructure:"token"` RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"` internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"` PublicKey string `mapstructure:"publicKey"` PrivateKey string `mapstructure:"privateKey"` Keys string `mapstructure:"keys"` + + AuthType string `mapstructure:"authType"` + Token string `mapstructure:"token"` + OIDCTokenCAPEM string `mapstructure:"oidcTokenCAPEM"` + OIDCTokenURL string `mapstructure:"oidcTokenURL"` + OIDCClientID string `mapstructure:"oidcClientID"` + OIDCClientSecret string `mapstructure:"oidcClientSecret"` + OIDCAudiences []string `mapstructure:"oidcAudiences"` + OIDCScopes []string `mapstructure:"oidcScopes"` } type schemaMetadata struct { diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index 8846f2dc50..5df4216912 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -25,12 +25,12 @@ import ( "sync/atomic" "time" - "github.com/hamba/avro/v2" - "github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/hamba/avro/v2" lru "github.com/hashicorp/golang-lru/v2" + "github.com/dapr/components-contrib/internal/authentication/oidc" "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -55,6 +55,10 @@ const ( protoProtocol = "proto" partitionKey = "partitionKey" + authTypeNone = "none" + authTypeToken = "token" + authTypeOIDC = "oidc" + defaultTenant = "public" defaultNamespace = "default" cachedNumProducer = 10 @@ -94,13 +98,14 @@ const ( type ProcessMode string type Pulsar struct { - logger logger.Logger - client pulsar.Client - metadata pulsarMetadata - cache *lru.Cache[string, pulsar.Producer] - closed atomic.Bool - closeCh chan struct{} - wg sync.WaitGroup + logger logger.Logger + client pulsar.Client + metadata pulsarMetadata + oidcProvider *oidc.ClientCredentials + cache *lru.Cache[string, pulsar.Producer] + closed atomic.Bool + closeCh chan struct{} + wg sync.WaitGroup } func NewPulsar(l logger.Logger) pubsub.PubSub { @@ -157,7 +162,7 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) { return &m, nil } -func (p *Pulsar) Init(_ context.Context, metadata pubsub.Metadata) error { +func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error { m, err := parsePulsarMetadata(metadata) if err != nil { return err @@ -173,9 +178,37 @@ func (p *Pulsar) Init(_ context.Context, metadata pubsub.Metadata) 
error { ConnectionTimeout: 30 * time.Second, TLSAllowInsecureConnection: !m.EnableTLS, } - if m.Token != "" { + + switch m.AuthType { + case "": + // To ensure backward compatibility, if authType is not set but the token + // is we fallthrough to token auth. + if m.Token == "" { + break + } + fallthrough + case authTypeToken: options.Authentication = pulsar.NewAuthenticationToken(m.Token) + case authTypeOIDC: + var cc *oidc.ClientCredentials + cc, err = oidc.NewClientCredentials(ctx, oidc.ClientCredentialsOptions{ + Logger: p.logger, + TokenURL: m.OIDCTokenURL, + CAPEM: []byte(m.OIDCTokenCAPEM), + ClientID: m.OIDCClientID, + ClientSecret: m.OIDCClientSecret, + Scopes: m.OIDCScopes, + Audiences: m.OIDCAudiences, + }) + if err != nil { + return fmt.Errorf("could not instantiate oidc token provider: %v", err) + } + + options.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cc.Token) + p.oidcProvider = cc + p.oidcProvider.Run(ctx) } + client, err := pulsar.NewClient(options) if err != nil { return fmt.Errorf("could not instantiate pulsar client: %v", err) @@ -490,6 +523,10 @@ func (p *Pulsar) Close() error { } p.client.Close() + if p.oidcProvider != nil { + p.oidcProvider.Close() + } + return nil } diff --git a/tests/certification/pubsub/pulsar/components/consumer_five/pulsar.yaml b/tests/certification/pubsub/pulsar/components/auth-none/consumer_five/pulsar.yaml similarity index 100% rename from tests/certification/pubsub/pulsar/components/consumer_five/pulsar.yaml rename to tests/certification/pubsub/pulsar/components/auth-none/consumer_five/pulsar.yaml diff --git a/tests/certification/pubsub/pulsar/components/consumer_four/pulsar.yaml b/tests/certification/pubsub/pulsar/components/auth-none/consumer_four/pulsar.yaml similarity index 100% rename from tests/certification/pubsub/pulsar/components/consumer_four/pulsar.yaml rename to tests/certification/pubsub/pulsar/components/auth-none/consumer_four/pulsar.yaml diff --git a/tests/certification/pubsub/pulsar/components/consumer_one/pulsar.yml b/tests/certification/pubsub/pulsar/components/auth-none/consumer_one/pulsar.yml similarity index 100% rename from tests/certification/pubsub/pulsar/components/consumer_one/pulsar.yml rename to tests/certification/pubsub/pulsar/components/auth-none/consumer_one/pulsar.yml diff --git a/tests/certification/pubsub/pulsar/components/consumer_six/pulsar.yaml b/tests/certification/pubsub/pulsar/components/auth-none/consumer_six/pulsar.yaml similarity index 100% rename from tests/certification/pubsub/pulsar/components/consumer_six/pulsar.yaml rename to tests/certification/pubsub/pulsar/components/auth-none/consumer_six/pulsar.yaml diff --git a/tests/certification/pubsub/pulsar/components/consumer_three/pulsar.yml b/tests/certification/pubsub/pulsar/components/auth-none/consumer_three/pulsar.yml similarity index 100% rename from tests/certification/pubsub/pulsar/components/consumer_three/pulsar.yml rename to tests/certification/pubsub/pulsar/components/auth-none/consumer_three/pulsar.yml diff --git a/tests/certification/pubsub/pulsar/components/consumer_two/pulsar.yml b/tests/certification/pubsub/pulsar/components/auth-none/consumer_two/pulsar.yml similarity index 100% rename from tests/certification/pubsub/pulsar/components/consumer_two/pulsar.yml rename to tests/certification/pubsub/pulsar/components/auth-none/consumer_two/pulsar.yml diff --git a/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_five/pulsar.yml.tmpl 
b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_five/pulsar.yml.tmpl new file mode 100644 index 0000000000..1b9189d6ce --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_five/pulsar.yml.tmpl @@ -0,0 +1,34 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: consumerID + value: certification5 + - name: redeliveryDelay + value: 200ms + - name: publicKey + value: public.key + - name: privateKey + value: private.key + - name: keys + value: myapp.key + - name: authType + value: oidc + - name: oidcTokenURL + value: https://localhost:8085/issuer1/token + - name: oidcClientID + value: foo + - name: oidcClientSecret + value: bar + - name: oidcScopes + value: openid + - name: oidcAudiences + value: pulsar + - name: oidcTokenCAPEM + value: "{{ .OIDCCAPEM }}" diff --git a/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_four/pulsar.yml.tmpl b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_four/pulsar.yml.tmpl new file mode 100644 index 0000000000..e6caab2fb3 --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_four/pulsar.yml.tmpl @@ -0,0 +1,30 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: consumerID + value: certification4 + - name: redeliveryDelay + value: 200ms + - name: certification-pubsub-topic-active.jsonschema + value: "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\",\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}" + - name: authType + value: oidc + - name: oidcTokenURL + value: https://localhost:8085/issuer1/token + - name: oidcClientID + value: foo + - name: oidcClientSecret + value: bar + - name: oidcScopes + value: openid + - name: oidcAudiences + value: pulsar + - name: oidcTokenCAPEM + value: "{{ .OIDCCAPEM }}" diff --git a/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_one/pulsar.yml.tmpl b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_one/pulsar.yml.tmpl new file mode 100644 index 0000000000..40e3ac6f33 --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_one/pulsar.yml.tmpl @@ -0,0 +1,28 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: consumerID + value: certification1 + - name: redeliveryDelay + value: 200ms + - name: authType + value: oidc + - name: oidcTokenURL + value: https://localhost:8085/issuer1/token + - name: oidcClientID + value: foo + - name: oidcClientSecret + value: bar + - name: oidcScopes + value: openid + - name: oidcAudiences + value: pulsar + - name: oidcTokenCAPEM + value: "{{ .OIDCCAPEM }}" diff --git a/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_six/pulsar.yaml.tmpl b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_six/pulsar.yaml.tmpl new file mode 100644 index 0000000000..67f4ff56d1 --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_six/pulsar.yaml.tmpl @@ -0,0 +1,34 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: 
consumerID + value: certification5 + - name: redeliveryDelay + value: 200ms + - name: publicKey + value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n" + - name: privateKey + value: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA1KDAM4L8RtJ+nLaXBrBhzVpvTemsKVZoAct8A+ShepOHT9lg\nHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDdruXSflvSdmYeFAw3Ypphc1A5oM53\nwSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/a3golb36GYFrY0MLFTv7wZ87pmMI\nPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eKjpwcg35gccvR6o/UhbKAuc60V1J9\nWof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0QiCdpIrXvYtANq0Id6gP8zJvUEdPIg\nNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ3QIDAQABAoIBAQCKuHnM4ac/eXM7\nQPDVX1vfgyHc3hgBPCtNCHnXfGFRvFBqavKGxIElBvGOcBS0CWQ+Rg1Ca5kMx3TQ\njSweSYhH5A7pe3Sa5FK5V6MGxJvRhMSkQi/lJZUBjzaIBJA9jln7pXzdHx8ekE16\nBMPONr6g2dr4nuI9o67xKrtfViwRDGaG6eh7jIMlEqMMc6WqyhvI67rlVDSTHFKX\njlMcozJ3IT8BtTzKg2Tpy7ReVuJEpehum8yn1ZVdAnotBDJxI07DC1cbOP4M2fHM\ngfgPYWmchauZuTeTFu4hrlY5jg0/WLs6by8r/81+vX3QTNvejX9UdTHMSIfQdX82\nAfkCKUVhAoGBAOvGv+YXeTlPRcYC642x5iOyLQm+BiSX4jKtnyJiTU2s/qvvKkIu\nxAOk3OtniT9NaUAHEZE9tI71dDN6IgTLQlAcPCzkVh6Sc5eG0MObqOO7WOMCWBkI\nlaAKKBbd6cGDJkwGCJKnx0pxC9f8R4dw3fmXWgWAr8ENiekMuvjSfjZ5AoGBAObd\ns2L5uiUPTtpyh8WZ7rEvrun3djBhzi+d7rgxEGdditeiLQGKyZbDPMSMBuus/5wH\nwfi0xUq50RtYDbzQQdC3T/C20oHmZbjWK5mDaLRVzWS89YG/NT2Q8eZLBstKqxkx\ngoT77zoUDfRy+CWs1xvXzgxagD5Yg8/OrCuXOqWFAoGAPIw3r6ELknoXEvihASxU\nS4pwInZYIYGXpygLG8teyrnIVOMAWSqlT8JAsXtPNaBtjPHDwyazfZrvEmEk51JD\nX0tA8M5ah1NYt+r5JaKNxp3P/8wUT6lyszyoeubWJsnFRfSusuq/NRC+1+KDg/aq\nKnSBu7QGbm9JoT2RrmBv5RECgYBRn8Lj1I1muvHTNDkiuRj2VniOSirkUkA2/6y+\nPMKi+SS0tqcY63v4rNCYYTW1L7Yz8V44U5mJoQb4lvpMbolGhPljjxAAU3hVkItb\nvGVRlSCIZHKczADD4rJUDOS7DYxO3P1bjUN4kkyYx+lKUMDBHFzCa2D6Kgt4dobS\n5qYajQKBgQC7u7MFPkkEMqNqNGu5erytQkBq1v1Ipmf9rCi3iIj4XJLopxMgw0fx\n6jwcwNInl72KzoUBLnGQ9PKGVeBcgEgdI+a+tq+1TJo6Ta+hZSx+4AYiKY18eRKG\neNuER9NOcSVJ7Eqkcw4viCGyYDm2vgNV9HJ0VlAo3RDh8x5spEN+mg==\n-----END RSA PRIVATE KEY-----\n" + - name: keys + value: myapp.key + - name: authType + value: oidc + - name: oidcTokenURL + value: https://localhost:8085/issuer1/token + - name: oidcClientID + value: foo + - name: oidcClientSecret + value: bar + - name: oidcScopes + value: openid + - name: oidcAudiences + value: pulsar + - name: oidcTokenCAPEM + value: "{{ .OIDCCAPEM }}" diff --git a/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_three/pulsar.yml.tmpl b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_three/pulsar.yml.tmpl new file mode 100644 index 0000000000..d1dc3e1095 --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_three/pulsar.yml.tmpl @@ -0,0 +1,28 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: consumerID + value: certification3 + - name: redeliveryDelay + value: 200ms + - name: authType + value: oidc + - name: oidcTokenURL + value: https://localhost:8085/issuer1/token + - name: oidcClientID + value: foo + - name: oidcClientSecret + value: bar + - name: oidcScopes + value: openid + - name: oidcAudiences + value: pulsar + - name: oidcTokenCAPEM + 
value: "{{ .OIDCCAPEM }}" diff --git a/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_two/pulsar.yml.tmpl b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_two/pulsar.yml.tmpl new file mode 100644 index 0000000000..2b4834f17c --- /dev/null +++ b/tests/certification/pubsub/pulsar/components/auth-oidc/consumer_two/pulsar.yml.tmpl @@ -0,0 +1,28 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messagebus +spec: + type: pubsub.pulsar + version: v1 + metadata: + - name: host + value: "localhost:6650" + - name: consumerID + value: certification2 + - name: redeliveryDelay + value: 200ms + - name: authType + value: oidc + - name: oidcTokenURL + value: https://localhost:8085/issuer1/token + - name: oidcClientID + value: foo + - name: oidcClientSecret + value: bar + - name: oidcScopes + value: openid + - name: oidcAudiences + value: pulsar + - name: oidcTokenCAPEM + value: "{{ .OIDCCAPEM }}" diff --git a/tests/certification/pubsub/pulsar/config/.gitignore b/tests/certification/pubsub/pulsar/config/.gitignore new file mode 100644 index 0000000000..3af0ccb687 --- /dev/null +++ b/tests/certification/pubsub/pulsar/config/.gitignore @@ -0,0 +1 @@ +/data diff --git a/tests/certification/pubsub/pulsar/config/docker-compose_auth-mock-oidc-server.yaml b/tests/certification/pubsub/pulsar/config/docker-compose_auth-mock-oidc-server.yaml new file mode 100644 index 0000000000..220011d949 --- /dev/null +++ b/tests/certification/pubsub/pulsar/config/docker-compose_auth-mock-oidc-server.yaml @@ -0,0 +1,14 @@ +# We run in network_mode: "host" so `localhost` is the same for both the host +# and containers. This is required as the mock server uses the SNI hostname to +# build the issuer URL. +version: '3' +services: + mock-oauth2-server: + image: ghcr.io/navikt/mock-oauth2-server:1.0.0 + container_name: mock-oauth2-server + restart: on-failure + network_mode: "host" + environment: + - PORT=8085 + - LOG_LEVEL=DEBUG + - 'JSON_CONFIG={"interactiveLogin":false,"httpServer":{"type":"NettyWrapper","ssl":{}},"tokenCallbacks":[{"issuerId":"issuer1","tokenExpiry":120,"requestMappings":[{"requestParam":"scope","match":"openid","claims":{"sub":"foo","aud":["pulsar"]}}]}]}' diff --git a/tests/certification/pubsub/pulsar/docker-compose.yml b/tests/certification/pubsub/pulsar/config/docker-compose_auth-none.yaml similarity index 97% rename from tests/certification/pubsub/pulsar/docker-compose.yml rename to tests/certification/pubsub/pulsar/config/docker-compose_auth-none.yaml index 5ae0535596..7857b77961 100644 --- a/tests/certification/pubsub/pulsar/docker-compose.yml +++ b/tests/certification/pubsub/pulsar/config/docker-compose_auth-none.yaml @@ -16,4 +16,4 @@ services: - pulsarconf:/pulsar/conf volumes: pulsardata: - pulsarconf: \ No newline at end of file + pulsarconf: diff --git a/tests/certification/pubsub/pulsar/config/docker-compose_auth-oidc.yaml.tmpl b/tests/certification/pubsub/pulsar/config/docker-compose_auth-oidc.yaml.tmpl new file mode 100644 index 0000000000..256ba89677 --- /dev/null +++ b/tests/certification/pubsub/pulsar/config/docker-compose_auth-oidc.yaml.tmpl @@ -0,0 +1,96 @@ +# We run the pulsar services individually as OIDC doesn't seem to work in +# standalone mode. OIDC is also only available from pulsar v3 onwards. We use +# host networking as the mock OAuth server uses the SNI host name to determine +# the host name of the OIDC issuer URL, so we need to have the mock server +# reachable by localhost from both the pulsar services and the host network. 
+version: '3' +services: + # Start zookeeper + zookeeper: + image: apachepulsar/pulsar:3.0.0 + container_name: zookeeper + restart: on-failure + network_mode: "host" + environment: + - metadataStoreUrl=zk:localhost:2181 + - metricsProvider.httpPort=7000 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m + command: > + bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \ + bin/generate-zookeeper-config.sh conf/zookeeper.conf && \ + exec bin/pulsar zookeeper" + healthcheck: + test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"] + interval: 1s + timeout: 5s + retries: 300 + + # Init cluster metadata + pulsar-init: + container_name: pulsar-init + image: apachepulsar/pulsar:3.0.0 + network_mode: "host" + env_file: + - ./pulsar_auth-oidc.conf + command: > + bin/pulsar initialize-cluster-metadata \ + --cluster cluster-a \ + --zookeeper localhost:2181 \ + --configuration-store localhost:2181 \ + --web-service-url http://localhost:8080 \ + --broker-service-url pulsar://localhost:6650 + depends_on: + zookeeper: + condition: service_healthy + + # Start bookie + bookie: + image: apachepulsar/pulsar:3.0.0 + container_name: bookie + restart: on-failure + network_mode: "host" + environment: + - clusterName=cluster-a + - zkServers=localhost:2181 + - metadataServiceUri=metadata-store:zk:localhost:2181 + # otherwise every time we run docker compose uo or down we fail to start due to Cookie + # See: https://github.com/apache/bookkeeper/blob/405e72acf42bb1104296447ea8840d805094c787/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java#L57-68 + - advertisedAddress=localhost + - BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m + env_file: + - ./pulsar_auth-oidc.conf + volumes: + - "{{ .TmpDir }}:/pulsar/conf/dapr" + depends_on: + zookeeper: + condition: service_healthy + pulsar-init: + condition: service_completed_successfully + command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie" + + # Start broker + broker: + image: apachepulsar/pulsar:3.0.0 + container_name: broker + restart: on-failure + network_mode: "host" + env_file: + - ./pulsar_auth-oidc.conf + volumes: + - "{{ .TmpDir }}:/pulsar/conf/dapr" + environment: + - metadataStoreUrl=zk:localhost:2181 + - zookeeperServers=localhost:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=localhost + - advertisedListeners=external:pulsar://127.0.0.1:6650 + - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" diff --git a/tests/certification/pubsub/pulsar/config/pulsar_auth-oidc.conf b/tests/certification/pubsub/pulsar/config/pulsar_auth-oidc.conf new file mode 100644 index 0000000000..1bef86ea45 --- /dev/null +++ b/tests/certification/pubsub/pulsar/config/pulsar_auth-oidc.conf @@ -0,0 +1,39 @@ +# Configuration to enable authentication +authenticationEnabled=true +authenticationProviders=org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID + +# Required settings for AuthenticationProviderOpenID +# A comma separated list of allowed, or trusted, token issuers. The token issuer is the URL of the token issuer. +PULSAR_PREFIX_openIDAllowedTokenIssuers=https://localhost:8085/issuer1 +# The list of allowed audiences for the token. 
The audience is the intended recipient of the token. A token with
+# at least one of these audience claims will pass the audience validation check.
+PULSAR_PREFIX_openIDAllowedAudiences=pulsar
+
+# Optional settings (values shown are the defaults)
+# The path to the file containing the trusted certificate(s) of the token issuer(s). If not set, uses the default
+# trust store of the JVM. Note: in version 3.0.0, the default only applies when this setting is not an environment
+# variable and is not in the configuration file.
+PULSAR_PREFIX_openIDTokenIssuerTrustCertsFilePath=/pulsar/conf/dapr/ca.pem
+# The JWT's claim to use for the role/principal during authorization.
+PULSAR_PREFIX_openIDRoleClaim=sub
+# The leeway, in seconds, to use when validating the token's expiration time.
+PULSAR_PREFIX_openIDAcceptedTimeLeewaySeconds=0
+
+# Cache settings
+PULSAR_PREFIX_openIDCacheSize=5
+PULSAR_PREFIX_openIDCacheRefreshAfterWriteSeconds=64800
+PULSAR_PREFIX_openIDCacheExpirationSeconds=86400
+PULSAR_PREFIX_openIDHttpConnectionTimeoutMillis=10000
+PULSAR_PREFIX_openIDHttpReadTimeoutMillis=10000
+
+# The number of seconds to wait before refreshing the JWKS when a token presents a key ID (kid claim) that is not
+# in the cache. This setting is available from Pulsar 3.0.1 onwards.
+PULSAR_PREFIX_openIDKeyIdCacheMissRefreshSeconds=300
+
+# Whether to require that issuers use HTTPS. It is part of the OIDC spec to use HTTPS, so the default is true.
+# This setting is for testing purposes and is not recommended for any production environment.
+#PULSAR_PREFIX_openIDRequireIssuersUseHttps=false
+
+# How to handle discovery of the OpenID Connect configuration document when the issuer is not
+# in the list of allowed issuers.
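+# DISABLED turns fallback discovery off, so only tokens from the issuers listed
+# in openIDAllowedTokenIssuers above are accepted.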
+PULSAR_PREFIX_openIDFallbackDiscoveryMode=DISABLED diff --git a/tests/certification/pubsub/pulsar/pulsar_test.go b/tests/certification/pubsub/pulsar/pulsar_test.go index 9c7fd3e405..8d6f1cd80c 100644 --- a/tests/certification/pubsub/pulsar/pulsar_test.go +++ b/tests/certification/pubsub/pulsar/pulsar_test.go @@ -16,17 +16,28 @@ package pulsar_test import ( "bytes" "context" + "crypto/tls" "encoding/json" + "encoding/pem" "fmt" + "io" + "io/fs" "io/ioutil" "net/http" + "os" + "os/exec" + "path/filepath" + "strings" "testing" + "text/template" "time" "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "go.uber.org/multierr" + "github.com/dapr/components-contrib/internal/authentication/oidc" pubsub_pulsar "github.com/dapr/components-contrib/pubsub/pulsar" pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub" @@ -54,21 +65,23 @@ const ( appID1 = "app-1" appID2 = "app-2" - numMessages = 10 - appPort = 8000 - portOffset = 2 - messageKey = "partitionKey" - pubsubName = "messagebus" - topicActiveName = "certification-pubsub-topic-active" - topicPassiveName = "certification-pubsub-topic-passive" - topicToBeCreated = "certification-topic-per-test-run" - topicDefaultName = "certification-topic-default" - topicMultiPartitionName = "certification-topic-multi-partition8" - partition0 = "partition-0" - partition1 = "partition-1" - clusterName = "pulsarcertification" - dockerComposeYAML = "docker-compose.yml" - pulsarURL = "localhost:6650" + numMessages = 10 + appPort = 8001 + portOffset = 2 + messageKey = "partitionKey" + pubsubName = "messagebus" + topicActiveName = "certification-pubsub-topic-active" + topicPassiveName = "certification-pubsub-topic-passive" + topicToBeCreated = "certification-topic-per-test-run" + topicDefaultName = "certification-topic-default" + topicMultiPartitionName = "certification-topic-multi-partition8" + partition0 = "partition-0" + partition1 = "partition-1" + clusterName = "pulsarcertification" + dockerComposeAuthNoneYAML = "./config/docker-compose_auth-none.yaml" + dockerComposeAuthOIDCYAML = "./config/docker-compose_auth-oidc.yaml.tmpl" + dockerComposeMockOAuthYAML = "./config/docker-compose_auth-mock-oidc-server.yaml" + pulsarURL = "localhost:6650" subscribeTypeKey = "subscribeType" @@ -82,6 +95,103 @@ const ( processModeSync = "sync" ) +type pulsarSuite struct { + suite.Suite + + authType string + oidcCAPEM []byte + dockerComposeYAML string + componentsPath string + services []string +} + +func TestPulsar(t *testing.T) { + t.Run("Auth:None", func(t *testing.T) { + suite.Run(t, &pulsarSuite{ + authType: "none", + dockerComposeYAML: dockerComposeAuthNoneYAML, + componentsPath: "./components/auth-none", + services: []string{"standalone"}, + }) + }) + + t.Run("Auth:OIDC", func(t *testing.T) { + dir := t.TempDir() + require.NoError(t, os.Chmod(dir, 0o777)) + + t.Log("Starting OIDC server...") + out, err := exec.Command( + "docker-compose", + "-p", "oidc", + "-f", dockerComposeMockOAuthYAML, + "up", "-d").CombinedOutput() + require.NoError(t, err, string(out)) + t.Log(string(out)) + + t.Cleanup(func() { + t.Log("Stopping OIDC server...") + out, err = exec.Command( + "docker-compose", + "-p", "oidc", + "-f", dockerComposeMockOAuthYAML, + "down", "-v", + "--remove-orphans").CombinedOutput() + require.NoError(t, err, string(out)) + t.Log(string(out)) + }) + + t.Log("Waiting for OAuth server to be ready...") + oauthCA := peerCertificate(t, "localhost:8085") + t.Log("OAuth server is ready") + + require.NoError(t, 
os.WriteFile(filepath.Join(dir, "ca.pem"), oauthCA, 0o644)) + outf, err := os.OpenFile("./config/pulsar_auth-oidc.conf", os.O_RDONLY, 0o644) + require.NoError(t, err) + inf, err := os.OpenFile(filepath.Join(dir, "pulsar_auth-oidc.conf"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) + require.NoError(t, err) + _, err = io.Copy(inf, outf) + require.NoError(t, err) + outf.Close() + inf.Close() + + td := struct { + TmpDir string + OIDCCAPEM string + }{ + TmpDir: dir, + OIDCCAPEM: strings.ReplaceAll(string(oauthCA), "\n", "\\n"), + } + + tmpl, err := template.New("").ParseFiles(dockerComposeAuthOIDCYAML) + require.NoError(t, err) + f, err := os.OpenFile(filepath.Join(dir, "docker-compose.yaml"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) + require.NoError(t, err) + require.NoError(t, tmpl.ExecuteTemplate(f, "docker-compose_auth-oidc.yaml.tmpl", td)) + + require.NoError(t, filepath.Walk("./components/auth-oidc", func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + tmpl, err := template.New("").ParseFiles(path) + require.NoError(t, err) + path = strings.TrimSuffix(path, ".tmpl") + require.NoError(t, os.MkdirAll(filepath.Dir(filepath.Join(dir, path)), 0o755)) + f, err := os.OpenFile(filepath.Join(dir, path), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) + require.NoError(t, err) + require.NoError(t, tmpl.ExecuteTemplate(f, filepath.Base(path)+".tmpl", td)) + return nil + })) + + suite.Run(t, &pulsarSuite{ + oidcCAPEM: oauthCA, + authType: "oidc", + dockerComposeYAML: filepath.Join(dir, "docker-compose.yaml"), + componentsPath: filepath.Join(dir, "components/auth-oidc"), + services: []string{"zookeeper", "pulsar-init", "bookie", "broker"}, + }) + }) +} + func subscriberApplication(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn { return func(ctx flow.Context, s common.Service) error { // Simulate periodic errors. @@ -196,7 +306,8 @@ func assertMessages(timeout time.Duration, messageWatchers ...*watcher.Watcher) } } -func TestPulsar(t *testing.T) { +func (p *pulsarSuite) TestPulsar() { + t := p.T() consumerGroup1 := watcher.NewUnordered() consumerGroup2 := watcher.NewUnordered() @@ -244,10 +355,10 @@ func TestPulsar(t *testing.T) { // Run subscriberApplication app1 Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), subscriberApplication(appID1, topicActiveName, consumerGroup1))). - Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step(dockercompose.Run(clusterName, p.dockerComposeYAML)). Step("wait", flow.Sleep(10*time.Second)). Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error { - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) + client, err := p.client(t) if err != nil { return fmt.Errorf("could not create pulsar client: %v", err) } @@ -267,7 +378,7 @@ func TestPulsar(t *testing.T) { return err })). Step(sidecar.Run(sidecarName1, - embedded.WithComponentsPath("./components/consumer_one"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort), @@ -280,7 +391,7 @@ func TestPulsar(t *testing.T) { // Run the Dapr sidecar with the component 2. 
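		// The second sidecar runs in the same test process, so its Dapr HTTP/gRPC
		// and app ports are shifted by portOffset to avoid clashing with sidecarName1.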
Step(sidecar.Run(sidecarName2, - embedded.WithComponentsPath("./components/consumer_two"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset), @@ -295,7 +406,8 @@ func TestPulsar(t *testing.T) { Run() } -func TestPulsarMultipleSubsSameConsumerIDs(t *testing.T) { +func (p *pulsarSuite) TestPulsarMultipleSubsSameConsumerIDs() { + t := p.T() consumerGroup1 := watcher.NewUnordered() consumerGroup2 := watcher.NewUnordered() @@ -312,10 +424,10 @@ func TestPulsarMultipleSubsSameConsumerIDs(t *testing.T) { // Run subscriberApplication app1 Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), subscriberApplication(appID1, topicActiveName, consumerGroup1))). - Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step(dockercompose.Run(clusterName, p.dockerComposeYAML)). Step("wait", flow.Sleep(10*time.Second)). Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error { - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) + client, err := p.client(t) if err != nil { return fmt.Errorf("could not create pulsar client: %v", err) } @@ -335,7 +447,7 @@ func TestPulsarMultipleSubsSameConsumerIDs(t *testing.T) { return err })). Step(sidecar.Run(sidecarName1, - embedded.WithComponentsPath("./components/consumer_one"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort), @@ -348,7 +460,7 @@ func TestPulsarMultipleSubsSameConsumerIDs(t *testing.T) { // Run the Dapr sidecar with the component 2. Step(sidecar.Run(sidecarName2, - embedded.WithComponentsPath("./components/consumer_two"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset), @@ -362,7 +474,9 @@ func TestPulsarMultipleSubsSameConsumerIDs(t *testing.T) { Run() } -func TestPulsarMultipleSubsDifferentConsumerIDs(t *testing.T) { +func (p *pulsarSuite) TestPulsarMultipleSubsDifferentConsumerIDs() { + t := p.T() + consumerGroup1 := watcher.NewUnordered() consumerGroup2 := watcher.NewUnordered() @@ -376,10 +490,10 @@ func TestPulsarMultipleSubsDifferentConsumerIDs(t *testing.T) { // Run subscriberApplication app1 Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), subscriberApplication(appID1, topicActiveName, consumerGroup1))). - Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step(dockercompose.Run(clusterName, p.dockerComposeYAML)). Step("wait", flow.Sleep(10*time.Second)). Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error { - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) + client, err := p.client(t) if err != nil { return fmt.Errorf("could not create pulsar client: %v", err) } @@ -401,7 +515,7 @@ func TestPulsarMultipleSubsDifferentConsumerIDs(t *testing.T) { return err })). 
Step(sidecar.Run(sidecarName1, - embedded.WithComponentsPath("./components/consumer_one"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort), @@ -414,7 +528,7 @@ func TestPulsarMultipleSubsDifferentConsumerIDs(t *testing.T) { // Run the Dapr sidecar with the component 2. Step(sidecar.Run(sidecarName2, - embedded.WithComponentsPath("./components/consumer_two"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset), @@ -427,7 +541,8 @@ func TestPulsarMultipleSubsDifferentConsumerIDs(t *testing.T) { Run() } -func TestPulsarMultiplePubSubsDifferentConsumerIDs(t *testing.T) { +func (p *pulsarSuite) TestPulsarMultiplePubSubsDifferentConsumerIDs() { + t := p.T() consumerGroup1 := watcher.NewUnordered() consumerGroup2 := watcher.NewUnordered() @@ -445,10 +560,10 @@ func TestPulsarMultiplePubSubsDifferentConsumerIDs(t *testing.T) { // Run subscriberApplication app1 Step(app.Run(appID1, fmt.Sprintf(":%d", appPort), subscriberApplication(appID1, topicActiveName, consumerGroup1))). - Step(dockercompose.Run(clusterName, dockerComposeYAML)). + Step(dockercompose.Run(clusterName, p.dockerComposeYAML)). Step("wait", flow.Sleep(10*time.Second)). Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error { - client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"}) + client, err := p.client(t) if err != nil { return fmt.Errorf("could not create pulsar client: %v", err) } @@ -470,7 +585,7 @@ func TestPulsarMultiplePubSubsDifferentConsumerIDs(t *testing.T) { return err })). Step(sidecar.Run(sidecarName1, - embedded.WithComponentsPath("./components/consumer_one"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort), @@ -483,7 +598,7 @@ func TestPulsarMultiplePubSubsDifferentConsumerIDs(t *testing.T) { // Run the Dapr sidecar with the component 2. Step(sidecar.Run(sidecarName2, - embedded.WithComponentsPath("./components/consumer_two"), + embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")), embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset), embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset), embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset), @@ -498,7 +613,8 @@ func TestPulsarMultiplePubSubsDifferentConsumerIDs(t *testing.T) { Run() } -func TestPulsarNonexistingTopic(t *testing.T) { +func (p *pulsarSuite) TestPulsarNonexistingTopic() { + t := p.T() consumerGroup1 := watcher.NewUnordered() // Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages. @@ -511,10 +627,10 @@ func TestPulsarNonexistingTopic(t *testing.T) { // Run subscriberApplication app1 Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3), subscriberApplication(appID1, topicToBeCreated, consumerGroup1))). - Step(dockercompose.Run(clusterName, dockerComposeYAML)). 
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -537,7 +653,7 @@ func TestPulsarNonexistingTopic(t *testing.T) {
 		})).
 		// Run the Dapr sidecar with the component entitymanagement
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_one"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*3),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*3),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*3),
@@ -550,7 +666,8 @@ func TestPulsarNonexistingTopic(t *testing.T) {
 		Run()
 }
 
-func TestPulsarNetworkInterruption(t *testing.T) {
+func (p *pulsarSuite) TestPulsarNetworkInterruption() {
+	t := p.T()
 	consumerGroup1 := watcher.NewUnordered()
 
 	// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
@@ -563,10 +680,10 @@ func TestPulsarNetworkInterruption(t *testing.T) {
 		// Run subscriberApplication app1
 		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
 			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
-		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -589,7 +706,7 @@ func TestPulsarNetworkInterruption(t *testing.T) {
 		})).
 		// Run the Dapr sidecar with the component entitymanagement
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_one"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
@@ -603,7 +720,8 @@ func TestPulsarNetworkInterruption(t *testing.T) {
 		Run()
 }
 
-func TestPulsarPersitant(t *testing.T) {
+func (p *pulsarSuite) TestPulsarPersitant() {
+	t := p.T()
 	consumerGroup1 := watcher.NewUnordered()
 
 	flow.New(t, "pulsar certification persistant test").
@@ -611,10 +729,10 @@ func TestPulsarPersitant(t *testing.T) {
 		// Run subscriberApplication app1
 		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
 			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
-		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -636,23 +754,24 @@ func TestPulsarPersitant(t *testing.T) {
 			return err
 		})).
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_one"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
 			componentRuntimeOptions(),
 		)).
 		Step("publish messages to topic1", publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1)).
-		Step("stop pulsar server", dockercompose.Stop(clusterName, dockerComposeYAML, "standalone")).
+		Step("stop pulsar server", dockercompose.Stop(clusterName, p.dockerComposeYAML, p.services...)).
 		Step("wait", flow.Sleep(5*time.Second)).
-		Step("start pulsar server", dockercompose.Start(clusterName, dockerComposeYAML, "standalone")).
+		Step("start pulsar server", dockercompose.Start(clusterName, p.dockerComposeYAML, p.services...)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("verify if app1 has received messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
 		Step("reset", flow.Reset(consumerGroup1)).
 		Run()
 }
 
-func TestPulsarDelay(t *testing.T) {
+func (p *pulsarSuite) TestPulsarDelay() {
+	t := p.T()
 	consumerGroup1 := watcher.NewUnordered()
 
 	date := time.Now()
@@ -682,10 +801,10 @@ func TestPulsarDelay(t *testing.T) {
 		// Run subscriberApplication app1
 		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
 			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
-		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -707,7 +826,7 @@ func TestPulsarDelay(t *testing.T) {
 			return err
 		})).
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_three"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_three")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
@@ -730,7 +849,8 @@ type schemaTest struct {
 	Name string `json:"name"`
 }
 
-func TestPulsarSchema(t *testing.T) {
+func (p *pulsarSuite) TestPulsarSchema() {
+	t := p.T()
 	consumerGroup1 := watcher.NewUnordered()
 
 	publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
@@ -772,10 +892,10 @@ func TestPulsarSchema(t *testing.T) {
 		// Run subscriberApplication app1
 		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
 			subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
-		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -795,7 +915,7 @@ func TestPulsarSchema(t *testing.T) {
 			return err
 		})).
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_four"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_four")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
@@ -818,7 +938,7 @@ func componentRuntimeOptions() []runtime.Option {
 	}
 }
 
-func createMultiPartitionTopic(tenant, namespace, topic string, partition int) flow.Runnable {
+func (p *pulsarSuite) createMultiPartitionTopic(tenant, namespace, topic string, partition int) flow.Runnable {
 	return func(ctx flow.Context) error {
 		reqURL := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/%s/%s/%s/partitions",
 			tenant, namespace, topic)
@@ -838,6 +958,19 @@ func createMultiPartitionTopic(tenant, namespace, topic string, partition int) f
 
 		req.Header.Set("Content-Type", "application/json")
 
+		if p.authType == "oidc" {
+			cc, err := p.oidcClientCredentials()
+			if err != nil {
+				return err
+			}
+			token, err := cc.Token()
+			if err != nil {
+				return err
+			}
+
+			req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
+		}
+
 		rsp, err := http.DefaultClient.Do(req)
 		if err != nil {
@@ -858,7 +991,8 @@ func createMultiPartitionTopic(tenant, namespace, topic string, partition int) f
 	}
 }
 
-func TestPulsarPartitionedOrderingProcess(t *testing.T) {
+func (p *pulsarSuite) TestPulsarPartitionedOrderingProcess() {
+	t := p.T()
 	consumerGroup1 := watcher.NewOrdered()
 
 	// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
@@ -867,14 +1001,14 @@ func TestPulsarPartitionedOrderingProcess(t *testing.T) {
 	}
 
 	flow.New(t, "pulsar certification - process message in order with partitioned-topic").
-		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		// Run subscriberApplication app1
 		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
 			subscriberApplicationWithoutError(appID1, topicMultiPartitionName, consumerGroup1))).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -897,10 +1031,10 @@ func TestPulsarPartitionedOrderingProcess(t *testing.T) {
 			return err
 		})).
 		Step("create multi-partition topic explicitly", retry.Do(10*time.Second, 30,
-			createMultiPartitionTopic("public", "default", topicMultiPartitionName, 4))).
+			p.createMultiPartitionTopic("public", "default", topicMultiPartitionName, 4))).
 		// Run the Dapr sidecar with the component entitymanagement
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_one"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
@@ -913,7 +1047,7 @@ func TestPulsarPartitionedOrderingProcess(t *testing.T) {
 		// Run the Dapr sidecar with the component 2.
 		Step(sidecar.Run(sidecarName2,
-			embedded.WithComponentsPath("./components/consumer_two"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*3),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*3),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*3),
@@ -927,7 +1061,8 @@ func TestPulsarPartitionedOrderingProcess(t *testing.T) {
 		Run()
 }
 
-func TestPulsarEncryptionFromFile(t *testing.T) {
+func (p *pulsarSuite) TestPulsarEncryptionFromFile() {
+	t := p.T()
 	consumerGroup1 := watcher.NewUnordered()
 
 	publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
@@ -969,10 +1104,10 @@ func TestPulsarEncryptionFromFile(t *testing.T) {
 		// Run subscriberApplication app1
 		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
 			subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
-		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -992,7 +1127,7 @@ func TestPulsarEncryptionFromFile(t *testing.T) {
 			return err
 		})).
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_five"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_five")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
@@ -1004,7 +1139,8 @@ func TestPulsarEncryptionFromFile(t *testing.T) {
 		Run()
 }
 
-func TestPulsarEncryptionFromData(t *testing.T) {
+func (p *pulsarSuite) TestPulsarEncryptionFromData() {
+	t := p.T()
 	consumerGroup1 := watcher.NewUnordered()
 
 	publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
@@ -1046,10 +1182,10 @@ func TestPulsarEncryptionFromData(t *testing.T) {
 		// Run subscriberApplication app2
 		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
 			subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
-		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
+		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
 		Step("wait", flow.Sleep(10*time.Second)).
 		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
-			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
+			client, err := p.client(t)
 			if err != nil {
 				return fmt.Errorf("could not create pulsar client: %v", err)
 			}
@@ -1069,7 +1205,7 @@ func TestPulsarEncryptionFromData(t *testing.T) {
 			return err
 		})).
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer_six"),
+			embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_six")),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
@@ -1080,3 +1216,57 @@ func TestPulsarEncryptionFromData(t *testing.T) {
 		Step("reset", flow.Reset(consumerGroup1)).
 		Run()
 }
+
+func (p *pulsarSuite) client(t *testing.T) (pulsar.Client, error) {
+	t.Helper()
+
+	opts := pulsar.ClientOptions{
+		URL: "pulsar://localhost:6650",
+	}
+	switch p.authType {
+	case "oidc":
+		cc, err := p.oidcClientCredentials()
+		require.NoError(t, err)
+		opts.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cc.Token)
+	default:
+	}
+
+	return pulsar.NewClient(opts)
+}
+
+func (p *pulsarSuite) oidcClientCredentials() (*oidc.ClientCredentials, error) {
+	cc, err := oidc.NewClientCredentials(context.Background(), oidc.ClientCredentialsOptions{
+		Logger:       logger.NewLogger("dapr.test.readiness"),
+		TokenURL:     "https://localhost:8085/issuer1/token",
+		ClientID:     "foo",
+		ClientSecret: "bar",
+		Scopes:       []string{"openid"},
+		Audiences:    []string{"pulsar"},
+		CAPEM:        p.oidcCAPEM,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return cc, nil
+}
+
+func peerCertificate(t *testing.T, hostport string) []byte {
+	conf := &tls.Config{InsecureSkipVerify: true}
+
+	for {
+		time.Sleep(1 * time.Second)
+
+		conn, err := tls.Dial("tcp", hostport, conf)
+		if err != nil {
+			t.Log(err)
+			continue
+		}
+
+		defer conn.Close()
+
+		certs := conn.ConnectionState().PeerCertificates
+		require.Len(t, certs, 1, "expected 1 peer certificate")
+		return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs[0].Raw})
+	}
+}
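
For reviewers, a minimal sketch of how the new internal/authentication/oidc client-credentials helper is wired into a Pulsar client outside of the test harness. This is illustrative only: the issuer URL, client ID/secret, audience, and topic below mirror the mock OIDC server used by this certification test and are assumptions, not values introduced by the patch.

package main

import (
	"context"
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"

	"github.com/dapr/components-contrib/internal/authentication/oidc"
	"github.com/dapr/kit/logger"
)

func main() {
	// Client-credentials grant against the (assumed) mock issuer; CAPEM is
	// omitted here and is only needed when the issuer serves a self-signed
	// certificate, as in the certification test environment.
	cc, err := oidc.NewClientCredentials(context.Background(), oidc.ClientCredentialsOptions{
		Logger:       logger.NewLogger("pulsar.oidc.example"),
		TokenURL:     "https://localhost:8085/issuer1/token",
		ClientID:     "foo",
		ClientSecret: "bar",
		Scopes:       []string{"openid"},
		Audiences:    []string{"pulsar"},
	})
	if err != nil {
		panic(err)
	}

	// cc.Token satisfies pulsar's token-supplier contract, so every new
	// connection picks up the current (cached or renewed) OAuth 2.0 token.
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            "pulsar://localhost:6650",
		Authentication: pulsar.NewAuthenticationTokenFromSupplier(cc.Token),
	})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "persistent://public/default/example"})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("hello")})
	fmt.Println("send error:", err)
}

Passing cc.Token as a supplier, rather than a fixed token string, lets the client pick up refreshed access tokens on reconnect; the suite's client() helper above relies on the same pulsar.NewAuthenticationTokenFromSupplier mechanism.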