09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置)

2024-01-10 15:36

本文主要是介绍09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

  • 通过修改保存时间来删除消息
    • ★ 删除指定主题的消息
    • 演示
      • 1、修改kafka检查过期消息的时间间隔
      • 2、修改主题下消息的过期时间
      • 3、查看修改是否生效
      • 4、先查看下主题下有没有消息
      • 5、添加几条消息看效果
      • 6、查看消息是否被删除
    • ★ 恢复主题的retention.ms配置
      • 1、先查看没修改前的test2主题的配置信息:
      • 2、 将test2主题下的消息的保存时间删除。
      • 3、再查看修改后的test2主题的配置信息:

通过修改保存时间来删除消息

★ 删除指定主题的消息

Kafka并没有提供直接删除特定主题下消息的方法,只能是强制让消息过期之后,再来删除消息。

因此需要指定如下两个配置:

  1. 控制将指定主题下消息的保存时间设为一个很短时间: retention.ms(为特定主题设置)

  2. 控制 Kafka 尽快去检查消息是否过期(默认是5分钟检查一次)。 log.retention.check.interval.ms

可通过如下两个属性来强制删除指定主题的消息

retention.ms属性:将该属性设为一个极短的时间,比如1000。

log.retention.check.interval.ms:设置Kafka轮询检查的时间间隔,比如将该该属性设为300000。

这意味着每5分钟,Kafka会对指定主题的消息执行一次检查,检查消息是否过期(超过retention.ms属性设置的时间值),如果过期则删除这些消息。

演示

官网显示:Kafka 默认是5分钟检查一次主题下是否有消息过期

在这里插入图片描述

1、修改kafka检查过期消息的时间间隔

为了方便演示,我这里改成120秒检查一次,演示完要记得注释掉这个配置,不然这么频繁检查会很卡。

配置修改后要重启下kafka。

在这里插入图片描述

2、修改主题下消息的过期时间

然后修改test2主题下的消息的保存时间,改成1秒,就是消息存在一秒后就过期。

▲ 将test2主题下的消息的保存时间设为1秒

 kafka-configs --bootstrap-server localhost:9092 ^--alter ^--entity-name test2 ^--entity-type topics ^--add-config retention.ms=1000

在这里插入图片描述

3、查看修改是否生效

修改完消息的过期时间,那么就查看下是否生效:

命令查看:
使用 kafka-configs.bat 命令的 --describe 子命令
——该命令可查看所有对象的信息

 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics

主题下消息的保存(过期)时间已经修改成1秒了
在这里插入图片描述

CMAK界面查看:

在这里插入图片描述

4、先查看下主题下有没有消息

查看端口为9092的kafka节点,主题是test2,查看内容是从beginning开始就存在的消息:

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

此时这个test2主题下没有任何消息,为方便演示,先添加几条消息进去
在这里插入图片描述

5、添加几条消息看效果

添加带 key 的消息,相当于生产者发送消息到指定的主题里面去:

kafka-console-producer.bat ^
--bootstrap-server localhost:9092 ^
--topic test2 ^
--property parse.key=true

在这里插入图片描述

然后马上查看,发现消息发送成功,test2主题下有这些消息:为了方便,我命令再拷贝一份:

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

在这里插入图片描述

再检查一遍消息的存活时间,是存活1秒没错

 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics

在这里插入图片描述

6、查看消息是否被删除

kafka每个120就会去检查是否有过期的消息,有的话就把过期的消息删除掉,我又把消息的过期时间设置为1秒,所以看过期的消息在120后,是否会被kafka检查到并删除掉:

接下来就等120秒,发现过期消息已经被成功删除了:

查看 test2 主题下,从beginning 一开始到现在的所有消息:

 kafka-console-consumer --bootstrap-server localhost:9092 ^--topic test2 ^--from-beginning ^--property print.timestamp=true ^--property print.key=true ^--property print.offset=true ^--property print.partition=true

在这里插入图片描述

演示完,把这个设置给注释掉,恢复成默认的,默认是每5分钟检查一次。

在这里插入图片描述

★ 恢复主题的retention.ms配置

若要指定主题的retention.ms配置依然使用默认值,则只要使用kafka-configs.bat命令的 --delete-config 选项删除该配置即可。例如如下命令:

因为上面同通过 --add-config retention.ms=1000 把消息的过期时间设置为1秒,这个设置肯定是不合理的,所以需要把这个设置给删除掉。

1、先查看没修改前的test2主题的配置信息:

 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics

在这里插入图片描述

2、 将test2主题下的消息的保存时间删除。

(–delete-config retention.ms 就是删除 retention.ms 这个配置属性)
–alter ^ 表示修改的意思

