前期准备

1
2
3
4
5
6
7
8
/**
* 定义线程池
*/
public static ExecutorService service = Executors.newFixedThreadPool(5);

private static String getThreadInfo() {
return " 线程: " + Thread.currentThread().getName() + "_" + Thread.currentThread().getId();
}

启动异步任务

1、runAsync 无入参、无返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* runAsync:无入参、无返回值
* Executor为空,则采用默认的ForkJoinPool.commonPool线程池,否则使用自定义的ThreadPool
* @throws ExecutionException
* @throws InterruptedException
*/
private static void one() throws ExecutionException, InterruptedException {
// 1、runAsync:无入参、无返回值
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("开启异步任务 one...");
}, service);
// 2、Executor为空,则采用默认的ForkJoinPool.commonPool线程池,否则使用自定义的ThreadPool
// main end ...null
System.out.println("main end ..." + future.get()); // null
}

2、supplyAsync 无入参,可以获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 启动异步任务
* supplyAsync :无入参,可以获取返回值
* @throws InterruptedException
* @throws ExecutionException
*/
private static void two() throws InterruptedException, ExecutionException {
// supplyAsync :无入参,可以获取返回值
System.out.println("main start ...");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 two...");
return "开启异步任务,我是返回值";
}, service);
System.out.println("获取异步任务返回值:" + future.get()); // 开启异步任务,我是返回值
System.out.println("main end ...");
}

完成回调与异常感知

1、whenCompleteAsync res, exc: 是把whenCompleteAsync的任务继续提交给线程池来进行执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void three() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 开启异步任务...pool-1-thread-313
System.out.println("开启异步任务..." + Thread.currentThread().getName() + Thread.currentThread().getId());
int i = 10 / 2;
if (i == 5) {
throw new RuntimeException("远程服务调用失败");
}
return i;
}, service).whenCompleteAsync((res, exc) -> {
// Cant return result value
// 如果没有指明Executor那么执行的就是默认线程池:ForkJoinPool.commonPool-worker-114
System.out.println(Thread.currentThread().getName() + Thread.currentThread().getId());
System.out.println("异步任务完成了,执行结果是:" + res + " 异常是:" + exc);
}, defaultExcutor);
System.out.println("获取异步任务返回值:" + future.get());
}

2、exceptionally 捕获异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void three() throws InterruptedException, ExecutionException {
// exceptionally 接口可以接收一个异常,返回异常处理结果。
System.out.println("main start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务...");
int i = 10 / 2;
if (i == 5) {
throw new RuntimeException("远程服务调用失败");
}
return i;
}, service).whenCompleteAsync((res, exc) -> {
// null 异常是:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
System.out.println("异步任务完成了,执行结果是:" + res + " 异常是:" + exc);
}).exceptionally(throwable -> {
// 接口可以接收一个异常,返回异常处理结果。
System.out.println("进入了异常处理,捕获了" + throwable.getMessage() + "异常");
return 5;
});
// 5
System.out.println("获取异步任务返回值:" + future.get());
System.out.println("main end ...");
}

3、whenComplete res, exc: 是执行当前任务的线程继续执行whenComplete的任务,就有可能是主线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static void three() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1..." + Thread.currentThread().getName() + Thread.currentThread().getId());
int i = 10 / 2;
if (i == 5) {
throw new RuntimeException("远程服务调用失败");
}
return i;
}, service).whenComplete((res, exc) -> {
// 主线程main方法执行,id为1
System.out.println(Thread.currentThread().getName() + Thread.currentThread().getId());
System.out.println("异步任务1完成了,执行结果是:" + res + " 异常是:" + exc);
});
System.out.println("获取异步任务1返回值:" + future1.get());
}

4、handleAsync 即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 完成回调与异常感知 handle
* handleAsync即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。
*
* @throws InterruptedException
* @throws ExecutionException
*/
private static void four() throws InterruptedException, ExecutionException {
System.out.println("main start ...");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务 four...");
int i = 10 / 2;
if (i == 5) {
throw new RuntimeException("远程服务调用失败");
}
return i;
}, service).handleAsync((res, thr) -> {
System.out.println("进入handleAsync方法");
if (res != null) {
return res * 2;
}
if (thr != null) {
System.out.println("捕获到异常" + thr);
return 0;
}
return 0;
}, service);
System.out.println("获取异步任务返回值:" + future.get());
System.out.println("main end ...");
}

