所有文章 > 日积月累 > 使用Actix和Rust构建REST和WebSocket API
使用Actix和Rust构建REST和WebSocket API

使用Actix和Rust构建REST和WebSocket API

多年来,Web 开发的世界确实取得了长足的进步。在 90 年代末到 2000 年代初,我从各种网站上学会了如何使用 HTML、表格、随机 JavaScript 片段等构建网页。随着时间的推移,我们获得了更多复杂的服务器渲染选项,如 asp、php,然后进入 MVC 框架作品,如 Rails 和 Django。现在,我们正在编写完全基于REST API的后端,服务器只负责返回数据,而客户端则利用这些数据来填充界面。界面的构建采用了多种技术,你仍然可以使用我6-7年前所用的技术:在Rails中渲染服务器端页面,并编写jQuery代码来使内容更加动态。这种方法对于许多应用程序来说都非常有效,但当你需要对页面进行更多动态控制时,它的可扩展性就会降低。这正是React和Angular等现代前端框架发挥作用的地方。

“你为什么要说这个,”你问。“这篇文章是关于Actix和Rust的!”好问题,读者!我提到这段精彩的 Web 开发历史,因为我觉得 websockets 是一个非常强大的工具。我不认为每个案例都需要它们,我想我们在日常使用的各种应用程序中都会遇到它们。也就是说,在我在各种机构工作的职业生涯中,它并没有被大量利用。有许多框架可以深入研究以利用 Web 套接字。我过去曾通过 Primus 这样做过。ActionHeroJS是一个NodeJS API框架,提供了相对无缝的HTTP和Websocket请求处理。最近,当我在做一个副项目时,我想利用 actix 来实现其 HTTP API 功能,并集成 websockets 来更新用户的状态变化。

我最终构建的应用程序是供用户预测特定比赛或比赛的应用程序。特别是我要去的星际争霸聚会,我们会在那里进行抽奖,每场比赛我们都会提前选择谁会获胜,并达到游戏中的其他第一个里程碑。对于一项运动,你可以将其等同于第一个进球,或者第一个在棒球比赛中出现三振出局的球队,等等。我们的想法是以类似于流行的 Jackbox 游戏的方式对其进行建模,用户可以在手机上轻松加入游戏,选择他们的选择并查看排行榜。Web Sockets 在更新拨片选择状态、评分等方面的好处对我来说非常明显。

那个应用程序包含的内容比这个教程要复杂得多。相反,我将借鉴在构建那个项目时学到的经验,并基于此来构建一个部分功能。用户可以查看问题列表,也可以创建问题。当有新问题创建时,通过WebSocket连接的用户将接收到这些问题。虽然功能相对简单,但它涵盖了许多相同的概念。

注意

本文希望您以前使用过 Rust,或者至少有一些工作知识,并且您以前使用过后端 Web API。因为我在这里并没有真正深入研究这些概念。如果你对 Rust 感兴趣,我建议你去看看官方的书:https://doc.rust-lang.org/book/。这个关于实现链表的教程也很有趣 https://rust-unofficial.github.io/too-many-lists/index.html,并且确实触及了 Rust 中的一些初学者概念。

其次,我以这样一种方式编写了代码,即代码并不总是包含完整的代码段。您需要的所有代码都将在此处编写,但是在重新访问文件时,我不会显示那里的所有现有代码。我在代码块中做注释,以了解何时删除内容。我还在代码注释中做了笔记,并在外部何时进行添加。我还添加了代码注释,解释了一些 Rust 的东西,以及 actix 如何注入某些参数,或者某些特征如何应用于代码。

让我们开始设置

首先,这是我的应用程序的完整服务器的当前版本:https://github.com/agmcleod/sc-predictions-server。它不是开源的,但我很高兴人们在自己的项目和爱好中使用它来参考。

其次,我想向这个仓库表示极大的敬意:https://github.com/ddimaria/rust-actix-example,它有助于帮助我弄清楚一些身份验证部分以及错误处理。

首先创建一个新文件夹 。questions-app

为了构建这个应用程序,我正在使用docker来运行数据库,然后在我的主机上编译并运行rust代码。如果您使用 Linux,则可以使用 docker 运行 rust 代码。在Windows和Mac上,我发现编译时间慢了大约20倍。在 docker 中构建需要几分钟,而在 docker 之外则更像是 10 秒。但是,如果您在主机上运行该应用程序,则需要在计算机上安装 postgres 开发工具。

如果您想执行相同的操作,请设置如下文件:questions-app/docker-compose.yml

version: "3"

services:
database:
image: "postgres:10.5"
ports:
- "5434:5432"
environment:
POSTGRES_DB: my_database
POSTGRES_PASSWORD: my_password

如果可以的话,我建议使用更新版本的 postgres,我几年前第一次开始这个项目。端口映射也取决于您,我将其设置为不同的端口,因为在 Windows 中安装 postgres 开发人员工具会启动 postgres 服务器。我发现这更容易避免端口冲突。

然后创建数据库容器,让它在后台为我们运行:

docker-compose up -d

您可以随时通过运行

docker-compose stop

docker 的 postgres 基础映像为我们创建了数据库,因此我们应该可以开始了。

对于应用程序服务器,我们会将其拆分为几个 crate。创建一个名为 的文件,然后让我们用我们的工作区信息填充它。Cargo.toml

[workspace]

members = ["server"]

让我们从服务器应用程序方面开始。

cargo new --bin server

打开并为其添加一些依赖项:server/Cargo.toml

[dependencies]
actix = "0.10.0"
actix-cors = "0.5"
actix-http = "2.1"
actix-identity = "0.3"
actix-service = "1.0.6"
actix-web = "3.2"
actix-web-actors = "3.0"
actix-rt = "1.1"
dotenv = "0.9.0"
env_logger = "0.8.2"
futures = "0.3.5"
futures-util = "0.3.5"
log = "0.4.0"

我们不需要立即需要所有这些,但我们正在指定此应用程序所需的基本 actix crate,用于处理异步代码的 futures。无论是在应用程序还是在测试中。处理环境变量,因此我们不需要对 key 和 host name 进行硬编码。还有一些很好的老式伐木。

让我们在 中设置。server/src/main.rs

#[macro_use]
extern crate log;

use std::env;

use actix_cors::Cors;
use actix_rt;
use actix_web::{http, middleware::Logger, App, HttpServer};
use dotenv::dotenv;
use env_logger;

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
env_logger::init_from_env(env_logger::Env::default().default_filter_or("debug"));

// TBD
}

前几行代码从 .env 文件中读取环境变量,以及 bash 环境变量……我们还没有设置!通常使用 docker,我会通过 compose 文件传递环境变量,但由于服务器在主机上运行,因此我使用 Makefile。如果您使用的是 Windows,则需要先安装 make。

SHELL := /bin/bash

db_url := postgres://postgres:my_password@localhost:5434/my_database

run_server:
DATABASE_URL=$(db_url) \
RUST_BACKTRACE=full \
cargo run --bin server

.PHONY: run_server

