Stactools-Pipelines is a large-scale, turnkey processing framework for creating STAC metadata and cloud-optimized formats for data stored on S3 using stactools packages. Its goal is to make it easier for teams to develop scalable metadata pipelines by reducing the infrastructure boilerplate they need to build themselves.
- Python>=3.11
- Docker
- tox
- awscli
- An IAM role with sufficient permissions for creating, destroying and modifying the relevant stack resources.
This framework expects that a stactools package has already been created for your dataset of interest and that the dataset is stored on S3. Check out stactools-packages to find out if a package already exists or how to create your own.
This is a template repository. You can follow these instructions to create your organization's own repo from the template. Once you have created your own repository from the template, you can develop your own pipelines.
To install for pipeline development run
pip install .["dev"]
An example pipeline structure for Sentinel-1 is included in the repo here to use as a reference for developing your own pipeline.
To develop a new pipeline, create a directory in `pipelines` using a simple name for your pipeline dataset.
At a minimum you need to include:
- `app.py` A Lambda application with a `handler` function defined which consumes an `SQSEvent`, creates a STAC Item, and posts it to the `ingestor`.
- `requirements.txt` Contains `app.py`'s dependencies so a container image can be built.
- `config.yaml` Your pipeline's configuration settings.
- `test_app.py` A `pytest`-based unit test file which exercises `app.py`.
- `collection.py` A Lambda application with a `handler` function which creates a STAC Collection and posts it to the `ingestor` (a minimal sketch follows this list).
- `test_collection.py` A `pytest`-based unit test file which exercises `collection.py`.
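As a rough illustration only (not this repository's Sentinel-1 implementation), a `collection.py` could look something like the sketch below; the collection metadata, the `INGESTOR_URL` environment variable, and the ingestor route are assumptions, and authentication is omitted:

```python
import os
from datetime import datetime, timezone

import pystac
import requests

# Assumption for illustration: the ingestor root URL is supplied via the environment.
INGESTOR_URL = os.environ.get("INGESTOR_URL", "")


def handler(event, context):
    """Create a STAC Collection and post it to the ingestor."""
    collection = pystac.Collection(
        id="my-dataset",  # hypothetical collection id
        description="Example collection created by a stactools pipeline.",
        extent=pystac.Extent(
            spatial=pystac.SpatialExtent([[-180.0, -90.0, 180.0, 90.0]]),
            temporal=pystac.TemporalExtent(
                [[datetime(2014, 1, 1, tzinfo=timezone.utc), None]]
            ),
        ),
        license="proprietary",
    )
    # The exact ingestor route and auth headers depend on your deployment.
    requests.post(f"{INGESTOR_URL}/collections", json=collection.to_dict())
```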
If you have pre-existing granules in your bucket and your bucket has S3 Inventory enabled, you can also optionally include functions which read the inventory to process these pre-existing granules. These include:
- `historic.py` A Lambda application which queries rows from the bucket's inventory file loaded into an Athena table.
- `test_historic.py` A `pytest`-based unit test file which exercises `historic.py`.
Your pipeline's `config.yaml` supports the following settings:

- `id` **Required** The pipeline name. This should be the same as the pipeline's parent folder and should use `_`s for separators to support Python module discovery.
- `compute` **Required** Currently only the `awslambda` value is supported.
- `ingestor_url` **Required** The ingestor API's root path with the stage value included.
- `secret_arn` **Required** The Secrets Manager ARN for using a Cognito JWKS implementation with the ingestor API.
- `sns` **Optional** The SNS topic to listen to for new granule notifications.
- `inventory_location` **Optional** The location of an S3 inventory or file listing that can be used by the pipeline to process and ingest existing granules. Include a `historic.py` file (and a `test_historic.py`) in your pipeline which implements `query_inventory`, `row_to_message_body`, and `handler` methods to query the inventory and send the results to the processing queue (see the sketch after this list).
- `historic_frequency` **Optional** If an `inventory_location` is included, the `historic_frequency` (how often, in hours, `historic.py` is run) must also be included. A value of `0` indicates that the `historic.py` function will run a single time on deployment and process the entire inventory. If a value of > `0` is specified, then an `initial_chunk` must also be specified. The pipeline will build a stack which uses these values to incrementally chunk through the inventory file with `cron` executions until the entire inventory has been processed.
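As a hypothetical sketch of those three functions (the Athena database and table names, inventory column layout, results location, and queue URL environment variable are all assumptions, not this repository's actual implementation):

```python
import json
import os
import time

import boto3

ATHENA = boto3.client("athena")
SQS = boto3.client("sqs")

# All of these names are assumptions for illustration only.
DATABASE = os.environ.get("ATHENA_DATABASE", "inventory_db")
TABLE = os.environ.get("ATHENA_TABLE", "bucket_inventory")
RESULTS_LOCATION = os.environ.get("ATHENA_RESULTS", "s3://my-athena-results/")
QUEUE_URL = os.environ.get("QUEUE_URL", "")


def query_inventory(limit: int = 1000) -> list[dict]:
    """Run an Athena query over the inventory table and return its rows."""
    execution = ATHENA.start_query_execution(
        QueryString=f"SELECT bucket, key FROM {TABLE} LIMIT {limit}",
        QueryExecutionContext={"Database": DATABASE},
        ResultConfiguration={"OutputLocation": RESULTS_LOCATION},
    )
    query_id = execution["QueryExecutionId"]

    # Poll until the query finishes (simplified; no error handling or backoff).
    while True:
        state = ATHENA.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"
        ]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(2)

    results = ATHENA.get_query_results(QueryExecutionId=query_id)
    # Skip the header row Athena returns as the first result row.
    return results["ResultSet"]["Rows"][1:]


def row_to_message_body(row: dict) -> str:
    """Convert one Athena result row into an SQS message body."""
    bucket, key = (datum["VarCharValue"] for datum in row["Data"])
    return json.dumps({"bucket": bucket, "key": key})


def handler(event, context):
    """Query the inventory and push each row to the processing queue."""
    for row in query_inventory():
        SQS.send_message(QueueUrl=QUEUE_URL, MessageBody=row_to_message_body(row))
```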
Your pipeline's `app.py` function receives notification messages about granules to be processed and uses the information in these messages to create new STAC Items (and, optionally, new cloud-optimized versions of the data). You'll need to understand a bit about the structure of the files that make up a granule in your dataset and a bit about how the SNS notifications are configured for your bucket (if they are enabled).
As an example, note that the SNS topic for the Sentinel-1 bucket uses a custom message structure that includes an `id` and a `path` property describing the "directory" key rather than individual file keys like most SNS configurations. Note that the Sentinel-1 `app.py` has logic to correctly parse this message and that the Sentinel-1 stactools package actually expects an `href` value that is a "directory" rather than a file.
At a minimum, the `handler` method in your pipeline's `app.py` should create a STAC Item and post that item to the ingestor endpoint.
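Purely as an illustration of that shape (not the actual Sentinel-1 code), a minimal `app.py` might look roughly like this; the message fields, the placeholder `create_item` function standing in for your dataset's stactools package, and the environment variable names are assumptions:

```python
import json
import os

import requests

# Assumptions for illustration: the ingestor URL and bucket name come from the environment.
INGESTOR_URL = os.environ.get("INGESTOR_URL", "")
BUCKET = os.environ.get("BUCKET", "my-bucket")


def create_item(href: str) -> dict:
    """Placeholder: replace with your dataset's stactools package Item creation."""
    raise NotImplementedError


def handler(event, context):
    """Consume an SQS event, create a STAC Item per record, and post it."""
    for record in event["Records"]:
        # SQS messages delivered via an SNS subscription wrap the original
        # notification in a "Message" field.
        body = json.loads(record["body"])
        message = json.loads(body["Message"]) if "Message" in body else body

        # Sentinel-1-style notifications carry an `id` and a "directory" `path`
        # rather than individual file keys (see above).
        href = f"s3://{BUCKET}/{message['path']}"
        item = create_item(href)

        # The exact ingestor route and auth headers depend on your deployment.
        requests.post(INGESTOR_URL, json=item)
```

The Sentinel-1 example pipeline in this repository is the better reference for the real message parsing and ingestor call.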
To make unit testing easier and more consistent, `stactools-pipelines` includes a number of pytest fixtures and mocks which you can use to isolate your handler function from external services and verify that it is behaving as expected. These fixtures use some pytest parameterization magic to patch the appropriate imports in your pipeline's `app.py` when your tests are run.
Using the Sentinel-1 pipeline as an example, the `test_handler` test [is decorated with](https://docs.pytest.org/en/7.1.x/how-to/parametrize.html) the `pipeline_id` and `module`. When the test is run, all the fixtures included in `conftest.py` patch your `app.py`'s use of the real module, allowing you to test in isolation. It is recommended to use these fixtures for unit testing and behavior verification.
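The fixtures shipped in `conftest.py` handle that patching for you; purely to illustrate the idea (reusing the hypothetical `app.py` sketch above rather than the repository's real fixtures), a hand-rolled equivalent might look like:

```python
import json
from unittest.mock import patch

import app  # the hypothetical app.py sketched above


def make_sqs_event(path: str) -> dict:
    """Build a minimal SQS event wrapping an SNS-style notification."""
    message = json.dumps({"id": "GRANULE_ID", "path": path})
    return {"Records": [{"body": json.dumps({"Message": message})}]}


def test_handler_posts_item_to_ingestor():
    event = make_sqs_event("some/granule/directory")
    with patch.object(app, "create_item", return_value={"type": "Feature"}), \
            patch.object(app, "requests") as mock_requests:
        app.handler(event, None)

    mock_requests.post.assert_called_once()
```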
To run the unit tests for your pipeline
Create an environment setting using your pipeline name.
$ export PIPELINE=<Your pipeline name>
And call tox to run your unit tests
$ tox
Deploying a pipeline will use the pipeline's `config.yaml` to deploy all the necessary infrastructure. This includes STAC Collection and Item creation Lambdas and any queues or Athena tables that are required. A new, completely separate CloudFormation stack is created for each pipeline. The pipeline being deployed is controlled by the `PIPELINE` environment variable. The `collection.py` Lambda is executed automatically by the stack after deployment in order to create the STAC Collection in the target STAC API. If an `sns` topic was specified in the `config.yaml`, the `app.py` Lambda will be subscribed to it and will begin processing notifications as soon as the stack deployment is complete.
To deploy a pipeline
Create a development virtual environment with
$ tox -e dev
$ source devenv/bin/activate
Create environment settings for your pipeline deployment
$ export PROJECT=<The project name for resource cost tracking>
$ export PIPELINE=<Your pipeline name>
With an AWS profile enabled with sufficient permissions, build and push your pipeline image with
$ python image_builder.py
This will package your pipeline's Lambda code and its dependencies specified in `requirements.txt` into a Docker image and push it to ECR for use by the pipeline Lambdas.
Deploy the infrastructure for your pipeline with
$ cdk deploy
This will create a CloudFormation stack for your pipeline.
To create a development virtual environment for core repository development, use
$ tox -e dev
$ source devenv/bin/activate