V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
DeadLion
V2EX  ›  Java

关于微服务并行调用优化的想法

  •  
  •   DeadLion · 2018-10-22 16:12:23 +08:00 · 4627 次点击
    这是一个创建于 2263 天前的主题,其中的信息可能已经有所发展或是发生改变。

    假设一个场景:对外提供一个 http 接口 A,A 接口内部实际上又是调用 B C D 三个微服务接口,B C D 之间没有依赖关系,可以同时执行,最后将结果组合起来返回。

    我们正常一般都是顺序同步执行,假设每个服务用时 1 秒,那么至少需要 3 秒才能返回结果,假如 BCD 并行执行的话,理论上只要 1 秒就能返回结果了。

    在 Java 里只想到了用多线程来做这样的优化,可以用 callable 这种可以获取异步返回结果的类。

    在公司内网上看到有人说可以用 RxJava 来做这样的优化,但是花了些时间发现好像还不如上面的方法,也可能是自己对 RxJava 不熟悉,如果有熟悉 RxJava 的不吝赐教。还是用这里的例子来说明,在 RxJava 里 B C D 服务需要各封装成一个 observable,然后和 observer 用异步线程模式来关联。那么 BCD 会同时开始执行,但是我发现没有已经封装好的工具来阻塞主线程等大家一起执行完返回结果,callable 好歹还有个 get ()方法,不过也不好用就是了。最好还是借助 CountDownLatch 类的工具来协助。 不知道 RxJava 在服务端应用不多的原因是不是因为这个,实在是没有发现什么特别好的地方,唯一我觉得好用的可能就是在一个 observerable 里 onnext 链式调用,对于有层层依赖的情况可能比较好用,还有一些操作符之类的。

    另外在知乎上也看到一篇同样主题的文章

    希望大家来点建议。

    19 条回复    2018-10-24 11:27:30 +08:00
    ysweics
        1
    ysweics  
       2018-10-22 16:24:56 +08:00
    同有这种疑惑,坐等大神
    DeadLion
        2
    DeadLion  
    OP
       2018-10-22 16:38:21 +08:00
    贴一个 Callable demo 出来

    ```
    @RequestMapping("/testWhitCallable")
    public String testWhitCallable() throws ExecutionException, InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(2);

    ExecutorService es = Executors.newCachedThreadPool();

    Future fb = es.submit(() -> {
    Object result = b.methodB();
    countDownLatch.countDown();
    return result;
    });

    Future fa = es.submit(() -> {
    Object result = a.methodA();
    countDownLatch.countDown();
    return result;
    });
    countDownLatch.await();
    return fa.get().toString() + fb.get().toString();
    }
    ```
    sagaxu
        3
    sagaxu  
       2018-10-22 16:38:55 +08:00 via Android
    可以用 future 来做,每个 http 请求一个 future,再聚合这 3 个 future 得到一个用来等待的 future。http 请求改成异步的,不要让它阻塞你的线程。
    lhx2008
        4
    lhx2008  
       2018-10-22 16:39:18 +08:00 via Android
    有一个 zip 方法,我用的是 reactor3 的,把各个请求任务发到不同线程上面,然后 zip 一起,再 onNext,reactor3 有一个 block 方法转回同步。rxjava 不太清楚。

    或者 vert.x 的 future 也有一个类似的 ,叫 CompositeFuture,底层可以看到是 Countdown latch,所以其实 jdk 自带就是 countdownlatch
    lhx2008
        5
    lhx2008  
       2018-10-22 16:46:20 +08:00 via Android
    reacto3 demo,手机打的,意思一下

    Mono.zip(Mono.fromCallable(), Mono.fromCallable(),Mono.fromCallable())
    .doOnNext(xxx)
    .doOnError(xxx)
    .block
    lhx2008
        6
    lhx2008  
       2018-10-22 16:48:05 +08:00 via Android
    忘了发线程了

    Mono.zip(Mono.fromCallable().subscribeOn(线程池), Mono.fromCallable().subsribeOn(),Mono.fromCallable().subscribeOn)
    .doOnNext(xxx)
    .doOnError(xxx)
    .block
    xcstream
        7
    xcstream  
       2018-10-22 16:48:36 +08:00
    java 有协程么
    没有的话本质上就是多线程了
    ffeii
        8
    ffeii  
       2018-10-22 16:49:21 +08:00 via iPhone
    jinhan13789991
        9
    jinhan13789991  
       2018-10-22 16:50:10 +08:00
    android 开发一枚,我们一般用 combineLatest 来做一些表单验证操作。没有涉及到太多多线程。
    你可以看一下 ~
    https://blog.csdn.net/johnny901114/article/details/61191723
    lhx2008
        10
    lhx2008  
       2018-10-22 16:53:59 +08:00 via Android
    @ffeii 是 Mono.zip 吧,参考我上面的,Flux 会对流里面每个都合并。
    hpeng
        11
    hpeng  
       2018-10-22 16:57:42 +08:00
    zhangwugui
        12
    zhangwugui  
       2018-10-22 17:08:01 +08:00
    对 RxJava 同样不太熟悉,楼上倒是也说了这个 CompletableFuture,这个是 JDK8 基于 Future 异步处理的增强,这个倒是可以用在这里。
    DeadLion
        13
    DeadLion  
    OP
       2018-10-22 17:09:08 +08:00   ❤️ 2
    搜到一篇 https://medium.com/@nithinmallya4/processing-streaming-data-with-spring-webflux-ed0fc68a14de

    里面提到了很多方案,不过好像大家都说的差不多了

    1.Java 5 提供的 Futures
    2.Java 8 提供的 CompletableFutures
    3.第三方库 RxJava
    4.Spring 5 提供的 Reactive Streams,实际就是上面提到的 Reactor 实现的。
    5.Spring 5 中的 Webflux,不过我看上面说只能基于 http 或者 websockets 请求
    ,不过我们说的不仅限于 http 形式的微服务,还有 rpc 也算。

    @ysweics @lhx2008 @ffeii
    janus77
        14
    janus77  
       2018-10-22 17:10:20 +08:00
    zip 操作符即可
    lhx2008
        15
    lhx2008  
       2018-10-22 17:14:02 +08:00 via Android   ❤️ 1
    @DeadLion webflux 就是 reactor,只是 webflux 提供了一个配套的异步网络 API,如果是 RPC 的,Reactor 支持接入别的异步或者 fromCallable
    DeadLion
        16
    DeadLion  
    OP
       2018-10-22 17:16:06 +08:00
    @lhx2008 明白了
    wengang285
        17
    wengang285  
       2018-10-22 17:37:49 +08:00
    promise/future
    yaoliyc
        18
    yaoliyc  
       2018-10-22 19:05:07 +08:00
    jdk7 提供了 fork-join
    v3exhost
        19
    v3exhost  
       2018-10-24 11:27:30 +08:00
    ```java
    CompletableFuture<String> future1
    = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2
    = CompletableFuture.supplyAsync(() -> "Beautiful");
    CompletableFuture<String> future3
    = CompletableFuture.supplyAsync(() -> "World");

    CompletableFuture<Void> combinedFuture
    = CompletableFuture.allOf(future1, future2, future3);
    ```
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2365 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 16:01 · PVG 00:01 · LAX 08:01 · JFK 11:01
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.