在 Java 中并发的请求多个接口,把请求来的数据做个聚合然后返回,如果是调用了 api1 然后再调用 api2 再调用 api3,这种方式可能大多数时间都在网络 IO 上了,而且随着接口的变多性能不断下降 如果是在 go 中直接用协成就可以了,请求五个 api 和请求一个 api 耗时可能差不多(前提 api 平均耗时都一样)
java 中该如何编写代码呢?使用线程池的话肯定提高效率有限,因为线程不是协成,数量不会太多,并发量大了都在线程池里排队了
使用 NIO httpclient 可能效果好些,但是都必须写 callback,怎么判断所有的 api 都把结果成功返回了,然后我们要聚合接口,callback 写起来有些难受
java 中这种场景应该很多见,一般会怎么处理呢?
1
guyeu 2020-03-26 16:23:43 +08:00 1
线程池+CompletableFuture+聚合操作
|
2
pursuer 2020-03-26 16:26:16 +08:00
使用 Kotlin,C#的 async await,使用 Java Promise 库自己封装或者等待 Project Loom
|
3
guyeu 2020-03-26 16:28:15 +08:00
CompletableFuture 自带一个线程池,自己写的话可能比协程别扭一点,但是效率差不多
|
4
xuanbg 2020-03-26 17:12:44 +08:00
其实还是前端直接调各个接口拿数据效率高,前端 JS 天然就是异步模式的。
|
5
coer 2020-03-26 18:01:21 +08:00
callback+CompletableFuture ?
|
6
jamlee 2020-03-26 18:14:02 +08:00
RxJava 比较适合这种事情吧
|
7
areless 2020-03-26 18:22:13 +08:00 via Android
nginx lua 抗下大半
|
8
Artiano 2020-03-26 18:29:22 +08:00
RxJava zip,用 Kotlin async/await 特别爽
|
9
123444a 2020-03-26 18:32:59 +08:00 via Android
肯定是 callback 丫大哥,多线程不需要加锁
|
10
123444a 2020-03-26 18:48:48 +08:00 via Android
callback 都是在 io 线程的,然后唤醒工作线程在工作线程判断收完 response 没,然后记得设置超时也要 callback
|
11
CoderGeek 2020-03-26 18:52:48 +08:00
Future rx
|
12
araaaa 2020-03-26 18:53:18 +08:00 via iPhone
rxjava spring reactor
|
13
th00000 2020-03-26 18:55:30 +08:00
异步可解
|
14
xhinliang 2020-03-26 18:57:23 +08:00
CountDownLatch
|
15
gz911122 2020-03-26 19:00:24 +08:00
rxjava 了解一下
或者 kotlin 协程 |
16
Kipp 2020-03-26 19:10:43 +08:00 via iPhone
最近也同样遇到这个问题 mark
|
17
yeqizhang 2020-03-26 21:50:16 +08:00 via Android
futuretask 短板是时间最长的那个接口
|
18
liuliuluk 2020-03-26 21:53:16 +08:00
以往项目中是用 Future,mark 一下 JDK8 新特性
|
19
micean 2020-03-26 22:07:37 +08:00
java 的 vertx 可以这么用
CompositeFuture.all(请求 1,请求 2...) .compose(结果集 -> 处理结果集,返回最终结果) .setHandler(成功时的处理最终结果,至少一项请求失败时处理异常) 和 java 自带的 CompletableFuture 相比,只用了 1 个线程,无阻塞。 |
20
mosliu 2020-03-26 23:54:10 +08:00
jdk8
CompletableFuture allof |
21
Macolor21 2020-03-27 00:04:27 +08:00 via iPhone
以前做个类似场景,用创建个线程池,然后用 CountdownLatch 。看楼上似乎 8 的特性也支持。建议楼主写多个版本,做下 benchmark
|
22
noble4cc OP @guyeu 线程池在并发量大的情况下不如协成吧
感觉线程池的原理是使用多线程进行 http 请求,比如 5 个 api,开 5 线程,然后聚合,但是每个线程在执行的时候 io 是阻塞的,大部分的线程时间都浪费在阻塞上了,如果我们这种聚合 api 数据的请求特别多,比如 1000qps,复用五个线程或者多开点 20 个,相当于 1000 个要请求 5000 次后端 api,在线程池里排队处理的话太慢吧 |
23
tairan2006 2020-03-27 08:40:57 +08:00 via Android
这不是基本功么,开线程等待完成,CountDownLatch 啊
|
24
Seawalker 2020-03-27 09:06:37 +08:00 via Android
标记一下看看有没有好方案
|
25
shaoyijiong 2020-03-27 09:16:14 +08:00
一楼标准答案
|
26
yc8332 2020-03-27 09:17:16 +08:00
只是聚合请求,干嘛不搞个现成的 api 网关就好了。。
|
27
piglovesx 2020-03-27 09:19:51 +08:00
小白一枚,很好奇协程是从哪个英文单词翻译过来的,是 channel 吗?
|
28
guolaopi 2020-03-27 09:31:28 +08:00
C#:Task.WaitAll();
(滑稽 |
29
LosLord 2020-03-27 09:46:07 +08:00
CompletableFuture.allOf(List<CompletableFuture>)
|
30
noble4cc OP @tairan2006 老哥我说过多线程方案性能肯定不行
200 qps 访问 5 个 api 不能开 1000 个线程吧,线程复用一个机器 8core 开 16 个工作线程的话,每次并发的请求后端 api 是 16,每个 api 平均耗时 10ms 的话,第 200 个请求得等到什么时候呢?量少了确实没什么问题,java 类似的工具包确实也多如牛毛 |
31
noble4cc OP @micean 这个本质上确实是 io 多路复用的原理吧,开起来挺方便的,vert.x 不太熟,netty 到是经常用,我一开始想的是用 netty 封装个 httpclient,但是感觉搞起来太麻烦了,是不是 vert.x 就是用 netty 实现了 http 协议了
|
32
lscexpress 2020-03-27 10:34:03 +08:00
@piglovesx Coroutine 翻译为协程,通常来说 java 不用协程。channel 在书中的翻译多为信道或者通道
|
33
piglovesx 2020-03-27 11:07:32 +08:00
@lscexpress 谢谢 :)
|
34
guyeu 2020-03-27 11:13:24 +08:00
@noble4cc #22 是的,线程池在并发量大的情况下不如协程。所以这种情况下会做一些设计,比如把发消息和收消息分开,一个线程池专门发,一个线程池专门处理收消息,也就是 NIO 的思路。。
|
35
hpeng 2020-03-27 11:14:44 +08:00 via iPhone
看一楼的
|
37
buliugu 2020-03-27 15:27:09 +08:00
java 大量 API 请求可以用 Quasar,现成的纤程库
|
38
elevation 2020-03-30 15:47:12 +08:00
不知道你现在怎么样,我觉得用 diruptor,环形数组线程分发,可以降低消耗,自己写底层实现,工厂,资源调用。比较方便;
|
39
xiaoidea 2020-03-31 17:27:30 +08:00
目前用的是线程池+guava ListenableFuture 、Futures 工具类,确实很多线程堵在 IO 上了,线程池要开多大需要压测
看到有其他项目用 Spring webflux 的,对这个不熟 |
40
monkeyWie 2020-04-03 18:31:05 +08:00
NIO httpclient + CountDownLatch 不就行了吗
主线程还是得阻塞的啊,阻塞到 api 全部 callback 完 |
41
RRRSSS 2020-04-08 19:51:03 +08:00
Completable<Void> f = CompletableFuture.allOf(task1, task2, task3); // 这里注意要使用线程池
f.get(); // 这里消耗的时间是 task1 、task2 、task3 的最大值 Stream.of(future1, future2, future3).forEach(dd -> dd.thenAccept(e -> { // 处理数据 })); |
42
guisheng 2020-05-23 15:41:14 +08:00
楼主最后使用了什么方式呢?我目前采用的是 spring webclient 的 Mono.zip 来组合请求发送。
|