编辑
2024-08-11
📘总结-保持好奇心
00
请注意,本文编写于 312 天前,最后修改于 225 天前,其中某些信息可能已经过时。

目录

python 任务调度框架-APScheduler
1. 简介
2. APScheduler安装
3. APScheduler组件
4. 调度器
5. 作业存储器
6.执行器
7.触发器
8.配置调度程序
9. 启动调度器
10. 添加作业
11. 删除作业
12.暂停和恢复作业
13. 获取作业列表
14. 修改作业属性
15.关闭调度程序
最后
附录-tornado集成Apscheduler定时任务
个人使用tornado集成的代码
1.初始化准备脚本
2. 初始脚本
3. 要运行的脚本
附录-可能出现的bug
1. 添加之后,在start()否则会报错
schedule的使用
1. 环境
2. 安装
3. 每分钟运行任务
函数调用方法
装饰器方法
传递参数
4. 取消任务
5. 运行一次任务
6. 获取所有的任务
7. 取消所有任务
8. 根据标签获取任务
9. 根据标签取消任务
10. 随机秒数运行任务
11. 运行任务截止到具体时间
12. run_all,运行所有的job
13. 后台运行
14. 并行执行
15. 异常处理
16.日志记录
17. 运行多个调度器

python 任务调度框架-APScheduler

1. 简介

APScheduler全程为Advanced Python Scheduler,是一款轻量级的Python任务调度框架。它允许你像Cron那样安排定期执行的任务,并且支持Python函数或任意可调用的对象。官方文档

2. APScheduler安装

  • 方法一:使用pip安装
bash
$ pip install apscheduler
bash
$ python setup.py install

3. APScheduler组件

  • triggers(触发器): 触发器中包含调度逻辑,每个作业都由自己的触发器来决定下次运行时间。除了他们自己初始配置意外,触发器完全是无状态的。 job

stores(作业存储器):存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中。当作业被保存到一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化。作业存储器不能共享调度器。

  • executors(执行器):处理作业的运行,他们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
  • schedulers(调度器):配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。根据不同的应用场景可以选用不同的调度器,可选的有BlockingScheduler,BackgroundScheduler,AsyncIOScheduler,GeventScheduler,TornadoScheduler,TwistedScheduler,QtScheduler 7种。

4. 调度器

  • BlockingScheduler : 当调度器是你应用中唯一要运行的东西时。
  • BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用(充电桩即使用此种方式)。
  • AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
  • GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
  • TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
  • TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
  • QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。

5. 作业存储器

如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下仍然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)

6.执行器

对执行器的选择取决于你使用上面哪些框架,大多数情况下,使用默认的ThreadPoolExecutor已经能够满足需求。如果你的应用涉及到CPU密集型操作,你可以考虑使用ProcessPoolExecutor来使用更多的CPU核心。你也可以同时使用两者,将ProcessPoolExecutor作为第二执行器。

7.触发器

当你调度作业的时候,你需要为这个作业选择一个触发器,用来描述这个作业何时被触发,APScheduler有三种内置的触发器类型:

  • date 一次性指定日期
  • interval 在某个时间范围内间隔多长时间执行一次
  • cron 和Linux crontab格式兼容,最为强大

最基本的一种调度,作业只会执行一次。它的参数如下:

  • run_date (datetime|str) – 作业的运行日期或时间
  • timezone (datetime.tzinfo|str) – 指定时区 举个栗子:
python
# 2016-12-12运行一次job_function sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text']) # 2016-12-12 12:00:00运行一次job_function sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text']) interval 1 2 3 4 5

间隔调度,参数如下:

  • weeks (int) – 间隔几周
  • days (int) – 间隔几天
  • hours (int) – 间隔几小时
  • minutes (int) – 间隔几分钟
  • seconds (int) – 间隔多少秒
  • start_date (datetime|str) – 开始日期
  • end_date (datetime|str) – 结束日期
  • timezone (datetime.tzinfo|str) – 时区 举个栗子:
python
# 每两个小时调一下job_function sched.add_job(job_function, 'interval', hours=2) cron 1 2

参数如下:

  • year (int|str) – 年,4位数字
  • month (int|str) – 月 (范围1-12)
  • day (int|str) – 日 (范围1-31)
  • week (int|str) – 周 (范围1-53)
  • day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – 时 (范围0-23)
  • minute (int|str) – 分 (范围0-59)
  • second (int|str) – 秒 (范围0-59)
  • start_date (datetime|str) – 最早开始日期(包含)
  • end_date (datetime|str) – 最晚结束时间(包含)
  • timezone (datetime.tzinfo|str) – 指定时区
python
举个栗子: # job_function将会在6,7,8,11,12月的第3个周五的1,2,3点运行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 截止到2016-12-30 00:00:00,每周一到周五早上五点半运行job_function sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31') 1 2 3 4

8.配置调度程序

在应用程序中使用默认作业存储和默认执行程序运行BackgroundScheduler的例子:

python
from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() # Initialize the rest of the application here, or before the scheduler initialization 1 2 3 4 5 6

这将生成一个名为“default”的MemoryJobStore和名为“default”的ThreadPoolExecutor的BackgroundScheduler,默认最大线程数为10。 如果不满足于当前配置,如希望使用两个执行器有两个作业存储器,并且还想要调整新作业的默认值并设置不同的时区,可按如下配置

python
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor # 配置作业存储器 jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } # 配置执行器,并设置线程数 executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, # 默认情况下关闭新的作业 'max_instances': 3 # 设置调度程序将同时运行的特定作业的最大实例数3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22

9. 启动调度器

启动调度器只需要调用start()方法,对于除BlockingScheduler以外的调度程序,此调用将立即返回,您可以继续应用程序的初始化过程,可能会将作业添加到调度程序。 对于BlockingScheduler,只需在完成初始化步骤后调用start()

python
scheduler.start() 1

10. 添加作业

  • 方法一:调用add_job()方法

    最常见的方法,add_job()方法返回一个apscheduler.job.Job实例,您可以稍后使用它来修改或删除该作业。

  • 方法二:使用装饰器scheduled_job()

    此方法主要是方便的声明在应用程序运行时不会改变的作业

11. 删除作业

  • 方法一:通过作业ID或别名调用remove_job()删除作业
  • 方法二:通过add_job()返回的job实例调用remove()方法删除作业
python
举个栗子: # 实例删除 job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() # id删除 scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id') 1 2 3 4 5 6

12.暂停和恢复作业

可以通过Job实例或调度程序本身轻松暂停和恢复作业。 当作业暂停时,下一个运行时间将被清除,直到作业恢复,不会再计算运行时间。 要暂停作业,请使用以下任一方法:

python
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job() 1 2 恢复作业: apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job() 1 2

13. 获取作业列表

要获得计划作业的机器可处理列表,可以使用get_jobs()方法。 它将返回一个Job实例列表。 如果您只对特定作业存储中包含的作业感兴趣,则将作业存储别名作为第二个参数。

为了方便起见,可以使用print_jobs()方法,它将打印格式化的作业列表,触发器和下次运行时间。

14. 修改作业属性

您可以通过调用apscheduler.job.Job.modify()或modify_job()来修改除id以外的任何作业属性。

python
job.modify(max_instances=6, name='Alternate name') 1

15.关闭调度程序

默认情况下,调度程序关闭其作业存储和执行程序,并等待所有当前正在执行的作业完成,wait=False参数可选,代表立即停止,不用等待。 scheduler.shutdown(wait=False) 附:1、定时任务运行脚本小例子:

python
import datetime from apscheduler.schedulers.blocking import BlockingScheduler from app.untils.log_builder import sys_logging scheduler = BlockingScheduler() # 后台运行 # 设置为每日凌晨00:30:30时执行一次调度程序 @scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30') def rebate(): print "schedule execute" sys_logging.debug("statistic scheduler execute success" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) if __name__ == '__main__': try: scheduler.start() sys_logging.debug("statistic scheduler start success") except (KeyboardInterrupt, SystemExit): scheduler.shutdown() sys_logging.debug("statistic scheduler start-up fail") 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20

最后

  1. 第一次使用此定时器时总会执行两次,一直不知道为什么,后来发现,python 的flask框架在debug模式下会多开一个线程监测项目变化,所以每次会跑两遍,可以将debug选项改为False

  2. 创建定时任务对象时,指定时区 scheduler=BlockingScheduler(timezone='Asia/Shanghai')

附录-tornado集成Apscheduler定时任务

python
from datetime import datetime from tornado.ioloop import IOLoop, PeriodicCallback from tornado.web import RequestHandler, Application from apscheduler.schedulers.tornado import TornadoScheduler scheduler = None job_ids = [] # 初始化 def init_scheduler(): global scheduler scheduler = TornadoScheduler() scheduler.start() print('[Scheduler Init]APScheduler has been started') # 要执行的定时任务在这里 def task1(options): print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options)) class MainHandler(RequestHandler): def get(self): self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>') class SchedulerHandler(RequestHandler): def get(self): global job_ids job_id = self.get_query_argument('job_id', None) action = self.get_query_argument('action', None) if job_id: # add if 'add' == action: if job_id not in job_ids: job_ids.append(job_id) scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,)) self.write('[TASK ADDED] - {}'.format(job_id)) else: self.write('[TASK EXISTS] - {}'.format(job_id)) # remove elif 'remove' == action: if job_id in job_ids: scheduler.remove_job(job_id) job_ids.remove(job_id) self.write('[TASK REMOVED] - {}'.format(job_id)) else: self.write('[TASK NOT FOUND] - {}'.format(job_id)) else: self.write('[INVALID PARAMS] INVALID job_id or action') if __name__ == "__main__": routes = [ (r"/", MainHandler), (r"/scheduler/?", SchedulerHandler), ] init_scheduler() app = Application(routes, debug=True) app.listen(8888) IOLoop.current().start()

个人使用tornado集成的代码

1.初始化准备脚本

python
# -*- coding:utf-8 -*- import logging import os,sys from Application.Cfg.dir_cfg import model_config,hy2020_dir from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.tornado import TornadoScheduler from apscheduler.jobstores.redis import RedisJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor from apscheduler.events import EVENT_JOB_ERROR from apscheduler.events import EVENT_JOB_EXECUTED LOG_FORMAT = '[%(asctime)s] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s]: %(message)s' # logging.basicConfig(filename=os.path.join(hy2020_dir, 'log/scheduler.log'), level=logging.INFO, format=LOG_FORMAT,\ # datefmt='%Y-%m-%d %H:%M:%S') def listener_function(event): if event.exception: logging.error('任务出错') else: logging.info('任务正常') def init_scheduler(): REDIS = { 'host': model_config.get('redis', "HOSTNAME"),'port': model_config.get('redis', "PORT"), 'db': model_config.get('redis', "DB"),'password': model_config.get('redis', "PASSWORD")} jobstores = {'redis': RedisJobStore(**REDIS)} # 配置执行器,并设置线程数 executors = { 'default': ThreadPoolExecutor(10), # 默认线程数 'processpool': ProcessPoolExecutor(5) # 默认进程 } scheduler = TornadoScheduler(jobstores=jobstores,executors=executors,timezone='Asia/Shanghai',daemon=True) # 添加监听器 scheduler.add_listener(callback=listener_function, mask=(EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)) # 设置日志 # scheduler.pause_job() scheduler._logger = logging # scheduler.add_job(task,"interval",seconds=3,id="job1",args=()) logging.info('[Scheduler Init]APScheduler has been started') return scheduler

2. 初始脚本

python
class GlobalVar(object): def _init(): """ 初始化 """ global _global_dict _global_dict = {} def set_value(key,value): """ 定义一个全局变量 """ _global_dict[key] = value def get_value(key,defValue=None): """ 获得一个全局变量,不存在则返回默认值 """ try: return _global_dict[key] except KeyError: # 查找字典的key不存在的时候触发 return defValue import GlobalVar as gr gr._init() # 为全局变量赋值 gr.set_value('scheduler', init_scheduler())

3. 要运行的脚本

