V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
rykinia
V2EX  ›  问与答

[ Java ]CrudBoy 想请教一个多线程处理的问题

  •  
  •   rykinia · 2019-07-12 10:20:04 +08:00 · 1066 次点击
    这是一个创建于 1743 天前的主题,其中的信息可能已经有所发展或是发生改变。

    想用多线程把数据库的数据写入 elasticsearch

    如果发生异常,要立即终止整个循环,所以用了 Future

    代码如下

    private Result loadDataFromDbIntoEs(Long maxId) {
        ExecutorService pool = Executors.newWorkStealingPool();
        LinkedTransferQueue<Future<Result>> futureQueue = new LinkedTransferQueue<>();
        try {
            //遍历数据库的表
            for (long i = 0; i <= maxId; i += getDbPageSize()) {
                //创建任务
                Callable<Result> task = createTask(i);
                //任务入队
                futureQueue.put(pool.submit(task));
    
                //队列超过一定长度后,先执行掉一部分再继续
                if (futureQueue.size() > QUEUE_SIZE * 8) {
                    if ((i % 100000) == 0) {
                        log.debug("{} - Iterating future list, {} of {}", esEntityClassName, i, maxId);
                    }
                    //执行一部分任务
                    checkFuture(futureQueue);
                }
            }
            pool.shutdown();
            //处理队列中剩余的任务
            while (!futureQueue.isEmpty()) {
                checkFuture(futureQueue);
            }
            log.info("{} - sync complete", esEntityClassName);
        } catch (SyncException | InterruptedException | ExecutionException e) {
            //throw...
        }
        return Result.ok();
    }
    
    
    /**
     * 创建任务
     */
    private Callable<Result> createTask(Long currentId) {
        return () -> {
            List<D> dbList = dbRepository.findByIdBetween(currentId, currentId + getDbPageSize() - 1);
            if (dbList.isEmpty()) {
                //忽略没有数据的 id 区间
                return Result.ok();
            }
    
            //写入 es
            return bulkCreate(dbListToEsList(dbList));
        };
    }
    
    
    /**
     * 消费任务
     */
    private void checkFuture(LinkedTransferQueue<Future<Result>> futureQueue) throws ExecutionException, InterruptedException {
        for (int i = 0; i < QUEUE_SIZE; i++) {
            Future<Result> future = futureQueue.poll();
            if (future != null) {
                Result result = future.get();
                if (!Result.REQUEST_SUCCESS.equals(result.getStatus())) {
                    throw new SyncException(result.getMessage());
                }
            }
        }
    }
    

    现在的问题是,服务器 16 核,cpu 占用率并不高,大多数时候只有 es 的进程占了 20% 不知道是哪里有问题导致效率太低?

    4 条回复    2019-07-16 14:33:52 +08:00
    gosansam
        1
    gosansam  
       2019-07-12 11:49:43 +08:00
    Result result = future.get();
    这个获取结果是阻塞的
    zhady009
        2
    zhady009  
       2019-07-12 13:30:03 +08:00
    guava 的 ListeningExecutorService submit 返回 ListenableFuture 应该可以解决
    softtwilight
        3
    softtwilight  
       2019-07-12 13:49:47 +08:00
    可以试试用 completableFuture,checkFuture 可以改 join,如果 io 耗时多,Executors.newWorkStealingPool() 的线程可能有点少,completableFuture 也可以自定义线程池
    rykinia
        4
    rykinia  
    OP
       2019-07-16 14:33:52 +08:00
    谢谢各位的解答。
    研究了几天,感觉主要问题还是 es 的阻塞比较久,不过把这里改成了主线程消费 queue 里面的,线程池里异步往 queue 添加,稍微好了点。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   5280 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 07:47 · PVG 15:47 · LAX 00:47 · JFK 03:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.