2021年10月12日

EDA 事件驱动架构与 EventBridge 二三事
当下比较成功的企业已然认识到,要想最大限度提升运营效率和客户体验,务必将业务和技术两方面的举措紧密结合起来。运营事件或业务形势的变化是时下众多企业关注的焦点,这些变化能够为企业领导者带来切实有用的信息,而架构设计的主旨恰恰是从客户联系人、交易、运营等方面的信息中获取洞见,两者相辅相成。传统技术历来对企业从事件中获取洞见的速度有着诸多限制,比如用于记录、收集和处理此类事件的批处理 ETL(提取、转换、加载)。 事件驱动型架构 (EDA) 方兴未艾,作为一种 Serverless 化的应用概念对云原生架构具有着深远影响。当我们讨论到一个具体架构时,首当其冲的是它的发展是否具有技术先进性。这里从我们熟悉的 MVC 架构,SOA 架构谈起,聊一聊关于消息事件领域的历史与发展趋势。 消息事件领域的发展趋势 早在 2018 年,Gartner 评估报告将 EventDriven Model 列为 10 大战略技术趋势之一,事件驱动架构(EDA)将成为未来微服务的主流,并做出以下断言: 到 2022 年,事件通知的软件模型将成为超过 60% 的新型数字化商业的解决方案; 到 2022 年,超过 50% 的商业组织将参与到事件驱动的数字化商业服务的生态系统当中; George Santayana 在《 The Life of Reason》曾提到, Those who fail to learn History are doomed to repeat it.(不懂历史的人注定会重蹈覆辙)。我们以史为鉴,来看看为什么会架构会演进到事件驱动。 架构本身没有优劣之分,它本身就是一组技术决策,决定后续项目的所有功能开发(框架,编码规范,文档,流程….),这里聊聊为什么会引入某些框架,这个框架解决了软件开发中的什么问题。 单体架构:在单节点服务中,单体应用的所有模块都封装在单个进程运行,通信通过相同堆栈调用完成。这种模式下非常容易导致结构和关系不明确,难以对系统进行更改和重构。就像一个不透明的,粘稠的,脆弱的,僵硬的 Big Ball of Mud! 分层架构:在经典的分层架构中,层以相当谨慎的方式使用。即一个层只能知道它下方层的数据。在随后的实际应用中,更多的方式是一个层可以访问它下面的任何层。分层架构解决了单体架构的的逻辑分离问题,每一层都可以被等效替换,层区分也更加标准化,同时一个层可以被几个不同/更高级别的层使用。当然,层也有比较明显的缺点,层不能封装掉一切,比如添加到UI的某个字段,可能也需要添加到DB,而且额外多余的层会严重损害系统性能。 MVC 架构:MVC 架构产生的原因其实很简单,随着业务系统的复杂性增加,之前所谓“全栈工程师”已经不适用大部分场景。为了降低前端和后台的集成复杂性,故而开始推广 MVC 架构。其中,Model 代表业务逻辑,View 代表视图层比如前端UI的某个小组件,Controller 提供 View 和 Model 的协调比如将用户某项操作转为业务逻辑等。这里还有很多扩展架构,譬如 ModelViewPresenter ,ModelViewPresenterViewModel,ResourceMethodRepresentation,ActionDomainResponder 。 EBI 架构:即 Entity,Boundary(接口),Interactor(控制)。EBI架构将系统边界视为完整连接,而不仅仅是视图,控制器或接口。EBI 的实体代表持有数据并结束相关行为的实际实体,很类似阿里云的 POP API。EBI 主要还是后端概念,他是与 MVC 相辅相成的。 洋葱架构:洋葱架构是一种低耦合,高内聚的架构模型。所有的应用程序围绕独立的对象模型构建,内层定义接口外层实现接口,耦合方向向中心内聚,所有代码都可以独立与基础设施进行编译和运行。 SOA 架构:SOA 是 Service Orientated Architure 的缩写,即面向服务架构。表示每一个功能都是通过一个独立的服务来提供,服务定义了明确的可调用接口,服务之间的编排调用完成一个完整的业务。其实这个架构也是目前架构中最成熟的,日常使用最多的架构模式。 什么是 EDA 架构 我们聊完之前全部的架构趋势后,再回过头看看什么是 EDA 架构。 EDA 事件驱动架构( EventDriven Architecture ) 是一种系统架构模型,它的核心能力在于能够发现系统“事件”或重要的业务时刻(例如交易节点、站点访问等)并实时或接近实时地对相应的事件采取必要行动。这种模式取代了传统的“ request/response ”模型,在这种传统架构中,服务必须等待回复才能进入下一个任务。事件驱动架构的流程是由事件提供运行的。 上图其实很好的解释了 EDA 架构的模型,但是其实还不够明确。所以,这里我们和单体架构一起对比看看他们之间差异。 在如上对比图中,我们其实可以较为清楚看到它与传统架构的区别。在一般传统架构中,创建订单操作发生后,一系列的操作其实都是通过一个系统完成的。而事件驱动的概念则是将全部操作都转换为 “事件” 概念,下游通过捕获某个 “事件” 来决定调用什么系统完成什么样的操作。 总结来看,事件驱动其实是将比较重要的业务时刻封装成“事件”,并通过某个 EventBus 将事件路由给下游系统。 我们了解了 EDA 架构的整个处理过程,但是还没解决这个所谓的“EventBUS”到底是啥样。 上图就是事件驱动的核心逻辑架构。是不是非常像某个传统 MQ?别着急,下面我会讲到这个架构的复杂部分。讲完 EventBus,我们回过头来看“事件”,刚刚介绍中比较重要部分其实是将操作转换为某类事件进行分发。那这的事件我们怎么定义呢? 简单来看,其实事件就是状态的显著变化,当用户采取特定行动时触发。以 4S 店售卖汽车为例: 当客户购买汽车并且其状态从 For Sale 变为 Sold 是一个事件。 成功交易后,从帐户中扣除金额是一个事件。 单击预订试驾后,从将预约信息添加到指定用户就是一个事件。 每个事件都可能触发一个或多个选项作为响应。 关于事件其实云原生 CNCF 基金会在 2018 年托管了开源 CloudEvents 项目,该项目旨在用统一和规范的格式来描述事件,来加强不同的服务、平台以及系统之间的互操作性。在该项目定义下,通用的事件规范是这样的: 事件主要由 Json 体构成,通过不同字段描述发生的事件。 EDA 架构的落地实践思考 在开始介绍落地实践时,我们先来看一个经典的 EDA 架构模型: 这是一个非常经典 EDA 订单架构,该架构主要使用了 EventBridge 和 FC 函数计算(如果不太熟悉 FaaS 的同学可以把 FC 节点当作 ECS 或 K8s 的某个 POD 节点),通过事件驱动各个业务进行协作。 所以这块的中心节点(EventBridge)其实有三个比较重要的能力: 1. For Event Capturing(事件收集):具备采集事件的能力 2. For Routing(事件路由):通过事件内容将事件路由分发至于下游的能力的 3. For Event Processing(事件过滤/替换):对事件进行脱敏或初步过滤&筛选的能力 通常情况下,要实现这三个能力是比较困难的,比如:Event Capturing 可能需要熟悉 Dell Boomi, Snaplogic, MuleSoft, Dataflow, Apache Apex 等,Routing 部分可能通过 RocketMQ,RabbitMQ, ActiveMQ, Apache Kafka ,Event Processing 需要了解 Apache Storm, Apache Flink 。所以之前讲的逻辑架构其实非常理想,要想实现完成的 EDA 事件驱动还需要包括这些核心能力。   其实,从刚刚的架构中我们也能窥探到一些信息,EDA 架构其实看起来没有那么简单,那它有何优劣呢?下面我就简单罗列下 EDA 架构在实践中的优势: 松耦合:事件驱动架构是高度松耦合且高度分布式的架构模型,事件的创建者(来源)只知道发生的事件,并不知道事件的处理方式,也关心有多少相关方订阅该事件。 异步执行:EDA 架构是异步场景下最适合的执行工具,我们可以将需要事件保留在队列中,直到状态正常后执行。 可扩展性:事件驱动架构可以通过路由&过滤能力快速划分服务,提供更便捷的扩展与路由分发。 敏捷性:事件驱动架构可以通过将事件分发至任何地方,提供更敏捷高效的部署方案。 当然,劣势也很明显: 架构复杂:事件驱动架构复杂,路由节点多,系统结成复杂,功能要求多。 路由分发难:事件路由及分发难,灵活的事件路由需要依赖强大的实时计算能力,对整体分发系统要求较高。 无法追踪:事件追踪是整个 EDA 架构保证,EDA 架构中往往很难追踪到事件处理状态,需要大量的定制化开发。 可靠性差:事件驱动由于需要多系统集成,可靠性通常较差,且交付无法保障。   阿里云 EventBridge 如何解决 EDA 场景下的困境 针对 EDA 场景下面临的这些问题,阿里云推出了 EventBridge,一款无服务器事件总线服务,其使命是作为云事件的枢纽,以标准化的 CloudEvents 1.0 协议连接云产品和应用,应用和应用,提供中心化的事件治理和驱动能力,帮助用户轻松构建松耦合、分布式的事件驱动架构;另外,在阿里云之外的云市场上有海量垂直领域的 SaaS 服务,EventBridge 将以出色的跨产品、跨组织以及跨云的集成与被集成能力,助力客户打造一个完整的、事件驱动的、高效可控的上云体验。并针对 EDA 困境提供了针对性的解决方案。 架构复杂:提供业内通用的  Source ,Buses,Rules,Targets  模块管理能力,同时支持 EventBus 和 EventStream 两种模式。大幅度降低事件驱动架构难度。 路由分发:EventBridge 通过事件规则驱动,支持 8 大事件模式,4 重转换器,满足路由分发的全部诉求。 无法追踪:独家提供事件追踪能力,事件分析/查询能力。为用户完善整体事件链路。 可靠性差:支持 DLQ/ 重试机制,大幅度保证由于用户下游系统导致的事件故障与延迟。同时,在此基础上 EventBridge 支持 82 种阿里云产品,847 种事件类型。 阿里云 EventBridge 更多场景介绍 1. 经典 EDA 事件驱动:事件总线(EventBridge)最重要的能力是通过连接应用程序,云服务和 Serverless 服务构建 EDA(Eventdriven Architectures) 事件驱动架构,驱动应用与应用,应用与云的连接。 2. 流式 ETL 场景:EventBridge 另一个核心能力是为流式的数据管道的责任,提供基础的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步/跨地域备份等场景,连接不同的系统与不同服务。 3. 统一事件通知服务:EventBridge 提供丰富的云产品事件源与事件的全生命周期管理工具,您可以通过总线直接监听云产品产生的数据,并上报至监控,通知等下游服务。  目前事件总线免费公测,点击下方链接,立即体验!
作者:肯梦
#技术探索 #事件驱动架构

2021年7月30日

