Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Fix race condition resulting from concurrent WaitGroup.Add.
Browse files Browse the repository at this point in the history
Generally a WaitGroup should be added to before the goroutine starts. Not doing so can result in a situation where Wait is called before a starting goroutine adds. This is what happened here, causing a race condition.

This PR also makes the reveiver variable consistent.
  • Loading branch information
gammazero committed Oct 5, 2023
1 parent f870783 commit 75ab079
Showing 1 changed file with 78 additions and 75 deletions.
153 changes: 78 additions & 75 deletions integration/singularity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,25 @@ func NewStore(o ...Option) (*SingularityStore, error) {
}, nil
}

func (l *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPreparation, error) {
createSourceStorageRes, err := l.singularityClient.Storage.CreateLocalStorage(&storage.CreateLocalStorageParams{
func (s *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPreparation, error) {
createSourceStorageRes, err := s.singularityClient.Storage.CreateLocalStorage(&storage.CreateLocalStorageParams{
Context: ctx,
Request: &models.StorageCreateLocalStorageRequest{
Name: l.sourceName,
Path: l.local.Dir(),
Name: s.sourceName,
Path: s.local.Dir(),
},
})
if err != nil {
return nil, fmt.Errorf("failed to create source storage: %w", err)
}
logger.Infow("Created source storage", "id", createSourceStorageRes.Payload.ID)

createPreparationRes, err := l.singularityClient.Preparation.CreatePreparation(&preparation.CreatePreparationParams{
createPreparationRes, err := s.singularityClient.Preparation.CreatePreparation(&preparation.CreatePreparationParams{
Context: ctx,
Request: &models.DataprepCreateRequest{
MaxSize: &l.maxCarSize,
Name: l.preparationName,
SourceStorages: []string{l.sourceName},
MaxSize: &s.maxCarSize,
Name: s.preparationName,
SourceStorages: []string{s.sourceName},
},
})
if err != nil {
Expand All @@ -85,12 +85,12 @@ func (l *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPr
return createPreparationRes.Payload, nil
}

func (l *SingularityStore) Start(ctx context.Context) error {
logger := logger.With("preparation", l.preparationName)
func (s *SingularityStore) Start(ctx context.Context) error {
logger := logger.With("preparation", s.preparationName)

// List out preparations and see if one with the configured name exists

listPreparationsRes, err := l.singularityClient.Preparation.ListPreparations(&preparation.ListPreparationsParams{
listPreparationsRes, err := s.singularityClient.Preparation.ListPreparations(&preparation.ListPreparationsParams{
Context: ctx,
})
if err != nil {
Expand All @@ -100,22 +100,22 @@ func (l *SingularityStore) Start(ctx context.Context) error {

var preparation *models.ModelPreparation
for _, preparationCmp := range listPreparationsRes.Payload {
if preparationCmp.Name == l.preparationName {
if preparationCmp.Name == s.preparationName {
preparation = preparationCmp
break
}
}
if preparation == nil {
// If no preparation was found, initialize it
_, err = l.initPreparation(ctx)
_, err = s.initPreparation(ctx)
if err != nil {
logger.Errorw("First-time preparation initialization failed", "err", err)
return fmt.Errorf("first-time preparation initialization failed: %w", err)
}
}

// Ensure default wallet is imported to singularity
listWalletsRes, err := l.singularityClient.Wallet.ListWallets(&wallet.ListWalletsParams{
listWalletsRes, err := s.singularityClient.Wallet.ListWallets(&wallet.ListWalletsParams{
Context: ctx,
})
if err != nil {
Expand All @@ -124,18 +124,18 @@ func (l *SingularityStore) Start(ctx context.Context) error {
}
var wlt *models.ModelWallet
for _, existing := range listWalletsRes.Payload {
if existing.PrivateKey == l.walletKey {
if existing.PrivateKey == s.walletKey {
wlt = existing
logger.Infow("Wallet found on singularity", "id", existing.ID)
break
}
}
if wlt == nil {
logger.Info("Wallet is not found on singularity. Importing...")
importWalletRes, err := l.singularityClient.Wallet.ImportWallet(&wallet.ImportWalletParams{
importWalletRes, err := s.singularityClient.Wallet.ImportWallet(&wallet.ImportWalletParams{
Context: ctx,
Request: &models.WalletImportRequest{
PrivateKey: l.walletKey,
PrivateKey: s.walletKey,
},
})
if err != nil {
Expand All @@ -147,9 +147,9 @@ func (l *SingularityStore) Start(ctx context.Context) error {
}

// Ensure wallet is assigned to preparation
listAttachedWalletsRes, err := l.singularityClient.WalletAssociation.ListAttachedWallets(&wallet_association.ListAttachedWalletsParams{
listAttachedWalletsRes, err := s.singularityClient.WalletAssociation.ListAttachedWallets(&wallet_association.ListAttachedWalletsParams{
Context: ctx,
ID: l.preparationName,
ID: s.preparationName,
})
if err != nil {
return err
Expand All @@ -164,9 +164,9 @@ func (l *SingularityStore) Start(ctx context.Context) error {
}
if !walletFound {
logger.Info("Wallet was not found. Creating...")
if attachWalletRes, err := l.singularityClient.WalletAssociation.AttachWallet(&wallet_association.AttachWalletParams{
if attachWalletRes, err := s.singularityClient.WalletAssociation.AttachWallet(&wallet_association.AttachWalletParams{
Context: ctx,
ID: l.preparationName,
ID: s.preparationName,
Wallet: wlt.Address,
}); err != nil {
logger.Errorw("Failed to add wallet to preparation", "err", err)
Expand All @@ -177,9 +177,9 @@ func (l *SingularityStore) Start(ctx context.Context) error {
}
// Ensure schedules are created
// TODO: handle config changes for replication -- singularity currently has no modify schedule endpoint
listPreparationSchedulesRes, err := l.singularityClient.DealSchedule.ListPreparationSchedules(&deal_schedule.ListPreparationSchedulesParams{
listPreparationSchedulesRes, err := s.singularityClient.DealSchedule.ListPreparationSchedules(&deal_schedule.ListPreparationSchedulesParams{
Context: ctx,
ID: l.preparationName,
ID: s.preparationName,
})

switch {
Expand All @@ -192,12 +192,12 @@ func (l *SingularityStore) Start(ctx context.Context) error {
return err
}

pricePerGBEpoch, _ := (new(big.Rat).SetFrac(l.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64()
pricePerGB, _ := (new(big.Rat).SetFrac(l.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64()
pricePerDeal, _ := (new(big.Rat).SetFrac(l.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64()
pricePerGBEpoch, _ := (new(big.Rat).SetFrac(s.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64()
pricePerGB, _ := (new(big.Rat).SetFrac(s.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64()
pricePerDeal, _ := (new(big.Rat).SetFrac(s.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64()

logger.Infof("Checking %v storage providers", len(l.storageProviders))
for _, sp := range l.storageProviders {
logger.Infof("Checking %v storage providers", len(s.storageProviders))
for _, sp := range s.storageProviders {
logger.Infof("Checking storage provider %s", sp)
var foundSchedule *models.ModelSchedule
logger := logger.With("provider", sp)
Expand All @@ -211,27 +211,27 @@ func (l *SingularityStore) Start(ctx context.Context) error {
if foundSchedule != nil {
// If schedule was found, update it
logger.Infow("Schedule found for provider. Updating with latest settings...", "id", foundSchedule.ID)
_, err := l.singularityClient.DealSchedule.UpdateSchedule(&deal_schedule.UpdateScheduleParams{
_, err := s.singularityClient.DealSchedule.UpdateSchedule(&deal_schedule.UpdateScheduleParams{
Context: ctx,
ID: foundSchedule.ID,
Body: &models.ScheduleUpdateRequest{
PricePerGbEpoch: pricePerGBEpoch,
PricePerGb: pricePerGB,
PricePerDeal: pricePerDeal,
Verified: &l.verifiedDeal,
Ipni: &l.ipniAnnounce,
KeepUnsealed: &l.keepUnsealed,
StartDelay: ptr.String(strconv.Itoa(int(l.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
Duration: ptr.String(strconv.Itoa(int(l.dealDuration)*builtin.EpochDurationSeconds) + "s"),
ScheduleCron: l.scheduleCron,
ScheduleCronPerpetual: l.scheduleCronPerpetual,
ScheduleDealNumber: int64(l.scheduleDealNumber),
TotalDealNumber: int64(l.totalDealNumber),
ScheduleDealSize: l.scheduleDealSize,
TotalDealSize: l.totalDealSize,
MaxPendingDealSize: l.maxPendingDealSize,
MaxPendingDealNumber: int64(l.maxPendingDealNumber),
URLTemplate: l.scheduleUrlTemplate,
Verified: &s.verifiedDeal,
Ipni: &s.ipniAnnounce,
KeepUnsealed: &s.keepUnsealed,
StartDelay: ptr.String(strconv.Itoa(int(s.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
Duration: ptr.String(strconv.Itoa(int(s.dealDuration)*builtin.EpochDurationSeconds) + "s"),
ScheduleCron: s.scheduleCron,
ScheduleCronPerpetual: s.scheduleCronPerpetual,
ScheduleDealNumber: int64(s.scheduleDealNumber),
TotalDealNumber: int64(s.totalDealNumber),
ScheduleDealSize: s.scheduleDealSize,
TotalDealSize: s.totalDealSize,
MaxPendingDealSize: s.maxPendingDealSize,
MaxPendingDealNumber: int64(s.maxPendingDealNumber),
URLTemplate: s.scheduleUrlTemplate,
},
})
if err != nil {
Expand All @@ -241,28 +241,28 @@ func (l *SingularityStore) Start(ctx context.Context) error {
} else {
// Otherwise, create it
logger.Info("Schedule not found for provider. Creating...")
if createScheduleRes, err := l.singularityClient.DealSchedule.CreateSchedule(&deal_schedule.CreateScheduleParams{
if createScheduleRes, err := s.singularityClient.DealSchedule.CreateSchedule(&deal_schedule.CreateScheduleParams{
Context: ctx,
Schedule: &models.ScheduleCreateRequest{
Preparation: l.preparationName,
Preparation: s.preparationName,
Provider: sp.String(),
PricePerGbEpoch: pricePerGBEpoch,
PricePerGb: pricePerGB,
PricePerDeal: pricePerDeal,
Verified: &l.verifiedDeal,
Ipni: &l.ipniAnnounce,
KeepUnsealed: &l.keepUnsealed,
StartDelay: ptr.String(strconv.Itoa(int(l.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
Duration: ptr.String(strconv.Itoa(int(l.dealDuration)*builtin.EpochDurationSeconds) + "s"),
ScheduleCron: l.scheduleCron,
ScheduleCronPerpetual: l.scheduleCronPerpetual,
ScheduleDealNumber: int64(l.scheduleDealNumber),
TotalDealNumber: int64(l.totalDealNumber),
ScheduleDealSize: l.scheduleDealSize,
TotalDealSize: l.totalDealSize,
MaxPendingDealSize: l.maxPendingDealSize,
MaxPendingDealNumber: int64(l.maxPendingDealNumber),
URLTemplate: l.scheduleUrlTemplate,
Verified: &s.verifiedDeal,
Ipni: &s.ipniAnnounce,
KeepUnsealed: &s.keepUnsealed,
StartDelay: ptr.String(strconv.Itoa(int(s.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
Duration: ptr.String(strconv.Itoa(int(s.dealDuration)*builtin.EpochDurationSeconds) + "s"),
ScheduleCron: s.scheduleCron,
ScheduleCronPerpetual: s.scheduleCronPerpetual,
ScheduleDealNumber: int64(s.scheduleDealNumber),
TotalDealNumber: int64(s.totalDealNumber),
ScheduleDealSize: s.scheduleDealSize,
TotalDealSize: s.totalDealSize,
MaxPendingDealSize: s.maxPendingDealSize,
MaxPendingDealNumber: int64(s.maxPendingDealNumber),
URLTemplate: s.scheduleUrlTemplate,
},
}); err != nil {
logger.Errorw("Failed to create schedule for provider", "err", err)
Expand All @@ -272,37 +272,41 @@ func (l *SingularityStore) Start(ctx context.Context) error {
}
}
}
go l.runPreparationJobs()
go l.runCleanupWorker()

s.closed.Add(1)
go s.runPreparationJobs()

s.closed.Add(1)
go s.runCleanupWorker()

return nil
}

func (l *SingularityStore) runPreparationJobs() {
l.closed.Add(1)
defer l.closed.Done()
func (s *SingularityStore) runPreparationJobs() {
defer s.closed.Done()

// Create a context that gets canceled when this function exits.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
select {
case <-l.closing:
case <-s.closing:
return
case fileID := <-l.toPack:
prepareToPackSourceRes, err := l.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{
case fileID := <-s.toPack:
prepareToPackSourceRes, err := s.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{
Context: ctx,
ID: int64(fileID),
})
if err != nil {
logger.Errorw("preparing to pack file", "fileID", fileID, "error", err)
}
if prepareToPackSourceRes.Payload > l.packThreshold {
if prepareToPackSourceRes.Payload > s.packThreshold {
// mark outstanding pack jobs as ready to go so we can make CAR files
_, err := l.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{
_, err := s.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{
Context: ctx,
ID: l.preparationName,
Name: l.sourceName,
ID: s.preparationName,
Name: s.sourceName,
})
if err != nil {
logger.Errorw("preparing to pack source", "error", err)
Expand All @@ -312,12 +316,12 @@ func (l *SingularityStore) runPreparationJobs() {
}
}

func (l *SingularityStore) Shutdown(ctx context.Context) error {
close(l.closing)
func (s *SingularityStore) Shutdown(ctx context.Context) error {
close(s.closing)

done := make(chan struct{})
go func() {
l.closed.Wait()
s.closed.Wait()
close(done)
}()

Expand Down Expand Up @@ -472,7 +476,6 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc
}

func (s *SingularityStore) runCleanupWorker() {
s.closed.Add(1)
defer s.closed.Done()

// Run immediately once before starting ticker
Expand Down

0 comments on commit 75ab079

Please sign in to comment.