PythonWeb开发框架—Flask-APScheduler超详细使用讲解

06-01 1149阅读

1.定时任务的两种实现方式

1.1 用@scheduler.task装饰任务

安装插件:

pip install Flask-APScheduler

pip install apscheduler

脚本实现:

###app.py
##导入依赖库
from flask import Flask
import datetime
import config
from flask_apscheduler import APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#创建Flask对象
app = Flask(__name__)
#配置执行器
app.config['SCHEDULER_EXECUTORS'] = {'default': {'type': 'threadpool', 'max_workers': 10}}
#配置作业存储器
app.config['SCHEDULER_JOBSTORES'] = {'default': {'type': 'memory'}}
app.config.from_object(config.Config)
#创建APScheduler对象,调度器是BackgroundScheduler
scheduler=APScheduler(scheduler=BackgroundScheduler(daemon=True))
scheduler.init_app(app)
scheduler.start()
#以装饰器@scheduler.task定义任务,触发器为interval,每1秒执行一次
@scheduler.task(id='do_job_1',trigger='interval',  minutes=1)
def task():
    current_time=datetime.datetime.now()
    print("装饰器定时任务开始执行时间:{}".format(current_time))
if __name__ == '__main__':
    app.run()

定时任务测试结果:

PythonWeb开发框架—Flask-APScheduler超详细使用讲解

1.2 用add_job的方式

脚本实现:

###app.py
##导入依赖库
from flask import Flask
import datetime
import config
from flask_apscheduler import APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
#创建Flask对象
app = Flask(__name__)
#配置执行器
app.config['SCHEDULER_EXECUTORS'] = {'default': {'type': 'threadpool', 'max_workers': 10}}
#配置作业存储器
app.config['SCHEDULER_JOBSTORES'] = {'default': {'type': 'memory'}}
app.config.from_object(config.Config)
#创建APScheduler对象,调度器是BackgroundScheduler
scheduler=APScheduler(scheduler=BackgroundScheduler(daemon=True))
scheduler.init_app(app)
scheduler.start()
#定义任务
def api_task():
    current_time = datetime.datetime.now()
    print("API调用定时任务开始执行时间:{}".format(current_time))
#以调用API的方式定义定时任务
scheduler.add_job(id='do_job_2',func=api_task,trigger='interval', minutes=1)
if __name__ == '__main__':
    app.run()

定时任务测试结果:

PythonWeb开发框架—Flask-APScheduler超详细使用讲解

在上面的实现方式中,都涉及4个组件:调度器(scheduler),执行器(executor),作业存储器(job stores),触发器(trigger)。接下来详细介绍一下四个组件各自又包含哪些分类以及各分类又适合哪些场景。

2.调度器(scheduler)

