编辑
2022-11-16
🧰语言-python
00
请注意,本文编写于 946 天前,最后修改于 225 天前,其中某些信息可能已经过时。

目录

python的并发与异步模块
1.异步与同步的概念
2. 并发与并行
3. python的并发方式
多进程、多线程、GIL
协程
4. 协程的实现方式
a.案例-访问一个网站
b. 访问多个网站
c. gather与wait的区别
d. 带返回值的异步
e. 带回调函数的异步
f. 取消协程任务
g. 协程相互嵌套
5. 定时启动任务
6. 结合线程池
使用request完成异步编程(使用线程池)
用socket完成异步编程(使用线程池)
7. 与多进程的结合
基础用法
获取返回结果
线程池管理
对传参控制
生成器的send与yield
附录
1. cpu&io密集型
2. 其他aio库

python的并发与异步模块

1.异步与同步的概念

同步:指的是一系列任务必须逐一完成。所有未被完成的任务,必须等待当前任务完成后,才能开始执行;对于当前执行的任务而言,必须要从头到尾一次执行完。比如我们去体检,测视力的时候,医生需要一个人从上到下把他能看清的符号都指出来,然后再叫下一个人进来。这就是一种同步机制——前一个人没有完成这一项检查之前,后面所有人都不能进来检查视力。

异步:则是在当前任务没有完全完成的情况下,可以在中途暂停,去执行一个其他的任务。当这个任务的暂停结束后,再切换回来继续刚刚的进度执行下去。这种机制主要是针对等待时间比较长,会产生资源浪费的情况。比如体检的时候抽血,结果可能需要10分钟的时间才能出来。那这等待的10分钟就可以去做其他的项目,比如查一下视力,测一下身高。当抽血结果出来之后,再回去取抽血的结果。

异步机制虽然看上去比同步机制更为合理,但是因为让出CPU会造成额外开销,因此异步并不总是比同步机制更合适。

2. 并发与并行

并发(concurrent):是指计算机可以处理多个任务,且并非像串行执行,这些任务并不需要等待计算机完成其中一个后再开始下一个,而是可以在多个任务间切换执行,或者同时执行多个任务。

并行(parallelism):是指则是指多核计算机可以使用自己的多个CPU同时独立地执行多个任务。从定义上可以看出,并行的要求更为严格。必须是多核计算机利用其多核同时处理多个任务。而并发并不要求一定是调用多核同时工作,仅仅调用单核通过时分复用的方式在多个任务中切换执行也属于并发的一种。

Joe Armstrong博士的画描述了这样一个场景,并发就是两队人去一个咖啡机打咖啡,咖啡机只有一台,只是不断切换打咖啡的人来实现服务多个人。并行就是两队人去两个咖啡机打咖啡,咖啡机有两台,但是队内依旧是通过排队换人的方式实现的一个咖啡机服务多个人。

并发的本质,是多路复用,将有限资源强制分给多个用户,而并行则是同时发生的多个并发事件。

什么是多路复用? 多路复用是指两个及其以上用户共享公用信道的一种机制。通过多路复用技术,多个终端能共享一条高速信道,从而达到节省信道资源的目的。分为:频分复用,时分复用,码分复用和波分复用

什么是时分复用技术? 时分复用是采用时间分片的方式来实现传输信道的多路复用,也就是说每一路信号传输都使用信道的全部贷款,但它只是使用其中的某个时隙

比如我们把上课的教室看作一个信道,不同的老师在每天不同的时间段去使用这件教室就相当于是时分复用,因为我们从宏观上来看,同一天这个教室被分配给了多个用户使用,那从微观上来看,在某一时刻该教室还只是分配给了一个老师。

3. python的并发方式

多进程、多线程、GIL

像其他很多语言一样,python的多进程都可以实心并行运算,每一个进程都可以调用一个CPU独立完成自己的任务,在这些进程中,每个进程可以拥有多个进程

然而在python中的多线程是无法同时使用多个cpu的,因为在python中有一个GIL(global Interpreter Lock)存在。在一个进程中,每个线程都需要获取GIL才能进入CPU执行,而GIL在一个进程中只存在一个,因此同一个进程里,最多只有一个线程可以在CPU中执行。

只有当时间片用尽时,或者当前执行的线程等待时,才会释放GIL,由其他线程进行竞争。

