Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

2023-12-25 17:58

本文主要是介绍Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前置条件:
《Dapr运用》
《Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》


  1. 搭建 RabbitMQ

    • Docker 搭建 RabbitMQ 服务

      docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    • 创建 rabbiqmq.yaml

      apiVersion: dapr.io/v1alpha1
      kind: Component
      metadata:
      name: messagebus
      spec:
      type: pubsub.rabbitmq
      metadata:
      - name: hostvalue: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"
      - name: consumerIDvalue: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"
      - name: durablevalue: "true" # Optional. Default: "false"
      - name: deletedWhenUnusedvalue: "false" # Optional. Default: "false"
      - name: autoAckvalue: "false" # Optional. Default: "false"
      - name: deliveryModevalue: "2" # Optional. Default: "0". Values between 0 - 2.
      - name: requeueInFailurevalue: "true" # Optional. Default: "false".
  2. 改造 StorageService.Api

    目的:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。

    • 删除 Storage 中无用的代码 StorageController.cs
    • 修改 Program.cs 中的 CreateHostBuilder 代码为

      public static IHostBuilder CreateHostBuilder(string[] args)
      {return Host.CreateDefaultBuilder(args).ConfigureWebHostDefaults(webBuilder =>{webBuilder.ConfigureKestrel(options =>{options.Listen(IPAddress.Loopback, 5003, listenOptions =>{listenOptions.Protocols = HttpProtocols.Http2;});});webBuilder.UseStartup<Startup>();});
      }
    • 添加 DaprClientService

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context){var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");return Task.FromResult(topicSubscriptionsEnvelope);}
      }

      Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表

    • 修改 Startup.cs

       /// <summary>
      /// This method gets called by the runtime. Use this method to add services to the container.
      /// </summary>
      /// <param name="services">Services.</param>
      public void ConfigureServices(IServiceCollection services)
      {services.AddGrpc();services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); });
      }
      /// <summary>
      /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
      /// </summary>
      /// <param name="app">app.</param>
      /// <param name="env">env.</param>
      public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
      {if (env.IsDevelopment()){app.UseDeveloperExceptionPage();}app.UseRouting();app.UseEndpoints(endpoints =>{endpoints.MapSubscribeHandler();endpoints.MapGrpcService<DaprClientService>();});
      }
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件

    • 启动 StorageService 服务

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
  3. 使用 Java 开发一个 Order 服务端,Order 服务提供的功能为
    • 下单
    • 查看订单详情
    • 获取订单列表

    在当前上下文中着重处理的是下单功能,以及下单成功后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即减少库存。

    • 创建 CreateOrder.proto 文件

      syntax = "proto3";package daprexamples;option java_outer_classname = "CreateOrderProtos";
      option java_package = "generate.protos";service OrderService {rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);
      }message CreateOrderRequest {string ProductID = 1; //Product IDint32 Amount=2; //Product Amountstring CustomerID=3; //Customer ID
      }message CreateOrderResponse {bool Succeed = 1; //Create Order Result,true:success,false:fail
      }message RetrieveOrderRequest{string OrderID=1;
      }message RetrieveOrderResponse{Order Order=1;
      }message GetOrderListRequest{string CustomerID=1;
      }message GetOrderListResponse{repeated Order Orders=1;
      }message Order{string ID=1;string ProductID=2;int32 Amount=3;string CustomerID=4;
      }
    • 使用 protoc 生成 Java 代码

      protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java  C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
    • 引用 MyBatis 做为 Mapper 工具
    • 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加 createOrder()getOrderList()retrieveOrder() 三个函数的实现
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件
    • 启动 OrderService 服务

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
  4. 创建 Golang Grpc 客户端,该客户端需要完成创建订单 Grpc 调用,订单创建成功发布扣除库存事件

    • 引用 CreateOrder.proto 文件,并生成 CreateOrder.pb.go 文件

      如未安装 protoc-gen-gogo ,通过一下命令获取并安装

      go get github.com/gogo/protobuf/gogoproto

      安装 protoc-gen-gogo

      go install github.com/gogo/protobuf/gogoproto

      根据 proto 文件生成代码

      protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 客户端代码,创建订单

      ...response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{Id:     "OrderService",Data:   createOrderRequestData,Method: "createOrder",})if err != nil {fmt.Println(err)return}...
    • 添加 DataToPublish.proto 文件,此文件作为事件发布数据结构

      syntax = "proto3";package daprexamples;option java_outer_classname = "DataToPublishProtos";
      option java_package = "generate.protos";message StorageReduceData {string ProductID = 1;int32 Amount=2;
      }
    • 生成 DataToPublish 代码

       protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 修改 main.go 代码,根据 createOrder 结果判断是否要发布信息到消息队列

      ...createOrderResponse := &daprexamples.CreateOrderResponse{}if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {fmt.Println(err)return
      }
      fmt.Println(createOrderResponse.Succeed)if !createOrderResponse.Succeed {//下单失败return
      }storageReduceData := &daprexamples.StorageReduceData{ProductID: createOrderRequest.ProductID,Amount:    createOrderRequest.Amount,
      }
      storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)
      if err != nil {fmt.Println(err)return
      }_, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{Topic: "Storage.Reduce",Data:  &any.Any{Value: storageReduceDataData},
      })fmt.Println(storageReduceDataData)if err != nil {fmt.Println(err)
      } else {fmt.Println("Published message!")
      }
      ...

      注意: 发送数据前,使用 jsoniter 转换数据为 json 字符串,原因是如果直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时使用 Json 打包,解包使用 String ,导致数据不一致。

    • 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件
    • 启动 golang Grpc 客户端

       dapr run --app-id client go run main.go

      输出

      == APP == true
      == APP == Published message!
  5. RabbitMQ

    • 在浏览器中输入 http://localhost:15672/ ,账号和密码均为 guest
    • 查看 Connections ,有3个连接
      • 这个3个连接来自配置了 messagebus.yaml 组件的三个服务
    • 查看 Exchanges

      Name            Type    Features    Message rate in Message rate out
      (AMQP default)  direct  D
      Storage.Reduce  fanout  D
      amq.direct      direct  D
      amq.fanout      fanout  D
      ...

      着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 类型的 Exchange ,这表明该 Exhange 中的数据是广播的。

    • 查看 Queues

      Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。

  6. DotNet Core StorageService.Api 改造以完成 Sub 事件

    • 打开 DaprClientService.cs 文件,更改内容为

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {private readonly StorageContext _storageContext;public DaprClientService(StorageContext storageContext){_storageContext = storageContext;}public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context){var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");return Task.FromResult(topicSubscriptionsEnvelope);}public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context){if (request.Topic.Equals("Storage.Reduce")){StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());Console.WriteLine("ProductID:" + storageReduceData.ProductID);Console.WriteLine("Amount:" + storageReduceData.Amount);await HandlerStorageReduce(storageReduceData);}return new Empty();}private async Task HandlerStorageReduce(StorageReduceData storageReduceData){Guid productID = Guid.Parse(storageReduceData.ProductID);Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));if (storageFromDb == null){return;}if (storageFromDb.Amount < storageReduceData.Amount){return;}storageFromDb.Amount -= storageReduceData.Amount;Console.WriteLine(storageFromDb.Amount);await _storageContext.SaveChangesAsync();}
    • 说明
      • 添加 GetTopicSubscriptions() 将完成对主题的关注
        • 当应用停止时,RabbitMQ 中的 Queue 自动删除
        • 添加 OnTopicEvent() 重写,此方法将完成对 Sub 主题的事件处理
      • HandlerStorageReduce 用于减少库存
  7. 启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端

    • DotNet Core

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
    • Java

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
    • go

      dapr run --app-id client  go run main.go

      go grpc 输出为

      == APP == true
      == APP == Published message!

    查看 MySql Storage 数据库,对应产品库存减少 20

至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub

源码地址

这篇关于Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/536411

相关文章

Java如何用乘号来重复字符串的功能

《Java如何用乘号来重复字符串的功能》:本文主要介绍Java使用乘号来重复字符串的功能,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java乘号来重复字符串的功能1、利用循环2、使用StringBuilder3、采用 Java 11 引入的String.rep

SpringBoot中HTTP连接池的配置与优化

《SpringBoot中HTTP连接池的配置与优化》这篇文章主要为大家详细介绍了SpringBoot中HTTP连接池的配置与优化的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、HTTP连接池的核心价值二、Spring Boot集成方案方案1:Apache HttpCl

Spring Boot项目打包和运行的操作方法

《SpringBoot项目打包和运行的操作方法》SpringBoot应用内嵌了Web服务器,所以基于SpringBoot开发的web应用也可以独立运行,无须部署到其他Web服务器中,下面以打包dem... 目录一、打包为JAR包并运行1.打包为可执行的 JAR 包2.运行 JAR 包二、打包为WAR包并运行

Java进行日期解析与格式化的实现代码

《Java进行日期解析与格式化的实现代码》使用Java搭配ApacheCommonsLang3和Natty库,可以实现灵活高效的日期解析与格式化,本文将通过相关示例为大家讲讲具体的实践操作,需要的可以... 目录一、背景二、依赖介绍1. Apache Commons Lang32. Natty三、核心实现代

Spring Boot 常用注解整理(最全收藏版)

《SpringBoot常用注解整理(最全收藏版)》本文系统整理了常用的Spring/SpringBoot注解,按照功能分类进行介绍,每个注解都会涵盖其含义、提供来源、应用场景以及代码示例,帮助开发... 目录Spring & Spring Boot 常用注解整理一、Spring Boot 核心注解二、Spr

SpringBoot实现接口数据加解密的三种实战方案

《SpringBoot实现接口数据加解密的三种实战方案》在金融支付、用户隐私信息传输等场景中,接口数据若以明文传输,极易被中间人攻击窃取,SpringBoot提供了多种优雅的加解密实现方案,本文将从原... 目录一、为什么需要接口数据加解密?二、核心加解密算法选择1. 对称加密(AES)2. 非对称加密(R

详解如何在SpringBoot控制器中处理用户数据

《详解如何在SpringBoot控制器中处理用户数据》在SpringBoot应用开发中,控制器(Controller)扮演着至关重要的角色,它负责接收用户请求、处理数据并返回响应,本文将深入浅出地讲解... 目录一、获取请求参数1.1 获取查询参数1.2 获取路径参数二、处理表单提交2.1 处理表单数据三、

java变量内存中存储的使用方式

《java变量内存中存储的使用方式》:本文主要介绍java变量内存中存储的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍2、变量的定义3、 变量的类型4、 变量的作用域5、 内存中的存储方式总结1、介绍在 Java 中,变量是用于存储程序中数据

如何合理管控Java语言的异常

《如何合理管控Java语言的异常》:本文主要介绍如何合理管控Java语言的异常问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍2、Thorwable类3、Error4、Exception类4.1、检查异常4.2、运行时异常5、处理方式5.1. 捕获异常

Spring Boot集成SLF4j从基础到高级实践(最新推荐)

《SpringBoot集成SLF4j从基础到高级实践(最新推荐)》SLF4j(SimpleLoggingFacadeforJava)是一个日志门面(Facade),不是具体的日志实现,这篇文章主要介... 目录一、日志框架概述与SLF4j简介1.1 为什么需要日志框架1.2 主流日志框架对比1.3 SLF4