Path to v1.0.0: https://github.com/lawrencejones/pgsink/projects/1
Draft docs can be seen at: docs
pgsink is a Postgres change-capture device that supports high-throughput and low-latency capture to a variety of sinks.
You'd use this project if your primary database is Postgres and you want a stress-free, quick-to-setup, easy-to-operate tool to replicate your data to other stores such as BigQuery or Elasticsearch, one that works with Postgres databases of any size.
There are many change-capture projects out there, and several support Postgres.
As an example, pgsink is similar to debezium in performance and durability goals, but with a much simpler setup (no Kafka required). We also bear similarity to Netflix's dblog, with the benefit of being open-source and available for use.
We win in these comparisons when you want a simple setup with no additional dependencies. We also benefit from focusing solely on Postgres rather than many upstream sources, as we can optimise our data-access pattern for large, high-transaction-volume Postgres databases. Examples of this are keeping transactions short to help vacuums, and traversing tables using efficient indexes.
This makes pgsink a much safer bet for people managing production critical Postgres databases.
As an overview of the important packages, for those reading the source code:
- `changelog`: the input to sinks, produced by subscription or import
- `decode`: configures decoding of Postgres types into Golang (text -> int64)
- `imports`: create, manage and work import jobs, producing changelog entries
- `logical`: parsing of the pgoutput logical encoding, used by subscription
- `sinks`: implements different types of sink, from files to Google BigQuery
- `subscription`: Postgres change capture via replication, generating a changelog
This project comes with a docker-compose development environment. Boot the environment like so:
```console
$ docker-compose up -d
pgsink_prometheus_1 is up-to-date
pgsink_postgres_1 is up-to-date
pgsink_grafana_1 is up-to-date
```
Then run `make recreatedb` to create a `pgsink` database. You can now access your database like so:

```console
$ psql --host localhost --user pgsink pgsink
pgsink=> \q
```

pgsink will work with this database: try `pgsink --sink=file --decode-only`.
We use goose to run database migrations. Create new migrations like so:
```console
$ go run internal/migration/cmd/goose.go --dir internal/migration create create_import_jobs_table go
2019/12/29 14:59:51 Created new file: internal/migration/20191229145951_create_import_jobs_table.go
```
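The generated file is a Go migration that registers up and down functions with goose. As a rough illustration only (the exact import path and table definition are assumptions, not taken from this repository), such a migration typically looks like this:

```go
package migration

import (
	"database/sql"

	"github.com/pressly/goose"
)

func init() {
	// Register this migration with goose; the timestamp prefix in the
	// filename determines its position in the migration sequence.
	goose.AddMigration(upCreateImportJobsTable, downCreateImportJobsTable)
}

func upCreateImportJobsTable(tx *sql.Tx) error {
	// Columns are illustrative only, not the real import_jobs schema.
	_, err := tx.Exec(`
		create table import_jobs (
			id bigserial primary key,
			created_at timestamptz not null default now()
		);
	`)
	return err
}

func downCreateImportJobsTable(tx *sql.Tx) error {
	_, err := tx.Exec(`drop table import_jobs;`)
	return err
}
```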
Running migrations is done using the make target:
```console
$ make migrate structure.sql
$ go run internal/migration/cmd/goose.go --install up
2021/01/09 15:38:29 requested --install, so creating schema 'pgsink'
2021/01/09 15:38:29 goose: no migrations to run. current version: 20210102200953
docker-compose --env-file=/dev/null exec -T postgres pg_dump -U postgres pgsink --schema-only --schema=pgsink >structure.sql
```
Boot a Postgres database, then create an example table.
```console
$ createdb pgsink
$ psql pgsink
psql (11.5)
Type "help" for help.

pgsink=# create table public.example (id bigserial primary key, msg text);
CREATE TABLE
pgsink=# insert into public.example (msg) values ('hello world');
INSERT 1
```
pgsink will stream these changes from the database and send them to the configured sink. Changes are expressed as a stream of messages: either a Schema that describes the structure of a Postgres table, or a Modification corresponding to an insert/update/delete of a row in Postgres.

Our example would produce the following modification, where `timestamp` is the time at which the change was committed and `sequence` is the operation index within the transaction:
```json
{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "sequence": 1,
  "namespace": "public",
  "name": "example",
  "before": null,
  "after": {
    "id": "1",
    "msg": "hello world"
  }
}
```
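For consumers written in Go, this payload can be decoded with a small struct mirroring the fields above. The struct below is a sketch based only on this example payload, not a type exported by pgsink:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Modification mirrors the example payload in this README; the field
// names come from that JSON, not from pgsink's own types.
type Modification struct {
	Timestamp time.Time              `json:"timestamp"`
	Sequence  int64                  `json:"sequence"`
	Namespace string                 `json:"namespace"`
	Name      string                 `json:"name"`
	Before    map[string]interface{} `json:"before"` // nil for inserts
	After     map[string]interface{} `json:"after"`  // nil for deletes
}

func main() {
	payload := []byte(`{
		"timestamp": "2019-10-04T16:05:55.123456+01:00",
		"sequence": 1,
		"namespace": "public",
		"name": "example",
		"before": null,
		"after": {"id": "1", "msg": "hello world"}
	}`)

	var m Modification
	if err := json.Unmarshal(payload, &m); err != nil {
		panic(err)
	}
	fmt.Printf("%s.%s changed at %s: %v\n", m.Namespace, m.Name, m.Timestamp, m.After)
}
```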
Also sent, arriving before the modification element, will be a schema entry that describes the `public.example` table. We represent these as Avro schemas, built from the Postgres catalog information.
```json
{
  "timestamp": "2019-10-04T16:05:55.123456+01:00",
  "schema": {
    "namespace": "public.example",
    "type": "record",
    "name": "value",
    "fields": [
      {
        "name": "id",
        "type": ["long", "null"],
        "default": null
      },
      {
        "name": "msg",
        "type": ["string", "null"],
        "default": null
      }
    ]
  }
}
```
Schemas are published whenever we first discover a relation. Use the timestamp field to order successive schema events, so that stale messages don't overwrite more recent data.
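A consumer might apply that ordering rule along the lines of the sketch below, which keeps only the newest schema seen for each relation; the types and field names are assumptions based on the example payloads above, not pgsink's API:

```go
package main

import (
	"fmt"
	"time"
)

// SchemaEvent is a simplified view of a schema message, keyed by the
// Avro namespace (e.g. "public.example"). Illustrative only.
type SchemaEvent struct {
	Timestamp time.Time
	Namespace string
	Schema    string // raw Avro schema JSON
}

// schemaCache retains the most recent schema per relation, using the
// timestamp field to discard stale or out-of-order events.
type schemaCache struct {
	latest map[string]SchemaEvent
}

func newSchemaCache() *schemaCache {
	return &schemaCache{latest: map[string]SchemaEvent{}}
}

// Apply returns true if the event was newer than the cached schema for
// its relation, and therefore replaced it.
func (c *schemaCache) Apply(event SchemaEvent) bool {
	existing, ok := c.latest[event.Namespace]
	if ok && !event.Timestamp.After(existing.Timestamp) {
		return false // stale or duplicate event, ignore it
	}
	c.latest[event.Namespace] = event
	return true
}

func main() {
	cache := newSchemaCache()
	newer := SchemaEvent{Timestamp: time.Now(), Namespace: "public.example"}
	older := SchemaEvent{Timestamp: newer.Timestamp.Add(-time.Minute), Namespace: "public.example"}

	fmt.Println(cache.Apply(newer)) // true: first schema seen for this relation
	fmt.Println(cache.Apply(older)) // false: stale event is ignored
}
```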