CoreNeuron Rebalancer #206
Open
ferdonline wants to merge 9 commits into main from leite/corenrn-rebalancer
Commits (9, all by ferdonline):

be0f36b  Adding corenrn files.dat rebalancer
fbabe98  Transpose to match CoreNeuron RoundRobin reading
0ce6c18  Parametrize number of entries to read
75edaaf  Make executable script and add header
d3e199c  Parametrize output file
00246c4  UI details
8bf789f  Better distribution using knapsack. New option: --histogram
e8bdb88  Display larger ranks. Fix bucket display
9d2431e  Fill gaps with -1 so that the rank doesnt change when loading RR
@@ -0,0 +1,207 @@
#!/bin/env python3

"""
A post-processing script to redistribute CoreNeuron input files
more evenly across ranks based on their filesystem size.

Blue Brain Project - EPFL, 2024

"""

import argparse
import heapq
import itertools
import logging
import math
import os
import sys

# Numpy may be required (histogram)
numpy = None


def distribute_dat_to_bucket(dat_entry, size, buckets, bucket_sizes, base_dir="."):
    """
    Distribute a single file into the bucket with the least total size.
    """
    # Pop the bucket with the smallest size
    smallest_size, smallest_bucket_index = heapq.heappop(bucket_sizes)
    # Assign the file to this bucket
    buckets[smallest_bucket_index].append(dat_entry + "\n")  # add newline for speedy write
    # Update the bucket size in the heap
    new_size = smallest_size + size
    heapq.heappush(bucket_sizes, (new_size, smallest_bucket_index))


def redistribute_files_dat(files_dat_file, n_buckets, max_entries=None, show_stats=False):
    """
    Read and process each entry from the dat file and distribute them into buckets.
    """
    base_dir = os.path.dirname(files_dat_file)
    metadata = {}

    logging.debug("Reading distribution file: %s", files_dat_file)
    with open(files_dat_file, 'r') as file:
        # read header
        metadata["version"] = file.readline().strip()
        n_entries = file.readline()

        metadata["n_files"] = max_entries or n_entries

        # read all dat entries
        dat_entries = file.readlines()

    if (n_files := int(metadata["n_files"])) < len(dat_entries):
        logging.warning("files.dat: processing reduced number of entries: %d", n_files)
        dat_entries = dat_entries[:n_files]

    logging.info("Distributing files into %d buckets...", n_buckets)

    if len(dat_entries) < n_buckets:
        raise RuntimeError("Too little data for selected number of ranks. Specify less")

    # Initialize empty buckets
    buckets = [[] for _ in range(n_buckets)]

    # Create a heap to keep track of bucket sizes. Each entry is (bucket_size, bucket_index)
    bucket_heap = [(0, i) for i in range(n_buckets)]
    heapq.heapify(bucket_heap)  # Turn the list into a heap

    dat_entries = [entry.strip() for entry in dat_entries]
    entry_sizes = [(entry, get_entry_size(base_dir, entry)) for entry in with_progress(dat_entries)]
    entry_sizes = sorted(entry_sizes, key=lambda e: e[1], reverse=True)

    for dat_entry, size in entry_sizes:
        try:
            distribute_dat_to_bucket(dat_entry, size, buckets, bucket_heap, base_dir)
        except Exception as e:
            raise RuntimeError(f"Error processing dat entry {dat_entry}") from e

    if show_stats:
        logging.info("Top 10 rank accumulated sizes")
        for size, rank_i in heapq.nlargest(10, bucket_heap):
            print(f"  Rank {rank_i}: {size/(1024*1024):.1f} MiB")

        rank_sizes = [bucket[0] for bucket in bucket_heap]
        show_histogram(rank_sizes)

    return buckets, metadata


def write_dat_file(buckets, infos: dict, output_file="rebalanced-files.dat"):
    """
    Output the result after processing all directories
    """
    logging.info("Writing out data from %d buckets to file: %s", len(buckets), output_file)

    # CoreNeuron does RoundRobin - we need to transpose the entries
    # When a sequence finishes use "-1" (to keep in sync)
    zipped_entries = itertools.zip_longest(*buckets, fillvalue="-1\n")

    with open(output_file, "w") as out:
        print(infos["version"], file=out)
        print(infos["n_files"], file=out)

        for entries in zipped_entries:
            for entry in entries:
                out.write(entry)


def get_entry_size(base_dir, dat_entry):
    """Obtain the file size of a dat entry"""
    dat_file = f"{dat_entry}_2.dat"
    file_path = os.path.join(base_dir, dat_file)
    return os.path.getsize(file_path)


def with_progress(elements):
    """A quick and easy generator for displaying progress while iterating"""
    total_elems = len(elements)
    report_every = math.ceil(total_elems / 50)
    logging.info(f"Processing {total_elems} entries")
    for i, elem in enumerate(elements):
        if i % report_every == 0:
            print(f"{i:10} [{i*100/total_elems:3.0f}%]", file=sys.stderr)
        yield elem


def show_histogram(rank_buckets, n_bins=50):
    """A simple histogram CLI visualizer"""
    logging.info("Histogram of the ranks sizes")
    freq, bins = numpy.histogram(rank_buckets, bins=n_bins)
    bin_start = bins[0]
    for count, bin_end in zip(freq, bins[1:]):
        if count:
            print(f"  [{bin_start/(1024*1024):5.0f} - {bin_end/(1024*1024):5.0f}]: {count:0d}")
        bin_start = bin_end
Review comment: If this is a tool to be used in the future, and not only a script that will be discarded soon, maybe it makes sense to have a class?
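One possible shape for such a refactor, shown here only as a hedged sketch (the class name and methods are hypothetical and simply wrap the functions already defined above, they are not part of this PR), could be:

# Hypothetical sketch: a thin class wrapping the existing functions.
# Names are illustrative only, not the author's or reviewer's.
class FilesDatRebalancer:
    def __init__(self, files_dat_file, n_ranks):
        self.files_dat_file = files_dat_file
        self.n_ranks = n_ranks

    def rebalance(self, max_entries=None, show_stats=False):
        # Delegates to redistribute_files_dat() defined above
        return redistribute_files_dat(
            self.files_dat_file, self.n_ranks, max_entries, show_stats
        )

    def write(self, buckets, infos, output_file="rebalanced-files.dat"):
        # Delegates to write_dat_file() defined above
        write_dat_file(buckets, infos, output_file)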
def main():
    # Step 1: Set up argparse for the CLI
    parser = argparse.ArgumentParser(
        description="Redistribute CoreNeuron dat files, optimizing for a given number of ranks"
    )
    parser.add_argument(
        'input_file',
        type=str,
        help="Path to the CoreNeuron input file, typically files.dat"
    )
    parser.add_argument(
        'n_ranks',
        type=int,
        help="Number of target ranks"
    )
    parser.add_argument(
        '--max-entries',
        type=int,
        default=None,
        required=False,
        help="Consider only the first N entries of the input file"
    )
    parser.add_argument(
        '--output-file',
        type=str,
        default="rebalanced-files.dat",
        required=False,
        help="The rebalanced output file path"
    )
    # Optional argument for verbose output
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help="Enable verbose output for debugging."
    )
    parser.add_argument(
        '--histogram',
        action='store_true',
        help="Additionally display the histogram of the ranks accumulated sizes"
    )

    args = parser.parse_args()

    logging_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=logging_level, format="%(levelname)s :: %(message)s")

    if args.histogram:
        global numpy
        import numpy

    if not os.path.isfile(args.input_file):
        logging.error("Input file could not be found!")
        return 1
    else:
        logging.info(f"Reading from input file: {args.input_file}")

    # Do the redistribution
    buckets, infos = redistribute_files_dat(
        args.input_file, args.n_ranks, args.max_entries, args.histogram
    )

    # Create a new files.dat according to the new buckets
    write_dat_file(buckets, infos, args.output_file)

    logging.info("DONE")


if __name__ == "__main__":
    sys.exit(main())
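To make the flow of the script concrete, a minimal usage sketch of the two main functions follows; the input path and rank count are hypothetical values chosen only for illustration:

# Hypothetical usage sketch: the path and rank count below are made up.
buckets, infos = redistribute_files_dat("/gpfs/coreneuron_input/files.dat", n_buckets=2048)
write_dat_file(buckets, infos, output_file="rebalanced-files.dat")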
Review comment: Maybe it makes sense to have some constants (for example the ones below), or whichever names you think make more sense.
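As an illustration only (these names are hypothetical, not the reviewer's), such constants could collect the literals that currently appear in several places in the script:

# Hypothetical sketch: possible module-level constants; names are only suggestions.
DEFAULT_OUTPUT_FILE = "rebalanced-files.dat"  # default for --output-file and write_dat_file()
DAT_ENTRY_SUFFIX = "_2.dat"                   # suffix applied in get_entry_size()
MiB = 1024 * 1024                             # divisor used when printing sizes
PROGRESS_STEPS = 50                           # report granularity in with_progress()
DEFAULT_HISTOGRAM_BINS = 50                   # default bin count in show_histogram()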