Flask-APScheduler调度器支持七种,分别如下:

  • BlockingScheduler:启动后在当前进程的主线程中运行,所以会阻塞当前正在运行的线程,直到调度任务执行完成或手动终止才会释放,适合只需要运行一次的任务
    from apscheduler.schedulers.blocking import BlockingScheduler
    def task_job():
        print("xxxx")
    if __name__ == "__main__":
        # 创建 BlockingScheduler 实例
        scheduler = BlockingScheduler()
        scheduler.add_job(job, 'interval', seconds=5, id='my_job')
        scheduler.start()
    • BackgroundScheduler:独立启动一个线程执行,不会阻塞主线程,适合高频或者需要长时间运行的任务
      from flask_apscheduler import APScheduler
      from apscheduler.schedulers.background import BackgroundScheduler
      scheduler = APScheduler(scheduler=BackgroundScheduler(daemon=True))
      scheduler.init_app(app)
      scheduler.start()
      • AsyncIOScheduler:异步调度器,任务函数需为 async 协程(一种特殊的函数,允许任务在等待 I/O 操作时释放 CPU 资源,避免阻塞主线程,函数的定义方法:async def func())。适合高并发 I/O 密集型任务
        from apscheduler.schedulers.asyncio import AsyncIOScheduler
        scheduler = APScheduler(scheduler=AsyncIOScheduler())
        scheduler.init_app(app)
        scheduler.start()
        • GeventScheduler:高并发调度器,利用 Gevent(高性能python并发网络库) 的协程实现并发,需使用 gevent.monkey.patch_all() 打补丁,适合大量并发但无需复杂异步代码的任务
          from gevent import monkey
          monkey.patch_all()
          from apscheduler.schedulers.gevent import GeventScheduler
          scheduler = APScheduler(scheduler=GeventScheduler())
          scheduler.init_app(app)
          scheduler.start()
          • TornadoScheduler:异步web调度器,适合高并发的实时通信web服务、长连接服务或自定义协议服务的任务
            from apscheduler.schedulers.tornado import TornadoScheduler
            scheduler = APScheduler(scheduler=TornadoScheduler())
            scheduler.init_app(app)
            scheduler.start()
            • TwistedScheduler:事件驱动异步网络调度器,适合高度定制化异步流程的任务,比如游戏服务,网络客户端,分布式系统。
              from apscheduler.schedulers.twisted import TwistedScheduler
              scheduler = APScheduler(scheduler=TwistedScheduler())
              scheduler.init_app(app)
              scheduler.start()
              • QtScheduler:任务在 Qt 主线程中执行,适合桌面应用的定时任务
                from apscheduler.schedulers.qt import QtScheduler
                scheduler = APScheduler(scheduler=QtScheduler())
                scheduler.init_app(app)

                3.触发器(trigger)

                触发器分为三种,分别如下:

                • date触发器:日期触发器,在指定时间点执行一次任务,参数:      

                  run_date:任务执行的具体时间,格式:datetime/str

                  timezone:指定时区,不指定的话,默认用全局配置的时区,如果全局也没有配置,则用UTC时区。

                  • interval触发器:按固定时间间隔重复执行任务,参数:

                    weeks:间隔几周执行一次

                    days:间隔几天执行一次

                    hours:间隔几小时执行一次

                    minutes:间隔几分钟执行一次

                    seconds:间隔几秒执行一次

                    start_date:首次执行时间,可选,默认立即执行

                    end_date:停止执行时间,可选,默认无限制

                    timezone:时区,可选,默认使用调度器的全局时区

                    jitter:延迟秒数,可选,防止多个任务同时触发

                    • Cron 触发器:cron表达式触发器,表达式参数:

                      year:年,4位,如2025

                      month:月,1~12,*/5表示每5个月

                      day:日,1~31,*/5表示每5天

                      week:一年的周数

                      day_of_week:周几,0~6

                      hour:小时,0~23,*/5表示每5小时

                      minute:分钟,0~59,*/15表示每15分钟

                      second:秒,0~59

                      start_date:首次执行时间,可选,默认立即执行

                      end_date:停止执行时间,可选,默认无限制

                      timezone:时区,可选,默认使用配置的全局时区

                      jitter:延迟秒数,可选,防止多个任务同时触发

                      4.执行器(executor)

                      APScheduler提供的执行器有7种,分别如下:

                      • ThreadPoolExecutor:线程池执行器

                        type为:threadpool;

                        参数:max_workers:线程池里的最大线程数,默认为10

                        app.config['SCHEDULER_EXECUTORS'] = {
                            'default': {'type': 'threadpool', 'max_workers': 30}
                        }
                        • ProcessPoolExecutor:进程池执行器

                          type为:processpool

                          参数:max_workers:进程池的最大进程数,默认 5,不过受CPU核数限制

                          app.config['SCHEDULER_EXECUTORS'] = {
                              'default': {'type': 'processpool', 'max_workers': 4}
                          }
                          • GeventExecutor:Gevent协程执行器

                            type为:gevent

                            app.config['SCHEDULER_EXECUTORS'] = {
                                'default': {'type': 'gevent'}
                            }
                            • AsyncIOExecutor:Asyncio异步执行器

                              type为:asyncio

                              app.config['SCHEDULER_EXECUTORS'] = {
                                  'default': {'type': 'asyncio'}
                              }
                              • TornadoExecutor:Tornado执行器

                                type为:tornado

                                app.config['SCHEDULER_EXECUTORS'] = {
                                    'default': {'type': 'tornado'}
                                }
                                • TwistedExecutor:Twisted执行器

                                  type为:twisted

                                  app.config['SCHEDULER_EXECUTORS'] = {
                                      'default': {'type': 'twisted'}
                                  }
                                  • DebugExecutor:调试执行器

                                    type为:debug

                                    app.config['SCHEDULER_EXECUTORS'] = {
                                        'default': {'type': 'debug'}
                                    }

                                    5.作业存储器(job stores)

                                    作业存储器(job stores)负责持久化存储定时任务的信息,如任务 ID、执行状态、触发器规则等,Flask-APScheduler支持的作业存储器类型有5种,分别是:

                                    • MemoryJobStore:任务信息存储在内存中,暂停重启工程任务信息会丢失
                                    • SQLAlchemyJobStore:任务信息存储在SQLite、MySQL、PostgreSQL数据库中,任务信息持久不会丢失
                                    • RedisJobStore:任务信息存储在Redis中,支持分布式环境,适合高并发、分布式部署的任务
                                    • MongoDBJobStore:任务信息存储在MongoDB中,适合非结构化数据
                                    • ZooKeeperJobStore:使用 Apache ZooKeeper 存储任务,适合高可用性要求的环境
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码