因此在python中,处理I/O密集型任务可以使用多线程,但是在CPU密集型任务中,由于多线程只能调用一个CPU,无法完成加速,因此往往使用多进程

协程

由于每个进程都有自己的独立内存空间,因此上下文进程切换的开销比较大。同一个进程内的线程是共享内存的,因此上下文切换比进程要快。但是由于线程需要线程锁,并且当线程锁释放时,线程的工作需要由计算机进行调度,因此依然存在一定的切换开销。

协程(coroutine)则是在进程内更为轻量的一种微线程,多个协程之间切换并不依赖多线程的机制,而是在同一个线程里执行。协程本质上和线程一样,可以看作是一种可以灵活中断的函数。当执行一个普通的函数调用时,如果A调用B,则B运行结束返回之后才会继续执行A。而协程A运行时如果中断去执行协程B,则不需要等B执行完,而是当A的中断条件结束后随时可以由B切换回A。

协程在运行的时候和线程类似,占用空间更小。在切换时,协程比线程效率更高。当一个线程的时间片用尽之后,CPU会中断,由OS切换上下文,此时进程的内核空间需要保存当前线程的上下文。但是协程却不需要保存上下文,因为其调度算法是在用户态完成的。因此,由程序自身控制的协程没有线程切换的开销。同时,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源也不需要多线程的锁机制。

总的来说,协程是在不开辟线程的基础上完成多任务,也就是单线程的情况下完成多任务,多个任务按照一定顺序交替执行,通俗理解只要在def里面只看到一个yield关键字就是协程。

协程概述与使用场景

进程、线程、协程的区别是什么?

4. 协程的实现方式

在python3.5之前,都是使用生成器的一些技巧完成协程任务,他们的调度方式依然是 事件循环+协程模式。

这样设计结构和维护虽然相对于回调函数简单一些,但是代码还是有一些混乱,并且又当作生成器又当作协程,都是还是一些技巧性的东西,为了将语义变得更加明确,于是在python3.5使用了async和await(功能与yield from类似)

关键词正式定义原生协程,asyncio是python解决异步io编程的一个完整框架。

具有如下定义:

  1. 包含各种特定系统实现的模块化事件循环
  2. 传输与协议抽象
  3. 对TCP、UDP、SSL子进程,延时调用以及其他的具体支持
  4. 模仿futures模块适用于事件循环使用到Future类
  5. 基于yield from的协议和任务,可以使用顺序执行的方式编写并发代码
  6. 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
  7. 模仿threading 模块中的同步语法,可以用在单线程内实现协程同步

协程编程离不开的三大要点:

  1. 事件循环
  2. 回调(驱动生成器)
  3. epoll/select(IO多路复用)

asyncio是一个异步编程的框架,可以解决异步编程,协程调度问题,线程问题,是整个异步IO的解决方案

a.案例-访问一个网站

python
async def get_url_title(url): # 使用关键词async定义一个协程 print('开始访问网站:{}'.format(url)) await asyncio.sleep(2) # 这一步至关重要 # asyncio.sleep(2) 功能:异步非阻塞等待2s,作用是模拟访问网站消耗的时间 # await 的作用类似 yield,即这个时候把线程资源控制权交出去,监听这个描述符直到这个任务完成 # await 后面只能接三种类型 ''' 1. 协程:Python 协程属于 可等待 对象,因此可以在其他协程中被等待: 2. 任务:任务 被用来设置日程以便 并发 执行协程。(当一个协程通过 asyncio.create_task() 等函数被打包为一个 任务,该协程将自动排入日程准备立即运行) 3. Future 对象:Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。(当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。) 如果await time.sleep(2) 是会报错的 ''' print('网站访问成功') if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # 一行代码创造事件循环 loop.run_until_complete(get_url_title('http://www.langzi.fun')) # 这是一个阻塞的方法,可以理解成多线程中的join方法 # 直到get_url_title('http://www.langzi.fun')完成后,才会继续执行下面的代码 end_time = time.time() print('消耗时间:{}'.format(end_time-start_time))

b. 访问多个网站