所以现在当我们运行 时,它将使用我们需要的变量运行。我偷偷地加入了 a,这样我们就可以更容易地跟踪错误堆栈。通过该设置,我们将以下内容添加到 main() 的其余部分make run_serverRUST_BACKTRACE

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
env_logger::init_from_env(env_logger::Env::default().default_filter_or("debug"));
HttpServer::new(move || {
let cors = Cors::default()
.allowed_origin(&env::var("CLIENT_HOST").unwrap())
.allow_any_method()
.allowed_headers(vec![
http::header::AUTHORIZATION,
http::header::ACCEPT,
http::header::CONTENT_TYPE,
])
.max_age(3600);

App::new()
.wrap(cors)
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
})
.bind("0.0.0.0:8080")?
.run()
.await
}

我们构建 Web 服务器,并向其传递许多服务。首先,我们只允许前端的主机使用 CORS,并限制允许的标头。然后我们创建应用程序服务器,设置日志记录,但还没有路由或中间件。最后,服务器在端口 8080 上启动。

Running 现在应该编译并运行 actix Web 服务器。第一次编译和构建所有依赖项需要一些时间。完成后,打开浏览器并访问 http://localhost:8080 进行验证。您应该会看到一个空白页面和一个 404 http 响应。make run_server

获取所有问题

现在,我们需要开始构建一个返回问题列表的 GET 终端节点。

创建我们的 db crate:

cargo new --lib db

如输出所示,将其添加到我们的工作区 Cargo.toml 文件中。

[workspace]

members = ["db", "server"]

对于我们的数据库模型,我们将使用 diesel。以及用于连接池的 r2d2、用于日期时间的 chrono 和用于将模型序列化为 JSON 的 serde。

将以下内容添加到db/Cargo.toml

chrono = { version = "0.4.6", features = ["serde"] }
diesel = { version = "1.4.4", features = ["postgres", "r2d2", "uuid", "chrono"] }
env_logger = "0.5.13"
log = "0.4.0"
r2d2 = "0.8.2"
r2d2_postgres = "0.14.0"
serde = "1.0.80"
serde_derive = "1.0.115"
serde_json = "1.0.13"

在主机上安装 diesel-cli:

cargo install diesel_cli --no-default-features --features postgres

然后运行 setup 命令,这将创建我们的初始迁移。

DATABASE_URL=postgres://postgres:my_password@localhost:5434/my_database diesel setup --migration-dir=db/migrations

让我们创建一个迁移来创建我们的 questions 表。向我们的 Makefile 添加一些有用的命令

create_migration:
	DATABASE_URL=$(db_url) diesel migration generate $(name) --migration-dir=db/migrations

migrate:
DATABASE_URL=$(db_url) diesel migration run --migration-dir=db/migrations

redo_migrate:
DATABASE_URL=$(db_url) diesel migration redo --migration-dir=db/migrations

现在创建迁移

make create_migration name=create_questions

Diesel 的迁移机制包含两个文件:up.sql 和 down.sql。当你想要应用更改时,会执行 up.sql;而当你想要撤销更改时,会执行 down.sql。打开刚刚创建的 up.sql 文件(注意这里应为 up.sql 而不是 up.yml,因为 Diesel 使用 SQL 文件进行迁移),并在其中填写以下内容:

CREATE TABLE questions (
  id SERIAL PRIMARY KEY,
body TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);

SELECT diesel_manage_updated_at('questions');

该函数会应用一个触发器,以确保每当记录发生更改时updated_at都会为我们填充该触发器。现在,使用 drop table 语句填充 down.yml 文件:diesel_manage_updated_at

DROP TABLE questions;

在运行迁移之前,请打开文件,并将内容更改为:diesel.toml

# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli

[print_schema]
file = "db/src/schema.rs"

这指定了我们希望用于生成的 schema.rs 文件的位置。

运行迁移!

make migrate

完成后,将创建 db/src/schema.rs 文件。请勿编辑此文件,因为 diesel 使用它来管理数据库架构的状态。

要设置我们的数据库模型,请创建一些新文件:

mkdir -p db/src/models
touch db/src/models/mod.rs
touch db/src/models/question.rs

打开并初始化我们将要创建的模块,并为我们的矿池设置一个类型:db/src/lib.rs

use std::env;

use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};

pub mod models;
pub mod schema;

pub type PgPool = Pool<ConnectionManager<PgConnection>>;

pub fn get_conn(pool: &PgPool) -> PooledConnection<ConnectionManager<PgConnection>> {
pool.get().unwrap()
}

pub fn new_pool() -> PgPool {
// again using our environment variable
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let manager = ConnectionManager::<PgConnection>::new(database_url);

Pool::builder()
.build(manager)
.expect("failed to create db pool")
}

对于获取连接,立即调用 unwrap() 并不是最好的选择,但我们稍后会改进它。

我们定义了一个可重用的类型,因此我们不需要在每个路由处理程序中导入这些类型。从那里,new_pool() 函数使用环境变量建立连接管理器,并创建一个 r2d2 池来管理连接。PgPool

打开 以定义问题模块。db/src/models/mod.rs

mod question;

pub use self::question::*;

以这种方式定义一个模块,然后调用 允许我们从 models 模块中重新导出 question 模块中的定义。在应用程序中,我们将能够使用 .而不是。pub useuse db::models::Questionuse db::models::question::Question

打开模型文件,让我们设置模型。首先添加我们需要的导入:

// question.rs
use chrono::{DateTime, Utc};
use diesel::{PgConnection, QueryDsl, RunQueryDsl};
use serde::{Deserialize, Serialize};

use crate::schema::questions;

您可能会看到引用 的编译器错误,我们会尽快修复它!crate::schema

创建一个结构体,并从Diesel中派生类型,以便它能够支持我们所需的查询类型和序列化

#[derive(Debug, Identifiable, Serialize, Deserialize, Queryable)]
pub struct Question {
pub id: i32,
pub body: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}

需要注意的重要一点是,结构体中的字段需要与它们在 schema.rs 文件中定义的顺序相匹配。这也将匹配它们在迁移文件中定义的顺序。因此,如果要在新的迁移中删除并重新添加它,则需要将结构移动到底部。这可能是 Diesel 让我烦恼的一件事,但我意识到鉴于这项工作的棘手性,没有一个简单的答案。bodybody

接下来,向 Question 结构体添加一个函数,以获取所有记录。首先通过添加 macro_use 进行更新。这样我们就可以从 schema 模块中提取数据。Diesel 将自动加载 schema.rs 文件,这允许它在项目范围内定义该文件中使用的宏。db/src/lib.rs

// db/src/lib.rs
#[macro_use]
extern crate diesel;

然后再次打开 question.rs

// Update the use list from before
use diesel::{PgConnection, QueryDsl, Queryable, RunQueryDsl};

impl Question {
pub fn get_all(conn: &PgConnection) -> Result<Vec<Question>, diesel::result::Error> {
use crate::schema::questions::dsl::{body, questions};

let all_questions = questions.order(body).load::<Question>(conn)?;

Ok(all_questions)
}
}

