Split apply combine #45

Draft · wants to merge 53 commits into base: legacy
Conversation

@cisaacstern cisaacstern commented Jun 24, 2024

Closes #28

@cisaacstern cisaacstern changed the base branch from main to data-connections June 24, 2024 03:05
@cisaacstern (Collaborator, Author) commented Jun 24, 2024

@walljcg here's a start on how we can leverage lithops map reduce (as demonstrated in #28 (comment)) in the context of ecoscope workflows/tasks. The core task is here:

@distributed
def map_reduce(
    groups: Annotated[list[tuple[str, ...]], Field()],
    # let's assume that when we call map_reduce, we have already set the
    # arg_prevalidators and return_postvalidator for the mappers and reducer,
    # so by the time lithops sees them, they are ready to call
    mappers: Annotated[list[tuple[DistributedTask, dict]], Field()],
    reducer: Annotated[DistributedTask, Field()],
    reducer_kwargs: Annotated[dict, Field(default_factory=dict)],
):
    import lithops

    # Configure the compute backend (local, cloud, etc.) with a configuration file:
    # https://lithops-cloud.github.io/docs/source/configuration.html#configuration-file
    fexec = lithops.FunctionExecutor()

    def fused_mapper(element):
        for i, (mapper, kwargs) in enumerate(mappers):
            if i == 0:
                result = mapper(element, **kwargs)
            else:
                result = mapper(result, **kwargs)
        return result

    fexec.map_reduce(
        map_function=fused_mapper,
        map_iterdata=groups,
        reduce_function=reducer,
        extra_args_reduce=reducer_kwargs,
    )
    return fexec.get_result()
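The `fused_mapper` closure above chains the `(mapper, kwargs)` pairs so that each mapper's output feeds the next one's input. A standalone sketch of that fusion idea, using plain functions in place of `DistributedTask` instances (the example mappers here are hypothetical, not part of ecoscope-workflows):

```python
def fuse(mappers):
    """Compose (mapper, kwargs) pairs into a single callable, first-to-last."""
    def fused(element):
        result = element
        for mapper, kwargs in mappers:
            result = mapper(result, **kwargs)
        return result
    return fused

# hypothetical mappers standing in for tasks like draw_ecomap / map_to_widget
def double(x, **kwargs):
    return x * 2

def add(x, amount=0, **kwargs):
    return x + amount

fused = fuse([(double, {}), (add, {"amount": 3})])
fused(5)  # 5 * 2 + 3 = 13
```

This is equivalent to the inline `if i == 0` loop in the task, just seeded with `element` directly so the branch disappears.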

And here is an example of what it might look like compiled into a runnable workflow:

# this can parallelize on local threads, gcp cloud run, or other cloud serverless
# compute backends, depending on the lithops configuration set at runtime.
map_reduce_return = map_reduce(
    groups=split_groups_return,
    mappers=[
        (
            draw_ecomap.replace(
                # the dataframe is given here as a list of URIs to parquet files,
                # so we need to deserialize it back into a geodataframe
                arg_prevalidators={"geodataframe": gpd_from_parquet_uri},
                return_postvalidator=functools.partial(
                    html_text_to_uri,
                    uri=os.environ["ECOSCOPE_WORKFLOWS_TMP"],
                ),
                validate=True,
            ),
            set_map_styles_return,
        ),
        (
            map_to_widget.replace(validate=True),
            {},
        ),
    ],
    reducer=gather_dashboard,
    reducer_kwargs=set_groupers_return,
)

💡 Note that this does not yet run; it is one step above pseudocode. Among other things, we don't actually need to split-apply-combine for time density maps in this way; I'm just using that as a toy example here.

In terms of where a script like this could run, if it runs locally, it can parallelize across python processes. In the cloud, we could package this script itself as a "launcher" serverless function on GCP Cloud Run, which once it gets to the lithops map-reduce section, would spawn additional Cloud Run functions. The "launcher" function (running this script) would then wait for all of the parallelized tasks to complete, gather the result, and send it back to wherever we need it (database api call to the server maybe).

Note also that this pattern (launch lithops from another cloud function) is conceptually almost identical to the Lithops Airflow Operator. (Which we may also want to use in the future for more complex DAGs, but "deploy lithops from a cloud function" is definitely lower latency for "easy/small" DAGs, as we've discussed.)
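The fan-out/gather control flow described above can be emulated locally with only the standard library, which may help make the "launcher" pattern concrete: map over the groups in parallel, then hand all map results to a single reducer. This is only an illustration of the flow; real lithops dispatches each mapper invocation to a serverless function (or process) per its configuration, rather than a thread.

```python
from concurrent.futures import ThreadPoolExecutor

def local_map_reduce(map_function, map_iterdata, reduce_function, **reduce_kwargs):
    # fan out: one mapper call per element of map_iterdata
    with ThreadPoolExecutor() as pool:
        map_results = list(pool.map(map_function, map_iterdata))
    # gather: the reducer sees the full list of map results
    return reduce_function(map_results, **reduce_kwargs)

result = local_map_reduce(
    map_function=lambda group: len(group),
    map_iterdata=[("a", "b"), ("c",)],
    reduce_function=lambda results, offset=0: sum(results) + offset,
    offset=10,
)
# result == 13  (2 + 1, plus the offset)
```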

I've also iterated a bit on the workflow YAML spec design, so we can represent the map reduce operation there, something like this:

- name: Map reduce
  id: map-reduce
  uses: ecoscope_workflows.tasks.parallelism.map_reduce
  with:
    groups: split-groups.return
    mappers:
      - uses: ecoscope_workflows.tasks.results.draw_ecomap
        with:
          # kws is a reserved name that indicates the value
          # will be a dict and should be unpacked as kwargs
          kws: set-map-styles.return
      - uses: ecoscope_workflows.tasks.results.map_to_widget
        with: {}
    reducer:
      uses: ecoscope_workflows.tasks.results.gather_dashboard
      with:
        groupers: set-groupers.return
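One way the compiler might resolve a `uses:` string from the spec into a callable is to split the dotted path into a module and an attribute, then import it. The module/attribute split shown here is my assumption about the spec's semantics, demonstrated with a stdlib function rather than an actual ecoscope_workflows task:

```python
import importlib

def resolve_uses(dotted_path: str):
    """Import the object named by a dotted path like 'pkg.module.task'."""
    module_path, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr)

join = resolve_uses("os.path.join")
join("a", "b")  # same as os.path.join("a", "b")
```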

To make this more legible, I'm experimenting with borrowing the GitHub Actions data structure of:

- name: "a human readable name"
  id: |
    # a unique id. we were using the task names for this before,
    # but to support reuse of the same task within a workflow, we'll need an `id`
  uses:  # the action name in github, or for us, the task importable reference
  from: |
    # i'm not actually using this here, and i don't think it's part of GitHub Actions,
    # but i thought this might be a nice way to include the github or pypi path for
    # extension task packages, which could be dynamically installed in a venv at compile
    # time (like pre-commit). https://github.com/moradology/venvception is a nice little
    # package that does this (ephemeral venvs), that a former collaborator wrote for our
    # last project.
  with:
    # kwargs to pass to the task
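The step shape above could be modeled as a small dataclass, which is a minimal sketch rather than the real spec's implementation. Field names mirror the GitHub-Actions-style keys; `from` and `with` are Python keywords, so they are stored as `from_` and `with_` here (a naming detail the real spec model would need to address):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Step:
    name: str                                  # human readable name
    id: str                                    # unique id within the workflow
    uses: str                                  # importable reference to the task
    from_: Optional[str] = None                # optional package source for extension tasks
    with_: dict = field(default_factory=dict)  # kwargs to pass to the task

step = Step(
    name="Map reduce",
    id="map-reduce",
    uses="ecoscope_workflows.tasks.parallelism.map_reduce",
)
```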

@cisaacstern cisaacstern changed the base branch from data-connections to main June 25, 2024 16:29
@atmorling atmorling mentioned this pull request Jul 10, 2024
@cisaacstern

TODO:

e.g.

"filters": [
    {
      "title": "Animal Name",
      "key": "animal_name",
      "oneOfEnum": {
          "type": "string",
          "oneOf": [
             {
               "const": "Ao",
               "title": "Ao"
             },
             {
               "const": "Bo",
               "title": "Bo"
             }
          ]
      }
    }
]
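Each `oneOf` entry pairs a value (`const`) with a display label (`title`); when the two are identical, as in this example, the whole `oneOfEnum` object can be generated from a list of values. A small sketch (the helper name is mine, not from the PR):

```python
def one_of_enum(values: list[str]) -> dict:
    """Build a oneOf-style enum schema where each value is its own title."""
    return {
        "type": "string",
        "oneOf": [{"const": v, "title": v} for v in values],
    }

one_of_enum(["Ao", "Bo"])
# {'type': 'string', 'oneOf': [{'const': 'Ao', 'title': 'Ao'}, {'const': 'Bo', 'title': 'Bo'}]}
```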

@cisaacstern

From discussion with @juanlescano-ng, for compatibility with react-jsonschema-form, we actually want to adjust the filters section so that it looks like this:

{
  "schema": {
    "type": "object",
    "properties": {
      "month": {
        "type": "string",
        "enum": [
          "January",
          "February"
        ],
        "enumNames": [
          "January",
          "February"
        ],
        "default": "January"
      },
      "animal_name": {
        "type": "string",
        "enum": [
          "Ao",
          "Bo"
        ],
        "enumNames": [
          "Ao",
          "Bo"
        ],
        "default": "Ao"
      }
    }
  },
  "uiSchema": {
    "animal_name": {
      "ui:title": "Animal Name",
      "ui:emptyValue": "Ao",
      "ui:help": "The name of the elephant"
    },
    "month": {
      "ui:title": "Month",
      "ui:help": "The month of the year."
    }
  }
}

This can be demonstrated by copy-and-pasting it into the live playground here:

https://rjsf-team.github.io/react-jsonschema-form/
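Generating the `schema`/`uiSchema` pair above from simple field definitions could look like the following sketch. The helper name and input shape are my assumptions, not part of the PR:

```python
def build_filters(fields: dict) -> dict:
    """Build an rjsf-style {"schema": ..., "uiSchema": ...} payload.

    `fields` maps a property key to {"title": ..., "choices": [...], "help": ...?}.
    """
    schema = {"type": "object", "properties": {}}
    ui_schema = {}
    for key, spec in fields.items():
        schema["properties"][key] = {
            "type": "string",
            "enum": spec["choices"],
            "enumNames": spec["choices"],
            "default": spec["choices"][0],
        }
        ui_schema[key] = {"ui:title": spec["title"]}
        if "help" in spec:
            ui_schema[key]["ui:help"] = spec["help"]
    return {"schema": schema, "uiSchema": ui_schema}

payload = build_filters({
    "month": {
        "title": "Month",
        "choices": ["January", "February"],
        "help": "The month of the year.",
    },
})
```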

@cisaacstern

Here's a demo of this in action:

Screen.Recording.2024-07-16.at.10.52.00.AM.mov

Successfully merging this pull request may close these issues.

POC: Split-Apply-Combine using lithops