Skip to content

Commit

Permalink
feat(ingest): postprocess job handling
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Aug 12, 2023
1 parent 3657e88 commit 5da8a82
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 50 deletions.
25 changes: 22 additions & 3 deletions pychunkedgraph/ingest/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# pylint: disable=invalid-name, missing-function-docstring, import-outside-toplevel

"""
cli for running ingest
"""
Expand All @@ -6,6 +8,7 @@
import yaml
from flask.cli import AppGroup

from .cluster import enqueue_atomic_tasks
from .manager import IngestionManager
from .utils import bootstrap
from ..graph.chunkedgraph import ChunkedGraph
Expand Down Expand Up @@ -39,8 +42,6 @@ def ingest_graph(
Main ingest command.
Takes ingest config from a yaml file and queues atomic tasks.
"""
from .cluster import enqueue_atomic_tasks

with open(dataset, "r") as stream:
config = yaml.safe_load(stream)

Expand All @@ -56,6 +57,16 @@ def ingest_graph(
enqueue_atomic_tasks(IngestionManager(ingest_config, meta))


@ingest_cli.command("postprocess")
def postprocess():
    """
    Queue the postprocessing step for level 2 chunks.

    Restores the pickled IngestionManager from redis and re-enqueues the
    atomic (layer 2) tasks with postprocessing enabled.
    """
    connection = get_redis_connection()
    pickled_manager = connection.get(r_keys.INGESTION_MANAGER)
    manager = IngestionManager.from_pickle(pickled_manager)
    enqueue_atomic_tasks(manager, postprocess=True)


@ingest_cli.command("imanager")
@click.argument("graph_id", type=str)
@click.argument("dataset", type=click.Path(exists=True))
Expand Down Expand Up @@ -115,7 +126,15 @@ def ingest_status():
"""Print ingest status to console by layer."""
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
layers = range(2, imanager.cg_meta.layer_count + 1)

layer = 2
completed = redis.scard(f"{layer}c")
print(f"{layer}\t: {completed} / {imanager.cg_meta.layer_count}")

completed = redis.scard(f"pp{layer}c")
print(f"{layer}\t: {completed} / {imanager.cg_meta.layer_count} [postprocess]")

layers = range(3, imanager.cg_meta.layer_count + 1)
for layer, layer_count in zip(layers, imanager.cg_meta.layer_chunk_counts):
completed = redis.scard(f"{layer}c")
print(f"{layer}\t: {completed} / {layer_count}")
Expand Down
79 changes: 32 additions & 47 deletions pychunkedgraph/ingest/cluster.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# pylint: disable=invalid-name, missing-function-docstring, import-outside-toplevel

"""
Ingest / create chunkedgraph with workers.
"""
Expand All @@ -11,51 +13,24 @@
from .common import get_atomic_chunk_data
from .ran_agglomeration import get_active_edges
from .create.atomic_layer import add_atomic_edges
from .create.atomic_layer import postprocess_atomic_chunk
from .create.abstract_layers import add_layer
from ..graph.meta import ChunkedGraphMeta
from ..graph.chunks.hierarchy import get_children_chunk_coords
from ..utils.redis import keys as r_keys
from ..utils.redis import get_redis_connection


def _post_task_completion(imanager: IngestionManager, layer: int, coords: np.ndarray):
from os import environ

def _post_task_completion(
imanager: IngestionManager,
layer: int,
coords: np.ndarray,
postprocess: bool = False,
):
chunk_str = "_".join(map(str, coords))
# mark chunk as completed - "c"
imanager.redis.sadd(f"{layer}c", chunk_str)

if environ.get("DO_NOT_AUTOQUEUE_PARENT_CHUNKS", None) is not None:
return

parent_layer = layer + 1
if parent_layer > imanager.cg_meta.layer_count:
return

parent_coords = np.array(coords, int) // imanager.cg_meta.graph_config.FANOUT
parent_id_str = chunk_id_str(parent_layer, parent_coords)
imanager.redis.sadd(parent_id_str, chunk_str)

parent_chunk_str = "_".join(map(str, parent_coords))
if not imanager.redis.hget(parent_layer, parent_chunk_str):
# cache children chunk count
# checked by tracker worker to enqueue parent chunk
children_count = len(
get_children_chunk_coords(imanager.cg_meta, parent_layer, parent_coords)
)
imanager.redis.hset(parent_layer, parent_chunk_str, children_count)

tracker_queue = imanager.get_task_queue(f"t{layer}")
tracker_queue.enqueue(
enqueue_parent_task,
job_id=f"t{layer}_{chunk_str}",
job_timeout=f"30s",
result_ttl=0,
args=(
parent_layer,
parent_coords,
),
)
pprocess = "_pprocess" if postprocess else ""
imanager.redis.sadd(f"{layer}c{pprocess}", chunk_str)


def enqueue_parent_task(
Expand Down Expand Up @@ -127,7 +102,7 @@ def randomize_grid_points(X: int, Y: int, Z: int) -> Tuple[int, int, int]:
yield np.unravel_index(index, (X, Y, Z))


def enqueue_atomic_tasks(imanager: IngestionManager):
def enqueue_atomic_tasks(imanager: IngestionManager, postprocess: bool = False):
from os import environ
from time import sleep
from rq import Queue as RQueue
Expand All @@ -138,27 +113,32 @@ def enqueue_atomic_tasks(imanager: IngestionManager):
atomic_chunk_bounds = imanager.cg_meta.layer_chunk_bounds[2]
chunk_coords = randomize_grid_points(*atomic_chunk_bounds)
chunk_count = imanager.cg_meta.layer_chunk_counts[0]

print(f"total chunk count: {chunk_count}, queuing...")
batch_size = int(environ.get("L2JOB_BATCH_SIZE", 1000))

pprocess = ""
if postprocess:
pprocess = "_pprocess"
print("postprocessing l2 chunks")

queue_name = f"{imanager.config.CLUSTER.ATOMIC_Q_NAME}{pprocess}"
q = imanager.get_task_queue(queue_name)
job_datas = []
batch_size = int(environ.get("L2JOB_BATCH_SIZE", 1000))
for chunk_coord in chunk_coords:
q = imanager.get_task_queue(imanager.config.CLUSTER.ATOMIC_Q_NAME)
# buffer for optimal use of redis memory
if len(q) > imanager.config.CLUSTER.ATOMIC_Q_LIMIT:
print(f"Sleeping {imanager.config.CLUSTER.ATOMIC_Q_INTERVAL}s...")
sleep(imanager.config.CLUSTER.ATOMIC_Q_INTERVAL)

x, y, z = chunk_coord
chunk_str = f"{x}_{y}_{z}"
if imanager.redis.sismember("2c", chunk_str):
if imanager.redis.sismember(f"2c{pprocess}", chunk_str):
# already done, skip
continue
job_datas.append(
RQueue.prepare_data(
_create_atomic_chunk,
args=(chunk_coord,),
args=(chunk_coord, postprocess),
timeout=environ.get("L2JOB_TIMEOUT", "3m"),
result_ttl=0,
job_id=chunk_id_str(2, chunk_coord),
Expand All @@ -170,21 +150,26 @@ def enqueue_atomic_tasks(imanager: IngestionManager):
q.enqueue_many(job_datas)


def _create_atomic_chunk(coords: Sequence[int]):
def _create_atomic_chunk(coords: Sequence[int], postprocess: bool = False):
"""Creates single atomic chunk"""
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
coords = np.array(list(coords), dtype=int)
chunk_edges_all, mapping = get_atomic_chunk_data(imanager, coords)
chunk_edges_active, isolated_ids = get_active_edges(chunk_edges_all, mapping)
add_atomic_edges(imanager.cg, coords, chunk_edges_active, isolated=isolated_ids)

if postprocess:
postprocess_atomic_chunk(imanager.cg, coords)
else:
chunk_edges_all, mapping = get_atomic_chunk_data(imanager, coords)
chunk_edges_active, isolated_ids = get_active_edges(chunk_edges_all, mapping)
add_atomic_edges(imanager.cg, coords, chunk_edges_active, isolated=isolated_ids)

if imanager.config.TEST_RUN:
# print for debugging
for k, v in chunk_edges_all.items():
print(k, len(v))
for k, v in chunk_edges_active.items():
print(f"active_{k}", len(v))
_post_task_completion(imanager, 2, coords)
_post_task_completion(imanager, 2, coords, postprocess=postprocess)


def _get_test_chunks(meta: ChunkedGraphMeta):
Expand Down

0 comments on commit 5da8a82

Please sign in to comment.