编辑
2022-11-15
🧰语言-python
00
请注意,本文编写于 947 天前,最后修改于 225 天前,其中某些信息可能已经过时。

目录

简介及定义
安装
快速开始
客户端使用
1.发起请求
2.发送json请求
3.传递url中的参数
4.获取响应内容
获取二进制响应内容
获取json响应内容
获取流失响应内容
5.获取请求信息
自定义headers信息
自定义Cookies
6.发起更复杂的post请求
发送多部分编码文件
7.流式上传
8.上传预压缩过的数据
9.持久连接(keep-aliver),连接池和cookies共享
10.安全cookies
11. 使用虚假Cookie Jar
12. 使用连接器
限制连接池的容量
使用自定义域名服务器
为Tcp sockets添加ssl控制
13. unix域套接字
14. 代理支持
15.查看响应状态码
16.获取响应cookies
17.使用websockets

简介及定义

aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio(Python用于支持异步编程的标准库)的异步库。

核心功能

  • 同时支持客户端使用和服务端使用。
  • 同时支持服务端WebSockets组件和客户端WebSockets组件,开箱即用呦。
  • web服务器具有中间件,信号组件和可插拔路由的功能。

安装

shell
pip install aiohttp

用安装更快的cchardet来替代chardet进行解码

bash
pip install cchardet

对于更快的客户端API DNS解析方案,aiodns是个很好的选择,极力推荐:

bash
$ pip install aiodns

快速开始

client.py

python
# coding:utf-8 """ 文件直接运行 """ import aiohttp import asyncio import async_timeout # 定义异步函数=协程 async def fetch(session, url): with async_timeout.timeout(10): async with session.get(url) as response: # 必须使用await,不能使用yield from;如果是使用yield from ,需要采用@asyncio.coroutine相对应 return await response.text() async def main(): async with aiohttp.ClientSession() as session: # 建立一个session对象,用session对象去打开网页 html = await fetch(session, 'http://www.baidu.com') print(html.encode('utf-8')) ''' async def 用来定义异步函数,其内部有异步操作。每个线程有一个事件循环,主线程调用asyncio.get_event_loop()时会创建事件循环,把异步的任务丢给这个循环的run_until_complete()方法,事件循环会安排协同程序的执行。 ''' loop = asyncio.get_event_loop() #获取“事件循环”对象 loop.run_until_complete(main()) #通过事件循环,去调用协程函数 loop.close() # 结束时间循环

server.py

python
# coding:utf-8 """ 1. 运行文件 2. 在8080端口访问 """ from aiohttp import web async def handle(request): name = request.match_info.get('name', "Anonymous") text = "Hello, " + name return web.Response(text=text) app = web.Application() app.router.add_get('/', handle) app.router.add_get('/{name}', handle) web.run_app(app)

客户端使用

1.发起请求

client2.py

python
#coding:utf-8 import asyncio import aiohttp async def main(): async with aiohttp.ClientSession() as session: async with session.get('https://api.github.com/events') as resp: print(resp.status) # print(type(resp.text())) a = await resp.text() a = a.encode('utf-8') print(a) loop = asyncio.get_event_loop() #获取“事件循环”对象 loop.run_until_complete(main())

其他请求:

注意 不要为每个请求都创建一个会话。大多数情况下每个应用程序只需要一个会话就可以执行所有的请求。 每个会话对象都包含一个连接池,可复用的连接和持久连接状态(keep-alives,这两个是默认的)可提升总体的执行效率。

2.发送json请求

每个会话的请求方法都可接受json参数

python
async with aiohttp.ClientSession() as session: async with session.post(json={'test':'object'})

默认情况下,会话session使用Python标准库里的json模块解析json信息。但还可使用其他的json解析器。可以给clientsession制定json_serlialize参数实现

python
import ujson async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session: async with session.post(json={'test':'object'})

3.传递url中的参数

python
params = {'key1': 'value1', 'key2': 'value2'} async with session.get('http://httpbin.org/get', params=params) as resp: assert str(resp.url) == 'http://httpbin.org/get?key2=value2&key1=value1'

同键不同值的**并联字典(MultiDict) **也同样支持。 可使用带有两个tuples(元组,python中的数据类型)的list(列表,python中的数据类型)来构建:

python
params = [('key', 'value1'), ('key', 'value2')] async with session.get('http://httpbin.org/get', params=params) as r: assert str(r.url) == 'http://httpbin.org/get?key=value2&key=value1'

同样也允许你传递str(字符串)给params,但要小心一些不能被编码的字符。+就是一个不能被编码的字符:

python
async with session.get('http://httpbin.org/get', params='key=value+1') as r: assert str(r.url) == 'http://httpbin.org/get?key=value+1'

注意 aiohttp会在发送请求前标准化URL。 域名部分会用IDNA 编码,路径和查询条件会重新编译(requoting)。 比如

