Akka(43): Http:SSE-Server Sent Event - 服务端主推消息

2024-04-09 04:48

本文主要是介绍Akka(43): Http:SSE-Server Sent Event - 服务端主推消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   因为我了解Akka-http的主要目的不是为了有关Web-Server的编程,而是想实现一套系统集成的api,所以也需要考虑由服务端主动向客户端发送指令的应用场景。比如一个零售店管理平台的服务端在完成了某些数据更新后需要通知各零售门市客户端下载最新数据。虽然Akka-http也提供对websocket协议的支持,但websocket的网络连接是双向恒久的,适合频繁的问答交互式服务端与客户端的交流,消息结构也比较零碎。而我们面临的可能是批次型的大量数据库数据交换,只需要简单的服务端单向消息就行了,所以websocket不太合适,而Akka-http的SSE应该比较适合我们的要求。SSE模式的基本原理是服务端统一集中发布消息,各客户端持久订阅服务端发布的消息并从消息的内容中筛选出属于自己应该执行的指令,然后进行相应的处理。客户端接收SSE是在一个独立的线程里不断进行的,不会影响客户端当前的运算流程。当收到有用的消息后就会调用一个业务功能函数作为后台异步运算任务。

服务端的SSE发布是以Source[ServerSentEvent,NotUsed]来实现的。ServerSentEvent类型定义如下:

/*** Representation of a server-sent event. According to the specification, an empty data field designates an event* which is to be ignored which is useful for heartbeats.** @param data data, may span multiple lines* @param eventType optional type, must not contain \n or \r* @param id optional id, must not contain \n or \r* @param retry optional reconnection delay in milliseconds*/
final case class ServerSentEvent(data:      String,eventType: Option[String] = None,id:        Option[String] = None,retry:     Option[Int]    = None) {...}

这个类型的参数代表事件消息的数据结构。用户可以根据实际需要充分利用这个数据结构来传递消息。服务端是通过complete以SeverSentEvent类为元素 的Source来进行SSE的,如下:

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._complete {Source.tick(2.seconds, 2.seconds, NotUsed).map( _ => processToServerSentEvent).keepAlive(1.second, () => ServerSentEvent.heartbeat)}

以上代码代表服务端定时运算processToServerSentEvent返回ServerSentEvent类型结果后发布给所有订阅的客户端。我们用一个函数processToServerSentEvent模拟重复运算的业务功能:

  private def processToServerSentEvent: ServerSentEvent = {Thread.sleep(3000)   //processing delayServerSentEvent(SyncFiles.fileToSync)}

这个函数模拟发布事件数据是某种业务运算结果,在这里代表客户端需要下载文件名称。我们用客户端request来模拟设定这个文件名称:

  object SyncFiles {var fileToSync: String = ""}private def route = {import Directives._import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._def syncRequests =pathPrefix("sync") {pathSingleSlash {post {parameter("file") { filename =>complete {SyncFiles.fileToSync = filenames"set download file to : $filename"}}}}}

客户端订阅SSE的方式如下:

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._import system.dispatcherHttp().singleRequest(Get("http://localhost:8011/events")).flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]).foreach(_.runForeach(se => downloadFiles(se.data)))

每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:

  def downloadFiles(file: String) = {Thread.sleep(3000)   //process delayif (file != "")println(s"Try to download $file")}

下面是客户端程序的测试运算步骤:

    scala.io.StdIn.readLine()println("do some thing ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")).onSuccess {case msg => println(msg)}scala.io.StdIn.readLine()println("do some other things ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")).onSuccess {case msg => println(msg)}

运算结果:

do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Ordersdo some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download ItemsTry to download ItemsProcess finished with exit code 0

下面是本次讨论的示范源代码:

