From 352e1b75da599510b0cb8387d58681fc69a3a830 Mon Sep 17 00:00:00 2001
From: rrajesh
Date: Fri, 25 Oct 2024 11:47:45 +0200
Subject: [PATCH] [YUNIKORN-1977] Added E2E test for deploying pod and verifying user info with non-kube-admin user (#915)

Closes: #915

Signed-off-by: Peter Bacsko
---
 test/e2e/framework/helpers/k8s/k8s_utils.go   | 110 +++++++--
 .../user_group_limit/user_group_limit_test.go | 224 +++++++++++++++++-
 2 files changed, 300 insertions(+), 34 deletions(-)

diff --git a/test/e2e/framework/helpers/k8s/k8s_utils.go b/test/e2e/framework/helpers/k8s/k8s_utils.go
index 77a472f57..ef40becf4 100644
--- a/test/e2e/framework/helpers/k8s/k8s_utils.go
+++ b/test/e2e/framework/helpers/k8s/k8s_utils.go
@@ -17,20 +17,19 @@ package k8s
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
 	"net/http"
 	"net/url"
 	"os"
-	"os/exec"
 	"path/filepath"
 	"strings"
 	"time"
 
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
+	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 
 	"go.uber.org/zap"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -1091,14 +1090,6 @@ func (k *KubeCtl) CreateSecret(secret *v1.Secret, namespace string) (*v1.Secret,
 	return k.clientSet.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
 }
 
-func GetSecretObj(yamlPath string) (*v1.Secret, error) {
-	o, err := common.Yaml2Obj(yamlPath)
-	if err != nil {
-		return nil, err
-	}
-	return o.(*v1.Secret), err
-}
-
 func (k *KubeCtl) CreateServiceAccount(accountName string, namespace string) (*v1.ServiceAccount, error) {
 	return k.clientSet.CoreV1().ServiceAccounts(namespace).Create(context.TODO(), &v1.ServiceAccount{
 		ObjectMeta: metav1.ObjectMeta{Name: accountName},
@@ -1383,21 +1374,6 @@ func (k *KubeCtl) isNumJobPodsInDesiredState(jobName string, namespace string, n
 	}
 }
 
-func ApplyYamlWithKubectl(path, namespace string) error {
-	cmd := exec.Command("kubectl", "apply", "-f", path, "-n", namespace)
-	var stderr bytes.Buffer
-	cmd.Stderr = &stderr
-	// if err != nil, isn't represent yaml format error.
-	// it only represent the cmd.Run() fail.
-	err := cmd.Run()
-	// if yaml format error, errStr will show the detail
-	errStr := stderr.String()
-	if err != nil && errStr != "" {
-		return fmt.Errorf("apply fail with %s", errStr)
-	}
-	return nil
-}
-
 func (k *KubeCtl) GetNodes() (*v1.NodeList, error) {
 	return k.clientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
 }
@@ -1779,3 +1755,87 @@ func (k *KubeCtl) DeleteStorageClass(scName string) error {
 	}
 	return nil
 }
+
+func (k *KubeCtl) GetSecrets(namespace string) (*v1.SecretList, error) {
+	return k.clientSet.CoreV1().Secrets(namespace).List(context.TODO(), metav1.ListOptions{})
+}
+
+// GetSecretValue retrieves the value for a specific key from a Kubernetes secret.
+func (k *KubeCtl) GetSecretValue(namespace, secretName, key string) (string, error) {
+	err := k.WaitForSecret(namespace, secretName, 5*time.Second)
+	if err != nil {
+		return "", err
+	}
+	secret, err := k.GetSecret(namespace, secretName)
+	if err != nil {
+		return "", err
+	}
+	// Check if the key exists in the secret
+	value, ok := secret.Data[key]
+	if !ok {
+		return "", fmt.Errorf("key %s not found in secret %s", key, secretName)
+	}
+	return string(value), nil
+}
+
+func (k *KubeCtl) GetSecret(namespace, secretName string) (*v1.Secret, error) {
+	secret, err := k.clientSet.CoreV1().Secrets(namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	return secret, nil
+}
+
+func (k *KubeCtl) WaitForSecret(namespace, secretName string, timeout time.Duration) error {
+	var cond wait.ConditionFunc // nolint:gosimple
+	cond = func() (done bool, err error) {
+		secret, err := k.GetSecret(namespace, secretName)
+		if err != nil {
+			return false, err
+		}
+		if secret != nil {
+			return true, nil
+		}
+		return false, nil
+	}
+	return wait.PollUntilContextTimeout(context.TODO(), time.Second, timeout, false, cond.WithContext())
+}
+
+func WriteConfigToFile(config *rest.Config, kubeconfigPath string) error {
+	// Build the kubeconfig API object from the rest.Config
+	kubeConfig := &clientcmdapi.Config{
+		Clusters: map[string]*clientcmdapi.Cluster{
+			"default-cluster": {
+				Server:                   config.Host,
+				CertificateAuthorityData: config.CAData,
+				InsecureSkipTLSVerify:    config.Insecure,
+			},
+		},
+		AuthInfos: map[string]*clientcmdapi.AuthInfo{
+			"default-auth": {
+				Token: config.BearerToken,
+			},
+		},
+		Contexts: map[string]*clientcmdapi.Context{
+			"default-context": {
+				Cluster:  "default-cluster",
+				AuthInfo: "default-auth",
+			},
+		},
+		CurrentContext: "default-context",
+	}
+
+	// Ensure the directory where the file is being written exists
+	err := os.MkdirAll(filepath.Dir(kubeconfigPath), os.ModePerm)
+	if err != nil {
+		return fmt.Errorf("failed to create directory for kubeconfig file: %v", err)
+	}
+
+	// Write the kubeconfig to the specified file
+	err = clientcmd.WriteToFile(*kubeConfig, kubeconfigPath)
+	if err != nil {
+		return fmt.Errorf("failed to write kubeconfig to file: %v", err)
+	}
+
+	return nil
+}
diff --git a/test/e2e/user_group_limit/user_group_limit_test.go b/test/e2e/user_group_limit/user_group_limit_test.go
index 7eb3d016b..45ba70c13 100644
--- a/test/e2e/user_group_limit/user_group_limit_test.go
+++ b/test/e2e/user_group_limit/user_group_limit_test.go
@@ -19,31 +19,45 @@ package user_group_limit_test
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/url"
+	"os"
+	"path/filepath"
 	"runtime"
+	"strings"
 	"time"
 
 	"github.com/apache/yunikorn-core/pkg/common/configs"
 	"github.com/apache/yunikorn-core/pkg/common/resources"
 	"github.com/apache/yunikorn-core/pkg/webservice/dao"
+	amCommon "github.com/apache/yunikorn-k8shim/pkg/admission/common"
 	amconf "github.com/apache/yunikorn-k8shim/pkg/admission/conf"
 	"github.com/apache/yunikorn-k8shim/pkg/common/constants"
+	tests "github.com/apache/yunikorn-k8shim/test/e2e"
+	"github.com/apache/yunikorn-k8shim/test/e2e/framework/configmanager"
 	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/common"
 	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/k8s"
 	"github.com/apache/yunikorn-k8shim/test/e2e/framework/helpers/yunikorn"
+	siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
 	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
+	"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd" + "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/client-go/kubernetes" ) type TestType int @@ -103,8 +117,11 @@ var _ = ginkgo.BeforeEach(func() { var _ = ginkgo.AfterSuite(func() { ginkgo.By("Check Yunikorn's health") checks, err := yunikorn.GetFailedHealthChecks() - Ω(err).NotTo(gomega.HaveOccurred()) + Ω(err).NotTo(HaveOccurred()) Ω(checks).To(gomega.Equal(""), checks) + ginkgo.By("Tearing down namespace: " + dev) + err = kClient.TearDownNamespace(dev) + Ω(err).NotTo(HaveOccurred()) }) var _ = ginkgo.Describe("UserGroupLimit", func() { @@ -909,40 +926,229 @@ var _ = ginkgo.Describe("UserGroupLimit", func() { checkUsageWildcardGroups(groupTestType, group2, sandboxQueue1, []*v1.Pod{group2Sandbox1Pod1, group2Sandbox1Pod2, group2Sandbox1Pod3}) }) + ginkgo.It("Verify User info for the non kube admin user", func() { + var clientset *kubernetes.Clientset + var namespace = "default" + var serviceAccountName = "test-user-sa" + var podName = "test-pod" + var secretName = "test-user-sa-token" // #nosec G101 + + ginkgo.By("Update config") + // The wait wrapper still can't fully guarantee that the config in AdmissionController has been updated. + admissionCustomConfig = map[string]string{ + "log.core.scheduler.ugm.level": "debug", + amconf.AMAccessControlBypassAuth: constants.False, + } + yunikorn.WaitForAdmissionControllerRefreshConfAfterAction(func() { + yunikorn.UpdateCustomConfigMapWrapperWithMap(oldConfigMap, "", admissionCustomConfig, func(sc *configs.SchedulerConfig) error { + // remove placement rules so we can control queue + sc.Partitions[0].PlacementRules = nil + err := common.AddQueue(sc, constants.DefaultPartition, constants.RootQueue, configs.QueueConfig{ + Name: "default", + Limits: []configs.Limit{ + { + Limit: "user entry", + Users: []string{user1}, + MaxApplications: 1, + MaxResources: map[string]string{ + siCommon.Memory: fmt.Sprintf("%dM", mediumMem), + }, + }, + { + Limit: "user2 entry", + Users: []string{user2}, + MaxApplications: 2, + MaxResources: map[string]string{ + siCommon.Memory: fmt.Sprintf("%dM", largeMem), + }, + }, + }}) + if err != nil { + return err + } + return common.AddQueue(sc, constants.DefaultPartition, constants.RootQueue, configs.QueueConfig{Name: "sandbox2"}) + }) + }) + defer func() { + // cleanup + ginkgo.By("Cleaning up resources...") + err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + gomega.Ω(err).NotTo(HaveOccurred()) + err = clientset.CoreV1().ServiceAccounts(namespace).Delete(context.TODO(), serviceAccountName, metav1.DeleteOptions{}) + gomega.Ω(err).NotTo(HaveOccurred()) + err = kClient.DeleteClusterRole("pod-creator-role") + gomega.Ω(err).NotTo(HaveOccurred()) + err = kClient.DeleteClusterRoleBindings("pod-creator-role-binding") + gomega.Ω(err).NotTo(HaveOccurred()) + }() + // Create Service Account + ginkgo.By("Creating Service Account...") + sa, err := kClient.CreateServiceAccount(serviceAccountName, namespace) + gomega.Ω(err).NotTo(HaveOccurred()) + // Create a ClusterRole with necessary permissions + ginkgo.By("Creating ClusterRole...") + clusterRole := &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-creator-role", + }, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{""}, + Resources: []string{"pods", "serviceaccounts", "test-user-sa"}, + Verbs: []string{"create", "get", "list", "watch", "delete"}, + }, + }, + } + _, 
+		_, err = kClient.CreateClusterRole(clusterRole)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// Create a ClusterRoleBinding to bind the ClusterRole to the service account
+		ginkgo.By("Creating ClusterRoleBinding...")
+		clusterRoleBinding := &rbacv1.ClusterRoleBinding{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "pod-creator-role-binding",
+			},
+			RoleRef: rbacv1.RoleRef{
+				APIGroup: "rbac.authorization.k8s.io",
+				Kind:     "ClusterRole",
+				Name:     "pod-creator-role",
+			},
+			Subjects: []rbacv1.Subject{
+				{
+					Kind:      "ServiceAccount",
+					Name:      sa.Name,
+					Namespace: namespace,
+				},
+			},
+		}
+		_, err = kClient.CreateClusterRoleBinding(clusterRoleBinding.ObjectMeta.Name, clusterRoleBinding.RoleRef.Name, clusterRoleBinding.Subjects[0].Namespace, clusterRoleBinding.Subjects[0].Name)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// Create a Secret for the Service Account
+		ginkgo.By("Creating Secret for the Service Account...")
+		// create a object of v1.Secret
+		secret := &v1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: secretName,
+				Annotations: map[string]string{
+					"kubernetes.io/service-account.name": serviceAccountName,
+				},
+			},
+			Type: v1.SecretTypeServiceAccountToken,
+		}
+		_, err = kClient.CreateSecret(secret, namespace)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// Get the token value from the Secret
+		ginkgo.By("Getting the token value from the Secret...")
+		userTokenValue, err := kClient.GetSecretValue(namespace, secretName, "token")
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// use deep copy not to hardcode the kubeconfig
+		config, err := kClient.GetKubeConfig()
+		gomega.Ω(err).NotTo(HaveOccurred())
+		config.BearerToken = userTokenValue
+		newConf := rest.CopyConfig(config) // copy existing config
+		// Use token-based authentication instead of client certificates
+		newConf.CAFile = ""
+		newConf.CertFile = ""
+		newConf.KeyFile = ""
+		newConf.BearerToken = userTokenValue
+		kubeconfigPath := filepath.Join(os.TempDir(), "test-user-config")
+		err = k8s.WriteConfigToFile(newConf, kubeconfigPath)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		config, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		clientset, err = kubernetes.NewForConfig(config)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		ginkgo.By("Creating Pod...")
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: podName,
+				Annotations: map[string]string{
+					"created-by": fmt.Sprintf("system:serviceaccount:%s:%s", namespace, serviceAccountName),
+					"user-token": userTokenValue, // Log the token in the annotation
+				},
+				Labels: map[string]string{"applicationId": "test-app"},
+			},
+			Spec: v1.PodSpec{
+				ServiceAccountName: serviceAccountName,
+				Containers: []v1.Container{
+					{
+						Name:  "nginx",
+						Image: "nginx",
+						Ports: []v1.ContainerPort{{ContainerPort: 80}},
+					},
+				},
+			},
+		}
+		_, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+		gomega.Ω(err).NotTo(HaveOccurred())
+		createdPod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{})
+		gomega.Ω(err).NotTo(HaveOccurred())
+		ginkgo.By("Verifying User Info...")
+		userInfo, err := GetUserInfoFromPodAnnotation(createdPod)
+		gomega.Ω(err).NotTo(HaveOccurred())
+		// user info should contain the substring "system:serviceaccount:default:test-user-sa"
+		gomega.Ω(strings.Contains(fmt.Sprintf("%v", userInfo), "system:serviceaccount:default:test-user-sa")).To(gomega.BeTrue())
+		queueName2 := "root_22"
+		yunikorn.UpdateCustomConfigMapWrapper(oldConfigMap, "", func(sc *configs.SchedulerConfig) error {
+			// remove placement rules so we can control queue
+			sc.Partitions[0].PlacementRules = nil
+			var err error
+			if err = common.AddQueue(sc, "default", "root", configs.QueueConfig{
+				Name:       queueName2,
+				Resources:  configs.Resources{Guaranteed: map[string]string{"memory": fmt.Sprintf("%dM", 200)}},
+				Properties: map[string]string{"preemption.delay": "1s"},
+			}); err != nil {
+				return err
+			}
+			return nil
+		})
+	})
 	ginkgo.AfterEach(func() {
 		tests.DumpClusterInfoIfSpecFailed(suiteName, []string{dev})
 		ginkgo.By("Tearing down namespace: " + dev)
 		err := kClient.TearDownNamespace(dev)
-		Ω(err).NotTo(gomega.HaveOccurred())
+		Ω(err).NotTo(HaveOccurred())
 		// reset config
 		ginkgo.By("Restoring YuniKorn configuration")
 		yunikorn.RestoreConfigMapWrapper(oldConfigMap)
 	})
 })
 
+func GetUserInfoFromPodAnnotation(pod *v1.Pod) (*si.UserGroupInformation, error) {
+	userInfo, ok := pod.Annotations[amCommon.UserInfoAnnotation]
+	if !ok {
+		return nil, fmt.Errorf("user info not found in pod annotation")
+	}
+	var userInfoObj si.UserGroupInformation
+	err := json.Unmarshal([]byte(userInfo), &userInfoObj)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unmarshal user info from pod annotation")
+	}
+	return &userInfoObj, nil
+}
+
 func deploySleepPod(usergroup *si.UserGroupInformation, queuePath string, expectedRunning bool, reason string) *v1.Pod {
 	usergroupJsonBytes, err := json.Marshal(usergroup)
-	Ω(err).NotTo(gomega.HaveOccurred())
+	Ω(err).NotTo(HaveOccurred())
 	sleepPodConfig := k8s.SleepPodConfig{NS: dev, Mem: smallMem, Labels: map[string]string{constants.LabelQueueName: queuePath}}
 	sleepPodObj, err := k8s.InitSleepPod(sleepPodConfig)
-	Ω(err).NotTo(gomega.HaveOccurred())
+	Ω(err).NotTo(HaveOccurred())
 	sleepPodObj.Annotations[amCommon.UserInfoAnnotation] = string(usergroupJsonBytes)
 	ginkgo.By(fmt.Sprintf("%s deploys the sleep pod %s to queue %s", usergroup, sleepPodObj.Name, queuePath))
 	sleepPod, err := kClient.CreatePod(sleepPodObj, dev)
-	gomega.Ω(err).NotTo(gomega.HaveOccurred())
+	gomega.Ω(err).NotTo(HaveOccurred())
 	if expectedRunning {
 		ginkgo.By(fmt.Sprintf("The sleep pod %s can be scheduled %s", sleepPod.Name, reason))
 		err = kClient.WaitForPodRunning(dev, sleepPod.Name, 60*time.Second)
-		gomega.Ω(err).NotTo(gomega.HaveOccurred())
+		gomega.Ω(err).NotTo(HaveOccurred())
 	} else {
 		ginkgo.By(fmt.Sprintf("The sleep pod %s can't be scheduled %s", sleepPod.Name, reason))
 		// Since Pending is the initial state of PodPhase, sleep for 5 seconds, then check whether the pod is still in Pending state.
 		time.Sleep(5 * time.Second)
 		err = kClient.WaitForPodPending(sleepPod.Namespace, sleepPod.Name, 60*time.Second)
-		gomega.Ω(err).NotTo(gomega.HaveOccurred())
+		gomega.Ω(err).NotTo(HaveOccurred())
 	}
 	return sleepPod
 }
@@ -952,14 +1158,14 @@ func checkUsage(testType TestType, name string, queuePath string, expectedRunnin
 	if testType == userTestType {
 		ginkgo.By(fmt.Sprintf("Check user resource usage for %s in queue %s", name, queuePath))
 		userUsageDAOInfo, err := restClient.GetUserUsage(constants.DefaultPartition, name)
-		Ω(err).NotTo(gomega.HaveOccurred())
+		Ω(err).NotTo(HaveOccurred())
 		Ω(userUsageDAOInfo).NotTo(gomega.BeNil())
 		rootQueueResourceUsageDAO = userUsageDAOInfo.Queues
 	} else if testType == groupTestType {
 		ginkgo.By(fmt.Sprintf("Check group resource usage for %s in queue %s", name, queuePath))
 		groupUsageDAOInfo, err := restClient.GetGroupUsage(constants.DefaultPartition, name)
-		Ω(err).NotTo(gomega.HaveOccurred())
+		Ω(err).NotTo(HaveOccurred())
 		Ω(groupUsageDAOInfo).NotTo(gomega.BeNil())
 		rootQueueResourceUsageDAO = groupUsageDAOInfo.Queues
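
Outside the patch itself, a minimal sketch of how the pieces introduced above are meant to fit together: build a clientset that authenticates with the service-account token, then read back the user info annotation the admission controller adds when bypassAuth is disabled. The package and helper names (usergroupsketch, buildUserClient, userInfoOf) and the extra clearing of CertData/KeyData are illustrative assumptions, not part of the change; amCommon.UserInfoAnnotation and si.UserGroupInformation are the same identifiers used in the test.

package usergroupsketch

import (
	"context"
	"encoding/json"
	"fmt"

	amCommon "github.com/apache/yunikorn-k8shim/pkg/admission/common"
	"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// buildUserClient copies the admin rest.Config, drops any client-certificate
// credentials and swaps in the service-account bearer token, so every API call
// made through the returned clientset is authenticated as that service account.
func buildUserClient(admin *rest.Config, token string) (*kubernetes.Clientset, error) {
	conf := rest.CopyConfig(admin) // never mutate the shared config
	conf.CertFile, conf.KeyFile = "", ""
	conf.CertData, conf.KeyData = nil, nil
	conf.BearerToken = token
	return kubernetes.NewForConfig(conf)
}

// userInfoOf fetches a pod and decodes the user info annotation stamped by the
// admission controller, mirroring GetUserInfoFromPodAnnotation in the test.
func userInfoOf(ctx context.Context, c kubernetes.Interface, ns, podName string) (*si.UserGroupInformation, error) {
	pod, err := c.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	raw, ok := pod.Annotations[amCommon.UserInfoAnnotation]
	if !ok {
		return nil, fmt.Errorf("pod %s/%s has no user info annotation", ns, podName)
	}
	var info si.UserGroupInformation
	if err := json.Unmarshal([]byte(raw), &info); err != nil {
		return nil, err
	}
	return &info, nil
}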