这里的代码拉取了用于查询 questions 表的 dsl 和 body 字段,以便我们可以按它进行排序。使用 ?运算符立即返回 Diesel 错误(如果发生)。当事情进展顺利时,我们将 包装在 Ok 结果中。Vec<Question>

好了,现在让我们设置一个返回问题的路由。首先将我们的 db crate 添加到服务器的 Cargo.toml 中

db = { path = "../db" }
mkdir -p server/src/routes/
touch server/src/routes/mod.rs
mkdir -p server/src/routes/questions
touch server/src/routes/questions/mod.rs
touch server/src/routes/questions/get_all.rs

好多文件啊!如果你愿意,你可以把所有东西都放在 routes/mod.rs 中,但我更喜欢将它们分开,这样我们就不会挤满我们的路由文件。

添加我们的模块定义

// server/src/main.rs

mod routes;
// server/src/routes/mod.rs

pub mod questions;
// server/src/routes/questions/mod.rs

mod get_all;

// re-export everything under get_all to be part of questions
pub use self::get_all::*;

打开 get_all.rs,让我们创建一个路由来返回数据库中的所有问题。

use actix_web::{
    error::Error,
web::{block, Data, Json},
Result,
};

use db::{get_conn, models::Question, PgPool};

pub async fn get_all(pool: Data<PgPool>) -> Result<Json<Vec<Question>>, Error> {
let connection = get_conn(&pool);

let questions = block(move || Question::get_all(&connection)).await?;

Ok(Json(questions))
}

这汇集了我们目前设置的所有内容。我们添加依赖项,并设置我们的路由处理程序。值得注意的是,这个论点之所以有效,是因为actix允许共享应用程序状态和数据。接下来,我们将把 pool 的实例传递给 ,这允许在我们的路由处理程序函数中将其作为参数进行访问。

它使用 Question 模型获取所有记录。由于我们将 serde Serialize 应用于 Question 模型,因此可以很容易地将其转换为 JSON。对于我们的错误类型,这里我们只使用通用的 actix 错误类型。block() 的原因是数据库操作(即 IO 阻塞功能)发生在单独的线程上。block 返回一个 future,这意味着我们可以使用它来等待它完成,然后再返回响应。创建 Ok 结果并包装为 Json 将通过 API 将我们的数据数组作为 JSON 返回。await

打开 创建一个路由配置,并将其添加到其中:server/src/routes/mod.rs

use actix_web::web;

pub fn routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/api")
.service(web::scope("/questions").route("", web::get().to(questions::get_all))),
);
}

在这个函数中,我们将能够向它添加各种路由。这将设置一个没有默认值的顶级 /api,然后在其下方设置一个 /questions。我们将 get_all指定为 GET 请求。

打开我们的 server/src/main.rs 文件,将路由附加到 App,并添加一个数据库池。

// at the top of main()
let pool = db::new_pool();

// inside HttpServer::new block
App::new()
.wrap(cors)
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.data(pool.clone()) // <---- the news line here
.configure(routes::routes) // <----

现在我们已经把所有这些零碎的东西放在一起,让我们编写一些测试。在 中创建一个新文件,以便我们可以定义一些测试帮助程序。server/src/tests.rs

首先设置一个模块和我们的依赖项。我们添加了 cfg(test) 属性,以便此模块仅在 tests 中编译。

// server/src/main.rs
#[cfg(test)]
mod tests;
// server/src/tests.rs
use actix_http::Request;
use actix_service::Service;
use actix_web::{body::Body, dev::ServiceResponse, error::Error, test, App};
use serde::de::DeserializeOwned;

use crate::routes::routes;

这些依赖项将用于两个函数。一个用于创建用于单个请求的服务,另一个用于调用我们的 get 请求。

pub async fn get_service(
) -> impl Service<Request = Request, Response = ServiceResponse<Body>, Error = Error> {
test::init_service(App::new().data(db::new_pool()).configure(routes)).await
}

init_service返回 Service trait 的实例,这需要我们定义许多泛型类型。因此,为此,我们从 Actix 中提取了一些实现这些特征的类型。此函数使用 init_service 使用我们的路由创建 App 的新实例。

现在将以下内容添加到模块中:

pub async fn test_get<R>(route: &str) -> (u16, R)
where
R: DeserializeOwned,
{
let mut app = get_service().await;
let req = test::TestRequest::get().uri(route);
let res = test::call_service(&mut app, req.to_request()).await;

let status = res.status().as_u16();
let body = test::read_body(res).await;
let json_body = serde_json::from_slice(&body).unwrap_or_else(|_| {
panic!(
"read_response_json failed during deserialization. response: {} status: {}",
String::from_utf8(body.to_vec())
.unwrap_or_else(|_| "Could not convert Bytes -> String".to_string()),
status
)
});

(status, json_body)
}

这将使用 get_service 创建应用程序服务,然后创建我们的 get 请求。为了执行它,我们然后调用函数,该函数接受我们的应用程序和路由来处理请求。其思路是,这仅针对单个请求运行应用程序。call_service

之后,我们获得 http 状态代码以及响应正文。这是假设我们只得到 json 响应,但确实有一些错误处理,如果反序列化出错,使用 panic 强制测试失败。

在将serde添加到server/Cargo.toml文件的过程中,你可能会遇到一些编译错误。同时,既然你已经在那里了,不妨也将diesel添加进去,因为我们在测试时会用到它。

diesel = { version = "1.4.4", features = ["postgres", "r2d2", "uuid", "chrono"] }
serde = "1.0.80"
serde_json = "1.0.13"

完成该设置后,打开 ,并将以下内容添加到底部。get_all.rs

#[cfg(test)]
mod tests{
// not a type we're using directly, but we need to pull in this trait.
// If you leave it out, you'll see a compiler error for calling the query to insert.
// The Rust compiler is pretty good about telling you what traits you're missing.
use diesel::RunQueryDsl;

use db::{
get_conn,
models::{Question},
new_pool,
schema::{questions}
};

use crate::tests;

#[actix_rt::test]
async fn test_get_all_returns_questions() {
let pool = new_pool();
let conn = get_conn(&pool);

diesel::insert_into(questions::table).values(Question {
body: "one question".to_string(),
})
.execute(&conn).unwrap();
}
}

我们这里有一些事情发生,还有一个编译器错误。

use diesel::RunQueryDsl;不是我们直接使用的类型,但我们需要引入这个 trait。如果你省略它,你会在调用 insert 时看到缺少的 trait 的编译器错误。在这些情况下,编译器会告诉你需要哪个 trait,所以你可以简单地添加 use 语句。我真的很喜欢编译器的这一点,因为我总是忘记 traits。

我想指出的另一个是 。如果您以前为 Rust 代码编写过测试,则可能已经用 .我们在这里使用这个的原因是 it 允许我们使它们异步。如果你阅读 crate 文档,你会发现它是一个“基于 Tokio 的 Actix 生态系统单线程异步运行时”。test 属性宏标记要在 Actix 系统中运行的 async 函数。#[actix_rt::test]#[test]actix_rt::test

