Skip to content

Commit

Permalink
Merge pull request #35 from dciangot/test
Browse files Browse the repository at this point in the history
fix k8s informer startup in virtualkubelet
  • Loading branch information
dciangot authored Jul 26, 2023
2 parents ee37a2d + e83786a commit 7ac8df6
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 44 deletions.
37 changes: 34 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"io/ioutil"
"os"
"path"
"time"

//"k8s.io/client-go/rest"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"

"net/http"

Expand All @@ -45,7 +48,6 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/node/api"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

Expand Down Expand Up @@ -183,19 +185,24 @@ func main() {
}
return nil
}()
//<-nc.Ready()
//close(nc)

eb := record.NewBroadcaster()

EventRecorder := eb.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(opts.NodeName, "pod-controller")})

resync, err := time.ParseDuration("30s")

podInformerFactory := informers.NewSharedInformerFactoryWithOptions(
localClient,
100,
resync,
PodInformerFilter(opts.NodeName),
)

scmInformerFactory := informers.NewSharedInformerFactoryWithOptions(
localClient,
100,
resync,
)

podControllerConfig := node.PodControllerConfig{
Expand All @@ -208,6 +215,30 @@ func main() {
ServiceInformer: scmInformerFactory.Core().V1().Services(),
}

// stop signal for the informer
stopper := make(chan struct{})
defer close(stopper)

// start informers ->
go podInformerFactory.Start(stopper)
go scmInformerFactory.Start(stopper)

// start to sync and call list
if !cache.WaitForCacheSync(stopper, podInformerFactory.Core().V1().Pods().Informer().HasSynced) {
log.G(ctx).Fatal(fmt.Errorf("timed out waiting for caches to sync"))
return
}

// // DEBUG
// lister := podInformerFactory.Core().V1().Pods().Lister().Pods("")
// pods, err := lister.List(labels.Everything())
// if err != nil {
// fmt.Println(err)
// }
// for pod := range pods {
// fmt.Println("pods:", pods[pod].Name)
// }

pc, err := node.NewPodController(podControllerConfig) // <-- instatiates the pod controller
if err != nil {
log.G(ctx).Fatal(err)
Expand Down
87 changes: 68 additions & 19 deletions pkg/virtualkubelet/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,62 +18,86 @@ import (

var NoReq uint8

func createRequest(pod commonIL.Request, token string) []byte {
func createRequest(pod commonIL.Request, token string) ([]byte, error) {
var returnValue, _ = json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN})

bodyBytes, err := json.Marshal(pod)
if err != nil {
log.L.Error(err)
return nil, err
}
reader := bytes.NewReader(bodyBytes)
req, err := http.NewRequest(http.MethodPost, commonIL.InterLinkConfigInst.Interlinkurl+":"+commonIL.InterLinkConfigInst.Interlinkport+"/create", reader)

if err != nil {
log.L.Error(err)
return nil, err
}

req.Header.Add("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.L.Error(err)
return nil, err
}

returnValue, _ = ioutil.ReadAll(resp.Body)
var response commonIL.PodStatus
json.Unmarshal(returnValue, &response)
err = json.Unmarshal(returnValue, &response)
if err != nil {
log.L.Error(err)
return nil, err
}

return returnValue
return returnValue, nil
}

func deleteRequest(pod commonIL.Request, token string) []byte {
func deleteRequest(pod commonIL.Request, token string) ([]byte, error) {
var returnValue, _ = json.Marshal(commonIL.PodStatus{PodStatus: commonIL.UNKNOWN})

bodyBytes, err := json.Marshal(pod)
if err != nil {
log.L.Error(err)
return nil, err
}
reader := bytes.NewReader(bodyBytes)
req, err := http.NewRequest(http.MethodDelete, commonIL.InterLinkConfigInst.Interlinkurl+":"+commonIL.InterLinkConfigInst.Interlinkport+"/delete", reader)
if err != nil {
log.L.Error(err)
return nil, err
}

req.Header.Add("Authorization", "Bearer "+token)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.L.Error(err)
return nil, err
}

returnValue, _ = ioutil.ReadAll(resp.Body)
var response commonIL.PodStatus
json.Unmarshal(returnValue, &response)
err = json.Unmarshal(returnValue, &response)
if err != nil {
log.L.Error(err)
return nil, err
}

return returnValue
return returnValue, nil
}

func statusRequest(podsList commonIL.Request, token string) []byte {
func statusRequest(podsList commonIL.Request, token string) ([]byte, error) {
var returnValue []byte
var response []commonIL.StatusResponse

bodyBytes, err := json.Marshal(podsList)
if err != nil {
log.L.Error(err)
return nil, err
}
reader := bytes.NewReader(bodyBytes)
req, err := http.NewRequest(http.MethodGet, commonIL.InterLinkConfigInst.Interlinkurl+":"+commonIL.InterLinkConfigInst.Interlinkport+"/status", reader)
if err != nil {
log.L.Error(err)
return nil, err
}

log.L.Println(string(bodyBytes))
Expand All @@ -83,12 +107,17 @@ func statusRequest(podsList commonIL.Request, token string) []byte {
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.L.Error(err)
return nil, err
}

returnValue, _ = ioutil.ReadAll(resp.Body)
json.Unmarshal(returnValue, &response)
err = json.Unmarshal(returnValue, &response)
if err != nil {
log.L.Error(err)
return nil, err
}

return returnValue
return returnValue, nil
}

func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8, imageLocation string, pod *v1.Pod, container v1.Container) error {
Expand All @@ -104,33 +133,51 @@ func RemoteExecution(p *VirtualKubeletProvider, ctx context.Context, mode int8,
switch mode {
case CREATE:
//v1.Pod used only for secrets and volumes management; TO BE IMPLEMENTED
returnVal := createRequest(req, token)
log.L.Println(string(returnVal))
returnVal, err := createRequest(req, token)
if err != nil {
log.G(ctx).Error(err)
return err
}
log.G(ctx).Info(string(returnVal))
break

case DELETE:
if NoReq > 0 {
NoReq--
} else {
returnVal := deleteRequest(req, token)
log.L.Println(string(returnVal))
returnVal, err := deleteRequest(req, token)
if err != nil {
log.G(ctx).Error(err)
return err
}
log.G(ctx).Info(string(returnVal))
}
break
}
return nil
}

func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token string) {
func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token string) error {
if len(p.pods) == 0 {
return
return nil
}
var returnVal []byte
var ret commonIL.StatusResponse
var PodsList commonIL.Request
PodsList.Pods = p.pods
log.G(ctx).Info(p.pods)

returnVal = statusRequest(PodsList, token)
json.Unmarshal(returnVal, &ret)
returnVal, err := statusRequest(PodsList, token)
if err != nil {
log.G(ctx).Error(err)
return err
}

err = json.Unmarshal(returnVal, &ret)
if err != nil {
log.G(ctx).Error(err)
return err
}

for podIndex, podStatus := range ret.PodStatus {
if podStatus.PodStatus == 1 {
Expand All @@ -144,10 +191,12 @@ func checkPodsStatus(p *VirtualKubeletProvider, ctx context.Context, token strin

execReturn, _ := shell.Execute()
if execReturn.Stderr != "" {
fmt.Println("Could not delete pod " + ret.PodName[podIndex].Name)
log.G(ctx).Error(fmt.Errorf("Could not delete pod " + ret.PodName[podIndex].Name))
return fmt.Errorf("Could not delete pod " + ret.PodName[podIndex].Name)
}
}
}

log.L.Println(ret)
return nil
}
49 changes: 27 additions & 22 deletions pkg/virtualkubelet/virtualkubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,10 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err
// run init container with remote execution enabled
for _, container := range pod.Spec.InitContainers {
// MUST TODO: Run init containers sequentialy and NOT all-together
RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container)
err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container)
if err != nil {
return err
}
}

pod.Status = v1.PodStatus{
Expand Down Expand Up @@ -307,24 +310,9 @@ func (p *VirtualKubeletProvider) CreatePod(ctx context.Context, pod *v1.Pod) err

if !hasInitContainers {
err = RemoteExecution(p, ctx, CREATE, distribution+container.Image, pod, container)

}
if err != nil {
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: container.Name,
Image: container.Image,
Ready: false,
RestartCount: 1,
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Message: "Could not reach remote cluster",
StartedAt: now,
ExitCode: 130,
},
},
})
pod.Status.Phase = v1.PodFailed
continue
if err != nil {
return err
}
}
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Name: container.Name,
Expand Down Expand Up @@ -387,10 +375,21 @@ func (p *VirtualKubeletProvider) DeletePod(ctx context.Context, pod *v1.Pod) (er
pod.Status.Reason = "KNOCProviderPodDeleted"

for _, container := range pod.Spec.Containers {
RemoteExecution(p, ctx, DELETE, "", pod, container)
err = RemoteExecution(p, ctx, DELETE, "", pod, container)
if err != nil {
log.G(ctx).Error(err)
return err
}

}

for _, container := range pod.Spec.InitContainers {
RemoteExecution(p, ctx, DELETE, "", pod, container)
err = RemoteExecution(p, ctx, DELETE, "", pod, container)
if err != nil {
log.G(ctx).Error(err)
return err
}

}
for idx := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[idx].Ready = false
Expand Down Expand Up @@ -542,6 +541,8 @@ func (p *VirtualKubeletProvider) statusLoop(ctx context.Context) {
<-t.C
}

log.G(ctx).Info("statusLoop")

b, err := os.ReadFile(commonIL.InterLinkConfigInst.VKTokenFile) // just pass the file name
if err != nil {
fmt.Print(err)
Expand All @@ -561,7 +562,11 @@ func (p *VirtualKubeletProvider) statusLoop(ctx context.Context) {
fmt.Print(err)
}
token = string(b)
checkPodsStatus(p, ctx, token)
err = checkPodsStatus(p, ctx, token)
if err != nil {
log.G(ctx).Error(err)
}
log.G(ctx).Info("statusLoop=end")
}
}

Expand Down

0 comments on commit 7ac8df6

Please sign in to comment.