RocketMQ在搜狐的创新实践
MQ使用场景及选型 大多数的视频各部门中使用过的消息中间件,包括有 RedisMQ、ActiveMQ、RocketMQ、Kafka 等,本文将选取几个典型的业务介绍一下其使用场景及问题。 1、引入RocketMQ 最开始使用 RocketMQ 的是计数业务,计数业务需要将客户端的播放量实时计算并展示。当时采用 Redis 进行实时计数,再异步调用数据库进行计数。起初这种模式没什么问题,但是随着业务量变大,数据库压力也进一步增大。甚至有时候数据库机器的 CPU 快被打满了,另外当数据库迁移时,需要暂停写入,计数将面临数据丢失。 这时计数业务迫切需要一个可靠的,能实时消费,且能够堆积的 MQ 来改变这种状况. 当时我们考虑了 RocketMQ 和 Kafka,却最终选择了 RocketMQ,原因请参考下方。 2、放弃 Kafka 放弃 Kafka 投放业务需要将为用户推荐的内容投放到各个区域,但是推荐业务需要知道用户对于推荐内容的反馈,所以投放业务选择了使用 Kafka 来跟推荐业务交互。但是由于某次机器故障,导致 Kafka 集群发生故障转移,而不幸的是,这个集群的分区数过多,导致转移耗时几分钟才完成。 进而导致业务线程阻塞,服务进入无响应状态。而之后了解到 RocketMQ 即使某个 broker 宕机,消息会发送到其他 broker,不会产生整个集群阻塞情况,后来投放业务就将消息交互全部迁移到了 RocketMQ 上。 3、不可靠的 RedisMQ 之前视频基础服务使用了 RedisMQ,用来通知调用方,视频数据发生了变化,进行数据更新。而redis的消息推送基于 pub/sub 模式,虽然实时性很高,但是却不保证可靠,而且消息不会进行持久化。 这两个缺点就导致了某些情况下,调用方收不到通知,而且消息丢失时基本无据可查。 所以此业务最终放弃了 RedisMQ,转而投向 RocketMQ。RocketMQ 能够保证消息至少被投递一次,而且消息支持持久化,即使客户端重启,仍然可以从上次消费的地方继续消费。 4、低性能 ActiveMQ 用户视频基础服务之前使用了 ActiveMQ,主要用于通知依赖方数据变更,它的消息体里包含了变更的数据。遗憾的是,当消息量很大时,ActiveMQ 经常出现无法响应的情况,甚至消费者出现长时间接收不到消息的情况。而了解到 RocketMQ 单个 broker 可以承担几十万 TPS,亿级消息堆积时,此业务也迁移到 了RocketMQ 上。 目前使用 RocketMQ 的业务,包括视频基础服务,用户服务,直播业务,付费业务,审核等等业务系统。而 Kafka 大部分只用于日志相关的处理服务上,比如日志上报,业务日志收集等等。 另外,随着 RocketMQ 支持的客户端越来越丰富,也便于我们很多其他语言的业务接入,比如 AI 组使用 python 客户端,一些 GO 开发的业务,使用 GO 客户端等。 运维之痛 初期,我们运维 RocketMQ 基本靠命令行和 RocketMQConsole。业务方经常来询问的问题包括如下: 我有哪些机器在往这个topic发送消息? 发送消息怎么超时了? 发送失败能通知我吗? 消费失败了能通知我吗? 消息体是啥样的? RocketMQ集群不可用了能不能降级隔离? 我消费我的topic为啥导致别的业务消费混乱? 为啥还需要我自己序列化? 问题很多,而且千奇百怪! 而作为运维人员,除了调查解答业务方的问题之外,在命令行运维 RocketMQ,更让我们小心翼翼。生怕脑子一时糊涂,敲错一个命令,造成大面积故障。随着运维的深入,我们总结了一篇又一篇的使用规范,最佳实践,命名约定,操作步骤等等的文章。但是,随之发现,这些文章对生产效率的提升并不明显。所以与其写文档不如将经验和实践转换为产品,能够更好的服务于业务,因此 MQCloud 应运而生。 MQCloud 诞生 先看一下 MQCloud 的定位: 它是集客户端 SDK,监控预警,集群运维于一体的一站式服务平台。MQCloud 的系统架构如下: 接下来分别说明一下 MQCloud 如何解决上面提到的痛点。 1、业务端和运维端分离,使业务用户只聚焦于业务数据 为了实现这个目的,引入了用户,资源两大维度。针对用户和资源加以控制,使不同的用户只聚焦于自己的数据。 对于生产方来说,他关心的是 topic 配置,消息的发送数据,谁在消费等等问题,这样只对他展示相应的数据即可; 对于消费者来说,只关心消费状况,有没有堆积,消费失败等情况; 对于管理员来说,可以进行部署,监控,统一配置,审批等日常运维; 2、清晰明了的操作 通过对不同角色展示不同的视图,使用户可以进行的操作一目了然。 3、规范和安全 为了保障集群操作的安全性和规范性,所有的操作都会以申请单的形式进入后台审批系统,管理员来进行相关审批,安全性大大提升。 4、多维的数据统计和监控预警 MQCloud 核心功能之一就是监控预警,目前支持如下预警: 生产消息异常预警 消费消息堆积预警(broker 角度) 消费客户端阻塞(客户端角度) 消费失败预警 消费偏移量错误预警 消费订阅错误预警 消费落后预警(超出内存阈值,从硬盘拉数据) 死消息预警(消费失败太多,消息进入死信队列) 消息流量异常预警 消息存储耗时过长预警(broker 存储消息耗时) broker&NameServer 宕机预警 服务器宕机预警 服务器 cpu,内存,网络流量等指标预警 要想做监控,必须先做统计,为了更好的知道 RocketMQ 集群的运行状况,MQCloud 做了大量的统计工作(大部分依赖于 broker 的统计),主要包括如下几项: 每分钟 topic 的生产流量:用于绘制 topic 生产流量图及监控预警。 每分钟消费者流量:用于绘制消费流量图及监控预警。 每10分钟 topic 生产流量:用于按照流量展示 topic 排序。 每分钟 broker 生产、消费流量:用于绘制 broker 生产消费流量图。 每分钟集群生产、消费流量:用于绘制集群的生产流量图。 每分钟生产者百分位耗时、异常统计:以 ip 维度绘制每个生产者的耗时流量图及监控预警。 机器的 cpu,内存,io,网络流量,网络连接等统计:用于服务器的状况图和监控预警。 下面捡一两点进行一下说明: 1、生产异常耗时统计: 由于 RocketMQ 并没有提供生产者的流量统计(只提供了 topic,但是并不知道每个生产者的情况),所以 MQCloud 实现了对生产者数据进行统计(通过 RocketMQ 的回调钩子实现): 主要统计如下信息: 客户端 ipbroker ip 发送消息耗时 消息数量 发送异常 统计完成后,定时发送到 MQCloud 进行存储,并做实时监控和展示。 关于统计部分有一点说明,一般耗时统计有最大,最小和平均值,而通常 99% (即 99% 的请求耗时都低于此数值)的请求的耗时情况才能反映真实响应情况。99% 请求耗时统计最大的问题是如何控制内存占用,因为需要对某段时间内所有的耗时做排序后才能统计出这段时间的 99% 的耗时状况。而对于流式数据做这样的统计是有一些算法和数据结构的,例如 tdigest,但是 MQCloud 采用了非精确的但是较为简单的分段统计的方法,具体如下: 1、创建一个按照最大耗时预哈希的时间跨度不同的耗时分段数组: 第一段:耗时范围 0ms~10ms,时间跨度为 1ms。 第二组:耗时范围 11ms~100ms,时间跨度 5ms。 第三组:耗时范围 101ms~3500ms,时间跨度 50ms。 _优点:此种分段方法占用内存是固定的,比如最大耗时如果为3500ms,那么只需要空间大小为96的数组即可缺点:分段精度需要提前设定好,且不可更改。_ 2、针对上面的分段数组,创建一个大小对应的AtomicLong的计数数组,支持并发统计: 3、耗时统计时,计算耗时对应的耗时分段数组下标,然后调用计数数组进行统计即可,参考下图: 例如某次耗时为18ms,首先找到它所属的区间,即归属于[16~20]ms之间,对应的数组下标为12。 根据第一步找到的数组下标12,获取对应的计数数组下标12。 获取对应的计数器进行+1操作,即表示18ms发生了一次调用。 这样,从计数数组就可以得到实时耗时统计,类似如下: 4、然后定时采样任务会每分钟对计数数组进行快照,产生如下耗时数据: 5、由于上面的耗时数据天然就是排好序的,可以很容易计算 99%、90%、平均耗时等数据了。 _另外提一点,由于 RocketMQ 4.4.0 新增的 trace 功能也使用 hook 来实现,与 MQCloud 的统计有冲突,MQCloud 已经做了兼容。Trace 和统计是两种维度,trace 反映的是消息从生产存储消费的流程,而 MQCloud 做的是针对生产者状况的统计,有了这些统计数据,才可以做到生产耗时情况展示,生产异常情况预警等功能。_ 2、机器统计 关于集群状况收集主要采用了将nmon自动放置到/tmp目录,定时采用ssh连接到机器执行nmon命令,解析返回的数据,然后进行存储。 上面这些工作就为监控和预警奠定了坚实的数据基础。 一、单独定制的客户端 针对客户端的一些需求,mqclient 在 rocketmqclient 的基础上进行了开发定制: 1、多集群支持 MQCloud储存了生产者、消费者和集群的关系,通过路由适配,客户端可以自动路由到目标集群上,使客户端对多集群透明。 2、透明的trace集群 通过搭建单独的trace集群和定制客户端,使trace数据能够发往独立的集群,防止影响主集群。 3、序列化 通过集成不同的序列化机制,配合MQCloud,客户端无需关心序列化问题。 目前支持的序列化为protobuf和json,并且通过类型检测支持在线修改序列化方式。 4、流控 通过提供令牌桶和漏桶限流机制,自动开启流控机制,防止消息洪峰冲垮业务端,也为需要精准控制流速的业务提供了方便。 5、隔离降级 针对生产消息使用hystrix提供了隔离api,使业务端在broker故障时可以避免拖累。 6、埋点监控 通过对客户端数据进行统计,收集,在MQCloud里进行监控,使客户端任何风吹草动都能及时得知。 7、规范问题 通过编码保障,使某些约定,规范和最佳实践得以实现。包括但不限于: 命名规范 消费组全局唯一,防止重复导致消费问题 重试消息跳过 安全关闭等等 更完善的重试机制 二、近乎自动化运维 1、部署 手动部署一台 broker 实例没什么问题,但是当实例变多时,手动部署极易出错且耗时耗力。 MQCloud 提供了一套自动化部署机制,包括停止写入,上下线,本地更新,远程迁移(包含数据校验): 支持一键部署: 另外,broker 作为 RocketMQ 的核心,其配置有百项之多,而且好多涉及到性能调优,调整时往往需要根据服务器的状况谨慎调整,MQCloud 开发了配置模板功能来支持灵活的部署项: 2、机器运维 MQCloud 提供了一整套机器的运维机制,大大提升了生产力。 3、可视化的集群拓扑 三、安全性加固 1、开启管理员权限 RocketMQ 从 4.4.0 开始支持 ACL,但是默认没有开启,也就是任何人使用管理工具或 API 就可以直接操纵线上集群。但是开启 ACL 对现有业务影响太大,针对这种情况 MQCloud 进行专门定制。 借鉴 RocketMQ ACL 机制,只针对 RocketMQ 管理员操作加固权限校验: 并且支持自定义和热加载管理员请求码,使得非法操作 RocketMQ 集群成为不可能,安全性大大提升。 2broker 通信加固 broker 同步数据代码由于没有校验,存在安全隐患,只要连接 master 监听的 slave 通信端口,发送数据大于 8 个字节,就可能导致同步偏移量错误,代码如下: MQCloud 通过验证数据首包的策略,保障了通信的安全性。 if ((this.byteBufferRead.position() this.processPostion) = 8) {  int pos = this.byteBufferRead.position() (this.byteBufferRead.position() % 8);  long readOffset = this.byteBufferRead.getLong(pos 8);  this.processPostion = pos;  HAConnection.this.slaveAckOffset = readOffset;  if (HAConnection.this.slaveRequestOffset < 0) {      HAConnection.this.slaveRequestOffset = readOffset;      log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);  }  HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); } 开源之路 目前 MQCloud 运维规模如下: 服务器:50台+ 集群:5个+ topic:800个+ consumer:1400+ 生产消费消息量/日:4 亿条+ 生产消费消息大小/日:400G+ MQCloud 在充分考虑和吸收实际业务的需求后,以各个角色聚焦为核心,以全面监控为目标,以满足各业务端需求为己任,在不断地发展和完善。 在 MQCloud 逐渐成熟之后,秉承着服务于社区和吸收更多经验的理念,我们开放了源代码。经过设计和拆分,MQCloud 于 18 年开源了,从第一个版本 release 到现在已经过去两年了,期间随着更新迭代大大小小一共 release 了 20 多个版本。其中不但包含功能更新、bug 修复、wiki 说明等,而且每个大版本都经过详细的测试和内部的运行。之后很多小伙伴跃跃欲试,来试用它,并提出一些建议和意见,我们根据反馈来进一步完善它。 我们将一直遵循我们的目标,坚定的走自己的开源之路: 为业务提供可监控,可预警,可满足其各种需求的稳定的 MQ 服务。 积累 MQ 领域经验,将经验转化为产品,更好的服务业务。
#行业实践

2021年6月30日

