APScheduler, short for Advanced Python Scheduler, is a lightweight Python task-scheduling framework. It lets you schedule jobs to run periodically, Cron-style, and supports Python functions or any other callable. See the official documentation for details.
```bash
$ pip install apscheduler
$ python setup.py install
```
Job stores: hold the scheduled jobs. The default job store simply keeps jobs in memory, while the others persist them to a database. When a job is saved to a persistent job store, its data is serialized, and it is deserialized again when loaded back. A job store must never be shared between schedulers.
If your application recreates its jobs every time it starts, the default job store (MemoryJobStore) is sufficient. But if you need jobs to survive a scheduler restart or an application crash, pick a job store that fits your environment, e.g. MongoDBJobStore, or SQLAlchemyJobStore for most RDBMSs.
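For instance, a minimal sketch of attaching a persistent SQLAlchemy-backed store to a scheduler (the SQLite file jobs.sqlite is just an illustration):
```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

scheduler = BackgroundScheduler()
# Jobs added to this store are serialized into the database and survive restarts
scheduler.add_jobstore(SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'), 'default')
```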
The choice of executor depends on which of the frameworks above you use. In most cases the default ThreadPoolExecutor is sufficient. If your workload is CPU-intensive, consider ProcessPoolExecutor to make use of more CPU cores. You can even use both, adding ProcessPoolExecutor as a secondary executor; a job can then be routed to it explicitly, as sketched below.
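As a sketch, assuming a scheduler configured with a 'processpool' executor alias (as in the configuration example further down) and an illustrative cpu_heavy_task function:
```python
# Route a CPU-bound job to the process pool instead of the default thread pool
scheduler.add_job(cpu_heavy_task, 'interval', minutes=10, executor='processpool')
```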
When you schedule a job, you choose a trigger for it, which determines when the job fires. APScheduler has three built-in trigger types:
date
The most basic kind of schedule: the job runs exactly once, at the given date or time:
```python
from datetime import date, datetime

# Run job_function once on 2016-12-12
sched.add_job(job_function, 'date', run_date=date(2016, 12, 12), args=['text'])
# Run job_function once at 2016-12-12 12:00:00
sched.add_job(job_function, 'date', run_date=datetime(2016, 12, 12, 12, 0, 0), args=['text'])
```
interval
Interval-based scheduling: the job runs repeatedly at a fixed interval:
```python
# Run job_function every two hours
sched.add_job(job_function, 'interval', hours=2)
```
cron
The cron trigger gives you the full Cron-like expressive power. For example:
```python
# Run job_function at 00:00, 01:00, 02:00 and 03:00 on the third Friday
# of June, July, August, November and December
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Run job_function Monday through Friday at 5:30 until 2016-12-31 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2016-12-31')
```
An example of running a BackgroundScheduler with the default job store and default executor:
```python
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
# Initialize the rest of the application here, or before the scheduler initialization
```
This gives you a BackgroundScheduler with a MemoryJobStore named "default" and a ThreadPoolExecutor named "default", with a default maximum thread count of 10. If the defaults don't suit you, for example you want two job stores plus two executors, adjusted defaults for new jobs, and a different timezone, configure the scheduler like this:
```python
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Configure the job stores
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# Configure the executors and their pool sizes
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,   # run every missed execution instead of coalescing them into one
    'max_instances': 3   # allow at most 3 concurrently running instances of each job
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
```
Starting the scheduler is just a matter of calling start(). For all schedulers except BlockingScheduler the call returns immediately, and you can carry on with your application's initialization, possibly adding jobs afterwards. For BlockingScheduler, call start() only after you have finished all initialization steps:
```python
scheduler.start()
```
Method 1: call add_job()
The most common way. add_job() returns an apscheduler.job.Job instance, which you can use later to modify or remove the job.
Method 2: use the scheduled_job() decorator
This is mainly a convenient way to declare jobs that will not change while the application is running; a minimal sketch follows.
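A minimal sketch of the decorator form (my_job and the two-minute interval are just illustrations):
```python
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

# Equivalent to sched.add_job(my_job, 'interval', minutes=2)
@sched.scheduled_job('interval', minutes=2)
def my_job():
    print('Running every two minutes')

sched.start()
```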
To remove a job, use either of the two ways:
```python
# Remove via the Job instance
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
# Remove via the job id
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
```
Jobs can easily be paused and resumed, either through the Job instance or through the scheduler itself. When a job is paused, its next run time is cleared and no further run times are computed until the job is resumed. To pause a job, use either of these methods:
```python
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
```
To resume a job:
```python
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()
```
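A quick usage sketch, assuming a job registered with the illustrative id my_job_id:
```python
job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

# Either through the Job instance...
job.pause()
job.resume()

# ...or through the scheduler, by job id
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
```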
To get a machine-processable list of the scheduled jobs, use get_jobs(). It returns a list of Job instances. If you are only interested in the jobs contained in a particular job store, pass that job store's alias as an argument.
For convenience there is also print_jobs(), which prints a formatted list of jobs with their triggers and next run times.
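For example, assuming the 'mongo' job store alias from the configuration above:
```python
all_jobs = scheduler.get_jobs()           # every job from every job store
mongo_jobs = scheduler.get_jobs('mongo')  # only the jobs in the 'mongo' store
scheduler.print_jobs()                    # formatted, human-readable listing
```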
You can modify any attribute of a job except its id by calling apscheduler.job.Job.modify() or modify_job():
```python
job.modify(max_instances=6, name='Alternate name')
```
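The same change can be made through the scheduler by job id (my_job_id is illustrative):
```python
scheduler.modify_job('my_job_id', max_instances=6, name='Alternate name')
```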
By default the scheduler shuts down its job stores and executors and waits for all currently executing jobs to finish. The optional wait=False makes it stop immediately instead of waiting: scheduler.shutdown(wait=False). Appendix 1: a small script that runs a scheduled task:
```python
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from app.untils.log_builder import sys_logging

scheduler = BlockingScheduler()  # blocks the process while it runs

# Execute once every day at 01:30:30
@scheduler.scheduled_job("cron", day_of_week='*', hour='1', minute='30', second='30')
def rebate():
    print("schedule execute")
    sys_logging.debug("statistic scheduler execute success " + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

if __name__ == '__main__':
    try:
        scheduler.start()
        sys_logging.debug("statistic scheduler start success")
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()
        sys_logging.debug("statistic scheduler shut down")
```
The first time I used this scheduler, the job always ran twice, and for a long time I didn't know why. It turned out that Flask in debug mode starts an extra thread to watch the project for code changes, so everything runs twice; setting the debug option to False fixes it.
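If you still want Flask's debugger, a common alternative is to keep debug mode but disable just the reloader thread; a sketch, assuming a standard Flask app object:
```python
# The Werkzeug reloader is what spawns the second run of the script,
# so disabling it keeps the scheduler from being started twice
app.run(debug=True, use_reloader=False)
```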
When creating the scheduler object, you can specify the timezone: scheduler = BlockingScheduler(timezone='Asia/Shanghai').
An example of adding and removing jobs dynamically with TornadoScheduler:
```python
from datetime import datetime

from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application
from apscheduler.schedulers.tornado import TornadoScheduler

scheduler = None
job_ids = []

# Initialization
def init_scheduler():
    global scheduler
    scheduler = TornadoScheduler()
    scheduler.start()
    print('[Scheduler Init] APScheduler has been started')

# The scheduled task to run
def task1(options):
    print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))

class MainHandler(RequestHandler):
    def get(self):
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br>'
                   '<a href="/scheduler?job_id=1&action=remove">remove job</a>')

class SchedulerHandler(RequestHandler):
    def get(self):
        global job_ids
        job_id = self.get_query_argument('job_id', None)
        action = self.get_query_argument('action', None)
        if job_id:
            # add
            if 'add' == action:
                if job_id not in job_ids:
                    job_ids.append(job_id)
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
                    self.write('[TASK ADDED] - {}'.format(job_id))
                else:
                    self.write('[TASK EXISTS] - {}'.format(job_id))
            # remove
            elif 'remove' == action:
                if job_id in job_ids:
                    scheduler.remove_job(job_id)
                    job_ids.remove(job_id)
                    self.write('[TASK REMOVED] - {}'.format(job_id))
                else:
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))
        else:
            self.write('[INVALID PARAMS] INVALID job_id or action')

if __name__ == "__main__":
    routes = [
        (r"/", MainHandler),
        (r"/scheduler/?", SchedulerHandler),
    ]
    init_scheduler()
    app = Application(routes, debug=True)
    app.listen(8888)
    IOLoop.current().start()
```
A fuller initialization example with a Redis job store, custom executors, and an event listener:
```python
# -*- coding:utf-8 -*-
import logging
import os, sys

from Application.Cfg.dir_cfg import model_config, hy2020_dir
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.tornado import TornadoScheduler
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.events import EVENT_JOB_EXECUTED

LOG_FORMAT = '[%(asctime)s] %(filename)s -> %(funcName)s line:%(lineno)d [%(levelname)s]: %(message)s'
# logging.basicConfig(filename=os.path.join(hy2020_dir, 'log/scheduler.log'), level=logging.INFO, format=LOG_FORMAT,
#                     datefmt='%Y-%m-%d %H:%M:%S')

def listener_function(event):
    if event.exception:
        logging.error('job raised an exception')
    else:
        logging.info('job executed normally')

def init_scheduler():
    REDIS = {
        'host': model_config.get('redis', "HOSTNAME"), 'port': model_config.get('redis', "PORT"),
        'db': model_config.get('redis', "DB"), 'password': model_config.get('redis', "PASSWORD")}
    jobstores = {'redis': RedisJobStore(**REDIS)}
    # Configure the executors and their pool sizes
    executors = {
        'default': ThreadPoolExecutor(10),      # default thread pool
        'processpool': ProcessPoolExecutor(5)   # default process pool
    }
    scheduler = TornadoScheduler(jobstores=jobstores, executors=executors, timezone='Asia/Shanghai', daemon=True)
    # Attach a listener for job-executed / job-error events
    scheduler.add_listener(callback=listener_function, mask=(EVENT_JOB_EXECUTED | EVENT_JOB_ERROR))
    # Route the scheduler's own log output through the logging module
    # scheduler.pause_job()
    scheduler._logger = logging
    # scheduler.add_job(task, "interval", seconds=3, id="job1", args=())
    logging.info('[Scheduler Init] APScheduler has been started')
    return scheduler
```
To share the scheduler instance across modules, a small helper module (GlobalVar.py) can hold global values; it is written as module-level functions so that import GlobalVar as gr works:
```python
# GlobalVar.py: a tiny module for sharing state across modules
def _init():
    """Initialize the global dict"""
    global _global_dict
    _global_dict = {}

def set_value(key, value):
    """Set a global variable"""
    _global_dict[key] = value

def get_value(key, defValue=None):
    """Get a global variable; return defValue if the key does not exist"""
    try:
        return _global_dict[key]
    except KeyError:  # raised when the key is not in the dict
        return defValue
```

```python
import GlobalVar as gr

gr._init()
# Store the scheduler in a global value
gr.set_value('scheduler', init_scheduler())
```
The Tornado handlers then keep ReportConfig records and scheduler jobs in sync:
```python
@tornado.web.authenticated
def post(self, kt):
    self.refreshSession()
    if kt == 'config':
        self.post_config()
    else:
        return self.pathError()

def post_config(self):
    # Required parameters
    name = self.get_body_argument('name', None)
    status = self.get_body_argument('status', None)
    task_time = self.get_body_argument('task_time', None)
    # Optional parameters
    descr = self.get_body_argument('descr', None)
    power_house_id = self.get_body_argument('power_house_id', None)
    if not all([name, status, task_time]):
        return self.customError('missing required parameters')
    session = self.getOrNewSession()
    user = session.user
    userid = user.get('id')
    if db_session.query(ReportConfig.name).filter(ReportConfig.name == name).first():
        return self.customError('name must be unique')
    r = ReportConfig(creator_id=userid, name=name, status=status, task_time=task_time,
                     descr=descr, power_house_id=power_house_id)
    db_session.add(r)
    db_session.commit()
    self.scheduler_add(job_id=str(r.id), power_house_id=power_house_id, val=eval(task_time))
    return self.returnTypeSuc('success')

@tornado.web.authenticated
def put(self, kt):
    self.refreshSession()
    if kt == 'config':
        # self.put_config()
        pass
    else:
        return self.pathError()

def put_config(self):
    # Required parameters
    id = self.get_body_argument('id', None)
    name = self.get_body_argument('name', None)
    status = self.get_body_argument('status', None)
    task_time = self.get_body_argument('task_time', None)
    # Optional parameters
    descr = self.get_body_argument('descr', None)
    power_house_id = self.get_body_argument('power_house_id', None)
    if not all([name, status, task_time, id]):
        return self.customError('missing required parameters')
    # session = self.getOrNewSession()
    # user = session.user
    # userid = user.get('id')
    res = db_session.query(ReportConfig).filter(ReportConfig.id == id).first()
    if not res:
        return self.customError('invalid id')
    # self.scheduler_update(id, eval(task_time))
    if res.task_time != task_time:
        # task_time changed; updating the trigger is left to scheduler_update above
        pass
    res.name = name
    res.status = status
    res.task_time = task_time
    res.descr = descr
    res.power_house_id = power_house_id
    res.op_ts = datetime.datetime.now()
    db_session.add(res)
    db_session.commit()
    return self.returnTypeSuc('success')

def delete_config(self):
    id = self.get_body_argument('id', None)
    res = db_session.query(ReportConfig).filter(ReportConfig.id == id).first()
    if not res:
        return self.customError('invalid id')
    db_session.query(ReportConfig).filter(ReportConfig.id == id).delete()
    self.scheduler_remove(id)
    db_session.commit()
    return self.returnTypeSuc('success')

def scheduler_add(self, **kwargs):
    h, m = kwargs['val']['moment'].split(":")
    if kwargs['val']['per'] == 'week':
        d = int(kwargs['val']['val']) - 1
        gr.get_value('scheduler').add_job(
            func=reportData().get_report_data, trigger='cron', jobstore='redis',
            day_of_week=d, hour=h, minute=m, id=kwargs['job_id'], misfire_grace_time=None,
            replace_existing=True, args=(kwargs['power_house_id'], kwargs['job_id'], 'week',))
    elif kwargs['val'].get('per') == 'month':
        d = int(kwargs['val']['val'])
        gr.get_value('scheduler').add_job(
            func=reportData().get_report_data, trigger='cron', jobstore='redis',
            day=d, hour=h, minute=m, id=kwargs['job_id'], misfire_grace_time=None,
            replace_existing=True, args=(kwargs['power_house_id'], kwargs['job_id'], 'month',))
    else:
        return self.customError('invalid task time format')
    # start() raises if the scheduler is already running, so guard the call
    if not gr.get_value('scheduler').running:
        gr.get_value('scheduler').start()

def scheduler_update(self, job_id, val):
    if val.get('per') == 'week':
        h, m = val['moment'].split(":")
        temp_dict = {"hour": h, "minute": m, 'day_of_week': str(int(val['val']) - 1)}
        temp_trigger = gr.get_value('scheduler')._create_trigger(trigger='cron', trigger_args=temp_dict)
        result = gr.get_value('scheduler').modify_job(job_id=job_id, jobstore='redis', trigger=temp_trigger)
    elif val.get('per') == 'month':
        pass
    else:
        return self.customError('invalid task time format')

def scheduler_remove(self, job_id):
    # r is the raw Redis client used by the RedisJobStore (assumed defined elsewhere)
    r.zrem('apscheduler.run_times', job_id)
    for key in r.hkeys('apscheduler.jobs'):
        if key.decode('utf-8') == job_id:
            r.hdel("apscheduler.jobs", key)
            break
```
Besides APScheduler, the lightweight schedule library is another option. The basics:
```python
import schedule
import time

def job():
    print("I'm working...")

# Run job every 3 seconds/minutes/hours/days/weeks,
# starting 3 seconds/minutes/hours/days/weeks from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# Run job every minute at the 23rd second
schedule.every().minute.at(":23").do(job)
# Run job every hour at the 42nd minute
schedule.every().hour.at(":42").do(job)

# Run jobs every 5th hour, 20 minutes and 30 seconds in.
# If current time is 02:00, first execution is at 06:20:30
schedule.every(5).hours.at("20:30").do(job)

# Run job every day at specific HH:MM and next HH:MM:SS
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)

# Run job on a specific day of the week
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
```
You can also register jobs with the @repeat decorator:
```python
from schedule import every, repeat, run_pending
import time

@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)
```
Passing arguments to a job:
```python
import schedule
import time

def greet(name):
    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

from schedule import every, repeat

@repeat(every().second, "World")
@repeat(every().day, "Mars")
def hello(planet):
    print("Hello", planet)

while True:
    schedule.run_pending()
    time.sleep(1)
```
Calling cancel_job cancels the job:
```python
import schedule, time

def some_task():
    print('Hello world')

job = schedule.every().day.at('09:14').do(some_task)
schedule.cancel_job(job)

while True:
    schedule.run_pending()
    time.sleep(1)
```
To run a job only once, return schedule.CancelJob from it:
```python
import schedule
import time

def job_that_executes_once():
    # Do some work that only needs to happen once...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)
```
To retrieve all jobs:
```python
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)
all_jobs = schedule.get_jobs()
```
To remove all jobs:
```python
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)
schedule.clear()
```
Jobs can be tagged, then retrieved by tag:
```python
import schedule, time

def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

friends = schedule.get_jobs('friend')
print(friends)

while True:
    schedule.run_pending()
    time.sleep(1)

'''
Output:
[Every 1 day do greet('Andrea') (last run: [never], next run: 2022-12-02 09:17:53), Every 1 hour do greet('John') (last run: [never], next run: 2022-12-01 10:17:53)]
'''
```
Or cleared by tag:
```python
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

schedule.clear('daily-tasks')
```
Run a job at a random interval:
```python
import schedule

def my_job():
    print('Foo')

# Run every 5 to 10 seconds
schedule.every(5).to(10).seconds.do(my_job)
```
Run a job until a certain time or deadline:
```python
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# Run job until 18:30 today
schedule.every(1).hours.until("18:30").do(job)
# Run job until 2030-01-01 18:33
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Schedule a job to run for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# Run job until today at 11:33:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# Run job until a specific datetime
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
```
Run all jobs immediately, regardless of their schedule:
```python
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# Add the delay_seconds argument to run the jobs with a number
# of seconds delay in between
schedule.run_all(delay_seconds=10)
```
schedule itself cannot run jobs in the background. However, you can create a thread yourself and use it to run jobs without blocking the main thread. Here is an example of how to do that:
```python
import threading
import time

import schedule

def run_continuously(interval=1):
    """Continuously run, while executing pending jobs at each
    elapsed time interval.
    @return cease_continuous_run: threading.Event which can
    be set to cease continuous run. Please note that it is
    *intended behavior that run_continuously() does not run
    missed jobs*. For example, if you've registered a job that
    should run every minute and you set a continuous run
    interval of one hour then your job won't be run 60 times
    at each interval but only once.
    """
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run

def background_job():
    print('Hello from the background thread')

schedule.every().second.do(background_job)

# Start the background thread
stop_run_continuously = run_continuously()

# Do some other things...
time.sleep(10)

# Stop the background thread
stop_run_continuously.set()
```
Q: I am trying to execute 50 items every 10 seconds, but from my logs it looks like they are executed one after another within the 10 seconds. Is there a way around that?
By default, schedule executes all jobs serially. The reasoning behind this is that it is hard to find a parallel execution model that makes everyone happy.
You can work around this limitation by running each job in its own thread:
```python
import threading
import time

import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while True:
    schedule.run_pending()
    time.sleep(1)

'''
Output (the jobs run in parallel):
I'm running on thread <Thread(Thread-1 (job), started 1032)>
I'm running on thread <Thread(Thread-2 (job), started 25812)>
I'm running on thread <Thread(Thread-3 (job), started 33760)>
I'm running on thread <Thread(Thread-4 (job), started 30064)>
I'm running on thread <Thread(Thread-5 (job), started 23536)>
'''
```
If you want tighter control over the number of threads, use a shared job queue and one or more worker threads:
```python
import time
import threading
import queue

import schedule

def job():
    print("I'm working")

def worker_main():
    while True:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()

jobqueue = queue.Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while True:
    schedule.run_pending()
    time.sleep(1)
```
This model also makes sense for distributed applications, where the workers are separate processes that receive jobs from a distributed work queue. I like to use beanstalkd with the beanstalkc Python library.
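For illustration only, a rough beanstalkc sketch (assuming a beanstalkd server on localhost:11300; the job payload 'generate-report' is made up):
```python
import beanstalkc

# Producer side: the scheduled job just pushes a task name onto the queue
conn = beanstalkc.Connection(host='localhost', port=11300)
conn.put('generate-report')

# Worker side (a separate process): block until a job arrives, then handle it
worker = beanstalkc.Connection(host='localhost', port=11300)
job = worker.reserve()
print('received:', job.body)
job.delete()
```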
schedule does not catch exceptions raised during job execution. Any exception thrown by a job therefore bubbles up and interrupts schedule's run_xyz functions.
If you want to guard against exceptions, you can wrap your job function in a decorator like this:
```python
import functools
import time
import traceback

import schedule

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).seconds.do(bad_task)

while True:
    schedule.run_pending()
    time.sleep(1)

'''
Output:
Traceback (most recent call last):
  File "D:\myproject\schedule\test.py", line 14, in wrapper
    return job_func(*args, **kwargs)
  File "D:\myproject\schedule\test.py", line 25, in bad_task
    return 1 / 0
ZeroDivisionError: division by zero
'''
```
To see schedule's own log output, raise the log level of the 'schedule' logger:
```python
import logging

import schedule

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()
schedule.clear()
```
Result:
```
DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs
```
Logging with a decorator:
```python
import functools
import time

import schedule

# This decorator can be applied to any job function to log the elapsed time of each job
def print_elapsed_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_timestamp = time.time()
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed in %d seconds' % (func.__name__, time.time() - start_timestamp))
        return result
    return wrapper

@print_elapsed_time
def job():
    print('Hello, Logs')
    time.sleep(5)

schedule.every().second.do(job)

schedule.run_all()
```
You can run as many jobs as you like from a single scheduler. For larger installations, however, it may make sense to have multiple schedulers. This is supported:
```python
import time

import schedule

def fooJob():
    print("Foo")

def barJob():
    print("Bar")

# Create a new scheduler
scheduler1 = schedule.Scheduler()

# Add jobs to the created scheduler
scheduler1.every().hour.do(fooJob)
scheduler1.every().hour.do(barJob)

# Create as many schedulers as you need
scheduler2 = schedule.Scheduler()
scheduler2.every().second.do(fooJob)
scheduler2.every().second.do(barJob)

while True:
    # run_pending needs to be called on every scheduler
    scheduler1.run_pending()
    scheduler2.run_pending()
    time.sleep(1)
```
Author: Eric
Link:
Copyright: unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA. Please credit the source when reposting!