From bfdd989f7eeff820dfec530426633c9878b50134 Mon Sep 17 00:00:00 2001 From: Yordan Ivanov Date: Thu, 9 Sep 2021 16:48:52 +0300 Subject: [PATCH] Reuse connection Tap-postgres opens a new connection every time it needs to cast a value. This is highly inefficient as opening a connection is usually a slow and resource-intensive operation. An easy fix would be to use something like PgBouncer, but it's even better if we open just one connection and reuse it for all queries. We created a Singleton Postgres connection wrapper. This wrapper actually holds up to two connections, since we need two different connection factories. The `open_connection` method returns the connection we need based on the arguments provided. --- tap_postgres/db.py | 21 ++++-------------- tap_postgres/postgres.py | 46 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 17 deletions(-) create mode 100644 tap_postgres/postgres.py diff --git a/tap_postgres/db.py b/tap_postgres/db.py index 591323cd..c8c5fea6 100644 --- a/tap_postgres/db.py +++ b/tap_postgres/db.py @@ -11,6 +11,8 @@ from typing import List from dateutil.parser import parse +from tap_postgres.postgres import Postgres + LOGGER = singer.get_logger('tap_postgres') CURSOR_ITER_SIZE = 20000 @@ -39,23 +41,8 @@ def fully_qualified_table_name(schema, table): def open_connection(conn_config, logical_replication=False): - cfg = { - 'application_name': 'pipelinewise', - 'host': conn_config['host'], - 'dbname': conn_config['dbname'], - 'user': conn_config['user'], - 'password': conn_config['password'], - 'port': conn_config['port'], - 'connect_timeout': 30 - } - - if conn_config.get('sslmode'): - cfg['sslmode'] = conn_config['sslmode'] - - if logical_replication: - cfg['connection_factory'] = psycopg2.extras.LogicalReplicationConnection - - conn = psycopg2.connect(**cfg) + pg = Postgres.get_instance() + conn = pg.open_connection(conn_config, logical_replication) return conn diff --git a/tap_postgres/postgres.py 
b/tap_postgres/postgres.py new file mode 100644 index 00000000..0569c46a --- /dev/null +++ b/tap_postgres/postgres.py @@ -0,0 +1,46 @@ +import psycopg2 +import psycopg2.extras + + +# pylint: disable=missing-class-docstring,missing-function-docstring +class Postgres: + __instance = None + + @staticmethod + def get_instance(): + if Postgres.__instance is None: + Postgres() + + return Postgres.__instance + + def __init__(self): + if Postgres.__instance is not None: + raise Exception("This class is a singleton!") + + Postgres.__instance = self + self.connections = {"logical": None, "transactional": None} + + def open_connection(self, conn_config, logical_replication): + connection_type = "logical" if logical_replication else "transactional" + + cfg = { + 'application_name': 'pipelinewise', + 'host': conn_config['host'], + 'dbname': conn_config['dbname'], + 'user': conn_config['user'], + 'password': conn_config['password'], + 'port': conn_config['port'], + 'connect_timeout': 30 + } + + if conn_config.get('sslmode'): + cfg['sslmode'] = conn_config['sslmode'] + + if logical_replication: + cfg['connection_factory'] = psycopg2.extras.LogicalReplicationConnection + + if not self.connections[connection_type] or self.connections[connection_type].closed: + self.connections[connection_type] = psycopg2.connect(**cfg) + + return self.connections[connection_type] +