Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
Upgrade to Airflow 2.5.0 (#939)
Browse files Browse the repository at this point in the history
* Bump Airflow to v2.5.0

* Replace PostgresResultOperator with SQLExecuteQueryOperator

* Replace PostgresOperator with SQLExecuteQueryOperator

* Only apply warning suppressions to affected tests
  • Loading branch information
AetherUnbound committed Jan 10, 2023
1 parent 2ac1c94 commit 61714f6
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 171 deletions.
82 changes: 0 additions & 82 deletions openverse_catalog/dags/common/operators/postgres_result.py

This file was deleted.

12 changes: 7 additions & 5 deletions openverse_catalog/dags/data_refresh/dag_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from airflow import DAG
from airflow.models.dagrun import DagRun
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.settings import SASession
from airflow.utils.session import provide_session
from airflow.utils.state import State
Expand All @@ -45,7 +46,6 @@
OPENLEDGER_API_CONN_ID,
XCOM_PULL_TEMPLATE,
)
from common.operators.postgres_result import PostgresResultOperator
from data_refresh.data_refresh_task_factory import create_data_refresh_task_group
from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh
from data_refresh.refresh_popularity_metrics_task_factory import (
Expand Down Expand Up @@ -206,11 +206,12 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc
)

# Get the current number of records in the target API table
before_record_count = PostgresResultOperator(
before_record_count = SQLExecuteQueryOperator(
task_id="get_before_record_count",
postgres_conn_id=OPENLEDGER_API_CONN_ID,
conn_id=OPENLEDGER_API_CONN_ID,
sql=count_sql,
handler=_single_value,
return_last=True,
)

# Refresh underlying popularity tables. This is required infrequently in order
Expand All @@ -231,11 +232,12 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc
)

# Get the final number of records in the API table after the refresh
after_record_count = PostgresResultOperator(
after_record_count = SQLExecuteQueryOperator(
task_id="get_after_record_count",
postgres_conn_id=OPENLEDGER_API_CONN_ID,
conn_id=OPENLEDGER_API_CONN_ID,
sql=count_sql,
handler=_single_value,
return_last=True,
)

# Report the count difference to Slack
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup
from common.constants import POSTGRES_CONN_ID
from common.licenses import NO_LICENSE_FOUND, get_license_info
Expand Down Expand Up @@ -137,17 +137,17 @@ def create_preingestion_tasks():
},
)

create_inaturalist_schema = PostgresOperator(
create_inaturalist_schema = SQLExecuteQueryOperator(
task_id="create_inaturalist_schema",
postgres_conn_id=POSTGRES_CONN_ID,
conn_id=POSTGRES_CONN_ID,
sql=(SCRIPT_DIR / "create_schema.sql").read_text(),
)

with TaskGroup(group_id="load_source_files") as load_source_files:
for source_name in SOURCE_FILE_NAMES:
PostgresOperator(
SQLExecuteQueryOperator(
task_id=f"load_{source_name}",
postgres_conn_id=POSTGRES_CONN_ID,
conn_id=POSTGRES_CONN_ID,
sql=(SCRIPT_DIR / f"{source_name}.sql").read_text(),
),

Expand All @@ -157,9 +157,9 @@ def create_preingestion_tasks():

@staticmethod
def create_postingestion_tasks():
drop_inaturalist_schema = PostgresOperator(
drop_inaturalist_schema = SQLExecuteQueryOperator(
task_id="drop_inaturalist_schema",
postgres_conn_id=POSTGRES_CONN_ID,
conn_id=POSTGRES_CONN_ID,
sql="DROP SCHEMA IF EXISTS inaturalist CASCADE",
)
return drop_inaturalist_schema
5 changes: 0 additions & 5 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,12 @@ addopts =
# This occurs because the default config is loaded when running `just test --extended`
# which happens to still have SMTP credential defaults assigned in it. We do not set
# these anywhere in the dev stack so it can be safely ignored.
# Subdag
# This appears to be coming from Airflow internals during testing as a result of
# loading the example DAGs:
# /opt/airflow/.local/lib/python3.10/site-packages/airflow/example_dags/example_subdag_operator.py:43: RemovedInAirflow3Warning
# distutils
# Warning in dependency, nothing we can do
# flask
# Warning in dependency, nothing we can do
# "removed"/"remoevd" is due to https://github.com/pallets/flask/pull/4757
filterwarnings=
ignore:Fetching SMTP credentials from configuration variables will be deprecated in a future release. Please set credentials using a connection instead:airflow.exceptions.RemovedInAirflow3Warning
ignore:This class is deprecated. Please use `airflow.utils.task_group.TaskGroup`.:airflow.exceptions.RemovedInAirflow3Warning
ignore:distutils Version classes are deprecated. Use packaging.version instead:DeprecationWarning
ignore:.*is deprecated and will be (remoevd|removed) in Flask 2.3.:DeprecationWarning
2 changes: 1 addition & 1 deletion requirements_prod.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# Note: Unpinned packages have their versions determined by the Airflow constraints file

apache-airflow[amazon,postgres,http]==2.4.2
apache-airflow[amazon,postgres,http]==2.5.0
lxml
psycopg2-binary
requests-file==1.5.1
Expand Down
71 changes: 0 additions & 71 deletions tests/dags/common/operators/test_postgres_result.py

This file was deleted.

14 changes: 14 additions & 0 deletions tests/dags/common/sensors/test_single_run_external_dags_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,20 @@ def create_dagrun(dag, dag_state):
)


# This appears to be coming from Airflow internals during testing as a result of
# loading the example DAGs:
# /opt/airflow/.local/lib/python3.10/site-packages/airflow/example_dags/example_subdag_operator.py:43: RemovedInAirflow3Warning # noqa: E501
@pytest.mark.filterwarnings(
"ignore:This class is deprecated. Please use "
"`airflow.utils.task_group.TaskGroup`.:airflow.exceptions.RemovedInAirflow3Warning"
)
# This also appears to be coming from Airflow internals during testing as a result of
# loading the example bash operator DAG:
# /home/airflow/.local/lib/python3.10/site-packages/airflow/models/dag.py:3492: RemovedInAirflow3Warning # noqa: E501
@pytest.mark.filterwarnings(
"ignore:Param `schedule_interval` is deprecated and will be removed in a future release. "
"Please use `schedule` instead.:airflow.exceptions.RemovedInAirflow3Warning"
)
class TestExternalDAGsSensor(unittest.TestCase):
def setUp(self):
Pool.create_or_update_pool(TEST_POOL, slots=1, description="test pool")
Expand Down

0 comments on commit 61714f6

Please sign in to comment.