From e83786a31471d0b602ca7ec407a9f795905c1b43 Mon Sep 17 00:00:00 2001 From: Diego Ciangottini Date: Wed, 26 Jul 2023 13:13:58 +0000 Subject: [PATCH] working version? --- main.go | 37 +++++++++++- pkg/virtualkubelet/execute.go | 87 ++++++++++++++++++++++------ pkg/virtualkubelet/virtualkubelet.go | 49 +++++++++------- 3 files changed, 129 insertions(+), 44 deletions(-) diff --git a/main.go b/main.go index b4a2b966..9f0993cf 100644 --- a/main.go +++ b/main.go @@ -23,12 +23,15 @@ import ( "io/ioutil" "os" "path" + "time" //"k8s.io/client-go/rest" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/record" "net/http" @@ -45,7 +48,6 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/node/api" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" - "k8s.io/client-go/tools/record" stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" ) @@ -183,19 +185,24 @@ func main() { } return nil }() + //<-nc.Ready() + //close(nc) eb := record.NewBroadcaster() + EventRecorder := eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(opts.NodeName, "pod-controller")}) + resync, err := time.ParseDuration("30s") + podInformerFactory := informers.NewSharedInformerFactoryWithOptions( localClient, - 100, + resync, PodInformerFilter(opts.NodeName), ) scmInformerFactory := informers.NewSharedInformerFactoryWithOptions( localClient, - 100, + resync, ) podControllerConfig := node.PodControllerConfig{ @@ -208,6 +215,30 @@ func main() { ServiceInformer: scmInformerFactory.Core().V1().Services(), } + // stop signal for the informer + stopper := make(chan struct{}) + defer close(stopper) + + // start informers -> + go podInformerFactory.Start(stopper) + go scmInformerFactory.Start(stopper) + + // start to sync and call list + if !cache.WaitForCacheSync(stopper, podInformerFactory.Core().V1().Pods().Informer().HasSynced) { + log.G(ctx).Fatal(fmt.Errorf("timed out waiting for caches to sync")) + return + } + + // // DEBUG + // lister := podInformerFactory.Core().V1().Pods().Lister().Pods("") + // pods, err := lister.List(labels.Everything()) + // if err != nil { + // fmt.Println(err) + // } + // for pod := range pods { + // fmt.Println("pods:", pods[pod].Name) + // } + pc, err := node.NewPodController(podControllerConfig) // <-- instatiates the pod controller if err != nil { log.G(ctx).Fatal(err) diff --git a/pkg/virtualkubelet/execute.go b/pkg/virtualkubelet/execute.go index 60c74a43..14de4b91 100644 --- a/pkg/virtualkubelet/execute.go +++ b/pkg/virtualkubelet/execute.go @@ -18,62 +18,86 @@ import ( var NoReq uint8 -func createRequest(pod commonIL.Request, token string) []byte { +func createRequest(pod commonIL.Request, token string) ([]byte, error) { var returnValue, _ = json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN}) bodyBytes, err := json.Marshal(pod) + if err != nil { + log.L.Error(err) + return nil, err + } reader := bytes.NewReader(bodyBytes) req, err := http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Interlinkurl+":"+commonIL.InterLinkConfigInst.Interlinkport+"/create", reader) - if err != nil { log.L.Error(err) + return nil, err } req.Header.Add("Authorization", "Bearer "+token) resp, err := http.DefaultClient.Do(req) if err != nil { log.L.Error(err) + return nil, err } returnValue, _ = ioutil.ReadAll(resp.Body) var response commonIL.PodStatus - json.Unmarshal(returnValue, &response) + err = json.Unmarshal(returnValue, &response) + if err != nil { + log.L.Error(err) + return nil, err + } - return returnValue + return returnValue, nil } -func deleteRequest(pod commonIL.Request, token string) []byte { +func deleteRequest(pod commonIL.Request, token string) ([]byte, error) { var returnValue, _ = json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN}) bodyBytes, err := json.Marshal(pod) + if err != nil { + log.L.Error(err) + return nil, err + } reader := bytes.NewReader(bodyBytes) req, err := http.NewRequest(http.MethodDelete, commonIL.InterLinkConfigInst.Interlinkurl+":"+commonIL.InterLinkConfigInst.Interlinkport+"/delete", reader) if err != nil { log.L.Error(err) + return nil, err } req.Header.Add("Authorization", "Bearer "+token) resp, err := http.DefaultClient.Do(req) if err != nil { log.L.Error(err) + return nil, err } returnValue, _ = ioutil.ReadAll(resp.Body) var response commonIL.PodStatus - json.Unmarshal(returnValue, &response) + err = json.Unmarshal(returnValue, &response) + if err != nil { + log.L.Error(err) + return nil, err + } - return returnValue + return returnValue, nil } -func statusRequest(podsList commonIL.Request, token string) []byte { +func statusRequest(podsList commonIL.Request, token string) ([]byte, error) { var returnValue []byte var response []commonIL.StatusResponse bodyBytes, err := json.Marshal(podsList) + if err != nil { + log.L.Error(err) + return nil, err + } reader := bytes.NewReader(bodyBytes) req, err := http.NewRequest(http.MethodGet, commonIL.InterLinkConfigInst.Interlinkurl+":"+commonIL.InterLinkConfigInst.Interlinkport+"/status", reader) if err != nil { log.L.Error(err) + return nil, err } log.L.Println(string(bodyBytes)) @@ -83,12 +107,17 @@ func statusRequest(podsList commonIL.Request, token string) []byte { resp, err := http.DefaultClient.Do(req) if err != nil { log.L.Error(err) + return nil, err } returnValue, _ = ioutil.ReadAll(resp.Body) - json.Unmarshal(returnValue, &response) + err = json.Unmarshal(returnValue, &response) + if err != nil { + log.L.Error(err) + return nil, err + } - return returnValue + return returnValue, nil } func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, imageLocation string, pod *v1.Pod, container v1.Container) error { @@ -104,33 +133,51 @@ func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, switch mode { case CREATE: //v1.Pod used only for secrets and volumes management; TO BE IMPLEMENTED - returnVal := createRequest(req, token) - log.L.Println(string(returnVal)) + returnVal, err := createRequest(req, token) + if err != nil { + log.G(ctx).Error(err) + return err + } + log.G(ctx).Info(string(returnVal)) break case DELETE: if NoReq > 0 { NoReq-- } else { - returnVal := deleteRequest(req, token) - log.L.Println(string(returnVal)) + returnVal, err := deleteRequest(req, token) + if err != nil { + log.G(ctx).Error(err) + return err + } + log.G(ctx).Info(string(returnVal)) } break } return nil } -func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token string) { +func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token string) error { if len(p.pods) == 0 { - return + return nil } var returnVal []byte var ret commonIL.StatusResponse var PodsList commonIL.Request PodsList.Pods = p.pods + log.G(ctx).Info(p.pods) - returnVal = statusRequest(PodsList, token) - json.Unmarshal(returnVal, &ret) + returnVal, err := statusRequest(PodsList, token) + if err != nil { + log.G(ctx).Error(err) + return err + } + + err = json.Unmarshal(returnVal, &ret) + if err != nil { + log.G(ctx).Error(err) + return err + } for podIndex, podStatus := range ret.PodStatus { if podStatus.PodStatus == 1 { @@ -144,10 +191,12 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin execReturn, _ := shell.Execute() if execReturn.Stderr != "" { - fmt.Println("Could not delete pod " + ret.PodName[podIndex].Name) + log.G(ctx).Error(fmt.Errorf("Could not delete pod " + ret.PodName[podIndex].Name)) + return fmt.Errorf("Could not delete pod " + ret.PodName[podIndex].Name) } } } log.L.Println(ret) + return nil } diff --git a/pkg/virtualkubelet/virtualkubelet.go b/pkg/virtualkubelet/virtualkubelet.go index 1564d124..4cf87f2c 100644 --- a/pkg/virtualkubelet/virtualkubelet.go +++ b/pkg/virtualkubelet/virtualkubelet.go @@ -256,7 +256,10 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err // run init container with remote execution enabled for _, container := range pod.Spec.InitContainers { // MUST TODO: Run init containers sequentialy and NOT all-together - RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container) + err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container) + if err != nil { + return err + } } pod.Status = v1.PodStatus{ @@ -307,24 +310,9 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err if !hasInitContainers { err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container) - - } - if err != nil { - pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{ - Name: container.Name, - Image: container.Image, - Ready: false, - RestartCount: 1, - State: v1.ContainerState{ - Terminated: &v1.ContainerStateTerminated{ - Message: "Could not reach remote cluster", - StartedAt: now, - ExitCode: 130, - }, - }, - }) - pod.Status.Phase = v1.PodFailed - continue + if err != nil { + return err + } } pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{ Name: container.Name, @@ -387,10 +375,21 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er pod.Status.Reason = "KNOCProviderPodDeleted" for _, container := range pod.Spec.Containers { - RemoteExecution(p, ctx, DELETE, "", pod, container) + err = RemoteExecution(p, ctx, DELETE, "", pod, container) + if err != nil { + log.G(ctx).Error(err) + return err + } + } + for _, container := range pod.Spec.InitContainers { - RemoteExecution(p, ctx, DELETE, "", pod, container) + err = RemoteExecution(p, ctx, DELETE, "", pod, container) + if err != nil { + log.G(ctx).Error(err) + return err + } + } for idx := range pod.Status.ContainerStatuses { pod.Status.ContainerStatuses[idx].Ready = false @@ -542,6 +541,8 @@ func (p *VirtualKubeletProvider) statusLoop(ctx context.Context) { <-t.C } + log.G(ctx).Info("statusLoop") + b, err := os.ReadFile(commonIL.InterLinkConfigInst.VKTokenFile) // just pass the file name if err != nil { fmt.Print(err) @@ -561,7 +562,11 @@ func (p *VirtualKubeletProvider) statusLoop(ctx context.Context) { fmt.Print(err) } token = string(b) - checkPodsStatus(p, ctx, token) + err = checkPodsStatus(p, ctx, token) + if err != nil { + log.G(ctx).Error(err) + } + log.G(ctx).Info("statusLoop=end") } }