同程旅行网基于 RocketMQ 高可用架构实践
背景介绍 为何选择 RocketMQ 我们在几年前决定引入 MQ 时,市场上已经有不少成熟的解决方案,比如 RabbitMQ , ActiveMQ,NSQ,Kafka 等。考虑到稳定性、维护成本、公司技术栈等因素,我们选择了 RocketMQ : 纯 Java 开发,无依赖,使用简单,出现问题能 hold ; 经过阿里双十一考验,性能、稳定性可以保障; 功能实用,发送端:同步、异步、单边、延时发送;消费端:消息重置,重试队列,死信队列; 社区活跃,出问题能及时沟通解决。 使用情况 主要用于削峰、解耦、异步处理; 已在火车票、机票、酒店等核心业务广泛使用,扛住巨大的微信入口流量; 在支付、订单、出票、数据同步等核心流程广泛使用; 每天 1000+ 亿条消息周转。 下图是 MQ 接入框架图 由于公司技术栈原因,client sdk 我们提供了 java sdk ;对于其他语言,收敛到 http proxy ,屏蔽语言细节,节约维护成本。按照各大业务线,对后端存储节点进行了隔离,相互不影响。 MQ 双中心改造 之前单机房出现过网络故障,对业务影响较大。为保障业务高可用,同城双中心改造提上了日程。 为何做双中心 单机房故障业务可用; 保证数据可靠:若所有数据都在一个机房,一旦机房故障,数据有丢失风险; 横向扩容:单机房容量有限,多机房可分担流量。 双中心方案 做双中心之前,对同城双中心方案作了些调研,主要有冷(热)备份、双活两种。(当时社区 Dledger 版本还没出现,Dledger 版本完全可做为双中心的一种可选方案。) 1)同城冷(热)备份 两个独立的 MQ 集群, 用户流量写到一个主集群,数据实时同步到备用集群,社区有成熟的 RocketMQ Replicator 方案,需要定期同步元数据,比如主题,消费组,消费进度等。 2)同城双活 两个独立 MQ 集群,用户流量写到各自机房的 MQ 集群,数据相互不同步。 平时业务写入各自机房的 MQ 集群,若一个机房挂了,可以将用户请求流量全部切到另一个机房,消息也会生产到另一个机房。 对于双活方案,需要解决 MQ 集群域名。 1)若两个集群用一个域名,域名可以动态解析到各自机房。此方式要求生产、消费必须在同一个机房。假如生产在 idc1 ,消费在 idc2 ,这样生产、消费各自连接一个集群,没法消费数据。 2)若一个集群一个域名,业务方改动较大,我们之前对外服务的集群是单中心部署的,业务方已经大量接入,此方案推广较困难。 为尽可能减少业务方改动,域名只能继续使用之前的域名,最终我们采用一个 Global MQ 集群,跨双机房,无论业务是单中心部署还是双中心部署都不影响;而且只要升级客户端即可,无需改动任何代码。 双中心诉求 就近原则:生产者在 A 机房,生产的消息存于 A 机房 broker ; 消费者在 A 机房,消费的消息来自 A 机房 broker 。 单机房故障:生产正常,消息不丢。 broker 主节点故障:自动选主。 就近原则 简单说,就是确定两件事: 节点(客户端节点,服务端节点)如何判断自己在哪个 idc; 客户端节点如何判断服务端节点在哪个 idc。 如何判断自己在哪个 idc? 1) ip 查询 节点启动时可以获取自身 ip ,通过公司内部的组件查询所在的机房。 2)环境感知 需要与运维同学一起配合,在节点装机时,将自身的一些元数据,比如机房信息等写入本地配置文件,启动时直接读写配置文件即可。 我们采用了第二个方案,无组件依赖,配置文件中 logicIdcUK 的值为机房标志。 客户端节点如何识别在同一个机房的服务端节点? 客户端节点可以拿到服务端节点的 ip 以及 broker 名称的,因此: ip 查询:通过公司内部组件查询 ip 所在机房信息; broker 名称增加机房信息:在配置文件中,将机房信息添加到 broker 名称上; 协议层增加机房标识:服务端节点向元数据系统注册时,将自身的机房信息一起注册。 相对于前两者,实现起来略复杂,改动了协议层, 我们采用了第二种与第三种结合的方式。 就近生产 基于上述分析,就近生产思路很清晰,默认优先本机房就近生产; 若本机房的服务节点不可用,可以尝试扩机房生产,业务可以根据实际需要具体配置。 就近消费 优先本机房消费,默认情况下又要保证所有消息能被消费。 队列分配算法采用按机房分配队列 每个机房消息平均分给此机房消费端; 此机房没消费端,平分给其他机房消费端。 伪代码如下: Map mqs = classifyMQByIdc(mqAll); Map cids = classifyCidByIdc(cidAll); Set< result = new HashSet<; for(element in mqs){ result.add(allocateMQAveragely(element, cids, cid)); //cid为当前客户端 } 消费场景主要是消费端单边部署与双边部署。 单边部署时,消费端默认会拉取每个机房的所有消息。 双边部署时,消费端只会消费自己所在机房的消息,要注意每个机房的实际生产量与消费端的数量,防止出现某一个机房消费端过少。 单机房故障 每组 broker 配置 一主两从,一主一从在一机房,一从在另一机房;某一从同步完消息,消息即发送成功。 单机房故障 消息生产跨机房;未消费消息在另一机房继续被消费。 故障切主 在某一组 broker 主节点出现故障时,为保障整个集群的可用性,需要在 slave 中选主并切换。要做到这一点,首先得有个broker 主故障的仲裁系统,即 nameserver(以下简称 ns )元数据系统(类似于 redis 中的哨兵)。 ns 元数据系统中的节点位于三个机房(有一个第三方的云机房,在云上部署 ns 节点,元数据量不大,延时可以接受),三个机房的 ns 节点通过 raft 协议选一个leader,broker 节点会将元数据同步给 leader, leader 在将元数据同步给 follower 。 客户端节点获取元数据时, 从 leader,follower 中均可读取数据。 切主流程 若 nameserver leader 监控到 broker 主节点异常, 并要求其他 follower 确认;半数 follower 认为 broker 节点异常,则 leader 通知在 broker 从节点中选主,同步进度大的从节点选为主; 新选举的 broker 主节点执行切换动作并注册到元数据系统; 生产端无法向旧 broker 主节点发送消息。 流程图如下 切中心演练 用户请求负载到双中心,下面的操作先将流量切到二中心回归双中心切到一中心。确保每个中心均可承担全量用户请求。 先将用户流量全部切到二中心 流量回归双中心,并切到一中心 回顾 全局 Global 集群 就近原则 一主二从,写过半消息即及写入成功 元数据系统 raft 选主 broker 主节点故障,自动选主 MQ 平台治理 即使系统高性能、高可用,倘若随便使用或使用不规范,也会带来各种各样的问题,增加了不必要的维护成本,因此必要的治理手段不可或缺。 目的 让系统更稳定 及时告警 快速定位、止损 治理哪些方面 主题/消费组治理 申请使用 生产环境 MQ 集群,我们关闭了自动创建主题与消费组,使用前需要先申请并记录主题与消费组的项目标识与使用人。一旦出现问题,我们能够立即找到主题与消费组的负责人,了解相关情况。若存在测试,灰度,生产等多套环境,可以一次申请多个集群同时生效的方式,避免逐个集群申请的麻烦。 生产速度 为避免业务疏忽发送大量无用的消息,有必要在服务端对主题生产速度进行流控,避免这个主题挤占其他主题的处理资源。 消息积压 对消息堆积敏感的消费组,使用方可设置消息堆积数量的阈值以及报警方式,超过这个阈值,立即通知使用方;亦可设置消息堆积时间的阈值,超过一段时间没被消费,立即通知使用方。 消费节点掉线 消费节点下线或一段时间无响应,需要通知给使用方。 客户端治理 发送、消费耗时检测 监控发送/消费一条消息的耗时,检测出性能过低的应用,通知使用方着手改造以提升性能;同时监控消息体大小,对消息体大小平均超过 10 KB 的项目,推动项目启用压缩或消息重构,将消息体控制在 10 KB 以内。 消息链路追踪 一条消息由哪个 ip 、在哪个时间点发送,又由哪些 ip 、在哪个时间点消费,再加上服务端统计的消息接收、消息推送的信息,构成了一条简单的消息链路追踪,将消息的生命周期串联起来,使用方可通过查询msgId或事先设置的 key 查看消息、排查问题。 过低或有隐患版本检测 随着功能的不断迭代,sdk 版本也会升级并可能引入风险。定时上报 sdk 版本,推动使用方升级有问题或过低的版本。 服务端治理 集群健康巡检 如何判断一个集群是健康的?定时检测集群中节点数量、集群写入 tps 、消费 tps ,并模拟用户生产、消费消息。 集群性能巡检 性能指标最终反映在处理消息生产与消费的时间上。服务端统计处理每个生产、消费请求的时间,一个统计周期内,若存在一定比例的消息处理时间过长,则认为这个节点性能有问题;引起性能问题的原因主要是系统物理瓶颈,比如磁盘 io util 使用率过高,cpu load 高等,这些硬件指标通过夜鹰监控系统自动报警。 集群高可用 高可用主要针对 broker 中 master 节点由于软硬件故障无法正常工作,slave 节点自动被切换为 master ,适合消息顺序、集群完整性有要求的场景。 部分后台操作展示 主题与消费组申请 生产,消费,堆积实时统计 集群监控 踩过的坑 社区对 MQ 系统经历了长时间的改进与沉淀,我们在使用过程中也到过一些问题,要求我们能从深入了解源码,做到出现问题心不慌,快速止损。 新老消费端并存时,我们实现的队列分配算法不兼容,做到兼容即可; 主题、消费组数量多,注册耗时过长,内存 oom ,通过压缩缩短注册时间,社区已修复; topic 长度判断不一致,导致重启丢消息,社区已修复; centos 6.6 版本中,broker 进程假死,升级 os 版本即可。 MQ 未来展望 目前消息保留时间较短,不方便对问题排查以及数据预测,我们接下来将对历史消息进行归档以及基于此的数据预测。 历史数据归档 底层存储剥离,计算与存储分离 基于历史数据,完成更多数据预测 服务端升级到 Dledger ,确保消息的严格一致
作者:阿里云云原生
#行业实践 #高可用

2021年6月28日

解读Apache RocketMQ 5.0 全新的高可用设计
高可用架构演进背景 在分布式系统中不可避免的会遇到网络故障,机器宕机,磁盘损坏等问题,为了向用户不中断且正确的提供服务,要求系统有一定的冗余与容错能力。RocketMQ 在日志,统计分析,在线交易,金融交易等丰富的生产场景中发挥着至关重要的作用,而不同环境对基础设施的成本与可靠性提出了不同的诉求。在 RocketMQ v4 版本中有两种主流高可用设计,分别是主备模式的无切换架构和基于 Raft 的多副本架构(图中左侧和右侧所示)。生产实践中我们发现,两副本的冷备模式下备节点资源利用率低,主宕机时特殊类型消息存在可用性问题;而 Raft 高度串行化,基于多数派的确认机制在扩展只读副本时不够灵活,无法很好的支持两机房对等部署,异地多中心等复杂场景。RocketMQ v5 版本融合了上述方案的优势,提出 DLedger Controller 作为管控节点(中间部分所示),将选举逻辑插件化并优化了数据复制的实现。 如何实现高可用系统 副本组与数据分片 在 PrimaryBackup 架构的分布式系统中,一份数据将被复制成多个副本来避免数据丢失。处理相同数据的一组节点被称为副本组(ReplicaSet),副本组的粒度可以是单个文件级别的(例如 HDFS),也可以是分区级 / 队列级的(例如 Kafka),每个真实存储节点上可以容纳若干个不同副本组的副本,也可以像 RocketMQ 一样粗粒度的独占节点。独占能够显著简化数据写入时确保持久化成功的复杂度,因为每个副本组上只有主副本会响应读写请求,备机一般配置只读来提供均衡读负载,选举这件事儿等价于让副本组内一个副本持有独占的写锁。 RocketMQ 为每个存储数据的 Broker 节点配置 ClusterName,BrokerName 标识来更好的进行资源管理。多个 BrokerName 相同的节点构成一个副本组。每个副本还拥有一个从 0 开始编号,不重复也不一定连续的 BrokerId 用来表示身份,编号为 0 的节点是这个副本组的 Leader / Primary / Master,故障时通过选举来重新对 Broker 编号标识新的身份。例如 BrokerId = {0, 1, 3},则 0 为主,其他两个为备。 一个副本组内,节点间共享数据的方式有多种,资源的共享程度由低到高来说一般有 Shared Nothing,Shared Disk,Shared Memory,Shared EveryThing。典型的 Shared Nothing 架构是 TiDB 这类纯分布式的数据库,TiDB 在每个存储节点上使用基于 RocksDB 封装的 TiKV 进行数据存储,上层通过协议交互实现事务或者 MVCC。相比于传统的分库分表策略来说,TiKV 易用性和灵活程度很高,更容易解决数据热点与伸缩时数据打散的一系列问题,但实现跨多节点的事务就需要涉及到多次网络的通信。另一端 Shared EveryThing 的案例是 AWS 的 Aurora,Aliyun 的 PolarStore,旁路 Kernal 的方式使应用完全运行于用户态,以最大程度的存储复用来减少资源消耗,一主多备完全共用一份底层可靠的存储,实现一写多读,快速切换。 大多数 KV 操作都是通过关键字的一致性哈希来计算所分配的节点,当这个节点所在的主副本组产生存储抖动,主备切换,网络分区等情况下,这个分片所对应的所有键都无法更新,局部会有一些操作失败。消息系统的模型有所不同,流量大但跨副本组的数据交互极少,无序消息发送到预期分区失败时还可以向其他副本组(分片)写入,一个副本组的故障不影响全局,这在整体服务的层面上额外提供了跨副本组的可用性。此外,考虑到 MQ 作为 Paas 层产品,被广泛部署于 Windows,Linux on Arm 等各种环境,只有减少和 Iaas 层产品的深度绑定,才能提供更好的灵活性。这种局部故障隔离和轻依赖的特性是 RocketMQ 选则 Shared Nothing 模型重要原因。 副本组中,各个节点处理的速度不同,也就有了日志水位的概念。Master 和与其差距不大的 Slave 共同组成了同步副本集(SyncStateSet)。如何定义差距不大呢?衡量的指标可以是日志水位(文件大小)差距较小,也可以是备落后的时间在一定范围内。在主宕机时,同步副本集中的其余节点有机会被提升为主,有时需要对系统进行容灾演练,或者对某些机器进行维护或灰度升级时希望定向的切换某一个副本成为新主,这又产生了优先副本(PriorityReplica)的概念。选择优先副本的原则和策略很多,可以动态选择水位最高,加入时间最久或 CommitLog 最长的副本,也可以支持机架,可用区优先这类静态策略。 从模型的角度来看,RocketMQ 单节点上 Topic 数量较多,如果像 kafka 以 topic / partition 粒度维护状态机,节点宕机会导致上万个状态机切换,这种惊群效应会带来很多潜在风险,因此 v4 版本时 RocketMQ 选择以单个 Broker 作为切换的最小粒度来管理,相比于其他更细粒度的实现,副本身份切换时只需要重分配 Broker 编号,对元数据节点压力最小。由于通信的数据量少,可以加快主备切换的速度,单个副本下线的影响被限制在副本组内,减少管理和运维成本。这种实现也一些缺点,例如存储节点的负载无法以最佳状态在集群上进行负载均衡,Topic 与存储节点本身的耦合度较高,水平扩展一般会改变分区总数,这就需要在上层附加额外的处理逻辑。 为了更规范更准确的衡量副本组的可用性指标,学术上就引入了几个名词: RTO(Recovery Time Objective)恢复时间目标,一般表示业务中断到恢复的时间。 RPO(Recovery Point Object)恢复点目标,用于衡量业务连续性。例如某个硬盘每天备份,故障时丢失最近备份后的所有更新。 SLA(ServiceLevel Agreement)服务等级协议,厂商以合约的形式对用户进行服务质量承诺,SLA 越高通常成本也越高。 节点数量与可靠性关系密切,根据不同生产场景,RocketMQ 的一个副本组可能会有 1,2,3,5 个副本。 1. 单副本成本最低,维护最简单,宕机时其他副本组接管新消息的写入,但已写入的数据无法读取,造成部分消息消费延迟。底层硬件故障还可能导致数据永久丢失,一般用于非关键日志,数据采集等低可靠性成本诉求较强的场景。 2. 两副本较好的权衡了数据冗余的成本与性能,RocketMQ 跨副本组容灾的特性使得两副本模式适用于绝大部分 IOPS 比较高的场景。此时备机可以分摊一定的读压力(尤其是主副本由于内存紧张或者产生冷读时)。两副本由于不满足多数派(quorum)原则,没有外部系统的参与时,故障时无法进行选举切换。 3. 三副本和五副本是业界使用最为广泛的,精心设计的算法使得多数情况下系统可以自愈。基于 Paxos / Raft 属于牺牲高可用性来保证一致性的 CP 型设计,存储成本很高,容易受到 IO 分布不均匀和水桶效应的影响。每条数据都需要半数以上副本响应的设计在需要写透(write through)多副本的消息场景下不够灵活。 日志复制还是消息复制 如何保证副本组中数据的最终一致性?那肯定是通过数据复制的方式实现,我们该选择逻辑复制还是物理复制呢? 逻辑复制:使用消息来进行同步的场景也很多,各种 connector 实现本质上就是把消息从一个系统挪到另外一个系统上,例如将数据导入导出到 ES,Flink 这样的系统上进行分析,根据业务需要选择特定 Topic / Tag 进行同步,灵活程度和可扩展性非常高。这种方案随着 Topic 增多,系统还会有服务发现,位点和心跳管理等上层实现造成的性能损失。因此对于消息同步的场景,RocketMQ 也支持以消息路由的形式进行数据转移,将消息复制作为业务消费的特例来看待。 物理复制:大名鼎鼎的 MySQL 对于操作会记录逻辑日志(bin log)和重做日志(redo log)两种日志。其中 bin log 记录了语句的原始逻辑,比如修改某一行某个字段,redo log 属于物理日志,记录了哪个表空间哪个数据页改了什么。在 RocketMQ 的场景下,存储层的 CommitLog 通过链表和内核的 MappedFile 机制抽象出一条 append only 的数据流。主副本将未提交的消息按序传输给其他副本(相当于 redo log),并根据一定规则计算确认位点(confirm offset)判断日志流是否被提交。这种方案仅使用一份日志和位点就可以保证主备之间预写日志的一致性,简化复制实现的同时也提高了性能。 为了可用性而设计的多副本结构,很明显是需要对所有需要持久化的数据进行复制的,选择物理复制更加节省资源。RocketMQ 在物理复制时又是如何保证数据的最终一致性呢?这就涉及到数据的水位对齐。对于消息和流这样近似 FIFO 的系统来说,越近期的消息价值越高,消息系统的副本组的单个节点不会像数据库系统一样,保留这个副本的全量数据,Broker 一方面不断的将冷数据规整并转入低频介质来节约成本,同时对热数据盘上的数据也会由远及近滚动删除。如果副本组中有副本宕机较久,或者在备份重建等场景下就会出现日志流的不对齐和分叉的复杂情况。在下图中我们将主节点的 CommitLog 的首尾位点作为参考点,这样就可以划分出三个区间。在下图中以蓝色箭头表示。排列组合一下就可以证明备机此时的 CommitLog 一定满足下列 6 种情况之一。 下面对每种情况进行讨论与分析: 11 情况下满足备 Max 主 Max,可能由于主异步写磁盘宕机后又成为主,或者网络分区时双主写入造成 CommitLog 分叉。由于新主落后于备,少量未确认的消息丢失,非正常模式的选举(RocketMQ 将这种情况称为 unclean 选举)是应该尽量避免的。 33 理论上不会出现,备的数据长于主,原因可能是主节点数据丢失又叠加了非正常选举,因此这种情况需要人工介入处理。 租约与节点身份变更 前文提到 RocketMQ 每个副本组的主副本才接受外部写请求,节点的身份又是如何决定的呢? 分布式系统一般分为中心化架构和去中心化架构。对于 MultiRaft,每个副本组包含三个或者五个副本,副本组内可以通过 Paxos / Raft 这样的共识协议来进行选主。典型的中心化架构,为了节省数据面资源成本会部署两副本,此时依赖于外部 ZK,ETCD,或者 DLedger Controller 这样的组件作为中心节点进行选举。由外置组件裁决成员身份涉及到分布式中两个重要的问题:1. 如何判断节点的状态是否正常。2. 如何避免双主问题。 对于第一个问题,kubernetes 的解决方案相对优雅,k8s 对与 Pod 的健康检查包括存活检测(Liveness probes)和就绪检测(Readiness probes),Liveness probes 主要是探测应用是否还活着,失败时重启 Pod。Readiness probes 来判断探测应用是否接受流量。简单的心跳机制一般只能实现存活检测,来看一个例子:假设有副本组中有 A、B、C 三个副本,另有一个节点 Q(哨兵) 负责观测节点状态,同时承担了全局选举与状态维护的职责。节点 A、B、C 周期性的向 Q 发送心跳,如果 Q 超过一段时间(一般是两个心跳间隔 )收不到某个节点的心跳则认为这个节点异常。如果异常的是主副本,Q 将副本组的其他副本提升为主并广播告知其他副本。 在工程实践中,节点下线的可能性一般要小于网络抖动的可能性。我们假设节点 A 是副本组的主,节点 Q 与节点 A 之间的网络中断。节点 Q 认为 A 异常。重新选择节点 B 作为新的 Master,并通知节点 A、B、C 新的 Master 是节点 B。节点 A 本身工作正常,与节点 B、C 之间的网络也正常。由于节点 Q 的通知事件到达节点 A、B、C 的顺序是未知的,假如先达到 B,在这一时刻,系统中同时存在两个工作的主,一个是 A,另一个是 B。假如此时 A、B 都接收外部请求并与 C 同步数据,会产生严重的数据错误。上述 "双主" 问题出现的原因在于虽然节点 Q 认为节点 A 异常,但节点 A 自己不认为自己异常,在旧主新主都接受写入的时候就产生了日志流的分叉,其问题的本质是由于网络分区造成的系统对于节点状态没有达成一致。 租约是一种避免双主的有效手段,租约的典型含义是现在中心节点承认哪个节点为主,并允许节点在租约有效期内正常工作。如果节点 Q 希望切换新的主,只需等待前一个主的租约过期,则就可以安全的颁发新租约给新 Master 节点,而不会出现双主问题。这种情况下系统对 Q 本身的可用性诉求非常高,可能会成为集群的性能瓶颈。生产中使用租约还有很多实现细节,例如依赖时钟同步需要颁发者的有效期设置的比接收者的略大,颁发者本身的切换也较为复杂。 在 RocketMQ 的设计中,希望以一种去中心化的设计降低中心节点宕机带来的全局风险,(这里认为中心化和是否存在中心节点是两件事)所以没有引入租约机制。在 Controller (对应于 Q )崩溃恢复期间,由于 Broker 对自己身份会进行永久缓存,每个主副本会管理这个副本组的状态机,RocketMQ Dledger Controller 这种模式能够尽量保证在大部分副本组在哨兵组件不可用时仍然不影响收发消息的核心流程。而旧主由于永久缓存身份,无法降级导致了网络分区时系统必须容忍双主。产生了多种解决方案,用户可以通过预配置选择 AP 型可用性优先,即允许系统通过短时分叉来保障服务连续性(下文还会继续谈谈为什么消息系统中分叉很难避免),还是 CP 型一致性优先,通过配置最小副本 ack 数超过集群半数以上节点。此时发送到旧主的消息将因为无法通过 ha 链路将数据发送给备,向客户端返回超时,由客户端将发起重试到其他分片。客户端经历一个服务发现的周期之后,客户端就可以正确发现新主。 特别的,在网络分区的情况下,例如旧主和备,Controller 之间产生网络分区,此时由于没有引入租约机制,旧主不会自动降级,旧主可以配置为异步双写,每一条消息需要经过主备的双重确认才能向客户端返回成功。而备在切换为主时,会设置自己只需要单个副本确认的同步写盘模式。此时,客户端短时间内仍然可以向旧主发送消息,旧主需要两副本确认才能返回成功,因此发送到旧主的消息会返回 SLAVE_NOT_AVAILABLE 的超时响应,通过客户端重试将消息发往新的节点。几秒后,客户端从 NameServer / Controller 获取新的路由时,旧主从客户端缓存中移除,此时完成了备节点的提升。 外置的组件可以对节点身份进行分配,上图展示了一个两副本的副本组上线流程: 1. 多个 Controller 通过选举和对 Broker 的请求进行重定向,最终由一个 Controller 做为主节点进行身份分配。 2. 如果 RocketMQ 副本组存在多个副本且需要选主,节点默认以备的身份启动,备节点会将自己注册到 Controller。 3. 节点从 Controller 获取 BrokerMemberGroup,包含了这个副本组的描述和连接信息。 1. 若分配的身份为备,解析出主节点的对外服务的地址并连接,完成日志截断后进行 HA 同步。 2. 若分配的身份为主,等待备机连接到自身的 HA 端口,并向 NameServer 再次宣告自己是主节点。 4. 主节点维护整个副本组的信息,向备发起数据复制,周期性的向 Controller 汇报主备之间水位差距,复制速度等。 RocketMQ 弱依赖 Controller 的实现并不会打破 Raft 中每个 term 最多只有一个 leader 的假设,工程中一般会使用 Leader Lease 解决脏读的问题,配合 Leader Stickiness 解决频繁切换的问题,保证主的唯一性。 Leader Lease: 租约,上一任 Leader 的 Lease 过期后,等待一段时间再发起 Leader 选举。 Leader Stickiness:Leader Lease 未过期的 Follower 拒绝新的 Leader 选举请求。 _注:Raft 认为具有最新已提交的日志的节点才有资格成为 Leader,而 MultiPaxos 无此限制。_ 对于日志的连续性问题,Raft 在确认一条日志之前会通过位点检查日志连续性,若检查到日志不连续会拒绝此日志,保证日志连续性,MultiPaxos 允许日志中有空洞。Raft 在 AppendEntries 中会携带 Leader 的 commit index,一旦日志形成多数派,Leader 更新本地的 commit index(对应于 RocketMQ 的 confirm offset)即完成提交,下一条 AppendEntries 会携带新的 commit index 通知其它节点,MultiPaxos 没有日志连接性假设,需要额外的 commit 消息通知其它节点。 计算日志分叉位点 除了网络分区,很多情况导致日志数据流分叉。有如下案例:三副本采用异步复制,异步持久化,A 为旧主 B C 为备,切换瞬间 B 日志水位大于 C,此时 C 成为新主,B C 副本上的数据会产生分叉,因为 B 还多出了一段未确认的数据。那么 B 是如何以一个简单可靠的方法去判断自己和 C 数据分叉的位点? 一个直观的想法就是,直接将主备的 CommitLog 从前向后逐渐字节比较,一般生产环境下,主备都有数百 GB 的日志文件流,读取和传输大量数据的方案费时费力。很快我们发现,确定两个大文件是否相同的一个好办法就是比较数据的哈希值,需要对比的数据量一下子就从数百 GB 降低为了几百个哈希值,对于第一个不相同的 CommitLog 文件,还可以采取局部哈希的方式对齐,这里仍然存在一些计算的代价。还有没有优化的空间呢,那就是利用任期 Epoch 和偏移量 StartOffset 实现一个新的截断算法。这种 EpochStartOffset 满足如下原则: 1. 通过共识协议保证给定的一个任期 Epoch 只有一个Leader。 2. 只有 Leader 可以写入新的数据流,满足一定条件才会被提交。 3. Follower 只能从 Leader 获取最新的数据流,Follower 上线时按照选举算法进行截断。 下面是一个选举截断的具体案例,选举截断算法思想和流程如下: 主 CommitLog Min = 300,Max = 2500,EpochMap = {, , }备 CommitLog Min = 300,Max = 2500,EpochMap = {, , } 1. 备节点连接到主节点进行 HA 协商,获取主节点的 EpochStartOffset 信息并比较 2. 备从后向前找到任期起始点相同的那个点作为分叉任期,在上述案例里是 3. 选择这个任期里主备结束位点的最小值(如果主副本没有切换且为最大任期,则主副本的结束位点是无穷大) 实现的代码如下: ${e} 数据回发与日志截断 故障发生后,系统将会对分叉数据进行修复,有很多小小细节值得深究与探讨。 在实现数据截断的过程中,有一个很特殊的动作,当备切主的时候要把 ConsumeQueue 的 Confirm Offset 提升到 CommitLog 的 MaxPhyOffset,即使这一部分数据在主上是否被提交是未知的。回想起几年前看 Raft 的时候,当一条日志被传输到 Follower,Follower 确认收到这条消息,主再把这条日志应用到自己的状态机时,通知客户端和通知所有的 follower 去 commit 这条日志这两件事是并行的,假如 leader 先回复 client 处理成功,此时 leader 挂了,由于其他 follower 的确认位点 confirm offset 一般会略低于 leader,中间这段未决日志还没应用到 follower 的状态机上,这时就出现了状态机不一致的情况,即已经写入 leader 的数据丢失了。让我们来举一个具体的案例,假设两副本一主一备: 1. 主的 max offset = 100,主向备发送当前 confirm offset = 40 和 message buffer = [40100] 的数据 2. 备向主回复 confirm offset = 100 后主需要同时做几件事 1. 本地提交(apply) [40100] 区间的数据,用后台的 dispatch 线程异步构建这段数据的索引 2. 向 producer 响应 [40100] 这段数据是发送成功的。 3. 向多个备机异步的提交,实际上是发送了 confirm offset = 100 3. 此时主突然宕机,备机的 confirm offset 可能是 [40100] 中的值 所以当备切换为主的时候,如果直接以 40 进行截断,意味着客户端已经发送到服务端的消息丢失了,正确的水位应该被提升至 100。但是备还没有收到 2.3 的 confirm = 100 的信息,这个行为相当于要提交了未决消息。事实上新 leader 会遵守 "Leader Completeness" 的约定,切换时任何副本都不会删除也不会更改旧 leader 未决的 entry。新 leader 在新的 term 下,会直接应用一个较大的版本将未决的 entry 一起提交,这里副本组主备节点的行为共同保证了复制状态机的安全性。 那么备切换成功的标志是什么,什么时候才能接收 producer 新的流量呢?对于 Raft 来说一旦切换就可以,对于 RocketMQ 来说这个阶段会被稍稍推迟,即索引已经完全构建结束的时候。RocketMQ 为了保证构建 consume queue 的一致性,会在 CommitLog 中记录 consume queue offset 的偏移量,此时 confirm offset 到 max offset 间的数据是副本作为备来接收的,这部分消息在 consume queue 中的偏移量已经固定下来了,而 producer 新的流量时由于 RocketMQ 预计算位点的优化,等到消息实际放入 CommitLog 的再真实的数据分发(dispatch)的时候就会发现对应位置的 consume queue 已经被占用了,此时就造成了主备索引数据不一致。本质原因是 RocketMQ 存储层预构建索引的优化对日志有一些侵入性,但切换时短暂等待的代价远远小于正常运行时提速的收益。 消息中间件场景 a. 元数据变更是否依赖于日志 目前 RocketMQ 对于元数据是在内存中单独管理的,备机间隔 5 秒向当前的主节点同步数据。例如当前主节点上创建了一个临时 Topic 并接受了一条消息,在一个同步周期内这个 Topic 又被删除了,此时主备节点元数据可能不一致。又比如位点更新的时候,对于单个队列而言,多副本架构中存在多条消费位点更新链路,Consumer 拉取消息时更新,Consumer 主动向 broker 更新,管控重置位点,HA 链路更新,当副本组发生主备切换时,consumer group 同时发生 consumer 上下线,由于路由发现的时间差,还可能造成同一个消费组两个不同 consumer 分别消费同一副本组主备上同一个队列的情况。 原因在于备机重做元数据更新和消息流这两件事是异步的,这有一定概率会造成脏数据。由于 RocketMQ 单个节点上 Topic / Group 数量较多,通过日志的实现会导致持久化的数据量很大,在复杂场景下基于日志做回滚依赖 snapshot 机制也会增加计算开销和恢复时间。这个问题和数据库很像,MySQL 在执行 DDL 修改元数据时通过会创建 MDL 锁,阻塞用户其他操作访问表空间的访问。备库同步主库也会加锁,元数据修改开始点和结束点所代表的两个日志并不是一个原子操作,这意味着主库上在修改元数据的过程中如果宕机了,备库上持有的 MDL 锁就无法释放。MySQL 的解决方案是在主库每次崩溃恢复后,都写一条特殊的日志,通知所有连接的备库释放其持有的所有 MDL 排他锁。对所有操作都走日志流进行状态机复制要求存储层有多种日志类型,实现也更加复杂。RocketMQ 选择以另一种同步的模式操作,即类似 ZAB 这样二阶段协议,例如位点更新时的可以选择配置 LockInStrictMode 让备都同步这条修改。事实上 RocketMQ 为了优化上述位点跳跃的现象,客户端在未重启时,遇到服务端主备切换还会用优先采纳本地位点的方式获取消息,进一步减少重复消费。 b. 同步复制与异步复制 同步复制的含义是用户的一个操作在多个副本上都已经提交。正常情况下,假设一个副本组中的 3 个副本都要对相同一个请求进行确认,相当于数据写透 3 个副本(简称 33 写),33 写提供了非常高的数据可靠性,但是把所有从节点都配置为同步复制时任何一个同步节点的中断都会导致整个副本组处理请求失败。当第三个副本是跨可用区时,长尾也会带来一定的延迟。 异步复制模式下,尚未复制到从节点的写请求都会丢失。向客户端确认的写操作也无法保证被持久化。异步复制是一种故障时 RPO 不为 0 的配置模式,由于不用考虑从节点上的状态,总是可以继续响应写请求,系统的延迟更低,吞吐性能更好。为了权衡两者,通常只有其中一个从节点是同步的,而其他节点是异步的模式。只要同步的从节点变得不可用或性能下降,则将另一个异步的从节点提升为同步模式。这样可以保证至少有两个节点(即主节点和一个同步从节点)拥有最新的数据副本。这种模式称为 23 写,能帮助避免抖动,提供更好的延迟稳定性,有时候也叫称为半同步。 在 RocketMQ 的场景中,异步复制也被广泛应用在消息读写比极高,从节点数量多或者异地多副本场景。同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、SYNC_MASTER、SLAVE 三个值中的一个。实际应用中要结合业务场景合理设置持久化方式和主从复制方式,通常,由于网络的速度高于本地 IO 速度,采用异步持久化和同步复制是一个权衡性能与可靠性的设置。 c. 副本组自适应降级 同步复制的含义是一条数据同时被主备确认才返回用户操作成功,可以保证主宕机后消息还在备中,适合可靠性要求较高的场景,同步复制还可以限制未同步的数据量以减少 ha 链路的内存压力,缺点则是副本组中的某一个备出现假死就会影响写入。异步复制无需等待备确认,性能高于同步复制,切换时未提交的消息可能会丢失(参考前文的日志分叉)。在三副本甚至五副本且对可靠性要求高的场景中无法采用异步复制,采用同步复制需要每一个副本确认后才会返回,在副本数多的情况下严重影响效率。关于一条消息需要被多少副本确认这个问题,RocketMQ 服务端会有一些数量上的配置来进行灵活调整: TotalReplicas:全部副本数 InSyncReplicas:每条消息至少要被这个数量的 Broker 确认(如果主为 ASYNC_MASTER 或者 AllAck 模式则该参数不生效) MinInSyncReplicas:最小的同步副本数,如果 InSyncReplicas 。对于正常情况下,两个副本会处于同步复制,当备下线或假死时,会进行自适应降级,保证主节点还能正常收发消息,这个功能为用户提供了一个可用性优先的选择。 d. 轻量级心跳与快速隔离 在 RocketMQ v4.x 版本的实现中,Broker 周期性的(间隔 30 秒)将自身的所有 Topic 序列化并传输到 NameServer 注册进行保活。由于 Broker 上 Topic 的元数据规模较大,带来了较大的网络流量开销,Broker 的注册间隔不能设置的太短。同时 NameServer 对 Broker 是采取延迟隔离机制,防止 NameServer 网络抖动时可能瞬间移除所有 Broker 的注册信息,引发服务的雪崩。默认情况下异常主宕机时超过 2 分钟,或者备切换为主重新注册后才会替换。容错设计的同时导致 Broker 故障转移缓慢,RocketMQ v5.0 版本引入轻量级心跳(参数liteHeartBeat),将 Broker 的注册行为与 NameServer 的心跳进行了逻辑拆分,将心跳间隔减小到 1 秒。当 NameServer 间隔 5 秒(可配置)没有收到来自 Broker 的心跳请求就对 Broker 进行移除,使异常场景下自愈的时间从分钟级缩短到了秒级。 RocketMQ 高可用架构演进路线 无切换架构的演进 最早的时候,RocketMQ 基于 MasterSlave 模式提供了主备部署的架构,这种模式提供了一定的高可用能力,在 Master 节点负载较高情况下,读流量可以被重定向到备机。由于没有选主机制,在 Master 节点不可用时,这个副本组的消息发送将会完全中断,还会出现延迟消息、事务消息、Pop 消息等二级消息无法消费或者延迟。此外,备机在正常工作场景下资源使用率较低,造成一定的资源浪费。为了解决这些问题,社区提出了在一个 Broker 进程内运行多个 BrokerContainer,这个设计类似于 Flink 的 slot,让一个 Broker 进程上可以以 Container 的形式运行多个节点,复用传输层的连接,业务线程池等资源,通过单节点主备交叉部署来同时承担多份流量,无外部依赖,自愈能力强。这种方式下隔离性弱于使用原生容器方式进行隔离,同时由于架构的复杂度增加导致了自愈流程较为复杂。 切换架构的演进 另一条演进路线则是基于可切换的,RocketMQ 也尝试过依托于 Zookeeper 的分布式锁和通知机制进行 HA 状态的管理。引入外部依赖的同时给架构带来了复杂性,不容易做小型化部署,部署运维和诊断的成本较高。另一种方式就是基于 Raft 在集群内自动选主,Raft 中的副本身份被透出和复用到 Broker Role 层面去除外部依赖,然而强一致的 Raft 版本并未支持灵活的降级策略,无法在 C 和 A 之间灵活调整。两种切换方案都是 CP 设计,牺牲高可用优先保证一致性。主副本下线时选主和路由定时更新策略导致整个故障转移时间依然较长,Raft 本身对三副本的要求也会面临较大的成本压力,RocketMQ 原生的 TransientPool,零拷贝等一些用来避免减少 IO 压力的方案在 Raft 下无法有效使用。 RocketMQ DLedger 融合模式 RocketMQ DLedger 融合模式是 RocketMQ 5.0 演进中结合上述两条路线后的一个系统的解决方案。核心的特性有以下几点: 1. 利用可内嵌于 NameServer 的 Controller 进行选主,无外部依赖,对两副本支持友好。 2. 引入 EpochStartOffset 机制来计算日志分叉位点。 3. 消息在进行写入时,提供了灵活的配置来协调系统对于可用性还是一致性优先的诉求。 4. 简化日志复制协议使得日志复制为高效。 几种实现对比表如下: 与其他消息系统的对比 控制节点 1. 是否强制要求选主 Kafka 的 Controller 是 Broker 选举产生,这需要有一个存储节点间的服务发现机制。RocketMQ 的 Controller 可以作为管控节点单独存在。对 Kafka,Pulsar 而言必须选择主副本进行写入,随着时间的运行节点之间负载需要通过复杂的方案进行再均衡。对 RocketMQ 的融合架构而言,由于选主是可选的,静态布局的方案(例如无需依赖复杂的动态调度就可以较为均衡的实现跨机架跨可用区),并且无切换与切换架构可以相互转换。 2. Controller 的逻辑复杂度 RocketMQ Controller 相比 Kafka Controller 更加轻量,Kafka 的 Controller 承担 Partition 粒度的 ISR 维护和选举等功能,而RocketMQ 的 Controller 维护的数据是副本组粒度的,对于元数据只维护节点身份,更加简单。RocketMQ Controller 可以独立部署,也可以内嵌 NameServer 运行。 3. Controller 依赖程度 RocketMQ Broker 的同步副本集维护是 Master Broker 节点上报,由于不强依赖中心节点来提供租约,controller 宕机时虽然无法为同时有主故障的副本组选举,但不影响绝大部分副本组可用性。Pulsar 中通过 fencing 机制防止有多个 writer(pulsar 中的计算节点称为 broker)同时写同一个 partition,是对外部有依赖的。 数据节点 1. 副本存储结构的抽象与最小粒度不同,在这一点上其实三者的设计各有优势 Kafka 的存储抽象粒度是 Partition,对每个分区进行维护多副本,扩容需要进行数据复制,对于冷读支持更好。 RocketMQ 的日志流是 Broker 粒度的,顺序写盘效率更高,在磁盘空间不足时一般选择水平扩容,只需复制元数据。 Pulsar 其实抽象了一个分布式日志流 Journal,分区被进一步分成分片,根据配置的时间或大小进行滚动,扩容只需复制元数据。 2. 复杂的参数配置被收敛至服务端 Kafka 和 RocketMQ 都支持灵活的配置单条消息的 ack 数,即权衡数据写入灵活性与可靠性。RocketMQ 在向云原生演进的过程希望简化客户端 API 与配置,让业务方只需关心消息本身,选择在服务端配置统一配置这个值。 3. 副本数据的同步方式不同 Pulsar 采用星型写:数据直接从 writer 写到多个 bookeeper。适合客户端与存储节点混部场景。数据路径只需要 1 跳,延迟更低。缺点是当存储计算分离时,星型写需要更多的存储集群和计算集群间网络带宽。 RocketMQ 和 Kafka 采用 Y 型写:client 先写到一个主副本,由其再转发给另外 Broker 副本。虽然服务端内部带宽充裕,但需要 2 跳网络,会增加延迟。Y 型写利于解决文件多客户端写的问题,也更容易利用 23 写克服毛刺,提供更好的延迟稳定性。 高可用架构的未来 仔细阅读 RocketMQ 的源码,其实大家也会发现 RocketMQ 在各种边缘问题处理上细节满满,节点失效,网络抖动,副本一致性,持久化,可用性与延迟之间存在各种细微的权衡,这也是 RocketMQ 多年来在生产环境下所积累的核心竞争力之一。随着分布式技术的进一步发展,更多更有意思的技术,如基于 RDMA 网络的复制协议也呼之欲出。RocketMQ 将与社区协同进步,发展为 “消息,事件,流” 一体化的融合平台。 参考文档: 1. Paxos design: 2. SOFAJRaft: 3. Pulsar Geo Replication: 4. Pulsar Metadata: 5. Kafka Persistence: 6. Kafka Balancing leadership: 7. Windows Azure Storage: A Highly Available Cloud Storage Service with Strong Consistency: 8. PolarDB Serverless: A Cloud Native Database for Disaggregated Data Centers:
作者:斜阳
#技术探索 #强力推荐 #高可用

2021年6月23日

博时基金基于 RocketMQ 的互联网开放平台 Matrix 架构实践
作者|伍振河 博时基金互联网金融部架构师 、曾志 博时基金互联网金融部开发主管 随着近两年业绩的抢眼,公募基金迎来了乘风破浪式的发展,截至 2021 年 1 月底,资产管理规模已破 20 万亿,创下了历史新高。 在中国新经济高质量及科技创新发展的背景下,众多金融类的互联网平台与基金公司展开合作。互联网金融科技与传统金融业务的融合,促使传统金融公司的信息技术系统更加开放。 据此,2020 年,博时基金互联网金融部启动了互联网开放平台 Matrix 的建设工作。 博时基金互联网开放平台 Matrix 建设背景与目标 1、传统金融架构遇到的问题与挑战 传统的金融系统架构受到了互联网化的挑战,主要表现在以下几个方面: 1) 互联网入口缺乏管控 有多个团队提供不同形式的互联网服务,接口协议和权限管控方式不一致。当服务和接口越来越多时,API 管控能力不足的问题将会突显。 2) 系统较为封闭,开放能力不足 传统基金行业系统生态较为封闭,与合作伙伴开放生态的能力有待提升。 3) 金融场景化封装能力不足 传统基金行业系统普遍依赖于底层数据库提供的 ACID 特性实现事务一致性。微服务化之后,这套机制对金融场景化的产品包装能力显得捉襟见肘。 2、系统建设目标 1)多渠道统一安全接入 为自有系统与运营厂商提供标准化统一接入,实现内外部 API 统一的管控。 Matrix 开放给经过博时互联网平台资质认证后的第三方平台使用,需要根据第三方平台识别的不同身份,进行接口级别权限管控。 2)提供开放能力 搭建开放平台,与合作伙伴共建开放生态。在得到 Matrix 平台的授权后,第三方平台开发者可以通过调用博时基金互联网开放平台的接口能力,为第三方平台提供基金产品信息查询、注册开户、积分兑换、基金申赎、资产查询、联合登录等全方位服务;第三方平台可以根据自身实际情况自由选择或组合 APP 、微信公众号、微信小程序、H5 等前端方式对接。 3)封装基金行业特色功能 应用层实现分布式事务框架以保证整体事务的一致性。基于此,封装优惠购、投资陪伴等复杂的金融场景化功能,让开发者专注于业务开发,提升客户的投资体验。 Matrix 建设思路 1、总体架构 1)互联网架构图 基于 Spring Cloud 微服务套件和 RocketMQ 消息中间件,搭建的企业级云原生架构。 2、关键组件 1)API 网关 API 网关是微服务架构重要组件之一,是服务唯一入口。API 网关封装内部系统架构,横向抽离通用功能,如:权限校验、熔断限流、负载均衡等。通过 API 网关可以把内部 API 统一管控起来。 目前博时基金的互联网业务接入入口主要分为 3 类: 面向自营业务的博时基金移动端 APP 和 H5 。 面向合作伙伴的 OpenAPI 。即作为开放平台的入口,服务的 OpenAPI 会提供有条件的访问限制(时间、流量、频率),需要考虑流量控制、安全认证、接口授权方面的管理。 面向企业内部管理系统的 API ,提供企业内部系统访问。 Matrix 的 API 网关基于 Spring Cloud Gateway 构建,SCG 内置的 Route、Predicate 和 Filter 模块可以方便扩展出路由转发、统一鉴权等跨横切面的功能。基于内外部网络隔离的需求,我们独立部署了两套网关,其中 Kylin 网关提供互联网接入。Phoenix 网关用于域内系统接入,提供域账户的访问权限控制。 2)认证中心 为了保护 OpenAPI 的安全,避免恶意访问、未授权访问、黑客攻击等导致的安全隐患,开放平台需要增加授权认证模块。同时,在博时的内部的应用系统之间,也有单点登录的需求。统一的认证中心是微服务架构的必备组件。 Matrix 基于 OAuth2 协议构建了统一认证中心,实现用户、应用、接口的统一认证和鉴权。OAuth2 核心思路是通过各类认证手段认证用户身份,并颁发 Token ,使得第三方应用可以使用该令牌在限定时间、限定范围访问指定资源。Matrix 支持 OAuth2 的 Authorization Code 、Resource Owner Credentials 和 Client Credentials 三种授权类型,根据不同的应用场景,采用不同的授权类型颁发 Token ,为开放平台的安全保驾护航。 3)RocketMQ 消息中间件 技术选型 在技术选型过程中,我们主要考虑以下几点: 首先必须是国产化的产品,其次是比较流行并且社区活跃度高的开源产品。 另外,重点关注的 MQ 特性: 消息可靠传递,即确保不丢消息。 分布式事务,需要支持分布式事务,降低业务的复杂性。 性能,我们的场景主要是在线的金融类业务,需要 MQ 具备支持金融级的低延迟特性。 最后,从架构演进的角度来考虑,需要无缝对接我们的混合云架构,最终我们选择了 RocketMQ。 RocketMQ 是阿里巴巴自主研发及双 11 交易核心链路消息产品,提供金融级高可靠消息服务。在开源方面,开源 RocketMQ 已经完成了云原生技术栈的集成,包括Knative 中的事件源,Prometheus 的 Exporter,K8s 的 Operator 等;也支持了微服务框架 SpringCloud 以及函数计算框架 OpenWhisk ;同时开发了很多 Connector 作为 Sink 或者 Source 去连接了 ELK、Flume、Flink、Hadoop 等大数据和数据分析领域的优秀开源产品。 在 Matrix 开放平台,RocketMQ 主要有三类应用场景。 1) 用于金融产品的场景化包装 业务场景: 典型的业务场景如优惠购,基民通过优惠购功能申购基金,可将交易费率降为0。简单来说就是先购买博时货币基金,再通过快速转购的方式买入目标基金,豁免相关转换费率。 实现原理: Matrix 基于 RocketMQ 的事务消息搭建了一个高可靠、高可用的事务消息平台事务中心,涉及业务流程如下: 第一阶段是 Prepare ,即业务系统将 RocketMQ 的半事务消息发送到事务中心,事务中心不做发布,等待二次确认。Prepare 完成之后,业务系统执行主事务,即购买货币基金,成功后 commit 到事务中心,由事务中心投递消息到从事务。如果主事务失败,就投递 rollback 给事务中心。 反查机制: 由于网络抖动、业务系统重启等原因,可能导致事务消息的二次确认丢失。此时需要依赖反查机制恢复整个分布式事务的上下文。RocketMQ 提供的 Message Status Check 机制正是为解决分布式事务中的超时问题而设计的。事务中心的反查机制流程主要是,先检查事务中心的内部状态,再通过反查接口检查本地事务的执行结果,恢复事务上下文后,正常推进后续的流程。 依赖于 RocketMQ 提供的事务消息,事务中心在应用层实现了分布式事务,大大提升了对金融产品的场景化包装能力。 2) 用于系统间解耦 业务场景: 部门 A 负责根据市场、产品和客户的陪伴场景输出优质的陪伴内容,部门 B 负责把这些陪伴内容触达到不同的渠道和用户。 实现原理: 部门 A 的陪伴事件触发服务和部门 B 的陪伴触达服务之间通过 RocketMQ 消息进行业务解耦,即双方没有依赖关系,也不必同时在线。 3) 异步调用 业务场景: 异步调用的使用场景比较多,如用户注册、用户关键行为跟踪等。其中用户行为跟踪场景,在服务端异步记录用户的关键行为及相关属性,可为用户分等级运营和精准营销打下基础。 实现原理: 将非核心的业务流程异步化可以减少系统的响应时间,提高吞吐量,是系统优化的常用手段。RocketMQ 提供了高效的通信机制,业务系统使用起来非常方便。 总结与未来展望 随着互联网技术在金融领域的不断渗透和金融创新业态的发展,公募基金互联网业务需要不断进行流程改造、模式创新及服务能力升级,在优化场景体验的基础上,持续打造基于平台、场景和产品三位一体的互联网服务平台。 Matrix 经过一年多的建设,目前已具备多渠道统一接入、第三方生态互联互通、基金特色交易场景化封装等功能特性。Matrix 通过建设有品质、有温度的陪伴,从技术上和体验上,让用户理解风险,理解投资,进而为客户持续创造价值。 在未来,将会有更多的合作伙伴接入 Matrix ,希望我们能一起畅游在创新科技的星辰大海中,合作共赢。
作者:伍振河、曾志
#行业实践

