2024-07-04

Contents

Need to Initialize Worker Processes
How to Initialize Worker Processes
Example of Initializing Worker Processes
Example of Thread and Process Names in the Initializer
Example of Accessing an Initialized Variable in a Worker

Need to Initialize Worker Processes

We can issue ad hoc tasks to the process pool with functions such as apply(), or apply the same function to each item in an iterable with functions such as map(). The results of issued tasks can be retrieved synchronously, or retrieved later via the asynchronous versions of these functions, such as apply_async() and map_async().

When using a process pool, we may need to initialize a variable, some data, or a resource so that each worker process can use it while executing its tasks. For example, each worker might need a logging handle, or a connection to a remote server, that is kept open and reused across tasks.

We need a way to call a function that initializes each worker process in the pool before it executes any tasks.

How to Initialize Worker Processes

We can initialize worker processes by passing a function to the initializer argument of the Pool constructor; each worker calls this function as it starts, before executing any tasks.

For example:

```python
import multiprocessing.pool

# worker process initialization function
def worker_init():
    # ...
    ...

# create the process pool and initialize worker processes
pool = multiprocessing.pool.Pool(initializer=worker_init)
```

If our worker initialization function takes arguments, they can be passed to the pool constructor via the initargs argument, which accepts an ordered list or tuple of arguments for the custom initialization function.

For example:

```python
import multiprocessing.pool

# worker process initialization function
def worker_init(arg1, arg2, arg3):
    # ...
    ...

# create the process pool and initialize worker processes
pool = multiprocessing.pool.Pool(initializer=worker_init, initargs=(arg1, arg2, arg3))
```

Now that we know how to initialize worker processes in the process pool, let's look at a worked example.

Example of Initializing Worker Processes

```python
# SuperFastPython.com
# example of initializing worker processes in the process pool
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # report a message
    print('Worker executing task...', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def initialize_worker():
    # report a message
    print('Initializing worker...', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(2, initializer=initialize_worker) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```

Output:

Initializing worker...
Worker executing task...
Initializing worker...
Worker executing task...
Worker executing task...
Worker executing task...

Example of Thread and Process Names in the Initializer

```python
# SuperFastPython.com
# example of initializing workers in a process pool and reporting threads and processes
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import current_process
from threading import current_thread

# task executed in a worker process
def task():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Worker executing task, process={process.name}, thread={thread.name}', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def initialize_worker():
    # get the current process
    process = current_process()
    # get the current thread
    thread = current_thread()
    # report a message
    print(f'Initializing worker, process={process.name}, thread={thread.name}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    with Pool(2, initializer=initialize_worker) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```

Running this example creates the process pool and configures it to initialize the worker processes with our custom function, as before.

Four tasks are issued to the process pool, and the main process waits for all tasks to complete.

During worker initialization and during task execution, the name of the worker process and the name of the thread within that process are reported.

Interestingly, we can see that the process that executes a task is the same process that was initialized, and that in each case the same thread is used as well.

For example, the first worker process is named "SpawnPoolWorker-1", and it executes both the initialization function and the task function in its main thread, "MainThread".

This highlights that we can use mechanisms such as global variables and thread-local storage within a worker process to share data between the tasks that the worker executes.

Output:

Initializing worker, process=SpawnPoolWorker-1, thread=MainThread
Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
Initializing worker, process=SpawnPoolWorker-2, thread=MainThread
Worker executing task, process=SpawnPoolWorker-2, thread=MainThread
Worker executing task, process=SpawnPoolWorker-1, thread=MainThread
Worker executing task, process=SpawnPoolWorker-2, thread=MainThread

Example of Accessing an Initialized Variable in a Worker

```python
# SuperFastPython.com
# example of sharing data with all workers via the initializer
from time import sleep
from multiprocessing.pool import Pool

# task executed in a worker process
def task():
    # declare the global variable
    global custom_data
    # report a message with the global variable
    print(f'Worker executing with: {custom_data}', flush=True)
    # block for a moment
    sleep(1)

# initialize a worker in the process pool
def worker_init(custom):
    # declare the global variable
    global custom_data
    # assign the global variable
    custom_data = custom
    # report a message
    print(f'Initializing worker with: {custom_data}', flush=True)

# protect the entry point
if __name__ == '__main__':
    # define data to share with all workers
    data = 'Global State'
    # create and configure the process pool
    with Pool(2, initializer=worker_init, initargs=(data,)) as pool:
        # issue tasks to the process pool
        for _ in range(4):
            pool.apply_async(task)
        # close the process pool
        pool.close()
        # wait for all tasks to complete
        pool.join()
```

Output:

Initializing worker with: Global State
Worker executing with: Global State
Initializing worker with: Global State
Worker executing with: Global State
Worker executing with: Global State
Worker executing with: Global State

Author: Eric

Copyright notice: Unless otherwise stated, all articles on this blog are licensed under the BY-NC-SA license. Please credit the source when reposting!