TiDB Binlog 源码阅读系列文章(四)Pump server 介绍

2024-04-08 02:58

本文主要是介绍TiDB Binlog 源码阅读系列文章(四)Pump server 介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

作者: satoru

在 上篇文章 中,我们介绍了 TiDB 如何通过 Pump client 将 binlog 发往 Pump,本文将继续介绍 Pump server 的实现,对应的源码主要集中在 TiDB Binlog 仓库的 pump/server.go 文件中。

启动 Pump Server

Server 的启动主要由两个函数实现:NewServer(*Server).Start

NewServer 依照传入的配置项创建 Server 实例,初始化 Server 运行所必需的字段,以下简单说明部分重要字段:

  1. metrics:一个 MetricClient,用于定时向 Prometheus Pushgateway 推送 metrics。

  2. clusterID:每个 TiDB 集群都有一个 ID,连接到同一个 TiDB 集群的服务可以通过这个 ID 识别其他服务是否属于同个集群。

  3. pdCli:PD Client,用于注册、发现服务,获取 Timestamp Oracle。

  4. tiStore:用于连接 TiDB storage engine,在这里主要用于查询事务相关的信息(可以通过 TiDB 中的对应 interface 描述 了解它的功能)。

  5. storage:Pump 的存储实现,从 TiDB 发过来的 binlog 就是通过它保存的,下一篇文章将会重点介绍。

Server 初始化以后,就可以用 (*Server).Start 启动服务。为了避免丢失 binlog,在开始对外提供 binlog 写入服务之前,它会将当前 Server 注册到 PD 上,确保所有运行中的 Drainer 都已经观察到新增的 Pump 节点。这一步除了启动对外的服务,还开启了一些 Pump 正常运作所必须的辅助机制,下文会有更详细的介绍。

Pump Server API

Pump Server 通过 gRPC 暴露出一些服务,这些接口定义在 tipb/pump.pb.go,包含两个接口 WriteBinlogPullBinlogs

WriteBinlog

顾名思义,这是用于写入 binlog 的接口,上篇文章中 Pump client 调用的就是这个。客户端传入的请求,是以下的格式:

type WriteBinlogReq struct {// The identifier of tidb-cluster, which is given at tidb startup.// Must specify the clusterID for each binlog to write.ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`// Payload bytes can be decoded back to binlog struct by the protobuf.Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
}

其中 Payload 是一个用 Protobuf 序列化的 binlog,WriteBinlog 的 主要流程 就是将请求中的 Payload 解析成 binlog 实例,然后调用 storage.WriteBinlog 保存下来。storage.WriteBinlog 将 binlog 持久化存储,并对 binlog 按 start TS / commit TS 进行排序,详细的实现将在下章展开讨论。

PullBinlogs

PullBinlogs 是为 Drainer 提供的接口,用于按顺序获取 binlog。这是一个 streaming 接口,客户端请求后得到一个 stream,可以从中不断读取 binlog。请求的格式如下:

type PullBinlogReq struct {// Specifies which clusterID of binlog to pull.ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`// The position from which the binlog will be sent.StartFrom Pos `protobuf:"bytes,2,opt,name=startFrom" json:"startFrom"`
}// Binlogs are stored in a number of sequential files in a directory.
// The Pos describes the position of a binlog.
type Pos struct {// The suffix of binlog file, like .000001 .000002Suffix uint64 `protobuf:"varint,1,opt,name=suffix,proto3" json:"suffix,omitempty"`// The binlog offset in a file.Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"`
}

从名字可以看出,这个请求指定了 Drainer 要从什么时间点的 binlog 开始同步。虽然 Pos 中有 SuffixOffset 两个字段,目前只有 Offset 字段是有效的,我们把它用作一个 commit TS,表示只拉取这个时间以后的 binlog。

PullBinlogs 的 主要流程,是调用 storage.PullCommitBinlogs 得到一个可以获取序列化 binlog 的 channel,将这些 binlog 通过 stream.Send 接口逐个发送给客户端。

辅助机制

上文提到 Pump 的正常运作需要一些辅助机制,本节将逐一介绍这些机制。

fake binlog

在 《TiDB-Binlog 架构演进与实现原理》 一文中,对 fake binlog 机制有以下说明:

“Pump 会定时(默认三秒)向本地存储中写入一条数据为空的 binlog,在生成该 binlog 前,会向 PD 中获取一个 tso,作为该 binlog 的 start_tscommit_ts,这种 binlog 我们叫作 fake binlog。