2021年6月16日

RocketMQ 千锤百炼--哈啰在分布式消息治理和微服务治理中的实践
背景 哈啰已进化为包括两轮出行(哈啰单车、哈啰助力车、哈啰电动车、小哈换电)、四轮出行(哈啰顺风车、全网叫车、哈啰打车)等的综合化移动出行平台,并向酒店、到店团购等众多本地生活化生态探索。 随着公司业务的不断发展,流量也在不断增长。我们发现生产中的一些重大事故,往往是被突发的流量冲跨的,对流量的治理和防护,保障系统高可用就尤为重要。 本文就哈啰在消息流量和微服务调用的治理中踩过的坑、积累的经验进行分享。 作者介绍 梁勇 ( 老梁 ) ,《 RocketMQ 实战与进阶》专栏联合作者、参与了《 RocketMQ 技术内幕》审稿工作。ArchSummit 全球架构师大会讲师、QCon 案例研习社讲师。 当前主要在后端中间件方向,在公众号【瓜农老梁】已陆续发表百余篇源码实战类文章,涵盖 RocketMQ 系列、Kafka 系列、GRPC 系列、Nacosl 系列、Sentinel 系列、Java NIO 系列。目前就职于哈啰出行,任职高级技术专家。 聊聊治理这件事 开始之前先聊聊治理这件事情,下面是老梁个人理解: 治理在干一件什么事? 让我们的环境变得美好一些 需要知道哪些地方还不够好? 以往经验 用户反馈 业内对比 还需要知道是不是一直都是好的? 监控跟踪 告警通知 不好的时候如何再让其变好? 治理措施 应急方案 目录 1. 打造分布式消息治理平台 2. RocketMQ 实战踩坑和解决 3. 打造微服务高可用治理平台 背景 裸奔的 RabbitMQ 公司之前使用 RabbitMQ ,下面在使用 RabbitMQ 时的痛点,其中很多事故由于 RabbitMQ 集群限流引起的。 积压过多是清理还是不清理?这是个问题,我再想想。 积压过多触发集群流控?那是真的影响业务了。 想消费前两天的数据?请您重发一遍吧。 要统计哪些服务接入了?您要多等等了,我得去捞IP看看。 有没有使用风险比如大消息?这个我猜猜。 裸奔的服务 曾经有这么一个故障,多个业务共用一个数据库。在一次晚高峰流量陡增,把数据库打挂了。 数据库单机升级到最高配依然无法解决 重启后缓一缓,不一会就又被打挂了 如此循环着、煎熬着、默默等待着高峰过去 思考:无论消息还是服务都需要完善的治理措施 打造分布式消息治理平台 设计指南 哪些是我们的关键指标,哪些是我们的次要指标,这是消息治理的首要问题。 设计目标 旨在屏蔽底层各个中间件( RocketMQ / Kafka )的复杂性,通过唯一标识动态路由消息。同时打造集资源管控、检索、监控、告警、巡检、容灾、可视化运维等一体化的消息治理平台,保障消息中间件平稳健康运行。 消息治理平台设计需要考虑的点 提供简单易用 API 有哪些关键点能衡量客户端的使用没有安全隐患 有哪些关键指标能衡量集群健康不健康 有哪些常用的用户/运维操作将其可视化 有哪些措施应对这些不健康 尽可能简单易用 设计指南 把复杂的问题搞简单,那是能耐。 极简统一 API 提供统一的 SDK 封装了( Kafka / RocketMQ )两种消息中间件。 一次申请 主题消费组自动创建不适合生产环境,自动创建会导致失控,不利于整个生命周期管理和集群稳定。需要对申请流程进行控制,但是应尽可能简单。例如:一次申请各个环境均生效、生成关联告警规则等。 客户端治理 设计指南 监控客户端使用是否规范,找到合适的措施治理 场景回放 场景一 瞬时流量与集群的流控 假设现在集群 Tps 有 1 万,瞬时翻到 2 万甚至更多,这种过度陡增的流量极有可能引发集群流控。针对这类场景需监控客户端的发送速度,在满足速度和陡增幅度阈值后将发送变的平缓一些。 场景二 大消息与集群抖动 当客户端发送大消息时,例如:发送几百KB甚至几兆的消息,可能造成 IO 时间过长与集群抖动。针对这类场景治理需监控发送消息的大小,我们采取通过事后巡检的方式识别出大消息的服务,推动使用同学压缩或重构,消息控制在 10KB 以内。 场景三 过低客户端版本 随着功能的迭代 SDK 的版本也会升级,变更除了功能外还有可能引入风险。当使用过低的版本时一个是功能不能得到支持,另外一个是也可能存在安全隐患。为了解 SDK 使用情况,可以采取将 SDK 版本上报,通过巡检的方式推动使用同学升级。 场景四 消费流量摘除和恢复 消费流量摘除和恢复通常有以下使用场景,第一个是发布应用时需要先摘流量,另外一个是问题定位时希望先把流量摘除掉再去排查。为了支持这种场景,需要在客户端监听摘除/恢复事件,将消费暂停和恢复。 场景五 发送/消费耗时检测 发送/消费一条消息用了多久,通过监控耗时情况,巡检摸排出性能过低的应用,针对性推动改造达到提升性能的目的。 场景六 提升排查定位效率 在排查问题时,往往需要检索发了什么消息、存在哪里、什么时候消费的等消息生命周期相关的内容。这部分可以通过 msgId 在消息内部将生命周期串联起来。另外是通过在消息头部埋入 rpcId / traceId 类似链路标识,在一次请求中将消息串起来。 治理措施提炼 需要的监控信息 发送/消费速度 发送/消费耗时 消息大小 节点信息 链路标识 版本信息 常用治理措施 定期巡检:有了埋点信息可以通过巡检将有风险的应用找出来。例如发送/消费耗时大于 800 ms、消息大小大于 10 KB、版本小于特定版本等。 发送平滑:例如检测到瞬时流量满足 1 万而且陡增了 2 倍以上,可以通过预热的方式将瞬时流量变的平滑一些。 消费限流:当第三方接口需要限流时,可以对消费的流量进行限流,这部分可以结合高可用框架实现。 消费摘除:通过监听摘除事件将消费客户端关闭和恢复。 主题/消费组治理 设计指南 监控主题消费组资源使用情况 场景回放 场景一 消费积压对业务的影响 有些业务场景对消费堆积很敏感,有些业务对积压不敏感,只要后面追上来消费掉即可。例如单车开锁是秒级的事情,而信息汇总相关的批处理场景对积压不敏感。通过采集消费积压指标,对满足阈值的应用采取实时告警的方式通知到应用负责的同学,让他们实时掌握消费情况。 场景二 消费/发送速度的影响 发送/消费速度跌零告警?有些场景速度不能跌零,如果跌零意味着业务出现异常。通过采集速度指标,对满足阈值的应用实时告警。 场景三 消费节点掉线 消费节点掉线需要通知给应用负责的同学,这类需要采集注册节点信息,当掉线时能实时触发告警通知。 场景四 发送/消费不均衡 发送/消费的不均衡往往影响其性能。记得有一次咨询时有同学将发送消息的key设置成常量,默认按照 key 进行 hash 选择分区,所有的消息进入了一个分区里,这个性能是无论如何也上不来的。另外还要检测各个分区的消费积压情况,出现过度不均衡时触发实时告警通知。 治理措施提炼 需要的监控信息 发送/消费速度 发送分区详情 消费各分区积压 消费组积压 注册节点信息 常用治理措施 实时告警:对消费积压、发送/消费速度、节点掉线、分区不均衡进行实时告警通知。 提升性能:对于有消费积压不能满足需求,可以通过增加拉取线程、消费线程、增加分区数量等措施加以提升。 自助排查:提供多维度检索工具,例如通过时间范围、msgId 检索、链路系统等多维度检索消息生命周期。 集群健康治理 设计指南 度量集群健康的核心指标有哪些? 场景回放 场景一 集群健康检测 集群健康检测回答一个问题:这个集群是不是好的。通过检测集群节点数量、集群中每个节点心跳、集群写入Tps水位、集群消费Tps水位都是在解决这个问题。 场景二 集群的稳定性 集群流控往往体现出集群性能的不足,集群抖动也会引发客户端发送超时。通过采集集群中每个节点心跳耗时情况、集群写入Tps水位的变化率来掌握集群是否稳定。 场景三 集群的高可用 高可用主要针对极端场景中导致某个可用区不可用、或者集群上某些主题和消费组异常需要有一些针对性的措施。例如:MQ 可以通过同城跨可用区主从交叉部署、动态将主题和消费组迁移到灾备集群、多活等方式进行解决。 治理措施提炼 需要的监控信息 集群节点数量采集 集群节点心跳耗时 集群写入 Tps 的水位 集群消费 Tps 的水位 集群写入 Tps 的变化率 常用治理措施 定期巡检:对集群 Tps 水位、硬件水位定期巡检。 容灾措施:同城跨可用区主从交叉部署、容灾动态迁移到灾备集群、异地多活。 集群调优:系统版本/参数、集群参数调优。 集群分类:按业务线分类、按核心/非核心服务分类。 最核心指标聚焦 如果说这些关键指标中哪一个最重要?我会选择集群中每个节点的心跳检测,即:响应时间( RT ),下面看看影响 RT 可能哪些原因。 关于告警 监控指标大多是秒级探测 触发阈值的告警推送到公司统一告警系统、实时通知 巡检的风险通知推送到公司巡检系统、每周汇总通知 消息平台图示 架构图 看板图示 多维度:集群维度、应用维度 全聚合:关键指标全聚合 RocketMQ 实战中踩过的坑和解决方案 行动指南 我们总会遇到坑,遇到就把它填了。 1. RocketMQ 集群 CPU 毛刺 问题描述 RocketMQ 从节点、主节点频繁 CPU 飙高,很明显的毛刺,很多次从节点直接挂掉了。 只有系统日志有错误提示 20200316T17:56:07.505715+08:00 VECS0xxxx kernel:[] ? __alloc_pages_nodemask+0x7e1/0x96020200316T17:56:07.505717+08:00 VECS0xxxx kernel: java: page allocation failure. order:0, mode:0x2020200316T17:56:07.505719+08:00 VECS0xxxx kernel: Pid: 12845, comm: java Not tainted 2.6.32754.17.1.el6.x86_64 120200316T17:56:07.505721+08:00 VECS0xxxx kernel: Call Trace:20200316T17:56:07.505724+08:00 VECS0xxxx kernel:[] ? __alloc_pages_nodemask+0x7e1/0x96020200316T17:56:07.505726+08:00 VECS0xxxx kernel: [] ? dev_queue_xmit+0xd0/0x36020200316T17:56:07.505729+08:00 VECS0xxxx kernel: [] ? ip_finish_output+0x192/0x38020200316T17:56:07.505732+08:00 VECS0xxxx kernel: [] ? 各种调试系统参数只能减缓但是不能根除,依然毛刺超过 50% 解决方案 将集群所有系统升级从 centos 6 升级到 centos 7 ,内核版本也从从 2.6 升级到 3.10 ,CPU 毛刺消失。 2. RocketMQ 集群线上延迟消息失效 问题描述 RocketMQ 社区版默认本支持 18 个延迟级别,每个级别在设定的时间都被会消费者准确消费到。为此也专门测试过消费的间隔是不是准确,测试结果显示很准确。然而,如此准确的特性居然出问题了,接到业务同学报告线上某个集群延迟消息消费不到,诡异! 解决方案 将" delayOffset.json "和" consumequeue / SCHEDULE_TOPIC_XXXX "移到其他目录,相当于删除;逐台重启 broker 节点。重启结束后,经过验证,延迟消息功能正常发送和消费。 打造微服务高可用治理平台 设计指南 哪些是我们的核心服务,哪些是我们的非核心服务,这是服务治理的首要问题 设计目标 服务能应对突如其来的陡增流量,尤其保障核心服务的平稳运行。 应用分级和分组部署 应用分级 根据用户和业务影响两个纬度来进行评估设定的,将应用分成了四个等级。 业务影响:应用故障时影响的业务范围 用户影响:应用故障时影响的用户数量 S1:核心产品,产生故障会引起外部用户无法使用或造成较大资损,比如主营业务核心链路,如单车、助力车开关锁、顺风车的发单和接单核心链路,以及其核心链路强依赖的应用。 S2: 不直接影响交易,但关系到前台业务重要配置的管理与维护或业务后台处理的功能。 S3: 服务故障对用户或核心产品逻辑影响非常小,且对主要业务没影响,或量较小的新业务;面向内部用户使用的重要工具,不直接影响业务,但相关管理功能对前台业务影响也较小。 S4: 面向内部用户使用,不直接影响业务,或后续需要推动下线的系统。 分组部署 S1 服务是公司的核心服务,是重点保障的对象,需保障其不被非核心服务流量意外冲击。 S1 服务分组部署,分为 Stable 和 Standalone 两套环境 非核心服务调用 S1 服务流量路由到 Standalone 环境 S1 服务调用非核心服务需配置熔断策略 多种限流熔断能力建设 我们建设的高可用平台能力 部分限流效果图 预热图示 排队等待 预热+排队 高可用平台图示 中间件全部接入 动态配置实时生效 每个资源和 IP 节点详细流量 总结 哪些是我们的关键指标,哪些是我们的次要指标,这是消息治理的首要问题 哪些是我们的核心服务,哪些是我们的非核心服务,这是服务治理的首要问题 源码&实战 是一种比较好的工作学习方法。
作者:梁勇
#行业实践

