第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】

2024-05-15 08:04

本文主要是介绍第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】

  • 前言
  • 推荐
  • 项目总结
    • 第5章 Kafka,构建TB级异步消息系统
      • 1.阻塞队列
      • 2. Kafka入门
      • 3.Spring整合Kafka
      • 4.发送系统通知
      • 5.显示系统通知
  • 最后

前言

2023-4-30 20:42:51

以下内容源自【Java面试项目】
仅供学习交流使用

推荐

仿牛客网项目【面试】

项目总结

第5章 Kafka,构建TB级异步消息系统

1.阻塞队列

2. Kafka入门

3.Spring整合Kafka

4.发送系统通知

package com.jsss.community.event;import com.alibaba.fastjson.JSONObject;
import com.jsss.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;// 处理事件public void fireEvent(Event event) {// 将事件发布到指定的主题kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));}}
package com.jsss.community.event;import com.alibaba.fastjson.JSONObject;
import com.jsss.community.entity.DiscussPost;
import com.jsss.community.entity.Event;
import com.jsss.community.entity.Message;
import com.jsss.community.service.DiscussPostService;import com.jsss.community.service.ElasticsearchService;
import com.jsss.community.service.MessageService;
import com.jsss.community.util.CommunityConstant;
import com.jsss.community.util.CommunityUtil;import com.qiniu.common.QiniuException;
import com.qiniu.common.Zone;
import com.qiniu.http.Response;
import com.qiniu.storage.Configuration;
import com.qiniu.storage.UploadManager;
import com.qiniu.util.Auth;
import com.qiniu.util.StringMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaListeners;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;@Component
public class EventConsumer implements CommunityConstant {private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;@Value("${wk.image.command}")private String wkImageCommand;@Value("${wk.image.storage}")private String wkImageStorage;//七牛云@Value("${qiniu.key.access}")private String accessKey;@Value("${qiniu.key.secret}")private String secretKey;@Value("${qiniu.bucket.share.name}")private String shareBucketName;//定时任务@Autowiredprivate ThreadPoolTaskScheduler taskScheduler;//消费通知事件@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}// 发送站内通知Message message = new Message();message.setFromId(SYSTEM_USER_ID);message.setToId(event.getEntityUserId());//复用message conversation_id 做 topicmessage.setConversationId(event.getTopic());message.setCreateTime(new Date());//拼接语句  用户nowcoder 评论了你的 帖子 , 点击查看 !Map<String, Object> content = new HashMap<>();content.put("userId", event.getUserId());content.put("entityType", event.getEntityType());content.put("entityId", event.getEntityId());//附加信息 一律存入contentif (!event.getData().isEmpty()) {for (Map.Entry<String, Object> entry : event.getData().entrySet()) {content.put(entry.getKey(), entry.getValue());}}message.setContent(JSONObject.toJSONString(content));//复用message conversation_id 做 topic//92,1,111,like,"{""entityType"":1,""entityId"":237,""postId"":237,""userId"":112}",1,2019-04-13 22:20:58messageService.addMessage(message);}// 消费发帖事件@KafkaListener(topics = {TOPIC_PUBLISH})public void handlePublishMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}DiscussPost post = discussPostService.findDiscussPostById(event.getEntityId());elasticsearchService.saveDiscussPost(post);}// 消费删帖事件@KafkaListener(topics = {TOPIC_DELETE})public void handleDeleteMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}elasticsearchService.deleteDiscussPost(event.getEntityId());}// 消费分享事件@KafkaListener(topics = TOPIC_SHARE)public void handleShareMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}String htmlUrl = (String) event.getData().get("htmlUrl");String fileName = (String) event.getData().get("fileName");String suffix = (String) event.getData().get("suffix");String cmd = wkImageCommand + " --quality 75 "+ htmlUrl + " " + wkImageStorage + "/" + fileName + suffix;try {Runtime.getRuntime().exec(cmd);logger.info("生成长图成功: " + cmd);} catch (IOException e) {logger.error("生成长图失败: " + e.getMessage());}// 启用定时器,监视该图片,一旦生成了,则上传至七牛云.UploadTask task = new UploadTask(fileName, suffix);Future future = taskScheduler.scheduleAtFixedRate(task, 500);task.setFuture(future);}class UploadTask implements Runnable {// 文件名称private String fileName;// 文件后缀private String suffix;// 启动任务的返回值private Future future;// 开始时间private long startTime;// 上传次数private int uploadTimes;public UploadTask(String fileName, String suffix) {this.fileName = fileName;this.suffix = suffix;this.startTime = System.currentTimeMillis();}public void setFuture(Future future) {this.future = future;}@Overridepublic void run() {// 生成失败if (System.currentTimeMillis() - startTime > 30000) {logger.error("执行时间过长,终止任务:" + fileName);future.cancel(true);return;}// 上传失败if (uploadTimes >= 3) {logger.error("上传次数过多,终止任务:" + fileName);future.cancel(true);return;}String path = wkImageStorage + "/" + fileName + suffix;File file = new File(path);if (file.exists()) {logger.info(String.format("开始第%d次上传[%s].", ++uploadTimes, fileName));// 设置响应信息StringMap policy = new StringMap();policy.put("returnBody", CommunityUtil.getJSONString(0));// 生成上传凭证Auth auth = Auth.create(accessKey, secretKey);String uploadToken = auth.uploadToken(shareBucketName, fileName, 3600, policy);// 指定上传机房UploadManager manager = new UploadManager(new Configuration(Zone.zone1()));try {// 开始上传图片Response response = manager.put(path, fileName, uploadToken, null, "image/" + suffix, false);// 处理响应结果JSONObject json = JSONObject.parseObject(response.bodyString());if (json == null || json.get("code") == null || !json.get("code").toString().equals("0")) {logger.info(String.format("第%d次上传失败[%s].", uploadTimes, fileName));} else {logger.info(String.format("第%d次上传成功[%s].", uploadTimes, fileName));future.cancel(true);}} catch (QiniuException e) {logger.info(String.format("第%d次上传失败[%s].", uploadTimes, fileName));}} else {logger.info("等待图片生成[" + fileName + "].");}}}}

5.显示系统通知