……Drainer 通过如上所示的方式对 binlog 进行归并排序,并推进同步的位置。那么可能会存在这种情况:某个 Pump 由于一些特殊的原因一直没有收到 binlog 数据,那么 Drainer 中的归并排序就无法继续下去,正如我们用两条腿走路,其中一只腿不动就不能继续前进。我们使用 Pump 一节中提到的 fake binlog 的机制来避免这种问题,Pump 每隔指定的时间就生成一条 fake binlog,即使某些 Pump 一直没有数据写入,也可以保证归并排序正常向前推进。”

genForwardBinlog 实现了这个机制,它里面是一个定时循环,每隔一段时间(默认 3 秒,可通过 gen-binlog-interval 选项配置)检查一下是否有新的 binlog 写入,如果没有,就调用 writeFakeBinlog 写一条假的 binlog。

判断是否有新的 binlog 写入,是通过 lastWriteBinlogUnixNano 这个变量,每次有新的写入都会 将这个变量设置为当前时间。

垃圾回收

由于存储容量限制,显然 Pump 不能无限制地存储收到的 binlog,因此需要有一个 GC (Garbage Collection) 机制来清理没用的 binlog 释放空间,gcBinlogFile 就负责 GC 的调度。有两个值会影响 GC 的调度:

  1. gcInterval:控制 GC 检查的周期,目前写死在代码里的设置是 1 小时

  2. gcDuration:binlog 的保存时长,每次 GC 检查就是 通过当前时间和 gcDuration 计算出 GC 时间点,在这个时间点之前的 binlog 将被 GC 在 gcBinlogFile 的循环中,用 select 监控着 3 种情况:

select {
case <-s.ctx.Done():log.Info("gcBinlogFile exit")return
case <-s.triggerGC:log.Info("trigger gc now")
case <-time.After(gcInterval):
}

3 个 case 分别对应:server 退出,外部触发 GC,定时检查这三种情况。其中 server 退出的情况我们直接退出循环。另外两种情况都会继续,计算 GC 时间点,交由 storage.GC 执行。

Heartbeat

心跳机制用于定时(默认两秒)向 PD 发送 Server 最新状态,由 (*pumpNode).HeartBeat 实现。状态是由 JSON 编码的 Status 实例,主要记录 NodeIDMaxCommitTS 之类的信息。

HTTP API 实现

Pump Server 通过 HTTP 方式暴露出一些 API,主要提供运维相关的接口。

路径Handler说明
GET /statusStatus返回所有 Pump 节点的状态。
PUT /state/{nodeID}/{action}ApplyAction支持 pause 和 close 两种 action,可以暂停和关闭 server。接到请求的 server 会确保用户指定的 nodeID 跟自己的 nodeID 相匹配,以防误操作。
GET /drainersAllDrainers返回通过当前 PD 服务可以发现的所有 Drainer 的状态,一般用于调试时确定 Pump 是否能如预期地发现 Drainer。
GET /debug/binlog/{ts}BinlogByTS通过指定的 timestamp 查询 binlog,如果查询结果是一条 Prewrite binlog,还会额外输出 MVCC 相关的信息。
POST /debug/gc/triggerTriggerGC手动触发一次 GC,如果 GC 已经在运行中,请求将被忽略。

下线 Pump Server

下线一个 Pump server 的流程通常由 binlogctl 命令发起,例如:

bin/binlogctl -pd-urls=localhost:2379 -cmd offline-pump -node-id=My-Host:8240

binlogctl 先通过 nodeID 在 PD 发现的 Pump 节点中找到指定的节点,然后调用上一小节中提到的接口 PUT /state/{nodeID}/close

在 Server 端,ApplyAction 收到 close 后会将节点状态置为 Closing(Heartbeat 进程会定时将这类状态更新到 PD),然后另起一个 goroutine 调用 CloseClose 首先调用 cancel,通过 context 将关停信号发往协作的 goroutine,这些 goroutine 主要就是上文提到的辅助机制运行的 goroutine,例如在 genForwardBinlog 中设计了在 context 被 cancel 时退出:

