[BBPBGLIB-1102] Add load balance based on dry run estimate #111

Merged · 15 commits · Feb 13, 2024
25 changes: 25 additions & 0 deletions docs/architecture.rst
@@ -391,6 +391,31 @@ part in the execution, is then multiplied by the number of ranks used in the exe
The final result is then printed to the user in a human readable format together with an estimate
of the number of nodes needed to run the simulation on the same machine used to run the dry run.

Dry Run Memory Load Balancing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The dry run mode also provides a memory load balancing feature. It helps balance the memory usage
across the ranks of the simulation, so that the user is less likely to run into out-of-memory errors.
At the moment the implementation is still a work in progress: the user can save the allocation data
to a file but cannot yet import it for use in a real simulation.

The workflow of the memory load balancing is as follows: for each cell in the circuit we have an
estimate of the memory load of the cell itself, based on its METype, and of the number of synapses
that each METype has on average. With this information we can compute a good estimate of the memory
load of each gid in the circuit.
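
For instance, with purely illustrative numbers: a METype estimated at 500 KB per cell, whose cells
carry 2000 synapses on average at roughly 2.0 KB each, would be counted as
500 + 2000 * 2.0 = 4500 KB per gid.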

We've opted for a greedy approach to distribute the gids in order to keep the implementation simple
and fast. The algorithm is as follows (see the sketch after this list):

- Sort our ranks in a heap so that the emptiest rank is always at the top
- Assign gids in batches of 10 to the emptiest rank
- Rinse and repeat until all gids are assigned
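
A minimal sketch of the heap-based assignment, with made-up gids and memory loads, and assigning
one gid at a time instead of batches of 10:

.. code-block:: python

    import heapq

    cells = [(1, 5.0), (2, 3.0), (3, 4.0), (4, 2.0)]  # (gid, memory load in KB)
    num_ranks = 2

    ranks = [(0.0, rank_id) for rank_id in range(num_ranks)]  # (total_memory, rank_id)
    heapq.heapify(ranks)
    allocation = {rank_id: [] for rank_id in range(num_ranks)}

    for gid, memory in cells:
        total_memory, rank_id = heapq.heappop(ranks)  # the emptiest rank is on top
        allocation[rank_id].append(gid)
        heapq.heappush(ranks, (total_memory + memory, rank_id))

    print(allocation)  # e.g. {0: [1, 4], 1: [2, 3]}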

The user can specify the number of ranks to target using the ``--num-target-ranks`` flag in the
neurodamus CLI. The default value is 40. The allocation dictionary, containing the assignment of
gids to ranks for each population, is then saved to the ``allocation.pkl.gz`` file in a pickled,
gzipped format. In the near future users will be able to import this data in any following
simulation in order to improve the memory balance.
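
As a sketch, the saved file can be loaded back with standard library tools; the dictionary layout
is ``{population: {rank_id: gids}}``:

.. code-block:: python

    import gzip
    import pickle

    # Load the pickled, gzipped allocation produced by the dry run
    with gzip.open("allocation.pkl.gz", "rb") as f:
        allocation = pickle.load(f)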

Development
------------

2 changes: 2 additions & 0 deletions neurodamus/commands.py
@@ -55,6 +55,8 @@ def neurodamus(args=None):
--enable-shm=[ON, OFF] Enables the use of /dev/shm for coreneuron_input [default: ON]
--model-stats Show model stats in CoreNEURON simulations [default: False]
--dry-run Dry-run simulation to estimate memory usage [default: False]
--num-target-ranks=<number> Number of ranks to target for dry-run load balancing
[default: 40]
"""
options = docopt_sanitize(docopt(neurodamus.__doc__, args))
config_file = options.pop("ConfigFile")
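
A usage sketch of the new flag (the config file name here is a placeholder):

    neurodamus simulation_config.json --dry-run --num-target-ranks=64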
7 changes: 5 additions & 2 deletions neurodamus/connection_manager.py
@@ -744,12 +744,13 @@ def _get_conn_stats(self, dst_target):
return {}

local_counter = Counter()
dst_pop_name = self._cell_manager.population_name

# NOTE:
# - Estimation (and extrapolation) is performed per metype since properties can vary
# - Consider only the cells for the current target

for metype, me_gids in self._dry_run_stats.metype_gids.items():
for metype, me_gids in self._dry_run_stats.metype_gids[dst_pop_name].items():
me_gids = set(me_gids).intersection(new_gids)
me_gids_count = len(me_gids)
if not me_gids_count:
@@ -787,8 +788,10 @@ def _get_conn_stats(self, dst_target):
# Extrapolation
logging.debug("Cells samples / total: %d / %s", sampled_gids_count, me_gids_count)
me_estimated_sum = sum(metype_estimate.values())
average_syns_per_cell = me_estimated_sum / me_gids_count
self._dry_run_stats.average_syns_per_cell[metype] = average_syns_per_cell
log_all(VERBOSE_LOGLEVEL, "%s: Average syns/cell: %.1f, Estimated total: %d ",
metype, me_estimated_sum / me_gids_count, me_estimated_sum)
metype, average_syns_per_cell, me_estimated_sum)
local_counter.update(metype_estimate)

return local_counter
3 changes: 3 additions & 0 deletions neurodamus/core/configuration.py
@@ -76,6 +76,7 @@ class CliOptions(ConfigT):
model_stats = False
simulator = None
dry_run = False
num_target_ranks = 40

# Restricted Functionality support, mostly for testing

@@ -235,6 +236,7 @@ class _SimConfig(object):
spike_location = "soma"
spike_threshold = -30
dry_run = False
num_target_ranks = 40

_validators = []
_requisitors = []
@@ -274,6 +276,7 @@ def init(cls, config_file, cli_options):
cls.modifications = compat.Map(cls._config_parser.parsedModifications or {})
cls.cli_options = CliOptions(**(cli_options or {}))
cls.dry_run = cls.cli_options.dry_run
cls.num_target_ranks = cls.cli_options.num_target_ranks
# change simulator by request before validator and init hoc config
if cls.cli_options.simulator:
cls._parsed_run["Simulator"] = cls.cli_options.simulator
2 changes: 1 addition & 1 deletion neurodamus/io/cell_readers.py
@@ -98,7 +98,7 @@ def load_base_info_dry_run():
# skip_metypes = set(dry_run_stats.metype_memory.keys())
metype_gids, counts = _retrieve_unique_metypes(node_pop, all_gids)
dry_run_stats.metype_counts += counts
dry_run_stats.metype_gids = metype_gids
dry_run_stats.metype_gids[node_population] = metype_gids
gid_metype_bundle = list(metype_gids.values())
gidvec = dry_run_distribution(gid_metype_bundle, stride, stride_offset, total_cells)

3 changes: 3 additions & 0 deletions neurodamus/node.py
@@ -1867,6 +1867,9 @@ def run(self):
log_stage("============= DRY RUN (SKIP SIMULATION) =============")
self._dry_run_stats.display_total()
self._dry_run_stats.display_node_suggestions()
ranks = int(SimConfig.num_target_ranks)
self._dry_run_stats.collect_all_mpi()
self._dry_run_stats.distribute_cells(ranks)
return
if not SimConfig.simulate_model:
self.sim_init()
153 changes: 153 additions & 0 deletions neurodamus/utils/memory.py
@@ -10,8 +10,13 @@
import json
import psutil
import multiprocessing
import heapq
import pickle
import gzip

from ..core import MPI, NeurodamusCore as Nd, run_only_rank0
from .compat import Vector
from collections import defaultdict

import numpy as np

@@ -160,6 +165,60 @@ def pretty_printing_memory_mb(memory_mb):
return "%.2lf PB" % (memory_mb / 1024 ** 3)


@run_only_rank0
def print_allocation_stats(rank_allocation, rank_memory):
"""
Print statistics of the memory allocation across ranks.
Args:
rank_allocation (dict): A dictionary keyed by population name, where values
map rank IDs to the lists of cell IDs assigned to each rank.
rank_memory (dict): A dictionary keyed by population name, where values
map rank IDs to the total memory load on each rank.
"""
logging.debug("Rank allocation: {}".format(rank_allocation))
logging.debug("Total memory per rank: {}".format(rank_memory))
import statistics
for pop, rank_dict in rank_memory.items():
values = list(rank_dict.values())
logging.info("Population: {}".format(pop))
logging.info("Mean allocation per rank [KB]: {}".format(round(statistics.mean(values))))
try:
stdev = round(statistics.stdev(values))
except statistics.StatisticsError:
stdev = 0
logging.info("Stdev of allocation per rank [KB]: {}".format(stdev))


@run_only_rank0
def export_allocation_stats(rank_allocation, filename):
"""
Export allocation dictionary to serialized pickle file.
"""
compressed_data = gzip.compress(pickle.dumps(rank_allocation))
with open(filename, 'wb') as f:
f.write(compressed_data)


@run_only_rank0
def import_allocation_stats(filename):
"""
Import allocation dictionary from serialized pickle file.
"""
with open(filename, 'rb') as f:
compressed_data = f.read()

return pickle.loads(gzip.decompress(compressed_data))


@run_only_rank0
def allocation_stats_exists(filename):
"""
Check if the allocation stats file exists.
"""
return os.path.exists(filename)


class SynapseMemoryUsage:
''' A small class that works as a lookup table
for the memory used by each type of synapse.
@@ -179,9 +238,12 @@ def get_memory_usage(cls, count, synapse_type):

class DryRunStats:
_MEMORY_USAGE_FILENAME = "cell_memory_usage.json"
_ALLOCATION_FILENAME = "allocation.pkl.gz"

def __init__(self) -> None:
self.metype_memory = {}
self.average_syns_per_cell = {}
self.metype_gids = {}
self.metype_counts = Counter()
self.synapse_counts = Counter()
_, _, self.base_memory, _ = get_task_level_mem_usage()
@@ -213,6 +275,8 @@ def collect_all_mpi(self):
# We combine memory dict via update(). That means if a previous circuit computed
# cells for the same METype (hopefully unlikely!) the last estimate prevails.
self.metype_memory = MPI.py_reduce(self.metype_memory, {}, lambda x, y: x.update(y))
self.average_syns_per_cell = MPI.py_reduce(self.average_syns_per_cell, {},
lambda x, y: x.update(y))
self.metype_counts = self.metype_counts # Cell counts is complete in every rank

@run_only_rank0
@@ -335,3 +399,92 @@ def display_node_suggestions(self):
f"{pretty_printing_memory_mb(node_total_memory)} on the current node.")
logging.info("Please remember that it is suggested to use the same class of nodes "
"for both the dryrun and the actual simulation.")

@run_only_rank0
def distribute_cells(self, num_ranks, batch_size=10) -> (dict, dict):
"""
Distributes cells across ranks based on their memory load.
This function uses a greedy algorithm to distribute cells across ranks such that
the total memory load is balanced. Cells with higher memory load are distributed first.
Args:
num_ranks (int): The number of ranks to distribute the cells across.
batch_size (int): The number of cells assigned to a rank in each step.
Returns:
rank_allocation (dict): A dictionary keyed by population name, where values
map rank IDs to the lists of cell IDs assigned to each rank.
rank_memory (dict): A dictionary keyed by population name, where values
map rank IDs to the total memory load on each rank.
"""
logging.debug("Distributing cells across %d ranks", num_ranks)

self.validate_inputs_distribute(num_ranks, batch_size)

# Multiply the average number of synapses per cell by 2.0. This is done since the
# biggest memory load for a synapse is 2.0 kB and, at this point in the code, we have
# lost the information on whether synapses are excitatory or inhibitory, so we take
# the biggest value to be safe (the difference between the two is minimal anyway).
average_syns_mem_per_cell = {k: v * 2.0 for k, v in self.average_syns_per_cell.items()}

# Prepare a list of tuples (cell_id, memory_load)
# We sum the memory load of the cell type and the average number of synapses per cell
def generate_cells(metype_gids):
for cell_type, gids in metype_gids.items():
memory_usage = (self.metype_memory[cell_type] +
average_syns_mem_per_cell[cell_type])
for gid in gids:
yield gid, memory_usage

ranks = [(0, i) for i in range(num_ranks)] # (total_memory, rank_id)
heapq.heapify(ranks)
all_allocation = {}
all_memory = {}

def assign_cells_to_rank(rank_allocation, rank_memory, batch, batch_memory):
total_memory, rank_id = heapq.heappop(ranks)
logging.debug("Assigning batch to rank %d", rank_id)
rank_allocation[rank_id].extend(batch)
total_memory += batch_memory
rank_memory[rank_id] = total_memory
heapq.heappush(ranks, (total_memory, rank_id))

for pop, metype_gids in self.metype_gids.items():
logging.info("Distributing cells of population %s", pop)
rank_allocation = defaultdict(Vector)
rank_memory = {}
batch = []
batch_memory = 0

for cell_id, memory in generate_cells(metype_gids):
batch.append(cell_id)
batch_memory += memory
if len(batch) == batch_size:
assign_cells_to_rank(rank_allocation, rank_memory, batch, batch_memory)
batch = []
batch_memory = 0

if batch:
assign_cells_to_rank(rank_allocation, rank_memory, batch, batch_memory)

all_allocation[pop] = rank_allocation
all_memory[pop] = rank_memory

print_allocation_stats(all_allocation, all_memory)
export_allocation_stats(all_allocation, self._ALLOCATION_FILENAME)

return all_allocation, all_memory

def validate_inputs_distribute(self, num_ranks, batch_size):
assert isinstance(num_ranks, int), "num_ranks must be an integer"
assert num_ranks > 0, "num_ranks must be a positive integer"
assert isinstance(batch_size, int), "batch_size must be an integer"
assert batch_size > 0, "batch_size must be a positive integer"
set_metype_gids = set()
for values in self.metype_gids.values():
set_metype_gids.update(values.keys())
assert set_metype_gids == set(self.metype_memory.keys())
average_syns_keys = set(self.average_syns_per_cell.keys())
metype_memory_keys = set(self.metype_memory.keys())
assert average_syns_keys == metype_memory_keys
23 changes: 23 additions & 0 deletions tests/integration-e2e/test_dry_run_worflow.py
@@ -1,3 +1,13 @@
from neurodamus.utils.memory import import_allocation_stats, export_allocation_stats


def convert_to_standard_types(obj):
"""Converts an object containing defaultdicts of Vectors to standard Python types."""
result = {}
for node, vectors in obj.items():
result[node] = {key: list(vector) for key, vector in vectors.items()}
return result


def test_dry_run_workflow(USECASE3):
"""
@@ -23,3 +33,16 @@ def test_dry_run_workflow(USECASE3):
}
assert nd._dry_run_stats.metype_counts == expected_items
assert nd._dry_run_stats.suggest_nodes(0.3) > 0

# Test that the allocation works and can be saved and loaded
rank_allocation, _ = nd._dry_run_stats.distribute_cells(2)
export_allocation_stats(rank_allocation, USECASE3 / "allocation.pkl.gz")
rank_allocation = import_allocation_stats(USECASE3 / "allocation.pkl.gz")
rank_allocation_standard = convert_to_standard_types(rank_allocation)

expected_items = {
'NodeA': {0: [1, 2, 3]},
'NodeB': {1: [1, 2]}
}

assert rank_allocation_standard == expected_items