串行异步接口在接收时 异步串行接口的基本任务有哪些
论文探讨了如何使用Java CompletableFuture串行高效执行一系列异步任务,将其结果收集到一个列表中。针对常见的挑战,如确保任务按序执行、避免不必要的线程长度,分析了thenApplyAsync和thenCombineAsync的局限性,并详细介绍了两种基于thenCompose的场景方案。通过具体的示例代码和分析原理,旨在帮助开发者在异步复杂下实现CompletableFuture的高级应用,实现优雅且性能的异步流程控制。异步任务优化的串行执行与结果收集挑战
在现代java应用开发中,completablefuture提供了一种强大且灵活的异步编程模型。然而,当需要每个串行执行序列异步任务,将每个任务的结果汇总到一个集合中时,会遇到特定的挑战。这尤其常见于业务流程严格按顺序处理数据,但处理步骤本身又是运行操作的场景。
考虑一个场景:我们有一个流程方法,它返回一个 CompletionStage,代表一个运行且异步的业务操作。现在,我们需要对一系列输入数据依次调用这个流程方法,并最终将所有结果收集到一个 List 中,同时保证每个流程调用在前一个完成后才开始。import java.time.LocalDateTime;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.CompletionStage;import java.util.stream.Collectors;import java.util.stream.IntStream;public class SequentialCompletableFuture { /** * 模拟一个同步的异步业务处理过程。 * 返回一个CompletionStage,其结果为输入a加10。
*/ private CompletionStagelt;Integergt; process(int a) { return CompletableFuture.supplyAsync(() -gt; { System.err.printf(quot;s dispatch d\nquot;, LocalDateTime.now(), a); // 模拟长时间运行的逻辑业务 try { Thread.sleep(10); // 模拟运行 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return a 10; }).whenCompleteAsync((e, t) -gt; { if (t != null) System.err.printf(quot;!!!处理 'd' 时出错!!!\nquot;, a); System.err.printf(quot;s finish d\nquot;, LocalDateTime.now(), e); }); }}登录后复制常见尝试与问题分析
在尝试解决上述问题时,开发者可能会采用以下两种观察但存在局限性的方法:
方法一:使用 thenApplyAsync 唤醒 join()// 第一次尝试Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());CompletionStagelt;Listlt;Integergt;gt; resultStage1 = CompletableFuture.completedFuture(new ArrayListlt;gt;());for (Integer element : arr) { resultStage1 = resultStage1.thenApplyAsync((retList) -gt; { //在thenApplyAsync内部阻塞等待另一个CompletableFuture的结果 Integer a = process(element).toCompletableFuture().join(); retList.add(a); return retList; });}Listlt;Integergt;computeResult1 = resultStage1.toCompletableFuture().join();System.out.println(quot;方法一结果: quot;computeResult1);登录后复制
分析:这个方法确实实现了串行执行和结果
它收集。thenApplyAsync 会在前一个阶段完成后回调函数。由于 process(element).toCompletableFuture().join() 在 thenApplyAsync 的回调内部被调用,会阻塞当前线程直到进程完成任务。这保证了任务的串行性。不过,这种模式认为是“不雅”的,因为它在异步回调内部执行了其阻塞操作。CompletableFuture的设计是避免阻塞,而是通过回调链来处理异步结果。另外,每次 thenApplyAsync 都会调度一个新任务到线程池,如果进程内部也使用了线程池,可能会导致不必要的线程上下文切换和资源消耗。
立即学习“Java免费学习(深入)”;
方法二:使用 thenCombineAsync//第二次尝试Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());CompletionStagelt;Listlt;Integergt;gt; resultStage2 = CompletableFuture.completedFuture(new ArrayListlt;gt;());for (Integer element : arr) { // thenCombineAsync 会并行执行两个CompletionStage resultStage2 = resultStage2.thenCombineAsync(process(element), (existingList, newResult) -gt; {existingList.add(newResult); returnexistingList; });}//尝试获取结果,但由于大量执行,顺序可能不确定或不符合预期 Listlt;Integergt;computeResult2 = resultStage2.toCompletableFuture().join();// System.out.println(quot;方法 2 结果: quot;computeResult2); // 结果可能不按顺序登录后复制
分析: thenCombineAsync 的设计有两个独立的 CompletionStage 的结果合并。 resultStage2 和 process(element) 会被并发执行。在循环中,process(element) 会立即被调度执行,而不会等待一个进程任务完成。因此,方法无法保证任务的串行执行顺序,其输出结果的顺序将是混乱的,与我们的需求不符。的解决方案:利用thenCompose实现优雅串行
thenCompose是CompletableFuture中用于串行化异步操作的关键方法。它允许你将一个CompletionStage的结果推荐作为输入,并返回一个新的CompletionStage。这个实现链式异步操作,即一个异步任务完成然后启动下一个异步任务所需要的。
方案一:通过Void阶段和外部列表收集结果
这种方法的核心思想是维护一个表示当前链条消耗的CompletionStage,并在每个步骤中,在前面一个任务完成后,执行处理任务,然后将处理的结果添加到外部的List中。public class SequentialCompletableFuture { // ... (process方法同上) public static void main(String[] args) { SequentialCompletableFuture app = new SequentialCompletableFuture(); Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); // 方案一:使用 thenCompose 和外部列表 CompletionStagelt;Voidgt; LoopStage = CompletableFuture.completedFuture(null); // 初始化一个已完成的 Voidstage final Listlt;Integergt; resultList = new ArrayListlt;gt;(); //用于收集结果的外部列表 for (Integer element : arr) { LoopStage = LoopStage // thenCompose:等待loopStage完成,然后执行process(element)并返回其CompletionStage .thenCompose(v -gt; app.process(element)) // thenAccept:等待process(element)完成,然后将其结果添加到resultList .thenAccept(resultList::add); } // 暂停等待所有完成任务LoopStage.toCompletableFuture().join(); System.out.println(quot;方法一(thenCompose external list) 结果: quot; resultList); // 预期产出:[11, 12, 13, 14, 15, 16, 17, 18, 19] 且按顺序调度和完成}}登录后复制
原理分析:CompletableFuture.completedFuture(null)创建了一个立即完成的CompletionStage,作为链的起点。
在循环中,loopStage.thenCompose(v -gt;app.process(element))确保了app.process(element)在前一个loopStage完成后才会被调度执行。thenCompose的关键是,它的回调函数返回另一个CompletionStage,并且这个返回的CompletionStage会“撬化”到整个锁链中,使得锁链的下一个操作会等待这个CompletionStage内部完成。thenAccept(resultList::add)写入在process(element)后,将结果添加到resultList中。由于thenCompose保证了进程任务的串行性,thenAccept也按顺序执行,从而保证resultList中的元素顺序是正确的。loopStage.toCompletableFuture().join()阻塞当前线程,直到整个异步锁(即所有进程任务和结果操作添加)全部完成。方案二:通过thenCompose在链中提交列表
此方法将结果列表作为CompletionStage 的结果在链中交付,避免了对外部共享可变状态的直接依赖(尽管ArrayList本身是可变的)。
public class SequentialCompletableFuture { // ... (处理方法同上) public static void main(String[] args) { // ... (方案一代码) // 方案二:在链中传递列表中使用thenCompose Listlt;Integergt; arr = IntStream.range(1, 10).boxed().collect(Collectors.toList()); CompletionStagelt;Listlt;Integergt;gt; listStage = CompletableFuture.completedFuture(new ArrayListlt;gt;()); // 初始阶段包含一个空列表 for (Integer element : arr) { listStage = listStage // thenCompose:等待当前listStage完成,其结果是当前的列表 .thenCompose(list -gt; app.process(element) // 执行流程任务 .thenAccept(list::add) // 流程结果添加到确定的列表中.thenApply(v -gt; list) // 将修改后的列表在此thenCompose的结果传递给下一个阶段 ); } Listlt;Integergt; FinalResultList = listStage.toCompletableFuture().join(); System.out.println(quot;方法2(thenCompose list in chain) Results: quot; FinalResultList); // 预期输出:[11, 12, 13, 14, 15, 16, 17, 18, 19]且按顺序调度和完成 }}登录后复制
原理分析:CompletableFuture.completedFuture(new ArrayList()) 初始化一个CompletionStage,其结果是一个空的ArrayList。在循环中,listStage.thenCompose(...)保证了相同的串行性。内部的app.process(element).thenAccept(list::add).thenApply(v -gt;list) 是关键:app.process(element) 启动异步任务。.thenAccept(list::add) 在 process 任务完成后,将结果添加到从前一个 listStage 递过来的 list 中。
.thenApply(v -gt; list) 将修改后的列表作为当前thenCompose链的结果返回,便于下一个循环能够接收到这个更新后的列表。这种方法将列表的修改逻辑封装在每个thenCompose步骤内部,使得整个链条最终的结果就是包含所有任务结果的列表。注意事项与最佳实践
thenCompose vs. thenApply:thenApply用于将一个CompletionStage的结果转换为另一种类型,其回调函数返回一个普通值。如果回调函数返回一个CompletionStage,那么结果将是CompletionStagegt;这样的形式。thenCompose用于将一个CompletionStage的结果作为输入,并返回一个新的CompletionStage。它会“梳化”,结果避免视线,是实现串口异步操作的正确选择。
线程池管理:CompletableFuture可以使用ForkJoinPool.commonPool()。对于长时间运行或 I/O 密集型任务,建议显式指定一个自定义的 Executor,小区阻塞公共线程池或造成资源消耗。例如,可以使用 thenComposeAsync(Function, Executor)。在上述流程方法中,CompletableFuture.supplyAsync 默认也使用了 commonPool。如果流程内部有阻塞操作,确保线程池配置得当。
错误处理:在实际应用中,需要考虑如何处理异步链中的错误。使用 exceptingly()、handle() 或 whenComplete()来捕获和处理。如果链中某个任务失败,后续任务可能无法执行,或者会以异常状态完成。根据业务需求选择合适的错误传播和恢复策略。
最终结果的获取:toCompletableFuture().join()是一个阻塞操作,它会阻塞当前线程直到CompletableFuture完成并返回结果。在主线程中等待所有异步完成任务时,这通常是中断的。在非阻塞场景下,可以使用 thenAccept 或 thenRun 继续继续来处理最终结果,或者将最终的 CompletableFuture 返回给调用者。总结
通过本文的讨论,我们在 CompletableFuture 中理解了 中实现异步串行执行并收集结果的挑战。thenApplyAsync 配合 join() 虽然能实现串行,但不够优雅;thenCombineAsync 引发并发执行,非用于串行场景。
最终,我们掌握了两种基于 thenCompose 的推荐添加解决方案:方案一:通过维护一个 CompletionStage 链,并使用 thenAccept 将结果到外部共享列表中。这种方法简洁明了,适用于结果收集。方案二:通过在CompletionStagegt;链中提交和更新列表,将结果收集逻辑封装在异步链内部。这种方法更符合函数式编程的理念,减少了对外部可变状态的直接依赖。
选择哪种方案取决于具体的场景和个人偏好,但高效双向有效地解决 CompletableFuture 串行执行和结果收集的问题,并提供了比初步尝试更健壮和优雅的实现方式。掌握 thenCompose 的正确使用是编写、可维护的 CompletableFuture 异步代码的关键。
以上就是 Java CompletableFuture:高效串行处理异步任务流并汇总结果的详细内容,更多请关注乐哥常识网其他相关文章!