for {select {case <-s.ctx.Done():log.Info("genFakeBinlog exit")return

ClosewaitGroup 等待这些 goroutine 全部退出。这时 Pump 仍然能正常提供 PullBinlogs 服务,但是写入功能 已经停止。Close 下一行调用了 commitStatus,这时节点的状态是 Closing,对应的分支调用了 waitSafeToOffline 来确保到目前为止写入的 binlog 都已经被所有的 Drainer 读到了。waitSafeToOffline 先往 storage 中写入一条 fake binlog,由于此时写入功能已经停止,可以确定这将是这个 Pump 最后的一条 binlog。之后就是在循环中定时检查所有 Drainer 已经读到的 Binlog 时间信息,直到这个时间已经大于 fake binlog 的 CommitTS

waitSafeToOffline 等待结束后,就可以关停 gRPC 服务,释放其他资源。

小结

本文介绍了 Pump server 的启动、gRPC API 实现、辅助机制的设计以及下线服务的流程,希望能帮助大家在阅读源码时有一个更清晰的思路。在上面的介绍中,我们多次提到 storage 这个实体,用来存储和查询 binlog 的逻辑主要封装在这个模块内,这部分内容将在下篇文章为大家作详细介绍。

原文阅读:https://pingcap.com/blog-cn/tidb-binlog-source-code-reading-4/

这篇关于TiDB Binlog 源码阅读系列文章(四)Pump server 介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/884391

相关文章

一篇文章彻底搞懂macOS如何决定java环境

《一篇文章彻底搞懂macOS如何决定java环境》MacOS作为一个功能强大的操作系统,为开发者提供了丰富的开发工具和框架,下面:本文主要介绍macOS如何决定java环境的相关资料,文中通过代码... 目录方法一:使用 which命令方法二:使用 Java_home工具(Apple 官方推荐)那问题来了,

5 种使用Python自动化处理PDF的实用方法介绍

《5种使用Python自动化处理PDF的实用方法介绍》自动化处理PDF文件已成为减少重复工作、提升工作效率的重要手段,本文将介绍五种实用方法,从内置工具到专业库,帮助你在Python中实现PDF任务... 目录使用内置库(os、subprocess)调用外部工具使用 PyPDF2 进行基本 PDF 操作使用

java 恺撒加密/解密实现原理(附带源码)

《java恺撒加密/解密实现原理(附带源码)》本文介绍Java实现恺撒加密与解密,通过固定位移量对字母进行循环替换,保留大小写及非字母字符,由于其实现简单、易于理解,恺撒加密常被用作学习加密算法的入... 目录Java 恺撒加密/解密实现1. 项目背景与介绍2. 相关知识2.1 恺撒加密算法原理2.2 Ja

Nginx屏蔽服务器名称与版本信息方式(源码级修改)

《Nginx屏蔽服务器名称与版本信息方式(源码级修改)》本文详解如何通过源码修改Nginx1.25.4,移除Server响应头中的服务类型和版本信息,以增强安全性,需重新配置、编译、安装,升级时需重复... 目录一、背景与目的二、适用版本三、操作步骤修改源码文件四、后续操作提示五、注意事项六、总结一、背景与

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

Python 基于http.server模块实现简单http服务的代码举例

《Python基于http.server模块实现简单http服务的代码举例》Pythonhttp.server模块通过继承BaseHTTPRequestHandler处理HTTP请求,使用Threa... 目录测试环境代码实现相关介绍模块简介类及相关函数简介参考链接测试环境win11专业版python

Java中HashMap的用法详细介绍

《Java中HashMap的用法详细介绍》JavaHashMap是一种高效的数据结构,用于存储键值对,它是基于哈希表实现的,提供快速的插入、删除和查找操作,:本文主要介绍Java中HashMap... 目录一.HashMap1.基本概念2.底层数据结构:3.HashCode和equals方法为什么重写Has

SQL Server 查询数据库及数据文件大小的方法

《SQLServer查询数据库及数据文件大小的方法》文章介绍了查询数据库大小的SQL方法及存储过程实现,涵盖当前数据库、所有数据库的总大小及文件明细,本文结合实例代码给大家介绍的非常详细,感兴趣的... 目录1. 直接使用SQL1.1 查询当前数据库大小1.2 查询所有数据库的大小1.3 查询每个数据库的详

Springboot项目构建时各种依赖详细介绍与依赖关系说明详解

《Springboot项目构建时各种依赖详细介绍与依赖关系说明详解》SpringBoot通过spring-boot-dependencies统一依赖版本管理,spring-boot-starter-w... 目录一、spring-boot-dependencies1.简介2. 内容概览3.核心内容结构4.

Spring Boot 整合 SSE(Server-Sent Events)实战案例(全网最全)

《SpringBoot整合SSE(Server-SentEvents)实战案例(全网最全)》本文通过实战案例讲解SpringBoot整合SSE技术,涵盖实现原理、代码配置、异常处理及前端交互,... 目录Spring Boot 整合 SSE(Server-Sent Events)1、简述SSE与其他技术的对