基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)

本文主要是介绍基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)

前言

本文主要讲解RxJava2.0+Retrofit2.0实现下载文件并带进度效果,如果按照传统方法是很容易实现的。但是,发现网上搜索的例子都是通过OkHttpClient的拦截器去拦截Response来实现进度显示(侵入性有点强),个人发现bug不少,问题都是在UI更新方面出了问题,只要记住UI刷新在主线程更新都容易解决,下面介绍两种非修改拦截器实现文件下载的方法:

效果图 

图中下载速度较快,是基于在公司专线环境测试

依赖添加

 /*Retrofit是一款类型安全的网络框架,基于HTTP协议,服务于Android和java语言,集成了okhttp依赖*/compile 'com.squareup.retrofit2:retrofit:2.3.0'compile 'com.squareup.retrofit2:converter-gson:2.3.0'compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'/*RxAndroid一款Android客户端组件间异步通信的框架,1和2差别很大*/compile 'io.reactivex.rxjava2:rxandroid:2.0.1'compile 'io.reactivex.rxjava2:rxjava:2.1.8'

具体实现 

 方法1:使用Handler更新下载进度

/*** 下载文件法1(使用Handler更新UI)** @param observable      下载被观察者* @param destDir         下载目录* @param fileName        文件名* @param progressHandler 进度handler*/public static void downloadFile(Observable<ResponseBody> observable, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {final DownloadInfo downloadInfo = new DownloadInfo();observable.subscribeOn(Schedulers.io()).observeOn(Schedulers.io()).subscribe(new Observer<ResponseBody>() {@Overridepublic void onSubscribe(Disposable d) {addDisposable(d);}@Overridepublic void onNext(ResponseBody responseBody) {InputStream inputStream = null;long total = 0;long responseLength;FileOutputStream fos = null;try {byte[] buf = new byte[1024 * 8];int len;responseLength = responseBody.contentLength();inputStream = responseBody.byteStream();final File file = new File(destDir, fileName);downloadInfo.setFile(file);downloadInfo.setFileSize(responseLength);File dir = new File(destDir);if (!dir.exists()) {dir.mkdirs();}fos = new FileOutputStream(file);int progress = 0;int lastProgress=-1;long startTime = System.currentTimeMillis(); // 开始下载时获取开始时间while ((len = inputStream.read(buf)) != -1) {fos.write(buf, 0, len);total += len;progress = (int) (total * 100 / responseLength);long curTime = System.currentTimeMillis();long usedTime = (curTime - startTime) / 1000;if (usedTime == 0) {usedTime = 1;}long speed = (total / usedTime); // 平均每秒下载速度// 如果进度与之前进度相等,则不更新,如果更新太频繁,则会造成界面卡顿if (progress != lastProgress) {downloadInfo.setSpeed(speed);downloadInfo.setProgress(progress);downloadInfo.setCurrentSize(total);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_PROGRESS, downloadInfo);}lastProgress = progress;}fos.flush();downloadInfo.setFile(file);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_SUCCESS, downloadInfo);} catch (final Exception e) {downloadInfo.setErrorMsg(e);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_FAIL, downloadInfo);} finally {try {if (fos != null) {fos.close();}if (inputStream != null) {inputStream.close();}} catch (Exception e) {e.printStackTrace();}}}@Overridepublic void onError(Throwable e) {//new Consumer<Throwable>downloadInfo.setErrorMsg(e);progressHandler.sendMessage(DownloadProgressHandler.DOWNLOAD_FAIL, downloadInfo);}@Overridepublic void onComplete() {// new Action()}});}

方法2:使用RxJava发射器更新下载进度 

 /*** 下载文件法2(使用RXJava更新UI)** @param observable* @param destDir* @param fileName* @param progressHandler*/public static void downloadFile2(Observable<ResponseBody> observable, final String destDir, final String fileName, final DownloadProgressHandler progressHandler) {final DownloadInfo downloadInfo = new DownloadInfo();observable.subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).flatMap(new Function<ResponseBody, ObservableSource<DownloadInfo>>() {@Overridepublic ObservableSource<DownloadInfo> apply(final ResponseBody responseBody) throws Exception {return Observable.create(new ObservableOnSubscribe<DownloadInfo>() {@Overridepublic void subscribe(ObservableEmitter<DownloadInfo> emitter) throws Exception {InputStream inputStream = null;long total = 0;long responseLength = 0;FileOutputStream fos = null;try {byte[] buf = new byte[1024 * 8];int len = 0;responseLength = responseBody.contentLength();inputStream = responseBody.byteStream();final File file = new File(destDir, fileName);downloadInfo.setFile(file);downloadInfo.setFileSize(responseLength);File dir = new File(destDir);if (!dir.exists()) {dir.mkdirs();}fos = new FileOutputStream(file);int progress = 0;int lastProgress = -1;long startTime = System.currentTimeMillis(); // 开始下载时获取开始时间while ((len = inputStream.read(buf)) != -1) {fos.write(buf, 0, len);total += len;progress = (int) (total * 100 / responseLength);long curTime = System.currentTimeMillis();long usedTime = (curTime - startTime) / 1000;if (usedTime == 0) {usedTime = 1;}long speed = (total / usedTime); // 平均每秒下载速度// 如果进度与之前进度相等,则不更新,如果更新太频繁,则会造成界面卡顿if (progress != lastProgress) {downloadInfo.setSpeed(speed);downloadInfo.setProgress(progress);downloadInfo.setCurrentSize(total);emitter.onNext(downloadInfo);}lastProgress = progress;}fos.flush();downloadInfo.setFile(file);emitter.onComplete();} catch (Exception e) {downloadInfo.setErrorMsg(e);emitter.onError(e);} finally {try {if (fos != null) {fos.close();}if (inputStream != null) {inputStream.close();}} catch (Exception e) {e.printStackTrace();}}}});}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<DownloadInfo>() {@Overridepublic void onSubscribe(Disposable d) {addDisposable(d);}@Overridepublic void onNext(DownloadInfo downloadInfo) {progressHandler.onProgress(downloadInfo.getProgress(), downloadInfo.getFileSize(), downloadInfo.getSpeed());}@Overridepublic void onError(Throwable e) {progressHandler.onError(e);}@Overridepublic void onComplete() {LogUtils.i("下载完成");progressHandler.onCompleted(downloadInfo.getFile());}});}

相关代码:

api服务接口类

DownloadApi.java

public interface DownloadApi {/*** 下载Apk1文件**/@Streaming@GET("imtt.dd.qq.com/16891/C527A902F14C1FFD8AA9C13872D5F92F.apk?mkey=5c41136cb711c35d&f=0c2f&fsname=com.tencent.moyu_1.4.0_1.apk&csr=1bbd&cip=183.17.229.168&proto=https")Observable<ResponseBody> downloadApkFile1();/*** 下载Apk2文件**/@Streaming@GET("https://cc849cacb0e96648f8dd4bb35ff8365b.dd.cdntips.com/imtt.dd.qq.com/16891/5BB89032B0755F5922C80DA8C2CAF735.apk?mkey=5c415b9fb711c35d&f=07b4&fsname=com.tencent.mobileqq_7.9.7_994.apk&csr=1bbd&cip=183.17.229.168&proto=https")Observable<ResponseBody> downloadApkFile2();/*** 下载Apk3文件**/@Streaming@GET("https://cc849cacb0e96648f8dd4bb35ff8365b.dd.cdntips.com/imtt.dd.qq.com/16891/BEC5EEF53983300D9F0AB46166EC9EA7.apk?mkey=5c41a20bda11e60f&f=184b&fsname=com.tencent.pao_1.0.61.0_161.apk&csr=1bbd&cip=218.17.192.250&proto=https")Observable<ResponseBody> downloadApkFile3();
}

下载进度Handler类

DownloadProgressHandler.java 

/*** 下载进度Handler** @author Kelly* @version 1.0.0* @filename DownloadProgressHandler.java* @time 2018/7/25 15:25* @copyright(C) 2018 song*/
public abstract class DownloadProgressHandler implements DownloadCallBack {public static final int DOWNLOAD_SUCCESS = 0;public static final int DOWNLOAD_PROGRESS = 1;public static final int DOWNLOAD_FAIL = 2;protected ResponseHandler mHandler = new ResponseHandler(this, Looper.getMainLooper());/*** 发送消息,更新进度** @param what* @param downloadInfo*/public void sendMessage(int what, DownloadInfo downloadInfo) {mHandler.obtainMessage(what, downloadInfo).sendToTarget();}/*** 处理消息* @param message*/protected void handleMessage(Message message) {DownloadInfo progressBean = (DownloadInfo) message.obj;switch (message.what) {case DOWNLOAD_SUCCESS://下载成功onCompleted(progressBean.getFile());removeMessage();break;case DOWNLOAD_PROGRESS://下载中onProgress(progressBean.getProgress(), progressBean.getFileSize(),progressBean.getSpeed());break;case DOWNLOAD_FAIL://下载失败onError(progressBean.getErrorMsg());break;default:removeMessage();break;}}private void removeMessage() {if (mHandler != null){mHandler.removeCallbacksAndMessages(null);}}protected static class ResponseHandler extends Handler {private DownloadProgressHandler mProgressHandler;public ResponseHandler(DownloadProgressHandler mProgressHandler, Looper looper) {super(looper);this.mProgressHandler = mProgressHandler;}@Overridepublic void handleMessage(Message msg) {mProgressHandler.handleMessage(msg);}}
}

 DownloadCallBack.java

/*** 下载回调*/public interface DownloadCallBack {/*** 进度,运行在主线程** @param progress 下载进度* @param total 总大小* @param speed 下载速率*/void onProgress(int progress, long total,long speed);/*** 运行在主线程** @param file*/void onCompleted(File file);/*** 运行在主线程** @param e*/void onError(Throwable e);}

文件下载信息类

DownloadInfo.java

/*** 下载文件信息** @author Kelly* @version 1.0.0* @filename DownloadInfo.java* @time 2018/7/25 14:27* @copyright(C) 2018 song*/
public class DownloadInfo {private File file;private String fileName;private long fileSize;//单位 byteprivate long currentSize;//当前下载大小private int progress;//当前下载进度private long speed;//下载速率private Throwable errorMsg;//下载异常信息public File getFile() {return file;}public void setFile(File file) {this.file = file;}public String getFileName() {return fileName;}public void setFileName(String fileName) {this.fileName = fileName;}public long getFileSize() {return fileSize;}public void setFileSize(long fileSize) {this.fileSize = fileSize;}public long getCurrentSize() {return currentSize;}public void setCurrentSize(long currentSize) {this.currentSize = currentSize;}public int getProgress() {return progress;}public void setProgress(int progress) {this.progress = progress;}public long getSpeed() {return speed;}public void setSpeed(long speed) {this.speed = speed;}public Throwable getErrorMsg() {return errorMsg;}public void setErrorMsg(Throwable errorMsg) {this.errorMsg = errorMsg;}
}

调用方法

 DownloadApi apiService = RetrofitHelper.getInstance().getApiService(DownloadApi.class);FileDownloader.downloadFile(apiService.downloadApkFile1(), DOWNLOAD_APK_PATH, "test.apk", new DownloadProgressHandler() {@Overridepublic void onProgress(int progress, long total, long speed) {LogUtils.i("progress:" + progress + ",speed:" + speed);mProgress.setText(progress + "%");mFileSize.setText(FileUtils.formatFileSize(total));mRate.setText(FileUtils.formatFileSize(speed)+"/s");}@Overridepublic void onCompleted(File file) {LogUtils.i("下载apk文件成功");FileDownloader.clear();}@Overridepublic void onError(Throwable e) {LogUtils.e("下载apk文件异常", e);FileDownloader.clear();}});

下载地址

CSDN:基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)_rxjava下载文件-Android代码类资源-CSDN下载

GitHub:https://github.com/kellysong/android-blog-demo/tree/master/net-demo

关于下载进度问题:

本文测试的前提是创建OkHttpClient时,没有添加log拦截器。添加HttpLoggingInterceptor会影响下载进度观察,会出现下载出现卡顿一会(时间因文件而异),才触发下载进度回调,导致认为不是下载进度回调,原因是HttpLoggingInterceptor中的下面代码导致

BufferedSource source = responseBody.source();
source.request(Long.MAX_VALUE); // 卡主,运行在子线程
Buffer buffer = source.getBuffer();

上面代码已经在下载文件了。

解决方案:

  1. 取消HttpLoggingInterceptor
  2. HttpLoggingInterceptor设置更低级别的日志拦截,如NONE
  3. 使用修改的HttpLoggingInterceptor(对应okhttp版本 3.14.9)已经修复打开日志拦截器的情况下:文件下载和文件上传oom问题

生产上必须关HttpLoggingInterceptor,除非需要做特别的网络监控

关于Retrofit下载问题:

有人是说通过okhttp拦截器下载文件才是下载(官方例子也是这样实现),通过ResponseBody是写本地文件?事实是真是这样吗?
    
    
简单来说文件下载就是拿到文件的输入流,边读编写,服务端只是返回一个通道inputStream。先说一下,传统文件(这里解释为不用任何第三方封装的框架,这里使用自带的HttpURLConnection)下载使用示例:
    
   

private void downloadFile(String url) {long start = System.currentTimeMillis();InputStream is = null;FileOutputStream fos = null;HttpURLConnection httpConn;try {httpConn = (HttpURLConnection) new URL(url).openConnection();httpConn.setDoOutput(false);// 使用 URL 连接进行输出httpConn.setDoInput(true);// 使用 URL 连接进行输入httpConn.setRequestMethod("GET");// 设置URL请求方法httpConn.setConnectTimeout(40000);httpConn.setReadTimeout(40000);httpConn.setRequestProperty("Content-Type", "application/octet-stream");httpConn.setRequestProperty("Connection", "Keep-Alive");// 维持长连接httpConn.setRequestProperty("Charset", "UTF-8");//获取文件下载输入流is = httpConn.getInputStream();File file = new File(Environment.getExternalStorageDirectory(), "test.apk");fos = new FileOutputStream(file);int b;byte[] byArr = new byte[1024];while ((b = is.read(byArr)) != -1) {//写文件fos.write(byArr, 0, b);}long end = System.currentTimeMillis();System.out.println("下载耗时:" + (end - start) / 1000.0 + "s");} catch (Exception e) {e.printStackTrace();} finally {close(is);close(fos);}}public static void close(Closeable x) {if (x != null) {try {x.close();} catch (Exception e) {// skip}}}

看了上面的例子,是否对下载文件有了更清晰的认识。现在我们再来下看下retrofit文件下载的操作:

      @Streaming@GETCall<ResponseBody> downLoadFile(@Url String url);

上面的ResponseBody是okhttp3.ResponseBody包下下,有一个注解 @Streaming表示直接返回ResponseBody类型的数据,不读取到内存,可以理解为返回一个输入通道inputStream,也就是你可以用过这个返回来的body的bytestream循环写入文件,同时可以做下载进度的回调。

为什么是这样呢,要具体分析两个主要类:转换器类BuiltInConverters类ResponseBody类
    
BuiltInConverters

    

final class BuiltInConverters extends Converter.Factory {@Overridepublic Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,Retrofit retrofit) {if (type == ResponseBody.class) {return Utils.isAnnotationPresent(annotations, Streaming.class)? StreamingResponseBodyConverter.INSTANCE: BufferingResponseBodyConverter.INSTANCE;}if (type == Void.class) {return VoidResponseBodyConverter.INSTANCE;}return null;}//省略}

如果下载接口加了注解@Streaming就会用StreamingResponseBodyConverter,直接返回ResponseBody,否则就是BufferingResponseBodyConverter,而BufferingResponseBodyConverter是一次性读取到内存中的,实际容易出现OOM,这也就是为什么下载大文件时接口需要加上注解@Streaming。

ResponseBody

    

    public abstract class ResponseBody implements Closeable {/** Multiple calls to {@link #charStream()} must return the same instance. */private Reader reader;public abstract @Nullable MediaType contentType();/*** Returns the number of bytes in that will returned by {@link #bytes}, or {@link #byteStream}, or* -1 if unknown.*/public abstract long contentLength();//输入流public final InputStream byteStream() {return source().inputStream();}public abstract BufferedSource source();//省略}

拿到inputStream就可以进行文件写入,而`source().inputStream()`的InputStream(抽象类)在`RealBufferedSource`已经被重新定义。

 @Override public InputStream inputStream() {return new InputStream() {@Override public int read() throws IOException {if (closed) throw new IOException("closed");if (buffer.size == 0) {long count = source.read(buffer, Segment.SIZE);if (count == -1) return -1;}return buffer.readByte() & 0xff;}@Override public int read(byte[] data, int offset, int byteCount) throws IOException {if (closed) throw new IOException("closed");checkOffsetAndCount(data.length, offset, byteCount);if (buffer.size == 0) {long count = source.read(buffer, Segment.SIZE);if (count == -1) return -1;}return buffer.read(data, offset, byteCount);}//省略...};}

上面分析了@Streaming只是拿到一个下载通道,如果你不读取数据是不会下载的,也就不会回调拦截器进度,即OkHttp没有从输入流读取数据,哪怕下载请求响应已经返回。

下面分析下载进度问题:

当我们通过ResponseBody拿到InputStream ,调用inputStream.read(myBuffer)时,会触发 `read(byte[] data, int offset, int byteCount)`方法调用,
从方法可知okhttp使用`Buffer`(提高IO的处理效率),意思是buffer.size == 0,先通过source把数据读到Buffer,然后在从Buffer中读取myBuffer返回到`inputStream.read(myBuffer)`调用处,再把myBuffer写入文件。而如果重写了`ResponseBody`的话,会调用拦截器下面的ProgressResponseBody的source中的`read(Buffer sink, long byteCount)`方法。

 private  class ProgressResponseBody extends ResponseBody {private final ResponseBody responseBody;private BufferedSource bufferedSource;ProgressResponseBody(ResponseBody responseBody, ProgressListener progressListener) {this.responseBody = responseBody;this.progressListener = progressListener;}private Source source(Source source) {return new ForwardingSource(source) {long totalBytesRead = 0L;@Overridepublic long read(Buffer sink, long byteCount) throws IOException {long bytesRead = super.read(sink, byteCount);// read() returns the number of bytes read, or -1 if this source is exhausted.totalBytesRead += bytesRead != -1 ? bytesRead : 0;progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);return bytesRead;}};}//省略...}}

如果设置的缓冲区较小,会导致先把缓冲区的数据读完,给人感觉是上层循环多次读取,其实buffer已经存在,但是上层读取数据大小进度是跟拦截器中ProgressResponseBody的进度是等价的。如果读完了buffer,不去调用`source.read(buffer, Segment.SIZE)`,数据是没有从服务端返回。缓冲区的数据读取是非常快的。故通过ResponseBody监听文件下载进度是没有问题的。

小结

文件下载,连接过程本身需要一定的时间,然后是文件下载的IO读写,个人觉得通过ResponseBody下载文件和监听下载进度,相比通过拦截器处理更简单,可以更方便处理多任务下载进度问题。如有不对请指出,会立马纠正。

其它文章:

基于RxJava2.0+Retrofit2.0超大文件分块(分片)上传(带进度)

这篇关于基于RxJava2.0+Retrofit2.0的文件下载实现(带进度,非覆写ResponseBody和拦截器)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1014976

相关文章

Spring Boot整合Redis注解实现增删改查功能(Redis注解使用)

《SpringBoot整合Redis注解实现增删改查功能(Redis注解使用)》文章介绍了如何使用SpringBoot整合Redis注解实现增删改查功能,包括配置、实体类、Repository、Se... 目录配置Redis连接定义实体类创建Repository接口增删改查操作示例插入数据查询数据删除数据更

Java Lettuce 客户端入门到生产的实现步骤

《JavaLettuce客户端入门到生产的实现步骤》本文主要介绍了JavaLettuce客户端入门到生产的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 目录1 安装依赖MavenGradle2 最小化连接示例3 核心特性速览4 生产环境配置建议5 常见问题

Java使用Swing生成一个最大公约数计算器

《Java使用Swing生成一个最大公约数计算器》这篇文章主要为大家详细介绍了Java使用Swing生成一个最大公约数计算器的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下... 目录第一步:利用欧几里得算法计算最大公约数欧几里得算法的证明情形 1:b=0情形 2:b>0完成相关代码第二步:加

linux ssh如何实现增加访问端口

《linuxssh如何实现增加访问端口》Linux中SSH默认使用22端口,为了增强安全性或满足特定需求,可以通过修改SSH配置来增加或更改SSH访问端口,具体步骤包括修改SSH配置文件、增加或修改... 目录1. 修改 SSH 配置文件2. 增加或修改端口3. 保存并退出编辑器4. 更新防火墙规则使用uf

Java 的ArrayList集合底层实现与最佳实践

《Java的ArrayList集合底层实现与最佳实践》本文主要介绍了Java的ArrayList集合类的核心概念、底层实现、关键成员变量、初始化机制、容量演变、扩容机制、性能分析、核心方法源码解析、... 目录1. 核心概念与底层实现1.1 ArrayList 的本质1.1.1 底层数据结构JDK 1.7

Java Map排序如何按照值按照键排序

《JavaMap排序如何按照值按照键排序》该文章主要介绍Java中三种Map(HashMap、LinkedHashMap、TreeMap)的默认排序行为及实现按键排序和按值排序的方法,每种方法结合实... 目录一、先理清 3 种 Map 的默认排序行为二、按「键」排序的实现方式1. 方式 1:用 TreeM

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象