因此,在执行此操作时,我们不能省略 ID 和记录时间戳等字段,因为结构体需要它们。values 函数接受多种传递数据的方式,但我更喜欢使用类型化结构。因此,让我们在 db crate 中创建一个新的。开门db/src/models/question.rs

// add Insertiable
use diesel::{PgConnection, Insertable, QueryDsl, Queryable, RunQueryDsl};

#[derive(Debug, Insertable)]
#[table_name = "questions"]
pub struct NewQuestion {
pub body: String,
}

在这种情况下,我们需要添加 table_name 属性,因为这里的类型与我们 table name 不匹配。否则,我们唯一真正关心的参数是 body,因此我们将其添加为字段。

现在回到我们的测试中。

#[cfg(test)]
mod tests{
use diesel::RunQueryDsl;

use db::{
get_conn,
models::{Question, NewQuestion},
new_pool,
schema::{questions}
};

use crate::tests;

#[actix_rt::test]
async fn test_get_all_returns_questions() {
let pool = new_pool();
let conn = get_conn(&pool);

diesel::insert_into(questions::table).values(NewQuestion {
body: "one question".to_string(),
})
.execute(&conn).unwrap();

let res: (u16, Vec<Question>) = tests::test_get("/api/questions").await;
assert_eq!(res.0, 200);
assert_eq!(res.1.len(), 1);
assert_eq!(res.1[0].body, "one question");
}
}

我们使用我们花哨的 new 结构创建一个问题,然后从我们的端点获取 status + 结果数组。

为了运行测试,我创建了一个单独的 docker-compose 文件,然后向 Makefile 添加了新命令。

// docker-compose.test.yml
version: "3"

services:
database_test:
image: "postgres:10.5"
ports:
- "5433:5432"
environment:
POSTGRES_USER: root
POSTGRES_PASSWORD: ""
POSTGRES_DB: my_database_test
POSTGRES_HOST_AUTH_METHOD: trust

启动测试 docker postgres 容器:然后将一些命令添加到您的 Makefile。docker-compose -f docker-compose.test.yml up -d

test_prepare:
	DATABASE_URL=postgres://root@localhost:5433/my_database_test diesel migration run --migration-dir=db/migrations

test:
docker-compose -f docker-compose.test.yml exec database_test psql -d my_database_test --c="TRUNCATE questions"
DATABASE_URL=postgres://root@localhost:5433/my_database_test \
cargo test $(T) -- --nocapture --test-threads=1

# Update the phony line:
.PHONY: run_server test test_prepare

容器启动并运行后,运行 以运行迁移,然后您可以随时运行 以运行测试。我喜欢使用 truncate 命令来确保数据库清晰,但这取决于您。最好确保在测试用例完成后清理您在测试中创建的任何数据。否则,你可能会污染其他测试。make test_preparemake test

如果一切顺利,您应该会在输出中看到类似这样的内容。服务器目标通过 1 次测试。

 Finished test [unoptimized + debuginfo] target(s) in 50.88s
     Running target\debug\deps\db-ad00c7968215382d.exe

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

Running target\debug\deps\server-578da7493e677323.exe

running 1 test
test routes::questions::get_all::tests::test_get_all_returns_questions ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

Doc-tests db

running 0 tests

test result: ok. 0

改进错误处理

我之前指出,获取数据库连接并立即调用 unwrap() 并不是很好。在大多数情况下,这可能很好,但如果由于某种原因连接失败怎么办?这会导致 Rust 应用程序 panic,从而停止运行。此外,随着应用程序的增长,您可能希望手动返回不同的错误代码和类型。我们将创建自己的错误类型和处理方式,以便我们可以支持这些用例。

创建一个新的 crate:

cargo new --lib errors

将其添加到 Cargo 工作区

members = ["db", "errors", "server"]

然后打开并添加我们需要的依赖项。errors/Cargo.toml

[dependencies]
actix-web = "3.2"
derive_more = "0.99.9"
diesel = { version = "1.4.4", features = ["postgres", "r2d2", "uuid", "chrono"] }
env_logger = "0.5.13"
log = "0.4.0"
r2d2 = "0.8.2"
serde = "1.0.80"
serde_json = "1.0.13"

与我们的其他箱子类似的套装,使用actix-web、diesel和r2d2进行数据库、日志记录支持和serde。这样我们就可以处理来自这些不同外部数据类型的错误。我再次非常感谢 github,我在帖子顶部列出的存储库中也有类似的设置。ddimaria

开门。您可以删除默认测试,然后开始添加以下内容。errors/src/lib.rs

#[macro_use]
extern crate log;

use actix_web::{
error::{BlockingError, ResponseError},
Error as ActixError, HttpResponse,
};
use derive_more::Display;
use diesel::result::{DatabaseErrorKind, Error as DBError};
use r2d2::Error as PoolError;
use serde::{Deserialize, Serialize};

#[derive(Debug, Display, PartialEq)]
pub enum Error {
BadRequest(String),
InternalServerError(String),
Unauthorized,
Forbidden,
NotFound(String),
PoolError(String),
BlockingError(String),
}

我们有通常的 use 语句来拉取依赖项,但我们也设置了我们的 Error 枚举。我已经添加了我们专门希望冒泡的所有类型。您可以添加更多类型,例如在我自己的应用程序中,我有 UnprocessableEntity 用于返回特定的验证错误。例如,在加入大厅时,该大厅的用户名已被占用。我喜欢这一点的是 Rust 枚举的使用。我们可以定义其中一些来携带错误消息,并且它是一个特定的类型。

对于我们的枚举,我们希望它作为 actix_web ResponseError 响应,因此请实现它:

impl ResponseError for Error {
    fn error_response(&self) -> HttpResponse {
match self {
Error::BadRequest(error) => {
HttpResponse::BadRequest().json::<ErrorResponse>(error.into())
}
Error::NotFound(message) => {
HttpResponse::NotFound().json::<ErrorResponse>(message.into())
}
Error::Forbidden => HttpResponse::Forbidden().json::<ErrorResponse>("Forbidden".into()),
_ => {
error!("Internal server error: {:?}", self);
HttpResponse::InternalServerError()
.json::<ErrorResponse>("Internal Server Error".into())
}
}
}
}

同样,这相当灵活,最后我们只想返回某种 HttpResponse。对于 40x 错误代码,我们会更加谨慎地处理它们,其他任何内容都被视为 500。在这种情况下,记录详细信息,并且仅向 API 返回一般错误。如果生成此代码,将看到错误。

cannot find type `ErrorResponse` in this scope
not found in this scope

对于这 40 个状态代码,我们将传递自定义数据类型来指示 JSON 结构,但我们尚未定义它,让我们现在开始定义。

#[derive(Debug, Deserialize, Serialize)]
pub struct ErrorResponse {
pub errors: Vec<String>,
}

我们需要它通过serde进行序列化和反序列化,否则就很简单了。仅包含将返回给客户端的错误消息数组。

然后,我们将在 ErrorResponse 上为某些类型添加 From trait,这样 into() ErrorResponse 就可以工作了。这是通过为 、 和 类型实现 From trait 来完成的。&strStringVec<String>

