Skip to content

Commit

Permalink
wip: ocdbt conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Feb 6, 2024
1 parent 7062dd6 commit c6a311c
Showing 1 changed file with 93 additions and 26 deletions.
119 changes: 93 additions & 26 deletions pychunkedgraph/ingest/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@
Ingest / create chunkedgraph with workers.
"""

from typing import Sequence, Tuple
import logging
import os
import time
from typing import Any, Iterable, Sequence, Tuple
from os import environ
from time import sleep


import numpy as np
import tensorstore as ts
from rq import Queue as RQueue


from .utils import chunk_id_str
from .manager import IngestionManager
Expand All @@ -13,6 +22,8 @@
from .create.atomic_layer import add_atomic_edges
from .create.abstract_layers import add_layer
from ..graph.meta import ChunkedGraphMeta
from ..graph.edges import Edges, put_edges
from ..graph.utils.basetypes import NODE_ID
from ..graph.chunks.hierarchy import get_children_chunk_coords
from ..utils.redis import keys as r_keys
from ..utils.redis import get_redis_connection
Expand Down Expand Up @@ -127,27 +138,14 @@ def randomize_grid_points(X: int, Y: int, Z: int) -> Tuple[int, int, int]:
yield np.unravel_index(index, (X, Y, Z))


def enqueue_atomic_tasks(imanager: IngestionManager):
from os import environ
from time import sleep
from rq import Queue as RQueue

chunk_coords = _get_test_chunks(imanager.cg.meta)
chunk_count = len(chunk_coords)
if not imanager.config.TEST_RUN:
atomic_chunk_bounds = imanager.cg_meta.layer_chunk_bounds[2]
chunk_coords = randomize_grid_points(*atomic_chunk_bounds)
chunk_count = imanager.cg_meta.layer_chunk_counts[0]

print(f"total chunk count: {chunk_count}, queuing...")
batch_size = int(environ.get("L2JOB_BATCH_SIZE", 1000))

def _queue_tasks(imanager: IngestionManager, chunk_coords: Iterable):
job_datas = []
batch_size = int(environ.get("L2JOB_BATCH_SIZE", 1000))
for chunk_coord in chunk_coords:
q = imanager.get_task_queue(imanager.config.CLUSTER.ATOMIC_Q_NAME)
# buffer for optimal use of redis memory
if len(q) > imanager.config.CLUSTER.ATOMIC_Q_LIMIT:
print(f"Sleeping {imanager.config.CLUSTER.ATOMIC_Q_INTERVAL}s...")
logging.info(f"Sleeping {imanager.config.CLUSTER.ATOMIC_Q_INTERVAL}s...")
sleep(imanager.config.CLUSTER.ATOMIC_Q_INTERVAL)

x, y, z = chunk_coord
Expand All @@ -159,7 +157,7 @@ def enqueue_atomic_tasks(imanager: IngestionManager):
RQueue.prepare_data(
_create_atomic_chunk,
args=(chunk_coord,),
timeout=environ.get("L2JOB_TIMEOUT", "3m"),
timeout=environ.get("L2JOB_TIMEOUT", "5m"),
result_ttl=0,
job_id=chunk_id_str(2, chunk_coord),
)
Expand All @@ -170,20 +168,89 @@ def enqueue_atomic_tasks(imanager: IngestionManager):
q.enqueue_many(job_datas)


def enqueue_atomic_tasks(imanager: IngestionManager):
    """Queue every atomic chunk task and host the OCDBT coordinator server.

    Starts a ``tensorstore`` OCDBT distributed coordinator, opens the
    ``<EDGES>/ocdbt`` kvstore through it, and publishes the coordinator's
    host and port in redis under ``OCDBT_COORDINATOR_HOST`` and
    ``OCDBT_COORDINATOR_PORT``. The host is the ``MY_POD_IP`` environment
    variable when set, otherwise ``"localhost"``.

    After queuing the chunk tasks this function never returns: it sleeps in
    a loop so the coordinator server created here stays alive.
    """
    logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

    # Test chunks are always computed first; a full run then replaces them
    # with the randomized grid of all atomic chunks.
    chunk_coords = _get_test_chunks(imanager.cg.meta)
    chunk_count = len(chunk_coords)
    if not imanager.config.TEST_RUN:
        bounds = imanager.cg_meta.layer_chunk_bounds[2]
        chunk_coords = randomize_grid_points(*bounds)
        chunk_count = imanager.cg_meta.layer_chunk_counts[0]

    coordinator = ts.ocdbt.DistributedCoordinatorServer()
    port = coordinator.port
    kvstore_spec = {
        "driver": "ocdbt",
        "base": f"{imanager.cg.meta.data_source.EDGES}/ocdbt",
        "coordinator": {"address": f"localhost:{port}"},
    }
    # Keep the opened store referenced for as long as this function runs.
    _kvstore = ts.KvStore.open(kvstore_spec).result()

    # Publish the coordinator address through redis.
    host = os.environ.get("MY_POD_IP", "localhost")
    imanager.redis.set("OCDBT_COORDINATOR_PORT", str(port))
    imanager.redis.set("OCDBT_COORDINATOR_HOST", host)

    logging.info(f"OCDBT Coordinator address {host}:{port}")
    logging.info(f"Total chunk count: {chunk_count}, queuing...")
    _queue_tasks(imanager, chunk_coords)
    logging.info("All tasks queued. Keep this alive for ocdbt coordinator server.")

    # Block forever so the coordinator server is not torn down.
    while True:
        time.sleep(60)


def _create_atomic_chunk(coords: Sequence[int]):
"""Create a single atomic chunk and write its edges to the OCDBT edge store.

