diff --git a/kustomizations/InterLinkConfig.yaml b/kustomizations/InterLinkConfig.yaml index 3a8cb916..2d01c5e2 100644 --- a/kustomizations/InterLinkConfig.yaml +++ b/kustomizations/InterLinkConfig.yaml @@ -5,6 +5,7 @@ InterlinkPort: "3000" SidecarPort: "4000" SbatchPath: "/usr/bin/sbatch" ScancelPath: "/usr/bin/scancel" +SqueuePath: "/usr/bin/squeue" CommandPrefix: "" ExportPodData: true DataRootFolder: ".knoc/" @@ -13,3 +14,4 @@ Namespace: "knoc" Tsocks: false TsocksPath: "$WORK/tsocks-1.8beta5+ds1/libtsocks.so" TsocksLoginNode: "login01" +BashPath: /bin/bash \ No newline at end of file diff --git a/main.go b/main.go index 917a669f..24ab0420 100644 --- a/main.go +++ b/main.go @@ -19,7 +19,6 @@ import ( "context" "crypto/tls" "fmt" - "io/ioutil" "net" "os" "path" @@ -113,7 +112,7 @@ func main() { } var kubecfg *rest.Config - kubecfgFile, err := ioutil.ReadFile(os.Getenv("KUBECONFIG")) + kubecfgFile, err := os.ReadFile(os.Getenv("KUBECONFIG")) if err != nil { log.G(ctx).Error(err) log.G(ctx).Info("Trying InCluster configuration") diff --git a/pkg/common/func.go b/pkg/common/func.go index 438f0ac4..211a5aff 100644 --- a/pkg/common/func.go +++ b/pkg/common/func.go @@ -11,6 +11,7 @@ import ( "os" "time" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "github.com/containerd/containerd/log" @@ -20,6 +21,7 @@ import ( ) var InterLinkConfigInst InterLinkConfig +var Clientset *kubernetes.Clientset func NewInterLinkConfig() { if InterLinkConfigInst.set == false { @@ -126,7 +128,8 @@ func NewServiceAccount() error { defer f.Close() - script = "SERVICE_ACCOUNT_NAME=" + InterLinkConfigInst.ServiceAccount + "\n" + + script = "#!" + InterLinkConfigInst.BashPath + "\n" + + "SERVICE_ACCOUNT_NAME=" + InterLinkConfigInst.ServiceAccount + "\n" + "CONTEXT=$(kubectl config current-context)\n" + "NAMESPACE=" + InterLinkConfigInst.Namespace + "\n" + "NEW_CONTEXT=" + InterLinkConfigInst.Namespace + "\n" + diff --git a/pkg/common/types.go b/pkg/common/types.go index 7d72e287..3b05ece0 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -7,16 +7,10 @@ import ( v1 "k8s.io/api/core/v1" ) -const ( - RUNNING = 0 - STOP = 1 - UNKNOWN = 2 -) - type PodStatus struct { - PodName string `json:"name"` - PodNamespace string `json:"namespace"` - PodStatus uint `json:"status"` + PodName string `json:"name"` + PodNamespace string `json:"namespace"` + Containers []v1.ContainerStatus `json:"containers"` } type RetrievedContainer struct { @@ -45,6 +39,7 @@ type InterLinkConfig struct { Sidecarurl string `yaml:"SidecarURL"` Sbatchpath string `yaml:"SbatchPath"` Scancelpath string `yaml:"ScancelPath"` + Squeuepath string `yaml:"SqueuePath"` Interlinkport string `yaml:"InterlinkPort"` Sidecarport string `yaml:"SidecarPort"` Commandprefix string `yaml:"CommandPrefix"` @@ -85,8 +80,3 @@ type LogStruct struct { ContainerName string `json:"ContainerName"` Opts ContainerLogOpts `json:"Opts"` } - -type JidStruct struct { - PodName string `json:"PodName"` - JIDs []string `json:"JIDs"` -} diff --git a/pkg/interlink/create.go b/pkg/interlink/create.go index eeffaa94..0927bd4f 100644 --- a/pkg/interlink/create.go +++ b/pkg/interlink/create.go @@ -5,7 +5,6 @@ import ( "encoding/json" "io" "net/http" - "time" "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/common" @@ -29,7 +28,7 @@ func CreateHandler(w http.ResponseWriter, r *http.Request) { var retrieved_data []commonIL.RetrievedPodData for _, pod := range req2 { - data := []commonIL.RetrievedPodData{} + data := commonIL.RetrievedPodData{} if commonIL.InterLinkConfigInst.ExportPodData { data, err = getData(pod) if err != nil { @@ -37,29 +36,15 @@ func CreateHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(statusCode) return } - log.G(Ctx).Debug(data) + //log.G(Ctx).Debug(data) } - data = []commonIL.RetrievedPodData{} - if commonIL.InterLinkConfigInst.ExportPodData { - data, err = getData(pod) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - return - } - log.G(Ctx).Debug(data) - } - if data == nil { - data = append(data, commonIL.RetrievedPodData{Pod: *pod}) - } - - retrieved_data = append(retrieved_data, data...) + retrieved_data = append(retrieved_data, data) if retrieved_data != nil { bodyBytes, err = json.Marshal(retrieved_data) - log.G(Ctx).Debug(string(bodyBytes)) + //log.G(Ctx).Debug(string(bodyBytes)) reader := bytes.NewReader(bodyBytes) req, err = http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Sidecarurl+":"+commonIL.InterLinkConfigInst.Sidecarport+"/create", reader) @@ -72,14 +57,13 @@ func CreateHandler(w http.ResponseWriter, r *http.Request) { log.G(Ctx).Info("InterLink: forwarding Create call to sidecar") var resp *http.Response - for { - resp, err = http.DefaultClient.Do(req) - if err != nil { - log.G(Ctx).Error(err) - time.Sleep(time.Second * 5) - } else { - break - } + + resp, err = http.DefaultClient.Do(req) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + log.G(Ctx).Error(err) + return } statusCode = resp.StatusCode diff --git a/pkg/interlink/delete.go b/pkg/interlink/delete.go index 753272ca..57862e39 100644 --- a/pkg/interlink/delete.go +++ b/pkg/interlink/delete.go @@ -52,7 +52,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { } log.G(Ctx).Debug("InterLink: " + string(returnValue)) var returnJson []commonIL.PodStatus - returnJson = append(returnJson, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, PodStatus: commonIL.STOP}) + returnJson = append(returnJson, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace}) bodyBytes, err = json.Marshal(returnJson) if err != nil { diff --git a/pkg/interlink/func.go b/pkg/interlink/func.go index 17ca3200..640e2d31 100644 --- a/pkg/interlink/func.go +++ b/pkg/interlink/func.go @@ -9,28 +9,25 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func getData(pod *v1.Pod) ([]commonIL.RetrievedPodData, error) { - var retrieved_data []commonIL.RetrievedPodData +func getData(pod *v1.Pod) (commonIL.RetrievedPodData, error) { + var retrieved_data commonIL.RetrievedPodData + retrieved_data.Pod = *pod for _, container := range pod.Spec.Containers { log.G(Ctx).Info("- Retrieving Secrets and ConfigMaps for the Docker Sidecar. Container: " + container.Name) data, err := retrieve_data(container, pod) if err != nil { log.G(Ctx).Error(err) - return nil, err - } - - if data.Containers != nil { - data.Pod = *pod - retrieved_data = append(retrieved_data, data) + return commonIL.RetrievedPodData{}, err } + retrieved_data.Containers = append(retrieved_data.Containers, data) } return retrieved_data, nil } -func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedPodData, error) { - retrieved_data := commonIL.RetrievedPodData{} +func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedContainer, error) { + retrieved_data := commonIL.RetrievedContainer{} for _, mount_var := range container.VolumeMounts { log.G(Ctx).Debug("-- Retrieving data for mountpoint " + mount_var.Name) @@ -50,16 +47,15 @@ func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedPodDa if err != nil { log.G(Ctx).Error(err) - return commonIL.RetrievedPodData{}, err + return commonIL.RetrievedContainer{}, err } else { log.G(Ctx).Debug("---- Retrieved ConfigMap " + podVolumeSpec.ConfigMap.Name) } if configMap != nil { - if retrieved_data.Containers == nil { - retrieved_data.Containers = append(retrieved_data.Containers, commonIL.RetrievedContainer{Name: container.Name}) - } - retrieved_data.Containers[len(retrieved_data.Containers)-1].ConfigMaps = append(retrieved_data.Containers[len(retrieved_data.Containers)-1].ConfigMaps, *configMap) + + retrieved_data.Name = container.Name + retrieved_data.ConfigMaps = append(retrieved_data.ConfigMaps, *configMap) } } else if podVolumeSpec != nil && podVolumeSpec.Secret != nil { @@ -70,24 +66,21 @@ func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedPodDa if err != nil { log.G(Ctx).Error(err) - return commonIL.RetrievedPodData{}, err + return commonIL.RetrievedContainer{}, err } else { log.G(Ctx).Debug("---- Retrieved Secret " + svs.SecretName) } if secret.Data != nil { - if retrieved_data.Containers == nil { - retrieved_data.Containers = append(retrieved_data.Containers, commonIL.RetrievedContainer{Name: container.Name}) - } - retrieved_data.Containers[len(retrieved_data.Containers)-1].Secrets = append(retrieved_data.Containers[len(retrieved_data.Containers)-1].Secrets, *secret) + retrieved_data.Name = container.Name + retrieved_data.Secrets = append(retrieved_data.Secrets, *secret) } } else if podVolumeSpec != nil && podVolumeSpec.EmptyDir != nil { edPath := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/"+"emptyDirs/"+vol.Name) - if retrieved_data.Containers == nil { - retrieved_data.Containers = append(retrieved_data.Containers, commonIL.RetrievedContainer{Name: container.Name}) - } - retrieved_data.Containers[len(retrieved_data.Containers)-1].EmptyDirs = append(retrieved_data.Containers[len(retrieved_data.Containers)-1].EmptyDirs, edPath) + + retrieved_data.Name = container.Name + retrieved_data.EmptyDirs = append(retrieved_data.EmptyDirs, edPath) } } } diff --git a/pkg/sidecars/docker/Delete.go b/pkg/sidecars/docker/Delete.go index 89763e8f..5f9fab7c 100644 --- a/pkg/sidecars/docker/Delete.go +++ b/pkg/sidecars/docker/Delete.go @@ -61,23 +61,25 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { continue } - cmd = []string{"rm", execReturn.Stdout} - shell = exec.ExecTask{ - Command: "docker", - Args: cmd, - Shell: true, - } - execReturn, _ = shell.Execute() - execReturn.Stdout = strings.ReplaceAll(execReturn.Stdout, "\n", "") + if execReturn.Stdout != "" { + cmd = []string{"rm", execReturn.Stdout} + shell = exec.ExecTask{ + Command: "docker", + Args: cmd, + Shell: true, + } + execReturn, _ = shell.Execute() + execReturn.Stdout = strings.ReplaceAll(execReturn.Stdout, "\n", "") - if execReturn.Stderr != "" { - log.G(Ctx).Error("-- Error deleting container " + container.Name) - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Some errors occurred while deleting container. Check Docker Sidecar's logs")) - return - } else { - log.G(Ctx).Info("- Deleted container " + container.Name) + if execReturn.Stderr != "" { + log.G(Ctx).Error("-- Error deleting container " + container.Name) + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Some errors occurred while deleting container. Check Docker Sidecar's logs")) + return + } else { + log.G(Ctx).Info("- Deleted container " + container.Name) + } } os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID)) diff --git a/pkg/sidecars/docker/Status.go b/pkg/sidecars/docker/Status.go index be84b76e..253adb2d 100644 --- a/pkg/sidecars/docker/Status.go +++ b/pkg/sidecars/docker/Status.go @@ -36,10 +36,11 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { return } - for _, pod := range req { + for i, pod := range req { + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace}) for _, container := range pod.Spec.Containers { log.G(Ctx).Debug("- Getting status for container " + container.Name) - cmd := []string{"ps -aqf name=" + container.Name} + cmd := []string{"ps -af name=^" + container.Name + "$ --format \"{{.Status}}\""} shell := exec.ExecTask{ Command: "docker", @@ -55,12 +56,22 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { break } - if execReturn.Stdout == "" { - log.G(Ctx).Info("-- Container " + container.Name + " is not running") - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, PodStatus: commonIL.STOP}) + containerstatus := strings.Split(execReturn.Stdout, " ") + + if execReturn.Stdout != "" { + if containerstatus[0] == "Created" { + log.G(Ctx).Info("-- Container " + container.Name + " is going ready...") + resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false}) + } else if containerstatus[0] == "Up" { + log.G(Ctx).Info("-- Container " + container.Name + " is running") + resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}, Ready: true}) + } else if containerstatus[0] == "Exited" { + log.G(Ctx).Info("-- Container " + container.Name + " has been stopped") + resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}, Ready: false}) + } } else { - log.G(Ctx).Info("-- Container " + container.Name + " is running") - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, PodStatus: commonIL.RUNNING}) + log.G(Ctx).Info("-- Container " + container.Name + " doesn't exist") + resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}, Ready: false}) } } } diff --git a/pkg/sidecars/slurm/Create.go b/pkg/sidecars/slurm/Create.go index 001b99af..2f6ad24f 100644 --- a/pkg/sidecars/slurm/Create.go +++ b/pkg/sidecars/slurm/Create.go @@ -35,6 +35,10 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { return } + for _, test := range req { + log.G(Ctx).Debug(test.Pod.Name) + } + for _, data := range req { var metadata metav1.ObjectMeta var containers []v1.Container @@ -42,6 +46,8 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { containers = data.Pod.Spec.Containers metadata = data.Pod.ObjectMeta + var singularity_command_pod []SingularityCommand + for _, container := range containers { log.G(Ctx).Info("- Beginning script generation for container " + container.Name) commstr1 := []string{"singularity", "exec"} @@ -76,55 +82,36 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) { singularity_command = append(singularity_command, container.Command...) singularity_command = append(singularity_command, container.Args...) - path, err := produce_slurm_script(container, string(data.Pod.UID), metadata, singularity_command) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error producing Slurm script. Check Slurm Sidecar's logs")) - log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID)) - return - } - out, err := slurm_batch_submit(path) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error submitting Slurm script. Check Slurm Sidecar's logs")) - log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID)) - return - } - err = handle_jid(container, string(data.Pod.UID), out, data.Pod) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error handling JID. Check Slurm Sidecar's logs")) - log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID)) - err = delete_container(container, string(data.Pod.UID)) - return - } - - jid, err := os.ReadFile(commonIL.InterLinkConfigInst.DataRootFolder + string(data.Pod.UID) + "_" + container.Name + ".jid") - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Some errors occurred while creating container. Check Slurm Sidecar's logs")) - log.G(Ctx).Error(err) - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID)) - return - } + singularity_command_pod = append(singularity_command_pod, SingularityCommand{command: singularity_command, containerName: container.Name}) + } - flag := true - for _, JID := range JIDs { - if JID.PodName == data.Pod.Name { - flag = false - JID.JIDs = append(JID.JIDs, string(jid)) - } - } - if flag { - JIDs = append(JIDs, commonIL.JidStruct{PodName: data.Pod.Name, JIDs: []string{string(jid)}}) - } + path, err := produce_slurm_script(string(data.Pod.UID), metadata, singularity_command_pod) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Error producing Slurm script. Check Slurm Sidecar's logs")) + log.G(Ctx).Error(err) + os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID)) + return + } + out, err := slurm_batch_submit(path) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Error submitting Slurm script. Check Slurm Sidecar's logs")) + log.G(Ctx).Error(err) + os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID)) + return + } + err = handle_jid(string(data.Pod.UID), out, data.Pod) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Error handling JID. Check Slurm Sidecar's logs")) + log.G(Ctx).Error(err) + os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + data.Pod.Namespace + "-" + string(data.Pod.UID)) + err = delete_container(string(data.Pod.UID)) + return } } diff --git a/pkg/sidecars/slurm/Delete.go b/pkg/sidecars/slurm/Delete.go index c4687a26..e25a86a1 100644 --- a/pkg/sidecars/slurm/Delete.go +++ b/pkg/sidecars/slurm/Delete.go @@ -35,20 +35,16 @@ func StopHandler(w http.ResponseWriter, r *http.Request) { } for _, pod := range req { - containers := pod.Spec.Containers - - for _, container := range containers { - err = delete_container(container, string(pod.UID)) - if err != nil { - statusCode = http.StatusInternalServerError - w.WriteHeader(statusCode) - w.Write([]byte("Error deleting containers. Check Slurm Sidecar's logs")) - log.G(Ctx).Error(err) - return - } - if os.Getenv("SHARED_FS") != "true" { - err = os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID)) - } + err = delete_container(string(pod.UID)) + if err != nil { + statusCode = http.StatusInternalServerError + w.WriteHeader(statusCode) + w.Write([]byte("Error deleting containers. Check Slurm Sidecar's logs")) + log.G(Ctx).Error(err) + return + } + if os.Getenv("SHARED_FS") != "true" { + err = os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + pod.Namespace + "-" + string(pod.UID)) } } diff --git a/pkg/sidecars/slurm/Status.go b/pkg/sidecars/slurm/Status.go index 13215553..1fed9297 100644 --- a/pkg/sidecars/slurm/Status.go +++ b/pkg/sidecars/slurm/Status.go @@ -4,12 +4,15 @@ import ( "encoding/json" "io" "net/http" + "regexp" "strings" + "time" exec "github.com/alexellis/go-execute/pkg/v1" "github.com/containerd/containerd/log" commonIL "github.com/intertwin-eu/interlink/pkg/common" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func StatusHandler(w http.ResponseWriter, r *http.Request) { @@ -54,13 +57,11 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { } for _, pod := range req { - var flag = false - for _, JID := range JIDs { - for _, jid := range JID.JIDs { - - cmd := []string{"-c", "squeue --me | grep " + jid} + for i, jid := range JIDs { + if jid.PodUID == string(pod.UID) { + cmd := []string{"--noheader", "-a", "-j " + jid.JID} shell := exec.ExecTask{ - Command: "bash", + Command: commonIL.InterLinkConfigInst.Squeuepath, Args: cmd, Shell: true, } @@ -72,22 +73,70 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) { w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs")) log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr) return - } else if execReturn.Stdout != "" { - flag = true - log.G(Ctx).Info(execReturn.Stdout) - } else if execReturn.Stdout == "" { - removeJID(jid) + } else { + pattern := `(CD|CG|F|PD|PR|R|S|ST)` + re := regexp.MustCompile(pattern) + match := re.FindString(execReturn.Stdout) + + log.G(Ctx).Info("JID: " + jid.JID + " | Status: " + match + " | Pod: " + pod.Name) + + switch match { + case "CD": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "CG": + if jid.StartTime.IsZero() { + JIDs[i].StartTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "F": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "PD": + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "PR": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "R": + if jid.StartTime.IsZero() { + JIDs[i].StartTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "S": + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + case "ST": + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + default: + if jid.EndTime.IsZero() { + JIDs[i].EndTime = time.Now() + } + containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false} + resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}}) + } } } } - - if flag { - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, PodStatus: commonIL.RUNNING}) - } else { - resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, PodStatus: commonIL.STOP}) - } } + log.G(Ctx).Debug(resp) + w.WriteHeader(statusCode) if statusCode != http.StatusOK { w.Write([]byte("Some errors occurred deleting containers. Check Docker Sidecar's logs")) diff --git a/pkg/sidecars/slurm/aux.go b/pkg/sidecars/slurm/aux.go index 2a0791a4..79e8f44d 100644 --- a/pkg/sidecars/slurm/aux.go +++ b/pkg/sidecars/slurm/aux.go @@ -3,13 +3,12 @@ package slurm import ( "context" "errors" - "fmt" "os" "os/exec" "path/filepath" "regexp" - "strconv" "strings" + "time" exec2 "github.com/alexellis/go-execute/pkg/v1" "github.com/containerd/containerd/log" @@ -24,7 +23,19 @@ var prefix string var Clientset *kubernetes.Clientset var Ctx context.Context var kubecfg *rest.Config -var JIDs []commonIL.JidStruct +var JIDs []JidStruct + +type JidStruct struct { + PodUID string `json:"PodName"` + JID string `json:"JID"` + StartTime time.Time `json:"StartTime"` + EndTime time.Time `json:"EndTime"` +} + +type SingularityCommand struct { + containerName string + command []string +} func prepare_envs(container v1.Container) []string { log.G(Ctx).Info("-- Appending envs") @@ -132,9 +143,9 @@ func prepare_mounts(container v1.Container, data []commonIL.RetrievedPodData) ([ return append(mount, mount_data), nil } -func produce_slurm_script(container v1.Container, podUID string, metadata metav1.ObjectMeta, command []string) (string, error) { +func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands []SingularityCommand) (string, error) { log.G(Ctx).Info("-- Creating file for the Slurm script") - path := commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".sh" + path := commonIL.InterLinkConfigInst.DataRootFolder + podUID + ".sh" postfix := "" err := os.RemoveAll(path) @@ -143,6 +154,8 @@ func produce_slurm_script(container v1.Container, podUID string, metadata metav1 return "", err } f, err := os.Create(path) + defer f.Close() + if err != nil { log.G(Ctx).Error("Unable to create file " + path) return "", err @@ -158,7 +171,9 @@ func produce_slurm_script(container v1.Container, podUID string, metadata metav1 if mpi_flags, ok := metadata.Annotations["slurm-job.knoc.io/mpi-flags"]; ok { if mpi_flags != "true" { mpi := append([]string{"mpiexec", "-np", "$SLURM_NTASKS"}, strings.Split(mpi_flags, " ")...) - command = append(mpi, command...) + for _, singularityCommand := range commands { + singularityCommand.command = append(mpi, singularityCommand.command...) + } } } for _, slurm_flag := range sbatch_flags_from_argo { @@ -182,8 +197,8 @@ func produce_slurm_script(container v1.Container, podUID string, metadata metav1 prefix += "\nssh -4 -N -D $port " + commonIL.InterLinkConfigInst.Tsockslogin + " &" prefix += "\nSSH_PID=$!" - prefix += "\necho \"local = 10.0.0.0/255.0.0.0 \nserver = 127.0.0.1 \nserver_port = $port\" >> .tmp/" + podUID + "_" + container.Name + "_tsocks.conf" - prefix += "\nexport TSOCKS_CONF_FILE=.tmp/" + podUID + "_" + container.Name + "_tsocks.conf && export LD_PRELOAD=" + commonIL.InterLinkConfigInst.Tsockspath + prefix += "\necho \"local = 10.0.0.0/255.0.0.0 \nserver = 127.0.0.1 \nserver_port = $port\" >> .tmp/" + podUID + "_tsocks.conf" + prefix += "\nexport TSOCKS_CONF_FILE=.tmp/" + podUID + "_tsocks.conf && export LD_PRELOAD=" + commonIL.InterLinkConfigInst.Tsockspath } if commonIL.InterLinkConfigInst.Commandprefix != "" { @@ -191,7 +206,7 @@ func produce_slurm_script(container v1.Container, podUID string, metadata metav1 } sbatch_macros := "#!" + commonIL.InterLinkConfigInst.BashPath + - "\n#SBATCH --job-name=" + podUID + "_" + container.Name + + "\n#SBATCH --job-name=" + podUID + sbatch_flags_as_string + "\n. ~/.bash_profile" + //"\nmodule load singularity" + @@ -202,8 +217,17 @@ func produce_slurm_script(container v1.Container, podUID string, metadata metav1 log.G(Ctx).Debug("--- Writing file") - _, err = f.WriteString(sbatch_macros + "\n" + strings.Join(command[:], " ") + " >> " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".out 2>> " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".err \n echo $? > " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".status" + postfix) - defer f.Close() + var stringToBeWritten string + + stringToBeWritten += sbatch_macros + + for _, singularityCommand := range commands { + stringToBeWritten += "\n" + strings.Join(singularityCommand.command[:], " ") + " >> " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + singularityCommand.containerName + ".out 2>> " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + singularityCommand.containerName + ".err; echo $? > " + commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + singularityCommand.containerName + ".status &" + } + + stringToBeWritten += "\n" + postfix + + _, err = f.WriteString(stringToBeWritten) if err != nil { log.G(Ctx).Error(err) @@ -240,10 +264,10 @@ func slurm_batch_submit(path string) (string, error) { return string(execReturn.Stdout), nil } -func handle_jid(container v1.Container, podUID string, output string, pod v1.Pod) error { +func handle_jid(podUID string, output string, pod v1.Pod) error { r := regexp.MustCompile(`Submitted batch job (?P\d+)`) jid := r.FindStringSubmatch(output) - f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".jid") + f, err := os.Create(commonIL.InterLinkConfigInst.DataRootFolder + podUID + ".jid") if err != nil { log.G(Ctx).Error("Can't create jid_file") return err @@ -254,70 +278,47 @@ func handle_jid(container v1.Container, podUID string, output string, pod v1.Pod log.G(Ctx).Error(err) return err } + JIDs = append(JIDs, JidStruct{PodUID: string(pod.UID), JID: string(jid[1])}) + log.G(Ctx).Info("Job ID is: " + jid[1]) return nil } func removeJID(jidToBeRemoved string) { - for i, JID := range JIDs { - for j, jid := range JID.JIDs { - if jid == jidToBeRemoved { - if len(JID.JIDs) == 1 { - if len(JIDs) == 1 { - JIDs = nil - return - } - - if i == 0 { - JIDs = JIDs[1:] - return - } else if i == len(JIDs)-1 { - JIDs = JIDs[:j] - return - } else { - JIDs = append(JIDs[:i-1], JIDs[i+1:]...) - return - } - } - if j == 0 { - JID.JIDs = JID.JIDs[1:] - return - } else if j == len(JID.JIDs)-1 { - JID.JIDs = JID.JIDs[:j] - return - } else { - JID.JIDs = append(JID.JIDs[:j-1], JID.JIDs[j+1:]...) - return - } - + for i, jid := range JIDs { + if jid.JID == jidToBeRemoved { + if len(JIDs) == 1 { + JIDs = nil + } else if i == 0 { + JIDs = JIDs[1:] + } else if i == len(JIDs)-1 { + JIDs = JIDs[:i] + } else { + JIDs = append(JIDs[:i-1], JIDs[i+1:]...) + return } } } } -func delete_container(container v1.Container, podUID string) error { - log.G(Ctx).Info("- Deleting container " + container.Name) - data, err := os.ReadFile(commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".jid") - if err != nil { - log.G(Ctx).Error(err) - return err - } - jid, err := strconv.Atoi(string(data)) - if err != nil { - log.G(Ctx).Error(err) - return err - } - _, err = exec.Command(commonIL.InterLinkConfigInst.Scancelpath, fmt.Sprint(jid)).Output() - if err != nil { - log.G(Ctx).Error(err) - return err - } else { - log.G(Ctx).Info("- Deleted job ", jid) +func delete_container(podUID string) error { + log.G(Ctx).Info("- Deleting Job for pod " + podUID) + for _, jid := range JIDs { + if jid.PodUID == podUID { + _, err := exec.Command(commonIL.InterLinkConfigInst.Scancelpath, jid.JID).Output() + if err != nil { + log.G(Ctx).Error(err) + return err + } else { + log.G(Ctx).Info("- Deleted Job ", jid.JID) + } + os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID + ".out") + os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID + ".err") + os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID + ".status") + removeJID(jid.JID) + return nil + } } - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".out") - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".err") - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".status") - os.RemoveAll(commonIL.InterLinkConfigInst.DataRootFolder + podUID + "_" + container.Name + ".jid") - return nil + return errors.New("Unable to find a JID for the provided pod") } func mountData(container v1.Container, pod v1.Pod, data interface{}) ([]string, []string, error) { diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 0fa47532..4a39c3cc 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "io" "net/http" "os" @@ -13,13 +12,12 @@ import ( commonIL "github.com/intertwin-eu/interlink/pkg/common" - exec "github.com/alexellis/go-execute/pkg/v1" "github.com/containerd/containerd/log" v1 "k8s.io/api/core/v1" ) func createRequest(pods []*v1.Pod, token string) ([]byte, error) { - var returnValue, _ = json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN}) + var returnValue, _ = json.Marshal(commonIL.PodStatus{}) bodyBytes, err := json.Marshal(pods) if err != nil { @@ -56,7 +54,7 @@ func createRequest(pods []*v1.Pod, token string) ([]byte, error) { } func deleteRequest(pods []*v1.Pod, token string) ([]byte, error) { - returnValue, _ := json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN}) + returnValue, _ := json.Marshal(commonIL.PodStatus{}) bodyBytes, err := json.Marshal(pods) if err != nil { @@ -132,7 +130,7 @@ func statusRequest(podsList []*v1.Pod, token string) ([]byte, error) { return returnValue, nil } -func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, imageLocation string, pod *v1.Pod, container v1.Container) error { +func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, imageLocation string, pod *v1.Pod) error { var req []*v1.Pod req = []*v1.Pod{pod} @@ -144,7 +142,7 @@ func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, switch mode { case CREATE: - //v1.Pod used only for secrets and volumes management; TO BE IMPLEMENTED + //pod.Spec.NodeSelector["kubernetes.io/hostname"] = "emptyNode" returnVal, err := createRequest(req, token) if err != nil { log.G(ctx).Error(err) @@ -186,19 +184,58 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin return err } - for i, podStatus := range ret { - if podStatus.PodStatus == 1 { - cmd := []string{"delete pod " + ret[i].PodName + " -n " + ret[i].PodNamespace} - shell := exec.ExecTask{ - Command: "kubectl", - Args: cmd, - Shell: true, + for _, podStatus := range ret { + toBeDeleted := true + updatePod := false + + pod, err := p.GetPod(ctx, podStatus.PodNamespace, podStatus.PodName) + log.G(ctx).Debug(pod) + if err != nil { + log.G(ctx).Error(err) + return err + } + + for _, containerStatus := range podStatus.Containers { + index := 0 + + for i, checkedContainer := range pod.Status.ContainerStatuses { + if checkedContainer.Name == containerStatus.Name { + index = i + } + } + + if containerStatus.State.Terminated != nil { + log.G(ctx).Info("Deleting Pod " + podStatus.PodName + ": Service " + containerStatus.Name + " is not running on Sidecar") + toBeDeleted = true + updatePod = false + } else if containerStatus.State.Waiting != nil { + log.G(ctx).Info("Pod " + podStatus.PodName + ": Service " + containerStatus.Name + " is setting up on Sidecar") + toBeDeleted = false + updatePod = false + } else if containerStatus.State.Running != nil { + toBeDeleted = false + updatePod = true + if pod.Status.ContainerStatuses != nil { + pod.Status.ContainerStatuses[index].State = containerStatus.State + pod.Status.ContainerStatuses[index].Ready = containerStatus.Ready + } + } + } + + if toBeDeleted { + err = p.DeletePod(ctx, pod) + + if err != nil { + log.G(ctx).Error(err) + return err } + } else if updatePod && pod.Status.Phase != v1.PodRunning { + pod.Status.Phase = v1.PodRunning + err = p.UpdatePod(ctx, pod) - execReturn, _ := shell.Execute() - if execReturn.Stderr != "" { - log.G(ctx).Error(fmt.Errorf("Could not delete pod " + ret[i].PodName)) - return fmt.Errorf("Could not delete pod " + ret[i].PodName) + if err != nil { + log.G(ctx).Error(err) + return err } } } diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index 9afeb0d9..9474d099 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -230,7 +230,7 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err var hasInitContainers bool = false var state v1.ContainerState defer span.End() - distribution := "docker://" + //distribution := "docker://" // Add the pod's coordinates to the current span. ctx = addAttributes(ctx, span, NamespaceKey, pod.Namespace, NameKey, pod.Name) key, err := BuildKey(pod) @@ -250,18 +250,23 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err } state = running_state + err = RemoteExecution(p, ctx, CREATE, "", pod) + if err != nil { + return err + } + // in case we have initContainers we need to stop main containers from executing for now ... if len(pod.Spec.InitContainers) > 0 { state = waiting_state hasInitContainers = true // run init container with remote execution enabled - for _, container := range pod.Spec.InitContainers { + /*for _, container := range pod.Spec.InitContainers { // MUST TODO: Run init containers sequentialy and NOT all-together err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container) if err != nil { return err } - } + }*/ pod.Status = v1.PodStatus{ Phase: v1.PodRunning, @@ -285,7 +290,7 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err } } else { pod.Status = v1.PodStatus{ - Phase: v1.PodRunning, + Phase: v1.PodPending, HostIP: "127.0.0.1", PodIP: "127.0.0.1", StartTime: &now, @@ -307,14 +312,14 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err } // deploy main containers for _, container := range pod.Spec.Containers { - var err error + //var err error - if !hasInitContainers { + /*if !hasInitContainers { err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container) if err != nil { return err } - } + }*/ pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{ Name: container.Name, Image: container.Image, @@ -375,7 +380,13 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er pod.Status.Phase = v1.PodSucceeded pod.Status.Reason = "KNOCProviderPodDeleted" - for _, container := range pod.Spec.Containers { + err = RemoteExecution(p, ctx, DELETE, "", pod) + if err != nil { + log.G(ctx).Error(err) + return err + } + + /*for _, container := range pod.Spec.Containers { err = RemoteExecution(p, ctx, DELETE, "", pod, container) if err != nil { log.G(ctx).Error(err) @@ -391,7 +402,7 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er return err } - } + }*/ for idx := range pod.Status.ContainerStatuses { pod.Status.ContainerStatuses[idx].Ready = false pod.Status.ContainerStatuses[idx].State = v1.ContainerState{