Skip to content

Commit

Permalink
Pubsub pulsar authentication ~OIDC~ OAuth2 (#3026)
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <[email protected]>
Signed-off-by: Alessandro (Ale) Segala <[email protected]>
Co-authored-by: Alessandro (Ale) Segala <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
Co-authored-by: Bernd Verst <[email protected]>
  • Loading branch information
4 people authored Aug 3, 2023
1 parent 566c7fd commit 7937d34
Show file tree
Hide file tree
Showing 22 changed files with 874 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/certification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ jobs:
set +e
gotestsum --jsonfile ${{ env.TEST_OUTPUT_FILE_PREFIX }}_certification.json \
--junitfile ${{ env.TEST_OUTPUT_FILE_PREFIX }}_certification.xml --format standard-quiet -- \
-coverprofile=cover.out -covermode=set -tags=certtests -coverpkg=${{ matrix.source-pkg }}
-coverprofile=cover.out -covermode=set -tags=certtests -timeout=30m -coverpkg=${{ matrix.source-pkg }}
status=$?
echo "Completed certification tests for ${{ matrix.component }} ... "
if test $status -ne 0; then
Expand Down
165 changes: 165 additions & 0 deletions internal/authentication/oauth2/clientcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oauth2

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"net/url"
"sync"
"time"

"golang.org/x/oauth2"
ccreds "golang.org/x/oauth2/clientcredentials"

"github.com/dapr/kit/logger"
)

// ClientCredentialsMetadata is the metadata fields which can be used by a
// component to configure an OIDC client_credentials token source.
type ClientCredentialsMetadata struct {
TokenCAPEM string `mapstructure:"oauth2TokenCAPEM"`
TokenURL string `mapstructure:"oauth2TokenURL"`
ClientID string `mapstructure:"oauth2ClientID"`
ClientSecret string `mapstructure:"oauth2ClientSecret"`
Audiences []string `mapstructure:"oauth2Audiences"`
Scopes []string `mapstructure:"oauth2Scopes"`
}

type ClientCredentialsOptions struct {
Logger logger.Logger
TokenURL string
ClientID string
ClientSecret string
Scopes []string
Audiences []string
CAPEM []byte
}

// ClientCredentials is an OAuth2 Token Source that uses the client_credentials
// grant type to fetch a token.
type ClientCredentials struct {
log logger.Logger
currentToken *oauth2.Token
httpClient *http.Client
fetchTokenFn func(context.Context) (*oauth2.Token, error)

lock sync.RWMutex
}

func NewClientCredentials(ctx context.Context, opts ClientCredentialsOptions) (*ClientCredentials, error) {
conf, httpClient, err := opts.toConfig()
if err != nil {
return nil, err
}

token, err := conf.Token(context.WithValue(ctx, oauth2.HTTPClient, httpClient))
if err != nil {
return nil, fmt.Errorf("error fetching initial oauth2 client_credentials token: %w", err)
}

opts.Logger.Info("Fetched initial oauth2 client_credentials token")

return &ClientCredentials{
log: opts.Logger,
currentToken: token,
httpClient: httpClient,
fetchTokenFn: conf.Token,
}, nil
}

func (c *ClientCredentialsOptions) toConfig() (*ccreds.Config, *http.Client, error) {
if len(c.Scopes) == 0 {
return nil, nil, errors.New("oauth2 client_credentials token source requires at least one scope")
}

if len(c.Audiences) == 0 {
return nil, nil, errors.New("oauth2 client_credentials token source requires at least one audience")
}

_, err := url.Parse(c.TokenURL)
if err != nil {
return nil, nil, fmt.Errorf("error parsing token URL: %w", err)
}

conf := &ccreds.Config{
ClientID: c.ClientID,
ClientSecret: c.ClientSecret,
TokenURL: c.TokenURL,
Scopes: c.Scopes,
EndpointParams: url.Values{"audience": c.Audiences},
}

// If caPool is nil, then the Go TLS library will use the system's root CA.
var caPool *x509.CertPool
if len(c.CAPEM) > 0 {
caPool = x509.NewCertPool()
if !caPool.AppendCertsFromPEM(c.CAPEM) {
return nil, nil, errors.New("failed to parse CA PEM")
}
}

return conf, &http.Client{
Timeout: time.Second * 30,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: caPool,
},
},
}, nil
}

func (c *ClientCredentials) Token() (string, error) {
c.lock.RLock()
defer c.lock.RUnlock()

if !c.currentToken.Valid() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if err := c.renewToken(ctx); err != nil {
return "", err
}
}

return c.currentToken.AccessToken, nil
}

func (c *ClientCredentials) renewToken(ctx context.Context) error {
c.lock.Lock()
defer c.lock.Unlock()

// We need to check if the current token is valid because we might have lost
// the mutex lock race from the caller and we don't want to double-fetch a
// token unnecessarily!
if c.currentToken.Valid() {
return nil
}

token, err := c.fetchTokenFn(context.WithValue(ctx, oauth2.HTTPClient, c.httpClient))
if err != nil {
return err
}

if !c.currentToken.Valid() {
return errors.New("oauth2 client_credentials token source returned an invalid token")
}

c.currentToken = token
return nil
}
95 changes: 95 additions & 0 deletions internal/authentication/oauth2/clientcredentials_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package oauth2

