From 62dc50082f6e09d8785bb52a7db13309990d5e83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arturo=20Filast=C3=B2?= Date: Wed, 11 Sep 2024 14:37:24 -0400 Subject: [PATCH] Simplify CLI related code --- oonipipeline/src/oonipipeline/cli/commands.py | 83 ++++++---- .../src/oonipipeline/db/create_tables.py | 10 +- .../temporal/client_operations.py | 146 ------------------ .../src/oonipipeline/temporal/schedules.py | 2 +- 4 files changed, 59 insertions(+), 182 deletions(-) diff --git a/oonipipeline/src/oonipipeline/cli/commands.py b/oonipipeline/src/oonipipeline/cli/commands.py index c81e1536..590ce0a1 100644 --- a/oonipipeline/src/oonipipeline/cli/commands.py +++ b/oonipipeline/src/oonipipeline/cli/commands.py @@ -1,15 +1,19 @@ +import asyncio import logging from pathlib import Path -from typing import List, Optional +from typing import Coroutine, List, Optional from datetime import date, timedelta, datetime, timezone from typing import List, Optional from oonipipeline.temporal.client_operations import ( TemporalConfig, - run_backfill, - run_create_schedules, - run_status, - run_clear_schedules, + get_status, + temporal_connect, +) +from oonipipeline.temporal.schedules import ( + clear_all_schedules, + schedule_all, + schedule_backfill, ) from oonipipeline.temporal.workers import start_workers @@ -23,6 +27,14 @@ from ..netinfo import NetinfoDB from ..settings import config + +def run_async(main: Coroutine): + try: + asyncio.run(main) + except KeyboardInterrupt: + print("shutting down") + + def _parse_csv(ctx, param, s: Optional[str]) -> List[str]: if s: return s.split(",") @@ -151,15 +163,19 @@ def backfill( temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_backfill( - workflow_name=workflow_name, - temporal_config=temporal_config, - probe_cc=probe_cc, - test_name=test_name, - start_at=start_at, - end_at=end_at, - ) + async def main(): + client = await temporal_connect(temporal_config=temporal_config) + return await schedule_backfill( + client=client, + probe_cc=probe_cc, + test_name=test_name, + start_at=start_at, + end_at=end_at, + workflow_name=workflow_name, + ) + + run_async(main()) @cli.command() @probe_cc_option @@ -178,13 +194,18 @@ def schedule( temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_create_schedules( - probe_cc=probe_cc, - test_name=test_name, - clickhouse_url=config.clickhouse_url, - data_dir=config.data_dir, - temporal_config=temporal_config, - ) + async def main(): + client = await temporal_connect(temporal_config=temporal_config) + + return await schedule_all( + client=client, + probe_cc=probe_cc, + test_name=test_name, + clickhouse_url=config.clickhouse_url, + data_dir=config.data_dir, + ) + + run_async(main()) @cli.command() @@ -204,11 +225,16 @@ def clear_schedules( temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_clear_schedules( - probe_cc=probe_cc, - test_name=test_name, - temporal_config=temporal_config, - ) + async def main(): + client = await temporal_connect(temporal_config=temporal_config) + + return await clear_all_schedules( + client=client, + probe_cc=probe_cc, + test_name=test_name, + ) + + run_async(main()) @cli.command() @@ -222,7 +248,12 @@ def status(): temporal_tls_client_cert_path=config.temporal_tls_client_cert_path, temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_status(temporal_config=temporal_config) + + run_async( + get_status( + temporal_config=temporal_config, + ) + ) @cli.command() diff --git a/oonipipeline/src/oonipipeline/db/create_tables.py b/oonipipeline/src/oonipipeline/db/create_tables.py index 1a42d86a..ee8efb20 100644 --- a/oonipipeline/src/oonipipeline/db/create_tables.py +++ b/oonipipeline/src/oonipipeline/db/create_tables.py @@ -176,15 +176,7 @@ def format_create_query( ] -def make_create_queries( - num_layers=1, - min_time=10, - max_time=500, - min_rows=10_0000, - max_rows=100_000, - min_bytes=10_000_000, - max_bytes=1_000_000_000, -): +def make_create_queries(): create_queries = [] for model in table_models: table_name = model.__table_name__ diff --git a/oonipipeline/src/oonipipeline/temporal/client_operations.py b/oonipipeline/src/oonipipeline/temporal/client_operations.py index 3a6589f1..f8194623 100644 --- a/oonipipeline/src/oonipipeline/temporal/client_operations.py +++ b/oonipipeline/src/oonipipeline/temporal/client_operations.py @@ -1,15 +1,7 @@ -import asyncio import logging from dataclasses import dataclass -from datetime import datetime from typing import List, Optional, Tuple -from oonipipeline.temporal.schedules import ( - ScheduleIdMap, - schedule_all, - schedule_backfill, - clear_schedules, -) from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource @@ -112,64 +104,6 @@ async def temporal_connect( return client -async def execute_backfill( - probe_cc: List[str], - test_name: List[str], - start_at: datetime, - end_at: datetime, - workflow_name: str, - temporal_config: TemporalConfig, -): - log.info(f"creating all schedules") - - client = await temporal_connect(temporal_config=temporal_config) - - return await schedule_backfill( - client=client, - probe_cc=probe_cc, - test_name=test_name, - start_at=start_at, - end_at=end_at, - workflow_name=workflow_name, - ) - - -async def create_schedules( - probe_cc: List[str], - test_name: List[str], - clickhouse_url: str, - data_dir: str, - temporal_config: TemporalConfig, -) -> ScheduleIdMap: - log.info(f"creating all schedules") - - client = await temporal_connect(temporal_config=temporal_config) - - return await schedule_all( - client=client, - probe_cc=probe_cc, - test_name=test_name, - clickhouse_url=clickhouse_url, - data_dir=data_dir, - ) - - -async def execute_clear_schedules( - probe_cc: List[str], - test_name: List[str], - temporal_config: TemporalConfig, -) -> List[str]: - log.info(f"rescheduling everything") - - client = await temporal_connect(temporal_config=temporal_config) - - return await clear_schedules( - client=client, - probe_cc=probe_cc, - test_name=test_name, - ) - - async def get_status( temporal_config: TemporalConfig, ) -> Tuple[List[WorkflowExecution], List[WorkflowExecution]]: @@ -205,83 +139,3 @@ async def get_status( print(f" execution_time={workflow.execution_time}") print(f" execution_time={workflow.execution_time}") return active_observation_workflows, active_observation_workflows - - -def run_backfill( - temporal_config: TemporalConfig, - probe_cc: List[str], - test_name: List[str], - workflow_name: str, - start_at: datetime, - end_at: datetime, -): - try: - asyncio.run( - execute_backfill( - temporal_config=temporal_config, - workflow_name=workflow_name, - probe_cc=probe_cc, - test_name=test_name, - start_at=start_at, - end_at=end_at, - ) - ) - except KeyboardInterrupt: - print("shutting down") - - -def run_create_schedules( - probe_cc: List[str], - test_name: List[str], - clickhouse_url: str, - data_dir: str, - temporal_config: TemporalConfig, -): - try: - asyncio.run( - create_schedules( - probe_cc=probe_cc, - test_name=test_name, - clickhouse_url=clickhouse_url, - data_dir=data_dir, - temporal_config=temporal_config, - ) - ) - except KeyboardInterrupt: - print("shutting down") - - -def run_clear_schedules( - probe_cc: List[str], - test_name: List[str], - temporal_config: TemporalConfig, -): - try: - asyncio.run( - execute_clear_schedules( - probe_cc=probe_cc, - test_name=test_name, - temporal_config=temporal_config, - ) - ) - except KeyboardInterrupt: - print("shutting down") - - -def start_event_loop(async_task): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(async_task()) - - -def run_status( - temporal_config: TemporalConfig, -): - try: - asyncio.run( - get_status( - temporal_config=temporal_config, - ) - ) - except KeyboardInterrupt: - print("shutting down") diff --git a/oonipipeline/src/oonipipeline/temporal/schedules.py b/oonipipeline/src/oonipipeline/temporal/schedules.py index abee47b2..0f6ff219 100644 --- a/oonipipeline/src/oonipipeline/temporal/schedules.py +++ b/oonipipeline/src/oonipipeline/temporal/schedules.py @@ -179,7 +179,7 @@ async def schedule_all( return schedule_id_map -async def clear_schedules( +async def clear_all_schedules( client: TemporalClient, probe_cc: List[str], test_name: List[str],