文章

CompletableFuture的使用

CompletableFuture使用

一、核心API

  • public static CompletableFuture runAsync(Runnable runnable)
  • public static CompletableFuture runAsync(Runnable runnable,Executor executor)
  • public static CompletableFuture supplyAsync(Supplier supplier)
  • public static CompletableFuture supplyAsync(Supplier supplier,Executor executor)

没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

线程池参数设置可以参考

CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费。

I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下

线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间。

无返回值 使用

    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"\t"+"-----come in");
            //暂停几秒钟线程
            try { 
                TimeUnit.SECONDS.sleep(1); 
            } catch (InterruptedException e) {
                 e.printStackTrace(); 
            }
            System.out.println("-----task over");
        });
        System.out.println(future.get());
    }

点击并拖拽以移动

有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"\t"+"-----come in");
            //暂停几秒钟线程
            try { 
                TimeUnit.SECONDS.sleep(1); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            }
            System.out.println("-----task over");
            return 1;
        });
        System.out.println(future.get());
    }

点击并拖拽以移动

二、CompletableFuture常用方法

1、获得结果和触发计算

  • public T get() 主线程阻塞的等待返回结果。
  • public T get(long timeout, TimeUnit unit) 带超时时间主线程阻塞的等待返回结果。
  • public T getNow(T valueIfAbsent) 没有计算完成的情况下,返回一个默认结果
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 1;
        });
        //计算没有完成,返回0,计算完成,返回计算结果
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(completableFuture.getNow(0));
    }

点击并拖拽以移动

  • public T join() 完成后返回结果值
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        System.out.println(CompletableFuture.supplyAsync(() -> "hello")
                .thenApply(r -> r + " world").join());
    }

点击并拖拽以移动

  • public boolean complete(T value) 如果尚未完成,将返回的值get()种相关方法为给定值
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException
    {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 1;
        });

        //注释掉暂停线程,get还没有算完只能返回complete方法设置的444;暂停2秒钟线程,异步线程能够计算完成返回get
//        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }

        //当调用CompletableFuture.get()被阻塞的时候,complete方法就是结束阻塞并get()获取设置的complete里面的值.
        System.out.println(completableFuture.complete(0)+"\t"+completableFuture.get());
        
    }

点击并拖拽以移动

2、对计算结果进行处理

  • public boolean thenApply(T value)
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        //当一个线程依赖另一个线程时用 thenApply 方法来把这两个线程串行化,
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 111;
        },threadPoolExecutor).thenApply(f -> {
             //int age = 10/0; // 异常情况:那步出错就停在那步。
            System.out.println("222");
            return f + 1;
        }).thenApply(f -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("-----v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });

        threadPoolExecutor.shutdown();
    }

点击并拖拽以移动

  • handle
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        //当一个线程依赖另一个线程时用 handle 方法来把这两个线程串行化,
        // 异常情况:有异常也可以往下一步走,根据带的异常参数可以进一步处理
        CompletableFuture.supplyAsync(() -> {
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("111");
            return 111;
        },threadPoolExecutor).handle((f,e) -> {
            int age = 10/0;
            System.out.println("222");
            return f + 1;
        }).handle((f,e) -> {
            System.out.println("333");
            return f + 1;
        }).whenCompleteAsync((v,e) -> {
            System.out.println("*****v: "+v);
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        threadPoolExecutor.shutdown();
    }

点击并拖拽以移动

whenComplete和whenCompleteAsync的区别:
whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
whenCompleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池来进行执行。

3、对计算结果进行消费

  • thenAccept 接收任务的处理结果,并消费处理,无返回结果
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        CompletableFuture.supplyAsync(() -> 1)
                .thenApply(f -> f + 2)
                .thenApply(f -> f + 3)
                .thenApply(f -> f + 4)
                .thenAccept(System.out::println);
    }

点击并拖拽以移动

thenRun(Runnable runnable) 任务 A 执行完执行 B,并且 B 不需要 A 的结果

thenAccept(Consumer action) 任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值

thenApply(Function fn) 任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值

4、对计算速度进行选用

  • applyToEither
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
            return 10;
        });

        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return 20;
        });

        CompletableFuture<Integer> thenCombineResult = completableFuture1.applyToEither(completableFuture2,f -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in ");
            return f + 1;
        });

        System.out.println(Thread.currentThread().getName() + "\t" + thenCombineResult.get());
    }

点击并拖拽以移动

5、对计算结果进行合并

  • thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

两个CompletionStage任务都完成后,最终能把两个任务的结果一起交给thenCombine 来处理

    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        CompletableFuture<Integer> thenCombineResult = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 1");
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 2");
            return 20;
        }), (x,y) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 3");
            return x + y;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 4");
            return 30;
        }),(a,b) -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "---come in 5");
            return a + b;
        });
        System.out.println("-----主线程结束,END");
        System.out.println(thenCombineResult.get());
        
    }

点击并拖拽以移动

三、使用实例

下面是java虚拟线程官方视频的一个使用示例,翻译后视频连接【通向Java21-02-Java虚拟线程】 https://www.bilibili.com/video/BV1Ju4y1Q788/?share_source=copy_web&vd_source=36048a8ec755b67c73e0b0233a43f92b

下面代码意思是确保用户已保存在数据库中。 然后取出购物车。然后循环用户选择的商品,计算总价。 然后调用支付服务,并记录交易ID。 通过所有这些信息,发送包含交易所有详细信息的电子邮件

img

2、使用Completablefutrue获取一组数据的详细信息

        List<Object> collect = list.stream().map(v ->
                CompletableFuture.supplyAsync(() -> userService.getById(v))
).collect(Collectors.toList())
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
License:  CC BY 4.0