python
import asyncio import time import requests async def get_url_title(url): print('开始访问网站:{}'.format(url)) await asyncio.sleep(2) print('网站访问成功') def main(): for item in range(30): res = requests.get(url='http://www.langzi.fun') print(res.status_code) if __name__ == '__main__': start_time = time.time() # main() loop = asyncio.get_event_loop() # 创造一个事件循环 tasks = [get_url_title('http://www.langzi.fun')for i in range(30)] # 这个列表代表总任务量,即执行10次get_url_title()函数 loop.run_until_complete(asyncio.wait(tasks)) # asyncio.wait后面接上非空可迭代对象,一般来说是功能函数列表 # 功能是一次性提交多个任务,等待完成 # loop.run_until_complete(asyncio.gather(*tasks)) # 和上面代码功能一致,但是gather更加高级,如果是列表就需要加上* # 这里会等到全部的任务执行完后才会执行后面的代码 end_time = time.time() print('消耗时间:{}'.format(end_time-start_time))

循环访问页面对比结果:

循环次数requestasync
103.9s2.008s
3013.34s2.0089s

c. gather与wait的区别

  • gather更擅长于将函数聚合在一起
  • wait更擅长筛选运行状况

即gather更高级,可以将任务进行分组,也可以取消任务。

python
import asyncio async def get_url_title(url): print('开始访问网站:{}'.format(url)) await asyncio.sleep(2) print('网站访问成功') return 'success' if __name__ == '__main__': loop = asyncio.get_event_loop() # 使用wait方法 # tasks = [get_url_title('http://www.langzi.fun')for i in range(10)] # loop.run_until_complete(asyncio.wait(tasks)) # 使用gather方法实现分组导入(方法1) group1 = [get_url_title('http://www.langzi.fun')for i in range(3)] group2 = [get_url_title('http://www.baidu.com')for i in range(5)] loop.run_until_complete(asyncio.gather(*group1,*group2)) # 这种方法会把两个全部一次性导入 # 使用gather方法实现分组导入(方法2) group1 = [get_url_title('http://www.langzi.fun')for i in range(3)] group2 = [get_url_title('http://www.baidu.com')for i in range(5)] group1 = asyncio.gather(*group1) group2 = asyncio.gather(*group2) #group2.cancel() 取消group2任务 loop.run_until_complete(asyncio.gather(group1,group2)) # 这种方法会先把group1导入,然后导入group2

返回结果

开始访问网站:http://www.langzi.fun 开始访问网站:http://www.baidu.com 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.baidu.com 开始访问网站:http://www.baidu.com 开始访问网站:http://www.baidu.com 开始访问网站:http://www.baidu.com 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.baidu.com 开始访问网站:http://www.baidu.com 开始访问网站:http://www.baidu.com 开始访问网站:http://www.baidu.com 开始访问网站:http://www.baidu.com 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功

另外一种使用gather获取返回结果

python
import asyncio async def get_url_title(url): print('开始访问网站:{}'.format(url)) await asyncio.sleep(2) print('网站访问成功') return 'success' if __name__ == '__main__': loop = asyncio.get_event_loop() # 使用gather方法传递任务获取结果 group1 = asyncio.ensure_future(get_url_title('http://www.langzi.fun')) loop.run_until_complete(asyncio.gather(group1)) # 如果不是列表就不需要加* print(group1.result())

返回结果

开始访问网站:http://www.langzi.fun 网站访问成功 success

d. 带返回值的异步

python
# -*- coding:utf-8 -*- import asyncio import time async def get_url_title(url): print('开始访问网站:{}'.format(url)) await asyncio.sleep(2) print('网站访问成功') return 'success' if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # 创建一个事件循环 get_future = loop.create_task(get_url_title('http://www.langzi.fun')) #get_future = asyncio.ensure_future(get_url_title('http://www.langzi.fun')) # 这两行代码功能用法一模一样 loop.run_until_complete(get_future) print('获取结果:{}'.format(get_future.result())) # 获取结果 end_time = time.time() print('消耗时间:{}'.format(end_time-start_time))

输出

开始访问网站:http://www.langzi.fun 网站访问成功 获取结果:success 消耗时间:2.001279354095459

多个网址传入,访问多个网址的返回值呢?只需要把前面的知识点汇总一起即可使用:

