kafaka发送接收消息stream方式实例

2023-11-11 04:38

本文主要是介绍kafaka发送接收消息stream方式实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.配置文件

input为接收,output为发送

如果发送接收在同一个程序中,则不需要加上consumer: headerMode:raw ,如果本程序仅是接收消息进行消费,需要加上consumer: headerMode:raw 

spring:
  cloud:
    stream:
      bindings:
        input-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYgroup: account-devinput-order:
          consumer:
            headerMode: rawcontentType: text/plain;charset=UTF-8destination: ACCOUNT_ORDER_NOTIFYgroup: account-devoutput-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYoutput-watch:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_WATCH_NOTIFYkafka:
        binder:
          brokers: 192.168.1.158:9092,192.168.1.159:9092,192.168.1.160:9092zkNodes: 192.168.1.158:2181,192.168.1.159:2181,192.168.1.160:2181

2.发送消息outPut分类Bean

public interface NotifyMessageChannel {String COLLECT_OUTPUT = "output-collect";
   String WATCH_OUTPUT ="output-watch" ;
   
   @Output(NotifyMessageChannel.COLLECT_OUTPUT)MessageChannel collectOutPut();

   @Output(NotifyMessageChannel.WATCH_OUTPUT)MessageChannel watchOutPut();

}

3.发送消息service

NotifyMessageChannel中定义了2个发送MessageChannel,发送时可以直接.collectOutPut().send,选择不同的output进行发送

@Service
@Slf4j
@EnableBinding(NotifyMessageChannel.class)
public class NotifyServiceImpl implements NotifyService {@Autowired
    private NotifyMessageChannel notifyMessageChannel;
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void sendUserCollectCourse(UserCollectCourseNotify userCollectCourseNotify) {try {Boolean result = notifyMessageChannel.collectOutPut().send(MessageBuilder.withPayload(mapper.writeValueAsString(userCollectCourseNotify)).build()) ;
            log.info("send result:"+result);
        } catch (Exception e) {log.error("Exception from create user UserCollectCourse.", e);
        }}
}

4.接收消息input配置

public interface ReceiveMessageChannel {String COLLECT_INPUT = "input-collect";
   String ORDER_INPUT ="input-order" ;
   
   @Input(ReceiveMessageChannel.COLLECT_INPUT)SubscribableChannel collectInput();

   @Input(ReceiveMessageChannel.ORDER_INPUT)SubscribableChannel orderInput();

}

5.监听接收到的消息,进行消费处理

@Service
@Slf4j
@EnableBinding(ReceiveMessageChannel.class)
public class CollectListener {private UserCollectCourseClient userCollectCourseService;
   
   private ObjectMapper mapper = new ObjectMapper();

   public CollectListener(UserCollectCourseClient userCollectCourseClient) {this.userCollectCourseService = userCollectCourseClient;
   }@StreamListener(ReceiveMessageChannel.COLLECT_INPUT)public void process(Message<String> message) {log.debug("Received Notify:[{}]",message.toString());

      String content = message.getPayload();
      UserCollectCourseNotify uccn;
      try {uccn = mapper.readValue(content,UserCollectCourseNotify.class);
         log.debug("Received Notify:[userId:{},courseId:{}]",uccn.getUserId(),uccn.getCourseId());

         if(uccn!=null){log.info("receive UserCollectCourseNotify:"+uccn);
//          userCollectCourseService.saveUserCollectCourse(ucci);
//          log.debug("Save Collect to Mongo:[userId:{},courseId:{}]",ucci.getUserId(),ucci.getCourseId());
         }} catch (Exception e) {log.warn("RECEIVE Collect NOTIFY ERROR:[message_body:{},error:{}]",message.toString(),e.getLocalizedMessage());
      }}
}






这篇关于kafaka发送接收消息stream方式实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/387596

相关文章

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

Vue3视频播放组件 vue3-video-play使用方式

《Vue3视频播放组件vue3-video-play使用方式》vue3-video-play是Vue3的视频播放组件,基于原生video标签开发,支持MP4和HLS流,提供全局/局部引入方式,可监听... 目录一、安装二、全局引入三、局部引入四、基本使用五、事件监听六、播放 HLS 流七、更多功能总结在 v

Java发送SNMP至交换机获取交换机状态实现方式

《Java发送SNMP至交换机获取交换机状态实现方式》文章介绍使用SNMP4J库(2.7.0)通过RCF1213-MIB协议获取交换机单/多路状态,需开启SNMP支持,重点对比SNMPv1、v2c、v... 目录交换机协议SNMP库获取交换机单路状态获取交换机多路状态总结交换机协议这里使用的交换机协议为常

k8s admin用户生成token方式

《k8sadmin用户生成token方式》用户使用Kubernetes1.28创建admin命名空间并部署,通过ClusterRoleBinding为jenkins用户授权集群级权限,生成并获取其t... 目录k8s admin用户生成token创建一个admin的命名空间查看k8s namespace 的

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

PyQt6 键盘事件处理的实现及实例代码

《PyQt6键盘事件处理的实现及实例代码》本文主要介绍了PyQt6键盘事件处理的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起... 目录一、键盘事件处理详解1、核心事件处理器2、事件对象 QKeyEvent3、修饰键处理(1)、修饰键类

Pandas处理缺失数据的方式汇总

《Pandas处理缺失数据的方式汇总》许多教程中的数据与现实世界中的数据有很大不同,现实世界中的数据很少是干净且同质的,本文我们将讨论处理缺失数据的一些常规注意事项,了解Pandas如何表示缺失数据,... 目录缺失数据约定的权衡Pandas 中的缺失数据None 作为哨兵值NaN:缺失的数值数据Panda

java读取excel文件为base64实现方式

《java读取excel文件为base64实现方式》文章介绍使用ApachePOI和EasyExcel处理Excel文件并转换为Base64的方法,强调EasyExcel适合大文件且内存占用低,需注意... 目录使用 Apache POI 读取 Excel 并转换为 Base64使用 EasyExcel 处