Commit
Merge pull request #1 from dylanmcreynolds/caerbannog
Caerbannog
dylanmcreynolds authored Feb 14, 2024
2 parents 833e774 + 8bfe6df commit 8e96134
Showing 14 changed files with 282 additions and 28 deletions.
7 changes: 7 additions & 0 deletions .flake8
@@ -0,0 +1,7 @@
[flake8]
exclude =
    .git,
    __pycache__,
    versioneer.py,
    src/_version.py,
max-line-length = 115
50 changes: 50 additions & 0 deletions .github/workflows/publish_container.yml
@@ -0,0 +1,50 @@
# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Create and publish image

on:
  push:
    branches: ['main']
    tags: ['v*']

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  build-and-push-image:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v3
        with:
          fetch-depth: 0

      - name: Log in to the Container registry
        uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata (tags, labels) for Docker
        id: meta
        uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

      - name: Build and push Docker image
        uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc
        with:
          context: .
          file: Dockerfile
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
35 changes: 35 additions & 0 deletions .github/workflows/python_app.yml
@@ -0,0 +1,35 @@
name: mlex ingest tiled

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.11
        uses: actions/setup-python@v2
        with:
          python-version: 3.11
          cache: 'pip'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install flake8 pytest
          pip install .
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
          if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
      - name: Test with pytest
        run: |
          pytest
4 changes: 3 additions & 1 deletion .gitignore
@@ -127,4 +127,6 @@ dmypy.json
 api_keys.yml
 users.yml

-splash_flows_globus/
+splash_flows_globus/
+
+src/_version.py
7 changes: 3 additions & 4 deletions Dockerfile
@@ -2,10 +2,9 @@ FROM python:3.11

 WORKDIR /app

-COPY ./requirements.txt /code/requirements.txt
+COPY . /app
+RUN pip install --no-cache-dir --upgrade .

-RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
-
-COPY . /app
-
-CMD ["python", "main.py"]
+# CMD ["python", "main.py"]
2 changes: 2 additions & 0 deletions data/test.csv
@@ -0,0 +1,2 @@
a,b,c
1,2,3
16 changes: 9 additions & 7 deletions pyproject.toml
@@ -3,7 +3,7 @@ requires = ["hatchling", "hatch-vcs"]
 build-backend = "hatchling.build"

 [project]
-name = "tiled_listener"
+name = "tiled_ingestor"
 description = "Process that listens on a queue for items to add to tiled"
 readme = { file = "README.md", content-type = "text/markdown" }

@@ -20,18 +20,20 @@ classifiers = [

 dependencies = [
     "pika",
-    "tiled[client]"
+    "tiled[server]==0.1.0a114",
+    "python-dotenv"
 ]
-dev-dependencies = [
+
+dynamic = ["version"]
+
+
+[project.optional-dependencies]
+dev = [
     "pytest",
     "pre-commit",
     "flake8"
 ]
-
-dynamic = ["version"]

 [tool.hatch]
 version.source = "vcs"
 build.hooks.vcs.version-file = "src/_version.py"
File renamed without changes.
16 changes: 0 additions & 16 deletions src/_version.py

This file was deleted.

89 changes: 89 additions & 0 deletions src/ingest.py
@@ -0,0 +1,89 @@
import asyncio
import logging
import sys

from tiled.catalog.register import identity, register
from tiled.catalog import from_uri
import tiled.config

logger = logging.getLogger(__name__)

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


async def process_file(
    file_path: str,
    tiled_config_tree_path: str = "/",
    config_path: str = "/deploy/config",
    path_prefix: str = "/"
):
    """
    Process a file that already exists and register it with tiled as a catalog.
    We look for a match in the tiled config file based on tiled_config_tree_path; this is
    the tree that we import into. This should work with folders of TIFF sequences as well as
    single files like HDF5 or datasets like Zarr, but only TIFF sequences have been tested so far.

    Args:
        file_path (str): The path of the file to be processed.
        tiled_config_tree_path (str, optional): The path of the tiled tree configuration. Defaults to "/".
        config_path (str, optional): The path of the configuration file. Defaults to "/deploy/config".
        path_prefix (str, optional): The prefix to be added to the registered path. Defaults to "/".

    Raises:
        AssertionError: If no tiled tree is configured for the provided tree path.
        AssertionError: If the matching tiled tree is not a catalog.

    Returns:
        None
    """
    config = tiled.config.parse_configs(config_path)
    # find the tree in the tiled configuration that matches the provided tiled_config_tree_path
    matching_tree = next(
        (tree for tree in config["trees"] if tree["path"] == tiled_config_tree_path), None
    )
    assert matching_tree, f"No tiled tree configured for tree path {tiled_config_tree_path}"
    assert (
        matching_tree["tree"] == "catalog"
    ), f"Matching tiled tree {tiled_config_tree_path} is not a catalog"

    # using the tree from the configuration, generate a catalog (adapter)
    catalog_adapter = from_uri(
        matching_tree["args"]["uri"],
        readable_storage=matching_tree["args"]["readable_storage"],
        adapters_by_mimetype=matching_tree["args"].get("adapters_by_mimetype")
    )

    # Register with tiled. This writes entries into the database for all of the nodes down to the data node
    await register(
        catalog=catalog_adapter,
        key_from_filename=identity,
        path=file_path,
        prefix=path_prefix,
        overwrite=False)


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "outside_container":
        # if we're debugging this outside of a container, we might want our
        # own settings
        import dotenv
        dotenv.load_dotenv()
        asyncio.run(
            process_file(
                "../mlex_tomo_framework/data/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313",
                config_path="../mlex_tomo_framework/tiled/deploy/config",
                path_prefix="/beamlines/8.3.2/recons/"
            )
        )
    else:
        from pprint import pprint
        import os
        pprint(os.environ)
        asyncio.run(
            process_file(
                # "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257",
                "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313",
                path_prefix="/beamlines/8.3.2/recons/"
            )
        )
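
process_file only touches a few keys of the parsed tiled config. For orientation, a rough sketch of the shape of config["trees"] that it expects, written as the parsed Python structure it would receive; the values here are made up for illustration, not taken from this repo:

# Hypothetical parsed tiled config, sketched from the keys accessed in process_file
example_config = {
    "trees": [
        {
            "path": "/",        # compared against tiled_config_tree_path
            "tree": "catalog",  # process_file asserts this value
            "args": {
                "uri": "postgresql+asyncpg://user:secret@localhost/catalog",
                "readable_storage": ["/tiled_storage"],
                "adapters_by_mimetype": None,  # optional; fetched with .get()
            },
        }
    ]
}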

Empty file added src/rabbitmq/__init__.py
40 changes: 40 additions & 0 deletions src/rabbitmq/consumer.py
@@ -0,0 +1,40 @@
import asyncio
import json

import pika

from ..ingest import process_file
from .schemas import NEW_FILE_PATH_KEY


def process_message(ch, method, properties, body):
    # Decode the JSON message and pull out the file path
    message = json.loads(body)
    new_file_path = message.get(NEW_FILE_PATH_KEY)
    assert (
        new_file_path
    ), f"Message received from RabbitMQ does not contain {NEW_FILE_PATH_KEY}"
    # Process the file, then acknowledge the message
    asyncio.run(process_file(new_file_path))
    ch.basic_ack(delivery_tag=method.delivery_tag)


# Connect to RabbitMQ
credentials = pika.PlainCredentials("guest", "guest")
parameters = pika.ConnectionParameters("localhost", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue="tomo_reconstruction")

# Set the prefetch count to limit the number of unacknowledged messages
channel.basic_qos(prefetch_count=1)

# Register the consumer callback on the queue
channel.basic_consume(queue="tomo_reconstruction", on_message_callback=process_message)

# Enter a loop to continuously consume messages
channel.start_consuming()
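
Note that the consumer declares the queue but never binds it to a named exchange; with a direct exchange, messages reach a queue only if it is bound with a matching routing key (publishing to the default exchange "" would also work). A minimal sketch of the binding this producer/consumer pair appears to assume, using the exchange and routing-key names from producer.py below; this call is not part of the commit:

# Hypothetical binding (assumed, not in this commit): route messages published
# to the direct exchange into the queue declared above.
channel.queue_bind(
    queue="tomo_reconstruction",
    exchange="test_exchange",
    routing_key="tomo_reconstruction",
)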
43 changes: 43 additions & 0 deletions src/rabbitmq/producer.py
@@ -0,0 +1,43 @@
import json
import logging
import sys

import pika
from pika import DeliveryMode
from pika.exchange_type import ExchangeType

from schemas import NEW_FILE_PATH_KEY


logging.basicConfig(level=logging.INFO)


def send_message(new_file: str):
    json_message = json.dumps({NEW_FILE_PATH_KEY: new_file})

    credentials = pika.PlainCredentials("guest", "guest")
    parameters = pika.ConnectionParameters("localhost", credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(
        exchange="test_exchange",
        exchange_type=ExchangeType.direct,
        passive=False,
        durable=True,
        auto_delete=False,
    )

    print("Sending message")
    channel.basic_publish(
        "test_exchange",
        "tomo_reconstruction",
        json_message,
        pika.BasicProperties(
            content_type="application/json", delivery_mode=DeliveryMode.Transient
        ),
    )
    connection.close()


if __name__ == "__main__":
    new_file = sys.argv[1]
    send_message(new_file)
1 change: 1 addition & 0 deletions src/rabbitmq/schemas.py
@@ -0,0 +1 @@
NEW_FILE_PATH_KEY = "new_file_path"
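
For reference, the message body that the producer and consumer exchange is a one-key JSON object built from this constant; a minimal sketch with an illustrative path:

import json

from schemas import NEW_FILE_PATH_KEY

# Example message body; the path is illustrative
body = json.dumps({NEW_FILE_PATH_KEY: "/tiled_storage/beamlines/8.3.2/recons/example_scan"})
# -> '{"new_file_path": "/tiled_storage/beamlines/8.3.2/recons/example_scan"}'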