python
if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # 创建一个事件循环 tasks = [loop.create_task(get_url_title('http://www.langzi.fun')) for i in range(10)] # 把所有要返回的函数加载到一个列表 loop.run_until_complete(asyncio.wait(tasks)) # 这里和上面用法一样 print('获取结果:{}'.format([x.result() for x in tasks])) # 因为结果都在一个列表,在列表中取值即可 end_time = time.time() print('消耗时间:{}'.format(end_time-start_time))

输出

开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 开始访问网站:http://www.langzi.fun 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 网站访问成功 获取结果:['success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success', 'success'] 消耗时间:2.005147933959961

e. 带回调函数的异步

python
# -*- coding:utf-8 -*- import asyncio from functools import partial # partial的功能是 固定函数参数,返回一个新的函数。你可以这么理解: ''' from functools import partial def go(x,y): return x+y g = partial(go,y=2) print(g(1)) 返回结果:3 g = partial(go,x=5,y=2) print(g()) 返回结果:7 ''' async def get_url_title(url): print('开始访问网站:{}'.format(url)) await asyncio.sleep(2) print('网站访问成功') # 当这个协程函数快要结束返回值的时候,会调用下面的call_back函数 # 等待call_back函数执行完毕后,才返回这个协程函数的值 return 'success' def call_back(future,url): # 注意这里必须要传递future参数,因为这里的future即代表下面的get_future对象 print('检测网址:{}状态正常'.format(url)) if __name__ == '__main__': loop = asyncio.get_event_loop() # 创建一个事件循环 get_future = loop.create_task(get_url_title('http://www.langzi.fun')) # 将一个任务注册到loop事件循环中 get_future.add_done_callback(partial(call_back,url = 'http://www.langzi.fun')) # 这里是设置,当上面的任务完成要返回结果的时候,执行call_back函数 # 注意call_back函数不能加上(),也就意味着你只能依靠partial方法进行传递参数 loop.run_until_complete(get_future) # 等待任务完成 print('获取结果:{}'.format(get_future.result())) # 获取结果

总结

  1. 协程函数必须要使用关键词async定义
  2. 如果遇到了要等待的对象,必须使用await
  3. 使用await后面的任务,必须是可等待对象(三种主要类型:协程、任务和Future)
  4. 运行前,必须要创建一个事件循环(loop=asyncio,get_event_loop())
  5. 然后把任务加载到该事件循环中即可
  6. 如果需要获取协程函数的返回值,需要使用loop.create_task()或asyncio.ensure_future()函数,在最后使用.result()获取返回结果
  7. 如果想要把多个任务注册到loop中,需要使用一个列表包含他们,调用的时候使用asyncio.wait(list)

f. 取消协程任务

存在多个任务协程,想使用ctrl c推出协程,例如:

python
import asyncio async def get_time_sleep(t): print('开始运行,等待:{}s'.format(t)) await asyncio.sleep(t) print('运行结束') if __name__ == '__main__': loop = asyncio.get_event_loop() # 创建一个事件循环 task_1 = get_time_sleep(1) task_2 = get_time_sleep(2) task_3 = get_time_sleep(3) tasks = [task_1,task_2,task_3] # 三个协程任务加载到一个列表 try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt: # 当检测到键盘输入 ctrl c的时候 all_tasks = asyncio.Task.all_tasks() # 获取注册到loop下的所有task for task in all_tasks: print('开始取消协程') task.cancel() # 取消该协程,如果取消成功则返回True loop.stop() # 停止循环 loop.run_forever() # loop事件循环一直运行 # 这两步必须要做 finally: loop.close() # 关闭事件循环

run_forever()会一直运行,直到stop被调用,但是不能像下面这样调stop

loop.run_forever() loop.stop()

run_forever不返回,stop永远也不会被调用。

可以用 gather 把多个协程合并成一个 future,并添加回调,然后在回调里再去停止 loop。

