前期准备 1 2 3 4 5 6 7 8 public static ExecutorService service = Executors.newFixedThreadPool(5 );private static String getThreadInfo () { return " 线程: " + Thread.currentThread().getName() + "_" + Thread.currentThread().getId(); }
启动异步任务 1、runAsync 无入参、无返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static void one () throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("开启异步任务 one..." ); }, service); System.out.println("main end ..." + future.get()); }
2、supplyAsync 无入参,可以获取返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void two () throws InterruptedException, ExecutionException { System.out.println("main start ..." ); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 two..." ); return "开启异步任务,我是返回值" ; }, service); System.out.println("获取异步任务返回值:" + future.get()); System.out.println("main end ..." ); }
完成回调与异常感知 1、whenCompleteAsync res, exc: 是把whenCompleteAsync的任务继续提交给线程池来进行执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static void three () throws InterruptedException, ExecutionException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务..." + Thread.currentThread().getName() + Thread.currentThread().getId()); int i = 10 / 2 ; if (i == 5 ) { throw new RuntimeException ("远程服务调用失败" ); } return i; }, service).whenCompleteAsync((res, exc) -> { System.out.println(Thread.currentThread().getName() + Thread.currentThread().getId()); System.out.println("异步任务完成了,执行结果是:" + res + " 异常是:" + exc); }, defaultExcutor); System.out.println("获取异步任务返回值:" + future.get()); }
2、exceptionally 捕获异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static void three () throws InterruptedException, ExecutionException { System.out.println("main start ..." ); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务..." ); int i = 10 / 2 ; if (i == 5 ) { throw new RuntimeException ("远程服务调用失败" ); } return i; }, service).whenCompleteAsync((res, exc) -> { System.out.println("异步任务完成了,执行结果是:" + res + " 异常是:" + exc); }).exceptionally(throwable -> { System.out.println("进入了异常处理,捕获了" + throwable.getMessage() + "异常" ); return 5 ; }); System.out.println("获取异步任务返回值:" + future.get()); System.out.println("main end ..." ); }
3、whenComplete res, exc: 是执行当前任务的线程继续执行whenComplete的任务,就有可能是主线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static void three () throws InterruptedException, ExecutionException { CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1..." + Thread.currentThread().getName() + Thread.currentThread().getId()); int i = 10 / 2 ; if (i == 5 ) { throw new RuntimeException ("远程服务调用失败" ); } return i; }, service).whenComplete((res, exc) -> { System.out.println(Thread.currentThread().getName() + Thread.currentThread().getId()); System.out.println("异步任务1完成了,执行结果是:" + res + " 异常是:" + exc); }); System.out.println("获取异步任务1返回值:" + future1.get()); }
4、handleAsync 即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private static void four () throws InterruptedException, ExecutionException { System.out.println("main start ..." ); CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务 four..." ); int i = 10 / 2 ; if (i == 5 ) { throw new RuntimeException ("远程服务调用失败" ); } return i; }, service).handleAsync((res, thr) -> { System.out.println("进入handleAsync方法" ); if (res != null ) { return res * 2 ; } if (thr != null ) { System.out.println("捕获到异常" + thr); return 0 ; } return 0 ; }, service); System.out.println("获取异步任务返回值:" + future.get()); System.out.println("main end ..." ); }
执行结果:
1 2 3 4 5 6 main start ... 开启异步任务 four... 进入handleAsync方法 捕获到异常java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败 获取异步任务返回值:0 main end ...
线程串行化 thenApply thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值。
1 2 3 4 5 6 7 8 9 10 11 private static void five () throws InterruptedException, ExecutionException { CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务thenApply..." + getThreadInfo()); int i = 10 / 2 ; return i; }, service).thenApplyAsync((res) -> { System.out.println("任务3启动了... 上一步的结果是:" + res + getThreadInfo()); return res * 2 ; }, service); System.out.println("获取异步任务最终返回值:" + future3.get()); }
运行结果:
1 2 3 开启异步任务thenApply... 线程: pool-1-thread-1_11 任务3启动了... 上一步的结果是:5 线程: pool-1-thread-2_12 获取异步任务最终返回值:10
thenAccept thenAccept、thenAcceptAsync方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
1 2 3 4 5 6 7 8 9 10 11 12 private static void five () throws InterruptedException, ExecutionException { CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务..." + getThreadInfo()); int i = 10 / 2 ; return i; }, service).thenAccept((res) -> { System.out.println("任务2启动了... 上一步的结果是:" + res + getThreadInfo()); }); System.out.println("获取异步任务返回值:" + future1.get() + "\n" ); }
1 2 3 4 5 6 7 8 9 10 11 12 private static void five () throws InterruptedException, ExecutionException { CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务..." + getThreadInfo()); int i = 10 / 2 ; return i; }, service).thenAcceptAsync((res) -> { System.out.println("任务2启动了... 上一步的结果是:" + res + getThreadInfo()); }, service); System.out.println("获取异步任务返回值:" + future1.get() + "\n" ); }
运行结果:
1 2 3 开启异步任务... 线程: pool-1-thread-1_11 任务2启动了... 上一步的结果是:5 线程: main_1 获取异步任务返回值:null
thenRun thenRun 获取不到上个任务的执行结果,无返回值。只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。
1 2 3 4 5 6 7 8 9 10 11 12 private static void five () throws InterruptedException, ExecutionException { System.out.println("main start ..." ); CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务..." + getThreadInfo()); int i = 10 / 2 ; return i; }, service).thenRun(() -> { System.out.println("任务2启动了..." + getThreadInfo()); }); System.out.println("获取异步任务返回值:" + future.get() + "\n" ); }
运行结果:
1 2 3 4 main start ... 开启异步任务... 线程: pool-1-thread-1_11 任务2启动了... 线程: main_1 获取异步任务返回值:null
两任务组合-两个任务都完才触发该任务 先创建两个异步任务
1 2 3 4 5 6 7 8 9 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1..." ); int i = 10 / 2 ; return i; }, service); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务2..." ); return "hello" ; }, service);
runAfterBothAsync runAfterBoth 没有返回值,入参CompletionStage(传future)、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务)
1 2 3 4 5 6 7 8 9 10 11 12 private static void six () throws InterruptedException, ExecutionException { future1.runAfterBothAsync(future2, () -> { System.out.println("任务3 启动了...." ); try { System.out.println("future1获取异步任务最终返回值:" + future1.get()); System.out.println("future2获取异步任务最终返回值:" + future2.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }, service); }
运行结果:
1 2 3 4 5 开启异步任务1... 开启异步任务2... 任务3 启动了.... future1获取异步任务最终返回值:5 future2获取异步任务最终返回值:hello
thenAcceptBothAsync thenAcceptBothAsync 可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值
1 2 3 4 5 6 7 8 9 10 private static void six () throws InterruptedException, ExecutionException { future3.thenAcceptBothAsync(future4, (res1, res2) -> { System.out.println("thenAcceptBothAsync 任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2 + getThreadInfo()); }, service); future3.thenAcceptBoth(future4, (res1, res2) -> { System.out.println("thenAcceptBoth 任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2 + getThreadInfo()); }); }
运行结果:
1 2 3 4 开启异步任务1... 开启异步任务2... thenAcceptBothAsync 任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello 线程: pool-1-thread-3_13 thenAcceptBoth 任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello 线程: main_1
thenCombineAsync thenCombineAsync 可以获取两个任务的返回值,并可以将任务三结果返回
1 2 3 4 5 6 7 private static void six () throws InterruptedException, ExecutionException { CompletableFuture<String> stringCompletableFuture = future5.thenCombineAsync(future6, (res1, res2) -> { System.out.println("thenCombineAsync 任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2); return res1 + "-->" + res2; }, service); System.out.println("获取异步任务最终返回值:" + stringCompletableFuture.get()); }
运行结果:
1 2 3 4 开启异步任务1... 开启异步任务2... thenCombineAsync 任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello 获取异步任务最终返回值:5-->hello
两任务组合有一个完成 先创建两个异步任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务1..." ); int i = 10 / 2 ; return i; }, service); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("开启异步任务2..." ); return "hello" ; }, service);
runAfterEither runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。 注意:任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。
1 2 3 4 5 6 7 private static void seven () throws ExecutionException, InterruptedException { CompletableFuture<Void> future = future1.runAfterEitherAsync(future2, () -> { System.out.println("任务3 启动了...." ); }, service); }
acceptEither acceptEither 可以获取异步任务1的执行结果,但不返回执行结果。
1 2 3 4 5 6 private static void seven () throws ExecutionException, InterruptedException { CompletableFuture<Void> future = future1.acceptEitherAsync(future2, (res) -> System.out.println("任务3 启动了...., 任务结果是:" + res), service); }
applyToEitherAsync applyToEitherAsync 可以感知结果,并返回执行结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private static void seven () throws ExecutionException, InterruptedException { CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> { System.out.println("开启异步任务5..." ); int i = 10 / 2 ; return i; }, service); CompletableFuture<Integer> future6 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("开启异步任务6..." ); return 10 ; }, service); CompletableFuture<String> stringCompletableFuture = future5.applyToEitherAsync(future6, (res) -> { System.out.println("任务7 启动了...., 上个任务结果是:" + res); return "我是任务6的返回值, 上个任务的执行结果是:" + res; }, service); System.out.println("任务7" + stringCompletableFuture.get()); }
运行结果:
1 2 3 4 5 开启异步任务5... 任务7 启动了...., 上个任务结果是:5 任务7我是任务6的返回值, 上个任务的执行结果是:5 main end ... 开启异步任务6...
多任务组合执行 先创建三个异步任务
1 2 3 4 5 6 7 8 9 10 11 12 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品图片..." ); return "图片地址" ; }, service); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品属性..." ); return "黑色 256G" ; }, service); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品品牌..." ); return "苹果手机" ; }, service);
allOf allOf 等待所有结果完成
1 2 3 4 5 private static void eight () throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3); System.out.println("等待所有结果完成 allOf:" + future.get()); }
拓展:这种方式可以支持未知数量的异步任务使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品图片..." ); return "图片地址1" ; }, service); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("查询商品图片..." ); return "图片地址2" ; }, service); CompletableFuture[] arrays = {future1, future2}; CompletableFuture<Void> completableFuture = CompletableFuture.allOf(arrays); System.out.println(completableFuture.get()); List<CompletableFuture<String>> completableFutureList = Arrays.asList(future1, future2); CompletableFuture[] completableFutures = completableFutureList.stream().toArray(CompletableFuture[]::new ); CompletableFuture<Void> completableFuture1 = CompletableFuture.allOf(completableFutures); System.out.println("strings:" + completableFuture1.get());
anyOf anyOf 只要有一个任务完成即可执行
1 2 3 4 private static void eight () throws ExecutionException, InterruptedException { CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future4, future5, future6); System.out.println("第一个执行成功的数据:anyOf" + objectCompletableFuture.get()); }
参考资料:
1、CompletableFuture 异步编排详解
2、Java CompletableFuture 之线程编排