impl From<&str> for ErrorResponse {
    fn from(error: &str) -> Self {
ErrorResponse {
errors: vec![error.into()],
}
}
}

impl From<&String> for ErrorResponse {
fn from(error: &String) -> Self {
ErrorResponse {
errors: vec![error.into()],
}
}
}

impl From<Vec<String>> for ErrorResponse {
fn from(error: Vec<String>) -> Self {
ErrorResponse { errors: error }
}
}

假设我们得到的 Result 类型包含其他 crate 中定义的类型,我们将希望将它们包装在我们自己的 Error 中。因为我们的错误类型将用于路由处理程序以及其他地方。让我们再次使用外部错误类型应用 From trait,并将其应用于我们的 Error。

// Convert DBErrors to our Error type
impl From<DBError> for Error {
fn from(error: DBError) -> Error {
// Right now we just care about UniqueViolation from diesel
// But this would be helpful to easily map errors as our app grows
match error {
DBError::DatabaseError(kind, info) => {
if let DatabaseErrorKind::UniqueViolation = kind {
let message = info.details().unwrap_or_else(|| info.message()).to_string();
return Error::BadRequest(message);
}
Error::InternalServerError("Unknown database error".into())
}
DBError::NotFound => Error::NotFound("Record not found".into()),
_ => Error::InternalServerError("Unknown database error".into()),
}
}
}

// Convert PoolError to our Error type
impl From<PoolError> for Error {
fn from(error: PoolError) -> Error {
Error::PoolError(error.to_string())
}
}

impl From<BlockingError<Error>> for Error {
fn from(error: BlockingError<Error>) -> Error {
match error {
BlockingError::Error(error) => error,
BlockingError::Canceled => Error::BlockingError("Thread blocking error".into()),
}
}
}

impl From<ActixError> for Error {
fn from(error: ActixError) -> Error {
Error::InternalServerError(error.to_string())
}
}

每种错误类型的代码都会将其转换为我们的枚举类型之一。

准备好后,让我们将其集成到我们的应用程序中。将依赖项添加到我们的服务器和数据库板条箱中。

# Cargo.toml
[dependencies]
errors = { path = "../errors" }

打开 ,并在我们的 get_all 中更新返回错误类型。db/src/models/question.rs

use errors::Error;

impl Question {
pub fn get_all(conn: &PgConnection) -> Result<Vec<Question>, Error> {
use crate::schema::questions::dsl::{body, questions};

let all_questions = questions.order(body).load::<Question>(conn)?;

Ok(all_questions)
}
}

因为我们使用的是 ?运算符,From trait 转换为我们完成了大部分提升工作。也在此处打开并更改错误类型。server/src/routes/questions/get_all.rs

// remove actix_web::errors::Error
use actix_web::{
web::{block, Data, Json},
Result,
};

// add our type
use errors::Error;

这并没有清理 中 unwrap() 的使用,所以让我们解决这个问题。打开,更新依赖项和get_conn如下。get_conn()db/src/lib.rs

#[macro_use]
extern crate log;

use r2d2::Error;

pub fn get_conn(pool: &PgPool) -> Result<PooledConnection<ConnectionManager<PgConnection>>, Error> {
pool.get().map_err(|err| {
error!("Failed to get connection - {}", err.to_string());
err.into()
})
}

我们添加了 for 日志,以便我们可以记录数据库连接何时失败。我们仍然在这里返回一个外部 crate 类型的错误,但我们已经在 errors crate 中处理了 pool 错误。通过此更改,我们主要希望记录错误,然后将其冒泡。macro_use

进行此更改将导致一些编译器错误,因此让我们打开 get_all.rs 并修复它们。更新 get_conn() 调用以使用 ?get_all运算符。

pub async fn get_all(pool: Data<PgPool>) -> Result<Json<Vec<Question>>, Error> {
    let connection = get_conn(&pool)?;
// etc

然后在测试中,调用 unwrap()。我们的测试不知道如何处理 result 类型,让 panic 测试失败是可以的。

#[actix_rt::test]
async fn test_get_all_returns_questions() {
let pool = new_pool();
let conn = get_conn(&pool).unwrap();
// etc

现在运行以确保我们没有导致任何回归。make test

running 1 test
test routes::questions::get_all::tests::test_get_all_returns_questions ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

耶!现在,让我们允许 API 使用者创建一些问题。

创建终端节点

让我们添加一个 create 终端节点,允许调用方创建新问题。首先,将 create 方法添加到我们的 question 模型中。打开 ,让我们将 create 函数添加到现有的 .此代码与我们在测试中创建问题时使用的代码非常相似。db/src/models/question.rbimpl Question

impl Question {
    pub fn create(conn: &PgConnection, body: &String) -> Result<Question, Error> {
use crate::schema::questions::dsl::{questions};

let question = diesel::insert_into(questions).values(NewQuestion {
body: body.clone(),
}).get_result::<Question>(conn)?;

Ok(question)
}
}

我们提取问题 dsl 作为插入目标,然后重用我们为测试创建的 NewQuestion。我们调用 get_result 来返回 insert 中的数据,而不是调用 execute(调用 insert)但不返回任何值。正如 diesel 在文档中指出的那样:

当在 insert、update 或 delete 语句上调用此方法时,除非已经指定了 returning 子句,否则它会隐式地将 RETURNING * 添加到查询中。

有了这个,我们使用 ?运算符返回正确的错误类型,如果一切正常,则返回 Ok() 结果。

现在是 POST 终端节点。

touch server/src/routes/questions/create.rs

更新 routes/questions 模块

// server/src/routes/questions/mod.rs
mod create;
pub use self::create::*;

打开新的 create.rs 文件,让我们开始吧。您可以添加我们在 get_all 中拥有的大部分相同的 use 语句。此外,我们使用 serde Serialize & Deserialize 特征,因为我们需要定义请求参数类型。

use actix_web::{
    web::{block, Data, Json},
Result,
};
use serde::{Deserialize, Serialize};

use db::{get_conn, models::Question, PgPool};
use errors::Error;


#[derive(Clone, Deserialize, Serialize)]
pub struct CreateRequest {
body: String,
}

pub async fn create(pool: Data<PgPool>, params: Json<CreateRequest>) -> Result<Json<Question>, Error> {
if params.body == "" {
return Err(Error::BadRequest("Body is required".to_string()));
}

let connection = get_conn(&pool)?;

let question = block(move || Question::create(&connection, &params.body)).await?;

Ok(Json(question))
}

create 函数看起来很熟悉,但我们为 request params 添加了第二个参数。我们通过声明要使用的结构体,将其声明为 JSON,具有一组给定的字段。如果传入的数据不是 JSON,或者它与 的结构不匹配,则 actix 将返回一个错误的请求错误。CreateRequest

如果请求看起来没问题,我们应该检查正文是否为空。如果是,我们将返回一个带有 detail 字符串的错误请求。这对于本教程的目的来说很好,但要实现更多验证,我建议查看 https://github.com/Keats/validator。

之后,我们执行与以前相同的操作,从池中获取连接,并调用新的 create 函数。

现在更新添加新路线。server/src/routes/mod.rs

注意:请确保在第一个链之后有链,以便它是调用的一部分。在构建它时,我已将其附加到 Call 的结束段落中。路线对我来说是 404ing!.route()scope()service()

use actix_web::web;

pub mod questions;

pub fn routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/api")
.service(web::scope("/questions")
.route("", web::get().to(questions::get_all))
.route("", web::post().to(questions::create))),
);
}

