Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#7004: use generators when returning from _deploy_ray_func remote function. #7005

Merged
merged 5 commits into from
Mar 13, 2024

Conversation

arunjose696
Copy link
Collaborator

What do these changes do?

Using yield in _deploy_ray_func to return generator instead of list.

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves Have to use generators when returning from _deploy_ray_func remote function. #7004 ?
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

Comment on lines 329 to 331
for r in result:
for item in [r, len(r), len(r.columns), ip]:
yield item
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the tip to use generators only helps in cases where each iteration of the yielding for-loop actually creates some objects/allocates new memory. In this for loop we already have all the objects computed (dfs in result) and the memory for all the results was already allocated.

Can we instead use generators in split_result_of_axis_func_pandas function that actually creates a list of resulting dataframes?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we should use generators here and in split_result_of_axis_func_pandas too.

@arunjose696 arunjose696 marked this pull request as draft March 6, 2024 18:52
@@ -80,7 +80,8 @@ def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None)
Splitted dataframe represented by list of frames.
"""
if num_splits == 1:
return [result]
yield result
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why return?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To not execute the rest of code, and exit the function. Or I will have to put the rest of code in an else block.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else branch is good for me

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@arunjose696 arunjose696 marked this pull request as ready for review March 7, 2024 08:11
@arunjose696 arunjose696 marked this pull request as draft March 7, 2024 08:13
ip = get_node_ip_address()
if isinstance(result, pandas.DataFrame):
return result, len(result), len(result.columns), ip
elif all(isinstance(r, pandas.DataFrame) for r in result):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we check if all parts of result are dataframes. What are the scenarios where the result would be heterogeneous( composed of dataframes and non dataframes)?
One possibility I can think of is results could have errors, in this scenario I think it would it be sufficient to send [r, None, None, ip] for the errors and send[r, len(r), len(r.columns), ip]for the results that are dataframes. Would this suffice?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would suffice.

@@ -80,7 +80,8 @@ def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None)
Splitted dataframe represented by list of frames.
"""
if num_splits == 1:
return [result]
yield result
return
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To not execute the rest of code, and exit the function. Or I will have to put the rest of code in an else block.

@YarShev
Copy link
Collaborator

YarShev commented Mar 7, 2024

@arunjose696, once you fix all CI jobs, please convert this PR to ready for review and put some time and memory measurements in the PR description.

@arunjose696 arunjose696 marked this pull request as ready for review March 11, 2024 09:42
@arunjose696
Copy link
Collaborator Author

@arunjose696, once you fix all CI jobs, please convert this PR to ready for review and put some time and memory measurements in the PR description.

<style> </style>
  Memory   Time  
  Current Master Current Master
Physionet 20.6 20.7 40.3 41sec
ny_taxi_ml 108 111 12 12.3min
ny_taxi 55.4 53.2 25.9 26.7sec
Census 25.9 26.3 1.12 1.18min
Exfo 81 82.2 3.77 3.81min
Fraud 24.8 24.7 1.56 1.61min

dchigarev
dchigarev previously approved these changes Mar 13, 2024
Copy link
Collaborator

@dchigarev dchigarev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@YarShev
Copy link
Collaborator

YarShev commented Mar 13, 2024

@AndreyPavlenko, since you are also going to use generators in remote functions for virtual partitions in #6991, can you look at the changes in this PR? How do they affect your changes and should we merge them?

dchigarev
dchigarev previously approved these changes Mar 13, 2024
@@ -510,7 +526,18 @@ def deploy_func_between_two_axis_partitions(
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
return split_result_of_axis_func_pandas(axis, num_splits, result)
if return_generator:
return generate_result_of_axis_func_pandas(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return generate_result_of_axis_func_pandas(
yield from generate_result_of_axis_func_pandas(

?

Copy link
Collaborator Author

@arunjose696 arunjose696 Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wouldnt work because using yeild in a function would turn it to a generator.

We do not require generators but lists for some branches of if , For the backends such as dask as we try to return a list of partitions, but as there is yield statement in the function a generator would still be returned and thus partitions would be empty when materialized.

https://stackoverflow.com/questions/26595895/return-and-yield-in-the-same-function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean not just yield but yield from. Would it work?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked with yield from as well still the function returns a generator when called so the code fails for dask.

@YarShev
Copy link
Collaborator

YarShev commented Mar 13, 2024

CI is failing

Signed-off-by: arunjose696 <[email protected]>
@AndreyPavlenko
Copy link
Collaborator

@AndreyPavlenko, since you are also going to use generators in remote functions for virtual partitions in #6991, can you look at the changes in this PR? How do they affect your changes and should we merge them?

In #6991 _deploy_ray_func() is not used. A different approach is used there. The virtual partition's apply() functions do not split the resulting frame, but return a list of lazy partitions instead. Each partition has a deferred function, that should get the required piece of the df, i.e. the partition0 receives df.iloc[0:10], partition1 - df.iloc[10:20] ... and so on. These functions are executed lazy. It allows to do not split the entire frame if only a subset of the frame is required in the subsequent operations.

@YarShev YarShev merged commit eb740b9 into modin-project:master Mar 13, 2024
45 checks passed
@anmyachev
Copy link
Collaborator

anmyachev commented Mar 27, 2024

Using generators to reduce heap memory usage in remote functions.

@YarShev @arunjose696 In what minimal version of Ray did this feature appear? It seems we have implicitly increased it.

UPD: Generators are supported starting from ray 2.1.0: https://github.com/ray-project/ray/releases/tag/ray-2.1.0

@YarShev
Copy link
Collaborator

YarShev commented Mar 27, 2024

@anmyachev, oh, good catch! It seems true that generators are supported starting from ray 2.1.0. We started using generators since we introduced lazy execution for block partitions. It looks like we would have to change a lot of code. @AndreyPavlenko, do you think how much it takes for us to put a check for the ray version to either use generator or not? Or we can just update a minimal supported Ray version?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Have to use generators when returning from _deploy_ray_func remote function.
5 participants