TiDB 利用binlog 恢复-反解析binlog

2024-04-30 07:44
文章标签 tidb 恢复 解析 binlog

本文主要是介绍TiDB 利用binlog 恢复-反解析binlog,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

我们知道TiDB的binlog记录了所有已经执行成功的dml语句,类似mysql binlog row模式

,TiDB官方也提供了reparo可以进行解析binlog,如下所示:

[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:153] ["Parsed start TSO"] [ts=449217508147200000]
[2024/04/26 20:58:02.136 +08:00] [INFO] [config.go:160] ["Parsed stop TSO"] [ts=449222855884800000]
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 1
merchant_id(varchar): 2
coupon_code(varchar): 4
customer_id(varchar): 4
customer_mobile_no(varchar): 5
trade_channel(varchar): 03
trade_store_id(varchar): 1440040
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 6
external_order_id(varchar): <nil>
receive_channel_code(varchar): 105
coupon_card_type(tinyint): 0
coupon_type(tinyint): 1
receive_type(smallint): 6
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1
schema: coupon_trade; table: coupon_trade_record; type: Insert
coupon_id(varchar): 2
merchant_id(varchar): <nil>
coupon_code(varchar): 5
customer_id(varchar): 6
customer_mobile_no(varchar): 7
trade_channel(varchar): 03
trade_store_id(varchar): 1420452
trade_store_name(varchar): <nil>
bill_type(tinyint): 0
bill_code(varchar): 8
external_order_id(varchar): <nil>
receive_channel_code(varchar): 03
coupon_card_type(tinyint): 0
coupon_type(tinyint): 0
receive_type(smallint): 4
receive_src_code(varchar): <nil>
process_type(tinyint): 2
order_food_type(varchar): 1
order_mode(varchar): 0
is_give_away(tinyint): 0
is_available(tinyint): 1
is_deleted(bigint): 0
updated_date(datetime): 2024-04-21 00:00:00
created_date(datetime): 2024-04-21 00:00:00
updated_user(varchar): default
created_user(varchar): default
version(int): 1

另外reparo 支持print 和mysql 两种模式,print只做解析打印到标准输出,不执行 SQL,mysql:是直接再下游数据库执行SQL

但是我们有的时候需要进去反解析,比如我们误删除了一些数据,我们要把误删除的数据解析成INSERT语句,这怎么办呢,TIDB目前不提供这种反解析工具,于是自己写了一个工具进行解析,代码如下:

package mainimport ("bufio""encoding/json""flag""fmt""log""os""regexp""strings""time"
)var (binlogFile stringschema     stringtable      stringsqlType    stringwhereSql   stringlogPath    stringfiledRe    = regexp.MustCompile(`\(.*?\)+:.*`)logOutFile *log.Logger
)type filed struct {Name  string `json:"name"`Value string `json:"value"`
}
type Parser struct {lines []filed
}//	type lists struct {
//		filed []filed
//	}
func compressStr(str string) string {str = strings.ReplaceAll(str, " ", "")r := strings.NewReplacer("\r", "", "\n", "")str = r.Replace(str)//匹配一个或多个空白符的正则表达式reg := regexp.MustCompile("\\s+")return reg.ReplaceAllString(str, "")
}func main() {flag.StringVar(&binlogFile, "binlogFile", "", "binlog日志文件路径")flag.StringVar(&schema, "schema", "", "要解析的数据库名称")flag.StringVar(&table, "table", "", "要解析的表名称")flag.StringVar(&sqlType, "sqlType", "", "要解析的dml类型")flag.StringVar(&logPath, "logPath", "", "输出文件路径名称")flag.Parse()if binlogFile == "" {log.Println("请输入binlog日志文件路径...")return}if schema == "" {log.Println("请输入要解析的数据库名称...")return}if table == "" {log.Println("请输入要解析的表名称...")return}if sqlType == "" {log.Println("请输入要解析的dml类型...")return}if logPath == "" {log.Println("请输入输出日志文件路径...")return}outSlowLogFile(logPath)file, err := os.Open(binlogFile)if err != nil {log.Println("读取文件失败...")return}schemaRe := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, sqlType)//schemaRe2 := fmt.Sprintf("schema:%v;table:%v;type:%v", schema, table, "Insert")defer file.Close()scanner := bufio.NewScanner(file)scanner.Buffer(make([]byte, 1024*1024), 1024*1024*10)inHeader := falsevar f filedvar array []stringvar time2, _ = time.Parse("2006-01-02 15:04:05", "2024-04-01 00:00:00")fields := ""for scanner.Scan() {line := scanner.Text()if compressStr(line) == compressStr(schemaRe) {if fields != "" {fields = fmt.Sprintf("{%v}", fields)array = append(array, fields)}fields = ""inHeader = truecontinue} else if strings.Contains(line, "sync binlog success") || strings.Contains(line, "read file end") {inHeader = falsecontinue} else {if inHeader {if filedRe.MatchString(line) {f.Name = strings.Split(line, "(")[0]tmpValue := strings.Split(line, ": ")if len(tmpValue) == 1 {inHeader = falsecontinue} else {f.Value = strings.Split(line, ": ")[1]}if fields == "" {fields = fmt.Sprintf("\"%s\":\"%s\" ", f.Name, f.Value)} else {fields += fmt.Sprintf(",\"%s\":\"%s\"", f.Name, f.Value)}} else {inHeader = falsecontinue}}}}if fields != "" {fields = fmt.Sprintf("{%v}", fields)array = append(array, fields)}if err := scanner.Err(); err != nil {log.Println(err)}for i := 0; i < len(array); i++ {//decoder := json.NewDecoder(strings.NewReader(array[i]))////for key, value := range JsonToMap(array[i]) {//	fmt.Printf("键:%v,值:%d\n", key, value)//}m := make(map[string]string)err = json.Unmarshal([]byte(array[i]), &m)if err != nil {fmt.Printf("Unmarshal with error: %+v", err)}keys := ""values := ""isExec := false//newKeys := make([]string, 0, len(m))//for k := range m {//	newKeys = append(newKeys, k)//}对切片进行排序//sort.Strings(newKeys)for key, value := range m {if keys == "" {keys = fmt.Sprintf("%v", key)if value == "<nil>" {values = fmt.Sprintf("%v", "NULL")} else {values = fmt.Sprintf("'%v'", value)}} else {keys += fmt.Sprintf(",%v", key)if value == "<nil>" {values += fmt.Sprintf(",%v", "NULL")} else {values += fmt.Sprintf(",'%v'", value)}}if key == "updated_date" {isExec = trueupdateTime, _ := time.Parse("2006-01-02 15:04:05", value)if updateTime.After(time2) {isExec = true} else {isExec = false}}}if isExec {inSql := fmt.Sprintf("insert into coupon_trade_record(%v) values(%v);", keys, values)logOutFile.Println(inSql)}}
}func outSlowLogFile(outFile string) {outFilePath, err := os.OpenFile(outFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766)if err != nil {log.Println(fmt.Sprintf("创建输出文件失败:%s", err))return}logOutFile = log.New(outFilePath, "", log.Lmsgprefix)
}

