[testing-on-gke] extract common parser code (#2489)
* Add common file for fio/dlio parse_logs.py

* use parse_logs_common in fio/dlio parse_logs.py

* avoid requiring unnecessary arguments

* update license header year

* Empty commit

Temp: reuse common string literal
Revert "Temp: reuse common string literal"

* Remove some code duplication

* address review comments
gargnitingoogle authored Sep 19, 2024
1 parent a2c3a2a commit 781f497
Showing 3 changed files with 408 additions and 311 deletions.
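
This commit moves helpers shared by the fio and dlio parse_logs.py scripts into utils/parse_logs_common.py, which the diff below imports as ensureDir, download_gcs_objects, parseLogParserArguments and SUPPORTED_SCENARIOS. The contents of that new file are not shown on this page; the following is a minimal sketch of what its interface could look like, inferred from the imports, call sites, and removed code in the diff below. The names come from the diff, but the bodies and signatures here are assumptions, not the committed implementation.

# utils/parse_logs_common.py -- hypothetical sketch, inferred from call sites
# in the diff below; not the actual committed file.
import argparse
import os
import subprocess

# Scenario names, mirroring the scenario_order list removed from parse_logs.py.
SUPPORTED_SCENARIOS = [
    "local-ssd",
    "gcsfuse-generic",
    "gcsfuse-no-file-cache",
    "gcsfuse-file-cache",
]


def ensureDir(dirpath: str):
  """Creates dirpath (and missing parents) if it does not already exist."""
  try:
    os.makedirs(dirpath)
  except FileExistsError:
    pass


def download_gcs_objects(src: str, dst: str) -> tuple:
  """Recursively copies GCS objects under src into the local directory dst.

  Returns (returncode, stderr) of the underlying gcloud invocation, which is
  how the new downloadDlioOutputs consumes it.
  """
  result = subprocess.run(
      [
          "gcloud",
          "-q",  # ignore prompts
          "storage",
          "cp",
          "-r",
          "--no-user-output-enabled",  # do not print names of copied files
          src,
          dst,
      ],
      capture_output=True,
      text=True,
  )
  return result.returncode, result.stderr


def parseLogParserArguments() -> argparse.Namespace:
  """Parses the CLI flags shared by the fio/dlio parse_logs.py scripts."""
  parser = argparse.ArgumentParser(prog="GKE test output parser")
  parser.add_argument("--workload-config", required=True)
  parser.add_argument("--project-number", required=True)
  parser.add_argument("--instance-id", required=True)
  parser.add_argument("-o", "--output-file", default="output.csv")
  return parser.parse_args()
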
281 changes: 139 additions & 142 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -27,6 +27,7 @@
sys.path.append("../")
import dlio_workload
from utils.utils import get_memory, get_cpu, standard_timestamp, is_mash_installed
from utils.parse_logs_common import ensureDir, download_gcs_objects, parseLogParserArguments, SUPPORTED_SCENARIOS

_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs/logs"

@@ -49,83 +50,44 @@
}


def ensureDir(dirpath):
try:
os.makedirs(dirpath)
except FileExistsError:
pass
def downloadDlioOutputs(dlioWorkloads: set, instanceId: str) -> int:
"""Downloads instanceId-specific dlio outputs for each dlioWorkload locally.
Outputs in the bucket are in the following object naming format
(details in ./unet3d-loading-test/templates/dlio-tester.yaml).
gs://<bucket>/logs/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/<scenario>/per_epoch_stats.json
gs://<bucket>/logs/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/<scenario>/summary.json
gs://<bucket>/logs/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/gcsfuse-generic/gcsfuse_mount_options
These are downloaded locally as:
<_LOCAL_LOGS_LOCATION>/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/<scenario>/per_epoch_stats.json
<_LOCAL_LOGS_LOCATION>/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/<scenario>/summary.json
<_LOCAL_LOGS_LOCATION>/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/gcsfuse-generic/gcsfuse_mount_options
"""

def downloadDlioOutputs(dlioWorkloads: set, instanceId: str):
for dlioWorkload in dlioWorkloads:
print(f"Downloading DLIO logs from the bucket {dlioWorkload.bucket}...")
result = subprocess.run(
[
"gcloud",
"-q", # ignore prompts
"storage",
"cp",
"-r",
"--no-user-output-enabled", # do not print names of files being copied
f"gs://{dlioWorkload.bucket}/logs/{instanceId}",
_LOCAL_LOGS_LOCATION,
],
capture_output=False,
text=True,
srcObjects = f"gs://{dlioWorkload.bucket}/logs/{instanceId}"
print(f"Downloading DLIO logs from the {srcObjects} ...")
returncode, errorStr = download_gcs_objects(
srcObjects, _LOCAL_LOGS_LOCATION
)
if result.returncode < 0:
print(f"failed to fetch DLIO logs, error: {result.stderr}")
if returncode < 0:
print(f"Failed to download DLIO logs from {srcObjects}: {errorStr}")
return returncode
return 0


if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="DLIO Unet3d test output parser",
description=(
"This program takes in a json workload configuration file and parses"
" it for valid DLIO workloads and the locations of their test outputs"
" on GCS. It downloads each such output object locally to"
" {_LOCAL_LOGS_LOCATION} and parses them for DLIO test runs, and then"
" dumps their output metrics into a CSV report file."
),
)
parser.add_argument(
"--workload-config",
help=(
"A json configuration file to define workloads that were run to"
" generate the outputs that should be parsed."
),
required=True,
)
parser.add_argument(
"--project-number",
help=(
"project-number (e.g. 93817472919) is needed to fetch the cpu/memory"
" utilization data from GCP."
),
required=True,
)
parser.add_argument(
"--instance-id",
help="unique string ID for current test-run",
required=True,
)
parser.add_argument(
"-o",
"--output-file",
metavar="Output file (CSV) path",
help="File path of the output metrics (in CSV format)",
default="output.csv",
)
args = parser.parse_args()
def createOutputScenariosFromDownloadedFiles(args: dict) -> dict:
"""Creates output records from the downloaded local files.
ensureDir(_LOCAL_LOGS_LOCATION)
The following creates a dict called 'output'
from the downloaded dlio output files, which are in the following format.
dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
args.workload_config
)
downloadDlioOutputs(dlioWorkloads, args.instance_id)
<_LOCAL_LOGS_LOCATION>/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/<scenario>/per_epoch_stats.json
<_LOCAL_LOGS_LOCATION>/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/<scenario>/summary.json
<_LOCAL_LOGS_LOCATION>/<instanceId>/<numFilesTrain>-<recordLength>-<batchSize>-<hash>/gcsfuse-generic/gcsfuse_mount_options
"""
Output dict structure:
"{num_files_train}-{mean_file_size}-{batch_size}":
"mean_file_size": str
"num_files_train": str
@@ -135,31 +97,32 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str):
"gcsfuse-generic": [record1, record2, record3, record4]
"gcsfuse-file-cache": [record1, record2, record3, record4]
"gcsfuse-no-file-cache": [record1, record2, record3, record4]
"""
output = {}
mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")
"""

output = {}
for root, _, files in os.walk(_LOCAL_LOGS_LOCATION + "/" + args.instance_id):
print(f"Parsing directory {root} ...")
if files:
print(f"Parsing directory {root} ...")
per_epoch_stats_file = root + "/per_epoch_stats.json"
summary_file = root + "/summary.json"

# If directory contains gcsfuse_mount_options file, then parse gcsfuse
# mount options from it in record.
gcsfuse_mount_options = ""
gcsfuse_mount_options_file = root + "/gcsfuse_mount_options"
if os.path.isfile(gcsfuse_mount_options_file):
with open(gcsfuse_mount_options_file) as f:
gcsfuse_mount_options = f.read().strip()

per_epoch_stats_file = root + "/per_epoch_stats.json"
summary_file = root + "/summary.json"

# Load per_epoch_stats.json .
with open(per_epoch_stats_file, "r") as f:
try:
per_epoch_stats_data = json.load(f)
except:
print(f"failed to json-parse {per_epoch_stats_file}")
continue

# Load summary.json .
with open(summary_file, "r") as f:
try:
summary_data = json.load(f)
@@ -168,6 +131,7 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str):
continue

for i in range(summary_data["epochs"]):
# Get numFilesTrain, recordLength, batchSize from the file/dir path.
key = root.split("/")[-2]
key_split = key.split("-")

@@ -184,6 +148,7 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str):
},
}

# Create a record for this key.
r = record.copy()
r["pod_name"] = summary_data["hostname"]
r["epoch"] = i + 1
@@ -204,95 +169,127 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str):
per_epoch_stats_data[str(i + 1)]["start"]
)
r["end"] = standard_timestamp(per_epoch_stats_data[str(i + 1)]["end"])
if r["scenario"] != "local-ssd" and mash_installed:
r["lowest_memory"], r["highest_memory"] = get_memory(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
pass

def fetch_cpu_memory_data():
if r["scenario"] != "local-ssd":
if mash_installed:
r["lowest_memory"], r["highest_memory"] = get_memory(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
pass

fetch_cpu_memory_data()

r["gcsfuse_mount_options"] = gcsfuse_mount_options

# This print is for debugging in case something goes wrong.
pprint.pprint(r)

# If a slot for record for this particular epoch has not been created yet,
# append enough empty records to make a slot.
while len(output[key]["records"][r["scenario"]]) < i + 1:
output[key]["records"][r["scenario"]].append({})

# Insert the record at the appropriate slot.
output[key]["records"][r["scenario"]][i] = r

scenario_order = [
"local-ssd",
"gcsfuse-generic",
"gcsfuse-no-file-cache",
"gcsfuse-file-cache",
]
return output

output_file_path = args.output_file
ensureDir(os.path.dirname(output_file_path))
output_file = open(output_file_path, "a")
output_file.write(
"File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration"
" (s),GPU Utilization (%),Throughput (sample/s),Throughput"
" (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory (MB),GCSFuse"
" Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse Highest CPU"
" (core),Pod,Start,End,GcsfuseMountOptions,InstanceID\n"
)

for key in output:
record_set = output[key]
total_size = int(
int(record_set["mean_file_size"])
* int(record_set["num_files_train"])
/ (1024**3)
def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
with open(output_file_path, "a") as output_file:
# Write a new header row.
output_file.write(
"File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration"
" (s),GPU Utilization (%),Throughput (sample/s),Throughput"
" (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory"
" (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse"
" Highest CPU (core),Pod,Start,End,GcsfuseMountOptions,InstanceID\n"
)

for scenario in scenario_order:
if scenario not in record_set["records"]:
print(f"{scenario} not in output so skipping")
continue
if "local-ssd" in record_set["records"] and (
len(record_set["records"]["local-ssd"])
== len(record_set["records"][scenario])
):
for key in output:
record_set = output[key]
total_size = int(
int(record_set["mean_file_size"])
* int(record_set["num_files_train"])
/ (1024**3)
)

for scenario in SUPPORTED_SCENARIOS:
if scenario not in record_set["records"]:
print(f"{scenario} not in output so skipping")
continue

for i in range(len(record_set["records"]["local-ssd"])):
r = record_set["records"][scenario][i]

try:
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
if "local-ssd" in record_set["records"] and (
len(record_set["records"]["local-ssd"])
== len(record_set["records"][scenario])
):
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
else:
r["throughput_over_local_ssd"] = "NA"

except ZeroDivisionError:
print("Got ZeroDivisionError. Ignoring it.")
r["throughput_over_local_ssd"] = 0
except:
raise
output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{args.instance_id}\n"
)
else:
for i in range(len(record_set["records"][scenario])):
r = record_set["records"][scenario][i]
r["throughput_over_local_ssd"] = "NA"

except Exception as e:
print(
"Error: failed to parse/write record-set for"
f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
)
continue

output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{args.instance_id}\n"
)

output_file.close()
output_file.close()


if __name__ == "__main__":
args = parseLogParserArguments()
ensureDir(_LOCAL_LOGS_LOCATION)

dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
args.workload_config
)
downloadDlioOutputs(dlioWorkloads, args.instance_id)

mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

output = createOutputScenariosFromDownloadedFiles(args)

output_file_path = args.output_file
# Create the parent directory of output_file_path if it doesn't
# exist already.
ensureDir(os.path.dirname(output_file_path))
writeRecordsToCsvOutputFile(output, output_file_path)
print(
"\n\nSuccessfully published outputs of DLIO test runs to"
f" {output_file_path} !!!\n\n"
)
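
For reference, createOutputScenariosFromDownloadedFiles (above) returns a dict keyed by "{num_files_train}-{mean_file_size}-{batch_size}", holding one record list per scenario, and writeRecordsToCsvOutputFile turns it into CSV rows. A purely illustrative entry, with placeholder values rather than real results, could look like this:

# Illustrative shape of one entry in the 'output' dict described in the
# docstring of createOutputScenariosFromDownloadedFiles; all values below are
# placeholders, and real records carry more fields (throughput, CPU/memory,
# start/end timestamps, gcsfuse mount options, ...).
output = {
    "500-102400-4": {  # "{num_files_train}-{mean_file_size}-{batch_size}"
        "num_files_train": "500",
        "mean_file_size": "102400",
        "batch_size": "4",
        "records": {
            "local-ssd": [
                {"epoch": 1, "pod_name": "dlio-tester-local-ssd-0", "duration": 97},
            ],
            "gcsfuse-generic": [
                {"epoch": 1, "pod_name": "dlio-tester-gcsfuse-generic-0", "duration": 112},
            ],
            "gcsfuse-file-cache": [],
            "gcsfuse-no-file-cache": [],
        },
    },
}

writeRecordsToCsvOutputFile then appends one CSV row per (key, scenario, epoch), computing "Throughput over Local SSD (%)" against the local-ssd record at the same epoch index when one exists, and writing "NA" otherwise.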