From 75ab0796e1ba45e3016217f34d49ca7072f259d0 Mon Sep 17 00:00:00 2001 From: gammazero Date: Thu, 5 Oct 2023 11:11:52 -0700 Subject: [PATCH] Fix race condition resulting from concurrent WaitGroup.Add. Generally a WaitGroup should be added to before the goroutine starts. Not doing so can result in a situation where Wait is called before a starting goroutine adds. This is what happened here, causing a race condition. This PR also makes the reveiver variable consistent. --- integration/singularity/store.go | 153 ++++++++++++++++--------------- 1 file changed, 78 insertions(+), 75 deletions(-) diff --git a/integration/singularity/store.go b/integration/singularity/store.go index 9d5e61a..32a8f7e 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -56,12 +56,12 @@ func NewStore(o ...Option) (*SingularityStore, error) { }, nil } -func (l *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPreparation, error) { - createSourceStorageRes, err := l.singularityClient.Storage.CreateLocalStorage(&storage.CreateLocalStorageParams{ +func (s *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPreparation, error) { + createSourceStorageRes, err := s.singularityClient.Storage.CreateLocalStorage(&storage.CreateLocalStorageParams{ Context: ctx, Request: &models.StorageCreateLocalStorageRequest{ - Name: l.sourceName, - Path: l.local.Dir(), + Name: s.sourceName, + Path: s.local.Dir(), }, }) if err != nil { @@ -69,12 +69,12 @@ func (l *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPr } logger.Infow("Created source storage", "id", createSourceStorageRes.Payload.ID) - createPreparationRes, err := l.singularityClient.Preparation.CreatePreparation(&preparation.CreatePreparationParams{ + createPreparationRes, err := s.singularityClient.Preparation.CreatePreparation(&preparation.CreatePreparationParams{ Context: ctx, Request: &models.DataprepCreateRequest{ - MaxSize: &l.maxCarSize, - Name: l.preparationName, - SourceStorages: []string{l.sourceName}, + MaxSize: &s.maxCarSize, + Name: s.preparationName, + SourceStorages: []string{s.sourceName}, }, }) if err != nil { @@ -85,12 +85,12 @@ func (l *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPr return createPreparationRes.Payload, nil } -func (l *SingularityStore) Start(ctx context.Context) error { - logger := logger.With("preparation", l.preparationName) +func (s *SingularityStore) Start(ctx context.Context) error { + logger := logger.With("preparation", s.preparationName) // List out preparations and see if one with the configured name exists - listPreparationsRes, err := l.singularityClient.Preparation.ListPreparations(&preparation.ListPreparationsParams{ + listPreparationsRes, err := s.singularityClient.Preparation.ListPreparations(&preparation.ListPreparationsParams{ Context: ctx, }) if err != nil { @@ -100,14 +100,14 @@ func (l *SingularityStore) Start(ctx context.Context) error { var preparation *models.ModelPreparation for _, preparationCmp := range listPreparationsRes.Payload { - if preparationCmp.Name == l.preparationName { + if preparationCmp.Name == s.preparationName { preparation = preparationCmp break } } if preparation == nil { // If no preparation was found, initialize it - _, err = l.initPreparation(ctx) + _, err = s.initPreparation(ctx) if err != nil { logger.Errorw("First-time preparation initialization failed", "err", err) return fmt.Errorf("first-time preparation initialization failed: %w", err) @@ -115,7 +115,7 @@ func (l *SingularityStore) Start(ctx context.Context) error { } // Ensure default wallet is imported to singularity - listWalletsRes, err := l.singularityClient.Wallet.ListWallets(&wallet.ListWalletsParams{ + listWalletsRes, err := s.singularityClient.Wallet.ListWallets(&wallet.ListWalletsParams{ Context: ctx, }) if err != nil { @@ -124,7 +124,7 @@ func (l *SingularityStore) Start(ctx context.Context) error { } var wlt *models.ModelWallet for _, existing := range listWalletsRes.Payload { - if existing.PrivateKey == l.walletKey { + if existing.PrivateKey == s.walletKey { wlt = existing logger.Infow("Wallet found on singularity", "id", existing.ID) break @@ -132,10 +132,10 @@ func (l *SingularityStore) Start(ctx context.Context) error { } if wlt == nil { logger.Info("Wallet is not found on singularity. Importing...") - importWalletRes, err := l.singularityClient.Wallet.ImportWallet(&wallet.ImportWalletParams{ + importWalletRes, err := s.singularityClient.Wallet.ImportWallet(&wallet.ImportWalletParams{ Context: ctx, Request: &models.WalletImportRequest{ - PrivateKey: l.walletKey, + PrivateKey: s.walletKey, }, }) if err != nil { @@ -147,9 +147,9 @@ func (l *SingularityStore) Start(ctx context.Context) error { } // Ensure wallet is assigned to preparation - listAttachedWalletsRes, err := l.singularityClient.WalletAssociation.ListAttachedWallets(&wallet_association.ListAttachedWalletsParams{ + listAttachedWalletsRes, err := s.singularityClient.WalletAssociation.ListAttachedWallets(&wallet_association.ListAttachedWalletsParams{ Context: ctx, - ID: l.preparationName, + ID: s.preparationName, }) if err != nil { return err @@ -164,9 +164,9 @@ func (l *SingularityStore) Start(ctx context.Context) error { } if !walletFound { logger.Info("Wallet was not found. Creating...") - if attachWalletRes, err := l.singularityClient.WalletAssociation.AttachWallet(&wallet_association.AttachWalletParams{ + if attachWalletRes, err := s.singularityClient.WalletAssociation.AttachWallet(&wallet_association.AttachWalletParams{ Context: ctx, - ID: l.preparationName, + ID: s.preparationName, Wallet: wlt.Address, }); err != nil { logger.Errorw("Failed to add wallet to preparation", "err", err) @@ -177,9 +177,9 @@ func (l *SingularityStore) Start(ctx context.Context) error { } // Ensure schedules are created // TODO: handle config changes for replication -- singularity currently has no modify schedule endpoint - listPreparationSchedulesRes, err := l.singularityClient.DealSchedule.ListPreparationSchedules(&deal_schedule.ListPreparationSchedulesParams{ + listPreparationSchedulesRes, err := s.singularityClient.DealSchedule.ListPreparationSchedules(&deal_schedule.ListPreparationSchedulesParams{ Context: ctx, - ID: l.preparationName, + ID: s.preparationName, }) switch { @@ -192,12 +192,12 @@ func (l *SingularityStore) Start(ctx context.Context) error { return err } - pricePerGBEpoch, _ := (new(big.Rat).SetFrac(l.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64() - pricePerGB, _ := (new(big.Rat).SetFrac(l.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64() - pricePerDeal, _ := (new(big.Rat).SetFrac(l.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64() + pricePerGBEpoch, _ := (new(big.Rat).SetFrac(s.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64() + pricePerGB, _ := (new(big.Rat).SetFrac(s.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64() + pricePerDeal, _ := (new(big.Rat).SetFrac(s.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64() - logger.Infof("Checking %v storage providers", len(l.storageProviders)) - for _, sp := range l.storageProviders { + logger.Infof("Checking %v storage providers", len(s.storageProviders)) + for _, sp := range s.storageProviders { logger.Infof("Checking storage provider %s", sp) var foundSchedule *models.ModelSchedule logger := logger.With("provider", sp) @@ -211,27 +211,27 @@ func (l *SingularityStore) Start(ctx context.Context) error { if foundSchedule != nil { // If schedule was found, update it logger.Infow("Schedule found for provider. Updating with latest settings...", "id", foundSchedule.ID) - _, err := l.singularityClient.DealSchedule.UpdateSchedule(&deal_schedule.UpdateScheduleParams{ + _, err := s.singularityClient.DealSchedule.UpdateSchedule(&deal_schedule.UpdateScheduleParams{ Context: ctx, ID: foundSchedule.ID, Body: &models.ScheduleUpdateRequest{ PricePerGbEpoch: pricePerGBEpoch, PricePerGb: pricePerGB, PricePerDeal: pricePerDeal, - Verified: &l.verifiedDeal, - Ipni: &l.ipniAnnounce, - KeepUnsealed: &l.keepUnsealed, - StartDelay: ptr.String(strconv.Itoa(int(l.dealStartDelay)*builtin.EpochDurationSeconds) + "s"), - Duration: ptr.String(strconv.Itoa(int(l.dealDuration)*builtin.EpochDurationSeconds) + "s"), - ScheduleCron: l.scheduleCron, - ScheduleCronPerpetual: l.scheduleCronPerpetual, - ScheduleDealNumber: int64(l.scheduleDealNumber), - TotalDealNumber: int64(l.totalDealNumber), - ScheduleDealSize: l.scheduleDealSize, - TotalDealSize: l.totalDealSize, - MaxPendingDealSize: l.maxPendingDealSize, - MaxPendingDealNumber: int64(l.maxPendingDealNumber), - URLTemplate: l.scheduleUrlTemplate, + Verified: &s.verifiedDeal, + Ipni: &s.ipniAnnounce, + KeepUnsealed: &s.keepUnsealed, + StartDelay: ptr.String(strconv.Itoa(int(s.dealStartDelay)*builtin.EpochDurationSeconds) + "s"), + Duration: ptr.String(strconv.Itoa(int(s.dealDuration)*builtin.EpochDurationSeconds) + "s"), + ScheduleCron: s.scheduleCron, + ScheduleCronPerpetual: s.scheduleCronPerpetual, + ScheduleDealNumber: int64(s.scheduleDealNumber), + TotalDealNumber: int64(s.totalDealNumber), + ScheduleDealSize: s.scheduleDealSize, + TotalDealSize: s.totalDealSize, + MaxPendingDealSize: s.maxPendingDealSize, + MaxPendingDealNumber: int64(s.maxPendingDealNumber), + URLTemplate: s.scheduleUrlTemplate, }, }) if err != nil { @@ -241,28 +241,28 @@ func (l *SingularityStore) Start(ctx context.Context) error { } else { // Otherwise, create it logger.Info("Schedule not found for provider. Creating...") - if createScheduleRes, err := l.singularityClient.DealSchedule.CreateSchedule(&deal_schedule.CreateScheduleParams{ + if createScheduleRes, err := s.singularityClient.DealSchedule.CreateSchedule(&deal_schedule.CreateScheduleParams{ Context: ctx, Schedule: &models.ScheduleCreateRequest{ - Preparation: l.preparationName, + Preparation: s.preparationName, Provider: sp.String(), PricePerGbEpoch: pricePerGBEpoch, PricePerGb: pricePerGB, PricePerDeal: pricePerDeal, - Verified: &l.verifiedDeal, - Ipni: &l.ipniAnnounce, - KeepUnsealed: &l.keepUnsealed, - StartDelay: ptr.String(strconv.Itoa(int(l.dealStartDelay)*builtin.EpochDurationSeconds) + "s"), - Duration: ptr.String(strconv.Itoa(int(l.dealDuration)*builtin.EpochDurationSeconds) + "s"), - ScheduleCron: l.scheduleCron, - ScheduleCronPerpetual: l.scheduleCronPerpetual, - ScheduleDealNumber: int64(l.scheduleDealNumber), - TotalDealNumber: int64(l.totalDealNumber), - ScheduleDealSize: l.scheduleDealSize, - TotalDealSize: l.totalDealSize, - MaxPendingDealSize: l.maxPendingDealSize, - MaxPendingDealNumber: int64(l.maxPendingDealNumber), - URLTemplate: l.scheduleUrlTemplate, + Verified: &s.verifiedDeal, + Ipni: &s.ipniAnnounce, + KeepUnsealed: &s.keepUnsealed, + StartDelay: ptr.String(strconv.Itoa(int(s.dealStartDelay)*builtin.EpochDurationSeconds) + "s"), + Duration: ptr.String(strconv.Itoa(int(s.dealDuration)*builtin.EpochDurationSeconds) + "s"), + ScheduleCron: s.scheduleCron, + ScheduleCronPerpetual: s.scheduleCronPerpetual, + ScheduleDealNumber: int64(s.scheduleDealNumber), + TotalDealNumber: int64(s.totalDealNumber), + ScheduleDealSize: s.scheduleDealSize, + TotalDealSize: s.totalDealSize, + MaxPendingDealSize: s.maxPendingDealSize, + MaxPendingDealNumber: int64(s.maxPendingDealNumber), + URLTemplate: s.scheduleUrlTemplate, }, }); err != nil { logger.Errorw("Failed to create schedule for provider", "err", err) @@ -272,14 +272,18 @@ func (l *SingularityStore) Start(ctx context.Context) error { } } } - go l.runPreparationJobs() - go l.runCleanupWorker() + + s.closed.Add(1) + go s.runPreparationJobs() + + s.closed.Add(1) + go s.runCleanupWorker() + return nil } -func (l *SingularityStore) runPreparationJobs() { - l.closed.Add(1) - defer l.closed.Done() +func (s *SingularityStore) runPreparationJobs() { + defer s.closed.Done() // Create a context that gets canceled when this function exits. ctx, cancel := context.WithCancel(context.Background()) @@ -287,22 +291,22 @@ func (l *SingularityStore) runPreparationJobs() { for { select { - case <-l.closing: + case <-s.closing: return - case fileID := <-l.toPack: - prepareToPackSourceRes, err := l.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{ + case fileID := <-s.toPack: + prepareToPackSourceRes, err := s.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{ Context: ctx, ID: int64(fileID), }) if err != nil { logger.Errorw("preparing to pack file", "fileID", fileID, "error", err) } - if prepareToPackSourceRes.Payload > l.packThreshold { + if prepareToPackSourceRes.Payload > s.packThreshold { // mark outstanding pack jobs as ready to go so we can make CAR files - _, err := l.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{ + _, err := s.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{ Context: ctx, - ID: l.preparationName, - Name: l.sourceName, + ID: s.preparationName, + Name: s.sourceName, }) if err != nil { logger.Errorw("preparing to pack source", "error", err) @@ -312,12 +316,12 @@ func (l *SingularityStore) runPreparationJobs() { } } -func (l *SingularityStore) Shutdown(ctx context.Context) error { - close(l.closing) +func (s *SingularityStore) Shutdown(ctx context.Context) error { + close(s.closing) done := make(chan struct{}) go func() { - l.closed.Wait() + s.closed.Wait() close(done) }() @@ -472,7 +476,6 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc } func (s *SingularityStore) runCleanupWorker() { - s.closed.Add(1) defer s.closed.Done() // Run immediately once before starting ticker