为了测试它,我们需要一个新的 helper 函数,因为这是一个 POST 请求。

// server/src/test.rs

// update our serde use statement
use serde::{de::DeserializeOwned, Serialize};

pub async fn test_post<T: Serialize, R>(
route: &str,
params: T,
) -> (u16, R)
where
R: DeserializeOwned,
{
let mut app = get_service().await;

let req = test::TestRequest::post().set_json(&params).uri(route);

let res = test::call_service(&mut app, req.to_request()).await;

let status = res.status().as_u16();
let body = test::read_body(res).await;
let json_body = serde_json::from_slice(&body).unwrap_or_else(|_| {
panic!(
"read_response_json failed during deserialization. response: {} status: {}",
String::from_utf8(body.to_vec())
.unwrap_or_else(|_| "Could not convert Bytes -> String".to_string()),
status
)
});

(status, json_body)
}

这与我们的 test_get 工作方式非常相似。我们设置服务,创建一个 post 请求,将参数添加为 JSON。然后我们返回状态和响应正文。

// create.rs

#[cfg(test)]
mod tests {
use diesel::{self, RunQueryDsl};

use db::{
models::{NewQuestion, Question},
get_conn,
new_pool,
schema::questions
};

use crate::tests;

#[actix_rt::test]
pub async fn test_create_question() {
let pool = new_pool();
let conn = get_conn(&pool).unwrap();

let res: (u16, Question) = tests::test_post("/api/questions", NewQuestion {
body: "A new question".to_string()
}).await;

assert_eq!(res.0, 200);
assert_eq!(res.1.body, "A new question");

let result_questions = questions::dsl::questions.load::<Question>(&conn).unwrap();
assert_eq!(result_questions.len(), 1);
assert_eq!(result_questions[0].body, "A new question");

diesel::delete(questions::dsl::questions).execute(&conn).unwrap();
}
}

添加此代码后,您将收到编译器错误:

the trait bound `db::models::question::NewQuestion: routes::questions::create::_::_serde::Serialize` is not satisfied
the trait `routes::questions::create::_::_serde::Serialize` is not implemented for `db::models::question::NewQuestion`

这表明 NewQuestion 缺少 serde Serialize trait 来通过需要 JSON 的请求来传递它。因此,打开并将 Serialize 派生添加到结构体中。db/src/models/question.rs

#[derive(Debug, Insertable, Serialize)] // add Serialize here
#[table_name = "questions"]
pub struct NewQuestion {
pub body: String,
}

该测试与我们为 get_all 进行的测试非常相似。我们得到了一个池和连接,这样我们就可以在创建记录后检查数据库,以及清理它。在这里检查状态和响应正文的工作方式相同。

运行 ,我们可以看到测试用例成功了!make test

test routes::questions::create::tests::test_create_question ... ok

现在让我们测试我们的验证代码。

// add above with the other use statements in the test module
use errors::ErrorResponse;

#[actix_rt::test]
pub async fn test_create_body_required() {
let pool = new_pool();
let conn = get_conn(&pool).unwrap();

// note here how the deserialize type is changed
let res: (u16, ErrorResponse) = tests::test_post("/api/questions", NewQuestion {
body: "".to_string()
}).await;

assert_eq!(res.0, 400);
assert_eq!(res.1.errors, vec!["Body is required"]);

let result_questions = questions::dsl::questions.load::<Question>(&conn).unwrap();
assert_eq!(result_questions.len(), 0);
}

我们使用空字符串调用同一终端节点,并将我们的类型指定为响应正文。然后检查 http 状态、错误消息的值以及是否未写入数据库记录。ErrorResponse

再次运行,您应该会看到:make test

running 3 tests
test routes::questions::create::tests::test_create_body_required ... ok
test routes::questions::create::tests::test_create_question ... ok
test routes::questions::get_all::tests::test_get_all_returns_questions ... ok

test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

这涵盖了我们的 create 端点,让我们进入 websockets。

Websockets 浏览器

在标题中,我提到了 websockets 的使用。Actix 使我们能够使用 websocket 服务器,并使用 Actix 的 actor 系统处理请求。我们要做的是创建一个 HTTP 端点,供用户连接到 websocket。然后,在创建问题时,将问题详细信息广播给连接的所有用户。

创建我们将需要的文件和文件夹:

mkdir -p server/src/websocket
touch server/src/websocket/mod.rs
touch server/src/websocket/server.rs
// server/src/main.rs
mod websocket;
// server/src/websocket/mod.rs
mod server;
pub use self::server::*;

让我们从 .server.rs

use std::collections::HashMap;

use actix::prelude::{Message as ActixMessage, Recipient};
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(ActixMessage)]
#[rtype(result = "()")]
pub struct Message(pub String);

#[derive(ActixMessage, Deserialize, Serialize)]
#[rtype(result = "()")]
pub struct MessageToClient {
pub msg_type: String,
pub data: Value,
}

impl MessageToClient {
pub fn new(msg_type: &str, data: Value) -> Self {
Self {
msg_type: msg_type.to_string(),
data,
}
}
}

pub struct Server {
sessions: HashMap<String, Recipient<Message>>
}

impl Server {
pub fn new() -> Self {
Server {
sessions: HashMap::new(),
}
}
}

从上到下,我们定义了 Message,以使结构体包裹在 String 周围。这是通过 websocket 发送的消息的基本知识。他们需要定义返回类型,因此我们使用 attribute 来做到这一点。.对于 this 和所有类型,我们只返回一个空值。#[rtype(result = "()")]

我们通过字符串键声明消息的类型。因此,在我们的例子中,我们将只传递 “newquestion”,因为我们将发送新的 question 数据。data 字段是 Value from serde_json 之一。因此,任何可以转换为 Value 类型的类型。从那里,我们设置一个基本的构造函数。MessageToClient

最后,我们为 WebSocket 服务器设置结构体。这将使用 String 作为键来存储当前会话,我们将为其生成 UUID。该值是发送原始 .因此,当用户连接时,我们会收到来自 HTTP 请求的 ,并将其存储在 Server 中。RecipientMessageRecipient<Message>

添加 send_message 函数:

use serde_json::{error::Result as SerdeResult, Value}; // add the Result use here

impl Server {
fn send_message(&self, data: SerdeResult<String>) {
match data {
Ok(data) => {
for recipient in self.sessions.values() {
match recipient.do_send(Message(data.clone())) {
Err(err) => {
error!("Error sending client message: {:?}", err);
}
_ => {}
}
}
}
Err(err) => {
error!("Data did not convert to string {:?}", err);
}
}
}
}