kafka-configs.bat --alter ^
--bootstrap-server localhost:9092 ^
--entity-type topics ^
--entity-name test2 ^
--delete-config retention.ms

在这里插入图片描述

3、再查看修改后的test2主题的配置信息:

命令行查看:

 kafka-configs --bootstrap-server localhost:9092 ^--describe ^--entity-name test2 ^--entity-type topics

发现 retention.ms 这个配置属性 已经被成功删除了,表示消息的过期时间恢复成默认的168小时了。
在这里插入图片描述

这个就是默认的消息过期时间:

在这里插入图片描述

CMAK 查看:
retention.ms 这个配置属性恢复成默认的了

在这里插入图片描述

这篇关于09、Kafka ------ 通过修改保存时间来删除消息(retention.ms 配置)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/591267

相关文章

mysql5.7.15winx64配置全过程

《mysql5.7.15winx64配置全过程》文章详细介绍了MySQL5.7.15免安装版的配置步骤,包括解压安装包、设置环境变量、修改配置文件、初始化数据目录、安装服务、启动数据库、登录及密码修改... 目录前言一、首先下载安装包二、安android装步骤1.第一步解压文件2.配置环境变量3.复制my-

MySQL 数据库表操作完全指南:创建、读取、更新与删除实战

《MySQL数据库表操作完全指南:创建、读取、更新与删除实战》本文系统讲解MySQL表的增删查改(CURD)操作,涵盖创建、更新、查询、删除及插入查询结果,也是贯穿各类项目开发全流程的基础数据交互原... 目录mysql系列前言一、Create(创建)并插入数据1.1 单行数据 + 全列插入1.2 多行数据

Jenkins的安装与简单配置过程

《Jenkins的安装与简单配置过程》本文简述Jenkins在CentOS7.3上安装流程,包括Java环境配置、RPM包安装、修改JENKINS_HOME路径及权限、启动服务、插件安装与系统管理设置... 目录www.chinasem.cnJenkins安装访问并配置JenkinsJenkins配置邮件通知

Conda国内镜像源及配置过程

《Conda国内镜像源及配置过程》文章介绍Conda镜像源使用方法,涵盖临时指定单个/多个源、永久配置及恢复默认设置,同时说明main(官方稳定)、free(逐渐弃用)、conda-forge(社区更... 目录一、Conda国内镜像源二、Conda临时使用镜像源指定单个源临时指定多个源创建环境时临时指定源

mybatisplus的逻辑删除过程

《mybatisplus的逻辑删除过程》:本文主要介绍mybatisplus的逻辑删除过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录myBATisplus的逻辑删除1、在配置文件中添加逻辑删除的字段2、在实体类上加上@TableLogic3、业务层正常删除即

MySQL配置多主复制的实现步骤

《MySQL配置多主复制的实现步骤》多主复制是一种允许多个MySQL服务器同时接受写操作的复制方式,本文就来介绍一下MySQL配置多主复制的实现步骤,具有一定的参考价值,感兴趣的可以了解一下... 目录1. 环境准备2. 配置每台服务器2.1 修改每台服务器的配置文件3. 安装和配置插件4. 启动组复制4.

MybatisPlus中removeById删除数据库未变解决方案

《MybatisPlus中removeById删除数据库未变解决方案》MyBatisPlus中,removeById需实体类标注@TableId注解以识别数据库主键,若字段名不一致,应通过value属... 目录MyBATisPlus中removeBypythonId删除数据库未变removeById(Se

通过配置nginx访问服务器静态资源的过程

《通过配置nginx访问服务器静态资源的过程》文章介绍了图片存储路径设置、Nginx服务器配置及通过http://192.168.206.170:8007/a.png访问图片的方法,涵盖图片管理与服务... 目录1.图片存储路径2.nginx配置3.访问图片方式总结1.图片存储路径2.nginx配置

nginx配置错误日志的实现步骤

《nginx配置错误日志的实现步骤》配置nginx代理过程中,如果出现错误,需要看日志,可以把nginx日志配置出来,以便快速定位日志问题,下面就来介绍一下nginx配置错误日志的实现步骤,感兴趣的可... 目录前言nginx配置错误日志总结前言在配置nginx代理过程中,如果出现错误,需要看日志,可以把

MySQL中DATE_FORMAT时间函数的使用小结

《MySQL中DATE_FORMAT时间函数的使用小结》本文主要介绍了MySQL中DATE_FORMAT时间函数的使用小结,用于格式化日期/时间字段,可提取年月、统计月份数据、精确到天,对大家的学习或... 目录前言DATE_FORMAT时间函数总结前言mysql可以使用DATE_FORMAT获取日期字段