('http://example.com/путь%30?a=%31') 会被转化为URL('http://example.com/%D0%BF%D1%83%D1%82%D1%8C/0?a=1') 如果服务器需要接受准确的表示并不要求编译URL,那标准化过程应是禁止的。 禁止标准化可以使用encoded=True:

python
await session.get(URL('http://example.com/%30', encoded=True))

传递params时不要用encode=True,这俩参数不能同时使用。

4.获取响应内容

我们可以读取服务器的响应内容

python
async with session.get('https://api.github.com/events') as resp: print(await resp.text())

aiohttp将会自动解码内容。你可以为text()方法指定编码(使用encoding参数):

python
await resp.text(encoding='windows-1251')

获取二进制响应内容

python
print(await resp.read()) b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

获取json响应内容

python
async with session.get('https://api.github.com/events') as resp: print(await resp.json())

注意 这些方法会读出内存中所有响应的内容。如果你要读非常多的数据,考虑使用流式响应方法进行读取。

获取流失响应内容

read(), json(), text()等方法使用起来很方便,但也要注意谨慎地使用。上述方法会将所有的响应内容加载到内存。举个例子,如果你要下载几个G的文件,这些方法还是会将所有内容都加载到内存,内存会表示"臣妾做不到啊~"(如果内存不够的话)。作为代替你可以用content属性。content其实是 aiohttp.StreamReader类的实例。gzip和deflate传输编码同样会自动解码。

python
async with session.get('https://api.github.com/events') as resp: await resp.content.read(10)

一般情况下,你可以使用下列模式将内容保存在一个文件中:

python
with open(filename, 'wb') as fd: while True: chunk = await resp.content.read(chunk_size) if not chunk: break fd.write(chunk)

5.获取请求信息

ClientResponse(客户端响应)对象含有request_info(请求信息),主要是url和headers信息。 raise_for_status结构体上的信息会被复制给ClientResponseError实例。

自定义headers信息

python
import json url = 'https://api.github.com/some/endpoint' payload = {'some': 'data'} headers = {'content-type': 'application/json'} await session.post(url, data=json.dumps(payload), headers=headers)

自定义Cookies

python
url = 'http://httpbin.org/cookies' cookies = {'cookies_are': 'working'} async with ClientSession(cookies=cookies) as session: async with session.get(url) as resp: assert await resp.json() == { "cookies": {"cookies_are": "working"}}

6.发起更复杂的post请求

如果你想发送非表单形式的数据你可用str(字符串)代替dict(字典)。这些数据会直接发送出去。 例如,GitHub API v3 接受JSON编码POST/PATCH数据:

python
import json url = 'https://api.github.com/some/endpoint' payload = {'some': 'data'} async with session.post(url, data=json.dumps(payload)) as resp: ...

发送多部分编码文件

python
url = 'http://httpbin.org/post' files = {'file': open('report.xls', 'rb')} await session.post(url, data=files)

你也可以显式地设置文件名,文件类型:

python
url = 'http://httpbin.org/post' data = FormData() data.add_field('file', open('report.xls', 'rb'), filename='report.xls', content_type='application/vnd.ms-excel') await session.post(url, data=data)

7.流式上传

aiohttp 支持多种形式的流式上传,允许你直接发送大文件而不必读到内存。

python
with open('massive-body', 'rb') as f: await session.post('http://httpbin.org/post', data=f)

或者使用aiohttp.streamer对象:

python
@aiohttp.streamer def file_sender(writer, file_name=None): with open(file_name, 'rb') as f: chunk = f.read(2**16) while chunk: yield from writer.write(chunk) chunk = f.read(2**16) # 之后你可以使用’file_sender‘传递给data: async with session.post('http://httpbin.org/post', data=file_sender(file_name='huge_file')) as resp: print(await resp.text())

同样可以使用StreamReader对象.

我们来看下如何把来自于另一个请求的内容作为文件上传并计算其SHA1值:

python
async def feed_stream(resp, stream): h = hashlib.sha256() while True: chunk = await resp.content.readany() if not chunk: break h.update(chunk) stream.feed_data(chunk) return h.hexdigest() resp = session.get('http://httpbin.org/post') stream = StreamReader() loop.create_task(session.post('http://httpbin.org/post', data=stream)) file_hash = await feed_stream(resp, stream)

因为响应对象的content属性是一个StreamReader实例,所以你可以将get和post请求连在一起用:

python
r = await session.get('http://python.org') await session.post('http://httpbin.org/post', data=r.content)

8.上传预压缩过的数据

上传一个已经压缩过的数据,需要为Headers中的Content-Encoding指定算法名(通常是deflate或者是zlib).

python
async def my_coroutine(session, headers, my_data): data = zlib.compress(my_data) headers = {'Content-Encoding': 'deflate'} async with session.post('http://httpbin.org/post', data=data, headers=headers) pass

9.持久连接(keep-aliver),连接池和cookies共享

ClientSession可以在多个请求之间共享cookies

