Akka(41): Http:DBTable-rows streaming - 数据库表行交换

2024-04-09 04:48

本文主要是介绍Akka(41): Http:DBTable-rows streaming - 数据库表行交换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的,而http消息的数据部分也是byte类型的,所以我们可以直接用Source[ByteString,_]来读取文件然后放进HttpEntity中。我们还提到:如果需要进行数据库数据交换的话,可以用Source[ROW,_]来表示库表行,但首先必须进行ROW -> ByteString的转换。在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换,在Akka-http里称之为Marshalling和Unmarshalling。Akka-http的Marshalling实现采用了type-class编程模式,需要为每一种类型与Json的转换在可视域内提供Marshaller[A,B]类型的隐式实例。Akka-http默认的Json工具库是Spray-Json,着重case class,而且要提供JsonFormat?(case-class),其中?代表case class的参数个数,用起来略显复杂。不过因为是Akka-http的配套库,在将来Akka-http的持续发展中具有一定的优势,所以我们还是用它来进行下面的示范。

下面就让我们开始写些代码吧。首先,我们用一个case class代表数据库表行结构,然后用它作为流元素来构建一个Source,如下:

  case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }

我们先设计服务端的数据下载部分:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupporttrait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }implicit val countyFormat = jsonFormat2(County)
}object HttpDBServer extends App {import Converters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)val route =path("rows") {get {complete {source}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

在上面的代码里我们直接把source放进了complete(),然后期望这个directive能通过ToEntityMarshaller[County]类实例用Spray-Json把Source[County,NotUsed]转换成Source[ByteString,NotUsed]然后放入HttpResponse的HttpEntity里。转换结果只能在客户端得到证实。我们知道HttpResponse里的Entity.dataBytes就是一个Source[ByteString,_],我们可以把它Unmarshall成Source[County,_],然后用Akka-stream来操作:

      case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}


上面这个Unmarshal调用了下面这个FromEntityUnmarshaller[County]隐式实例:

  // support for as[Source[T, NotUsed]]implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒if (support.supported.matches(e.contentType)) {val frames = e.dataBytes.via(support.framingDecoder)val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)val unmarshallingFlow =if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)val elements = frames.viaMat(unmarshallingFlow)(Keep.right)FastFuture.successful(elements)} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))}

这个隐式实例是由Spray-Jason提供的,在SprayJsonSupport.scala里。
下面是这部分客户端的完整代码:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling.Unmarshaltrait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)implicit val countyFormat = jsonFormat2(County)
}object HttpDBClient extends App {import Converters._implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()def downloadRows(request: HttpRequest) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}case Success(r@HttpResponse(code, _, _, _)) =>println(s"download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"download failed: ${err.getMessage}")}}downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))scala.io.StdIn.readLine()sys.terminate()}

以上我们已经实现了客户端从服务端下载一段数据库表行,然后以Akka-stream的操作方式来处理下载数据。那么反向交换即从客户端上传一段表行的话就需要把一个Source[T,_]转换成Source[ByteString,_]然后放进HttpRequest的HttpEntity里。服务端收到数据后又要进行反向的转换即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。Akka-http在客户端没有提供像complete这样的强大的自动化功能。我们可能需要自定义并提供像ToRequestMarshaller[Source[T,_]]这样的隐式实例。但Akka-http的Marshalling-type-class是个非常复杂的系统。如果我们的目的是简单提供一个Source[ByteString,_],我们是否可以直接调用Spray-Json的函数来进行ROW->Son->ByteString转换呢?如下:

  import akka.util.ByteStringimport akka.http.scaladsl.model.HttpEntity.limitableByteSourceval source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}def countyToByteString(c: County) = {ByteString(c.toJson.toString)}val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)val rowBytes = limitableByteSource(source via flowCountyToByteString)val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")val data = HttpEntity(ContentTypes.`application/json`,rowBytes)

我们直接用toJson函数进行County->Json转换实现了flowCountyToByteString。toJason是Spray-Json提供的一个函数:

package json {case class DeserializationException(msg: String, cause: Throwable = null, fieldNames: List[String] = Nil) extends RuntimeException(msg, cause)class SerializationException(msg: String) extends RuntimeException(msg)private[json] class PimpedAny[T](any: T) {def toJson(implicit writer: JsonWriter[T]): JsValue = writer.write(any)}private[json] class PimpedString(string: String) {@deprecated("deprecated in favor of parseJson", "1.2.6")def asJson: JsValue = parseJsondef parseJson: JsValue = JsonParser(string)}
}

假设服务端收到数据后以Akka-stream方式再转换成一个List返回,我们用下面的方法来测试功能:

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}

