From d8421789104e591483cdcfb61f51e8e4263ae973 Mon Sep 17 00:00:00 2001 From: Surax98 Date: Wed, 18 Oct 2023 08:55:02 +0000 Subject: [PATCH] caching slurm status and adding pre-exec commands. To be checked --- kustomizations/InterLinkConfig.yaml | 3 +- main.go | 4 +- pkg/common/func.go | 4 + pkg/common/types.go | 1 + pkg/sidecars/slurm/Create.go | 9 +- pkg/sidecars/slurm/Status.go | 195 ++++++++++++++-------------- pkg/sidecars/slurm/aux.go | 7 + pkg/virtualkubelet/execute.go | 2 +- 8 files changed, 120 insertions(+), 105 deletions(-) diff --git a/kustomizations/InterLinkConfig.yaml b/kustomizations/InterLinkConfig.yaml index 2d01c5e2..7fd84c3f 100644 --- a/kustomizations/InterLinkConfig.yaml +++ b/kustomizations/InterLinkConfig.yaml @@ -14,4 +14,5 @@ Namespace: "knoc" Tsocks: false TsocksPath: "$WORK/tsocks-1.8beta5+ds1/libtsocks.so" TsocksLoginNode: "login01" -BashPath: /bin/bash \ No newline at end of file +BashPath: /bin/bash +Pod_IP: "172.16.9.11" \ No newline at end of file diff --git a/main.go b/main.go index 24ab0420..55834f8e 100644 --- a/main.go +++ b/main.go @@ -107,7 +107,7 @@ func main() { NodeName: opts.NodeName, OperatingSystem: "Linux", // https://github.com/liqotech/liqo/blob/d8798732002abb7452c2ff1c99b3e5098f848c93/deployments/liqo/templates/liqo-gateway-deployment.yaml#L69 - InternalIP: "127.0.0.1", + InternalIP: "172.16.9.11", DaemonPort: int32(dport), } @@ -246,7 +246,7 @@ func main() { api.AttachPodRoutes(podRoutes, mux, true) - parsedIP := net.ParseIP(os.Getenv("POD_IP")) + parsedIP := net.ParseIP(commonIL.InterLinkConfigInst.PodIP) retriever := newSelfSignedCertificateRetriever(cfg.NodeName, parsedIP) server := &http.Server{ diff --git a/pkg/common/func.go b/pkg/common/func.go index 211a5aff..d058d903 100644 --- a/pkg/common/func.go +++ b/pkg/common/func.go @@ -70,6 +70,10 @@ func NewInterLinkConfig() { InterLinkConfigInst.Scancelpath = os.Getenv("SCANCELPATH") } + if os.Getenv("POD_IP") != "" { + InterLinkConfigInst.PodIP = os.Getenv("POD_IP") + } + if os.Getenv("TSOCKS") != "" { if os.Getenv("TSOCKS") != "true" && os.Getenv("TSOCKS") != "false" { fmt.Println("export TSOCKS as true or false") diff --git a/pkg/common/types.go b/pkg/common/types.go index 439ff716..e0dbc912 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -52,6 +52,7 @@ type InterLinkConfig struct { Tsocksconfig string `yaml:"TsocksConfig"` Tsockslogin string `yaml:"TsocksLoginNode"` BashPath string `yaml:"BashPath"` + PodIP string `yaml:"Pod_IP"` set bool } diff --git a/pkg/sidecars/slurm/Create.go b/pkg/sidecars/slurm/Create.go index 2f6ad24f..b15916b2 100644 --- a/pkg/sidecars/slurm/Create.go +++ b/pkg/sidecars/slurm/Create.go @@ -9,8 +9,6 @@ import ( "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/common" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func SubmitHandler(w http.ResponseWriter, r *http.Request) { @@ -40,11 +38,8 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { } for _, data := range req { - var metadata metav1.ObjectMeta - var containers []v1.Container - - containers = data.Pod.Spec.Containers - metadata = data.Pod.ObjectMeta + containers := data.Pod.Spec.Containers + metadata := data.Pod.ObjectMeta var singularity_command_pod []SingularityCommand diff --git a/pkg/sidecars/slurm/Status.go b/pkg/sidecars/slurm/Status.go index 1fed9297..9dd228b4 100644 --- a/pkg/sidecars/slurm/Status.go +++ b/pkg/sidecars/slurm/Status.go @@ -20,6 +20,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { var resp []commonIL.PodStatus statusCode := http.StatusOK log.G(Ctx).Info("Slurm Sidecar: received GetStatus call") + timeNow := time.Now() bodyBytes, err := io.ReadAll(r.Body) if err != nil { @@ -30,109 +31,116 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { return } - json.Unmarshal(bodyBytes, &req) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Some errors occurred while retrieving container status. Check Slurm Sidecar's logs")) - log.G(Ctx).Error(err) - return - } + if timeNow.Sub(timer) >= time.Second*10 { - cmd := []string{"--me"} - shell := exec.ExecTask{ - Command: "squeue", - Args: cmd, - Shell: true, - } - execReturn, _ := shell.Execute() - execReturn.Stdout = strings.ReplaceAll(execReturn.Stdout, "\n", "") + json.Unmarshal(bodyBytes, &req) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Some errors occurred while retrieving container status. Check Slurm Sidecar's logs")) + log.G(Ctx).Error(err) + return + } + cmd := []string{"--me"} + shell := exec.ExecTask{ + Command: "squeue", + Args: cmd, + Shell: true, + } + execReturn, _ := shell.Execute() + execReturn.Stdout = strings.ReplaceAll(execReturn.Stdout, "\n", "") - if execReturn.Stderr != "" { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs")) - log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr) - return - } + if execReturn.Stderr != "" { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs")) + log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr) + return + } - for _, pod := range req { - for i, jid := range JIDs { - if jid.PodUID == string(pod.UID) { - cmd := []string{"--noheader", "-a", "-j " + jid.JID} - shell := exec.ExecTask{ - Command: commonIL.InterLinkConfigInst.Squeuepath, - Args: cmd, - Shell: true, - } - execReturn, _ := shell.Execute() + for _, pod := range req { + for i, jid := range JIDs { + if jid.PodUID == string(pod.UID) { + cmd := []string{"--noheader", "-a", "-j " + jid.JID} + shell := exec.ExecTask{ + Command: commonIL.InterLinkConfigInst.Squeuepath, + Args: cmd, + Shell: true, + } + execReturn, _ := shell.Execute() - if execReturn.Stderr != "" { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs")) - log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr) - return - } else { - pattern := `(CD|CG|F|PD|PR|R|S|ST)` - re := regexp.MustCompile(pattern) - match := re.FindString(execReturn.Stdout) + if execReturn.Stderr != "" { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs")) + log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr) + return + } else { + pattern := `(CD|CG|F|PD|PR|R|S|ST)` + re := regexp.MustCompile(pattern) + match := re.FindString(execReturn.Stdout) - log.G(Ctx).Info("JID: " + jid.JID + " | Status: " + match + " | Pod: " + pod.Name) + log.G(Ctx).Info("JID: " + jid.JID + " | Status: " + match + " | Pod: " + pod.Name) - switch match { - case "CD": - if jid.EndTime.IsZero() { - JIDs[i].EndTime = time.Now() + switch match { + case "CD": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "CG": + if jid.StartTime.IsZero() { + JIDs[i].StartTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "F": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "PD": + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "PR": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "R": + if jid.StartTime.IsZero() { + JIDs[i].StartTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "S": + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "ST": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + default: + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) } - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - case "CG": - if jid.StartTime.IsZero() { - JIDs[i].StartTime = time.Now() - } - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - case "F": - if jid.EndTime.IsZero() { - JIDs[i].EndTime = time.Now() - } - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - case "PD": - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - case "PR": - if jid.EndTime.IsZero() { - JIDs[i].EndTime = time.Now() - } - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - case "R": - if jid.StartTime.IsZero() { - JIDs[i].StartTime = time.Now() - } - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - case "S": - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - case "ST": - if jid.EndTime.IsZero() { - JIDs[i].EndTime = time.Now() - } - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) - default: - if jid.EndTime.IsZero() { - JIDs[i].EndTime = time.Now() - } - containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) } } } } + cachedStatus = resp + timer = time.Now() + } else { + log.G(Ctx).Debug("Cached status") + resp = cachedStatus } log.G(Ctx).Debug(resp) @@ -141,11 +149,10 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred deleting containers. Check Docker Sidecar's logs")) } else { - bodyBytes, err = json.Marshal(resp) + bodyBytes, err := json.Marshal(resp) if err != nil { w.WriteHeader(statusCode) w.Write([]byte("Some errors occurred while retrieving container status. Check Slurm Sidecar's logs")) - log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr) log.G(Ctx).Error(err) return } diff --git a/pkg/sidecars/slurm/aux.go b/pkg/sidecars/slurm/aux.go index 79e8f44d..5a0a2f26 100644 --- a/pkg/sidecars/slurm/aux.go +++ b/pkg/sidecars/slurm/aux.go @@ -24,6 +24,8 @@ var Clientset *kubernetes.Clientset var Ctx context.Context var kubecfg *rest.Config var JIDs []JidStruct +var timer time.Time +var cachedStatus []commonIL.PodStatus type JidStruct struct { PodUID string `json:"PodName"` @@ -176,6 +178,7 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands [] } } } + for _, slurm_flag := range sbatch_flags_from_argo { sbatch_flags_as_string += "\n#SBATCH " + slurm_flag } @@ -205,6 +208,10 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands [] prefix += "\n" + commonIL.InterLinkConfigInst.Commandprefix } + if preExecAnnotations, ok := metadata.Annotations["job.knoc.io/pre-exec"]; ok { + prefix += "\n" + preExecAnnotations + } + sbatch_macros := "#!" + commonIL.InterLinkConfigInst.BashPath + "\n#SBATCH --job-name=" + podUID + sbatch_flags_as_string + diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 4a39c3cc..4f314d68 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -189,7 +189,7 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin updatePod := false pod, err := p.GetPod(ctx, podStatus.PodNamespace, podStatus.PodName) - log.G(ctx).Debug(pod) + //log.G(ctx).Debug(pod) if err != nil { log.G(ctx).Error(err) return err