Skip to content

Commit

Permalink
Merge branch 'v2' of github.com:pulumi/pulumi-kubernetes-operator int…
Browse files Browse the repository at this point in the history
…o blampe/applyconfig
  • Loading branch information
blampe committed Oct 17, 2024
2 parents de8481c + 2884fad commit f3715ee
Show file tree
Hide file tree
Showing 30 changed files with 1,312 additions and 102 deletions.
19 changes: 18 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,26 @@
"-v=false",
"--workspace=${input:workdir}",
"-s=dev"
]
},
{
"name": "Agent (kubernetes)",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "agent",
"args": [
"serve",
"-v=true",
"--workspace=${input:workdir}",
"-s=dev",
"--auth-mode=kube",
"--kube-workspace-namespace=default",
"--kube-workspace-name=random-yaml"
],
"env": {
"AWS_REGION": "us-west-1",
"POD_NAMESPACE": "default",
"POD_SA_NAME": "fake"
}
}
],
Expand Down
23 changes: 22 additions & 1 deletion agent/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@ package cmd
import (
"os"

"flag"

"github.com/spf13/cobra"
"go.uber.org/zap"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

var verbose bool
var (
verbose bool
kubeContext string
)

// a command-specific logger
var log *zap.SugaredLogger
Expand Down Expand Up @@ -51,6 +58,7 @@ to use to perform stack operations.`,

// initialize a command-specific logger
log = zap.L().Named("cmd").Named(cmd.Name()).Sugar()
cmd.SilenceErrors = true
return nil
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {
Expand All @@ -64,10 +72,23 @@ to use to perform stack operations.`,
func Execute() {
err := rootCmd.Execute()
if err != nil {
if log != nil {
log.Error(err.Error())
}
os.Exit(1)
}
}

func init() {
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose logging")

// register the Kubernetes flags (e.g. for serve command when using Kubernetes RBAC for authorization)
fs := flag.NewFlagSet("kubernetes", flag.ExitOnError)
config.RegisterFlags(fs)
rootCmd.PersistentFlags().AddGoFlagSet(fs)
rootCmd.PersistentFlags().StringVar(&kubeContext, "context", "", "Kubernetes context override")
}

func GetKubeConfig() (*rest.Config, error) {
return config.GetConfigWithContext(kubeContext)
}
78 changes: 62 additions & 16 deletions agent/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ import (
"runtime/debug"
"syscall"

grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"github.com/pulumi/pulumi-kubernetes-operator/v2/agent/pkg/server"
"github.com/pulumi/pulumi-kubernetes-operator/v2/agent/version"
"github.com/pulumi/pulumi/sdk/v3/go/auto"
"github.com/spf13/cobra"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
)

const (
AuthModeNone = "none"
AuthModeKubernetes = "kube"
)

var (
Expand All @@ -40,6 +47,10 @@ var (
_stack string
_host string
_port int

_authMode string
_workspaceNamespace string
_workspaceName string
)

// serveCmd represents the serve command
Expand All @@ -48,7 +59,22 @@ var serveCmd = &cobra.Command{
Short: "Serve the agent RPC service",
Long: `Start the agent gRPC server.
`,
Run: func(cmd *cobra.Command, args []string) {
PreRunE: func(cmd *cobra.Command, args []string) error {
if _authMode != AuthModeNone && _authMode != AuthModeKubernetes {
return fmt.Errorf("unsupported auth mode: %s", _authMode)
}
if _authMode == AuthModeKubernetes {
if _workspaceNamespace == "" {
return fmt.Errorf("--kube-workspace-namespace is required when auth mode is kubernetes")
}
if _workspaceName == "" {
return fmt.Errorf("--kube-workspace-name is required when auth mode is kubernetes")
}
}
cmd.SilenceUsage = true
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

log.Infow("Pulumi Kubernetes Agent", "version", version.Version)
Expand All @@ -63,23 +89,42 @@ var serveCmd = &cobra.Command{
}
}

// Prepare the authorizer function
var authFunc grpc_auth.AuthFunc
switch _authMode {
case AuthModeKubernetes:
kubeConfig, err := GetKubeConfig()
if err != nil {
return fmt.Errorf("unable to load the kubeconfig: %w", err)
}

authFunc, err = server.NewKubeAuth(log.Desugar(), kubeConfig, server.KubeAuthOptions{
WorkspaceName: types.NamespacedName{
Namespace: _workspaceNamespace,
Name: _workspaceName,
},
})
if err != nil {
return fmt.Errorf("unable to initialize the Kubernetes authorizer: %w", err)
}
log.Infow("activated the Kubernetes authorization mode",
zap.String("workspace.namespace", _workspaceNamespace), zap.String("workspace.name", _workspaceName))
}

// open the workspace using auto api
workspaceOpts := []auto.LocalWorkspaceOption{}
workDir, err := filepath.EvalSymlinks(_workDir) // resolve the true location of the workspace
if err != nil {
log.Fatalw("unable to resolve the workspace directory", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to resolve the workspace directory: %w", err)
}
workspaceOpts = append(workspaceOpts, auto.WorkDir(workDir))
workspace, err := auto.NewLocalWorkspace(ctx, workspaceOpts...)
if err != nil {
log.Fatalw("unable to open the workspace", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to open the workspace: %w", err)
}
proj, err := workspace.ProjectSettings(ctx)
if err != nil {
log.Fatalw("unable to get the project settings", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to get the project settings: %w", err)
}
log.Infow("opened a local workspace", "workspace", workDir,
"project", proj.Name, "runtime", proj.Runtime.Name())
Expand All @@ -96,8 +141,7 @@ var serveCmd = &cobra.Command{
}
log.Infow("installing project dependencies")
if err := workspace.Install(ctx, opts); err != nil {
log.Fatalw("installation failed", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to install project dependencies: %w", err)
}
log.Infow("installation completed")
} else {
Expand All @@ -110,30 +154,28 @@ var serveCmd = &cobra.Command{
StackName: _stack,
})
if err != nil {
log.Fatalw("unable to make an automation server", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to make an automation server: %w", err)
}
address := fmt.Sprintf("%s:%d", _host, _port)
log.Infow("starting the RPC server", "address", address)

s := server.NewGRPC(autoServer, log)
s := server.NewGRPC(log, autoServer, authFunc)

// Start the grpc server
lis, err := net.Listen("tcp", address)
if err != nil {
log.Errorw("fatal: unable to start the RPC server", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unable to listen on %s: %w", address, err)
}
log.Infow("server listening", "address", lis.Addr(), "workspace", workDir)

ctx, cancel := context.WithCancel(ctx)
setupSignalHandler(cancel)
if err := s.Serve(ctx, lis); err != nil {
log.Errorw("fatal: server failure", zap.Error(err))
os.Exit(1)
return fmt.Errorf("unexpected serve error: %w", err)
}

log.Infow("server stopped")
return nil
},
}

Expand Down Expand Up @@ -166,4 +208,8 @@ func init() {

serveCmd.Flags().StringVar(&_host, "host", "0.0.0.0", "Server bind address (default: 0.0.0.0)")
serveCmd.Flags().IntVar(&_port, "port", 50051, "Server port (default: 50051)")

serveCmd.Flags().StringVar(&_authMode, "auth-mode", AuthModeNone, "Authorization mode (none, kube)")
serveCmd.Flags().StringVar(&_workspaceNamespace, "kube-workspace-namespace", os.Getenv("WORKSPACE_NAMESPACE"), "The Workspace object namespace (for kubernetes auth mode)")
serveCmd.Flags().StringVar(&_workspaceName, "kube-workspace-name", os.Getenv("WORKSPACE_NAME"), "The Workspace object name (for kubernetes auth mode)")
}
27 changes: 27 additions & 0 deletions agent/hack/token.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Apply this manifest file to create a token with which to authenticate to the agent.
# To get the token, run the following command: kubectl describe secret/dev-token
# To test: kubectl auth can-i use workspaces/random-yaml --subresource rpc --as system:serviceaccount:default:dev
apiVersion: v1
kind: ServiceAccount
metadata:
name: dev
---
apiVersion: v1
kind: Secret
metadata:
name: dev-token
annotations:
kubernetes.io/service-account.name: dev
type: kubernetes.io/service-account-token
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: dev:cluster-admin
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: dev
75 changes: 75 additions & 0 deletions agent/pkg/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright © 2024 Pulumi Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"

"golang.org/x/oauth2"
"google.golang.org/grpc/credentials"
"k8s.io/client-go/transport"
)

// NewTokenCredentials adds the provided bearer token to a request.
// If tokenFile is non-empty, it is periodically read,
// and the last successfully read content is used as the bearer token.
// If tokenFile is non-empty and bearer is empty, the tokenFile is read
// immediately to populate the initial bearer token.
func NewTokenCredentials(bearer string, tokenFile string) (*TokenCredentials, error) {
if len(tokenFile) == 0 {
return &TokenCredentials{bearer, nil}, nil
}
source := transport.NewCachedFileTokenSource(tokenFile)
if len(bearer) == 0 {
token, err := source.Token()
if err != nil {
return nil, err
}
bearer = token.AccessToken
}
return &TokenCredentials{bearer, source}, nil
}

type TokenCredentials struct {
bearer string
source oauth2.TokenSource
}

// GetRequestMetadata gets the current request metadata, refreshing tokens
// if required. This should be called by the transport layer on each
// request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status for
// the RPC (restricted to an allowable set of codes as defined by gRFC
// A54). uri is the URI of the entry point for the request. When supported
// by the underlying implementation, ctx can be used for timeout and
// cancellation. Additionally, RequestInfo data will be available via ctx
// to this call.
func (k *TokenCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
token := k.bearer
if k.source != nil {
if refreshedToken, err := k.source.Token(); err == nil {
token = refreshedToken.AccessToken
}
}
return map[string]string{"authorization": "Bearer " + token}, nil
}

func (k *TokenCredentials) RequireTransportSecurity() bool {
return false
}

var _ credentials.PerRPCCredentials = &TokenCredentials{}
Loading

0 comments on commit f3715ee

Please sign in to comment.