一个 service 方法里 for 循环一个列表,每条记录里 new Thread 在 run 方法里调用一次 payRPC 方法 例如
orderList.forEach(x->{ new Thread(){ public void run(){ payRPC.reset(x.getId(),x.getUserId()); } }.start(); });
但是这种方式 报错了 "could not be queued for execution and no fallback available", 超出了 Hystrix 默认是 10 个线程.
请问一下能否改成 循环列表 把 rpc 调用放到线程池中去,让它去控制短时间内的 RPC 调用,这样是否可行? executorService 的队列 queue 大小是不是得小于 10 个(Hystrix 默认的线程数)
orderList.forEach(x->{ executorService.execute(new Runnable() { @Override public void run() { payRPC.reset(x.getId(),x.getUserId()); } }); });
或者 循环列表, 不使用 PRC 调用,直接利用 httpClient 这种直接调用 url 接口请求? 有提到 RabbitMQ(这个还没接触过)
就像这种情况 https://www.v2ex.com/t/481064, 300 万的列表,rpc 接口 但每次只能处理 300(超出报错),怎么去有限制地调用 rpc 接口 (我的数据量没这么大).
请问下大佬 我这种该怎么解决? 谢谢
1
Raymon111111 2019-08-19 11:19:05 +08:00
线程池应该是分为执行线程的大小和等待队列的吧
|
2
DanielYao 2019-08-19 11:28:56 +08:00
你可以先把 id 全部放到 ConcurrentQueue<T> 里,然后定义一个 list<task>任务集合,循环从 ConcurrentQueue 取,放到 list<task>中,10 个一次执行
if (listT.Count % 10 == 0) Task.WaitAll(listT.ToArray()); |
4
519718366 2019-08-19 11:35:51 +08:00 via iPhone
不知道你这代码是为了配合问题描述还是业务就这样
如果一次服务调用就要 reset 多个 order,那就看看服务提供方能给你一个批量接口 for 循环调 rpc 服务会被批斗死 如果一次只是 reset 一个 order,你代码只是为了模拟多个用户并发的场景,你就正常调,那就是正常的业务产品问题了 |
5
wuzhizuiguo OP @Raymon111111 谢谢. 好像是的.. 第一次用. ThreadPoolExecutor 的 corePoolSize 和 maximumPoolSize 小于 10 ,然后 BlockingQueue 大一点, 可能是先执行核心线程数大小的 rpc 调用,然后队列里的在等着. 测了 20 几个,没报异常,不知道列表大了会怎么样?
|
6
wuzhizuiguo OP @DanielYao 这个 ConcurrentQueue 好像是 C++的, 我去看看对应 Java 的
|
7
wuzhizuiguo OP @519718366 谢谢. 明白了, 是我自己写成了这样的.. 没有了解到这样循环调用 rpc 会出现问题.
|
8
519718366 2019-08-19 12:10:12 +08:00 via iPhone
@wuzhizuiguo 既然你的服务的任务量在这,不管线程池还是 mq,注意你自己的服务别超时
|
9
wuzhizuiguo OP @519718366 好的. 没有那么大,很小.....
|
10
wdmx007 2019-08-19 14:59:33 +08:00
java 的线程池不是内置了一个 TaskQueue 嘛,不用再外面自己用 Queue 了。
|
11
passerbytiny 2019-08-19 16:09:26 +08:00
你的这个错误是“执行已经不能放到队列,即队列已满,并且这种情况也没有定义 fallback 处理”,受限制的是队列容量而不是线程数。队列容量一般是大于线程数的,但是 Hystrix 本来就是做熔断的,有相对极严格的超时要求,队列容量也不建议太大。
你这种情况实际上与 Hystrix 无关,调大 Hystrix 限制、换成 httpClient 都治标不治本。你的问题的本质是:一个 RPC 请求(对你 service 本身的 RPC )分裂成了多个后续的 RPC 请求(多个 payRPC.reset )。类似于 N+1 查询,但比 N+1 查询严重多了。处理方式有两个: 一,让 pay 服务提供批量 reset 的接口,这样不管 orderList 多长都只需要一个后续 RPC。前提是批量 reset 的时间足够短,如果批量 reset 时间很长,甚至等同于单个 reset 时间乘以个数,那么该方式不可用。 二,异步处理或最终一致性。比如说 pay 服务提供批量 reset 注册接口,调用该接口只是把 reset 加入 pay 服务的队列,并没有实际的 reset,pay 服务后续会自行处理队列,你当前的服务需要过段时间后再去询问执行结果。又比如说使用消息队列,你当前的服务只用把一个 reset 消息放到 RabbitMQ 等消息队列上,不用根 pay 服务直接打交道,pay 服务需要自行消费消息队列做后续处理。这种方式很复杂,一时半会说不清楚,你可以以“最终一致性”、“事件驱动”为关键字搜索资料。 |
12
wuzhizuiguo OP @wdmx007 谢谢. 是这个参数吧 BlockingQueue.
|
13
wuzhizuiguo OP @passerbytiny 谢谢,很详细, 学习了. 具体报错是博客上看到的" 超出了 Hystrix 的默认 10 个线程", 实际日志恰好显示执行的个数也是 10 个,Hystrix 还没有了解过. 确实我偷懒了,想当然了,以为循环 rpc 调一下就好了,没有写一个 RPC 批量处理. 刚刚试了下 ExecutorService 线程池 创建一个核心线程数小于 10 的(2,3,4..), 然后设置一定长度的队列 BlockingQueue(几千). 消息队列自己写的部分还没有接触到..格局很小,没有几十上百万的数据...
|