Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Reuse connection #112

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions tap_postgres/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

CURSOR_ITER_SIZE = 20000

TRANSACTIONAL_CONNECTION = None
LOGICAL_CONNECTION = None


# pylint: disable=invalid-name,missing-function-docstring
def calculate_destination_stream_name(stream, md_map):
Expand Down Expand Up @@ -53,11 +56,33 @@ def open_connection(conn_config, logical_replication=False):
cfg['sslmode'] = conn_config['sslmode']

if logical_replication:
cfg['connection_factory'] = psycopg2.extras.LogicalReplicationConnection
return open_logical_connection(cfg)

return open_transactional_connection(cfg)


# pylint: disable=global-statement
def open_transactional_connection(conn_config):
global TRANSACTIONAL_CONNECTION

if TRANSACTIONAL_CONNECTION is None or TRANSACTIONAL_CONNECTION.closed:
LOGGER.info("OPENING TRANSACTIONAL CONNECTION")
TRANSACTIONAL_CONNECTION = psycopg2.connect(**conn_config)

return TRANSACTIONAL_CONNECTION


# pylint: disable=global-statement
def open_logical_connection(conn_config):
global LOGICAL_CONNECTION

conn_config['connection_factory'] = psycopg2.extras.LogicalReplicationConnection

conn = psycopg2.connect(**cfg)
if LOGICAL_CONNECTION is None or LOGICAL_CONNECTION.closed:
LOGGER.info("OPENING LOGICAL CONNECTION")
LOGICAL_CONNECTION = psycopg2.connect(**conn_config)

return conn
return LOGICAL_CONNECTION

def prepare_columns_for_select_sql(c, md_map):
column_name = ' "{}" '.format(canonicalize_identifier(c))
Expand Down