Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HMS-4888: run both AWS and Pulp repo stores #2721

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/models/commits.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ const (
RepoStatusBuilding = "BUILDING"
// RepoStatusError is for when a Repo is on a error state
RepoStatusError = "ERROR"
// RepoStatusPending is for when the repo process is starting
RepoStatusPending = "PENDING"
// RepoStatusSkipped is for when a Repo is available to the user
RepoStatusSkipped = "SKIPPED"
// RepoStatusSuccess is for when a Repo is available to the user
RepoStatusSuccess = "SUCCESS"
)
Expand Down Expand Up @@ -54,8 +58,10 @@ type Commit struct {
// Repo is the delivery mechanism of a Commit over HTTP
type Repo struct {
Model
URL string `json:"RepoURL"`
Status string `json:"RepoStatus"`
URL string `json:"RepoURL"`
Status string `json:"RepoStatus"`
PulpURL string `json:"pulp_repo_url"`
PulpStatus string `json:"pulp_repo_status"`
}

// Package represents the packages a Commit can have
Expand Down
93 changes: 81 additions & 12 deletions pkg/services/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,11 +857,55 @@ func (s *ImageService) processImage(ctx context.Context, id uint, loopDelay time
return nil
}

func (s *ImageService) runPulpRepoProcess(ctx context.Context, repo *models.Repo) (*models.Repo, error) {
repo.PulpStatus = models.RepoStatusBuilding
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

repo, err := s.RepoBuilder.StoreRepo(ctx, repo)
if err != nil {
s.log.WithField("error", err.Error()).Error("Error importing ostree repo into Pulp")
repo.PulpStatus = models.RepoStatusError
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}
}

return repo, nil
}

func (s *ImageService) runAWSRepoProcess(repo *models.Repo) (*models.Repo, error) {
repo.Status = models.RepoStatusBuilding
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

repo, err := s.RepoBuilder.ImportRepo(repo)
if err != nil {
repo.Status = models.RepoStatusError
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}
}

return repo, nil
}