Args:
    coords: The chunk's grid coordinates; indices 0, 1 and 2 are used as
        x, y and z.

The ingestion manager is loaded from the pickle stored in redis under
``r_keys.INGESTION_MANAGER``. All edge groups returned by
``get_atomic_chunk_data`` are merged into one ``Edges`` object and handed to
``put_edges`` together with the node IDs belonging to this chunk. The task
is finally reported via ``_post_task_completion`` for layer 2.
"""
# Configure root logging at INFO level for this call.
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

# Rebuild the ingestion manager from its pickled form kept in redis.
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
coords = np.array(list(coords), dtype=int)
chunk_edges_all, mapping = get_atomic_chunk_data(imanager, coords)
# NOTE(review): the next statements (active edges, add_atomic_edges, debug
# prints) also appear commented out near the end of this function; this
# listing looks like a rendered diff with +/- markers stripped, so they are
# presumably the removed side of the change -- confirm against the real file.
chunk_edges_active, isolated_ids = get_active_edges(chunk_edges_all, mapping)
add_atomic_edges(imanager.cg, coords, chunk_edges_active, isolated=isolated_ids)
if imanager.config.TEST_RUN:
# print for debugging
for k, v in chunk_edges_all.items():
print(k, len(v))
for k, v in chunk_edges_active.items():
print(f"active_{k}", len(v))

# Flatten every edge group in chunk_edges_all into parallel lists.
node_ids1 = []
node_ids2 = []
affinities = []
areas = []
for edges in chunk_edges_all.values():
node_ids1.extend(edges.node_ids1)
node_ids2.extend(edges.node_ids2)
affinities.extend(edges.affinities)
areas.extend(edges.areas)

# One combined Edges object, plus the sorted unique set of node IDs seen as
# an edge endpoint or as a key of `mapping`.
edges = Edges(node_ids1, node_ids2, affinities=affinities, areas=areas)
nodes = np.concatenate(
[edges.node_ids1, edges.node_ids2, np.fromiter(mapping.keys(), dtype=NODE_ID)]
)
nodes = np.unique(nodes)

# Layer-1 chunk ID for these coordinates, and the chunk ID of every node,
# used below to select the nodes that belong to this chunk.
chunk_id = imanager.cg.get_chunk_id(layer=1, x=coords[0], y=coords[1], z=coords[2])
chunk_ids = imanager.cg.get_chunk_ids_from_node_ids(nodes)

# Read the coordinator address from redis and export it through environment
# variables (presumably consumed by put_edges -- TODO confirm).
host = imanager.redis.get("OCDBT_COORDINATOR_HOST").decode()
port = imanager.redis.get("OCDBT_COORDINATOR_PORT").decode()
os.environ["OCDBT_COORDINATOR_HOST"] = host
os.environ["OCDBT_COORDINATOR_PORT"] = port
logging.info(f"OCDBT Coordinator address {host}:{port}")

# Only nodes whose chunk ID equals this chunk's ID are passed along with the
# combined edges.
put_edges(
f"{imanager.cg.meta.data_source.EDGES}/ocdbt",
nodes[chunk_ids == chunk_id],
edges,
)

# chunk_edges_active, isolated_ids = get_active_edges(chunk_edges_all, mapping)
# add_atomic_edges(imanager.cg, coords, chunk_edges_active, isolated=isolated_ids)
# if imanager.config.TEST_RUN:
# # print for debugging
# for k, v in chunk_edges_all.items():
# print(k, len(v))
# for k, v in chunk_edges_active.items():
# print(f"active_{k}", len(v))
# Report this chunk as completed for layer 2.
_post_task_completion(imanager, 2, coords)


Expand Down

0 comments on commit c6a311c

Please sign in to comment.