Parametrize number of entries to read
ferdonline committed Oct 23, 2024
1 parent fbabe98 · commit 0ce6c18
Showing 1 changed file with 16 additions and 3 deletions.
tools/rebalance-corenrn-data.py
@@ -23,7 +23,7 @@ def distribute_dat_to_bucket(dat_entry, buckets, bucket_sizes, base_dir="."):
     heapq.heappush(bucket_sizes, (new_size, smallest_bucket_index))
 
 
-def redistribute_files_dat(files_dat_file, n_buckets):
+def redistribute_files_dat(files_dat_file, n_buckets, max_entries=None):
     """
     Read and process each entry from the dat file and distribute them into buckets.
     """
@@ -32,8 +32,13 @@ def redistribute_files_dat(files_dat_file, n_buckets):
 
     logging.debug("Reading distribution file: %s", files_dat_file)
     with open(files_dat_file, 'r') as file:
+        # read header
         metadata["version"] = file.readline().strip()
-        metadata["n_files"] = file.readline()
+        n_entries = file.readline()
+
+        metadata["n_files"] = max_entries or n_entries
+
+        # read all dat entries
         dat_entries = file.readlines()
 
     if (n_files := int(metadata["n_files"])) < len(dat_entries):
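
This hunk changes the header handling: the second line of the dat file is now kept as `n_entries`, and a user-supplied `max_entries` takes precedence over it. A small sketch of those semantics (the sample file content is hypothetical; the layout of version line, entry count, then one entry per line follows the reads in the diff, and the final slicing stands in for the truncation branch whose body is collapsed in this view):

```python
import io

# Hypothetical files.dat: version, entry count, then one entry per line.
sample = io.StringIO("1.0\n4\nfileA.dat\nfileB.dat\nfileC.dat\nfileD.dat\n")

max_entries = 2  # as if --max-entries 2 were passed; None means "use the header"

version = sample.readline().strip()
n_entries = sample.readline()        # still a raw string such as "4\n"
n_files = max_entries or n_entries   # a truthy cap wins; None falls back
dat_entries = sample.readlines()

# int() accepts both the int cap and the "4\n" string from the header
print(version, int(n_files), dat_entries[: int(n_files)])
# -> 1.0 2 ['fileA.dat\n', 'fileB.dat\n']
```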
@@ -66,6 +71,7 @@ def write_dat_file(buckets, infos: dict):
     """
     Output the result after processing all directories
     """
+    logging.info("Writing out data from %d buckets", len(buckets))
 
     # CoreNeuron does RoundRobin - we need to transpose the entries
     zipped_entries = itertools.zip_longest(*buckets)
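
As the comment in the hunk notes, CoreNeuron assigns entries to ranks round-robin, so writing bucket i contiguously would spread it across ranks; transposing with `itertools.zip_longest` makes rank i receive exactly bucket i. A toy demonstration (filtering the `None` padding for uneven buckets is an assumption about the part of `write_dat_file` not shown here):

```python
import itertools

buckets = [["a1", "a2"], ["b1"], ["c1", "c2"]]

# Row k of the transpose holds the k-th entry of every bucket, so a
# round-robin reader hands bucket i back to rank i.
zipped = itertools.zip_longest(*buckets)
flat = [e for row in zipped for e in row if e is not None]
print(flat)  # -> ['a1', 'b1', 'c1', 'a2', 'c2']
```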
@@ -106,6 +112,13 @@ def main():
         type=int,
         help="Optimize the distribution for given number of ranks"
     )
+    parser.add_argument(
+        '--max-entries',
+        type=int,
+        default=None,
+        required=False,
+        help="Consider only the first N entries of the input file"
+    )
     # Optional argument for verbose output
     parser.add_argument(
         '-v', '--verbose',
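
With the new option wired in, an invocation could look like the line below; `--max-entries` comes straight from the diff, while the positional input file and the `--n-ranks` spelling are assumptions based on `args.input_file` and `args.n_ranks`:

```
python tools/rebalance-corenrn-data.py files.dat --n-ranks 32 --max-entries 1000 -v
```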
@@ -125,7 +138,7 @@
     logging.info(f"Reading from input file: {args.input_file}")
 
     # Do the redistribution
-    buckets, infos = redistribute_files_dat(args.input_file, args.n_ranks)
+    buckets, infos = redistribute_files_dat(args.input_file, args.n_ranks, args.max_entries)
 
     # Create a new files.dat according to the new buckets
     write_dat_file(buckets, infos)
