Skip to content

Commit

Permalink
fixed multiple calls for multiple containers: only one call for one pod
Browse files Browse the repository at this point in the history
  • Loading branch information
Surax98 committed Sep 27, 2023
1 parent 6f5422d commit 987755f
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 90 deletions.
3 changes: 2 additions & 1 deletion pkg/common/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func NewServiceAccount() error {

defer f.Close()

script = "SERVICE_ACCOUNT_NAME=" + InterLinkConfigInst.ServiceAccount + "\n" +
script = "#!" + InterLinkConfigInst.BashPath + "\n" +
"SERVICE_ACCOUNT_NAME=" + InterLinkConfigInst.ServiceAccount + "\n" +
"CONTEXT=$(kubectl config current-context)\n" +
"NAMESPACE=" + InterLinkConfigInst.Namespace + "\n" +
"NEW_CONTEXT=" + InterLinkConfigInst.Namespace + "\n" +
Expand Down
12 changes: 0 additions & 12 deletions pkg/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,10 @@ import (
v1 "k8s.io/api/core/v1"
)

const (
RUNNING = 0
STOP = 1
UNKNOWN = 2
)

type PodStatus struct {
PodName string `json:"name"`
PodNamespace string `json:"namespace"`
Containers []v1.ContainerStatus `json:"containers"`
PodStatus uint `json:"status"`
}

type RetrievedContainer struct {
Expand Down Expand Up @@ -86,8 +79,3 @@ type LogStruct struct {
ContainerName string `json:"ContainerName"`
Opts ContainerLogOpts `json:"Opts"`
}

type JidStruct struct {
PodName string `json:"PodName"`
JIDs []string `json:"JIDs"`
}
43 changes: 16 additions & 27 deletions pkg/interlink/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"io"
"net/http"
"time"

"github.com/containerd/containerd/log"
commonIL "github.com/intertwin-eu/interlink/pkg/common"
Expand All @@ -29,37 +28,28 @@ func CreateHandler(w http.ResponseWriter, r *http.Request) {
var retrieved_data []commonIL.RetrievedPodData
for _, pod := range req2 {

data := []commonIL.RetrievedPodData{}
data := commonIL.RetrievedPodData{}
if commonIL.InterLinkConfigInst.ExportPodData {
data, err = getData(pod)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
return
}
log.G(Ctx).Debug(data)
//log.G(Ctx).Debug(data)

}
data = []commonIL.RetrievedPodData{}
if commonIL.InterLinkConfigInst.ExportPodData {
data, err = getData(pod)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
return
}
log.G(Ctx).Debug(data)
}

if data == nil {
data = append(data, commonIL.RetrievedPodData{Pod: *pod})
}

retrieved_data = append(retrieved_data, data...)
retrieved_data = append(retrieved_data, data)

if retrieved_data != nil {
for _, test := range retrieved_data {
for _, test2 := range test.Containers {
log.G(Ctx).Debug(test2.Name)
}
}
bodyBytes, err = json.Marshal(retrieved_data)
log.G(Ctx).Debug(string(bodyBytes))
//log.G(Ctx).Debug(string(bodyBytes))
reader := bytes.NewReader(bodyBytes)

req, err = http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Sidecarurl+":"+commonIL.InterLinkConfigInst.Sidecarport+"/create", reader)
Expand All @@ -72,14 +62,13 @@ func CreateHandler(w http.ResponseWriter, r *http.Request) {

log.G(Ctx).Info("InterLink: forwarding Create call to sidecar")
var resp *http.Response
for {
resp, err = http.DefaultClient.Do(req)
if err != nil {
log.G(Ctx).Error(err)
time.Sleep(time.Second * 5)
} else {
break
}

resp, err = http.DefaultClient.Do(req)
if err != nil {
statusCode = http.StatusInternalServerError
w.WriteHeader(statusCode)
log.G(Ctx).Error(err)
return
}

statusCode = resp.StatusCode
Expand Down
2 changes: 1 addition & 1 deletion pkg/interlink/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
log.G(Ctx).Debug("InterLink: " + string(returnValue))
var returnJson []commonIL.PodStatus
returnJson = append(returnJson, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, PodStatus: commonIL.STOP})
returnJson = append(returnJson, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace})

bodyBytes, err = json.Marshal(returnJson)
if err != nil {
Expand Down
41 changes: 17 additions & 24 deletions pkg/interlink/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,25 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func getData(pod *v1.Pod) ([]commonIL.RetrievedPodData, error) {
var retrieved_data []commonIL.RetrievedPodData
func getData(pod *v1.Pod) (commonIL.RetrievedPodData, error) {
var retrieved_data commonIL.RetrievedPodData
retrieved_data.Pod = *pod
for _, container := range pod.Spec.Containers {
log.G(Ctx).Info("- Retrieving Secrets and ConfigMaps for the Docker Sidecar. Container: " + container.Name)

data, err := retrieve_data(container, pod)
if err != nil {
log.G(Ctx).Error(err)
return nil, err
}

if data.Containers != nil {
data.Pod = *pod
retrieved_data = append(retrieved_data, data)
return commonIL.RetrievedPodData{}, err
}
retrieved_data.Containers = append(retrieved_data.Containers, data)
}

return retrieved_data, nil
}

func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedPodData, error) {
retrieved_data := commonIL.RetrievedPodData{}
func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedContainer, error) {
retrieved_data := commonIL.RetrievedContainer{}
for _, mount_var := range container.VolumeMounts {
log.G(Ctx).Debug("-- Retrieving data for mountpoint " + mount_var.Name)

Expand All @@ -50,16 +47,15 @@ func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedPodDa

if err != nil {
log.G(Ctx).Error(err)
return commonIL.RetrievedPodData{}, err
return commonIL.RetrievedContainer{}, err
} else {
log.G(Ctx).Debug("---- Retrieved ConfigMap " + podVolumeSpec.ConfigMap.Name)
}

if configMap != nil {
if retrieved_data.Containers == nil {
retrieved_data.Containers = append(retrieved_data.Containers, commonIL.RetrievedContainer{Name: container.Name})
}
retrieved_data.Containers[len(retrieved_data.Containers)-1].ConfigMaps = append(retrieved_data.Containers[len(retrieved_data.Containers)-1].ConfigMaps, *configMap)

retrieved_data.Name = container.Name
retrieved_data.ConfigMaps = append(retrieved_data.ConfigMaps, *configMap)
}

} else if podVolumeSpec != nil && podVolumeSpec.Secret != nil {
Expand All @@ -70,24 +66,21 @@ func retrieve_data(container v1.Container, pod *v1.Pod) (commonIL.RetrievedPodDa

if err != nil {
log.G(Ctx).Error(err)
return commonIL.RetrievedPodData{}, err
return commonIL.RetrievedContainer{}, err
} else {
log.G(Ctx).Debug("---- Retrieved Secret " + svs.SecretName)
}

if secret.Data != nil {
if retrieved_data.Containers == nil {
retrieved_data.Containers = append(retrieved_data.Containers, commonIL.RetrievedContainer{Name: container.Name})
}
retrieved_data.Containers[len(retrieved_data.Containers)-1].Secrets = append(retrieved_data.Containers[len(retrieved_data.Containers)-1].Secrets, *secret)
retrieved_data.Name = container.Name
retrieved_data.Secrets = append(retrieved_data.Secrets, *secret)
}

} else if podVolumeSpec != nil && podVolumeSpec.EmptyDir != nil {
edPath := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/"+"emptyDirs/"+vol.Name)
if retrieved_data.Containers == nil {
retrieved_data.Containers = append(retrieved_data.Containers, commonIL.RetrievedContainer{Name: container.Name})
}
retrieved_data.Containers[len(retrieved_data.Containers)-1].EmptyDirs = append(retrieved_data.Containers[len(retrieved_data.Containers)-1].EmptyDirs, edPath)

retrieved_data.Name = container.Name
retrieved_data.EmptyDirs = append(retrieved_data.EmptyDirs, edPath)
}
}
}
Expand Down
26 changes: 14 additions & 12 deletions pkg/sidecars/docker/Status.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
}

for i, pod := range req {
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace, PodStatus: commonIL.UNKNOWN})
resp = append(resp, commonIL.PodStatus{PodName: pod.Name, PodNamespace: pod.Namespace})
for _, container := range pod.Spec.Containers {
log.G(Ctx).Debug("- Getting status for container " + container.Name)
cmd := []string{"ps -af name=^" + container.Name + "$ --format \"{{.Status}}\""}
Expand All @@ -58,18 +58,20 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {

containerstatus := strings.Split(execReturn.Stdout, " ")

if containerstatus[0] == "" {
if execReturn.Stdout != "" {
if containerstatus[0] == "Created" {
log.G(Ctx).Info("-- Container " + container.Name + " is going ready...")
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}, Ready: false})
} else if containerstatus[0] == "Up" {
log.G(Ctx).Info("-- Container " + container.Name + " is running")
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}, Ready: true})
} else if containerstatus[0] == "Exited" {
log.G(Ctx).Info("-- Container " + container.Name + " has been stopped")
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}, Ready: false})
}
} else {
log.G(Ctx).Info("-- Container " + container.Name + " doesn't exist")
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}})
} else if containerstatus[0] == "Created" {
log.G(Ctx).Info("-- Container " + container.Name + " is going ready...")
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{}}})
} else if containerstatus[0] == "Up" {
log.G(Ctx).Info("-- Container " + container.Name + " is running")
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}})
} else if containerstatus[0] == "Exited" {
log.G(Ctx).Info("-- Container " + container.Name + " has been stopped")
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}})
resp[i].Containers = append(resp[i].Containers, v1.ContainerStatus{Name: container.Name, State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}}, Ready: false})
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/virtualkubelet/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func createRequest(pods []*v1.Pod, token string) ([]byte, error) {
var returnValue, _ = json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN})
var returnValue, _ = json.Marshal(commonIL.PodStatus{})