服务端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEventobject SSEServer {def main(args: Array[String]): Unit = {implicit val system = ActorSystem()implicit val mat    = ActorMaterializer()Http().bindAndHandle(route, "localhost", 8011)scala.io.StdIn.readLine()system.terminate()}object SyncFiles {var fileToSync: String = ""}private def route = {import Directives._import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._def syncRequests =pathPrefix("sync") {pathSingleSlash {post {parameter("file") { filename =>complete {SyncFiles.fileToSync = filenames"set download file to : $filename"}}}}}def events =path("events") {get {complete {Source.tick(2.seconds, 2.seconds, NotUsed).map( _ => processToServerSentEvent).keepAlive(1.second, () => ServerSentEvent.heartbeat)}}}syncRequests ~ events}private def processToServerSentEvent: ServerSentEvent = {Thread.sleep(3000)   //processing delayServerSentEvent(SyncFiles.fileToSync)}
}

客户端:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._object SSEClient {def downloadFiles(file: String) = {Thread.sleep(3000)   //process delayif (file != "")println(s"Try to download $file")}def main(args: Array[String]): Unit = {implicit val system = ActorSystem()implicit val mat    = ActorMaterializer()import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._import system.dispatcherHttp().singleRequest(Get("http://localhost:8011/events")).flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]).foreach(_.runForeach(se => downloadFiles(se.data)))scala.io.StdIn.readLine()println("do some thing ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")).onSuccess {case msg => println(msg)}scala.io.StdIn.readLine()println("do some other things ...")Http().singleRequest(HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")).onSuccess {case msg => println(msg)}scala.io.StdIn.readLine()system.terminate()}
}






这篇关于Akka(43): Http:SSE-Server Sent Event - 服务端主推消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/887211

相关文章

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

Windows Server 2025 搭建NPS-Radius服务器的步骤

《WindowsServer2025搭建NPS-Radius服务器的步骤》本文主要介绍了通过微软的NPS角色实现一个Radius服务器,身份验证和证书使用微软ADCS、ADDS,具有一定的参考价... 目录简介示意图什么是 802.1X?核心作用802.1X的组成角色工作流程简述802.1X常见应用802.

C++ HTTP框架推荐(特点及优势)

《C++HTTP框架推荐(特点及优势)》:本文主要介绍C++HTTP框架推荐的相关资料,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Crow2. Drogon3. Pistache4. cpp-httplib5. Beast (Boos

C#使用MQTTnet实现服务端与客户端的通讯的示例

《C#使用MQTTnet实现服务端与客户端的通讯的示例》本文主要介绍了C#使用MQTTnet实现服务端与客户端的通讯的示例,包括协议特性、连接管理、QoS机制和安全策略,具有一定的参考价值,感兴趣的可... 目录一、MQTT 协议简介二、MQTT 协议核心特性三、MQTTNET 库的核心功能四、服务端(BR

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

SQL Server身份验证模式步骤和示例代码

《SQLServer身份验证模式步骤和示例代码》SQLServer是一个广泛使用的关系数据库管理系统,通常使用两种身份验证模式:Windows身份验证和SQLServer身份验证,本文将详细介绍身份... 目录身份验证方式的概念更改身份验证方式的步骤方法一:使用SQL Server Management S

Spring AI 实现 STDIO和SSE MCP Server的过程详解

《SpringAI实现STDIO和SSEMCPServer的过程详解》STDIO方式是基于进程间通信,MCPClient和MCPServer运行在同一主机,主要用于本地集成、命令行工具等场景... 目录Spring AI 实现 STDIO和SSE MCP Server1.新建Spring Boot项目2.a

SQL Server中的PIVOT与UNPIVOT用法具体示例详解

《SQLServer中的PIVOT与UNPIVOT用法具体示例详解》这篇文章主要给大家介绍了关于SQLServer中的PIVOT与UNPIVOT用法的具体示例,SQLServer中PIVOT和U... 目录引言一、PIVOT:将行转换为列核心作用语法结构实战示例二、UNPIVOT:将列编程转换为行核心作用语

SpringBoot中HTTP连接池的配置与优化

《SpringBoot中HTTP连接池的配置与优化》这篇文章主要为大家详细介绍了SpringBoot中HTTP连接池的配置与优化的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、HTTP连接池的核心价值二、Spring Boot集成方案方案1:Apache HttpCl

SpringBoot快速搭建TCP服务端和客户端全过程

《SpringBoot快速搭建TCP服务端和客户端全过程》:本文主要介绍SpringBoot快速搭建TCP服务端和客户端全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录TCPServerTCPClient总结由于工作需要,研究了SpringBoot搭建TCP通信的过程