This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Feature Implementation: AWS Glue Job Execution Support #308

Merged
merged 45 commits into from
Apr 24, 2024

Conversation

knakazawa99
Contributor

@knakazawa99 knakazawa99 commented Sep 4, 2023

This pull request adds the ability to execute AWS Glue jobs directly from Prefect. The implementation is modeled on the existing ECS Task code and is intended for running Glue jobs that preprocess data for aggregation and machine learning models.

Closes #307

Example

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
def example_run_glue_job():
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
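    # Pass the credentials created above to the block and start the job run.
    glue_job_run = GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
        aws_credentials=aws_credentials,
    ).trigger()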

    return glue_job_run.wait_for_completion()


example_run_glue_job()

Screenshots

Blocks Catalog
[Screenshot 2024-03-17 18 31 00]

Examples Catalog
[Screenshot 2024-03-17 18 31 36]

API Reference
[Screenshot 2024-03-17 17 26 16]

[Screenshot 2024-03-17 17 26 34] [Screenshot 2024-03-17 17 26 49]

Checklist

  • References any related issue by including "Closes #" or "Closes ".
    • If no issue exists and your change is not a small fix, please create an issue first.
  • Includes tests or only affects documentation.
  • Passes pre-commit checks.
    • Run pre-commit install && pre-commit run --all locally for formatting and linting.
  • Includes screenshots of documentation updates.
    • Run mkdocs serve to view documentation locally.
  • Summarizes PR's changes in CHANGELOG.md

@knakazawa99 knakazawa99 requested a review from a team as a code owner September 4, 2023 22:22
@feliperazeek

Would love to use this!

        aws_secret_access_key="your_secret_access_key"
    )
    glue_job = GlueJob(
        glue_job_name="your_glue_job_name",

Here you are referencing glue_job_name but I believe you are actually using job_name instead.

Contributor Author

@feliperazeek
Thanks for your comment. Fixed in 8f9e9af.

Member

@desertaxle desertaxle left a comment

Thanks for the contribution @knakazawa99! It looks like the tests are failing with a missing pyparsing dependency. You can add pyparsing to requirements.txt to ensure it gets installed with this library. Let me know when the tests are passing and I can give the change a more thorough review.

Also, we introduced a worker concept a little while ago that can be used in place of infrastructure blocks. If you're willing, it'd be great if you could also create an AWS Glue worker! It shouldn't be too tough since you've already created an infrastructure block. We have a guide on creating a worker here which will be helpful if you decide to create an AWS Glue worker.

@knakazawa99
Contributor Author

@desertaxle
Thank you for your feedback!
I have noticed that the tests are failing due to a NoRegionError from the AWS SDK. This error occurs because the region is not specified when creating the AWS client in the tests.
In my local environment, the tests pass because the AWS SDK automatically uses the region specified in my local AWS configuration. However, in the GitHub Actions environment, there is no such configuration, hence the error.
To fix this, I am considering adding a region to the mock_aws_credentials used in the tests. However, I understand that this might be outside the scope of this PR.
Could you please advise if it would be acceptable to make this change within this PR?
Looking forward to your guidance on this matter.

@desertaxle
Member

desertaxle commented Oct 10, 2023

> To fix this, I am considering adding a region to the mock_aws_credentials used in the tests.

This fix sounds good to me!
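
For context on the NoRegionError discussed above: boto3 raises it when no region is passed explicitly and none is found in the environment or the local AWS configuration, which matches the tests passing locally but failing in CI. The fix agreed on here adds a region to the mock_aws_credentials fixture. The sketch below shows a different, stdlib-only way to get a similar effect in a test, by pinning the AWS_DEFAULT_REGION environment variable that boto3 reads. The helper name pinned_aws_region is hypothetical and is not part of this PR.

```python
import os
from contextlib import contextmanager
from unittest import mock


@contextmanager
def pinned_aws_region(region: str = "us-east-1"):
    """Temporarily set AWS_DEFAULT_REGION (hypothetical helper, not from this PR).

    mock.patch.dict restores the previous environment when the block exits.
    """
    with mock.patch.dict(os.environ, {"AWS_DEFAULT_REGION": region}):
        yield region


# Inside this block, a boto3 client created without an explicit region_name
# would resolve its region from the environment variable set here.
with pinned_aws_region() as region:
    print(os.environ["AWS_DEFAULT_REGION"] == region)
```

Setting the region on the credentials object, as proposed in the thread, keeps the fix local to the fixture; the environment-variable approach affects every client created inside the block.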

Member

@desertaxle desertaxle left a comment

I added a few high-level comments! I'll do another pass once the tests are passing!

prefect_aws/workers/glue_job_worker.py (outdated, resolved)
Comment on lines 31 to 35
    arguments: Optional[dict] = Field(
        default=None,
        title="AWS Glue Job Arguments",
        description="The job arguments associated with this run.",
    )
Member

I like how flexible this field is, but it'd be nice to have some stronger typing while maintaining flexibility. Introducing a variables class that contains the values that will often vary between deployments, and creating a default template, can help with that. You can check out the ECS worker's variables class as an example.

Contributor Author

Thanks, fixed in e71b947.
Does this match what you had in mind?

Comment on lines 26 to 30
    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )
Member

The BaseJobConfiguration class has a name attribute that automatically gets populated with the flow name. I think it'd be worthwhile to use that attribute rather than introducing another name attribute.

Suggested change
- job_name: str = Field(
-     ...,
-     title="AWS Glue Job Name",
-     description="The name of the job definition to use.",
- )

Contributor Author

Member

Ah, does this require a Glue job to exist already? Usually, workers dynamically create jobs to execute flow runs.

I am a user who is very interested in this feature, and I have already forked it into my own repo. I think it's expected that the Glue job already exists.

Member

@feliperazeek are you using the worker to run Glue jobs or the block in this PR?

Member

The variable name is good and will work, but I think the implementation of the worker needs to change to operate the same as our other workers. I'd expect a Prefect Glue worker to create a Glue job for each flow run that it picks up and then monitor that Glue job for completion.

@knakazawa99 I think we can take this PR in one of two directions:

  1. Update the worker and block to create new Glue jobs for executing flow runs
  2. Remove the worker and update the block to implement the JobBlock interface instead of the Infrastructure interface. This will make it more suited for executing existing Glue jobs (which I'm realizing might have been the original intent of the PR, which I misunderstood).

Let me know which direction you'd like to go with this PR, and I will do whatever I can to help. Thank you for your patience and sticking with this PR!
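
To make option 2 concrete, here is a minimal sketch of the shape of the JobBlock interface: a block's trigger() returns a run handle that exposes wait_for_completion() and fetch_result(). The abstract classes below are simplified, synchronous stand-ins written for illustration (Prefect's own versions live in prefect.blocks.abstract and differ in detail), and EchoJob and EchoRun are hypothetical names.

```python
from abc import ABC, abstractmethod


class JobRun(ABC):
    """Simplified stand-in: a handle to one run of an external job."""

    @abstractmethod
    def wait_for_completion(self) -> None:
        ...

    @abstractmethod
    def fetch_result(self) -> str:
        ...


class JobBlock(ABC):
    """Simplified stand-in: configuration for a job that already exists."""

    @abstractmethod
    def trigger(self) -> JobRun:
        ...


class EchoRun(JobRun):
    """Hypothetical run handle that completes as soon as it is waited on."""

    def __init__(self, job_name: str) -> None:
        self.job_name = job_name
        self.state = "RUNNING"

    def wait_for_completion(self) -> None:
        # A real implementation would poll the external service here.
        self.state = "SUCCEEDED"

    def fetch_result(self) -> str:
        return f"{self.job_name}: {self.state}"


class EchoJob(JobBlock):
    """Hypothetical block that refers to an existing job by name."""

    def __init__(self, job_name: str) -> None:
        self.job_name = job_name

    def trigger(self) -> JobRun:
        return EchoRun(self.job_name)


run = EchoJob("nightly-etl").trigger()
run.wait_for_completion()
print(run.fetch_result())
```

The point of the split is that the block only describes which existing job to run, while the run handle tracks one execution of it. That is a closer fit for pre-existing Glue jobs than the Infrastructure interface, which is geared toward executing flow runs.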

> @feliperazeek are you using the worker to run Glue jobs or the block in this PR?

Running glue jobs

Contributor Author

@desertaxle
Is the following direction correct?

  1. Delete the worker.
  2. Change the block that currently inherits from Infrastructure so that it implements the JobBlock interface instead.

Member

Apologies for the delay. Yes, that is the direction that I'm suggesting.

Contributor Author

@desertaxle
I have fixed it. However, the tests are failing because of a boto3 error that I have not been able to reproduce in my local environment.

prefect_aws/workers/glue_job_worker.py (3 outdated review threads, resolved)
@desertaxle
Member

Hey @knakazawa99, apologies for the delay. I pushed a change that fixes the error you saw in the test. I'll give this one more review today so we can merge it as soon as possible.

Comment on lines 186 to 192
    async def trigger(self) -> GlueJobRun:
        """trigger for GlueJobRun"""
        return GlueJobRun(
            job_name=self.job_name,
            arguments=self.arguments,
            job_watch_poll_interval=self.job_watch_poll_interval,
        )
Member

I think we should start the job run here and then return the GlueJobRun object. The user can choose to wait for the job, or not, but the job should run whether or not they wait for it.
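
A minimal sketch of the behavior requested here: trigger() starts the run right away and returns a handle, and waiting is a separate, optional step. FakeGlueClient is a hypothetical in-memory stand-in so the example runs without AWS; its two methods mirror the names and response keys of the boto3 Glue client calls start_job_run and get_job_run. GlueJobRunSketch and the module-level trigger function are illustrative names, not the code merged in this PR.

```python
import time
from dataclasses import dataclass
from typing import Any, Optional


class FakeGlueClient:
    """Hypothetical stand-in for a boto3 Glue client (no network calls)."""

    def __init__(self, states: list) -> None:
        self._states = list(states)  # states reported by successive polls
        self.started = []            # records every start_job_run call

    def start_job_run(self, JobName: str, Arguments: dict) -> dict:
        self.started.append((JobName, Arguments))
        return {"JobRunId": "jr_0001"}

    def get_job_run(self, JobName: str, RunId: str) -> dict:
        # Report each queued state once, then keep repeating the last one.
        state = self._states.pop(0) if len(self._states) > 1 else self._states[0]
        return {"JobRun": {"Id": RunId, "JobRunState": state}}


@dataclass
class GlueJobRunSketch:
    """Handle to a run that has already been started."""

    client: Any
    job_name: str
    job_id: str
    poll_interval: float = 0.0  # seconds between polls; 0 keeps the demo fast

    def wait_for_completion(self) -> str:
        while True:
            response = self.client.get_job_run(JobName=self.job_name, RunId=self.job_id)
            state = response["JobRun"]["JobRunState"]
            if state == "SUCCEEDED":
                return state
            if state in {"FAILED", "STOPPED", "TIMEOUT", "ERROR"}:
                raise RuntimeError(f"Glue job run {self.job_id} ended in state {state}")
            time.sleep(self.poll_interval)


def trigger(client: Any, job_name: str, arguments: Optional[dict] = None) -> GlueJobRunSketch:
    """Start the job run immediately, then return a handle to it."""
    response = client.start_job_run(JobName=job_name, Arguments=arguments or {})
    return GlueJobRunSketch(client=client, job_name=job_name, job_id=response["JobRunId"])


client = FakeGlueClient(["STARTING", "RUNNING", "SUCCEEDED"])
run = trigger(client, "your_glue_job_name", {"--YOUR_EXTRA_ARGUMENT": "value"})
print(len(client.started))  # the run was started before any waiting happened
print(run.wait_for_completion())
```

Because the start call happens inside trigger(), a caller who never calls wait_for_completion() still gets a running job, which is the property the review comment asks for.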

Contributor Author

Thank you for your comment.
I had misunderstood the responsibilities of GlueJobRun.
Corrected in f3c7d2c.

Comment on lines 50 to 53
    client: Any = Field(default=None, description="")
    job_id: str = Field(
        default="",
    )
Member

These look like they could be private fields.

Contributor Author

@knakazawa99 knakazawa99 Mar 17, 2024

The responsibilities of GlueJobRun have changed, so these fields now need to be public.

from prefect_aws import AwsCredentials


class GlueJobRun(JobRun, BaseModel):
Member

This class doesn't need to be a Pydantic BaseModel since we don't need validation. I think it might be simpler to make it a plain Python class.

Suggested change
- class GlueJobRun(JobRun, BaseModel):
+ class GlueJobRun(JobRun):

Contributor Author

I find it useful to depend on pydantic for uniformity with other object definitions, validation, and attribute descriptions.

@knakazawa99
Contributor Author

@desertaxle
Thank you for your comment.
I've made improvements based on your comments.

Member

@desertaxle desertaxle left a comment

LGTM! Thanks so much for your patience and perseverance @knakazawa99!

@zzstoatzz zzstoatzz merged commit 3bb6261 into PrefectHQ:main Apr 24, 2024
7 checks passed
Development

Successfully merging this pull request may close these issues.

Feature Request: Support for AWS Glue Job Execution
4 participants