2021年6月15日

阿里云中间件首席架构师李小平:企业为什么需要云原生?
前天我参加了信通院的云原生产业大会,参加会议的企业非常多,并且来自于各行各业,我在会场上非常感慨。我想起 2019 年的时候,我在搜索引擎上搜索“云原生”这个词,那时的搜索频率还比较低,而 2019 年又是云原生在国内开始飞速发展的一年。而今年的云原生会场上,已经有非常多的企业来参加,这些企业在技术、产品、生态中都在应用云原生,所以说,整个云原生已经从最开始的技术变成了行业,现在发展成了比较大的产业,并且这个产业的规模每年以非常快的速度在增长。 在今天,可能有很多咨询机构、企业,或者是个人开发者都在解读云原生,也许很多人对云原生都有比较深入的认识了。大家都可以认同的是,云原生肯定与云有关,但是它改变了什么,为企业带来什么价值呢?最核心的点应该是可以改变企业的应用架构;还有一种可能是不改变应用架构,只是把整个运维体系基于云原生进行重塑。但所有的这些,背后的目的都是为了加速企业的价值创造过程,简单的说,和制造企业改良生产线是一样的,核心点就是改良我们作为软件企业的生产线。 阿里在云原生的实践从 2006 年就开始了。我们在做云原生的过程中积累了很多经验,我们认为,今天云原生对于企业数字创新主要提供了多个价值: 一是资源弹性。弹性这个词大家很容易理解,实际上弹性有不同的层面。比如说基于虚拟机的弹性,提供的弹性能力是分钟级的。如果基于这些技术的应用是毫秒级的,那么分钟级只解决了资源弹性问题,整个应用高可用问题还需要进一步解决。如果说弹性到了应用的层面,到了毫秒级,高可用问题也得到一定程度的解决。 除此以外,系统的稳定性也是大家非常关注的方面。云原生就是把整个软件构造过程中非功能性特性拉出来放到云原生产品上去,帮助应用开发从非功能性处理过程中解脱出来,更多的专注在功能性。同样的,云原生有很多工具理念,可以让我们变得更好,整个软件开发从代码到上线的时间大幅缩短。同样的,今天在基于云原生可观测性上面我们会积累非常多的数据,这些数据可以结合机器学习这些能力,帮助我们改善企业的用户体验。这些对于业务来讲会带来比较大的价值。 阿里云原生的实践历程 今天,云原生在 CNCF、国内相关的开源、还有三方组织的推动下,可以使得一家企业在做技术选型的时候有非常多的选项。大家通常会面临一个问题,在这么多选择里面,要真正达到生产可用的目的到底选谁?特别是当我们的业务需要在非常短的时间内就上线,在业务高速发展的阶段,我们应该选什么样的架构,选什么样的开源开放的产品,这个是摆在广大企业技术决策者以及架构师面前的难题。 在云原生领域中,阿里云是相对比较早开始做自研的。从 2006 年到 2009 年互联网的中间件开始发展,到阿里云正式成立,整个过程中我们通过云原生解决很多业务问题。通过应用云原生相关技术,从早期很好地支持了淘宝的高速发展,到了 2015 年以后很好地支持了阿里的中台建设,以及到今天随着阿里巴巴整个生产系统、核心系统全部 100% 上云,这个过程中我们运用的云原生技术,像容器技术、微服务技术支持的规模都是百万级以上。 相关调研显示,这样的云原生落地规模在全球范围内都是非常领先的。实际上,对于很多企业来讲,也许用不到这些规模,但是阿里通过解决这样的大规模下的性能、稳定性问题,积累了非常多的硬核技术,最终能够把这些技术转变成了产品,通过阿里云对外输出,服务于各行各业的广大客户。 我们认为,云原生对于整个软件的改变,或者对软件公司的开发流程的改变是非常深刻的。首先 K8s 已经变成了软件交付的标准界面,它改变的不止是运维,而是从 CICD 到后续发布上线整个生产链条。由于所有生产流程得到改变,以及很多企业通过云原生技术重塑了软件架构,使得软件架构从传统架构变成了新的、我们称之为现代化的应用架构,因此云原生可以通过这种生产工具的改良进一步改变企业的生产关系,最终影响企业,使得企业在软件开发过程中得到了极大的提速。 阿里云在云原生实践过程中,积累了很强的技术竞争力,体现在这些方面: 一、我们有非常多领先的技术解决云原生领域里面的稳定性问题、可靠性问题,大规模下的高并发问题等。同时,我们会把所有的这些技术通过开源开放的形式输出。我们知道,在云原生的世界,企业需要的是开源开放的技术,而不是被像阿里这样单独一个厂商所锁定的技术。这个过程中我们基于开源开放技术标准积累了很多产品的硬核能力。在产品上,除了大家看到的基于云原生应用架构里,还包括云原生数据库、云原生大数据等。 在云原生相关的领域有比较多的测评,在这些测评里,例如阿里云容器产品 ACK,在去年 Gartner 评测中拿到满分,全球厂商中只有两个厂商拿到满分,阿里云是其中之一。今年,阿里云再次入选 Gartner 容器竞争格局。在新兴的计算形态领域中,今年阿里云进入 Forrester FaaS 领导者象限,函数计算获得了全球 FaaS 产品最高分。 在可观测性里,阿里云代表国内云厂商进入 Gartner APM 象限。所有这些三方评估从另外一个层面反映了阿里云产品的能力。容器架构上,我们基于开源开放的 K8s 技术体系,基于阿里云的硬件做深度的优化,在比较多的领域和场景里为广大 K8s 应用提供服务。我们把在 K8s 集群里面超大规模集群管理的能力输出到 ACK 产品里面,使得阿里云的客户在管理集群的时候,可以摆脱大规模集群的管理复杂性问题。 比如完美日记,作为美妆行业的独角兽公司,他们的业务发展速度非常快,但在业务快速发展过程中,他们面临的问题就是在大促的场景中怎么更好地预留资源,以及在大促时怎么样比较好地解决新上线的功能,以及需求的稳定性问题。在这个过程中,他们利用 PTS 作为压测,所有应用跑在 ACK 平台上面,通过压测模拟大促的流量,从而能够把整个大促从需要投入较大的状态提升到具备可以常态化的做大促压测的能力,也通过这个能力使得系统稳定性相关问题得到快速收敛。 云原生中间件 从微服务、消息到各种应用工具以外,根据企业常见的 IT 场景,云原生中间件也提供了很多解决方案。阿里云中间件诞生于集团内的大规模调用场景,同时兼容开源,并且融入了更多产品能力,例如在整个大促过程中表现优异的可观测性、高可用能力等,都属于云原生中间件产品体系。 同样在中间件领域里,我们也和较多企业客户有相应的合作。畅捷通是一家做 SaaS 的企业,迄今已经为超过四百万的小微企业做了云管。ToB 类型的应用复杂度较高,最大的问题就是整个软件的发布频率是非常快的,怎么样在高频软件发布下面能够比较好的解决软件的各种 BUG,或者解决设计上的不足带来的稳定性的问题,这是在前期探讨过程中畅捷通提出来的关注点。通过应用云原生中间件,不仅解决了整个应用的可观测性问题,并且让应用具备 360 度无死角可观测能力,通过应用探测能够快速发现在整个压测过程中各种可能的不稳定风险,从而使得相应风险得到快速的收敛。 Serverless 很多学术机构在 Serverless 领域深入研究,我们预感 Serverless 极有可能会成为下一代主流技术趋势。阿里云在 Serverless 领域里做到业界领先的毫秒级计费,以及在整个阿里云底层做深度优化,使客户的应用真正达到了智能的弹性、极致的运维和大幅提升开发效率。阿里云也和许多企业客户达成深度合作,进行 Serverless 落地实践,通过帮助客户将应用迁到 Serverless 技术体系上,达到比较快的应用部署;同时,把应用的稳定性问题、运维都委托给 Serverless 这样的云产品去解决。 解决方案 云原生在快速发展过程中,只有通过不断的技术创新、产品创新,才有可能使得云原生技术更好的服务于广大的企业客户。今天,阿里云对外发布四大解决方案:全链路压测解决方案、异地多活解决方案、资源混部解决方案、可观测解决方案。这些解决方案可以高效地解决在传统领域里还没有很好解决的问题。比如全链路压测,大家都知道全链路压测是个好东西,比较大的问题是在应用压测过程中使应用改造最小,甚至不要做改造,所以这次阿里云升级的全链路压测就可以帮助企业应用解决这些问题。 今天企业在不断深入地使用云以后,不管公有云还是专有云上,都会碰到整体 CPU 利用率不高的问题,混部就使得各种离线任务和在线任务可以部署在一起,各自享用资源调度的优势,使得整体机房的 CPU 利用率得到比较大的提升。在这个过程中要解决混部之后带来的稳定性问题、资源占用问题。阿里是比较早地应用大规模的混部,像支撑电商双十一的云原生产品。今天我们也是把混部能力变成解决方案对外输出。 大家都知道,阿里是比较早实现了单元化的架构,通过单元化架构实现了多活。今天我们把单元化整体的架构能力作为多活的解决方案。同时,这样的多活不仅可以支持自有数据中心、私有云的场景,也能够支持公有云和混合云场景实现整个应用的多活。 可观测性一直都是大家特别关注的话题,因为通过可观测性使得我们可以主动发现在系统的运行过程中可能出现的各类风险。今天,阿里云升级的可观测性方案包括从拨测到各种前端的性能监控,一直延伸到后端应用,甚至延伸到云服务里。 产品升级 除了解决方案的创新以外,我们在相应的云原生产品上面也做了比较多的升级。容器 ACK 备份容灾中心全新发布,为容器用户提供集群、应用和数据的完整性保护: 1、支持自动分析应用依赖的元数据及存储,实现秒级创建应用+数据的一致性快照; 2、支持创建备份计划,自动按预设时间点创建备份; 3、完全兼容 Kubernetes,并支持多集群、多地域、跨数据中心进行备份和恢复。 容器镜像 ACR 发布企业级 Serverless 构建服务,大幅提升云原生制品的构建效率和体验: 1、支持多操作系统、多架构镜像的矩阵构建,支持大规模并发任务构建。 2、支持多级缓存的构建加速,平均构建提速 30%。 3、支持自动构建加速镜像,实现 AI 等大镜像秒级按需加载,平均启动时间减少 60 %。 在微服务领域,越来越多的应用考虑采用服务网格技术。用户希望服务网格在开源技术之上有更强的微服务治理能力,因此阿里云推出专业版 ASM Pro,具备增强多协议支持,提升动态扩展能力,精细化服务治理,完善零信任安全体系。专业版相比去年发布的普通版,在性能及规模上均有显著提升,与开源的差异化竞争力进一步增强,降低用户在生产环境落地服务网格的门槛。 Gartner 预测,未来事件驱动将成为业务开发的主流架构。企业客户上云过程中对于低代码、无服务器弹性应用架构,如何轻量集成众多异构云服务的数据流有着明确的痛点和诉求。基于此趋势,阿里云发布了事件总线 EventBridge 这款产品,其目标在于统一阿里云云服务、第三方 SaaS 厂商、用户自定义的事件标准,通过标准、弹性、轻量的核心能力帮助用户快速低成本获取并处理海量事件,驱动业务开发。 在过去的一段时间,我们对 EventBridge 的产品能力做了进一步的扩充和升级: 在事件生态集成的规模方面,新增 60+ 云产品官方事件源接入,涵盖计算、存储、网络、数据库等主流云产品; 在事件触达和处理方式上,内置了十多种过滤匹配转换逻辑,并且新增了跨网络、跨地域、跨账号等深度触达方式,方便企业大客户做深层次的安全、隔离等定制; 在此基础上,阿里云 EventBridge 首次推出事件驱动应用中心,内置常见的事件驱动应用模板,用户无需代码和部署即可简单配置完成常见的事件 ETL 处理、数据同步等场景功能。 阿里云拥有最广泛的云原生客户群体。随着更多的企业客户上云,将有更多复杂的场景,对于云原生技术、产品以及云原生理念提出更高的要求。阿里云希望跟社会各界的朋友一起在云原生领域里面做更多的探索,希望通过云原生技术,真正为企业带来更多的业务价值,助力企业整体的业务创新。
作者:李小平
#行业实践 #云原生

