本文主要是介绍kafka自定义分区器使用详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《kafka自定义分区器使用详解》本文介绍了如何根据企业需求自定义Kafka分区器,只需实现Partitioner接口并重写partition()方法,示例中,包含cuihaida的数据发送到0号分区...
kafka自定义分区器
根据企业需求,自己重新实现分区器
只需要定义类实现Partitioner接口,然后重写partition()方法即可
假设现在有一个需求
发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区
package com.example.kafkademo.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import Java.util.Map; /** * 1. 实现接口Partitioner * 2. 实现3个方法:partition,close,configure * 3. 编写partition方法,返回分区号 */ public class MyPartitioner implements Partitioner { /** * 重写这个方法 * @param topic 主题 * @param key 消息的key * @param keyBytes 消息的key序列化后的字节数组 * @param value 消息的值 * @param valueBytes 消息的值序列化后的字节数组 * @param cluster 集群元数据可以查看分区信息 * @return 信息对应的分区 */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 获取消息 String msgValue = value.toString(); // 发送过来的数据中如果包含cuihaida,就发往0号分区,不包含cuihaida,就发往1号分区 return msgValue.coChina编程ntains("cuihaida") ? 0 : 1; } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
使用分区器的方法
在生产者的配置中添加分区器参数
package com.example.kafkademo.util;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class CommonUtils {
China编程 /**
* kafka生产者配置配置
* @return 配置内容
*/
public static Properties buildKafkaProperties() {
// 1. 创建kafka生产者配置对象
Properties properties = new Properties();
// 2. 给kafka的配置对象添加信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key, value初始化【必须有】
properties.pujst(ProducerConfig.KEY_SEwww.chinasem.cnRIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// =========> 添加自定义分区器 <javascript============
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafkademo.producer.MyPartitioner")
return properties;
}
}
总结
这篇关于kafka自定义分区器使用详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!