From a2d212ced030dd93d20aece634607da8e562cf8e Mon Sep 17 00:00:00 2001
From: Jeff Hale
Date: Tue, 2 Apr 2024 15:05:59 -0400
Subject: [PATCH 1/3] Add Python 3.12 support and remove 3.7 support (#405)
Co-authored-by: Alex Streed
---
.github/workflows/tests.yml | 1 +
setup.py | 5 +++--
tests/test_client_waiter.py | 6 +++---
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index c7a7acea..d79d4860 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -13,6 +13,7 @@ jobs:
- "3.9"
- "3.10"
- "3.11"
+ - "3.12"
fail-fast: false
steps:
- uses: actions/checkout@v4
diff --git a/setup.py b/setup.py
index b8d7dd55..669a0b5e 100644
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@
version=versioneer.get_version(),
cmdclass=versioneer.get_cmdclass(),
packages=find_packages(exclude=("tests", "docs")),
- python_requires=">=3.7",
+ python_requires=">=3.8",
install_requires=install_requires,
entry_points={
"prefect.collections": [
@@ -38,10 +38,11 @@
"Intended Audience :: System Administrators",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3 :: Only",
- "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries",
],
)
diff --git a/tests/test_client_waiter.py b/tests/test_client_waiter.py
index 0c3d27e2..1a764e42 100644
--- a/tests/test_client_waiter.py
+++ b/tests/test_client_waiter.py
@@ -9,7 +9,7 @@
@pytest.fixture
def mock_waiter(monkeypatch):
- waiter = MagicMock()
+ waiter = MagicMock(name="mock_waiter")
monkeypatch.setattr(
"prefect_aws.client_waiter.create_waiter_with_client",
waiter,
@@ -44,7 +44,7 @@ def test_flow():
return waiter
test_flow()
- assert mock_waiter.wait.called_once_with("JobExists")
+ mock_waiter().wait.assert_called_once_with()
@mock_ec2
@@ -66,4 +66,4 @@ def test_flow():
return waiter
test_flow()
- assert mock_waiter.wait.called_once_with("instance_exists")
+ mock_waiter.wait.assert_called_once_with()
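The assertion changes above are the substantive fix in this patch: `called_once_with` is not part of the `unittest.mock` API, so accessing it merely creates a new, truthy child mock and the old `assert` could never fail. `assert_called_once_with` performs a real check. A minimal standalone sketch of the difference (the names below are illustrative, not taken from the test suite):

```python
from unittest.mock import MagicMock

waiter = MagicMock()

# Pitfall: attribute access on a MagicMock returns another MagicMock, which is
# truthy, so this "assertion" passes even though wait() was never called.
assert waiter.wait.called_once_with("JobExists")

# Real assertion: raises AssertionError because wait() has not been called.
try:
    waiter.wait.assert_called_once_with("JobExists")
except AssertionError as exc:
    print(f"caught expected failure: {exc}")
```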
From df3b9f0ab2f04620babeb90b1a62d719cf23f214 Mon Sep 17 00:00:00 2001
From: Jeff Hale
Date: Wed, 3 Apr 2024 12:55:55 -0400
Subject: [PATCH 2/3] Update index to reflect current Prefect recs and fix an
example (#403)
Co-authored-by: Alexander Streed
---
docs/index.md | 209 +++++++++-----------------------------------------
1 file changed, 38 insertions(+), 171 deletions(-)
diff --git a/docs/index.md b/docs/index.md
index 091b54e9..19095a25 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -12,180 +12,61 @@
-
-
-## Welcome!
+## Welcome
-`prefect-aws` makes it easy to leverage the capabilities of AWS in your flows, featuring support for ECSTask, S3, Secrets Manager, Batch Job, and Client Waiter.
+`prefect-aws` makes it easy to leverage the capabilities of AWS in your workflows.
+## Getting started
-## Getting Started
-
-### Saving credentials to a block
-
-You will need an AWS account and credentials in order to use `prefect-aws`.
-
-1. Refer to the [AWS Configuration documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-creds) on how to retrieve your access key ID and secret access key
-2. Copy the access key ID and secret access key
-3. Create a short script and replace the placeholders with your credential information and desired block name:
-
-```python
-from prefect_aws import AwsCredentials
-AwsCredentials(
- aws_access_key_id="PLACEHOLDER",
- aws_secret_access_key="PLACEHOLDER",
- aws_session_token=None, # replace this with token if necessary
- region_name="us-east-2"
-).save("BLOCK-NAME-PLACEHOLDER")
-```
-
-Congrats! You can now load the saved block to use your credentials in your Python code:
-
-```python
-from prefect_aws import AwsCredentials
-AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
-```
-
-!!! info "Registering blocks"
-
- Register blocks in this module to
- [view and edit them](https://docs.prefect.io/ui/blocks/)
- on Prefect Cloud:
-
- ```bash
- prefect block register -m prefect_aws
- ```
-
-### Using Prefect with AWS ECS
-
-`prefect_aws` allows you to use [AWS ECS](https://aws.amazon.com/ecs/) as infrastructure for your deployments. Using ECS for scheduled flow runs enables the dynamic provisioning of infrastructure for containers and unlocks greater scalability.
-
-The snippets below show how you can use `prefect_aws` to run a task on ECS. The first example uses the `ECSTask` block as [infrastructure](https://docs.prefect.io/concepts/infrastructure/) and the second example shows using ECS within a flow.
-
-#### As deployment Infrastructure
+### Installation
+Prefect requires Python 3.8 or newer.
-##### Set variables
+We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.
-To expedite copy/paste without the needing to update placeholders manually, update and execute the following.
+Install `prefect-aws`
```bash
-export CREDENTIALS_BLOCK_NAME="aws-credentials"
-export VPC_ID="vpc-id"
-export ECS_TASK_BLOCK_NAME="ecs-task-example"
-export S3_BUCKET_BLOCK_NAME="ecs-task-bucket-example"
-```
-
-##### Save an infrastructure and storage block
-
-Save a custom infrastructure and storage block by executing the following snippet.
-
-```python
-import os
-from prefect_aws import AwsCredentials, ECSTask, S3Bucket
-
-aws_credentials = AwsCredentials.load(os.environ["CREDENTIALS_BLOCK_NAME"])
-
-ecs_task = ECSTask(
- image="prefecthq/prefect:2-python3.10",
- aws_credentials=aws_credentials,
- vpc_id=os.environ["VPC_ID"],
-)
-ecs_task.save(os.environ["ECS_TASK_BLOCK_NAME"], overwrite=True)
-
-bucket_name = "ecs-task-bucket-example"
-s3_client = aws_credentials.get_s3_client()
-s3_client.create_bucket(
- Bucket=bucket_name,
- CreateBucketConfiguration={"LocationConstraint": aws_credentials.region_name}
-)
-s3_bucket = S3Bucket(
- bucket_name=bucket_name,
- credentials=aws_credentials,
-)
-s3_bucket.save(os.environ["S3_BUCKET_BLOCK_NAME"], overwrite=True)
-```
-
-##### Write a flow
-
-Then, use an existing flow to create a deployment with, or use the flow below if you don't have an existing flow handy.
-
-```python
-from prefect import flow
-
-@flow(log_prints=True)
-def ecs_task_flow():
- print("Hello, Prefect!")
-
-if __name__ == "__main__":
- ecs_task_flow()
+pip install prefect-aws
```
-##### Create a deployment
+### Registering blocks
-If the script was named "ecs_task_script.py", build a deployment manifest with the following command.
+Register [blocks](https://docs.prefect.io/ui/blocks/) in this module to make them available for use.
```bash
-prefect deployment build ecs_task_script.py:ecs_task_flow \
- -n ecs-task-deployment \
- -ib ecs-task/${ECS_TASK_BLOCK_NAME} \
- -sb s3-bucket/${S3_BUCKET_BLOCK_NAME} \
- --override env.EXTRA_PIP_PACKAGES=prefect-aws
+prefect block register -m prefect_aws
```
-Now apply the deployment!
-
-```bash
-prefect deployment apply ecs_task_flow-deployment.yaml
-```
-
-##### Test the deployment
+A list of available blocks in `prefect-aws` and their setup instructions can be found [here](https://PrefectHQ.github.io/prefect-aws/#blocks-catalog).
-Start an [agent](https://docs.prefect.io/latest/concepts/work-pools/) in a separate terminal. The agent will poll the Prefect API's work pool for scheduled flow runs.
+### Saving credentials to a block
-```bash
-prefect agent start -q 'default'
-```
+You will need an AWS account and credentials to use `prefect-aws`.
-Run the deployment once to test it:
+1. Refer to the [AWS Configuration documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-creds) on how to retrieve your access key ID and secret access key
+2. Copy the access key ID and secret access key
+3. Create an `AwsCredentials` block in the Prefect UI or use a Python script like the one below, replacing the placeholders with your credential information and desired block name:
-```bash
-prefect deployment run ecs-task-flow/ecs-task-deployment
+```python
+from prefect_aws import AwsCredentials
+AwsCredentials(
+ aws_access_key_id="PLACEHOLDER",
+ aws_secret_access_key="PLACEHOLDER",
+ aws_session_token=None, # replace this with token if necessary
+ region_name="us-east-2"
+).save("BLOCK-NAME-PLACEHOLDER")
```
-Once the flow run has completed, you will see `Hello, Prefect!` logged in the CLI and the Prefect UI.
-
-!!! info "No class found for dispatch key"
-
- If you encounter an error message like `KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."`,
- ensure `prefect-aws` is installed in the environment in which your agent is running!
-
-Another tutorial on `ECSTask` can be found [here](https://towardsdatascience.com/prefect-aws-ecs-fargate-github-actions-make-serverless-dataflows-as-easy-as-py-f6025335effc).
-
-#### Within Flow
-
-You can also execute commands with an `ECSTask` block directly within a Prefect flow. Running containers via ECS in your flows is useful for executing non-Python code in a distributed manner while using Prefect.
+Congrats! You can now load the saved block to use your credentials in your Python code:
```python
-from prefect import flow
from prefect_aws import AwsCredentials
-from prefect_aws.ecs import ECSTask
-
-@flow
-def ecs_task_flow():
- ecs_task = ECSTask(
- image="prefecthq/prefect:2-python3.10",
- credentials=AwsCredentials.load("BLOCK-NAME-PLACEHOLDER"),
- region="us-east-2",
- command=["echo", "Hello, Prefect!"],
- )
- return ecs_task.run()
+AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
```
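Beyond block storage, the loaded credentials can hand back authenticated boto3 objects. A minimal sketch (it reuses the placeholder block name above; `get_s3_client` is the same convenience helper the removed ECS snippet relied on, and the call assumes the credentials permit `ListBuckets`):

```python
from prefect_aws import AwsCredentials

# Reuse the placeholder block name saved above.
aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

# The block exposes boto3 helpers; list bucket names as a quick smoke test.
s3_client = aws_credentials.get_s3_client()
print([bucket["Name"] for bucket in s3_client.list_buckets()["Buckets"]])
```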
-This setup gives you all of the observation and orchestration benefits of Prefect, while also providing you the scalability of ECS.
-
### Using Prefect with AWS S3
`prefect_aws` allows you to read and write objects with AWS S3 within your Prefect flows.
@@ -208,7 +89,7 @@ def s3_flow():
aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
s3_bucket = S3Bucket(
bucket_name="BUCKET-NAME-PLACEHOLDER",
- aws_credentials=aws_credentials
+ credentials=aws_credentials
)
s3_bucket_path = s3_bucket.upload_from_path(file_path)
@@ -242,36 +123,22 @@ def secrets_manager_flow():
secrets_manager_flow()
```
-## Resources
-
-Refer to the API documentation on the sidebar to explore all the capabilities of Prefect AWS!
-
-For more tips on how to use tasks and flows in a Collection, check out [Using Collections](https://docs.prefect.io/collections/usage/)!
-
-### Recipes
-
-For additional recipes and examples, check out [`prefect-recipes`](https://github.com/PrefectHQ/prefect-recipes).
-
-### Installation
+### Using Prefect with AWS ECS
-Install `prefect-aws`
+`prefect_aws` allows you to use [AWS ECS](https://aws.amazon.com/ecs/) as infrastructure for your deployments. Using ECS for scheduled flow runs enables the dynamic provisioning of infrastructure for containers and unlocks greater scalability. This setup gives you all of the observation and orchestration benefits of Prefect, while also providing the scalability of ECS.
-```bash
-pip install prefect-aws
-```
+See the [ECS guide](/ecs_guide/) for a full walkthrough.
-A list of available blocks in `prefect-aws` and their setup instructions can be found [here](https://PrefectHQ.github.io/prefect-aws/#blocks-catalog).
-
-Requires an installation of Python 3.7+
+## Resources
-We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
+Refer to the API documentation on the sidebar to explore all the capabilities of Prefect AWS!
-These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).
+For more tips on how to use blocks and tasks in Prefect integration libraries, check out the [docs](https://docs.prefect.io/integrations/usage/)!
-### Feedback
+For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).
If you encounter any bugs while using `prefect-aws`, feel free to open an issue in the [`prefect-aws`](https://github.com/PrefectHQ/prefect-aws) repository.
-If you have any questions or issues while using `prefect-aws`, you can find help in either the [Prefect Discourse forum](https://discourse.prefect.io/) or the [Prefect Slack community](https://prefect.io/slack).
-
-Feel free to star or watch [`prefect-aws`](https://github.com/PrefectHQ/prefect-aws) for updates too!
+If you have any questions or issues while using `prefect-aws`, you can find help in the [Prefect Slack community](https://prefect.io/slack).
+
+Feel free to check out the source code and give [`prefect-aws`](https://github.com/PrefectHQ/prefect-aws) a ⭐️!
From e6b15c2211cd0a15974c0966f1e60ecbfba47735 Mon Sep 17 00:00:00 2001
From: Jean Luciano
Date: Wed, 3 Apr 2024 15:59:01 -0500
Subject: [PATCH 3/3] Format prefix to not be flowrun every time (#400)
---
prefect_aws/workers/ecs_worker.py | 16 +++++++++++++++-
tests/test_ecs.py | 2 +-
tests/workers/test_ecs_worker.py | 27 ++++++++++++++++++++-------
3 files changed, 36 insertions(+), 9 deletions(-)
diff --git a/prefect_aws/workers/ecs_worker.py b/prefect_aws/workers/ecs_worker.py
index 1a5c3d28..917d8bab 100644
--- a/prefect_aws/workers/ecs_worker.py
+++ b/prefect_aws/workers/ecs_worker.py
@@ -259,6 +259,7 @@ class ECSJobConfiguration(BaseJobConfiguration):
)
configure_cloudwatch_logs: Optional[bool] = Field(default=None)
cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict)
+ cloudwatch_logs_prefix: Optional[str] = Field(default=None)
network_configuration: Dict[str, Any] = Field(default_factory=dict)
stream_output: Optional[bool] = Field(default=None)
task_start_timeout_seconds: int = Field(default=300)
@@ -507,6 +508,16 @@ class ECSVariables(BaseVariables):
" for available options. "
),
)
+ cloudwatch_logs_prefix: Optional[str] = Field(
+ default=None,
+ description=(
+ "When `configure_cloudwatch_logs` is enabled, this setting may be used to"
+ " set a prefix for the log group. If not provided, the default prefix will"
+ " be `prefect-logs__`. If"
+ " `awslogs-stream-prefix` is present in `Cloudwatch logs options` this"
+ " setting will be ignored."
+ ),
+ )
network_configuration: Dict[str, Any] = Field(
default_factory=dict,
@@ -1276,13 +1287,16 @@ def _prepare_task_definition(
container["environment"].remove(item)
if configuration.configure_cloudwatch_logs:
+ prefix = f"prefect-logs_{self._work_pool_name}_{flow_run.deployment_id}"
container["logConfiguration"] = {
"logDriver": "awslogs",
"options": {
"awslogs-create-group": "true",
"awslogs-group": "prefect",
"awslogs-region": region,
- "awslogs-stream-prefix": configuration.name or "prefect",
+ "awslogs-stream-prefix": (
+ configuration.cloudwatch_logs_prefix or prefix
+ ),
**configuration.cloudwatch_logs_options,
},
}
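For reference, the precedence this hunk establishes is: an explicit `awslogs-stream-prefix` inside `cloudwatch_logs_options` still wins (the dict unpack is applied last), then the new `cloudwatch_logs_prefix` variable, then the `prefect-logs_<work_pool_name>_<deployment_id>` default. A standalone sketch of that ordering (the function and its arguments are illustrative, not part of the worker):

```python
from typing import Dict, Optional


def resolve_stream_prefix(
    cloudwatch_logs_prefix: Optional[str],
    cloudwatch_logs_options: Dict[str, str],
    work_pool_name: str,
    deployment_id: str,
) -> str:
    # An explicit option overrides everything, because the options dict is
    # unpacked after the computed value in the task definition above.
    if "awslogs-stream-prefix" in cloudwatch_logs_options:
        return cloudwatch_logs_options["awslogs-stream-prefix"]
    # Next, the new variable introduced by this patch.
    if cloudwatch_logs_prefix:
        return cloudwatch_logs_prefix
    # Finally, the new default built from the work pool and deployment.
    return f"prefect-logs_{work_pool_name}_{deployment_id}"


print(resolve_stream_prefix(None, {}, "test", "abc123"))         # prefect-logs_test_abc123
print(resolve_stream_prefix("my-prefix", {}, "test", "abc123"))  # my-prefix
opts = {"awslogs-stream-prefix": "override-prefix"}
print(resolve_stream_prefix(None, opts, "test", "abc123"))       # override-prefix
```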
diff --git a/tests/test_ecs.py b/tests/test_ecs.py
index 6c429e9f..b105fcd6 100644
--- a/tests/test_ecs.py
+++ b/tests/test_ecs.py
@@ -1232,8 +1232,8 @@ async def test_cloudwatch_log_options(aws_credentials):
configure_cloudwatch_logs=True,
execution_role_arn="test",
cloudwatch_logs_options={
- "awslogs-stream-prefix": "override-prefix",
"max-buffer-size": "2m",
+ "awslogs-stream-prefix": "override-prefix",
},
)
diff --git a/tests/workers/test_ecs_worker.py b/tests/workers/test_ecs_worker.py
index e4dab38c..509393f8 100644
--- a/tests/workers/test_ecs_worker.py
+++ b/tests/workers/test_ecs_worker.py
@@ -1324,8 +1324,20 @@ async def write_fake_log(task_arn):
@pytest.mark.usefixtures("ecs_mocks")
+@pytest.mark.parametrize(
+ "cloudwatch_logs_options",
+ [
+ {
+ "awslogs-stream-prefix": "override-prefix",
+ "max-buffer-size": "2m",
+ },
+ {
+ "max-buffer-size": "2m",
+ },
+ ],
+)
async def test_cloudwatch_log_options(
- aws_credentials: AwsCredentials, flow_run: FlowRun
+ aws_credentials: AwsCredentials, flow_run: FlowRun, cloudwatch_logs_options: dict
):
session = aws_credentials.get_boto3_session()
ecs_client = session.client("ecs")
@@ -1334,12 +1346,10 @@ async def test_cloudwatch_log_options(
aws_credentials=aws_credentials,
configure_cloudwatch_logs=True,
execution_role_arn="test",
- cloudwatch_logs_options={
- "awslogs-stream-prefix": "override-prefix",
- "max-buffer-size": "2m",
- },
+ cloudwatch_logs_options=cloudwatch_logs_options,
)
- async with ECSWorker(work_pool_name="test") as worker:
+ work_pool_name = "test"
+ async with ECSWorker(work_pool_name=work_pool_name) as worker:
result = await run_then_stop_task(worker, configuration, flow_run)
assert result.status_code == 0
@@ -1349,6 +1359,9 @@ async def test_cloudwatch_log_options(
task_definition = describe_task_definition(ecs_client, task)
for container in task_definition["containerDefinitions"]:
+ prefix = f"prefect-logs_{work_pool_name}_{flow_run.deployment_id}"
+ if cloudwatch_logs_options.get("awslogs-stream-prefix"):
+ prefix = cloudwatch_logs_options["awslogs-stream-prefix"]
if container["name"] == ECS_DEFAULT_CONTAINER_NAME:
# Assert that the container has logging configured with user
# provided options
@@ -1358,7 +1371,7 @@ async def test_cloudwatch_log_options(
"awslogs-create-group": "true",
"awslogs-group": "prefect",
"awslogs-region": "us-east-1",
- "awslogs-stream-prefix": "override-prefix",
+ "awslogs-stream-prefix": prefix,
"max-buffer-size": "2m",
},
}