2021年6月14日

云原生消息队列RocketMQ:为什么我们选择 RocketMQ
说起消息队列,ActiveMQ、RabbitMQ、RocketMQ、Kafka、Pulsar 等纷纷涌入我们的脑海中, 在如此众多的开源消息队列产品中,作为一名合格的架构师如何给出高性价比的方案呢?商业化的产品暂不纳入选项中。 接下来我将从选型要素、RocketMQ 的优势两个方面解释为什么选择 RocketMQ 。 选型要素 首先从公司、消息队列服务提供者(一般是中间件团队)、最终用户三个角度来简单总结分析。 一、从公司层面看, 关注如下几点: 1. 技术成本 技术成本,一般包含服务器成本、二次开发成本、后期维护成本等,言而总之:都是钱。 服务器目前基本都使用云服务器,不同的云厂商的相同配置的服务器性能也有一定差异, 服务器成本一般需要了解:云厂商机器性能、云厂商优惠、所需服务器配置、服务器台数、单台服务器目前的价格、单台服务器优惠后的价格等。 2. 人力成本 人力成本,一般包含现有技术人员成本、新人招聘成本。 新的技术选型对于目前的技术人员接受程度怎么样,学习的难易程度怎样等,都是需要考虑的。如果太难的话,上线周期会变长、业务需求实现速度慢,甚至有人直接离职。 新人招聘成本,一般招聘一个新人有如下几个过程:简历筛选、预约面试、数轮面试、发 offer 、接受 offer 、正式入职、试用期、转正。这中间涉及到猎头成本、人力资源沟通成本、面试成本、新人入职后环境适应成本等等。 3. 其他 目前处于不同阶段的互联网公司对于技术成本、人力成本有着不一样的要求,但是很多有一定规模的公司实际上还是用“买买买”的心态来对待的:只要业务发展快速,买服务器、招人都不是问题,如果成本高了就做技术降成本、裁员。这不仅是员工之痛,也是业务之痛,更是公司之痛。 二、从中间件组层面看, 关注如下几点: 1. 稳定 公司级的服务首要的一点就是稳定。拥有稳定的组件、稳定的服务,业务才能有条不紊的进行。所以说,无论什么时候, 稳定都是王道。 2. 功能支持 不同的业务场景需要的功能也不尽相同,通常我们会考虑重试、死信机制,位点重置,定时延迟消息、事物消息,主从切换,权限控制等方面。 3. 性能 目前包含写入延迟和吞吐。 4. 管理平台 首先需要满足最终用户接入、查看、排障,管理员管控 topic 、消费者方便等。管理平台有现成的最好,方便二次开发 。 5. 监控、报警 监控报警是否完善、是否方便接入公司内部自研体系,或者行业的事实标准 Prometheus 。 6. 运维 & 支持 & 开源社区 如果产品上线后, 大部分时间,我们都是在做运维&支持。运维包含服务部署、迁移、服务升级、解决系统 Bug 、用户使用答疑、管理平台和监控报警平台升级等。 7. 其他 我们除了依赖自身以外,也可以借助社区的力量,同一个问题可能别人遇到过并且提交过 PR ,已经得到解决,我们就可以以此作为借鉴。所以社区的活跃情况也是非常重要的考虑。 三、从最终用户(一般包含业务后端研发以及他们的 Leader )看 1. 稳定性 对于业务的研发和他们的 Leader ,他们的核心任务是实现业务逻辑。如果一个服务三天两头总是有问题, 对于他们来说是比较致命的,所以稳定性是比较核心的一部分。 2. 改造现有项目的难度 旧项目改造其实是业务研发接入新中间件实际操作最多的部分。 3. 新项目接入是否便捷 是否便捷接入跟他们的工作量有着直接的关联。 4. 与目前的 App 微服务框架兼容怎样 新项目的接入和公司微服务框架兼容都比较容易。一般中间件在提供服务时都会考虑业务研发接入的便利性。 RocketMQ 的优势 下面将按照选项要素的要求, 分析 RocketMQ 在这方面的优势。 一、RocketMQ 如何解决和友好面对公司层面的诉求 1. 技术成本 就技术成熟度而言,在经历阿里双十一数万亿洪峰、微众银行、民生银行、蚂蚁金服、平安、字节跳动、快手、美团、京东、网易等各种行业大厂的考验后,就不言而喻了。 RocketMQ 对于服务器的配置要求不高, 普通的云主机都可以。曾经我们验证 8C 16G 500G SSD 的 2 主 2 从的集群,发送 tps 可以到 4~5w ,消费 tps 峰值 20w +,稳定在 8w~9w 。并且,还能根据业务实际的需求无感的横向扩展。 综合而言, 技术成本相对可控且人才多。 2. 人力成本 人力成本主要是现有的技术人员的学习成本、招新人的成本。 RocketMQ 是 java 开发的,代码也非常稳定、有条理,各个版本之间除了功能有差异之外,Api 、传输协议几乎没有太多变化,对于升级而言也更加方便。 java 也是目前中间件采用的比较主流的语言,使用的技术人员非常广泛。RocketMQ 在金融行业比如:微众银行、民生银行、蚂蚁金服、平安; 其他行业公司,比如阿里、字节跳动、快手、美团、京东、网易等与大量中小企业都在使用,候选人范围相对较大。 RocketMQ 社区也比较活跃,钉钉群、微信群、QQ 群众多,社区文档非常丰富和完善,原理剖析视频、文档也非常多,非常易于学习和入门。 下面是钉钉群,欢迎大家加群留言、答疑。 对于 java 方面的消息队列方面的人才相比 C/C++、C、Python、Go 等还是更多的:主流的 Kafka 是 scala + java、pulsar 是 java ,对于招聘也有极大的优势。 综合而言,RocketMQ 技术员对于人力成本比较友好。 二、从中间件组层面看,RocketMQ 是如何提供优秀的能力,为业务保驾护航呢? 1. 稳定性 金融级可靠、阿里双十一稳定支持万亿级消息洪峰,在笔者之前所在公司也有过 2 年+零事故的佳绩。 2. 功能丰富,支持的场景众多 重试、死信机制,友好、无感的业务重试机制。 顺序消息、事物消息 万级 Topic 数量支持 消息过滤 消息轨迹追踪 主从自动切换 原生支持 Prometheus 监控 原生支持易用管理平台:RocketMQ Console 访问权限控制(ACL) 3. 性能 RocketMQ 可以支持 99.9% 的写入延迟在 2 ms ,其他的开源消息队列中间件基本都是大于 5 ms ;目前大部分消息队列中间间都支持横向扩展,吞吐上横向扩展几乎都可以满足。RocketMQ 的在滴滴做的性能测试: _ _, 大家参考。 发送、消费 tps 和 kafka 一个数量级,Topic 数量剧增对于性能影响较小。 4. 管理平台 RocketMQ Console 原生支持: 5. 监控、报警 RocketMQ Exporter 原生支持 Prometheus: 6. 运维 & 支持 & 开源社区 无 zk 等第三方依赖,开箱即用 社区钉钉群、微信群、QQ 群非常活跃,钉钉群、微信群有问必答。 社区最近新来一位小姐姐 Commiter ,团队也在不断壮大。 综合看来,RocketMQ 稳定、可靠、性能好,开箱即用,不依赖 Zookeeper ,系统的稳定性更高,复杂度更小。监控报警等周边设施完善,场景支持全,社区活跃、文档丰富,是中间件团队的不二之选。 三、对于最终用户:业务研发、业务研发 Leader,他们的核心担忧是提供的技术是否稳定可靠、是否快速方便的接入 从中间件组层面看这个问题时,RocketMQ 稳定、可靠,那对于接入是否友好呢? RocketMQ 提供 java 原生客户端、Spring 客户端,C++ 客户端、Python 客户端、Go 客户端等多类型、多语言的客户端,对于各种项目都可以统一接入。 微服务框架中 Spring Cloud 基本已经成为事实标准,RocketMQ 支持 Spring boot Starter 和 Spring Cloud Function 等多种方式融合入微服务框架,对于 Spring 体系支持更加方便快捷。 Kafka vs RocketMQ 实际中,很多人应该面临过 RocketMQ vs Kafka ,Kafka 适合对于延迟不敏感、批量型、Topic 数量可控、对于消息丢失不敏感的场景。比如大数据场景的 MySQL2Hive、MySQL2Flink 的数据流通道,日志数据流通道等。 RocketMQ 适用于金融转账消息、订单状态变更消息、手机消息 Push 等业务场景。这些场景 Topic 数量通常过万,对于消息延迟和丢失极度敏感,数据通常是论条处理。对于海量数据的问题,一般地横向扩容完全可以解决。 合适的场景选择合适的产品,万能的产品是不存在的,都是折中,都是取舍。 作者介绍 李伟,Apache RocketMQ 社区 Commiter ,Python 客户端项目负责人, Apache RocketMQ 北京社区联合发起人,Apache Doris Contributor 。目前就职于腾讯,主要负责 OLAP 数据库开发,对分布式存储系统设计和研发有丰富经验,也热衷于知识分享和社区活动。 RocketMQ 学习资料 阿里云知行实验室提供一系列的 RocketMQ 在线实操环境,包含操作文档、ubuntu 实验环境,大家随时尝试玩玩: Apache RocketMQ 开源入门最佳实践: 《RocketMQ 分布式消息中间件:核心原理与最佳实践》随书实战:_ 在 Spring 生态中玩转 RocketMQ: 实验预览图如下: 其他资源 RocketMQ vs. ActiveMQ vs. Kafka: RocketMQ 源码: RocketMQ Exporter 源码: RocketMQ Spring 源码: RocketMQ C++ 客户端源码: RocketMQ Python 客户端源码: RocketMQ Go 客户端源码: RocketMQ Console 源码: RocketMQ Flink Connector 源码: RocketMQ 如何保证消息可靠: 大揭秘!RocketMQ 如何管理消费进度:
作者:李伟
#行业实践

