Skip to content

Commit

Permalink
Abstract interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ybressler committed May 29, 2024
1 parent 92b5954 commit 943d205
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
17 changes: 17 additions & 0 deletions src/process_data/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
from typing import Any


class BaseProcessDataInterface(ABC):
"""
Baseclass for data processing interfaces
"""

@classmethod
@abstractmethod
def in_memory(cls, file_name: str) -> Any: ...

@classmethod
@abstractmethod
def streaming(cls, file_name: str) -> Any: ...
13 changes: 8 additions & 5 deletions src/process_data/pandas/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@

import argparse
import time

import pandas as pd
from parallel_pandas import ParallelPandas

from src.process_data.base import BaseProcessDataInterface


class PandasThing:
class PandasInterface(BaseProcessDataInterface):
"""
Interface for executing pandas transformations.
Use as follows:
>> df = PandasThing.in_memory("foo.txt")
>> df = PandasInterface.in_memory("foo.txt")
Alternatively, enable parallel processing
>> df = PandasThing(parallel = True).in_memory("foo.txt")
>> df = PandasInterface(parallel = True).in_memory("foo.txt")
However, initializing with parallel processing won't really affect us,
since most of our data processing is IO bound.
Expand Down Expand Up @@ -70,7 +73,7 @@ def process_chunk(cls, chunk: pd.DataFrame, df_result: pd.DataFrame) -> pd.DataF
return df_result # noqa

@classmethod
def in_chunks(cls, filename: str, chunksize: int = 1_000_000) -> pd.DataFrame:
def streaming(cls, filename: str, chunksize: int = 1_000_000) -> pd.DataFrame:
"""
Process parts of the file, concat results, and continue
Expand Down Expand Up @@ -98,7 +101,7 @@ def in_chunks(cls, filename: str, chunksize: int = 1_000_000) -> pd.DataFrame:
args = parser.parse_args()

start = time.time()
df = PandasThing().in_chunks(args.file_name)
df = PandasInterface().streaming(args.file_name)

duration = time.time() - start
print(f"Duration = {duration: .2f}s")
Expand Down
8 changes: 5 additions & 3 deletions src/process_data/polars/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import time
import polars as pl

from src.process_data.base import BaseProcessDataInterface

class PolarsThing:

class PolarsInterface(BaseProcessDataInterface):
"""
Interface for executing polars transformations.
Use as follows:
>> df = PolarsThing.in_memory("foo.txt")
>> df = PolarsInterface.in_memory("foo.txt")
"""

Expand Down Expand Up @@ -82,7 +84,7 @@ def streaming(cls, filename: str) -> pl.DataFrame:
args = parser.parse_args()

start = time.time()
df = PolarsThing().streaming(args.file_name)
df = PolarsInterface().streaming(args.file_name)

duration = time.time() - start
print(f"Duration = {duration: .2f}s")
Expand Down

0 comments on commit 943d205

Please sign in to comment.