Akka(39): Http:File streaming-文件交换

2024-04-09 04:48
文章标签 http 交换 file 39 akka streaming

本文主要是介绍Akka(39): Http:File streaming-文件交换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  所谓文件交换指的是Http协议中服务端和客户端之间文件的上传和下载。Akka-http作为一种系统集成工具应该具备高效率的数据交换方式包括文件交换和数据库表行的上传下载。Akka-http的数据交换模式支持流式操作:代表交换数据可以是一种无限长度流的元素。这种模式首先解决了纯Http大数据通过Multipart传输所必须进行的数据分段操作和复杂的消息属性设定等需要的技术门槛,再者用户还可以很方便的使用Akka-stream对数据进行深度处理,免去了数据转换的麻烦。更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。

 任何文件的内容储存格式无论在硬盘、内存或者数据线上都是一堆bytes。文件交换流程包括读取文件里的bytes,传送这些bytes,最终把这些bytes写入文件。我们看到这里每个环节操作目标都是bytes,所以可能在程序里是不需要任何数据转换过程的。Akka提供了一组文件读写函数,如下:

  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =fromPath(f, chunkSize, startPosition = 0)def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] =Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource")))def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =toPath(f, options, startPosition = 0)def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] =Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))

我们看到:fromPath类型是Source[ByteSgtring,_],toPath类型是Sink[ByteString,_],直接就是流型式,应该可以直接放入Http消息的Entity中,如下:

  def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}

fileStream是Source[ByteString,_]可以直接放进Entity:

  val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text")val textData = HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/Users/tiger-macpro/downloads/A4.TIF",256))

我们把fileStream放入了HttpRequest中。对于HttpResponse可以用下面的方式:

 val route = pathPrefix("file") {(get & path("text" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}

注意:complete进行了HttpResponse的构建。因为Entity.dataByes就是Source[ByteString,_],所以我们可以直接把它导入Sink:

          entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))).onComplete { case _ => println(s"Download file saved to: $destPath") }

上面我们提过FileIO.toPath就是一个Sink。由于我们的目的是大型的文件交换,所以无论上传下载都使用了withoutSizeLimit:

 val route = pathPrefix("file") {(get & path("exchange" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}} ~(post & path("exchange")) {withoutSizeLimit {extractDataBytes { bytes =>val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))onComplete(fut) { _ =>complete(s"Save upload file to: $destPath")}}}}