bodyBytes, err := json.Marshal(pods)
if err != nil {
Expand Down Expand Up @@ -54,7 +54,7 @@ func createRequest(pods []*v1.Pod, token string) ([]byte, error) {
}

func deleteRequest(pods []*v1.Pod, token string) ([]byte, error) {
returnValue, _ := json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN})
returnValue, _ := json.Marshal(commonIL.PodStatus{})

bodyBytes, err := json.Marshal(pods)
if err != nil {
Expand Down Expand Up @@ -130,7 +130,7 @@ func statusRequest(podsList []*v1.Pod, token string) ([]byte, error) {
return returnValue, nil
}

func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, imageLocation string, pod *v1.Pod, container v1.Container) error {
func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, imageLocation string, pod *v1.Pod) error {
var req []*v1.Pod
req = []*v1.Pod{pod}

Expand Down Expand Up @@ -216,8 +216,8 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin
toBeDeleted = false
updatePod = true
if pod.Status.ContainerStatuses != nil {
pod.Status.ContainerStatuses[index].State.Running = containerStatus.State.Running
pod.Status.ContainerStatuses[index].Ready = true
pod.Status.ContainerStatuses[index].State = containerStatus.State
pod.Status.ContainerStatuses[index].Ready = containerStatus.Ready
}
}
}
Expand Down
27 changes: 19 additions & 8 deletions pkg/virtualkubelet/virtualkubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err
var hasInitContainers bool = false
var state v1.ContainerState
defer span.End()
distribution := "docker://"
//distribution := "docker://"
// Add the pod's coordinates to the current span.
ctx = addAttributes(ctx, span, NamespaceKey, pod.Namespace, NameKey, pod.Name)
key, err := BuildKey(pod)
Expand All @@ -250,18 +250,23 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err
}
state = running_state

