开机运行虚拟机
392 2023-04-03 04:33:19
APScheduler
全称Advanced Python Scheduler
作用为在指定的时间规则执行指定的作业。
pip install apscheduler
一个任务就是一个函数,或者异步函数
BlockingScheduler是最基本的调度器,阻塞型的调度器
参数一:任务名
参数二:触发器,使用的是interval间隔触发器
seconds:间隔时间,单位秒,没个几秒执行一次
args:所添加的任务的传入参数
start
from datetime import datetimefrom apscheduler.schedulers.blocking import BlockingSchedulerdef func(name): now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(now + f" Hello world, {name}")scheduler = BlockingScheduler()scheduler.add_job(func, 'interval', seconds=3, args=["desire"])scheduler.start()
执行结果:
2022-05-19 16:28:51 Hello world, desire2022-05-19 16:28:54 Hello world, desire2022-05-19 16:28:57 Hello world, desire2022-05-19 16:29:00 Hello world, desire2022-05-19 16:29:03 Hello world, desire
BlockingScheduler
start
函数会阻塞当前线程,不能立即返回from apscheduler.schedulers.blocking import BlockingScheduler
BackgroundScheduler
start
后主线程不会阻塞from apscheduler.schedulers.background import BackgroundScheduler
AsyncIOScheduler
asyncio
模块的应用程序from apscheduler.schedulers.asyncio import AsyncIOScheduler
GeventScheduler
gevent
模块的应用程序from apscheduler.schedulers.gevent import GeventScheduler
TwistedScheduler
Twisted
的应用程序from apscheduler.schedulers.twisted import TwistedScheduler
QtScheduler
:Qt
的应用程序from apscheduler.schedulers.qt import QtScheduler
TornadoScheduler
Tornado
的应用程序from apscheduler.schedulers.tornado import TornadoScheduler
from datetime import datetimefrom apscheduler.schedulers.blocking import BlockingSchedulerdef func(name): now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(now + f" Hello world, {name}")scheduler = BlockingScheduler()# 指定在2022/05/19 16:53 进行执行任务scheduler.add_job(func, 'date', run_date=datetime(2022, 5, 19, 16, 53), args=["desire"])scheduler.start()
运行结果:
2022-05-19 16:53:00 Hello world, desire
在固定的事件间隔触发事件
interval触发器可以设置的触发参数
# 三秒执行一次scheduler.add_job(func, 'interval', seconds=3, args=["desire"])
在某个确切的时间周期性的触发时间
参数:
也可以使用表达式类型:
# 在每个50秒的时候触发scheduler.add_job(func, 'cron', second=50, args=["desire"])# 在第4个星期日触发scheduler.add_job(func, 'cron', day="4th sun", args=["desire"])
MemoryJobStore
from apscheduler.jobstores.memory import MemoryJobStore
SQLAlchemyJobStore
SQLAlchemy
这个ORM框架作为存储方式from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
MongoDBJobStore
mongodb
作为存储器from apscheduler.jobstores.mongodb import MongoDBJobStore
RedisJobStore
redis
作为存储器from apscheduler.jobstores.redis import RedisJobStore
ThreadPoolExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
ProcessPoolExecutor
from apscheduler.executors.pool import ProcessPoolExecutor
GeventExecutor
Gevent
程序执行器from apscheduler.executors.gevent import GeventExecutor
TornadoExecutor
Tornado
程序执行器from apscheduler.executors.tornado import TornadoExecutor
TwistedExecutor
Twisted
程序执行器from apscheduler.executors.twisted import TwistedExecutor
AsyncIOExecutor
asyncio
程序执行器from apscheduler.executors.asyncio import AsyncIOExecutor
jobstores
用来配置存储器executors
用来配置执行器job_defaults
创建job时的默认参数max_instances
最大实例数, 同一个任务同一时间最多只能有n个实例在运行。from apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutorinterval_task = { # 配置存储器 "jobstores": { # 使用SQLAlchemy进行存储,会自动创建数据库,并创建apscheduler_jobs表 'default': SQLAlchemyJobStore(url="sqlite:///jobs.db") }, # 配置执行器 "executors": { # 使用线程池进行执行,最大线程数是20个 'default': ThreadPoolExecutor(20) }, # 创建job时的默认参数 "job_defaults": { 'coalesce': False, # 是否合并执行 'max_instances': 3 # 最大实例数 }}scheduler = BlockingScheduler(**interval_task)
# 最常用的方式scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="desire_job", replace_existing=True)# 使用装饰器@scheduler.scheduled_job("interval", seconds=5, id="job2222222")def test_task(): now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(now + f" Hello world, 使用装饰器")
remove_job
方法add_job()
中得到的job实例调用remove()
方法
# removejob = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")job.remove()# remove_jobscheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")scheduler.remove_job(job_id="job_remove")
# 暂停一个job# 方式一:job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")job.pause()# 方式二:scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")scheduler.pause_job(job_id="job_remove")# 恢复一个job# 方式一:job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")job.resume()# 方式二:scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")scheduler.resume_job(job_id="job_remove")
get_jobs
获取机器上可处理的作业调度列表print_jobs
格式化输出作业列表以及他们的触发器和下一次的运行时间modify()
通过job实例进行修改属性modify_job
通过job的ID进行修改属性job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_modify")# modifyjob.modify(name="job222")# modify_jobscheduler.modify_job(job_id="job_modify", name="job2222")
reschedule
通过job实例重新调度jobreschedule_job
通过job的ID进行重新调度jobjob = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_modify")# reschedulejob.reschedule(trigger='cron', minute='*/5')# reschedule_jobscheduler.reschedule_job(job_id="job_modify", trigger='cron', minute='*/5')
scheduler.shutdown()scheduler.shutdown(wait=False)
add_listener
通过此方法对调度器绑定事件监听器def my_listener(event): if event.exception: print("任务出错了!!!!!!!!!") else: print("任务正常运行。。。。。")# 绑定事件监听器,当出现异常或者错误的时,进行监听scheduler.add_listener(my_listener, mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)