Parquet streaming [WIP] #538

Status: Open. Wants to merge 13 commits into base: dev.
2 changes: 2 additions & 0 deletions .github/workflows/install.yaml
@@ -3,10 +3,12 @@ name: Installation
on:
push:
branches:
- dev
- main
- release/*
pull_request:
branches:
- dev
- main
- release/*
workflow_dispatch: {}
2 changes: 2 additions & 0 deletions .github/workflows/linting.yaml
@@ -3,10 +3,12 @@ name: Linting
on:
push:
branches:
- dev
- main
- release/*
pull_request:
branches:
- dev
- main
- release/*
workflow_call:
2 changes: 2 additions & 0 deletions .github/workflows/pytest.yaml
@@ -3,10 +3,12 @@ name: Test
on:
push:
branches:
- dev
- main
- release/*
pull_request:
branches:
- dev
- main
- release/*
workflow_call:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -2,7 +2,7 @@ default_language_version:
python: python3
# Skip the pre-commit check for below directories to have
# a consistency with the official tfrecord preprocessing scripts
exclude: "^(streaming/text/convert/enwiki/)"
exclude: "^(examples/text/enwiki_tok/)"
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
2 changes: 1 addition & 1 deletion Makefile
@@ -20,7 +20,7 @@ style:
$(PYTHON) -m docformatter -ri $(dirs)

longlines:
find streaming tests -type f -name "*.py" | xargs grep -x '.\{100,\}'
python3 scripts/long_lines.py

test:
$(PYTHON) -m $(PYTEST) $(EXTRA_ARGS)
35 changes: 18 additions & 17 deletions README.md
@@ -143,44 +143,45 @@ dataloader = DataLoader(dataset)

### 📚 What next?

Getting started guides, examples, API references, and other useful information can be found in our [docs](https://streaming.docs.mosaicml.com/).
Getting started guides, example notebooks, API references, and other useful information can be found in our [docs](https://streaming.docs.mosaicml.com/).

We have end-to-end tutorials for training a model on:

- [CIFAR-10](https://streaming.docs.mosaicml.com/en/stable/examples/cifar10.html)
- [FaceSynthetics](https://streaming.docs.mosaicml.com/en/stable/examples/facesynthetics.html)
- [SyntheticNLP](https://streaming.docs.mosaicml.com/en/stable/examples/synthetic_nlp.html)
- [CIFAR-10](https://streaming.docs.mosaicml.com/en/stable/notebooks/cifar10.html)
- [FaceSynthetics](https://streaming.docs.mosaicml.com/en/stable/notebooks/facesynthetics.html)
- [SyntheticNLP](https://streaming.docs.mosaicml.com/en/stable/notebooks/synthetic_nlp.html)

We also have starter code for the following popular datasets, which can be found in the `streaming` [directory](https://github.com/mosaicml/streaming/tree/main/streaming):
We also have starter code for the following popular datasets, which can be found under [`examples/`](https://github.com/mosaicml/streaming/tree/main/examples) organized by modality:

| Dataset | Task | Read | Write |
| --- | --- | --- | --- |
| LAION-400M | Text and image | [Read](https://github.com/mosaicml/diffusion-benchmark/blob/main/data.py) | [Write](https://github.com/mosaicml/streaming/tree/main/streaming/multimodal/convert/laion/laion400m) |
| WebVid | Text and video | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/multimodal/webvid.py) | [Write](https://github.com/mosaicml/streaming/blob/main/streaming/multimodal/convert/webvid.py) |
| C4 | Text | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/text/c4.py) | [Write](https://github.com/mosaicml/streaming/blob/main/streaming/text/convert/c4.py) |
| EnWiki | Text | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/text/enwiki.py) | [Write](https://github.com/mosaicml/streaming/tree/main/streaming/text/convert/enwiki) |
| Pile | Text | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/text/pile.py) | [Write](https://github.com/mosaicml/streaming/blob/main/streaming/text/convert/pile.py)
| ADE20K | Image segmentation | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/vision/ade20k.py) | [Write](https://github.com/mosaicml/streaming/blob/main/streaming/vision/convert/ade20k.py)
| CIFAR10 | Image classification | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/vision/cifar10.py) | [Write](https://github.com/mosaicml/streaming/blob/main/streaming/vision/convert/cifar10.py) |
| COCO | Image classification | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/vision/coco.py) | [Write](https://github.com/mosaicml/streaming/blob/main/streaming/vision/convert/coco.py) |
| ImageNet | Image classification | [Read](https://github.com/mosaicml/streaming/blob/main/streaming/vision/imagenet.py) | [Write](https://github.com/mosaicml/streaming/blob/main/streaming/vision/convert/imagenet.py) |
| LAION-400M | Text and image | [Read](https://github.com/mosaicml/diffusion-benchmark/blob/main/data.py) | [Write](https://github.com/mosaicml/streaming/tree/main/examples/multimodal/laion400m) |
| WebVid | Text and video | [Read](https://github.com/mosaicml/streaming/blob/main/examples/multimodal/webvid/read.py) | [Write](https://github.com/mosaicml/streaming/tree/main/examples/multimodal/webvid/write/) |
| C4 | Text | [Read](https://github.com/mosaicml/streaming/blob/main/examples/text/c4/read.py) | [Write](https://github.com/mosaicml/streaming/blob/main/examples/text/c4/write.py) |
| EnWiki | Text | [Read](https://github.com/mosaicml/streaming/blob/main/examples/text/enwiki_text/read.py) | [Write](https://github.com/mosaicml/streaming/blob/main/examples/text/enwiki/write.py) |
| Pile | Text | [Read](https://github.com/mosaicml/streaming/blob/main/examples/text/pile/read.py) | [Write](https://github.com/mosaicml/streaming/blob/main/examples/text/pile/write.py) |
| ADE20K | Image segmentation | [Read](https://github.com/mosaicml/streaming/blob/main/examples/vision/ade20k/read.py) | [Write](https://github.com/mosaicml/streaming/blob/main/examples/vision/ade20k/write.py) |
| CIFAR10 | Image classification | [Read](https://github.com/mosaicml/streaming/blob/main/examples/vision/cifar10/read.py) | [Write](https://github.com/mosaicml/streaming/blob/main/examples/vision/cifar10/write.py) |
| COCO | Image classification | [Read](https://github.com/mosaicml/streaming/blob/main/examples/vision/coco/read.py) | [Write](https://github.com/mosaicml/streaming/blob/main/examples/vision/coco/write.py) |
| ImageNet | Image classification | [Read](https://github.com/mosaicml/streaming/blob/main/examples/vision/imagenet/read.py) | [Write](https://github.com/mosaicml/streaming/blob/main/examples/vision/imagenet/write.py) |

**To start training on these datasets:**

1. Convert raw data into .mds format using the corresponding script from the `convert` directory.
1. Convert raw data into .mds format using the corresponding `write.py` script.

For example:

<!--pytest.mark.skip-->
```bash
$ python -m streaming.multimodal.convert.webvid --in <CSV file> --out <MDS output directory>
$ python -m examples.multimodal.webvid.write.crawl_webvid --in <CSV file> --out_root <MDS output directory>
```

2. Import dataset class to start training the model.

<!--pytest.mark.skip-->
```python
from streaming.multimodal import StreamingInsideWebVid
from examples.multimodal.webvid.read import StreamingInsideWebVid

dataset = StreamingInsideWebVid(local=local, remote=remote, shuffle=True)
```
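The two steps above amount to a write-then-read round trip: serialize samples into shards once, then iterate them during training. As a rough, hypothetical illustration of that flow (JSON-lines stands in for the binary .mds shard format here; the real conversion goes through streaming's `MDSWriter`, and all names below are illustrative):

```python
import json
import os
import tempfile


def write_samples(samples, path):
    # Hypothetical stand-in for an MDS writer: one serialized sample per line.
    with open(path, 'w') as f:
        for sample in samples:
            f.write(json.dumps(sample) + '\n')


def read_samples(path):
    # Hypothetical stand-in for a streaming dataset: load samples back in order.
    with open(path) as f:
        return [json.loads(line) for line in f]


path = os.path.join(tempfile.mkdtemp(), 'shard.jsonl')
write_samples([{'text': 'hello'}, {'text': 'world'}], path)
print(read_samples(path))  # [{'text': 'hello'}, {'text': 'world'}]
```

The real shard format additionally carries an index and optional compression and hashing per shard, which is what enables partial downloads and deterministic resumption.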

16 changes: 8 additions & 8 deletions STYLE_GUIDE.md
@@ -142,10 +142,10 @@ so other contributors will know why this error was silenced.
A public API, generally speaking, can be invoked by a user without a leading underscore in any portion of the path.
The following are examples of public APIs:

* Standalone functions in public modules (e.g. `streaming.base.distributed.get_world_size`)
* Classes in public modules (e.g. `streaming.base.format.MDSWriter`)
* Public methods in public classes (e.g. `streaming.base.format.MDSWriter.write`)
* Public modules (e.g. `streaming.base.dataset`)
* Standalone functions in public modules (e.g. `streaming.distributed.get_world_size`)
* Classes in public modules (e.g. `streaming.format.MDSWriter`)
* Public methods in public classes (e.g. `streaming.format.MDSWriter.write`)
* Public modules (e.g. `streaming.dataset`)

The following rules apply to public APIs:
1. All public APIs must have a docstring (see the Documentation section below)
@@ -201,16 +201,16 @@ All public modules must define `__all__` to be the list of members that should be
The variable is necessary to 1) limit what `from XXX import *` imports, and 2) ensure that the documentation only
includes exported members, not unrelated re-imports.

For example, from [streaming/base/dataset.py](streaming/base/dataset.py)
For example, from [streaming/dataset.py](streaming/dataset.py)

```python
"""The :class:`Dataset` class, used for building streaming iterable datasets."""
from torch.utils.data import IterableDataset

from streaming.base.format import reader_from_json
from streaming.base.spanner import Spanner
from streaming.format import shard_from_json
from streaming.spanner import Spanner

__all__ = ["Dataset"] # export only the Dataset, not other imports like `Spanner` or `reader_from_json`
__all__ = ["Dataset"] # Export `Dataset` only, not the others e.g. `Spanner` or `shard_from_json`.


class Dataset(IterableDataset):
14 changes: 14 additions & 0 deletions benchmarks/__init__.py
@@ -0,0 +1,14 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Streaming benchmarking."""

from benchmarks import compression as compression
from benchmarks import epoch as epoch
from benchmarks import hashing as hashing
from benchmarks import partition as partition
from benchmarks import samples as samples
from benchmarks import serialization as serialization
from benchmarks import shuffle as shuffle

__all__ = ['compression', 'epoch', 'hashing', 'partition', 'samples', 'serialization', 'shuffle']
204 changes: 204 additions & 0 deletions benchmarks/backends/datagen.py
@@ -0,0 +1,204 @@
# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Generate a synthetic dataset."""

from typing import Dict, List, Tuple, TypeVar

import numpy as np
from numpy.random import Generator
from tqdm import tqdm

__all__ = ['generate']


def _generate_int(rng: Generator,
pos_prob: float = 0.75,
low: int = -1_000_000_000,
high: int = 1_000_000_000) -> int:
"""Pick a random integer to say in words.

This is a synthetic dataset whose random numbers need to be distinct, deterministic given a
seed, and little else. We choose a distribution that seems the most pleasing to us.

Properties:
* About 80% positive and 20% negative.
* Magnitude of up to a billion on either side of zero.
* Strongly skewed toward the origin, i.e. chosen uniformly across base-10 digit lengths (at
least until running out of integers of that length anyway).

Args:
rng (Generator): NumPy random number generator.
pos_prob (float): Probability of output being positive. Defaults to ``0.75``.
low (int): Minimum of output range. Must be negative. Defaults to ``-1_000_000_000``.
        high (int): Maximum of output range. Must be positive. Defaults to ``1_000_000_000``.

    Returns:
        int: The sampled integer.
    """
if not 0 <= pos_prob <= 1:
raise ValueError(f'Invalid positive probability ``pos_prob``: 0 <= {pos_prob} <= 1.')

if not low < 0 < high:
raise ValueError(f'Invalid sampling range ``low`` and/or ``high``: {low} < 0 < {high}.')

is_pos = rng.uniform() < pos_prob
max_digits = np.log10(high) if is_pos else np.log10(-low)
exponent = rng.uniform(0, max_digits)
magnitude = int(10**exponent)
sign = is_pos * 2 - 1
return sign * magnitude


def _generate_ints(count: int,
seed: int = 0x1337,
pos_prob: float = 0.75,
low: int = -1_000_000_000,
high: int = 1_000_000_000,
show_progress: bool = True) -> List[int]:
"""Sample until we have the given number of distinct integers.

Args:
count (int): How many samples to draw.
seed (int): Seed for the random number generator. Defaults to ``0x1337``.
pos_prob (float): Probability of output being positive. Defaults to ``0.75``.
low (int): Minimum of output range. Must be negative. Defaults to ``-1_000_000_000``.
high (int): Maximum of output range. Must be positive. Defaults to ``1_000_000_000``.
show_progress (bool): Whether to display a progress bar. Defaults to ``True``.

Returns:
List[int]: The integers that were drawn.
"""
rng = np.random.default_rng(seed)
nums = set()
progress_bar = tqdm(total=count, leave=False) if show_progress else None
while len(nums) < count:
        num = _generate_int(rng, pos_prob, low, high)
if num in nums:
continue

nums.add(num)
if progress_bar:
progress_bar.update(1)
if progress_bar:
progress_bar.close()

nums = sorted(nums)
rng.shuffle(nums)
return nums


_ones = ('zero one two three four five six seven eight nine ten eleven twelve thirteen fourteen '
'fifteen sixteen seventeen eighteen nineteen').split()

_tens = 'twenty thirty forty fifty sixty seventy eighty ninety'.split()


def _int_to_words(num: int) -> List[str]:
"""Say an integer as a list of words.

Args:
num (int): The integer.

Returns:
List[str]: The integer as a list of words.
"""
if num < 0:
return ['negative'] + _int_to_words(-num)
elif num <= 19:
return [_ones[num]]
elif num < 100:
tens = [_tens[num // 10 - 2]]
ones = [_ones[num % 10]] if num % 10 else []
return tens + ones
elif num < 1_000:
hundreds = [_ones[num // 100], 'hundred']
etc = _int_to_words(num % 100) if num % 100 else []
return hundreds + etc
elif num < 1_000_000:
thousands = _int_to_words(num // 1_000) + ['thousand']
etc = _int_to_words(num % 1_000) if num % 1_000 else []
return thousands + etc
elif num < 1_000_000_000:
millions = _int_to_words(num // 1_000_000) + ['million']
etc = _int_to_words(num % 1_000_000) if num % 1_000_000 else []
return millions + etc
else:
        raise ValueError(f'Integer out of range: -1,000,000,000 < {num} < +1,000,000,000.')


def _int_to_text(num: int) -> str:
"""Say an integer as text.

Args:
num (int): The integer.

Returns:
str: The integer as text.
"""
words = _int_to_words(num)
return ' '.join(words)


T = TypeVar('T')


def _split(items: List[T], sizes: List[int]) -> List[List[T]]:
"""Divide the given items across the splits given by their sizes.

Args:
items (List[Any]): The items to divide across the spans.
sizes (List[int]): Number of items per split.

Returns:
List[List[Any]]: Each split of items.
"""
total = sum(sizes)
if len(items) != total:
raise ValueError(f'Number of items must match the combined size of the splits: ' +
f'{len(items)} items vs splits of size {sizes} = {total}.')

splits = []
begin = 0
for size in sizes:
split = items[begin:begin + size]
splits.append(split)
begin += size

return splits


def generate(split2size: Dict[str, int],
seed: int = 0x1337,
pos_prob: float = 0.75,
low: int = -1_000_000_000,
high: int = 1_000_000_000,
show_progress: bool = True) -> Dict[str, Tuple[List[int], List[str]]]:
"""Generate a dataset, made of splits, to be saved in different forms for comparison.

Args:
split2size (Dict[str, int]): Mapping of split name to size in samples.
seed (int): Seed for the random number generator. Defaults to ``0x1337``.
pos_prob (float): Probability of output being positive. Defaults to ``0.75``.
low (int): Minimum of output range. Must be negative. Defaults to ``-1_000_000_000``.
high (int): Maximum of output range. Must be positive. Defaults to ``1_000_000_000``.
show_progress (bool): Whether to show a progress bar. Defaults to ``True``.

Returns:
Dict[str, Tuple[List[int], List[str]]]: Mapping of split name to nums and texts.
"""
split_sizes = []
total = 0
for split in sorted(split2size):
size = split2size[split]
split_sizes.append(size)
total += size

    nums = _generate_ints(total, seed, pos_prob, low, high, show_progress)
nums_per_split = _split(nums, split_sizes)

texts = list(map(_int_to_text, nums))
texts_per_split = _split(texts, split_sizes)

dataset = {}
for index, split in enumerate(sorted(split2size)):
dataset[split] = nums_per_split[index], texts_per_split[index]

return dataset
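As a quick sanity check of the number-to-words logic above, a minimal standalone reimplementation (covering magnitudes below one thousand only; the names here are illustrative, not the module's API) behaves as expected:

```python
_ONES = ('zero one two three four five six seven eight nine ten eleven twelve thirteen '
         'fourteen fifteen sixteen seventeen eighteen nineteen').split()
_TENS = 'twenty thirty forty fifty sixty seventy eighty ninety'.split()


def int_to_text(num: int) -> str:
    """Say an integer with |num| < 1,000 in words, mirroring the recursion above."""
    if num < 0:
        return 'negative ' + int_to_text(-num)
    if num <= 19:
        return _ONES[num]
    if num < 100:
        ones = ' ' + _ONES[num % 10] if num % 10 else ''
        return _TENS[num // 10 - 2] + ones
    if num < 1_000:
        rest = ' ' + int_to_text(num % 100) if num % 100 else ''
        return _ONES[num // 100] + ' hundred' + rest
    raise ValueError(f'Demo covers |num| < 1,000 only, got: {num}.')


print(int_to_text(305))  # three hundred five
print(int_to_text(-42))  # negative forty two
```

The larger magnitudes in the module follow the same recursive shape, peeling off thousands and millions before delegating to the remainder.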