Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布

本文主要是介绍Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

使Flink输出的数据在多个partition中均匀分布

FlinkKafkaProducerBase的子类可以使用默认的KafkaPartitioner FixedPartitioner(只向partition 0中写数据)也可以使用自己定义的Partitioner(继承KafkaPartitioner),我觉得实现比较复杂.

构造FlinkKafkaProducerBase的子类的2种情况

    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());}public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, 
Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);}

默认的FixedPartitioner

public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {private static final long serialVersionUID = 1627268846962918126L;private int targetPartition = -1;@Overridepublic void open(int parallelInstanceId, int parallelInstances, int[] partitions) {if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {throw new IllegalArgumentException();}this.targetPartition = partitions[parallelInstanceId % partitions.length];}@Overridepublic int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {if (targetPartition >= 0) {return targetPartition;} else {throw new RuntimeException("The partitioner has not been initialized properly");}}
}

在构造FlinkKafkaProducerBase的子类时,可以传递一个值为null的KafkaPartitioner,这样就可以使用Kafka Client默认的Partitioner,默认的Paritioner就是将数据均匀分配到各个partition中.

protected FlinkKafkaProducerBase<Record> createSink(String topic, KeyedSerializationSchemadeserializationSchema, Properties properties) {String classFullName = "";if (kafkaVersion.startsWith("0.8")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08";} else if (kafkaVersion.startsWith("0.9")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else if (kafkaVersion.startsWith("0.10")) {classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";} else {throw new RuntimeException("not support the "+"version kafka = " + kafkaVersion);}FlinkKafkaProducerBase<Record> sink = null;try {Class clazz = Class.forName(classFullName);Constructor constructor = clazz.getConstructor(String.class, 
KeyedSerializationSchema.class, Properties.class, KafkaPartitioner.class);sink = (FlinkKafkaProducerBase) constructor.newInstance(topic, 
deserializationSchema, properties,(KafkaPartitioner)null);} catch (Throwable e) {e.printStackTrace();}return sink;}

Kafka Client中默认的Partitioner

public class DefaultPartitioner implements Partitioner {private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();public void configure(Map<String, ?> configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = nextValue(topic);List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn Utils.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}public void close() {}
}

调用过程

在调用FlinkKafkaProducerBase中的invoke方法时,会判断partitioner是否为空,如果为空则构建一个partition属性为空的ProducerRecord对象,否则使用partitioner获得partition构造ProducerRecord对象.

    public void invoke(IN next) throws Exception {// propagate asynchronous errorscheckErroneous();byte[] serializedKey = schema.serializeKey(next);byte[] serializedValue = schema.serializeValue(next);String targetTopic = schema.getTargetTopic(next);if (targetTopic == null) {targetTopic = defaultTopicId;}ProducerRecord<byte[], byte[]> record;if (partitioner == null) {record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);} else {record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);}if (flushOnCheckpoint) {synchronized (pendingRecordsLock) {pendingRecords++;}}producer.send(record, callback);}

在调用KafkaProducer的send方法的时候,方法里面会调用partition方法决定数据放到哪个分区,如果ProducerRecord的partition属性存在并且合法,则使用该值,否则使用KafkaProducer中的partitioner进行分区

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int numPartitions = partitions.size();// they have given us a partition, use itif (partition < 0 || partition >= numPartitions)throw new IllegalArgumentException("Invalid partition given with record: " + partition+ " is not in the range [0..."+ numPartitions+ "].");return partition;}return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,cluster);}

KafkaProducer的partitioner是通过读取配置获取的,默认为DefaultPartitioner,可以在properties中put partitioner.class指定要使用的partitioner

this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

这篇关于Flink源码阅读:如何使用FlinkKafkaProducer将数据在Kafka的多个partition中均匀分布的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1002575

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

Linux jq命令的使用解读

《Linuxjq命令的使用解读》jq是一个强大的命令行工具,用于处理JSON数据,它可以用来查看、过滤、修改、格式化JSON数据,通过使用各种选项和过滤器,可以实现复杂的JSON处理任务... 目录一. 简介二. 选项2.1.2.2-c2.3-r2.4-R三. 字段提取3.1 普通字段3.2 数组字段四.

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV

Redis 基本数据类型和使用详解

《Redis基本数据类型和使用详解》String是Redis最基本的数据类型,一个键对应一个值,它的功能十分强大,可以存储字符串、整数、浮点数等多种数据格式,本文给大家介绍Redis基本数据类型和... 目录一、Redis 入门介绍二、Redis 的五大基本数据类型2.1 String 类型2.2 Hash

Redis中Hash从使用过程到原理说明

《Redis中Hash从使用过程到原理说明》RedisHash结构用于存储字段-值对,适合对象数据,支持HSET、HGET等命令,采用ziplist或hashtable编码,通过渐进式rehash优化... 目录一、开篇:Hash就像超市的货架二、Hash的基本使用1. 常用命令示例2. Java操作示例三

Linux创建服务使用systemctl管理详解

《Linux创建服务使用systemctl管理详解》文章指导在Linux中创建systemd服务,设置文件权限为所有者读写、其他只读,重新加载配置,启动服务并检查状态,确保服务正常运行,关键步骤包括权... 目录创建服务 /usr/lib/systemd/system/设置服务文件权限:所有者读写js,其他