
Lazy mean, sum etc? #34

Open · rafaqz opened this issue Jun 4, 2021 · 17 comments

rafaqz (Collaborator) commented Jun 4, 2021

It seems possible to add a reducing-function wrapper that takes e.g. the mean/sum along an axis lazily, similar to how lazy broadcasting works.
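For illustration, a minimal sketch of what such a wrapper might look like (LazyReduction and lazyreduce are hypothetical names, not part of DiskArrays, and a real version would reduce chunk-by-chunk rather than slice-by-slice):

using Statistics

# Hypothetical lazy reduction wrapper: stores the reduction function and the
# parent array, and only computes on indexing.
struct LazyReduction{T,N,F,A<:AbstractArray} <: AbstractArray{T,N}
    f::F
    parent::A
    dims::Int
end

lazyreduce(f, a::AbstractArray{T,N}; dims::Int) where {T,N} =
    LazyReduction{T,N,typeof(f),typeof(a)}(f, a, dims)

# The reduced dimension collapses to length 1, like Base's dims-reductions
Base.size(r::LazyReduction) =
    ntuple(i -> i == r.dims ? 1 : size(r.parent, i), ndims(r.parent))

# Accessing an element reduces only the corresponding slice of the parent
function Base.getindex(r::LazyReduction{T,N}, I::Vararg{Int,N}) where {T,N}
    inds = ntuple(i -> i == r.dims ? Colon() : I[i], N)
    r.f(view(r.parent, inds...))
end

A = rand(Float32, 100, 200)
m = lazyreduce(mean, A; dims=2)  # nothing is read or computed yet
m[50, 1]                         # mean of row 50, computed on access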

meggart (Owner) commented Jun 10, 2021

I think this would be possible, yes. From looking at the implementation of reductions,

DiskArrays.jl/src/ops.jl

Lines 79 to 116 in 234420f

quote
    function Base._mapreduce(f, op, v::$t)
        mapreduce(op, eachchunk(v)) do cI
            a = v[toRanges(cI)...]
            mapreduce(f, op, a)
        end
    end
    function Base.mapreducedim!(f, op, R::AbstractArray, A::$t)
        sR = size(R)
        foreach(eachchunk(A)) do cI
            a = A[toRanges(cI)...]
            ainds = map((cinds, arsize) -> arsize == 1 ? Base.OneTo(1) : cinds, toRanges(cI), size(R))
            # Maybe the view into R here is problematic and a copy would be faster
            Base.mapreducedim!(f, op, view(R, ainds...), a)
        end
        R
    end
    function Base.mapfoldl_impl(f, op, nt::NamedTuple{()}, itr::$t)
        cc = eachchunk(itr)
        isempty(cc) && return Base.mapreduce_empty_iter(f, op, itr, IteratorEltype(itr))
        Base.mapfoldl_impl(f, op, nt, itr, cc)
    end
    function Base.mapfoldl_impl(f, op, nt::NamedTuple{()}, itr::$t, cc)
        y = first(cc)
        a = itr[toRanges(y)...]
        init = mapfoldl(f, op, a)
        Base.mapfoldl_impl(f, op, (init = init,), itr, Iterators.drop(cc, 1))
    end
    function Base.mapfoldl_impl(f, op, nt::NamedTuple{(:init,)}, itr::$t, cc)
        init = nt.init
        for y in cc
            a = itr[toRanges(y)...]
            init = mapfoldl(f, op, a, init = init)
        end
        init
    end
end
at least mapreduce and mapfoldl should be possible. However, mapreducedim! might be a bit more tricky, because at the stage where we enter the DiskArray method, the output array is already pre-allocated. But in principle, having a ReducedDiskArray would be possible.

However, in the last months I have been repeatedly looking at Dagger.jl and thinking about how to best integrate it into DiskArrays computations. Instead of adding more DiskArray subtypes, I would personally prefer to work towards a DaggerDiskArray, where we simply have delayed representations of each chunk of the data and can then lazily stack computations, subsets, etc., similar to how dask+xarray work in the Python world. The big advantage would be to get distributed chunk processing (reductions and broadcast) for free, so there might be big potential in this. What do you think?

rafaqz (Collaborator, Author) commented Jun 10, 2021

That sounds better; my idea is basically a poor man's Dagger.jl. I've also been looking at Dagger.jl. A DaggerDiskArray would get GeoData.jl to or past feature parity with xarray.

meggart (Owner) commented Jun 18, 2021

I have a first version of a to_dagger function, which simply converts any chunked DiskArray into a Dagger.DArray; their storage models are quite similar:

using Dagger, Zarr, DiskArrays
using DiskArrays: eachchunk
using Dagger: ArrayDomain, DArray, DomainBlocks

function to_dagger(a::AbstractDiskArray)
    # Delayed read of one chunk of the disk array
    dr = delayed(r -> a[r.indices...])
    cs = eachchunk(a)
    # Translate the DiskArrays chunk grid into Dagger's DomainBlocks
    ard = DomainBlocks(cs.offset .+ 1, map((s, l) -> cumsum(fill(l, s)), cs.chunkgridsize, cs.chunksize))
    DArray(
        eltype(a),
        Dagger.domain(a),
        ard,
        dr.(cs),
    )
end

Now you can stack lazy operations on this:

a = zzeros(Float32,1000,1000,chunks=(100,100))
a[:,:] = rand(Float32,1000,1000);

a2 = to_dagger(a)

someop = sum(sin.(a2 ./ 100), dims=2)

and then you can use collect(someop) to run the computation. What is still missing is the ability to use DiskArrays as sinks as well; in this case probably only Zarr would work because of parallel writes, or, as an alternative, one could write a single netcdf per chunk.

So I am currently trying to figure out how to determine the chunk sizes of these lazy operations, so that this can be done automatically.
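For reference, a sketch of one way this could be done, using the same Dagger internals (Dagger.stage and the subdomains field) that the to_zarr draft below relies on; these are internals rather than a public API, so this is only an assumption about a possible approach:

function output_chunksizes(op::Dagger.ArrayOp)
    # Stage the lazy operation to obtain its output block structure
    staged = Dagger.stage(Dagger.Context(), op)
    doms = staged.subdomains  # DomainBlocks: start indices and cumulative block lengths
    # Recover the per-dimension block lengths from the cumulative lengths
    map((s, cl) -> diff(vcat(s - 1, collect(cl))), doms.start, doms.cumlength)
end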

meggart (Owner) commented Jun 18, 2021

Pinging @visr here, who has shown interest in coupling Zarr to Dagger before, as well as @felixcremer.

rafaqz (Collaborator, Author) commented Jun 18, 2021

Wow, that seems so simple. Probably just writing multiple files is a reasonable option for formats other than zarr.

visr (Contributor) commented Jun 18, 2021

Great to see that mapping to Dagger is possible in such a simple function.

> Probably just writing multiple files is a reasonable option for formats other than zarr.

I guess this is where https://github.com/shashi/FileTrees.jl could maybe help?

I see there is also some discussion on JuliaParallel/Dagger.jl#214 and experimentation at https://github.com/JuliaParallel/DaggerArrays.jl.

rafaqz (Collaborator, Author) commented Jun 18, 2021

For netcdf parallel writes there is also PnetCDF... oops, that's for the old format; netcdf-4 just uses HDF5 MPI, but someone would have to wrap it.

visr (Contributor) commented Jun 18, 2021

Yeah, there's this issue about parallel NetCDF I/O: Alexander-Barth/NCDatasets.jl#122. I'm not familiar with it myself.

meggart (Owner) commented Jun 18, 2021

I have a first version of a to_zarr function. I am not sure if this is how it should be done, since I am still learning to understand the Dagger internals, but at least this works and does parallel writes:

using Distributed: myid

function to_zarr(a::Dagger.ArrayOp,
        outpath;
        compressor = Zarr.BloscCompressor,  # not yet forwarded to zcreate
        attrs = Dict())                     # not yet forwarded to zcreate
    # First we stage the computation to determine the output chunk sizes
    staged_tosave = Dagger.stage(Dagger.Context(), a)
    doms = staged_tosave.subdomains
    # Now we compute the single output chunks from the BlockDomain
    chunksizes = unique.(diff.(vcat.(doms.start .- 1, doms.cumlength)))
    @assert all(length.(chunksizes) .== 1)
    chunksizes = first.(chunksizes)
    # Create the output array
    outarray = zcreate(eltype(staged_tosave), size(staged_tosave)...;
        path = outpath,
        chunks = chunksizes,
    )
    # Make a lazy representation of the writing process
    r = map(staged_tosave.chunks, staged_tosave.subdomains) do data, dom
        f = delayed() do d
            println("Process ", myid(), " writing at ", dom.indexes)
            outarray[dom.indexes...] = d
            return true
        end
        f(data)
    end
    # And merge the results, forcing the writes to run
    @assert collect(reduce(+, r)) == length(staged_tosave.chunks)
    return outarray
end

Then you can do

a = zzeros(Float32,1000,1000,chunks=(100,500))
a[:,:] = rand(Float32,1000,1000);

a2 = to_dagger(a)

someop = sum(sin.(a2 ./ 1000), dims=2)

to_zarr(someop, tempname())

And the results get written to disk. I will do more tests, but if this all works as expected, we could stop maintaining all the reduction and broadcast code here in DiskArrays and make Dagger the default backend. Then we would use this package only to provide the interface between the different DiskArrays, as was planned initially. Probably we should discuss this in a separate issue.

meggart (Owner) commented Jun 18, 2021

I am not completely convinced, though. The following already starts reading the data:

cmip6 = zopen("gs://cmip6/CMIP6/HighResMIP/CMCC/CMCC-CM2-HR4/highresSST-present/r1i1p1f1/6hrPlev/psl/gn/v20170706/")

psl = to_dagger(cmip6["psl"])

view(psl,:,:,1:1000)

Still some work to do, it seems.

rafaqz (Collaborator, Author) commented Jun 18, 2021

Awesome. It will be good to see if we can get wrappers to work with that as well, to e.g. save xarray/zarr encoding at the same time.

I wonder if depending on Dagger here is a good idea:

julia> @time using Dagger
  0.773514 seconds (1.59 M allocations: 106.885 MiB, 8.10% gc time)

(separately)

julia> @time using DiskArrays
  0.256858 seconds (514.44 k allocations: 30.299 MiB, 9.24% compilation time)

Might be better to have DiskArraysDagger.jl as a subpackage here?
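For illustration, a sketch of what such a subpackage could look like; the package name and layout are hypothetical:

# DiskArraysDagger.jl/src/DiskArraysDagger.jl
module DiskArraysDagger

using DiskArrays  # lightweight chunked-array interface
using Dagger      # heavy dependency, only loaded by users who opt in

# The to_dagger / to_zarr drafts from this thread would live here, so that
# DiskArrays itself never has to depend on Dagger.

end # module

That way the load-time cost above is only paid by users who actually want the Dagger integration.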

rafaqz (Collaborator, Author) commented Jun 20, 2021

I've been thinking about how a parallel Dagger backend and writing to GDAL would work if Dagger.jl were the default, as I make a lot of use of generic DiskArrays broadcasts to write to files in GeoData.jl. I guess we can use single-threaded defaults, or we can set the threading based on user input or the file types in the broadcast?

Maybe a trait specifying whether an AbstractDiskArray can do parallel writes would help here, e.g. writemode(::SomeDiskArray) = Parallel(). And maybe we could allow users to specify GPU() parallelism.
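A minimal sketch of how that trait could dispatch; all names here (WriteMode, writemode, write_chunks!, writechunk!) are illustrative, not existing DiskArrays API:

abstract type WriteMode end
struct Serial <: WriteMode end
struct Parallel <: WriteMode end
struct GPU <: WriteMode end

# Conservative default: assume a backend cannot write concurrently
writemode(::AbstractArray) = Serial()
# A backend supporting concurrent chunk writes (e.g. a Zarr array) could opt in:
# writemode(::ZArray) = Parallel()

# Placeholder for a backend's actual per-chunk write
writechunk!(a, c) = error("backend must implement writechunk!")

# Pick the write strategy by dispatching on the trait
write_chunks!(a, chunks) = write_chunks!(writemode(a), a, chunks)

function write_chunks!(::Serial, a, chunks)
    foreach(c -> writechunk!(a, c), chunks)
end

function write_chunks!(::Parallel, a, chunks)
    Threads.@threads for c in collect(chunks)
        writechunk!(a, c)
    end
end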

jpsamaroo commented

We're having a discussion at JuliaParallel/Dagger.jl#230 (comment) planning swap-to-disk support in Dagger. My latest reply (JuliaParallel/Dagger.jl#230 (comment)) details a system by which interested users and libraries (as well as the scheduler itself) can instruct the scheduler to move data to and from disk or other persistent storage.

One could construct tasks in the DAG that have "storage chunks" as inputs, and Dagger would automatically read them into memory and move them to the processor that will be operating on them. That task could then return a new storage chunk, which would result in the result being moved to the worker associated with the storage and data being written to disk there.

Does that sound like the sort of API that would be useful to have for DiskArrays? Is there anything this API lacks that you would need?

jpsamaroo commented

One complication to the above would be the use of mmap for efficient reads/writes. I think we can solve this with "scopes", which are constraints on tasks/data that prevent operations outside of the scope (where the scope could be per-thread, per-process, per-node, etc.). We could have a Storage subtype which represents an mmap-ed file, which would (upon construction) result in a per-node scope being set on the chunk. This would force all tasks operating on that data to be co-located onto that node, which would prevent accidentally serializing that data to another node.

meggart (Owner) commented Jul 30, 2021

Thanks a lot for joining the discussion, @jpsamaroo.

> One could construct tasks in the DAG that have "storage chunks" as inputs, and Dagger would automatically read them into memory and move them to the processor that will be operating on them. That task could then return a new storage chunk, which would result in the result being moved to the worker associated with the storage and data being written to disk there.

A short comment on this one: I think for the DiskArrays data model the workflow would be a bit simpler, because we do not support distributed storage at all. In most climate/earth system science applications (and all I have been involved in), we have our input data on shared file systems or stored in cloud buckets, so one can assume significant latency for single IO operations, but all workers have equal access to the same storage.

So our main interface would not be mmaps and intermediate storage, but rather the very beginning of the processing chain (as proposed in my first example, defining one chunk/thunk for each sub-domain of the input data) or the very end (dumping a structured set of thunks into a single coherent output dataset).

So what I proposed was really just similar to dask's zarr interface as described here: https://docs.dask.org/en/latest/array-creation.html?highlight=zarr#zarr

I will read the discussion you linked and see if I can comment.

meggart (Owner) commented Jan 13, 2022

It looks like the most recent developments in Dagger do all operations eagerly instead of lazily, so the coupling to Dagger, although interesting, will probably not really help with lazy sums. I think it will not be too hard to add a ReducedDiskArray type where we can have lazy representations of sums and means. I will try in the next weeks.

jpsamaroo commented

@meggart, for my understanding: the purpose of this package is to amortize the high cost of disk access by fusing operations (so that you ideally only do one read/write syscall for each chunk of the data), correct? If so, I agree that Dagger is not yet suitable for integration here, and that it will continue moving in the direction of an eager execution model. However, I also have plans to integrate Dagger with the Symbolics ecosystem, to allow rewriting and fusing parts of the DAG so that such optimizations could be applied automatically. Together with a new Dagger operator to delay DAG execution while a user-specified sub-graph is being built (which I want for a variety of reasons), we should be able to fuse operations to achieve single-access behavior.

I'll definitely need some time before I can get to implementing this functionality, but once I do, we should hopefully be able to create a set of rewrite rules that use DiskArrays.jl to do this fusion of "naive" array operations. I'll ping you once I've gotten those features working, and maybe then we can collaborate on making DiskArrays and Dagger work together 🙂
