想知道如何保护使用 FastAPI Web 框架构建的 API?阅读我们最新的博客文章,我们将指导您了解 FastAPI 安全的最佳实践。探索这些技术如何不仅增强 Web 应用程序的安全性,而且为您的开发之旅带来实实在在的好处。
在本指南中,Escape 的安全研究团队收集了最重要的技巧,以保护您的 FastAPI 应用程序免受潜在的数据泄露。我们的目标是让您能够创建更具弹性和更高效的 FastAPI 项目。让我们开始吧!
FastAPI 是一个现代的 Python Web 框架,具有几个关键功能(如果你不是初学者,请跳到下一节查看技术文章):
要开始使用 FastAPI,您首先需要安装它。幸运的是,安装 FastAPI 是一件轻而易举的事。您只需要 Python 3.7(或更高版本)和 pip(Python 包安装程序)。只需打开您的终端或命令提示符并运行命令 .安装完成后,您就可以开始使用 FastAPI 了。就是这么简单!pip install fastapi
FastAPI 凭借其直观而强大的功能使构建 API 变得容易。您可以通过定义函数和使用装饰器轻松创建路由。@app.route()
为此,请使用 Python 类型提示定义具有自动请求和响应验证的路由:
from pydantic import BaseModel
class Item(BaseModel):
name: str
description: str = None
async def create_item(item: Item):
return item # FastAPI automatically validates and serializes the response
async def read_item(item_id: int, q: str = None):
return {"item_id": item_id, "q": q}
每个路由可以有不同的方法,例如 GET、POST、PUT、DELETE 等,允许您处理不同类型的请求。
使用 FastAPI,您只需定义函数参数即可轻松处理查询参数、请求体、路径参数等。这使得访问和处理客户端发送的数据非常方便。总体而言,FastAPI 简化了构建 API 的过程,并允许您专注于编写应用程序的核心逻辑。
在 FastAPI 中,处理和验证请求变得简单高效。只需几行代码,您就可以定义每个请求中的预期结构和数据类型。
FastAPI 使用我们之前提到的 Pydantic 库,它允许您创建具有与请求的预期 JSON 正文相对应的属性和类型的 Python 类。这样,您可以自动验证传入数据并确保它与预期格式匹配。
FastAPI 还提供了方便的装饰器,例如 和 来处理不同类型的 HTTP 请求。您可以通过对象轻松访问请求参数、标头和 Cookie。这使您可以根据自己的特定需求灵活地处理和验证请求。@app.post()
为了说明这一点,让我们创建一个 FastAPI 应用程序,它使用 Pydantic 模型定义和验证请求数据:
from fastapi import FastAPI, Requestfrom pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str = None
async def create_item(item: Item):
return {"item": item}
async def read_item(item_id: int, request: Request):
headers = request.headers
cookies = request.cookies
return {"item_id": item_id, "headers": headers, "cookies": cookies}
FastAPI简化了数据库集成。你只需安装数据库驱动程序,导入它,并建立连接。依赖注入(Dependency Injection)确保有一个单一的连接实例,用于高效的数据库交互和CRUD(创建、读取、更新、删除)操作。你可以使用像SQLAlchemy或Tortoise-ORM这样的对象关系映射(ORM)工具来定义模型、查询和操作数据。
您是实践型学习者吗?不要犹豫,深入研究我们精选的易受攻击的 FastAPI 应用程序,以发现它们的漏洞,利用并尝试修复它们:
为确保与 FastAPI 应用程序的安全通信,强烈建议使用 HTTPS。
您可以使用数字证书来启用安全通信,并且 uvicorn 中的 and 选项允许您指定 SSL 证书和密钥文件。--cert
以下是如何使用 FastAPI 启用 HTTPS 的示例:
from fastapi import FastAPIimport uvicorn
app = FastAPI()
async def root():
return {"message": "Hello World"}
if __name__ == "__main__":
uvicorn.run("main:app", host="", port=8000, ssl_keyfile="path/to/key.pem", ssl_certfile="path/to/cert.pem")
通过遵循此最佳实践并使用 HTTPS,您可以增强 FastAPI 应用程序的安全性并保护用户的敏感数据。
FastAPI 为验证和清理用户输入提供了内置支持,这对于确保应用程序的安全性至关重要。
通过验证和清理用户输入,您可以防止潜在的安全漏洞,例如 SQL 注入、跨站点脚本 (XSS) 攻击和 CSRF(跨站点请求伪造)。在本文中,我们将深入探讨如何使用高级外部库来防范最后两个漏洞。
FastAPI 允许您使用 为输入数据定义模型,它会根据指定的规则自动验证和清理传入数据。例如,您可以定义一个模型,用于指定每个输入字段的预期数据类型和约束:Pydantic
from fastapi import FastAPI, Queryfrom pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str = None
price: float
tax: float = None
async def create_item(item: Item):
# Here, item is automatically validated against the Item model
# If validation fails, FastAPI will automatically return an error response
# with details about the validation errors.
# Once validated, you can safely use the data in your application logic
item_dict = item.dict()
price_with_tax = item.price + (item.tax or 0)
item_dict.update({"price_with_tax": price_with_tax})
# Here you can use the validated and sanitized data as needed
return item_dict
实现速率限制是增强 FastAPI 应用程序安全性的基本做法。
FastAPI 通过外部库(如 或 )提供对速率限制和限制的内置支持。只需几行代码,您就可以根据 IP 地址、用户或您选择的任何其他条件配置速率限制。fastapi-limiter
您可以通过 pip 安装:fastapi-limiter
pip install fastapi-limiter
from fastapi import FastAPIfrom fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
app = FastAPI()
# Initialize FastAPILimiter
limiter = FastAPILimiter(app)
async def read_items():
return {"message": "This endpoint is rate-limited."}
实施 JWT 身份验证
最安全的身份验证机制通常涉及使用具有适当安全措施的 JWT (JSON Web Tokens)。以下是使用 FastAPI 实现基于 JWT 的身份验证的方法,并结合了最佳 API 安全实践:
您可以使用 以下命令进行安装 :pyjwt
pip install pyjwt
from pydantic import BaseModel
class User(BaseModel):
username: str
password: str
实施负责对用户进行身份验证和生成 JWT 令牌的身份验证服务。
from fastapi import HTTPException, Dependsfrom fastapi.security import OAuth2PasswordBearer
import jwt
from passlib.context import CryptContext
# Secret key for signing JWT tokens. We'll provide example including secret key rotation below.
SECRET_KEY = "hello-from-escape-team"
# Algorithm used for JWT token encoding
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 password bearer flow for token retrieval
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Mock database of users (replace with your user database logic)
fake_users_db = {
"user1": {
"username": "user1",
"password": "$2b$12$AQyHPwd7p1s.GQvax/A3ve5wMey6NZuDXW1/FVhDpi8s/MV/Fo1LC", # hashed password: 'password1'
"disabled": False,
# Authentication function
def authenticate_user(username: str, password: str):
user = fake_users_db.get(username)
if not user or not pwd_context.verify(password, user["password"]):
return False
return user
# Token generation function
def create_access_token(data: dict):
encoded_jwt = jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# Dependency for extracting and verifying JWT token
def get_current_user(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
token_data = {"username": username}
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
return token_data
实现一个端点,用于在成功进行身份验证时生成 JWT 令牌:
from fastapi import FastAPI, Dependsfrom fastapi.security import OAuth2PasswordRequestForm
app = FastAPI()
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Incorrect username or password")
token_data = {"sub": user["username"]}
access_token = create_access_token(token_data)
return {"access_token": access_token, "token_type": "bearer"}
实施需要使用 JWT 令牌进行身份验证的受保护终端节点:
@app.get("/protected")async def protected_route(current_user: dict = Depends(get_current_user)):
return {"message": f"Beware, {current_user['username']}. This is a protected route."}
from fastapi import FastAPI, HTTPException, Dependsfrom fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from passlib.context import CryptContext
import jwt
import threading
import time
app = FastAPI()
# Secret key for signing JWT tokens
SECRET_KEY = os.environ.get("SECRET_KEY", None)
# If SECRET_KEY is not provided as an environment variable, generate a random one
if SECRET_KEY is None:
SECRET_KEY = ''.join(random.SystemRandom().choice(string.ascii_letters + string.digits) for _ in range(64))
# Algorithm used for JWT token encoding
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 password bearer flow for token retrieval
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Mock database of users (replace with your user database logic)
fake_users_db = {
"user1": {
"username": "user1",
"password": "$2b$12$AQyHPwd7p1s.GQvax/A3ve5wMey6NZuDXW1/FVhDpi8s/MV/Fo1LC", # hashed password: 'password1'
"disabled": False,
# Define User model
class User(BaseModel):
username: str
password: str
# Authentication function
def authenticate_user(username: str, password: str):
user = fake_users_db.get(username)
if not user or not pwd_context.verify(password, user["password"]):
return False
return user
# Token generation function
def create_access_token(data: dict):
encoded_jwt = jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# Dependency for extracting and verifying JWT token
def get_current_user(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
token_data = {"username": username}
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication credentials")
return token_data
# Secret key rotation functionality
def rotate_secret_key():
"""Rotate the secret key."""
new_secret_key = generate_secret_key()
SECRET_KEY = new_secret_key
print("Secret key rotated.")
def schedule_key_rotation(interval_seconds):
"""Schedule secret key rotation at regular intervals."""
while True:
def generate_secret_key(length=32):
"""Generate a random string of letters and digits for the secret key."""
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
if __name__ == "__main__":
# Start key rotation in a separate thread
rotation_interval_seconds = 3600 # Rotate key every hour (adjust as needed)
rotation_thread = threading.Thread(target=schedule_key_rotation, args=(rotation_interval_seconds,))
rotation_thread.daemon = True
# Run FastAPI app
# Implement the token endpoint for token generation
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Incorrect username or password")
token_data = {"sub": user["username"]}
access_token = create_access_token(token_data)
return {"access_token": access_token, "token_type": "bearer"}
# Implement a protected endpoint that requires authentication using JWT tokens
async def protected_route(current_user: dict = Depends(get_current_user)):
return {"message": f"Beware, {current_user['username']}. This is a protected route."}
import uvicorn
uvicorn.run(app, host="", port=8000)
在 FastAPI 中实现基于角色的授权
基于角色的授权允许你根据经过身份验证的用户的角色或权限来控制对 FastAPI 应用程序中某些资源的访问。
from fastapi import Depends, FastAPI, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
from typing import List
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def check_roles(roles: List[str]):
def decorator(func):
def wrapper(*args, **kwargs):
# Get the current user's roles from the database or token
# Check if any of the roles match the required roles
# If not, raise an HTTPException with 403 status code
# Else, continue with the execution of the function
return func(*args, **kwargs)
return wrapper
return decorator
def protected_route():
return {"message": "You have access to this protected route!"}
在此示例中,装饰器将所需角色的列表作为参数。在 decorator 中,您通常会从数据库或令牌中检索经过身份验证的用户角色,并将其与所需的角色进行比较。如果任何角色匹配,则装饰器允许访问路由或终端节点。否则,它将引发 an 并带有 403 状态代码,指示用户没有所需的角色。check_roles
通过使用这种方法,您可以轻松地在 FastAPI 应用程序中实现基于角色的授权,并根据用户角色控制对资源的访问。
另一个常见的漏洞是跨站点脚本 (XSS),可以通过在 HTML 模板中呈现用户生成的内容之前对其进行转义来缓解。
您可以集成第三方库或实施自定义中间件来添加 XSS 保护。
下面是如何使用该库来清理用户输入并防止 XSS 攻击的示例:nh3
首先,使用 pip 安装库:nh3
pip install nh3
然后,你可以使用 来清理 FastAPI 应用程序中的用户输入。用于清理 HTML 片段:nh3
>>> import nh3>>> nh3.clean("<unknown>hi")
>>> nh3.clean("<b><img src='' onerror='alert(\\'hax\\')'>XSS?</b>")
'<b><img src="">XSS?</b>'
要保护 FastAPI API 免受跨站点请求伪造 (CSRF) 攻击,您可以使用扩展 FastAPI CSRF Protect。
虽然还有其他方法可以在 FastAPI 中获得 CSRF 保护(比如 itsdangerous
库,可用于生成和验证 CSRF 令牌或使用 Piccolo-API 的中间件),但 FastAPI CSRF Protect 是获得 CSRF 保护的最安全、最灵活的方法之一。
pip install fastapi-csrf-protect
然后,你可以在 FastAPI 应用程序中实现 CSRF 保护,如下所示:
from fastapi import FastAPIfrom fastapi_csrf_protect import CsrfProtect
app = FastAPI()
# Initialize CsrfProtect with a secret key
csrf = CsrfProtect(app, app.secret_key)
async def read_root():
return {"message": "Hello from Escape"}
async def secure_endpoint():
return {"message": "This endpoint is CSRF protected"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="", port=8000)
请始终记住避免在代码中对密钥进行硬编码。AWS Secrets Manager 或 HashiCorp Vault 等解决方案非常适合在运行时提供密钥。以下示例说明了如何使用 Python 的 os 模块在 FastAPI 应用程序中实现密钥轮换以与环境变量交互:
import osimport random
import string
import time
from fastapi import FastAPI
app = FastAPI()
def generate_secret_key(length=32):
"""Generate a random string of letters and digits for the secret key."""
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
def rotate_secret_key():
"""Rotate the secret key."""
new_secret_key = generate_secret_key()
os.environ["SECRET_KEY"] = new_secret_key
def schedule_key_rotation(interval_seconds):
"""Schedule secret key rotation at regular intervals."""
while True:
print("Secret key rotated.")
async def read_root():
return {"message": "Hello from Escape"}
if __name__ == "__main__":
# Start key rotation in a separate thread or process
rotation_interval_seconds = 3600 # Rotate key every hour (adjust as needed)
import threading
rotation_thread = threading.Thread(target=schedule_key_rotation, args=(rotation_interval_seconds,))
rotation_thread.daemon = True
在 FastAPI 应用程序中创建端点可能是有益的,特别是如果你需要获取 CSRF 令牌以进行客户端操作,例如 AJAX 请求或 SPA(单页应用程序)交互。此端点将根据请求生成并返回 CSRF 令牌。/csrftoken/
from fastapi import FastAPIfrom fastapi_csrf_protect import CsrfProtect
app = FastAPI()
csrf = CsrfProtect(app)
def get_csrf_token(csrf_protect: CsrfProtect = Depends()):
"""Endpoint to obtain a CSRF token."""
csrf_token, _ = csrf_protect.generate_csrf_tokens()
return {"csrf_token": csrf_token}
这是一个示例,它结合了上面列出的如何将此扩展集成到 FastAPI API 的登录表单中,以保护 POST 处理程序。/login/
您可以稍后修改端点的 URL 路径。
import osimport random
import string
import threading
import time
from fastapi import FastAPI, Request, Depends
from fastapi.responses import JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi_csrf_protect import CsrfProtect
from fastapi_csrf_protect.exceptions import CsrfProtectError
from pydantic import BaseModel
app = FastAPI()
templates = Jinja2Templates(directory="templates")
class CsrfSettings(BaseModel):
secret_key: str = os.environ.get("SECRET_KEY", "staysecureyourescapeteam")
cookie_samesite: str = "none"
def get_csrf_config():
return CsrfSettings()
def generate_secret_key(length=32):
"""Generate a random string of letters and digits for the secret key."""
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(length))
def rotate_secret_key():
"""Rotate the secret key."""
new_secret_key = generate_secret_key()
os.environ["SECRET_KEY"] = new_secret_key
print("Secret key rotated.")
def schedule_key_rotation(interval_seconds):
"""Schedule secret key rotation at regular intervals."""
while True:
def get_csrf_token(csrf_protect: CsrfProtect = Depends()):
"""Endpoint to obtain a CSRF token."""
csrf_token, _ = csrf_protect.generate_csrf_tokens()
return {"csrf_token": csrf_token}
def form(request: Request, csrf_protect: CsrfProtect = Depends()):
Returns form template.
csrf_token, signed_token = csrf_protect.generate_csrf_tokens()
response = templates.TemplateResponse(
"form.html", {"request": request, "csrf_token": csrf_token}
csrf_protect.set_csrf_cookie(signed_token, response)
return response
@app.post("/login", response_class=JSONResponse)
async def create_post(request: Request, csrf_protect: CsrfProtect = Depends()):
Creates a new Post
await csrf_protect.validate_csrf(request)
response: JSONResponse = JSONResponse(status_code=200, content={"detail": "OK"})
csrf_protect.unset_csrf_cookie(response) # prevent token reuse
return response
Handles any validation errors that show up
def csrf_protect_exception_handler(request: Request, exc: CsrfProtectError):
return JSONResponse(status_code=exc.status_code, content={"detail": exc.message})
if __name__ == "__main__":
# Start key rotation in a separate thread
rotation_interval_seconds = 3600 # Rotate key every hour (adjust as needed)
rotation_thread = threading.Thread(target=schedule_key_rotation, args=(rotation_interval_seconds,))
rotation_thread.daemon = True
# Run FastAPI app
import uvicorn
uvicorn.run(app, host="", port=8000)
使用 从环境变量中检索 。如果未找到,则使用默认值 ()。SECRET_KEY
此设置有助于保护你的 FastAPI API 免受 CSRF 攻击。
总之,保护使用 FastAPI 构建的 API 是一项多方面的任务,需要一种全面的方法来降低潜在风险并确保数据的机密性、完整性和可用性。通过实施强大的身份验证和授权机制、验证和清理用户输入以及速率限制,开发人员可以为 API 保护奠定坚实的基础。利用 FastAPI 库(如 ),或简化这些安全措施与 FastAPI API 的集成。fastapi-csrf-protect
定期安全审计、使用 API 安全工具(如支持 FastAPI 等框架的 Escape)进行自动化 API 安全测试,并随时了解 FastAPI 中新兴的安全实践,对于确保 API 安全至关重要。