Slow communication between Dask actors and the runner node #113

Open
landmanbester opened this issue Aug 11, 2024 · 0 comments
The backward step requires communicating large dense arrays from the workers to the runner node. The following simple reproducer illustrates that this worker-to-runner communication takes a significant amount of time:

import sys
import numpy as np
from time import time
import dask
dask.config.set({
    # compression is a scalar setting ('lz4', 'zstd', False, ...), not a dict;
    # set it to 'lz4' to test with compression enabled
    'distributed.comm.compression': False
})

class band_actor(object):
    def __init__(self, nx, ny, nbasis, bid):
        self.nx = nx
        self.ny = ny
        self.nbasis = nbasis
        self.bid = bid

    def do_something(self, x):
        ti = time()
        y = np.random.randn(self.nbasis, self.nx, self.ny)
        print(f'band {self.bid} = ', time() - ti)
        return y

def main(nband, nbasis, nx, ny):
    from dask.distributed import Client, LocalCluster
    cluster = LocalCluster(processes=True,
                           n_workers=nband,
                           threads_per_worker=1,
                           memory_limit=0,  # str(mem_limit/nworkers)+'GB'
                           asynchronous=False)
    client = Client(cluster, direct_to_workers=True)
    client.wait_for_workers(nband)

    _main(nband, nbasis, nx, ny)

def _main(nband, nbasis, nx, ny):
    from distributed import get_client, as_completed
    client = get_client()
    names = list(client.scheduler_info()['workers'].keys())
    futures = []
    for wname, bandid in zip(names, range(nband)):
        f = client.submit(band_actor,
                          nx,
                          ny,
                          nbasis,
                          bandid,
                          workers=wname,
                          actor=True,
                          pure=False)
        futures.append(f)

    actors = list(map(lambda f: f.result(), futures))

    niter = 2
    for i in range(niter):
        ti = time()
        ratio = np.random.randn(nbasis, nx, ny)
        print('ratio = ', time() - ti)

        ti = time()
        futures = list(map(lambda a: a.do_something(ratio), actors))
        print('submit = ', time() - ti)
        ti = time()
        results = []
        for fut in as_completed(futures):
            results.append(fut.result())
        print('result = ', time() - ti)



if __name__=='__main__':
    nband = int(sys.argv[1])
    nbasis = int(sys.argv[2])
    nx = int(sys.argv[3])
    ny = int(sys.argv[4])
    print(nband, nbasis, nx, ny)
    main(nband, nbasis, nx, ny)

Sample output for a representative problem size follows:

$ python actor_transfers.py 10 4 8192 8192
10 4 8192 8192
ratio =  6.603142023086548
submit =  0.0007979869842529297
band 0 =  6.500479459762573
band 1 =  6.201739549636841
band 6 =  6.064023017883301
band 9 =  6.040329694747925
band 5 =  6.069493055343628
band 4 =  5.986522674560547
band 2 =  6.046694040298462
band 3 =  6.001271724700928
band 7 =  6.132995128631592
band 8 =  6.157159090042114
result =  19.300325870513916
ratio =  6.187124967575073
submit =  0.0002925395965576172
band 0 =  6.295117139816284
band 1 =  6.209485292434692
band 2 =  6.173954486846924
band 6 =  6.105499267578125
band 4 =  6.307347774505615
band 8 =  6.160896301269531
band 3 =  6.261183261871338
band 5 =  6.0839598178863525
band 9 =  6.098883867263794
band 7 =  6.115009784698486
result =  20.101259231567383
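
For scale (my own estimate, not printed by the run above): each result is a float64 array of shape (4, 8192, 8192), so with ten bands the runner has to ingest roughly 20 GiB per iteration. A quick sketch to confirm the arithmetic and time raw pickling of a scaled-down array:

```python
import pickle
from time import time
import numpy as np

# Each result has shape (nbasis, nx, ny) = (4, 8192, 8192) in float64:
nbasis, nx, ny = 4, 8192, 8192
per_band_bytes = nbasis * nx * ny * 8      # = 2 GiB per band
total_gib = 10 * per_band_bytes / 2**30    # ten bands -> ~20 GiB per iteration
print(f'runner ingests ~{total_gib:.0f} GiB per iteration')

# Time pickling a scaled-down array; protocol 5 keeps the numpy buffer
# out-of-band, which is roughly what distributed does for numpy arrays.
small = np.random.randn(nbasis, 512, 512)
ti = time()
buf = pickle.dumps(small, protocol=5)
print(f'{small.nbytes / 2**20:.0f} MiB pickled in {time() - ti:.3f}s')
assert np.array_equal(pickle.loads(buf), small)
```

If the 20 GiB estimate is right, 12-13 s corresponds to roughly 1.6 GB/s, which would be consistent with a single thread doing the deserialisation on the runner.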

It seems to me that the transfers take approximately 12-13s for arrays of this size. In the realistic case, where each worker also has to do a number of FFTs and wavelet transforms (on arrays of the same size), this is about 40% of the total time spent per iteration. Looking at htop, I see a single (red) thread during the transfer. It might be possible to use shared memory to avoid the transfer on a single node, but I'm not sure if it is possible to get around this in the distributed case. Switching compression on or off doesn't seem to make a difference. If the client doesn't connect directly to the workers, the total time goes from 20s to about 40s per iteration, which I guess makes a degree of sense. Pinging @sjperkins @JSKenyon @o-smirnov for inspiration
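
For the single-node case, a minimal sketch of the shared-memory idea using the stdlib `multiprocessing.shared_memory` (hypothetical, not part of the reproducer; a real version would need the actors to attach by the block's name, and shapes here are scaled down for illustration):

```python
import numpy as np
from multiprocessing import shared_memory

# Instead of shipping each ~2 GiB result through the comm layer, an actor could
# write it into a named shared-memory block that the runner maps directly.
shape, dtype = (4, 256, 256), np.float64  # scaled down from (4, 8192, 8192)
nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize

shm = shared_memory.SharedMemory(create=True, size=nbytes)      # runner allocates
attached = shared_memory.SharedMemory(name=shm.name)            # "worker" attaches by name
try:
    # "Worker" writes its result in place ...
    worker_view = np.ndarray(shape, dtype=dtype, buffer=attached.buf)
    worker_view[:] = np.random.randn(*shape)
    # ... and the "runner" reads it back with no serialisation or copy.
    runner_view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    transfers_match = np.array_equal(worker_view, runner_view)
finally:
    del worker_view, runner_view  # release buffer exports before closing
    attached.close()
    shm.close()
    shm.unlink()
```

This only helps when runner and actors share a host, so it would have to be a special-cased fast path alongside the existing distributed transfer.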
