CDC(Change Data Capture) 是一种实时捕获数据库中数据变更的技术,能够记录插入、更新和删除操作。它通过监控数据库的变更,实现数据的实时同步和增量处理,确保不同系统间数据的一致性。CDC 适用于实时数据同步、数据迁移、灾难恢复和审计跟踪等场景,通过读取事务日志等方式,减少全量数据复制的压力,并提升系统的性能和效率。
MO从2.0.0版本开始支持CDC功能,本文将简单介绍MO CDC模块的原理,实现及优化。
整体结构
本节将简单介绍CDC模块在MO中位置以及CDC模块的简略整体架构。
CDC在MO中的位置
CDC模块在MO中的位置大致如上图所示,图中的模块有:
- mo_cdc:MO开发的小工具,方便用户操作使用CDC功能,它接受简单的命令,生成sql语句,发送至MO执行,并展示执行结果。当然你也可以不使用这个工具,通过sql语句直接操作CDC任务
- FE:MO中的前端层,在本文中CDC模块中负责维护CDC任务的增删改查
- CDC:对于running的任务,定时向存储层查询最近修改,并转成sql发送到sink端
- Disttae:MO的存储层,提供指定时间段内的数据变更(新增和删除的数据)
- Downstream:接收同步数据的下游,目前支持下游类型有Mysql和MO
典型的CDC任务控制流为:
- 用户通过mo_cdc创建cdc任务,指定任务参数
- MO前端收到创建命令,拉起CDC任务
- CDC Module定期向MO存储层查询这段时间的数据变更
- CDC Module处理变更数据并转换成sql语句
- 将sql语句发送到下游执行,执行成功之后更新同步记录
CDC模块框架
CDC模块的大致框架如上图所示,由以下几个组件构成:
- reader负责定期从存储层读取最新的数据变化
- sinker负责把读取的信息按序转换为sql语句,并发送到下游
- 一个reader和一个sinker组成一条pipeline(上图中红色虚线框),一条pipeline对应一张表的同步任务,pipeline之间互相独立,互不影响
- 每个CDC任务有一个WatermarkUpdater,用来记录每张表的同步进度
CDC组件介绍
本节将具体介绍CDC各个组件的工作流程
Reader
Reader读取数据时序图
Reader会定期(每200ms一次)从存储层读取这段时间内的数据变更,具体操作:
- 从WatermarkUpdate获取当前水位信息(From)
- 调用存储的CollectChanges(From, Now)接口,得到changes对象(可以认为是一个iterator)
- 循环调用changes的Next接口获取数据
- 得到的新增和删除数据分别append到两个AtomicBatch对象中
- 获取完changes所有数据后,将数据发送到Sinker
上面第4步中提到的AtomicBatch,其定义如下:
type AtomicBatch struct {
Mp *mpool.MPool
From, To types.TS
Batches []*batch.Batch
Rows *btree.BTreeG[AtomicBatchRow]
}
type AtomicBatchRow struct {
Ts types.TS // (Ts, Pk) is the key for compare
Pk []byte
Offset int
Batch *batch.Batch
}
它主要由两部分组成:
- Batches负责存储具体数据
- Rows是一颗平衡二叉树(我们这里使用了B+树),将Batches中的每条记录按Ts,Pk为序排列
这边之所以需要排序,是因为存储层出于性能考虑,返回的数据不一定是有序的,为了保证正确性,我们需要现将数据按照提交事物的时间戳进行排序。 添加数据时,我们以(Ts, Pk)为key将数据插入树中,后续操作中,Sinker可以按序将batch数据取出。
Sinker
通过介绍我们知道,Reader会将读取到的数据变更(新增和删除)放到两个AtomicBatch中,AtomicBatch中有一棵B+树,我们可以按照(Ts, Pk)从小到大的顺序读取每条记录。
Sinker通过一种类似merge sort的流程来读取数据,比较insert/delete AtomicBatch中树顶(即最小)的元素,确定当前剩余数据中事务提交时间戳最小的那条记录,弹出该条记录生成对应的sql,然后继续比较之前比较操作,直到某个AtomicBatch为空,再将另一个AtomicBatch中的数据按序弹出即可。
WatermarkUpdater
WatermarkUpdater在内存有一张map记录该CDC任务中每张表的同步进度,Sinker将数据发送到下游并收到下游执行成功的回复之后,会将最新的进度更新到map中;同时MO有一张元数据表用于保存这些进度值,WatermarkUpdater有一个定时任务,会定期将map中各表的进度值持久化到元数据表中,确保CDC任务意外停止(比如MO进程崩溃,机器断电等)之后,重启时不必再同步已同步的数据。
性能优化
现在我们已经有了个简单的CDC模块原型,但这个原型的性能比较差,还有很多提升空间,这节简单介绍几个我们对于CDC模块做的性能优化。
优化一 sql合并发送
现在MO准备数据,网络传输及下游执行sql的时序大概如下所示:
我们知道sql批量执行会比拆分成多条多次执行的效率会高很多,当前MO CDC是将一个batch内的数据(最多8192条)转换成一条sql,我们可以将一个(From,To)时间段内数据尽量多的拼到一条sql中,直到达到下游规定的网络通信包大小的最大值(比如Mysql中的max_allowed_packet系统变量);同时每次与下游的通信都会有固定开销,将sql合并之后会减少与下游的通信次数,因而减少这类开销。
sql合并后的时序大概是这样:
优化二 非阻塞式sql生成
观察优化一之后的时序图,发现每次Sinker把sql发送出去之后就原地阻塞等待下游的返回,收到下游的ack之后才开始读取下一轮的数据变更及sql转换操作,浪费了等待下游返回的时间,而且在大数据量的场景下,执行合并sql的优化之后,准备sql是比较长的一段时间,这个问题更为明显。 因此,MO在等待下游sql执行结果时,就可以开始准备下一轮的数据,等下游返回结果之后就可以立马把下一轮的sql发送出去,时序类似这样:
优化三 定制化Mysql Driver
MO CDC下游主要支持兼容Mysql协议的数据库为主,我们使用Mysql Driver连接下游数据库,在研究Driver源码后,我们发现驱动根据协议,需要在我们传下去的sql语句前面加5个字节的标志位,实现这个操作的步骤是:先申请len(sql) + 5 的空间,把前5个字节填上相应的状态位,然后把sql复制到后面的空间中。
为了在sql前面腾出5个字节作为状态控制位,需要申请新空间加复制大量数据,操作花费昂贵,通过研究执行sql的调用传递链路,我们发现一个tricky的方式,新定义一个sql.NamedArg,把预留前5个字节的sql语句作为value,将sql传到Dirver中,Driver识别到这个特殊的NamedArg就直接在sql的前几个字节设置状态位。
reuseQueryArg := sql.NamedArg{
Name: mysql.ReuseQueryBuf,
// 前5个字节留空
Value: sqlBuf,
}
优化效果
我们使用tpch 10g库中的lineitem表进行全表同步的测试,具体数据如下表,可以看到通过上述3个优化,同步时间能减少到之前的50%不到:
优化 | 同步时间 |
---|---|
未优化 | 37min30s |
优化一 | 25min30s |
优化一 + 优化二 | 18min |
优化一 + 优化二 + 优化三 | 17min |
总结
本文简单介绍了CDC模块的功能职责,在MO中的位置及角色;CDC模块各组件的设计及实现;性能优化措施等,限于篇幅以及MO具体实现的复杂性,很多环节都以较通俗易懂的图来阐释,并不能完全展示其中的细节(当然不影响理解CDC模块的原理及运行流程)。
如果有兴趣了解更多实现细节,可以通过github获取源码来阅读,如果有CDC设计或者优化的建议,欢迎交流。