什么是变更数据捕获?
CDC 的基础知识
更改数据捕获 (CDC) 的理念是捕获对数据库中的数据所做的更改,然后将这些更改记录实时传送到下游流程或系统。
让我们从数据库复制过程的简单演示开始。有一个跟随者数据库从领导者数据库复制数据。您可以以记录日志流的形式查看领导者数据库的更改。每次有人成功写入领导者数据库时,这就是流中的一条记录。如果您按照与原始数据库提交事件完全相同的顺序将这些事件应用到关注者数据库,那么您最终会得到数据库的精确副本。

但我们可以利用 CDC 记录做更多事情。关注者不需要是关注者数据库。它还可以是您想要保留最新用户的缓存,或者是在添加新文档时更新的搜索索引等等。这个想法是,您可以将不同的使用者附加到不同用例的 CDC 记录,并且它们与原始数据库以及彼此之间完全解耦。

我如何建立一个CDC系统?
我们刚刚回顾的 CDC 用例非常令人兴奋,并且可以应用于广泛的问题。
现在我们“只”需要构建一个系统,该系统可以:
- 不断从数据库中提取更改
- 持续处理 CDC 记录
- 不断发出目标系统要采取的操作
肯定有很多开源工具可以让您构建一个系统来做到这一点。如果您沿着这条路开始,您可能很快就会设计出一个与此类似的系统:

- Debezium允许您捕获来自各种数据库的更改并将 CDC 记录传递到 Kafka
- Kafka可以持久保存这些 CDC 记录,并允许不同的消费者独立处理事件。 Pulsar 或 Kinesis 也是这里的常见选择。
- Flink是一个强大的分布式有状态数据处理引擎,用于处理 CDC 事件。其他选择通常包括 Spark 和 KsqlDB。
每个系统都会执行所承诺的操作,但部署、配置和操作每个系统以及确保它们彼此无缝协作的开销并非微不足道。这就是 Decodable 将其作为服务提供的原因。
可解码流程 CDC 记录即服务
Decodable 是一个实时数据处理平台,为 CDC 处理提供一流的服务。用户只需连接到源数据库并实时处理 CDC 记录,无需管理底层基础设施。
在 Decodable 中,CDC 连接器发出更改记录,这些记录存储在更改流中。每个更改记录都包含修改类型:插入、更新和删除,以及修改的数据。这些记录是根据管道中的修改进行处理的。在接收器一侧,更改流可以连接到支持使用更改记录的连接器(例如:Postgres Sink)。
典型的工作流程包含三个步骤(出于演示目的,此处显示的示例已被简化):
1. 配置 Decodable 的 CDC 连接器(例如:Postgres CDC、MySQL CDC),它会自动提供包含 CDC 记录的变更流。例如:{"op":"c","before":null,"after":{"id":1,"user":"alice","status":"NEW"},"ts_ms":0} {"op":"c","before":null,"after":{"id":2,"user":"alice","status":"NEW"},"ts_ms":1} {"op":"c","before":null,"after":{"id":3,"user":"bob","status":"NEW"},"ts_ms":2} {"op":"u","before":{"id":2,"user":"alice","status":"NEW"},"after":{"id":2,"user":"alice","status":"SHIPPED"},"ts_ms":3} {"op":"u","before":{"id":2,"user":"alice","status":"SHIPPED"},"after":{"id":2,"user":"alice","status":"DELIVERED"},"ts_ms":4}
Copy
2. 创建管道,处理逻辑用SQL编写。例如,下面的 SQL 查询对每个用户未交付的所有订单进行计数。insert into non_delivered_count select user, count(status) as total_non_delivered from orders where status != "DELIVERED" group by user
Copy
上面的查询还会自动配置一个输出更改流non_delivered_count ,其中包含供接收器连接器使用的 CDC 记录。由于管道是连续处理的,因此结果也会随着每个输入记录而更新。输出流中的记录如下所示:{"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":0} {"op":"d","before":{"user":"alice","total_non_delivered":1},"after":null,"ts_ms":1} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":2},"ts_ms":1} {"op":"c","before":null,"after":{"user":"bob","total_non_delivered":1},"ts_ms":2} {"op":"d","before":{"user":"alice","total_non_delivered":2},"after":null,"ts_ms":4} {"op":"c","before":null,"after":{"user":"alice","total_non_delivered":1},"ts_ms":4}
Copy
3. 配置一个接收器连接器,它可以消耗更改以实时查看结果。例如,如果配置了 Postgres 接收器,则连接会针对流中的每条记录向 Postgres 数据库发出一个操作。它们看起来像:INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 2) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("bob", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered; INSERT INTO postgres_output(user, total_non_delivered) VALUES ("alice", 1) ON CONFLICT (user) DO UPDATE SET id=EXCLUDED.user, value=EXCLUDED.total_non_delivered;
Copy
请注意,SQL 语法INSERT…ON CONFLICT…DO UPDATE SET…用于确保以原子方式应用撤消(删除 + 创建)。
通过 CDC 解锁更多用例
现在我们知道 Decodable 可以实时处理 CDC 记录,并支持目标系统始终拥有最新视图。但没有限制一次只能处理一个变更流。借助 Flink 强大的引擎和易于访问的 SQL 接口,流连接变得更加容易——您可以实时连接来自不同数据库的表!
文章来源:https://www.decodable.co/blog/what-is-change-data-capture
热门API
- 1. AI文本生成
- 2. AI图片生成_文生图
- 3. AI图片生成_图生图
- 4. AI图像编辑
- 5. AI视频生成_文生视频
- 6. AI视频生成_图生视频
- 7. AI语音合成_文生语音
- 8. AI文本生成(中国)
最新文章
- GPT-4o API全攻略:多模态AI模型的功能解析与实战指南
- Python 使用 话费 API:轻松实现自动话费查询功能
- 构建现代RESTful API:C#中的关键标准和最佳实践
- 优化 ASP.NET Core Web API 性能方法
- 如何设计一个对外的安全接口?
- 2025 LangGraph AI 工作流引擎|可视化多 Agent 协作+节点扩展教程
- 动漫百科全书API:你准备好探索动漫世界的无限可能了吗?
- Claude API在中国停用后的迁移与替代方案详解
- Grafana API 入门指南:自动化仪表板管理与高级功能
- 常用的14条API文档编写基本准则
- 如何获取 Kimi K2 API 密钥(分步指南)
- 为什么需要隐藏您的 API Key 密钥