V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Joker123456789
V2EX  ›  Java

我们总是避免不了这一种需求

  •  
  •   Joker123456789 · 17 天前 · 1685 次点击

    非常不好意思,连续发两篇文章肯定是打扰到大家了,但我是有苦衷的[大哭][大哭],之前梯子出了点问题,所以一直访问不上,今天刚刚弄好,所以就把之前积累的两篇文章都发出来了。

    在此我真诚的给大家道个歉,实在是不好意思。


    我们总是避免不了这样一种需求,写个定时任务去查数据库或者去别的地方捞数据,然后对查出来的数据进行各种业务逻辑的处理。

    这其实就是就是一种生产者与消费者模式,查询数据库的过程可以称为“生产”,对数据进行处理的过程可以称为“消费”,只不过大部分情况下我们都懒得去把他们分开,而是查出来就直接处理了。但是这种做法有个很明显的缺陷,就是生产和消费这两个步骤有时会互相阻塞。

    也就是说当查询数据的步骤出现阻塞的时候就没有数据来处理了。

    反过来也一样,当处理数据的步骤阻塞了,会导致查询数据的步骤停下来。

    所以为了让这两种能力可以各自发挥,我们需要把他们分开,各干各的,而这种分开的设计方式就叫生产者与消费者模型。

    只要能查到数据就尽管查,就算没人处理也不会影响我继续查。

    只要有数据我就处理,我才不管你此刻是不是阻塞了,你之前查出来的数据,我可能都还没处理完。

    所以我写了一个小工具,可以很好的解决这个问题

    他是一个多对多的模型,可以把多个生产者和多个消费者放到一个组合里,这个组合里所有的生产者都可以发布数据,而发布出去的数据会被所有消费者接收到。

    每个生产者都会单独占用一个线程,每个消费者也一样。

    比如像这样子我们就可以创建一个组合

    // 创建一组生产者与消费者,而这样组可以创建无限个
    // 每一组的生产者都只会把数据推送给同一组的消费者
    MagicDataProcessing.getProducerAndConsumerManager()
                    .addProducer(new DemoProducer()) // 添加一个生产者(可以添加多个)
                    .addConsumer(new DemoConsumer()) // 添加一个消费者(可以添加多个)
                    .start();
    

    生产者长这样

    这里省略了很多其他配置,感兴趣的可以去官网查看

    public class DemoProducer extends MagicProducer {
    
        /**
         * 当生产者启动后,会自动执行这个方法,我们可以在这个方法里生产数据,并通过 publish 方法发布给消费者
         *
         * 这边举一个例子
         * 假如我们需要不断地扫描某张表,根据里面的数据状态去执行一些业务逻辑
         * 那么我们可以在这个方法里写一个查询的逻辑,然后将查询到数据发送给消费者
         */
        @Override
        public void producer() {
            // 根据上面的例子,我们可以查询这张表里符合条件的数据
            List<Object> dataList = selectList();
    
            // 然后将他推送给消费者
            // 可以推送任意类型的数据
            this.publish(dataList);
    
            // 也可以分页查询,一页一页的推,至于怎么分页那就是业务层的事情了
    
        }
    }
    

    消费者长这样

    同样省略了很多其他配置,感兴趣的可以去官网查看

    public class DemoConsumer extends MagicConsumer {
    
        /**
         * 心跳通知,消费者每消费一个任务,都会触发一下这个方法
         * 我们可以根据他触发的频率来判断这个消费者的活跃度
         *
         * 注意!!!
         * 这个方法里不可以有耗时的操作,不然会将消费者阻塞的
         * 如果一定要加耗时的操作,那么务必在新线程里搞
         * @param id
         */
        @Override
        public void pulse(String id) {
            new Thread(()->{
                // 如果你需要在这个方法里搞一些耗时的操作,那么务必要像这样开启一个新线程
                // 不然消费者会被阻塞的
            }).start();
        }
    
    
        /**
         * 这个方法会接收到生产者推送过来的数据
         * 在里面执行相应的业务逻辑即可
         * @param data
         */
        @Override
        public void doRunner(Object data) {
            // data 可以是任何类型
            // 因为能给他推送数据的生产者是固定的,所以 data 有可能收到的类型也是固定的
            // 所以我们可以在这里自己判断,然后转化即可
            // 为什么不用泛型?这是为了兼容多个生产者,因为他们推送的数据类型可能会不同
        }
    }
    

    这个模型里做了哪些事情?

    • 多线程处理,也就是前面提到的,每个生产者,每个消费者都是由单独的线程去执行的
    • 会自动开始下一轮生产,当生产者里的 producer 方法执行结束就意味着一轮生产已经结束了,根据业务需求如果需要不断的生产一轮又一轮,那么 producer 方法会自动不断地去执行。
    • 限制生产者,当生产者投放完一轮数据以后,会去监视消费者是否把这批数据消费完了,如果没消费完就会进入等待,这样可以在一定程度上避免数据积压。

    其他扩展功能

    项目里还提供了其他的工具类,可以并发处理 List ,Set ,Map 等集合,如果在生产者或者消费者里用上这些工具,可以进一步提高效率。

    官网地址:https://magician-io.com

    1 条回复    2024-11-25 11:19:45 +08:00
    edwardzcn98
        1
    edwardzcn98  
       16 天前
    这是教学向的帖子吗?我看思路就和 toy message queue 一样。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   867 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 21:20 · PVG 05:20 · LAX 13:20 · JFK 16:20
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.