python
async with aiohttp.ClientSession() as session: await session.get( 'http://httpbin.org/cookies/set?my_cookie=my_value') filtered = session.cookie_jar.filter_cookies('http://httpbin.org') assert filtered['my_cookie'].value == 'my_value' async with session.get('http://httpbin.org/cookies') as r: json_body = await r.json() assert json_body['cookies']['my_cookie'] == 'my_value'

你也可以为所有的会话请求设置headers:

python
async with aiohttp.ClientSession( headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session: async with session.get("http://httpbin.org/headers") as r: json_body = await r.json() assert json_body['headers']['Authorization'] == \ 'Basic bG9naW46cGFzcw=='

10.安全cookies

ClientSession中的默认的aiohttp.CookiesJar使用的是严苛模式,RFC 2109明确禁止使用ip地址形式的URL携带cookies信息。比如: http://127.0.0.1:80/cookie 这样很好,不过有些时候我们测试时需要允许携带cookies。在aiohttp.CookiesJar中传递unsafe=True来实现这一效果:

python
jar = aiohttp.CookieJar(unsafe=True) session = aiohttp.ClientSession(cookie_jar=jar)

11. 使用虚假Cookie Jar

有时不想处理cookie。这时可以在会话中使用aiohttp.DummyCookieJar来达到目的。

python
jar = aiohttp.DummyCookieJar() session = aiohttp.ClientSession(cookie_jar=jar)

12. 使用连接器

想要调整请求的传输层你可以为ClientSession及其同类组件传递自定义的连接器。例如:

python
conn = aiohttp.TCPConnector() session = aiohttp.ClientSession(connector=conn)

限制连接池的容量

限制同一时间打开的连接数可以传递limit参数:

python
conn = aiohttp.TCPConnector(limit=30)

这样就将总数限制在30.

默认情况下是100.

如果你不想有限制,传递0即可:

限制同一时间在同一个端点((host, port, is_ssl) 3者都一样的情况)打开的连接数可指定limit_per_host参数:

python
conn = aiohttp.TCPConnector(limit_per_host=30)

使用自定义域名服务器

底层需要aiodns支持:

python
from aiohttp.resolver import AsyncResolver resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"]) conn = aiohttp.TCPConnector(resolver=resolver)

为Tcp sockets添加ssl控制

默认情况下aiohttp总会对使用了HTTPS协议(的URL请求)查验其身份。但也可将verify_ssl设置为False让其不检查:

python
r = await session.get('https://example.com', verify_ssl=False)

如果你需要设置自定义SSL信息(比如使用自己的证书文件)你可以创建一个ssl.SSLContext实例并传递到ClientSession中:

python
sslcontext = ssl.create_default_context( cafile='/path/to/ca-bundle.crt') r = await session.get('https://example.com', ssl_context=sslcontext)

如果你需要忽略所有SSL的错误:

python
try: await session.get('https://expired.badssl.com/') except aiohttp.ClientSSLError as e: assert isinstance(e, ssl.SSLError) try: await session.get('https://wrong.host.badssl.com/') except aiohttp.ClientSSLError as e: assert isinstance(e, ssl.CertificateError)

13. unix域套接字

如果你的服务器使用UNIX域套接字你可以用UnixConnector:

python
conn = aiohttp.UnixConnector(path='/path/to/socket') session = aiohttp.ClientSession(connector=conn)

什么是套接字?

TCP用主机的IP地址加上主机上的端口号作为TCP连接的端点,这种端点就叫做套接字(socket)或插口。套接字用(IP地址:端口号)表示,区分不同应用程序进程间的网络通信和连接,主要有3个参数:通信的目的IP地址、使用的传输层协议(TCP或UDP)和使用的端口号。

14. 代理支持

aiohttp 支持 HTTP/HTTPS形式的代理。你需要使用proxy参数:

python
async with aiohttp.ClientSession() as session: async with session.get("http://python.org", proxy="http://some.proxy.com") as resp: print(resp.status)

同时也支持认证代理

python
async with aiohttp.ClientSession() as session: proxy_auth = aiohttp.BasicAuth('user', 'pass') async with session.get("http://python.org", proxy="http://some.proxy.com", proxy_auth=proxy_auth) as resp: print(resp.status)
python
session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

15.查看响应状态码

python
async with session.get('http://httpbin.org/get') as resp: assert resp.status == 200

16.获取响应cookies

python
url = 'http://example.com/some/cookie/setting/url' async with session.get(url) as resp: print(resp.cookies['example_cookie_name'])

17.使用websockets

客户端

python
session = aiohttp.ClientSession() async with session.ws_connect('http://example.org/websocket') as ws: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: if msg.data == 'close cmd': await ws.close() break else: await ws.send_str(msg.data + '/answer') elif msg.type == aiohttp.WSMsgType.CLOSED: break elif msg.type == aiohttp.WSMsgType.ERROR: break

你只能使用一种读取方式(例如await ws.receive() 或者 async for msg in ws:)和写入方法,但可以有多个写入任务,写入任务也是异步完成的(ws.send_str('data'))。

官方文档

本文作者:Eric

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!