diff --git a/src/process_data/base.py b/src/process_data/base.py new file mode 100644 index 0000000..c0c3a87 --- /dev/null +++ b/src/process_data/base.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +from abc import ABC, abstractmethod +from typing import Any + + +class BaseProcessDataInterface(ABC): + """ + Baseclass for data processing interfaces + """ + + @classmethod + @abstractmethod + def in_memory(cls, file_name: str) -> Any: ... + + @classmethod + @abstractmethod + def streaming(cls, file_name: str) -> Any: ... diff --git a/src/process_data/pandas/main.py b/src/process_data/pandas/main.py index e0583a0..bed16b3 100644 --- a/src/process_data/pandas/main.py +++ b/src/process_data/pandas/main.py @@ -5,19 +5,22 @@ import argparse import time + import pandas as pd from parallel_pandas import ParallelPandas +from src.process_data.base import BaseProcessDataInterface + -class PandasThing: +class PandasInterface(BaseProcessDataInterface): """ Interface for executing pandas transformations. Use as follows: - >> df = PandasThing.in_memory("foo.txt") + >> df = PandasInterface.in_memory("foo.txt") Alternatively, enable parallel processing - >> df = PandasThing(parallel = True).in_memory("foo.txt") + >> df = PandasInterface(parallel = True).in_memory("foo.txt") However, initializing with parallel processing won't really affect us, since most of our data processing is IO bound. @@ -70,7 +73,7 @@ def process_chunk(cls, chunk: pd.DataFrame, df_result: pd.DataFrame) -> pd.DataF return df_result # noqa @classmethod - def in_chunks(cls, filename: str, chunksize: int = 1_000_000) -> pd.DataFrame: + def streaming(cls, filename: str, chunksize: int = 1_000_000) -> pd.DataFrame: """ Process parts of the file, concat results, and continue @@ -98,7 +101,7 @@ def in_chunks(cls, filename: str, chunksize: int = 1_000_000) -> pd.DataFrame: args = parser.parse_args() start = time.time() - df = PandasThing().in_chunks(args.file_name) + df = PandasInterface().streaming(args.file_name) duration = time.time() - start print(f"Duration = {duration: .2f}s") diff --git a/src/process_data/polars/main.py b/src/process_data/polars/main.py index ec8d402..624ff66 100644 --- a/src/process_data/polars/main.py +++ b/src/process_data/polars/main.py @@ -7,13 +7,15 @@ import time import polars as pl +from src.process_data.base import BaseProcessDataInterface -class PolarsThing: + +class PolarsInterface(BaseProcessDataInterface): """ Interface for executing polars transformations. Use as follows: - >> df = PolarsThing.in_memory("foo.txt") + >> df = PolarsInterface.in_memory("foo.txt") """ @@ -82,7 +84,7 @@ def streaming(cls, filename: str) -> pl.DataFrame: args = parser.parse_args() start = time.time() - df = PolarsThing().streaming(args.file_name) + df = PolarsInterface().streaming(args.file_name) duration = time.time() - start print(f"Duration = {duration: .2f}s")