Data Imports
A complete example of importing data, from just a few records to 100 million and beyond.
Let's use the following destination table for our example:
CREATE TABLE products(
id SERIAL PRIMARY KEY, -- auto-incremented product id
title TEXT NOT NULL, -- product title/name
price NUMERIC(15,6) NOT NULL, -- product price
units INT NOT NULL -- availability / quantity
);
When inserting more than one record, you should use type `ColumnSet` to generate multi-row insert queries, which perform significantly better than single-row inserts.
For completeness, let's throw in some data-changing rules for the import data:
- Value for column `price` must come from property `cost`
- When property `units` is missing completely, insert 1
So we will start with something like this:
const pgp = require('pg-promise')({
capSQL: true // generate capitalized SQL
});
const db = pgp(/*connection details*/); // your database object
// Creating a reusable/static ColumnSet for generating INSERT queries:
const cs = new pgp.helpers.ColumnSet([
'title',
{name: 'price', prop: 'cost'},
{name: 'units', def: 1}
], {table: 'products'});
See class `Column` for how you can configure columns for any situation/logic.
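For example, here is a minimal sketch of extra column options, assuming the single-parameter `init` callback and the `cast` option of recent pg-promise versions; the `csCustom` name and the value-sanitizing logic are just illustrative assumptions, not part of the original import:

```js
// Illustration only: same ColumnSet as above, but with extra column options
const csCustom = new pgp.helpers.ColumnSet([
    'title',
    {
        name: 'price',
        prop: 'cost',                 // read the value from property 'cost'
        init: c => Math.abs(c.value), // e.g. force a non-negative price (made-up rule)
        cast: 'numeric'               // add an explicit ::numeric cast
    },
    {name: 'units', def: 1}           // default when the property is missing
], {table: 'products'});
```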
The next part depends on how many records you are going to insert, or more specifically, how many of them are going to be inside your Node.js process at once.
If you are going to have 100% of the data in the process before inserting it, you can insert all the data with a single multi-row insert query:
// data = array of objects that represent the import data:
const data = [
{title: 'red apples', cost: 2.35, units: 1000},
{title: 'large oranges', cost: 4.50}
];
const insert = pgp.helpers.insert(data, cs);
//=> INSERT INTO "products"("title","price","units")
// VALUES('red apples',2.35,1000),('large oranges',4.5,1)
db.none(insert)
.then(() => {
// success, all records inserted
})
.catch(error => {
// error
});
The best part: you do not even need to use a transaction here, because a multi-row insert is an atomic operation, i.e. if one set of values fails, none will be inserted.
There are many situations in which your data cannot all be inside the process:
- The data may simply be too large to be held in the process at once
- The data needs to be imported into the process asynchronously, in batches/pages
In such cases you need to implement a massive-insert operation, as explained below.
The key element here is a function that, on each call, will fetch the next set of data rows from its source, either in a sequential manner (like from a stream), or using paging logic (from an index):
function getNextData(t, pageIndex) {
// t = database transaction protocol, if you need to execute queries
// 1. returns a promise that represents the next data that will be available
// 2. retrieves the next data batch (array of objects), either sequentially or
// according to the pageIndex, and resolves with that array of objects.
    // 3. when there is no more data left, it must resolve with null
}
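As an illustration only, here is one possible sketch of such a function, paging rows out of a hypothetical staging table `source_products`; the table, its columns and the batch size are assumptions, not part of the original example:

```js
const batchSize = 10000; // rows per batch - tune it for the size of your records

function getNextData(t, pageIndex) {
    // pull the next page from the staging table, using the transaction context;
    // resolve with null once the source has been exhausted
    return t.any('SELECT title, cost, units FROM source_products ORDER BY id LIMIT $1 OFFSET $2',
            [batchSize, pageIndex * batchSize])
        .then(rows => rows.length ? rows : null);
}
```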
In our example with table `products`, we can pull a whole 10,000 records in a single batch, because those records are tiny. But in your application that decision should be based on a number of factors, including:
- How much time do you want to spend pulling each batch into the process?
- How much memory will the batch occupy inside the process?
And now the main implementation:
db.tx('massive-insert', t => {
const processData = data => {
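        // when data is null (no more data), we return undefined, which ends the sequence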
if (data) {
const insert = pgp.helpers.insert(data, cs);
return t.none(insert);
}
};
return t.sequence(index => getNextData(t, index).then(processData));
})
.then(data => {
// COMMIT has been executed
console.log('Total batches:', data.total, ', Duration:', data.duration);
})
.catch(error => {
// ROLLBACK has been executed
console.log(error);
});
We execute a sequence inside a transaction (method `tx`) that keeps pulling the data and inserting it. The sequence ends successfully when its callback either returns `undefined` or resolves with `undefined`, which is what happens when our `getNextData` resolves with `null`.
Think way north of 100 million records, to 100 billion records and beyond, and you may find yourself in a situation where it is simply too much to fit into a single transaction, typically due to hardware limitations. Sometimes the limit can be much smaller, for example, if your records are large, like ones containing binary data.
The solution shown earlier is still perfectly valid, but you would want to add logical partitioning to the data being inserted, executing multiple transactions of up to 100 million records each, for example.
The easiest approach is to have your `getNextData` function check against such a threshold, so that the next transaction can pick up from where the previous one ended.
This will make huge data imports manageable, both logically and from the hardware point of view.
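As an illustration only, here is one possible sketch of such partitioning; the names `importPartition`, `runImport` and `pagesPerTx` are assumptions, and `getNextData` is the same paging function as before:

```js
// Illustration only: split the import into multiple sequential transactions,
// each limited to pagesPerTx batches (e.g. 10,000 batches of 10,000 rows = 100m records).
const pagesPerTx = 10000;

function importPartition(startPage) {
    let pages = 0;        // batches inserted within this transaction
    let finished = false; // set once the source is exhausted
    return db.tx('massive-insert', t => {
        return t.sequence(index => {
            if (pages >= pagesPerTx) {
                return; // ends this transaction; the next one continues from startPage + pages
            }
            return getNextData(t, startPage + index)
                .then(data => {
                    if (data) {
                        pages++;
                        return t.none(pgp.helpers.insert(data, cs));
                    }
                    finished = true; // no more data - resolving with undefined ends the sequence
                });
        });
    })
        .then(() => finished);
}

// run the transactions one after another, until the source is exhausted:
function runImport(startPage) {
    return importPartition(startPage)
        .then(finished => finished ? null : runImport(startPage + pagesPerTx));
}

runImport(0);
```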
How fast is this solution?
For obvious reasons, we can only offer a figure that does not include the time needed to pull the data from the source within the `getNextData` function.
We will simply generate the data, to make our evaluation possible:
// Generating 10,000 records 1000 times, for a total of 10 million records:
function getNextData(t, pageIndex) {
let data = null;
if (pageIndex < 1000) {
data = [];
for (let i = 0; i < 10000; i++) {
const idx = pageIndex * 10000 + i; // to insert unique product names
data.push({
title: 'product-' + idx,
cost: 123.456,
units: 5
});
}
}
return Promise.resolve(data);
}
Below are the figures for such logic, with the environment as follows:
- Both the import process and PostgreSQL server are running on the same PC
- The PC hardware: i7-4770K, 32GB RAM, SSD Samsung 840 Pro 256GB
- Software: Windows 10 64-bit, PostgreSQL 9.5, Node.js 8.1.3, pg-promise v6.3.1
| records | 10   | 100 | 1k   | 10k   | 100k   | 1m    | 10m  | 100m  |
|---------|------|-----|------|-------|--------|-------|------|-------|
| time    | <1ms | 4ms | 15ms | 110ms | 1040ms | 10.5s | 105s | 17.4m |
During the whole test, the Node.js process never used more than 6% of the CPU, or more than 80MB of RAM. And all inserts were done within a single transaction, exactly as shown in our example.
You should be able to get much better performance figures with a dedicated PostgreSQL server.
In the code above we only used `pageIndex` to pull the data. But the full syntax of method `spex.sequence` supports 3 parameters, with the second one being the previously resolved value. And if inside your `getNextData` function you need a special value to be able to request the next set of records, you can modify your code as follows:
function getNextData(t, index, nextPageInfo) {
// t = database transaction protocol
// NOTE: nextPageInfo = undefined when index = 0
    return new Promise((resolve, reject) => {
/* pull the next data, according to nextPageInfo */
/* do reject(error) on an error, to ROLLBACK changes */
if(/* there is still data left*/) {
// if whateverNextDetails = undefined, the data will be inserted,
// but the sequence will end there (as success).
resolve({data, nextPageInfo: whateverNextDetails});
} else {
resolve(null);
}
});
}
db.tx('massive-insert', t => {
return t.sequence((index, data) => {
return getNextData(t, index, data)
.then(a => {
if (a) {
const insert = pgp.helpers.insert(a.data, cs);
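                    // resolving with a.nextPageInfo makes it the second parameter ('data') on the next call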
return t.none(insert).then(() => a.nextPageInfo);
}
})
});
})
.then(data => {
// COMMIT has been executed
console.log('Total batches:', data.total, ', Duration:', data.duration);
})
.catch(error => {
// ROLLBACK has been executed
console.log(error);
});
Please note that since we are chaining the result from `getNextData` to the value of `nextPageInfo`, the sequence will end (as success) if that value is `undefined`.
If you want to count how many records in total have been inserted, you can do it in two ways:
- You can use the ability of method `sequence` to track all the resolved values (option `track`):
db.tx('massive-insert', t => {
return t.sequence((index, data) => {
return getNextData(t, index, data)
.then(a => {
if (a) {
const insert = pgp.helpers.insert(a.data, cs);
return t.none(insert).then(() => a.data.length);
}
})
}, {track: true});
})
.then(data => {
// data = array of integers - number of records inserted with each query
})
- You can simply count records yourself:
db.tx('massive-insert', t => {
let total = 0;
return t.sequence((index, data) => {
return getNextData(t, index, data)
.then(a => {
if (a) {
total += a.data.length;
const insert = pgp.helpers.insert(a.data, cs);
return t.none(insert);
}
})
}).then(() => total);
})
.then(data => {
// data = total number of records inserted
})