// CreateRepoForImage creates the OSTree repo to host that image
func (s *ImageService) CreateRepoForImage(ctx context.Context, img *models.Image) (*models.Repo, error) {
s.log.Info("Creating OSTree repo for image")
repo := &models.Repo{
Status: models.RepoStatusBuilding,
Status: models.RepoStatusPending,
PulpStatus: models.RepoStatusPending,
}
tx := db.DB.Create(repo)
if tx.Error != nil {
Expand All @@ -879,23 +923,48 @@ func (s *ImageService) CreateRepoForImage(ctx context.Context, img *models.Image
}
s.log.Debug("OSTree repo was saved to commit")

var repository *models.Repo
var err error
if feature.PulpIntegration.IsEnabled() {
s.log.Debug("Running Pulp repo process")
repository, err = s.RepoBuilder.StoreRepo(ctx, repo)
// Pulp repo process needs to run if AWS repo process is disabled (flag set true)
if feature.PulpIntegration.IsEnabled() || feature.PulpIntegrationDisableAWSRepoStore.IsEnabled() {
s.log.Info("Running Pulp repo process")
repo, err = s.runPulpRepoProcess(ctx, repo)
if err != nil {
return repo, err
}
} else {
s.log.Debug("Running AWS repo process")
repository, err = s.RepoBuilder.ImportRepo(repo)
repo.PulpStatus = models.RepoStatusSkipped
}
if err != nil {
return nil, err

// set feature flag true to only test Pulp (flag not created in Unleash and defaults to false)
if !feature.PulpIntegrationDisableAWSRepoStore.IsEnabled() {
s.log.Info("Running AWS repo process")
repo, err = s.runAWSRepoProcess(repo)
if err != nil {
return repo, err
}
} else {
repo.Status = models.RepoStatusSkipped
result := db.DB.Save(&repo)
if result.Error != nil {
s.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}
}

parsedURL, _ := url.Parse(repository.URL)
s.log.WithField("url", parsedURL.Redacted()).Info("OSTree repo is ready")
parsedURL, _ := url.Parse(repo.URL)
parsedPulpURL, _ := url.Parse(repo.PulpURL)
s.log.WithFields(log.Fields{
"aws_url": parsedURL.Redacted(),
"aws_status": repo.Status,
"pulp_url": parsedPulpURL.Redacted(),
"pulp_status": repo.PulpStatus,
}).Info("OSTree repo is ready")

if repo.Status != models.RepoStatusSuccess && repo.PulpStatus != models.RepoStatusSuccess {
return nil, goErrors.New("No repo has been created")
}

return repository, nil
return repo, nil
}

// SetErrorStatusOnImage is a helper function that sets the error status on images
Expand Down
42 changes: 38 additions & 4 deletions pkg/services/images_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2593,12 +2593,46 @@ var _ = Describe("Image Service Test", func() {
Expect(repo.ID).To(Equal(expectedRepo.ID))
})

It("should return error when ImportRepo fails", func() {
expecteError := errors.New("expected RepoBuilder.ImportRepo error")
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(nil, expecteError)
It("should return successfully if AWS repo status is success", func() {
expectedRepo := models.Repo{
URL: faker.URL(),
Status: models.RepoStatusSuccess,
PulpURL: faker.URL(),
PulpStatus: models.RepoStatusError,
}
err := db.DB.Create(&expectedRepo).Error
Expect(err).ToNot(HaveOccurred())
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(&expectedRepo, nil)
_, err = service.CreateRepoForImage(context.Background(), image)
Expect(err).ToNot(HaveOccurred())
})

It("should return successfully if Pulp repo status is success", func() {
expectedRepo := models.Repo{
URL: faker.URL(),
Status: models.RepoStatusError,
PulpURL: faker.URL(),
PulpStatus: models.RepoStatusSuccess,
}
err := db.DB.Create(&expectedRepo).Error
Expect(err).ToNot(HaveOccurred())
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(&expectedRepo, nil)
_, err = service.CreateRepoForImage(context.Background(), image)
Expect(err).ToNot(HaveOccurred())
})

It("should return error if AWS AND Pulp repo status is not success", func() {
expectedRepo := models.Repo{
URL: faker.URL(),
Status: models.RepoStatusError,
PulpURL: faker.URL(),
PulpStatus: models.RepoStatusError,
}
expectedError := errors.New("No repo has been created")
mockRepoBuilder.EXPECT().ImportRepo(gomock.AssignableToTypeOf(&models.Repo{})).Return(&expectedRepo, expectedError)
_, err := service.CreateRepoForImage(context.Background(), image)
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(expecteError))
Expect(err).To(MatchError(expectedError))
})
})

Expand Down
39 changes: 19 additions & 20 deletions pkg/services/repobuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,38 +371,33 @@ func (rb *RepoBuilder) StoreRepo(ctx context.Context, repo *models.Repo) (*model
var cmt models.Commit
cmtDB := db.DB.Where("repo_id = ?", repo.ID).First(&cmt)
if cmtDB.Error != nil {
return nil, cmtDB.Error
return repo, cmtDB.Error
}

var err error
if feature.PulpIntegration.IsEnabled() {
log.WithContext(ctx).Debug("Running Pulp repo process")

repoURL, err := repostore.PulpRepoStore(ctx, cmt.OrgID, *cmt.RepoID, cmt.ImageBuildTarURL)
if err != nil {
log.WithContext(ctx).WithField("error", err.Error()).Error("Error storing Image Builder commit in Pulp OSTree repo")
log.WithContext(ctx).Debug("Storing repo via Pulp")
repo.PulpURL, err = repostore.PulpRepoStore(ctx, cmt.OrgID, *cmt.RepoID, cmt.ImageBuildTarURL)
if err != nil {
log.WithContext(ctx).WithField("error", err.Error()).Error("Error storing Image Builder commit in Pulp OSTree repo")

return nil, err
repo.PulpStatus = models.RepoStatusError
result := db.DB.Save(&repo)
if result.Error != nil {
rb.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

repo.URL = repoURL
repo.Status = models.RepoStatusSuccess
} else {
// run the legacy AWS repo storage and return
log.WithContext(ctx).Debug("Running AWS repo process")
repo, err = rb.ImportRepo(repo)
}
if err != nil {
return nil, err
return repo, err
}

repo.PulpStatus = models.RepoStatusSuccess
result := db.DB.Save(&repo)
if result.Error != nil {
rb.log.WithField("error", result.Error.Error()).Error("Error saving repo")
return nil, fmt.Errorf("error saving status :: %s", result.Error.Error())
return repo, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

redactedURL, _ := url.Parse(repo.URL)
redactedURL, _ := url.Parse(repo.PulpURL)
log.WithContext(ctx).WithField("repo_url", redactedURL.Redacted()).Info("Commit stored in Pulp OSTree repo")

return repo, nil
Expand All @@ -416,9 +411,10 @@ func (rb *RepoBuilder) ImportRepo(r *models.Repo) (*models.Repo, error) {
if cmtDB.Error != nil {
return nil, cmtDB.Error
}

cfg := config.Get()
path := filepath.Clean(filepath.Join(cfg.RepoTempPath, strconv.FormatUint(uint64(r.ID), 10)))
rb.log.WithField("path", path).Debug("Importing repo...")
rb.log.WithField("path", path).Debug("Storing repo via AWS S3")
err := os.MkdirAll(path, os.FileMode(0755))
if err != nil {
rb.log.Error(err)
Expand Down Expand Up @@ -480,6 +476,9 @@ func (rb *RepoBuilder) ImportRepo(r *models.Repo) (*models.Repo, error) {
return nil, fmt.Errorf("error saving status :: %s", result.Error.Error())
}

redactedURL, _ := url.Parse(r.URL)
rb.log.WithField("repo_url", redactedURL.Redacted()).Info("Commit stored in AWS OSTree repo")

return r, nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/services/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,15 @@ func (s *UpdateService) CreateUpdate(id uint) (*models.UpdateTransaction, error)
// NewTemplateRemoteInfo contains the info for the ostree remote file to be written to the system
func NewTemplateRemoteInfo(update *models.UpdateTransaction) TemplateRemoteInfo {

updateURL := update.Repo.URL
if feature.PulpIntegrationUpdateViaPulp.IsEnabled() {
updateURL = update.Repo.PulpURL
}

return TemplateRemoteInfo{
RemoteURL: update.Repo.URL,
RemoteURL: updateURL,
RemoteName: "rhel-edge",
ContentURL: update.Repo.URL,
ContentURL: updateURL,
UpdateTransactionID: update.ID,
GpgVerify: config.Get().GpgVerify,
OSTreeRef: update.Commit.OSTreeRef,
Expand Down
6 changes: 6 additions & 0 deletions unleash/features/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ var SilentGormLogging = &Flag{Name: "edge-management.silent_gorm_logging", EnvVa
// PulpIntegration covers the overall integration of pulp and deprecation of AWS storage
var PulpIntegration = &Flag{Name: "edge-management.pulp_integration", EnvVar: "FEATURE_PULP_INTEGRATION"}

// PulpIntegrationDisableAWSRepoStore disables the AWS repo store process for development
var PulpIntegrationDisableAWSRepoStore = &Flag{Name: "edge-management.pulp_integration_disable_awsrepostore", EnvVar: "FEATURE_PULP_INTEGRATION_DISABLE_AWSREPOSTORE"}

// PulpIntegrationUpdateViaPulp uses the Pulp Distribution URL for image and system updates
var PulpIntegrationUpdateViaPulp = &Flag{Name: "edge-management.pulp_integration_updateviapulp", EnvVar: "FEATURE_PULP_INTEGRATION_UPDATEVIAPULP"}

// (ADD FEATURE FLAGS ABOVE)
// FEATURE FLAG CHECK CODE

Expand Down