为了更轻松地从处理程序进行调用,我们传递一个 SerdeResult 类型,并让我们的服务器处理错误处理(如果有)。您可以选择作为路由处理程序执行此操作。如果您想回复用户发送 websocket 消息时出了问题,这可能是更好的方法。

现在,我们将设置一组要监听的消息:

// a number of new actix & serde imports
use actix::prelude::{Actor, Context, Handler, Message as ActixMessage, Recipient};
use serde_json::{error::Result as SerdeResult, to_string, Value};

#[derive(ActixMessage)]
#[rtype(result = "()")]
pub struct Connect {
pub addr: Recipient<Message>,
pub id: String,
}

impl Handler<Connect> for Server {
type Result = ();

fn handle(&mut self, msg: Connect, _: &mut Context<Self>) {
self.sessions.insert(msg.id.clone(), msg.addr);
}
}

#[derive(ActixMessage)]
#[rtype(result = "()")]
pub struct Disconnect {
pub id: String,
}

impl Handler<Disconnect> for Server {
type Result = ();

fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
self.sessions.remove(&msg.id);
}
}

impl Handler<MessageToClient> for Server {
type Result = ();

fn handle(&mut self, msg: MessageToClient, _: &mut Context<Self>) -> Self::Result {
self.send_message(to_string(&msg));
}
}

我们基于 .对于消息,我们将获取该初始 ID 以及 .同样,我们有 a 从 HashMap 中删除了它的标识符。然后我们实现 for ,它调用我们之前实现的 helper 函数,将对象序列化为 JSON 数据。Handler trait 的原因是,我们可以将这些类型中的任何一个传递给我们的 websocket 服务器。ActixMessageConnectRecipient<Message>DisconnectHandlerMessageToClient

许多编译器错误将沿着

the trait bound `websocket::server::Server: actix::actor::Actor` is not satisfied
the trait `actix::actor::Actor` is not implemented for `websocket::server::Server`

让我们添加 trait。

impl Actor for Server {
    type Context = Context<Self>;
}

它的作用是允许 Server 作为 Actor 接收消息(通过 Handler trait)。应用这些类型后,我们可以将 Server 注册为 actix 的 Actor,这意味着我们可以将其拉入我们的路由处理程序,类似于 postgres 连接池,并向其发送消息。然后,Actix 将异步发送这些消息。

打开,以便我们可以设置 HTTP 路由以及其他一些关键部分。server/src/websocket/mod.rs

use std::time::{Duration, Instant};

use uuid::Uuid;

use actix::{
prelude::{Actor, Addr}
};

// same as before
mod server;
pub use self::server::*;

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(30);

pub struct WebSocketSession {
id: String,
hb: Instant,
server_addr: Addr<Server>,
}


impl WebSocketSession {
fn new(server_addr: Addr<Server>) -> Self {
Self {
id: Uuid::new_v4().to_string(),
hb: Instant::now(),
server_addr,
}
}

fn send_heartbeat(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
info!("Websocket Client heartbeat failed, disconnecting!");
act.server_addr.do_send(Disconnect { id: act.id.clone() });
// stop actor
ctx.stop();

// don't try to send a ping
return;
}
ctx.ping(b"");
});
}
}

我们首先设置一些检测信号速率的常量,以保持连接处于活动状态,以及我们认为的超时。然后,我们创建一个新的结构体来跟踪会话的 id (uuid) 以及检测信号和服务器地址。后者是会话与 websocket 服务器通信的方式。

我们将send_heartbeat函数设置为很快被调用,但其想法是它使用 actor 上下文以设定的间隔运行。如果检测信号没有刷新,它将向服务器发送断开连接,因此我们的服务器会清除此会话。然后,它还将停止此 actor 的上下文,以便清理会话。

uuid 是一个新的依赖项,因此请打开以将 uuid 添加为依赖项。server/Cargo.toml

uuid = { version = "0.5", features = ["serde", "v4"] }

我们遇到了编译器错误,类似于我们在 websocket 服务器上遇到的错误

the trait bound `websocket::WebSocketSession: actix::actor::Actor` is not satisfied
the trait `actix::actor::Actor` is not implemented for `websocket::WebSocketSession`

Server 的修复非常简单,但这个修复稍微复杂一些。

// update use statements again
use actix::{
fut,
prelude::{Actor, Addr},
ActorContext, AsyncContext
};
use actix_web_actors::ws;

impl Actor for WebSocketSession {
type Context = ws::WebsocketContext<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
self.send_heartbeat(ctx);

let session_addr = ctx.address();
self.server_addr
.send(Connect {
addr: session_addr.recipient(),
id: self.id.clone(),
})
.into_actor(self)
.then(|res, _act, ctx| {
match res {
Ok(_res) => {}
_ => ctx.stop(),
}
fut::ready(())
})
.wait(ctx);
}
}

与 websocket 服务器类似,我们将上下文设置为 websocket 上下文,这使我们能够获得心跳间隔。然后我们设置一个函数,当 actor 启动时调用该函数。我们告诉 context 连接我们刚刚编写的 heartbeat 函数,然后向服务器发送初始的 Connect 消息。如果进展顺利,我们会返回一个 future::ready,这类似于 JavaScript 中的 a。这样,我们将返回一个 Ok() future。否则,如果出现问题,我们会立即停止上下文。startedPromise.resolve()

我们收到了关于 WebSocketSession 未实现 Handler 的错误,因此让我们来解决这个问题。

use actix::{
    fut,
prelude::{Actor, Addr, Handler},
ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, WrapFuture
};

impl Handler<Message> for WebSocketSession {
type Result = ();

fn handle(&mut self, msg: Message, ctx: &mut Self::Context) {
ctx.text(msg.0);
}
}

这次更简单一些!我们只是将消息 String 作为文本消息传递给 actor 上下文。这就是我们将消息从 websocket 服务器传递到客户端的方式。如果您还记得,该结构会将其数据传递给 Server 的 .此函数将创建 的实例 并传递给收件人上下文。然后回到这个 .MessageToClientsend_messageMessageWebSocketSession

现在我们需要实现 trait,这将允许我们处理传入 Actor 的事件流。使用 ping/pong 更新检测信号、关闭事件、错误等。StreamHandler

// yep, this again
use actix::{
fut,
prelude::{Actor, Addr, Handler, StreamHandler},
ActorContext, ActorFuture, AsyncContext, ContextFutureSpawner, WrapFuture
};

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketSession {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
self.server_addr.do_send(Disconnect {
id: self.id.clone(),
});
ctx.close(reason);
ctx.stop();
}
Err(err) => {
warn!("Error handling msg: {:?}", err);
ctx.stop()
}
_ => ctx.stop(),
}
}
}

Ping & Pong 事件用于来回基本发送数据以确认连接仍然有效。它们将检测信号更新为当前时间戳,因此检测信号间隔将知道不要关闭连接。我们还会捕获任何二进制请求,而不是真正对它们做很多事情。如果收到 close 请求,我们将向服务器发送 Disconnect 并关闭 WebSocketSession。

此文件的最后一段代码,我们需要添加 http 请求处理程序以启动新会话。

use actix_web::{web, HttpRequest, HttpResponse};

use errors::Error;

