V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
wuzhizuiguo
V2EX  ›  Java

请问大佬 并发的 RPC 调用次数超过了 Hystrix 默认的线程数,怎么"慢慢"地调用?

  •  
  •   wuzhizuiguo · 2019-08-19 10:40:59 +08:00 · 4739 次点击
    这是一个创建于 1905 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个 service 方法里 for 循环一个列表,每条记录里 new Thread 在 run 方法里调用一次 payRPC 方法 例如

      orderList.forEach(x->{
                  new Thread(){
                         public void run(){
                              payRPC.reset(x.getId(),x.getUserId());
                          }
                      }.start();
                     });
    

    但是这种方式 报错了 "could not be queued for execution and no fallback available", 超出了 Hystrix 默认是 10 个线程.

    请问一下能否改成 循环列表 把 rpc 调用放到线程池中去,让它去控制短时间内的 RPC 调用,这样是否可行? executorService 的队列 queue 大小是不是得小于 10 个(Hystrix 默认的线程数)

    orderList.forEach(x->{
               executorService.execute(new Runnable() {
                   @Override
                   public void run() {
                       payRPC.reset(x.getId(),x.getUserId());
                   }
               });
           });
    

    或者 循环列表, 不使用 PRC 调用,直接利用 httpClient 这种直接调用 url 接口请求? 有提到 RabbitMQ(这个还没接触过)

    就像这种情况 https://www.v2ex.com/t/481064, 300 万的列表,rpc 接口 但每次只能处理 300(超出报错),怎么去有限制地调用 rpc 接口 (我的数据量没这么大).

    请问下大佬 我这种该怎么解决? 谢谢

    13 条回复    2019-08-19 16:43:23 +08:00
    Raymon111111
        1
    Raymon111111  
       2019-08-19 11:19:05 +08:00
    线程池应该是分为执行线程的大小和等待队列的吧
    DanielYao
        2
    DanielYao  
       2019-08-19 11:28:56 +08:00
    你可以先把 id 全部放到 ConcurrentQueue<T> 里,然后定义一个 list<task>任务集合,循环从 ConcurrentQueue 取,放到 list<task>中,10 个一次执行
    if (listT.Count % 10 == 0)
    Task.WaitAll(listT.ToArray());
    DanielYao
        3
    DanielYao  
       2019-08-19 11:30:29 +08:00
    @DanielYao 最好还是能让 RPC 提供批量方法
    519718366
        4
    519718366  
       2019-08-19 11:35:51 +08:00 via iPhone
    不知道你这代码是为了配合问题描述还是业务就这样

    如果一次服务调用就要 reset 多个 order,那就看看服务提供方能给你一个批量接口
    for 循环调 rpc 服务会被批斗死

    如果一次只是 reset 一个 order,你代码只是为了模拟多个用户并发的场景,你就正常调,那就是正常的业务产品问题了
    wuzhizuiguo
        5
    wuzhizuiguo  
    OP
       2019-08-19 11:50:38 +08:00
    @Raymon111111 谢谢. 好像是的.. 第一次用. ThreadPoolExecutor 的 corePoolSize 和 maximumPoolSize 小于 10 ,然后 BlockingQueue 大一点, 可能是先执行核心线程数大小的 rpc 调用,然后队列里的在等着. 测了 20 几个,没报异常,不知道列表大了会怎么样?
    wuzhizuiguo
        6
    wuzhizuiguo  
    OP
       2019-08-19 11:51:01 +08:00
    @DanielYao 这个 ConcurrentQueue 好像是 C++的, 我去看看对应 Java 的
    wuzhizuiguo
        7
    wuzhizuiguo  
    OP
       2019-08-19 11:51:33 +08:00
    @519718366 谢谢. 明白了, 是我自己写成了这样的.. 没有了解到这样循环调用 rpc 会出现问题.
    519718366
        8
    519718366  
       2019-08-19 12:10:12 +08:00 via iPhone
    @wuzhizuiguo 既然你的服务的任务量在这,不管线程池还是 mq,注意你自己的服务别超时
    wuzhizuiguo
        9
    wuzhizuiguo  
    OP
       2019-08-19 13:14:10 +08:00
    @519718366 好的. 没有那么大,很小.....
    wdmx007
        10
    wdmx007  
       2019-08-19 14:59:33 +08:00
    java 的线程池不是内置了一个 TaskQueue 嘛,不用再外面自己用 Queue 了。
    passerbytiny
        11
    passerbytiny  
       2019-08-19 16:09:26 +08:00
    你的这个错误是“执行已经不能放到队列,即队列已满,并且这种情况也没有定义 fallback 处理”,受限制的是队列容量而不是线程数。队列容量一般是大于线程数的,但是 Hystrix 本来就是做熔断的,有相对极严格的超时要求,队列容量也不建议太大。

    你这种情况实际上与 Hystrix 无关,调大 Hystrix 限制、换成 httpClient 都治标不治本。你的问题的本质是:一个 RPC 请求(对你 service 本身的 RPC )分裂成了多个后续的 RPC 请求(多个 payRPC.reset )。类似于 N+1 查询,但比 N+1 查询严重多了。处理方式有两个:
    一,让 pay 服务提供批量 reset 的接口,这样不管 orderList 多长都只需要一个后续 RPC。前提是批量 reset 的时间足够短,如果批量 reset 时间很长,甚至等同于单个 reset 时间乘以个数,那么该方式不可用。
    二,异步处理或最终一致性。比如说 pay 服务提供批量 reset 注册接口,调用该接口只是把 reset 加入 pay 服务的队列,并没有实际的 reset,pay 服务后续会自行处理队列,你当前的服务需要过段时间后再去询问执行结果。又比如说使用消息队列,你当前的服务只用把一个 reset 消息放到 RabbitMQ 等消息队列上,不用根 pay 服务直接打交道,pay 服务需要自行消费消息队列做后续处理。这种方式很复杂,一时半会说不清楚,你可以以“最终一致性”、“事件驱动”为关键字搜索资料。
    wuzhizuiguo
        12
    wuzhizuiguo  
    OP
       2019-08-19 16:42:35 +08:00
    @wdmx007 谢谢. 是这个参数吧 BlockingQueue.
    wuzhizuiguo
        13
    wuzhizuiguo  
    OP
       2019-08-19 16:43:23 +08:00
    @passerbytiny 谢谢,很详细, 学习了. 具体报错是博客上看到的" 超出了 Hystrix 的默认 10 个线程", 实际日志恰好显示执行的个数也是 10 个,Hystrix 还没有了解过. 确实我偷懒了,想当然了,以为循环 rpc 调一下就好了,没有写一个 RPC 批量处理. 刚刚试了下 ExecutorService 线程池 创建一个核心线程数小于 10 的(2,3,4..), 然后设置一定长度的队列 BlockingQueue(几千). 消息队列自己写的部分还没有接触到..格局很小,没有几十上百万的数据...
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3206 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 12:25 · PVG 20:25 · LAX 04:25 · JFK 07:25
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.