python
@tornado.web.authenticated def post(self,kt): self.refreshSession() if kt == 'config': self.post_config() else: return self.pathError() def post_config(self): # 必选 name = self.get_body_argument('name',None) status = self.get_body_argument('status',None) task_time = self.get_body_argument('task_time',None) # 非必选 descr = self.get_body_argument('descr',None) power_house_id = self.get_body_argument('power_house_id',None) if not all([name,status,task_time]): return self.customError('缺少必选参数') session = self.getOrNewSession() user = session.user userid = user.get('id') if db_session.query(ReportConfig.name).filter(ReportConfig.name==name).first(): return self.customError('名称不可重复') r = ReportConfig(creator_id=userid,name=name,status=status,task_time=task_time,\ descr=descr,power_house_id=power_house_id) db_session.add(r) db_session.commit() self.scheduler_add(job_id=str(r.id),power_house_id=power_house_id,val=eval(task_time)) return self.returnTypeSuc('操作成功') @tornado.web.authenticated def put(self,kt): self.refreshSession() if kt == 'config': # self.put_config() pass else: return self.pathError() def put_config(self): # 必选 id = self.get_body_argument('id',None) name = self.get_body_argument('name',None) status = self.get_body_argument('status',None) task_time = self.get_body_argument('task_time',None) # 非必选 descr = self.get_body_argument('descr',None) power_house_id = self.get_body_argument('power_house_id',None) if not all([name,status,task_time,id]): return self.customError('缺少必选参数') # session = self.getOrNewSession() # user = session.user # userid = user.get('id') res = db_session.query(ReportConfig).filter(ReportConfig.id==id).first() if not res: return self.customError('id错误') # self.scheduler_update(id,eval(task_time)) res.name = name res.status = status res.task_time = task_time if res.task_time != task_time: pass res.descr = descr res.power_house_id = power_house_id res.op_ts = datetime.datetime.now() db_session.add(res) db_session.commit() return self.returnTypeSuc('操作成功') def delete_config(self): id = self.get_body_argument('id',None) res = db_session.query(ReportConfig).filter(ReportConfig.id==id).first() if not res: return self.customError('id错误') db_session.query(ReportConfig).filter(ReportConfig.id==id).delete() self.scheduler_remove(id) db_session.commit() return self.returnTypeSuc('操作成功') def scheduler_add(self,**kwargs): h,m = kwargs['val']['moment'].split(":") if kwargs['val']['per'] == 'week': d = int(kwargs['val']['val']) -1 gr.get_value('scheduler').add_job(func=reportData().get_report_data, trigger='cron',jobstore='redis', \ day_of_week=d, hour=h,minute=m,id=kwargs['job_id'],misfire_grace_time=None, replace_existing=True,args=(kwargs['power_house_id'],kwargs['job_id'],'week',)) gr.get_value('scheduler').start() elif kwargs['val'].get('per') == 'month': d = int(kwargs['val']['val']) gr.get_value('scheduler').add_job(func=reportData().get_report_data, trigger='cron',jobstore='redis', \ day =d, hour=h,minute=m,id=kwargs['job_id'],misfire_grace_time=None, replace_existing=True,args=(kwargs['power_house_id'],kwargs['job_id'],'month',)) gr.get_value('scheduler').start() else: return self.customError('任务时间格式错误') def scheduler_update(self,job_id,val): if val.get('per') == 'week': h,m = val['moment'].split(":") temp_dict = {"hour": h, "minute": m,'day_of_week':str(int(val['val'])-1)} temp_trigger = gr.get_value('scheduler')._create_trigger(trigger='cron',trigger_args=temp_dict) result = gr.get_value('scheduler').modify_job(job_id=job_id,jobstore='redis', trigger=temp_trigger) elif val.get('per') == 'month': pass else: return self.customError('任务时间格式错误') def scheduler_remove(self,job_id): r.zrem('apscheduler.run_times', job_id) for key in r.hkeys('apscheduler.jobs'): if key.decode('utf-8') == job_id: r.hdel("apscheduler.jobs",key) break

附录-可能出现的bug

1. 添加之后,在start()否则会报错

参考网址1 参考网址2

schedule的使用

1. 环境

2. 安装

3. 每分钟运行任务

函数调用方法

python
import schedule import time def job(): print("I'm working...") # Run job every 3 second/minute/hour/day/week, # Starting 3 second/minute/hour/day/week from now schedule.every(3).seconds.do(job) schedule.every(3).minutes.do(job) schedule.every(3).hours.do(job) schedule.every(3).days.do(job) schedule.every(3).weeks.do(job) # Run job every minute at the 23rd second schedule.every().minute.at(":23").do(job) # Run job every hour at the 42rd minute schedule.every().hour.at(":42").do(job) # Run jobs every 5th hour, 20 minutes and 30 seconds in. # If current time is 02:00, first execution is at 06:20:30 schedule.every(5).hours.at("20:30").do(job) # Run job every day at specific HH:MM and next HH:MM:SS schedule.every().day.at("10:30").do(job) schedule.every().day.at("10:30:42").do(job) # Run job on a specific day of the week schedule.every().monday.do(job) schedule.every().wednesday.at("13:15").do(job) schedule.every().minute.at(":17").do(job) while True: schedule.run_pending() time.sleep(1)

