diff --git a/ods_tools/odtf/connector/__init__.py b/ods_tools/odtf/connector/__init__.py index 236820f4..8c5cc1b6 100644 --- a/ods_tools/odtf/connector/__init__.py +++ b/ods_tools/odtf/connector/__init__.py @@ -1,6 +1,5 @@ from .base import BaseConnector from .csv import CsvConnector -from .db import PostgresConnector, SQLiteConnector, SQLServerConnector __all__ = [ diff --git a/ods_tools/odtf/connector/db/__init__.py b/ods_tools/odtf/connector/db/__init__.py deleted file mode 100644 index 65d0e6d9..00000000 --- a/ods_tools/odtf/connector/db/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from .mssql import SQLServerConnector -from .postgres import PostgresConnector -from .sqlite import SQLiteConnector - - -__all__ = [ - "SQLiteConnector", - "PostgresConnector", - "SQLServerConnector", -] diff --git a/ods_tools/odtf/connector/db/base.py b/ods_tools/odtf/connector/db/base.py deleted file mode 100644 index b05ca7ff..00000000 --- a/ods_tools/odtf/connector/db/base.py +++ /dev/null @@ -1,169 +0,0 @@ -from typing import Any, Dict, Iterable, List - -import sqlparams -import sqlparse - -from ..base import BaseConnector - -from .errors import DBQueryError - - -class BaseDBConnector(BaseConnector): - """ - Connects to a database for reading and writing data. - - **Options:** - - * `host` - Which host to use when connecting to the database - * `port` - The port to use when connecting to the database - * `database` - The database name or relative path to the file for sqlite3 - * `user` - The username to use when connecting to the database - * `password` - The password to use when connecting to the database - * `select_statement` - sql query to read the data from - * `insert_statement` - sql query to insert the data from - """ - - name = "BaseDB Connector" - options_schema = { - "type": "object", - "properties": { - "host": { - "type": "string", - "description": ( - "Which host to use when connecting to the database. " - "Not used with SQLite." - ), - "default": "", - "title": "Host", - }, - "port": { - "type": "string", - "description": ( - "The port to use when connecting to the database. " - "Not used with SQLite." - ), - "default": "", - "title": "Port", - }, - "database": { - "type": "string", - "description": ( - "The database name or relative path to the file for " - "sqlite3" - ), - "title": "Database", - }, - "user": { - "type": "string", - "description": ( - "The username to use when connecting to the database. " - "Not used with SQLite." - ), - "default": "", - "title": "User", - }, - "password": { - "type": "password", - "description": ( - "The password to use when connecting to the database. " - "Not used with SQLite." - ), - "default": "", - "title": "Password", - }, - "sql_statement": { - "type": "string", - "description": "The path to the file which contains the " - "sql statement to run", - "subtype": "path", - "title": "Select Statement File", - }, - }, - "required": ["database", "select_statement", "insert_statement"], - } - sql_params_output = "qmark" - - def __init__(self, config, **options): - super().__init__(config, **options) - - self.database = { - "host": options.get("host", ""), - "port": options.get("port", ""), - "database": options["database"], - "user": options.get("user", ""), - "password": options.get("password", ""), - } - self.sql_statement_path = config.absolute_path( - options["sql_statement"] - ) - - def _create_connection(self, database: Dict[str, str]): - raise NotImplementedError() - - def _get_cursor(self, conn): - cur = conn.cursor() - return cur - - def _get_select_statement(self) -> str: - """ - SQL string to select the data from the DB - - :return: string - """ - with open(self.sql_statement_path) as f: - select_statement = f.read() - - return select_statement - - def _get_insert_statements(self) -> List[str]: - """ - SQL string(s) to insert the data into the DB - - :return: List of sql statements - """ - with open(self.sql_statement_path) as f: - sql = f.read() - - return sqlparse.split(sql) - - def load(self, data: Iterable[Dict[str, Any]]): - insert_sql = self._get_insert_statements() - data = list( - data - ) # convert iterable to list as we reuse it based on number of queries - conn = self._create_connection(self.database) - - with conn: - cur = self._get_cursor(conn) - query = sqlparams.SQLParams("named", self.sql_params_output) - - # insert query can contain more than 1 statement - for line in insert_sql: - sql, params = query.formatmany(line, data) - try: - cur.executemany(sql, params) - except Exception as e: - raise DBQueryError(sql, e, data=data) - - def row_to_dict(self, row): - """ - Convert the row returned from the cursor into a dictionary - - :return: Dict - """ - return dict(row) - - def extract(self) -> Iterable[Dict[str, Any]]: - select_sql = self._get_select_statement() - conn = self._create_connection(self.database) - - with conn: - cur = self._get_cursor(conn) - try: - cur.execute(select_sql) - except Exception as e: - raise DBQueryError(select_sql, e) - - rows = cur.fetchall() - for row in rows: - yield self.row_to_dict(row) diff --git a/ods_tools/odtf/connector/db/errors.py b/ods_tools/odtf/connector/db/errors.py deleted file mode 100644 index 53bbf376..00000000 --- a/ods_tools/odtf/connector/db/errors.py +++ /dev/null @@ -1,18 +0,0 @@ -from ...errors import ConverterError - - -class DBConnectionError(ConverterError): - pass - - -class DBQueryError(ConverterError): - def __init__(self, query, error, data=None): - self.query = query - self.data = data - self.error = error - - super().__init__(f"Error running query: {query} with {data} - {error}") - - -class DBInsertDataError(ConverterError): - pass diff --git a/ods_tools/odtf/connector/db/mssql.py b/ods_tools/odtf/connector/db/mssql.py deleted file mode 100644 index 03ea9893..00000000 --- a/ods_tools/odtf/connector/db/mssql.py +++ /dev/null @@ -1,42 +0,0 @@ -from typing import Dict - -import pyodbc - -from .base import BaseDBConnector -from .errors import DBConnectionError - - -class SQLServerConnector(BaseDBConnector): - """ - Connects to an Microsoft SQL Server for reading and writing data. - """ - - name = "SQL Server Connector" - driver = "{ODBC Driver 17 for SQL Server}" - - def _create_connection(self, database: Dict[str, str]): - """ - Create database connection to the SQLite database specified in database - :param database: Dict object with connection info - - :return: Connection object - """ - - try: - conn = pyodbc.connect( - "DRIVER={};SERVER={};PORT={};DATABASE={};UID={};PWD={}".format( - self.driver, - database["host"], - database["port"], - database["database"], - database["user"], - database["password"], - ) - ) - except Exception: - raise DBConnectionError() - - return conn - - def row_to_dict(self, row): - return dict(zip([t[0] for t in row.cursor_description], row)) diff --git a/ods_tools/odtf/connector/db/postgres.py b/ods_tools/odtf/connector/db/postgres.py deleted file mode 100644 index a41dc6c2..00000000 --- a/ods_tools/odtf/connector/db/postgres.py +++ /dev/null @@ -1,34 +0,0 @@ -from typing import Dict - -import psycopg2 -import psycopg2.extras - -from .base import BaseDBConnector -from .errors import DBConnectionError - - -class PostgresConnector(BaseDBConnector): - """ - Connects to a Postgres database for reading and writing data. - """ - - name = "Postgres Connector" - sql_params_output = "pyformat" - - def _create_connection(self, database: Dict[str, str]): - """ - Create database connection to the Postgres database - :param database: Dict with database connection settings - - :return: Connection object - """ - try: - conn = psycopg2.connect(**database) - except Exception: - raise DBConnectionError() - - return conn - - def _get_cursor(self, conn): - cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) - return cur diff --git a/ods_tools/odtf/connector/db/sqlite.py b/ods_tools/odtf/connector/db/sqlite.py deleted file mode 100644 index 75cca99f..00000000 --- a/ods_tools/odtf/connector/db/sqlite.py +++ /dev/null @@ -1,55 +0,0 @@ -import sqlite3 -from sqlite3 import Error -from typing import Dict - -from .base import BaseDBConnector -from .errors import DBConnectionError - - -class SQLiteConnector(BaseDBConnector): - """ - Connects to an sqlite file on the local machine for reading and writing - data. - """ - - name = "SQLite Connector" - options_schema = { - "type": "object", - "properties": { - "database": { - "type": "string", - "description": ( - "The database name or relative path to the file for " - "sqlite3" - ), - "title": "Database", - "subtype": "path", - }, - "sql_statement": { - "type": "string", - "description": "The path to the file which contains the " - "sql statement to run", - "subtype": "path", - "title": "Select Statement File", - }, - }, - "required": ["database", "select_statement", "insert_statement"], - } - - def _create_connection(self, database: Dict[str, str]): - """ - Create database connection to the SQLite database specified in database - :param database: Dict object with connection info - - :return: Connection object - """ - - try: - conn = sqlite3.connect( - self.config.absolute_path(database["database"]) - ) - except Error: - raise DBConnectionError() - - conn.row_factory = sqlite3.Row - return conn diff --git a/ods_tools/odtf/controller.py b/ods_tools/odtf/controller.py index a853bd34..e1eb4d1b 100644 --- a/ods_tools/odtf/controller.py +++ b/ods_tools/odtf/controller.py @@ -57,7 +57,7 @@ def run(self): thread.join() else: for c in transformation_configs: - self._run_transformation(c) + self._run_transformation(c, pbar) get_logger().info( f"Transformation finished in {datetime.now() - start_time}" diff --git a/ods_tools/odtf/runner/__init__.py b/ods_tools/odtf/runner/__init__.py index 551bae96..2599936c 100644 --- a/ods_tools/odtf/runner/__init__.py +++ b/ods_tools/odtf/runner/__init__.py @@ -1,14 +1,8 @@ from .base import BaseRunner -from .dask import DaskRunner -from .eager import EagerRunner -from .modin import ModinRunner from .pandas import PandasRunner __all__ = [ "BaseRunner", - "DaskRunner", - "EagerRunner", "PandasRunner", - "ModinRunner", ] diff --git a/ods_tools/odtf/runner/dask.py b/ods_tools/odtf/runner/dask.py deleted file mode 100644 index 63286980..00000000 --- a/ods_tools/odtf/runner/dask.py +++ /dev/null @@ -1,53 +0,0 @@ -from itertools import chain, islice - -import dask -import pandas as pd -from dask import dataframe as dd - -from .pandas import PandasRunner - - -@dask.delayed -def read_pandas_chunk(c): - # pragma: no cover - # This is not picked up by coverage since it's a delayed dask object - return pd.DataFrame(c) - - -class DaskRunner(PandasRunner): - dataframe_type = dd.DataFrame - series_type = dd.Series - name = "Dask" - options_schema = { - "type": "object", - "properties": { - "chunk_size": { - "type": "int", - "default": 10000, - "title": "Chunk Size", - } - }, - } - - def __init__(self, config, **options): - super().__init__(config, **options) - - self.chunk_size = int(options.get("chunk_size", 10000)) - - def create_series(self, index, value): - return index.to_series().apply(lambda x: value) - - def chunk(self, iterable): - iterable = iter(iterable) - while True: - try: - slice = islice(iterable, self.chunk_size) - first_elem = next(slice) - yield chain((first_elem,), slice) - except StopIteration: - return - - def get_dataframe(self, extractor): - return dd.from_delayed( - [read_pandas_chunk(c) for c in self.chunk(extractor.extract())], - ) diff --git a/ods_tools/odtf/runner/eager.py b/ods_tools/odtf/runner/eager.py deleted file mode 100644 index bcf385d5..00000000 --- a/ods_tools/odtf/runner/eager.py +++ /dev/null @@ -1,37 +0,0 @@ -from functools import reduce -from typing import Any, AsyncIterable, Dict - -from ..connector import BaseConnector -from ..mapping import BaseMapping -from ..runner.base import BaseAsyncRunner -from ..notset import NotSet, NotSetType - - -class EagerRunner(BaseAsyncRunner): - name = "Eager" - - async def transform( - self, extractor: BaseConnector, mapping: BaseMapping - ) -> AsyncIterable[Dict[str, Any]]: - """ - Runs the transformation on each row as its passed in and yields the - result to the loader - - :param extractor: The data connection to extract data from - :param mapping: Mapping object describing the transformations to apply - - :return: An async iterable containing the transformed data - """ - transformations = mapping.get_transformations() - - async for row in extractor.aextract(): - transformed = reduce( - self.apply_transformation_set, transformations, row - ) - - if isinstance(transformed, NotSetType): - continue - - if len([v for v in transformed.values() if v != NotSet]) > 0: - # only yield rows that have some values set - yield transformed diff --git a/ods_tools/odtf/runner/modin.py b/ods_tools/odtf/runner/modin.py deleted file mode 100644 index d62ec17e..00000000 --- a/ods_tools/odtf/runner/modin.py +++ /dev/null @@ -1,39 +0,0 @@ -import os - -from ..files.csv import BufferedCsvReader -from ..notset import NotSetType -from .pandas import PandasRunner - - -class ModinRunner(PandasRunner): - name = "Modin" - options_schema = { - "type": "object", - "properties": { - "engine": { - "type": "string", - "enum": ["dask", "ray"], - "default": "dask", - "title": "Engine", - } - }, - } - - def __init__(self, config, **options): - super().__init__(config, **options) - self.engine = options.get("engine", "dask") - - def get_dataframe(self, extractor): - os.environ.setdefault("MODIN_ENGINE", self.engine) - import modin.pandas as pd # must be imported after modin engine is set - - self.dataframe_type = pd.DataFrame - self.series_type = pd.Series - return pd.read_csv(BufferedCsvReader(extractor.extract())) - - def combine_column(self, *args, **kwargs): - combined = super().combine_column(*args, **kwargs) - if not isinstance(combined, NotSetType) and "__reduced__" in combined: - return combined["__reduced__"] - else: - return combined diff --git a/ods_tools/odtf/runner/pandas.py b/ods_tools/odtf/runner/pandas.py index 40940b5f..7f8816c8 100644 --- a/ods_tools/odtf/runner/pandas.py +++ b/ods_tools/odtf/runner/pandas.py @@ -214,15 +214,15 @@ class PandasRunner(BaseRunner): row_value_conversions = { "int": lambda col, nullable, null_values: col.apply( type_converter((lambda v: int(float(v))), nullable, null_values), - convert_dtype=False, + # convert_dtype=False, ), "float": lambda col, nullable, null_values: col.apply( type_converter(float, nullable, null_values), - convert_dtype=False, + # convert_dtype=False, ), "string": lambda col, nullable, null_values: col.apply( type_converter(str, nullable, null_values), - convert_dtype=False, + # convert_dtype=False, ), } diff --git a/requirements.in b/requirements.in index b65388d6..35aa4b0b 100644 --- a/requirements.in +++ b/requirements.in @@ -7,9 +7,4 @@ oasis-data-manager packaging PyYAML lark -pyodbc -sqlparams -sqlparse -psycopg2 networkx -dask \ No newline at end of file