APScheduler定时任务框架-程序员宅基地

技术标签: 散装精品  python  APScheduler  

一.APScheduler简介

Advanced Python Scheduler (APScheduler) 是一个 Python 库,可让您安排 Python 代码稍后执行,可以只执行一次,也可以定期执行。您可以随意添加新工作或删除旧工作。如果您将任务存储在数据库中,它们也将在调度器重新启动后幸存下来并保持其状态。当调度器重新启动时,它将运行它在离线时应该运行的所有任务。

除此之外,APScheduler 可以用作跨平台、特定于应用程序的平台特定调度器的替代品,例如 cron 守护程序或 Windows 任务调度器。但是请注意,APScheduler 本身不是守护程序或服务,也不附带任何命令行工具。它主要用于在现有应用程序中运行。也就是说,APScheduler 确实为您提供了一些构建块来构建调度器服务或运行专用调度器进程。

APScheduler 具有三个可以使用的内置调度系统:

  • Cron 式调度(具有可选的开始/结束时间)
  • 基于间隔的执行(以均匀间隔运行任务,具有可选的开始/结束时间)
  • 一次性延迟执行(在设定的日期/时间运行一次任务)

您可以混合搭配调度系统和以您喜欢的任何方式存储任务的后端。支持的用于存储任务的后端包括:
Memory,SQLAlchemy (任何由SQLAlchemy支持的RDBMS都可以工作),MongoDB,Redis,RethinkDB,ZooKeeper

APScheduler还集成了一些常见的Python框架,比如:
asyncio,gevent,Tornado,Twisted,Qt (使用PyQt, PySide2或PySide)

有第三方解决方案可以将 APScheduler 与其他框架集成:
Django,Flask

二.APScheduler基本使用

1.安装

pip安装:

pip install apscheduler

