Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify dataframe_to_mds to accept streaming DF #478

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

maddiedawson
Copy link
Contributor

@maddiedawson maddiedawson commented Oct 20, 2023

Description of changes:

Modify util to accept a streaming dataframe. Performs merge index per-batch.

Tested in https://e2-dogfood.staging.cloud.databricks.com/?o=6051921418418893#notebook/4230899825197178

Issue #, if available:

Merge Checklist:

Put an x without space in the boxes that apply. If you are unsure about any checklist, please don't hesitate to ask. We are here to help! This is simply a reminder of what we are going to look for before merging your pull request.

General

  • I have read the contributor guidelines
  • This is a documentation change or typo fix. If so, skip the rest of this checklist.
  • I certify that the changes I am introducing will be backward compatible, and I have discussed concerns about this, if any, with the MosaicML team.
  • I have updated any necessary documentation, including README and API docs (if appropriate).

Tests

  • I ran pre-commit on my change. (check out the pre-commit section of prerequisites)
  • I have added tests that prove my fix is effective or that my feature works (if appropriate).
  • I ran the tests locally to make sure it pass. (check out testing)
  • I have added unit and/or integration tests as appropriate to ensure backward compatibility of the changes.

@@ -261,6 +262,9 @@ def write_mds(iterator: Iterable):
if cu.remote is None:
mds_path = (cu.local, '')
else:
if dataframe.isStreaming:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why only outputting to local is possible here? What is the challenge to support original out (meaning local, remote or (local, remote))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to disallow concurrent writes to the top-level index.json. We need a lock file that is shared among all workers. This is implemented here using open with flags to ensure that opening the lock file fails if it already exists.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants