CompletableFuture、ListenableFuture高级用列

2024-01-13 07:36

本文主要是介绍CompletableFuture、ListenableFuture高级用列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

CompletableFuture
链式

 public static void main(String[] args) throws Exception {CompletableFuture<Integer> thenCompose = T1().thenCompose(Compress::T2).thenCompose(Compress::T3);Integer result = thenCompose.get();System.out.println(result);}// 假设这些是异步操作,并返回CompletableFuture<Integer>public static CompletableFuture<Integer> T1() {return CompletableFuture.supplyAsync(() -> {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1;});}public static CompletableFuture<Integer> T2(int valueFromT1) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int result = valueFromT1 * 2;try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}return result;});}public static CompletableFuture<Integer> T3(int valueFromT2) {return CompletableFuture.supplyAsync(() -> {// 使用上一步的结果进行计算int finalResult = valueFromT2 + 10;return finalResult;});}

异步操作集合对象入库

public void saveUsers(List<User> users) throws InterruptedException, ExecutionException {// 将大集合分成若干个小集合,每个大小为100(具体分片大小根据实际需求调整)List<List<User>> partitions = users.stream().collect(Collectors.groupingBy(it -> users.indexOf(it) / 100)).values().stream().collect(Collectors.toList());List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {userRepository.save(user); // 假设这是你的保存方法});});futures.add(future);}// 等待所有任务完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();}

CompletableFuture
异常

public static void saveTut(List<User> users) throws InterruptedException, ExecutionException{// 将大集合分成若干个小集合ThreadLocal<AtomicInteger> threadLocal = ThreadLocal.withInitial(() -> new AtomicInteger(0));List<List<User>> partitions = Lists.partition(users, 10);List<CompletableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {partition.forEach(user -> {AtomicInteger value = threadLocal.get();value.set(user.getId());value.incrementAndGet();});});// 添加异常处理器future.exceptionally(ex -> {// 记录异常信息//System.out.println("===="+threadLocal.get().intValue());return null;});futures.add(future);}// 等待所有任务完成,并处理整体完成时的异常CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));allFutures.thenAccept(v -> {System.out.println("All save tasks completed.");}).exceptionally(ex -> {// 记录整体完成时的异常//System.err.println(ex.getMessage());return null;});// 确保阻塞直到所有异步操作完成allFutures.get();
} 

ListenableFuture

public class ListProcessingExample  {private static final ListeningExecutorService EXECUTOR_SERVICE = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));public static void main(String[] args) throws InterruptedException {List<User> users = Arrays.asList(new User(1), new User(2), new User(3));//processUsersAsync001(users);}public static void processUsersAsync001(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);//线程数List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<Void>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<Void> future = EXECUTOR_SERVICE.submit(() -> {partition.forEach(user -> {System.out.println("Processing user: " + user.getId());});return null;});Futures.addCallback(future, new FutureCallback<Void>() {@Overridepublic void onSuccess(Void result) {System.out.println("Successfully processed a batch of users.");}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}/*** 增加成功回调获取数据*/public static void processUsersAsync002(List<User> users) throws InterruptedException {// 将用户列表分为3个分区(根据实际情况调整分区数量)int partitionSize = (int) Math.ceil((double) users.size() / 3);List<List<User>> partitions = Lists.partition(users, partitionSize);List<ListenableFuture<ProcessedUserResult>> futures = new ArrayList<>();for (List<User> partition : partitions) {ListenableFuture<ProcessedUserResult> future = EXECUTOR_SERVICE.submit(() -> {ProcessedUserResult result = new ProcessedUserResult();partition.forEach(user -> {System.out.println("Processing user: " + user.getId());result.successfulIds.add(user.getId());});return result;});Futures.addCallback(future, new FutureCallback<ProcessedUserResult>() {@Overridepublic void onSuccess(ProcessedUserResult result) {System.out.println("Successfully processed users with IDs: " + result.successfulIds);}@Overridepublic void onFailure(Throwable t) {System.err.println("Error processing a batch of users, error: " + t.getMessage());}}, MoreExecutors.directExecutor());futures.add(future);}// 等待所有任务完成(这里为了演示阻塞等待,实际应用中可能不需要这一步,因为有回调处理结果)EXECUTOR_SERVICE.shutdown();EXECUTOR_SERVICE.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);}static class ProcessedUserResult {// 用于存储成功处理的用户ID列表private List<Integer> successfulIds = new ArrayList<>();// 用于存储失败处理的用户ID列表private List<Integer> failedIds = new ArrayList<>();public void addSuccessfulId(int userId) {this.successfulIds.add(userId);}public List<Integer> getSuccessfulIds() {return successfulIds;}public void addFailedId(int userId) {this.failedIds.add(userId);}public List<Integer> getFailedIds() {return failedIds;}}static class User {private int id;public User(int id) {this.id = id;}public int getId() {return id;}}
}

