创建线程的方式只有两种:继承Thread或者实现Runnable接口。 但是这两种方法都存在一个缺陷,没有返回值。也就是说我们无法得知线程执行结果。
Java 1.5 以后的Callable和Future接口就解决了这个问题,我们可以通过向线程池提交一个Callable来获取一个包含返回值的Future对象,从此,我们的程序逻辑就不再是同步顺序。
Future接口在Java5中被引入,设计初衷是对将来某个时刻会产生的结果进行建模。它建模了一种异步运算,返回一个执行结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作完成。
Future接口的局限性
当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,注意我加粗的地方,Future的get() 方法会阻塞主线程
Future执行耗时任务
由此我们得知,Future获取得线程执行结果前,我们的主线程get()得到结果需要一直阻塞等待,即使我们使用isDone()方法轮询去查看线程执行状态,但是这样也非常浪费cpu资源。
当Future的线程进行了一个非常耗时的操作,那我们的主线程也就阻塞了。 当我们在简单业务上,可以使用Future的另一个重载方法get(long,TimeUnit)来设置超时时间,避免我们的主线程被无穷尽地阻塞。 不过,有没有更好的解决方案呢?
以下业务场景,单纯使用Future接口或者FutureTask类并不能很好地完成以下我们所需的业务
- •将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
- •等待Future集合种的所有任务都完成。
- •仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。
- •通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
- •应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果)
CompletableFuture
可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
Java8实战:
同步获取结果
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
CompletableFuture<Integer>future=newCompletableFuture<>();
Integer integer=future.get();
get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的future从未完成。
join() 与get() 区别在于join() 返回计算的结果或者抛出一个unchecked异常(CompletionException),而get() 返回一个具体的异常.
getNow() 则有所区别,参数valueIfAbsent的意思是当计算结果不存在或者Now时刻没有完成任务,给定一个确定的值。
计算完成后续操作1——complete
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
方法1和2的区别在于是否使用异步处理,2和3的区别在于是否使用自定义的线程池,前三个方法都会提供一个返回结果和可抛出异常,我们可以使用lambda表达式的来接收这两个参数,然后自己处理。 方法4,接收一个可抛出的异常,且必须return一个返回值,类型与钻石表达式种的类型一样,详见下文的exceptionally() 部分,更详细
CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{
return10086;
});
future.whenComplete((result,error)->{
System.out.println(“拨打”+result);
error.printStackTrace();
});
计算完成后续操作2——handle
public<U>CompletableFuture<U>handle(BiFunction<?superT,Throwable,?extendsU>fn)
public<U>CompletableFuture<U>handleAsync(BiFunction<?superT,Throwable,?extendsU>fn)
public<U>CompletableFuture<U>handleAsync(BiFunction<?superT,Throwable,?extendsU>fn,Executor executor)
handle方法集和上面的complete方法集没有区别,同样有两个参数一个返回结果和可抛出异常,区别就在于返回值,没错,虽然同样返回CompletableFuture类型,但是里面的参数类型,handle方法是可以自定义的。
// 开启一个异步方法
CompletableFuture<List>future=CompletableFuture.supplyAsync(()->{
List<String>list=newArrayList<>();
list.add(“语文”);
list.add(“数学”);
// 获取得到今天的所有课程
returnlist;
});
// 使用handle()方法接收list数据和error异常
CompletableFuture<Integer>future2=future.handle((list,error)->{
// 如果报错,就打印出异常
error.printStackTrace();
// 如果不报错,返回一个包含Integer的全新的CompletableFuture
returnlist.size();
// 注意这里的两个CompletableFuture包含的返回类型不同
});
计算完成的后续操作3——apply
publicCompletableFuture<Void>thenAccept(Consumer<?superT>action)
publicCompletableFuture<Void>thenAcceptAsync(Consumer<?superT>action)
publicCompletableFuture<Void>thenAcceptAsync(Consumer<?superT>action,Executor executor)
accept()三个方法只做最终结果的消费,注意此时返回的CompletableFuture是空返回。只消费,无返回,有点像流式编程的终端操作。
例子:请看下面的exceptionally()示例
捕获中间产生的异常——exceptionally
publicCompletableFuture<T>exceptionally(Function<Throwable,?extendsT>fn)
exceptionally() 可以帮我们捕捉到所有中间过程的异常,方法会给我们一个异常作为参数,我们可以处理这个异常,同时返回一个默认值,跟服务降级 有点像,默认值的类型和上一个操作的返回值相同。 小贴士 :向线程池提交任务的时候发生的异常属于外部异常,是无法捕捉到的,毕竟还没有开始执行任务。作者也是在触发线程池拒绝策略的时候发现的。exceptionally() 无法捕捉RejectedExecutionException()
// 实例化一个CompletableFuture,返回值是Integer
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 返回null
return null;
});
CompletableFuture<String> exceptionally = future.thenApply(result -> {
// 制造一个空指针异常NPE
int i = result;
return i;
}).thenApply(result -> {
// 这里不会执行,因为上面出现了异常
String words = "现在是" + result + "点钟";
return words;
}).exceptionally(error -> {
// 我们选择在这里打印出异常
error.printStackTrace();
// 并且当异常发生的时候,我们返回一个默认的文字
return "出错啊~";
});
exceptionally.thenAccept(System.out::println);
}
组合式异步编程
组合两个completableFuture
还记得我们上面说的Future做不到的事吗
- •将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
thenApply()
假设一个场景,我是一个小学生,我想知道今天我需要上几门课程 此时我需要两个步骤,1.根据我的名字获取我的学生信息 2.根据我的学生信息查询课程 我们可以用下面这种方式来链式调用api,使用上一步的结果进行下一步操作
CompletableFuture<List<Lesson>>future=CompletableFuture.supplyAsync(()->{
// 根据学生姓名获取学生信息
returnStudentService.getStudent(name);
}).thenApply(student->{
// 再根据学生信息获取今天的课程
returnLessonsService.getLessons(student);
});
thenCompose()假设一个场景,我是一个小学生,今天有劳技课和美术课,我需要查询到今天需要带什么东西到学校
CompletableFuture<List<String>> total = CompletableFuture.supplyAsync(() -> {
// 第一个任务获取美术课需要带的东西,返回一个list
List<String> stuff = new ArrayList<>();
stuff.add("画笔");
stuff.add("颜料");
return stuff;
}).thenCompose(list -> {
// 向第二个任务传递参数list(上一个任务美术课所需的东西list)
CompletableFuture<List<String>> insideFuture = CompletableFuture.supplyAsync(() -> {
List<String> stuff = new ArrayList<>();
// 第二个任务获取劳技课所需的工具
stuff.add("剪刀");
stuff.add("折纸");
// 合并两个list,获取课程所需所有工具
List<String> allStuff = Stream.of(list, stuff).flatMap(Collection::stream).collect(Collectors.toList());
return allStuff;
});
return insideFuture;
});
System.out.println(total.join().size());
我们通过CompletableFuture.supplyAsync()方法创建第一个任务,获得美术课所需的物品list,然后使用thenCompose()接口传递list到第二个任务,然后第二个任务获取劳技课所需的物品,整合之后再返回。至此我们完成两个任务的合并。 (说实话,用compose去实现这个业务场景看起来有点别扭,我们看下一个例子)
- •将两个异步计算合并为一个,这两个异步计算之间相互独立,互不依赖
thenCombine()
还是上面那个场景,我是一个小学生,今天有劳技课和美术课,我需要查询到今天需要带什么东西到学校
CompletableFuture<List<String>>painting=CompletableFuture.supplyAsync(()->{
// 第一个任务获取美术课需要带的东西,返回一个list
List<String>stuff=newArrayList<>();
stuff.add(“画笔”);
stuff.add(“颜料”);
returnstuff;
});
CompletableFuture<List<String>>handWork=CompletableFuture.supplyAsync(()->{
// 第二个任务获取劳技课需要带的东西,返回一个list
List<String>stuff=newArrayList<>();
stuff.add(“剪刀”);
stuff.add(“折纸”);
returnstuff;
});
CompletableFuture<List<String>>total=painting
// 传入handWork列表,然后得到两个CompletableFuture的参数Stuff1和2
.thenCombine(handWork,(stuff1,stuff2)->{
// 合并成新的list
List<String>totalStuff=Stream.of(stuff1,stuff1)
.flatMap(Collection::stream)
.collect(Collectors.toList());
returntotalStuff;
});
System.out.println(JSONObject.toJSONString(total.join()));
• 等待Future集合中的所有任务都完成。
获取所有完成结果——allOfpublic static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
allOf方法,当所有给定的任务完成后,返回一个全新的已完成CompletableFuture
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
//使用sleep()模拟耗时操作
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 2;
});
CompletableFuture.allOf(future1, future1);
// 输出3
System.out.println(future1.join()+future2.join());获取率先完成的任务结果——anyOf
*
• 仅等待Future集合种最快结束的任务完成(有可能因为他们试图通过不同的方式计算同一个值),并返回它的结果。 小贴士 :如果最快完成的任务出现了异常,也会先返回异常,如果害怕出错可以加个exceptionally() 去处理一下可能发生的异常并设定默认返回值
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
throw new NullPointerException();
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
// 睡眠3s模拟延时
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture<Object> anyOf = CompletableFuture
.anyOf(future, future2)
.exceptionally(error -> {
error.printStackTrace();
return 2;
});
System.out.println(anyOf.join());
几个小例子
多个方法组合使用
- •通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
- •应对Future的完成时间(即当Future的完成时间完成时会收到通知,并能使用Future的计算结果进行下一步的的操作,不只是简单地阻塞等待操作的结果)
CompletableFuture.supplyAsync(() -> 1)
.whenComplete((result, error) -> {
System.out.println(result);
error.printStackTrace();
})
.handle((result, error) -> {
error.printStackTrace();
return error;
})
.thenApply(Object::toString)
.thenApply(Integer::valueOf)
.thenAccept((param) -> System.out.println("done"));
long begin = System.currentTimeMillis();
// 自定义一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
// 循环创建10个CompletableFuture
List<CompletableFuture<Integer>> collect = IntStream.range(1, 10).mapToObj(i -> {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 在i=5的时候抛出一个NPE
if (i == 5) {
throw new NullPointerException();
}
try {
// 每个依次睡眠1-9s,模拟线程耗时
TimeUnit.SECONDS.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i);
return i;
}, executorService)
// 这里处理一下i=5时出现的NPE
// 如果这里不处理异常,那么异常会在所有任务完成后抛出,小伙伴可自行测试
.exceptionally(Error -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
return future;
}).collect(Collectors.toList());
// List列表转成CompletableFuture的Array数组,使其可以作为allOf()的参数
// 使用join()方法使得主线程阻塞,并等待所有并行线程完成
CompletableFuture.allOf(collect.toArray(new CompletableFuture[]{})).join();
System.out.println("最终耗时" + (System.currentTimeMillis() - begin) + "毫秒");
executorService.shutdown();
使用CompletableFuture场景
- •执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度
- •使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常
- •如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个
小贴士 :测试多线程的小伙伴请勿使用JUit单元测试,因为JUnit在主线程完成之后就会关闭JVM