From 4c011bf8ac91844834bced4aa902e45e52b64e11 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Sat, 9 Sep 2023 19:00:17 +0000 Subject: [PATCH] fix(ingest): worker details in status --- pychunkedgraph/ingest/cli.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pychunkedgraph/ingest/cli.py b/pychunkedgraph/ingest/cli.py index cafd5f276..17c5d446d 100644 --- a/pychunkedgraph/ingest/cli.py +++ b/pychunkedgraph/ingest/cli.py @@ -8,6 +8,8 @@ import yaml from flask.cli import AppGroup from rq import Queue +from rq import Worker +from rq.worker import WorkerStatus from .cluster import create_atomic_chunk from .cluster import create_parent_chunk @@ -120,11 +122,14 @@ def ingest_status(): layer_counts = imanager.cg_meta.layer_chunk_counts pipeline = redis.pipeline() + worker_busy = [] for layer in layers: pipeline.scard(f"{layer}c") queue = Queue(f"l{layer}", connection=redis) pipeline.llen(queue.key) pipeline.zcard(queue.failed_job_registry.key) + workers = Worker.all(queue=queue) + worker_busy.append(sum([w.get_state() == WorkerStatus.BUSY for w in workers])) results = pipeline.execute() completed = [] @@ -136,13 +141,16 @@ def ingest_status(): queued.append(result[1]) failed.append(result[2]) - print("layer status:") + print(f"version: \t{imanager.cg.version}") + print(f"graph_id: \t{imanager.cg.graph_id}") + print(f"chunk_size: \t{imanager.cg.meta.graph_config.CHUNK_SIZE}") + print("\nlayer status:") for layer, done, count in zip(layers, completed, layer_counts): print(f"{layer}\t: {done} / {count}") print("\n\nqueue status:") - for layer, q, f in zip(layers, queued, failed): - print(f"l{layer}\t: queued\t {q}\t, failed\t {f}") + for layer, q, f, wb in zip(layers, queued, failed, worker_busy): + print(f"l{layer}\t: queued: {q}\t\t failed: {f}\t\t busy: {wb}") @ingest_cli.command("chunk")