Making a worker use processes rather than cores #238
Hi, @fingoldo. We currently run just a single Dask worker process on each VM node. Given that Coiled makes it easy to use different instance types, we typically recommend that if you have GIL-blocking code, you'll get better parallelism by running across many small VMs. For AWS, the smallest balanced non-burstable instance would be the … So, supposing you want 10 workers, you might try …
Thanks a lot for replying, Nat. Your approach would still mean that out of 2 cores on the smallest instance, my code would run on only 1, right? It would be awkward to pay for 2 but be able to use only 1 (even if the 2nd is a hyper-threading core).

For example, on a 2 vCPU t3.medium/us-east with default settings, my computation batch runs in 10min 54s. On the same instance/region with nthreads=1, it takes 12min 48s. It's a bit odd, since there is an obvious improvement: the second thread is definitely helping (a 15% time reduction from NOT using nthreads=1), despite being HT and the code being GIL-prone. I tried a t3.xlarge with 4 vCPUs (default settings) a few times, but the time did not go down much (9min 44s and 9min 55s), which means it did not utilize the 2nd "true" core. In the Dask dashboard I was seeing white gaps in the task streams on each of the 4 "workers", probably indicating GIL blocks. So, adding 2 more threads (one of them a true core) only helped by another ~13%, whereas it could be a 50% win if the cores were fully utilized.

I checked on my laptop and I do get a benefit from hyper-threading as well, since some part of my computations are integer operations. I have an i7-7700HQ with 4 real and 4 HT cores; processes=True, threads_per_worker=1, n_workers=4 results in 4min 7s at ~70% CPU load, whereas n_workers=8 runs in 3min 36s at 100% CPU load (a 13% advantage). So your recommendation seems to be valid, thank you for it! Running with 1 process on 2 cores, one of which is HT, seems to extract all the horsepower from the cloud instance for my particular workload.

But there might momentarily be a lack of small instances at some provider, or big machines could be more cost-efficient (given that all the cores can be utilized) due to non-linear pricing and demand, or my code could impose additional memory or GPU requirements. Plus, if I need, say, 196 cores, instead of using 1 big machine I would have to pay for allocating and using 98 VM images, which would unnecessarily add to storage costs. Basically, I would have to pay 2 orders of magnitude more for storage for no reason, which is not very attractive. This does not feel right, as "local" Dask can already do the process parallelization I need with just a simple flag. The ease of spinning up cloud instances through Coiled would not be justified by the bigger storage bill and the inability to get more diverse or cost-efficient servers. As a customer, I'd say you need to look into it, as I'm probably not the only one with pure-Python workloads. I found an older Dask article mentioning the nthreads (which you already support) and nprocs parameters:
I figured you might also support nprocs and just not mention it in the docs. I tried looking for possible worker_options but could not find anything in the docs except nthreads, and could not locate the relevant source code either. In the end, I tried adding worker_options={"nprocs": 2} and starting a new cluster; it simply did not let the worker start (the scheduler did start, though). Do I have to look into some kind of nested joblib or multiprocessing.Pool magic inside the function I need to run on the cluster if I want to use Coiled with a bigger machine?
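Aside: the "nested multiprocessing.Pool magic" mentioned above would look roughly like this standard-library sketch (function names are hypothetical). Note that on Dask workers this pattern additionally requires non-daemonic worker processes, which comes up later in this thread:

```python
from multiprocessing import Pool


def cube(x):
    return x ** 3


def task_with_inner_pool(xs):
    # Hypothetical task body: fan GIL-bound work out across
    # extra worker processes from inside a single submitted task.
    with Pool(processes=2) as pool:
        return sum(pool.map(cube, xs))


if __name__ == "__main__":
    print(task_with_inner_pool([1, 2, 3]))  # 1 + 8 + 27 = 36
```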
@fingoldo you can also configure your workers to use a process pool to execute tasks (instead of the default thread pool). Here's a little snippet that demonstrates that in action:

```python
from concurrent.futures import ProcessPoolExecutor

import dask
from dask.distributed import WorkerPlugin
import coiled

# Needed when workers are using ProcessPoolExecutor
dask.config.set({"distributed.worker.daemon": False})

# Spin up a Coiled cluster
cluster = coiled.Cluster(n_workers=10)
client = cluster.get_client()

# Swap out the default threadpool for a processpool on workers
class ProcessPoolWorkers(WorkerPlugin):
    def setup(self, worker):
        worker.executors["default"] = ProcessPoolExecutor(max_workers=worker.state.nthreads)

client.register_worker_plugin(ProcessPoolWorkers())

# Run some Dask code (replace with your actual code)
df = dask.datasets.timeseries(
    "2000", "2005", partition_freq="2w"
).persist()
df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()
```

I'd be curious to hear if this helps with your performance issues in practice.
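A side note on why the process-pool swap helps: a pure-Python loop holds the GIL, so a thread pool serializes it, while a process pool runs copies on separate cores. A minimal standard-library sketch, independent of Dask/Coiled (names are illustrative):

```python
import math
from concurrent.futures import ProcessPoolExecutor


def busy_sum(n):
    # Pure-Python loop: holds the GIL for its entire duration,
    # so threads cannot run it in parallel, but processes can.
    total = 0.0
    for i in range(n):
        total += math.sin(i)
    return total


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(busy_sum, [50_000] * 4))
    print(len(results))  # 4
```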
Great, glad that worked well enough for you!
This would surprise me. The small instances are just a smaller slice of the resources on a very large host. I don't think AWS cares much about how that host is sliced into VMs, so if you can get (say) 1/4 of the host as a single VM, you could just as well get 1/4 of the host as (say) 4 smaller VMs. I'm not sure that's how it works, but that's been my assumption and it fits my experience.
I agree those are potential issues. In my experience they haven't been major concerns, and optimizing your workload at that level usually has much less significant payoffs than other ways of optimizing (say, making code more efficient, or getting better utilization of spot instances, especially since it's usually easier to get smaller rather than larger spot instances). But if you are running at sufficient scale that these sorts of optimizations make sense, let's talk!
@jrbourbeau Unfortunately, the proposed approach runs into an error:

…

even though I ensured the my_cluster_func definition is placed above.
It looks like … Does the following pass or raise?

```python
import cloudpickle

cloudpickle.loads(cloudpickle.dumps(my_cluster_func))
```
It passes, actually.
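For reference, the same round-trip check works with the standard library's pickle for anything defined at module level; cloudpickle is needed for closures, lambdas, and interactively defined functions. A tiny illustration:

```python
import math
import pickle

# Module-level functions pickle by reference (module + name),
# so the dumps/loads round trip recovers the original callable.
restored = pickle.loads(pickle.dumps(math.sin))
print(restored(0.0))  # 0.0
```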
Interesting -- thanks for trying that out. If you're still interested in swapping out the thread pool for a process pool, do you have a full traceback / code you could share?
I'll do it right away, thanks a ton! In the meanwhile, may I ask why the Coiled devs decided to support nthreads in worker_options, but not nprocs (as it was called before) or nworkers (as it is supported now in dask.distributed)? It would be much easier if one of them were supported.
Coiled just forwards |
Wow, indeed, there is no such param. But I just recently ran a command like `dask worker tcp://192.168.100.30:8786 --nworkers=4 --nthreads=1`, and it worked the way I wanted. How does the nworkers param work then? Sorry for my silly questions. If I change the default worker class to Nanny (I saw it in the docs somewhere), will I be able to pass nworkers in worker_options?
and the worker still does not start.
Ah, I see.
No, |
Sorry, it took me a while to prepare a clean reproducer.

cloud.py:

```python
import numpy as np
import math


def my_cloud_func(
    input_array: np.ndarray,
):
    # imagine some non gil-releasing computational load
    total = 0.0
    for x in input_array:
        total += math.sin(x)
    return total
```

coiled_test.py:

```python
from cloud import *
from concurrent.futures import ProcessPoolExecutor
from joblib import Parallel, delayed, parallel_backend
from dask.distributed import WorkerPlugin
import cloudpickle
import logging
import coiled
import dask


def proxy_cloud_func(*args, **kwargs):
    # for the function to be defined in-process. not sure if it's necessary.
    from cloud import my_cluster_func

    return my_cluster_func(*args, **kwargs)


def create_params_grid(
    input_array: np.ndarray,
):
    jobslist = []
    for _ in range(1000):
        jobslist.append(
            delayed(proxy_cloud_func)(
                input_array=input_array,
            )
        )
    return jobslist


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format=("%(asctime)s - %(levelname)s - line:%(lineno)d - %(message)s"),
    )

    input_array = np.random.random(size=300_000)
    use_dask = True

    logging.info(f"Is my cluster func pickleable? { 'Yes' if cloudpickle.loads(cloudpickle.dumps(my_cluster_func)) else 'No' }")
    logging.info(f"Is the proxy of my cluster func pickleable? { 'Yes' if cloudpickle.loads(cloudpickle.dumps(proxy_cloud_func)) else 'No' }")

    if use_dask:
        logging.info("Creating Coiled software env...")
        coiled.create_software_environment(
            name="minimal-ml-env",
            pip="pandas numba numpy".split(),
        )

        logging.info("Creating Coiled cluster...")

        # Needed when workers are using ProcessPoolExecutor
        dask.config.set({"distributed.worker.daemon": False})

        cluster = coiled.Cluster(
            name="test",
            n_workers=1,
            # worker_options={"nthreads": 1},
            software="minimal-ml-env",
            use_best_zone=True,
            compute_purchase_option="spot",
            backend_options={"region_name": "us-east-2"},
            scheduler_options={"idle_timeout": "10 minutes"},
            scheduler_vm_types=["t3.medium"],
            worker_vm_types=["m6i.large"],
        )

        # point Dask to remote cluster
        client = cluster.get_client()
        print("Dask scheduler dashboard:", client.dashboard_link)

        # Swap out the default threadpool for a processpool on workers
        class ProcessPoolWorkers(WorkerPlugin):
            def setup(self, worker):
                worker.executors["default"] = ProcessPoolExecutor(max_workers=worker.state.nthreads)

        client.register_worker_plugin(ProcessPoolWorkers())
        client.wait_for_workers(1)
        client.upload_file("cloud.py")

    params_grid = create_params_grid(input_array=input_array)

    logging.info("Start of computing...")
    if use_dask:
        with parallel_backend("dask"):
            res = Parallel(n_jobs=-1)(params_grid)
    else:
        res = Parallel(n_jobs=-1)(params_grid)
    logging.info(f"Done. {res[0]}")
```

When use_dask is set to True, upon running I get that PicklingError, despite the func being deemed pickleable.
Coming to this issue late, but … But really I think that this is probably too much technology for not that big of a benefit. Again, if you're using so much that you're becoming quite cost conscious, then there are likely other things to think about. I see that you've added things like Spot (🎉). Maybe setting arm=True would give you another boost (and give you access to single-CPU instances anyway).
When adjusted for Loky, i.e.

```python
from loky import get_reusable_executor
...
coiled.create_software_environment(
    name="minimal-ml-env",
    pip="pandas numba numpy loky".split(),
)
...
# Swap out the default threadpool for a processpool on workers
class ProcessPoolWorkers(WorkerPlugin):
    def setup(self, worker):
        worker.executors["default"] = get_reusable_executor(max_workers=worker.state.nthreads)
```

I get the following error:

…
Apologies, it was pointed out to me that dask and distributed were missing from the env. Still, after the fix, … no matter if I use …

Re-checked with ProcessPoolExecutor: for it, setting daemon to False makes it throw … (instead of "daemonic processes are not allowed to have children").
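The "daemonic processes are not allowed to have children" error comes from multiprocessing itself, which is why `dask.config.set({"distributed.worker.daemon": False})` appears earlier in this thread before workers may use a process pool. A minimal standard-library reproduction of the restriction (Dask-independent sketch):

```python
import multiprocessing as mp


def grandchild():
    pass


def child(q):
    # Inside a daemonic process, starting another process raises
    # AssertionError("daemonic processes are not allowed to have children").
    try:
        mp.Process(target=grandchild).start()
        q.put("started")
    except AssertionError as exc:
        q.put(str(exc))


if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=child, args=(q,), daemon=True)
    p.start()
    print(q.get())  # daemonic processes are not allowed to have children
    p.join()
```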
Unrelated, but we just pushed a change to our staging deployment that will show you those "dask is missing" errors more directly, instead of having to hunt for them in the system logs! Hopefully deployed pretty soon.
@fingoldo thought on this comment:
From my perspective it sounds like you're focused on using a ProcessPoolExecutor, but this is a very atypical path and, my guess is, not actually all that useful. Are you really hurting cost-wise from having two vCPUs? If so, I'm curious how much you're spending on computation that it warrants this focus. Have you tried running your computation on ARM? Those instances don't have vCPUs, so maybe you can stop caring about this if you're on that architecture.
Maybe you are right and I am too concerned with cost. I don't have many computations currently (anticipated savings of only tens of $$ a day), but I expect to have more in the coming months. I realized I always have the option of using Dask Cloudprovider, which I should be able to fit to my liking without bothering anyone ) Will need to make it accept extra params to run on spot instances, though. Thanks a lot everyone for spending time on this question; if it's not on the roadmap of Coiled, feel free to close this feature request.
Have you tried arm=True? ARM supports single-CPU instances.
Matthew Rocklin, CEO <https://coiled.io>, Dask Maintainer
Wow, I did not know that. Haven't tried it yet, as in previous versions ARM required some non-standard environment creation (via Docker, I recall). Will definitely give it a try then.
Hey @fingoldo, checking in: did using ARM help resolve this issue?
Hi, I am trying to run distributed computing on AWS using Coiled.
My code is pure Python and therefore cannot bypass the GIL.
When testing with local Dask on my laptop, to activate all cores I had to use settings like this:

```python
cluster = LocalCluster(processes=True, threads_per_worker=1, n_workers=8)
```

In effect, I was telling Dask to spawn as many worker processes as my node had cores, and use only 1 thread per process. That allowed my laptop to utilize all 8 of its (hyper-threaded, but still) cores, yielding the highest performance. As I understand it, in distributed Dask and in Coiled, "workers" have a different meaning: they're basically nodes (which is confusing).
I can't pass threads_per_worker to the cluster creation code anymore: I get `TypeError: __init__() got an unexpected keyword argument 'threads_per_worker'`.
I noticed that coiled.Cluster has a worker_options={"nthreads": 1} ability instead, but it was suspicious that the documentation mentioned it as a means to make computing "synchronous" rather than to leverage processes instead of threads. Indeed, after trying it, my not-so-great performance dropped even more.
To recap, my problem is that my code is pure Python with the GIL, and Coiled only utilizes one core of an AWS machine with 4 cores (which I clearly see from the Coiled dashboard and from the overall timing), despite Dask saying "Using backend DaskDistributedBackend with 4 concurrent workers." upon submitting the job.
How do I achieve behaviour similar to the LocalCluster with processes=True, threads_per_worker=1 cited above with Coiled?
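The LocalCluster call above amounts to "one single-threaded worker process per core." Outside Dask, the same sizing rule with only the standard library looks like this (a sketch, not the Coiled API):

```python
import os
from concurrent.futures import ProcessPoolExecutor


def square(x):
    # Stand-in for a GIL-bound pure-Python task.
    return x * x


if __name__ == "__main__":
    # Mirror processes=True, threads_per_worker=1, n_workers=<cores>:
    # one process per logical core, each running one task at a time.
    n_workers = os.cpu_count() or 1
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        print(sum(pool.map(square, range(10))))  # 285
```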