Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processpool #515

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion fastcore/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,19 @@
'fastcore.net.urlsend': ('net.html#urlsend', 'fastcore/net.py'),
'fastcore.net.urlvalid': ('net.html#urlvalid', 'fastcore/net.py'),
'fastcore.net.urlwrap': ('net.html#urlwrap', 'fastcore/net.py')},
'fastcore.parallel': { 'fastcore.parallel.ProcessPoolExecutor': ('parallel.html#processpoolexecutor', 'fastcore/parallel.py'),
'fastcore.parallel': { 'fastcore.parallel.NoDaemonProcess': ('parallel.html#nodaemonprocess', 'fastcore/parallel.py'),
'fastcore.parallel.NoDaemonProcess.daemon': ( 'parallel.html#nodaemonprocess.daemon',
'fastcore/parallel.py'),
'fastcore.parallel.ProcessPool': ('parallel.html#processpool', 'fastcore/parallel.py'),
'fastcore.parallel.ProcessPool.__init__': ('parallel.html#processpool.__init__', 'fastcore/parallel.py'),
'fastcore.parallel.ProcessPool.map': ('parallel.html#processpool.map', 'fastcore/parallel.py'),
'fastcore.parallel.ProcessPoolExecutor': ('parallel.html#processpoolexecutor', 'fastcore/parallel.py'),
'fastcore.parallel.ProcessPoolExecutor.__init__': ( 'parallel.html#processpoolexecutor.__init__',
'fastcore/parallel.py'),
'fastcore.parallel.ProcessPoolExecutor.map': ( 'parallel.html#processpoolexecutor.map',
'fastcore/parallel.py'),
'fastcore.parallel.ThreadPool': ('parallel.html#threadpool', 'fastcore/parallel.py'),
'fastcore.parallel.ThreadPool.__init__': ('parallel.html#threadpool.__init__', 'fastcore/parallel.py'),
'fastcore.parallel.ThreadPoolExecutor': ('parallel.html#threadpoolexecutor', 'fastcore/parallel.py'),
'fastcore.parallel.ThreadPoolExecutor.__init__': ( 'parallel.html#threadpoolexecutor.__init__',
'fastcore/parallel.py'),
Expand Down
91 changes: 71 additions & 20 deletions fastcore/parallel.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/03a_parallel.ipynb.

# %% auto 0
__all__ = ['threaded', 'startthread', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'parallel', 'add_one',
'run_procs', 'parallel_gen']
__all__ = ['threaded', 'startthread', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'NoDaemonProcess',
'ProcessPool', 'ThreadPool', 'parallel', 'add_one', 'run_procs', 'parallel_gen']

# %% ../nbs/03a_parallel.ipynb 1
from .imports import *
Expand All @@ -11,15 +11,14 @@
from .meta import *
from .xtras import *
from functools import wraps

import concurrent.futures,time
from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
from multiprocessing import pool,Process,Queue,Manager,set_start_method,get_all_start_methods,get_context
from threading import Thread
# Best-effort: on macOS inside a notebook, switch multiprocessing to the "fork" start method.
# Any ordinary failure is ignored so that importing this module never fails because of it.
try:
    if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method("fork")
except Exception: pass

# %% ../nbs/03a_parallel.ipynb 4
# %% ../nbs/03a_parallel.ipynb 5
def threaded(f):
"Run `f` in a thread, and returns the thread"
@wraps(f)
Expand All @@ -29,12 +28,12 @@ def _f(*args, **kwargs):
return res
return _f

# %% ../nbs/03a_parallel.ipynb 6
# %% ../nbs/03a_parallel.ipynb 7
def startthread(f):
    "Like `threaded`, but start thread immediately"
    # Wrap `f` with `threaded`, then call the wrapper right away with no arguments
    threaded(f)()

# %% ../nbs/03a_parallel.ipynb 8
# %% ../nbs/03a_parallel.ipynb 9
def _call(lock, pause, n, g, item):
l = False
if pause:
Expand All @@ -45,7 +44,7 @@ def _call(lock, pause, n, g, item):
if l: lock.release()
return g(item)

# %% ../nbs/03a_parallel.ipynb 9
# %% ../nbs/03a_parallel.ipynb 10
def parallelable(param_name, num_workers, f=None):
f_in_main = f == None or sys.modules[f.__module__].__name__ == "__main__"
if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main:
Expand All @@ -54,7 +53,7 @@ def parallelable(param_name, num_workers, f=None):
return False
return True

# %% ../nbs/03a_parallel.ipynb 10
# %% ../nbs/03a_parallel.ipynb 11
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
Expand All @@ -72,7 +71,7 @@ def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
except Exception as e: self.on_exc(e)

# %% ../nbs/03a_parallel.ipynb 12
# %% ../nbs/03a_parallel.ipynb 13
@delegates()
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
Expand All @@ -95,49 +94,101 @@ def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
except Exception as e: self.on_exc(e)

# %% ../nbs/03a_parallel.ipynb 14
# %% ../nbs/03a_parallel.ipynb 15
class NoDaemonProcess(Process):
    "A `Process` whose `daemon` flag always reads `False`; attempts to set it are ignored"
    # See https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
    @property
    def daemon(self):
        return False
    @daemon.setter
    def daemon(self, value):
        # Deliberately a no-op, so the flag can never be switched on
        pass

# %% ../nbs/03a_parallel.ipynb 16
@delegates()
class ProcessPool(pool.Pool):
    "Same as Python's Pool, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, daemonic=False, **kwargs):
        "Create the pool; unless `daemonic`, workers are created as `NoDaemonProcess`. `kwargs` go to `Pool`"
        if max_workers is None: max_workers=defaults.cpus
        store_attr()
        self.not_parallel = max_workers==0
        # The underlying `Pool` is still created with one process, even when `map` will run serially
        if self.not_parallel: max_workers=1
        if not daemonic:
            # `or` (instead of a `.get` default) so that an explicit `context=None` falls back
            # to the default context rather than trying to subclass `NoneType`
            ctx = kwargs.get('context') or get_context()
            class NoDaemonContext(type(ctx)):
                Process = NoDaemonProcess
            kwargs['context'] = NoDaemonContext()
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        "Apply `f` (with extra `args`/`kwargs`) to `items`: lazily in-process if `max_workers==0`, else via the pool"
        assert timeout is None, "timeout is not supported by ProcessPool, use ProcessPoolExecutor instead"
        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
        # `self.max_workers` is left at 0 in serial mode so that later `map` calls stay serial too
        self.not_parallel = self.max_workers==0
        g = partial(f, *args, **kwargs)
        if self.not_parallel: return map(g, items)
        # The lock is only needed on the parallel path, where it is handed to `_call`
        self.lock = Manager().Lock()
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        # On failure the exception is passed to `on_exc` and `None` is returned
        try: return super().map(_g, items, chunksize=chunksize)
        except Exception as e: self.on_exc(e)

# %% ../nbs/03a_parallel.ipynb 17
@delegates()
class ThreadPool():
    "Placeholder class: instantiating it always raises `NotImplementedError`"
    # If you have a need for a ThreadPool, please open an issue.
    def __init__(self, *args, **kwargs):
        raise NotImplementedError("`ThreadPool` is not implemented")

# %% ../nbs/03a_parallel.ipynb 18
# `fastprogress` is optional: when it cannot be imported, `progress_bar` is set to None
try: from fastprogress import progress_bar
except Exception: progress_bar = None

# %% ../nbs/03a_parallel.ipynb 15
# %% ../nbs/03a_parallel.ipynb 19
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):
method=None, threadpool=False, timeout=None, chunksize=1,
executor=True, maxtasksperchild=None, **kwargs):
"Applies `func` in parallel to `items`, using `n_workers`"
kwpool = {}
if threadpool: pool = ThreadPoolExecutor
if threadpool: pool = ThreadPoolExecutor if executor else ThreadPool
else:
pool = ProcessPoolExecutor if executor else ProcessPool
if not method and sys.platform == 'darwin': method='fork'
if method: kwpool['mp_context'] = get_context(method)
pool = ProcessPoolExecutor
if method:
if executor: kwpool['mp_context'] = get_context(method)
else: kwpool['context'] = get_context(method)

