
Python调用Google Bard API 完整指南
近年来,微服务架构因其能够创建可扩展、可维护且灵活的应用程序而广受欢迎。在这篇博文中,我们将探讨微服务架构的概念,并演示如何使用 Python 生态系统中的两个强大工具 FastAPI 和 RabbitMQ 构建简单的微服务。
整体式架构就像一个单一的大型计算网络,具有一个代码库,将所有业务关注点耦合在一起。把它想象成一个巨大的冰川结构,将应用程序的所有组件都放在一个屋檐下。在整体式架构中进行更改需要更新整个堆栈,这可能非常耗时且具有限制性。您可以在下图中看到整体式架构的示例。
另一方面,微服务架构是一种将应用程序划分为更小、可独立部署的服务的方法。每个服务都有自己的业务逻辑和数据库,并通过轻量级协议与其他服务通信。这种方法可以缩短开发周期、简化维护并提高可扩展性。
RabbitMQ 是一个实现高级消息队列协议 (AMQP) 的消息代理。它充当分布式系统各个组件的中间人,使它们能够高效地通信和协调任务。以下是 RabbitMQ 在微服务架构中被广泛使用的原因:
现在我们已经介绍了微服务的基础知识,让我们更深入地研究如何编写我们的第一个微服务。我们将探索如何使用 Python、FastAPI、RabbitMQ 和 PostgreSQL 设计和实现微服务架构。本动手指南将引导您设置每个组件、设计微服务交互以及部署微服务以创建功能齐全的应用程序。让我们开始吧!
我们的应用程序包括四项主要服务:
在开始之前,请确保满足以下先决条件:
要使用 Docker 安装 PostgreSQL,请运行以下命令:
docker run --name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres--name postgres-db -e POSTGRES_PASSWORD=mysecretpassword -d postgres
要使用 Docker 安装 RabbitMQ,请运行以下命令:
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management--主机名我的兔子--名称一些兔子-p 5672 : 5672 -p 15672 : 15672 rabbitmq: 3-管理
演示
│
│
│
│env
│
...
在本节中,我们将重点介绍如何实现网关服务,该服务充当微服务架构中所有传入请求的入口点。网关服务负责将请求路由到适当的微服务并处理应用程序的整体编排。
现在,让我们实现网关服务。使用以下代码在 gateway/
目录中创建一个 main.py
文件:
from fastapi import FastAPI, HTTPException , File, UploadFile
import fastapi as _fastapi
from fastapi.security import OAuth2PasswordBearer
from dotenv import load_dotenv
from jwt.exceptions import DecodeError
from pydantic import BaseModel
import requests
import base64
import pika
import logging
import os
import jwt
import rpc_client
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Load environment variables
load_dotenv()
logging.basicConfig(level=logging.INFO)
# Retrieve environment variables
JWT_SECRET = os.environ.get("JWT_SECRET")
AUTH_BASE_URL = os.environ.get("AUTH_BASE_URL")
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL)) # add container name in docker
channel = connection.channel()
channel.queue_declare(queue='gatewayservice')
channel.queue_declare(queue='ocr_service')
# JWT token validation
async def jwt_validation(token: str = _fastapi.Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return payload
except DecodeError:
raise HTTPException(status_code=401, detail="Invalid JWT token")
# Pydantic models for request body
class GenerateUserToken(BaseModel):
username: str
password: str
class UserCredentials(BaseModel):
username: str
password: str
class UserRegisteration(BaseModel):
name: str
email: str
password: str
class GenerateOtp(BaseModel):
email: str
class VerifyOtp(BaseModel):
email: str
otp: int
# Authentication routes
@app.post("/auth/login", tags=['Authentication Service'])
async def login(user_data: UserCredentials):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/token", json={"username": user_data.username, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
@app.post("/auth/register", tags=['Authentication Service'])
async def registeration(user_data:UserRegisteration):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users", json={"name":user_data.name,"email": user_data.email, "password": user_data.password})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
@app.post("/auth/generate_otp", tags=['Authentication Service'])
async def generate_otp(user_data:GenerateOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/generate_otp", json={"email":user_data.email})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
@app.post("/auth/verify_otp", tags=['Authentication Service'])
async def verify_otp(user_data:VerifyOtp):
try:
response = requests.post(f"{AUTH_BASE_URL}/api/users/verify_otp", json={"email":user_data.email ,"otp":user_data.otp})
if response.status_code == 200:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.json())
except requests.exceptions.ConnectionError:
raise HTTPException(status_code=503, detail="Authentication service is unavailable")
# ml microservice route - OCR route
@app.post('/ocr' , tags=['Machine learning Service'] )
def ocr(file: UploadFile = File(...),
payload: dict = _fastapi.Depends(jwt_validation)):
# Save the uploaded file to a temporary location
with open(file.filename, "wb") as buffer:
buffer.write(file.file.read())
ocr_rpc = rpc_client.OcrRpcClient()
with open(file.filename, "rb") as buffer:
file_data = buffer.read()
file_base64 = base64.b64encode(file_data).decode()
request_json = {
'user_name':payload['name'],
'user_email':payload['email'],
'user_id':payload['id'],
'file': file_base64
}
# Call the OCR microservice with the request JSON
response = ocr_rpc.call(request_json)
# Delete the temporary image file
os.remove(file.filename)
return response
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5001, reload=True)
要为网关设置环境,请在网关文件夹中创建一个 .env
文件。
AUTH_BASE_URL=http://0.0.0.0:5000
JWT_SECRET=e56623570e0a0152989fd38e13da9cd6eb7031e4e039e939ba845167ee59b496
RABBITMQ_URL=localhost
为了与其他微服务通信,我们将使用 RabbitMQ,这是一个支持服务之间异步消息传递的消息代理。我们将在 gateway/
目录中创建一个 rpc_client.py
文件来处理与 RabbitMQ 服务器的通信。
import pika
import uuid
import json
from dotenv import load_dotenv
import os
# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
class OcrRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_URL))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, message):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='ocr_service',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=json.dumps(message))
while self.response is None:
self.connection.process_data_events()
response_json = json.loads(self.response)
return response_json
此代码定义了一个客户端类 OcrRpcClient
,用于使用 RabbitMQ 向 OCR 微服务(ML 微服务)发送消息。它初始化连接,为响应设置回调队列,并提供异步发送消息和接收响应的方法。
__init__
):建立与 RabbitMQ 的连接。创建通道并声明唯一的回调队列。设置使用者以侦听回调队列上的响应。
2. 发送请求(呼叫
):
向 OCR 微服务(ML 微服务)发送消息。等待回调队列上的响应并返回它。
此类使网关服务能够使用 RabbitMQ 高效地与 OCR 微服务通信。
在本节中,我们将重点介绍如何实现身份验证服务,该服务充当微服务架构中所有与身份验证相关的请求的入口点。身份验证服务负责用户身份验证、注册和 OTP 生成。现在,让我们实现身份验证服务。
在 auth/ 目录下创建一个 main.py 文件,代码如下:
此代码使用 FastAPI 实现身份验证服务,用于用户注册、登录、JWT 令牌生成、使用 OTP 的电子邮件验证和用户资料检索。它使用 SQLAlchemy 进行数据库作,使用 RabbitMQ 发送 OTP 电子邮件。该服务包括用于创建用户、生成 JWT 令牌、检索用户资料和验证 OTP 以进行电子邮件验证的端点。
from typing import List
from fastapi import HTTPException
import fastapi as _fastapi
import schemas as _schemas
import sqlalchemy.orm as _orm
import models as _models
import service as _services
import logging
import database as _database
import pika
# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue='email_notification')
def get_db():
db = _database.SessionLocal()
try:
yield db
finally:
db.close()
app = _fastapi.FastAPI()
logging.basicConfig(level=logging.INFO)
_models.Base.metadata.create_all(_models.engine)
@app.post("/api/users" , tags = ['User Auth'])
async def create_user(
user: _schemas.UserCreate,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
db_user = await _services.get_user_by_email(email=user.email, db=db)
if db_user:
logging.info('User with that email already exists')
raise _fastapi.HTTPException(
status_code=200,
detail="User with that email already exists")
user = await _services.create_user(user=user, db=db)
return _fastapi.HTTPException(
status_code=201,
detail="User Registered, Please verify email to activate account !")
# Endpoint to check if the API is live
@app.get("/check_api")
async def check_api():
return {"status": "Connected to API Successfully"}
@app.post("/api/token" ,tags = ['User Auth'])
async def generate_token(
#form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
user_data: _schemas.GenerateUserToken,
db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.authenticate_user(email=user_data.username, password=user_data.password, db=db)
if user == "is_verified_false":
logging.info('Email verification is pending. Please verify your email to proceed. ')
raise _fastapi.HTTPException(
status_code=403, detail="Email verification is pending. Please verify your email to proceed.")
if not user:
logging.info('Invalid Credentials')
raise _fastapi.HTTPException(
status_code=401, detail="Invalid Credentials")
logging.info('JWT Token Generated')
return await _services.create_token(user=user)
@app.get("/api/users/me", response_model=_schemas.User , tags = ['User Auth'])
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
return user
@app.get("/api/users/profile", tags=['User Auth'])
async def get_user(email: str, db: _orm.Session = _fastapi.Depends(_services.get_db)):
return db.query(_models.User and _models.Address).filter_by(id=1).first()
@app.post("/api/users/generate_otp", response_model=str, tags=["User Auth"])
async def send_otp_mail(userdata: _schemas.GenerateOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db)
if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")
if user.is_verified:
raise _fastapi.HTTPException(status_code=400, detail="User is already verified")
# Generate and send OTP
otp = _services.generate_otp()
print(otp)
_services.send_otp(userdata.email, otp, channel)
# Store the OTP in the database
user.otp = otp
db.add(user)
db.commit()
return "OTP sent to your email"
@app.post("/api/users/verify_otp", tags=["User Auth"])
async def verify_otp(userdata: _schemas.VerifyOtp, db: _orm.Session = _fastapi.Depends(_services.get_db)):
user = await _services.get_user_by_email(email=userdata.email, db=db )
if not user:
raise _fastapi.HTTPException(status_code=404, detail="User not found")
if not user.otp or user.otp != userdata.otp:
raise _fastapi.HTTPException(status_code=400, detail="Invalid OTP")
# Update user's is_verified field
user.is_verified = True
user.otp = None # Clear the OTP
db.add(user)
db.commit()
return "Email verified successfully"
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)
在 auth/ 目录下创建一个 database.py 文件,代码如下:
此代码设置用于连接到 PostgreSQL 数据库的 SQLAlchemy 引擎和会话创建器。它使用 dotenv
从环境变量加载数据库连接详细信息。DATABASE_URL
是使用检索到的环境变量(包括主机、数据库名称、用户名和密码)构建的。该引擎是使用 create_engine
和 DATABASE_URL
创建的,并且 SessionLocal
被定义为绑定到此引擎的会话创建者。Base
变量被初始化为用于定义 ORM 模型的声明性基。
import sqlalchemy as _sql
import sqlalchemy.ext.declarative as _declarative
import sqlalchemy.orm as _orm
from dotenv import load_dotenv
import os
# Load environment variables from .env file
load_dotenv()
# Retrieve environment variables
postgres_host = os.environ.get("POSTGRES_HOST")
postgres_db = os.environ.get("POSTGRES_DB")
postgres_user = os.environ.get("POSTGRES_USER")
postgres_password = os.environ.get("POSTGRES_PASSWORD")
# Assuming your PostgreSQL server is running locally with a database named 'mydatabase'
DATABASE_URL = f"postgresql://{postgres_user}:{postgres_password}@{postgres_host}/{postgres_db}"
engine = _sql.create_engine(DATABASE_URL)
SessionLocal = _orm.sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = _declarative.declarative_base()
在 auth/ 目录下创建一个 models.py 文件,代码如下:
此代码为 User
和 Address
表定义 SQLAlchemy 模型 ,存储用户信息和地址以及它们之间的关系。它还使用提供的引擎在数据库中创建表。
import datetime as _dt
import sqlalchemy as _sql
import sqlalchemy.orm as _orm
import passlib.hash as _hash
from database import Base , engine
import database as _database
Base.metadata.create_all(engine)
class User(_database.Base):
__tablename__ = "users"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
name = _sql.Column(_sql.String)
email = _sql.Column(_sql.String, unique=True, index=True)
is_verified = _sql.Column(_sql.Boolean , default=False)
otp = _sql.Column(_sql.Integer)
hashed_password = _sql.Column(_sql.String)
addresses = _orm.relationship("Address", back_populates="user")
date_created = _sql.Column(_sql.DateTime, default=_dt.datetime.utcnow)
def verify_password(self, password: str):
return _hash.bcrypt.verify(password, self.hashed_password)
class Address(_database.Base):
__tablename__ = "addresses"
id = _sql.Column(_sql.Integer, primary_key=True, index=True)
street = _sql.Column(_sql.String)
landmark = _sql.Column(_sql.String)
city = _sql.Column(_sql.String)
country = _sql.Column(_sql.String)
pincode = _sql.Column(_sql.String)
user_id = _sql.Column(_sql.Integer, _sql.ForeignKey("users.id"))
user = _orm.relationship("User", back_populates="addresses")
latitude = _sql.Column(_sql.Float)
longitude = _sql.Column(_sql.Float)
在 auth/ 目录下创建一个 schemas.py 文件,代码如下:
此代码为 用户相关的数据结构定义 Pydantic 模型,包括用户创建、身份验证和 OTP 验证。它还包括用于位置信息的地址模型。这些模型配置为从字典属性自动创建实例。
import datetime
import pydantic
class UserBase(pydantic.BaseModel):
name: str
email: str
class Config:
from_attributes=True
class UserCreate(UserBase):
password: str
class Config:
from_attributes=True
class User(UserBase):
id: int
date_created: datetime.datetime
class Config:
from_attributes=True
class AddressBase(pydantic.BaseModel):
street: str
landmark: str
city: str
country: str
pincode: str
latitude: float
longitude: float
class Config:
from_attributes=True
class GenerateUserToken(pydantic.BaseModel):
username: str
password: str
class Config:
from_attributes=True
class GenerateOtp(pydantic.BaseModel):
email: str
class VerifyOtp(pydantic.BaseModel):
email: str
otp: int
在 auth/ 目录下创建一个 service.py 文件,代码如下:
此代码定义了用于用户身份验证和 OTP(一次性密码)生成和验证的各种函数和依赖项。它使用 FastAPI 来处理 HTTP 请求,使用 SQLAlchemy 进行数据库作,使用 Pydantic 进行数据验证和序列化,使用 JWT 进行身份验证,使用 RabbitMQ 发送电子邮件通知。这些功能包括创建数据库、获取数据库会话、创建新用户、对用户进行身份验证、创建 JWT 令牌、从 JWT 令牌获取当前用户、生成随机 OTP、连接到 RabbitMQ 以及发送 OTP 电子邮件通知。
import jwt
import sqlalchemy.orm as _orm
import passlib.hash as _hash
import email_validator as _email_check
import fastapi as _fastapi
import fastapi.security as _security
from passlib.hash import bcrypt
import database as _database
import schemas as _schemas
import models as _models
import random
import json
import pika
import time
import os
# Load environment variables
JWT_SECRET = os.getenv("JWT_SECRET")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")
oauth2schema = _security.OAuth2PasswordBearer("/api/token")
def create_database():
# Create database tables
return _database.Base.metadata.create_all(bind=_database.engine)
def get_db():
# Dependency to get a database session
db = _database.SessionLocal()
try:
yield db
finally:
db.close()
async def get_user_by_email(email: str, db: _orm.Session):
# Retrieve a user by email from the database
return db.query(_models.User).filter(_models.User.email == email and _models.User.is_verified==True).first()
async def create_user(user: _schemas.UserCreate, db: _orm.Session):
# Create a new user in the database
try:
valid = _email_check.validate_email(user.email)
name = user.name
email = valid.email
except _email_check.EmailNotValidError:
raise _fastapi.HTTPException(status_code=404, detail="Please enter a valid email")
user_obj = _models.User(email=email, name=name, hashed_password=_hash.bcrypt.hash(user.password))
db.add(user_obj)
db.commit()
db.refresh(user_obj)
return user_obj
async def authenticate_user(email: str, password: str, db: _orm.Session):
# Authenticate a user
user = await get_user_by_email(email=email, db=db)
if not user:
return False
if not user.is_verified:
return 'is_verified_false'
if not user.verify_password(password):
return False
return user
async def create_token(user: _models.User):
# Create a JWT token for authentication
user_obj = _schemas.User.from_orm(user)
user_dict = user_obj.model_dump()
del user_dict["date_created"]
token = jwt.encode(user_dict, JWT_SECRET, algorithm="HS256")
return dict(access_token=token, token_type="bearer")
async def get_current_user(db: _orm.Session = _fastapi.Depends(get_db), token: str = _fastapi.Depends(oauth2schema)):
# Get the current authenticated user from the JWT token
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
user = db.query(_models.User).get(payload["id"])
except:
raise _fastapi.HTTPException(status_code=401, detail="Invalid Email or Password")
return _schemas.User.from_orm(user)
def generate_otp():
# Generate a random OTP
return str(random.randint(100000, 999999))
def connect_to_rabbitmq():
# Connect to RabbitMQ
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_URL))
return connection
except pika.exceptions.AMQPConnectionError:
print("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
time.sleep(5)
def send_otp(email, otp, channel):
# Send an OTP email notification using RabbitMQ
connection = connect_to_rabbitmq()
channel = connection.channel()
message = {'email': email,
'subject': 'Account Verification OTP Notification',
'other': 'null',
'body': f'Your OTP for account verification is: {otp} \n Please enter this OTP on the verification page to complete your account setup. \n If you did not request this OTP, please ignore this message.\n Thank you '
}
try:
queue_declare_ok = channel.queue_declare(queue='email_notification', passive=True)
current_durable = queue_declare_ok.method.queue
if current_durable:
if queue_declare_ok.method.queue != current_durable:
channel.queue_delete(queue='email_notification')
channel.queue_declare(queue='email_notification', durable=True)
else:
channel.queue_declare(queue='email_notification', durable=True)
channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OTP email notification")
except Exception as err:
print(f"Failed to publish message: {err}")
finally:
channel.close()
connection.close()
在本节中,我们将重点介绍如何实现机器学习服务,该服务作为微服务架构中所有 OCR(光学字符识别)相关请求的入口点。机器学习服务负责处理 OCR 请求、从图像中提取文本以及在完成时发送通知。
现在,让我们实现机器学习服务。
在 ml_services/ 目录下创建一个 main.py 文件,代码如下:
此 Python 脚本连接到 RabbitMQ 服务器,并使用来自名为“ocr_service”的队列中的消息。收到消息后,它使用 OCRService 对象对其进行处理,使用 send_email_notification 函数发送电子邮件通知,然后将响应发布到回复队列。在处理每条消息后,它确认向 RabbitMQ 传送消息。该脚本使用预取计数 1 来限制 RabbitMQ 将传送的未确认消息的数量。
import pika
import json
from utils import OCRService
from utils import send_email_notification
# Connect to RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='ocr_service')
# Callback function to process OCR requests
def on_request(ch, method, props, body):
# Initialize OCR service
ocr_service = OCRService()
# Process OCR request
response = ocr_service.process_request(body)
# Send email notification
send_email_notification(response['user_email'], response['ocr_text'], channel)
# Publish response to the reply queue
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=json.dumps(response))
# Acknowledge the message delivery
ch.basic_ack(delivery_tag=method.delivery_tag)
# Set prefetch count to 1
channel.basic_qos(prefetch_count=1)
# Consume messages from the 'ocr_service' queue
channel.basic_consume(queue='ocr_service', on_message_callback=on_request)
# Start consuming messages
print(" [x] Awaiting RPC requests")
channel.start_consuming()
在 ml_services/ 目录下创建一个 utils.py 文件,代码如下:
import json
import base64
import pandas as pd
#keras ocr pipeline and imports
import keras_ocr
import pika
class OCRService:
def __init__(self):
self.keras_pipeline = keras_ocr.pipeline.Pipeline()
def keras_ocr(self, image_path):
results = self.keras_pipeline.recognize([image_path])
df = pd.DataFrame(results[0], columns=['text', 'bbox'])
words = df['text'].tolist()
sentence = ' '.join(words)
return sentence
def process_request(self, message):
message_body = json.loads(message)
user_name = message_body['user_name']
user_email = message_body['user_email']
user_id = message_body['user_id']
file_base64 = message_body['file']
print(f" [x]user_id: {user_id} request recieved from gateway..")
print(f" [x]processing request for {user_name}")
# Assuming file_base64 contains the base64-encoded string
file_data = base64.b64decode(file_base64.encode())
# Write the decoded file data to a new file
with open('artifacts/decoded_file.png', 'wb') as f:
f.write(file_data)
image_path = "artifacts/decoded_file.png"
ocr_text = self.keras_ocr(image_path)
print(" [^]OCR processing done !!!")
response = {
"user_id": user_id,
"user_name": user_name,
"user_email": user_email,
"ocr_text": ocr_text
}
return response
def send_email_notification(email, ocr_text, channel):
# Send an email notification using RabbitMQ
message = {
'email': email,
'subject':'OCR Process Completed !!',
'body':f'We are pleased to inform you that the OCR (Optical Character Recognition) process for your image has been successfully completed.\n The extracted text has been processed and is now ready for use.\n\n OCR text : {ocr_text}',
'other': 'null',
}
try:
channel.basic_publish(
exchange="",
routing_key='email_notification',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
),
)
print("Sent OCR Process Completed email notification")
except Exception as err:
print(f"Failed to publish message: {err}")
在本节中,我们将重点介绍如何实现通知服务,该服务向用户发送电子邮件通知。
在 notification_service/ 目录下创建一个 main.py 文件,代码如下:
此脚本设置一个 RabbitMQ 使用者,用于侦听“email_notification”队列中的消息。收到消息时,它会 从 email_service
模块调用通知
函数来处理通知过程。如果成功,它将确认消息;否则,它将拒绝该消息并打印错误消息。
import pika
import sys
import os
import time
import email_service
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
def main():
# rabbitmq connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_URL))
channel = connection.channel()
def callback(ch, method, properties, body):
try:
err = email_service.notification(body)
if err:
ch.basic_nack(delivery_tag=method.delivery_tag)
else:
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue="email_notification", on_message_callback=callback
)
print("Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Interrupted")
try:
sys.exit(0)
except SystemExit:
os._exit(0)
在 notification_service/ 目录下创建一个 email_service.py 文件,代码如下:
import smtplib, os, json
from email.message import EmailMessage
from dotenv import load_dotenv
from email.mime.text import MIMEText
load_dotenv()
def notification(message):
try:
message = json.loads(message)
receiver_address = message["email"]
subject = message["subject"]
body = message["body"]
other = message["other"]
sender_address = os.environ.get("GMAIL_ADDRESS")
sender_password = os.environ.get("GMAIL_PASSWORD")
# Gmail SMTP server settings
smtp_server = 'smtp.gmail.com'
smtp_port = 587
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_address, sender_password)
# Compose the email message
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = sender_address
msg['To'] = receiver_address
server.sendmail(sender_address, receiver_address, msg.as_string())
server.quit()
print("Mail Sent")
except Exception as e:
print(f"Failed to send email: {e}")
综上所述,我们成功地使用 FastAPI 和 RabbitMQ 实现了端到端的微服务架构。我们演示了如何创建用户身份验证服务、用于 OCR 处理的机器学习服务以及用于电子邮件通知的通知服务。
通过本博客,我们了解了微服务的关键概念,例如服务隔离、通过消息队列进行通信以及使用异步处理实现可扩展性和性能的好处。
上一篇: