Skip to content

Commit

Permalink
Implement an authorization layer for operator-to-workspace communicat…
Browse files Browse the repository at this point in the history
…ion (#712)

<!--Thanks for your contribution. See [CONTRIBUTING](CONTRIBUTING.md)
    for Pulumi's contribution guidelines.

    Help us merge your changes more quickly by adding more details such
    as labels, milestones, and reviewers.-->

### Overview

This PR implements an authentication and authorization layer for the
agent's RPC endpoint.

Authentication is performed by authenticating a bearer token via the
TokenReview API. The operator uses its built-in service account token.
Authorization is performed via the SubjectAccessReview API, which checks
for following RBAC permission:
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
rules:
- apiGroups:
  - auto.pulumi.com
  resources:
  - workspaces/rpc
  verbs:
  - use
```

The workspace pod's service account must be granted the
`system:auth-delegator` role using a `ClusterRoleBinding`. For.
convenience, the installer creates a service account named `pulumi` into
the `default` namespace, with an associated binding.

The operator itself is granted the necessary permission to access the
RPC endpoint.

### Proposed changes

<!--Give us a brief description of what you've done and what it solves.
-->

- [x] agent grpc interceptor
- [x] agent command args (`--auth-mode=kube`,
`--kube-workspace-name=random-yaml`)
- [x] operator client credentials
- [x] operator RBAC permissions
- [ ] ~cluster role binding for workspace service account (to
ClusterRole named `system:auth-delegator`)~
- [x] install a "default/pulumi" service account with RBAC
- [x] Provide a Flux sample network policy
- [x] Update the e2e test manifests to have requisite account, rbac, and
network policy.

### Future Enhancement

This implementation uses the operator's default service account token,
but to further improve security it should use
an audience-scoped token, where the audience is the agent service
address as opposed to the API server. Such tokens may be created by the
operator with a call to TokenRequest, and checked with TokenReview by
adding the expected audience to the context
(`authenticator.WithAudience`).

### Related issues (optional)

<!--Refer to related PRs or issues: #1234, or 'Fixes #1234' or 'Closes
#1234'.
Or link to full URLs to issues or pull requests in other GitHub
repositories. -->

Closes #609 

#### Examples
Some example requests:
```
random-yaml-workspace-0 pulumi 2024-10-09T21:09:43.905Z INFO    cmd.serve.grpc  finished unary call with code OK        {"grpc.start_time": "2024-10-09T21:09:43Z", "grpc.request.deadline": "2024-10-09T21:59:43Z", "system": "grpc", "span.kind": "server", "grpc.service": "agent.AutomationService", "grpc.method": "WhoAmI", "user.id": "81be050c-9ad4-4708-9a52-413064700747", "user.name": "system:serviceaccount:default:dev", "peer.address": "127.0.0.1:56394", "auth.mode": "kubernetes", "grpc.code": "OK", "grpc.time_ms": 441.086}

random-yaml-workspace-0 pulumi 2024-10-09T21:09:52.934Z INFO    cmd.serve.grpc  finished unary call with code Unauthenticated   {"grpc.start_time": "2024-10-09T21:09:52Z", "grpc.request.deadline": "2024-10-09T21:59:52Z", "system": "grpc", "span.kind": "server", "grpc.service": "agent.AutomationService", "grpc.method": "WhoAmI", "peer.address": "127.0.0.1:57380", "auth.mode": "kubernetes", "error": "rpc error: code = Unauthenticated desc = Request unauthenticated with Bearer", "grpc.code": "Unauthenticated", "grpc.time_ms": 0.095}
```
  • Loading branch information
EronWright authored Oct 12, 2024
1 parent a4c8810 commit 7883699
Show file tree
Hide file tree
Showing 27 changed files with 1,018 additions and 35 deletions.
19 changes: 18 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,26 @@
"-v=false",
"--workspace=${input:workdir}",
"-s=dev"
]
},
{
"name": "Agent (kubernetes)",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "agent",
"args": [
"serve",
"-v=true",
"--workspace=${input:workdir}",
"-s=dev",
"--auth-mode=kube",
"--kube-workspace-namespace=default",
"--kube-workspace-name=random-yaml"
],
"env": {
"AWS_REGION": "us-west-1",
"POD_NAMESPACE": "default",
"POD_SA_NAME": "fake"
}
}
],
Expand Down
23 changes: 22 additions & 1 deletion agent/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@ package cmd
import (
"os"

"flag"

"github.com/spf13/cobra"
"go.uber.org/zap"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

var verbose bool
var (
verbose bool
kubeContext string
)

// a command-specific logger
var log *zap.SugaredLogger
Expand Down Expand Up @@ -51,6 +58,7 @@ to use to perform stack operations.`,

// initialize a command-specific logger
log = zap.L().Named("cmd").Named(cmd.Name()).Sugar()
cmd.SilenceErrors = true
return nil
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
Expand All @@ -64,10 +72,23 @@ to use to perform stack operations.`,
func Execute() {
err := rootCmd.Execute()
if err != nil {
if log != nil {
log.Error(err.Error())
}
os.Exit(1)
}
}

func init() {
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose logging")

// register the Kubernetes flags (e.g. for serve command when using Kubernetes RBAC for authorization)
fs := flag.NewFlagSet("kubernetes", flag.ExitOnError)
config.RegisterFlags(fs)
rootCmd.PersistentFlags().AddGoFlagSet(fs)
rootCmd.PersistentFlags().StringVar(&kubeContext, "context", "", "Kubernetes context override")
}

func GetKubeConfig() (*rest.Config, error) {
return config.GetConfigWithContext(kubeContext)
}
78 changes: 62 additions & 16 deletions agent/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ import (
"runtime/debug"
"syscall"

grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"github.com/pulumi/pulumi-kubernetes-operator/v2/agent/pkg/server"
"github.com/pulumi/pulumi-kubernetes-operator/v2/agent/version"
"github.com/pulumi/pulumi/sdk/v3/go/auto"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
)

const (
AuthModeNone = "none"
AuthModeKubernetes = "kube"
)

var (
Expand All @@ -40,6 +47,10 @@ var (
_stack string
_host string
_port int

_authMode string
_workspaceNamespace string
_workspaceName string
)

// serveCmd represents the serve command
Expand All @@ -48,7 +59,22 @@ var serveCmd = &cobra.Command{
Short: "Serve the agent RPC service",
Long: `Start the agent gRPC server.
`,
Run: func(cmd *cobra.Command, args []string) {
PreRunE: func(cmd *cobra.Command, args []string) error {
if _authMode != AuthModeNone && _authMode != AuthModeKubernetes {
return fmt.Errorf("unsupported auth mode: %s", _authMode)
}
if _authMode == AuthModeKubernetes {
if _workspaceNamespace == "" {
return fmt.Errorf("--kube-workspace-namespace is required when auth mode is kubernetes")
}
if _workspaceName == "" {
return fmt.Errorf("--kube-workspace-name is required when auth mode is kubernetes")
}
}
cmd.SilenceUsage = true
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

log.Infow("Pulumi Kubernetes Agent", "version", version.Version)
Expand All @@ -63,23 +89,42 @@ var serveCmd = &cobra.Command{
}
}

// Prepare the authorizer function
var authFunc grpc_auth.AuthFunc
switch _authMode {
case AuthModeKubernetes:
kubeConfig, err := GetKubeConfig()
if err != nil {
return fmt.Errorf("unable to load the kubeconfig: %w", err)
}

authFunc, err = server.NewKubeAuth(log.Desugar(), kubeConfig, server.KubeAuthOptions{
WorkspaceName: types.NamespacedName{
Namespace: _workspaceNamespace,
Name: _workspaceName,
},
})
if err != nil {
return fmt.Errorf("unable to initialize the Kubernetes authorizer: %w", err)
}
log.Infow("activated the Kubernetes authorization mode",
zap.String("workspace.namespace", _workspaceNamespace), zap.String("workspace.name", _workspaceName))
}

// open the workspace using auto api
workspaceOpts := []auto.LocalWorkspaceOption{}
workDir, err := filepath.EvalSymlinks(_workDir) // resolve the true location of the workspace
if err != nil {
log.Fatalw("unable to resolve the workspace directory", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to resolve the workspace directory: %w", err)
}
workspaceOpts = append(workspaceOpts, auto.WorkDir(workDir))
workspace, err := auto.NewLocalWorkspace(ctx, workspaceOpts...)
if err != nil {
log.Fatalw("unable to open the workspace", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to open the workspace: %w", err)
}
proj, err := workspace.ProjectSettings(ctx)
if err != nil {
log.Fatalw("unable to get the project settings", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to get the project settings: %w", err)
}
log.Infow("opened a local workspace", "workspace", workDir,
"project", proj.Name, "runtime", proj.Runtime.Name())
Expand All @@ -96,8 +141,7 @@ var serveCmd = &cobra.Command{
}
log.Infow("installing project dependencies")
if err := workspace.Install(ctx, opts); err != nil {
log.Fatalw("installation failed", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to install project dependencies: %w", err)
}
log.Infow("installation completed")
} else {
Expand All @@ -110,30 +154,28 @@ var serveCmd = &cobra.Command{
StackName: _stack,
})
if err != nil {
log.Fatalw("unable to make an automation server", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to make an automation server: %w", err)
}
address := fmt.Sprintf("%s:%d", _host, _port)
log.Infow("starting the RPC server", "address", address)

s := server.NewGRPC(autoServer, log)
s := server.NewGRPC(log, autoServer, authFunc)

// Start the grpc server
lis, err := net.Listen("tcp", address)
if err != nil {
log.Errorw("fatal: unable to start the RPC server", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to listen on %s: %w", address, err)
}
log.Infow("server listening", "address", lis.Addr(), "workspace", workDir)

ctx, cancel := context.WithCancel(ctx)
setupSignalHandler(cancel)
if err := s.Serve(ctx, lis); err != nil {
log.Errorw("fatal: server failure", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unexpected serve error: %w", err)
}

log.Infow("server stopped")
return nil
},
}

Expand Down Expand Up @@ -166,4 +208,8 @@ func init() {

serveCmd.Flags().StringVar(&_host, "host", "0.0.0.0", "Server bind address (default: 0.0.0.0)")
serveCmd.Flags().IntVar(&_port, "port", 50051, "Server port (default: 50051)")

serveCmd.Flags().StringVar(&_authMode, "auth-mode", AuthModeNone, "Authorization mode (none, kube)")
serveCmd.Flags().StringVar(&_workspaceNamespace, "kube-workspace-namespace", os.Getenv("WORKSPACE_NAMESPACE"), "The Workspace object namespace (for kubernetes auth mode)")
serveCmd.Flags().StringVar(&_workspaceName, "kube-workspace-name", os.Getenv("WORKSPACE_NAME"), "The Workspace object name (for kubernetes auth mode)")
}
27 changes: 27 additions & 0 deletions agent/hack/token.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Apply this manifest file to create a token with which to authenticate to the agent.
# To get the token, run the following command: kubectl describe secret/dev-token
# To test: kubectl auth can-i use workspaces/random-yaml --subresource rpc --as system:serviceaccount:default:dev
apiVersion: v1
kind: ServiceAccount
metadata:
name: dev
---
apiVersion: v1
kind: Secret
metadata:
name: dev-token
annotations:
kubernetes.io/service-account.name: dev
type: kubernetes.io/service-account-token
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: dev:cluster-admin
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: dev
75 changes: 75 additions & 0 deletions agent/pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright © 2024 Pulumi Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"

"golang.org/x/oauth2"
"google.golang.org/grpc/credentials"
"k8s.io/client-go/transport"
)

// NewTokenCredentials adds the provided bearer token to a request.
// If tokenFile is non-empty, it is periodically read,
// and the last successfully read content is used as the bearer token.
// If tokenFile is non-empty and bearer is empty, the tokenFile is read
// immediately to populate the initial bearer token.
func NewTokenCredentials(bearer string, tokenFile string) (*TokenCredentials, error) {
if len(tokenFile) == 0 {
return &TokenCredentials{bearer, nil}, nil
}
source := transport.NewCachedFileTokenSource(tokenFile)
if len(bearer) == 0 {
token, err := source.Token()
if err != nil {
return nil, err
}
bearer = token.AccessToken
}
return &TokenCredentials{bearer, source}, nil
}

type TokenCredentials struct {
bearer string
source oauth2.TokenSource
}

// GetRequestMetadata gets the current request metadata, refreshing tokens
// if required. This should be called by the transport layer on each
// request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status for
// the RPC (restricted to an allowable set of codes as defined by gRFC
// A54). uri is the URI of the entry point for the request. When supported
// by the underlying implementation, ctx can be used for timeout and
// cancellation. Additionally, RequestInfo data will be available via ctx
// to this call.
func (k *TokenCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
token := k.bearer
if k.source != nil {
if refreshedToken, err := k.source.Token(); err == nil {
token = refreshedToken.AccessToken
}
}
return map[string]string{"authorization": "Bearer " + token}, nil
}

func (k *TokenCredentials) RequireTransportSecurity() bool {
return false
}

var _ credentials.PerRPCCredentials = &TokenCredentials{}
Loading

0 comments on commit 7883699

Please sign in to comment.