kafaka发送接收消息stream方式实例

2023-11-11 04:38

本文主要是介绍kafaka发送接收消息stream方式实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.配置文件

input为接收,output为发送

如果发送接收在同一个程序中,则不需要加上consumer: headerMode:raw ,如果本程序仅是接收消息进行消费,需要加上consumer: headerMode:raw 

spring:
  cloud:
    stream:
      bindings:
        input-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYgroup: account-devinput-order:
          consumer:
            headerMode: rawcontentType: text/plain;charset=UTF-8destination: ACCOUNT_ORDER_NOTIFYgroup: account-devoutput-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYoutput-watch:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_WATCH_NOTIFYkafka:
        binder:
          brokers: 192.168.1.158:9092,192.168.1.159:9092,192.168.1.160:9092zkNodes: 192.168.1.158:2181,192.168.1.159:2181,192.168.1.160:2181

2.发送消息outPut分类Bean

public interface NotifyMessageChannel {String COLLECT_OUTPUT = "output-collect";
   String WATCH_OUTPUT ="output-watch" ;
   
   @Output(NotifyMessageChannel.COLLECT_OUTPUT)MessageChannel collectOutPut();

   @Output(NotifyMessageChannel.WATCH_OUTPUT)MessageChannel watchOutPut();

}

3.发送消息service

NotifyMessageChannel中定义了2个发送MessageChannel,发送时可以直接.collectOutPut().send,选择不同的output进行发送

@Service
@Slf4j
@EnableBinding(NotifyMessageChannel.class)
public class NotifyServiceImpl implements NotifyService {@Autowired
    private NotifyMessageChannel notifyMessageChannel;
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void sendUserCollectCourse(UserCollectCourseNotify userCollectCourseNotify) {try {Boolean result = notifyMessageChannel.collectOutPut().send(MessageBuilder.withPayload(mapper.writeValueAsString(userCollectCourseNotify)).build()) ;
            log.info("send result:"+result);
        } catch (Exception e) {log.error("Exception from create user UserCollectCourse.", e);
        }}
}

4.接收消息input配置

public interface ReceiveMessageChannel {String COLLECT_INPUT = "input-collect";
   String ORDER_INPUT ="input-order" ;
   
   @Input(ReceiveMessageChannel.COLLECT_INPUT)SubscribableChannel collectInput();

   @Input(ReceiveMessageChannel.ORDER_INPUT)SubscribableChannel orderInput();

}

5.监听接收到的消息,进行消费处理

@Service
@Slf4j
@EnableBinding(ReceiveMessageChannel.class)
public class CollectListener {private UserCollectCourseClient userCollectCourseService;
   
   private ObjectMapper mapper = new ObjectMapper();

   public CollectListener(UserCollectCourseClient userCollectCourseClient) {this.userCollectCourseService = userCollectCourseClient;
   }@StreamListener(ReceiveMessageChannel.COLLECT_INPUT)public void process(Message<String> message) {log.debug("Received Notify:[{}]",message.toString());

      String content = message.getPayload();
      UserCollectCourseNotify uccn;
      try {uccn = mapper.readValue(content,UserCollectCourseNotify.class);
         log.debug("Received Notify:[userId:{},courseId:{}]",uccn.getUserId(),uccn.getCourseId());

         if(uccn!=null){log.info("receive UserCollectCourseNotify:"+uccn);
//          userCollectCourseService.saveUserCollectCourse(ucci);
//          log.debug("Save Collect to Mongo:[userId:{},courseId:{}]",ucci.getUserId(),ucci.getCourseId());
         }} catch (Exception e) {log.warn("RECEIVE Collect NOTIFY ERROR:[message_body:{},error:{}]",message.toString(),e.getLocalizedMessage());
      }}
}






这篇关于kafaka发送接收消息stream方式实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/387596

相关文章

gitlab安装及邮箱配置和常用使用方式

《gitlab安装及邮箱配置和常用使用方式》:本文主要介绍gitlab安装及邮箱配置和常用使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.安装GitLab2.配置GitLab邮件服务3.GitLab的账号注册邮箱验证及其分组4.gitlab分支和标签的

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri

Linux脚本(shell)的使用方式

《Linux脚本(shell)的使用方式》:本文主要介绍Linux脚本(shell)的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录概述语法详解数学运算表达式Shell变量变量分类环境变量Shell内部变量自定义变量:定义、赋值自定义变量:引用、修改、删

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

python判断文件是否存在常用的几种方式

《python判断文件是否存在常用的几种方式》在Python中我们在读写文件之前,首先要做的事情就是判断文件是否存在,否则很容易发生错误的情况,:本文主要介绍python判断文件是否存在常用的几种... 目录1. 使用 os.path.exists()2. 使用 os.path.isfile()3. 使用

Mybatis的分页实现方式

《Mybatis的分页实现方式》MyBatis的分页实现方式主要有以下几种,每种方式适用于不同的场景,且在性能、灵活性和代码侵入性上有所差异,对Mybatis的分页实现方式感兴趣的朋友一起看看吧... 目录​1. 原生 SQL 分页(物理分页)​​2. RowBounds 分页(逻辑分页)​​3. Page

Linux链表操作方式

《Linux链表操作方式》:本文主要介绍Linux链表操作方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、链表基础概念与内核链表优势二、内核链表结构与宏解析三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势六、典型应用场景七、调试技巧与

Python使用smtplib库开发一个邮件自动发送工具

《Python使用smtplib库开发一个邮件自动发送工具》在现代软件开发中,自动化邮件发送是一个非常实用的功能,无论是系统通知、营销邮件、还是日常工作报告,Python的smtplib库都能帮助我们... 目录代码实现与知识点解析1. 导入必要的库2. 配置邮件服务器参数3. 创建邮件发送类4. 实现邮件

Linux实现线程同步的多种方式汇总

《Linux实现线程同步的多种方式汇总》本文详细介绍了Linux下线程同步的多种方法,包括互斥锁、自旋锁、信号量以及它们的使用示例,通过这些同步机制,可以解决线程安全问题,防止资源竞争导致的错误,示例... 目录什么是线程同步?一、互斥锁(单人洗手间规则)适用场景:特点:二、条件变量(咖啡厅取餐系统)工作流

Spring组件实例化扩展点之InstantiationAwareBeanPostProcessor使用场景解析

《Spring组件实例化扩展点之InstantiationAwareBeanPostProcessor使用场景解析》InstantiationAwareBeanPostProcessor是Spring... 目录一、什么是InstantiationAwareBeanPostProcessor?二、核心方法解