Skip to content

Commit

Permalink
Insert labels in batches to avoid pq param limit (#491)
Browse files Browse the repository at this point in the history
Cherry-pick PR #489
  • Loading branch information
dotchev authored Apr 15, 2020
1 parent 86c5561 commit 4510024
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 5 deletions.
29 changes: 26 additions & 3 deletions storage/postgres/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,32 @@ func (ps *Storage) createLabels(ctx context.Context, labels []PostgresLabel) err
strings.Join(dbTags, ", :"),
)

log.C(ctx).Debugf("Executing query %s", sqlQuery)
_, err := ps.pgDB.NamedExecContext(ctx, sqlQuery, labels)
return checkIntegrityViolation(ctx, err)
// break into batches so the PostgreSQL limit of 65535 parameters is not exceeded
const maxParams = 65535
maxRows := maxParams / len(dbTags)
for len(labels) > 0 {
rows := min(len(labels), maxRows)
log.C(ctx).Debugf("Executing query %s", sqlQuery)
result, err := ps.pgDB.NamedExecContext(ctx, sqlQuery, labels[:rows])
if err != nil {
return checkIntegrityViolation(ctx, err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
log.C(ctx).Debugf("Could not get number of affected rows: %v", err)
} else {
log.C(ctx).Debugf("%d rows inserted", rowsAffected)
}
labels = labels[rows:]
}
return nil
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

func (ps *Storage) deleteLabels(ctx context.Context, objectType types.ObjectType, entityID string, removedLabels types.Labels) error {
Expand Down
2 changes: 1 addition & 1 deletion test/common/application.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
server:
request_timeout: 4000ms
request_timeout: 30s
shutdown_timeout: 4000ms
port: 1234
httpclient:
Expand Down
2 changes: 1 addition & 1 deletion test/configuration_test/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var _ = Describe("Service Manager Config API", func() {
"max_body_bytes": 1048576,
"max_header_bytes": 1024,
"port": 1234,
"request_timeout": "4000ms",
"request_timeout": "30s",
"shutdown_timeout": "4000ms"
},
"storage": {
Expand Down
18 changes: 18 additions & 0 deletions test/visibility_test/visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,24 @@ var _ = test.DescribeTestsFor(test.TestCase{
})
})

Context("When many labels are provided", func() {
It("should return 201", func() {
// see https://github.com/lib/pq/blob/master/conn.go#L1282
const labelCount = 20000 // 20000 * 6 > 65535 - max postgres parameter number
orgs := make(common.Array, labelCount)
for i := range orgs {
orgs[i] = fmt.Sprintf("org-id-%d", i)
}
postVisibilityRequestWithLabels["labels"] = common.Object{
"org_id": orgs,
}
ctx.SMWithOAuth.POST(web.VisibilitiesURL).
WithJSON(postVisibilityRequestWithLabels).
Expect().Status(http.StatusCreated).
JSON().Object().Path("$.labels.org_id").Array().ContainsOnly(orgs...)
})
})

Context("When creating labeled visibility for which a public one exists", func() {
It("Should return 409", func() {
ctx.SMWithOAuth.POST(web.VisibilitiesURL).
Expand Down

0 comments on commit 4510024

Please sign in to comment.