python
async def do_some_work(loop, x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') def done_callback(loop, futu): loop.stop() loop = asyncio.get_event_loop() futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3)) futus.add_done_callback(functools.partial(done_callback, loop)) loop.run_forever()

关于loop.close(),简单来说,loop 只要不关闭,就还可以再运行。

python
loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此处异常

g. 协程相互嵌套

python
import asyncio async def sum_tion(x,y): print('开始执行传入参数相加:{} + {}'.format(x,y)) await asyncio.sleep(1) # 模拟等待1S return (x+y) async def print_sum(x,y): result = await sum_tion(x,y) print(result) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1000,2000)) loop.close()

返回结果:

开始执行传入参数相加:1000 + 2000 3000

执行流程:

  1. run_until_complete运行,会注册task(协程:print_sum)并开启事件循环
  2. print_sum协程中嵌套了子协程,此时print_sum协程暂停(类似委托生成器),转到子协程(协程:sum_tion)中运行代码,期间子协程需sleep1秒钟,直接将结果反馈到event loop中,即将控制权转回调用方,而中间的print_sum暂停不操作
  3. 1秒后,调用方将控制权给到子协程(调用方与子协程直接通信),子协程执行接下来的代码,直到再遇到wait(此实例没有)
  4. 最后执行到return语句,子协程向上级协程(print_sum抛出异常:StopIteration),同时将return返回的值返回给上级协程(print_sum中的result接收值),print_sum继续执行暂时时后续的代码,直到遇到return语句
  5. 向 event loop 抛出StopIteration异常,此时协程任务都已经执行完毕,事件循环执行完成(event loop :the loop is stopped),close事件循环。

如果想要获取协程嵌套函数返回的值,就必须使用回调:

python
import asyncio async def sum_tion(x,y)->int: print('开始执行传入参数相加:{} + {}'.format(x,y)) await asyncio.sleep(1) # 模拟等待1S return (x+y) async def print_sum(x,y): result = await sum_tion(x,y) return result def callback(future): return future.result() if __name__ == '__main__': loop = asyncio.get_event_loop() future = loop.create_task(print_sum(100,200)) # 如果想要获取嵌套协程返回的值,就必须使用回调 future.add_done_callback(callback) loop.run_until_complete(future) print(future.result()) loop.close()

返回结果

开始执行传入参数相加:100 + 200 300

5. 定时启动任务

asyncio提供定时启动协程任务,通过call_soon,call_later,call_at实现,他们的区别如下:

  • call_soon 是立即执行
python
def callback(sleep_times): print("预计消耗时间 {} s".format(sleep_times)) def stoploop(loop): print('时间消耗完毕') loop.stop() if __name__ == "__main__": start_time = time.time() loop = asyncio.get_event_loop() # 创建一个事件循环 loop.call_soon(callback,5) # 立即启动callback函数 loop.call_soon(stoploop,loop) # 上面执行完毕后,立即启动执行stoploop函数 loop.run_forever() #要用这个run_forever运行,因为没有传入协程 print('总共耗时:{}'.format(time.time()-start_time))
  • call_later 是设置一定时间启动执行
python
def callback(sleep_times): print("预计消耗时间 {} s".format(sleep_times)) def stoploop(loop): print('时间消耗完毕') loop.stop() if __name__ == "__main__": start_time = time.time() loop = asyncio.get_event_loop() loop.call_later(1,callback,1.0) # 等待1秒后执行callback函数,传入参数是1.0 loop.call_later(5,stoploop,loop) # 等待5秒后执行stoploop函数,传入参数是loop loop.run_forever() print('总共耗时:{}'.format(time.time()-start_time))
  • call_at 与call_later类似,但是他制定的时间不再是传统意义上的时间,而是loop的内部钟时间,效果和call_later一样
python
import time import asyncio def callback(loop): print("传入loop.time()时间为: {} s".format(loop.time())) def stoploop(loop): print('时间消耗完毕') loop.stop() if __name__ == "__main__": start_time = time.time() loop = asyncio.get_event_loop() now = loop.time() # loop内部的时钟时间 loop.call_at(now+1,callback,loop) # 等待loop内部时钟时间加上1s后,执行callba函数,传入参数为loop loop.call_at(now+3,callback,loop) # 等待loop内部时钟时间加上3s后,执行callba函数,传入参数为loop loop.call_at(now+5,stoploop,loop) # 等待loop内部时钟时间加上1s后,执行stoploop函数,传入参数为loop
  • call_soon_threadsafe 线程安全的call_soon

    l_soon_threadsafe用法和call_soon一致。但在涉及多线程时, 会使用它.

总结

  1. call_soon直接启动
  2. call_later自己定时启动
  3. call_at根据loop.time()内部的时间,设置等待时间启动
  4. call_soon_threadsafe和call_soon方法一致,是保证线程安全的
  5. 他们都是比较底层的,在正常使用时很少用到。

6. 结合线程池

Asyncio是异步IO编程的解决方案,异步IO是包括多线程,多进程,和协程的。

所以asyncio是可以完成多线程多进程和协程的,在开头说到,协程是单线程的,如果遇到阻塞的话,会阻塞所有的代码任务,所以是不能加入阻塞IO的,但是比如requests库是阻塞的,socket如果不设置setblocking(false)的话,也是阻塞的,这个时候可以放到一个线程中去做也是可以解决的,即在协程中集成阻塞IO,就加入多线程一起解决问题。

使用request完成异步编程(使用线程池)

python
from concurrent.futures import ThreadPoolExecutor import requests import asyncio import time import re def get_url_title(url): # 功能是获取网址的标题 r = requests.get(url) try: title = re.search('<title>(.*?)</title>',r.content.decode(),re.S|re.I).group(1) except Exception as e: title = e print(title) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # 创建一个事件循环 p = ThreadPoolExecutor(5) # 创建一个线程池,开启5个线程 tasks = [loop.run_in_executor(p,get_url_title,'http://www.langzi.fun')for i in range(10)] # 这一步很重要,使用loop.run_in_executor()函数:内部接受的是阻塞的线程池,执行的函数,传入的参数 # 即对网站访问10次,使用线程池访问 loop.run_until_complete(asyncio.wait(tasks)) # 等待所有的任务完成 print(time.time()-start_time)

返回结果

Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 5.589502334594727

访问10次消耗时间为5.5s,尝试将 p = ThreadPoolExecutor(10),线程数量设置成10个线程,消耗时间为4.6s,改用从进程池p = ProcessPoolExecutor(10),也是一样可以运行的,不过10个进程消耗时间也是5.5s,并且消耗更多的CPU资源。

用socket完成异步编程(使用线程池)

python
import asyncio from concurrent.futures import ThreadPoolExecutor import socket from urllib.parse import urlparse import time import re def get_url(url): # 通过socket请求html url = urlparse(url) host = url.netloc path = url.path if path == "": path = '/' # 建立socket连接 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, 80)) client.send( "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8')) data = b"" while True: d = client.recv(1024) if d: data += d else: break data = data.decode('utf8') html_data = data.split('\r\n\r\n')[1] # 把请求头信息去掉, 只要网页内容 title = re.search('<title>(.*?)</title>',html_data,re.S|re.I).group(1) print(title) client.close() if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() p = ThreadPoolExecutor(3) # 线程池 放3个线程 tasks = [loop.run_in_executor(p,get_url,'http://www.langzi.fun') for i in range(10)] loop.run_until_complete(asyncio.wait(tasks)) print('last time:{}'.format(time.time() - start_time))

运行结果

Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 Langzi - Never Setter 永不将就 - 致力于Python开发网络安全工具,分享Python底层与进阶知识,漏洞扫描器开发与爬虫开发 last time:5.132313966751099

7. 与多进程的结合

依赖

  1. python3.6+
  2. asyncio
  3. aiomultiprocess

基础用法

python
import asyncio import aiomultiprocess import aiohttp async def get_url_info(url): # 这一部分就是发起网络请求,没有aiomultiprocess的参与 async with aiohttp.ClientSession() as session: async with session.get(url) as resp: result = await resp.text() print(f'{resp.url}:{resp.status}') async def main(): # main()函数是核心函数,负责给每个CPU提供任务 p = aiomultiprocess.Process(target=get_url_info,args=('http://www.langzi.fun',)) # 传入参数 await p # p是消耗事件的操作,需要使用await if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

如果是传入一个网址列表的话:

python
async def main(): # main()函数是核心函数,负责给每个CPU提供任务 tasks = ['http://www.langzi.fun' for i in range(10)] # tasks列表有10个网址 for url in tasks: p = aiomultiprocess.Process(target=get_url_info,args=(url,)) # 传入参数 await p # p是消耗事件的操作,需要使用await if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

获取返回结果

python
# -*- coding:utf-8 -*- import asyncio import aiomultiprocess import aiohttp async def get_url_info(url): # 这一部分就是发起网络请求,没有aiomultiprocess的参与 async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.text() #print(f'{resp.url}:{resp.status}') async def main(): # main()函数是核心函数,负责给每个CPU提供任务 p = aiomultiprocess.Worker(target=get_url_info,args=('http://www.langzi.fun',)) # 传入参数,这里用Worker res = await p print(res) # p是消耗事件的操作,需要使用await if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

返回结果

<!DOCTYPE html> <html class="theme-next muse use-motion" lang="zh-Hans"> <head><meta name="generator" content="Hexo 3.8.0"> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" c ....

线程池管理

python
# -*- coding:utf-8 -*- import asyncio import aiomultiprocess import aiohttp async def get_url_info(url): # 这一部分就是发起网络请求,没有aiomultiprocess的参与 async with aiohttp.ClientSession() as session: async with session.get(url) as resp: return await resp.read() async def main(): # main()函数是核心函数,负责给每个CPU提供任务 tasks = ['http://www.langzi.fun' for i in range(10)] # 10个网址保存到列表 async with aiomultiprocess.Pool() as pool: # 开启进程池 result = await pool.map(get_url_info,tasks) # 这里必须要await,使用pool.map()方法 print(result) # 返回的结果是一个列表 if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) # 如果是python3.7的话,就直接使用asyncio.run(mian())即可

返回结果

[b'<!DOCTYPE html>\n\n\n\n \n\n\n<html class="theme-next muse use.......

线程池传递参数,设置CPU核心数和协程数量

async with aiomultiprocess.Pool(processes=8, childconcurrency=16) as pool:

对传参控制

对传递的参数一个是列表,一个是单个的字符串

python
dirs = ['a.jpg', 'b.jpg', 'c.jpg'] async def get_image(url:'http://test.com/'): async with aiohttp.ClientSession() as session: for dir in dirs: print(url+dir) async with session.get(url=url+dir,timeout=3) as resp: if resp.status == 200: return await resp.content() # only need one exists image url async def main(): urls = urls = ['http://1.com/','http://2.com/','http://3.com/','http://4.com/'] async with aiomultiprocess.Pool() as pool: result = await pool.map(get_image,urls) print(result) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())

该库的作者jreese 告诉我可以使用pool.starmap()的方法来处理参数:

python
async def get_image(url, dirs): ... async def main(): urls = [...] dirs = [...] async with Pool() as pool: result = await pool.starmap(get_image, ((url, dirs) for url in urls))

具有如下方法:

  • close:关闭
  • flush:刷新
  • read:读取文件所有
  • readline:读取第一行,结果保存到列表
  • readlines:读取所有行,结果保存到列表
  • write:写入
  • writelines:接收参数是一个可迭代对象,写入

生成器的send与yield

在python的3.5版本前,协程是基于生成器(generator)实现的。在理解协程的工作方式之前,可以先搞清楚生成器的通信问题。这里写一个例子简单说一下生成器的工作方式。

首先构建一个生成器函数gen_demo。

python
recv = 20 def gen_demo(): global recv yield 1 yield 2 recv = yield 3 while recv > 0: yield 4 recv = yield 5 print('recv:', recv) yield 'Finish.'

asyncio是用来编写并发代码的库,使用async/await语法。其作用多个提高性能Python异步框架的基础,包括网络和网站服务,数据链接库、分布式任务队列等

asyncio 是基于生成器

附录

1. cpu&io密集型

通常我们说一个程序是cpu密集型就是cpu用的多,io密集型就是io用的多,这种说法对吗?

答案是:不能算错,但也不全对。其实应该从程序的瓶颈来看待这个问题,若系统瓶颈在cpu,那就是cpu密集型;反之,若在io,则是io密集型。 了解程序是哪种类型,对系统优化有着关键的指导作用

比如我们平时使用的IM软件、网上商城应用都属于io密集型,因为它们主要做的事情是【高频的小数据传输与读写】,一个是网络io,一个是磁盘io。

图像处理、加解密、序列化反序列化等则需要消耗大量的cpu计算资源。

什么是IO密集型? db查询、kv查询、http/rpc接口调用。此时cpu平均负载可能高,但使用率不一定高

什么是CPU密集型? 如排序、加密解密、音视频编/解码、序列化与反序列化、压缩/解压、新建/拷贝大对象,此时CPU使用率高,负载也越高,二者呈正相关

2. 其他aio库

本文作者:Eric

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!