线程的创建(Runnable,Future,CompletionService,CompletableFuture的辨析)

本文主要是介绍线程的创建(Runnable,Future,CompletionService,CompletableFuture的辨析),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

    • 直接使用Thread
    • 使用Runnable
    • 使用Callable与FutureTask
    • 使用线程池来执行任务
    • CompletionService
    • ListenableFuture
    • CompletableFuture
    • 参考资料

直接使用Thread

直接让某个类继承Thread类,复写run方法,外部调用的时候直接调用start方法。
因为java的单继承模式,我们一般不直接使用这种方法。

使用Runnable

@Slf4j
public class MyTask implements Runnable {@Overridepublic void run() {log.info("i love you");}public static void main(String[] args) {MyTask myTask = new MyTask();Thread myThread = new Thread(myTask);myThread.start();}
}

代码如上,另外 使用匿名Runnable其实和上面也是一致的思路
使用runnable解决java单继承的问题,但是也还有一个问题,那就是,没有返回值。

使用Callable与FutureTask

@Slf4j
public class TestFutureTask {public static void main(String[] args) {Callable<Integer> callable = () -> {int a = new Random().nextInt(100);Thread.sleep(1000);log.info(a + "   in subThread " + System.currentTimeMillis());return a;};FutureTask<Integer> ft = new FutureTask<>(callable);log.info(" main start " + System.currentTimeMillis());ft.run();try {log.info("in mainThread " + System.currentTimeMillis());log.info("得到结果 " + ft.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}
}

代码如上,使用FutureTask来承接Callable,Callable里面存放需要执行的任务

另外如果future的结果还没有出来,调用它的get方法就会阻塞
那如果我就是想立即知道future到底有没有"计算"出结果呢?
在这里插入图片描述用isDone()
带参数的get,就是给个时间限制,如果再n个单位时间内,还没有获得结果,就抛出异常。
刚才的代码执行结果如下:

17:20:39.838 [main] INFO com.alibaba.TestFutureTask -  main start 1685697639835
17:20:40.844 [main] INFO com.alibaba.TestFutureTask - 25   in subThread 1685697640844
17:20:40.844 [main] INFO com.alibaba.TestFutureTask - in mainThread 1685697640844
17:20:40.844 [main] INFO com.alibaba.TestFutureTask - 得到结果 25

我们可以从主线程里拿到Callable任务的返回值
不过,大家应该注意到一个问题:
打印了main start后过了很久in mainThread才打印,也就是说
FutureTask的run方法会阻塞主进程!!
那应该怎么办呢?

        FutureTask<Integer> ft2 = new FutureTask<>(callable);log.info(" main start1 " + System.currentTimeMillis());new Thread(ft2).start();try {log.info("in mainThread " + System.currentTimeMillis());log.info("得到结果2 " + ft2.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}

如上把原来的ft.run();改成new Thread(ft).start()

使用线程池来执行任务

上面三种启动线程的方式,都只是用来学习的,并不推荐,因为每个任务都启动一个线程去执行,实在是太慢了,我们一般都会使用线程池。
例如:

       int  maximumPoolSize =Runtime.getRuntime().availableProcessors() * 4 +1;int corePoolSize = maximumPoolSize/2;ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,3,TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());executor.allowCoreThreadTimeOut(true);

CompletionService

CompletionService干什么的?可以从线程池里拿到最先完成的任务

package com.alibaba.thread;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;import lombok.extern.slf4j.Slf4j;/*** @author zjhua* @description* @date 2020/1/28 21:07*/
@Slf4j
public class CompletionServiceTest {public static void main(String[] args) throws ExecutionException, InterruptedException {testFuture();}// 结果的输出和线程的放入顺序 有关(如果前面的没完成,就算后面的哪个完成了也得等到你的牌号才能输出!),so阻塞耗时public static void testFuture() throws InterruptedException, ExecutionException {long beg = System.currentTimeMillis();log.info("testFuture()开始执行:" + beg);ExecutorService executor = Executors.newCachedThreadPool();List<Future<String>> result = new ArrayList<>();for (int i = 5; i > 0; i--) {Future<String> submit = executor.submit(new Task(i));result.add(submit);}executor.shutdown();for (int i = 0; i < 5; i++) {// 一个一个等待返回结果Thread.sleep(500);log.info("线程" + i + "执行完成:" + result.get(i).get());}log.info("testFuture()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis() - beg));}private static class Task implements Callable<String> {private volatile int i;public Task(int i) {this.i = i;}@Overridepublic String call() throws Exception {long beg = System.currentTimeMillis();Thread.sleep(i * 500);long end = System.currentTimeMillis();return "任务 : " + i + "beg: " + beg + " consume:" + (end - beg);}}
}

大家先看看testFuture方法
我们把任务交给了线程池之后,拿到了一堆Future
List<Future> result = new ArrayList<Future>();
可是我压根不知道哪个先完成了,只能按照放入的顺序遍历get了。
上面打印的日志是:

2023-06-03 00:05:00.557 INFO  [main] c.a.t.CompletionServiceTest - testFuture()开始执行:1685721900556
2023-06-03 00:05:03.073 INFO  [main] c.a.t.CompletionServiceTest - 线程0执行完成:任务 : 5beg: 1685721900560 consume:2513
2023-06-03 00:05:03.583 INFO  [main] c.a.t.CompletionServiceTest - 线程1执行完成:任务 : 4beg: 1685721900561 consume:2003
2023-06-03 00:05:04.088 INFO  [main] c.a.t.CompletionServiceTest - 线程2执行完成:任务 : 3beg: 1685721900561 consume:1513
2023-06-03 00:05:04.594 INFO  [main] c.a.t.CompletionServiceTest - 线程3执行完成:任务 : 2beg: 1685721900561 consume:1006
2023-06-03 00:05:05.101 INFO  [main] c.a.t.CompletionServiceTest - 线程4执行完成:任务 : 1beg: 1685721900561 consume:514
2023-06-03 00:05:05.101 INFO  [main] c.a.t.CompletionServiceTest - testFuture()执行完成:1685721905101,4545

第一个提交的任务,其实执行的耗时最长。
其实ExecutorCompletionService的功能就是,拿到最先执行完成的那个。
ok 看下面的代码

