编辑
2024-08-11
📘总结-保持好奇心
00
请注意,本文编写于 312 天前,最后修改于 225 天前,其中某些信息可能已经过时。

目录

分布式函数调度框架
环境及功能介绍
介绍
5种并发模式
20多种中间件
20种控制功能

分布式函数调度框架

环境及功能介绍

  • python 3.9
  • funboost框架
pip3 install -i http://mirrors.al2iyun.com/pypi/simple/ funboost --upgrade --trusted-host mirrors.aliyun.com

介绍

分布式函数调度框架,支持5种并发模式,20+消息中间件,20种任务控制功能。

用途概念就是常规经典的:生产者+消息队列中间件+消费者 编程思想

5种并发模式

  • threading(使用的是可变线程池,可以智能自动缩小和扩大线程数量)
  • gevent
  • evenlet
  • asyncio(框架可以直接支持asynic定义的协程函数作为任务,celery不支持)
  • single_thread

20多种中间件

RABBITMQ_AMQPSTORM = 0 # 使用 amqpstorm 包操作rabbitmq 作为 分布式消息队列,支持消费确认.推荐这个。 RABBITMQ_RABBITPY = 1 # 使用 rabbitpy 包操作rabbitmq 作为 分布式消息队列,支持消费确认。 REDIS = 2 # 使用 redis 的 list结构,brpop 作为分布式消息队列。随意重启和关闭会丢失大量消息任务,不支持消费确认。 LOCAL_PYTHON_QUEUE = 3 # 使用python queue.Queue实现的基于当前python进程的消息队列,不支持跨进程 跨脚本 跨机器共享任务,不支持持久化,适合一次性短期简单任务。 RABBITMQ_PIKA = 4 # 使用pika包操作rabbitmq 作为 分布式消息队列。 MONGOMQ = 5 # 使用mongo的表中的行模拟的 作为分布式消息队列,支持消费确认。 PERSISTQUEUE = 6 # 使用基于sqlute3模拟消息队列,支持消费确认和持久化,但不支持跨机器共享任务,可以基于本机单机跨脚本和跨进程共享任务,好处是不需要安装中间件。 NSQ = 7 # 基于nsq作为分布式消息队列,支持消费确认。 KAFKA = 8 # 基于kafka作为分布式消息队列,建议使用BrokerEnum.CONFLUENT_KAFKA。 REDIS_ACK_ABLE = 9 # 基于redis的 list + 临时unack的set队列,采用了 lua脚本操持了取任务和加到pengding为原子性,随意重启和掉线不会丢失任务。 SQLACHEMY = 10 # 基于SQLACHEMY 的连接作为分布式消息队列中间件支持持久化和消费确认。支持mysql oracle sqlserver等5种数据库。 ROCKETMQ = 11 # 基于 rocketmq 作为分布式消息队列,这个中间件必须在linux下运行,win不支持。 REDIS_STREAM = 12 # 基于redis 5.0 版本以后,使用 stream 数据结构作为分布式消息队列,支持消费确认和持久化和分组消费,是redis官方推荐的消息队列形式,比list结构更适合。 ZEROMQ = 13 # 基于zeromq作为分布式消息队列,不需要安装中间件,可以支持跨机器但不支持持久化。 RedisBrpopLpush = 14 # 基于redis的list结构但是采用brpoplpush 双队列形式,和 redis_ack_able的实现差不多,实现上采用了原生命令就不需要lua脚本来实现取出和加入unack了。 """ 操作 kombu 包,这个包也是celery的中间件依赖包,这个包可以操作10种中间件(例如rabbitmq redis),但没包括分布式函数调度框架的kafka nsq zeromq 等。 同时 kombu 包的性能非常差,可以用原生redis的lpush和kombu的publish测试发布,使用brpop 和 kombu 的 drain_events测试消费,对比差距相差了5到10倍。 由于性能差,除非是分布式函数调度框架没实现的中间件才选kombu方式(例如kombu支持亚马逊队列 qpid pyro 队列),否则强烈建议使用此框架的操作中间件方式而不是使用kombu。 """ KOMBU = 15 """基于confluent-kafka包,包的性能比kafka-python提升10倍。同时应对反复随意重启部署消费代码的场景,此消费者实现至少消费一次,第8种BrokerEnum.KAFKA是最多消费一次。""" CONFLUENT_KAFKA = 16 """ 基于emq作为中间件的。这个和上面的中间件有很大不同,服务端不存储消息。所以不能先发布几十万个消息,然后再启动消费。mqtt优点是web前后端能交互, 前端不能操作redis rabbitmq kafka,但很方便操作mqtt。这种使用场景是高实时的互联网接口。 """ MQTT = 17 HTTPSQS = 18 # 基于httpsqs的 PULSAR = 20 # 下一代分布式消息系统。5年后会同时取代rabbitmq和kafka。 UDP = 21 # 基于socket udp 实现的。小规模使用不支持持久化,好处是不用安装软件。 TCP = 22 # 基于socket tcp 实现的。小规模使用不支持持久化,好处是不用安装软件。 HTTP = 23 # 基于http实现的,小规模使用不支持持久化,好处是不用安装软件。 NATS = 24 # 高性能中间件nats,中间件服务端性能很好,。 TXT_FILE = 25 # 磁盘txt文件作为消息队列,支持单机持久化,不支持多机分布式 PEEWEE = 26 # peewee包操作mysql,使用peewee包操作mysql表模拟消息队列 REDIS_PUBSUB = 27 # 使用redis pubsub实现的广播,每个消费者都会消费同一条消息。

20种控制功能