执行结果:

1
2
3
4
5
6
main start ...
开启异步任务 four...
进入handleAsync方法
捕获到异常java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
获取异步任务返回值:0
main end ...

线程串行化

thenApply

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值。

1
2
3
4
5
6
7
8
9
10
11
private static void five() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务thenApply..." + getThreadInfo());
int i = 10 / 2;
return i;
}, service).thenApplyAsync((res) -> {
System.out.println("任务3启动了... 上一步的结果是:" + res + getThreadInfo());
return res * 2;
}, service);
System.out.println("获取异步任务最终返回值:" + future3.get());
}

运行结果:

1
2
3
开启异步任务thenApply... 线程: pool-1-thread-1_11
任务3启动了... 上一步的结果是:5 线程: pool-1-thread-2_12
获取异步任务最终返回值:10

thenAccept

thenAccept、thenAcceptAsync方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
private static void five() throws InterruptedException, ExecutionException {
// thenAccept 消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务..." + getThreadInfo());
int i = 10 / 2;
return i;
}, service).thenAccept((res) -> {
// 任务2启动了... 上一步的结果是:5 线程: main_1
System.out.println("任务2启动了... 上一步的结果是:" + res + getThreadInfo());
});
System.out.println("获取异步任务返回值:" + future1.get() + "\n");
}
1
2
3
4
5
6
7
8
9
10
11
12
private static void five() throws InterruptedException, ExecutionException {
// thenAccept 消费处理结果,接收任务的处理结果,并消费处理,无返回结果。
CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务..." + getThreadInfo());
int i = 10 / 2;
return i;
}, service).thenAcceptAsync((res) -> {
// 任务2启动了... 上一步的结果是:5 线程: main_1
System.out.println("任务2启动了... 上一步的结果是:" + res + getThreadInfo());
}, service);
System.out.println("获取异步任务返回值:" + future1.get() + "\n");
}

运行结果:

1
2
3
开启异步任务... 线程: pool-1-thread-1_11
任务2启动了... 上一步的结果是:5 线程: main_1
获取异步任务返回值:null

thenRun

thenRun 获取不到上个任务的执行结果,无返回值。只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。

1
2
3
4
5
6
7
8
9
10
11
12
private static void five() throws InterruptedException, ExecutionException {
// thenRun 不能获取上一步的执行结果,并无返回值。
System.out.println("main start ...");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务..." + getThreadInfo());
int i = 10 / 2;
return i;
}, service).thenRun(() -> {
System.out.println("任务2启动了..." + getThreadInfo());
});
System.out.println("获取异步任务返回值:" + future.get() + "\n");
}

运行结果:

1
2
3
4
main start ...
开启异步任务... 线程: pool-1-thread-1_11
任务2启动了... 线程: main_1
获取异步任务返回值:null

两任务组合-两个任务都完才触发该任务

先创建两个异步任务

1
2
3
4
5
6
7
8
9
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1...");
int i = 10 / 2;
return i;
}, service);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务2...");
return "hello";
}, service);

runAfterBothAsync

runAfterBoth 没有返回值,入参CompletionStage(传future)、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务)