好了下面的示范代码里对字符型或二进制文件都进行了交换的示范操作:

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpEntity._
import java.nio.file._object FileServer extends App {implicit val httpSys = ActorSystem("httpSystem")implicit val httpMat = ActorMaterializer()implicit val httpEC = httpSys.dispatcherdef fileStream(filePath: String, chunkSize: Int) = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}val destPath = "/users/tiger-macpro/downloads/A4-1.TIF"val route = pathPrefix("file") {(get & path("exchange" / Remaining)) { fp =>withoutSizeLimit {complete(HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/users/tiger-macpro/" + fp, 256)))}} ~(post & path("exchange")) {withoutSizeLimit {extractDataBytes { bytes =>val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath)))onComplete(fut) { _ =>complete(s"Save upload file to: $destPath")}}}}}val (port, host) = (8011,"localhost")val bindingFuture = Http().bindAndHandle(route,host,port)println(s"Server running at $host $port. Press any key to exit ...")scala.io.StdIn.readLine()bindingFuture.flatMap(_.unbind()).onComplete(_ => httpSys.terminate())}

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.limitableByteSource
import akka.http.scaladsl.model._
import java.nio.file._
import akka.util.ByteString
import scala.util._object FileClient extends App {implicit val sys = ActorSystem("ClientSys")implicit val mat = ActorMaterializer()implicit val ec = sys.dispatcherdef downloadFileTo(request: HttpRequest, destPath: String) = {val futResp = Http(sys).singleRequest(request)futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))).onComplete { case _ => println(s"Download file saved to: $destPath") }case Success(r@HttpResponse(code, _, _, _)) =>println(s"Download request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to download file!")case Failure(err) => println(s"Download failed: ${err.getMessage}")}}val dlFile = "Downloads/readme.txt"val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile)downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt")scala.io.StdIn.readLine()val dlFile2 = "Downloads/image.png"val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2)downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png")scala.io.StdIn.readLine()def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {val futResp = Http(sys).singleRequest(request.copy(entity = dataEntity))futResp.andThen {case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>entity.dataBytes.map(_.utf8String).runForeach(println)case Success(r@HttpResponse(code, _, _, _)) =>println(s"Upload request failed, response code: $code")r.discardEntityBytes()case Success(_) => println("Unable to Upload file!")case Failure(err) => println(s"Upload failed: ${err.getMessage}")}}def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = {def loadFile = {//   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")val file = Paths.get(filePath)FileIO.fromPath(file, chunkSize).withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))}limitableByteSource(loadFile)}val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange")val textData = HttpEntity(ContentTypes.`application/octet-stream`,fileStream("/Users/tiger-macpro/downloads/readme.txt",256))uploadFile(uploadText,textData)scala.io.StdIn.readLine()sys.terminate()}






这篇关于Akka(39): Http:File streaming-文件交换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/887207

相关文章

Linux中修改Apache HTTP Server(httpd)默认端口的完整指南

《Linux中修改ApacheHTTPServer(httpd)默认端口的完整指南》ApacheHTTPServer(简称httpd)是Linux系统中最常用的Web服务器之一,本文将详细介绍如何... 目录一、修改 httpd 默认端口的步骤1. 查找 httpd 配置文件路径2. 编辑配置文件3. 保存

C++ HTTP框架推荐(特点及优势)

《C++HTTP框架推荐(特点及优势)》:本文主要介绍C++HTTP框架推荐的相关资料,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. Crow2. Drogon3. Pistache4. cpp-httplib5. Beast (Boos

IDEA下"File is read-only"可能原因分析及"找不到或无法加载主类"的问题

《IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题》:本文主要介绍IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题,具有很好的参... 目录1.File is read-only”可能原因2.“找不到或无法加载主类”问题的解决总结1.File

SpringBoot中HTTP连接池的配置与优化

《SpringBoot中HTTP连接池的配置与优化》这篇文章主要为大家详细介绍了SpringBoot中HTTP连接池的配置与优化的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、HTTP连接池的核心价值二、Spring Boot集成方案方案1:Apache HttpCl

Spring Boot Controller处理HTTP请求体的方法

《SpringBootController处理HTTP请求体的方法》SpringBoot提供了强大的机制来处理不同Content-Type​的HTTP请求体,这主要依赖于HttpMessageCo... 目录一、核心机制:HttpMessageConverter​二、按Content-Type​处理详解1.

解决Maven项目idea找不到本地仓库jar包问题以及使用mvn install:install-file

《解决Maven项目idea找不到本地仓库jar包问题以及使用mvninstall:install-file》:本文主要介绍解决Maven项目idea找不到本地仓库jar包问题以及使用mvnin... 目录Maven项目idea找不到本地仓库jar包以及使用mvn install:install-file基

Nginx中配置HTTP/2协议的详细指南

《Nginx中配置HTTP/2协议的详细指南》HTTP/2是HTTP协议的下一代版本,旨在提高性能、减少延迟并优化现代网络环境中的通信效率,本文将为大家介绍Nginx配置HTTP/2协议想详细步骤,需... 目录一、HTTP/2 协议概述1.HTTP/22. HTTP/2 的核心特性3. HTTP/2 的优

使用Python自建轻量级的HTTP调试工具

《使用Python自建轻量级的HTTP调试工具》这篇文章主要为大家详细介绍了如何使用Python自建一个轻量级的HTTP调试工具,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下... 目录一、为什么需要自建工具二、核心功能设计三、技术选型四、分步实现五、进阶优化技巧六、使用示例七、性能对比八、扩展方向建

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

在java中如何将inputStream对象转换为File对象(不生成本地文件)

《在java中如何将inputStream对象转换为File对象(不生成本地文件)》:本文主要介绍在java中如何将inputStream对象转换为File对象(不生成本地文件),具有很好的参考价... 目录需求说明问题解决总结需求说明在后端中通过POI生成Excel文件流,将输出流(outputStre