装饰器方法

python
from schedule import every, repeat, run_pending import time @repeat(every(10).minutes) def job(): print("I am a scheduled job") while True: run_pending() time.sleep(1)

传递参数

python
import schedule import time def greet(name): print('Hello', name) schedule.every(2).seconds.do(greet, name='Alice') schedule.every(4).seconds.do(greet, name='Bob') from schedule import every, repeat @repeat(every().second, "World") @repeat(every().day, "Mars") def hello(planet): print("Hello", planet) while True: schedule.run_pending() time.sleep(1)

4. 取消任务

使用cancel之后,就会取消任务

import schedule,time def some_task(): print('Hello world') job = schedule.every().day.at('09:14').do(some_task) schedule.cancel_job(job) while True: schedule.run_pending() time.sleep(1)

5. 运行一次任务

python
import schedule import time def job_that_executes_once(): # Do some work that only needs to happen once... return schedule.CancelJob schedule.every().day.at('22:30').do(job_that_executes_once) while True: schedule.run_pending() time.sleep(1)

6. 获取所有的任务

python
import schedule def hello(): print('Hello world') schedule.every().second.do(hello) all_jobs = schedule.get_jobs()

7. 取消所有任务

import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().second.do(greet) schedule.clear()

8. 根据标签获取任务

python
import schedule,time def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') friends = schedule.get_jobs('friend') print(friends) while True: schedule.run_pending() time.sleep(1) ''' 输出结果: [Every 1 day do greet('Andrea') (last run: [never], next run: 2022-12-02 09:17:53), Every 1 hour do greet('John') (last run: [never], next run: 2022-12-01 10:17:53)] '''

9. 根据标签取消任务

python
import schedule def greet(name): print('Hello {}'.format(name)) schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend') schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend') schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer') schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest') schedule.clear('daily-tasks')

10. 随机秒数运行任务

python
def my_job(): print('Foo') # Run every 5 to 10 seconds. schedule.every(5).to(10).seconds.do(my_job)

11. 运行任务截止到具体时间

python
import schedule from datetime import datetime, timedelta, time def job(): print('Boo') # run job until a 18:30 today schedule.every(1).hours.until("18:30").do(job) # run job until a 2030-01-01 18:33 today schedule.every(1).hours.until("2030-01-01 18:33").do(job) # Schedule a job to run for the next 8 hours schedule.every(1).hours.until(timedelta(hours=8)).do(job) # Run my_job until today 11:33:42 schedule.every(1).hours.until(time(11, 33, 42)).do(job) # run job until a specific datetime schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

12. run_all,运行所有的job

python
import schedule def job_1(): print('Foo') def job_2(): print('Bar') schedule.every().monday.at("12:40").do(job_1) schedule.every().tuesday.at("16:40").do(job_2) schedule.run_all() # Add the delay_seconds argument to run the jobs with a number # of seconds delay in between. schedule.run_all(delay_seconds=10) # 会有延迟

13. 后台运行

无法在后台运行计划。但是,您可以自己创建一个线程,并使用它来运行作业,而不会阻塞主线程。这是一个如何做到这一点的示例:

python
import threading import time import schedule def run_continuously(interval=1): """Continuously run, while executing pending jobs at each elapsed time interval. @return cease_continuous_run: threading. Event which can be set to cease continuous run. Please note that it is *intended behavior that run_continuously() does not run missed jobs*. For example, if you've registered a job that should run every minute and you set a continuous run interval of one hour then your job won't be run 60 times at each interval but only once. """ cease_continuous_run = threading.Event() class ScheduleThread(threading.Thread): @classmethod def run(cls): while not cease_continuous_run.is_set(): schedule.run_pending() time.sleep(interval) continuous_thread = ScheduleThread() continuous_thread.start() return cease_continuous_run def background_job(): print('Hello from the background thread') schedule.every().second.do(background_job) # Start the background thread stop_run_continuously = run_continuously() # Do some other things... time.sleep(10) # Stop the background thread stop_run_continuously.set()

14. 并行执行

我试图每10秒执行50个项目,但从我的日志来看,它说它在10秒内连续执行每个项目,有什么办法吗?

默认情况下,schedule串行执行所有作业。这背后的原因是,很难找到一个让所有人都满意的并行执行模型。

您可以通过在自己的线程中运行每个作业来解决此限制:

python
import threading import time import schedule def job(): print("I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target=job_func) job_thread.start() schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) schedule.every(10).seconds.do(run_threaded, job) while 1: schedule.run_pending() time.sleep(1) ''' 输出结果: 并行执行 I'm running on thread <Thread(Thread-1 (job), started 1032)> I'm running on thread <Thread(Thread-2 (job), started 25812)> I'm running on thread <Thread(Thread-3 (job), started 33760)> I'm running on thread <Thread(Thread-4 (job), started 30064)> I'm running on thread <Thread(Thread-5 (job), started 23536)> '''

