Update to version v2.4.0
YikaiHu committed Apr 27, 2023
1 parent a19a2cc commit 0baf83b
Showing 22 changed files with 358 additions and 84 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.4.0] - 2023-04-28
### Added
- Support for Requester Pays mode in S3 transfer tasks.

## [2.3.0] - 2023-03-30
- Support S3 Access Key Rotation

18 changes: 11 additions & 7 deletions README.md
@@ -46,13 +46,17 @@ If you have deployed a version before v2.0.2 (You can go to CloudFormation, chec

![S3 Plugin Architecture](s3-plugin-architect.png)

A *Finder* job running in AWS Fargate lists all the objects in the source and destination buckets and determines which objects should be transferred; a message for each object to be transferred is created in SQS. A *time-based CloudWatch rule* triggers the ECS task to run every hour.

This plugin also supports S3 Event notifications to trigger the data transfer (near real time), but only if the source bucket is in the same account (and region) as the one to which you deploy this plugin. The event messages are sent to the same SQS queue.

The *Worker* job running on EC2 consumes the messages in SQS and transfers the objects from the source bucket to the destination bucket. You can use an Auto Scaling Group to control the number of EC2 instances transferring the data, based on your business needs.

If an object, or a part of an object, fails to transfer, the EC2 instance releases the message back to the queue, and the object is transferred again once the message becomes visible (the default visibility timeout is 15 minutes, extended for large objects). If the transfer still fails after a few retries, the message is sent to the Dead Letter Queue and an alarm is triggered, as sketched below.
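
A minimal sketch of this release-and-retry loop, assuming a boto3-based consumer (the queue URL and the `transfer_object` routine are hypothetical placeholders, not part of this plugin's code):

```python
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/transfer-queue"  # placeholder


def transfer_object(body):
    """Placeholder for the actual copy logic."""


def poll_once():
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=20
    )
    for msg in resp.get("Messages", []):
        try:
            transfer_object(msg["Body"])
            # Success: delete the message so it is not retried.
            sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])
        except Exception:
            # Failure: release the message immediately so it becomes visible
            # again for a retry; after the configured number of receives, SQS
            # moves it to the Dead Letter Queue via the redrive policy.
            sqs.change_message_visibility(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=msg["ReceiptHandle"],
                VisibilityTimeout=0,
            )
```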
The Amazon S3 plugin runs the following workflow:

1. A time-based Amazon EventBridge rule triggers an AWS Lambda function on an hourly basis.
2. AWS Lambda uses the launch template to launch a data comparison job (*JobFinder*) on an [Amazon Elastic Compute Cloud (Amazon EC2)](https://aws.amazon.com/ec2/) instance.
3. The job lists all the objects in the source and destination buckets, compares them, and determines which objects should be transferred.
4. Amazon EC2 sends a message for each object to be transferred to [Amazon Simple Queue Service (Amazon SQS)](https://aws.amazon.com/sqs/). Amazon S3 event messages are also supported for more real-time data transfer; whenever an object is uploaded to the source bucket, an event message is sent to the same Amazon SQS queue (see the sample event after this list).
5. A *JobWorker* running on Amazon EC2 consumes the messages in SQS and transfers the objects from the source bucket to the destination bucket. You can use an Auto Scaling Group to control the number of EC2 instances transferring the data, based on business needs.
6. A record with the transfer status of each object is stored in Amazon DynamoDB.
7. The Amazon EC2 instance gets (downloads) the object from the source bucket according to the Amazon SQS message.
8. The Amazon EC2 instance puts (uploads) the object to the destination bucket according to the Amazon SQS message.
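
For step 4, the event message delivered to the queue follows the standard S3 event notification format. An abridged example (bucket and key are placeholders; real events carry additional metadata):

```python
# Abridged S3 event notification as it arrives in the SQS queue.
s3_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-src-bucket"},
                "object": {"key": "prefix/example-object.bin", "size": 1048576},
            },
        }
    ]
}
```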

This plugin supports transferring large files: it splits them into small parts and leverages the [multipart upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html) feature of Amazon S3.
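
A simplified boto3 sketch of that part-by-part copy (illustrative only: the part size is an assumption, error handling is elided, and the plugin's worker may implement this differently):

```python
import boto3

s3 = boto3.client("s3")
PART_SIZE = 8 * 1024 * 1024  # assumed part size, for illustration


def copy_large_object(src_bucket, dest_bucket, key, size):
    upload = s3.create_multipart_upload(Bucket=dest_bucket, Key=key)
    parts = []
    for number, start in enumerate(range(0, size, PART_SIZE), start=1):
        end = min(start + PART_SIZE, size) - 1
        # Fetch one byte range from the source object...
        chunk = s3.get_object(
            Bucket=src_bucket, Key=key, Range=f"bytes={start}-{end}"
        )["Body"].read()
        # ...and upload it as one part of the destination object.
        part = s3.upload_part(
            Bucket=dest_bucket,
            Key=key,
            PartNumber=number,
            UploadId=upload["UploadId"],
            Body=chunk,
        )
        parts.append({"ETag": part["ETag"], "PartNumber": number})
    s3.complete_multipart_upload(
        Bucket=dest_bucket,
        Key=key,
        UploadId=upload["UploadId"],
        MultipartUpload={"Parts": parts},
    )
```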

Binary file modified s3-plugin-architect.png
4 changes: 4 additions & 0 deletions source/bin/main-stack.ts
@@ -43,6 +43,10 @@ stackSuppressions([
], [
  { id: 'AwsSolutions-IAM5', reason: 'some policies need to get dynamic resources' },
  { id: 'AwsSolutions-IAM4', reason: 'these policies are used by the CDK custom resource lambda' },
  {
    id: 'AwsSolutions-L1',
    reason: 'not applicable to use the latest lambda runtime version for aws cdk cr',
  },
]);

Aspects.of(app).add(new AwsSolutionsChecks());
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
8 changes: 8 additions & 0 deletions source/lambda/custom-resource/.coveragerc
@@ -0,0 +1,8 @@
[run]
omit =
    tests/*
    .venv-*/*
    test/*
    */__init__.py
source =
    .
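
With `pytest-cov` installed (see `requirements-test.txt` below), coverage can be collected with, for example, `python -m pytest --cov=. --cov-config=.coveragerc`.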
139 changes: 139 additions & 0 deletions source/lambda/custom-resource/lambda_function.py
@@ -0,0 +1,139 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0


import json
import boto3
import os
from botocore import config
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

stack_name = os.environ["STACK_NAME"]

solution_version = os.environ.get("SOLUTION_VERSION", "v1.0.0")
solution_id = os.environ.get("SOLUTION_ID", "SO8002")
user_agent_config = {
    "user_agent_extra": f"AwsSolution/{solution_id}/{solution_version}"
}
default_config = config.Config(**user_agent_config)

default_region = os.environ.get("AWS_REGION")

bucket_name = os.environ.get("BUCKET_NAME", "")
object_prefix = os.environ.get("OBJECT_PREFIX", "")

event_queue_name = os.environ.get("EVENT_QUEUE_NAME", "")
event_queue_arn = os.environ.get("EVENT_QUEUE_ARN", "")

event_action = os.environ.get("EVENT_ACTION", "")

notification_id = f"{stack_name}-{event_queue_name}"


def lambda_handler(event, context):
    request_type = event["RequestType"]
    if request_type == "Create" or request_type == "Update":
        return on_create()
    if request_type == "Delete":
        return on_delete()
    raise Exception("Invalid request type: %s" % request_type)


def on_create():
    """Append this stack's queue notification to the bucket's existing configuration."""
    config_events = []
    if event_action == "CreateAndDelete":
        config_events = ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]
    elif event_action == "Create":
        config_events = ["s3:ObjectCreated:*"]
    else:
        return {
            "statusCode": 200,
            "body": json.dumps("Skip creating s3 events."),
        }

    try:
        s3 = boto3.client("s3")
        history_config = s3.get_bucket_notification_configuration(
            Bucket=bucket_name,
        )
        logger.info(f"history notification config is {history_config}")

        # Preserve any pre-existing queue configurations and append ours.
        queue_configurations = history_config.get("QueueConfigurations", [])
        queue_configurations.append(
            {
                "Id": notification_id,
                "QueueArn": event_queue_arn,
                "Events": config_events,
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": object_prefix}]}
                },
            }
        )

        notification_config = {
            "QueueConfigurations": queue_configurations,
            "TopicConfigurations": history_config.get("TopicConfigurations", []),
            "LambdaFunctionConfigurations": history_config.get(
                "LambdaFunctionConfigurations", []
            ),
        }
        if "EventBridgeConfiguration" in history_config:
            notification_config["EventBridgeConfiguration"] = history_config[
                "EventBridgeConfiguration"
            ]
        resp = s3.put_bucket_notification_configuration(
            Bucket=bucket_name,
            NotificationConfiguration=notification_config,
        )
        logger.info(f"put_bucket_notification_configuration resp is {resp}")
    except Exception as err:
        logger.error("Create log source s3 bucket notification failed, %s" % err)
        raise

    return {
        "statusCode": 200,
        "body": json.dumps("Create log source s3 bucket notification success!"),
    }


def on_delete():
    """Remove this stack's queue notification, leaving other configurations intact."""
    if event_action in ["CreateAndDelete", "Create"]:
        try:
            s3 = boto3.client("s3")
            history_config = s3.get_bucket_notification_configuration(
                Bucket=bucket_name,
            )
            logger.info(f"history notification config is {history_config}")
            queue_configurations = history_config.get("QueueConfigurations", [])
            deleted_queue_configurations = [
                x for x in queue_configurations if x["Id"] != notification_id
            ]

            notification_config = {
                "QueueConfigurations": deleted_queue_configurations,
                "TopicConfigurations": history_config.get("TopicConfigurations", []),
                "LambdaFunctionConfigurations": history_config.get(
                    "LambdaFunctionConfigurations", []
                ),
            }
            if "EventBridgeConfiguration" in history_config:
                notification_config["EventBridgeConfiguration"] = history_config[
                    "EventBridgeConfiguration"
                ]

            resp = s3.put_bucket_notification_configuration(
                Bucket=bucket_name,
                NotificationConfiguration=notification_config,
            )
            logger.info(f"put_bucket_notification_configuration resp is {resp}")
        except Exception as err:
            logger.error("Delete log source s3 bucket notification failed, %s" % err)
            raise

    return {
        "statusCode": 200,
        "body": json.dumps("Delete log source s3 bucket notification success!"),
    }
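
The handler only inspects the `RequestType` field of the CloudFormation custom resource event, so a minimal local smoke test might look like the sketch below. Note that the module reads its environment variables (for example `STACK_NAME`) at import time, so they must be set first; real custom resource events also carry fields such as `ResponseURL`, `StackId`, and `ResourceProperties`.

```python
# Minimal sketch: drive the handler the way CloudFormation would.
from lambda_function import lambda_handler

print(lambda_handler({"RequestType": "Create"}, None))
print(lambda_handler({"RequestType": "Delete"}, None))
```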
Empty file.
29 changes: 29 additions & 0 deletions source/lambda/custom-resource/test/conftest.py
@@ -0,0 +1,29 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import os
import pytest


@pytest.fixture(autouse=True)
def default_environment_variables():
    """Mocked AWS environment variables such as AWS credentials and region"""
    os.environ["AWS_ACCESS_KEY_ID"] = "mocked-aws-access-key-id"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "mocked-aws-secret-access-key"
    os.environ["AWS_SESSION_TOKEN"] = "mocked-aws-session-token"
    os.environ["AWS_REGION"] = "us-east-1"
    os.environ["SOLUTION_VERSION"] = "v1.0.0"
    os.environ["SOLUTION_ID"] = "SO8002"

    os.environ["STACK_NAME"] = "test"

    os.environ["BUCKET_NAME"] = "test-bucket"
    os.environ["OBJECT_PREFIX"] = "test"

    os.environ["EVENT_QUEUE_ARN"] = "arn:aws:sqs:us-east-1:123456789012:test-queue"
    os.environ[
        "LOG_EVENT_QUEUE_URL"
    ] = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
    os.environ["EVENT_QUEUE_NAME"] = "test-queue"

    os.environ["EVENT_ACTION"] = "CreateAndDelete"
4 changes: 4 additions & 0 deletions source/lambda/custom-resource/test/requirements-test.txt
@@ -0,0 +1,4 @@
boto3
moto
pytest
pytest-cov
39 changes: 39 additions & 0 deletions source/lambda/custom-resource/test/test_custom_resource.py
@@ -0,0 +1,39 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from moto import mock_s3
import pytest
import os
import boto3


@pytest.fixture
def s3_client():
    bucket_name = os.environ.get("BUCKET_NAME")
    with mock_s3():
        s3 = boto3.resource("s3", region_name="us-east-1")
        # Create the bucket
        s3.create_bucket(Bucket=bucket_name)
        yield


def test_lambda_handler_on_create(s3_client):
    from lambda_function import lambda_handler

    assert lambda_handler(
        {
            "RequestType": "Create",
        },
        None,
    )


def test_lambda_handler_on_delete(s3_client):
    from lambda_function import lambda_handler

    assert lambda_handler(
        {
            "RequestType": "Delete",
        },
        None,
    )
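
With the test dependencies installed, these tests can be run from `source/lambda/custom-resource` with, for example, `python -m pytest test/ --cov=.`.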
24 changes: 22 additions & 2 deletions source/lib/common-resources.ts
@@ -30,14 +30,16 @@
  aws_cloudwatch_actions as actions,
  aws_sns as sns,
  aws_sns_subscriptions as sub,
  aws_kms as kms
  aws_kms as kms,
  aws_s3 as s3,
} from 'aws-cdk-lib';
import { NagSuppressions } from "cdk-nag";

import { addCfnNagSuppressRules } from "./main-stack";

export interface CommonProps {
  readonly alarmEmail: string,
  readonly srcIBucket: s3.IBucket
}

export class CommonStack extends Construct {
@@ -94,6 +96,24 @@ export class CommonStack extends Construct {
{ id: "AwsSolutions-SQS4", reason: "this queue only used by DTH solution" },
]);

    this.sqsQueue.addToResourcePolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        conditions: {
          ArnLike: {
            "aws:SourceArn": props.srcIBucket.bucketArn,
          },
        },
        principals: [new iam.ServicePrincipal("s3.amazonaws.com")],
        resources: [this.sqsQueue.queueArn],
        actions: [
          "sqs:SendMessage",
          "sqs:GetQueueAttributes",
          "sqs:GetQueueUrl",
        ],
      })
    );

    const cfnSqsQueue = this.sqsQueue.node.defaultChild as sqs.CfnQueue;
    cfnSqsQueue.overrideLogicalId('S3TransferQueue')
    addCfnNagSuppressRules(cfnSqsQueue, [
@@ -155,7 +175,7 @@ export class CommonStack extends Construct {
resources: ["*"],
effect: iam.Effect.ALLOW,
principals: [
new iam.AccountRootPrincipal()
new iam.AccountRootPrincipal()
],
}),
],
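
The queue policy added above lets `s3.amazonaws.com` send messages to the transfer queue, while the `aws:SourceArn` condition restricts that grant to notifications originating from the source bucket, so an arbitrary bucket in another account cannot publish to the queue.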
3 changes: 2 additions & 1 deletion source/lib/ec2-finder-stack.ts
@@ -239,6 +239,7 @@ export class Ec2FinderStack extends Construct {
`echo "export SRC_ENDPOINT=${props.env.SRC_ENDPOINT}" >> env.sh`,
`echo "export SRC_CREDENTIALS=${props.env.SRC_CREDENTIALS}" >> env.sh`,
`echo "export SRC_IN_CURRENT_ACCOUNT=${props.env.SRC_IN_CURRENT_ACCOUNT}" >> env.sh`,
`echo "export PAYER_REQUEST=${props.env.PAYER_REQUEST}" >> env.sh`,

`echo "export DEST_BUCKET=${props.env.DEST_BUCKET}" >> env.sh`,
`echo "export DEST_PREFIX=${props.env.DEST_PREFIX}" >> env.sh`,
@@ -285,7 +286,7 @@ export class Ec2FinderStack extends Construct {
      runtime: lambda.Runtime.PYTHON_3_9,
      handler: "lambda_function.lambda_handler",
      code: lambda.Code.fromAsset(
        path.join(__dirname, "../lambda")
        path.join(__dirname, "../lambda/asg-helper")
      ),
      memorySize: 256,
      timeout: Duration.minutes(15),
1 change: 1 addition & 0 deletions source/lib/ec2-worker-stack.ts
@@ -220,6 +220,7 @@ export class Ec2WorkerStack extends Construct {
`echo "export SRC_ENDPOINT=${props.env.SRC_ENDPOINT}" >> env.sh`,
`echo "export SRC_CREDENTIALS=${props.env.SRC_CREDENTIALS}" >> env.sh`,
`echo "export SRC_IN_CURRENT_ACCOUNT=${props.env.SRC_IN_CURRENT_ACCOUNT}" >> env.sh`,
`echo "export PAYER_REQUEST=${props.env.PAYER_REQUEST}" >> env.sh`,

`echo "export DEST_BUCKET=${props.env.DEST_BUCKET}" >> env.sh`,
`echo "export DEST_PREFIX=${props.env.DEST_PREFIX}" >> env.sh`,
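
The new `PAYER_REQUEST` variable propagates the Requester Pays setting to the finder and worker jobs. At the S3 API level, accessing a Requester Pays bucket means passing `RequestPayer="requester"` on each request, roughly as sketched below (the `"true"`/`"false"` convention for the variable is an assumption for illustration; the jobs' actual handling of the flag is not shown in this commit):

```python
import os

import boto3

s3 = boto3.client("s3")

# Assumed convention: PAYER_REQUEST carries "true" when the source bucket
# is a Requester Pays bucket.
extra_args = (
    {"RequestPayer": "requester"}
    if os.environ.get("PAYER_REQUEST") == "true"
    else {}
)

resp = s3.get_object(
    Bucket="example-src-bucket",  # placeholder
    Key="prefix/example-object.bin",  # placeholder
    **extra_args,
)
```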