作者:刘博 MO研发工程师
导读
Logservice 在 MatrixOne 中扮演着非常重要的角色,它是独立的服务,通过RPC的方式供外部组件使用,用来对日志进行管理。
Logservice使用基于raft协议的dragonboat库(multi-raft group的golang开源实现),通常情况下使用本地磁盘,以多副本的方式保存日志,可以理解为对 WAL 的管理。事务的提交只需要写入Logservice中就可以,不需要将数据写入到S3,有另外的组件异步地将数据批量写入到S3上。这样的设计,保证了在事务提交时的低延迟,同时多个副本也保证了数据的高可靠。
本文将重点分享MatrixOne中 logservice 模块的主要原理。
下面是本文目录概览:
1.
整体架构
2.
客户端
3.
服务端
4.
Write&Read
5.
Truncation
本文字数:2800字+ 阅读时间:5分钟+
Part 1
整体架构
如下图所示是 logservice 的整体架构。在 server 中包括 handler、dragonboat、以及 RSM 几个模块。
Part 2
客户端
Logservice client 主要是提供给 DN 调用,关键接口说明如下:
Close() 关闭客户端连接。
Config() 获取 client 相关配置。
GetLogRecord() 返回一个 pb.LogRecord 变量,该结构包括 8 字节的 Lsn,4 字节的 record type,以及类型为 []byte 的 Data。Data 部分包括 4 字节的 pb.UserEntryUpdate,8 字节的 replica DN ID,已经 payload []byte。
Append() append pb.LogRecord 到 logservice,返回值为 Lsn。在调用端,参数 pb.LogRecord 可以复用。
Read() 从 logservice 中读取 firstLsn 开始的日志,达到 maxSize 时结束读取,返回值 Lsn 作为下一次读取的起点。
Truncate() 删除 lsn 之前的日志,释放磁盘空间。
GetTruncatedLsn() 返回最新删除的日志 Lsn。
GetTSOTimestamp() 向 TSO 请求总数为 count 的时间戳,调用之前的时间戳,调用者占用 [returned value, returned value + count] 这个范围。该方法暂未使用。
Client 通过 MO-RPC 向 logservice 的 server 端发送请求,server 端与 raft/drgonboat 交互返回结果。
Part 3
服务端
Server Handler
Logservice 的 server 端接收到 client 发送的请求做处理,入口函数是 (*Service).handle(),不同的请求会调用不同的方法进行处理:
Append ,Append 日志到 logservice,最终会调用 dragonboat 的 (*NodeHost) SyncPropose() 方法同步提交 propose 请求,需要日志 commit 并且 apply 之后才能返回,返回值是日志写成功后的 Lsn。
Read ,从 log db 中读取 log entry。首先调用 (*NodeHost) SyncRead() 从状态机中线性读取到当前的 Lsn,再根据 Lsn,调用 (*NodeHost) QueryRaftLog() 从 log db 中读取到 log entry。
Truncate,截断 log db 中的日志,释放磁盘空间。注意,这里只是向状态机中更新当前可以 truncate 的 lsn,不是真正的做 truncate 操作。
Connect ,与 logservice server 建立连接,尝试读写状态机进行状态检查。
Heartbeat ,包括 logservice、CN 和 DN 的心跳,该请求向 HAKeeper 的状态机更新各自的状态信息,并且同步 HAKeeper 的 tick,在 HAKeeper 做 check 时根据 tick 比较离线时间,如果离线,就出发 remove/shutdown 等操作。
Get XXX ,从状态机中获取信息。
Bootstrap
Bootstrap 是 logservice 服务启动的时候进行的,通过 HAKeeper shard (shard ID 是0)来完成,入口函数是 (*Service) BootstrapHAKeeper。
不管配置中设置了几个副本,每个 logservice 进程启动的时候都会启动 HAKeeper 的一个副本,每个副本在启动时设置了 members,HAKeeper shard 以这些 members 作为默认的副本数启动 raft。
在完成 raft 的 leader election 之后,执行 set initial cluster info,设置 log、DN 的 shard 数以及 log 的副本数。
设置完副本数之后,会把多余的 HAKeeper 副本停掉。
Heartbeat
该心跳是 logservice、CN 以及 DN 发送到 HAKeeper 的心跳,并不是 raft 副本之间的心跳!主要有两个作用:
通过心跳发送各副本状态信息给 HAKeeper ,HAKeeper 的 RSM 更新副本信息;
心跳返回时,从 HAKeeper 中获取到需要副本执行的命令。
Logservice 的 heartbeat 流程如下图,CN、DN 的流程与之类似。
心跳默认每 1 秒执行一次,其原理如下:
以 store 级别,生成该 store 上所有 shard 的副本的心跳信息,包括 shard ID、节点信息、term、leader 等;
发送 request 到 logservice 的 server 端;
Server 收到请求后调用 (*Service) handleLogHeartbeat() 做处理,调用 propose 将心跳发送到 raft;
HAKeeper 的 RSM 收到心跳后,调用 (*stateMachine) handleLogHeartbeat() 做处理,主要做两件事:
更新状态机中的 LogState:调用 (*LogState) Update() 更新 stores 和 shards;
从状态机的 ScheduleCommands 中获取到 commands,返回给发起端执行 command。
CN 和 DN 到 HA keeper 的心跳原理也是一样的。
RSM
Logservice 和 HAKeeper 的状态机都是基于内存的状态机模型,所有的数据都只保存在内存中。它们都实现了 IStateMachine 接口。主要方法说明如下:
Update() ,在一次 propose 完成 commit 之后(即多数副本完成写入),会调用 Update 接口,更新状态机中的数据。Update() 方法的实现由用户完成,必须是无副作用的(Side effect),相同的输入必须得到相同的输出结果,否则会导致状态机的不稳定。Update() 的结果通过 Result 结构返回,如果发生错误,error 不为空。
Lookup() ,查找状态机中的数据,通过 interface{} 来指明需要查找什么数据,返回的结果也是 interface{} 类型,因此需要用户自己定义好状态机中的数据,传入相应的数据类型,返回对应的数据类型,做类型断言。Lookup() 是一个只读的方法,不应该去修改状态机中的数据。
SaveSnapshot(),创建 snapshot,将状态机中的数据写入 io.Writer 接口,通常是文件句柄,因此最终会保存到本地磁盘文件中。ISnapshotFileCollection 是除了状态机中的数据以外的文件系统中的文件列表,如果有的话,也会转存到快照中。第三个参数用来通知 snapshot precedure,raft 副本已经停止,终止打快照的操作。
RecoverFromSnapshot() ,恢复状态机数据,从 io.Reader 中读取最新的 snapshot。[]SnapshotFile 是一些额外的文件列表,直接复制到状态机数据目录中。第三个参数用来控制 raft 副本时也终止掉恢复快照的操作。
Close() ,关闭状态机,做一些清理工作。
Part 4
Write&Read
简单说明一下 logservice 中一次读写请求的流程。
Write
如果连接的不是 leader,会转发到 leader 上面;leader 接收到请求后,将 log entries 写入本地磁盘。
同时,异步地发送给 follower 节点,follower 节点接收到请求后,将 log entries 写入本地磁盘。
当本次 append 在大多数节点上完成后,更新 commit index,并通过心跳通知给其他 follower 节点。
Leader commit 之后开始 apply 状态机操作。
Apply 完成后,返回客户端
Follower 接收到来自 leader 的 commit index 之后,各自 apply 自己的状态机
Read
读取数据分为两种:
- 从状态机中读取数据
- 从 log db 中读取 log entries
从状态机中读取数据
从状态机中读取数据,如下图:
客户端发起 read 请求,到达 leader 节点时,会记录此时的 commit index;
Leader 向所有的节点发送心跳请求,确认自己的 leader 地位,大多数节点回复时,确定仍然是 leader,可以回复读请求;
等待 apply index 大于等于 commit index;
此时才可以读取状态机中的数据返回给客户端
从logdb 读取
从 log db 中读取 log entries,如下图:
过程比较简单,通常发生在集群重启的时候。
重启时,副本需要先从 snapshot 中恢复状态机的数据,然后从 snapshot 中记录的 index 位置开始读取 log db 中的 log entries,应用到状态机中。该操作完成后才可以参与 leader 的选举。当集群选举出 leader 后,DN 会连接到 logservice cluster,从其中一个副本的 log db 的上一次 checkpoint 位置开始读取 log entries,replay 到 DN 自己的内存数据中。
Part 5
Truncation
Log db 的 log entries 如果一直增长会导致磁盘空间不足,因此需要定期释放磁盘空间,通过 truncation 完成。
Logservice 使用的是基于内存的状态机,状态机中没有记录用户数据,只记录了一些元数据和状态信息,比如 tick、state 和 LSN 等。用户数据由 DN tae 自己记录。可以理解为在 MO 中,状态机是分离的,tae 和 logservice 分别维护了一个状态机。
在这种状态机分离的设计下,简单的 snapshot 机制会导致问题:
如图中所示,tae 发送一个 truncate 请求,truncate index 是 100,但是此时 logservice 状态机 applied index 是 200,即 200 之前的日志会被删掉,然后在这个位置生成快照。注意:truncate_index != applied_index。
集群重启。
Logservice 状态机 apply snapshot,index 为 200,并设置 first index 为 200(200之前的日志都已经删除),然后 logservice 状态机开始回放日志,回放完成后对外提供服务。
Tae 从 logservice 读取 log entries,读取起始位置是 100,但是无法读取,因为 200 之前的日志都已经删除,发生错误。
为了解决上面描述的问题,目前 truncation 的整体工作流程如下图所示:
Tae 发送 truncate 请求,更新 logservice 状态机中的 truncateLsn,此时只是更新该值,不做 snapshot/truncate 操作。
每个 logservice server 内部都会启动一个 truncation worker,每隔一段时间就会发送一次 Truncate Request,注意,这个 Request 其中的参数 Exported 设置为 true,表明这个 snapshot 对系统不可见,只是将 snapshot 导出到某个目录下。
Truncation worker 中还会检查当前已经 export 的 snapshot 列表,是否有 index 大于当前 logservice 状态机中的 truncateLsn 的,如果有,将最接近 truncateLsn 的那个 snapshot 导入到系统中,使之生效,对系统可见。
所有的副本都执行相同的操作,这样就保证了两个状态机的 snapshot lsn 是一样的,在集群重启时,可以读取到对应的 log entries。
以上就是logservice模块的主要工作原理,若对 MatrixOne 的更多架构感兴趣,欢迎加入我们的技术交流群。