如果希望对线程数进行更严格的控制,请使用共享作业队列和一个或多个工作线程:

python
import time import threading import schedule import queue def job(): print("I'm working") def worker_main(): while 1: job_func = jobqueue.get() job_func() jobqueue.task_done() jobqueue = queue.Queue() schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) schedule.every(10).seconds.do(jobqueue.put, job) worker_thread = threading.Thread(target=worker_main) worker_thread.start() while 1: schedule.run_pending() time.sleep(1)

该模型对于分布式应用程序也有意义,其中工作人员是从分布式工作队列接收作业的独立进程。我喜欢在beanstalkc Python库中使用beanstalkd。

15. 异常处理

调度不会捕获作业执行期间发生的异常。因此,作业执行期间抛出的任何异常都将冒出来,并中断调度的run_xyz函数。

如果你想防止异常,你可以像这样把你的job函数包装在装饰器中:

python
import schedule import functools def catch_exceptions(cancel_on_failure=False): def catch_exceptions_decorator(job_func): @functools.wraps(job_func) def wrapper(*args, **kwargs): try: return job_func(*args, **kwargs) except: import traceback print(traceback.format_exc()) if cancel_on_failure: return schedule.CancelJob return wrapper return catch_exceptions_decorator @catch_exceptions(cancel_on_failure=True) def bad_task(): return 1 / 0 schedule.every(5).seconds.do(bad_task) while True: schedule.run_pending() '''输出结果: Traceback (most recent call last): File "D:\myproject\schedule\test.py", line 14, in wrapper return job_func(*args, **kwargs) File "D:\myproject\schedule\test.py", line 25, in bad_task return 1 / 0 ZeroDivisionError: division by zero '''

16.日志记录

python
import schedule import logging logging.basicConfig() schedule_logger = logging.getLogger('schedule') schedule_logger.setLevel(level=logging.DEBUG) def job(): print("Hello, Logs") schedule.every().second.do(job) schedule.run_all() schedule.clear()

result:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={}) Hello, Logs DEBUG:schedule:Deleting *all* jobs

装饰器方式记录日志

python
import functools import time import schedule # This decorator can be applied to any job function to log the elapsed time of each job def print_elapsed_time(func): @functools.wraps(func) def wrapper(*args, **kwargs): start_timestamp = time.time() print('LOG: Running job "%s"' % func.__name__) result = func(*args, **kwargs) print('LOG: Job "%s" completed in %d seconds' % (func.__name__, time.time() - start_timestamp)) return result return wrapper @print_elapsed_time def job(): print('Hello, Logs') time.sleep(5) schedule.every().second.do(job) schedule.run_all()

17. 运行多个调度器

您可以根据需要从一个调度器运行任意多的作业。但是,对于较大的安装,可能需要有多个调度器。这是支持的:

python
import time import schedule def fooJob(): print("Foo") def barJob(): print("Bar") # Create a new scheduler scheduler1 = schedule.Scheduler() # Add jobs to the created scheduler scheduler1.every().hour.do(fooJob) scheduler1.every().hour.do(barJob) # Create as many schedulers as you need scheduler2 = schedule.Scheduler() scheduler2.every().second.do(fooJob) scheduler2.every().second.do(barJob) while True: # run_pending needs to be called on every scheduler scheduler1.run_pending() scheduler2.run_pending() time.sleep(1)

官网

本文作者:Eric

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!