diff --git a/bindings/kubernetes/kubernetes.go b/bindings/kubernetes/kubernetes.go index 55d94d4a52..043969d18a 100644 --- a/bindings/kubernetes/kubernetes.go +++ b/bindings/kubernetes/kubernetes.go @@ -17,13 +17,14 @@ import ( "context" "encoding/json" "errors" + "fmt" + "os" "reflect" - "strings" "sync" "sync/atomic" "time" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -32,28 +33,27 @@ import ( kubeclient "github.com/dapr/components-contrib/internal/authentication/kubernetes" "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" - "github.com/dapr/kit/ptr" ) type kubernetesInput struct { - kubeClient kubernetes.Interface - namespace string - resyncPeriod time.Duration - logger logger.Logger - closed atomic.Bool - closeCh chan struct{} - wg sync.WaitGroup + metadata kubernetesMetadata + kubeClient kubernetes.Interface + logger logger.Logger + closed atomic.Bool + closeCh chan struct{} + wg sync.WaitGroup } type EventResponse struct { - Event string `json:"event"` - OldVal v1.Event `json:"oldVal"` - NewVal v1.Event `json:"newVal"` + Event string `json:"event"` + OldVal corev1.Event `json:"oldVal"` + NewVal corev1.Event `json:"newVal"` } type kubernetesMetadata struct { - Namespace string `mapstructure:"namespace"` - ResyncPeriod *time.Duration `mapstructure:"resyncPeriodInSec"` + Namespace string `mapstructure:"namespace"` + KubeconfigPath string `mapstructure:"kubeconfigPath"` + ResyncPeriod time.Duration `mapstructure:"resyncPeriod" mapstructurealiases:"resyncPeriodInSec"` } // NewKubernetes returns a new Kubernetes event input binding. @@ -65,32 +65,42 @@ func NewKubernetes(logger logger.Logger) bindings.InputBinding { } func (k *kubernetesInput) Init(ctx context.Context, metadata bindings.Metadata) error { - client, err := kubeclient.GetKubeClient(k.logger) + err := k.parseMetadata(metadata) if err != nil { - return err + return fmt.Errorf("failed to parse metadata: %w", err) + } + + kubeconfigPath := k.metadata.KubeconfigPath + if kubeconfigPath == "" { + kubeconfigPath = kubeclient.GetKubeconfigPath(k.logger, os.Args) + } + + client, err := kubeclient.GetKubeClient(kubeconfigPath) + if err != nil { + return fmt.Errorf("failed to initialize Kubernetes client: %w", err) } k.kubeClient = client - return k.parseMetadata(metadata) + return nil } func (k *kubernetesInput) parseMetadata(meta bindings.Metadata) error { - m := kubernetesMetadata{} - err := metadata.DecodeMetadata(meta.Properties, &m) + // Set default values + k.metadata = kubernetesMetadata{ + ResyncPeriod: 10 * time.Second, + } + + // Decode + err := metadata.DecodeMetadata(meta.Properties, &k.metadata) if err != nil { - if strings.Contains(err.Error(), "resyncPeriodInSec") { - k.logger.Warnf("invalid resyncPeriodInSec; %v; defaulting to 10s", err) - m.ResyncPeriod = ptr.Of(time.Second * 10) - } else { - return err - } + return err } - k.resyncPeriod = *m.ResyncPeriod - if m.Namespace == "" { + // Validate + if k.metadata.Namespace == "" { return errors.New("namespace is missing in metadata") } - k.namespace = m.Namespace + return nil } @@ -101,21 +111,21 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er watchlist := cache.NewListWatchFromClient( k.kubeClient.CoreV1().RESTClient(), "events", - k.namespace, + k.metadata.Namespace, fields.Everything(), ) resultChan := make(chan EventResponse) _, controller := cache.NewInformer( watchlist, - &v1.Event{}, - k.resyncPeriod, + &corev1.Event{}, + k.metadata.ResyncPeriod, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { if obj != nil { resultChan <- EventResponse{ Event: "add", - NewVal: *(obj.(*v1.Event)), - OldVal: v1.Event{}, + NewVal: *(obj.(*corev1.Event)), + OldVal: corev1.Event{}, } } else { k.logger.Warnf("Nil Object in Add handle %v", obj) @@ -125,8 +135,8 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er if obj != nil { resultChan <- EventResponse{ Event: "delete", - OldVal: *(obj.(*v1.Event)), - NewVal: v1.Event{}, + OldVal: *(obj.(*corev1.Event)), + NewVal: corev1.Event{}, } } else { k.logger.Warnf("Nil Object in Delete handle %v", obj) @@ -136,8 +146,8 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er if oldObj != nil && newObj != nil { resultChan <- EventResponse{ Event: "update", - OldVal: *(oldObj.(*v1.Event)), - NewVal: *(newObj.(*v1.Event)), + OldVal: *(oldObj.(*corev1.Event)), + NewVal: *(newObj.(*corev1.Event)), } } else { k.logger.Warnf("Nil Objects in Update handle %v %v", oldObj, newObj) @@ -159,7 +169,7 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er } }() - // Start the controller in backgound + // Start the controller in background go func() { defer k.wg.Done() controller.Run(readCtx.Done()) diff --git a/bindings/kubernetes/kubernetes_test.go b/bindings/kubernetes/kubernetes_test.go index b36059188b..02f95e899d 100644 --- a/bindings/kubernetes/kubernetes_test.go +++ b/bindings/kubernetes/kubernetes_test.go @@ -33,8 +33,8 @@ func TestParseMetadata(t *testing.T) { i := kubernetesInput{logger: logger.NewLogger("test")} i.parseMetadata(m) - assert.Equal(t, nsName, i.namespace, "The namespaces should be the same.") - assert.Equal(t, resyncPeriod, i.resyncPeriod, "The resyncPeriod should be the same.") + assert.Equal(t, nsName, i.metadata.Namespace, "The namespaces should be the same.") + assert.Equal(t, resyncPeriod, i.metadata.ResyncPeriod, "The resyncPeriod should be the same.") }) t.Run("parse metadata no namespace", func(t *testing.T) { m := bindings.Metadata{} @@ -43,18 +43,7 @@ func TestParseMetadata(t *testing.T) { i := kubernetesInput{logger: logger.NewLogger("test")} err := i.parseMetadata(m) - assert.NotNil(t, err, "Expected err to be returned.") - assert.Equal(t, "namespace is missing in metadata", err.Error(), "Error message not same.") - }) - t.Run("parse metadata invalid resync period", func(t *testing.T) { - m := bindings.Metadata{} - m.Properties = map[string]string{"namespace": nsName, "resyncPeriodInSec": "invalid"} - - i := kubernetesInput{logger: logger.NewLogger("test")} - err := i.parseMetadata(m) - - assert.Nil(t, err, "Expected err to be nil.") - assert.Equal(t, nsName, i.namespace, "The namespaces should be the same.") - assert.Equal(t, time.Second*10, i.resyncPeriod, "The resyncPeriod should be the same.") + assert.Error(t, err, "Expected err to be returned.") + assert.ErrorContains(t, err, "namespace is missing in metadata", "Error message not same.") }) } diff --git a/crypto/kubernetes/secrets/component.go b/crypto/kubernetes/secrets/component.go index 3f4222aad0..b41e99b182 100644 --- a/crypto/kubernetes/secrets/component.go +++ b/crypto/kubernetes/secrets/component.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "os" "reflect" "strings" "time" @@ -65,7 +66,11 @@ func (k *kubeSecretsCrypto) Init(_ context.Context, metadata contribCrypto.Metad } // Init Kubernetes client - k.kubeClient, err = kubeclient.GetKubeClient(k.logger) + kubeconfigPath := k.md.KubeconfigPath + if kubeconfigPath == "" { + kubeconfigPath = kubeclient.GetKubeconfigPath(k.logger, os.Args) + } + k.kubeClient, err = kubeclient.GetKubeClient(kubeconfigPath) if err != nil { return fmt.Errorf("failed to init Kubernetes client: %w", err) } diff --git a/crypto/kubernetes/secrets/metadata.go b/crypto/kubernetes/secrets/metadata.go index 4789ca0b08..36344e0eff 100644 --- a/crypto/kubernetes/secrets/metadata.go +++ b/crypto/kubernetes/secrets/metadata.go @@ -20,8 +20,12 @@ import ( type secretsMetadata struct { // Default namespace to retrieve secrets from. - // If unset, the namespace must be specified for each key, as `namespace/secretName/key` + // If unset, the namespace must be specified for each key, as `namespace/secretName/key`. DefaultNamespace string `json:"defaultNamespace" mapstructure:"defaultNamespace"` + + // Path to a kubeconfig file. + // If empty, uses the default values. + KubeconfigPath string `json:"kubeconfigPath" mapstructure:"kubeconfigPath"` } func (m *secretsMetadata) InitWithMetadata(meta contribCrypto.Metadata) error { diff --git a/internal/authentication/kubernetes/client.go b/internal/authentication/kubernetes/client.go index 1648599363..c105a4ae36 100644 --- a/internal/authentication/kubernetes/client.go +++ b/internal/authentication/kubernetes/client.go @@ -39,7 +39,7 @@ func init() { } } -func getKubeconfigPath(log logger.Logger, args []string) string { +func GetKubeconfigPath(log logger.Logger, args []string) string { // Check if the path is set via the CLI flag `--kubeconfig` // This is deprecated but kept for backwards compatibility var cliVal string @@ -73,10 +73,10 @@ func getKubeconfigPath(log logger.Logger, args []string) string { } // GetKubeClient returns a kubernetes client. -func GetKubeClient(log logger.Logger) (*kubernetes.Clientset, error) { +func GetKubeClient(kubeconfig string) (*kubernetes.Clientset, error) { conf, err := rest.InClusterConfig() if err != nil { - conf, err = clientcmd.BuildConfigFromFlags("", getKubeconfigPath(log, os.Args)) + conf, err = clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, err } diff --git a/internal/authentication/kubernetes/client_test.go b/internal/authentication/kubernetes/client_test.go index 456a9b75c1..63c9b454a9 100644 --- a/internal/authentication/kubernetes/client_test.go +++ b/internal/authentication/kubernetes/client_test.go @@ -53,7 +53,7 @@ func TestGetKubeconfigPath(t *testing.T) { if args == nil { args = []string{} } - if got := getKubeconfigPath(log, args); got != tt.want { + if got := GetKubeconfigPath(log, args); got != tt.want { t.Errorf("getKubeconfigPath() = %v, want %v", got, tt.want) } }) diff --git a/secretstores/kubernetes/kubernetes.go b/secretstores/kubernetes/kubernetes.go index 9ae6878b42..13f0da8653 100644 --- a/secretstores/kubernetes/kubernetes.go +++ b/secretstores/kubernetes/kubernetes.go @@ -13,13 +13,13 @@ limitations under the License. package kubernetes -//nolint:nosnakecase import ( "context" "errors" + "fmt" "os" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" kubeclient "github.com/dapr/components-contrib/internal/authentication/kubernetes" @@ -32,6 +32,7 @@ var _ secretstores.SecretStore = (*kubernetesSecretStore)(nil) type kubernetesSecretStore struct { kubeClient kubernetes.Interface + md kubernetesMetadata logger logger.Logger } @@ -42,11 +43,21 @@ func NewKubernetesSecretStore(logger logger.Logger) secretstores.SecretStore { // Init creates a Kubernetes client. func (k *kubernetesSecretStore) Init(_ context.Context, metadata secretstores.Metadata) error { - client, err := kubeclient.GetKubeClient(k.logger) + // Init metadata + err := k.md.InitWithMetadata(metadata) + if err != nil { + return fmt.Errorf("failed to load metadata: %w", err) + } + + // Init Kubernetes client + kubeconfigPath := k.md.KubeconfigPath + if kubeconfigPath == "" { + kubeconfigPath = kubeclient.GetKubeconfigPath(k.logger, os.Args) + } + k.kubeClient, err = kubeclient.GetKubeClient(kubeconfigPath) if err != nil { return err } - k.kubeClient = client return nil } @@ -61,7 +72,7 @@ func (k *kubernetesSecretStore) GetSecret(ctx context.Context, req secretstores. return resp, err } - secret, err := k.kubeClient.CoreV1().Secrets(namespace).Get(ctx, req.Name, meta_v1.GetOptions{}) //nolint:nosnakecase + secret, err := k.kubeClient.CoreV1().Secrets(namespace).Get(ctx, req.Name, metav1.GetOptions{}) if err != nil { return resp, err } @@ -83,7 +94,7 @@ func (k *kubernetesSecretStore) BulkGetSecret(ctx context.Context, req secretsto return resp, err } - secrets, err := k.kubeClient.CoreV1().Secrets(namespace).List(ctx, meta_v1.ListOptions{}) //nolint:nosnakecase + secrets, err := k.kubeClient.CoreV1().Secrets(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return resp, err } @@ -103,12 +114,15 @@ func (k *kubernetesSecretStore) getNamespaceFromMetadata(metadata map[string]str return val, nil } - val := os.Getenv("NAMESPACE") - if val != "" { + if val := os.Getenv("NAMESPACE"); val != "" { return val, nil } - return "", errors.New("namespace is missing on metadata and NAMESPACE env variable") + if k.md.DefaultNamespace != "" { + return k.md.DefaultNamespace, nil + } + + return "", errors.New("namespace is missing on metadata and NAMESPACE env variable, and no default namespace is set") } // Features returns the features available in this secret store. diff --git a/secretstores/kubernetes/kubernetes_test.go b/secretstores/kubernetes/kubernetes_test.go index 2e196dcffb..3a8c1645e6 100644 --- a/secretstores/kubernetes/kubernetes_test.go +++ b/secretstores/kubernetes/kubernetes_test.go @@ -28,16 +28,16 @@ func TestGetNamespace(t *testing.T) { namespace := "a" ns, err := store.getNamespaceFromMetadata(map[string]string{"namespace": namespace}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, namespace, ns) }) t.Run("has namespace env", func(t *testing.T) { store := kubernetesSecretStore{logger: logger.NewLogger("test")} - os.Setenv("NAMESPACE", "b") + t.Setenv("NAMESPACE", "b") ns, err := store.getNamespaceFromMetadata(map[string]string{}) - assert.Nil(t, err) + assert.NoError(t, err) assert.Equal(t, "b", ns) }) @@ -46,8 +46,21 @@ func TestGetNamespace(t *testing.T) { os.Setenv("NAMESPACE", "") _, err := store.getNamespaceFromMetadata(map[string]string{}) - assert.NotNil(t, err) - assert.Equal(t, "namespace is missing on metadata and NAMESPACE env variable", err.Error()) + assert.Error(t, err) + assert.ErrorContains(t, err, "namespace is missing") + }) + + t.Run("has default namespace", func(t *testing.T) { + store := kubernetesSecretStore{ + logger: logger.NewLogger("test"), + md: kubernetesMetadata{ + DefaultNamespace: "c", + }, + } + + ns, err := store.getNamespaceFromMetadata(map[string]string{}) + assert.NoError(t, err) + assert.Equal(t, "c", ns) }) } diff --git a/secretstores/kubernetes/metadata.go b/secretstores/kubernetes/metadata.go new file mode 100644 index 0000000000..90c1a0e96d --- /dev/null +++ b/secretstores/kubernetes/metadata.go @@ -0,0 +1,46 @@ +/* +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "github.com/dapr/components-contrib/metadata" + "github.com/dapr/components-contrib/secretstores" +) + +type kubernetesMetadata struct { + // Default namespace to retrieve secrets from. + // If unset, the namespace must be specified for each key, as `namespace/secretName/key`. + DefaultNamespace string `json:"defaultNamespace" mapstructure:"defaultNamespace"` + + // Path to a kubeconfig file. + // If empty, uses the default values. + KubeconfigPath string `json:"kubeconfigPath" mapstructure:"kubeconfigPath"` +} + +func (m *kubernetesMetadata) InitWithMetadata(meta secretstores.Metadata) error { + m.reset() + + // Decode the metadata + err := metadata.DecodeMetadata(meta.Properties, &m) + if err != nil { + return err + } + + return nil +} + +// Reset the object +func (m *kubernetesMetadata) reset() { + m.DefaultNamespace = "" +}