源码安装(https://pypi.python.org/pypi/APScheduler/):

python setup.py install

2.基本概念

APScheduler 有四种组件:
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个任务都有它自己的触发器,用于确定下一次运行任务的时间,除了初始配置之外,触发器是完全无状态的。

任务存储器(job stores):任务存储器包含预定的任务,指定了任务被存放的位置,默认情况下任务保存在内存,也可将任务保存在各种数据库中,当任务的数据在保存到持久任务存储时,它会被序列化,并在从它加载回来时会进行反序列化。任务存储(默认情况除外)不会将任务数据保存在内存中,而是充当在后端保存、加载、更新和搜索任务的中间人。任务存储绝不能在调度器之间共享。

执行器(executors):执行器负责处理任务的运行,通常将指定的任务(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器,然后调度器发出适当的事件。

调度器(schedulers):任务调度器,属于控制角色,通过它配置任务存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、任务存储器、执行器的运行,通常只有一个调度器运行在应用程序中,开发人员通常不需要直接处理任务存储器、执行器或触发器。相反,调度器提供了适当的接口来处理所有这些。任务存储和执行器的配置是通过调度器完成的。

3.选择正确的调度器、任务存储器、执行器和触发器

选择调度器:对调度器的选择主要取决于你的编程环境以及你将使用 APScheduler 的目的

BlockingScheduler:当调度器是进程中唯一运行的东西时使用。
BackgroundScheduler:当不使用以下任何框架时使用,并且希望调度器在应用程序的后台运行。
AsyncIOScheduler:当应用程序使用 asyncio 模块时使用
GeventScheduler: 当应用程序使用 gevent时使用
TornadoScheduler:当正在构建 Tornado 应用程序时使用
TwistedScheduler:当正在构建 Twisted 应用程序时使用
QtScheduler: 当正在构建 Qt 应用程序时使用

选择任务存储器:你就需要确定是否需要任务持久化。如果你总是在应用程序开始时重新创建任务,那么你可以使用默认值 ( MemoryJobStore)。但是,如果你需要在调度器重新启动或应用程序崩溃时保持你的任务,那么就要选择持久化的任务储存器。但是,如果你是自由选择,则推荐使用SQLAlchemyJobStore并搭配PostgreSQL作为后台数据库,因为它可以提供强大的数据完整性保护功能。

选择执行器:这个同样要看你的实际需求,默认的ThreadPoolExecutor线程池执行器方案可以满足大部分需求。如果你的工作负载涉及 CPU 密集型操作,则应考虑ProcessPoolExecutor进程池执行器方案,以此改为使用多个 CPU 内核来充分利用多核算力。你甚至可以同时使用两者,将ProcessPoolExecutor进程池执行器添加为辅助执行器。

选择触发器:配置一个任务,就需要设置一个任务触发器。触发器可以设定运行任务时计算日期/时间的逻辑。APScheduler 带有三种内置触发器类型:

date: 当你想在某个时间点只运行一次任务时使用

interval: 当你想以固定的时间间隔运行任务时使用

cron:当你想在一天中的特定时间定期运行任务时使用

也可以将多个触发器组合成一个触发器,该触发器可以设定同时满足所有触发器条件而触发,或者满足一项即触发。有关更多信息,请参阅的文档:combining triggers

4.触发器详解

(1)date

作用:在给定的日期时间触发一次。如果run_date留空,则使用当前时间。

date类:

from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# 设定执行时间
sched.add_job(my_job, 'date', run_date=date(2021, 8, 30), args=['test'])

sched.start()

其中run_date参数可以是date类型、datetime类型或文本类型。
datetime类

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()

def my_job(text):
    print(text)

# 设定执行时间
sched.add_job(my_job, 'date', run_date=datetime(2021, 8, 30, 17, 10, 0), args=['test'])

sched.start()

运行结果:
在这里插入图片描述
文本类

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


def my_job(text):
    print(text)


# 设定执行时间
sched.add_job(my_job, 'date', run_date='2021-08-30 17:27:00', args=['test'])

sched.start()

运行结果:
在这里插入图片描述
未指定时间,则立即执行

sched.add_job(my_job, args=['test'])

(2)interval

作用:当你想以固定的时间间隔运行任务时使用

class apscheduler.triggers.interval.IntervalTrigger(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None, jitter=None)

在指定的时间间隔内触发,如果指定,则从start_date开始,否则datetime.now() + 间隔。

参数:
周( int ) – 等待的周数
days ( int ) – 等待的天数
小时( int ) – 等待的小时数
分钟( int ) – 等待的分钟数
seconds ( int ) – 等待的秒数
start_date ( datetime|str ) – 间隔计算的起点
end_date ( datetime|str ) – 要触发的最晚可能日期/时间
timezone ( datetime.tzinfo|str ) – 用于日期/时间计算的时区
jitter ( int|None ) –jitter最多延迟作业执行几秒钟

from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


def job_func():
    print("Hello World")


sched = BlockingScheduler()

# 设定周期开始时间start_date和结束时间end_date,及每10s触发
sched.add_job(job_func, 'interval', start_date="2021-08-30 17:48:20", end_date="2021-08-30 17:49:00", seconds=10)

sched.start()

运行结果:
在这里插入图片描述
也能通过scheduled_job()装饰器实现:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()


@sched.scheduled_job('interval', start_date="2021-08-30 17:48:20", end_date="2021-08-30 17:49:00", seconds=10)
def job_func():
    print("Hello World")

sched.start()

该jitter选项使你能够将随机组件添加到执行时间。如果你有多个服务器并且不希望它们在完全相同的时刻运行作业,或者如果你想防止具有类似选项的多个作业始终同时运行,这可能很有用:

# 在120s内随机选择一个额外延迟
sched.add_job(job_func, 'interval', hours=1, jitter=120)

(3)cron

作用:当你想在一天中的特定时间定期运行任务时使用

classapscheduler.triggers.cron.CronTrigger(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None)

当当前时间匹配所有指定的时间限制时触发,类似于 UNIX cron 调度器的工作方式。

参数:
year (int|str) – 4 位数字年份
month (int|str) – 月 (1-12)
day (int|str) – 月中的第几天 (1-31)
week (int|str) – ISO 周 (1-53)
day_of_week (int|str) – 工作日的编号或名称(0-6 或 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – 小时 (0-23)
minute (int|str) – 分钟 (0-59)
second (int|str) – 秒 (0-59)
start_date (datetime|str) – 最早可能触发的日期/时间(包括)
end_date (datetime|str) – 要触发的最晚可能日期/时间(包括)
timezone (datetime.tzinfo|str) – 用于日期/时间计算的时区(默认为调度器时区)
jitter (int|None) – 最多延迟作业执行几秒钟

介绍
这是 APScheduler 中最强大的内置触发器。你可以在每个字段上指定多种不同的表达式,在确定下一次执行时间时,它会在每个字段中找到满足条件的最早时间。这种行为类似于大多数类 UNIX 操作系统中的“Cron”实用程序。

你还可以分别通过start_date和 end_date参数指定 cron 样式计划的开始日期和结束日期。它们可以作为日期/日期时间对象或文本(采用 ISO 8601格式)给出。

与 crontab 表达式不同,你可以省略不需要的字段。当省略时间参数时,在显式指定参数之前的参数会被设定为*,之后的参数会被设定为最小值,week 和day_of_week的最小值为* 。例如,某任务在每年每个月的第一天每小时 20 分钟执行。下面的代码示例应该进一步说明这种行为。

day=1, minute=20
等同于
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0

表达式类型
下表列出了从年份到第二个字段中使用的所有可用表达式。可以在单个字段中给出多个表达式,用逗号分隔。

表达式 参数类型 描述
* any 匹配字段所有取值
*/a any 匹配字段每递增 a 后的值, 从字段最小值开始,包括最小值,比如小时(hour)的 */5,则匹配0,5,10,15,20
a-b any 匹配字段 a 到 b 之间的取值,a 必须小于 b,包括 a 与 b,比如1-5,则匹配1,2,3,4,5
a-b/c any 匹配 a 到 b 之间每递增 c 后的值,包括 a,不一定包括 b,比如1-20/5,则匹配1,6,11,16
xth y day 匹配 y 在当月的第 x 次,比如 3rd fri 指当月的第三个周五
last x day 匹配 x 在当月的最后一次,比如 last fri 指当月的最后一个周五
last day 匹配当月的最后一天
x,y,z any 组合表达式,可以组合确定值或上方的表达式

在month和day_of_week领域接受英文简写月和星期名(jan-dec和mon-sun分别)。

from apscheduler.schedulers.blocking import BlockingScheduler

def job_func():
    print("Hello World")

sched = BlockingScheduler()

# 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发
sched.add_job(job_func, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')

sched.start()

可以使用start_date和end_date来限制计划运行的总时间:

from apscheduler.schedulers.blocking import BlockingScheduler

def job_func():
    print("Hello World")

sched = BlockingScheduler()

# 在2021-09-02 00:00:00前,每周一到每周五 10:35运行
sched.add_job(job_func, 'cron', day_of_week='mon-fri', hour=10, minute=35, end_date='2021-09-02')
sched.start()

可通过 scheduled_job() 装饰器实现:

from apscheduler.schedulers.blocking import BlockingScheduler

sched = BlockingScheduler()
@sched.scheduled_job('cron', day_of_week='mon-fri', hour=10, minute=35, end_date='2021-09-02')
def job_func():
    print("Hello World")

sched.start()

运行结果:
在这里插入图片描述
使用标准 crontab 表达式安排任务:

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

sched = BlockingScheduler()

def job_func():
    print("Hello World")

# from_crontab(cls, expr, timezone=None)
# 其中expr: minute, hour, day of month, month, day of week
# 在9月到11月间前15天内的每个星期的10:48触发
sched.add_job(job_func, CronTrigger.from_crontab('48 10 1-15 sep-nov *'))
sched.start()

运行结果:
在这里插入图片描述
也可添加jitter参数:

sched.add_job(job_func, 'cron', hour='*', jitter=120)

夏令时问题
cron 触发器使用所谓的“挂钟”时间。因此,如果所选时区遵守 DST(夏令时),你应该注意它可能会在进入或离开 DST 时导致 cron 触发器出现意外行为。从标准时间切换到夏令时时,时钟会向前移动一小时或半小时,具体取决于时区。同样,当切换回标准时间时,时钟会向后移动一小时或半小时。这将导致一些时间段要么根本不存在,要么重复。如果你的日程安排要在这些时间段之一执行作业,则它的执行频率可能比预期的要高或低。这不是一个错误。如果你希望避免这种情况,请使用不遵守 DST 的时区,可以使用UTC时间,或提前预知并规划好执行的问题。

# 在Europe/Helsinki时区, 在三月最后一个周一就不会触发;在十月最后一个周一会触发两次
sched.add_job(job_function, 'cron', hour=3, minute=30)

5.配置调度器

APScheduler 提供了许多不同的方式来配置调度器。你可以选择直接传字典,也可以将选项作为关键字参数传入。你还可以先实例化调度器,然后添加任务并配置调度器。通过这种方式,你可以在任何环境中获得最大的灵活性。

可以在BaseScheduler该类的 API 参考中找到调度器级别配置选项的完整列表 。调度器子类可能还有其他选项,这些选项记录在它们各自的 API 参考中。单个任务存储和执行器的配置选项同样可以在它们的 API 参考页面上找到。

假设你想使用默认任务存储和默认执行器在你的应用程序中运行 BackgroundScheduler:

from apscheduler.schedulers.background import BackgroundScheduler


scheduler = BackgroundScheduler()

#在此或在计划程序初始化之前初始化应用程序的其余部分,因为是非阻塞的后台调度器,所以程序会继续向下执行

这将为你提供一个 BackgroundScheduler,其有一个名称为default的MemoryJobStore(内存任务储存器)和一个名称是default且默认最大线程是10的ThreadPoolExecutor(线程池执行器)。

现在,如果你希望拥有两个使用两个执行器的任务存储器,并且你还希望调整任务的默认值并设置不同的时区。可参考下面的三个例子,它们是完全等价的:

一个名为“mongo”的 MongoDBJobStore

一个名为“default”的 SQLAlchemyJobStore(使用 SQLite)

一个名为“default”的 ThreadPoolExecutor,工作线程数为 20个

一个名为“processpool”的 ProcessPoolExecutor,工作进程数为 5个

UTC 作为调度器的时区

默认情况下为新任务关闭合并模式

新任务的默认最大实例数限制为 3个

方法一

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


jobstores = {
    
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方法二

from apscheduler.schedulers.background import BackgroundScheduler


# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
    
    'apscheduler.jobstores.mongo': {
    
         'type': 'mongodb'
    },
    'apscheduler.jobstores.default': {
    
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    'apscheduler.executors.default': {
    
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
    
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})

方法三

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    
    'mongo': {
    'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    
    'default': {
    'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler()

# 在这里做些别的事情,可能会增加任务等

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

6.启动调度器

只需调用start()调度器即可启动调度器。对于除BlockingScheduler之外的调度器,此调用将立即返回,你可以继续应用程序的初始化过程,如调度器添加任务。

对于 BlockingScheduler,你只需要start()在完成任何初始化步骤后调用即可,因程序则会阻塞在start()位置,故要运行的代码必须写在start()之前。 。

注意:调度器启动后,您将无法再更改其设置。

7.添加任务

添加任务的方法如下(两种):
(1)通过调用add_job()
(2)通过装饰器scheduled_job()

第一种方法是最常见的方法。第二种方法主要是为了方便声明在程序运行时不会更改的任务。
第一种方法add_job()方法会返回一个 apscheduler.job.Job实例,你可以稍后使用它来修改或删除任务。

在任何时候你都可以在调度安排任务。但是如果添加任务时调度器尚未运行,则任务将被暂定调度,并且仅在调度器启动时才计算其第一次运行时间。

需要注意的是,如果你使用序列化任务的执行器或任务存储器,它将对你的任务增加一些要求:

a.目标可调用对象必须可全局访问
b.可调用对象的任何参数都必须是可序列化的

在内置任务存储中,只有 MemoryJobStore 不会序列化任务 。在内置执行器中,只有 ProcessPoolExecutor 会序列化任务。

注意
a. 如果你在程序初始化期间在持久任务存储器中安排任务,则必须为任务定义一个具体的ID 并使用replace_existing=True ,否则每次程序重新启动时你都会获得任务的新副本,也就表示任务的状态不会保存。
b.要立即运行任务,可以在添加任务时省略trigger参数。

8.删除任务

当你从调度器中删除任务时,它会从其关联的任务存储器中删除,并且不会再被执行。有两种方法可以实现这一点:

(1)通过remove_job()使用任务的 ID 和任务存储器别名调用

(2)在通过add_job()创建的任务实例上调用remove()方法

第二种方式更方便,但前提必须在创建任务实例时,实例被保存在变量中。对于通过scheduled_job()创建的任务,只能选择第一种方式。

如果任务的调度结束(即它的触发器不会产生任何进一步的运行时间),它会被自动删除。

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


job = sched.add_job(job_func1, 'cron', hour=11, minute=59)
job.remove()
sched.add_job(job_func2, 'cron', hour=11, minute=59)

sched.start()

运行结果:
在这里插入图片描述

同样,通过任务的具体ID:

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


sched.add_job(job_func1, 'cron', hour=11, minute=56, id='my_job_id')
sched.remove_job('my_job_id')
sched.add_job(job_func2, 'cron', hour=11, minute=56)
sched.start()

运行结果:
在这里插入图片描述

9.暂停和恢复工作

你可以通过Job实例或调度器本身轻松暂停和恢复任务。当任务暂停时,它的下一次运行时间将被清除,并且在任务恢复之前不会为其计算进一步的运行时间。
暂停任务,请使用以下任一方法:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复任务:

apscheduler.job.Job.resume()

apscheduler.schedulers.base.BaseScheduler.resume_job()

10.获取预定任务列表

通过get_jobs()就可以获得一个可修改的任务列表。get_jobs()第二个参数可以指定任务储存器名称,那么就会获得对应任务储存器的任务列表。

为方便起见,print_jobs()可以快速打印格式化的任务列表,包含触发器,下次运行时间等信息。

from apscheduler.schedulers.blocking import BlockingScheduler


sched = BlockingScheduler()


def job_func1():
    print("Hello")


def job_func2():
    print("World")


job = sched.add_job(job_func1, 'cron', hour=14, minute=17)

sched.add_job(job_func2, 'cron', hour=14, minute=17)
sched.print_jobs()
sched.start()

在这里插入图片描述

11.修改任务

通过apscheduler.job.Job.modify()或modify_job(),你可以修改任务当中除了id的任何属性。

job.modify(max_instances=6, name='Alternate name')

如果你想重新安排任务——即更改其触发器,你可以使用 apscheduler.job.Job.reschedule()或 reschedule_job()。这些方法为任务构造一个新的触发器,并根据新的触发器重新计算其下一次运行时间。

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

12.关闭调度器

要关闭调度器:

scheduler.shutdown()

默认情况下,调度器关闭其任务存储器和执行器,并等待所有当前正在执行的任务完成。如果你不想等待,你可以这样做:

scheduler.shutdown(wait=False)

这仍将关闭任务存储器和执行器,但不会等待任何正在运行的任务完成。

13.暂停/恢复任务处理

可以暂停正在执行任务的处理:

scheduler.pause()

恢复任务:

scheduler.resume()

也可以在调度器启动时,默认所有任务设为暂停状态:

scheduler.start(paused=True)

13.限制同时执行的任务实例数

默认情况下,每个任务只允许同时运行一个实例。这意味着,如果任务即将运行,但前一次运行尚未完成,则将最新运行视为丢失。为了避免这种情况的发生,通过在添加任务时使用max_instances关键字参数,可以设置调度器允许并发运行的特定任务的最大实例数。

14.丢失任务的执行与合并

有时,任务会由于一些问题没有被执行。最常见的情况就是,在数据库里的任务到了该执行的时间,但调度器被关闭了,那么这个任务就成了“哑弹任务”。错过执行时间后,调度器才打开了。这时,调度器会检查每个任务的misfire_grace_time参数int值,即哑弹上限,来确定是否还执行哑弹任务(这个参数可以全局设定的或者是为每个任务单独设定)。此时,一个哑弹任务,就可能会被连续执行多次。

但这就可能导致一个问题,有些哑弹任务实际上并不需要被执行多次。coalescing合并参数就能把一个多次的哑弹任务揉成一个一次的哑弹任务。也就是说,coalescing为True能把多个排队执行的同一个哑弹任务,变成一个,而不会触发哑弹事件。

注意:如果任务的执行由于池中没有可用的线程或进程而延迟,则执行器可能会由于运行太晚(与其最初指定的运行时间相比)而跳过它。如果这可能发生在你的应用程序中,你可能需要增加执行器中的线程/进程数,或者将misfire_grace_time 设置调整为更高的值。

15.调度器事件

可以将事件侦听器附加到调度器。调度器事件在某些情况下被触发,并且可能在其中包含有关该特定事件详细信息的附加信息,比如当前运行次数等。通过add_listener()为其提供适当的mask参数,或将不同的常量组合在一起,可以只侦听特定类型的事件 。回调对象会有一个参数就是触发的事件。

有关events可用事件及其属性的详细信息,请参阅模块的文档。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
# 当任务执行完或任务出错时,调用my_listener
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

事件类型:

常量 描述 事件类
EVENT_SCHEDULER_STARTED 调度器已启动 SchedulerEvent
EVENT_SCHEDULER_SHUTDOWN 调度器被关 SchedulerEvent
EVENT_SCHEDULER_PAUSED 调度器中的任务处理已暂停 SchedulerEvent
EVENT_SCHEDULER_RESUMED 调度器中的任务处理已恢复 SchedulerEvent
EVENT_EXECUTOR_ADDED 一个执行器被添加到调度器 SchedulerEvent
EVENT_EXECUTOR_REMOVED 一个执行器被移除到调度器中 SchedulerEvent
EVENT_JOBSTORE_ADDED 任务存储已添加到调度器 SchedulerEvent
EVENT_JOBSTORE_REMOVED 任务存储已从调度器中删除 SchedulerEvent
EVENT_ALL_JOBS_REMOVED 所有任务都已从所有任务存储或一个特定任务存储中删除 SchedulerEvent
EVENT_JOB_ADDED 任务已添加到任务存储 JobEvent
EVENT_JOB_REMOVED 任务已从任务存储中删除 JobEvent
EVENT_JOB_MODIFIED 从调度器外部修改了任务 JobEvent
EVENT_JOB_SUBMITTED 任务已提交给其执行器以运行 JobSubmissionEvent
EVENT_JOB_MAX_INSTANCES 提交给其执行器的任务未被执行器接受,因为该任务已达到其最大并发执行实例数 JobSubmissionEvent
EVENT_JOB_EXECUTED 一个任务执行成功 JobExecutionEvent
EVENT_JOB_ERROR 任务在执行期间引发异常 JobExecutionEvent
EVENT_JOB_MISSED 错过了任务的执行 JobExecutionEvent
EVENT_ALL 包含所有事件类型的全能掩码 N/A

16.故障排除

如果调度器没有按预期工作,将记录apscheduler器的日志记录级别提高到该DEBUG级别会有所帮助 。

如果您还没有首先启用日志记录,您可以这样做:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

这应该提供许多关于调度器内部发生的事情的有用信息。

还要确保您检查了常见问题部分,看看您的问题是否已经有了解决方案。
报告错误
Github 提供了一个错误跟踪器

文章参考链接
  APScheduler官网-用户指南
  黑色小米粥-APScheduler官方文档翻译

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yuanfate/article/details/119945735

智能推荐

Policy-based Reinforcement learning_policy函数-程序员宅基地

文章浏览阅读4k次,点赞18次,收藏69次。强化学习这一章会讲基于策略的强化学习Value-Based Reinforcement Learning-DQN强化学习前言一、policy函数二、DQN2.1 游戏中agent的目标是什么?2.2 agent如何做决策?2.3 如何理解Q* 函数呢?2.5 DQN打游戏?三、如何训练DQN?3.1 TD算法3.2 TD算法训练DQN四、训练步骤六、总结前言说明一下:这是我的一个学习笔记,课程链接如下:最易懂的强化学习课程公众号:AI那些事一、policy函数我们回顾一下Acti_policy函数

project2016调配资源冲突-程序员宅基地

文章浏览阅读5.4k次,点赞9次,收藏26次。(1) Project查看资源负荷情况的方法和结果在工时类资源会存在资源过度分配(在同一个时间段给工时类资源分配的资源超出了他的最大单位)的情况,而成本类、材料类资源则不会有、查看资源负荷的方法有:在视图栏------资源图表如下图在这里我们可以看到每个资源的分配状况,如下图滚动鼠标滑轮就会出现不同的资源分配状况此时选择“资源”—“下一个资源过度分配处”如下图总结:甘特图、..._project2016调配资源冲突

推荐算法知识图谱模型(二):KGCN-程序员宅基地

文章浏览阅读235次。常用的KGE方法侧重于建模严格的语义相关性(例如,TransE和TransR假设头+关系=尾),这更适合于KG补全和链接预测等图内应用,而不是推荐。更自然、更直观的方法是直接设计一个图算法来利用KG结构。_图谱模型

ajax跨域与cookie跨域_一级域名 的cookie ajax 请求二级域名时获取cookie-程序员宅基地

文章浏览阅读389次。ajax跨域ajax跨域取数据(利用可以跨域加载js的原理 functioncallback(){ }这是需要返回这样一个js函数)ajax数据类型使用jsonp :如 ajax{ url:..._一级域名 的cookie ajax 请求二级域名时获取cookie

Flutter从0到1实现高性能、多功能的富文本编辑器(基础实战篇)_flutter 富文本-程序员宅基地

文章浏览阅读1.3k次,点赞2次,收藏2次。在上一章中,我们分析了一个富文本编辑器需要有哪些模块组成。在本文中,让我们从零开始,去实现自定义的富文本编辑器。注:本文篇幅较长,从失败的方案开始分析再到成功实现自定义富文本编辑器,真正的从0到1。— 完整代码太多, 文章只分析核心代码,需要源码请到代码仓库作为基础的富文本编辑器实现,我们需要专注于简单且重要的部分,所以目前只需定义标题、文本对齐、文本粗体、文本斜体、下划线、文本删除线、文本缩进符等富文本基础功能。//定义默认颜色​...///用户自定义颜色解析。_flutter 富文本

新一代异步IO框架——io_uring 架构-程序员宅基地

文章浏览阅读30次。近年来,Linux社区开发了一种新的异步IO框架,称为io_uring。io_uring通过提供高度可扩展和高性能的异步IO接口,有效地解决了传统异步IO框架中的一些性能瓶颈和限制。io_uring已经成为许多高性能应用程序的首选异步IO框架,为开发者提供了更好的IO处理能力。io_uring 架构是建立在Linux内核之上的,它使用了一组新的系统调用和内核机制,以提供高性能和低延迟的异步IO操作。io_uring的设计目标是提供一种简单而强大的接口,使得开发者可以轻松地利用异步IO的优势。

随便推点

购物车功能测试用例测试点整理思维导图方式_购物车测试点思维导图-程序员宅基地

文章浏览阅读8.2k次,点赞12次,收藏71次。_购物车测试点思维导图

使用matplotlib绘图实现动态刷新(动画)效果_matplotlib 动态刷新-程序员宅基地

文章浏览阅读4.8k次,点赞7次,收藏36次。最近在做四足的运动学仿真,因为这一段时间用python比较多,所以想直接用python做运动仿真,通过画图来展示步态和运动效果。了解了一下matplotlib库之后又参考了一些网上的博客,成功实现了绘图动态刷新的效果,类似动画效果。_matplotlib 动态刷新

Apache Kafka 可视化工具调研_kafka-console-ui-程序员宅基地

文章浏览阅读3k次。Apache Kafka 可视化工具调研_kafka-console-ui

如何编译部署独立专用服务端(Standalone Dedicated Server)【UE4】_ue4 独立服务器搭建-程序员宅基地

文章浏览阅读1.4k次。一、下载源码及编译原文链接首先需要unrealengine官网上注册并加入github开发组才有权限进入下面地址。https://github.com/EpicGames/UnrealEngine/tags注意:编译专用服务器,只能用源码编译版本的引擎,安装版本的引擎无法编译Server。打开页面后下载一个最新的release版本,解压出来后先运行Setup.bat,会自动下载资源..._ue4 独立服务器搭建

Hadoop 序列化机制_hadoop final-程序员宅基地

文章浏览阅读493次。序列化是指将结构化对象转化为字节流以便在网络上传输或者写到磁盘上进行永久存储的过程,反序列化是指将字节流转回结构化对象的逆过程序列化用于分布式处理的两大领域,进程间通信和永久存储。在Hadoop中,系统中多个节点上进程间的通信是通过“远程过程调用”(remote procedure call, RPC)实现的。RPC将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流饭序列化为原始..._hadoop final

tinymce富文本编辑器实现本地图片上传_tinymce images_upload_handler-程序员宅基地

文章浏览阅读5.7k次,点赞3次,收藏6次。在开发过程中使用tinymce富文本编辑器,发现他的图片上传默认是上传网络图片那么如何实现上传本地图片呢,上官网逛一圈,发现其实很简单在官网中找到下面这张图片,并且有相关的例子这里,我使用了自定义函数images_upload_handler (blobInfo, success, failure) { const url = 'uploadImg' ..._tinymce images_upload_handler