Skip to content

Commit

Permalink
fix(ingest): worker details in status
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Sep 9, 2023
1 parent cdae248 commit 4c011bf
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions pychunkedgraph/ingest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import yaml
from flask.cli import AppGroup
from rq import Queue
from rq import Worker
from rq.worker import WorkerStatus

from .cluster import create_atomic_chunk
from .cluster import create_parent_chunk
Expand Down Expand Up @@ -120,11 +122,14 @@ def ingest_status():
layer_counts = imanager.cg_meta.layer_chunk_counts

pipeline = redis.pipeline()
worker_busy = []
for layer in layers:
pipeline.scard(f"{layer}c")
queue = Queue(f"l{layer}", connection=redis)
pipeline.llen(queue.key)
pipeline.zcard(queue.failed_job_registry.key)
workers = Worker.all(queue=queue)
worker_busy.append(sum([w.get_state() == WorkerStatus.BUSY for w in workers]))

results = pipeline.execute()
completed = []
Expand All @@ -136,13 +141,16 @@ def ingest_status():
queued.append(result[1])
failed.append(result[2])

print("layer status:")
print(f"version: \t{imanager.cg.version}")
print(f"graph_id: \t{imanager.cg.graph_id}")
print(f"chunk_size: \t{imanager.cg.meta.graph_config.CHUNK_SIZE}")
print("\nlayer status:")
for layer, done, count in zip(layers, completed, layer_counts):
print(f"{layer}\t: {done} / {count}")

print("\n\nqueue status:")
for layer, q, f in zip(layers, queued, failed):
print(f"l{layer}\t: queued\t {q}\t, failed\t {f}")
for layer, q, f, wb in zip(layers, queued, failed, worker_busy):
print(f"l{layer}\t: queued: {q}\t\t failed: {f}\t\t busy: {wb}")


@ingest_cli.command("chunk")
Expand Down

0 comments on commit 4c011bf

Please sign in to comment.