Kafka如何将消息发送到指定分区

2024-05-03 06:36

本文主要是介绍Kafka如何将消息发送到指定分区,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

面试一个时,面试官问了一个问题,Kafka如何做到顺序消息。我回答只给Kafka的Topic创建一个分区,发送到该Topic的消息在Kafka中就是有序的。

面试官又问,如果Topic有多个分区呢?我回答消息发送者在发送消息的时候,指定分区进行发送,可以在发送消息时,每次指定相同的Key。但是面试官说这样做不到,我后面去查了资料,是可以做到的,我当时也没有反驳,毕竟我是一个求职者,跟面试官产生冲突也不太好。而且可能面试官也只知道其他的方式,不知道基于这种方式可以将消息发送到指定分区。

写个博客记录下。

有哪些方式可以将消息发送到指定分区?

当一个Topic中有多个分区的时候,如何将消息发送到指定分区呢?

方式一:基于key

下面的第二个参数,partitionA就是message的key。
Kafka会将具有相同的key的消息发送到同一分区,这是通过哈希函数实现的。
此外,Kafka会按照消息产生的顺序被一致性的接受,这就保证了同一分区内消息的顺序性。

kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");

方式二:自定义分区器

Kafka允许自定义分区器,允许用户根据Topic、message key、message val、cluster等信息,自定义将消息发送到哪个分区。

自定义分区器:

public class CustomPartitioner implements Partitioner {// PREMIUM的意思是额外加价private static final int PREMIUM_PARTITION = 0;// NORMAL的意思是正常、标准private static final int NORMAL_PARTITION = 1;@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String customerType = extractCustomerType(key.toString());// 判断提取出的单词里面是否含有premium,如果有,则将其发送到第0号分区,否则发送到第1号分区。// 美团外卖有个加钱提前送达的服务,可以采用这种方式来实现。return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;}private String extractCustomerType(String key) {String[] parts = key.split("_");return parts.length > 1 ? parts[1] : "normal";}
}

在创建KafkaTemplate时,将自定义分区器设置到KafkaTemplate的属性里面去

// 在实际的SpringBoot项目中,可以将这个KafkaTemplate注入到Spring容器中
private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);return new KafkaTemplate<>(producerFactory);
}

测试代码。
将高级客户订单和普通客户订单区分开来,进行不同的处理。

// 在实际的SpringBoot项目中,可以从Spring容器中获取这个KafkaTemplate
KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();
// 根据自定义分区器,当key为123_premium,则消息会被发送到第0号分区。
kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
// 根据自定义分区器,当key为456_normal,不含有premium,则消息会被发送到第1号分区。
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

方式三:直接指定分区序号

第二个参数0、1就是指定的分区号码,发送消息时,直接指定分区,将消息发送到指定的分区。

kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");

其他方式

在下面的参考文章当中,还看到了一个粘性分区器,但是没看太懂,而且不为大家所熟知,所以就没有太关注。
将数据发送到 Kafka 中的特定分区

参考

将数据发送到 Kafka 中的特定分区

这篇关于Kafka如何将消息发送到指定分区的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/956054

相关文章

Java实现删除文件中的指定内容

《Java实现删除文件中的指定内容》在日常开发中,经常需要对文本文件进行批量处理,其中,删除文件中指定内容是最常见的需求之一,下面我们就来看看如何使用java实现删除文件中的指定内容吧... 目录1. 项目背景详细介绍2. 项目需求详细介绍2.1 功能需求2.2 非功能需求3. 相关技术详细介绍3.1 Ja

Python pip下载包及所有依赖到指定文件夹的步骤说明

《Pythonpip下载包及所有依赖到指定文件夹的步骤说明》为了方便开发和部署,我们常常需要将Python项目所依赖的第三方包导出到本地文件夹中,:本文主要介绍Pythonpip下载包及所有依... 目录步骤说明命令格式示例参数说明离线安装方法注意事项总结要使用pip下载包及其所有依赖到指定文件夹,请按照以

python如何生成指定文件大小

《python如何生成指定文件大小》:本文主要介绍python如何生成指定文件大小的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录python生成指定文件大小方法一(速度最快)方法二(中等速度)方法三(生成可读文本文件–较慢)方法四(使用内存映射高效生成

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

MySQL数据库实现批量表分区完整示例

《MySQL数据库实现批量表分区完整示例》通俗地讲表分区是将一大表,根据条件分割成若干个小表,:本文主要介绍MySQL数据库实现批量表分区的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考... 目录一、表分区条件二、常规表和分区表的区别三、表分区的创建四、将既有表转换分区表脚本五、批量转换表为分区

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

python如何下载网络文件到本地指定文件夹

《python如何下载网络文件到本地指定文件夹》这篇文章主要为大家详细介绍了python如何实现下载网络文件到本地指定文件夹,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下...  在python中下载文件到本地指定文件夹可以通过以下步骤实现,使用requests库处理HTTP请求,并结合o

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Python如何调用指定路径的模块

《Python如何调用指定路径的模块》要在Python中调用指定路径的模块,可以使用sys.path.append,importlib.util.spec_from_file_location和exe... 目录一、sys.path.append() 方法1. 方法简介2. 使用示例3. 注意事项二、imp