掌握API建模:基本概念和实践
什么是变更数据捕获?
CDC 的基础知识
更改数据捕获 (CDC) 的理念是捕获对数据库中的数据所做的更改,然后将这些更改记录实时传送到下游流程或系统。
让我们从数据库复制过程的简单演示开始。有一个跟随者数据库从领导者数据库复制数据。您可以以记录日志流的形式查看领导者数据库的更改。每次有人成功写入领导者数据库时,这就是流中的一条记录。如果您按照与原始数据库提交事件完全相同的顺序将这些事件应用到关注者数据库,那么您最终会得到数据库的精确副本。
但我们可以利用 CDC 记录做更多事情。关注者不需要是关注者数据库。它还可以是您想要保留最新用户的缓存,或者是在添加新文档时更新的搜索索引等等。这个想法是,您可以将不同的使用者附加到不同用例的 CDC 记录,并且它们与原始数据库以及彼此之间完全解耦。
我如何建立一个CDC系统?
我们刚刚回顾的 CDC 用例非常令人兴奋,并且可以应用于广泛的问题。
现在我们“只”需要构建一个系统,该系统可以:
- 不断从数据库中提取更改
- 持续处理 CDC 记录
- 不断发出目标系统要采取的操作
肯定有很多开源工具可以让您构建一个系统来做到这一点。如果您沿着这条路开始,您可能很快就会设计出一个与此类似的系统:
- Debezium允许您捕获来自各种数据库的更改并将 CDC 记录传递到 Kafka
- Kafka可以持久保存这些 CDC 记录,并允许不同的消费者独立处理事件。 Pulsar 或 Kinesis 也是这里的常见选择。
- Flink是一个强大的分布式有状态数据处理引擎,用于处理 CDC 事件。其他选择通常包括 Spark 和 KsqlDB。
每个系统都会执行所承诺的操作,但部署、配置和操作每个系统以及确保它们彼此无缝协作的开销并非微不足道。这就是 Decodable 将其作为服务提供的原因。
可解码流程 CDC 记录即服务
Decodable 是一个实时数据处理平台,为 CDC 处理提供一流的服务。用户只需连接到源数据库并实时处理 CDC 记录,无需管理底层基础设施。
在 Decodable 中,CDC 连接器发出更改记录,这些记录存储在更改流中。每个更改记录都包含修改类型:插入、更新和删除,以及修改的数据。这些记录是根据管道中的修改进行处理的。在接收器一侧,更改流可以连接到支持使用更改记录的连接器(例如:Postgres Sink)。
典型的工作流程包含三个步骤(出于演示目的,此处显示的示例已被简化):
1. 配置 Decodable 的 CDC 连接器(例如:Postgres CDC、MySQL CDC),它会自动提供包含 CDC 记录的变更流。例如:{"op":"c","before":null,"after":{"id":1,"user":"alice","status":"NEW"},"ts_ms":0} {"op":"c","before":null,"after":{"id":2,"user":"alice","status":"NEW"},"ts_ms":1} {"op":"c","before":null,"after":{"id":3,"user":"bob","status":"NEW"},"ts_ms":2} {"op":"u","before":{"id":2,"user":"alice","status":"NEW"},"after":{"id":2,"user":"alice","status":"SHIPPED"},"ts_ms":3} {"op":"u","before":{"id":2,"user":"alice","status":"SHIPPED"},"after":{"id":2,"user":"alice","status":"DELIVERED"},"ts_ms":4}
Copy
2. 创建管道,处理逻辑用SQL编写。例如,下面的 SQL 查询对每个用户未交付的所有订单进行计数。insert into non_delivered_count select user, count(status) as total_non_delivered from orders where status != "DELIVERED" group by user
Copy
上面的查询还会自动配置一个输出更改流non_delivered_count ,其中包含供接收器连接器使用的 CDC 记录。由于管道是连续处理的,因此结果也会随着每个输入记录而更新。输出流中的记录如下所示:{"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":0} {"op":"d","before":{"user":"alice","total_non_delivered":1},"after":null,"ts_ms":1} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":2},"ts_ms":1} {"op":"c","before":null,"after":{"user":"bob","total_non_delivered":1},"ts_ms":2} {"op":"d","before":{"user":"alice","total_non_delivered":2},"after":null,"ts_ms":4} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":4}
Copy
3. 配置一个接收器连接器,它可以消耗更改以实时查看结果。例如,如果配置了 Postgres 接收器,则连接会针对流中的每条记录向 Postgres 数据库发出一个操作。它们看起来像:INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 2) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("bob", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered;
Copy
请注意,SQL 语法INSERT…ON CONFLICT…DO UPDATE SET…用于确保以原子方式应用撤消(删除 + 创建)。
通过 CDC 解锁更多用例
现在我们知道 Decodable 可以实时处理 CDC 记录,并支持目标系统始终拥有最新视图。但没有限制一次只能处理一个变更流。借助 Flink 强大的引擎和易于访问的 SQL 接口,流连接变得更加容易——您可以实时连接来自不同数据库的表!
文章来源:https://www.decodable.co/blog/what-is-change-data-capture