这篇关于CompletableFuture、ListenableFuture高级用列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/600679

相关文章

Java中的for循环高级用法

《Java中的for循环高级用法》本文系统解析Java中传统、增强型for循环、StreamAPI及并行流的实现原理与性能差异,并通过大量代码示例展示实际开发中的最佳实践,感兴趣的朋友一起看看吧... 目录前言一、基础篇:传统for循环1.1 标准语法结构1.2 典型应用场景二、进阶篇:增强型for循环2.

使用Python进行GRPC和Dubbo协议的高级测试

《使用Python进行GRPC和Dubbo协议的高级测试》GRPC(GoogleRemoteProcedureCall)是一种高性能、开源的远程过程调用(RPC)框架,Dubbo是一种高性能的分布式服... 目录01 GRPC测试安装gRPC编写.proto文件实现服务02 Dubbo测试1. 安装Dubb

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性

mysql中的group by高级用法详解

《mysql中的groupby高级用法详解》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,本文给大家介绍mysql中的groupby... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

PyTorch高级特性与性能优化方式

《PyTorch高级特性与性能优化方式》:本文主要介绍PyTorch高级特性与性能优化方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、自动化机制1.自动微分机制2.动态计算图二、性能优化1.内存管理2.GPU加速3.多GPU训练三、分布式训练1.分布式数据

Spring Boot集成SLF4j从基础到高级实践(最新推荐)

《SpringBoot集成SLF4j从基础到高级实践(最新推荐)》SLF4j(SimpleLoggingFacadeforJava)是一个日志门面(Facade),不是具体的日志实现,这篇文章主要介... 目录一、日志框架概述与SLF4j简介1.1 为什么需要日志框架1.2 主流日志框架对比1.3 SLF4

Spring Boot集成Logback终极指南之从基础到高级配置实战指南

《SpringBoot集成Logback终极指南之从基础到高级配置实战指南》Logback是一个可靠、通用且快速的Java日志框架,作为Log4j的继承者,由Log4j创始人设计,:本文主要介绍... 目录一、Logback简介与Spring Boot集成基础1.1 Logback是什么?1.2 Sprin

MySQL复合查询从基础到多表关联与高级技巧全解析

《MySQL复合查询从基础到多表关联与高级技巧全解析》本文主要讲解了在MySQL中的复合查询,下面是关于本文章所需要数据的建表语句,感兴趣的朋友跟随小编一起看看吧... 目录前言:1.基本查询回顾:1.1.查询工资高于500或岗位为MANAGER的雇员,同时还要满足他们的姓名首字母为大写的J1.2.按照部门

Python中Flask模板的使用与高级技巧详解

《Python中Flask模板的使用与高级技巧详解》在Web开发中,直接将HTML代码写在Python文件中会导致诸多问题,Flask内置了Jinja2模板引擎,完美解决了这些问题,下面我们就来看看F... 目录一、模板渲染基础1.1 为什么需要模板引擎1.2 第一个模板渲染示例1.3 模板渲染原理二、模板

Spring Boot 整合 SSE的高级实践(Server-Sent Events)

《SpringBoot整合SSE的高级实践(Server-SentEvents)》SSE(Server-SentEvents)是一种基于HTTP协议的单向通信机制,允许服务器向浏览器持续发送实... 目录1、简述2、Spring Boot 中的SSE实现2.1 添加依赖2.2 实现后端接口2.3 配置超时时