2021年4月22日

使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
导读:本文将 rocktmqspringboot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 springbootstarter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 springbootstarter 工具包来配置,发送和消费 RocketMQ 消息。 在 Spring 生态中玩转 RocketMQ 系列文章: 本文配套可交互教程已登录阿里云知行动手实验室,PC 端登录 在浏览器中立即体验。 通过本文,您将了解到: Spring 的消息框架介绍 rocketmqspringboot 具体实现 使用示例 前言 上世纪 90 年代末,随着 Java EE(Enterprise Edition) 的出现,特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection), 控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。 随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布,它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 springbootwebstarter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。这种简便直接快速构建和开发应用的过程,可以使用约定的配置并且简化部署,受到越来越多的开发者的欢迎。 Apache RocketMQ 是业界知名的分布式消息和流处理中间件,简单地理解,它由 Broker 服务器和客户端两部分组成: 其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。 为了利用 Spring Boot 的快速开发和让用户能够更灵活地使用 RocketMQ 消息客户端,Apache RocketMQ 社区推出了 springbootstarter 实现。随着分布式事务消息功能在 RocketMQ 4.3.0 版本的发布,近期升级了相关的 springboot 代码,通过注解方式支持分布式事务的回查和事务消息的发送。 本文将对当前的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 springbootstarter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 springbootstarter 工具包来配置,发送和消费 RocketMQ 消息。 Spring 中的消息框架 顺便在这里讨论一下在 Spring 中关于消息的两个主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。 1. Spring Messaging Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。 单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 如果有兴趣深入的了解 Spring Messaging 及针对不同的消息产品的使用,推荐阅读这个文件。参考 Spring Messaging 的既有实现,RocketMQ 的 springbootstarter 中遵循了相关的设计模式并结合 RocketMQ 自身的功能特点提供了相应的 API(如顺序、异步和事务半消息等)。 2. Spring Cloud Stream Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下: 该图片引自 spring cloud stream Spring Cloud Stream 框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的Binder。 这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。 Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。 Binder SPI 的方式来让消息中间件产品使用可扩展的 API 来编写相应的 Binder,并集成到 Spring Cloud Steam 环境,目前 RocketMQ 还没有提供相关的 Binder,我们计划在下一步将完善这一功能,也希望社区里有这方面经验的同学积极尝试,贡献 PR 或建议。 springbootstarter的实现 在开始的时候我们已经知道,spring boot starter 构造的启动器对于使用者是非常方便的,使用者只要在 pom.xml引入starter 的依赖定义,相应的编译,运行和部署功能就全部自动引入。因此常用的开源组件都会为 Spring 的用户提供一个 springbootstarter 封装给开发者,让开发者非常方便集成和使用,这里我们详细的介绍一下 RocketMQ(客户端)的 starter 实现过程。 1. springbootstarter 的实现步骤 对于一个 springbootstarter 实现需要包含如下几个部分: 1)在 pom.xml 的定义 定义最终要生成的 starter 组件信息 org.apache.rocketmq springbootstarterrocketmq 1.0.0SNAPSHOT 定义依赖包 它分为两个部分:Spring 自身的依赖包和 RocketMQ 的依赖包。 2)配置文件类 定义应用属性配置文件类 RocketMQProperties,这个 Bean 定义一组默认的属性值。用户在使用最终的 starter 时,可以根据这个类定义的属性来修改取值,当然不是直接修改这个类的配置,而是 springboot 应用中对应的配置文件:src/main/resources/application.properties。 3)定义自动加载类 定义 src/resources/METAINF/spring.factories 文件中的自动加载类, 其目的是让 spring boot 更具文中中所指定的自动化配置类来自动初始化相关的 Bean、Component 或 Service,它的内容如下: org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration 在 RocketMQAutoConfiguration 类的具体实现中,定义开放给用户直接使用的 Bean 对象包括: RocketMQProperties 加载应用属性配置文件的处理类; RocketMQTemplate 发送端用户发送消息的发送模板类; ListenerContainerConfiguration 容器 Bean 负责发现和注册消费端消费实现接口类,这个类要求:由 @RocketMQMessageListener 注解标注;实现 RocketMQListener 泛化接口。 4)最后具体地进行 RpcketMQ 相关的封装 在发送端(producer)和消费端(consumer)客户端分别进行封装,在当前的实现版本提供了对 Spring Messaging 接口的兼容方式。 2. 消息发送端实现 1)普通发送端 发送端的代码封装在 RocketMQTemplate POJO 中,下图是发送端的相关代码的调用关系图: 为了与 Spring Messaging 的发送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象类,来支持相关的消息转换和发送方法,这些方法最终会代理给 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如异步,单向和顺序等方法直接添加到 RoketMQTempalte 中,这些方法直接代理调用到 RocketMQ 的 Producer API 来进行消息的发送。 2)事务消息发送端 对于事务消息的处理,在消息发送端进行了部分的扩展,参考上面的调用关系类图。 RocketMQTemplate 里加入了一个发送事务消息的方法 sendMessageInTransaction(),并且最终这个方法会代理到 RocketMQ 的 TransactionProducer 进行调用,在这个 Producer 上会注册其关联的 TransactionListener 实现类,以便在发送消息后能够对 TransactionListener 里的方法实现进行调用。 3. 消息消费端实现 在消费端 SpringBoot 应用启动后,会扫描所有包含 @RocketMQMessageListener 注解的类(这些类需要集成 RocketMQListener 接口,并实现 onMessage()方法),这个 Listener 会一对一的被放置到。 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ Consumer 对象,启动并监听定制的 Topic 消息,如果有消费消息,则回调到 Listener 的 onMessage() 方法。 使用示例 上面的一章介绍了 RocketMQ 在 springbootstarter 方式的实现,这里通过一个最简单的消息发送和消费的例子来介绍如何使这个 rocketmqspringbootstarter。 1. RocketMQ 服务端的准备 1)启动 NameServer 和 Broker 要验证 RocketMQ 的 SpringBoot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进行操作。确保启动 NameServer 和 Broker 已经正确启动。 2)创建实例中所需要的 Topics 在执行启动命令的目录下执行下面的命令行操作: bash bin/mqadmin updateTopic c DefaultCluster t stringtopic 2. 编译 rocketmqspringbootstarter 目前的 springbootstarter 依赖还没有提交的 Maven 的中心库,用户使用前需要自行下载 git 源码,然后执行 mvn clean install 安装到本地仓库。 git clone https://github.com/apache/rocketmqexternals.git cd rocketmqspringbootstarter mvn clean install 3. 编写客户端代码 用户如果使用它,需要在消息的发布和消费客户端的 maven 配置文件 pom.xml 中添加如下的依赖: 属性 springbootstarterrocketmqversion 的取值为:1.0.0SNAPSHOT, 这与上一步骤中执行安装到本地仓库的版本一致。 1)消息发送端的代码 发送端的配置文件 application.properties: 发送端的 Java 代码: 2)消息消费端代码 消费端的配置文件 application.properties: 消费端的 Java 代码: 这里只是简单的介绍了使用 springboot 来编写最基本的消息发送和接收的代码,如果需要了解更多的调用方式,如: 异步发送,对象消息体,指定 tag 标签以及指定事务消息,请参看 github 的说明文档和详细的代码。我们后续还会对这些高级功能进行陆续的介绍。 作者简介 辽天,阿里巴巴技术专家,Apache RocketMQ 内核控,拥有多年分布式系统研发经验,对 Microservice、Messaging 和 Storage 等领域有深刻理解, 目前专注 RocketMQ 内核优化以及 Messaging 生态建设。 在 PC 端登录 start.aliyun.com 知行动手实验室,沉浸式体验在线交互教程。
作者:辽天
#技术探索 #微服务