 // 结果的输出和线程的放入顺序 无关(谁完成了谁就先输出!主线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序),so很大大缩短等待时间private static void testCompletionService() throws InterruptedException, ExecutionException {long beg = System.currentTimeMillis();log.info("testCompletionService()开始执行:" + beg);ExecutorService executor = Executors.newCachedThreadPool();ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);for (int i = 5; i > 0; i--) {completionService.submit(new Task(i));}executor.shutdown();for (int i = 0; i < 5; i++) {// 检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。Future<String> future = completionService.take(); // 这一行没有完成的任务就阻塞Thread.sleep(500);log.info("线程" + i + "执行完成:" + future.get()); // 这一行在这里不会阻塞,引入放入队列中的都是已经完成的任务}log.info("testCompletionService()执行完成:" + System.currentTimeMillis() + "," + (System.currentTimeMillis() - beg));}public static void main(String[] args) throws ExecutionException, InterruptedException {testFuture();log.info("************");testCompletionService();}

这样最后的结果就是

2023-06-03 00:10:09.915 INFO  [main] c.a.t.CompletionServiceTest - testFuture()开始执行:1685722209913
2023-06-03 00:10:12.423 INFO  [main] c.a.t.CompletionServiceTest - 线程0执行完成:任务 : 5beg: 1685722209918 consume:2505
2023-06-03 00:10:12.923 INFO  [main] c.a.t.CompletionServiceTest - 线程1执行完成:任务 : 4beg: 1685722209918 consume:2006
2023-06-03 00:10:13.437 INFO  [main] c.a.t.CompletionServiceTest - 线程2执行完成:任务 : 3beg: 1685722209918 consume:1505
2023-06-03 00:10:13.951 INFO  [main] c.a.t.CompletionServiceTest - 线程3执行完成:任务 : 2beg: 1685722209918 consume:1004
2023-06-03 00:10:14.465 INFO  [main] c.a.t.CompletionServiceTest - 线程4执行完成:任务 : 1beg: 1685722209918 consume:504
2023-06-03 00:10:14.465 INFO  [main] c.a.t.CompletionServiceTest - testFuture()执行完成:1685722214465,4552
2023-06-03 00:10:14.465 INFO  [main] c.a.t.CompletionServiceTest - ************
2023-06-03 00:10:14.465 INFO  [main] c.a.t.CompletionServiceTest - testCompletionService()开始执行:1685722214465
2023-06-03 00:10:15.476 INFO  [main] c.a.t.CompletionServiceTest - 线程0执行完成:任务 : 1beg: 1685722214466 consume:504
2023-06-03 00:10:15.982 INFO  [main] c.a.t.CompletionServiceTest - 线程1执行完成:任务 : 2beg: 1685722214466 consume:1010
2023-06-03 00:10:16.482 INFO  [main] c.a.t.CompletionServiceTest - 线程2执行完成:任务 : 3beg: 1685722214466 consume:1501
2023-06-03 00:10:16.983 INFO  [main] c.a.t.CompletionServiceTest - 线程3执行完成:任务 : 4beg: 1685722214466 consume:2001
2023-06-03 00:10:17.487 INFO  [main] c.a.t.CompletionServiceTest - 线程4执行完成:任务 : 5beg: 1685722214466 consume:2502
2023-06-03 00:10:17.487 INFO  [main] c.a.t.CompletionServiceTest - testCompletionService()执行完成:1685722217487,3022

testCompletionService整体耗时3022毫秒,之前的testFuture是4000多毫秒。
为什么会有这样的差别呢?
核心就是

//检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
Future future = completionService.take(); // 这一行没有完成的任务就阻塞

总结一下就是ExecutorCompletionService包裹了ExecutorService之后,我们可以按照执行完成的先后顺序拿到ExecutorCompletionService要执行的逻辑的返回结果。
代码如下

ExecutorService executor = Executors.newCachedThreadPool();
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executor);

ListenableFuture

从runnable到callable我们实现了没有返回值到有返回值
从futuretask到线程池,我们实现了不用每次都起一个线程的进步
从线程池到CompletionService,我们可以拿到最先完成的任务
现在看future,代码还是真规整的,先提交结果,然后我去做我的事情,等会过来问问是否完成
那么等会过来问问是否完成这个步骤能否省略呢?

有guava的ListenableFuture实现了把"等会过来问问的"步骤省略的工作
来看代码

package com.alibaba.thread;import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;import lombok.extern.slf4j.Slf4j;@Slf4j
public class GuavaTest {public static ListeningExecutorService service;static {int maximumPoolSize = Runtime.getRuntime().availableProcessors() * 4 + 1;int corePoolSize = maximumPoolSize / 2;ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 3, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1024), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());executor.allowCoreThreadTimeOut(true);service = MoreExecutors.listeningDecorator(executor);}public static void main(String[] args) {long start = System.currentTimeMillis();// 任务1ListenableFuture<Boolean> booleanTask = service.submit(() -> {Thread.sleep(10000);return true;});// 老版本的addCallback只有两个参数// 新版本的addCallback有三个参数 多的就是最后那个ExecutorFutures.addCallback(booleanTask, new FutureCallback<Boolean>() {@Overridepublic void onSuccess(Boolean result) {log.info("BooleanTask.任务1-10s: " + result);}@Overridepublic void onFailure(Throwable throwable) {log.info("BooleanTask.throwable: " + throwable);}}, service);// 任务2ListenableFuture<String> stringTask = service.submit(() -> {Thread.sleep(3000);return "Hello World";});Futures.addCallback(stringTask, new FutureCallback<String>() {@Overridepublic void onSuccess(String result) {log.info("StringTask.任务2-3s: " + result);}@Overridepublic void onFailure(Throwable t) {}}, service);// 任务3ListenableFuture<Integer> integerTask = service.submit(() -> {Thread.sleep(2000);return new Random().nextInt(100);});Futures.addCallback(integerTask, new FutureCallback<Integer>() {@Overridepublic void onSuccess(Integer result) {log.info("IntegerTask.任务3-2s:: " + result);}@Overridepublic void onFailure(Throwable t) {}}, service);// 执行时间log.info("time: " + (System.currentTimeMillis() - start));}}

CompletableFuture

CompletableFuture这个东西能干啥呢?
我的理解是可以链式指定各个任务之间的顺序,例如完成了任务1和任务2才能开始任务3;任务4和任务5完成了一个之后就去完成任务6。

眼前有景道不得,崔颢题诗在上头!
大家参考
https://juejin.cn/post/6844904195162636295

另外使用CompletableFuture的时候最好使用自定义线程池,默认的线程池是共用的,且线程数是cpu的核数减一,这里极易出现问题,大家参考
https://www.cnblogs.com/blackmlik/p/16098938.html

参考资料

https://www.cnblogs.com/lightdb/p/11829397.html
https://juejin.cn/post/6844904195162636295
https://www.cnblogs.com/blackmlik/p/16098938.html

这篇关于线程的创建(Runnable,Future,CompletionService,CompletableFuture的辨析)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/691268

相关文章

Java中如何正确的停掉线程

《Java中如何正确的停掉线程》Java通过interrupt()通知线程停止而非强制,确保线程自主处理中断,避免数据损坏,线程池的shutdown()等待任务完成,shutdownNow()强制中断... 目录为什么不强制停止为什么 Java 不提供强制停止线程的能力呢?如何用interrupt停止线程s

python 线程池顺序执行的方法实现

《python线程池顺序执行的方法实现》在Python中,线程池默认是并发执行任务的,但若需要实现任务的顺序执行,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋... 目录方案一:强制单线程(伪顺序执行)方案二:按提交顺序获取结果方案三:任务间依赖控制方案四:队列顺序消

Spring创建Bean的八种主要方式详解

《Spring创建Bean的八种主要方式详解》Spring(尤其是SpringBoot)提供了多种方式来让容器创建和管理Bean,@Component、@Configuration+@Bean、@En... 目录引言一、Spring 创建 Bean 的 8 种主要方式1. @Component 及其衍生注解

MySQL 数据库表操作完全指南:创建、读取、更新与删除实战

《MySQL数据库表操作完全指南:创建、读取、更新与删除实战》本文系统讲解MySQL表的增删查改(CURD)操作,涵盖创建、更新、查询、删除及插入查询结果,也是贯穿各类项目开发全流程的基础数据交互原... 目录mysql系列前言一、Create(创建)并插入数据1.1 单行数据 + 全列插入1.2 多行数据

MySQL 临时表创建与使用详细说明

《MySQL临时表创建与使用详细说明》MySQL临时表是存储在内存或磁盘的临时数据表,会话结束时自动销毁,适合存储中间计算结果或临时数据集,其名称以#开头(如#TempTable),本文给大家介绍M... 目录mysql 临时表详细说明1.定义2.核心特性3.创建与使用4.典型应用场景5.生命周期管理6.注

MySQL的触发器全解析(创建、查看触发器)

《MySQL的触发器全解析(创建、查看触发器)》MySQL触发器是与表关联的存储程序,当INSERT/UPDATE/DELETE事件发生时自动执行,用于维护数据一致性、日志记录和校验,优点包括自动执行... 目录触发器的概念:创建触www.chinasem.cn发器:查看触发器:查看当前数据库的所有触发器的定

创建springBoot模块没有目录结构的解决方案

《创建springBoot模块没有目录结构的解决方案》2023版IntelliJIDEA创建模块时可能出现目录结构识别错误,导致文件显示异常,解决方法为选择模块后点击确认,重新校准项目结构设置,确保源... 目录创建spChina编程ringBoot模块没有目录结构解决方案总结创建springBoot模块没有目录

SpringBoot实现虚拟线程的方案

《SpringBoot实现虚拟线程的方案》Java19引入虚拟线程,本文就来介绍一下SpringBoot实现虚拟线程的方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录什么是虚拟线程虚拟线程和普通线程的区别SpringBoot使用虚拟线程配置@Async性能对比H

在Java中实现线程之间的数据共享的几种方式总结

《在Java中实现线程之间的数据共享的几种方式总结》在Java中实现线程间数据共享是并发编程的核心需求,但需要谨慎处理同步问题以避免竞态条件,本文通过代码示例给大家介绍了几种主要实现方式及其最佳实践,... 目录1. 共享变量与同步机制2. 轻量级通信机制3. 线程安全容器4. 线程局部变量(ThreadL

Linux线程同步/互斥过程详解

《Linux线程同步/互斥过程详解》文章讲解多线程并发访问导致竞态条件,需通过互斥锁、原子操作和条件变量实现线程安全与同步,分析死锁条件及避免方法,并介绍RAII封装技术提升资源管理效率... 目录01. 资源共享问题1.1 多线程并发访问1.2 临界区与临界资源1.3 锁的引入02. 多线程案例2.1 为