1
2
3
4
5
6
7
8
9
10
11
12
private static void six() throws InterruptedException, ExecutionException {
// runAfterBoth 没有返回值,入参CompletionStage、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务)
future1.runAfterBothAsync(future2, () -> {
System.out.println("任务3 启动了....");
try {
System.out.println("future1获取异步任务最终返回值:" + future1.get());
System.out.println("future2获取异步任务最终返回值:" + future2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}, service);
}

运行结果:

1
2
3
4
5
开启异步任务1...
开启异步任务2...
任务3 启动了....
future1获取异步任务最终返回值:5
future2获取异步任务最终返回值:hello

thenAcceptBothAsync

thenAcceptBothAsync 可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值

1
2
3
4
5
6
7
8
9
10
private static void six() throws InterruptedException, ExecutionException {
// thenAcceptBothAsync可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值
future3.thenAcceptBothAsync(future4, (res1, res2) -> {
System.out.println("thenAcceptBothAsync 任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2 + getThreadInfo());
}, service);
// thenAcceptBoth
future3.thenAcceptBoth(future4, (res1, res2) -> {
System.out.println("thenAcceptBoth 任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2 + getThreadInfo());
});
}

运行结果:

1
2
3
4
开启异步任务1...
开启异步任务2...
thenAcceptBothAsync 任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello 线程: pool-1-thread-3_13
thenAcceptBoth 任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello 线程: main_1

thenCombineAsync

thenCombineAsync 可以获取两个任务的返回值,并可以将任务三结果返回

1
2
3
4
5
6
7
private static void six() throws InterruptedException, ExecutionException {
CompletableFuture<String> stringCompletableFuture = future5.thenCombineAsync(future6, (res1, res2) -> {
System.out.println("thenCombineAsync 任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2);
return res1 + "-->" + res2;
}, service);
System.out.println("获取异步任务最终返回值:" + stringCompletableFuture.get());
}

运行结果:

1
2
3
4
开启异步任务1...
开启异步任务2...
thenCombineAsync 任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello
获取异步任务最终返回值:5-->hello

两任务组合有一个完成

先创建两个异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务1...");
int i = 10 / 2;
return i;
}, service);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开启异步任务2...");
return "hello";
}, service);

runAfterEither

runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
注意:任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。

1
2
3
4
5
6
7
private static void seven() throws ExecutionException, InterruptedException {
// runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
// 任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。
CompletableFuture<Void> future = future1.runAfterEitherAsync(future2, () -> {
System.out.println("任务3 启动了....");
}, service);
}

acceptEither

acceptEither 可以获取异步任务1的执行结果,但不返回执行结果。

1
2
3
4
5
6
private static void seven() throws ExecutionException, InterruptedException {
// future1 后执行了 future2,但是仅返回了先执行的future1结果
// 任务3 启动了...., 任务结果是:5 (future1结果)
CompletableFuture<Void> future = future1.acceptEitherAsync(future2, (res)
-> System.out.println("任务3 启动了...., 任务结果是:" + res), service);
}

applyToEitherAsync

applyToEitherAsync 可以感知结果,并返回执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void seven() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future5 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步任务5...");
int i = 10 / 2;
return i;
}, service);
CompletableFuture<Integer> future6 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开启异步任务6...");
return 10;
}, service);
CompletableFuture<String> stringCompletableFuture = future5.applyToEitherAsync(future6, (res) -> {
// future5执行完后,无需等待future6并可感知future5结果执行此内部逻辑
System.out.println("任务7 启动了...., 上个任务结果是:" + res);
return "我是任务6的返回值, 上个任务的执行结果是:" + res;
}, service);
System.out.println("任务7" + stringCompletableFuture.get());
}

运行结果:

1
2
3
4
5
开启异步任务5...
任务7 启动了...., 上个任务结果是:5
任务7我是任务6的返回值, 上个任务的执行结果是:5
main end ...
开启异步任务6...

多任务组合执行

先创建三个异步任务

1
2
3
4
5
6
7
8
9
10
11
12
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片...");
return "图片地址";
}, service);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品属性...");
return "黑色 256G";
}, service);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品品牌...");
return "苹果手机";
}, service);

allOf

allOf 等待所有结果完成

1
2
3
4
5
private static void eight() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);
// 等待所有结果完成 返回:null
System.out.println("等待所有结果完成 allOf:" + future.get());
}

拓展:这种方式可以支持未知数量的异步任务使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片...");
return "图片地址1";
}, service);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("查询商品图片...");
return "图片地址2";
}, service);
CompletableFuture[] arrays = {future1, future2};
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(arrays);
System.out.println(completableFuture.get());

List<CompletableFuture<String>> completableFutureList = Arrays.asList(future1, future2);
CompletableFuture[] completableFutures = completableFutureList.stream().toArray(CompletableFuture[]::new);
CompletableFuture<Void> completableFuture1 = CompletableFuture.allOf(completableFutures);
System.out.println("strings:" + completableFuture1.get());

anyOf

anyOf 只要有一个任务完成即可执行

1
2
3
4
private static void eight() throws ExecutionException, InterruptedException {
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future4, future5, future6);
System.out.println("第一个执行成功的数据:anyOf" + objectCompletableFuture.get());
}

参考资料:

1、CompletableFuture 异步编排详解

2、Java CompletableFuture 之线程编排