Skip to content

Commit

Permalink
Simplify CLI related code
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 11, 2024
1 parent 3e162cb commit 62dc500
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 182 deletions.
83 changes: 57 additions & 26 deletions oonipipeline/src/oonipipeline/cli/commands.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import asyncio
import logging
from pathlib import Path
from typing import List, Optional
from typing import Coroutine, List, Optional
from datetime import date, timedelta, datetime, timezone
from typing import List, Optional

from oonipipeline.temporal.client_operations import (
TemporalConfig,
run_backfill,
run_create_schedules,
run_status,
run_clear_schedules,
get_status,
temporal_connect,
)
from oonipipeline.temporal.schedules import (
clear_all_schedules,
schedule_all,
schedule_backfill,
)
from oonipipeline.temporal.workers import start_workers

Expand All @@ -23,6 +27,14 @@
from ..netinfo import NetinfoDB
from ..settings import config


def run_async(main: Coroutine):
try:
asyncio.run(main)
except KeyboardInterrupt:
print("shutting down")


def _parse_csv(ctx, param, s: Optional[str]) -> List[str]:
if s:
return s.split(",")
Expand Down Expand Up @@ -151,15 +163,19 @@ def backfill(
temporal_tls_client_key_path=config.temporal_tls_client_key_path,
)

run_backfill(
workflow_name=workflow_name,
temporal_config=temporal_config,
probe_cc=probe_cc,
test_name=test_name,
start_at=start_at,
end_at=end_at,
)
async def main():
client = await temporal_connect(temporal_config=temporal_config)

return await schedule_backfill(
client=client,
probe_cc=probe_cc,
test_name=test_name,
start_at=start_at,
end_at=end_at,
workflow_name=workflow_name,
)

run_async(main())

@cli.command()
@probe_cc_option
Expand All @@ -178,13 +194,18 @@ def schedule(
temporal_tls_client_key_path=config.temporal_tls_client_key_path,
)

run_create_schedules(
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=config.clickhouse_url,
data_dir=config.data_dir,
temporal_config=temporal_config,
)
async def main():
client = await temporal_connect(temporal_config=temporal_config)

return await schedule_all(
client=client,
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=config.clickhouse_url,
data_dir=config.data_dir,
)

run_async(main())


@cli.command()
Expand All @@ -204,11 +225,16 @@ def clear_schedules(
temporal_tls_client_key_path=config.temporal_tls_client_key_path,
)

run_clear_schedules(
probe_cc=probe_cc,
test_name=test_name,
temporal_config=temporal_config,
)
async def main():
client = await temporal_connect(temporal_config=temporal_config)

return await clear_all_schedules(
client=client,
probe_cc=probe_cc,
test_name=test_name,
)

run_async(main())


@cli.command()
Expand All @@ -222,7 +248,12 @@ def status():
temporal_tls_client_cert_path=config.temporal_tls_client_cert_path,
temporal_tls_client_key_path=config.temporal_tls_client_key_path,
)
run_status(temporal_config=temporal_config)

run_async(
get_status(
temporal_config=temporal_config,
)
)


@cli.command()
Expand Down
10 changes: 1 addition & 9 deletions oonipipeline/src/oonipipeline/db/create_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,7 @@ def format_create_query(
]


def make_create_queries(
num_layers=1,
min_time=10,
max_time=500,
min_rows=10_0000,
max_rows=100_000,
min_bytes=10_000_000,
max_bytes=1_000_000_000,
):
def make_create_queries():
create_queries = []
for model in table_models:
table_name = model.__table_name__
Expand Down
146 changes: 0 additions & 146 deletions oonipipeline/src/oonipipeline/temporal/client_operations.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple

from oonipipeline.temporal.schedules import (
ScheduleIdMap,
schedule_all,
schedule_backfill,
clear_schedules,
)

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
Expand Down Expand Up @@ -112,64 +104,6 @@ async def temporal_connect(
return client


async def execute_backfill(
probe_cc: List[str],
test_name: List[str],
start_at: datetime,
end_at: datetime,
workflow_name: str,
temporal_config: TemporalConfig,
):
log.info(f"creating all schedules")

client = await temporal_connect(temporal_config=temporal_config)

return await schedule_backfill(
client=client,
probe_cc=probe_cc,
test_name=test_name,
start_at=start_at,
end_at=end_at,
workflow_name=workflow_name,
)


async def create_schedules(
probe_cc: List[str],
test_name: List[str],
clickhouse_url: str,
data_dir: str,
temporal_config: TemporalConfig,
) -> ScheduleIdMap:
log.info(f"creating all schedules")

client = await temporal_connect(temporal_config=temporal_config)

return await schedule_all(
client=client,
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=clickhouse_url,
data_dir=data_dir,
)


async def execute_clear_schedules(
probe_cc: List[str],
test_name: List[str],
temporal_config: TemporalConfig,
) -> List[str]:
log.info(f"rescheduling everything")

client = await temporal_connect(temporal_config=temporal_config)

return await clear_schedules(
client=client,
probe_cc=probe_cc,
test_name=test_name,
)


async def get_status(
temporal_config: TemporalConfig,
) -> Tuple[List[WorkflowExecution], List[WorkflowExecution]]:
Expand Down Expand Up @@ -205,83 +139,3 @@ async def get_status(
print(f" execution_time={workflow.execution_time}")
print(f" execution_time={workflow.execution_time}")
return active_observation_workflows, active_observation_workflows


def run_backfill(
temporal_config: TemporalConfig,
probe_cc: List[str],
test_name: List[str],
workflow_name: str,
start_at: datetime,
end_at: datetime,
):
try:
asyncio.run(
execute_backfill(
temporal_config=temporal_config,
workflow_name=workflow_name,
probe_cc=probe_cc,
test_name=test_name,
start_at=start_at,
end_at=end_at,
)
)
except KeyboardInterrupt:
print("shutting down")


def run_create_schedules(
probe_cc: List[str],
test_name: List[str],
clickhouse_url: str,
data_dir: str,
temporal_config: TemporalConfig,
):
try:
asyncio.run(
create_schedules(
probe_cc=probe_cc,
test_name=test_name,
clickhouse_url=clickhouse_url,
data_dir=data_dir,
temporal_config=temporal_config,
)
)
except KeyboardInterrupt:
print("shutting down")


def run_clear_schedules(
probe_cc: List[str],
test_name: List[str],
temporal_config: TemporalConfig,
):
try:
asyncio.run(
execute_clear_schedules(
probe_cc=probe_cc,
test_name=test_name,
temporal_config=temporal_config,
)
)
except KeyboardInterrupt:
print("shutting down")


def start_event_loop(async_task):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(async_task())


def run_status(
temporal_config: TemporalConfig,
):
try:
asyncio.run(
get_status(
temporal_config=temporal_config,
)
)
except KeyboardInterrupt:
print("shutting down")
2 changes: 1 addition & 1 deletion oonipipeline/src/oonipipeline/temporal/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ async def schedule_all(
return schedule_id_map


async def clear_schedules(
async def clear_all_schedules(
client: TemporalClient,
probe_cc: List[str],
test_name: List[str],
Expand Down

0 comments on commit 62dc500

Please sign in to comment.