Skip to content

Commit

Permalink
Caching Slurm status and adding pre-exec commands. To be checked.
Browse files: browse the repository at this point in the history
  • Loading branch information
Surax98 committed Oct 18, 2023
1 parent 9166fa7 commit d842178
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 105 deletions.
3 changes: 2 additions & 1 deletion kustomizations/InterLinkConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ Namespace: "knoc"
Tsocks: false
TsocksPath: "$WORK/tsocks-1.8beta5+ds1/libtsocks.so"
TsocksLoginNode: "login01"
BashPath: /bin/bash
BashPath: /bin/bash
Pod_IP: "172.16.9.11"
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {
NodeName: opts.NodeName,
OperatingSystem: "Linux",
// https://github.com/liqotech/liqo/blob/d8798732002abb7452c2ff1c99b3e5098f848c93/deployments/liqo/templates/liqo-gateway-deployment.yaml#L69
InternalIP: "127.0.0.1",
InternalIP: "172.16.9.11",
DaemonPort: int32(dport),
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func main() {

api.AttachPodRoutes(podRoutes, mux, true)

parsedIP := net.ParseIP(os.Getenv("POD_IP"))
parsedIP := net.ParseIP(commonIL.InterLinkConfigInst.PodIP)
retriever := newSelfSignedCertificateRetriever(cfg.NodeName, parsedIP)

server := &http.Server{
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func NewInterLinkConfig() {
InterLinkConfigInst.Scancelpath = os.Getenv("SCANCELPATH")
}

if os.Getenv("POD_IP") != "" {
InterLinkConfigInst.PodIP = os.Getenv("POD_IP")
}

if os.Getenv("TSOCKS") != "" {
if os.Getenv("TSOCKS") != "true" && os.Getenv("TSOCKS") != "false" {
fmt.Println("export TSOCKS as true or false")
Expand Down
1 change: 1 addition & 0 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type InterLinkConfig struct {
Tsocksconfig string `yaml:"TsocksConfig"`
Tsockslogin string `yaml:"TsocksLoginNode"`
BashPath string `yaml:"BashPath"`
PodIP string `yaml:"Pod_IP"`
set bool
}

Expand Down
9 changes: 2 additions & 7 deletions pkg/sidecars/slurm/Create.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (

"github.com/containerd/containerd/log"
commonIL "github.com/intertwin-eu/interlink/pkg/common"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func SubmitHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -40,11 +38,8 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) {
}

for _, data := range req {
var metadata metav1.ObjectMeta
var containers []v1.Container

containers = data.Pod.Spec.Containers
metadata = data.Pod.ObjectMeta
containers := data.Pod.Spec.Containers
metadata := data.Pod.ObjectMeta

var singularity_command_pod []SingularityCommand

Expand Down
195 changes: 101 additions & 94 deletions pkg/sidecars/slurm/Status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
var resp []commonIL.PodStatus
statusCode := http.StatusOK
log.G(Ctx).Info("Slurm Sidecar: received GetStatus call")
timeNow := time.Now()

bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -30,109 +31,116 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
return
}

json.Unmarshal(bodyBytes, &req)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Some errors occurred while retrieving container status. Check Slurm Sidecar's logs"))
log.G(Ctx).Error(err)
return
}
if timeNow.Sub(timer) >= time.Second*10 {

cmd := []string{"--me"}
shell := exec.ExecTask{
Command: "squeue",
Args: cmd,
Shell: true,
}
execReturn, _ := shell.Execute()
execReturn.Stdout = strings.ReplaceAll(execReturn.Stdout, "\n", "")
json.Unmarshal(bodyBytes, &req)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Some errors occurred while retrieving container status. Check Slurm Sidecar's logs"))
log.G(Ctx).Error(err)
return
}
cmd := []string{"--me"}
shell := exec.ExecTask{
Command: "squeue",
Args: cmd,
Shell: true,
}
execReturn, _ := shell.Execute()
execReturn.Stdout = strings.ReplaceAll(execReturn.Stdout, "\n", "")

if execReturn.Stderr != "" {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs"))
log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr)
return
}
if execReturn.Stderr != "" {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs"))
log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr)
return
}

for _, pod := range req {
for i, jid := range JIDs {
if jid.PodUID == string(pod.UID) {
cmd := []string{"--noheader", "-a", "-j " + jid.JID}
shell := exec.ExecTask{
Command: commonIL.InterLinkConfigInst.Squeuepath,
Args: cmd,
Shell: true,
}
execReturn, _ := shell.Execute()
for _, pod := range req {
for i, jid := range JIDs {
if jid.PodUID == string(pod.UID) {
cmd := []string{"--noheader", "-a", "-j " + jid.JID}
shell := exec.ExecTask{
Command: commonIL.InterLinkConfigInst.Squeuepath,
Args: cmd,
Shell: true,
}
execReturn, _ := shell.Execute()

if execReturn.Stderr != "" {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs"))
log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr)
return
} else {
pattern := `(CD|CG|F|PD|PR|R|S|ST)`
re := regexp.MustCompile(pattern)
match := re.FindString(execReturn.Stdout)
if execReturn.Stderr != "" {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
w.Write([]byte("Error executing Squeue. Check Slurm Sidecar's logs"))
log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr)
return
} else {
pattern := `(CD|CG|F|PD|PR|R|S|ST)`
re := regexp.MustCompile(pattern)
match := re.FindString(execReturn.Stdout)

log.G(Ctx).Info("JID: " + jid.JID + " | Status: " + match + " | Pod: " + pod.Name)
log.G(Ctx).Info("JID: " + jid.JID + " | Status: " + match + " | Pod: " + pod.Name)

switch match {
case "CD":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
switch match {
case "CD":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "CG":
if jid.StartTime.IsZero() {
JIDs[i].StartTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "F":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "PD":
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "PR":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "R":
if jid.StartTime.IsZero() {
JIDs[i].StartTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "S":
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "ST":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
default:
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "CG":
if jid.StartTime.IsZero() {
JIDs[i].StartTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "F":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "PD":
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "PR":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "R":
if jid.StartTime.IsZero() {
JIDs[i].StartTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.Time{JIDs[i].StartTime}}}, Ready: true}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "S":
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
case "ST":
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
default:
if jid.EndTime.IsZero() {
JIDs[i].EndTime = time.Now()
}
containerStatus := v1.ContainerStatus{Name: pod.Spec.Containers[0].Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{StartedAt: metav1.Time{JIDs[i].StartTime}, FinishedAt: metav1.Time{JIDs[i].EndTime}}}, Ready: false}
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, Containers: []v1.ContainerStatus{containerStatus}})
}
}
}
}
cachedStatus = resp
timer = time.Now()
} else {
log.G(Ctx).Debug("Cached status")
resp = cachedStatus
}

log.G(Ctx).Debug(resp)
Expand All @@ -141,11 +149,10 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
if statusCode != http.StatusOK {
w.Write([]byte("Some errors occurred deleting containers. Check Docker Sidecar's logs"))
} else {
bodyBytes, err = json.Marshal(resp)
bodyBytes, err := json.Marshal(resp)
if err != nil {
w.WriteHeader(statusCode)
w.Write([]byte("Some errors occurred while retrieving container status. Check Slurm Sidecar's logs"))
log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr)
log.G(Ctx).Error(err)
return
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sidecars/slurm/aux.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var Clientset *kubernetes.Clientset
var Ctx context.Context
var kubecfg *rest.Config
var JIDs []JidStruct
var timer time.Time
var cachedStatus []commonIL.PodStatus

type JidStruct struct {
PodUID string `json:"PodName"`
Expand Down Expand Up @@ -176,6 +178,7 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands []
}
}
}

for _, slurm_flag := range sbatch_flags_from_argo {
sbatch_flags_as_string += "\n#SBATCH " + slurm_flag
}
Expand Down Expand Up @@ -205,6 +208,10 @@ func produce_slurm_script(podUID string, metadata metav1.ObjectMeta, commands []
prefix += "\n" + commonIL.InterLinkConfigInst.Commandprefix
}

if preExecAnnotations, ok := metadata.Annotations["job.knoc.io/pre-exec"]; ok {
prefix += "\n" + preExecAnnotations
}

sbatch_macros := "#!" + commonIL.InterLinkConfigInst.BashPath +
"\n#SBATCH --job-name=" + podUID +
sbatch_flags_as_string +
Expand Down
2 changes: 1 addition & 1 deletion pkg/virtualkubelet/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin
updatePod := false

pod, err := p.GetPod(ctx, podStatus.PodNamespace, podStatus.PodName)
log.G(ctx).Debug(pod)
//log.G(ctx).Debug(pod)
if err != nil {
log.G(ctx).Error(err)
return err
Expand Down

0 comments on commit d842178

Please sign in to comment.