Skip to content

Commit

Permalink
HMS-4888: run both AWS and Pulp repo stores
Browse files Browse the repository at this point in the history
Signed-off-by: Jonathan Holloway <[email protected]>
  • Loading branch information
loadtheaccumulator committed Oct 25, 2024
1 parent 824dcbe commit 2a17565
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 38 deletions.
10 changes: 8 additions & 2 deletions pkg/models/commits.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
RepoStatusBuilding = "BUILDING"
// RepoStatusError is for when a Repo is on a error state
RepoStatusError = "ERROR"
// RepoStatusPending is for when the repo process is starting
RepoStatusPending = "PENDING"
// RepoStatusSkipped is for when a Repo is available to the user
RepoStatusSkipped = "SKIPPED"
// RepoStatusSuccess is for when a Repo is available to the user
RepoStatusSuccess = "SUCCESS"
)
Expand Down Expand Up @@ -54,8 +58,10 @@ type Commit struct {
// Repo is the delivery mechanism of a Commit over HTTP
type Repo struct {
Model
URL string `json:"RepoURL"`
Status string `json:"RepoStatus"`
URL string `json:"RepoURL"`
Status string `json:"RepoStatus"`
PulpURL string `json:"pulp_repo_url"`
PulpStatus string `json:"pulp_repo_status"`
}

// Package represents the packages a Commit can have
Expand Down
93 changes: 81 additions & 12 deletions pkg/services/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,11 +857,55 @@ func (s *ImageService) processImage(ctx context.Context, id uint, loopDelay time
return nil
}

func (s *ImageService) runPulpRepoProcess(ctx context.Context, repo *models.Repo) (*models.Repo, error) {
repo.PulpStatus = models.RepoStatusBuilding
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

repo, err := s.RepoBuilder.StoreRepo(ctx, repo)
if err != nil {
s.log.WithField("error", err.Error()).Error("Error importing ostree repo into Pulp")
repo.PulpStatus = models.RepoStatusError
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}
}

return repo, nil
}

func (s *ImageService) runAWSRepoProcess(repo *models.Repo) (*models.Repo, error) {
repo.Status = models.RepoStatusBuilding
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

repo, err := s.RepoBuilder.ImportRepo(repo)
if err != nil {
repo.Status = models.RepoStatusError
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}
}

return repo, nil
}

// CreateRepoForImage creates the OSTree repo to host that image
func (s *ImageService) CreateRepoForImage(ctx context.Context, img *models.Image) (*models.Repo, error) {
s.log.Info("Creating OSTree repo for image")
repo := &models.Repo{
Status: models.RepoStatusBuilding,
Status: models.RepoStatusPending,
PulpStatus: models.RepoStatusPending,
}
tx := db.DB.Create(repo)
if tx.Error != nil {
Expand All @@ -879,23 +923,48 @@ func (s *ImageService) CreateRepoForImage(ctx context.Context, img *models.Image
}
s.log.Debug("OSTree repo was saved to commit")

var repository *models.Repo
var err error
if feature.PulpIntegration.IsEnabled() {
s.log.Debug("Running Pulp repo process")
repository, err = s.RepoBuilder.StoreRepo(ctx, repo)
// Pulp repo process needs to run if AWS repo process is disabled (flag set true)
if feature.PulpIntegration.IsEnabled() || feature.PulpIntegrationDisableAWSRepoStore.IsEnabled() {
s.log.Info("Running Pulp repo process")
repo, err = s.runPulpRepoProcess(ctx, repo)
if err != nil {
return repo, err
}
} else {
s.log.Debug("Running AWS repo process")
repository, err = s.RepoBuilder.ImportRepo(repo)
repo.PulpStatus = models.RepoStatusSkipped
}
if err != nil {
return nil, err

// set feature flag true to only test Pulp (flag not created in Unleash and defaults to false)
if !feature.PulpIntegrationDisableAWSRepoStore.IsEnabled() {
s.log.Info("Running AWS repo process")
repo, err = s.runAWSRepoProcess(repo)
if err != nil {
return repo, err
}
} else {
repo.Status = models.RepoStatusSkipped
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}
}

parsedURL, _ := url.Parse(repository.URL)
s.log.WithField("url", parsedURL.Redacted()).Info("OSTree repo is ready")
parsedURL, _ := url.Parse(repo.URL)
parsedPulpURL, _ := url.Parse(repo.PulpURL)
s.log.WithFields(log.Fields{
"aws_url": parsedURL.Redacted(),
"aws_status": repo.Status,
"pulp_url": parsedPulpURL.Redacted(),
"pulp_status": repo.PulpStatus,
}).Info("OSTree repo is ready")

if repo.Status != models.RepoStatusSuccess && repo.PulpStatus != models.RepoStatusSuccess {
return nil, goErrors.New("No repo has been created")
}

return repository, nil
return repo, nil
}

// SetErrorStatusOnImage is a helper function that sets the error status on images
Expand Down
42 changes: 38 additions & 4 deletions pkg/services/images_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2593,12 +2593,46 @@ var _ = Describe("Image Service Test", func() {
Expect(repo.ID).To(Equal(expectedRepo.ID))
})

It("should return error when ImportRepo fails", func() {
expecteError := errors.New("expected RepoBuilder.ImportRepo error")
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(nil, expecteError)
It("should return successfully if AWS repo status is success", func() {
expectedRepo := models.Repo{
URL: faker.URL(),
Status: models.RepoStatusSuccess,
PulpURL: faker.URL(),
PulpStatus: models.RepoStatusError,
}
err := db.DB.Create(&expectedRepo).Error
Expect(err).ToNot(HaveOccurred())
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(&expectedRepo, nil)
_, err = service.CreateRepoForImage(context.Background(), image)
Expect(err).ToNot(HaveOccurred())
})

It("should return successfully if Pulp repo status is success", func() {
expectedRepo := models.Repo{
URL: faker.URL(),
Status: models.RepoStatusError,
PulpURL: faker.URL(),
PulpStatus: models.RepoStatusSuccess,
}
err := db.DB.Create(&expectedRepo).Error
Expect(err).ToNot(HaveOccurred())
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(&expectedRepo, nil)
_, err = service.CreateRepoForImage(context.Background(), image)
Expect(err).ToNot(HaveOccurred())
})

It("should return error if AWS AND Pulp repo status is not success", func() {
expectedRepo := models.Repo{
URL: faker.URL(),
Status: models.RepoStatusError,
PulpURL: faker.URL(),
PulpStatus: models.RepoStatusError,
}
expectedError := errors.New("No repo has been created")
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(&expectedRepo, expectedError)
_, err := service.CreateRepoForImage(context.Background(), image)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(expecteError))
Expect(err).To(MatchError(expectedError))
})
})

Expand Down
39 changes: 19 additions & 20 deletions pkg/services/repobuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,38 +371,33 @@ func (rb *RepoBuilder) StoreRepo(ctx context.Context, repo *models.Repo) (*model
var cmt models.Commit
cmtDB := db.DB.Where("repo_id = ?", repo.ID).First(&cmt)
if cmtDB.Error != nil {
return nil, cmtDB.Error
return repo, cmtDB.Error
}

var err error
if feature.PulpIntegration.IsEnabled() {
log.WithContext(ctx).Debug("Running Pulp repo process")

repoURL, err := repostore.PulpRepoStore(ctx, cmt.OrgID, *cmt.RepoID, cmt.ImageBuildTarURL)
if err != nil {
log.WithContext(ctx).WithField("error", err.Error()).Error("Error storing Image Builder commit in Pulp OSTree repo")
log.WithContext(ctx).Debug("Storing repo via Pulp")
repo.PulpURL, err = repostore.PulpRepoStore(ctx, cmt.OrgID, *cmt.RepoID, cmt.ImageBuildTarURL)
if err != nil {
log.WithContext(ctx).WithField("error", err.Error()).Error("Error storing Image Builder commit in Pulp OSTree repo")

return nil, err
repo.PulpStatus = models.RepoStatusError
result := db.DB.Save(&repo)
if result.Error != nil {
rb.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

repo.URL = repoURL
repo.Status = models.RepoStatusSuccess
} else {
// run the legacy AWS repo storage and return
log.WithContext(ctx).Debug("Running AWS repo process")
repo, err = rb.ImportRepo(repo)
}
if err != nil {
return nil, err
return repo, err
}

repo.PulpStatus = models.RepoStatusSuccess
result := db.DB.Save(&repo)
if result.Error != nil {
rb.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return nil, fmt.Errorf("error saving status :: %s", result.Error.Error())
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

redactedURL, _ := url.Parse(repo.URL)
redactedURL, _ := url.Parse(repo.PulpURL)
log.WithContext(ctx).WithField("repo_url", redactedURL.Redacted()).Info("Commit stored in Pulp OSTree repo")

return repo, nil
Expand All @@ -416,9 +411,10 @@ func (rb *RepoBuilder) ImportRepo(r *models.Repo) (*models.Repo, error) {
if cmtDB.Error != nil {
return nil, cmtDB.Error
}

cfg := config.Get()
path := filepath.Clean(filepath.Join(cfg.RepoTempPath, strconv.FormatUint(uint64(r.ID), 10)))
rb.log.WithField("path", path).Debug("Importing repo...")
rb.log.WithField("path", path).Debug("Storing repo via AWS S3")
err := os.MkdirAll(path, os.FileMode(0755))
if err != nil {
rb.log.Error(err)
Expand Down Expand Up @@ -480,6 +476,9 @@ func (rb *RepoBuilder) ImportRepo(r *models.Repo) (*models.Repo, error) {
return nil, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

redactedURL, _ := url.Parse(r.URL)
rb.log.WithField("repo_url", redactedURL.Redacted()).Info("Commit stored in AWS OSTree repo")

return r, nil
}

Expand Down
3 changes: 3 additions & 0 deletions unleash/features/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ var SilentGormLogging = &Flag{Name: "edge-management.silent_gorm_logging", EnvVa
// PulpIntegration covers the overall integration of pulp and deprecation of AWS storage
var PulpIntegration = &Flag{Name: "edge-management.pulp_integration", EnvVar: "FEATURE_PULP_INTEGRATION"}

// PulpIntegrationDisableAWSRepoStore disables the AWS repo store process for development
var PulpIntegrationDisableAWSRepoStore = &Flag{Name: "edge-management.pulp_integration_disable_awsrepostore", EnvVar: "FEATURE_PULP_INTEGRATION_DISABLE_AWSREPOSTORE"}

// (ADD FEATURE FLAGS ABOVE)
// FEATURE FLAG CHECK CODE

Expand Down

0 comments on commit 2a17565

Please sign in to comment.