2021年4月6日

基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台
导读:本文将对 RocketMQExporter 的设计实现做一个简单的介绍,读者可通过本文了解到 RocketMQExporter 的实现过程,以及通过 RocketMQExporter 来搭建自己的 RocketMQ 监控系统。RocketMQ 在线可交互教程现已登录知行动手实验室,PC 端登录 start.aliyun.com 即可直达。 RocketMQ 云原生系列文章: (本文) RocketMQExporter 项目的 GitHub 地址: 文章主要内容包含以下几个方面: 1. RocketMQ 介绍 2. Prometheus 简介 3. RocketMQExporter 的具体实现 4. RocketMQExporter 的监控指标和告警指标 5. RocketMQExporter 使用示例 RocketMQ 介绍 RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。简单的来说,它由 Broker 服务器和客户端两部分组成,其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。 正由于它具有高性能、高可靠性和高实时性的特点,与其他协议组件在 MQTT 等各种消息场景中的结合也越来越多,应用越来越广泛。而对于这样一个强大的消息中间件平台,在实际使用的时候还缺少一个监控管理平台。 当前在开源界,使用最广泛监控解决方案的就是 Prometheus。与其它传统监控系统相比较,Prometheus 具有易于管理,监控服务的内部运行状态,强大的数据模型,强大的查询语言 PromQL,高效的数据处理,可扩展,易于集成,可视化,开放性等优点。并且借助于 Prometheus 可以很快速的构建出一个能够监控 RocketMQ 的监控平台。 Prometheus 简介 下图展示了 Prometheus 的基本架构: 1. Prometheus Server Prometheus Server 是 Prometheus 组件中的核心部分,负责实现对监控数据的获取,存储以及查询。Prometheus Server 可以通过静态配置管理监控目标,也可以配合使用 Service Discovery 的方式动态管理监控目标,并从这些监控目标中获取数据。其次 Prometheus Server 需要对采集到的监控数据进行存储,Prometheus Server 本身就是一个时序数据库,将采集到的监控数据按照时间序列的方式存储在本地磁盘当中。最后 Prometheus Server 对外提供了自定义的 PromQL 语言,实现对数据的查询以及分析。 2. Exporters Exporter 将监控数据采集的端点通过 HTTP 服务的形式暴露给 Prometheus Server,Prometheus Server 通过访问该 Exporter 提供的 Endpoint 端点,即可获取到需要采集的监控数据。RocketMQExporter 就是这样一个 Exporter,它首先从 RocketMQ 集群采集数据,然后借助 Prometheus 提供的第三方客户端库将采集的数据规范化成符合 Prometheus 系统要求的数据,Prometheus 定时去从 Exporter 拉取数据即可。 当前 RocketMQ Exporter 已被 Prometheus 官方收录,其地址为:。 RocketMQExporter 的具体实现 当前在 Exporter 当中,实现原理如下图所示: 整个系统基于 spring boot 框架来实现。由于 MQ 内部本身提供了比较全面的数据统计信息,所以对于 Exporter 而言,只需要将 MQ 集群提供的统计信息取出然后进行加工而已。所以 RocketMQExporter 的基本逻辑是内部启动多个定时任务周期性的从 MQ 集群拉取数据,然后将数据规范化后通过端点暴露给 Prometheus 即可。其中主要包含如下主要的三个功能部分: MQAdminExt 模块通过封装 MQ 系统客户端提供的接口来获取 MQ 集群内部的统计信息。 MetricService 负责将 MQ 集群返回的结果数据进行加工,使其符合 Prometheus 要求的格式化数据。 Collect 模块负责存储规范化后的数据,最后当 Prometheus 定时从 Exporter 拉取数据的时候,Exporter 就将 Collector 收集的数据通过 HTTP 的形式在/metrics 端点进行暴露。 RocketMQExporter 的监控指标和告警指标 RocketMQExporter 主要是配合 Prometheus 来做监控,下面来看看当前在 Expoter 中定义了哪些监控指标和告警指标。 监控指标 rocketmq_message_accumulation 是一个聚合指标,需要根据其它上报指标聚合生成。 告警指标 消费者堆积告警指标也是一个聚合指标,它根据消费堆积的聚合指标生成,value 这个阈值对每个消费者是不固定的,当前是根据过去 5 分钟生产者生产的消息数量来定,用户也可以根据实际情况自行设定该阈值。告警指标设置的值只是个阈值只是象征性的值,用户可根据在实际使用 RocketMQ 的情况下自行设定。这里重点介绍一下消费者堆积告警指标,在以往的监控系统中,由于没有像 Prometheus 那样有强大的 PromQL 语言,在处理消费者告警问题时势必需要为每个消费者设置告警,那这样就需要 RocketMQ 系统的维护人员为每个消费者添加,要么在系统后台检测到有新的消费者创建时自动添加。在 Prometheus 中,这可以通过一条如下的语句来实现: (sum(rocketmq_producer_offset) by (topic) on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)) ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)560 0 借助 PromQL 这一条语句不仅可以实现为任意一个消费者创建消费告警堆积告警,而且还可以使消费堆积的阈值取一个跟生产者发送速度相关的阈值。这样大大增加了消费堆积告警的准确性。 RocketMQExporter 使用示例 1. 启动 NameServer 和 Broker 要验证 RocketMQ 的 SpringBoot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进行操作。确保启动 NameServer 和 Broker 已经正确启动。 2. 编译 RocketMQExporter 用户当前使用,需要自行下载 git 源码编译: git clone https://github.com/apache/rocketmqexporter cd rocketmqexporter mvn clean install 3. 配置和运行 RocketMQExporter 有如下的运行选项: 以上的运行选项既可以在下载代码后在配置文件中更改,也可以通过命令行来设置。 编译出来的 jar 包就叫 rocketmqexporter0.0.1SNAPSHOT.jar,可以通过如下的方式来运行。 java jar rocketmqexporter0.0.1SNAPSHOT.jar [rocketmq.config.namesrvAddr="127.0.0.1:9876" ...] 4. 安装 Prometheus 首先到 Prometheus去下载 Prometheus 安装包,当前以 linux 系统安装为例,选择的安装包为 prometheus2.7.0rc.1.linuxamd64.tar.gz,经过如下的操作步骤就可以启动 prometheus 进程。 tar xzf prometheus2.7.0rc.1.linuxamd64.tar.gzcd prometheus2.7.0rc.1.linuxamd64/./prometheus config.file=prometheus.yml web.listenaddress=:5555 Prometheus 默认监听端口号为 9090,为了不与系统上的其它进程监听端口冲突,我们在启动参数里面重新设置了监听端口号为 5555。然后通过浏览器访问 ;服务器 IP 地址:5555,就可以验证 Prometheus 是否已成功安装,显示界面如下: 由于 RocketMQExporter 进程已启动,这个时候可以通过 Prometheus 来抓取 RocketMQExporter 的数据,这个时候只需要更改 Prometheus 启动的配置文件即可。 整体配置文件如下: my global config global: scrape_interval: 15s Set the scrape interval to every 15 seconds. Default is every 1 minute. evaluation_interval: 15s Evaluate rules every 15 seconds. The default is every 1 minute. scrape_timeout is set to the global default (10s). Load rules once and periodically evaluate them according to the global 'evaluation_interval'. rule_files: "first_rules.yml" "second_rules.yml" scrape_configs: job_name: 'prometheus' static_configs: targets: ['localhost:5555'] job_name: 'exporter' static_configs: targets: ['localhost:5557'] 更改配置文件后,重启服务即可。重启后就可以在 Prometheus 界面查询 RocketMQExporter 上报的指标,例如查询 rocketmq_broker_tps 指标,其结果如下: 5. 告警规则添加 在 Prometheus 可以展示 RocketMQExporter 的指标后,就可以在 Prometheus 中配置 RocketMQ 的告警指标了。在 Prometheus 的配置文件中添加如下的告警配置项,.rules 表示可以匹配多个后缀为 rules 的文件。 rule_files: "first_rules.yml" "second_rules.yml" /home/prometheus/prometheus2.7.0rc.1.linuxamd64/rules/.rules 当前设置的告警配置文件为 warn.rules,其文件具体内容如下所示。其中的阈值只起一个示例的作用,具体的阈值还需用户根据实际使用情况来自行设定。 Sample prometheus rules/alerts for rocketmq. Galera Alerts groups: name: GaleraAlerts rules: alert: RocketMQClusterProduceHigh expr: sum(rocketmq_producer_tps) by (cluster) = 10 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} Sending tps too high.' summary: cluster send tps too high alert: RocketMQClusterProduceLow expr: sum(rocketmq_producer_tps) by (cluster) = 10 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} consuming tps too high.' summary: cluster consume tps too high alert: RocketMQClusterConsumeLow expr: sum(rocketmq_consumer_tps) by (cluster) 0 for: 3m labels: severity: warning annotations: description: 'consumer {{$labels.group}} on {{$labels.topic}} lag behind and is falling behind (behind value {{$value}}).' summary: consumer lag behind alert: GroupGetLatencyByStoretime expr: rocketmq_group_get_latency_by_storetime 1000 for: 3m labels: severity: warning annotations: description: 'consumer {{$labels.group}} on {{$labels.broker}}, {{$labels.topic}} consume time lag behind message store time and (behind value is {{$value}}).' summary: message consumes time lag behind message store time too much 最终,可以在 Prometheus 的看一下告警展示效果,红色表示当前处于告警状态的项,绿色表示正常状态。 6. Grafana dashboard for RocketMQ Prometheus 自身的指标展示平台没有当前流行的展示平台 Grafana 好, 为了更好的展示 RocketMQ 的指标,可以使用 Grafana 来展示 Prometheus 获取的指标。 首先到官网去下载:,这里仍以二进制文件安装为例进行介绍。 wget https://dl.grafana.com/oss/release/grafana6.2.5.linuxamd64.tar.gz tar zxvf grafana6.2.5.linuxamd64.tar.gz cd grafana5.4.3/ 同样为了不与其它进程的使用端口冲突,可以修改 conf 目录下的 defaults.ini 文件的监听端口,当前将 grafana 的监听端口改为 55555,然后使用如下的命令启动即可: ./bin/grafanaserver web 然后通过浏览器访问 ;服务器 IP 地址:55555,就可以验证 grafana 是否已成功安装。系统默认用户名和密码为 admin/admin,第一次登陆系统会要求修改密码,修改密码后登陆,界面显示如下: 点击 Add data source 按钮,会要求选择数据源。 选择数据源为 Prometheus,设置数据源的地址为前面步骤启动的 Prometheus 的地址。 回到主界面会要求创建新的 Dashboard。 点击创建 dashboard,创建 dashboard 可以自己手动创建,也可以以配置文件导入的方式创建,当前已将 RocketMQ 的 dashboard 配置文件上传到 Grafana 的官网,这里以配置文件导入的方式进行创建。 点击 New dashboard 下拉按钮。 选择 import dashboard。 这个时候可以到 Grafana 官网去下载当前已为 RocketMQ 创建好的配置文件,地址为:,如下图所示: 点击 download 就可以下载配置文件,下载配置文件然后,复制配置文件中的内容粘贴到上图的粘贴内容处。 最后按上述方式就将配置文件导入到 Grafana 了。 最终的效果如下所示: 作者简介 陈厚道,曾就职于腾讯、盛大、斗鱼等互联网公司。目前就职于尚德机构,在尚德机构负责基础架构方面的设计和开发工作。对分布式消息队列、微服务架构和落地、DevOps 和监控平台有比较深入的研究。 冯庆,曾就职于华为。目前就职于尚德机构,在尚德机构基础架构团队负责基础组件的开发工作。 在 PC 端登录 知行动手实验室,沉浸式体验在线交互教程。
作者:陈厚道、冯庆
#技术探索 #可观测