   // 得到通知列表@RequestMapping(path = "/notice/list", method = RequestMethod.GET)public String getNoticeList(Model model) {User user = hostHolder.getUser();// 查询评论类通知Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT);if (message != null) {Map<String, Object> messageVO = new HashMap<>();messageVO.put("message", message);//还原commentString content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.findUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));messageVO.put("postId", data.get("postId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT);messageVO.put("unread", unread);model.addAttribute("commentNotice", messageVO);}// 查询点赞类通知message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);if (message != null) {Map<String, Object> messageVO = new HashMap<>();messageVO.put("message", message);String content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.findUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));messageVO.put("postId", data.get("postId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);messageVO.put("unread", unread);model.addAttribute("likeNotice", messageVO);}// 查询关注类通知message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW);if (message != null) {Map<String, Object> messageVO = new HashMap<>();messageVO.put("message", message);String content = HtmlUtils.htmlUnescape(message.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);messageVO.put("user", userService.findUserById((Integer) data.get("userId")));messageVO.put("entityType", data.get("entityType"));messageVO.put("entityId", data.get("entityId"));int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW);messageVO.put("count", count);int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW);messageVO.put("unread", unread);model.addAttribute("followNotice", messageVO);}// 查询未读消息数量int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);model.addAttribute("letterUnreadCount", letterUnreadCount);int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);model.addAttribute("noticeUnreadCount", noticeUnreadCount);return "site/notice";}// 得到通知详情@RequestMapping(path = "/notice/detail/{topic}", method = RequestMethod.GET)public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) {User user = hostHolder.getUser();//分页信息page.setLimit(5);page.setPath("/notice/detail/" + topic);page.setRows(messageService.findNoticeCount(user.getId(), topic));//通知信息List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());List<Map<String, Object>> noticeVoList = new ArrayList<>();if (noticeList != null) {for (Message notice : noticeList) {Map<String, Object> map = new HashMap<>();// 通知map.put("notice", notice);// 内容String content = HtmlUtils.htmlUnescape(notice.getContent());Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);map.put("user", userService.findUserById((Integer) data.get("userId")));map.put("entityType", data.get("entityType"));map.put("entityId", data.get("entityId"));map.put("postId", data.get("postId"));// 通知作者map.put("fromUser", userService.findUserById(notice.getFromId()));noticeVoList.add(map);}}model.addAttribute("notices", noticeVoList);// 设置已读List<Integer> ids = getLetterIds(noticeList);if (!ids.isEmpty()) {messageService.readMessage(ids);}return "site/notice-detail";}

最后

这篇博客能写好的原因是:站在巨人的肩膀上

这篇博客要写好的目的是:做别人的肩膀

开源:为爱发电

学习:为我而行

这篇关于第5章 Kafka,构建TB级异步消息系统【仿牛客网社区论坛项目】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/991267

相关文章

Mac系统下卸载JAVA和JDK的步骤

《Mac系统下卸载JAVA和JDK的步骤》JDK是Java语言的软件开发工具包,它提供了开发和运行Java应用程序所需的工具、库和资源,:本文主要介绍Mac系统下卸载JAVA和JDK的相关资料,需... 目录1. 卸载系统自带的 Java 版本检查当前 Java 版本通过命令卸载系统 Java2. 卸载自定

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关

基于Python实现一个简单的题库与在线考试系统

《基于Python实现一个简单的题库与在线考试系统》在当今信息化教育时代,在线学习与考试系统已成为教育技术领域的重要组成部分,本文就来介绍一下如何使用Python和PyQt5框架开发一个名为白泽题库系... 目录概述功能特点界面展示系统架构设计类结构图Excel题库填写格式模板题库题目填写格式表核心数据结构

基于Python构建一个高效词汇表

《基于Python构建一个高效词汇表》在自然语言处理(NLP)领域,构建高效的词汇表是文本预处理的关键步骤,本文将解析一个使用Python实现的n-gram词频统计工具,感兴趣的可以了解下... 目录一、项目背景与目标1.1 技术需求1.2 核心技术栈二、核心代码解析2.1 数据处理函数2.2 数据处理流程

Linux系统中的firewall-offline-cmd详解(收藏版)

《Linux系统中的firewall-offline-cmd详解(收藏版)》firewall-offline-cmd是firewalld的一个命令行工具,专门设计用于在没有运行firewalld服务的... 目录主要用途基本语法选项1. 状态管理2. 区域管理3. 服务管理4. 端口管理5. ICMP 阻断

MySQL版本问题导致项目无法启动问题的解决方案

《MySQL版本问题导致项目无法启动问题的解决方案》本文记录了一次因MySQL版本不一致导致项目启动失败的经历,详细解析了连接错误的原因,并提供了两种解决方案:调整连接字符串禁用SSL或统一MySQL... 目录本地项目启动报错报错原因:解决方案第一个:第二种:容器启动mysql的坑两种修改时区的方法:本地

Python FastMCP构建MCP服务端与客户端的详细步骤

《PythonFastMCP构建MCP服务端与客户端的详细步骤》MCP(Multi-ClientProtocol)是一种用于构建可扩展服务的通信协议框架,本文将使用FastMCP搭建一个支持St... 目录简介环境准备服务端实现(server.py)客户端实现(client.py)运行效果扩展方向常见问题结

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化: