From 10ee2770f51578cd98fc06972a762aa77c05d226 Mon Sep 17 00:00:00 2001
From: Abhishek Divekar
Date: Mon, 23 Sep 2024 20:56:51 +0530
Subject: [PATCH] Added dispatch_apply and error to Parameters creation.

---
 .../base/framework/trainer/RayTuneTrainer.py |   1 +
 src/synthesizrr/base/util/concurrency.py     | 118 +++++++++++++++++-
 src/synthesizrr/base/util/language.py        |  64 ++++++++--
 3 files changed, 172 insertions(+), 11 deletions(-)

diff --git a/src/synthesizrr/base/framework/trainer/RayTuneTrainer.py b/src/synthesizrr/base/framework/trainer/RayTuneTrainer.py
index 5baf96f..0f172bd 100644
--- a/src/synthesizrr/base/framework/trainer/RayTuneTrainer.py
+++ b/src/synthesizrr/base/framework/trainer/RayTuneTrainer.py
@@ -1541,6 +1541,7 @@ def get_final_metrics_stats(
                 continue
             final_dataset_metrics[dataset_metric.display_name]: Dict[str, Union[int, float, Dict]] = {
                 'mean': np.mean(final_dataset_metrics[dataset_metric.display_name]),
+                'median': np.median(final_dataset_metrics[dataset_metric.display_name]),
                 'std': np.std(final_dataset_metrics[dataset_metric.display_name], ddof=1),  ## Unbiased
                 'min': np.min(final_dataset_metrics[dataset_metric.display_name]),
                 'max': np.max(final_dataset_metrics[dataset_metric.display_name]),
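
For context, the hunk above adds a median to the per-metric summary dict. A minimal
sketch of what each entry computes, on a made-up list of per-trial metric values
(illustrative only, not part of the patch):

    import numpy as np

    vals = [0.81, 0.84, 0.88, 0.90]   ## e.g. final validation scores across trials
    stats = {
        'mean': np.mean(vals),        ## 0.8575
        'median': np.median(vals),    ## 0.86  (the newly added entry)
        'std': np.std(vals, ddof=1),  ## ~0.0403; ddof=1 gives the unbiased sample std
        'min': np.min(vals),          ## 0.81
        'max': np.max(vals),          ## 0.90
    }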
diff --git a/src/synthesizrr/base/util/concurrency.py b/src/synthesizrr/base/util/concurrency.py
index e08cda3..efa3694 100644
--- a/src/synthesizrr/base/util/concurrency.py
+++ b/src/synthesizrr/base/util/concurrency.py
@@ -15,10 +15,10 @@ import ray
 from ray.exceptions import GetTimeoutError
 from ray.util.dask import RayDaskCallback
-from pydantic import validate_arguments, conint, confloat
-from synthesizrr.base.util.language import ProgressBar, set_param_from_alias, type_str, get_default, first_item, Parameters
+from pydantic import conint, confloat
+from synthesizrr.base.util.language import ProgressBar, set_param_from_alias, type_str, get_default, first_item, Parameters, \
+    is_list_or_set_like, is_dict_like, PandasSeries, filter_kwargs
 from synthesizrr.base.constants.DataProcessingConstants import Parallelize, FailureAction, Status, COMPLETED_STATUSES
-
 from functools import partial
 ## Jupyter-compatible asyncio usage:
 import asyncio
@@ -445,6 +445,117 @@ def dispatch_executor(
     return None
 
 
+def dispatch_apply(
+        struct: Union[List, Tuple, np.ndarray, PandasSeries, Set, frozenset, Dict],
+        *args,
+        fn: Callable,
+        parallelize: Parallelize,
+        forward_parallelize: bool = False,
+        item_wait: Optional[float] = None,
+        iter_wait: Optional[float] = None,
+        iter: bool = False,
+        **kwargs
+) -> Any:
+    parallelize: Parallelize = Parallelize.from_str(parallelize)
+    item_wait: float = get_default(
+        item_wait,
+        {
+            Parallelize.ray: _RAY_ACCUMULATE_ITEM_WAIT,
+            Parallelize.processes: _LOCAL_ACCUMULATE_ITEM_WAIT,
+            Parallelize.threads: _LOCAL_ACCUMULATE_ITEM_WAIT,
+            Parallelize.asyncio: 0.0,
+            Parallelize.sync: 0.0,
+        }[parallelize]
+    )
+    iter_wait: float = get_default(
+        iter_wait,
+        {
+            Parallelize.ray: _RAY_ACCUMULATE_ITER_WAIT,
+            Parallelize.processes: _LOCAL_ACCUMULATE_ITER_WAIT,
+            Parallelize.threads: _LOCAL_ACCUMULATE_ITER_WAIT,
+            Parallelize.asyncio: 0.0,
+            Parallelize.sync: 0.0,
+        }[parallelize]
+    )
+    if forward_parallelize:
+        kwargs['parallelize'] = parallelize
+    executor: Optional = dispatch_executor(
+        parallelize=parallelize,
+        **kwargs,
+    )
+    try:
+        set_param_from_alias(kwargs, param='progress_bar', alias=['progress', 'pbar'], default=True)
+        progress_bar: Union[ProgressBar, Dict, bool] = kwargs.pop('progress_bar', False)
+        submit_pbar: ProgressBar = ProgressBar.of(
+            progress_bar,
+            total=len(struct),
+            desc='Submitting',
+            prefer_kwargs=False,
+            unit='item',
+        )
+        collect_pbar: ProgressBar = ProgressBar.of(
+            progress_bar,
+            total=len(struct),
+            desc='Collecting',
+            prefer_kwargs=False,
+            unit='item',
+        )
+        if is_list_or_set_like(struct):
+            futs = []
+            for v in struct:
+                def submit_task(item, **dispatch_kwargs):
+                    return fn(item, **dispatch_kwargs)
+
+                futs.append(
+                    dispatch(
+                        fn=submit_task,
+                        item=v,
+                        parallelize=parallelize,
+                        executor=executor,
+                        delay=item_wait,
+                        **filter_kwargs(fn, **kwargs),
+                    )
+                )
+                submit_pbar.update(1)
+        elif is_dict_like(struct):
+            futs = {}
+            for k, v in struct.items():
+                def submit_task(item, **dispatch_kwargs):
+                    return fn(item, **dispatch_kwargs)
+
+                futs[k] = dispatch(
+                    fn=submit_task,
+                    key=k,
+                    item=v,
+                    parallelize=parallelize,
+                    executor=executor,
+                    delay=item_wait,
+                    **filter_kwargs(fn, **kwargs),
+                )
+                submit_pbar.update(1)
+        else:
+            raise NotImplementedError(f'Unsupported type: {type_str(struct)}')
+        submit_pbar.success()
+        if iter:
+            return accumulate_iter(
+                futs,
+                item_wait=item_wait,
+                iter_wait=iter_wait,
+                progress_bar=collect_pbar,
+                **kwargs
+            )
+        else:
+            return accumulate(
+                futs,
+                item_wait=item_wait,
+                iter_wait=iter_wait,
+                progress_bar=collect_pbar,
+                **kwargs
+            )
+    finally:
+        stop_executor(executor)
+
+
 def get_result(
     x,
     *,
@@ -785,7 +896,6 @@ def wait(
         wait_if_future(futures)
 
 
-@validate_arguments
 def retry(
     fn,
     *args,
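
For context, a minimal usage sketch of the new dispatch_apply. The caller code below
is hypothetical: it assumes 'threads' and 'sync' parse via Parallelize.from_str, that
accumulate() resolves the submitted futures to plain values, and that dispatch forwards
the item=.../key=... keyword arguments through submit_task to fn (as the two branches
above imply):

    from synthesizrr.base.util.concurrency import dispatch_apply

    def square(item: int) -> int:
        ## Called once per element of `struct`:
        return item * item

    ## List-like input: results are collected via accumulate().
    results = dispatch_apply([1, 2, 3, 4], fn=square, parallelize='threads')

    ## Dict-like input: futures are keyed by the input keys, and the dict branch
    ## also passes key=... through to fn.
    keyed = dispatch_apply(
        {'a': 1, 'b': 2},
        fn=lambda item, key: item * item,
        parallelize='sync',
    )

    ## iter=True streams results via accumulate_iter() instead of blocking on
    ## the full accumulate():
    for result in dispatch_apply(list(range(10)), fn=square, parallelize='threads', iter=True):
        print(result)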
+ """ + assert isinstance(d, dict), f'Input must be a dictionary, found: {type(d)}' + assert isinstance(depth, int) and depth >= 1, f'depth must be an integer (1 or more)' + + for k, v in d.items(): + if isinstance(v, dict) and _cur_depth < depth - 1: + # If the value is a dictionary, recurse + for subkeys in iter_dict(v, _cur_depth=_cur_depth + 1, depth=depth): + yield (k,) + subkeys + else: + # If the value is not a dictionary, yield the key-value pair + yield (k, v) + + ## ======================== NumPy utils ======================== ## def is_numpy_integer_array(data: Any) -> bool: if not isinstance(data, np.ndarray): @@ -2625,6 +2652,15 @@ class Parameters(BaseModel, ABC): aliases: ClassVar[Tuple[str, ...]] = tuple() dict_exclude: ClassVar[Tuple[str, ...]] = tuple() + def __init__(self, *args, **kwargs): + try: + super().__init__(*args, **kwargs) + except Exception as e: + raise ValueError( + f'Cannot create Pydantic instance of type "{self.class_name}".' + f'\nEncountered exception: {format_exception_msg(e)}' + ) + @classproperty def class_name(cls) -> str: return str(cls.__name__) ## Will return the child class name. @@ -3227,6 +3263,15 @@ def create_progress_bar( smoothing=smoothing, **kwargs ) + elif style == 'ray': + from ray.experimental import tqdm_ray + kwargs = filter_keys( + kwargs, + keys=set(get_fn_spec(tqdm_ray.tqdm).args + get_fn_spec(tqdm_ray.tqdm).kwargs), + how='include', + ) + from ray.experimental import tqdm_ray + return tqdm_ray.tqdm(**kwargs) else: return StdTqdmProgressBar( ncols=ncols, @@ -3311,6 +3356,11 @@ def ignore_all_output(): yield +@contextmanager +def ignore_nothing(): + yield + + # from pydantic import Field, AliasChoices # def Alias(*, default: Optional[Any] = None, alias: Union[Tuple[str, ...], List[str], Set[str], str]): # alias: AliasChoices = AliasChoices(*as_tuple(alias))