分布式: 支持数十种最负盛名的消息中间件.(除了常规mq,还包括用不同形式的如 数据库 磁盘文件 redis等来模拟消息队列) 并发: 支持threading gevent eventlet asyncio 单线程 5种并发模式 叠加 多进程。 多进程不是和前面四种模式平行的,是叠加的,例如可以是 多进程 + 协程,多进程 + 多线程。 控频限流: 例如十分精确的指定1秒钟运行30次函数或者0.02次函数(无论函数需要随机运行多久时间,都能精确控制到指定的消费频率; 分布式控频限流: 例如一个脚本反复启动多次或者多台机器多个容器在运行,如果要严格控制总的qps,能够支持分布式控频限流。 任务持久化: 消息队列中间件天然支持 断点接续运行: 无惧反复重启代码,造成任务丢失。消息队列的持久化 + 消费确认机制 做到不丢失一个消息 (此框架很重视消息的万无一失,就是执行函数的机器支持在任何时候随时肆无忌惮反复粗暴拉电闸断电,或者强制硬关机, 或者直接用锄头把执行函数代码的机器砸掉,只要不是暴力破坏安装了消息队列中间件的机器就行,消息就万无一失, 现在很多人做的简单redis list消息队列,以为就叫做分布式断点接续,那是不正确的,因为这种如果把消息从reidis brpop取出来后, 如果消息正在被执行,粗暴的kill -9脚本或者直接强制关机,那么正在运行的消息就丢失了,如果是多线程同时并发运行很多消息,粗暴重启 会丢失几百个大量消息,这种简单的redis list根本就不能叫做安全的断点续传。 分布式函数调度框架的消费确认机制,保证函数运行完了才确认消费,正在运行突然强制关闭进程不会丢失一个消息, 下次启动还会消费或者被别的机器消费。 此框架的消息万无一失特性,不仅支持rabbbitmq因为原生支持,也支持redis,框架对redis的实现机制是因为客户端加了一层保障)。 定时: 可以按时间间隔、按指定时间执行一次、按指定时间执行多次,使用的是apscheduler包的方式。 延时任务: 例如规定任务发布后,延迟60秒执行,或者规定18点执行。这个概念和定时任务有一些不同。 指定时间不运行: 例如,有些任务你不想在白天运行,可以只在晚上的时间段运行 消费确认: 这是最为重要的一项功能之一,有了这才能肆无忌惮的任性反复重启代码也不会丢失一个任务。 (常规的手写 redis.lpush + redis.blpop,然后并发的运行取出来的消息,随意关闭重启代码瞬间会丢失大量任务, 那种有限的 断点接续 完全不可靠,根本不敢随意重启代码) 立即重试指定次数: 当函数运行出错,会立即重试指定的次数,达到最大次重试数后就确认消费了 重新入队: 在消费函数内部主动抛出一个特定类型的异常ExceptionForRequeue后,消息重新返回消息队列 超时杀死: 例如在函数运行时间超过10秒时候,将此运行中的函数kill 计算消费次数速度: 实时计算单个进程1分钟的消费次数,在日志中显示;当开启函数状态持久化后可在web页面查看消费次数 预估消费时间: 根据前1分钟的消费次数,按照队列剩余的消息数量来估算剩余的所需时间 函数运行日志记录: 使用自己设计开发的 控制台五彩日志(根据日志严重级别显示成五种颜色;使用了可跳转点击日志模板) + 多进程安全切片的文件日志 + 可选的kafka elastic日志 任务过滤: 例如求和的add函数,已经计算了1 + 2,再次发布1 + 2的任务到消息中间件,可以让框架跳过执行此任务。 任务过滤的原理是使用的是函数入参判断是否是已近执行过来进行过滤。 任务过滤有效期缓存: 例如查询深圳明天的天气,可以设置任务过滤缓存30分钟,30分钟内查询过深圳的天气,则不再查询。 30分钟以外无论是否查询过深圳明天的天气,则执行查询。 任务过期丢弃: 例如消息是15秒之前发布的,可以让框架丢弃此消息不执行,防止消息堆积, 在消息可靠性要求不高但实时性要求高的高并发互联网接口中使用 函数状态和结果持久化: 可以分别选择函数状态和函数结果持久化到mongodb,使用的是短时间内的离散mongo任务自动聚合成批量 任务后批量插入,尽可能的减少了插入次数 消费状态实时可视化: 在页面上按时间倒序实时刷新函数消费状态,包括是否成功 出错的异常类型和异常提示 重试运行次数 执行函数的机器名字+进程id+python脚本名字 函数入参 函数结果 函数运行消耗时间等 消费次数和速度生成统计表可视化: 生成echarts统计图,主要是统计最近60秒每秒的消费次数、最近60分钟每分钟的消费次数 最近24小时每小时的消费次数、最近10天每天的消费次数 rpc: 生产端(或叫发布端)获取消费结果。各个发布端对消费结果进行不同步骤的后续处理更灵活,而不是让消费端对消息的处理一干到底。 远程服务器部署消费函数: 代码里面 task_fun.fabric_deploy('192.168.6.133', 22, 'xiaomin', '123456', process_num=2) 只需要这样就可以自动将函数部署在远程机器运行, 无需任何额外操作,不需要借助阿里云codepipeline发版工具 和 任何运维发版管理工具,就能轻松将函数运行在多台远程机器。task_fun指的是被@boost装饰的函数 暂停消费: 支持从python解释器外部/远程机器 ,控制暂停消息消费和继续消费。

TypeError: cannot set 'is_timeout' attribute of immutable type 'TimeoutError'的错误,后来查询CSDN后运行

pip install https://github.com/eventlet/eventlet/archive/master.zip

解决了问题.

本文作者:Eric

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!