if maxtasksperchild:
assert pool==ProcessPool, "`maxtasksperchild` is only supported by ProcessPool"
kwpool['maxtasksperchild'] = maxtasksperchild
with pool(n_workers, pause=pause, **kwpool) as ex:
r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
if progress and progress_bar:
if total is None: total = len(items)
r = progress_bar(r, total=total, leave=False)
return L(r)

# %% ../nbs/03a_parallel.ipynb 16
# %% ../nbs/03a_parallel.ipynb 20
def add_one(x, a=1):
    "Return `x+a` after a short random sleep (at most 1/80 of a second)"
    # this import is necessary for multiprocessing in notebook on windows
    import random
    time.sleep(random.random()/80)
    return x+a

# %% ../nbs/03a_parallel.ipynb 22
# %% ../nbs/03a_parallel.ipynb 26
def run_procs(f, f_done, args):
    "Call `f` for each item in `args` in parallel, yielding `f_done`"
    # One `Process` per element of `args`, each targeting `f`
    processes = L(args).map(Process, args=arg0, target=f)
    for o in processes: o.start()
    # Hand everything produced by `f_done()` to the caller before joining
    yield from f_done()
    processes.map(Self.join())

# %% ../nbs/03a_parallel.ipynb 23
# %% ../nbs/03a_parallel.ipynb 27
def _f_pg(obj, queue, batch, start_idx):
    "Call `obj` on `batch` and put each result on `queue` as `(start_idx+i, result)`"
    for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))

def _done_pg(queue, items):
    "Lazily fetch one entry from `queue` for each element of `items`"
    fetched = (queue.get() for _unused in items)
    return fetched

# %% ../nbs/03a_parallel.ipynb 24
# %% ../nbs/03a_parallel.ipynb 28
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
if not parallelable('n_workers', n_workers): n_workers = 0
Expand Down
2 changes: 1 addition & 1 deletion nbs/000_tour.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@
"split_at_heading": true
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "python3",
"language": "python",
"name": "python3"
}
Expand Down
2 changes: 1 addition & 1 deletion nbs/00_test.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@
"split_at_heading": true
},
"kernelspec": {
"display_name": "Python 3.7.15 ('fastcore-wZINr0y3')",
"display_name": "python3",
"language": "python",
"name": "python3"
}
Expand Down
2 changes: 1 addition & 1 deletion nbs/01_basics.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -6110,7 +6110,7 @@
"split_at_heading": true
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "python3",
"language": "python",
"name": "python3"
}
Expand Down
2 changes: 1 addition & 1 deletion nbs/02_foundation.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2463,7 +2463,7 @@
"split_at_heading": true
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "python3",
"language": "python",
"name": "python3"
}
Expand Down
2 changes: 1 addition & 1 deletion nbs/03_xtras.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,7 @@
"split_at_heading": true
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "python3",
"language": "python",
"name": "python3"
}
Expand Down
Loading