Skip to content

Commit

Permalink
Fix ShareLock deadlock (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
NickyMateev authored and desislavaa committed Mar 31, 2020
1 parent 8e292c9 commit cf09077
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 7 deletions.
13 changes: 13 additions & 0 deletions storage/encrypting_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ func (er *encryptingRepository) Get(ctx context.Context, objectType types.Object
return obj, nil
}

func (er *encryptingRepository) GetForUpdate(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.Object, error) {
obj, err := er.repository.Get(ctx, objectType, criteria...)
if err != nil {
return nil, err
}

if err := er.decrypt(ctx, obj); err != nil {
return nil, err
}

return obj, nil
}

func (er *encryptingRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) {
objList, err := er.repository.List(ctx, objectType, criteria...)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions storage/integrity_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ func (cr *integrityRepository) Get(ctx context.Context, objectType types.ObjectT
return obj, nil
}

func (cr *integrityRepository) GetForUpdate(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.Object, error) {
obj, err := cr.repository.Get(ctx, objectType, criteria...)
if err != nil {
return nil, err
}
if err := cr.validateIntegrity(obj); err != nil {
return nil, err
}
return obj, nil
}

func (cr *integrityRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) {
objectList, err := cr.repository.List(ctx, objectType, criteria...)
if err != nil {
Expand Down
20 changes: 19 additions & 1 deletion storage/interceptable_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ func (ir *queryScopedInterceptableRepository) Get(ctx context.Context, objectTyp
return object, nil
}

func (ir *queryScopedInterceptableRepository) GetForUpdate(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.Object, error) {
object, err := ir.repositoryInTransaction.Get(ctx, objectType, criteria...)
if err != nil {
return nil, err
}

return object, nil
}

func (ir *queryScopedInterceptableRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) {
objectList, err := ir.repositoryInTransaction.List(ctx, objectType, criteria...)
if err != nil {
Expand Down Expand Up @@ -251,7 +260,7 @@ func (ir *queryScopedInterceptableRepository) Update(ctx context.Context, obj ty
// postgres storage implementation also locks the retrieved row for update
objectType := obj.GetType()
byID := query.ByField(query.EqualsOperator, "id", obj.GetID())
oldObj, err := ir.Get(ctx, objectType, byID)
oldObj, err := ir.GetForUpdate(ctx, objectType, byID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -442,6 +451,15 @@ func (itr *InterceptableTransactionalRepository) Get(ctx context.Context, object
return object, nil
}

func (itr *InterceptableTransactionalRepository) GetForUpdate(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.Object, error) {
object, err := itr.RawRepository.Get(ctx, objectType, criteria...)
if err != nil {
return nil, err
}

return object, nil
}

func (itr *InterceptableTransactionalRepository) List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) {
objectList, err := itr.RawRepository.List(ctx, objectType, criteria...)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions storage/interceptable_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ var _ = Describe("Interceptable TransactionalRepository", func() {
},
}, nil)

fakeStorage.GetForUpdateReturns(&types.ServiceBroker{
Base: types.Base{
UpdatedAt: updateTime,
Ready: true,
},
}, nil)

interceptableRepository = storage.NewInterceptableTransactionalRepository(fakeStorage)

orderNone := storage.InterceptorOrder{
Expand Down
3 changes: 3 additions & 0 deletions storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ type Repository interface {
// Get retrieves an object using the provided id from SM DB
Get(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.Object, error)

// GetForUpdate retrieves an object using the provided id from SM DB while also locking the retrieved rows
GetForUpdate(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.Object, error)

// List retrieves all object from SM DB
List(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error)

Expand Down
8 changes: 4 additions & 4 deletions storage/postgres/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ FROM {{.ENTITY_TABLE}}
ON {{.ENTITY_TABLE}}.{{.PRIMARY_KEY}} = {{.LABELS_TABLE}}.{{.REF_COLUMN}}
{{end}}
{{.WHERE}}
{{.FOR_SHARE_OF}}
{{.FOR_UPDATE_OF}}
{{.LIMIT}};`

const SelectQueryTemplate = `
Expand Down Expand Up @@ -57,7 +57,7 @@ WHERE {{.ENTITY_TABLE}}.paging_sequence IN
(SELECT matching_resources.paging_sequence FROM matching_resources)
{{end}}
{{.ORDER_BY}}
{{.FOR_SHARE_OF}};`
{{.FOR_UPDATE_OF}};`

const DeleteQueryTemplate = `
DELETE FROM {{.ENTITY_TABLE}}
Expand Down Expand Up @@ -199,7 +199,7 @@ func (pq *pgQuery) resolveQueryTemplate(ctx context.Context, template string) (s
"REF_COLUMN": pq.labelEntity.ReferenceColumn(),
"JOIN": pq.joinSQL(),
"WHERE": pq.whereSQL(),
"FOR_SHARE_OF": pq.lockSQL(),
"FOR_UPDATE_OF": pq.lockSQL(),
"ORDER_BY": pq.orderBySQL(),
"ORDER_BY_SEQUENCE": pq.orderBySequenceSQL(),
"LIMIT": pq.limitSQL(),
Expand Down Expand Up @@ -330,7 +330,7 @@ func (pq *pgQuery) lockSQL() string {
// Lock the rows if we are in transaction so that update operations on those rows can rely on unchanged data
// This allows us to handle concurrent updates on the same rows by executing them sequentially as
// before updating we have to anyway select the rows and can therefore lock them
return fmt.Sprintf("FOR SHARE OF %s", pq.entityTableName)
return fmt.Sprintf("FOR UPDATE OF %s", pq.entityTableName)
}
return ""
}
Expand Down
24 changes: 22 additions & 2 deletions storage/postgres/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,33 @@ func (ps *Storage) Get(ctx context.Context, objectType types.ObjectType, criteri
return result.ItemAt(0), nil
}

func (ps *Storage) GetForUpdate(ctx context.Context, objectType types.ObjectType, criteria ...query.Criterion) (types.Object, error) {
result, err := ps.list(ctx, objectType, true, criteria...)
if err != nil {
return nil, err
}
if result.Len() == 0 {
return nil, util.ErrNotFoundInStorage
}
return result.ItemAt(0), nil
}

func (ps *Storage) List(ctx context.Context, objType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) {
return ps.list(ctx, objType, false, criteria...)
}

func (ps *Storage) list(ctx context.Context, objType types.ObjectType, forUpdate bool, criteria ...query.Criterion) (types.ObjectList, error) {
entity, err := ps.scheme.provide(objType)
if err != nil {
return nil, err
}

rows, err := ps.queryBuilder.NewQuery(entity).WithCriteria(criteria...).WithLock().List(ctx)
queryBuilder := ps.queryBuilder.NewQuery(entity).WithCriteria(criteria...)
if forUpdate {
queryBuilder = queryBuilder.WithLock()
}

rows, err := queryBuilder.List(ctx)
if err != nil {
return nil, err
}
Expand All @@ -242,7 +262,7 @@ func (ps *Storage) Count(ctx context.Context, objType types.ObjectType, criteria
if err != nil {
return 0, err
}
return ps.queryBuilder.NewQuery(entity).WithCriteria(criteria...).WithLock().Count(ctx)
return ps.queryBuilder.NewQuery(entity).WithCriteria(criteria...).Count(ctx)
}

func (ps *Storage) DeleteReturning(ctx context.Context, objType types.ObjectType, criteria ...query.Criterion) (types.ObjectList, error) {
Expand Down
82 changes: 82 additions & 0 deletions storage/storagefakes/fake_storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cf09077

Please sign in to comment.