Skip to content

Commit

Permalink
batch entity inserts
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Apr 15, 2024
1 parent 59e2165 commit 83b0ac2
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 21 deletions.
69 changes: 48 additions & 21 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

const { sql } = require('slonik');
const { Actor, Entity, Submission, Form } = require('../frames');
const { equals, extender, unjoiner, page, markDeleted, insertMany } = require('../../util/db');
const { map, mergeRight, pickAll } = require('ramda');
const { equals, extender, unjoiner, page, markDeleted } = require('../../util/db');
const { map, mergeRight, pickAll, splitEvery } = require('ramda');
const { blankStringToNull, construct } = require('../../util/util');
const { QueryOptions } = require('../../util/db');
const { odataFilter, odataOrderBy } = require('../../data/odata-filter');
Expand Down Expand Up @@ -90,28 +90,55 @@ createNew.audit.withResult = true;
// it could be used in place of createNew(), but createNew uses a single query so it may be faster
// in single entity situations (eg. processing submissions to make entities)
// Note: if the entity schema changes, createMany and createNew would both need to change.
// createMany: bulk-creates entities for `dataset` together with one initial
// definition row per entity (root, current, version 1).
//
// Arguments: `dataset` (only `dataset.id` is read), `rawEntities` (each item is
// read for `uuid` and `def`), `sourceId` (stored on every def row) and
// `userAgentIn` (passed through blankStringToNull before being stored).
//
// NOTE(review): this span is a rendered diff, so both versions of the function
// appear back to back. The first signature and the insertMany-based body look
// like the pre-change code (insertMany is dropped from the updated require
// line); the second signature and the _insertBatch body look like its
// replacement. Confirm against the real file before relying on either half.
const createMany = (dataset, rawEntities, sourceId, userAgentIn) => async ({ all, context }) => {
// Second form of the signature: identical except that it also destructures `run`.
const createMany = (dataset, rawEntities, sourceId, userAgentIn) => async ({ all, context, run }) => {
// Id of the acting actor taken from the auth context; presumably null when
// there is no actor (orNull) — TODO confirm.
const creatorId = context.auth.actor.map((actor) => actor.id).orNull();
const userAgent = blankStringToNull(userAgentIn);

// --- insertMany-based body (presumably the removed version) ---
// Augment parsed entity data with dataset and creator IDs
const entitiesForInsert = rawEntities.map(e => new Entity({ datasetId: dataset.id, creatorId, ...e }));

// Only the generated ids are requested back from the insert.
const entities = await all(sql`${insertMany(entitiesForInsert)} RETURNING id`);

// Augment defs with IDs of freshly inserted entities and
// other default values
const defsForInsert = rawEntities.map((e, i) => new Entity.Def({
// Pairs returned row i with input i, i.e. relies on RETURNING preserving
// input order — NOTE(review): confirm this holds.
entityId: entities[i].id,
creatorId,
root: true,
current: true,
sourceId,
version: 1,
userAgent,
// Spread last, so fields present on e.def override the defaults above.
...e.def
}));
return all(insertMany(defsForInsert));
// --- batched body (presumably the added version) ---
// Inserts one batch: the entity rows first, then one def row per entity using
// the ids returned by the first statement. Resolves to undefined.
const _insertBatch = async (batch) => {
// Augment parsed entity data with dataset and creator IDs
// Row values are positional: they line up with entityColumnTypes and with the
// columns listed after "createdAt" in the INSERT below.
const entityRows = batch.map(e => [dataset.id, e.uuid, creatorId]);
const entityColumnTypes = ['int4', 'uuid', 'int4'];
// "createdAt" is filled by clock_timestamp() in the SELECT; the remaining
// columns come from the unnested rows.
const entities = await all(sql`
INSERT INTO entities ("createdAt", "datasetId", "uuid", "creatorId")
SELECT clock_timestamp(), * FROM ${sql.unnest(entityRows, entityColumnTypes)} as t
RETURNING id`);

// Augment defs with IDs of freshly inserted entities and
// other default values
// Positional again: the order here must match defColumnTypes and the column
// list of the entity_defs INSERT.
const defRows = batch.map((e, i) => [
// Same index pairing as above: relies on RETURNING id coming back in input
// order — NOTE(review): confirm this holds.
entities[i].id,
e.def.label,
// data and dataReceived are serialized to JSON text and typed 'json' below.
JSON.stringify(e.def.data),
JSON.stringify(e.def.dataReceived),
sourceId,
creatorId,
userAgent,
// root, current and version are passed as string literals and typed
// bool / bool / int4 by defColumnTypes.
'true',
'true',
'1'
]);
const defColumnTypes = [
'int4',
'text',
'json',
'json',
'int4',
'int4',
'text',
'bool',
'bool',
'int4'
];

// NOTE(review): the statement ends with RETURNING *, but the awaited result
// is neither stored nor returned here.
await run(sql`
INSERT INTO entity_defs ("createdAt", "entityId", "label", "data", "dataReceived",
"sourceId", "creatorId", "userAgent", "root", "current", "version")
SELECT clock_timestamp(), * FROM ${sql.unnest(defRows, defColumnTypes)} as t
RETURNING *`);
};

// Split the input into chunks of at most 10000 entities. All batches are
// started without waiting for earlier ones to finish; the returned promise
// resolves to an array with one entry per batch.
const batches = splitEvery(10000, rawEntities);
return Promise.all(batches.map(batch => _insertBatch(batch)));
};

createMany.audit = (dataset, entities, sourceId) => (log) =>
Expand Down
22 changes: 22 additions & 0 deletions test/integration/api/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -2250,6 +2250,28 @@ describe('Entities API', () => {
});
}));

it('should insert entities in batches', testDataset(async (service) => {
  const asAlice = await service.login('alice');
  const entitiesPath = '/v1/projects/1/datasets/people/entities';

  // The dataset has no entities before the bulk upload.
  const listing = await asAlice.get(entitiesPath);
  listing.body.length.should.equal(0);

  // Build 1000 identical entities (no uuid supplied) for a single bulk request.
  // NOTE(review): confirm 1000 exceeds the insert batch size; otherwise only a
  // single batch is exercised by this test.
  const makeEntity = () => ({ label: 'an entity label', data: { first_name: 'foo' } });
  const payload = {
    source: {
      name: 'people.csv',
      size: 100,
    },
    entities: Array.from({ length: 1000 }, makeEntity),
  };

  // The bulk request must return 200 and report success.
  const response = await asAlice.post(entitiesPath)
    .send(payload)
    .expect(200);
  response.body.success.should.be.true();
}));

it('should generate uuids for entities when no uuid is provided', testDataset(async (service) => {
const asAlice = await service.login('alice');
Expand Down

0 comments on commit 83b0ac2

Please sign in to comment.