Skip to content

Commit

Permalink
add publish to sns functionality to raw sync lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Sep 19, 2024
1 parent 51420b9 commit fb220e6
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 61 deletions.
1 change: 1 addition & 0 deletions config/develop/namespaced/lambda-dispatch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ template:
dependencies:
- develop/namespaced/lambda-dispatch-role.yaml
- develop/namespaced/sqs-input-to-dispatch.yaml
- develop/namespaced/sns-dispatch.yaml
- develop/s3-cloudformation-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch"
parameters:
Expand Down
8 changes: 2 additions & 6 deletions config/develop/namespaced/lambda-raw-sync-role.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
template:
path: lambda-raw-role.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-raw-role"
path: lambda-raw-sync-role.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync-role"
dependencies:
- develop/namespaced/sns-dispatch.yaml
- develop/namespaced/sqs-input-to-dispatch.yaml
- develop/s3-cloudformation-bucket.yaml
- develop/s3-input-bucket.yaml
- develop/s3-raw-bucket.yaml
parameters:
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
S3TargetBucketName: {{ stack_group_config.raw_bucket_name }}
Expand Down
4 changes: 2 additions & 2 deletions config/develop/namespaced/lambda-raw-sync.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ template:
artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda"
dependencies:
- develop/namespaced/lambda-raw-sync-role.yaml
- develop/namespaced/sns-dispatch.yaml
- develop/s3-cloudformation-bucket.yaml
- develop/s3-raw-bucket.yaml
- develop/s3-input-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync"
parameters:
RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-sync-role::RoleArn"
S3InputBucket: {{ stack_group_config.input_bucket_name }}
S3InputKeyPrefix: "{{ stack_group_config.namespace }}/"
S3RawBucket: {{ stack_group_config.raw_bucket_name }}
S3RawKeyPrefix: "{{ stack_group_config.namespace }}/json/"
SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn"
stack_tags: {{ stack_group_config.default_stack_tags }}
1 change: 1 addition & 0 deletions config/prod/namespaced/lambda-dispatch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ template:
dependencies:
- prod/namespaced/lambda-dispatch-role.yaml
- prod/namespaced/sqs-input-to-dispatch.yaml
- prod/namespaced/sns-dispatch.yaml
- prod/s3-cloudformation-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-dispatch"
parameters:
Expand Down
11 changes: 11 additions & 0 deletions config/prod/namespaced/lambda-raw-sync-role.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
template:
path: lambda-raw-sync-role.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync-role"
dependencies:
- prod/namespaced/sns-dispatch.yaml
parameters:
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
S3TargetBucketName: {{ stack_group_config.raw_bucket_name }}
SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn"
stack_tags:
{{ stack_group_config.default_stack_tags }}
18 changes: 18 additions & 0 deletions config/prod/namespaced/lambda-raw-sync.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
template:
type: sam
path: src/lambda_function/raw_sync/template.yaml
artifact_bucket_name: {{ stack_group_config.template_bucket_name }}
artifact_prefix: "{{ stack_group_config.namespace }}/src/lambda"
dependencies:
- prod/namespaced/lambda-raw-sync-role.yaml
- prod/namespaced/sns-dispatch.yaml
- prod/s3-cloudformation-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-raw-sync"
parameters:
RoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-raw-sync-role::RoleArn"
S3InputBucket: {{ stack_group_config.input_bucket_name }}
S3InputKeyPrefix: "{{ stack_group_config.namespace }}/"
S3RawBucket: {{ stack_group_config.raw_bucket_name }}
S3RawKeyPrefix: "{{ stack_group_config.namespace }}/json/"
SNSTopicArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-dispatch::SnsTopicArn"
stack_tags: {{ stack_group_config.default_stack_tags }}
162 changes: 122 additions & 40 deletions src/lambda_function/raw_sync/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
the export is submitted to the raw Lambda (via the dispatch SNS topic) for processing.
"""

import json
import logging
import os
import struct
Expand Down Expand Up @@ -41,16 +42,57 @@ def lambda_handler(event: dict, context: dict) -> None:
input_key_prefix = os.environ.get("INPUT_S3_KEY_PREFIX")
raw_bucket = os.environ.get("RAW_S3_BUCKET")
raw_key_prefix = os.environ.get("RAW_S3_KEY_PREFIX")
dispatch_sns_arn = os.environ.get("SNS_TOPIC_ARN")
main(
event=event,
s3_client=s3_client,
input_bucket=input_bucket,
input_key_prefix=input_key_prefix,
raw_bucket=raw_bucket,
raw_key_prefix=raw_key_prefix,
dispatch_sns_arn=dispatch_sns_arn,
)


def append_s3_key(key: str, key_format: str, result: dict) -> None:
    """
    Organize an S3 object key by appending it to the appropriate entry in `result`.

    This is a helper function for `list_s3_objects`.

    Keys ending in "/" (S3 "folder" placeholders) are ignored, as are keys that
    do not match the layout expected for `key_format`. Any other value of
    `key_format` leaves `result` untouched.

    Args:
        key (str): The S3 object key to process.
        key_format (str): The format of the key, either "raw" or "input".
        result (dict): The dictionary where keys are appended. For the "raw" format, it is a
            nested dictionary structured as result[data_type][cohort]. For "input",
            it is structured as result[cohort].

    Returns:
        None
    """
    if key.endswith("/"):  # Ignore keys that represent "folders"
        return
    key_components = key.split("/")
    if key_format == "raw":
        # Collect `name=value` path components. The value is everything after
        # the FIRST "=", so a value which itself contains "=" is kept intact
        # and agrees with the full data type / cohort used when matching.
        partitions = {}
        for part in key_components:
            name, separator, value = part.partition("=")
            if separator:
                # The first occurrence of a partition name wins.
                partitions.setdefault(name, value)
        if "dataset" not in partitions or "cohort" not in partitions:
            # Skip keys that don't match the expected pattern
            return
        result[partitions["dataset"]][partitions["cohort"]].append(key)
    elif key_format == "input" and len(key_components) == 3:
        # Input keys look like `{namespace}/{cohort}/{export_basename}`
        cohort = key_components[1]
        result[cohort].append(key)


def list_s3_objects(
s3_client: boto3.client, bucket: str, key_prefix: str, key_format: str
) -> dict:
Expand Down Expand Up @@ -115,43 +157,25 @@ def list_s3_objects(
result = defaultdict(lambda: defaultdict(list))
elif key_format == "input":
result = defaultdict(list)

for response in response_iterator:
for obj in response.get("Contents", []):
key = obj["Key"]
key_components = key.split("/")
if key_format == "raw":
try:
data_type = next(
part.split("=")[1]
for part in key_components
if part.startswith("dataset=")
)
cohort = next(
part.split("=")[1]
for part in key_components
if part.startswith("cohort=")
)
result[data_type][cohort].append(key)
except StopIteration:
# Skip keys that don't match the expected pattern
continue
elif key_format == "input" and len(key_components) == 3:
cohort = key_components[1]
result[cohort].append(key)

append_s3_key(
key=key,
key_format=key_format,
result=result,
)
return result


def match_corresponding_raw_object(
namespace: str,
data_type: str,
cohort: str,
file_identifier: str,
expected_key: str,
raw_keys: list[dict],
) -> Optional[str]:
"""
Find a matching S3 key for a given export file.
Find a matching raw object for a given export file and filename.
Given a `namespace`, `cohort`, `data_type`, and `filename`, the matching
S3 key conforms to:
Expand All @@ -164,18 +188,20 @@ def match_corresponding_raw_object(
cohort (str): The cohort name
file_identifier (str): The identifier of the original JSON file. The identifier is
the basename without any extensions.
expected_key (str): The key of the corresponding raw object.
raw_keys (dict): A dictionary formatted as the dictionary returned by `list_s3_objects`.
Returns (str): The matching S3 key from `raw_keys`, or None if no match is found.
"""
expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/{file_identifier}.ndjson.gz"
logger.debug(f"Expecting to find matching object at {expected_key}")

# Navigate through raw_keys to locate the correct `data_type` and `cohort`
if data_type in raw_keys:
if cohort in raw_keys[data_type]:
# Iterate through the list of keys under the specified `data_type` and `cohort`
for key in raw_keys[data_type][cohort]:
if key == expected_key:
logger.debug(f"Found matching object {expected_key}")
return key
return None

Expand Down Expand Up @@ -235,11 +261,11 @@ def unpack_eocd_fields(body: bytes, eocd_offset: int) -> list[int]:
Both are int type.
"""
eocd_fields = struct.unpack("<4s4H2LH", body[eocd_offset : eocd_offset + 22])
logger.info(f"EOCD Record: {eocd_fields}")
logger.debug(f"EOCD Record: {eocd_fields}")
central_directory_offset = eocd_fields[-2]
central_directory_size = eocd_fields[-3]
logger.info(f"Central Directory Offset: {central_directory_offset}")
logger.info(f"Central Directory Size: {central_directory_size}")
logger.debug(f"Central Directory Offset: {central_directory_offset}")
logger.debug(f"Central Directory Size: {central_directory_size}")
return central_directory_offset, central_directory_size


Expand Down Expand Up @@ -308,13 +334,17 @@ def list_files_in_archive(
Defaults to 64 KB.
Returns:
list[str]: A list of file names contained within the ZIP archive, filtered to exclude:
list[dict]: A list of dict with information about the files contained within the ZIP archive.
The dict has keys `filename` and `file_size`, which contain the respective values from
the ZipInfo object.
Files are filtered to exclude:
- Directories (i.e., paths containing "/").
- Files named "Manifest".
- Empty files (file size == 0).
If no files match the criteria or the EOCD record is not found, it returns an empty list.
If no files match the criteria or the EOCD record is not found, an empty list is returned.
Notes:
- The function may trigger multiple recursive calls if the EOCD record is large or non-existent,
Expand Down Expand Up @@ -387,17 +417,53 @@ def list_files_in_archive(
and "Manifest" not in zip_info.filename
and zip_info.file_size > 0
):
file_list.append(zip_info.filename)
file_object = {
"filename": zip_info.filename,
"file_size": zip_info.file_size,
}
file_list.append(file_object)
return file_list


def publish_to_sns(
    bucket: str,
    key: str,
    path: str,
    file_size: int,
    sns_arn: str,
    sns_client: "Optional[boto3.client]" = None,
) -> None:
    """
    Publishes file information to an SNS topic.

    We use this function to publish a message to the dispatch SNS topic, allowing
    the raw Lambda to process this file and write it as an object to the
    raw S3 bucket.

    The message body is a JSON object with the keys `Bucket`, `Key`, `Path`
    and `FileSize`.

    Args:
        bucket (str): The input S3 bucket.
        key (str): The S3 key of the export.
        path (str): The file path within the export.
        file_size (int): The size of the file in bytes.
        sns_arn (str): The ARN of the dispatch SNS topic.
        sns_client (boto3.client, optional): An existing SNS client to publish
            with. Callers that publish many messages can pass one client to
            avoid constructing a new client per message. When omitted, a
            client is created for this call.

    Returns:
        None
    """
    if sns_client is None:
        sns_client = boto3.client("sns")
    file_info = {
        "Bucket": bucket,
        "Key": key,
        "Path": path,
        "FileSize": file_size,
    }
    logger.info(f"Publishing {file_info} to {sns_arn}")
    sns_client.publish(TopicArn=sns_arn, Message=json.dumps(file_info))


def main(
event: dict,
s3_client: boto3.client,
input_bucket: str,
input_key_prefix: str,
raw_bucket: str,
raw_key_prefix: str,
dispatch_sns_arn: str,
) -> None:
export_keys = list_s3_objects(
s3_client=s3_client,
Expand All @@ -411,26 +477,42 @@ def main(
key_prefix=raw_key_prefix,
key_format="raw",
)
for export_key in export_keys:
for export_key in sum(export_keys.values(), []):
# input bucket keys are formatted like `{namespace}/{cohort}/{export_basename}`
namespace, cohort = export_key.split("/")[:2]
file_list = list_files_in_archive(
s3_client=s3_client,
bucket=input_bucket,
key=export_key,
)
for filename in file_list:
for file_object in file_list:
filename = file_object["filename"]
logger.info(
f"Checking corresponding raw object for {filename} "
f"from s3://{input_bucket}/{export_key}"
)
data_type = filename.split("_")[0]
file_identifier = filename.split(".")[0]
expected_key = (
f"{namespace}/json/dataset={data_type}"
f"/cohort={cohort}/{file_identifier}.ndjson.gz"
)
corresponding_raw_object = match_corresponding_raw_object(
namespace=namespace,
cohort=cohort,
data_type=data_type,
file_identifier=filename,
cohort=cohort,
expected_key=expected_key,
raw_keys=raw_keys,
)
if corresponding_raw_object is None:
logger.info(
f"Did not find corresponding file for {filename} from "
f"s3://{raw_bucket}/{export_key} in raw bucket "
f"s3://{raw_bucket}/{namespace}/"
f"Did not find corresponding raw object for {filename} from "
f"s3://{input_bucket}/{export_key} at "
f"s3://{raw_bucket}/{expected_key}"
)
publish_to_sns(
bucket=input_bucket,
key=export_key,
path=filename,
file_size=file_object["file_size"],
sns_arn=dispatch_sns_arn,
)
5 changes: 5 additions & 0 deletions src/lambda_function/raw_sync/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ Parameters:
Type: String
Description: S3 key prefix where files are written.

SNSTopicArn:
Type: String
Description: The ARN of the dispatch SNS topic.

LambdaPythonVersion:
Type: String
Description: Python version to use for this lambda function
Expand All @@ -51,6 +55,7 @@ Resources:
INPUT_S3_KEY_PREFIX: !Ref S3InputKeyPrefix
RAW_S3_BUCKET: !Ref S3RawBucket
RAW_S3_KEY_PREFIX: !Ref S3RawKeyPrefix
SNS_TOPIC_ARN: !Ref SNSTopicArn

Outputs:
RawSyncFunctionArn:
Expand Down
Loading

0 comments on commit fb220e6

Please sign in to comment.