服务端接收数据处理方法如下:

     post {withoutSizeLimit {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}

考虑到在数据转换的过程中可能会出现异常。需要异常处理方法来释放backpressure:

  def postExceptionHandler: ExceptionHandler =ExceptionHandler {case _: RuntimeException =>extractRequest { req =>req.discardEntityBytes()complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))}}post {withoutSizeLimit {handleExceptions(postExceptionHandler) {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}}

在客户端试运行返回结果显示:

  uploadRows(request,data)["","广西壮族自治区地市县编号 #1","广西壮族自治区地市县编号 #2","广西壮族自治区地市县编号 #3","广西壮族自治区地市县编号 #4","广西壮族自治区地市县编号 #5"]

正是我们期待的结果。

下面是本次讨论的示范代码:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }implicit val countyFormat = jsonFormat2(County)
}object HttpDBServer extends App {import Converters._implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherimplicit val jsonStreamingSupport = EntityStreamingSupport.json().withParallelMarshalling(parallelism = 8, unordered = false)def postExceptionHandler: ExceptionHandler =ExceptionHandler {case _: RuntimeException =>extractRequest { req =>req.discardEntityBytes()complete((StatusCodes.InternalServerError.intValue,"Upload Failed!"))}}val route =path("rows") {get {complete {source}} ~post {withoutSizeLimit {handleExceptions(postExceptionHandler) {entity(asSourceOf[County]) { source =>val futofNames: Future[List[String]] =source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))complete {futofNames}}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.unmarshalling._trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {case class County(id: Int, name: String)implicit val countyFormat = jsonFormat2(County)
}object HttpDBClient extends App {import Converters._implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherimplicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()def downloadRows(request: HttpRequest) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>val futSource = Unmarshal(entity).to[Source[County,NotUsed]]futSource.onSuccess {case source => source.runForeach(println)}case Success(r@HttpResponse(code, _, _, _)) =>println(s"download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download rows!")case Failure(err) => println(s"download failed: ${err.getMessage}")}}downloadRows(HttpRequest(HttpMethods.GET,uri = s"http://localhost:8011/rows"))import akka.util.ByteStringimport akka.http.scaladsl.model.HttpEntity.limitableByteSourceval source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}def countyToByteString(c: County) = {ByteString(c.toJson.toString)}val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)val rowBytes = limitableByteSource(source via flowCountyToByteString)val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")val data = HttpEntity(ContentTypes.`application/json`,rowBytes)def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}uploadRows(request,data)scala.io.StdIn.readLine()sys.terminate()}





这篇关于Akka(41): Http:DBTable-rows streaming - 数据库表行交换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887209

相关文章

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

使用Node.js和PostgreSQL构建数据库应用

《使用Node.js和PostgreSQL构建数据库应用》PostgreSQL是一个功能强大的开源关系型数据库,而Node.js是构建高效网络应用的理想平台,结合这两个技术,我们可以创建出色的数据驱动... 目录初始化项目与安装依赖建立数据库连接执行CRUD操作查询数据插入数据更新数据删除数据完整示例与最佳

Oracle数据库在windows系统上重启步骤

《Oracle数据库在windows系统上重启步骤》有时候在服务中重启了oracle之后,数据库并不能正常访问,下面:本文主要介绍Oracle数据库在windows系统上重启的相关资料,文中通过代... oracle数据库在Windows上重启的方法我这里是使用oracle自带的sqlplus工具实现的方

MySQL批量替换数据库字符集的实用方法(附详细代码)

《MySQL批量替换数据库字符集的实用方法(附详细代码)》当需要修改数据库编码和字符集时,通常需要对其下属的所有表及表中所有字段进行修改,下面:本文主要介绍MySQL批量替换数据库字符集的实用方法... 目录前言为什么要批量修改字符集?整体脚本脚本逻辑解析1. 设置目标参数2. 生成修改表默认字符集的语句3

Nginx部署HTTP/3的实现步骤

《Nginx部署HTTP/3的实现步骤》本文介绍了在Nginx中部署HTTP/3的详细步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录前提条件第一步:安装必要的依赖库第二步:获取并构建 BoringSSL第三步:获取 Nginx

Linux下MySQL数据库定时备份脚本与Crontab配置教学

《Linux下MySQL数据库定时备份脚本与Crontab配置教学》在生产环境中,数据库是核心资产之一,定期备份数据库可以有效防止意外数据丢失,本文将分享一份MySQL定时备份脚本,并讲解如何通过cr... 目录备份脚本详解脚本功能说明授权与可执行权限使用 Crontab 定时执行编辑 Crontab添加定

HTTP 与 SpringBoot 参数提交与接收协议方式

《HTTP与SpringBoot参数提交与接收协议方式》HTTP参数提交方式包括URL查询、表单、JSON/XML、路径变量、头部、Cookie、GraphQL、WebSocket和SSE,依据... 目录HTTP 协议支持多种参数提交方式,主要取决于请求方法(Method)和内容类型(Content-Ty

如何通过try-catch判断数据库唯一键字段是否重复

《如何通过try-catch判断数据库唯一键字段是否重复》在MyBatis+MySQL中,通过try-catch捕获唯一约束异常可避免重复数据查询,优点是减少数据库交互、提升并发安全,缺点是异常处理开... 目录1、原理2、怎么理解“异常走的是数据库错误路径,开销比普通逻辑分支稍高”?1. 普通逻辑分支 v

Python与MySQL实现数据库实时同步的详细步骤

《Python与MySQL实现数据库实时同步的详细步骤》在日常开发中,数据同步是一项常见的需求,本篇文章将使用Python和MySQL来实现数据库实时同步,我们将围绕数据变更捕获、数据处理和数据写入这... 目录前言摘要概述:数据同步方案1. 基本思路2. mysql Binlog 简介实现步骤与代码示例1