Skip to content

Commit

Permalink
Issue open-horizon#4112 - Feature Request: Avoid rebuilding amd64_ana…
Browse files Browse the repository at this point in the history
…x_k8s tar file in agent autoupgrade code

Signed-off-by: Le Zhang <[email protected]>
  • Loading branch information
LiilyZhang committed Jul 23, 2024
1 parent 5ed9120 commit 09a1d20
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 111 deletions.
104 changes: 29 additions & 75 deletions clusterupgrade/cluster_install_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func getImageTagFromManifestFile(manifestFolder string) (string, string, error)
return "", "", fmt.Errorf("failed to get RepoTags from docker manifest file, docker manifest: %v", dockerManifestData)
}

func decompress(tarGZFilePath, targetFolder string) error {
func getAgentTarballFromGzip(tarGZFilePath, imageTarballPath string) error {
reader, err := os.Open(tarGZFilePath)
if err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to open %v, error was: %v", tarGZFilePath, err)))
Expand All @@ -421,14 +421,32 @@ func decompress(tarGZFilePath, targetFolder string) error {
}
defer uncompressStream.Close()

tarfile, err := os.Create(imageTarballPath)
if err != nil {
return err
}
defer tarfile.Close()

_, err = io.Copy(tarfile, uncompressStream)
return err
}

func extractImageManifest(tarballPath, targetFolder string) error {
reader, err := os.Open(tarballPath)
if err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to open %v, error was: %v", tarballPath, err)))
return err
}
defer reader.Close()

// create the target folder if it is not exist
if _, err := os.Stat(targetFolder); err != nil {
if err := os.MkdirAll(targetFolder, 0755); err != nil {
return err
}
}

tarReader := tar.NewReader(uncompressStream)
tarReader := tar.NewReader(reader)
for {
header, err := tarReader.Next()
switch {
Expand All @@ -444,85 +462,21 @@ func decompress(tarGZFilePath, targetFolder string) error {

target := path.Join(targetFolder, header.Name)
switch header.Typeflag {
// if its a dir and it doesn't exist create it
case tar.TypeDir:
if _, err := os.Stat(target); err != nil {
if err := os.MkdirAll(target, 0755); err != nil {
// if it's a manifest file, create it
case tar.TypeReg:
if header.Name == DOCKER_MANIFEST_FILE {
f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
if err != nil {
return err
}
if _, err := io.Copy(f, tarReader); err != nil {
f.Close()
return err
}
}

// if it's a file create it
case tar.TypeReg:
f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
if err != nil {
return err
}
if _, err := io.Copy(f, tarReader); err != nil {
f.Close()
return err
}
f.Close()
}
}
}

func createTarballWithFileAndFolder(tarballFilePath string, src string) error {
tarfile, err := os.Create(tarballFilePath)
if err != nil {
return err
}
defer tarfile.Close()

tarfileWriter := tar.NewWriter(tarfile)
defer tarfileWriter.Close()

return addFiles(tarfileWriter, src, "")
}

func addFiles(w *tar.Writer, srcPath, base string) error {
// Open the Directory
files, err := ioutil.ReadDir(path.Join(srcPath, base))
if err != nil {
return err
}

for _, fileInfo := range files {
if !fileInfo.IsDir() {
filePath := path.Join(srcPath, base, fileInfo.Name())
headerName := path.Join(base, fileInfo.Name())
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()

// prepare the tar header
header := new(tar.Header)
header.Name = headerName
header.Size = fileInfo.Size()
header.Mode = int64(fileInfo.Mode())
header.ModTime = fileInfo.ModTime()

err = w.WriteHeader(header)
if err != nil {
return err
}

_, err = io.Copy(w, file)
if err != nil {
return err
}
} else if fileInfo.IsDir() {
// Recurse
newBase := base + fileInfo.Name() + "/"
err = addFiles(w, srcPath, newBase)
if err != nil {
return err
}
}
}
return nil
}

func imageExistInRemoteRegistry(fullTag string, versiontTag string, kc authn.Keychain) bool {
Expand Down
76 changes: 40 additions & 36 deletions clusterupgrade/cluster_upgrade_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (w *ClusterUpgradeWorker) CommandHandler(command worker.Command) bool {
switch command.(type) {
case *ClusterUpgradeCommand:
cmd := command.(*ClusterUpgradeCommand)
w.HandleClusterUpgrade(exchange.GetOrg(w.GetExchangeId()), cmd.Msg.Message.NMPStatus.AgentUpgrade.BaseWorkingDirectory, cmd.Msg.Message.NMPName)
w.HandleClusterUpgrade(exchange.GetOrg(w.GetExchangeId()), cmd.Msg.Message.NMPStatus.AgentUpgrade.BaseWorkingDirectory, cmd.Msg.Message.NMPName, cmd.Msg.Message.NMPStatus.AgentUpgrade.UpgradedVersions)
case *NodeRegisteredCommand:
w.EC = getEC(w.Config, w.db)
default:
Expand Down Expand Up @@ -361,7 +361,7 @@ func (w *ClusterUpgradeWorker) GetUncompletedAgreements() ([]string, error) {
return ag_ids, nil
}

func (w *ClusterUpgradeWorker) HandleClusterUpgrade(org string, baseWorkingDir string, nmpName string) {
func (w *ClusterUpgradeWorker) HandleClusterUpgrade(org string, baseWorkingDir string, nmpName string, agentUpgradeVersions exchangecommon.AgentUpgradeVersions) {
// nmpName: {org}/{nmpName}
// baseWorkingDir: /var/horizon/nmp/
glog.Infof(cuwlog(fmt.Sprintf("Start handling edge cluster upgrade for nmp: %v", nmpName)))
Expand Down Expand Up @@ -463,7 +463,7 @@ func (w *ClusterUpgradeWorker) HandleClusterUpgrade(org string, baseWorkingDir s
glog.Infof(cuwlog(fmt.Sprintf("exchangeURL and/or cert are validated for nmp %v", nmpName)))
}

imageVersionIsSame, newImageVersion, currentImageVersion, err := checkAgentImage(w.kubeClient, workDir)
imageVersionIsSame, newImageVersion, currentImageVersion, err := checkAgentImage(w.kubeClient, workDir, agentUpgradeVersions.SoftwareVersion)
if err != nil {
errMessage = fmt.Sprintf("Failed to compare agent image version for nmp: %v, error: %v", nmpName, err)
glog.Errorf(cuwlog(errMessage))
Expand Down Expand Up @@ -628,7 +628,6 @@ func checkAgentConfig(kubeClient *KubeClient, workDir string) (bool, map[string]

if _, ok := configInAgentFile[HZN_NAMESPACE_SCOPED_ENV_NAME]; ok {
agentIsNamespaceScope := cutil.IsNamespaceScoped()
strconv.FormatBool(agentIsNamespaceScope)
configInAgentFile[HZN_NAMESPACE_SCOPED_ENV_NAME] = strconv.FormatBool(agentIsNamespaceScope)
}

Expand Down Expand Up @@ -678,47 +677,52 @@ func compareCertContent(certInAgentFile []byte, certInK8S []byte) bool {
}

// checkAgentImage returns compare result of current image version and image version to update, image version to update, current image version, error
func checkAgentImage(kubeClient *KubeClient, workDir string) (bool, string, string, error) {
func checkAgentImage(kubeClient *KubeClient, workDir string, agentSoftwareVersionToUpgrade string) (bool, string, string, error) {
// image file is: /var/horizon/nmp/<org>/nmpID/amd64_anax_k8s.tar.gz
currentAgentVersion := version.HORIZON_VERSION
imageTarGzFilePath := path.Join(workDir, AGENT_IMAGE_TAR_GZ)
glog.Infof(cuwlog(fmt.Sprintf("Getting image tar file: %v", imageTarGzFilePath)))
if currentAgentVersion != agentSoftwareVersionToUpgrade {
imageTarGzFilePath := path.Join(workDir, AGENT_IMAGE_TAR_GZ)
glog.Infof(cuwlog(fmt.Sprintf("Getting image tar file: %v", imageTarGzFilePath)))

if _, err := os.Stat(imageTarGzFilePath); os.IsNotExist(err) {
// image tar.gz is not exist, means download worker doesn't download it (same version of image)
return true, currentAgentVersion, currentAgentVersion, nil
}
if _, err := os.Stat(imageTarGzFilePath); os.IsNotExist(err) {
// image tar.gz is not exist, means download worker doesn't download it (same version of image)
return true, currentAgentVersion, currentAgentVersion, nil
}

imageTarballPath := path.Join(workDir, AGENT_IMAGE_TAR)
decompressTargetFolder := fmt.Sprintf("./%s", AGENT_IMAGE_NAME)
imageTarballPath := path.Join(workDir, AGENT_IMAGE_TAR)

// decompress image tar.gz
if err := decompress(imageTarGzFilePath, decompressTargetFolder); err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to extract agent image tar.gz %v, error: %v", imageTarGzFilePath, err)))
return false, "", "", err
}
// get amd64_anax_k8s.tar from amd64_anax_k8s.tar.gz
if err := getAgentTarballFromGzip(imageTarGzFilePath, imageTarballPath); err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to extract agent image tarball from %v, error: %v", imageTarGzFilePath, err)))
return false, "", "", err
}

fullImageTag, imageTag, err := getImageTagFromManifestFile(decompressTargetFolder)
if err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to get image tag from manifest file in side %v, error: %v", imageTarGzFilePath, err)))
return false, "", currentAgentVersion, err
}
glog.Infof(cuwlog(fmt.Sprintf("Get image %v from tar file, extracted image tag: %v", fullImageTag, imageTag)))
decompressTargetFolder := fmt.Sprintf("./%s", AGENT_IMAGE_NAME)

// extract the docker manifest file from image tarball
if err := extractImageManifest(imageTarballPath, decompressTargetFolder); err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to extract docker manifest file from agent image tallball %v, error: %v", imageTarballPath, err)))
return false, "", "", err
}

_, imageTagInPackage, err := getImageTagFromManifestFile(decompressTargetFolder)
if err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to get image tag from manifest file in side %v, error: %v", imageTarGzFilePath, err)))
return false, "", currentAgentVersion, err
}
glog.Infof(cuwlog(fmt.Sprintf("Get image from tar file, extracted image tag: %v", imageTagInPackage)))

if imageTagInPackage != agentSoftwareVersionToUpgrade {
glog.Errorf(cuwlog(fmt.Sprintf("Image version from docker manifest file (%v) does not match the image version specified in the NMP manifest (%v). Please check the %v of %v in the CSS.", imageTagInPackage, agentSoftwareVersionToUpgrade, AGENT_IMAGE_TAR_GZ, agentSoftwareVersionToUpgrade)))
return false, "", currentAgentVersion, err
}

if currentAgentVersion != imageTag {
// push image to image registry
imageRegistry := os.Getenv("AGENT_CLUSTER_IMAGE_REGISTRY_HOST")
if imageRegistry == "" {
return false, "", "", fmt.Errorf("failed to get edge cluster image registry host from environment veriable: %v", imageRegistry)
}

// create tarball
glog.Infof(cuwlog(fmt.Sprintf("Making tarball %v", imageTarballPath)))
if err := createTarballWithFileAndFolder(imageTarballPath, decompressTargetFolder); err != nil {
glog.Errorf(cuwlog(fmt.Sprintf("Failed to create image tar at %v, error: %v", imageTarballPath, err)))
return false, "", currentAgentVersion, err
}

// $ docker load --input amd64_anax_k8s.tar.gz
// Loaded image: hyc-edge-team-staging-docker-local.artifactory.swg-devops.com/amd64_anax_k8s:2.30.0-689
glog.Infof(cuwlog(fmt.Sprintf("Loading docker image from: %v", imageTarballPath)))
Expand All @@ -740,9 +744,9 @@ func checkAgentImage(kubeClient *KubeClient, workDir string) (bool, string, stri
// - ocp: default-route-openshift-image-registry.apps.prowler.cp.fyre.ibm.com/openhorizon-agent/amd64_anax_k8s:2.30.0-689
// - k3s: 10.43.100.65:5000/openhorizon-agent/amd64_anax_k8s:2.30.0-689
// - cluster use remote ICR: <remote-host>/<agent-namespace>/amd64_anax_k8s:2.30.0-689
newImageRepoWithTag := fmt.Sprintf("%s/%s/%s:%s", imageRegistry, AGENT_NAMESPACE, AGENT_IMAGE_NAME, imageTag)
newImageRepoWithTag := fmt.Sprintf("%s/%s/%s:%s", imageRegistry, AGENT_NAMESPACE, AGENT_IMAGE_NAME, agentSoftwareVersionToUpgrade)
if usingRemoteICR {
newImageRepoWithTag = fmt.Sprintf("%s/%s:%s", imageRegistry, AGENT_IMAGE_NAME, imageTag)
newImageRepoWithTag = fmt.Sprintf("%s/%s:%s", imageRegistry, AGENT_IMAGE_NAME, agentSoftwareVersionToUpgrade)
}
glog.Infof(cuwlog(fmt.Sprintf("New image repo with tag: %v", newImageRepoWithTag)))

Expand Down Expand Up @@ -770,7 +774,7 @@ func checkAgentImage(kubeClient *KubeClient, workDir string) (bool, string, stri

// if image exists skip pushing
glog.Infof(cuwlog(fmt.Sprintf("checking if image %v exists on remote registry...", tag.String())))
if imageExistInRemoteRegistry(tag.String(), imageTag, kc) {
if imageExistInRemoteRegistry(tag.String(), agentSoftwareVersionToUpgrade, kc) {
glog.Infof(cuwlog(fmt.Sprintf("image tag %v exists on remote registry, skip pushing image", tag.String())))
skipImagePush = true
}
Expand All @@ -792,7 +796,7 @@ func checkAgentImage(kubeClient *KubeClient, workDir string) (bool, string, stri
}

}
return (currentAgentVersion == imageTag), imageTag, currentAgentVersion, nil
return (currentAgentVersion == agentSoftwareVersionToUpgrade), agentSoftwareVersionToUpgrade, currentAgentVersion, nil
}

func checkAgentImageAgainstStatusFile(workDir string) (bool, error) {
Expand Down

0 comments on commit 09a1d20

Please sign in to comment.