We need a way to issue tasks to a process pool. The Pool class provides a reusable pool of worker processes for executing ad hoc tasks.
The pool provides 8 ways to issue tasks to workers in the process pool.
They are:
- apply()
- apply_async()
- map()
- map_async()
- imap()
- imap_unordered()
- starmap()
- starmap_async()
We can issue one-off tasks to the process pool using the apply() function. The apply() function takes the name of the function to be executed by a worker process.
```python
# issue a task to the process pool
pool.apply(task)
```
If the function being executed takes arguments, they can be specified as a tuple via the 'args' argument, or as a dictionary via the 'kwds' argument.
```python
# issue a task to the process pool with arguments
pool.apply(task, args=(arg1, arg2, arg3))
```
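Keyword arguments can be passed the same way through 'kwds'. A minimal self-contained sketch; the greet() function and its parameter names here are hypothetical:

```python
from multiprocessing.pool import Pool

# hypothetical task that takes positional and keyword arguments
def greet(name, punctuation='!'):
    return f'Hello {name}{punctuation}'

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        # positional args via 'args', keyword args via 'kwds'
        result = pool.apply(greet, args=('World',), kwds={'punctuation': '?'})
        print(result)  # Hello World?
```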
Both the apply() and apply_async() functions can be used to issue one-off tasks to the process pool. The key differences between the two functions are:
- apply() should be used to issue a target task function when the caller can or must block until the task is complete.
- apply_async() should be used to issue a target task function when the caller cannot or should not block while the task is executing.
Now that we know how to issue one-off tasks to the process pool, let's look at some worked examples.
The apply() function blocks until the task completes and issues only one task at a time, so this example gains nothing from multiprocessing. The code:
```python
# SuperFastPython.com
# example of issuing a task with apply() to the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    for item in range(10):
        pool.apply(task)
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Output:
```
Task executing
Task done
Task executing
Task done
Task executing
Task done
Task executing
Task done
Task executing
Task done
Task executing
Task done
Task executing
Task done
Task executing
Task done
Task executing
Task done
Task executing
Task done
```
Each task first prints 'Task executing', sleeps for one second, then prints 'Task done'; the ten tasks run one after another.
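To make the lack of parallelism concrete, we can time an apply() loop against an apply_async() version of the same workload. A sketch with an illustrative worker count and sleep time; the timings in the comments are approximate:

```python
from time import sleep, perf_counter
from multiprocessing.pool import Pool

# task that blocks for a moment
def task():
    sleep(0.5)

# protect the entry point
if __name__ == '__main__':
    with Pool(2) as pool:
        # apply() blocks per task: roughly 2 seconds for 4 tasks
        start = perf_counter()
        for _ in range(4):
            pool.apply(task)
        print(f'apply() took {perf_counter() - start:.1f}s')
        # apply_async() runs tasks concurrently: roughly 1 second on 2 workers
        start = perf_counter()
        results = [pool.apply_async(task) for _ in range(4)]
        for result in results:
            result.wait()
        print(f'apply_async() took {perf_counter() - start:.1f}s')
```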
This example behaves the same as the one above, except that an argument is passed to the task via 'args'.
```python
# SuperFastPython.com
# example of issuing a task with apply() with arguments to the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(data):
    # report a message
    print(f'Task executing: {data}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done: {data}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    pool.apply(task, args=('Hello World',))
    # close the process pool
    pool.close()
```
The apply() function also returns the target function's return value directly. Code:
```python
# SuperFastPython.com
# example of issuing a task with apply() to the process pool with a return value
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # generate a value
    value = random()
    # report a message
    print(f'Task executing: {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    # flush=True forces buffered output to the terminal immediately,
    # rather than waiting for the buffer to fill or the program to exit
    print(f'Task done {value}', flush=True)
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    result = pool.apply(task)
    # report value
    print(f'Main got: {result}')
    # close the process pool
    pool.close()
```
An exception raised in the task is re-raised by apply() in the caller, where it can be caught:
```python
# SuperFastPython.com
# example of issuing a task with apply() to the process pool that raises an exception
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # fail
    raise Exception('Something bad happened')
    # report a message (never reached)
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    try:
        pool.apply(task)
    except Exception as e:
        print(f'Failed with: {e}')
    # close the process pool
    pool.close()
```
The process pool can be configured when it is created, which prepares the child worker processes.

> A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
> — multiprocessing — Process-based parallelism

The process pool allows you to issue tasks in the form of a target function that will be executed by a worker process.
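The constructor makes this configuration explicit. A sketch; the worker count and the init_worker() initializer are illustrative choices, though the 'processes' and 'initializer' arguments themselves are part of the Pool API:

```python
from multiprocessing.pool import Pool

# run once in each worker process as it starts (illustrative initializer)
def init_worker():
    print('Worker starting', flush=True)

# task executed in a worker process
def task(x):
    return x * 2

# protect the entry point
if __name__ == '__main__':
    # 4 workers, each running init_worker() on startup
    with Pool(processes=4, initializer=init_worker) as pool:
        print(pool.apply(task, args=(21,)))  # 42
```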
Unlike apply(), the apply_async() function does not block. Code:
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    for item in range(10):
        pool.apply_async(task)
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Output:
```
Task executing
Task executing
Task executing
Task executing
Task executing
Task executing
Task executing
Task executing
Task executing
Task executing
Task done
Task done
Task done
Task done
Task done
Task done
Task done
Task done
Task done
Task done
```
The same pattern works with arguments passed via 'args'. Code:
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool with arguments
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(message):
    # report a message
    print(f'Task executing: {message}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done: {message}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    for item in range(10):
        # issue tasks to the process pool
        pool.apply_async(task, args=('Hello world',))
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Output:
```
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task executing: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
Task done: Hello world
```
We can issue a task that returns a value and specify a callback function to process the returned value, via the 'callback' argument to apply_async().
In this example, the task() function generates a value and returns it, and the return_callback() function handles the returned value, here simply reporting it. Code:
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool with a callback
from random import random
from time import sleep
from multiprocessing.pool import Pool

# handle the return value callback
def return_callback(result):
    print(f'Callback received: {result}', flush=True)

# task executed in a worker process
def task():
    # generate a random value
    value = random()
    # report a message
    print(f'Task generated {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done with {value}', flush=True)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    for i in range(10):
        # issue tasks to the process pool
        pool.apply_async(task, callback=return_callback)
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Output (truncated; the callbacks run in the main process as results arrive):
```
Task generated 0.5840054487296511
Task generated 0.9516317802644778
Task generated 0.7571636709749359
...
Task done with 0.16765812470809205
Callback received: 0.5840054487296511
Task done with 0.4002398315786354
Callback received: 0.7571636709749359
...
Task done with 0.21040701403925788
Callback received: 0.21040701403925788
```
Similarly, a callback can be specified via the 'error_callback' argument to handle an exception raised in the task. Code:
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool with an error callback
from time import sleep
from multiprocessing.pool import Pool

# handle any errors in the task function
def custom_error_callback(error):
    print(f'Got an Error: {error}', flush=True)

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')
    # report a message (never reached)
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    for i in range(5):
        # issue tasks to the process pool
        pool.apply_async(task, error_callback=custom_error_callback)
    # close the process pool
    pool.close()
    # wait for all tasks to finish
    pool.join()
```
Output:
```
Task executing
Task executing
Task executing
Task executing
Task executing
Got an Error: Something bad happened
Got an Error: Something bad happened
Got an Error: Something bad happened
Got an Error: Something bad happened
Got an Error: Something bad happened
```
We can wait for an issued task to complete via the wait() function of the AsyncResult object returned by apply_async(). Code:
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool and waiting for the task to complete
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print('Task done', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    for i in range(5):
        # issue tasks to the process pool
        result = pool.apply_async(task)
    # wait for the last issued task to complete
    result.wait()
    # close the process pool
    pool.close()
```
Output:
```
Task executing
Task executing
Task executing
Task executing
Task executing
Task done
Task done
Task done
Task done
Task done
```
We can retrieve a task's return value via the get() function of the AsyncResult object. Code:
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool and waiting for the result
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # generate a random value
    value = random()
    # report a message
    print(f'Task generated {value}', flush=True)
    # block for a moment
    sleep(1)
    # report a message
    print(f'Task done with {value}', flush=True)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    for i in range(5):
        result = pool.apply_async(task)
    # wait for the return value of the last issued task
    value = result.get()
    # report the return value
    print(f'Got: {value}')
    # close the process pool
    pool.close()
```
Output:
```
Task generated 0.10878768090475144
Task generated 0.44626058234847743
Task generated 0.17344064689369554
Task generated 0.6143663307741349
Task generated 0.06570417815815988
Task done with 0.10878768090475144
Task done with 0.44626058234847743
Task done with 0.17344064689369554
Task done with 0.6143663307741349
Task done with 0.06570417815815988
Got: 0.06570417815815988
```
Note that only the return value of the last issued task is reported, since result refers only to the last AsyncResult object.
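The AsyncResult also supports timeouts, as the documentation quote earlier notes: wait() and get() both accept a timeout in seconds, and get() raises multiprocessing.TimeoutError if the task has not finished in time. A sketch, assuming a task that deliberately sleeps longer than the timeout:

```python
from time import sleep
from multiprocessing import TimeoutError
from multiprocessing.pool import Pool

# task that takes longer than the timeout below
def slow_task():
    sleep(2)
    return 'done'

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        result = pool.apply_async(slow_task)
        try:
            # wait at most half a second for the return value
            value = result.get(timeout=0.5)
            print(f'Got: {value}')
        except TimeoutError:
            print('Gave up waiting for the task')
```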
If the task raises an exception, the exception is re-raised in the caller when get() is called. Code:
```python
# SuperFastPython.com
# example of issuing a task with apply_async() to the process pool and handling an exception
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Task executing', flush=True)
    # block for a moment
    sleep(1)
    # raise an exception
    raise Exception('Something bad happened')
    # report a message (never reached)
    print('Task done', flush=True)
    # return a value (never reached)
    return "DONE!"

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    for i in range(5):
        result = pool.apply_async(task)
    # wait for the return value of the last issued task
    try:
        value = result.get()
    except Exception as e:
        print(f'Failed with: {e}')
    # close the process pool
    pool.close()
```
Output:
```
Task executing
Task executing
Task executing
Task executing
Task executing
Failed with: Something bad happened
```
The process pool provides a parallel version of the built-in map() function for issuing tasks.
The map() function takes the name of a target function and an iterable. A task is created for each item in the provided iterable, calling the target function on it. It returns an iterable of the return values from each call to the target function.
The iterable is traversed up front and all tasks are issued at once. A chunksize argument can be specified to divide the tasks into groups, each of which may be sent to a worker process to be executed as a batch.
The map() function has several strengths:
- Parallel processing: map() applies a function to every item of an iterable (such as a list) and distributes the work across the worker processes, which can speed up processing considerably, especially on multi-core machines.
- Ease of use: the same operation is applied to every element without writing an explicit loop, reducing code and improving readability.
- Functional style: a function is passed as an argument and applied to all elements, encouraging concise, composable, and reusable code.
- Lazy variants: the pool also provides lazy versions, imap() and imap_unordered(), which can offer performance and memory-efficiency benefits, especially when combined with generators.
To summarize, both map() and map_async() can be used to issue tasks that call a target function on every item of an iterable via the process pool. The key differences between the two functions are:
- map() blocks, whereas map_async() does not.
- map() returns an iterable of the target function's return values, whereas map_async() returns an AsyncResult object.
- map() does not support callback functions, whereas map_async() can execute callback functions on return values and on errors.
Code:
```python
# SuperFastPython.com
# example of parallel map() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # execute tasks in order
        for result in pool.map(task, range(10)):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically
```
Output (truncated):
```
Task 0 executing with 0.3797105402718005
Task 1 executing with 0.14328429645394736
Task 2 executing with 0.7725829616008802
...
Task 9 executing with 0.11004484619663757
Got result: 0.3797105402718005
Got result: 0.14328429645394736
Got result: 0.7725829616008802
...
Got result: 0.11004484619663757
```
Running the example first creates the process pool with the default configuration.
The pool will have one child worker process per logical CPU core in the system.
The map() function is then called on the range. This calls the task() function once for each integer in the range from 0 to 9, and returns an iterator of the results of each function call, in order.
Each call to task() generates a random number between 0 and 1, reports a message, blocks for a moment, and finally returns a value.
The main process iterates over the values returned from the calls to task() and reports the generated values, matching the values generated in each child process.
Importantly, all task() function calls are issued and executed before the iterator of results is returned; we cannot iterate over the results as they are completed.
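For contrast, here is a brief map_async() version of the same pattern, matching the differences listed earlier: it returns an AsyncResult immediately, and its callback receives the whole list of return values at once (unlike apply_async(), whose callback receives a single value). The squaring task here is illustrative:

```python
from multiprocessing.pool import Pool

# task executed in a worker process
def task(x):
    return x * x

# called once with the full list of return values
def on_done(values):
    print(f'Callback got: {values}', flush=True)

# protect the entry point
if __name__ == '__main__':
    with Pool(4) as pool:
        # returns an AsyncResult immediately, without blocking
        result = pool.map_async(task, range(5), callback=on_done)
        # block until all tasks finish and collect the values
        print(result.get())  # [0, 1, 4, 9, 16]
```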
The map() function can also be used with a target function that does not return a value. Code:
```python
# SuperFastPython.com
# example of parallel map() with the process pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # execute tasks, block until all completed
        pool.map(task, range(10))
    # process pool is closed automatically
```
Output:
```
Task 0 executing with 0.33454314205917246
Task 1 executing with 0.47323143512889077
Task 2 executing with 0.1460069962000885
Task 3 executing with 0.9807582979521244
Task 4 executing with 0.7477675113522123
Task 5 executing with 0.7474832897631315
Task 6 executing with 0.2673773104818564
Task 7 executing with 0.7800831692712938
Task 8 executing with 0.9909646483118719
Task 9 executing with 0.13751172558451952
```
The map() function also accepts a chunksize argument. Here a larger iterable of 40 items is issued with chunksize=1. Code:
```python
# SuperFastPython.com
# example of parallel map() with the process pool with a larger iterable
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(4) as pool:
        # execute tasks, block until all complete
        pool.map(task, range(40), chunksize=1)
    # process pool is closed automatically
```
Output (truncated; the tasks execute 4 at a time, matching the 4 workers):
```
Task 0 executing with 0.8082787568483673
Task 1 executing with 0.19376076012832755
Task 2 executing with 0.26373954211000694
Task 3 executing with 0.5670585650691654
Task 4 executing with 0.5311162256544141
...
Task 39 executing with 0.77133250168632
```
Running the example first creates a process pool with 4 child worker processes.
The map() function is then called on the range. This calls the task() function once for each integer in the range from 0 to 39 and returns an iterator of the results of each function call, in order.
Each call to task() generates a random number between 0 and 1, reports a message, blocks for a moment, and finally returns a value.
The main process iterates over the values returned from the calls to task() and reports the generated values, matching the values generated in each child process.
On my system, this example took about 12.2 seconds to complete.
The Pool() constructor by default creates as many child worker processes as there are logical CPU cores in the system.
By default, chunksize divides all the tasks into roughly even chunks and sends each chunk to a worker process as a single unit of work. Explicitly specifying chunksize=1 means that map() splits the 40 tasks into chunks of size 1 and sends each chunk to a worker in the pool individually.
The same example with chunksize=10. Code:
```python
# SuperFastPython.com
# example of parallel map() with the process pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(4) as pool:
        # execute tasks in chunks, block until all complete
        pool.map(task, range(40), chunksize=10)
    # process pool is closed automatically
```
Output: the tasks run 4 at a time, which is determined by the pool size, while the order of the task identifiers is determined by the chunksize. The 40 tasks are split into chunks of size 10, one chunk per worker (4 in total here), so the order interleaves as 0, 10, 20, 30, then 1, 11, 21, 31, and so on (truncated):
```
Task 0 executing with 0.4276540131975046
Task 10 executing with 0.3510518930429164
Task 20 executing with 0.010898695183638285
Task 30 executing with 0.7694875902010727
Task 1 executing with 0.8393221332124374
Task 11 executing with 0.20759955336900815
Task 21 executing with 0.31045981964337677
Task 31 executing with 0.026172326326703077
...
Task 29 executing with 0.8096770152151328
```
The imap() function is a lazy version of map(): it submits tasks to the process pool one by one and retrieves results as tasks complete.
Recall that the pool provides a version of map() in which the target function is called in parallel for each item of the provided iterable.
The problem with Pool.map() is that it converts the provided iterable to a list and issues one task per item all at once. If the iterable contains hundreds or thousands of items, this can consume a lot of main memory.
As an alternative, the pool provides imap(), which applies the target function to each item of the iterable in a lazy manner.
The key differences between the two functions are:
- imap() submits tasks to the process pool one by one, whereas map() submits all tasks at once.
- Iterating the result of imap() blocks until each task completes, whereas map() blocks until all tasks complete.
The imap() and imap_unordered() functions have much in common:
- Both can be used to issue tasks that call a function on every item of an iterable via the process pool.
- Both are lazy versions of the map() function.
- Both immediately return an iterable of the return values.
However, there is one key difference between the two functions:
- imap() yields results one by one in the order the tasks were issued, whereas imap_unordered() yields results in whatever arbitrary order the tasks complete.
Code:
```python
# SuperFastPython.com
# example of parallel imap() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    print(f'----------- sleep {value+2}')
    sleep(value+2)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # execute tasks in order
        for result in pool.imap(task, range(50)):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically
```
Output (truncated):
```
Task 0 executing with 0.7976385057282228
----------- sleep 2.797638505728223
Task 1 executing with 0.8773395360586137
----------- sleep 2.8773395360586136
Task 2 executing with 0.4640498967497839
----------- sleep 2.464049896749784
...
Got result: 0.7976385057282228
Task 21 executing with 0.04837849692951868
----------- sleep 2.0483784969295185
Got result: 0.8773395360586137
Got result: 0.4640498967497839
...
Got result: 0.021630441992429583
Got result: 0.806873603196441
```
At first, enough tasks are issued to occupy every worker (12 on this machine), at which point the pool is full; as workers become free, further tasks are issued and completed results are reported.
Running the example first creates the process pool with the default configuration.
The pool will create one child worker process per logical CPU core in the system.
The imap() function is then called on the range. This calls the task() function once for each integer in the range from 0 to 49 and returns an iterator of the results, in order, one per function call.
Each call to task() generates a random number between 0 and 1, reports a message, blocks for a moment, and finally returns a value.
The main process iterates over the values returned from the calls to task() and reports the generated values, matching the values generated in each child process.
Importantly, tasks are issued to the pool one by one, as space in the pool becomes available. Just as importantly, the main process reports results as tasks complete.
The listing above is truncated. We can see the tasks running and reporting their generated values while the main process receives and reports the return values. This differs from the map() function, which must wait for all tasks to complete before return values can be reported.
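The memory benefit is easiest to see with a generator as the input: imap() draws items on demand instead of materializing a list the way map() does. A sketch; the item count and chunksize are illustrative:

```python
from multiprocessing.pool import Pool

# task executed in a worker process
def task(x):
    return x * x

# generator: items are produced on demand, never held in a list
def generate_items(n):
    for i in range(n):
        yield i

# protect the entry point
if __name__ == '__main__':
    with Pool(4) as pool:
        total = 0
        # imap() pulls items from the generator as tasks are issued
        for result in pool.imap(task, generate_items(100_000), chunksize=1000):
            total += result
        print(f'Sum of squares: {total}')
```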
The imap() function can also be used with a target function that does not return a value. Code:
```python
# SuperFastPython.com
# example of parallel imap() with the process pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value+2)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        pool.imap(task, range(50))
        # shutdown the process pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
```
Output (truncated):
```
Task 0 executing with 0.8025281567456855
Task 1 executing with 0.2719032314094808
Task 2 executing with 0.16013665898388563
Task 3 executing with 0.19201654037630478
...
Task 49 executing with 0.26304432876715156
```
The first 12 tasks begin executing immediately; further tasks are issued as earlier ones complete.
The call to imap() returns immediately, so we must explicitly wait for all tasks in the pool to finish. Otherwise, the pool's context manager would exit and forcibly terminate the pool and all of its running tasks.
This is done by first closing the pool, preventing further tasks from being submitted, and then joining the pool to wait for all tasks to finish and all worker processes to shut down.
The imap() function can also be used with a larger iterable and a chunksize. Code:
```python
# SuperFastPython.com
# example of parallel imap() with the process pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(4) as pool:
        # issue tasks to the process pool
        pool.imap(task, range(40), chunksize=10)
        # shutdown the process pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
```
Output (truncated):
```
Task 10 executing with 0.9424678578620796
Task 0 executing with 0.7569749022973862
Task 20 executing with 0.007998245589154629
Task 30 executing with 0.21817659450596738
Task 11 executing with 0.5785937200713644
...
Task 39 executing with 0.49745841647243405
```
Running the example first creates a process pool with 4 child worker processes.
The imap() function is then called on the range with a chunksize of 10.
This issues 4 units of work to the pool, each consisting of 10 calls to the task() function, one unit per child worker process.
Each call to the task() function generates a random number between 0 and 1, reports a message, blocks for a moment, and finally returns a value.
The main process continues on, first closing the process pool and then joining it to wait for all tasks to complete.
Once the tasks finish, the process pool is shut down.
A limitation of imap() is that results are yielded in the order the tasks were issued, not the order they complete.
If some early tasks are slow to finish, they hold up the stream of results.
What is needed is a version of imap() that yields each return value as soon as its task completes; that is, one that iterates results in the order the tasks finish rather than the order they were issued.
The imap_unordered() function provides exactly this capability.
Unlike the built-in map() function, Pool.imap_unordered() accepts only one iterable as an argument. This means the target function executed in the pool may take only one argument.
Unlike Pool.map(), Pool.imap_unordered() consumes the provided iterable lazily, issuing tasks to the process pool one by one. It yields return values as tasks complete, instead of waiting for all tasks to finish before yielding anything.
Unlike Pool.imap(), Pool.imap_unordered() yields return values in the order the tasks complete, not the order in which they were submitted to the pool.
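The ordering difference can be made visible with a deterministic sleep schedule: give earlier tasks longer sleeps so that they finish last. The following is a minimal sketch of the contrast (the scheduling is still up to the OS, so the unordered sequence is typical rather than guaranteed):

```python
# contrast imap() issue-order results with imap_unordered() completion-order results
from multiprocessing.pool import Pool
from time import sleep

# task sleeps longer for smaller identifiers, so task 0 finishes last
def task(identifier):
    sleep((5 - identifier) * 0.1)
    return identifier

# protect the entry point
if __name__ == '__main__':
    # one worker per task so all five run concurrently
    with Pool(5) as pool:
        # imap(): results come back in issue order
        print(list(pool.imap(task, range(5))))          # [0, 1, 2, 3, 4]
        # imap_unordered(): results come back roughly in completion order,
        # typically [4, 3, 2, 1, 0] here, but the order is not guaranteed
        print(list(pool.imap_unordered(task, range(5))))
```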
Although Pool.imap_unordered() is lazy, we can still send tasks to the pool in chunks. That is, a fixed number of items from the input iterable can be grouped together and dispatched to a child worker process as a single unit of work.
This makes executing a large number of tasks over a very long iterable more efficient, because arguments and return values are transmitted between processes in batches, reducing the communication overhead.
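To make the chunking arithmetic concrete, here is a small pure-Python sketch of how an iterable is partitioned into units of work for a given chunksize. This is only an illustration of the effect described above, not the Pool's internal implementation:

```python
# illustrate how a chunksize partitions an iterable into units of work
def chunk(iterable, chunksize):
    # materialize the items and yield successive fixed-size slices
    items = list(iterable)
    for i in range(0, len(items), chunksize):
        yield items[i:i + chunksize]

# 40 tasks with chunksize=10 become 4 units of work, as in the imap() example above
units = list(chunk(range(40), 10))
print(len(units))   # → 4
print(units[0])     # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Each yielded slice corresponds to one work unit sent to a single child worker process.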
The imap_unordered() and imap() functions have much in common. However, there is one key difference between them: the iterable returned by imap_unordered() yields results in whatever order the tasks complete, whereas imap() yields return values in the order the tasks were submitted.
Code:
```python
# SuperFastPython.com
# example of parallel imap_unordered() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # execute tasks in order, process results out of order
        for result in pool.imap_unordered(task, range(50)):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically
```
Code: (this example issues imap_unordered() tasks without iterating the results; close() and join() are then used to wait for all issued tasks to finish)
```python
# SuperFastPython.com
# example of parallel imap_unordered() with the process pool and a task that does not return a value
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # issue tasks to the process pool
        pool.imap_unordered(task, range(50))
        # shutdown the process pool
        pool.close()
        # wait for all issued tasks to complete
        pool.join()
```
Code:
```python
# SuperFastPython.com
# example of parallel imap_unordered() with the process pool with a larger iterable and chunksize
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier):
    # generate a value
    value = random()
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(1)
    # return the generated value
    return value

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(4) as pool:
        # issue tasks to the process pool
        for result in pool.imap_unordered(task, range(40), chunksize=2):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically
```
Output:
Task 0 executing with 0.3667905754061538
Task 2 executing with 0.6469568333686948
Task 4 executing with 0.04192191869984441
Task 6 executing with 0.5513840996318545
Task 1 executing with 0.029554805342302704
Task 5 executing with 0.38605519013357803
Task 3 executing with 0.5673858146244963
Task 7 executing with 0.1307683448137984
Task 8 executing with 0.6462034853580191
Task 10 executing with 0.5958450898190231
Task 12 executing with 0.06922093257180306
Got result: 0.04192191869984441
Got result: 0.38605519013357803
Got result: 0.5513840996318545
Task 14 executing with 0.18814883064331622
Got result: 0.1307683448137984
Got result: 0.3667905754061538
Got result: 0.029554805342302704
Got result: 0.6469568333686948
Got result: 0.5673858146244963
Task 9 executing with 0.9392966017363353
Task 11 executing with 0.11418741714895098
Task 13 executing with 0.7222859296797918
Task 15 executing with 0.5208925382440494
Task 16 executing with 0.05988960610369154
Task 18 executing with 0.17468698904879965
Got result: 0.6462034853580191
Got result: 0.9392966017363353
Got result: 0.5958450898190231
Task 20 executing with 0.2233074380837261
Got result: 0.11418741714895098
Task 22 executing with 0.037130709932817085
Got result: 0.06922093257180306
Got result: 0.7222859296797918
Got result: 0.18814883064331622
Got result: 0.5208925382440494
Task 17 executing with 0.6041499710132169
Task 23 executing with 0.5183511086247533
Task 19 executing with 0.670391792828118
Task 21 executing with 0.7228662830076794
Task 24 executing with 0.1374166668240251
Task 26 executing with 0.3118496777058847
Got result: 0.05988960610369154
Task 28 executing with 0.8460071656209756
Got result: 0.6041499710132169
Got result: 0.037130709932817085
Task 30 executing with 0.8793889657785481
Got result: 0.5183511086247533
Got result: 0.17468698904879965
Got result: 0.670391792828118
Got result: 0.2233074380837261
Got result: 0.7228662830076794
Task 25 executing with 0.8279933272046041
Task 27 executing with 0.22560611841521971
Task 31 executing with 0.5454166318278051
Task 29 executing with 0.4895208764108916
Task 32 executing with 0.38245153063072224
Task 34 executing with 0.7467663223933227
Got result: 0.1374166668240251
Got result: 0.8279933272046041
Task 36 executing with 0.2329740774553356
Task 38 executing with 0.8659995641708716
Got result: 0.8460071656209756
Got result: 0.4895208764108916
Got result: 0.8793889657785481
Got result: 0.5454166318278051
Got result: 0.3118496777058847
Got result: 0.22560611841521971
Task 33 executing with 0.4244062432263287
Task 35 executing with 0.993869808120856
Task 37 executing with 0.20968069783793764
Task 39 executing with 0.5941662795420694
Got result: 0.38245153063072224
Got result: 0.4244062432263287
Got result: 0.7467663223933227
Got result: 0.993869808120856
Got result: 0.2329740774553356
Got result: 0.20968069783793764
Got result: 0.8659995641708716
Got result: 0.5941662795420694
A limitation of the Pool.map() function is that it accepts only one iterable of items, which means the target task function may take only one argument. This differs from the built-in map() function.
A task function restricted to a single argument is a serious limitation.
The Pool.starmap() function provides a way around it.
Both starmap() and map() can be used to issue tasks via the process pool that call a function on every item in an iterable.
The key difference between them is that starmap() supports target functions that take more than one argument, whereas map() supports only target functions that take a single argument.
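As a point of comparison, a common workaround for map()'s single-argument limit is a small wrapper function that unpacks a tuple of arguments before calling the real target. This is only a sketch, and the task_wrapper name is our own:

```python
# workaround for map()'s single-argument limit: unpack a tuple in a wrapper
from multiprocessing.pool import Pool

# target function that takes two arguments
def task(identifier, value):
    return (identifier, value)

# wrapper with a single parameter, as required by map(), that unpacks the tuple
def task_wrapper(args):
    return task(*args)

# protect the entry point
if __name__ == '__main__':
    # each item is one tuple, passed to the wrapper as a single argument
    items = [(i, i * 0.1) for i in range(5)]
    with Pool() as pool:
        results = pool.map(task_wrapper, items)
    print(results)
```

starmap() makes this boilerplate unnecessary by unpacking each tuple of arguments for you.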
The following summarizes the key differences between starmap() and starmap_async():
The starmap() function blocks, whereas the starmap_async() function does not.
The starmap() function returns an iterable of the target function's return values, whereas starmap_async() returns an AsyncResult.
The starmap() function does not support callback functions, whereas starmap_async() can execute callback functions on return values and errors.
Code:
```python
# SuperFastPython.com
# example of using starmap() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the identifier and value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # execute tasks and process results in order
        for result in pool.starmap(task, items):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically
```
Output:
Task 0 executing with 0.3552299985706231
Task 1 executing with 0.4106210262382387
Task 2 executing with 0.581990122090642
Task 3 executing with 0.2817390114152698
Task 4 executing with 0.9794450638746192
Task 5 executing with 0.25827697923383863
Task 6 executing with 0.056572848615411786
Task 7 executing with 0.40846379168254066
Task 8 executing with 0.938812338604445
Task 9 executing with 0.7132943335862058
Got result: (0, 0.3552299985706231)
Got result: (1, 0.4106210262382387)
Got result: (2, 0.581990122090642)
Got result: (3, 0.2817390114152698)
Got result: (4, 0.9794450638746192)
Got result: (5, 0.25827697923383863)
Got result: (6, 0.056572848615411786)
Got result: (7, 0.40846379168254066)
Got result: (8, 0.938812338604445)
Got result: (9, 0.7132943335862058)
Code:
```python
# SuperFastPython.com
# example of using starmap() in the process pool with no return values
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool and wait for tasks to complete
        pool.starmap(task, items)
    # process pool is closed automatically
```
Output:
Task 1 executing with 0.05512225599057419
Task 0 executing with 0.294523851506147
Task 2 executing with 0.21723792346516735
Task 3 executing with 0.6907330242768945
Task 4 executing with 0.7724781314080533
Task 5 executing with 0.3332543827290817
Task 6 executing with 0.6503655461021364
Task 7 executing with 0.19290441469853814
Task 8 executing with 0.10586752653015896
Task 9 executing with 0.28346981545727257
Code:
```python
# SuperFastPython.com
# example of using starmap() in the process pool with chunksize
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the identifier and value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # execute tasks and process results in order
        for result in pool.starmap(task, items, chunksize=2):
            print(f'Got result: {result}', flush=True)
    # process pool is closed automatically
```
Output:
Task 0 executing with 0.4825672128410279
Task 2 executing with 0.7572436608620695
Task 4 executing with 0.5964237878448914
Task 6 executing with 0.6067230792109038
Task 8 executing with 0.871739445952691
Task 1 executing with 0.4983217301877694
Task 5 executing with 0.19180226403398437
Task 7 executing with 0.24773169180249732
Task 3 executing with 0.18219982599738882
Task 9 executing with 0.11196408703764815
Got result: (0, 0.4825672128410279)
Got result: (1, 0.4983217301877694)
Got result: (2, 0.7572436608620695)
Got result: (3, 0.18219982599738882)
Got result: (4, 0.5964237878448914)
Got result: (5, 0.19180226403398437)
Got result: (6, 0.6067230792109038)
Got result: (7, 0.24773169180249732)
Got result: (8, 0.871739445952691)
Got result: (9, 0.11196408703764815)
A limitation of the Pool.starmap() function is that it blocks until all tasks are complete; the caller must wait until every function call on the provided iterable has returned.
The Pool.starmap_async() function provides a way around this limitation.
Both starmap_async() and starmap() can be used to issue tasks to the process pool that call a function taking multiple arguments.
The following summarizes the key differences between these two functions:
The starmap_async() function does not block, whereas the starmap() function does.
The starmap_async() function returns an AsyncResult, whereas starmap() returns an iterable of the target function's return values.
The starmap_async() function can execute callback functions on return values and errors, whereas starmap() does not support callbacks.
Code:

```python
# SuperFastPython.com
# example of parallel starmap_async() with the process pool
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the identifier and value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # iterate results
        for value in result.get():
            print(f'Got result: {value}', flush=True)
    # process pool is closed automatically
```
Code:

```python
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and no return values
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # wait for tasks to complete
        result.wait()
    # process pool is closed automatically
```
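Besides wait() and get(), the AsyncResult returned by starmap_async() can be polled. A minimal sketch using ready() and successful(), both part of the standard AsyncResult API (note that successful() may only be called once the result is ready):

```python
# poll an AsyncResult from starmap_async() instead of blocking on get()
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # block for a moment, then return a tuple
    sleep(value)
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        items = [(i, random()) for i in range(10)]
        result = pool.starmap_async(task, items)
        # poll until all issued tasks have completed
        while not result.ready():
            sleep(0.1)
        # True if no task raised an exception
        print(result.successful())
        # all ten return values are now available without blocking
        print(len(result.get()))
```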
Code:

```python
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and wait for tasks to complete
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items1 = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items1)
        # prepare arguments
        items2 = [(i, random()) for i in range(11, 20)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items2)
        # close the process pool
        pool.close()
        # wait for all tasks to complete and processes to close
        pool.join()
    # process pool is closed automatically
```
Code:
```python
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and a callback function
from random import random
from time import sleep
from multiprocessing.pool import Pool

# custom callback function
def custom_callback(result):
    print(f'Callback got values: {result}')

# task executed in a worker process
def task(identifier, value):
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the identifier and value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items, callback=custom_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete and processes to close
        pool.join()
```
Output:
Task 0 executing with 0.8787468763744749
Task 1 executing with 0.855238661469258
Task 2 executing with 0.6214196781628788
Task 3 executing with 0.06560315349947898
Task 4 executing with 0.5347215884684701
Task 5 executing with 0.21791466631433876
Task 6 executing with 0.043874633635723415
Task 7 executing with 0.4409367950368527
Task 8 executing with 0.15818804142848264
Task 9 executing with 0.5263502637236465
Callback got values: [(0, 0.8787468763744749), (1, 0.855238661469258), (2, 0.6214196781628788), (3, 0.06560315349947898), (4, 0.5347215884684701), (5, 0.21791466631433876), (6, 0.043874633635723415), (7, 0.4409367950368527), (8, 0.15818804142848264), (9, 0.5263502637236465)]
The starmap_async() function is then called with the prepared arguments and the callback function. This issues ten calls to the task() function, one for each tuple in the prepared list.
The main process then closes the process pool and blocks until all tasks complete and all worker processes shut down.
Each call to the task() function reports a message, blocks for a moment, and then returns a tuple of values.
Once all task() calls have finished, the callback function is invoked with the iterable of return values.
This iterable is printed directly, reporting all return values at once.
Finally, the worker processes are shut down and the main process continues on.
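A detail worth knowing about callbacks (this reflects CPython's multiprocessing.pool implementation, so treat it as something to verify on your own setup): the callback executes in the parent process, on the pool's internal result-handler thread, not in a worker process. A small sketch that reports where the callback runs:

```python
# show that the starmap_async() callback runs in the parent process
from multiprocessing import current_process
from multiprocessing.pool import Pool
import threading

# callback that reports the process and thread it executes in
def custom_callback(result):
    print(f'Callback ran in {current_process().name} '
          f'on thread {threading.current_thread().name}')

# trivial task executed in a worker process
def task(identifier):
    return identifier

# protect the entry point
if __name__ == '__main__':
    with Pool() as pool:
        _ = pool.starmap_async(task, [(i,) for i in range(3)], callback=custom_callback)
        # close the pool and wait for the tasks and the callback to finish
        pool.close()
        pool.join()
```

This is also why a callback should return quickly: while it runs, it blocks the thread that handles all results from the pool.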
Code:
```python
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and an error callback function
from random import random
from time import sleep
from multiprocessing.pool import Pool

# custom error callback function
def custom_error_callback(error):
    print(f'Got an error: {error}')

# task executed in a worker process
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the identifier and value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        _ = pool.starmap_async(task, items, error_callback=custom_error_callback)
        # close the process pool
        pool.close()
        # wait for all tasks to complete and processes to close
        pool.join()
```
Output:
Task 0 executing with 0.32452658689493674
Task 1 executing with 0.5821179337942584
Task 2 executing with 0.7997124576401543
Task 3 executing with 0.1943927607174064
Task 4 executing with 0.10711240651880105
Task 6 executing with 0.4827355123145727
Task 7 executing with 0.5298204685358817
Task 8 executing with 0.6903627900729435
Task 9 executing with 0.6311929788118359
Got an error: Something bad happened
Code:
```python
# SuperFastPython.com
# example of parallel starmap_async() with the process pool and handle an exception
from random import random
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task(identifier, value):
    # conditionally raise an error
    if identifier == 5:
        raise Exception('Something bad happened')
    # report a message
    print(f'Task {identifier} executing with {value}', flush=True)
    # block for a moment
    sleep(value)
    # return the identifier and value
    return (identifier, value)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool() as pool:
        # prepare arguments
        items = [(i, random()) for i in range(10)]
        # issue tasks to the process pool
        result = pool.starmap_async(task, items)
        # get the return values, handling any task exception
        try:
            values = result.get()
        except Exception as e:
            print(f'Failed with: {e}')
```
Output:
Task 0 executing with 0.6494174827605155
Task 1 executing with 0.7181300600649207
Task 2 executing with 0.7332478123825167
Task 3 executing with 0.6059214187100785
Task 4 executing with 0.5043679220008417
Task 6 executing with 0.982612191193357
Task 7 executing with 0.553035350282885
Task 8 executing with 0.4658380553573399
Task 9 executing with 0.8045380093351808
Failed with: Something bad happened
Some of the properties we might consider when comparing the functions used to issue tasks to the process pool include:
Author: Eric
Link:
Copyright: Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA. Please credit the source when reposting!