kafaka发送接收消息stream方式实例

2023-11-11 04:38

本文主要是介绍kafaka发送接收消息stream方式实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.配置文件

input为接收,output为发送

如果发送接收在同一个程序中,则不需要加上consumer: headerMode:raw ,如果本程序仅是接收消息进行消费,需要加上consumer: headerMode:raw 

spring:
  cloud:
    stream:
      bindings:
        input-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYgroup: account-devinput-order:
          consumer:
            headerMode: rawcontentType: text/plain;charset=UTF-8destination: ACCOUNT_ORDER_NOTIFYgroup: account-devoutput-collect:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_COLLECT_NOTIFYoutput-watch:
          contentType: text/plain;charset=UTF-8destination: ACCOUNT_WATCH_NOTIFYkafka:
        binder:
          brokers: 192.168.1.158:9092,192.168.1.159:9092,192.168.1.160:9092zkNodes: 192.168.1.158:2181,192.168.1.159:2181,192.168.1.160:2181

2.发送消息outPut分类Bean

public interface NotifyMessageChannel {String COLLECT_OUTPUT = "output-collect";
   String WATCH_OUTPUT ="output-watch" ;
   
   @Output(NotifyMessageChannel.COLLECT_OUTPUT)MessageChannel collectOutPut();

   @Output(NotifyMessageChannel.WATCH_OUTPUT)MessageChannel watchOutPut();

}

3.发送消息service

NotifyMessageChannel中定义了2个发送MessageChannel,发送时可以直接.collectOutPut().send,选择不同的output进行发送

@Service
@Slf4j
@EnableBinding(NotifyMessageChannel.class)
public class NotifyServiceImpl implements NotifyService {@Autowired
    private NotifyMessageChannel notifyMessageChannel;
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void sendUserCollectCourse(UserCollectCourseNotify userCollectCourseNotify) {try {Boolean result = notifyMessageChannel.collectOutPut().send(MessageBuilder.withPayload(mapper.writeValueAsString(userCollectCourseNotify)).build()) ;
            log.info("send result:"+result);
        } catch (Exception e) {log.error("Exception from create user UserCollectCourse.", e);
        }}
}

4.接收消息input配置

public interface ReceiveMessageChannel {String COLLECT_INPUT = "input-collect";
   String ORDER_INPUT ="input-order" ;
   
   @Input(ReceiveMessageChannel.COLLECT_INPUT)SubscribableChannel collectInput();

   @Input(ReceiveMessageChannel.ORDER_INPUT)SubscribableChannel orderInput();

}

5.监听接收到的消息,进行消费处理

@Service
@Slf4j
@EnableBinding(ReceiveMessageChannel.class)
public class CollectListener {private UserCollectCourseClient userCollectCourseService;
   
   private ObjectMapper mapper = new ObjectMapper();

   public CollectListener(UserCollectCourseClient userCollectCourseClient) {this.userCollectCourseService = userCollectCourseClient;
   }@StreamListener(ReceiveMessageChannel.COLLECT_INPUT)public void process(Message<String> message) {log.debug("Received Notify:[{}]",message.toString());

      String content = message.getPayload();
      UserCollectCourseNotify uccn;
      try {uccn = mapper.readValue(content,UserCollectCourseNotify.class);
         log.debug("Received Notify:[userId:{},courseId:{}]",uccn.getUserId(),uccn.getCourseId());

         if(uccn!=null){log.info("receive UserCollectCourseNotify:"+uccn);
//          userCollectCourseService.saveUserCollectCourse(ucci);
//          log.debug("Save Collect to Mongo:[userId:{},courseId:{}]",ucci.getUserId(),ucci.getCourseId());
         }} catch (Exception e) {log.warn("RECEIVE Collect NOTIFY ERROR:[message_body:{},error:{}]",message.toString(),e.getLocalizedMessage());
      }}
}






这篇关于kafaka发送接收消息stream方式实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/387596

相关文章

Django中的函数视图和类视图以及路由的定义方式

《Django中的函数视图和类视图以及路由的定义方式》Django视图分函数视图和类视图,前者用函数处理请求,后者继承View类定义方法,路由使用path()、re_path()或url(),通过in... 目录函数视图类视图路由总路由函数视图的路由类视图定义路由总结Django允许接收的请求方法http

Python yield与yield from的简单使用方式

《Pythonyield与yieldfrom的简单使用方式》生成器通过yield定义,可在处理I/O时暂停执行并返回部分结果,待其他任务完成后继续,yieldfrom用于将一个生成器的值传递给另一... 目录python yield与yield from的使用代码结构总结Python yield与yield

pandas数据的合并concat()和merge()方式

《pandas数据的合并concat()和merge()方式》Pandas中concat沿轴合并数据框(行或列),merge基于键连接(内/外/左/右),concat用于纵向或横向拼接,merge用于... 目录concat() 轴向连接合并(1) join='outer',axis=0(2)join='o

shell脚本批量导出redis key-value方式

《shell脚本批量导出rediskey-value方式》为避免keys全量扫描导致Redis卡顿,可先通过dump.rdb备份文件在本地恢复,再使用scan命令渐进导出key-value,通过CN... 目录1 背景2 详细步骤2.1 本地docker启动Redis2.2 shell批量导出脚本3 附录总

Django开发时如何避免频繁发送短信验证码(python图文代码)

《Django开发时如何避免频繁发送短信验证码(python图文代码)》Django开发时,为防止频繁发送验证码,后端需用Redis限制请求频率,结合管道技术提升效率,通过生产者消费者模式解耦业务逻辑... 目录避免频繁发送 验证码1. www.chinasem.cn避免频繁发送 验证码逻辑分析2. 避免频繁

Oracle查询表结构建表语句索引等方式

《Oracle查询表结构建表语句索引等方式》使用USER_TAB_COLUMNS查询表结构可避免系统隐藏字段(如LISTUSER的CLOB与VARCHAR2同名字段),这些字段可能为dbms_lob.... 目录oracle查询表结构建表语句索引1.用“USER_TAB_COLUMNS”查询表结构2.用“a

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

Oracle数据库定时备份脚本方式(Linux)

《Oracle数据库定时备份脚本方式(Linux)》文章介绍Oracle数据库自动备份方案,包含主机备份传输与备机解压导入流程,强调需提前全量删除原库数据避免报错,并需配置无密传输、定时任务及验证脚本... 目录说明主机脚本备机上自动导库脚本整个自动备份oracle数据库的过程(建议全程用root用户)总结

Java Stream流之GroupBy的用法及应用场景

《JavaStream流之GroupBy的用法及应用场景》本教程将详细介绍如何在Java中使用Stream流的groupby方法,包括基本用法和一些常见的实际应用场景,感兴趣的朋友一起看看吧... 目录Java Stream流之GroupBy的用法1. 前言2. 基础概念什么是 GroupBy?Stream

python运用requests模拟浏览器发送请求过程

《python运用requests模拟浏览器发送请求过程》模拟浏览器请求可选用requests处理静态内容,selenium应对动态页面,playwright支持高级自动化,设置代理和超时参数,根据需... 目录使用requests库模拟浏览器请求使用selenium自动化浏览器操作使用playwright