V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
amiwrong123
V2EX  ›  Java

为什么 CompletableFuture 的 thenApplyAsync 没有新起一个线程?

  •  
  •   amiwrong123 · 2020-08-22 15:16:00 +08:00 · 2336 次点击
    这是一个创建于 1590 天前的主题,其中的信息可能已经有所发展或是发生改变。
        public static void test() {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                String supplyAsyncResult = " "+Thread.currentThread().getName()+" Hello world! ";
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(supplyAsyncResult);
                return supplyAsyncResult;
            }).thenApplyAsync(r -> {  //添加后续任务
                String thenApplyResult = Thread.currentThread().getName()+r + " thenApply! ";
                System.out.println(thenApplyResult);
                return thenApplyResult;
            });
    
            try {
                System.out.println(completableFuture.get() + " finish!");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    

    打印:

     ForkJoinPool.commonPool-worker-9 Hello world! 
    ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world!  thenApply! 
    ForkJoinPool.commonPool-worker-9 ForkJoinPool.commonPool-worker-9 Hello world!  thenApply!  finish!
    
            public void run() {
                CompletableFuture<T> d; Supplier<T> f;
                if ((d = dep) != null && (f = fn) != null) {
                    dep = null; fn = null;  //只是为了防止内存泄漏,方便 GC
                    if (d.result == null) {
                        try {
                            d.completeValue(f.get());  //执行 task
                        } catch (Throwable ex) {       //执行 task 期间抛出了异常
                            d.completeThrowable(ex);
                        }
                    }
                    d.postComplete();
                }
            }
    

    从源码上来看,supplyAsync 新起了一个线程,等到线程执行完 task,开始执行 d.postComplete(),即开始执行后续 task,然后 postComplete 会执行后续 task 的 completion 对象的 tryFire 方法。

        static final class UniApply<T,V> extends UniCompletion<T,V> {
            Function<? super T,? extends V> fn;
            UniApply(Executor executor, CompletableFuture<V> dep,
                     CompletableFuture<T> src,
                     Function<? super T,? extends V> fn) {
                super(executor, dep, src); this.fn = fn;
            }
            final CompletableFuture<V> tryFire(int mode) {
                CompletableFuture<V> d; CompletableFuture<T> a;
                if ((d = dep) == null ||
                    !d.uniApply(a = src, fn, mode > 0 ? null : this))//这里会发现前一个 stage 执行完毕,但提供了线程池
                    return null;
                dep = null; src = null; fn = null;
                return d.postFire(a, mode);
            }
        }
    
        final <S> boolean uniApply(CompletableFuture<S> a,
                                   Function<? super S,? extends T> f,
                                   UniApply<S,T> c) {
            Object r; Throwable x;
            if (a == null || (r = a.result) == null || f == null)
                return false;
            tryComplete: if (result == null) {
                if (r instanceof AltResult) {
                    if ((x = ((AltResult)r).ex) != null) {
                        completeThrowable(x, r);
                        break tryComplete;
                    }
                    r = null;
                }
                try {
                    if (c != null && !c.claim())//会执行到这里,然后发现 claim 返回 false
                        return false;
                    @SuppressWarnings("unchecked") S s = (S) r;
                    completeValue(f.apply(s));
                } catch (Throwable ex) {
                    completeThrowable(ex);
                }
            }
            return true;
        }
    
            final boolean claim() {
                Executor e = executor;
                if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                    if (e == null)
                        return true;
                    executor = null; // disable
                    e.execute(this); //会执行到这里,然后把 this completion 对象提交给线程池执行,当前线程即将返回
                }
                return false;
            }
    

    我的问题在于,当 worker-9 线程执行完第一个 task 之后,它把第二个 task 提交给了 executor (e.execute(this)),然后线程就返回了(从 claim 函数一层一层返回,直到返回 postComplete )。那为什么第二个 task 从打印结果来看,还是同一个 worker-9 线程来执行的?

    还是说,只是因为我的例子比较简单,所以 executor 没有分配一个新的线程出来,其他情况下,thenApplyAsync 里面在执行e.execute(this)时,还是有可能新起一个线程的吗?

    8 条回复    2020-08-24 19:15:47 +08:00
    passerbytiny
        1
    passerbytiny  
       2020-08-22 15:47:17 +08:00 via Android
    supplyAsync 和 thenApplyAsync 虽然都是异步调用,但它们两个之间是串行的,为什么就不能在一个线程(执行器)中被执行。
    amiwrong123
        2
    amiwrong123  
    OP
       2020-08-22 15:56:32 +08:00
    @passerbytiny
    没说不可以,它们之间肯定是串行的,但不一定是同一个线程吧。从源码上可见,supplyAsync 的线程并不是直接执行下一个 task 的,因为它 e.execute(this)之后就马上返回了。
    zyoo
        3
    zyoo  
       2020-08-22 16:05:42 +08:00
    async 的语义是不一定同一个线程,所以这个只能说是巧合了,你可以多试几把?
    amiwrong123
        4
    amiwrong123  
    OP
       2020-08-22 16:12:49 +08:00
    @zyoo
    多试几次也一样。我怀疑这跟 ForkJoinPool.commonPool()的线程调度有关系,但我现在还没来得及看它的原理呢。。
    passerbytiny
        5
    passerbytiny  
       2020-08-22 17:02:26 +08:00 via Android
    @amiwrong123 异步任务都是将任务提交给执行器去执行的,而不是从线程池取出一个线程用来执行任务。选择哪个线程是由执行器自行决定的,任务的提交者很难也不该对线程选择产生影响。

    从高层次上看,thenApplyAsync 是在 applyAsync 完成之后执行的,所以最优选择就是两者使用同一个线程(线程唤醒也是有成本的)。从源码看,你要主要看的应该是 Executor 的源码。
    amiwrong123
        6
    amiwrong123  
    OP
       2020-08-22 17:18:45 +08:00
    @passerbytiny
    好吧,大概理解了。主要之前我以为我这个例子,applyAsync 和 thenApplyAsync 的执行线程肯定是同一个线程,但从源码上看发现 前一个线程只是提交任务给 Executor 而已。

    所以,applyAsync 和 thenApplyAsync 的执行线程不一定是同一个呗。只是这个例子里,线程池是这样调度的。
    XuHuan1025
        7
    XuHuan1025  
       2020-08-22 22:20:55 +08:00
    木宝厉害哦
    RedBeanIce
        8
    RedBeanIce  
       2020-08-24 19:15:47 +08:00
    @amiwrong123 #5
    @passerbytiny #6

    JDK8
    求问一下,我也是看到这里将任务提交到 Executor
    e.execute(new AsyncRun(d, f));
    那么下一步应该看 forkjoinpool ?因为默认是他。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2626 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 21ms · UTC 07:31 · PVG 15:31 · LAX 23:31 · JFK 02:31
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.