From af4e24a1264f0a25dfae5dc3b70b1597b53e9758 Mon Sep 17 00:00:00 2001 From: Felix Vh <24791380+vcfgv@users.noreply.github.com> Date: Wed, 14 Sep 2022 16:09:09 +0800 Subject: [PATCH] Remove storage service from supervisor (#3254) (cherry picked from commit 71b10354f2631b026b94e04237c35003e8228af0) --- mars/dataframe/utils.py | 48 +++--- mars/deploy/oscar/base_config.yml | 2 +- .../oscar/tests/test_checked_session.py | 77 +-------- .../tests/test_clean_up_and_restore_func.py | 149 ++++++++++++++++++ mars/deploy/oscar/tests/test_local.py | 2 - mars/deploy/oscar/tests/test_ray.py | 26 +++ mars/deploy/oscar/tests/test_ray_dag.py | 29 +++- mars/deploy/oscar/tests/test_ray_dag_oscar.py | 20 +++ mars/services/context.py | 18 +-- mars/services/storage/supervisor/__init__.py | 6 +- mars/services/storage/supervisor/service.py | 57 ------- mars/utils.py | 25 +++ 12 files changed, 272 insertions(+), 187 deletions(-) create mode 100644 mars/deploy/oscar/tests/test_clean_up_and_restore_func.py delete mode 100644 mars/services/storage/supervisor/service.py diff --git a/mars/dataframe/utils.py b/mars/dataframe/utils.py index ca0e684018..727d12c307 100644 --- a/mars/dataframe/utils.py +++ b/mars/dataframe/utils.py @@ -14,6 +14,7 @@ import os import sys +import cloudpickle import functools import itertools import logging @@ -32,7 +33,6 @@ from ..core import Entity, ExecutableTuple from ..core.context import Context, get_context from ..lib.mmh3 import hash as mmh_hash -from ..services.task.execution.ray.context import RayExecutionContext from ..tensor.utils import dictify_chunk_size, normalize_chunk_sizes from ..typing import ChunkType, TileableType from ..utils import ( @@ -42,6 +42,7 @@ ModulePlaceholder, is_full_slice, parse_readable_size, + is_on_ray, ) try: @@ -1432,6 +1433,9 @@ def _concat_chunks(merge_chunks: List[ChunkType], output_index: int): return new_op.new_tileable(df_or_series.op.inputs, kws=[params]) +# TODO: clean_up_func, is_on_ray and restore_func functions may be +# removed or refactored in the future to calculate func size +# with more accuracy as well as address some serialization issues. def clean_up_func(op): closure_clean_up_bytes_threshold = int( os.getenv("MARS_CLOSURE_CLEAN_UP_BYTES_THRESHOLD", 10**4) @@ -1441,20 +1445,6 @@ def clean_up_func(op): ctx = get_context() if ctx is None: return - # Before PR #3165 is merged, func cleanup is temporarily disabled under ray task mode. - # https://github.com/mars-project/mars/pull/3165 - if isinstance(ctx, RayExecutionContext): - logger.warning("Func cleanup is currently disabled under ray task mode.") - return - # Note: Vineyard internally uses `pickle` which fails to pickle - # cell objects and corresponding functions. - if vineyard is not None: - storage_backend = ctx.get_storage_info() - if storage_backend.get("name", None) == "vineyard": - logger.warning( - "Func cleanup is currently disabled when vineyard is used as storage backend." - ) - return func = op.func if hasattr(func, "__closure__") and func.__closure__ is not None: @@ -1464,21 +1454,27 @@ def clean_up_func(op): if counted_bytes >= closure_clean_up_bytes_threshold: op.need_clean_up_func = True break - # Note: op.func_key is set only when op.need_clean_up_func is True. + # Note: op.func_key is set only when func was put into storage. if op.need_clean_up_func: assert ( op.logic_key is not None - ), "Logic key wasn't calculated before cleaning up func." - op.func_key = ctx.storage_put(op.func) - op.func = None + ), f"Logic key of {op} wasn't calculated before cleaning up func." + logger.debug(f"{op} need cleaning up func.") + if is_on_ray(ctx): + import ray + + op.func_key = ray.put(op.func) + op.func = None + else: + op.func = cloudpickle.dumps(op.func) def restore_func(ctx: Context, op): if op.need_clean_up_func and ctx is not None: - assert ( - op.func_key is not None - ), "Func key wasn't properly set while cleaning up func." - assert ( - op.func is None - ), "While restoring func, op.func should be None to ensure that cleanup was executed." - op.func = ctx.storage_get(op.func_key) + logger.debug(f"{op} need restoring func.") + if is_on_ray(ctx): + import ray + + op.func = ray.get(op.func_key) + else: + op.func = cloudpickle.loads(op.func) diff --git a/mars/deploy/oscar/base_config.yml b/mars/deploy/oscar/base_config.yml index e10421fa72..bb35da1c2e 100644 --- a/mars/deploy/oscar/base_config.yml +++ b/mars/deploy/oscar/base_config.yml @@ -19,7 +19,7 @@ storage: default_config: transfer_block_size: 5 * 1024 ** 2 plasma: - store_memory: 12% + store_memory: 20% "@overriding_fields": ["backends"] meta: store: dict diff --git a/mars/deploy/oscar/tests/test_checked_session.py b/mars/deploy/oscar/tests/test_checked_session.py index f74abc0906..5690215d86 100644 --- a/mars/deploy/oscar/tests/test_checked_session.py +++ b/mars/deploy/oscar/tests/test_checked_session.py @@ -15,14 +15,11 @@ from typing import Any, Dict import numpy as np -import pandas as pd import pytest -from .... import dataframe as md from .... import tensor as mt -from ....dataframe.base.apply import ApplyOperand from ....config import option_context -from ....core import TileableGraph, TileableType, OperandType +from ....core import TileableType, OperandType from ....services.task.supervisor.tests import CheckedTaskPreprocessor from ....services.subtask.worker.tests import CheckedSubtaskProcessor from ..local import _load_config @@ -42,42 +39,6 @@ def _execute_operand(self, ctx: Dict[str, Any], op: OperandType): return super()._execute_operand(ctx, op) -class FuncKeyCheckedTaskPreprocessor(CheckedTaskPreprocessor): - def tile(self, tileable_graph: TileableGraph): - ops = [t.op for t in tileable_graph if isinstance(t.op, ApplyOperand)] - assert all(hasattr(op, "func_key") for op in ops) - assert all(op.func_key is None for op in ops) - assert all(op.func is not None for op in ops) - assert all(op.need_clean_up_func is False for op in ops) - result = super().tile(tileable_graph) - for op in ops: - assert hasattr(op, "func_key") - if op.func_key is not None: - assert op.need_clean_up_func is True - assert op.func is None - else: - assert op.need_clean_up_func is False - assert op.func is not None - return result - - -class FuncKeyCheckedSubtaskProcessor(CheckedSubtaskProcessor): - def _execute_operand(self, ctx: Dict[str, Any], op: OperandType): - if isinstance(op, ApplyOperand): - assert hasattr(op, "func_key") - if op.func_key is not None: - assert op.need_clean_up_func is True - assert op.func is None - else: - assert op.need_clean_up_func is False - assert op.func is not None - result = super()._execute_operand(ctx, op) - assert op.func is not None - return result - else: - return super()._execute_operand(ctx, op) - - @pytest.fixture(scope="module") def setup(): with option_context({"show_progress": False}): @@ -134,39 +95,3 @@ def test_check_subtask_processor(setup): b.execute(extra_config={"check_all": False}) sess.stop_server() - - -def test_clean_up_and_restore_func(setup): - config = _load_config(CONFIG_FILE) - config["task"][ - "task_preprocessor_cls" - ] = "mars.deploy.oscar.tests.test_checked_session.FuncKeyCheckedTaskPreprocessor" - config["subtask"][ - "subtask_processor_cls" - ] = "mars.deploy.oscar.tests.test_checked_session.FuncKeyCheckedSubtaskProcessor" - - sess = new_test_session(default=True, config=config) - - cols = [chr(ord("A") + i) for i in range(10)] - df_raw = pd.DataFrame(dict((c, [i**2 for i in range(20)]) for c in cols)) - df = md.DataFrame(df_raw, chunk_size=5) - - x_small = pd.Series([i for i in range(10)]) - y_small = pd.Series([i for i in range(10)]) - x_large = pd.Series([i for i in range(10**4)]) - y_large = pd.Series([i for i in range(10**4)]) - - def closure_small(z): - return pd.concat([x_small, y_small], ignore_index=True) - - def closure_large(z): - return pd.concat([x_large, y_large], ignore_index=True) - - # no need to clean up func, func_key won't be set - r_small = df.apply(closure_small, axis=1) - r_small.execute() - # need to clean up func, func_key will be set - r_large = df.apply(closure_large, axis=1) - r_large.execute() - - sess.stop_server() diff --git a/mars/deploy/oscar/tests/test_clean_up_and_restore_func.py b/mars/deploy/oscar/tests/test_clean_up_and_restore_func.py new file mode 100644 index 0000000000..dd485d5f9c --- /dev/null +++ b/mars/deploy/oscar/tests/test_clean_up_and_restore_func.py @@ -0,0 +1,149 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import pandas as pd +import pytest + +from .... import dataframe as md +from ....dataframe.base.apply import ApplyOperand +from ....config import option_context +from ....core import TileableGraph, OperandType +from ....services.task.supervisor.tests import CheckedTaskPreprocessor +from ....services.subtask.worker.tests import CheckedSubtaskProcessor +from ....services.task.supervisor.preprocessor import TaskPreprocessor +from ....services.subtask.worker.processor import SubtaskProcessor + +from ....utils import lazy_import +from ..local import _load_config as _load_mars_config +from ..tests.session import new_test_session, CONFIG_FILE + +ray = lazy_import("ray") + + +class MarsBackendFuncCheckedTaskPreprocessor(CheckedTaskPreprocessor): + def tile(self, tileable_graph: TileableGraph): + ops = [t.op for t in tileable_graph if isinstance(t.op, ApplyOperand)] + for op in ops: + assert hasattr(op, "func_key") + assert op.func_key is None + assert op.func is not None + assert callable(op.func) + assert op.need_clean_up_func is False + result = super().tile(tileable_graph) + for op in ops: + assert hasattr(op, "func_key") + assert op.func_key is None + if op.need_clean_up_func: + assert isinstance(op.func, bytes) + else: + assert callable(op.func) + return result + + +class MarsBackendFuncCheckedSubtaskProcessor(CheckedSubtaskProcessor): + def _execute_operand(self, ctx: Dict[str, Any], op: OperandType): + if isinstance(op, ApplyOperand): + assert hasattr(op, "func_key") + assert op.func_key is None + if op.need_clean_up_func: + assert isinstance(op.func, bytes) + else: + assert callable(op.func) + result = super()._execute_operand(ctx, op) + assert op.func is not None + assert callable(op.func) + return result + else: + return super()._execute_operand(ctx, op) + + +class RayBackendFuncTaskPreprocessor(TaskPreprocessor): + def tile(self, tileable_graph: TileableGraph): + ops = [t.op for t in tileable_graph if isinstance(t.op, ApplyOperand)] + for op in ops: + assert hasattr(op, "func_key") + assert op.func_key is None + assert op.func is not None + assert callable(op.func) + assert op.need_clean_up_func is False + result = super().tile(tileable_graph) + for op in ops: + assert hasattr(op, "func_key") + if op.need_clean_up_func: + assert op.func is None + assert isinstance(op.func_key, ray.ObjectRef) + else: + assert callable(op.func) + assert op.func_key is None + return result + + +class RayBackendFuncSubtaskProcessor(SubtaskProcessor): + def _execute_operand(self, ctx: Dict[str, Any], op: OperandType): + if isinstance(op, ApplyOperand): + assert hasattr(op, "func_key") + if op.need_clean_up_func: + assert op.func is None + assert isinstance(op.func_key, ray.ObjectRef) + else: + assert callable(op.func) + assert op.func_key is None + result = super()._execute_operand(ctx, op) + assert op.func is not None + assert callable(op.func) + return result + else: + return super()._execute_operand(ctx, op) + + +@pytest.fixture(scope="module") +def setup(): + with option_context({"show_progress": False}): + yield + + +def test_mars_backend_clean_up_and_restore_func(setup): + config = _load_mars_config(CONFIG_FILE) + config["task"][ + "task_preprocessor_cls" + ] = "mars.deploy.oscar.tests.test_clean_up_and_restore_func.MarsBackendFuncCheckedTaskPreprocessor" + config["subtask"][ + "subtask_processor_cls" + ] = "mars.deploy.oscar.tests.test_clean_up_and_restore_func.MarsBackendFuncCheckedSubtaskProcessor" + + sess = new_test_session(default=True, config=config) + + cols = [chr(ord("A") + i) for i in range(10)] + df_raw = pd.DataFrame(dict((c, [i**2 for i in range(20)]) for c in cols)) + df = md.DataFrame(df_raw, chunk_size=5) + + x_small = pd.Series([i for i in range(10)]) + y_small = pd.Series([i for i in range(10)]) + x_large = pd.Series([i for i in range(10**4)]) + y_large = pd.Series([i for i in range(10**4)]) + + def closure_small(z): + return pd.concat([x_small, y_small], ignore_index=True) + + def closure_large(z): + return pd.concat([x_large, y_large], ignore_index=True) + + r_small = df.apply(closure_small, axis=1) + r_small.execute() + r_large = df.apply(closure_large, axis=1) + r_large.execute() + + sess.stop_server() diff --git a/mars/deploy/oscar/tests/test_local.py b/mars/deploy/oscar/tests/test_local.py index 0657d07659..38b7901af5 100644 --- a/mars/deploy/oscar/tests/test_local.py +++ b/mars/deploy/oscar/tests/test_local.py @@ -366,8 +366,6 @@ async def test_execute_describe(create_cluster): ) -# Note: Vineyard internally uses `pickle` which fails to pickle -# cell objects and corresponding functions. @pytest.mark.asyncio async def test_execute_apply_closure(create_cluster): # DataFrame diff --git a/mars/deploy/oscar/tests/test_ray.py b/mars/deploy/oscar/tests/test_ray.py index df58347b26..7b1f746ed7 100644 --- a/mars/deploy/oscar/tests/test_ray.py +++ b/mars/deploy/oscar/tests/test_ray.py @@ -129,6 +129,32 @@ async def test_execute_describe(ray_start_regular_shared, create_cluster): await test_local.test_execute_describe(create_cluster) +@require_ray +@pytest.mark.asyncio +async def test_execute_apply_closure(ray_start_regular_shared, create_cluster): + await test_local.test_execute_apply_closure(create_cluster) + + +@require_ray +@pytest.mark.parametrize( + "create_cluster", + [ + { + "config": { + "task.task_preprocessor_cls": "mars.deploy.oscar.tests.test_clean_up_and_restore_func.RayBackendFuncTaskPreprocessor", + "subtask.subtask_processor_cls": "mars.deploy.oscar.tests.test_clean_up_and_restore_func.RayBackendFuncSubtaskProcessor", + } + } + ], + indirect=True, +) +@pytest.mark.asyncio +async def test_ray_oscar_clean_up_and_restore_func( + ray_start_regular_shared, create_cluster +): + await test_local.test_execute_apply_closure(create_cluster) + + @require_ray @pytest.mark.asyncio async def test_fetch_infos(ray_start_regular_shared, create_cluster): diff --git a/mars/deploy/oscar/tests/test_ray_dag.py b/mars/deploy/oscar/tests/test_ray_dag.py index f01e8d87dd..57ea73c7b9 100644 --- a/mars/deploy/oscar/tests/test_ray_dag.py +++ b/mars/deploy/oscar/tests/test_ray_dag.py @@ -13,7 +13,6 @@ # limitations under the License. import copy -import logging import os import time @@ -193,11 +192,27 @@ def test_merge_groupby(ray_start_regular_shared2, setup, method, auto_merge): test_session.test_merge_groupby(setup, method, auto_merge) -# Before PR #3165 is merged, func cleanup is temporarily disabled under ray task mode. -# https://github.com/mars-project/mars/pull/3165 @require_ray @pytest.mark.asyncio -async def test_execute_apply_closure(ray_start_regular_shared2, create_cluster, caplog): - with caplog.at_level(logging.WARNING): - await test_local.test_execute_apply_closure(create_cluster) - assert "Func cleanup is currently disabled under ray task mode." in caplog.text +async def test_execute_apply_closure(ray_start_regular_shared2, create_cluster): + await test_local.test_execute_apply_closure(create_cluster) + + +@require_ray +@pytest.mark.parametrize( + "create_cluster", + [ + { + "config": { + "task.task_preprocessor_cls": "mars.deploy.oscar.tests.test_clean_up_and_restore_func.RayBackendFuncTaskPreprocessor", + "subtask.subtask_processor_cls": "mars.deploy.oscar.tests.test_clean_up_and_restore_func.RayBackendFuncSubtaskProcessor", + } + } + ], + indirect=True, +) +@pytest.mark.asyncio +async def test_ray_dag_clean_up_and_restore_func( + ray_start_regular_shared2, create_cluster +): + await test_local.test_execute_apply_closure(create_cluster) diff --git a/mars/deploy/oscar/tests/test_ray_dag_oscar.py b/mars/deploy/oscar/tests/test_ray_dag_oscar.py index 41d28c8c6c..987e5cf549 100644 --- a/mars/deploy/oscar/tests/test_ray_dag_oscar.py +++ b/mars/deploy/oscar/tests/test_ray_dag_oscar.py @@ -52,3 +52,23 @@ async def test_iterative_tiling(ray_start_regular_shared2, create_cluster): @require_ray async def test_execute_describe(ray_start_regular_shared2, create_cluster): await test_local.test_execute_describe(create_cluster) + + +@require_ray +@pytest.mark.parametrize( + "create_cluster", + [ + { + "config": { + "task.task_preprocessor_cls": "mars.deploy.oscar.tests.test_clean_up_and_restore_func.RayBackendFuncTaskPreprocessor", + "subtask.subtask_processor_cls": "mars.deploy.oscar.tests.test_clean_up_and_restore_func.RayBackendFuncSubtaskProcessor", + } + } + ], + indirect=True, +) +@pytest.mark.asyncio +async def test_ray_dag_oscar_clean_up_and_restore_func( + ray_start_regular_shared2, create_cluster +): + await test_local.test_execute_apply_closure(create_cluster) diff --git a/mars/services/context.py b/mars/services/context.py index 3085a9ed9d..b2d9095fcf 100644 --- a/mars/services/context.py +++ b/mars/services/context.py @@ -23,7 +23,7 @@ from ..core.context import Context from ..storage.base import StorageLevel from ..typing import BandType, SessionType -from ..utils import implements, is_ray_address, new_random_id +from ..utils import implements, is_ray_address from .cluster import ClusterAPI, NodeRole from .session import SessionAPI from .storage import StorageAPI @@ -193,22 +193,6 @@ async def _fetch_chunks(self, data_keys: List[str]): ) await storage_api.fetch.batch(*fetches) - def storage_put(self, obj: object): - key = new_random_id(32) - self._call(self._storage_put(key=key, obj=obj)) - return key - - async def _storage_put(self, key: str, obj: object): - storage_api = await StorageAPI.create(self.session_id, self.supervisor_address) - return await storage_api.put(key, obj) - - def storage_get(self, key: str): - return self._call(self._storage_get(key)) - - async def _storage_get(self, key: str): - storage_api = await StorageAPI.create(self.session_id, self.supervisor_address) - return await storage_api.get(key) - @implements(Context.get_chunks_result) def get_chunks_result(self, data_keys: List[str], fetch_only: bool = False) -> List: if not fetch_only: diff --git a/mars/services/storage/supervisor/__init__.py b/mars/services/storage/supervisor/__init__.py index 431bd18cb6..72f31ef6d9 100644 --- a/mars/services/storage/supervisor/__init__.py +++ b/mars/services/storage/supervisor/__init__.py @@ -12,4 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .service import StorageSupervisorService +from ...core import EmptyService + + +class StorageSupervisorService(EmptyService): + pass diff --git a/mars/services/storage/supervisor/service.py b/mars/services/storage/supervisor/service.py deleted file mode 100644 index 408f3f7209..0000000000 --- a/mars/services/storage/supervisor/service.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .... import oscar as mo -from ...core import AbstractService -from ..core import StorageManagerActor - - -class StorageSupervisorService(AbstractService): - """ - Storage service on supervisor - - Service Configuration - --------------------- - { - "storage": { - "backends": ["plasma"], - "": "", - } - } - """ - - async def start(self): - storage_configs = self._config["storage"] - backends = storage_configs.get("backends") - options = storage_configs.get("default_config", dict()) - transfer_block_size = options.get("transfer_block_size", None) - backend_config = {} - for backend in backends: - storage_config = storage_configs.get(backend, dict()) - backend_config[backend] = storage_config - - await mo.create_actor( - StorageManagerActor, - backend_config, - transfer_block_size, - uid=StorageManagerActor.default_uid(), - address=self._address, - ) - - async def stop(self): - await mo.destroy_actor( - mo.create_actor_ref( - address=self._address, uid=StorageManagerActor.default_uid() - ) - ) diff --git a/mars/utils.py b/mars/utils.py index 14852ff40a..650fe5e491 100644 --- a/mars/utils.py +++ b/mars/utils.py @@ -1703,6 +1703,31 @@ def is_ray_address(address: str) -> bool: return False +# TODO: clean_up_func, is_on_ray and restore_func functions may be +# removed or refactored in the future to calculate func size +# with more accuracy as well as address some serialization issues. +def is_on_ray(ctx): + from .services.task.execution.ray.context import ( + RayExecutionContext, + RayExecutionWorkerContext, + ) + + # There are three conditions + # a. mars backend + # b. ray backend(oscar), c. ray backend(dag) + # When a. or b. is selected, ctx is an instance of ThreadedServiceContext. + # The main difference between them is whether worker address matches ray scheme. + # To avoid duplicated checks, here we choose the first worker address. + # When c. is selected, ctx is an instance of RayExecutionContext or RayExecutionWorkerContext, + # while get_worker_addresses method isn't currently implemented in RayExecutionWorkerContext. + try: + worker_addresses = ctx.get_worker_addresses() + except AttributeError: # pragma: no cover + assert isinstance(ctx, RayExecutionWorkerContext) + return True + return isinstance(ctx, RayExecutionContext) or is_ray_address(worker_addresses[0]) + + def cache_tileables(*tileables): from .core import ENTITY_TYPE