Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
File ranges in deals with different providers are part of different r…
Browse files Browse the repository at this point in the history
…eplica
  • Loading branch information
gammazero committed Nov 25, 2023
1 parent 8fe769a commit d9a29cf
Showing 1 changed file with 58 additions and 54 deletions.
112 changes: 58 additions & 54 deletions integration/singularity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,24 @@ func (s *Store) Get(ctx context.Context, id blob.ID) (io.ReadSeekCloser, error)
return NewReader(s.singularityClient, uint64(fileID), getFileRes.Payload.Size), nil
}

// Describe returns a blob descriptor describing a blob's replica(s).
//
// Rules about replica composition:
//
// - If twe different ranges are in deals with two separate providers, then
// these are considered to be in separate replicas. This limitation is imposed
// to limit complexity.
//
// - In order to have a complete replica, each file range must be in a deal
// with the same provider.
//
// - If different ranges of one file are in different deals, then all of the
// deals make up one complete replica.
//
// - If every file range has 2 deals, then there are 2 complete replicas.
//
// - If some file ranges have 2 deals and some have 1, then there is one
// complete replica and one one partial replica.
func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, error) {
fileID, err := s.idMap.get(id)
if err != nil {
Expand Down Expand Up @@ -516,75 +534,61 @@ func (s *Store) Describe(ctx context.Context, id blob.ID) (*blob.Descriptor, err
return descriptor, nil
}

// In order to have a complete replica, each file range must be in a deal.
// The number of replicas is the lowest number of deals for all ranges.
// This means a single replica can have multiple providers.
//
// - If different ranges of one file are in different deals, then all of
// the deals make up one complete replica.
//
// - If every file range has 2 deals, then there are 2 complete replicas.
//
// - If some file ranges have 2 deals and some have 1, then there is one
// complete replica and one one partial replica.

// Count the number of replicas of each file range, and record the lowest
// count across all ranges.
var n int
providerReplicas := make(map[string][]blob.Replica)
var prevDealCount, totalReplicas int

// All ranges in a replica are handled by the same provider.
for i := range dealsForRanges {
// ---- START DEBUG----
fileRange := dealsForRanges[i].FileRange
deals := dealsForRanges[i].Deals
fmt.Println("---> File range", fileRange.Offset, "-", fileRange.Offset+fileRange.Length, "has", len(deals), "deals")
for _, deal := range deals {
fmt.Println(" -->> ID:", deal.ID, "DealID:", deal.DealID, "PieceCid", deal.PieceCid)
provRangeCounts := make(map[string]int)
if i != 0 && len(deals) != prevDealCount {
logger.Warnw("File range has different number of deals that previous file range",
"range", fileRange, "deals", len(deals), "previousDeals", prevDealCount)
}
// ---- END DEBUG----
prevDealCount = len(deals)

count := len(dealsForRanges[i].Deals)
if count == 0 {
// No complete replica because this file range is not in any deal.
return descriptor, nil
}
if n == -1 || count < n {
n = count
}
}
fmt.Println()

// Collect information about about deals in each complete replica.
dealsSeen := map[int64]struct{}{}
replicas := make([]blob.Replica, 0, n)
for i := 0; i < n; i++ {
var pieces []blob.Piece
var providers []string
// Collect info for all deals in replica i.
for j := range dealsForRanges {
deal := dealsForRanges[j].Deals[i]
if _, seen := dealsSeen[deal.ID]; seen {
// Already saw this deal for a different range.
fmt.Println("---> already seen deal", deal.ID)
continue
}
dealsSeen[deal.ID] = struct{}{}
for _, deal := range deals {
updatedAt, err := time.Parse("2006-01-02 15:04:05-07:00", deal.LastVerifiedAt)
if err != nil {
updatedAt = time.Time{}
}
pieces = append(pieces, blob.Piece{
piece := blob.Piece{
Expiration: epochutil.EpochToTime(int32(deal.EndEpoch)),
LastUpdated: updatedAt,
PieceCID: deal.PieceCid,
Status: string(deal.State),
})
providers = append(providers, deal.Provider)
}

// Deals with different providers are alrays for different replicas.
replicas := providerReplicas[deal.Provider]
// Get number of deals so far for this range for this provider.
j := provRangeCounts[deal.Provider]
if len(replicas) == j {
// Need a new replica since this file range has more deals than
// there are replicas.
//
// If there are multiple deals for the same file range, then
// these deals are for separate replicas.
providerReplicas[deal.Provider] = append(replicas, blob.Replica{
Provider: deal.Provider,
Pieces: []blob.Piece{piece},
})
provRangeCounts[deal.Provider]++
totalReplicas++
} else {
// Deal is part of an existing replica that a previous range is part of.
replicas[j].Pieces = append(replicas[j].Pieces, piece)
}
}
replicas = append(replicas, blob.Replica{
// TODO: need to support multiple providers per replica
Provider: providers[0],
Pieces: pieces,
})
}

replicas := make([]blob.Replica, 0, totalReplicas)
for prov, provReplicas := range providerReplicas {
replicas = append(replicas, provReplicas...)
logger.Infof("Provider %s has %d replicas", prov, len(provReplicas))
}

descriptor.Replicas = replicas
return descriptor, nil
}
Expand Down

0 comments on commit d9a29cf

Please sign in to comment.