作者:个推平台研发工程师 祥子
在个推的推送场景中,消息队列在整个系统中占有非常重要的位置。 当 APP 有推送需求的时候, 会向个推发送一条推送命令,接到推送需求后,我们会把 APP 要求推送消息的用户放入下发队列中,进行消息下发;当同时有多个 APP 进行消息下发时,难免会出现资源竞争的情况, 因此就产生了优先级队列的需求,在下发资源固定的情况下, 高优先级的用户需要有更多的下发资源。
针对以上场景,个推基于 Kafka 设计了第一版的优先级队列方案。Kafka 是 LinkedIn 开发的一个高性能、分布式消息系统; Kafka 在个推有非常广泛的应用,如日志收集、在线和离线消息分发等。
架构 在该方案中,个推将优先级统一设定为高、中、低三个级别。具体操作方案如下:
对某个优先级根据 task (单次推送任务)维度,存入不同的 Topic,一个 task 只写入一个 Topic,一个 Topic 可存多个 task ;
消费模块根据优先级配额(如 6:3:1),获取不同优先级的消息数,同一优先级轮询获取消息;这样既保证了高优先级用户可以更快地发送消息,又避免了低优先级用户出现没有下发的情况。
Kafka 方案遇到的问题
随着个推业务的不断发展,接入的 APP 数量逐渐增多,第一版的优先级方案也逐渐暴露出一些问题:
基于上述问题,个推进行了新一轮的技术选型, 我们需要可以创建大量的 Topic, 同时吞吐性能不能比 Kafka 逊色。经过一段时间的调研,Apache Pulsar 引起了我们的关注。
Apache Pulsar 是一个企业级的分布式消息系统,最初由 Yahoo 开发,在 2016 年开源,并于 2018 年 9 月毕业成为 Apache 基金会的顶级项目。Pulsar 已经在 Yahoo 的生产环境使用了三年多,主要服务于 Mail、Finance、Sports、Flickr、the Gemini Ads platform、Sherpa (Yahoo 的 KV 存储)。
架构
Topic 数量 Pulsar 可以支持百万级别 Topic 数量的扩展,同时还能一直保持良好的性能。Topic 的伸缩性取决于它的内部组织和存储方式。Pulsar 的数据保存在 bookie (BookKeeper 服务器)上,处于写状态的不同 Topic 的消息,在内存中排序,最终聚合保存到大文件中,在 Bookie 中需要更少的文件句柄。另一方面 Bookie 的 IO 更少依赖于文件系统的 Pagecache,Pulsar 也因此能够支持大量的主题。
消费模型 Pulsar 支持三种消费模型:Exclusive、Shared 和 Failover。 Exclusive (独享):一个 Topic 只能被一个消费者消费。Pulsar 默认使用这种模式。
Shared(共享):共享模式,多个消费者可以连接到同一个 Topic,消息依次分发给消费者。当一个消费者宕机或者主动断开连接时,那么分发给这个消费者的未确认(ack)的消息会得到重新调度,分发给其他消费者。
Failover (灾备):一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障,则备份消费者接管。不会出现同时有两个活跃的消费者。
Exclusive 和 Failover 订阅,仅允许一个消费者来使用和消费每个订阅的 Topic。这两种模式都按 Topic 分区顺序使用消息。它们最适用于需要严格消息顺序的流(Stream)用例。
Shared 允许每个主题分区有多个消费者。同一个订阅中的每个消费者仅接收 Topic 分区的一部分消息。Shared 最适用于不需要保证消息顺序队列(Queue)的使用模式,并且可以按照需要任意扩展消费者的数量。
存储 Pulsar 引入了 Apache BookKeeper 作为存储层,BookKeeper 是一个专门为实时系统优化过的分布式存储系统,具有可扩展、高可用、低延迟等特性。具体介绍,请参考 BookKeeper 官网。
Segment BookKeeper 以 Segment (在 BookKeeper 内部被称作 ledger) 作为存储的基本单元。从 Segment 到消息粒度,都会均匀分散到 BookKeeper 的集群中。这种机制保证了数据和服务均匀分散在 BookKeeper 集群中。
Pulsar 和 Kafka 都是基于 partition 的逻辑概念来做 Topic 存储的。最根本的不同是,Kafka 的物理存储是以 partition 为单位的,每个 partition 必须作为一个整体(一个目录)存储在某个 broker 上。 而 Pulsar 的 partition 是以 segment 作为物理存储的单位,每个 partition 会再被打散并均匀分散到多个 bookie 节点中。
这样的直接影响是,Kafka 的 partition 的大小,受制于单台 broker 的存储;而 Pulsar 的 partition 则可以利用整个集群的存储容量。
扩容 当 partition 的容量达到上限后,需要扩容的时候,如果现有的单台机器不能满足,Kafka 可能需要添加新的存储节点,并将 partition 的数据在节点之间搬移达到 rebalance 的状态。
而 Pulsar 只需添加新的 Bookie 存储节点即可。新加入的节点由于剩余空间大,会被优先使用,接收更多的新数据;整个扩容过程不涉及任何已有数据的拷贝和搬移。
Broker 故障 Pulsar 在单个节点失败时也会体现同样的优势。如果 Pulsar 的某个服务节点 broker 失效,由于 broker 是无状态的,其他的 broker 可以很快接管 Topic,不会涉及 Topic 数据的拷贝;如果存储节点 Bookie 失效,在集群后台中,其他的 Bookie 会从多个 Bookie 节点中并发读取数据,并对失效节点的数据自动进行恢复,对前端服务不会造成影响。
Bookie 故障 Apache BookKeeper 中的副本修复是 Segment (甚至是 Entry)级别的多对多快速修复。这种方式只会复制必须的数据,这比重新复制整个主题分区要精细。如下图所示,当错误发生时,Apache BookKeeper 可以从 bookie 3 和 bookie 4 中读取 Segment 4 中的消息,并在 bookie 1 处修复 Segment 4。所有的副本修复都在后台进行,对 Broker 和应用透明。
当某个 Bookie 节点出错时,BookKeeper 会自动添加可用的新 Bookie 来替换失败的 Bookie,出错的 Bookie 中的数据在后台恢复,所有 Broker 的写入不会被打断,而且不会牺牲主题分区的可用性。
在设计思路上,Pulsar 方案和 Kafka 方案并没有多大区别。但在新方案中,个推技术团队借助 Pulsar 的特性,解决了 Kafka 方案中存在的问题。
dbStorage_rocksDB_blockCacheSize
设置的足够大;当消息体量大,出现 backlog 大量堆积时, 使用默认大小(256M)会出现读耗时过大情况,导致消费变慢。backlogQuotaDefaultLimitGB
设置的足够大(默认 10G), 避免因为默认使用producer_request_hold
模式出现 block producer 的情况;当然可以根据实际业务选择合适的 backlogQuotaDefaultRetentionPolicy
。现在, 个推针对优先级中间件的改造方案已经在部分现网业务中试运行,对于 Pulsar 的稳定性,我们还在持续关注中。 作为一个 2016 年才开源的项目,Pulsar 拥有非常多吸引人的特性,也弥补了其他竞品的短板,例如跨地域复制、多租户、扩展性、读写隔离等。尽管在业内使用尚不广泛, 但从现有的特性来说, Pulsar 表现出了取代 Kafka 的趋势。在使用 Pulsar 过程中,我们也遇到了一些问题, 在此特别感谢翟佳和郭斯杰(两位均为 Stream Native 的核心工程师、开源项目 Apache Pulsar 的 PMC 成员)给我们提供的支持和帮助。
参考文献:
[1] 比拼 Kafka, 大数据分析新秀 Pulsar 到底好在哪( https://www.infoq.cn/article/1UaxFKWUhUKTY1t_5gPq)
[2] 开源实时数据处理系统 Pulsar:一套搞定 Kafka+Flink+DB( https://juejin.im/post/5af414365188256717765441)