通过代码可以将DELETE sql直接转成insert 语句:

 至此完成将数据重新插入到业务库里面,即可完成恢复

这篇关于TiDB 利用binlog 恢复-反解析binlog的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/948279

相关文章

SQL 外键Foreign Key全解析

《SQL外键ForeignKey全解析》外键是数据库表中的一列(或一组列),用于​​建立两个表之间的关联关系​​,外键的值必须匹配另一个表的主键(PrimaryKey)或唯一约束(UniqueCo... 目录1. 什么是外键?​​ ​​​​2. 外键的语法​​​​3. 外键的约束行为​​​​4. 多列外键​

Java进行日期解析与格式化的实现代码

《Java进行日期解析与格式化的实现代码》使用Java搭配ApacheCommonsLang3和Natty库,可以实现灵活高效的日期解析与格式化,本文将通过相关示例为大家讲讲具体的实践操作,需要的可以... 目录一、背景二、依赖介绍1. Apache Commons Lang32. Natty三、核心实现代

使用Python自动化生成PPT并结合LLM生成内容的代码解析

《使用Python自动化生成PPT并结合LLM生成内容的代码解析》PowerPoint是常用的文档工具,但手动设计和排版耗时耗力,本文将展示如何通过Python自动化提取PPT样式并生成新PPT,同时... 目录核心代码解析1. 提取 PPT 样式到 jsON关键步骤:代码片段:2. 应用 JSON 样式到

MySQL精准控制Binlog日志数量的三种方案

《MySQL精准控制Binlog日志数量的三种方案》作为数据库管理员,你是否经常为服务器磁盘爆满而抓狂?Binlog就像数据库的“黑匣子”,默默记录着每一次数据变动,但若放任不管,几天内这些日志文件就... 目录 一招修改配置文件:永久生效的控制术1.定位my.cnf文件2.添加核心参数不重启热更新:高手应

Maven 插件配置分层架构深度解析

《Maven插件配置分层架构深度解析》:本文主要介绍Maven插件配置分层架构深度解析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录Maven 插件配置分层架构深度解析引言:当构建逻辑遇上复杂配置第一章 Maven插件配置的三重境界1.1 插件配置的拓扑

全解析CSS Grid 的 auto-fill 和 auto-fit 内容自适应

《全解析CSSGrid的auto-fill和auto-fit内容自适应》:本文主要介绍了全解析CSSGrid的auto-fill和auto-fit内容自适应的相关资料,详细内容请阅读本文,希望能对你有所帮助... css  Grid 的 auto-fill 和 auto-fit/* 父元素 */.gri

Maven 依赖发布与仓库治理的过程解析

《Maven依赖发布与仓库治理的过程解析》:本文主要介绍Maven依赖发布与仓库治理的过程解析,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下... 目录Maven 依赖发布与仓库治理引言第一章:distributionManagement配置的工程化实践1

MySQL复合查询从基础到多表关联与高级技巧全解析

《MySQL复合查询从基础到多表关联与高级技巧全解析》本文主要讲解了在MySQL中的复合查询,下面是关于本文章所需要数据的建表语句,感兴趣的朋友跟随小编一起看看吧... 目录前言:1.基本查询回顾:1.1.查询工资高于500或岗位为MANAGER的雇员,同时还要满足他们的姓名首字母为大写的J1.2.按照部门

Spring三级缓存解决循环依赖的解析过程

《Spring三级缓存解决循环依赖的解析过程》:本文主要介绍Spring三级缓存解决循环依赖的解析过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、循环依赖场景二、三级缓存定义三、解决流程(以ServiceA和ServiceB为例)四、关键机制详解五、设计约

Redis实现分布式锁全解析之从原理到实践过程

《Redis实现分布式锁全解析之从原理到实践过程》:本文主要介绍Redis实现分布式锁全解析之从原理到实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景介绍二、解决方案(一)使用 SETNX 命令(二)设置锁的过期时间(三)解决锁的误删问题(四)Re