Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

2023-12-25 17:58

本文主要是介绍Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前置条件:
《Dapr运用》
《Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》


  1. 搭建 RabbitMQ

    • Docker 搭建 RabbitMQ 服务

      docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    • 创建 rabbiqmq.yaml

      apiVersion: dapr.io/v1alpha1
      kind: Component
      metadata:
      name: messagebus
      spec:
      type: pubsub.rabbitmq
      metadata:
      - name: hostvalue: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"
      - name: consumerIDvalue: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"
      - name: durablevalue: "true" # Optional. Default: "false"
      - name: deletedWhenUnusedvalue: "false" # Optional. Default: "false"
      - name: autoAckvalue: "false" # Optional. Default: "false"
      - name: deliveryModevalue: "2" # Optional. Default: "0". Values between 0 - 2.
      - name: requeueInFailurevalue: "true" # Optional. Default: "false".
  2. 改造 StorageService.Api

    目的:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。

    • 删除 Storage 中无用的代码 StorageController.cs
    • 修改 Program.cs 中的 CreateHostBuilder 代码为

      public static IHostBuilder CreateHostBuilder(string[] args)
      {return Host.CreateDefaultBuilder(args).ConfigureWebHostDefaults(webBuilder =>{webBuilder.ConfigureKestrel(options =>{options.Listen(IPAddress.Loopback, 5003, listenOptions =>{listenOptions.Protocols = HttpProtocols.Http2;});});webBuilder.UseStartup<Startup>();});
      }
    • 添加 DaprClientService

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context){var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");return Task.FromResult(topicSubscriptionsEnvelope);}
      }

      Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表

    • 修改 Startup.cs

       /// <summary>
      /// This method gets called by the runtime. Use this method to add services to the container.
      /// </summary>
      /// <param name="services">Services.</param>
      public void ConfigureServices(IServiceCollection services)
      {services.AddGrpc();services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); });
      }
      /// <summary>
      /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
      /// </summary>
      /// <param name="app">app.</param>
      /// <param name="env">env.</param>
      public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
      {if (env.IsDevelopment()){app.UseDeveloperExceptionPage();}app.UseRouting();app.UseEndpoints(endpoints =>{endpoints.MapSubscribeHandler();endpoints.MapGrpcService<DaprClientService>();});
      }
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件

    • 启动 StorageService 服务

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
  3. 使用 Java 开发一个 Order 服务端,Order 服务提供的功能为
    • 下单
    • 查看订单详情
    • 获取订单列表

    在当前上下文中着重处理的是下单功能,以及下单成功后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即减少库存。

    • 创建 CreateOrder.proto 文件

      syntax = "proto3";package daprexamples;option java_outer_classname = "CreateOrderProtos";
      option java_package = "generate.protos";service OrderService {rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);
      }message CreateOrderRequest {string ProductID = 1; //Product IDint32 Amount=2; //Product Amountstring CustomerID=3; //Customer ID
      }message CreateOrderResponse {bool Succeed = 1; //Create Order Result,true:success,false:fail
      }message RetrieveOrderRequest{string OrderID=1;
      }message RetrieveOrderResponse{Order Order=1;
      }message GetOrderListRequest{string CustomerID=1;
      }message GetOrderListResponse{repeated Order Orders=1;
      }message Order{string ID=1;string ProductID=2;int32 Amount=3;string CustomerID=4;
      }
    • 使用 protoc 生成 Java 代码

      protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java  C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
    • 引用 MyBatis 做为 Mapper 工具
    • 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加 createOrder()getOrderList()retrieveOrder() 三个函数的实现
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件
    • 启动 OrderService 服务

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
  4. 创建 Golang Grpc 客户端,该客户端需要完成创建订单 Grpc 调用,订单创建成功发布扣除库存事件

    • 引用 CreateOrder.proto 文件,并生成 CreateOrder.pb.go 文件

      如未安装 protoc-gen-gogo ,通过一下命令获取并安装

      go get github.com/gogo/protobuf/gogoproto

      安装 protoc-gen-gogo

      go install github.com/gogo/protobuf/gogoproto

      根据 proto 文件生成代码

      protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 客户端代码,创建订单

      ...response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{Id:     "OrderService",Data:   createOrderRequestData,Method: "createOrder",})if err != nil {fmt.Println(err)return}...
    • 添加 DataToPublish.proto 文件,此文件作为事件发布数据结构

      syntax = "proto3";package daprexamples;option java_outer_classname = "DataToPublishProtos";
      option java_package = "generate.protos";message StorageReduceData {string ProductID = 1;int32 Amount=2;
      }
    • 生成 DataToPublish 代码

       protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 修改 main.go 代码,根据 createOrder 结果判断是否要发布信息到消息队列

      ...createOrderResponse := &daprexamples.CreateOrderResponse{}if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {fmt.Println(err)return
      }
      fmt.Println(createOrderResponse.Succeed)if !createOrderResponse.Succeed {//下单失败return
      }storageReduceData := &daprexamples.StorageReduceData{ProductID: createOrderRequest.ProductID,Amount:    createOrderRequest.Amount,
      }
      storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)
      if err != nil {fmt.Println(err)return
      }_, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{Topic: "Storage.Reduce",Data:  &any.Any{Value: storageReduceDataData},
      })fmt.Println(storageReduceDataData)if err != nil {fmt.Println(err)
      } else {fmt.Println("Published message!")
      }
      ...

      注意: 发送数据前,使用 jsoniter 转换数据为 json 字符串,原因是如果直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时使用 Json 打包,解包使用 String ,导致数据不一致。

    • 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件
    • 启动 golang Grpc 客户端

       dapr run --app-id client go run main.go

      输出

      == APP == true
      == APP == Published message!
  5. RabbitMQ

    • 在浏览器中输入 http://localhost:15672/ ,账号和密码均为 guest
    • 查看 Connections ,有3个连接
      • 这个3个连接来自配置了 messagebus.yaml 组件的三个服务
    • 查看 Exchanges

      Name            Type    Features    Message rate in Message rate out
      (AMQP default)  direct  D
      Storage.Reduce  fanout  D
      amq.direct      direct  D
      amq.fanout      fanout  D
      ...

      着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 类型的 Exchange ,这表明该 Exhange 中的数据是广播的。

    • 查看 Queues

      Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。

  6. DotNet Core StorageService.Api 改造以完成 Sub 事件

    • 打开 DaprClientService.cs 文件,更改内容为

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {private readonly StorageContext _storageContext;public DaprClientService(StorageContext storageContext){_storageContext = storageContext;}public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context){var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");return Task.FromResult(topicSubscriptionsEnvelope);}public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context){if (request.Topic.Equals("Storage.Reduce")){StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());Console.WriteLine("ProductID:" + storageReduceData.ProductID);Console.WriteLine("Amount:" + storageReduceData.Amount);await HandlerStorageReduce(storageReduceData);}return new Empty();}private async Task HandlerStorageReduce(StorageReduceData storageReduceData){Guid productID = Guid.Parse(storageReduceData.ProductID);Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));if (storageFromDb == null){return;}if (storageFromDb.Amount < storageReduceData.Amount){return;}storageFromDb.Amount -= storageReduceData.Amount;Console.WriteLine(storageFromDb.Amount);await _storageContext.SaveChangesAsync();}
    • 说明
      • 添加 GetTopicSubscriptions() 将完成对主题的关注
        • 当应用停止时,RabbitMQ 中的 Queue 自动删除
        • 添加 OnTopicEvent() 重写,此方法将完成对 Sub 主题的事件处理
      • HandlerStorageReduce 用于减少库存
  7. 启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端

    • DotNet Core

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
    • Java

      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
    • go

      dapr run --app-id client  go run main.go

      go grpc 输出为

      == APP == true
      == APP == Published message!

    查看 MySql Storage 数据库,对应产品库存减少 20

至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub

源码地址

这篇关于Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/536411

相关文章

Java实现字节字符转bcd编码

《Java实现字节字符转bcd编码》BCD是一种将十进制数字编码为二进制的表示方式,常用于数字显示和存储,本文将介绍如何在Java中实现字节字符转BCD码的过程,需要的小伙伴可以了解下... 目录前言BCD码是什么Java实现字节转bcd编码方法补充总结前言BCD码(Binary-Coded Decima

SpringBoot全局域名替换的实现

《SpringBoot全局域名替换的实现》本文主要介绍了SpringBoot全局域名替换的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录 项目结构⚙️ 配置文件application.yml️ 配置类AppProperties.Ja

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

JavaScript中的高级调试方法全攻略指南

《JavaScript中的高级调试方法全攻略指南》什么是高级JavaScript调试技巧,它比console.log有何优势,如何使用断点调试定位问题,通过本文,我们将深入解答这些问题,带您从理论到实... 目录观点与案例结合观点1观点2观点3观点4观点5高级调试技巧详解实战案例断点调试:定位变量错误性能分

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

SpringBoot实现不同接口指定上传文件大小的具体步骤

《SpringBoot实现不同接口指定上传文件大小的具体步骤》:本文主要介绍在SpringBoot中通过自定义注解、AOP拦截和配置文件实现不同接口上传文件大小限制的方法,强调需设置全局阈值远大于... 目录一  springboot实现不同接口指定文件大小1.1 思路说明1.2 工程启动说明二 具体实施2

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有