WIP, ENH: memory efficient DXT segs

Fixes #779

- at the moment on `main`, DXT record data is effectively
stored as a list of record dictionaries, each of which holds
lists of segment dictionaries, like this:

```
DXT_list -> [rec0, rec1, ..., recN]
recN -> {"id": ...,
          "rank": ...,
          "write_segments": ...,
          ...}
recN["write_segments"] -> [seg0, seg1, ..., segN]
segN -> {"offset": int,
         "length": int,
         "start_time": float,
         "end_time": float}
```

- the list of segments is extremely memory inefficient, with
the smallest file in the matching issue exceeding 20 GB of
physical memory in `mod_read_all_dxt_records` (profile below;
a rough sketch of the per-segment overhead follows it):

```
Line #    Mem usage    Increment  Occurrences   Line Contents
   852                                                 # fetch records
   853   92.484 MiB   18.820 MiB           1           rec = backend.log_get_dxt_record(self.log, mod, dtype=dtype)
   854 20295.188 MiB    0.773 MiB        1025           while rec != None:
   855 20295.188 MiB    0.000 MiB        1024               self.records[mod].append(rec)
   856 20295.188 MiB    0.000 MiB        1024               self.data['modules'][mod]['num_records'] += 1
   857
   858                                                     # fetch next
   859 20295.188 MiB 20201.930 MiB        1024               rec = backend.log_get_dxt_record(self.log, mod, reads=reads, writes=writes, dtype=dtype)
```
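For intuition (not part of the commit), here is the rough per-segment overhead sketch referenced above, comparing the dict layout against a structured array; the segment count is made up and the byte accounting is approximate:

```python
import sys
import numpy as np

n = 1_000_000  # hypothetical segment count

# dict layout: every segment pays dict overhead plus a boxed
# Python object per value (sizes vary by interpreter/platform)
seg = {"offset": 0, "length": 4096, "start_time": 0.0, "end_time": 0.1}
per_seg = sys.getsizeof(seg) + sum(sys.getsizeof(v) for v in seg.values())
print(f"list of dicts: ~{n * per_seg / 1e6:.0f} MB")

# structured array layout: 2 x int64 + 2 x float64 = 32 bytes per segment
arr = np.zeros(n, dtype=[("offset", int), ("length", int),
                         ("start_time", float), ("end_time", float)])
print(f"structured array: {arr.nbytes / 1e6:.0f} MB")  # 32 MB
```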

- if we switch to NumPy arrays the memory footprint drops a lot
(see below), and performance informally seems similar
(36 seconds vs. 33 seconds on `main` to produce a `report`
object with the smallest file in the matching issue):

```
Line #    Mem usage    Increment  Occurrences   Line Contents
   859 3222.547 MiB 3146.344 MiB        1024               rec = backend.log_get_dxt_record(self.log, mod, reads=reads, writes=writes, dtype=dtype)
```

- this branch currently uses NumPy record arrays,
because I thought they'd be a better fit for a data
structure with 2 int columns and 2 float columns;
however, there is a big performance hit over
regular NumPy arrays (almost 6 minutes vs. 33
seconds for the smallest file in the matching issue);
so, if we could live without the extra dtype
structuring of a recarray, maybe that would be best
(we could also try a pandas DataFrame, which
is another natural fit for dtype columns; a small
sketch comparing the options follows)
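Here is that small sketch of the options being weighed; the field names match this branch, the segment values are made up. A plain structured array keeps the named columns without the recarray machinery, and a DataFrame can be built directly from either:

```python
import numpy as np
import pandas as pd

dt = [("offset", int), ("length", int),
      ("start_time", float), ("end_time", float)]
segs = [(0, 4096, 0.00, 0.05), (4096, 4096, 0.05, 0.11)]  # made-up segments

struct_arr = np.array(segs, dtype=dt)   # plain structured array: arr["offset"]
rec_arr = struct_arr.view(np.recarray)  # adds attribute access: arr.offset
df = pd.DataFrame(struct_arr)           # labeled columns for free

# all three expose the same data
assert struct_arr["offset"][1] == rec_arr.offset[1] == df["offset"].iloc[1] == 4096
```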
tylerjereddy committed Sep 19, 2022
1 parent b6c8599 commit 9fa66b8
Showing 3 changed files with 74 additions and 20 deletions.
40 changes: 21 additions & 19 deletions darshan-util/pydarshan/darshan/backend/cffi_backend.py
```
@@ -565,29 +565,31 @@ def log_get_dxt_record(log, mod_name, reads=True, writes=True, dtype='dict'):
 
     size_of = ffi.sizeof("struct dxt_file_record")
     segments = ffi.cast("struct segment_info *", buf[0] + size_of )
 
+    arr_write = np.recarray(wcnt, dtype=[("offset", int),
+                                         ("length", int),
+                                         ("start_time", float),
+                                         ("end_time", float)])
+    arr_read = np.recarray(rcnt, dtype=[("offset", int),
+                                        ("length", int),
+                                        ("start_time", float),
+                                        ("end_time", float)])
+
     for i in range(wcnt):
-        seg = {
-            "offset": segments[i].offset,
-            "length": segments[i].length,
-            "start_time": segments[i].start_time,
-            "end_time": segments[i].end_time
-        }
-        rec['write_segments'].append(seg)
-
-
-    for i in range(rcnt):
-        i = i + wcnt
-        seg = {
-            "offset": segments[i].offset,
-            "length": segments[i].length,
-            "start_time": segments[i].start_time,
-            "end_time": segments[i].end_time
-        }
-        rec['read_segments'].append(seg)
+        arr_write[i, ...] = (segments[i].offset,
+                             segments[i].length,
+                             segments[i].start_time,
+                             segments[i].end_time)
+
+    for k in range(rcnt):
+        i = k + wcnt
+        arr_read[k, ...] = (segments[i].offset,
+                            segments[i].length,
+                            segments[i].start_time,
+                            segments[i].end_time)
+
+
+    rec['write_segments'] = arr_write
+    rec['read_segments'] = arr_read
     if dtype == "pandas":
         rec['read_segments'] = pd.DataFrame(rec['read_segments'])
         rec['write_segments'] = pd.DataFrame(rec['write_segments'])
```
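An aside on the element-by-element copy loops above: if `struct segment_info` is laid out as two 64-bit ints followed by two doubles with no padding, the whole segment region could in principle be pulled over in one bulk copy via `np.frombuffer` on a cffi buffer. A minimal sketch, with the struct layout as an unverified assumption:

```python
import numpy as np

# assumed layout of struct segment_info: 2 x int64 + 2 x float64, no padding
seg_dtype = np.dtype([("offset", np.int64), ("length", np.int64),
                      ("start_time", np.float64), ("end_time", np.float64)])

def segments_to_array(ffi, segments, count):
    # zero-copy view over the cffi segment buffer, then one bulk copy
    # so the result owns its memory after the log buffer is freed
    buf = ffi.buffer(segments, count * seg_dtype.itemsize)
    return np.frombuffer(buf, dtype=seg_dtype, count=count).copy()
```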
```
@@ -132,6 +132,10 @@ def get_rd_wr_dfs(
         # ignore for the same reason as above
         seg_df = _dict[seg_key]  # type: ignore
         if seg_df.size:
+            seg_df.columns = ["offset",
+                              "length",
+                              "start_time",
+                              "end_time"]
             # drop unused columns from the dataframe
             seg_df = seg_df.drop(columns=drop_columns)
             # create new column for the ranks
```
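For context, assigning to `DataFrame.columns` relabels the columns positionally, so the list order must match the segment fields; a tiny standalone illustration with a made-up row:

```python
import pandas as pd

df = pd.DataFrame([[3744, 3000, 0.05, 0.06]])  # unnamed columns 0..3
df.columns = ["offset", "length", "start_time", "end_time"]
print(df["length"].iloc[0])  # 3000
```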
50 changes: 49 additions & 1 deletion darshan-util/pydarshan/darshan/tests/test_moddxt.py
```
@@ -1,6 +1,8 @@
 import os
 
 import pytest
+import numpy as np
+from numpy.testing import assert_allclose
 import darshan.backend.cffi_backend as backend
 from darshan.log_utils import get_log_path
 
@@ -33,7 +35,53 @@
                             'read_segments': []})])
 def test_dxt_records(logfile, mod, expected_dict):
     # regression guard for DXT records values
+    # write_segments and read_segments are now NumPy
+    # recarrays, to save considerable memory
+    # per gh-779
+    # TODO: refactor for simplicity--we can probably
+    # just initialize the expected values via
+    # np.array() with the appropriate structured dtypes
+    expected_write_segs = np.recarray(1, dtype=[("offset", int),
+                                                ("length", int),
+                                                ("start_time", float),
+                                                ("end_time", float)])
+    expected_read_segs = np.recarray(1, dtype=[("offset", int),
+                                               ("length", int),
+                                               ("start_time", float),
+                                               ("end_time", float)])
+    if expected_dict["write_segments"]:
+        expected_write_segs.offset = expected_dict["write_segments"][0]["offset"]
+        expected_write_segs.length = expected_dict["write_segments"][0]["length"]
+        expected_write_segs.start_time = expected_dict["write_segments"][0]["start_time"]
+        expected_write_segs.end_time = expected_dict["write_segments"][0]["end_time"]
+    else:
+        expected_write_segs = np.recarray(0, dtype=[("offset", int),
+                                                    ("length", int),
+                                                    ("start_time", float),
+                                                    ("end_time", float)])
+    if expected_dict["read_segments"]:
+        expected_read_segs.offset = expected_dict["read_segments"][0]["offset"]
+        expected_read_segs.length = expected_dict["read_segments"][0]["length"]
+        expected_read_segs.start_time = expected_dict["read_segments"][0]["start_time"]
+        expected_read_segs.end_time = expected_dict["read_segments"][0]["end_time"]
+    else:
+        expected_read_segs = np.recarray(0, dtype=[("offset", int),
+                                                   ("length", int),
+                                                   ("start_time", float),
+                                                   ("end_time", float)])
+    expected_dict["write_segments"] = expected_write_segs
+    expected_dict["read_segments"] = expected_read_segs
+
     logfile = get_log_path(logfile)
     log = backend.log_open(logfile)
     rec = backend.log_get_record(log, mod)
-    assert rec == expected_dict
+    for key in expected_dict.keys():
+        if "segments" in key:
+            # careful, can't use assert_allclose directly
+            # on recarrays
+            assert_allclose(rec[key].offset, expected_dict[key].offset)
+            assert_allclose(rec[key].length, expected_dict[key].length)
+            assert_allclose(rec[key].start_time, expected_dict[key].start_time)
+            assert_allclose(rec[key].end_time, expected_dict[key].end_time)
+        else:
+            assert rec[key] == expected_dict[key]
```

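The TODO in the test above might look something like the following sketch (the segment values are hypothetical), building each expected recarray in one step from a structured `np.array`:

```python
import numpy as np
from numpy.testing import assert_allclose

dt = [("offset", int), ("length", int),
      ("start_time", float), ("end_time", float)]

# one step instead of the allocate-then-fill dance above
expected_write_segs = np.array([(3744, 3000, 0.0518, 0.0558)],
                               dtype=dt).view(np.recarray)
# the empty case is just: np.array([], dtype=dt).view(np.recarray)

# the field-wise comparison pattern stays the same as in the test
actual = expected_write_segs.copy()
for field in ("offset", "length", "start_time", "end_time"):
    assert_allclose(getattr(actual, field), getattr(expected_write_segs, field))
```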