Let's blow away dist, and also shared memory #552

Draft

knighton wants to merge 51 commits into base: main from james/nodist

Changes from all commits (51 commits)
9b25914
Merge branch 'main' of github.com:mosaicml/streaming
knighton Dec 25, 2023
7867b10
Move epoch_size arg.
knighton Dec 26, 2023
afe835a
Move allow_unsafe_types arg.
knighton Dec 26, 2023
010c613
Fix usage.
knighton Dec 26, 2023
2f4875b
Propagate allow_unsafe_types as a normal Stream argument.
knighton Dec 26, 2023
333605e
Explicit list the kwargs for Stream.apply_defaults().
knighton Dec 26, 2023
2d8a905
Tweak docstrings.
knighton Dec 26, 2023
d298479
Complete rewrite of local dir collision detection using regular files.
knighton Dec 26, 2023
eff80e6
Add psutil.
knighton Dec 26, 2023
8fb9dca
Fix.
knighton Dec 26, 2023
223f3ca
Fix.
knighton Dec 26, 2023
664cbc1
Fix.
knighton Dec 26, 2023
02416d7
Fix.
knighton Dec 26, 2023
b0b3b56
Fix.
knighton Dec 26, 2023
23505c2
Fix.
knighton Dec 26, 2023
67b936f
Fix.
knighton Dec 26, 2023
fa14130
Fix.
knighton Dec 26, 2023
c54887b
Fix.
knighton Dec 26, 2023
c214589
Fix.
knighton Dec 26, 2023
c0c82bd
Remove dist from StreamingDataset init.
knighton Dec 26, 2023
105ee16
Sleep first out of race paranoia.
knighton Dec 26, 2023
afafcd8
Fix.
knighton Dec 26, 2023
62f043b
Break up base/interproc/registry.py -> base/coord/job/...
knighton Dec 27, 2023
57b2a05
base/world.py -> base/coord/world.py.
knighton Dec 27, 2023
408bd02
MMapped objects: Buffer -> Array -> Number -> Barrier.
knighton Dec 27, 2023
4a38a1e
Merge branch 'main' into james/nodist
knighton Dec 27, 2023
d3bb79c
Fix.
knighton Dec 27, 2023
43dd010
Add dirname field to JobDirectory.
knighton Dec 27, 2023
e252c1c
On the fly FileLock to solve pickling issue in StreamingDataset due t…
knighton Dec 27, 2023
ef09ca1
Refactor.
knighton Dec 27, 2023
4d34c29
Harden and organize SD args checking.
knighton Dec 27, 2023
e836452
Move shmem dir under the coord/ tree.
knighton Dec 27, 2023
9b024e9
Switch args order.
knighton Dec 28, 2023
5f0b430
Rewrite the mmap data structures.
knighton Dec 28, 2023
b73e966
Refactor.
knighton Dec 28, 2023
d35912e
Merge branch 'james/nodist' of github.com:mosaicml/streaming into jam…
knighton Dec 28, 2023
a7a33e9
Fix (tightened typing).
knighton Dec 28, 2023
a878fe5
More of that.
knighton Dec 28, 2023
470df2b
Update simulator.
knighton Dec 28, 2023
68c1fb7
Fix.
knighton Dec 28, 2023
5085ac7
Tweak tests.
knighton Dec 28, 2023
c6f2f4f
Fix.
knighton Dec 28, 2023
a3d810c
Fix.
knighton Dec 28, 2023
9dc7391
Partially revert some changes.
knighton Dec 28, 2023
7dd0d35
Hack.
knighton Dec 28, 2023
b1abc29
Fix.
knighton Dec 28, 2023
0d4bd6d
Temporarily disable test that hangs in CI.
knighton Dec 28, 2023
05ff308
Another.
knighton Dec 28, 2023
f5b93ed
test_config_root.
knighton Dec 28, 2023
0227c88
Fix.
knighton Dec 28, 2023
30d650c
Merge branch 'james/nodist' of github.com:mosaicml/streaming into jam…
knighton Jan 20, 2024
3 changes: 1 addition & 2 deletions docs/source/conf.py
@@ -365,14 +365,13 @@ def _modules_to_rst() -> List[types.ModuleType]:
     document_modules: List[types.Module] = [
         streaming,
         streaming.base.compression,
+        streaming.base.coord,
         streaming.base.format,
         streaming.base.hashing,
         streaming.base.partition,
-        streaming.base.shared,
         streaming.base.shuffle,
         streaming.base.storage,
         streaming.base.util,
-        streaming.base.world,
     ]
     exclude_modules: List[types.Module] = [streaming.base, streaming._version]
     for name in streaming.__dict__:
1 change: 1 addition & 0 deletions setup.py
@@ -58,6 +58,7 @@
     'azure-storage-blob>=12.0.0,<13',
     'azure-storage-file-datalake>=12.11.0,<13',
     'azure-identity>=1.13.0',
+    'psutil>=5.9.4',
 ]
 
 extra_deps = {}
204 changes: 85 additions & 119 deletions simulation/core/sim_dataset.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion simulation/core/sim_world.py
@@ -3,7 +3,7 @@
 
 """Contains info about the nodes, ranks, and workers of the run for simulation purposes."""
 
-from streaming.base.world import World
+from streaming.base.coord.world import World
 
 
 class SimulationWorld(World):
30 changes: 24 additions & 6 deletions simulation/core/yaml_processing.py
@@ -197,11 +197,29 @@ def create_simulation_dataset(nodes: int, devices: int, workers: int, global_bat
     sampling_granularity = train_dataset.get('sampling_granularity', 1)
     batching_method = train_dataset.get('batching_method', 'random')
 
-    dataset = SimulationDataset(nodes, devices, workers, streams, remote, local, split,
-                                download_retry, download_timeout, validate_hash, keep_zip,
-                                epoch_size, predownload, cache_limit, partition_algo,
-                                num_canonical_nodes, batch_size, shuffle, shuffle_algo,
-                                shuffle_seed, shuffle_block_size, sampling_method,
-                                sampling_granularity, batching_method)
+    dataset = SimulationDataset(nodes=nodes,
+                                devices=devices,
+                                workers=workers,
+                                streams=streams,
+                                remote=remote,
+                                local=local,
+                                split=split,
+                                download_retry=download_retry,
+                                download_timeout=download_timeout,
+                                validate_hash=validate_hash,
+                                keep_zip=keep_zip,
+                                epoch_size=epoch_size,
+                                predownload=predownload,
+                                cache_limit=cache_limit,
+                                partition_algo=partition_algo,
+                                num_canonical_nodes=num_canonical_nodes,
+                                batch_size=batch_size,
+                                shuffle=shuffle,
+                                shuffle_algo=shuffle_algo,
+                                shuffle_seed=shuffle_seed,
+                                shuffle_block_size=shuffle_block_size,
+                                sampling_method=sampling_method,
+                                sampling_granularity=sampling_granularity,
+                                batching_method=batching_method)
 
     return dataset
2 changes: 1 addition & 1 deletion streaming/base/batching/__init__.py
@@ -12,7 +12,7 @@
 from streaming.base.batching.per_stream import generate_work_per_stream_batching
 from streaming.base.batching.random import generate_work_random_batching
 from streaming.base.batching.stratified import generate_work_stratified_batching
-from streaming.base.world import World
+from streaming.base.coord.world import World
 
 if TYPE_CHECKING:
     from streaming.base.dataset import StreamingDataset
5 changes: 1 addition & 4 deletions streaming/base/batching/per_stream.py
@@ -10,9 +10,9 @@
 import numpy as np
 from numpy.typing import NDArray
 
+from streaming.base.coord.world import World
 from streaming.base.partition import get_partitions
 from streaming.base.shuffle import get_shuffle
-from streaming.base.world import World
 
 if TYPE_CHECKING:
     from streaming.base.dataset import StreamingDataset
@@ -63,9 +63,6 @@ def generate_work_per_stream_batching(dataset: StreamingDataset, world: World, e
             # same as the ratio of the stream's samples to overall samples.
             # This ensures that the overall training shuffle block size is still approximately
             # equal to what is set by the user, and allows for reasoning about cache_limit as well.
-            if not isinstance(dataset.shuffle_block_size, int):
-                raise TypeError(f'Dataset `shuffle_block_size` must be an integer. ' +
-                                f'Got {type(dataset.shuffle_block_size)} instead.')
             shuffle_block_portion = int(dataset.shuffle_block_size * stream.proportion)
             stream_shuffle = get_shuffle(dataset.shuffle_algo, shuffle_units,
                                          dataset.num_canonical_nodes, dataset.shuffle_seed, epoch,
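The isinstance guard is deleted here, and the same three-line guard disappears from random.py and stratified.py below, presumably made redundant by validation at StreamingDataset construction ("Harden and organize SD args checking" in the commit list). The proportional sizing that remains is easiest to see with concrete numbers; the arithmetic below is illustrative only, not taken from the PR:

# A stream's shuffle block portion scales with its share of the samples, so
# per-stream blocks still sum to roughly the global setting the user chose.
shuffle_block_size = 1 << 20  # Global user setting: ~1M samples per shuffle block.
stream_proportion = 0.25      # This stream holds 25% of all samples.
shuffle_block_portion = int(shuffle_block_size * stream_proportion)
assert shuffle_block_portion == 1 << 18  # 262,144 samples for this stream's blocks.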
5 changes: 1 addition & 4 deletions streaming/base/batching/random.py
@@ -10,9 +10,9 @@
 import numpy as np
 from numpy.typing import NDArray
 
+from streaming.base.coord.world import World
 from streaming.base.partition import get_partitions
 from streaming.base.shuffle import get_shuffle
-from streaming.base.world import World
 
 if TYPE_CHECKING:
     from streaming.base.dataset import StreamingDataset
@@ -58,9 +58,6 @@ def generate_work_random_batching(dataset: StreamingDataset, world: World, epoch
 
     # If we need to shuffle, shuffle in a node-aware and *underlying* shard-aware way.
     if dataset.shuffle:
-        if not isinstance(dataset.shuffle_block_size, int):
-            raise TypeError(f'Dataset `shuffle_block_size` must be an integer. ' +
-                            f'Got {type(dataset.shuffle_block_size)} instead.')
         shuffle = get_shuffle(dataset.shuffle_algo, shuffle_units, dataset.num_canonical_nodes,
                               dataset.shuffle_seed, epoch, dataset.shuffle_block_size)
         big_ids = np.where(big_ids != -1, shuffle[big_ids], -1)
5 changes: 1 addition & 4 deletions streaming/base/batching/stratified.py
@@ -11,9 +11,9 @@
 import numpy as np
 from numpy.typing import NDArray
 
+from streaming.base.coord.world import World
 from streaming.base.partition import get_partitions
 from streaming.base.shuffle import get_shuffle
-from streaming.base.world import World
 
 if TYPE_CHECKING:
     from streaming.base.dataset import StreamingDataset
@@ -75,9 +75,6 @@ def generate_work_stratified_batching(dataset: StreamingDataset, world: World, e
             # same as the ratio of the stream's samples to overall samples.
             # This ensures that the overall training shuffle block size is still approximately
             # equal to what is set by the user, and allows for reasoning about cache_limit as well.
-            if not isinstance(dataset.shuffle_block_size, int):
-                raise TypeError(f'Dataset `shuffle_block_size` must be an integer. ' +
-                                f'Got {type(dataset.shuffle_block_size)} instead.')
             shuffle_block_portion = int(dataset.shuffle_block_size * stream.proportion)
             stream_shuffle = get_shuffle(dataset.shuffle_algo, shuffle_units,
                                          dataset.num_canonical_nodes, dataset.shuffle_seed, epoch,
15 changes: 15 additions & 0 deletions streaming/base/coord/__init__.py
@@ -0,0 +1,15 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Coordination among ranks and workers."""

from streaming.base.coord.job import JobDirectory, JobRegistry
from streaming.base.coord.mmap import MMapArray, MMapBarrier, MMapBuffer, MMapNumber
from streaming.base.coord.shmem import (SharedArray, SharedBarrier, SharedMemory, SharedScalar,
                                        get_shm_prefix)
from streaming.base.coord.world import World

__all__ = [
    'JobDirectory', 'JobRegistry', 'MMapArray', 'MMapBarrier', 'MMapBuffer', 'MMapNumber',
    'SharedArray', 'SharedBarrier', 'SharedMemory', 'get_shm_prefix', 'SharedScalar', 'World'
]
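The commit history describes the new mmap primitives as a hierarchy (Buffer -> Array -> Number -> Barrier) backed by regular files rather than POSIX shared memory. A toy illustration of the underlying idea, assuming nothing about the PR's actual classes — TinyMMapNumber and its methods are invented for this sketch:

import mmap
import os

import numpy as np


class TinyMMapNumber:
    """A single int64 shared across processes via a memory-mapped regular file."""

    def __init__(self, filename: str) -> None:
        size = np.int64().nbytes
        if not os.path.exists(filename):
            # First process to arrive creates and zero-fills the backing file.
            with open(filename, 'wb') as file:
                file.write(b'\0' * size)
        self.file = open(filename, 'r+b')
        self.data = mmap.mmap(self.file.fileno(), size)

    def get(self) -> int:
        return int(np.frombuffer(self.data, np.int64)[0])

    def set(self, value: int) -> None:
        self.data[:8] = np.int64(value).tobytes()

Because the state lives at a filesystem path, any process that knows the job directory can attach to it by name, which appears to be how this PR sidesteps the shared memory naming and cleanup issues it is removing.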
9 changes: 9 additions & 0 deletions streaming/base/coord/job/__init__.py
@@ -0,0 +1,9 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Handling for jobs, which are collections of StreamingDataset replicas with the same config."""

from streaming.base.coord.job.directory import JobDirectory
from streaming.base.coord.job.registry import JobRegistry

__all__ = ['JobDirectory', 'JobRegistry']
51 changes: 51 additions & 0 deletions streaming/base/coord/job/directory.py
@@ -0,0 +1,51 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""A directory containing all dataset-wide filesystem state for a Streaming job."""

import os
from pathlib import Path
from typing import Sequence

from streaming.base.coord.job.registry import JobRegistry
from streaming.base.coord.world import World
from streaming.base.stream import Stream

__all__ = ['JobDirectory']


class JobDirectory:
    """Represents a Streaming job lease. On ``__del__``, cleans up after itself.

    When it goes out of scope naturally, this Job will delete its config dir and release its
    hold on all the local dirs it is streaming to.

    If this process dies badly and the destructor is not reached, the same cleanup will be done
    incidentally by some future process as it registers or unregisters a Streaming job. It can
    tell the job's owner died by its combination of pid and process create time.

    Args:
        registry (JobRegistry): Streaming job registry.
        streams (Sequence[Stream]): The job's streams.
        world (World): Node/rank/worker info.
    """

    def __init__(self, registry: JobRegistry, streams: Sequence[Stream], world: World) -> None:
        self.registry = registry
        self.streams = streams
        self.world = world
        self.job_hash = registry.register(streams, world)
        self.dirname = Path(os.path.join(registry.config_root, self.job_hash))

    def get_filename(self, path: str) -> str:
        """Get a filename by relative path under its job dir.

        Args:
            path (str): Path relative to job dir.

        Returns:
            str: Filename.
        """
        return os.path.join(self.registry.config_root, self.job_hash, path)

    def __del__(self) -> None:
        """Destructor."""
        self.registry.unregister(self.job_hash, self.world)
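To make the docstring's recovery story concrete: psutil (added to setup.py above) is the standard way to ask whether a pid is still alive and when its process started. A minimal sketch of such a staleness check, assuming entries record process_id and register_time as in entry.py below; the helper name and exact comparison are illustrative, not the PR's code:

import psutil

from streaming.base.coord.job.entry import JobEntry


def is_entry_stale(entry: JobEntry) -> bool:
    """Tell whether a registry entry's owning process has died.

    Args:
        entry (JobEntry): The registry entry to check.

    Returns:
        bool: Whether the owner is gone (so the entry can be cleaned up).
    """
    try:
        proc = psutil.Process(entry.process_id)
    except psutil.NoSuchProcess:
        return True  # The pid no longer exists at all.
    # The pid may have been recycled: if the process now living at this pid
    # started after the job registered, the original owner is dead.
    return int(proc.create_time()) > entry.register_time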
65 changes: 65 additions & 0 deletions streaming/base/coord/job/entry.py
@@ -0,0 +1,65 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""An entry in a Streaming job registry file."""

from typing import Any, Dict, List, Optional

from typing_extensions import Self

__all__ = ['JobEntry']


class JobEntry:
    """Info about a Streaming job for local dir reuse detection purposes.

    Args:
        index (int, optional): The job's index in the total list.
        job_hash (str): Job hash.
        stream_hashes (List[str]): Stream hashes.
        stream_locals (List[str], optional): Stream locals, if available.
        process_id (int): PID of local rank zero of the Streaming job.
        register_time (int): Process registration time.
    """

    def __init__(
        self,
        *,
        index: Optional[int] = None,
        job_hash: str,
        stream_hashes: List[str],
        stream_locals: Optional[List[str]] = None,
        process_id: int,
        register_time: int,
    ) -> None:
        self.index = index
        self.job_hash = job_hash
        self.stream_hashes = stream_hashes
        self.stream_locals = stream_locals
        self.process_id = process_id
        self.register_time = register_time

    @classmethod
    def from_json(cls, obj: Dict[str, Any]) -> Self:
        """Load from JSON.

        Args:
            obj (Dict[str, Any]): Source JSON object.

        Returns:
            Self: Loaded JobEntry.
        """
        return cls(job_hash=obj['job_hash'],
                   stream_hashes=obj['stream_hashes'],
                   stream_locals=obj.get('stream_locals'),
                   process_id=obj['process_id'],
                   register_time=obj['register_time'])

    def to_json(self) -> Dict[str, Any]:
        """Dump to JSON.

        Returns:
            Dict[str, Any]: This object as a JSON dict.
        """
        return {
            'job_hash': self.job_hash,
            'stream_hashes': self.stream_hashes,
            # stream_locals is not saved, only their hashes.
            'process_id': self.process_id,
            'register_time': self.register_time,
        }
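A hypothetical round trip through these two methods, highlighting the asymmetry called out in the comment above — stream_locals is dropped on save, so only the hashes survive on disk (all values below are made up):

import json

from streaming.base.coord.job.entry import JobEntry

entry = JobEntry(job_hash='abc123',
                 stream_hashes=['f00'],
                 stream_locals=['/tmp/data'],
                 process_id=1234,
                 register_time=1703721600)

loaded = JobEntry.from_json(json.loads(json.dumps(entry.to_json())))
assert loaded.stream_hashes == ['f00']
assert loaded.stream_locals is None  # Locals are not serialized, only their hashes.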