import (
"net/url"
"testing"

"github.com/stretchr/testify/assert"
ccreds "golang.org/x/oauth2/clientcredentials"
)

func Test_toConfig(t *testing.T) {
tests := map[string]struct {
opts ClientCredentialsOptions
expConfig *ccreds.Config
expErr bool
}{
"no scopes should error": {
opts: ClientCredentialsOptions{
TokenURL: "https://localhost:8080",
ClientID: "client-id",
ClientSecret: "client-secret",
Audiences: []string{"audience"},
},
expErr: true,
},
"bad URL endpoint should error": {
opts: ClientCredentialsOptions{
TokenURL: "&&htp:/f url",
ClientID: "client-id",
ClientSecret: "client-secret",
Audiences: []string{"audience"},
Scopes: []string{"foo"},
},
expErr: true,
},
"bad CA certificate should error": {
opts: ClientCredentialsOptions{
TokenURL: "http://localhost:8080",
ClientID: "client-id",
ClientSecret: "client-secret",
Audiences: []string{"audience"},
Scopes: []string{"foo"},
CAPEM: []byte("ca-pem"),
},
expErr: true,
},
"no audiences should error": {
opts: ClientCredentialsOptions{
TokenURL: "http://localhost:8080",
ClientID: "client-id",
ClientSecret: "client-secret",
Scopes: []string{"foo"},
},
expErr: true,
},
"should default scope": {
opts: ClientCredentialsOptions{
TokenURL: "http://localhost:8080",
ClientID: "client-id",
ClientSecret: "client-secret",
Audiences: []string{"audience"},
Scopes: []string{"foo", "bar"},
},
expConfig: &ccreds.Config{
ClientID: "client-id",
ClientSecret: "client-secret",
TokenURL: "http://localhost:8080",
Scopes: []string{"foo", "bar"},
EndpointParams: url.Values{"audience": []string{"audience"}},
},
expErr: false,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
config, _, err := test.opts.toConfig()
assert.Equalf(t, test.expErr, err != nil, "%v", err)
assert.Equal(t, test.expConfig, config)
})
}
}
10 changes: 8 additions & 2 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ limitations under the License.

package pulsar

import "time"
import (
"time"

"github.com/dapr/components-contrib/internal/authentication/oauth2"
)

type pulsarMetadata struct {
Host string `mapstructure:"host"`
Expand All @@ -26,12 +30,14 @@ type pulsarMetadata struct {
Tenant string `mapstructure:"tenant"`
Namespace string `mapstructure:"namespace"`
Persistent bool `mapstructure:"persistent"`
Token string `mapstructure:"token"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`

Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
}

type schemaMetadata struct {
Expand Down
27 changes: 23 additions & 4 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"sync/atomic"
"time"

"github.com/hamba/avro/v2"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/hamba/avro/v2"
lru "github.com/hashicorp/golang-lru/v2"

"github.com/dapr/components-contrib/internal/authentication/oauth2"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
Expand Down Expand Up @@ -157,7 +157,7 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
return &m, nil
}

func (p *Pulsar) Init(_ context.Context, metadata pubsub.Metadata) error {
func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
m, err := parsePulsarMetadata(metadata)
if err != nil {
return err
Expand All @@ -173,9 +173,28 @@ func (p *Pulsar) Init(_ context.Context, metadata pubsub.Metadata) error {
ConnectionTimeout: 30 * time.Second,
TLSAllowInsecureConnection: !m.EnableTLS,
}
if m.Token != "" {

switch {
case len(m.Token) > 0:
options.Authentication = pulsar.NewAuthenticationToken(m.Token)
case len(m.ClientCredentialsMetadata.TokenURL) > 0:
var cc *oauth2.ClientCredentials
cc, err = oauth2.NewClientCredentials(ctx, oauth2.ClientCredentialsOptions{
Logger: p.logger,
TokenURL: m.ClientCredentialsMetadata.TokenURL,
CAPEM: []byte(m.ClientCredentialsMetadata.TokenCAPEM),
ClientID: m.ClientCredentialsMetadata.ClientID,
ClientSecret: m.ClientCredentialsMetadata.ClientSecret,
Scopes: m.ClientCredentialsMetadata.Scopes,
Audiences: m.ClientCredentialsMetadata.Audiences,
})
if err != nil {
return fmt.Errorf("could not instantiate oauth2 token provider: %w", err)
}

options.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cc.Token)
}

client, err := pulsar.NewClient(options)
if err != nil {
return fmt.Errorf("could not instantiate pulsar client: %v", err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: consumerID
value: certification5
- name: redeliveryDelay
value: 200ms
- name: publicKey
value: public.key
- name: privateKey
value: private.key
- name: keys
value: myapp.key
- name: oauth2TokenURL
value: https://localhost:8085/issuer1/token
- name: oauth2ClientID
value: foo
- name: oauth2ClientSecret
value: bar
- name: oauth2Scopes
value: openid
- name: oauth2Audiences
value: pulsar
- name: oauth2TokenCAPEM
value: "{{ .OAuth2CAPEM }}"
Loading

0 comments on commit 7937d34

Please sign in to comment.