From d9a29cf77bf9caff7521b7712aa335c830027401 Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 24 Nov 2023 16:00:02 -0800 Subject: [PATCH] File ranges in deals with different providers are part of different replica --- integration/singularity/store.go | 112 ++++++++++++++++--------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/integration/singularity/store.go b/integration/singularity/store.go index b71d938..1c242a0 100644 --- a/integration/singularity/store.go +++ b/integration/singularity/store.go @@ -473,6 +473,24 @@ func (s *Store) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error) return NewReader(s.singularityClient, uint64(fileID), getFileRes.Payload.Size), nil } +// Describe returns a blob descriptor describing a blob's replica(s). +// +// Rules about replica composition: +// +// - If two different ranges are in deals with two separate providers, then +// these are considered to be in separate replicas. This limitation is imposed +// to limit complexity. +// +// - In order to have a complete replica, each file range must be in a deal +// with the same provider. +// +// - If different ranges of one file are in different deals, then all of the +// deals make up one complete replica. +// +// - If every file range has 2 deals, then there are 2 complete replicas. +// +// - If some file ranges have 2 deals and some have 1, then there is one +// complete replica and one partial replica. func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, error) { fileID, err := s.idMap.get(id) if err != nil { @@ -516,75 +534,61 @@ func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, err return descriptor, nil } - // In order to have a complete replica, each file range must be in a deal. - // The number of replicas is the lowest number of deals for all ranges. - // This means a single replica can have multiple providers. 
- // - // - If different ranges of one file are in different deals, then all of - // the deals make up one complete replica. - // - // - If every file range has 2 deals, then there are 2 complete replicas. - // - // - If some file ranges have 2 deals and some have 1, then there is one - // complete replica and one one partial replica. - - // Count the number of replicas of each file range, and record the lowest - // count across all ranges. - var n int + providerReplicas := make(map[string][]blob.Replica) + var prevDealCount, totalReplicas int + + // All ranges in a replica are handled by the same provider. for i := range dealsForRanges { - // ---- START DEBUG---- fileRange := dealsForRanges[i].FileRange deals := dealsForRanges[i].Deals - fmt.Println("---> File range", fileRange.Offset, "-", fileRange.Offset+fileRange.Length, "has", len(deals), "deals") - for _, deal := range deals { - fmt.Println(" -->> ID:", deal.ID, "DealID:", deal.DealID, "PieceCid", deal.PieceCid) + provRangeCounts := make(map[string]int) + if i != 0 && len(deals) != prevDealCount { + logger.Warnw("File range has different number of deals that previous file range", + "range", fileRange, "deals", len(deals), "previousDeals", prevDealCount) } - // ---- END DEBUG---- + prevDealCount = len(deals) - count := len(dealsForRanges[i].Deals) - if count == 0 { - // No complete replica because this file range is not in any deal. - return descriptor, nil - } - if n == -1 || count < n { - n = count - } - } - fmt.Println() - - // Collect information about about deals in each complete replica. - dealsSeen := map[int64]struct{}{} - replicas := make([]blob.Replica, 0, n) - for i := 0; i < n; i++ { - var pieces []blob.Piece - var providers []string - // Collect info for all deals in replica i. - for j := range dealsForRanges { - deal := dealsForRanges[j].Deals[i] - if _, seen := dealsSeen[deal.ID]; seen { - // Already saw this deal for a different range. 
- fmt.Println("---> already seen deal", deal.ID) - continue - } - dealsSeen[deal.ID] = struct{}{} + for _, deal := range deals { updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt) if err != nil { updatedAt = time.Time{} } - pieces = append(pieces, blob.Piece{ + piece := blob.Piece{ Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)), LastUpdated: updatedAt, PieceCID: deal.PieceCid, Status: string(deal.State), - }) - providers = append(providers, deal.Provider) + } + + // Deals with different providers are always for different replicas. + replicas := providerReplicas[deal.Provider] + // Get number of deals so far for this range for this provider. + j := provRangeCounts[deal.Provider] + if len(replicas) == j { + // Need a new replica since this file range has more deals than + // there are replicas. + // + // If there are multiple deals for the same file range, then + // these deals are for separate replicas. + providerReplicas[deal.Provider] = append(replicas, blob.Replica{ + Provider: deal.Provider, + Pieces: []blob.Piece{piece}, + }) + provRangeCounts[deal.Provider]++ + totalReplicas++ + } else { + // Deal is part of an existing replica that a previous range is part of. + replicas[j].Pieces = append(replicas[j].Pieces, piece) + } } - replicas = append(replicas, blob.Replica{ - // TODO: need to support multiple providers per replica - Provider: providers[0], - Pieces: pieces, - }) } + + replicas := make([]blob.Replica, 0, totalReplicas) + for prov, provReplicas := range providerReplicas { + replicas = append(replicas, provReplicas...) + logger.Infof("Provider %s has %d replicas", prov, len(provReplicas)) + } + descriptor.Replicas = replicas return descriptor, nil }