We can issue one-off tasks to the process pool with functions such as apply(), or apply the same function to each item of an iterable with functions such as map(). These calls return their results synchronously; alternatively, the asynchronous variants apply_async() and map_async() return immediately, and the results of the issued tasks can be collected later.
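As a minimal sketch of that difference (the `square` function here is a made-up example, not from the article): map() blocks until all results are available, while apply_async() hands back an AsyncResult whose value we fetch later with get().

```python
# sketch: synchronous map() vs asynchronous apply_async()
from multiprocessing.pool import Pool

# example task: square a number
def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(2) as pool:
        # map() blocks until every result is ready
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
        # apply_async() returns an AsyncResult immediately
        result = pool.apply_async(square, (5,))
        # fetch the value later, blocking only if it is not yet ready
        print(result.get())  # 25
```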
When using a process pool, we may need to initialize a variable, some data, or a resource so that each worker process can use it while executing its tasks. For example, each worker might need a logging handle, or a connection to a remote server that is opened once and kept open for reuse across tasks.

We need a way to call an initialization function in each worker process of the pool before any tasks are executed.

For example:
```python
# worker process initialization function
def worker_init():
    # ...

# create the process pool and initialize the workers
pool = multiprocessing.pool.Pool(initializer=worker_init)
```
If our worker initialization function takes arguments, they can be passed to the pool constructor via the initargs argument, which accepts an ordered list or tuple of arguments for the custom initialization function.

For example:
```python
# worker process initialization function
def worker_init(arg1, arg2, arg3):
    # ...

# create the process pool and initialize the workers
pool = multiprocessing.pool.Pool(initializer=worker_init, initargs=(arg1, arg2, arg3))
```
Now that we know how to initialize worker processes in a process pool, let's look at a worked example.
```python
# SuperFastPython.com
# example of initializing worker processes in the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Worker executing task...', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def initialize_worker():
    # report a message
    print('Initializing worker...', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(2, initializer=initialize_worker) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```
Output:

```
Initializing worker...
Worker executing task...
Initializing worker...
Worker executing task...
Worker executing task...
Worker executing task...
```
```python
# SuperFastPython.com
# example of initializing workers in a process pool and reporting threads and processes
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import current_process
from threading import current_thread

# task executed in a worker process
def task():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, process={process.name}, thread={thread.name}', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def initialize_worker():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, process={process.name}, thread={thread.name}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(2, initializer=initialize_worker) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```
Running this example creates the process pool and configures it to initialize each worker process with our custom function, just as before.

Four tasks are issued to the process pool, and the main process waits for all tasks to complete.

While the workers are initialized and the tasks are executed, the names of the worker processes and of the threads within those processes are reported.

Interestingly, we can see that each task is executed by the same process that ran the worker's initialization function, and in each case by the same thread.

For example, the first worker process is named "SpawnPoolWorker-1", and it runs both the initialization function and the task function in its main thread, "MainThread".

This highlights that, inside a worker process, we can use mechanisms such as global variables and thread-local storage to share data between the tasks that worker executes.
Output:

```
Initializing worker, process=SpawnPoolWorker-1, thread=MainThread
Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
Initializing worker, process=SpawnPoolWorker-2, thread=MainThread
Worker executing task, process=SpawnPoolWorker-2, thread=MainThread
Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
Worker executing task, process=SpawnPoolWorker-2, thread=MainThread
```
```python
# SuperFastPython.com
# example of sharing data with all workers via the initializer
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # declare global variable
    global custom_data
    # report a message with global variable
    print(f'Worker executing with: {custom_data}', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def worker_init(custom):
    # declare global variable
    global custom_data
    # assign the global variable
    custom_data = custom
    # report a message
    print(f'Initializing worker with: {custom_data}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # define data to share with all workers
    data = 'Global State'
    # create and configure the process pool
    with Pool(2, initializer=worker_init, initargs=(data,)) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```
Output:

```
Initializing worker with: Global State
Worker executing with: Global State
Initializing worker with: Global State
Worker executing with: Global State
Worker executing with: Global State
Worker executing with: Global State
```
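The same pattern can hold a per-worker resource open across tasks, such as the logging handle or server connection mentioned earlier. The sketch below is an illustration under assumptions (the `FakeConnection` class is a stand-in for a real client, not from the article): the initializer opens the resource once per worker and stores it in a global, and every task that worker runs reuses it.

```python
# sketch: keeping a per-worker resource open across tasks via the initializer
from multiprocessing.pool import Pool

# stand-in for a real client, e.g. a database or server connection
class FakeConnection:
    def query(self, x):
        return x * 2

# open the resource once per worker and store it in a global
def worker_init():
    global connection
    connection = FakeConnection()

# every task reuses the worker's already-open connection
def task(x):
    global connection
    return connection.query(x)

if __name__ == '__main__':
    with Pool(2, initializer=worker_init) as pool:
        # each worker reuses its own connection across tasks
        print(pool.map(task, range(5)))  # [0, 2, 4, 6, 8]
```

With only two workers handling five tasks, each connection is opened once and reused, rather than being reopened for every task.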
Author: Eric
Link:
License: Unless otherwise stated, all posts on this blog are licensed under the BY-NC-SA license. Please credit the source when reposting!