feature: add support for Spark Connect #63

Merged
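For context: Spark Connect decouples the PySpark client from the JVM driver, letting a thin client talk to a Spark server over gRPC, which is what running against Databricks 14.3 LTS shared clusters requires (hence the `33-feature-ensure-that-we-can-support-dbr-143lts` branch merged here). A minimal sketch of a Connect session, assuming `pyspark[connect]>=3.5` and a Spark Connect server on the default port:

```python
# Minimal Spark Connect sketch. Assumptions: `pyspark[connect]>=3.5` is
# installed and a Spark Connect server is listening on the default port
# 15002 (e.g. started via Spark's sbin/start-connect-server.sh).
from pyspark.sql import SparkSession

# `remote(...)` yields a gRPC-backed Connect session rather than a
# classic JVM-backed one.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.show()
```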
Changes from all commits (84 commits)
2e86208
fix: work in progress
mikita-sakalouski Aug 9, 2024
0aa6a53
Merge remote-tracking branch 'origin/main' into 33-feature-ensure-tha…
mikita-sakalouski Sep 10, 2024
a9fe361
feat: add delta test skipping for 3.5
mikita-sakalouski Sep 19, 2024
f2ab79f
refactor: update DataFrame imports to use koheesio.spark across the c…
mikita-sakalouski Sep 30, 2024
f3e6014
chore: update dependencies and improve SQL step handling
mikita-sakalouski Oct 8, 2024
f24ed3e
Added show_string utility
dannymeijer Oct 9, 2024
ecbdc9a
refactor: replace spark_minor_version with SPARK_MINOR_VERSION consta…
mikita-sakalouski Oct 9, 2024
18ff011
few more fixes
dannymeijer Oct 9, 2024
8a84038
Merge remote-tracking branch 'origin/33-feature-ensure-that-we-can-su…
dannymeijer Oct 9, 2024
6a291b2
few more fixes
dannymeijer Oct 9, 2024
3bfe84a
refactor: update utility functions and improve test assertions for cl…
mikita-sakalouski Oct 9, 2024
a3a5ad6
few more fixes
dannymeijer Oct 9, 2024
31bb7d7
Down to the last 36 tests to fix
dannymeijer Oct 14, 2024
c837cd4
fix typo
dannymeijer Oct 14, 2024
3b09e5e
refactor: streamline imports in row_number_dedup.py for clarity
mikita-sakalouski Oct 14, 2024
ac9eee5
refactor: enhance BoxCsvFileReader to use pandas for CSV parsing
mikita-sakalouski Oct 14, 2024
b360d8e
last 24
dannymeijer Oct 15, 2024
cfab89f
fix formatting
dannymeijer Oct 15, 2024
891c7f5
one more test
dannymeijer Oct 15, 2024
349770c
17 more remaining
dannymeijer Oct 16, 2024
1b47c75
Last 21
dannymeijer Oct 16, 2024
4c93701
Last 21
dannymeijer Oct 16, 2024
1a77512
Last 20
dannymeijer Oct 16, 2024
224c0cc
EOD
dannymeijer Oct 21, 2024
42b7d86
fix: refactor connect types
mikita-sakalouski Oct 21, 2024
0c3b4f1
fix: improve session handling and type annotations in connect_utils a…
mikita-sakalouski Oct 21, 2024
586d76a
fix: improve tests
mikita-sakalouski Oct 22, 2024
46a18ca
snowflake refactoring (95% done)
dannymeijer Oct 22, 2024
16290e3
fix: adjust imports os spark connect
mikita-sakalouski Oct 22, 2024
fd415ad
Merge branch '33-feature-ensure-that-we-can-support-dbr-143lts' of pe…
mikita-sakalouski Oct 22, 2024
77aa482
fix: update Snowflake integration tests and improve session handling
mikita-sakalouski Oct 22, 2024
a8219f8
fix: update imports and add type ignores in Snowflake integration
mikita-sakalouski Oct 22, 2024
e32e9a7
fix: disable snowflake tests
mikita-sakalouski Oct 22, 2024
9fda4df
fix: update dependencies and improve Spark integration handling
mikita-sakalouski Oct 22, 2024
f251e87
fix: remove TypeAlias usage and simplify type definitions in common.py
mikita-sakalouski Oct 22, 2024
a7319f6
fix: improve tests
mikita-sakalouski Oct 22, 2024
53bb8ec
Merge remote-tracking branch 'origin/main' into 33-feature-ensure-tha…
mikita-sakalouski Oct 22, 2024
b570064
fix: spark imports
mikita-sakalouski Oct 22, 2024
916e1a8
fix: import DataStreamReader
mikita-sakalouski Oct 22, 2024
635a525
fix: active spark session
mikita-sakalouski Oct 22, 2024
5856960
fix: conftest
mikita-sakalouski Oct 22, 2024
d377bb1
fix: tests
mikita-sakalouski Oct 22, 2024
95a9d70
fix: spark remote parallel
mikita-sakalouski Oct 22, 2024
9baca2c
fix: remote port
mikita-sakalouski Oct 22, 2024
3e63806
fix: try with random port
mikita-sakalouski Oct 22, 2024
7b28ba4
fix: tests
mikita-sakalouski Oct 22, 2024
29e784f
fix: tests
mikita-sakalouski Oct 22, 2024
e9b0aca
fix: fail fast
mikita-sakalouski Oct 22, 2024
988fb03
fix: github action test
mikita-sakalouski Oct 22, 2024
b0fd123
fix: delta packages for builder
mikita-sakalouski Oct 22, 2024
b426f66
fix: delta packages
mikita-sakalouski Oct 22, 2024
14c6519
fix: get_active_session
mikita-sakalouski Oct 22, 2024
a1ce806
fix: tests
mikita-sakalouski Oct 22, 2024
754a21e
fix: handle multiple import errors for AnalysisException and ParseExc…
mikita-sakalouski Oct 22, 2024
7fddd06
refactor: reorganize imports and clean up unused references
mikita-sakalouski Oct 22, 2024
ccaed64
fix: connect parallel testing
mikita-sakalouski Oct 22, 2024
56e4f6c
fix: test
mikita-sakalouski Oct 23, 2024
a1bc658
33 part 2 making snowflake work with connect (#84)
dannymeijer Oct 24, 2024
a9fbd1c
ran make fmt
dannymeijer Oct 24, 2024
5b9c716
small fix
dannymeijer Oct 24, 2024
dcdf3d9
refactor: add type hints and clean up imports across multiple files
mikita-sakalouski Oct 28, 2024
18f8873
Merge branch '33-feature-ensure-that-we-can-support-dbr-143lts' of pe…
mikita-sakalouski Oct 28, 2024
1f85306
refactor: add type hints and improve code clarity in multiple modules
mikita-sakalouski Oct 28, 2024
4f88889
mypy down to 318
dannymeijer Oct 29, 2024
1c742b4
progress
dannymeijer Oct 29, 2024
b63dc7c
as good as I can make it
dannymeijer Oct 29, 2024
4751375
version bump
dannymeijer Oct 29, 2024
67c1e68
typo
dannymeijer Oct 29, 2024
3da04ae
refactor: type hints to use Union for better clarity
mikita-sakalouski Oct 29, 2024
5a3733e
fix: tests
mikita-sakalouski Oct 29, 2024
6af1f56
fix: initialize lists with default_factory in ColumnsTransformation
mikita-sakalouski Oct 29, 2024
2e13df2
fix: initialize lists with None in ColumnsTransformation
mikita-sakalouski Oct 29, 2024
767974c
some more improvements
dannymeijer Oct 29, 2024
c724154
Merge remote-tracking branch 'origin/33-feature-ensure-that-we-can-su…
dannymeijer Oct 29, 2024
d027370
some more improvements
dannymeijer Oct 29, 2024
dba6204
small bugfix
dannymeijer Oct 29, 2024
cc29864
datetime utc fix (and deprecation proof util)
dannymeijer Oct 29, 2024
122f3ef
fix: simplify merge_cond retrieval and improve readability in DeltaTa…
mikita-sakalouski Oct 29, 2024
117c2ba
Merge branch '33-feature-ensure-that-we-can-support-dbr-143lts' of pe…
mikita-sakalouski Oct 29, 2024
ed3b1bf
feat: add support for pull requests on release branches in GitHub Act…
mikita-sakalouski Oct 29, 2024
c9190ba
fix: update GitHub Actions workflow to fetch target branch instead of…
mikita-sakalouski Oct 29, 2024
e324ce2
fix: update GitHub Actions workflow to fallback to 'main' branch if b…
mikita-sakalouski Oct 29, 2024
7d6bbfe
fix: based on PR comments
mikita-sakalouski Oct 29, 2024
aad8c3a
fix: ValidationError
mikita-sakalouski Oct 29, 2024
11 changes: 7 additions & 4 deletions .github/workflows/test.yml
@@ -6,6 +6,7 @@ on:
pull_request:
branches:
- main
- release/*
workflow_dispatch:
inputs:
logLevel:
@@ -40,8 +41,8 @@ jobs:
fetch-depth: 0
ref: ${{ github.event.pull_request.head.ref }}
repository: ${{ github.event.pull_request.head.repo.full_name }}
- - name: Fetch main branch
-   run: git fetch origin main:main
+ - name: Fetch target branch
+   run: git fetch origin ${{ github.event.pull_request.base.ref || 'main'}}:${{ github.event.pull_request.base.ref || 'main'}}
- name: Check changes
id: check
run: |
@@ -71,10 +72,12 @@ jobs:
# os: [ubuntu-latest, windows-latest, macos-latest] # FIXME: Add Windows and macOS
os: [ubuntu-latest]
python-version: ['3.9', '3.10', '3.11', '3.12']
- pyspark-version: ['33', '34', '35']
+ pyspark-version: ['33', '34', '35', '35r']
exclude:
- python-version: '3.9'
pyspark-version: '35'
- python-version: '3.9'
pyspark-version: '35r'
- python-version: '3.11'
pyspark-version: '33'
- python-version: '3.11'
@@ -100,7 +103,7 @@ jobs:
# hatch fmt --check --python=${{ matrix.python-version }}

- name: Run tests
- run: hatch test --python=${{ matrix.python-version }} -i version=pyspark${{ matrix.pyspark-version }}
+ run: hatch test --python=${{ matrix.python-version }} -i version=pyspark${{ matrix.pyspark-version }} --verbose

# https://github.com/marketplace/actions/alls-green#why
final_check: # This job does nothing and is only used for the branch protection
6 changes: 3 additions & 3 deletions makefile
@@ -105,16 +105,16 @@ coverage: cov
all-tests:
@echo "\033[1mRunning all tests:\033[0m\n\033[35m This will run the full test suite\033[0m"
@echo "\033[1;31mWARNING:\033[0;33m This may take upward of 20-30 minutes to complete!\033[0m"
- @hatch test --no-header --no-summary
+ @hatch test --no-header
.PHONY: spark-tests ## testing - Run SPARK tests in ALL environments
spark-tests:
@echo "\033[1mRunning Spark tests:\033[0m\n\033[35m This will run the Spark test suite against all specified environments\033[0m"
@echo "\033[1;31mWARNING:\033[0;33m This may take upward of 20-30 minutes to complete!\033[0m"
- @hatch test -m spark --no-header --no-summary
+ @hatch test -m spark --no-header
.PHONY: non-spark-tests ## testing - Run non-spark tests in ALL environments
non-spark-tests:
@echo "\033[1mRunning non-Spark tests:\033[0m\n\033[35m This will run the non-Spark test suite against all specified environments\033[0m"
- @hatch test -m "not spark" --no-header --no-summary
+ @hatch test -m "not spark" --no-header

.PHONY: dev-test ## testing - Run pytest, with all tests in the dev environment
dev-test:
68 changes: 55 additions & 13 deletions pyproject.toml
@@ -57,19 +57,30 @@ async_http = [
"nest-asyncio>=1.6.0",
]
box = ["boxsdk[jwt]==3.8.1"]
- pandas = ["pandas>=1.3", "setuptools", "numpy<2.0.0"]
+ pandas = ["pandas>=1.3", "setuptools", "numpy<2.0.0", "pandas-stubs"]
pyspark = ["pyspark>=3.2.0", "pyarrow>13"]
pyspark_connect = ["pyspark[connect]>=3.5"]
se = ["spark-expectations>=2.1.0"]
# SFTP dependencies in to_csv line_iterator
sftp = ["paramiko>=2.6.0"]
delta = ["delta-spark>=2.2"]
excel = ["openpyxl>=3.0.0"]
# Tableau dependencies
- tableau = [
-     "tableauhyperapi>=0.0.19484",
-     "tableauserverclient>=0.25",
- ]
+ tableau = ["tableauhyperapi>=0.0.19484", "tableauserverclient>=0.25"]
# Snowflake dependencies
snowflake = ["snowflake-connector-python>=3.12.0"]
# Development dependencies
- dev = ["black", "isort", "ruff", "mypy", "pylint", "colorama", "types-PyYAML"]
+ dev = [
+     "black",
+     "isort",
+     "ruff",
+     "mypy",
+     "pylint",
+     "colorama",
+     "types-PyYAML",
+     "types-requests",
+ ]
test = [
"chispa",
"coverage[toml]",
@@ -186,6 +197,7 @@ features = [
"excel",
"se",
"box",
"snowflake",
"tableau",
"dev",
]
@@ -216,6 +228,7 @@ lint = ["- ruff-fmt", "- mypy-check", "pylint-check"]
log-versions = "python --version && {env:HATCH_UV} pip freeze | grep pyspark"
test = "- pytest{env:HATCH_TEST_ARGS:} {args} -n 2"
spark-tests = "test -m spark"
spark-remote-tests = "test -m spark -m \"not skip_on_remote_session\""
non-spark-tests = "test -m \"not spark\""

# scripts.run = "echo bla {env:HATCH_TEST_ARGS:} {args}"
@@ -251,17 +264,18 @@ features = [
"sftp",
"delta",
"excel",
"snowflake",
"tableau",
"dev",
"test",
]

parallel = true
retries = 2
- retry-delay = 1
+ retry-delay = 3

[tool.hatch.envs.hatch-test.scripts]
run = "pytest{env:HATCH_TEST_ARGS:} {args} -n auto"
run = "pytest{env:HATCH_TEST_ARGS:} {args}"
run-cov = "coverage run -m pytest{env:HATCH_TEST_ARGS:} {args}"
cov-combine = "coverage combine"
cov-report = "coverage report"
@@ -273,11 +287,11 @@ version = ["pyspark33", "pyspark34"]

[[tool.hatch.envs.hatch-test.matrix]]
python = ["3.10"]
version = ["pyspark33", "pyspark34", "pyspark35"]
version = ["pyspark33", "pyspark34", "pyspark35", "pyspark35r"]

[[tool.hatch.envs.hatch-test.matrix]]
python = ["3.11", "3.12"]
version = ["pyspark35"]
version = ["pyspark35", "pyspark35r"]

[tool.hatch.envs.hatch-test.overrides]
matrix.version.extra-dependencies = [
@@ -299,6 +313,9 @@ matrix.version.extra-dependencies = [
{ value = "pyspark>=3.5,<3.6", if = [
"pyspark35",
] },
{ value = "pyspark[connect]>=3.5,<3.6", if = [
"pyspark35r",
] },
]

name.".*".env-vars = [
@@ -308,10 +325,19 @@
{ key = "KOHEESIO__PRINT_LOGO", value = "False" },
]

name.".*(pyspark35r).*".env-vars = [
# enable spark connect, setting to local as it will trigger
# spark to start a local spark server and enable a remote session
{ key = "SPARK_REMOTE", value = "local" },
{ key = "SPARK_TESTING", value = "True" },
]
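The `pyspark35r` environments lean on PySpark's own bootstrap behavior: when `SPARK_REMOTE=local` is set, `getOrCreate()` spins up a local Spark Connect server and returns a remote session, exactly as the comment above describes. An illustrative sketch (assuming `pyspark[connect]>=3.5`), not code from the diff itself:

```python
# Illustrative sketch of what the SPARK_REMOTE=local env-var above does
# (assumes pyspark[connect]>=3.5); it mirrors the test environment and
# is not part of the PR.
import os

from pyspark.sql import SparkSession

# The special value "local" makes PySpark start a local Spark Connect
# server in the background on first use.
os.environ["SPARK_REMOTE"] = "local"

spark = SparkSession.builder.getOrCreate()

# A Connect session is returned: pyspark.sql.connect.session.SparkSession.
print(type(spark))
```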


[tool.pytest.ini_options]
addopts = "-q --color=yes --order-scope=module"
log_level = "CRITICAL"
testpaths = ["tests"]
asyncio_default_fixture_loop_scope = "scope"
markers = [
"default: added to all tests by default if no other marker expect of standard pytest markers is present",
"spark: mark a test as a Spark test",
@@ -325,10 +351,18 @@ filterwarnings = [
# pyspark.pandas warnings
"ignore:distutils.*:DeprecationWarning:pyspark.pandas.*",
"ignore:'PYARROW_IGNORE_TIMEZONE'.*:UserWarning:pyspark.pandas.*",
# pydantic warnings
"ignore:A custom validator is returning a value other than `self`.*.*:UserWarning:pydantic.main.*",
# pyspark.sql.connect warnings
"ignore:is_datetime64tz_dtype.*:DeprecationWarning:pyspark.sql.connect.*",
"ignore:distutils.*:DeprecationWarning:pyspark.sql.connect.*",
# pyspark.sql.pandas warnings
"ignore:distutils.*:DeprecationWarning:pyspark.sql.pandas.*",
"ignore:is_datetime64tz_dtype.*:DeprecationWarning:pyspark.sql.pandas.*",
"ignore:is_categorical_dtype.*:DeprecationWarning:pyspark.sql.pandas.*",
"ignore:iteritems.*:FutureWarning:pyspark.sql.pandas.*",
# Koheesio warnings
"ignore:DayTimeIntervalType .*:UserWarning:koheesio.spark.snowflake.*",
"ignore:DayTimeIntervalType.*:UserWarning:koheesio.spark.snowflake.*",
]
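To make the marker setup concrete: `spark` marks Spark tests, while `skip_on_remote_session` (consumed by the `spark-remote-tests` script above as `-m spark -m "not skip_on_remote_session"`) fences off tests that cannot run on a Connect session. A hypothetical test using both; the `spark` fixture and the test body are illustrative, not taken from the suite:

```python
import pytest


@pytest.mark.spark
@pytest.mark.skip_on_remote_session
def test_rdd_api(spark):  # hypothetical; `spark` fixture assumed from conftest
    # SparkContext/RDD APIs do not exist on Spark Connect sessions, so
    # this test is deselected by `hatch run spark-remote-tests`.
    rdd = spark.sparkContext.parallelize([1, 2, 3])
    assert rdd.sum() == 6
```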

[tool.coverage.run]
@@ -403,16 +437,16 @@ features = [
"box",
"pandas",
"pyspark",
"se",
# "se",
"sftp",
"snowflake",
"delta",
"excel",
"tableau",
"dev",
"test",
"docs",
]
- extra-dependencies = ["pyspark==3.4.*"]


### ~~~~~~~~~~~~~~~~~~ ###
@@ -569,10 +603,18 @@ unfixable = []
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"

[tool.mypy]
python_version = "3.10"
files = ["koheesio/**/*.py"]
plugins = ["pydantic.mypy"]
pretty = true
warn_unused_configs = true
check_untyped_defs = false
disallow_untyped_calls = false
disallow_untyped_defs = true
files = ["koheesio/**/*.py"]
warn_no_return = false
implicit_optional = true
allow_untyped_globals = true
disable_error_code = ["attr-defined", "return-value", "union-attr", "override"]

[tool.pylint.main]
fail-under = 9.5
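The relaxed mypy settings above accompany the typing work in the commit trail ("update DataFrame imports to use koheesio.spark", "remove TypeAlias usage and simplify type definitions in common.py"): classic PySpark and Spark Connect expose different `DataFrame`/`SparkSession` classes, so shared code needs annotations covering both. A hypothetical sketch of that pattern; the PR's actual definitions live in `koheesio.spark` and may differ:

```python
# Hypothetical sketch of the Union-typing pattern (not the PR's exact
# code). Classic PySpark and Spark Connect ship separate DataFrame and
# SparkSession classes, so annotations must accept either.
from typing import Union

from pyspark.sql import DataFrame as ClassicDataFrame
from pyspark.sql import SparkSession as ClassicSparkSession

try:
    # Only importable when pyspark[connect] is installed.
    from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
    from pyspark.sql.connect.session import SparkSession as ConnectSparkSession

    DataFrame = Union[ClassicDataFrame, ConnectDataFrame]
    SparkSession = Union[ClassicSparkSession, ConnectSparkSession]
except ImportError:  # plain pyspark: fall back to the classic classes
    DataFrame = ClassicDataFrame  # type: ignore[misc, assignment]
    SparkSession = ClassicSparkSession  # type: ignore[misc, assignment]
```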
4 changes: 2 additions & 2 deletions src/koheesio/__about__.py
@@ -12,7 +12,7 @@

LICENSE_INFO = "Licensed as Apache 2.0"
SOURCE = "https://github.com/Nike-Inc/koheesio"
__version__ = "0.8.1"
__version__ = "0.9.0rc0"
__logo__ = (
75,
(
@@ -32,7 +32,7 @@


# fmt: off
- def _about(): # pragma: no cover
+ def _about() -> str: # pragma: no cover
"""Return the Koheesio logo and version/about information as a string
Note: this code is not meant to be readable, instead it is written to be as compact as possible
"""
2 changes: 1 addition & 1 deletion src/koheesio/__init__.py
@@ -25,7 +25,7 @@
]


- def print_logo():
+ def print_logo() -> None:
global _logo_printed
global _koheesio_print_logo

17 changes: 7 additions & 10 deletions src/koheesio/asyncio/__init__.py
@@ -16,15 +16,13 @@ class AsyncStepMetaClass(StepMetaClass):
It inherits from the StepMetaClass and provides additional functionality for
executing asynchronous steps.

- Attributes:
-     None
-
- Methods:
-     _execute_wrapper: Wrapper method for executing asynchronous steps.
+ Methods
+ -------
+ _execute_wrapper: Wrapper method for executing asynchronous steps.

"""

- def _execute_wrapper(cls, *args, **kwargs):
+ def _execute_wrapper(cls, *args, **kwargs): # type: ignore[no-untyped-def]
"""Wrapper method for executing asynchronous steps.

This method is called when an asynchronous step is executed. It wraps the
@@ -60,16 +58,14 @@ class AsyncStepOutput(Step.Output):
Merge key-value map with self.
"""

- def merge(self, other: Union[Dict, StepOutput]):
+ def merge(self, other: Union[Dict, StepOutput]) -> "AsyncStepOutput":
"""Merge key,value map with self

Examples
--------
```python
step_output = StepOutput(foo="bar")
- step_output.merge(
-     {"lorem": "ipsum"}
- ) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
+ step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
```

Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`.
@@ -89,6 +85,7 @@ def merge(self, other: Union[Dict, StepOutput]):
return self


# noinspection PyUnresolvedReferences
class AsyncStep(Step, ABC, metaclass=AsyncStepMetaClass):
"""
Asynchronous step class that inherits from Step and uses the AsyncStepMetaClass metaclass.
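Finally, since the diff above is truncated here: a hypothetical sketch of how an `AsyncStep` subclass would look, inferred from the class definitions shown (the real `koheesio.asyncio` API may differ):

```python
# Hypothetical AsyncStep usage; class name, field names, and the import
# path are illustrative assumptions, not code from this PR.
import asyncio

from koheesio.asyncio import AsyncStep


class FetchGreeting(AsyncStep):
    """Toy async step: awaits some async I/O and populates its output."""

    user: str

    class Output(AsyncStep.Output):
        greeting: str

    async def execute(self) -> None:
        await asyncio.sleep(0.1)  # stand-in for real async I/O
        self.output.greeting = f"hello, {self.user}"
```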