14个文本转图像AI API
杂谈-FastAPI中的异步后台任务之Celery篇
前言
前些时间有人问起关于如何在FastAPI中进行异步任务的处理。对此也刚好有一定的使用经验,借此机会也谈谈一下。
异步任务实现方式
在FastAPI中的异步任务的方式有几种: 在FastAPI中,有以下几种方式可以处理异步任务: 首先我们所熟知的异步协程或线程等方式实现异步任务是最常见,如下我们最场景的就是基于协程方式的异步API接口。如我们经常使用async/await关键字:可以在路由处理函数中使用async/await关键字来处理异步任务。例如:
from fastapi import FastAPI
app = FastAPI()
@app.get("/async_task")
async def async_task():
# 异步任务的代码,并等待执行完成
await some_async_function()
return {"message": "OK"}
但是有点我们是需要注意,上面的只是说我们实现了异步任务,但是由于我们在接口中进行await,所以尽管的任务是异步,但是还是需要等待,我们本节主要是学习相关异步化后台的任务,也就是说我们某些任务放置到后台或其他地方去执行,不影响当前主线程的运行。
BackgroundTasks异步化的后台任务实现方式
在前面示例中,对于的任务执行了await等待,就会进行任务挂起并等待。假如some_async_function()的任务需要耗时比较久的话,且不在意它相关的处理结果,或者结果可以通过另一种方式进行通知的话,我们可以把some_async_function()的任务进行后台话,可以理解为放到另一个线程去执行,而不而不需要await等待,如之前的代码,我们可以使用FasAPI框架所提供的BackgroundTasks的BackgroundTasks类来处理后台任务,如下示例代码:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def some_async_function():
# 后台任务的代码
pass
@app.get("/background_task")
async def background_task(background_tasks: BackgroundTasks):
background_tasks.add_task(some_async_function)
return {"message": "后台任务添加成功"}
当然上面这种方式也不合适长期运行耗时的任务,所以引出我们本小结主要需要了解的Celery库,—芹菜. 使用Celery我们可以基于异步任务队列的方式来处理异步任务。
Celery库—芹菜.异步化的后台任务实现方式
下面我一步一步的介绍如何基于Celery实现异步任务以及延迟任务。
首先我们从官网介绍总结一下关于Celery,它是一个基于Python的分布式任务队列框架,用于实现异步任务的调度和执行。它的主要作用是可以帮助我们将耗时的任务从主线程中分离出来,不因任务的耗时而空等,以此来提高我们系统的并发性和响应速度。
Celery是用来处理异步任务,所以我们可以使用它来处理以下类似的一些业务场景,例如发送电子邮件、生成报表、处理大量数据等。通过将这些任务放入任务队列中,可以让主线程继续处理其他请求,而不需要等待任务完成。当然Celery还提供定时任务的调度功能,可以让我们按照设定的时间间隔或者时间点执行任务。
Celery简单使用步骤
通常我们使用Celery需要进行以下相关以下大致的几个步骤:
- 实例化Celery对象来创建一个Celery应用。
- 基于已实例化的任务的实例对象来定于该实例所包含的任务:通常我们是使用装饰器将函数注册为Celery任务,并设置任务的参数和返回值。(加入我们的是在相关的Fastapi框架进行使用,通常我们需要做就是实例化对象并声明任务,并进行任务的发布,通常是通过调用Celery应用的apply_async()方法来提交任务到任务队列中。)
3.当我们的任务发布之后,任务会进入一个消息队列里面进行等待,等待消费者去消费,所以接下里我们需要启动Celery Worker:启动Celery Worker的消费者对象来处理任务队列中的任务。
例如:
1 定义任务和发布任务
主要我们这里使用消息代理的中间件是redis,所以是先启动我们的redis,
接着定义相关任务,如下示例代码:
from fastapi import FastAPI
from celery import Celery
app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")
# 定义任务
@celery_app.task
def some_celery_task():
# Celery任务的代码
pass
@app.get("/celery_task")
async def celery_task():
# 开始发布任务
some_celery_task.delay()
return {"message": "Celery 任务发布成功"}
if __name__ == "__main__":
# 使用os.path.basename函数获取了当前文件的名称,并将.py文件扩展名替换为空字符串\
# import os
# app_modeel_name = os.path.basename(__file__).replace(".py", "")
from pathlib import Path
# 使用Path函数获取了当前文件的名称,并将.py文件扩展名替换为空字符串\
# app_modeel_name = Path(__file__).name.replace(".py", "")
import uvicorn
import inspect
# 根据文件路径返回模块名
# print("app_modeel_name:",inspect.getmodulename(Path(__file__).name))
# 使用uvicorn.run函数运行了一个应用程序。它指定了应用程序的主机和端口,并且设置了reload参数为True。
uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='127.0.0.1', port=31110, reload=True,workers=1)
启动应用并发布任务:
在FastAPI应用程序,当调用 some_celery_task.delay() 来发布任务时,Celery将把任务放入Redis队列中。
2 启动Celery worker进程,进行任务消费
在命令行中,切换到您的项目目录。启动Celery worker进程。在命令行中运行以下命令:
celery -A main.celery_app worker --loglevel=info
启动后得到如下图所示的结果:
Celery worker进程将从Redis队列中获取任务,并执行任务所定义的代码。当任务被执行时,您将在Celery worker进程的日志中看到相关的日志消息。但是我们运行时候遇到了问题如下:
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
The full contents of the message headers:
{'lang': 'py', 'task': 'main_celely.some_celery_task', 'id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen15648@xiaozhong', 'ignore_result': False, 'stamped_headers': None, 'stamps': {}}
The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
File "D:\code_loacl\mm_ring_v2\venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 642, in on_task_received
strategy = strategies[type_]
~~~~~~~~~~^^^^^^^
出现上述原因问题在于我们的:
@celery_app.task
def some_celery_task():
# Celery任务的代码
pass
没有进行相关实例绑定,我们可以修改为:
@celery_app.task(bind=True)
def some_celery_task(self):
# Celery任务的代码
import time
time.sleep(10)
pass
然后每次发布任务的时候,重新观察消费者的任务输出信息如下:
如上的输出,表示我们的任务已经被正常进行消费了!
Celery 详解之相关参数项
在上面实例我们已简单完成相关异步任务的处理,通常我们一般需要使用提各种参数来配置和控制其行为。下面介绍一下Celery一些常用的配置项:
PS:配置选项的名称和具体含义可能会因Celery的版本而有所不同。
- broker:
- 类型:字符串
- 默认值:
'amqp://guest:guest@localhost//'
- 描述:指定消息代理(broker)的URL地址,用于任务消息的传输。Celery支持多种消息代理,如RabbitMQ、Redis等。
- backend:
- 类型:字符串
- 默认值:
None
- 描述:指定结果存储后端的URL地址。当任务执行完成后,结果将被存储在指定的后端中,以供后续查询和获取。
- include:
- 类型:列表
- 默认值:
[]
- 描述:指定要包含的任务模块列表。这些任务模块中的任务函数将被Celery自动发现和注册。
- task_track_started:
- 类型:布尔值
- 默认值:
False
- 描述:设置为
True
时,Celery将跟踪任务的开始状态,并在任务开始时发送任务状态更新。
- task_time_limit:
- 类型:整数
- 默认值:
None
- 描述:设置任务的最大运行时间(以秒为单位)。如果任务执行时间超过此限制,Celery将会终止任务。
- task_soft_time_limit:
- 类型:整数
- 默认值:
None
- 描述:设置任务的软时间限制(以秒为单位)。软时间限制是一个警告机制,当任务执行时间接近限制时,Celery会发送警告通知。
- task_acks_late:
- 类型:布尔值
- 默认值:
False
- 描述:设置为
True
时,Celery将在任务执行完成后再发送确认消息。这可以确保即使在任务执行期间发生错误,任务也不会丢失。
- task_ignore_result:
- 类型:布尔值
- 默认值:
False
- 描述:设置为
True
时,Celery将不会存储任务的执行结果。这可以节省存储资源,适用于不关心任务结果的情况。
除了之前提到的常用配置项外,以下是一些其他常见的Celery配置项的说明:
- task_serializer:
- 类型:字符串
- 默认值:
'json'
- 描述:指定任务消息的序列化器。Celery支持多种序列化器,如JSON、pickle等。
- result_serializer:
- 类型:字符串
- 默认值:与
task_serializer
相同 - 描述:指定任务结果的序列化器。任务执行完成后,结果将使用指定的序列化器进行序列化。
- task_default_queue:
- 类型:字符串
- 默认值:
'celery'
- 描述:指定任务的默认队列名称。当任务没有指定队列时,将使用此默认队列。
- task_default_exchange:
- 类型:字符串
- 默认值:
'celery'
- 描述:指定任务的默认交换机名称。当任务没有指定交换机时,将使用此默认交换机。
- task_default_routing_key:
- 类型:字符串
- 默认值:与
task_default_queue
相同 - 描述:指定任务的默认路由键。当任务没有指定路由键时,将使用此默认路由键。
- worker_concurrency:
- 类型:整数
- 默认值:根据当前系统的CPU核心数动态调整
- 描述:指定每个工作进程的并发任务数。可以根据系统的性能和资源情况进行调整。
- worker_prefetch_multiplier:
- 类型:整数
- 默认值:
4
- 描述:指定工作进程从消息队列中预取任务的数量的乘数。较高的值可以提高任务处理的吞吐量。
- beat_schedule:
- 类型:字典
- 默认值:
{}
- 描述:指定定时任务的调度配置。您可以定义多个定时任务,并指定它们的执行时间和要执行的任务函数。
······其他参数说明,有需要我得话,我在去官方文档查阅接口。接下里我,我们看看和它对于就是所谓得配置文件。配置文件其实和上面所谓得参数是没有区别得,也就是说,我们可以使用其他的方式来定义我们的参数传入,如这些配置项我们还可以可以在Celery的配置文件中进行设置,通常命名为celeryconfig.py或celery.py。如下的示例代码所示:
下面是一个使用配置文件的示例:
- 首先我们创建一个名为
celeryconfig.py
的配置文件,里面包含的内容如下:
# celeryconfig.py
# celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab
broker_url = 'amqp://guest:guest@localhost//' # 消息代理的URL
result_backend = 'redis://localhost:6379/0' # 结果存储后端的URL
task_serializer = 'json' # 任务的序列化器
result_serializer = 'json' # 任务结果的序列化器
timezone = 'Asia/Shanghai' # 使用的时区
task_acks_late = True # 启用延迟确认
task_ignore_result = False # 不忽略任务结果
task_soft_time_limit = 60 # 任务的软时间限制为60秒
task_time_limit = 120 # 任务的硬时间限制为120秒
worker_prefetch_multiplier = 4 # 任务执行者的预取乘数
worker_concurrency = 8 # 每个任务执行者的并发处理数量
worker_max_tasks_per_child = 100 # 每个任务执行者最大处理任务数
task_default_priority = 0 # 任务的默认优先级
task_routes = {
'myapp.tasks.email_task': {'queue': 'email_queue'}, # 指定任务的队列
'myapp.tasks.image_task': {'queue': 'image_queue'}
}
worker_hijack_root_logger = False # 不重写根日志记录器
worker_disable_rate_limits = False # 不禁用任务速率限制
beat_schedule = {
'task1': {
'task': 'myapp.tasks.task1',
'schedule': crontab(minute='*/15'), # 每15分钟执行一次
},
'task2': {
'task': 'myapp.tasks.task2',
'schedule': timedelta(seconds=30), # 每30秒执行一次
}
}
在你的Celery应用程序中加载配置文件。
# celery_app.py
from celery import Celery
app = Celery('myapp')
app.config_from_object('celeryconfig')
在上述示例中,我们创建了一个名为celeryconfig.py
的配置文件,并在其中设置了各种Celery的配置选项。然后,我们在Celery应用程序的入口文件celery_app.py
中加载了该配置文件。
你可以根据自己的需求和应用程序的特定要求,在配置文件中进行相应的配置。注意,Celery的配置文件应该位于与你的Celery应用程序代码相同的目录中,或者可以通过正确的路径进行引用。
确保在启动Celery应用程序时,使用正确的配置文件加载配置。例如,通过以下命令启动Celery Worker:
celery -A celery_app worker --loglevel=info
PS: 通常win系统下可能你需要使用如下的方式启动:
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
# 启动celery监控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555
获取使用代码的方式启动:
from src.tasks.app import celery_app
if __name__ == '__main__':
# python -m celery -A src.tasks.app worker -Q mm_ring_v2 --loglevel=info -P eventlet
celery_app.worker_main(argv=['-A', 'src.tasks.app','worker','--loglevel=info', '-Q', 'mm_ring_v2','-P', 'eventlet'])
这样,Celery将会根据配置文件中的设置来运行任务调度和执行过程。
Celery延时任务的执行
延迟任务场景我们也通常会遇到,比如超时未支付则自当取消订单等。下面举例说明一下具体的实现过程:
基本任务项目结果:
1:定义Celery实例对象
from celery.signals import task_failure, task_retry, task_success
from celery.exceptions import MaxRetriesExceededError
from celery import Task
from src.tasks.service import TaskFactory
# 任务工厂,可以创建多个,目前只是创建了一个celery_app对象
factory = TaskFactory()
# 创建第一个 Celery 应用
celery_app = factory.create_celery_app(namespces='mm_sync_tasks', config_name='src.tasks.config.ProductionConfig')
celery_app.conf["worker_redirect_stdouts"] = False # 禁止重定向工作进程的标准输出和标准错误流
def get_pending_tasks():
with celery_app.connection() as connection:
tasks = connection.default_channel.queue_declare(queue='celery', passive=True).message_count
return tasks
# 移除任务队列中的所有等待执行的任务
def remove_pending_tasks():
with celery_app.connection() as connection:
connection.default_channel.queue_purge(queue='celery')
# 重新调度任务
def reschedule_tasks():
pending_tasks = get_pending_tasks()
remove_pending_tasks()
# task_prerun:任务开始运行前触发的信号。
# task_postrun:任务运行完成后触发的信号。
# task_success:任务成功完成时触发的信号。
# task_failure:任务失败时触发的信号。
# task_retry:任务重试时触发的信号。
# task_revoked:任务被撤销时触发的信号。
# task_rejected:任务被拒绝时触发的信号。
# worker_ready:Worker准备就绪时触发的信号。
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 处理 MaxRetriesExceededError 异常
print("全局异常错误获取!!", sender, task_id)
pass
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
if isinstance(exception, MaxRetriesExceededError):
# 处理 MaxRetriesExceededError 异常
print("全局异常错误获取!!", sender, task_id)
pass
@task_failure.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
# # 处理任务失败的逻辑
if isinstance(exception, MaxRetriesExceededError):
# 处理 MaxRetriesExceededError 异常
print("全局异常错误获取!!", sender, task_id)
pass
@task_success.connect
def handle_task_success(sender: Task, result, **kwargs):
# 处理任务成功完成的逻辑
print("handle_task_success!", sender, result)
pass
if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
# 启动celery监控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555
定义实例配置信息:
# !/usr/bin/envpython
# -*-coding:UTF-8-*-
'''
@File : __init__.py.py
@Contact : 308711822@qq.com
@License : (C) Copyright 2021-2225, Personal exclusive right.
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2023/6/16 11:25 小钟同学 1.0 None
'''
from kombu import Queue, Exchange
class Config:
# broker = 'redis://127.0.0.1:6379/1' # 任务储存
# backend = 'redis://127.0.0.1:6379/2' # 结果存储,执行完之后结果放在这
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
# 全局的任务过期时间
# app.conf.update(result_expires=60)
# 定义队列名称
CELERY_QUEUES = (
Queue('mm_ring_v2'),
)
# CELERY_QUEUES = (
# Queue('default', exchange=Exchange('default'), routing_key='default'),
# Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
# Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
# )
# 指定任务走什么对了和routing_key
# CELERY_ROUTES = {
# 'src.tasks.task.migu_order_sync_status': {'queue': 'app_task1', 'routing_key': 'app_task1'},
# 'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
# }
# 指定需要加载的任务
# 指定要导入的任务模块或任务文件列表
# celery_app.conf.update(
# include=[
# 'src.tasks.task.migu_order_sync_status',
# ]
# )
# 配置需要执行的任务所在的目录
CELERY_INCLUDE = [
'src.tasks.task.migu_order_sync_status',
]
class DevelopmentConfig(Config):
DEBUG = True
class ProductionConfig(Config):
DEBUG = False
2:定义任务:
import datetime
import signal
from celery import current_app
from celery.result import AsyncResult
from pydantic import BaseModel
def exponential_backoff(retries):
return 2 ** retries
# 定义延迟执行的任务
# expires=3600 将任务结果的存储有效期设置为 1 小时。
@celery_app.task(bind=True, max_retries=5, default_retry_delay=1, retry_backoff=exponential_backoff(2), expires=3660)
def action_migu_order_sync_status(self, orderid, mobile):
# self.request.retries 表示当前重试的次数
······
需要说明一点是,@celery_app.task:这是一个装饰器,用于将一个普通的Python函数注册为Celery任务。celery_app 是你的 Celery 应用实例,task 是 Celery 提供的装饰器函数。且这个的任务相关参数项说明如下
- bind=True:这个参数用于指定任务函数的第一个参数(通常被命名为self)将会绑定到任务实例上,使得在任务函数内部能够通过self访问任务实例的属性和方法。
- max_retries=5:指定任务在发生错误时最多重试的次数。如果任务执行失败,Celery 将会自动重试该任务,最多重试次数由该参数指定。
- default_retry_delay=1:指定任务在发生错误后进行重试之前的默认延迟时间,单位为秒。也就是说,在每次重试之间等待的秒数。
- retry_backoff=exponential_backoff(2):这个参数用于指定重试延迟时间的增长规律。exponential_backoff 是一个用于指数级增长延迟时间的函数,其中的 2 表示指数底数。也就是说,重试的延迟时间会按照指数级增长。
- expires=3660:指定任务的过期时间,单位为秒。如果任务在指定的时间内没有被执行,将会被标记为过期并丢弃。
3:在API接口发布任务
@router.get("/put/code", summary="订单提交")
def callback(*, forms: PutCodeForm = Depends()):
.....
# 发布任务
result = action_migu_order_sync_status.apply_async(args=(Orders.orderid, Orders.mobile), countdown=60, retry=5,eta=eta, expires=expires,queue='mm_ring_v2')
if result == -1:
return Fail(message='提交失败')
return Success(message='提交成功')
各个个参数项说明如下:
args=(Orders.orderid, Orders.mobile)
:这个参数用于指定要传递给 Celery 任务的位置参数,即任务函数在执行时所需的参数值。在这个例子中,任务函数需要两个参数,分别对应Orders.orderid
和Orders.mobile
。countdown=60
:指定任务在被放入队列之后需要延迟多少秒才开始执行。在这个例子中,任务会在被加入队列后延迟 60 秒后开始执行。retry=5
:指定任务在发生错误时最多重试的次数。与之前提到的max_retries
类似,但这里是针对单次任务调用的重试次数。eta=eta
:这个参数用于指定任务的预计执行时间。通常用于将任务调度到未来的某个时间点执行,而不是立即执行。expires=expires
:同样的意义,指定任务的过期时间,单位为秒。如果任务在指定的时间内没有被执行,将会被标记为过期并丢弃。queue='mm_ring_v2'
:指定任务被发送到的队列名称。Celery 支持将任务发送到不同的队列中,以便进行任务的分类和分配。
综合起来,这些参数的设置使得对该 Celery 任务的调用具有了一定的灵活性。你可以控制任务的延迟执行、重试次数、预计执行时间以及过期时间,并且可以指定任务发送到哪个队列中。
4:启动消费者
if __name__ == '__main__':
pass
# D:\code_loacl\mm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
# celery -A tasks worker --loglevel=info -P eventlet
5:启动celery监控和管理Flower
本文章转载微信公众号@程序员小钟同学