From 3bb6261bba140981d720009f5972e10e29139d7a Mon Sep 17 00:00:00 2001
From: Kensuke Nakazawa <62752589+knakazawa99@users.noreply.github.com>
Date: Thu, 25 Apr 2024 07:03:13 +0900
Subject: [PATCH] Feature Implementation: AWS Glue Job Execution Support (#308)

Co-authored-by: Alexander Streed
Co-authored-by: Alexander Streed
---
 docs/glue_job.md           |   6 ++
 mkdocs.yml                 |   1 +
 prefect_aws/glue_job.py    | 229 +++++++++++++++++++++++++++++++++++++
 requirements.txt           |   3 +-
 tests/mock_aws_credentials |   6 +-
 tests/test_glue_job.py     | 188 ++++++++++++++++++++++++++++++
 6 files changed, 431 insertions(+), 2 deletions(-)
 create mode 100644 docs/glue_job.md
 create mode 100644 prefect_aws/glue_job.py
 create mode 100644 tests/test_glue_job.py

diff --git a/docs/glue_job.md b/docs/glue_job.md
new file mode 100644
index 00000000..2908049c
--- /dev/null
+++ b/docs/glue_job.md
@@ -0,0 +1,6 @@
+---
+description: Tasks for interacting with AWS Glue Job
+notes: This documentation page is generated from source file docstrings.
+---
+
+::: prefect_aws.glue_job
diff --git a/mkdocs.yml b/mkdocs.yml
index d5084099..87aebb08 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -90,6 +90,7 @@ nav:
         - Lambda: lambda_function.md
         - Deployments:
             - Steps: deployments/steps.md
+        - Glue Job: glue_job.md
         - S3: s3.md
         - Secrets Manager: secrets_manager.md
 
diff --git a/prefect_aws/glue_job.py b/prefect_aws/glue_job.py
new file mode 100644
--- /dev/null
+++ b/prefect_aws/glue_job.py
@@ -0,0 +1,229 @@
+"""
+Integrations with the AWS Glue Job.
+
+"""
+import time
+from typing import Any, Optional
+
+from prefect.blocks.abstract import JobBlock, JobRun
+from pydantic import VERSION as PYDANTIC_VERSION
+
+if PYDANTIC_VERSION.startswith("2."):
+    from pydantic.v1 import BaseModel, Field
+else:
+    from pydantic import BaseModel, Field
+
+from prefect_aws import AwsCredentials
+
+_GlueJobClient = Any
+
+
+class GlueJobRun(JobRun, BaseModel):
+    """Track a single AWS Glue job run that has already been started.
+
+    Attributes:
+        job_name: The name of the job definition the run belongs to.
+        job_id: The ID of the job run.
+        job_watch_poll_interval: The amount of time, in seconds, to wait
+            between AWS API calls while monitoring the state of the run.
+        aws_credentials: The AWS credentials used to create a Glue client
+            when `client` is not provided.
+        client: The boto3 Glue client used to query the job run.
+    """
+
+    job_name: str = Field(
+        ...,
+        title="AWS Glue Job Name",
+        description="The name of the job definition to use.",
+    )
+
+    job_id: str = Field(
+        ...,
+        title="AWS Glue Job ID",
+        description="The ID of the job run.",
+    )
+
+    job_watch_poll_interval: float = Field(
+        default=60.0,
+        description=(
+            "The amount of time to wait between AWS API calls while monitoring the "
+            "state of a Glue Job."
+        ),
+    )
+
+    _error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"]
+
+    aws_credentials: AwsCredentials = Field(
+        title="AWS Credentials",
+        default_factory=AwsCredentials,
+        description="The AWS credentials to use to connect to Glue.",
+    )
+
+    client: _GlueJobClient = Field(default=None, description="")
+
+    async def fetch_result(self) -> str:
+        """Return the current `JobRunState` reported by Glue for this run."""
+        job = self._get_job_run()
+        return job["JobRun"]["JobRunState"]
+
+    def wait_for_completion(self) -> None:
+        """
+        Poll the job run until it succeeds or reaches an error state.
+
+        Raises:
+            RuntimeError: If the run ends in one of `_error_states`.
+        """
+        self.logger.info(f"watching job {self.job_name} with run id {self.job_id}")
+        while True:
+            job = self._get_job_run()
+            job_state = job["JobRun"]["JobRunState"]
+            if job_state in self._error_states:
+                # `ErrorMessage` is optional in the GetJobRun response (for
+                # example a manually stopped run has none), so fall back to
+                # the state instead of failing with a KeyError.
+                error_message = job["JobRun"].get(
+                    "ErrorMessage", f"job run ended in state {job_state}"
+                )
+                self.logger.error(f"job failed: {error_message}")
+                raise RuntimeError(error_message)
+            elif job_state == "SUCCEEDED":
+                self.logger.info(f"job succeeded: {self.job_id}")
+                break
+
+            time.sleep(self.job_watch_poll_interval)
+
+    def _get_job_run(self):
+        """Fetch the job run description from the Glue `get_job_run` API."""
+        if self.client is None:
+            # A run built without an explicit client creates one from its
+            # credentials instead of failing on `None.get_job_run`.
+            self.client = self.aws_credentials.get_boto3_session().client("glue")
+        return self.client.get_job_run(JobName=self.job_name, RunId=self.job_id)
+
+
+class GlueJobBlock(JobBlock):
+    """Execute a job to the AWS Glue Job service.
+
+    Attributes:
+        job_name: The name of the job definition to use.
+        arguments: The job arguments associated with this run.
+            For this job run, they replace the default arguments set in the job
+            definition itself.
+            You can specify arguments here that your own job-execution script consumes,
+            as well as arguments that Glue itself consumes.
+            Job arguments may be logged. Do not pass plaintext secrets as arguments.
+            Retrieve secrets from a Glue Connection, Secrets Manager or other secret
+            management mechanism if you intend to keep them within the Job.
+            [doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html)
+        job_watch_poll_interval: The amount of time to wait between AWS API
+            calls while monitoring the state of a Glue Job.
+            default is 60s because of jobs that use AWS Glue versions 2.0 and later
+            have a 1-minute minimum.
+            [AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls)
+        aws_credentials: The AWS credentials to use to connect to Glue.
+
+    Example:
+        Start a job to AWS Glue Job.
+        ```python
+        import asyncio
+
+        from prefect import flow
+        from prefect_aws import AwsCredentials
+        from prefect_aws.glue_job import GlueJobBlock
+
+
+        @flow
+        async def example_run_glue_job():
+            aws_credentials = AwsCredentials(
+                aws_access_key_id="your_access_key_id",
+                aws_secret_access_key="your_secret_access_key"
+            )
+            glue_job_run = await GlueJobBlock(
+                job_name="your_glue_job_name",
+                arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
+                aws_credentials=aws_credentials,
+            ).trigger()
+
+            return glue_job_run.wait_for_completion()
+
+
+        asyncio.run(example_run_glue_job())
+        ```
+    """
+
+    job_name: str = Field(
+        ...,
+        title="AWS Glue Job Name",
+        description="The name of the job definition to use.",
+    )
+
+    arguments: Optional[dict] = Field(
+        default=None,
+        title="AWS Glue Job Arguments",
+        description="The job arguments associated with this run.",
+    )
+    job_watch_poll_interval: float = Field(
+        default=60.0,
+        description=(
+            "The amount of time to wait between AWS API calls while monitoring the "
+            "state of a Glue Job."
+        ),
+    )
+
+    aws_credentials: AwsCredentials = Field(
+        title="AWS Credentials",
+        default_factory=AwsCredentials,
+        description="The AWS credentials to use to connect to Glue.",
+    )
+
+    async def trigger(self) -> GlueJobRun:
+        """Start the Glue job and return a `GlueJobRun` tracking the new run."""
+        client = self._get_client()
+        job_run_id = self._start_job(client)
+        return GlueJobRun(
+            job_name=self.job_name,
+            job_id=job_run_id,
+            job_watch_poll_interval=self.job_watch_poll_interval,
+            # Hand over the credentials and client so the run can poll Glue;
+            # without them `GlueJobRun.client` would stay `None`.
+            aws_credentials=self.aws_credentials,
+            client=client,
+        )
+
+    def _start_job(self, client: _GlueJobClient) -> str:
+        """
+        Start the AWS Glue Job
+        [doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html)
+
+        Args:
+            client: The boto3 Glue client used to start the run.
+
+        Returns:
+            The ID of the job run that was started.
+
+        Raises:
+            RuntimeError: If the job run could not be started.
+        """
+        self.logger.info(
+            f"starting job {self.job_name} with arguments {self.arguments}"
+        )
+        request = {"JobName": self.job_name}
+        if self.arguments is not None:
+            # boto3 rejects `Arguments=None`; omit the parameter so the job's
+            # default arguments apply.
+            request["Arguments"] = self.arguments
+        try:
+            response = client.start_job_run(**request)
+            job_run_id = str(response["JobRunId"])
+            self.logger.info(f"job started with job run id: {job_run_id}")
+            return job_run_id
+        except Exception as e:
+            self.logger.error(f"failed to start job: {e}")
+            raise RuntimeError(f"failed to start job: {e}") from e
+
+    def _get_client(self) -> _GlueJobClient:
+        """
+        Retrieve a Glue Job Client
+        """
+        boto_session = self.aws_credentials.get_boto3_session()
+        return boto_session.client("glue")
diff --git a/requirements.txt b/requirements.txt
index 4a764f81..e4418d7e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,5 @@ botocore>=1.27.53
 mypy_boto3_s3>=1.24.94
 mypy_boto3_secretsmanager>=1.26.49
 prefect>=2.16.4
-tenacity>=8.0.0
\ No newline at end of file
+pyparsing>=3.1.1
+tenacity>=8.0.0
diff --git a/tests/mock_aws_credentials b/tests/mock_aws_credentials
index 19dfa017..fd2dbab7 100644
--- a/tests/mock_aws_credentials
+++ b/tests/mock_aws_credentials
@@ -1,7 +1,11 @@
 [TEST_PROFILE_1]
 aws_access_key_id = mock
 aws_secret_access_key = mock
+aws_region = us-east-1
+aws_default_region = us-east-1
 
 [TEST_PROFILE_2]
 aws_access_key_id = mock
-aws_secret_access_key = mock
\ No newline at end of file
+aws_secret_access_key = mock
+aws_region = us-east-1
+aws_default_region = us-east-1
diff --git a/tests/test_glue_job.py b/tests/test_glue_job.py
new file mode 100644
--- /dev/null
+++ b/tests/test_glue_job.py
@@ -0,0 +1,188 @@
+from unittest.mock import MagicMock
+
+import pytest
+from moto import mock_glue
+
+from prefect_aws.glue_job import GlueJobBlock, GlueJobRun
+
+
+@pytest.fixture(scope="function")
+def glue_job_client(aws_credentials):
+    with mock_glue():
+        boto_session = aws_credentials.get_boto3_session()
+        yield boto_session.client("glue", region_name="us-east-1")
+
+
+async def test_fetch_result(aws_credentials, glue_job_client):
+    glue_job_client.create_job(
+        Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
+    )
+    job_run_id = glue_job_client.start_job_run(
+        JobName="test_job_name",
+        Arguments={},
+    )["JobRunId"]
+    glue_job_run = GlueJobRun(
+        job_name="test_job_name", job_id=job_run_id, client=glue_job_client
+    )
+    result = await glue_job_run.fetch_result()
+    assert result == "SUCCEEDED"
+
+
+def test_wait_for_completion(aws_credentials, glue_job_client):
+    with mock_glue():
+        glue_job_client.create_job(
+            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
+        )
+        job_run_id = glue_job_client.start_job_run(
+            JobName="test_job_name",
+            Arguments={},
+        )["JobRunId"]
+
+        glue_job_run = GlueJobRun(
+            job_name="test_job_name",
+            job_id=job_run_id,
+            job_watch_poll_interval=0.1,
+            client=glue_job_client,
+        )
+
+        glue_job_client.get_job_run = MagicMock(
+            side_effect=[
+                {
+                    "JobRun": {
+                        "JobName": "test_job_name",
+                        "JobRunState": "RUNNING",
+                    }
+                },
+                {
+                    "JobRun": {
+                        "JobName": "test_job_name",
+                        "JobRunState": "SUCCEEDED",
+                    }
+                },
+            ]
+        )
+        glue_job_run.wait_for_completion()
+
+
+def test_wait_for_completion_fail(aws_credentials, glue_job_client):
+    with mock_glue():
+        glue_job_client.create_job(
+            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
+        )
+        job_run_id = glue_job_client.start_job_run(
+            JobName="test_job_name",
+            Arguments={},
+        )["JobRunId"]
+        glue_job_client.get_job_run = MagicMock(
+            side_effect=[
+                {
+                    "JobRun": {
+                        "JobName": "test_job_name",
+                        "JobRunState": "FAILED",
+                        "ErrorMessage": "err",
+                    }
+                },
+            ]
+        )
+
+        glue_job_run = GlueJobRun(
+            job_name="test_job_name", job_id=job_run_id, client=glue_job_client
+        )
+        with pytest.raises(RuntimeError):
+            glue_job_run.wait_for_completion()
+
+
+def test_wait_for_completion_fail_without_error_message(glue_job_client):
+    glue_job_client.get_job_run = MagicMock(
+        side_effect=[
+            {
+                "JobRun": {
+                    "JobName": "test_job_name",
+                    "JobRunState": "STOPPED",
+                }
+            },
+        ]
+    )
+
+    glue_job_run = GlueJobRun(
+        job_name="test_job_name", job_id="test_job_id", client=glue_job_client
+    )
+    with pytest.raises(RuntimeError, match="STOPPED"):
+        glue_job_run.wait_for_completion()
+
+
+def test__get_job_run(aws_credentials, glue_job_client):
+    with mock_glue():
+        glue_job_client.create_job(
+            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
+        )
+        job_run_id = glue_job_client.start_job_run(
+            JobName="test_job_name",
+            Arguments={},
+        )["JobRunId"]
+
+        glue_job_run = GlueJobRun(
+            job_name="test_job_name", job_id=job_run_id, client=glue_job_client
+        )
+        response = glue_job_run._get_job_run()
+        assert response["JobRun"]["JobRunState"] == "SUCCEEDED"
+
+
+async def test_trigger(aws_credentials, glue_job_client):
+    glue_job_client.create_job(
+        Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
+    )
+    glue_job = GlueJobBlock(
+        job_name="test_job_name",
+        arguments={"arg1": "value1"},
+        aws_credentials=aws_credentials,
+    )
+    glue_job._get_client = MagicMock(side_effect=[glue_job_client])
+    glue_job._start_job = MagicMock(side_effect=["test_job_id"])
+    glue_job_run = await glue_job.trigger()
+    assert isinstance(glue_job_run, GlueJobRun)
+    assert glue_job_run.job_id == "test_job_id"
+    # The run must be able to poll Glue with the client that started the job.
+    assert glue_job_run.client is glue_job_client
+
+
+def test_start_job(aws_credentials, glue_job_client):
+    with mock_glue():
+        glue_job_client.create_job(
+            Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
+        )
+        glue_job = GlueJobBlock(job_name="test_job_name", arguments={"arg1": "value1"})
+
+        glue_job_client.start_job_run = MagicMock(
+            side_effect=[{"JobRunId": "test_job_run_id"}]
+        )
+        job_run_id = glue_job._start_job(glue_job_client)
+        assert job_run_id == "test_job_run_id"
+
+
+def test_start_job_without_arguments(aws_credentials):
+    client = MagicMock()
+    client.start_job_run.return_value = {"JobRunId": "test_job_run_id"}
+    glue_job = GlueJobBlock(job_name="test_job_name")
+
+    job_run_id = glue_job._start_job(client)
+
+    assert job_run_id == "test_job_run_id"
+    # `Arguments=None` is rejected by boto3, so it must be left out entirely.
+    client.start_job_run.assert_called_once_with(JobName="test_job_name")
+
+
+def test_start_job_fail_because_not_exist_job(aws_credentials, glue_job_client):
+    with mock_glue():
+        glue_job = GlueJobBlock(job_name="test_job_name", arguments={"arg1": "value1"})
+        with pytest.raises(RuntimeError):
+            glue_job._start_job(glue_job_client)
+
+
+def test_get_client(aws_credentials):
+    with mock_glue():
+        glue_job_run = GlueJobBlock(
+            job_name="test_job_name", aws_credentials=aws_credentials
+        )
+        client = glue_job_run._get_client()
+        assert hasattr(client, "get_job_run")