Skip to content

Commit

Permalink
postgres: remove internal timeouts
Browse files Browse the repository at this point in the history
This removes all the internal timeouts for this package. The idea was to
keep execution time bounded, but it's becoming increasingly clear this
is just a source of problems.

Signed-off-by: Hank Donnay <[email protected]>
  • Loading branch information
hdonnay committed Dec 12, 2023
1 parent 8fd9a12 commit f57cfb4
Show file tree
Hide file tree
Showing 17 changed files with 27 additions and 107 deletions.
12 changes: 2 additions & 10 deletions datastore/postgres/affectedmanifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,8 @@ WHERE
// by the vulnerability in question.
pkgsToFilter := []claircore.Package{}

tctx, done := context.WithTimeout(ctx, 30*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(tctx, selectPackages, v.Package.Name)
rows, err := s.pool.Query(ctx, selectPackages, v.Package.Name)

Check warning on line 131 in datastore/postgres/affectedmanifest.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/affectedmanifest.go#L131

Added line #L131 was not covered by tests
switch {
case errors.Is(err, nil):
case errors.Is(err, pgx.ErrNoRows):
Expand Down Expand Up @@ -206,10 +204,8 @@ WHERE
}

err = func() error {
tctx, done := context.WithTimeout(ctx, 30*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(tctx,
rows, err := s.pool.Query(ctx,

Check warning on line 208 in datastore/postgres/affectedmanifest.go

View check run for this annotation

Codecov / codecov/patch

datastore/postgres/affectedmanifest.go#L208

Added line #L208 was not covered by tests
selectAffected,
record.Package.ID,
v[2],
Expand Down Expand Up @@ -280,7 +276,6 @@ func protoRecord(ctx context.Context, pool *pgxpool.Pool, v claircore.Vulnerabil
// fill dist into prototype index record if exists
if (v.Dist != nil) && (v.Dist.Name != "") {
start := time.Now()
ctx, done := context.WithTimeout(ctx, timeout)
row := pool.QueryRow(ctx,
selectDist,
v.Dist.Arch,
Expand All @@ -294,7 +289,6 @@ func protoRecord(ctx context.Context, pool *pgxpool.Pool, v claircore.Vulnerabil
)
var id pgtype.Int8
err := row.Scan(&id)
done()
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return protoRecord, fmt.Errorf("failed to scan dist: %w", err)
Expand Down Expand Up @@ -323,15 +317,13 @@ func protoRecord(ctx context.Context, pool *pgxpool.Pool, v claircore.Vulnerabil
// fill repo into prototype index record if exists
if (v.Repo != nil) && (v.Repo.Name != "") {
start := time.Now()
ctx, done := context.WithTimeout(ctx, timeout)
row := pool.QueryRow(ctx, selectRepo,
v.Repo.Name,
v.Repo.Key,
v.Repo.URI,
)
var id pgtype.Int8
err := row.Scan(&id)
done()
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return protoRecord, fmt.Errorf("failed to scan repo: %w", err)
Expand Down
4 changes: 0 additions & 4 deletions datastore/postgres/distributionsbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,16 @@ func (s *IndexerStore) DistributionsByLayer(ctx context.Context, hash claircore.
// get scanner ids
scannerIDs := make([]int64, len(scnrs))
for i, scnr := range scnrs {
ctx, done := context.WithTimeout(ctx, time.Second)
start := time.Now()
err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()).
Scan(&scannerIDs[i])
done()
if err != nil {
return nil, fmt.Errorf("failed to retrieve distribution ids for scanner %q: %w", scnr, err)
}
distributionByLayerCounter.WithLabelValues("selectScanner").Add(1)
distributionByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds())
}

ctx, done := context.WithTimeout(ctx, 30*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(ctx, query, hash, scannerIDs)
switch {
Expand Down
4 changes: 1 addition & 3 deletions datastore/postgres/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ func (s *MatcherStore) Get(ctx context.Context, records []*claircore.IndexRecord
batch.Queue(query)
}
// send the batch
tctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

start := time.Now()
res := tx.SendBatch(tctx, batch)
res := tx.SendBatch(ctx, batch)
// Can't just defer the close, because the batch must be fully handled
// before resolving the transaction. Maybe we can move this result handling
// into its own function to be able to just defer it.
Expand Down
16 changes: 4 additions & 12 deletions datastore/postgres/indexdistributions.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,17 @@ func (s *IndexerStore) IndexDistributions(ctx context.Context, dists []*claircor
)

// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("store:indexDistributions failed to create transaction: %v", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertDistStmt, err := tx.Prepare(tctx, "insertDistStmt", insert)
done()
insertDistStmt, err := tx.Prepare(ctx, "insertDistStmt", insert)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertDistScanArtifactWithStmt, err := tx.Prepare(tctx, "insertDistScanArtifactWith", insertWith)
done()
insertDistScanArtifactWithStmt, err := tx.Prepare(ctx, "insertDistScanArtifactWith", insertWith)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
Expand Down Expand Up @@ -157,9 +151,7 @@ func (s *IndexerStore) IndexDistributions(ctx context.Context, dists []*claircor
indexDistributionsCounter.WithLabelValues("insertWith_batch").Add(1)
indexDistributionsDuration.WithLabelValues("insertWith_batch").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 5*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("store:indexDistributions failed to commit tx: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/indexer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ WHERE
func (s *IndexerStore) selectScanners(ctx context.Context, vs indexer.VersionedScanners) ([]int64, error) {
ids := make([]int64, len(vs))
for i, v := range vs {
ctx, done := context.WithTimeout(ctx, time.Second)
err := s.pool.QueryRow(ctx, selectScanner, v.Name(), v.Version(), v.Kind()).
Scan(&ids[i])
done()
if err != nil {
return nil, fmt.Errorf("failed to retrieve id for scanner %q: %w", v.Name(), err)
}
Expand Down
12 changes: 3 additions & 9 deletions datastore/postgres/indexmanifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,13 @@ func (s *IndexerStore) IndexManifest(ctx context.Context, ir *claircore.IndexRep
}

// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("postgres: indexManifest failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
queryStmt, err := tx.Prepare(tctx, "queryStmt", query)
done()
queryStmt, err := tx.Prepare(ctx, "queryStmt", query)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
Expand Down Expand Up @@ -127,9 +123,7 @@ func (s *IndexerStore) IndexManifest(ctx context.Context, ir *claircore.IndexRep
indexManifestCounter.WithLabelValues("query_batch").Add(1)
indexManifestDuration.WithLabelValues("query_batch").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 15*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
Expand Down
16 changes: 4 additions & 12 deletions datastore/postgres/indexpackage.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,17 @@ func (s *IndexerStore) IndexPackages(ctx context.Context, pkgs []*claircore.Pack

ctx = zlog.ContextWithValues(ctx, "component", "datastore/postgres/indexPackages")
// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("store:indexPackage failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertPackageStmt, err := tx.Prepare(tctx, "insertPackageStmt", insert)
done()
insertPackageStmt, err := tx.Prepare(ctx, "insertPackageStmt", insert)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertPackageScanArtifactWithStmt, err := tx.Prepare(tctx, "insertPackageScanArtifactWith", insertWith)
done()
insertPackageScanArtifactWithStmt, err := tx.Prepare(ctx, "insertPackageScanArtifactWith", insertWith)
if err != nil {
return fmt.Errorf("failed to create statement: %w", err)
}
Expand Down Expand Up @@ -198,9 +192,7 @@ func (s *IndexerStore) IndexPackages(ctx context.Context, pkgs []*claircore.Pack
Int("inserted", len(pkgs)-skipCt).
Msg("scanartifacts inserted")

tctx, done = context.WithTimeout(ctx, 5*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("store:indexPackages failed to commit tx: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/indexreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ func (s *IndexerStore) IndexReport(ctx context.Context, hash claircore.Digest) (
// then type convert back to scanner.domain object
var jsr jsonbIndexReport

ctx, done := context.WithTimeout(ctx, 5*time.Second)
defer done()
start := time.Now()
err := s.pool.QueryRow(ctx, query, hash).Scan(&jsr)
switch {
Expand Down
16 changes: 4 additions & 12 deletions datastore/postgres/indexrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,17 @@ func (s *IndexerStore) IndexRepositories(ctx context.Context, repos []*claircore
`
)
// obtain a transaction scoped batch
tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("store:indexRepositories failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertRepoStmt, err := tx.Prepare(tctx, "insertRepoStmt", insert)
done()
insertRepoStmt, err := tx.Prepare(ctx, "insertRepoStmt", insert)
if err != nil {
return fmt.Errorf("failed to create insert repo statement: %w", err)
}
tctx, done = context.WithTimeout(ctx, 5*time.Second)
insertRepoScanArtifactWithStmt, err := tx.Prepare(tctx, "insertRepoScanArtifactWith", insertWith)
done()
insertRepoScanArtifactWithStmt, err := tx.Prepare(ctx, "insertRepoScanArtifactWith", insertWith)
if err != nil {
return fmt.Errorf("failed to create insert repo scanartifact statement: %w", err)
}
Expand Down Expand Up @@ -142,9 +136,7 @@ func (s *IndexerStore) IndexRepositories(ctx context.Context, repos []*claircore
indexRepositoriesCounter.WithLabelValues("insertWith_batch").Add(1)
indexRepositoriesDuration.WithLabelValues("insertWith_batch").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 15*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("store:indexRepositories failed to commit tx: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/layerscanned.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ SELECT
`
)

ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()
start := time.Now()
var scannerID int64
err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()).
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/manifestscanned.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ func (s *IndexerStore) ManifestScanned(ctx context.Context, hash claircore.Diges
// get a map of the found ids which have scanned this package
foundIDs := map[int64]struct{}{}

ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(ctx, selectScanned, hash)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions datastore/postgres/packagesbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,16 @@ WHERE
// get scanner ids
scannerIDs := make([]int64, len(scnrs))
for i, scnr := range scnrs {
ctx, done := context.WithTimeout(ctx, time.Second)
start := time.Now()
err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()).
Scan(&scannerIDs[i])
done()
if err != nil {
return nil, fmt.Errorf("failed to retrieve scanner ids: %w", err)
}
packagesByLayerCounter.WithLabelValues("selectScanner").Add(1)
packagesByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds())
}

ctx, done := context.WithTimeout(ctx, 15*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(ctx, query, hash, scannerIDs)
switch {
Expand Down
20 changes: 5 additions & 15 deletions datastore/postgres/persistmanifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,49 +65,39 @@ func (s *IndexerStore) PersistManifest(ctx context.Context, manifest claircore.M
`
)

tctx, done := context.WithTimeout(ctx, 5*time.Second)
tx, err := s.pool.Begin(tctx)
done()
tx, err := s.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to create transaction: %w", err)
}
defer tx.Rollback(ctx)

tctx, done = context.WithTimeout(ctx, 5*time.Second)
start := time.Now()
_, err = tx.Exec(tctx, insertManifest, manifest.Hash)
done()
_, err = tx.Exec(ctx, insertManifest, manifest.Hash)
if err != nil {
return fmt.Errorf("failed to insert manifest: %w", err)
}
persistManifestCounter.WithLabelValues("insertManifest").Add(1)
persistManifestDuration.WithLabelValues("insertManifest").Observe(time.Since(start).Seconds())

for i, layer := range manifest.Layers {
tctx, done = context.WithTimeout(ctx, 5*time.Second)
start := time.Now()
_, err = tx.Exec(tctx, insertLayer, layer.Hash)
done()
_, err = tx.Exec(ctx, insertLayer, layer.Hash)
if err != nil {
return fmt.Errorf("failed to insert layer: %w", err)
}
persistManifestCounter.WithLabelValues("insertLayer").Add(1)
persistManifestDuration.WithLabelValues("insertLayer").Observe(time.Since(start).Seconds())

tctx, done = context.WithTimeout(ctx, 5*time.Second)
start = time.Now()
_, err = tx.Exec(tctx, insertManifestLayer, manifest.Hash, layer.Hash, i)
done()
_, err = tx.Exec(ctx, insertManifestLayer, manifest.Hash, layer.Hash, i)
if err != nil {
return fmt.Errorf("failed to insert manifest -> layer link: %w", err)
}
persistManifestCounter.WithLabelValues("insertManifestLayer").Add(1)
persistManifestDuration.WithLabelValues("insertManifestLayer").Observe(time.Since(start).Seconds())
}

tctx, done = context.WithTimeout(ctx, 15*time.Second)
err = tx.Commit(tctx)
done()
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions datastore/postgres/repositoriesbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ WHERE
return nil, fmt.Errorf("unable to select scanners: %w", err)
}

ctx, done := context.WithTimeout(ctx, 15*time.Second)
defer done()
start := time.Now()
rows, err := s.pool.Query(ctx, query, hash, scannerIDs)
switch {
Expand Down
Loading

0 comments on commit f57cfb4

Please sign in to comment.