From ec2d39d5d11fe3f8816650b9e3ec3fac3173b3c7 Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Thu, 8 Feb 2024 17:57:57 -0800 Subject: [PATCH 1/8] initial ingest --- data/test.csv | 2 ++ pyproject.toml | 16 ++++++++++------ src/_version.py | 4 ++-- src/consumer.py | 30 ++++++++++++++++++++++++++++++ src/ingest.py | 32 ++++++++++++++++++++++++++++++++ src/main.py | 0 src/producer.py | 25 +++++++++++++++++++++++++ 7 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 data/test.csv create mode 100644 src/consumer.py create mode 100644 src/ingest.py delete mode 100644 src/main.py create mode 100644 src/producer.py diff --git a/data/test.csv b/data/test.csv new file mode 100644 index 0000000..0eadb69 --- /dev/null +++ b/data/test.csv @@ -0,0 +1,2 @@ +a,b,c +1,2,3 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 91e8f33..0eecbce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,17 +19,21 @@ classifiers = [ ] dependencies = [ - "pika", - "tiled[client]" - + "pika" ] -dev-dependencies = [ + +dynamic = ["version"] + + +[project.optional-dependencies] +dev = [ "pytest", "pre-commit", "flake8" ] - -dynamic = ["version"] +tiled = [ + "tiled[server]" +] [tool.hatch] diff --git a/src/_version.py b/src/_version.py index a9bda8e..c423085 100644 --- a/src/_version.py +++ b/src/_version.py @@ -12,5 +12,5 @@ __version_tuple__: VERSION_TUPLE version_tuple: VERSION_TUPLE -__version__ = version = '0.1.dev0+d20240206' -__version_tuple__ = version_tuple = (0, 1, 'dev0', 'd20240206') +__version__ = version = '0.1.dev2+g833e774.d20240209' +__version_tuple__ = version_tuple = (0, 1, 'dev2', 'g833e774.d20240209') diff --git a/src/consumer.py b/src/consumer.py new file mode 100644 index 0000000..af30dba --- /dev/null +++ b/src/consumer.py @@ -0,0 +1,30 @@ +import pika +import json + + +def callback(ch, method, properties, body): + # Decode the JSON message + message = json.loads(body) + print(message) + # Prcess the message + # TODO: Add your logic here + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + + +# Connect to RabbitMQ +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() + +# Declare the queue +channel.queue_declare(queue='mle_ingest') + +# Set the prefetch count to limit the number of unacknowledged messages +channel.basic_qos(prefetch_count=1) + +# Start consuming messages +channel.basic_consume(queue='mle_ingest', on_message_callback=callback) + +# Enter a loop to continuously consume messages +channel.start_consuming() diff --git a/src/ingest.py b/src/ingest.py new file mode 100644 index 0000000..f99fd91 --- /dev/null +++ b/src/ingest.py @@ -0,0 +1,32 @@ +import asyncio +import logging +import os +from pathlib import Path + +from tiled.catalog.register import register +from tiled.catalog import from_uri +import tiled.config + +logger = logging.getLogger(__name__) + +TILED_URL = os.getenv("TILED_URL", "http://127.0.0.1:8000") +TILED_API_KEY = os.getenv("TILED_SINGLE_USER_API_KEY") + + +def register_file(path: str): + config = tiled.config.parse_configs("../mlex_tomo_framework/tiled/deploy/config") + first_tree = config['trees'][0] + assert first_tree['tree'] == 'catalog' + catalog_adapter = from_uri( + first_tree['args']['uri'], + readable_storage=first_tree['args']['readable_storage'], + adapters_by_mimetype=config['media_types'] or None, + ) + + asyncio.run(register( + catalog=catalog_adapter, + path=Path(path) + )) + + +register_file("./test.csv") \ No newline at end of file diff --git a/src/main.py b/src/main.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/producer.py b/src/producer.py new file mode 100644 index 0000000..017a872 --- /dev/null +++ b/src/producer.py @@ -0,0 +1,25 @@ +import pika +import json + +# Connect to RabbitMQ server +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() + +# Declare the queue +channel.queue_declare(queue='mle_ingest') + +# Define the message to send +message = { + 'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3' +} + +# Convert the message to JSON +message_json = json.dumps(message) + +# Publish the message to the queue +channel.basic_publish(exchange='', routing_key='mle_ingest', body=message_json) + +# Close the connection +connection.close() From a8bfa539ee8ddbc0b90691ce3f27a3a774ce751e Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Fri, 9 Feb 2024 11:03:30 -0800 Subject: [PATCH 2/8] rabbmit mq code --- src/consume.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/publish.py | 28 +++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 src/consume.py create mode 100644 src/publish.py diff --git a/src/consume.py b/src/consume.py new file mode 100644 index 0000000..db93203 --- /dev/null +++ b/src/consume.py @@ -0,0 +1,52 @@ +"""Basic message consumer example""" +import functools +import logging +import pika +from pika.exchange_type import ExchangeType + +LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' + '-35s %(lineno) -5d: %(message)s') +LOGGER = logging.getLogger(__name__) + +logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) + + +def on_message(chan, method_frame, header_frame, body, userdata=None): + """Called when a message is received. Log message and ack it.""" + LOGGER.info('Delivery properties: %s, message metadata: %s', method_frame, header_frame) + LOGGER.info('Userdata: %s, message body: %s', userdata, body) + chan.basic_ack(delivery_tag=method_frame.delivery_tag) + + +def main(): + """Main method.""" + credentials = pika.PlainCredentials('guest', 'guest') + parameters = pika.ConnectionParameters('localhost', credentials=credentials) + connection = pika.BlockingConnection(parameters) + + channel = connection.channel() + channel.exchange_declare( + exchange='mlexchange_exchange', + exchange_type=ExchangeType.direct, + passive=False, + durable=True, + auto_delete=False) + channel.queue_declare(queue='standard', auto_delete=True) + channel.queue_bind( + queue='standard', exchange='mlexchange_exchange', routing_key='tomo_reconstruction') + channel.basic_qos(prefetch_count=1) + + on_message_callback = functools.partial( + on_message, userdata='on_message_userdata') + channel.basic_consume('standard', on_message_callback) + + try: + channel.start_consuming() + except KeyboardInterrupt: + channel.stop_consuming() + + connection.close() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/publish.py b/src/publish.py new file mode 100644 index 0000000..c1b3c94 --- /dev/null +++ b/src/publish.py @@ -0,0 +1,28 @@ +import json +import logging +import pika +from pika import DeliveryMode +from pika.exchange_type import ExchangeType + +logging.basicConfig(level=logging.INFO) + +json_message = json.dumps({"file_path": "/path/to/file"}) + +credentials = pika.PlainCredentials('guest', 'guest') +parameters = pika.ConnectionParameters('localhost', credentials=credentials) +connection = pika.BlockingConnection(parameters) +channel = connection.channel() +channel.exchange_declare(exchange="test_exchange", + exchange_type=ExchangeType.direct, + passive=False, + durable=True, + auto_delete=False) + +print("Sending message to create a queue") +channel.basic_publish( + 'mlexchange_exchange', 'tomo_reconstruction', json_message, + pika.BasicProperties(content_type='application/json', + delivery_mode=DeliveryMode.Transient)) + + +connection.close() \ No newline at end of file From 8abf4ed3f50db3ee7d341407aa7b30f8d99dbdbf Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Fri, 9 Feb 2024 14:51:12 -0800 Subject: [PATCH 3/8] clean rabbit --- src/consume.py | 52 ------------------------------------------------- src/consumer.py | 8 +++++--- src/producer.py | 39 ++++++++++++++++++++----------------- src/publish.py | 28 -------------------------- 4 files changed, 26 insertions(+), 101 deletions(-) delete mode 100644 src/consume.py delete mode 100644 src/publish.py diff --git a/src/consume.py b/src/consume.py deleted file mode 100644 index db93203..0000000 --- a/src/consume.py +++ /dev/null @@ -1,52 +0,0 @@ -"""Basic message consumer example""" -import functools -import logging -import pika -from pika.exchange_type import ExchangeType - -LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' - '-35s %(lineno) -5d: %(message)s') -LOGGER = logging.getLogger(__name__) - -logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) - - -def on_message(chan, method_frame, header_frame, body, userdata=None): - """Called when a message is received. Log message and ack it.""" - LOGGER.info('Delivery properties: %s, message metadata: %s', method_frame, header_frame) - LOGGER.info('Userdata: %s, message body: %s', userdata, body) - chan.basic_ack(delivery_tag=method_frame.delivery_tag) - - -def main(): - """Main method.""" - credentials = pika.PlainCredentials('guest', 'guest') - parameters = pika.ConnectionParameters('localhost', credentials=credentials) - connection = pika.BlockingConnection(parameters) - - channel = connection.channel() - channel.exchange_declare( - exchange='mlexchange_exchange', - exchange_type=ExchangeType.direct, - passive=False, - durable=True, - auto_delete=False) - channel.queue_declare(queue='standard', auto_delete=True) - channel.queue_bind( - queue='standard', exchange='mlexchange_exchange', routing_key='tomo_reconstruction') - channel.basic_qos(prefetch_count=1) - - on_message_callback = functools.partial( - on_message, userdata='on_message_userdata') - channel.basic_consume('standard', on_message_callback) - - try: - channel.start_consuming() - except KeyboardInterrupt: - channel.stop_consuming() - - connection.close() - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/src/consumer.py b/src/consumer.py index af30dba..b56866d 100644 --- a/src/consumer.py +++ b/src/consumer.py @@ -14,17 +14,19 @@ def callback(ch, method, properties, body): # Connect to RabbitMQ -connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +credentials = pika.PlainCredentials('guest', 'guest') +parameters = pika.ConnectionParameters('localhost', credentials=credentials) +connection = pika.BlockingConnection(parameters) channel = connection.channel() # Declare the queue -channel.queue_declare(queue='mle_ingest') +channel.queue_declare(queue='tomo_reconstruction') # Set the prefetch count to limit the number of unacknowledged messages channel.basic_qos(prefetch_count=1) # Start consuming messages -channel.basic_consume(queue='mle_ingest', on_message_callback=callback) +channel.basic_consume(queue='tomo_reconstruction', on_message_callback=callback) # Enter a loop to continuously consume messages channel.start_consuming() diff --git a/src/producer.py b/src/producer.py index 017a872..c1b3c94 100644 --- a/src/producer.py +++ b/src/producer.py @@ -1,25 +1,28 @@ -import pika import json +import logging +import pika +from pika import DeliveryMode +from pika.exchange_type import ExchangeType -# Connect to RabbitMQ server -connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) -channel = connection.channel() +logging.basicConfig(level=logging.INFO) -# Declare the queue -channel.queue_declare(queue='mle_ingest') +json_message = json.dumps({"file_path": "/path/to/file"}) -# Define the message to send -message = { - 'key1': 'value1', - 'key2': 'value2', - 'key3': 'value3' -} +credentials = pika.PlainCredentials('guest', 'guest') +parameters = pika.ConnectionParameters('localhost', credentials=credentials) +connection = pika.BlockingConnection(parameters) +channel = connection.channel() +channel.exchange_declare(exchange="test_exchange", + exchange_type=ExchangeType.direct, + passive=False, + durable=True, + auto_delete=False) -# Convert the message to JSON -message_json = json.dumps(message) +print("Sending message to create a queue") +channel.basic_publish( + 'mlexchange_exchange', 'tomo_reconstruction', json_message, + pika.BasicProperties(content_type='application/json', + delivery_mode=DeliveryMode.Transient)) -# Publish the message to the queue -channel.basic_publish(exchange='', routing_key='mle_ingest', body=message_json) -# Close the connection -connection.close() +connection.close() \ No newline at end of file diff --git a/src/publish.py b/src/publish.py deleted file mode 100644 index c1b3c94..0000000 --- a/src/publish.py +++ /dev/null @@ -1,28 +0,0 @@ -import json -import logging -import pika -from pika import DeliveryMode -from pika.exchange_type import ExchangeType - -logging.basicConfig(level=logging.INFO) - -json_message = json.dumps({"file_path": "/path/to/file"}) - -credentials = pika.PlainCredentials('guest', 'guest') -parameters = pika.ConnectionParameters('localhost', credentials=credentials) -connection = pika.BlockingConnection(parameters) -channel = connection.channel() -channel.exchange_declare(exchange="test_exchange", - exchange_type=ExchangeType.direct, - passive=False, - durable=True, - auto_delete=False) - -print("Sending message to create a queue") -channel.basic_publish( - 'mlexchange_exchange', 'tomo_reconstruction', json_message, - pika.BasicProperties(content_type='application/json', - delivery_mode=DeliveryMode.Transient)) - - -connection.close() \ No newline at end of file From 6c2e735ec9092daedf692e6d524bfa06719b324e Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Fri, 9 Feb 2024 17:55:01 -0800 Subject: [PATCH 4/8] added rabbitmq and updated ingest --- .flake8 | 7 +++++++ Dockerfile | 7 +++---- pyproject.toml | 9 +++------ src/__init__.py | 0 src/_version.py | 4 ++-- src/consumer.py | 32 ------------------------------ src/ingest.py | 38 ++++++++++++++++++++---------------- src/producer.py | 28 --------------------------- src/rabbitmq/__init__.py | 0 src/rabbitmq/consumer.py | 38 ++++++++++++++++++++++++++++++++++++ src/rabbitmq/producer.py | 42 ++++++++++++++++++++++++++++++++++++++++ src/rabbitmq/schemas.py | 1 + 12 files changed, 117 insertions(+), 89 deletions(-) create mode 100644 .flake8 create mode 100644 src/__init__.py delete mode 100644 src/consumer.py delete mode 100644 src/producer.py create mode 100644 src/rabbitmq/__init__.py create mode 100644 src/rabbitmq/consumer.py create mode 100644 src/rabbitmq/producer.py create mode 100644 src/rabbitmq/schemas.py diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..baebaf3 --- /dev/null +++ b/.flake8 @@ -0,0 +1,7 @@ +[flake8] +exclude = + .git, + __pycache__, + versioneer.py, + src/_version.py, +max-line-length = 115 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 64a13e4..b9f081f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,10 +2,9 @@ FROM python:3.11 WORKDIR /app -COPY ./requirements.txt /code/requirements.txt +COPY . /app +RUN pip install --no-cache-dir --upgrade . -RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt -COPY . /app -CMD ["python", "main.py"] \ No newline at end of file +# CMD ["python", "main.py"]s \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0eecbce..653e292 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ requires = ["hatchling", "hatch-vcs"] build-backend = "hatchling.build" [project] -name = "tiled_listener" +name = "tiled_ingestor" description = "Process that listens on a queue for items to add to tiled" readme = { file = "README.md", content-type = "text/markdown" } @@ -19,7 +19,8 @@ classifiers = [ ] dependencies = [ - "pika" + "pika", + "tiled[server]==0.1.0a114" ] dynamic = ["version"] @@ -31,10 +32,6 @@ dev = [ "pre-commit", "flake8" ] -tiled = [ - "tiled[server]" -] - [tool.hatch] version.source = "vcs" diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_version.py b/src/_version.py index c423085..1bc48fa 100644 --- a/src/_version.py +++ b/src/_version.py @@ -12,5 +12,5 @@ __version_tuple__: VERSION_TUPLE version_tuple: VERSION_TUPLE -__version__ = version = '0.1.dev2+g833e774.d20240209' -__version_tuple__ = version_tuple = (0, 1, 'dev2', 'g833e774.d20240209') +__version__ = version = '0.1.dev5+g8abf4ed.d20240210' +__version_tuple__ = version_tuple = (0, 1, 'dev5', 'g8abf4ed.d20240210') diff --git a/src/consumer.py b/src/consumer.py deleted file mode 100644 index b56866d..0000000 --- a/src/consumer.py +++ /dev/null @@ -1,32 +0,0 @@ -import pika -import json - - -def callback(ch, method, properties, body): - # Decode the JSON message - message = json.loads(body) - print(message) - # Prcess the message - # TODO: Add your logic here - - # Acknowledge the message - ch.basic_ack(delivery_tag=method.delivery_tag) - - -# Connect to RabbitMQ -credentials = pika.PlainCredentials('guest', 'guest') -parameters = pika.ConnectionParameters('localhost', credentials=credentials) -connection = pika.BlockingConnection(parameters) -channel = connection.channel() - -# Declare the queue -channel.queue_declare(queue='tomo_reconstruction') - -# Set the prefetch count to limit the number of unacknowledged messages -channel.basic_qos(prefetch_count=1) - -# Start consuming messages -channel.basic_consume(queue='tomo_reconstruction', on_message_callback=callback) - -# Enter a loop to continuously consume messages -channel.start_consuming() diff --git a/src/ingest.py b/src/ingest.py index f99fd91..c37c15a 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -1,7 +1,7 @@ import asyncio import logging -import os from pathlib import Path +import sys from tiled.catalog.register import register from tiled.catalog import from_uri @@ -9,24 +9,28 @@ logger = logging.getLogger(__name__) -TILED_URL = os.getenv("TILED_URL", "http://127.0.0.1:8000") -TILED_API_KEY = os.getenv("TILED_SINGLE_USER_API_KEY") +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) -def register_file(path: str): - config = tiled.config.parse_configs("../mlex_tomo_framework/tiled/deploy/config") - first_tree = config['trees'][0] - assert first_tree['tree'] == 'catalog' - catalog_adapter = from_uri( - first_tree['args']['uri'], - readable_storage=first_tree['args']['readable_storage'], - adapters_by_mimetype=config['media_types'] or None, +async def process_file(file_path: str, tiled_tree_path: str = "/"): + + config = tiled.config.parse_configs("/deploy/config") + # find the tree in configuration that matches the provided tiled_tree_path + matching_tree = next( + (tree for tree in config["trees"] if tree["path"] == tiled_tree_path), None ) + assert matching_tree, f"No tiled tree configured for tree path {tiled_tree_path}" + assert ( + matching_tree["tree"] == "catalog" + ), f"Matching tiled tree {tiled_tree_path} is not a catalog" - asyncio.run(register( - catalog=catalog_adapter, - path=Path(path) - )) - + catalog_adapter = from_uri( + matching_tree["args"]["uri"], + readable_storage=matching_tree["args"]["readable_storage"], + adapters_by_mimetype=matching_tree["args"].get("adapters_by_mimetype"), + ) + response = await register(catalog=catalog_adapter, path=Path(file_path)) + print(response) -register_file("./test.csv") \ No newline at end of file +asyncio.run(process_file("/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257/")) +# asyncio.run(process_file("/tiled_storage/test1")) diff --git a/src/producer.py b/src/producer.py deleted file mode 100644 index c1b3c94..0000000 --- a/src/producer.py +++ /dev/null @@ -1,28 +0,0 @@ -import json -import logging -import pika -from pika import DeliveryMode -from pika.exchange_type import ExchangeType - -logging.basicConfig(level=logging.INFO) - -json_message = json.dumps({"file_path": "/path/to/file"}) - -credentials = pika.PlainCredentials('guest', 'guest') -parameters = pika.ConnectionParameters('localhost', credentials=credentials) -connection = pika.BlockingConnection(parameters) -channel = connection.channel() -channel.exchange_declare(exchange="test_exchange", - exchange_type=ExchangeType.direct, - passive=False, - durable=True, - auto_delete=False) - -print("Sending message to create a queue") -channel.basic_publish( - 'mlexchange_exchange', 'tomo_reconstruction', json_message, - pika.BasicProperties(content_type='application/json', - delivery_mode=DeliveryMode.Transient)) - - -connection.close() \ No newline at end of file diff --git a/src/rabbitmq/__init__.py b/src/rabbitmq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/rabbitmq/consumer.py b/src/rabbitmq/consumer.py new file mode 100644 index 0000000..b776307 --- /dev/null +++ b/src/rabbitmq/consumer.py @@ -0,0 +1,38 @@ +import asyncio +import pika +import json + +from ..ingest import process_file +from .schemas import NEW_FILE_PATH_KEY + + +def process_message(ch, method, properties, body): + # Decode the JSON message + message = json.loads(body) + # Prcess the message + # TODO: Add your logic here + + # Acknowledge the message + ch.basic_ack(delivery_tag=method.delivery_tag) + new_file_path = message.get(NEW_FILE_PATH_KEY) + assert new_file_path, f"Message received from rabbitMQ does not contain {NEW_FILE_PATH_KEY}" + asyncio.run(process_file(new_file_path)) + + +# Connect to RabbitMQ +credentials = pika.PlainCredentials("guest", "guest") +parameters = pika.ConnectionParameters("localhost", credentials=credentials) +connection = pika.BlockingConnection(parameters) +channel = connection.channel() + +# Declare the queue +channel.queue_declare(queue="tomo_reconstruction") + +# Set the prefetch count to limit the number of unacknowledged messages +channel.basic_qos(prefetch_count=1) + +# Start consuming messages +channel.basic_consume(queue="tomo_reconstruction", on_message_callback=process_message) + +# Enter a loop to continuously consume messages +channel.start_consuming() diff --git a/src/rabbitmq/producer.py b/src/rabbitmq/producer.py new file mode 100644 index 0000000..1f9c200 --- /dev/null +++ b/src/rabbitmq/producer.py @@ -0,0 +1,42 @@ +import json +import logging +import sys + +import pika +from pika import DeliveryMode +from pika.exchange_type import ExchangeType + +from schemas import NEW_FILE_PATH_KEY + + +logging.basicConfig(level=logging.INFO) + +def send_message(new_file: str): + json_message = json.dumps({NEW_FILE_PATH_KEY: new_file}) + + credentials = pika.PlainCredentials("guest", "guest") + parameters = pika.ConnectionParameters("localhost", credentials=credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.exchange_declare( + exchange="test_exchange", + exchange_type=ExchangeType.direct, + passive=False, + durable=True, + auto_delete=False, + ) + + print("Sending message to create a queue") + channel.basic_publish( + "mlexchange_exchange", + "tomo_reconstruction", + json_message, + pika.BasicProperties( + content_type="application/json", delivery_mode=DeliveryMode.Transient + ), + ) + connection.close() + + +if __name__ == "__main__": + new_file = sys.argv[1] diff --git a/src/rabbitmq/schemas.py b/src/rabbitmq/schemas.py new file mode 100644 index 0000000..c8a2363 --- /dev/null +++ b/src/rabbitmq/schemas.py @@ -0,0 +1 @@ +NEW_FILE_PATH_KEY = "new_file_path" From 446057c28ac5b3d84fb9a5210e62b46b32310505 Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Sat, 10 Feb 2024 13:57:57 -0800 Subject: [PATCH 5/8] updates --- .gitignore | 4 ++- pyproject.toml | 3 ++- src/_version.py | 16 ------------ src/ingest.py | 54 +++++++++++++++++++++++++++++++--------- src/rabbitmq/consumer.py | 4 ++- src/rabbitmq/producer.py | 1 + 6 files changed, 51 insertions(+), 31 deletions(-) delete mode 100644 src/_version.py diff --git a/.gitignore b/.gitignore index 22f1f5d..3e3e2bd 100644 --- a/.gitignore +++ b/.gitignore @@ -127,4 +127,6 @@ dmypy.json api_keys.yml users.yml -splash_flows_globus/ \ No newline at end of file +splash_flows_globus/ + +src/_version.py \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 653e292..c8890e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,8 @@ classifiers = [ dependencies = [ "pika", - "tiled[server]==0.1.0a114" + "tiled[server]==0.1.0a114", + "python-dotenv" ] dynamic = ["version"] diff --git a/src/_version.py b/src/_version.py deleted file mode 100644 index 1bc48fa..0000000 --- a/src/_version.py +++ /dev/null @@ -1,16 +0,0 @@ -# file generated by setuptools_scm -# don't change, don't track in version control -TYPE_CHECKING = False -if TYPE_CHECKING: - from typing import Tuple, Union - VERSION_TUPLE = Tuple[Union[int, str], ...] -else: - VERSION_TUPLE = object - -version: str -__version__: str -__version_tuple__: VERSION_TUPLE -version_tuple: VERSION_TUPLE - -__version__ = version = '0.1.dev5+g8abf4ed.d20240210' -__version_tuple__ = version_tuple = (0, 1, 'dev5', 'g8abf4ed.d20240210') diff --git a/src/ingest.py b/src/ingest.py index c37c15a..96674fe 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -3,7 +3,7 @@ from pathlib import Path import sys -from tiled.catalog.register import register +from tiled.catalog.register import identity, register from tiled.catalog import from_uri import tiled.config @@ -12,25 +12,55 @@ logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) -async def process_file(file_path: str, tiled_tree_path: str = "/"): - - config = tiled.config.parse_configs("/deploy/config") +async def process_file( + file_path: str, + tiled_config_tree_path: str = "/", + config_path: str = "/deploy/config", + path_prefix: str = "/" +): + config = tiled.config.parse_configs(config_path) # find the tree in configuration that matches the provided tiled_tree_path matching_tree = next( - (tree for tree in config["trees"] if tree["path"] == tiled_tree_path), None + (tree for tree in config["trees"] if tree["path"] == tiled_config_tree_path), None ) - assert matching_tree, f"No tiled tree configured for tree path {tiled_tree_path}" + assert matching_tree, f"No tiled tree configured for tree path {tiled_config_tree_path}" assert ( matching_tree["tree"] == "catalog" - ), f"Matching tiled tree {tiled_tree_path} is not a catalog" + ), f"Matching tiled tree {tiled_config_tree_path} is not a catalog" catalog_adapter = from_uri( matching_tree["args"]["uri"], readable_storage=matching_tree["args"]["readable_storage"], - adapters_by_mimetype=matching_tree["args"].get("adapters_by_mimetype"), + adapters_by_mimetype=matching_tree["args"].get("adapters_by_mimetype") ) - response = await register(catalog=catalog_adapter, path=Path(file_path)) - print(response) + await register( + catalog=catalog_adapter, + key_from_filename=identity, + path=file_path, + prefix=path_prefix) + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "outside_container": + # if we're debugging this outside of a container, we might want our + # own settings + import dotenv + dotenv.load_dotenv() + asyncio.run( + process_file( + "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257", + config_path="../mlex_tomo_framework/tiled/deploy/config", + path_prefix="/tiled_storage/beamlines/8.3.2/recons/" + ) + ) + else: + from pprint import pprint + import os + pprint(os.environ) + asyncio.run( + process_file( + "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257", + path_prefix="/tiled_storage/beamlines/8.3.2/recons/" + ) + ) -asyncio.run(process_file("/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257/")) -# asyncio.run(process_file("/tiled_storage/test1")) diff --git a/src/rabbitmq/consumer.py b/src/rabbitmq/consumer.py index b776307..e2e4b69 100644 --- a/src/rabbitmq/consumer.py +++ b/src/rabbitmq/consumer.py @@ -15,7 +15,9 @@ def process_message(ch, method, properties, body): # Acknowledge the message ch.basic_ack(delivery_tag=method.delivery_tag) new_file_path = message.get(NEW_FILE_PATH_KEY) - assert new_file_path, f"Message received from rabbitMQ does not contain {NEW_FILE_PATH_KEY}" + assert ( + new_file_path + ), f"Message received from rabbitMQ does not contain {NEW_FILE_PATH_KEY}" asyncio.run(process_file(new_file_path)) diff --git a/src/rabbitmq/producer.py b/src/rabbitmq/producer.py index 1f9c200..717da2e 100644 --- a/src/rabbitmq/producer.py +++ b/src/rabbitmq/producer.py @@ -11,6 +11,7 @@ logging.basicConfig(level=logging.INFO) + def send_message(new_file: str): json_message = json.dumps({NEW_FILE_PATH_KEY: new_file}) From ad30a437eb1a6890a397f94c1da5517c7aae70b2 Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Sun, 11 Feb 2024 11:08:35 -0800 Subject: [PATCH 6/8] don't overwrite...and add some comments --- src/ingest.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/ingest.py b/src/ingest.py index 96674fe..c735622 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -1,6 +1,5 @@ import asyncio import logging -from pathlib import Path import sys from tiled.catalog.register import identity, register @@ -18,8 +17,27 @@ async def process_file( config_path: str = "/deploy/config", path_prefix: str = "/" ): + """ + Process a file that already exists and register it with tiled as a catalog. + We looks for a match in the tiled config file based on tiled_config_tree_path. This will be + the tree that we import to. Should work with folders of TIFF sequence as well as single filed like + hdf5 or datasets like zarr. But honestly, on tiff sequence is tested. + + Args: + file_path (str): The path of the file to be processed. + tiled_config_tree_path (str, optional): The path of the tiled tree configuration. Defaults to "/". + config_path (str, optional): The path of the configuration file. Defaults to "/deploy/config". + path_prefix (str, optional): The prefix to be added to the registered path. Defaults to "/". + + Raises: + AssertionError: If no tiled tree is configured for the provided tree path. + AssertionError: If the matching tiled tree is not a catalog. + + Returns: + None + """ config = tiled.config.parse_configs(config_path) - # find the tree in configuration that matches the provided tiled_tree_path + # find the tree in tiled configuration that matches the provided tiled_tree_path matching_tree = next( (tree for tree in config["trees"] if tree["path"] == tiled_config_tree_path), None ) @@ -28,16 +46,20 @@ async def process_file( matching_tree["tree"] == "catalog" ), f"Matching tiled tree {tiled_config_tree_path} is not a catalog" + # using thre tree in the configuration, generate a catalog(adapter) catalog_adapter = from_uri( matching_tree["args"]["uri"], readable_storage=matching_tree["args"]["readable_storage"], adapters_by_mimetype=matching_tree["args"].get("adapters_by_mimetype") ) + + # Register with tiled. This writes entries into the database for all of the nodes down to the data node await register( catalog=catalog_adapter, key_from_filename=identity, path=file_path, - prefix=path_prefix) + prefix=path_prefix, + overwrite=False) if __name__ == "__main__": @@ -48,7 +70,7 @@ async def process_file( dotenv.load_dotenv() asyncio.run( process_file( - "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257", + "../mlex_tomo_framework/data/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313", config_path="../mlex_tomo_framework/tiled/deploy/config", path_prefix="/tiled_storage/beamlines/8.3.2/recons/" ) From 9974967f19f84c1c7af6f073c04fe5dc923cff4c Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Sun, 11 Feb 2024 11:08:46 -0800 Subject: [PATCH 7/8] add gh flows --- .github/workflows/publish_container.yml | 50 +++++++++++++++++++++++++ .github/workflows/python_app.yml | 35 +++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 .github/workflows/publish_container.yml create mode 100644 .github/workflows/python_app.yml diff --git a/.github/workflows/publish_container.yml b/.github/workflows/publish_container.yml new file mode 100644 index 0000000..8f7f535 --- /dev/null +++ b/.github/workflows/publish_container.yml @@ -0,0 +1,50 @@ +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Create and publish image + +on: + push: + branches: ['main'] + tags: ['v*'] + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + build-and-push-image: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Log in to the Container registry + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + - name: Build and push Docker image + uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc + with: + context: . + file: Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file diff --git a/.github/workflows/python_app.yml b/.github/workflows/python_app.yml new file mode 100644 index 0000000..41ad795 --- /dev/null +++ b/.github/workflows/python_app.yml @@ -0,0 +1,35 @@ +name: mlex ingest tiled + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v2 + with: + python-version: 3.11 + cache: 'pip' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pytest + pip install . + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + run: | + pytest \ No newline at end of file From 8bfe6df95db99e3021e73daeef33f3142892df37 Mon Sep 17 00:00:00 2001 From: Dylan McReynolds Date: Sun, 11 Feb 2024 11:24:29 -0800 Subject: [PATCH 8/8] Fixed prefex --- src/ingest.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/ingest.py b/src/ingest.py index c735622..0069e72 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -72,7 +72,7 @@ async def process_file( process_file( "../mlex_tomo_framework/data/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313", config_path="../mlex_tomo_framework/tiled/deploy/config", - path_prefix="/tiled_storage/beamlines/8.3.2/recons/" + path_prefix="/beamlines/8.3.2/recons/" ) ) else: @@ -81,8 +81,9 @@ async def process_file( pprint(os.environ) asyncio.run( process_file( - "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257", - path_prefix="/tiled_storage/beamlines/8.3.2/recons/" + # "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120550_test_no_xrays_n257", + "/tiled_storage/beamlines/8.3.2/recons/rec20240207_120829_test_no_xrays_n1313", + path_prefix="/beamlines/8.3.2/recons/" ) )