Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cast arrays and hstore columns using Python instead of Postgres db #72

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

drdee
Copy link
Contributor

@drdee drdee commented Nov 20, 2019

Description of change

Currently, the tap-postgres is using a live database connection to cast array and hstore columns to their Python equivalent when using logical replication as the synchronization strategy.

This live database connection is expensive and can create backlogs of large volumes of non-consumed data from the replication slot. This in turn can cause out-of-disk space errors which
then have to be prevented by dropping the replication slot and resetting the integration in Stitchdata and doing another full sync.

This PR changes that behaviour and uses Python to do conversion of a Postgres stringified array into a Python list.

I've also added unit tests to demonstrate that this approach works. The fixture data was generated by

  • creating a Postgres table using the SQL script below
  • consume the events from the Postgres replication slot using the Python script at the bottom
CREATE EXTENSION IF NOT EXISTS citext;
DROP TABLE test;
CREATE TABLE IF NOT EXISTS test (
    id SERIAL,
    bit_array bit(2)[] NULL,
    boolean_array boolean[] NULL,
    cidr_array cidr[] NULL,
    citext_array citext[] NULL,
    date_array date[] NULL,
    double_precision_array double precision[] NULL,
    hstore_array hstore[] NULL,
    int_array integer[] NULL,
    bigint_array bigint[] NULL,
    inet_array inet[] NULL,
    json_array json[] NULL,
    jsonb_array jsonb[] NULL,
    macaddr_array macaddr[] NULL,
    money_array money[] NULL,
    decimal_array numeric[] NULL,
    real_array real[] NULL,
    smallint_array smallint[] NULL,
    text_array text[] NULL,
    time_array time without time zone[] NULL,
    timestamp_array timestamp with time zone[] NULL,
    uuid_array uuid[] NULL,
    PRIMARY KEY(id)
    );

BEGIN;
INSERT INTO test (bit_array,boolean_array,cidr_array,citext_array,date_array,double_precision_array,hstore_array,int_array,bigint_array,inet_array,json_array,jsonb_array,macaddr_array,money_array,decimal_array,real_array,smallint_array,text_array,time_array,timestamp_array,uuid_array)
    VALUES(
        '{"10", "01", null}',
        '{"true", "false", null}',
        '{"127.0.0.1", "10.0.0.0/32", null}',
        '{"CASE_INSENSITIVE", "case_insensitive", null, "CASE,,INSENSITIVE''"}',
        '{"12-31-2000", "01-01-2001", null}',
        '{"3.14159265359000", "3.1415926", null}',
        '{"foo => bar", "foo => null", null, "foo => bar,"}',
        '{"1", "2", null}',
        '{"9223372036854775807", null}',
        '{"198.24.10.0/24", null}',
        '{"{\"foo\":\"bar\"}", null}', -- json_array
        '{"{\"foo\":\"bar\"}", null}', -- jsonb_array
        '{"08:00:2b:01:02:03", null}',
        '{"19.99", null}', -- money_array,
        '{"19.9999999", null}', -- decimal_array,
        '{"3.14159", null}', -- real_array,
        '{"0", "1", null}', -- smallint_array,
        '{"foo", "bar", null, "foo,bar","diederik''s motel "}', -- text_array,
        '{"16:38:47+00:00", null}', -- time_array,
        '{"2019-11-19T16:38:47+00:00", null}', -- timestamp_array,
        '{"123e4567-e89b-12d3-a456-426655440000", null}' -- uuid_array
    );

COMMIT;
import time
import json
import psycopg2
from psycopg2.extras import LogicalReplicationConnection

conn = psycopg2.connect(
    'dbname=postgres user=postgres password=postgres host=127.0.0.1',
    connection_factory=psycopg2.extras.LogicalReplicationConnection
    )
cur = conn.cursor()
cur.start_replication(slot_name='test_slot', decode=True)
try:
    while True:
        try:
            message = cur.read_message()
        except psycopg2.DatabaseError as e:
            print("capture", e)
        if message:
            print(json.loads(message.payload))
            message.cursor.send_feedback(flush_lsn=message.data_start)
        else:            
            time.sleep(1.0)
except KeyboardInterrupt:
    cur.close()
    conn.close()

QA steps

  • automated tests passing
  • manual qa steps passing (list below)

Risks

Rollback steps

  • revert this branch

@dmosorast @cmerrick @dylan-stitch

@drdee drdee requested review from KAllan357 and luandy64 and removed request for KAllan357 January 27, 2020 20:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant