V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
RedBeanIce
V2EX  ›  Java

CompletableFuture 使用交流(其实是困解)

  •  1
     
  •   RedBeanIce · 2020-09-04 17:00:49 +08:00 · 3521 次点击
    这是一个创建于 1328 天前的主题,其中的信息可能已经有所发展或是发生改变。

    如下代码所示,我使用 CompletableFuture 进行多线程的下载

    但是我的 map 里面有 29 个图片 URL,我只得到了 26 张,所以求助大佬们,

    1,我的代码哪里有问题

    2,求助完整的 CompletableFuture 的使用方式

     private static void downloadCompletableFuture(Map<String, String> map) {
          try {
              List<CompletableFuture<Void>> futureList = new ArrayList<>();
              for (Map.Entry<String, String> stringStringEntry : map.entrySet()) {
                  // image Url
                  String imageUrl = stringStringEntry.getValue();
                  CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
                      @Override
                      public void run() {
                          // download picture
                          DownloadPicture3.download(imageUrl);
                      }
                  });
                  futureList.add(future);
              }
              CompletableFuture<Void> allDoneFuture =
              		CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
              allDoneFuture.get(20, TimeUnit.SECONDS);
          } catch (Exception e) {
              e.printStackTrace();
          } finally {
              log.info("end");
              // 11:27:37.442 [main] INFO com.ice.http.JucDownloadPicture
          }
      }
      ```
    
    29 条回复    2020-09-06 13:34:24 +08:00
    AllanAG
        1
    AllanAG  
       2020-09-04 17:25:02 +08:00
    既然使用了 CompletableFuture,最好使用异步的方式完成整个流程。
    1 图片下载不够,推测是超时时间太短,allDoneFuture.get(20, TimeUnit.SECONDS);20s 执行时间不够
    2 可以下面那段代码修改成这种方式试试
    ```
    CompletableFuture<Void> allDoneFuture =
    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
    allDoneFuture.whenCompleteAsync((void1, void2) -> {
    // 所有完成回调
    log.info("end");
    });
    ```
    RedBeanIce
        2
    RedBeanIce  
    OP
       2020-09-04 17:32:19 +08:00
    @AllanAG
    #1 谢谢!!!!!!我现在去试试。
    RedBeanIce
        3
    RedBeanIce  
    OP
       2020-09-04 17:38:27 +08:00
    @AllanAG

    #1 实际上不行,whenCompleteAsync 虽然是在获得结果完成后执行,但是实际上,一张图片也没有,log 也没有打印

    ```
    private static void downloadCompletableFuture2(Map<String, String> map) {
    try {
    List<CompletableFuture<Void>> futureList = new ArrayList<>();
    for (Map.Entry<String, String> stringStringEntry : map.entrySet()) {
    // image Url
    String imageUrl = stringStringEntry.getValue();
    CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
    // download picture
    log.info("下载所花时间 = " + DownloadPicture3.download(imageUrl));
    }
    });
    futureList.add(future);
    }
    CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
    allDoneFuture.whenCompleteAsync((void1, void2) -> {
    // 所有完成回调
    log.info("================================end");
    });

    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    log.info("end");
    // 11:27:37.442 [main] INFO com.ice.http.JucDownloadPicture
    }
    }
    ```
    wysnylc
        4
    wysnylc  
       2020-09-04 18:11:06 +08:00
    收集的资料
    putaozhenhaochi
        5
    putaozhenhaochi  
       2020-09-04 18:19:07 +08:00 via Android
    用有返回结果的方法控制看看
    mango88
        6
    mango88  
       2020-09-04 18:43:32 +08:00
    @RedBeanIce

    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();

    阻塞等待所有的完成
    RedBeanIce
        7
    RedBeanIce  
    OP
       2020-09-04 18:44:38 +08:00
    @wysnylc
    #4 图裂开。
    RedBeanIce
        8
    RedBeanIce  
    OP
       2020-09-04 18:48:13 +08:00
    @mango88

    #6 不行,仍然少了三张

    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
    RedBeanIce
        9
    RedBeanIce  
    OP
       2020-09-04 18:48:58 +08:00
    @putaozhenhaochi
    #5 求推荐,我已经人傻了。。。。。
    mango88
        10
    mango88  
       2020-09-04 18:50:27 +08:00
    @RedBeanIce

    少三张,可能就不是并行的原因了,也许是下载报错了吧
    RedBeanIce
        11
    RedBeanIce  
    OP
       2020-09-04 18:55:11 +08:00
    RedBeanIce
        12
    RedBeanIce  
    OP
       2020-09-04 18:56:58 +08:00
    @mango88
    #10

    我同时使用一个普通方法下载,然后使用的 completablefuture 下载,一前一后执行,,前面的普通方法还是 29 张,但是一到后面这个就少了 2-3 张,每次执行不等。
    putaozhenhaochi
        13
    putaozhenhaochi  
       2020-09-04 19:09:06 +08:00
    @RedBeanIce 哈哈 CompletableFuture 不熟

    要不试试 Stream 并行:
    map.values().stream().parallel().forEach( v->{
    System.out.println(v);
    });
    cheng6563
        14
    cheng6563  
       2020-09-04 19:11:56 +08:00 via Android
    Java 这几个 API 实在太复杂了
    zhady009
        15
    zhady009  
       2020-09-04 19:25:53 +08:00
    用这个都要加个 exceptionally(tx -> {log...})
    不然你找不出问题
    mango88
        16
    mango88  
       2020-09-04 22:17:58 +08:00
    奇怪了,看起来没啥问题。DownloadPictutre 方法能简单贴一下吗 ?
    cs419
        17
    cs419  
       2020-09-04 22:59:01 +08:00
    排查手段不详细
    既然下载有缺失
    那把那 3 个失败的单独跑一遍代码是什么结果 (排除这 3 个链接就有问题)
    再针对下载成功的 26 个链接,下载 50 次 又是啥结果 (排除任务太多)

    在方法 DownloadPicture3.download(imageUrl); 前后打印序号
    确保 download 方法 成功的确完成了 50 次调用
    如果这里没毛病,那就是 download 方法内部执行出了问题

    获取结果 allDoneFuture.get() 不使用超时时间
    allDoneFuture.exceptionally(e->{
    System.out.println("出错-"+ e.getMessage());
    e.printStackTrace();
    });
    打印报错
    Narcissu5
        18
    Narcissu5  
       2020-09-04 23:29:59 +08:00
    ```java
    DownloadPicture3.download(imageUrl);
    ```

    比较可能是这个方法有线程安全的问题。另外如果 IO 不是异步的,使用异步就没意义。如果 IO 是异步的,那么这个方法本身就应该返回的是 Future,不需要在 runAsync 。另外 runAsync 本身会使用`ForkJoinPool#commonPool()`,而 ForkJoinPool 里面是不能放异步方法的,你这样子可能把整个 JVM 都拖慢
    coldear
        19
    coldear  
       2020-09-05 00:27:05 +08:00
    add more logs to debug.
    allan888
        20
    allan888  
       2020-09-05 00:53:36 +08:00
    download 之前 random 的等几秒钟试试?有可能有的网站图片会防止太多并发的连接。
    大概这样:
    Thread.sleep(ThreadLocalRandom.current().nextInt(0, 15000));
    DownloadPicture3.download(imageUrl);
    isir1234
        21
    isir1234  
       2020-09-05 15:33:30 +08:00
    runAsync 后加上异常处理试试

    比如 CompletableFuture.runAsync(()->xxx).exceptionally(e -> {
    // print exception here
    return null;
    }))
    RedBeanIce
        22
    RedBeanIce  
    OP
       2020-09-06 01:06:42 +08:00
    @zhady009
    @mango88
    @cs419
    @Narcissu5
    @coldear
    @allan888
    @isir1234

    https://www.yuque.com/docs/share/61f38a49-764c-4b6e-9271-53e06fc0d32d?#

    各位大佬代码已经贴出来了,,大佬们可以在自己的电脑执行链接中的代码,
    大佬们的方法我都试了一下,好像不行

    1,也没有报错的 log
    2,
    RedBeanIce
        23
    RedBeanIce  
    OP
       2020-09-06 01:11:36 +08:00
    @putaozhenhaochi

    private static void downloadStream(Map<String, String> map) throws IOException {
    map.values().parallelStream().forEach(new Consumer<String>() {
    @SneakyThrows
    @Override
    public void accept(String s) {
    Long download = download(s);
    System.out.println(download);
    }
    });
    }
    RedBeanIce
        24
    RedBeanIce  
    OP
       2020-09-06 01:12:11 +08:00
    #13
    同样也会丢很多张,,,详情代码就是在上面链接代码里面,加了一个方法
    amiwrong123
        25
    amiwrong123  
       2020-09-06 11:16:56 +08:00
    个人怀疑,是不是 ForkJoinPool#commonPool()的坑,难道是在 supplyAsync 内部提交 task 给 commonPool 的时候执行了什么奇怪的拒绝策略。

    建议使用 supplyAsync(Supplier<U> supplier, Executor executor),自己给一个线程池,排除一下线程池的原因。
    cs419
        26
    cs419  
       2020-09-06 12:41:42 +08:00
    测了下代码 下载数量正确
    代码贴在语雀下面了
    你再试下,不行就让你同事也运行下
    没准是你环境的问题
    RedBeanIce
        27
    RedBeanIce  
    OP
       2020-09-06 13:24:18 +08:00
    @cs419

    #26 大佬我试了一下您的方案,还是不行,不过我在 QQ 群朋友的帮助下解决了。
    https://www.yuque.com/docs/share/4ba58651-ac10-46ae-9175-1c5b43ec97ec?#
    RedBeanIce
        28
    RedBeanIce  
    OP
       2020-09-06 13:33:15 +08:00
    此贴 end

    下面贴代码(上面的是大佬的方案,下面是测试 3 次的代码,可行) base64

    aHR0cHM6Ly93d3cueXVxdWUuY29tL2RvY3Mvc2hhcmUvNGJhNTg2NTEtYWMxMC00NmFlLTkxNzUtMWM1YjQzZWM5N2VjPyM=

    错误原因:错误的使用 long startTime = System.currentTimeMillis() 作为文件的名字,文件被覆盖了

    解决措施:使用 AtomicLong.incrementAndGet()自增,原子性的增加然后返回的操作

    另外:LongAdder 由于没有 incrementAndGet,所以只能 increment(),然后 longvalue(),这样不是原子的操作,所以也会覆盖(中间有一个版本,我没有使用 System.currentTimeMillis(),使用了 LongAdder 仍然失败了)

    总结:多线程好难啊!!!!
    RedBeanIce
        29
    RedBeanIce  
    OP
       2020-09-06 13:34:24 +08:00
    各位大佬,谢谢指教!!!!太强了

    贴一下 26 楼的大佬
    https://www.yuque.com/docs/share/6065b121-4732-4826-bac2-3bb356f0461e?#
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   5768 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 06:28 · PVG 14:28 · LAX 23:28 · JFK 02:28
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.