err = RemoteExecution(p, ctx, CREATE, "", pod)
if err != nil {
return err
}

// in case we have initContainers we need to stop main containers from executing for now ...
if len(pod.Spec.InitContainers) > 0 {
state = waiting_state
hasInitContainers = true
// run init container with remote execution enabled
for _, container := range pod.Spec.InitContainers {
/*for _, container := range pod.Spec.InitContainers {
// MUST TODO: Run init containers sequentialy and NOT all-together
err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container)
if err != nil {
return err
}
}
}*/

pod.Status = v1.PodStatus{
Phase: v1.PodRunning,
Expand Down Expand Up @@ -307,14 +312,14 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err
}
// deploy main containers
for _, container := range pod.Spec.Containers {
var err error
//var err error

if !hasInitContainers {
/*if !hasInitContainers {
err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container)
if err != nil {
return err
}
}
}*/
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: container.Name,
Image: container.Image,
Expand Down Expand Up @@ -375,7 +380,13 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er
pod.Status.Phase = v1.PodSucceeded
pod.Status.Reason = "KNOCProviderPodDeleted"

for _, container := range pod.Spec.Containers {
err = RemoteExecution(p, ctx, DELETE, "", pod)
if err != nil {
log.G(ctx).Error(err)
return err
}

/*for _, container := range pod.Spec.Containers {
err = RemoteExecution(p, ctx, DELETE, "", pod, container)
if err != nil {
log.G(ctx).Error(err)
Expand All @@ -391,7 +402,7 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er
return err
}
}
}*/
for idx := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[idx].Ready = false
pod.Status.ContainerStatuses[idx].State = v1.ContainerState{
Expand Down

0 comments on commit 987755f

Please sign in to comment.