pub async fn ws_index(
req: HttpRequest,
stream: web::Payload,
server_addr: web::Data<Addr<Server>>,
) -> Result<HttpResponse, Error> {
let res = ws::start(
WebSocketSession::new(server_addr.get_ref().clone()),
&req,
stream,
)?;

Ok(res)
}

它使用 actix websocket crate 来启动一个新的 actor。我们使用数据中的服务器角色地址进行传递,为创建角色传递请求和流信息。

打开并连接 Web 服务器!server/src/main.rs

use actix::Actor;

// inside main()
let server = websocket::Server::new().start(); // new line

App::new()
.wrap(cors)
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.data(pool.clone())
.data(server.clone()) // new line here
.configure(routes::routes)

连接 websocket 路由

use actix_web::web;

use crate::websocket;

pub mod questions;

pub fn routes(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("/ws/").route(web::get().to(websocket::ws_index))
).service(
web::scope("/api")
.service(web::scope("/questions")
.route("", web::get().to(questions::get_all))
.route("", web::post().to(questions::create))),
);
}

打开并让我们将新问题传递给 websocket。server/src/routes/questions/create.rs

// new use statements
use actix::Addr;
use serde_json::to_value;

use crate::websocket::{MessageToClient, Server};

// updates to create()
pub async fn create(
pool: Data<PgPool>,
websocket_srv: Data<Addr<Server>>,
params: Json<CreateRequest>,
) -> Result<Json<Question>, Error> {
if params.body == "" {
return Err(Error::BadRequest("Body is required".to_string()));
}

let connection = get_conn(&pool)?;

let question = block(move || Question::create(&connection, &params.body)).await?;

if let Ok(question) = to_value(question.clone()) {
let msg = MessageToClient::new("newquestion", question);
websocket_srv.do_send(msg);
}

Ok(Json(question))
}

我们将 websocket 服务器添加为数据参数。将其包装在 Addr 类型中,因为它是一个 Actor。创建问题后,我们调用 websocket 服务器,首先将问题转换为 Value,然后将其作为 MessageToClient 发送。你会遇到一些编译器错误,所以让我们来修复它。

首先,我们的 Question 没有实现 so open up 。clone()db/src/models/question.rs

#[derive(Clone, Debug, Identifiable, Serialize, Deserialize, Queryable)]
pub struct Question {
pub id: i32,
pub body: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}

在这里,我们将 Clone 添加到派生列表中,以便可以克隆我们的结构。我们使用此方法,因为在没有克隆的情况下传递它会导致移动,并且我们无法在 JSON 响应中返回问题。to_value

现在,我们将更新测试以支持 websocket。开门。server/src/tests.rs

// add the use statements so we can add the Server
use actix::Actor;

use crate::websocket::Server;

// update get_service to include the websocket server
pub async fn get_service(
) -> impl Service<Request = Request, Response = ServiceResponse<Body>, Error = Error> {
test::init_service(
App::new()
.data(db::new_pool())
.data(Server::new().start())
.configure(routes),
)
.await
}

运行以确保我们当前的测试通过。make test

测试 websocket

在过去的测试中,我们使用了 ,但这并不适用于 websocket 服务器,因为我们需要运行 actor 来接收来自它的消息。因此,让我们采用不同的方法。开门。test_serviceserver/src/tests.rs

use actix_web_actors::ws;

use crate::websocket::{MessageToClient, Server}; // add MessageToClient here

pub fn get_test_server() -> test::TestServer {
test::start(|| {
App::new()
.data(db::new_pool())
.data(Server::new().start())
.configure(routes)
})
}

pub fn get_websocket_frame_data(frame: ws::Frame) -> Option<MessageToClient> {
match frame {
ws::Frame::Text(t) => {
let bytes = t.as_ref();
let data = String::from_utf8(bytes.to_vec()).unwrap();
let value: MessageToClient = serde_json::from_str(&data).unwrap();
return Some(value);
}
_ => {}
}

None
}

我们这里有两个功能。第一个版本使用的内容与它几乎相同,但它返回一个正在运行的 Web 服务器。第二个是辅助函数,可用于从 websocket 连接流获取帧,并将它们转换为我们可以读取的 json 数据。get_service

再次打开文件,让我们更新 Happy path 测试。create.rs

#[cfg(test)]
mod tests {
use actix_web::client::Client;
use futures::StreamExt;
use serde_json;
// etc

我们在此处添加一些重要的 use 语句。 是一个 Web 客户端,我们可以使用它来建立 websocket 连接。 是流式传输 WebSocket 响应所需的。 将 Web Socket 中的数据转换为我们的类型。ClientStreamExtserde_json

跳转到该函数并清空它,我们将替换它的大部分。test_create_question

#[actix_rt::test]
pub async fn test_create_question() {
let pool = new_pool();
let conn = get_conn(&pool).unwrap();

let srv = tests::get_test_server();

let client = Client::default();
let ws_conn = client.ws(srv.url("/ws/")).connect().await.unwrap();

let mut res = srv
.post("/api/questions")
.send_json(&NewQuestion {
body: "A new question".to_string(),
})
.await
.unwrap();

像以前一样,我们创建数据库连接池并获得一个连接。我们使用 new 函数来启动测试服务器。然后,我们创建一个新的 Web 客户端,并通过我们在路由配置中指定的 URL 连接到 websocket 服务器。然后,我们将 test_post() 调用替换为直接在测试服务器上调用 post()。

 assert_eq!(res.status().as_u16(), 200);

let question: Question = res.json().await.unwrap();
assert_eq!(question.body, "A new question");

let mut stream = ws_conn.1.take(1);
let msg = stream.next().await;

let data = tests::get_websocket_frame_data(msg.unwrap().unwrap());
if data.is_some() {
let msg = data.unwrap();
assert_eq!(msg.msg_type, "newquestion");
let question: Question = serde_json::from_value(msg.data).unwrap();
assert_eq!(question.body, "A new question");
} else {
assert!(false, "Message was not a string");
}

与我们的旧测试类似,我们会检查HTTP请求的状态和响应正文。然后,我们从 websockect 连接获取一个流,只需通过 . 请求总共 1 条消息。将数据从框架中提取出来,我们使用 Option 类型来处理是否出现任何问题。如果数据是我们需要的类型,我们就会检查 websocket 框架中的字段。take()

  drop(stream);

let result_questions = questions::dsl::questions.load::<Question>(&conn).unwrap();
assert_eq!(result_questions.len(), 1);
assert_eq!(result_questions[0].body, "A new question");

srv.stop().await;

diesel::delete(questions::dsl::questions)
.execute(&conn)
.unwrap();
}

通过停止流来完成测试。为此,我们使用标准的 drop(),因为流实现了 Drop trait。当变量超出范围时,将调用此函数,但我们还需要停止服务器,因此我们必须在此处手动调用 drop。之后我们有之前的测试代码,检查数据库并清理它,然后是服务器停止代码。

快运行吧,你应该很高兴!make test

原文来源:https://agmprojects.com/blog/building-a-rest-and-web-socket-api-with-actix.html

#你可能也喜欢这些API文章!