ES 数据写入方式:直连 VS Flink 集成系统

2024-05-16 08:52

本文主要是介绍ES 数据写入方式:直连 VS Flink 集成系统,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ES 作为一个分布式搜索引擎,从扩展能力和搜索特性上而言无出其右,然而它有自身的弱势存在,其作为近实时存储系统,由于其分片和复制的设计原理,也使其在数据延迟和一致性方面都是无法和 OLTP(Online Transaction Processing)系统相媲美的。

也正因如此,通常它的数据都来源于其他存储系统同步而来,做二次过滤和分析的。这就引入了一个关键节点,即 ES 数据的同步写入方式,本文介绍的则是 MySQL 同步 ES 方式。

将 MySQL 数据写入 ES,首先想到的一定是消费 Binlog 直连 ES 写入,这种方式简单明了,然而如果稍微考量维度多一点,就会发现该方式的一些弊端。因此还有另外一个方式,即【RocketMQ + Flink Consumer + ES Bulk】集成生态,我们将从同步延迟、消费特性,ES 写入性能、系统容灾能力四个方面评估这两种接入方式,希望给到大家灵感并选择适合业务的同步方式。

ES 基础写入原理

ES 写入属于追加式写入,先形成特定大小的 Segment,然后定时 Merge 小数据段为大数据段以减少内存碎片,提升查询效率的过程。一个 Index 由 N 个 Shard 及其副本构成,存储了同一种 Type 类型的 Documents,由 Mapping 定义了其索引方式,每一个 Shard 由 N 个 Segment 组成,每个 Shard 都是一个全功能且完整的 Lucene 索引,它是 ES 的最小处理单元;Segment 是 ES 最小的数据处理单位,每个 Segment 都是一个独立的倒排索引。

ES 写入其实是不断将数据写入到同一个 Segment(内存),然后触发 Refresh 刷新,将 Segment 刷新到 OS Cache(默认 1s),此时数据就可以查询到了,OS Cache 会由操作系统触发 Flush 操作持久化到磁盘。

引发思考:ES 是如何保证数据不丢失的呢?追加式写入的优劣点是什么?追加式写入是如何处理数据更新问题的?MySQL 是属于哪种写入方式呢?本文重点不在此处,大家可以另行查阅文章。

ES 基本概念

ES 写入过程

ES 直连写入

采用 ES 直连写入的优点是因为路径短,依赖组件少,加上 Dsyncer(异构存储转换系统)通常已经提供了完善的限流重试机制,所以消费延迟和消费的数据完整性都是可以保证的。

缺点:

  1. 不易于接入多机房容灾部署,目前 ES 容灾机房都属于独立部署,独立读写模式,所以如果采用该方式,则难以同时对多机房写入分别做管控,达不到容灾效果。Binlog-->Dsyncer 通常一个 MySQL Table 对应一个转换任务,如果为了写多机房起多个重复的转换任务,则显得有些愚笨。

  2. 如果自身业务场景有对同一条记录并发写场景,但写不一定全部来源于 Binlog 的情况下,那全局考虑直写 ES 则更容易遇到写入冲突问题,因为缺乏有序队列的保障。

通过 Flink 搭建 ES 集成系统

Flink 搭建 ES 集成系统,则指的是所有的 ES 写入都由 Flink 任务完成,Flink 监听 RocketMQ 实时数据流,既保证了数据的分区有序性,又充分利用了 ES 的批量写入能力,ES 的批量写入能力比单条写入性能高出多倍。同时由于 Flink 本身的容错性,即使在异常场景下,也能保证数据的最终一致性。

优点

  1. 通过 MQ 可以更快捷的接入多机房 ES 集群,写入解耦,三机房分别起消费者写入数据,彼此独立,当出现单机房故障时,只要有可用机房,直接处理读流量切流即可,容灾方案简单清晰

  2. 网络抖动等问题会导致 ES 暂时性写入失败时,不影响其他集群写入的情况下,RocketMQ 会暂存消息,Flink 会保存消费快照,不断重试直至成功,更好的保障了数据最终一致性

  3. 多数据源写入能保证全局分区一致性。

缺点

  1. 依赖了更多组件,会增加全链路数据同步延迟,而 ES 默认的 Refresh 频率是每秒一次,经测试该链路正常情况下数据延迟都是秒级的,不是完全不可接受;

  2. 依赖了更多组件,对基础组件的稳定性有更高的要求,RocketMQ 异常,或者 Flink 任务异常都会导致同步链路出现问题,增加一定的业务异常风险。

在这里需要注意的一个问题是有人可能会考虑接入多机房 ES 集群,是怎么保证多机房同时成功的、以及怎么保证写入成功后就可以查询得到?目前这两点暂时无法做到,因为多个机房都是独立写入的,互不影响,且 ES 集群属于弱数据一致性集群,无法保证写入成功立刻就能查到。

搭建并运行一个 ES Flink 消费程序的必备条件

  • Flink 运行环境:首先需要有 Flink 任务的运行环境,通常企业级的 Flink 任务会作为一个 YARN 作业在分布式系统中被调度并分配资源执行,但同时 Flink 也可作为单机进程,亦或搭建一个独立集群运行。

  • ES 消息格式:需要约定一种 ES 消息传输格式和序列化方式,一套范式解决所有同步场景,目前流行的序列化方式是 pb 格式或 json 格式,目前我们都是推荐使用 pb 格式的,数据格式 Schema 定义:

字段名

值类型

必需/可选

描述

_index

string

必需

文档要写入索引的名称或别名

_type

string

必需/可选

文档的类型

_op_type

string

必需

文档写入操作类型,取值范围: index, create, update, upsert, delete

_id

string

可选

文档 ID,不指定时写入 ES 会自动生成,但同一条数据被重复消费写入 ES 会生成多个文档

_routing

string

可选

文档路由,不指定时默认使用 _id 字段值路由

_version

int64

可选

文档版本,指定时大于 0 且仅操作为 index/delete 有效,默认使用 external_gte 版本类型

_source

object

必需/可选

文档内容,操作类型为 delete 时可不指定

_script

object

可选

文档脚本,操作类型为 update/upsert 时有效,但和 _source 不能同时存在

syntax = "proto3";message ESIndexInfo {string Name = 1;  // 文档要写入索引的名称或别名
}enum ESOPType { // 文档写入操作类型DELETE = 0; // 删除文档INDEX = 1;  // 创建新文档或更新老文档,只能全量更新 (替换老文档)UPDATE = 2; // 更新老文档,支持部分更新 (合并老文档)UPSERT = 3; // 创建新文档或更新老文档,支持部分更新 (合并老文档)CREATE = 4; // 创建新文档,存在时报错丢弃
}message ESDocAction {ESIndexInfo IndexInfo = 1; // 索引信息 (必需)ESOPType OPType = 2;       // 操作类型 (必需)string ID = 3;             // 文档 ID (可选)string Doc = 4;            // 文档内容 (JSON 格式, 删除操作时不需要)int64 Version = 5;         // 文档版本 (可选, 大于 0 且操作为 index/create/delete 有效)string Routing = 6;        // 文档路由 (可选, 非空有效)string Script = 7;         // 文档脚本 (JSON 格式, 操作类型为 update/upsert 有效,但和 Doc 不能同时存在)
}
  • Flink 任务必要配置:监听的 RocketMQ Topic 信息,写 ES 集群信息;

  • Flink 执行函数:Flink 处理流式消息有流式 SQL 和自定义应用程序两种方式,流式 SQL 约束于本身的一些限制,比如不支持同一个 MQ 有多个索引消息,而自定义编程更加灵活,比如添加各种打点,日志,错误码处理等,推荐该方式;

  • Flink 资源配置:JobManager 资源配置,TaskManager 资源配置等等;

  • Flink 自定义参数配置:可以自定义一些与应用程序紧密相关的动态配置,方便动态调节 Flink 消费能力,比如:

参数名

用途

默认值

job.writer.connector.bulk-flush.max-actions

单次 bulk 最大文档数,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 300

job.writer.connector.bulk-flush.max-size

单次 bulk 最大字节数,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 10MB

job.writer.connector.bulk-flush.interval

两次 bulk 最大间隔,超过进行一次 flush (即执行一次 es 的 bulk 请求)

默认 1000ms

job.writer.connector.global-rate-limit

全局写入限速值

默认 -1,不限速

job.writer.connector.failure-handler

指定自定义失败处理器,比如处理4xx错误,5xx错误的方式不同,429总是无限重试等;

global_parallelism_num

flink 任务全局并发度

rmq 是 queue/4,bmq/kafka 是 partition/3

max_parallelism_num

flink 任务最大并发度

mq 的 queue/partition 的个数

checkpoint_interval

创建 Checkpoint 的间隔,单位 ms (5min=300000)

默认 15min

checkpoint_timeout

创建 Checkpoint 的超时时间,单位 ms (5min=300000)

默认 10min

rebalance_enable

开启乱序消费

默认 false

对比建议

写入方式

同步延迟

写入特性

ES写入性能

消费者

容灾能力

直连

依赖组件少,延迟低

Binlog 单 key 有序

bulk写入

FaaS

较差

RocketMQ+Flink+ES

依赖组件多,延迟较高/秒级

全局单 key 有序

bulk写入

Flink

经过以上介绍如果业务在都可接受秒级延迟的条件下,使用 RocketMQ+Flink 的方式能够更好的实现有序性和容灾能力,Flink 在流式任务处理能力上也远优 FaaS,但是直连方式明显链路更加简洁,架构更加轻量,系统集成和维护成本较低,所以还是需要依照业务特性选择最适合的才是最好的。

这篇关于ES 数据写入方式:直连 VS Flink 集成系统的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/994447

相关文章

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

Vue3视频播放组件 vue3-video-play使用方式

《Vue3视频播放组件vue3-video-play使用方式》vue3-video-play是Vue3的视频播放组件,基于原生video标签开发,支持MP4和HLS流,提供全局/局部引入方式,可监听... 目录一、安装二、全局引入三、局部引入四、基本使用五、事件监听六、播放 HLS 流七、更多功能总结在 v

Java发送SNMP至交换机获取交换机状态实现方式

《Java发送SNMP至交换机获取交换机状态实现方式》文章介绍使用SNMP4J库(2.7.0)通过RCF1213-MIB协议获取交换机单/多路状态,需开启SNMP支持,重点对比SNMPv1、v2c、v... 目录交换机协议SNMP库获取交换机单路状态获取交换机多路状态总结交换机协议这里使用的交换机协议为常

k8s admin用户生成token方式

《k8sadmin用户生成token方式》用户使用Kubernetes1.28创建admin命名空间并部署,通过ClusterRoleBinding为jenkins用户授权集群级权限,生成并获取其t... 目录k8s admin用户生成token创建一个admin的命名空间查看k8s namespace 的

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

C++中处理文本数据char与string的终极对比指南

《C++中处理文本数据char与string的终极对比指南》在C++编程中char和string是两种用于处理字符数据的类型,但它们在使用方式和功能上有显著的不同,:本文主要介绍C++中处理文本数... 目录1. 基本定义与本质2. 内存管理3. 操作与功能4. 性能特点5. 使用场景6. 相互转换核心区别