Create example for the UKCEH GEAR-1hrly dataset #9

Open
mattjbr123 opened this issue Aug 15, 2024 · 14 comments · May be fixed by NERC-CEH/object_store_tutorial#5

mattjbr123 commented Aug 15, 2024

As part of FDRI project work package 2, I am (co-)developing a "Gridded Data Tools" product intended to allow easy conversion of gridded data to ARCO (Analysis-Ready, Cloud-Optimized) format, easy upload to object storage, cataloguing of the data, and easy access to and analysis of it.

I thought this object_store_tutorial repo would be a good place to start, at least for the "ingestion" stage (conversion and upload to object storage), replicating the workflow shown in the README, scripts and notebooks for the UKCEH GEAR-1hrly dataset. Initially this will stay away from any complicated cloud infrastructure like Kubernetes, Argo, Airflow etc., and just run manually, locally or on JASMIN.

I'll attempt to document progress here, and add a further script and/or notebook to the repo in the respective folder. Development work will be on the 'GEAR' branch.

mattjbr123 self-assigned this Aug 15, 2024

mattjbr123 commented Aug 16, 2024

16/08/2024
Things I discovered today whilst adapting the recipe for the GEAR dataset:

  • Apache Beam can now run using a Dask distributed cluster. This is definitely an option to explore in FDRI, though it is new and I'm not sure how mature it is yet.
  • Pangeo-forge-recipes now has a command-line interface used to run recipes and to override some of the configuration variables at runtime through a config file. Only some of the variables support being set this way, but the selection mostly makes sense as things you'd want to change at runtime. The CLI may also only work when the recipe has been formatted specifically as a 'feedstock', if I read the docs correctly.
  • There is now a 'dynamic chunking' function that can be provided to the StoreToZarr transform, which allows for chunk sizes that are not fixed and can vary throughout the file/dataset depending on certain dimensions/attributes of the data. The most obvious application of this I can think of is for datasets that contain leap years (see the sketch after this list). Some examples/development of this here?
  • There seems to be a greater ecosystem of user-developed recipes now available on pangeo-forge (well, 4 as opposed to none a year ago); the idea of calling it pangeo-forge was to replicate elements of the conda-forge ecosystem. Worth exploring, perhaps.
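A minimal sketch of how such a function might be wired in, assuming StoreToZarr's dynamic_chunking_fn argument and with placeholder chunking logic (not tested against a specific pangeo-forge-recipes version):

import xarray as xr

def choose_chunks(template_ds: xr.Dataset) -> dict:
    # return a mapping of dimension name -> chunk size derived from the dataset itself,
    # e.g. roughly one calendar year of hourly data per chunk along the time dimension
    hours_per_year = 24 * 365  # placeholder; a real implementation would account for leap years
    return {"time": hours_per_year, "y": 100, "x": 100}

# ...then in the pipeline:
# | StoreToZarr(
#       combine_dims=pattern.combine_dim_keys,
#       target_root=".",
#       store_name="out.zarr",
#       dynamic_chunking_fn=choose_chunks,
#   )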


mattjbr123 commented Aug 16, 2024

Next thing to do is to test out the adapted recipe with the GEAR dataset, running locally on the JASMIN sci machines first with a subset of the data, then with all the data on the LOTUS cluster, using the 'DirectRunner' of Apache Beam, which is the un-optimised, one-size-fits-all runner.
If/when successful, maybe look into the new DaskRunner for Beam, as this may unlock writing directly to object storage without needing to get the pipeline working on Flink or rewriting the codebase for other workflow engines (such as Airflow).
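For reference, executing a recipe with the DirectRunner looks roughly like the following sketch (the module name is a placeholder; direct_num_workers is a standard Beam DirectRunner option):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# 'recipe' is the chained transform (beam.Create(pattern.items()) | ... | StoreToZarr(...))
# defined in the recipe script; the module name here is a placeholder
from gear_recipe import recipe

options = PipelineOptions(runner="DirectRunner", direct_num_workers=1)
with beam.Pipeline(options=options) as p:
    p | recipe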

mattjbr123 referenced this issue in NERC-CEH/object_store_tutorial Aug 16, 2024

mattjbr123 commented Aug 22, 2024

22/08/2024

What I've found out after testing the adapted recipe:

  • The values of the key-value pairs in target_chunks have to be of integer type.
  • The netCDF files are more complicated for this dataset:
  • They have multiple variables of different dimensions; e.g. the main data variable might have dimensions (time, y, x) but the time_bounds variable has dimensions (time, 2).
  • The bounds variables are the main issue. These define the boundaries of the grid boxes and time 'boxes' of the data. We do ultimately want to keep them in the dataset, as this is a requirement of CF compliance, and it would be good to maintain that as much as possible when converting to ARCO.
  • There must be a way to tell pangeo-forge-recipes how to handle the different variables, but it doesn't seem obvious to me at first glance, and there are no examples covering this use case, which seems quite an omission when a lot of the NetCDF data researchers use will have it.
  • The core problem at the moment is that pangeo-forge-recipes expects all the variables in the file to have the dimensions specified in the target_chunks kwarg; the bounds variables do not (illustrated in the sketch below).
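To make the mismatch concrete, here is a hypothetical illustration (variable names and chunk sizes are assumptions, not taken from the actual GEAR files):

# target_chunks names dimensions of the main data variable; all values must be ints
target_chunks = {"time": 24, "y": 100, "x": 100}

# Illustrative variable shapes in a CF-style netCDF file with bounds variables:
#   rainfall_amount(time, y, x)   -> matches target_chunks
#   time_bnds(time, 2)            -> has no 'y'/'x' and an extra bounds dimension
#   x_bnds(x, 2), y_bnds(y, 2)    -> lack the 'time' concat dim entirely, which is what
#                                    StoreToZarr currently cannot handle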

To do next:

  • Dig into the pangeo-forge-recipes code to figure out where a sensible place might be to tell it about the other variables, and what to do with them. Investigate the dynamic chunking function, which might be a starting point for this?
  • If it's not obvious, start a thread on the Pangeo discussion forum.


mattjbr123 commented Aug 22, 2024

The solution might be as simple as adding the dimension to the target_chunks variable... It's not. pangeo-forge-recipes really does seem to require all the variables in the file to be the same size (i.e. have identical dimensions and dimension sizes).

This seems to be the relevant issue: pangeo-forge/pangeo-forge-recipes#644

From this we can narrow down the problem further to "pangeo-forge-recipes requires all variables that are not coordinates to have the concat_dim in them". The proposed solution would be fine if only xarray's open_dataset function (which pangeo-forge-recipes' OpenWithXarray transform uses) accepted preprocess as a kwarg; in reality only xarray's open_mfdataset does. So to implement this solution we would have to develop our own custom OpenWithXarray that uses open_mfdataset instead, which definitely seems overkill. Instead we might have to create our own Beam transform (similar to here) that does the same thing.

Might be worth putting a PR together to add that suggested, more helpful error message.
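For context, the open_mfdataset route referred to above would look roughly like this outside of Beam (illustrative only; the variable names and file pattern are assumptions, and this is not the approach being taken):

import xarray as xr

def set_bounds_as_coords(ds: xr.Dataset) -> xr.Dataset:
    # promote the bounds/grid-mapping variables to coordinates so that every
    # remaining data variable carries the concat dim
    return ds.set_coords(["time_bnds", "x_bnds", "y_bnds", "crs"])

ds = xr.open_mfdataset(
    "GEAR_1hrly_*.nc",            # placeholder file pattern
    preprocess=set_bounds_as_coords,
    combine="by_coords",
)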

mattjbr123 referenced this issue in NERC-CEH/object_store_tutorial Aug 22, 2024

mattjbr123 commented Aug 23, 2024

23/08/2024

Today has been a day of trying to understand how 'preprocessors' might work in a Beam pipeline and pangeo-forge-recipes context. I've figured out the general structure and syntax of preprocessors just by looking at multiple examples, but there's definitely a shortfall in the pangeo-forge-recipes documentation here.

I've been doing this so that I can add a preprocessor that converts the annoying extra variables in the netCDF files to coordinates, so that the StoreToZarr pangeo-forge-recipes Beam PTransform can handle the dataset, as suggested above. I found an example in another recipe that should do more or less what I want.

The general structure of preprocessors seems to be:

import apache_beam as beam
from pangeo_forge_recipes.transforms import Indexed, T

# preprocessors are implemented as subclasses of the beam.PTransform class
class NameOfPreprocess(beam.PTransform):

    # the preprocess function should take in and return an object of type Indexed[T];
    # these are pangeo-forge-recipes derived types, internal to the functioning of its transforms.
    # I think they consist of 2-item tuples, each containing some type of 'index' and a 'chunk'
    # of the dataset (or a reference to it), as unpacked in the first line of the function below;
    # it is a staticmethod because it does not need access to the instance
    @staticmethod
    def _name_of_preprocess(item: Indexed[T]) -> Indexed[T]:
        index, ds = item
        # do something to each ds chunk here
        # and leave index untouched
        return index, ds

    # the expand method is the required entry point of any custom Beam PTransform:
    # it wraps the preprocess function and applies it to the whole PCollection,
    # i.e. all the (index, ds) chunks
    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._name_of_preprocess)

then in the pipeline:


recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
    | NameOfPreprocess()
    | StoreToZarr(
        combine_dims=pattern.combine_dim_keys,
        target_root='.',
        store_name='out.zarr',
    )
)

A cleaner alternative seems like it could be:

def preprocess(index, ds):
    return index, ds.expand_dims(dim="t") # or some other simple preprocess

and


recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(cache=None)
    | OpenWithXarray()
    | beam.MapTuple(preprocess)
    | StoreToZarr(
        combine_dims=pattern.combine_dim_keys,
        target_root='.',
        store_name='out.zarr',
    )
)
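The beam.MapTuple approach works because MapTuple unpacks each (index, ds) tuple into separate arguments, so the preprocess function can take index and ds directly without the PTransform boilerplate; the trade-off is that there is no named, reusable transform to drop into other recipes.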

mattjbr123 referenced this issue in NERC-CEH/object_store_tutorial Aug 27, 2024
mattjbr123 referenced this issue in NERC-CEH/object_store_tutorial Aug 27, 2024

mattjbr123 commented Aug 27, 2024

Have now implemented the preprocessor; it seems to be working, albeit taking a long time (~1 hr) to process on a single core, even when pruned to only 12 files (which I've just noticed are in the wrong order: it's doing all the Januaries from the first 12 years instead of the first 12 months of the first year...).


mattjbr123 commented Aug 27, 2024

The key takeaway is that some manual writing of a preprocessing function is likely going to be necessary for each dataset, so plenty of training material on this should be made available. This could realistically just be in the form of: "all you need to worry about is the bit inside the _name_of_preprocess function (what you actually want to do in your preprocessing of the data); the rest is copy-and-paste-able boilerplate around it".
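As an illustration of "just the bit inside", a hypothetical concrete preprocessor for this dataset might look like the following (the variable names time_bnds, x_bnds, y_bnds and crs are assumptions based on the variables mentioned elsewhere in this thread):

import apache_beam as beam
from pangeo_forge_recipes.transforms import Indexed, T

class SetBoundsAsCoords(beam.PTransform):

    @staticmethod
    def _set_coords(item: Indexed[T]) -> Indexed[T]:
        index, ds = item
        # the dataset-specific bit: promote the bounds/grid-mapping variables to coordinates
        ds = ds.set_coords(["time_bnds", "x_bnds", "y_bnds", "crs"])
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._set_coords)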


mattjbr123 commented Aug 28, 2024

28/08/2024 findings

The

  • time_coverage_start
  • time_coverage_end
  • history
  • date_created

attributes were dropped during processing and don't appear in the final (Zarr) version of the dataset. I guess this makes sense, as the dataset has now been modified and these attributes need to change, but there presumably is no code in pangeo-forge-recipes yet that can update them. The meaning of 'date_created' is also unclear after modification of the 'official' version of the dataset: is it the creation date of the original official version, or of the now-modified one?

Otherwise the datasets do appear identical, at least at an 'ncdump'-style first glance.
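One possible (untested) way to restore or update those global attributes after the conversion would be to edit the Zarr store's attributes directly; a sketch, with the store path and attribute values as placeholders:

import datetime
import xarray as xr
import zarr

store = "out.zarr"
ds = xr.open_zarr(store)

group = zarr.open_group(store, mode="a")
group.attrs.update(
    {
        "time_coverage_start": str(ds.time.values[0]),
        "time_coverage_end": str(ds.time.values[-1]),
        "date_created": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "history": "Converted to Zarr with pangeo-forge-recipes",  # placeholder provenance string
    }
)
zarr.consolidate_metadata(store)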


mattjbr123 commented Aug 29, 2024

29/08/2024 - 25/09/24

Getting a strange error when trying to run it in parallel, which I haven't seen before...

terminate called after throwing an instance of 'std::out_of_range'
  what():  basic_string::erase: __pos (which is 16) > this->size() (which is 15)

which is a C++ error from a basic string erase function that is trying to erase at index 16 of a string that doesn't exist, because the string ends at index 15.
It's not clear where this error is happening; the rest of the text surrounding this line seems unhelpful.

It is something to do with my Python environment: a particular version of a particular package must be breaking things, as the original workflow for the G2G data runs fine in my old environment but not in the new one.

To figure out which package is the culprit I will clone the old environment and gradually update the key packages in it until it breaks, at which point I can look at which dependencies the updated package pulled in and repeat the process with those dependencies until I have found it.

The problematic packages seem to be:

  • pyarrow
  • protobuf
  • grpcio

Pyarrow depends on the other two packages.

Fortunately, older versions of the packages seem to work, specifically:

  • pyarrow 8.0.1
  • protobuf 3.20.3
  • grpcio 1.47.1 or 1.46.4

Any pyarrow version above 8.0.1 seems to reproduce the error. This limits the environment to python<=3.10, which isn't ideal, but at least it works for now and I can proceed with the rest of the project...
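While bisecting, a small helper like this (illustrative; assumes Python >= 3.8 for importlib.metadata) is handy for confirming exactly which versions are active in a given environment:

from importlib.metadata import PackageNotFoundError, version

for pkg in ("pyarrow", "protobuf", "grpcio", "apache-beam", "pangeo-forge-recipes"):
    try:
        print(f"{pkg}: {version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg}: not installed")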

I will spin this post off into an issue of its own, as it merits further debugging, starting with the suggestions in the Pangeo forum thread: https://discourse.pangeo.io/t/strange-error-using-pangeo-forge-recipes-apache-beam-in-parallel/4540

@mattjbr123

Have posted a query on the Pangeo Discourse forum to see if anyone else there might be able to shed more light on the problem!
https://discourse.pangeo.io/t/strange-error-using-pangeo-forge-recipes-apache-beam-in-parallel/4540

@mattjbr123

Successfully ran with parallelisation using this environment.

Though I've now noticed that the pipeline has added a time dimension to the variables that we converted from data variables to coordinate variables in the preprocessing function (i.e. the variables x_bnds, y_bnds and crs).
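A quick way to confirm what's described above (the store path is a placeholder):

import xarray as xr

ds = xr.open_zarr("out.zarr")
print(ds["x_bnds"].dims)  # shows whether a spurious 'time' dimension has been added

# one possible (untested) cleanup, since these variables should be time-invariant,
# would be to keep only the first time slice of each affected coordinate:
# ds = ds.assign_coords({v: ds[v].isel(time=0, drop=True) for v in ["x_bnds", "y_bnds", "crs"]})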

@mattjbr123

There is some relevant discussion over at the UKCEH RSE Space: NERC-CEH/rse_group#21 (comment)

@mattjbr123

mattjbr123 commented Oct 1, 2024

The only remaining thing to do on this particular issue is to check that it runs on LOTUS on JASMIN.

@mattjbr123

All other outstanding problems have been spun off into their own issues.

mattjbr123 transferred this issue from NERC-CEH/object_store_tutorial Oct 7, 2024