Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Reuse connection
Browse files Browse the repository at this point in the history
Tap-postgres opens a new connection every time it needs to cast a value.
This is highly inefficient as opening a connection is usually a slow and
resource-intensive operation. An easy fix would be to use something like
PgBouncer, but it's even better if we open just once connection and
reuse it for all queries.

We created a Singleton Postgres connection wrapper. This wrapper
actually holds up to two connections, since we need two different
connection factories. The `connect` method returns the connection we
need based on the arguments provided.
  • Loading branch information
ivanovyordan committed Sep 9, 2021
1 parent b1ba14d commit bfdd989
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 17 deletions.
21 changes: 4 additions & 17 deletions tap_postgres/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from typing import List
from dateutil.parser import parse

from tap_postgres.postgres import Postgres

LOGGER = singer.get_logger('tap_postgres')

CURSOR_ITER_SIZE = 20000
Expand Down Expand Up @@ -39,23 +41,8 @@ def fully_qualified_table_name(schema, table):


def open_connection(conn_config, logical_replication=False):
cfg = {
'application_name': 'pipelinewise',
'host': conn_config['host'],
'dbname': conn_config['dbname'],
'user': conn_config['user'],
'password': conn_config['password'],
'port': conn_config['port'],
'connect_timeout': 30
}

if conn_config.get('sslmode'):
cfg['sslmode'] = conn_config['sslmode']

if logical_replication:
cfg['connection_factory'] = psycopg2.extras.LogicalReplicationConnection

conn = psycopg2.connect(**cfg)
pg = Postgres.get_instance()
conn = pg.open_connection(conn_config, logical_replication)

return conn

Expand Down
46 changes: 46 additions & 0 deletions tap_postgres/postgres.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import psycopg2
import psycopg2.extras


# pylint: disable=missing-class-docstring,missing-function-docstring
class Postgres:
__instance = None

@staticmethod
def get_instance():
if Postgres.__instance is None:
Postgres()

return Postgres.__instance

def __init__(self):
if Postgres.__instance is not None:
raise Exception("This class is a singleton!")

Postgres.__instance = self
self.connections = {"logical": None, "transactional": None}

def open_connection(self, conn_config, logical_replication):
connection_type = "logical" if logical_replication else "transactional"

cfg = {
'application_name': 'pipelinewise',
'host': conn_config['host'],
'dbname': conn_config['dbname'],
'user': conn_config['user'],
'password': conn_config['password'],
'port': conn_config['port'],
'connect_timeout': 30
}

if conn_config.get('sslmode'):
cfg['sslmode'] = conn_config['sslmode']

if logical_replication:
cfg['connection_factory'] = psycopg2.extras.LogicalReplicationConnection

if not self.connections[connection_type] or self.connections[connection_type].closed:
self.connections[connection_type] = psycopg2.connect(**cfg)

return self.connections[connection_type]

0 comments on commit bfdd989

Please sign in to comment.