2023年1月6日

RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
本文主要向大家介绍如何利用 RocketMQ 可观测体系中的指标监控,对生产环境中典型场景:消息堆积、消息收发失败等场景配置合理的监控预警,快速发现问题,定位问题。 RocketMQ 可观测体系 作为一款典型的分布式中间件产品,RocketMQ 被广泛应用于业务核心链路中,每条消息都关联着核心业务数据的变化。业务链路有其明显的复杂性: 生产者、消费者多对多:业务调用链路网状结构,上下游梳理困难 上下游解耦、异步链路:异步化调用,信息收集不完整 消息是状态数据:未消费成功、定时中等状态增加排查的复杂度 消息链路耦合复杂的业务处理逻辑:无法快速定位问题边界 鉴于消息链路耦合业务系统,复杂带状态,RocketMQ 通过强大的可观测系统和经验支撑,及时发现问题、定位问题、解决问题有助于提升运维效率,对于业务运行是一项重要的保障能力。 RocketMQ 的可观测体系主要由指标(Metrics)、轨迹(Tracing)和日志(Logging)组成。 指标 RocketMQ中定义了详细的Metrics指标,这些指标覆盖生产者、消费者、服务端及消息收发关键接口和流程的统计数据,并支持从实例、Topic和Group等多个维度进行聚合展示,帮助您实时监控消息业务或RocketMQ服务的运行状态。和4.x版本相比,RocketMQ服务端5.x版本增加了消息堆积场景相关指标、关键接口的耗时指标、错误分布指标、存储读写流量等指标,帮助您更好地监控异常场景。 消息轨迹 在分布式应用中,RocketMQ作为全链路中异步解耦的关键服务,提供的Tracing数据可有效将业务上下游信息串联起来,帮助您更好地排查异常,定位问题。和4.x版本相比,RocketMQ服务端5.x版本支持OpenTelemetry开源标准,提供更加丰富的轨迹指标,针对消费场景、高级消息类型场景等细化轨迹内容,为问题定位提供更多关键信息。 日志 RocketMQ为不同的异常情况定义唯一的错误码及错误信息,并划分不同的错误级别,您可以根据客户端返回的错误码信息快速获取异常原因。和4.x版本相比,RocketMQ服务端5.x版本统一了ErrorCode和ErrorMessage,异常日志中增加了RequestID、资源信息,细化了错误信息,保证日志内容明确靠。 RocketMQ 监控告警介绍 RocketMQ 联合阿里云云监控提供了开箱即用且免费的监控报警服务,可帮助您解决如下问题: 实例规格水位监控预警 若您实际使用的指标值超过实例的规格限制,RocketMQ会进行强制限流。提前配置实例规格水位告警可以提前发现规格超限风险并及时升配,避免因限流导致的业务故障。 业务逻辑错误监控预警 您在消息收发时可能会收到异常报错,配置调用错误告警可以提前在业务反馈前发现异常,帮助您提前判断异常来源并及时修复。 业务性能指标监控预警 如果您的消息链路有相关性能指标要求,例如RT耗时、消息延迟等,提前配置业务指标告警可以帮助您提前治理业务风险。 RocketMQ 版提供了丰富的 Metric 指标和告警监控项。各监控项可分为运行水位、收发性能、异常错误事件三类告警。根据大量生产环境实践经验,建议您根据以下原则配置如下告警 接下来重点通过消息堆积和消息收发失败这两个典型场景来阐述基于可观测体系中的指标(Metrics),RocketMQ 如何通过云监控创建监控规则,将关键的 Metrics 指标作为告警项,帮助您自动监控服务的运行状态,并自动发送报警通知, 便于您及时预警服务的异常信息,提高运维效率。 应用场景1:消息堆积问题 消息堆积指标及监控配置 业界通用指标:使用消息堆积量(ready + inflight)来度量消费健康度,表示未处理完成的消息量;部分产品额外增加已就绪消息量来度量消息拉取的及时性;使用上述 2 个指标直接来配置报警有以下缺点: 有误报或无法触发报警的问题 及时性的间接指标,不直观 RocketMQ 指标:额外支持延时时间来度量消费健康度,涵盖了所有业务场景,根据业务容忍延迟度直接配置时间告警阈值。 消息处理延迟时间:表示业务处理完成及时度 已就绪消息排队时间:表示拉取消息及时度 建议对消息堆积敏感的用户,都在 RocketMQ 实例页的监控报警,添加如下报警指标,并设置符合业务需求的阈值。 如何定位和处理堆积问题 假如收到堆积报警,确认消息出现堆积情况,可参考以下措施进行定位和处理。 1. 判断消息堆积在 RocketMQ 服务端还是客户端 查看客户端本地日志文件 ons.log,搜索是否出现如下信息:the cached message count exceeds the threshold 出现相关日志信息,说明客户端本地缓冲队列已满,消息堆积在客户端,请执行步骤2。 若未出现相关日志,说明消息堆积不在客户端,若出现这种特殊情况,请直接提交工单联系阿里云技术支持。 2. 确认消息的消费耗时是否合理 若查看到消费耗时较长,则需要查看客户端堆栈信息排查具体业务逻辑,请执行步骤3。 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。 消息的消费耗时可以通过以下方式查看: 查看消费者状态,在客户端连接信息中查看业务处理时间,获取消费耗时的平均值。 3. 查看客户端堆栈信息。只需要关注线程名为 ConsumeMessageThread 的线程,这些都是业务消费消息的逻辑。 客户端堆栈信息可以通过以下方式获取:查看消费者状态,在客户端连接信息中查看 Java 客户端堆栈信息 使用 Jstack 工具打印堆栈信息。 常见的异常堆栈信息如下: 消费逻辑有抢锁休眠等待等情况。消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。 示例一: 消费逻辑操作数据库等外部存储卡住。消费线程阻塞在外部的 HTTP 调用上,导致消费缓慢。 示例二: 4. 针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以跳过不消费,您可以通过重置消费位点跳过这些堆积的消息从最新位点开始消费,快速恢复业务。 如何避免消息堆积 为了避免在业务使用时出现非预期的消息堆积和延迟问题,需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。 梳理消息的消费耗时通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息: 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。 消息消费逻辑中的 I/O 操作(如:外部调用、读写存储等)是否是必须的,能否用本地缓存等方案规避。外部 I/O 操作通常包括如下业务逻辑: 读写外部数据库,例如 MySQL 数据库读写。 读写外部缓存等系统,例如 Redis 读写。 下游系统调用,例如 Dubbo 调用或者下游 HTTP 接口调用。 消费逻辑中的复杂耗时的操作是否可以做异步化处理,如果可以是否会造成逻辑错乱(消费完成但异步操作未完成)。 设置消息的消费并发度 逐步调大线程的单个节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。 得到单个节点的最优线程数和消息吞吐量后,根据上下游链路的流量峰值计算出需要设置的节点数,节点数=流量峰值/单线程消息吞吐量。 应用场景2:消息收发失败问题 消息收发的核心流程 从上图中可以看出消息收发都要先从 NameServer 返回路由,再通过 broker 的鉴权以及实例规格是否超限的判断,才能进行正常收发消息。根据经验检消息收发失败的原因有如下情况: API 请求频率是否超过实例规格限制 查网络是否正常 服务端是否是有重启造成的短期收发失败 操作资源是否有权限 常见的消息收发失败异常 在无论开发阶段还是生产运行阶段,遇到收发失败问题,我们都可以从客户端日志出发进行排查。以下列举下常见的消息收发失败异常场景: 1. 在客户端日志中出现ClusterName consumer groupId consumer topic messages flow control, flow limit threshold is , remainMs 异常信息 原因:RocketMQ 每个实例都明确了消息收发 API 调用 TPS,例如,标准版实例支持每秒 5000 次 API 调用,若实例消息收发 API 调用频率超过规格限制,会导致实例被限流。实例被限流后,导致部分消息收发请求失败。 建议措施: 1. 配置实例 API 调用频率监控告警 建议设置为规格上限的 70%。例如,您购买的实例消息收发 TPS 上限为 10000,则告警阈值建议设置为 7000。 1. 配置限流次数告警 RocketMQ 支持将指定实例触发限流的事件作为监控项,通过对限流次数的监控,可以帮助您了解当前业务的受损情况。 2. 在客户端日志中出现RemotingConnectException: connect to failed 或者 RemotingTimeoutException 等异常信息。 可能有如下原因: MQ 服务升级过程中 , 会出现短暂的网络闪断,查看官网公告看是否在服务升级窗口 检查应用服务器到broker的网络是否通畅,是否有网络延迟 检查应用的网络带宽情况,是否被打满 确认下应用是否出现 FGC 现象,FGC 会造成一定的网络延迟 3. 在客户端日志当中出现 system busy, start flow control for a while 或者 broker busy, start flow control for a while等异常信息。 可能原因:共享集群 broker(出现网络,磁盘,IO 等抖动)压力大,造成消息收发出现排队现象;若是偶尔短暂抖动,此类错误 SDK 会自动重试,但建议在自己的业务代码做好异常处理,当自动重试次数超限仍失败情况下,业务根据需要做好容灾。若长时间持续出现,可以提工单让技术人员跟进排查。
作者:合伯
#技术探索 #可观测

2022年12月21日

消息收发弹性——生产集群如何解决大促场景消息收发的弹性&降本诉求
产品介绍—什么是消息收发弹性 大家好,我是来自阿里云云原生消息团队的赖福智,花名宸罡,今天来给大家分享下阿里云 RocketMQ5.0 实例的消息弹性收发功能,并且通过该功能生产集群是如果解决大促场景消息收发的弹性以及降本诉求的。 阿里云弹性策略 本次将会从产品介绍,产品使用及限制,使用方式及演示三个方面来介绍。在介绍 Rocketmq5.0 实例的消息首发弹性之前,先从整体上看下阿里云的弹性策略。我们通常认为业务方往往存在预期外的突发业务热点和毛刺流量,常规扩容无法及时应对,这样一来服务会有不确定性的风险。因此为了应对突发流量,我们设计了一套处理机制,最基本的是要满足规格内的预期流量,然后是应对弹性区间内的突发流量可以随时开启的弹性能力,最后是要有对完全超过弹性上限流量的限流限流能力。针对弹性区间的突发流量,传统自建集群通过常规扩容方式应对,需要分钟级的处理时间,在这段时间内业务会受损,并且为了这部分偶尔的突发流量扩容到一个较大的规格并不划算。云上5.0实例的消息收发弹性能力对弹性区间内的突发流量可以做到秒级响应,针对大促这种预期内的短期突发流量可以按量收费更加实惠,仅当用户真正用到这部分弹性能力才收费。 消息收发弹性简介 接下来我们就看具体看下 5.0 实例的消息收发弹性,消息收发弹性最直观的感受就是在 5.0 实例的详情页面的自适应弹性 TPS 这部分,可以看到在正常消息收发 TPS 的旁边额外有一个自适应弹性 TPS。通过这部分弹性 TPS 的设置,用户可以快速、低成本的应对大促这种短时间突发流量的场景。 这时可能有小伙伴会问为什么我不直接升级规格提高标准收发 TPS,而是使用弹性 TPS 呢?让我们假设一个典型的大促场景,比如在今晚 0 点有大促活动,使用消息弹性功能的用户完全可以提前几天就把弹性功能打开,大促结束等流量恢复后再把弹性功能关闭,实际上不关闭也不会有什么问题,不使用则不收费。 如果通过升级规格来提升标准 TPS 应对大促流量,用户同样是提前几天就把规格升高了,那么在大促前这几天按照高规格收费但实际又跑不到高规格的 TPS,实际上花了更多的钱但是确造成了资源的浪费。如果用户为了避免资源浪费在大促当天 0 点前升级规格,一个是需要用户付出额外的精力来关注 RocketMQ 按时升配,再就是实例的升配是一个重资源操作,扩容耗时长,无法做到即开即用秒级生效,很有可能已经到 0 点了但是升配还没有完成。 使用消息弹性功能的话可以做到秒级生效开箱即用,并且如果没有使用到这部分额外的弹性 TPS 是不会收费的。但是弹性 TPS 也不是个解决问题的万能银弹,也是有上限的,基本上可以在规格标准 TPS 的基础上额外有一半的弹性 TPS,如果标准 TPS+ 弹性 TPS 仍然无法满足用户业务需求,此时意味着仅扩容弹性节点已经无法满足需求,同时需要扩容存储节点,所以需要升配规格,这部分的原理后面会详细解释。 也有用户会问,如果我的日常 TPS 在 2500 左右,可不可以购买一个 2000 标准 TPS 的实例并且一直开着 1000 的弹性 TPS 满足需求呢?这种情况我们建议直接使用标准 TPS 大于 2500 的实例,因为弹性 TPS 这部分的使用会额外计费,如果一天 24 小时都在大量使用弹性 TPS,从计费上来说直接使用更高规格的实例更实惠。 5.0 实例消息收发弹性的实现方式,和传统自建方式的对比 接下来我们看下阿里云 RocketMQ5.0 实例是怎么实现消息收发弹性的,并且在扩容场景和自建 RocketMQ 有什么优势。传统自建 RocketMQ 集群如左图显示,是一个存储计算不分离的架构,这种架构下 Broker 是一个很重的组件,因为它同时需要处理客户端的请求,也要负责数据的读取写入,Broker 同时负责计算和存储。作为一个有状态的节点,Broker 扩容是一个很重的操作,时间会很慢,而且在很多时候我们并不需要扩容存储能力,仅仅需要应对高 TPS 请求的计算能力,此时随着 Broker 扩容的存储扩容实际上被浪费了。 再来看下 RocketMQ5.0 实例消息收发弹性是怎么做的,首先 5.0 实例的架构是存储计算分离的模式,用户的客户端仅会请求计算层的计算节点,计算节点操作存储节点读写消息,客户端并不会直接访问存储节点。开启消息收发弹性功能意味着开启了计算层的弹性能力。得益于这种存储计算分离的架构,可以让我们快速低成本的扩容计算层节点,计算层节点作为无状态应用可以做到秒级扩容,十分便捷。而且在云厂商拥有大量资源池的前提下可以做到资源的弹性扩容。可以说 RocketMQ5.0 实例的消息收发弹性能力依赖于阿里云作为云厂商的弹性能力和存算分离的技术方案得以实现。 在大促这种短时间大流量的场景下,大部分都是不需要扩容存储节点的,此时就可以通过开通消息收发弹性的能力满足需求。 产品使用及限制:消息收发弹性的使用及限制 支持版本 消息收发弹性的功能仅在专业版和铂金版支持,标准版实例不支持,并且专业版的单机规格作为给用户使用的测试版本也不支持。 弹性上限 不同规格实例的弹性 TPS 上限不同,基本上在标准 TPS 的基础上额外有一半的弹性 TPS,下图所示为专业版的弹性 TPS 上限。受篇幅所限,其他规格的弹性上限可以参考官方文档,就不再列出了。 计费方式 弹性 TPS 是额外计费的,计费周期按小时计费,不足 1 小时,按 1 小时计算。计费方式为超过限制的 TPS× 使用时长(小时)× 弹性规格单价(元/TPS/小时)弹性规格单价如下图,不同地域的单价会有略微差异。 SLA 可能有小伙伴会担心使用到这部分额外的弹性 TPS 会不会有问题,毕竟这部分是在标准 TPS 之上额外的能力,有一种自己实例超负荷运转的感觉。这个是完全不用担心的,不同规格的弹性上限已经经过压测验证,和规格标准 TPS 享受一样的稳定性 SLA 保证。 使用方式及演示:结合业务场景的最佳使用方式 开启方式、生效时间、收发比例 最后我们来实际操作下开启弹性收发能力并且验证该功能。RocketMQ5.0实例依然支持使用 RocketMQ4.0 实例的 1.x 客户端访问,所以这里分别提供了 1.x 客户端和 5.x 客户端的测试代码实例。 该程序开启了 200 个线程的线程池通过 ratelimiter 根据输入参数设置每秒最大的发送消息条数,打印失败的原因,并且每秒统计成功发送的消息量 在这里我已经提前购买好了一个专业版的实例,默认是不会开启消息收发弹性能力的。我们可以点击这里的开启弹性按钮进入实例修改页面开启弹性功能。这里要注意的是开启之后的弹性 TPS 依然受实例整体的消息收发占比设置,用户可以根据自己的消息收发场景设置该比例。 再开启之前我们来尝试下每秒发送 2300 个消息会怎么样,可以看到已经被限流了,并且每秒成功发送的量要比 2000 多一些。接着我们将弹性开启,并且将默认的收发比 1:1 改为 4:5,这是修改后的实例状态,现在让我们继续每秒发送 2300 个消息来验证下,可以看到已经都成功发送了。
作者:宸罡
#技术探索

2022年11月30日

RocketMQ 5.0 可观测能力升级:Metrics 指标分析
从消息的生命周期看可观测能力 在进入主题之前先来看一下 RocketMQ 生产者、消费者和服务端交互的流程: message produce and consume process RocketMQ 的消息是按照队列的方式分区有序储存的,这种队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间可以无限水平扩展。对比传统的消息队列如 RabbitMQ 是很大的优势,尤其是在流式处理场景下能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。 接下来我们来看一下消息的整个生命周期中需要关注的重要节点: message life cycle 首先是消息发送:发送耗时是指一条消息从生产者开始发送到服务端接收到并储存在硬盘上的时间。如果是定时消息,需要到达指定的定时时间才能被消费者可见。 服务端收到消息后需要根据消息类型进行处理,对于定时/事务消息只有到了定时时间/事务提交才对消费者可见。RocketMQ 提供了消息堆积的特性,即消息发送到服务端后并不一定立即被拉取,可以按照客户端的消费能力进行投递。 从消费者的角度上看,有三个需要关注的阶段: 拉取消息:消息从开始拉取到抵达客户端的网络和服务端处理耗时; 消息排队:等待处理资源,即从消息抵达客户端到开始处理消息; 消息消费:从开始处理消息到最后提交位点/返回 ACK。 消息在生命周期的任何一个阶段,都可以清晰地被定义并且被观测到,这就是 RocketMQ 可观测的核心理念。而本文要介绍的 Metrics 就践行了这种理念,提供覆盖消息生命周期各个阶段的监控埋点。借助 Metrics 提供的原子能力我们可以搭建适合业务需要的监控系统: 日常巡检与监控预警; 宏观趋势/集群容量分析; 故障问题诊断。 RocketMQ 4.x Metrics 实现 – Exporter RocketMQ 团队贡献的 RocketMQ exporter 已被 Prometheus 官方的开源 Exporter 生态所收录,提供了 Broker、Producer、Consumer 各个阶段丰富的监控指标。 exporter metrics spec Exporter 原理解析 RocketMQ expoter 获取监控指标的流程如下图所示,Expoter 通过 MQAdminExt 向 RocketMQ 集群请求数据。获取的数据转换成 Prometheus 需要的格式,然后通过 /metics 接口暴露出来。 rocketmq exporter 随着 RocketMQ 的演进,exporter 模式逐渐暴露出一些缺陷: 无法支持 RocketMQ 5.x 中新加入的 Proxy 等模块的可观测需求; 指标定义不符合开源规范,难以和其他开源可观测组件搭配使用; 大量 RPC 调用给 Broker 带来额外的压力; 拓展性差,增加/修改指标需要先修改 Broker 的 admin 接口。 为解决以上问题,RocketMQ 社区决定拥抱社区标准,在 RocketMQ 5.x 中推出了基于 OpenTelemtry 的 Metrics 方案。 RocketMQ 5.x 原生 Metrics 实现 基于 OpenTelemtry 的 Metrics OpenTelemetry 是 CNCF 的一个可观测性项目,旨在提供可观测性领域的标准化方案,解决观测数据的数据模型、采集、处理、导出等的标准化问题,提供与三方 vendor 无关的服务。 在讨论新的 Metrics 方案时 RocketMQ 社区决定遵守 OpenTelemetry 规范,完全重新设计新 metrics 的指标定义:数据类型选用兼容 Promethues 的 Counter、Guage、Histogram,并且遵循 Promethues 推荐的指标命名规范,不兼容旧有的 rocketmqexporter 指标。新指标覆盖 broker、proxy、producer、consumer 等各个 module,对消息生命周期的全阶段提供监控能力。 指标上报方式 我们提供了三种指标上报的方式: Pull 模式:适合自运维 K8s 和 Promethues 集群的用户; Push 模式:适合希望对 metrics 数据做后处理或接入云厂商的可观测服务的用户; Exporter 兼容模式:适合已经在使用 Exporter 和有跨数据中心(或其他网络隔离环境)传输 metrics 数据需求的用户。 Pull Pull 模式旨在与 Prometheus 兼容。在 K8s 部署环境中无需部署额外的组件,prometheus 可以通过社区提供的 K8s 服务发现机制(创建 PodMonitor、ServiceMonitor CDR)自动获取要拉取的 broker/proxy 列表,并从他们提供的 endpoint 中拉取 metrics 数据。 pull mode Push OpenTelemetry 推荐使用 Push 模式,这意味着它需要部署一个 collector 来传输指标数据。 push mode OpenTelemetry 官方提供了 collector 的实现,支持对指标做自定义操作如过滤、富化,可以利用社区提供的插件实现自己的 collector。并且云厂商提供的可观测服务(如 AWS CloudWatch、阿里云 SLS)大多已经拥抱了 OpenTelemetry 社区,可以直接将数据推送到它们提供的 collector 中,无需额外的组件进行桥接。 OpenTelemetry collector 兼容 RocketMQ Exporter 新的 Metrics 也提供对 RocketMQ Exporter 的兼容,现在使用 exporter 的用户无需变更部署架构即可接入新 Metrics。而且控制面应用(如 Promethues)和数据面应用(如 RocketMQ)有可能隔离部署。因此借助 Exporter 作为代理来获取新的 Metrics 数据也不失为一种好的选择。 RocketMQ 社区在 Exporter 中嵌入了一个 OpenTelemetry collector 实现,Broker 将 Metrics 数据导出到 Exporter,Exporter 提供了一个新的 endpoint(下图中的 metricsv2)供 Prometheus 拉取。 exporter mode 构建监控体系最佳实践 丰富的指标覆盖与对社区标准的遵循使得可以轻而易举的借助 RocketMQ 的 Metrics 能力构建出适合业务需求的监控体系,这个章节主要以一个典型的流程介绍构建监控体系的最佳实践: 集群监控/巡检 触发告警 排查分析。 集群状态监控与巡检 我们将指标采集到 Promethues 后就可以基于这些指标配置监控,这里给出一些示例: 接口监控: 监控接口调用情况,可以据此快速抓出异常的请求对症下药 下图给出一些相关示例:所有 RPC 的耗时(avg、pt90、pt99 等)、成功率、失败原因、接口调用与返回值分布情况等。 rpc metrics 客户端监控: 监控客户端的使用情况,发现非预期的客户端使用如超大消息发送、客户端上下线、客户端版本治理等。 下图给出一些相关示例:客户端连接数、客户端语言/版本分布、发送的消息大小/类型分布。 client metrics Broker 监控: 监控 Broker 的水位和服务质量,及时发现集群容量瓶颈。 下图给出一些相关示例:Dispatch 延迟、消息保留时间、线程池排队、消息堆积情况。 broker metrics 以上的示例只是 Metrics 的冰山一角,需要根据业务需要灵活组合不同的指标配置监控与巡检。 告警配置 有了完善的监控就可以对需要关注的指标配置告警,比如可以配置 Broker 监控中 Dispatch 延迟这个指标的告警: broker alert 收到告警后可以联动监控查看具体原因,关联发送接口的失败率可以发现有 1.7% 的消费发送失败,对应的报错是没有创建订阅组: promblem analysis 问题排查分析 最后以消息堆积这个场景为例来看一下如何基于 Metrics 分析线上问题。 从消息生命周期看堆积问题 正如本文开篇所述,排查 RocketMQ 的问题需要结合消息的生命周期综合分析,如果片面的认定是服务端/客户端的故障未免会误入歧途。 对于堆积问题,我们主要关注消息生命周期中的两个阶段: 就绪消息:就绪消息是可供消费但还未被拉取的消息,即在服务端堆积的消息; 处理中消息:处理中的消息是被客户端拉取但是还未被消费的消息。 consume lag 多维度指标分析堆积问题 对于堆积问题,RocketMQ 提供了消费延迟相关指标 rocketmq_consumer_lag_latency 可以基于这个指标配置告警。告警的阈值需要根据当前业务对消费延迟的容忍程度灵活指定。 触发告警后就需要对消息堆积在还是就绪消息和处理中消息进行分析,RocketMQ 提供了 rocketmq_consumer_ready_messages 和 rocketmq_consumer_inflight_messages 这两个指标,结合其他消费相关指标与客户端配置综合分析即可判断出消息堆积的根因: case 1:就绪消息持续上涨,处理中消息达到客户端堆积上限 这是最常见的堆积场景,客户端处理中的消息量 rocketmq_consumer_inflight_messages 达到了客户端配置的阈值,即消费者的消费能力低于消息发送量。如果业务要求尽可能实时的消费消息就需要增加消费者机器数量,如果业务对消息延迟不是很敏感可以等待业务高峰过去后再消化堆积的消息。 case 2:就绪消息几乎为 0,处理中消息持续上涨 这个 case 多出现在使用 RocketMQ 4.x 客户端的场景,此时消费位点是顺序提交的,如果某条消息的消费卡住会导致位点无法提交。看起来的现象是消息在客户端大量堆积,即处理中消息持续上涨。可以结合消费轨迹和 rocketmq_process_time 这个指标抓出消费慢的消息分析上下游链路,找到根因优化消费逻辑。 case 3: 就绪消息持续上涨,处理中消息几乎为 0 此种场景说明客户端没有拉取到消息,一般有如下几种情况: 鉴权问题:检查 ACL 配置,如果使用公有云产品请检查 AK、SK 配置; 消费者 hang 住:尝试打印线程堆栈或 gc 信息判断是否是进程卡死; 服务端响应慢:结合 RPC 相关指标查看拉取消息接口调用量与耗时、硬盘读写延迟。检查是否为服务端问题,如硬盘 IOPS 被打满了等等。
作者:玄珏
#技术探索 #可观测

2022年10月20日

解析 RocketMQ 多样消费功能-消息过滤
什么是消息过滤 在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。 就如上图所描述的,生产者会向主题中写入形形色色的消息,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要消费橙色的消息,第二个消费者只想要消费黄色的和灰色的消息,那么这个效果就需要通过消息过滤来实现。 消息过滤的应用场景 我们以常见的电商场景为例,来看看消息过滤在实际应用过程中起到的作用。 电商平台在设计时,往往存在系统拆分细、功能模块多、调用链路长、系统依赖复杂等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特别是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。 而在消息中间件使用方面,电商平台因为覆盖的领域众多会产生很多的消息主题,消息收发量也随着交易量和订阅系统的增加而增大。随着业务系统的水平拆解和垂直增加,相关的消息呈现出高订阅比和低投递比的状态,比如一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,但是投递比却只有 15:300,即一条消息只有 15 个订阅者需要投递,其他 285 个订阅者全部过滤了这条消息。那解决这些场景,就需要使用到消息过滤。 举例来说,在交易链路中,一个订单的处理流程分为下单、扣减库存、支付等流程,这个流程会涉及订单操作和状态机的变化。下游的系统,如积分、物流、通知、实时计算等,他们会通过消息中间件监听订单的变更消息。但是它们对订单不同操作和状态的消息有着不同的需求,如积分系统只关心下单消息,只要下单就扣减积分。物流系统只关系支付和收货消息,支付就发起物流订单,收货就完成物流订单。实时计算系统会统计订单不同状态的数据,所有消息都要接收。 试想一下如果没有消息过滤这个功能,我们会怎么支持以上消息过滤的功能呢?能想到的一般有以下两个方案: 1. 通过将主题进行拆分,将不同的消息发送到不同主题上。 对于生产者来说,这意味着消费者有多少消费场景,就需要新建多少个 Topic,这无疑会给生产者带来巨大的维护成本。对消费者来说,消费者有可能需要同时订阅多个 Topic,这同样带来了很大的维护成本。另外,消息被主题拆分后,他们之间的消费顺序就无法保证了,比如对于一个订单,它的下单、支付等操作显然是要被顺序处理的。 2. 消费者收到消息后,根据消息体对消息按照规则硬编码自行过滤。 这意味着所有的消息都会推送到消费者端进行计算,这无疑增加了网络带宽,也增加了消费者在内存和 CPU 上的消耗。 有了消息过滤这个功能,生产者只需向一个主题进行投递消息,服务端根据订阅规则进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码维护就非常友好,同时也能很大程度上降低网络带宽,同时减少消费者的内存占用和 CPU 的消耗。 RocketMQ 消息过滤的模式 RocketMQ 是众多消息中间件中为数不多支持消息过滤的系统。这也是其作为业务集成消息首选方案的重要基础之一。 在功能层面,RocketMQ 支持两种过滤方式,Tag 标签过滤和 SQL 属性过滤,下面我来这两个过滤方式使用方式和技术原理进行介绍 Tag 标签过滤 功能介绍 Tag 标签过滤方式是 RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的 Tag 标签进行匹配。生产者在发送消息时,设置消息的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。 过滤语法 1. 单 Tag 匹配:过滤表达式为目标 Tag,表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者; 2. 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费; 3. 全 Tag 匹配:使用星号()作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。 使用方式 1. 发送消息,设置 Tag 标签 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息 .setTag("TagA") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个 Tag 标签 //只订阅消息标签为“TagA”的消息 FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个 Tag 标签 //只订阅消息标签为“TagA”、“TagB”或“TagC”的消息 FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有 Tag 标签,即不过滤 //使用Tag标签过滤消息,订阅所有消息 FilterExpression filterExpression = new FilterExpression("", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 技术原理 RocketMQ 在存储消息的时候,是通过 AppendOnly 的方式将所有主题的消息都写在同一个 CommitLog 文件中,这可以有效的提升了消息的写入速率。为了消费时能够快速检索消息,它会在后台启动异步方式将消息所在位点、消息的大小,以及消息的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行消息过滤的时候,可以在索引层面就可以获取到消息的标签,不需要从 CommitLog 文件中读取,这样就减少消息读取产生的系统 IO 和内存开销。标签存储哈希值,主要是为了保证 ConsumeQueue 索引文件能够定长处理,这样可以有效较少存储空间,提升这个索引文件的读取效率。 整个 Tag 标签过滤的流程如下: 1. 生产者对消息打上自己的业务标签,发送给我们的服务端 Broker; 2. Broker 将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中; 3. 消费者启动后,定时向 Broker 发送心跳请求,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保存在内存中; 4. 消费者向 Broker 拉取消息,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索消息,将订阅关系中的标签哈希值和消息中的标签哈希值做比较,如果匹配就返回给消费者; 5. 消费者收到消息后,会将消息中的标签值和本地订阅关系中标签值做精确匹配,匹配成功才会交给消费线程进行消费。 SQL 属性过滤 功能介绍 SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置S QL 语法的过滤表达式过滤多个属性。 过滤语法 1. 数值比较:, =, , IN 3. 判空运算:IS NULL or IS NOT NULL 4. 逻辑运算:AND, OR, NOT 使用方式 1. 发送消息,设置属性 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息属性,用于消费端根据指定属性过滤消息。 .addProperty("Channel", "TaoBao") .addProperty("Price", "5999") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price5000", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有属性 FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 技术原理   由于 SQL 过滤需要将消息的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 增加很大的开销。为了降低这个开销,RocketMQ 采用了布隆过滤器进行优化。当 Broker 在收到消息后,会预先对所有的订阅者进行 SQL 匹配,并将匹配结果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩展文件中。在消费时,Broker 就会使用使用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这可以避免消息在一定不匹配的时候,不需要去 CommitLog 中将消息的属性拉取到内存进行计算,可以有效地降低属性和 SQL 进行匹配的消息量,减少服务端的内存和 CPU 开销。 整个 SQL 过滤的处理流程如下: 1. 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数;  2. 生产者对消息设置上自己的业务属性,发送给我们的服务端 Broker;  3. Broker 收到后将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中。在写入之前,会将这条消息的属性和当前所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个完整的布隆过滤位图;  4. 消费者消费消息的时候,Broker 会先获取预先生成的布隆过滤匹配参数,然后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配;  5. 布隆过滤器返回匹配成功只能说明消息属性和 SQL 可能匹配,Broker 还需要从 CommitLog 中将消息属性取出来,再做一次和 SQL 的精确匹配,这个时候匹配成功才会将消息投递给消费者  差异及对比 最佳实践 主题划分及消息定义 主题和消息背后的本质其实就是业务实体的属性、行为或状态发生了变化。只有发生了变化,生产者才会往主题里面发送消息,消费者才需要监听这些的消息,去完成自身的业务逻辑。 那么如何做好主题划分和消息定义呢,我们以订单实体为例,来看看主题划分和消息定义的原则。 主题划分的原则   1. 业务领域是否一致 不同的业务领域背后有不同的业务实体,其属性、行为及状态的定义天差地别。比如商品和订单,他们属于两个完全独立且不同的领域,就不能定义成同一个主题。 2. 业务场景是否一致 同一个业务领域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,但是订单缓存刷新可能需要被不同的流程触发,放在一起就会导致部分场景订单缓存不刷新的情况。 3. 消息类型是否一致 同一个业务领域和业务场景,对消息类型有不同需求,比如订单处理过程中,我们需要发送一个事务消息,同时也需要发送一个定时消息,那么这两个消息就不能共用一个主题。 消息定义的原则   1. 无标签无属性 对于业务实体极其简单的消息,是可以不需要定义标签和属性,比如 MySQLBinlog 的同步。所有的消费者都没有消息过滤需求的,也无需定义标签和属性。 2. 如何定义标签 标签过滤是 RocketMQ 中使用最简单,且过滤性能最好的一种过滤方式。为了发挥其巨大的优势,可以考虑优先使用。在使用时,我们需要确认这个字段在业务实体和业务流程中是否是唯一定义的,并且它是被绝大多数消费者作为过滤条件的,那么可以将它作为标签来定义。比如订单中有下单渠道和订单操作这两个字段,并且在单次消息发送过程中都是唯一定义,但是订单操作被绝大多数消费者应用为过滤条件,那么它最合适作为标签。 3. 如何定义属性 属性过滤的开销相对比较大,所以只有在标签过滤无法满足时,才推荐使用。比如标签已经被其他字段占用,或者过滤条件不可枚举,需要支持多属性复杂逻辑的过滤,就只能使用属性过滤了。 保持订阅关系一致 订阅关系一致是指同一个消费者组下面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。 正如上图所示,一个消费者组包含两个消费者,他们同时订阅了 TopicA 这个主题,但是消费者一订阅的是 TagA 这个标签的消息,消费者二订阅的是 TagB 这个标签的消息,那么他们两者的订阅关系就存在不一致。 导致的问题: 那么订阅关系不一致会导致什么问题呢? 1. 频繁复杂均衡 在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变化了就进行订阅关系覆盖,同时会触发客户端进行负载均衡。那么订阅关系不一致的两个客户端会交叉上传自己的订阅关系,从而导致客户端频繁进行负载均衡。 2. 消费速率下降 客户端触发了负载均衡,会导致消费者所持有的消费队列发生变化,出现间断性暂停消息拉取,导致整体消费速率下降,甚至出现消息积压。 3. 消息重复消费 客户端触发了负载均衡,会导致已经消费成功的消息因为消费队列发生变化而放弃向 Broker 提交消费位点。Broker 会认为这条消息没有消费成功而重新向消费者发起投递,从而导致消息重复消费。 4. 消息未消费 订阅关系的不一致,会有两种场景会导致消息未消费。第一种是消费者的订阅关系和 Broker 当前订阅关系不一致,导致消息在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 当前的虽然一致,但是 Broker 投递给了其他的消费者,被其他消费者本地过滤了。 使用的建议 在消息过滤使用中,有以下建议: 1. 不要共用消费者组 不同业务系统千万不要使用同一个消费者组订阅同一个主题的消息。一般不同业务系统由不同团队维护,很容易发生一个团队修改了订阅关系而没有通知到其他团队,从而导致订阅关系不一致的情况。 2. 不频繁变更订阅关系 频繁变更订阅关系这种情况比较少,但也存在部分用户实现在线规则或者动态参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载均衡的情况。 3. 变更做好风险评估 由于业务的发展,需求的变更,订阅关系不可能一直不变,但是变更订阅关系过程中,需要考虑整体发布完成需要的总体时间,以及发布过程中订阅关系不一致而对业务可能带来的风险。 4. 消费做好幂等处理 不管是订阅关系不一致,还是客户端上下线,都会导致消息的重复投递,所以消息幂等处理永远是消息消费的黄金法则。在业务逻辑中,消费者需要保证对已经处理过的消息直接返回成功,避免二次消费对业务造成的损害,如果返回失败就会导致消息一直重复投递直到进死信。 到此,本文关于消息过滤的分享就到此结束了,非常感谢大家能够花费宝贵的时间阅读,有不对的地方麻烦指正,感谢大家对 RocketMQ 的关注。
作者:徒钟
#技术探索 #功能特性

2022年10月12日

RocketMQ 5.0 API 与 SDK 的演进
RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。 全新统一的 API 此处的 API 并不单单只是接口上的定义,同时也规定了各个接口不同的方法和行为,明确了整个消息模型。 RocketMQ 过去的 API 从第一版开始,至今已经过了很长时间,长期依赖是一个缺乏变革的状态,对于一些中途打算废弃或者变更的 API 也没有进行后续的迭代。此外,接口的定义也不够清晰。因此,RocketMQ 希望在 5.0 中能够建立一个统一的规范,精简整个 API,通过引入 builder 模式来引入更多的不变性,同时做好异常管理,给开发者和用户一个更加清爽的面貌。 目前 C++ 和 Java 已经进行了 5.0 API 的定义与实现,更多语言的支持也陆续在路上了。我们也欢迎更多的开发者可以参与到社区的工作中来。这里给出 5.0 客户端的仓库链接: 除了在上述接口上做的一些修改之外, RocketMQ 5.0 还规定了四种新的不同的客户端类型,即 Producer/Push Consumer/Simple Consumer/Pull Consumer。 其中 Pull Consumer 还在开发中;Producer 主要还是做了接口裁剪,规范了异常管理。在功能上其实并没有做一些颠覆性的改变。Push Consumer 也是比较类似的;Simple consumer 将更多的权利将下发给用户,是一种用户可以主动控制消息接收与处理过程的消费者,特别的,5.0 的 SDK 中,Push Consumer 和 Simple Consumer 都采用 RocketMQ 的 pop 机制进行实现,一些社区的同学可能已经熟悉了。 如果用户并不一定想控制或者关心整个消息的接收过程,只在乎消息的消费过程的话,这个时候 Push Consumer 可能是一个更好的选择。 RocketMQ 5.0 定义了四种不同的消息类型。过去的开源版本中其实我们并没有去突出消息类型这样一个概念,后续出于维护及运维方面的需要以及模型定义的完备,才让今天的 5.0 有了消息类型的这样一个概念。 1、NORMAL:普通消息。 2、FIFO:满足先入先出的语义。用户可以通过定义 message group 来控制消息间的消费顺序。例如图中的 fruit 这个 topic 下,可以定义不同的 message group,在 apple 这个 message group 下,会按照发送顺序决定消息被消费的顺序,并且不同的 message group 之间不会互相干扰。 3、TRANSACTIONAL:可以将一条或多条消息包成一个事务,最终用户可以根据自己的业务结果选择提交或者回滚。 4、DELAY:用户可以自主地设置消息的定时时间,相比之前开源版本仅允许用户设置定时/延迟级别,5.0 的实现中还讲允许用户设置更精确的时间戳。 以上四种消息是互斥的,我们会在 topic 的元数据去标识它的类型。实际在消息发送的时候如果如果出现了尝试发送的消息类型与 topic 类型不匹配的情况,也会做一些限制。 实现 RocketMQ 5.0 在客户端的启动过程中提前进行了更多的准备工作。比如用户提前设置要发送消息的 topic 时,Producer 会在启动过程中尝试获取对应 topic 的路由。在过去的客户端实现中,在针对于某个 topic 第一次发送消息时,需要先获取路由,这里就会有一个类似冷启动的过程。 提前获取 Topic 的路由信息有两点好处: 1. 不阻塞后面的发送,让消息的发送仅仅触发发送这一个动作。 2. 错误前置,比如用户要往一个不存在 Topic 发送消息时,因为路由的获取参与到整个客户端的启动过程,获取路由不成功,那整个客户端启动可能就会失败,用户也是拿不到对应的 Producer 对象的。 类似的,Consumer 的启动也会有这样的一个过程。 除此之外,我们在客户端和服务端之间增加了一个 Telemetry 的部分,它会在客户端和服务端之间建立起了一个进行双向数据通讯的通道,客户端和服务端会在这个过程中沟通配置,比如服务端可以实现对客户端配置的下发,更好地管理客户端。此外,Telemetry 也可以将本地配置主动上报给服务端,让服务端也可以对客户端的设置有更好的了解。Telemetry 通道也会在客户端启动时尝试建立,如果这个通道没有建立成功,也会影响客户端的启动。 总的来说,客户端的启动过程会尽可能将所有准备工作做好。同时在客户端和服务端之间建立 Telemetry 这样一个通讯通道。 客户端内部存在一些周期性的任务,比如路由的定时更新以及客户端往服务端发送心跳等。对于上文中提到的 Telemetry 过程中,客户端的配置上报也是周期性的。 Producer 在 RocketMQ 5.0 中的具体工作流程 消息在发送时,会检查是否已经获取对应 topic 的路由信息。如果已经获取,则尝试在路由中选取队列,随即查看要发送的消息的类型是否与 topic 类型匹配,如果匹配,则进行消息发送。如果发送成功,则返回;否则,判断当前重试次数是否超出用户设置的上限,如果超出,则返回失败;否则轮转到下一个队列,然后对新的队列进行重试直到消费次数超出上线。而如果启动过程中没有提前获取路由,那么消息发送时依然会先尝试获取路由,然后再进行下一步操作。 另外一点相对于老客户端较大的改变在于,客户端从底层 RPC 交互到上层的业务逻辑全部采用异步实现。Producer 依然会提供一个同步发送接口和异步发送接口,但同步的方法也是使用异步来实现,整个逻辑非常统一和清爽。 Push Consumer 分为两部分,消息的接收和消费。 消息接收流程为:客户端需要不断地从服务端拉取消息,并将消息缓存。Push Consumer 会将消息先缓存到客户端的本地,再进行消费,因此它会判断客户端本地的 Cache 是否已满,如果已满,则隔一段时间再判断,直到消息被客户端消费,Cache 尚有余量时再进行消息拉取。为了避免出现一些内存问题,Cache 的大小也是被严格限制的。 消息消费过程分为两个类型,顺序类型和非顺序类型。 其中非顺序类型即并发消费。消费者会先从 Cache 中获取消息,然后尝试消费消息,消费后再将消息从 Cache 中移除。消息消费成功时,会尝试将消息 ACK ,消费流程结束;如果消费失败,则尝试修改消息的可见时间,即决定下一次什么时候可见。 顺序消费指对于同一个 Group 的消息,最终消费时一定是前一条消息被消费过并且得到确认后,后面的消息才能够继续消费。而消费过程与非顺序消费类似,首先尝试从 Cache 中拉取消息,如果消费成功,则将消息 ACK。ACK 成功后,将其从 Cache 中移除。特别地,如果消费失败,会 suspend 一段时间,然后继续尝试对消息进行消费。此时会判断消费次数是否超限,如果超限,则会尝试将消息放入死信队列中。 相对于非顺序消费,顺序消费更复杂,因为其需要保证前一个消息消费成功后才能对后面的消息进行消费。顺序消费的消费逻辑是基于 message group 隔离的。message group 会在发送时做哈希,从而保证 message group 的消息最终会落在一个队列上,顺序消费模式本质上保证队列内部消费的顺序。 此外,因为不同 message group 的顺序消息最终可能会映射到同一个队列上,这可能会导致不同的 message group 之间的消费形成阻塞,因此服务端未来会实现一个虚拟队列,让不同的 message group 映射到客户端的虚拟队列,保证他们之间没有任何阻塞,从而加速数据消息的消费过程。 对于 Simple Consumer,用户可以主动控制消息接收和确认的流程。比如用户收到消息后,可以根据业务决定是否过一段时间再消费该消息,或者不需要再收到该消息。消费成功后将消息 ACK 掉,如果失败则主动修改可见时间,选择该消息下一次什么时候可见,即由用户自发地控制整个过程。 可观测性 Shaded Logback 因为历史原因,RocketMQ 的老客户端并不是面向 SLF4J 进行编程的,而是面向 logback 的。这么做的目的其实是为了方便快捷地获取日志,不需要让用户自己去手动配置。 RocketMQ 中专门有一个 logging 模块是负责日志部分的,像用户自己使用了 logback ,RocketMQ SDK 如果也直接去使用 logback,两者就会产生各种各样的冲突,这个 logging 模块就是用来保证这一层隔离性的。 但是 logging 模块本身的实现并不是很优雅,也带来了一定的维护成本。因此我们采用了 shade logback 的方式来达到上文提到的隔离性。shaded logback 不仅能够避免用户的 logback 与 RocketMQ 自己的 logback 冲突,还能保持较好的可维护性,将来要想在日志上去做一些修改,也来得容易的多。 具体来说,用户的 logback 会采用 logback.xml 的配置文件,通过 shade logback, RocketMQ 5.0 的客户端会使用 rocketmq.logback.xml 的配置文件,因此在配置部分就已经完全隔离了,同时在 shade 的过程中,还对原生 logback 中使用到的一些环境变量和系统变量也进行了修改,这样就保证了两者的彻底隔离。 另外,使用 shadeed logback 之后,RocketMQ 5.0 客户端中的日志部分就全都是面向 SLF4J 来进行编程的了,这样一来,如果我们未来想让用户自己去完全控制日志的话,提供一个去除 logback 的 SDK 就可以了,非常方便。 Trace 5.0 的消息轨迹基于 OpenTelemetry 模型进行定义与实现,消息发送或接收消息的流程被定义为一个个独立的 span ,这一套 span 规范参照了 OpenTelemetry 关于 Messaging 的定义。图中这里 Process P 表示 Producer ,Process C 表示 Consumer。消息的全生命周期,从发送到接收到消费,就可以具象化为这样一个个的 span。 比如,针对 Push Consumer 而言,先会有一个 receive 的 span 来表示从服务端获取消息的过程,收到消息后到会先等待消息被处理,这个也就是 await span 表示的过程,消息被处理则对应图中的 process span,消息消费结束之后,向服务端反馈消息处理结果也会有专门的 span 进行描述。 我们通过 parent 和 link 来讲所有的这些 span 关联起来,这样通过一条消息的任意一个 span,就可以获得这条消息全生命周期的所有 span。 不仅如此,用户还将允许可以设置一个 span context 与自己的业务链路进行关联,将 RocketMQ 5.0 的消息轨迹本身嵌入进自己的全链路可观测系统中去。 Metrics Tracing 相对来说成本是比较高的,因为一条消息从发送到接收,可能会有很多流程,这就伴随着很多的 span,这就导致相对来说,tracing 数据的存储查询成本相对来说比较高。我们希望诊断整个 SDK 的健康状况,同时又不希望收集太多的 tracing 信息提高成本,此时提供一份 metrics 数据就能比较好地满足我们的需求。 在 SDK 的 metrics 中我们新增了诸多指标,包括不限于 Producer 中消息发送延时,Push Consumer 中消息的消费耗时和消息缓存量,可以帮助用户和运维者更快更好地发现异常。 5.0 中 SDK 的 metrics 也是基于 OpenTelemetry 进行实现的。以 Java程序为例,OpenTelemetry 对于 Java 的实现本身提供了一个 agent,agent 在运行时会打点采集 SDK 的一些 tracing/metrics 信息,并将它上报到对应的 metric collector 中,这种通过 agent 来降低无侵入式数据采集的方式被称之为 automatic instrumentation,而手动在代码中实现打点采集的方式则被称之 manual instrumentation。对于 metrics 我们目前还是采用 manual instrumentation 的方式来进行数据的采集和上报的。服务端会告知客户端对应的 collector 的地址,然后客户端将 Metrics 数据上传到对应的 collector 当中去。 _作者介绍:_ _艾阳坤,Apache RocketMQ 5.0 Java SDK 作者,CNCF Envoy Contributor,CNCF OpenTelemetry Contributor,阿里云智能高级开发工程师。_
作者:艾阳坤
#技术探索 #云原生

2022年8月18日

解析 RocketMQ 业务消息--顺序消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。 简介 顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,同一 MessageGroup 的消息按照严格的先进先出(FIFO)原则进行发布和消费,即先发布的消息先消费,后发布的消息后消费,服务端严格按照发送顺序进行存储、消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求,因此需做到两点,发送的顺序性和消费的顺序性。 功能原理 在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?接下来就会围绕这个问题,对比普通消息和顺序消息进行阐述。 顺序发送 在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条消息 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。 同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。  因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式相比较普通消息,实际上降低了消息的最大吞吐。 顺序消费 与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。当然也需要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的消息之间并不存在顺序性的关联,是可以进行并发消费的。因此全文中提到的顺序实际上是一种偏序。 小结 无论对于发送还是消费,我们通过 MessageGroup 的方式将消息分组,即并发的基本单元是 MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度具备了可拓展性,支持多队列存储、水平拆分、并发消费,且不受影响。回顾普通消息,站在顺序消息的视角,可以认为普通消息的并发基本单元是单条消息,即每条消息均拥有不同的 MessageGroup。 我们回到开头那个问题: 既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢? 现在大家对于这个问题可能有一个基本的印象了,消息的顺序性当然很好,但是为了实现顺序性也是有代价的。 下述是一个表格,简要对比了顺序消息和普通消息。 最佳实践 合理设置 MessageGroup MessageGroup 会有很多错误的选择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为部分规模较大的商家会产出较多订单,由于下游消费能力的限制,因此这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背后的业务逻辑上来说,同一订单才有顺序性的要求。即选择 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量应当尽量相同且均匀。 同步发送和发送重试 如之前章节所述,需使用同步发送和发送重试来保证发送的顺序性。 消费幂等 消息传输链路在异常场景下会有少量重复,业务消费是需要做消费幂等,避免重复处理带来的风险。 应用案例 用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。   电商的订单创建,以订单 ID 作为 MessageGroup,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。   实战 发送 可以看到,该发送案例设置了 MessageGroup 并且使用了同步发送,发送的代码如下: public class ProducerFifoMessageExample { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class); private ProducerFifoMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String topic = "yourFifoTopic"; final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before // message publishing. .setTopics(topic) // May throw {@link ClientException} if the producer is not initialized. .build(); // Define your message body. byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey1ff69ada8e0e") // Message group decides the message delivery order. .setMessageGroup("youMessageGroup0") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { LOGGER.error("Failed to send message", t); } // Close the producer when you don't need it anymore. producer.close(); } } 消费 消费的代码如下: public class SimpleConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class); private SimpleConsumerExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String consumerGroup = "yourConsumerGroup"; Duration awaitDuration = Duration.ofSeconds(30); String tag = "yourMessageTagA"; String topic = "yourTopic"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for longpolling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(5); final List messages = consumer.receive(maxMessageNum, invisibleDuration); for (MessageView message : messages) { try { consumer.ack(message); } catch (Throwable t) { LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t); } } // Close the simple consumer when you don't need it anymore. consumer.close(); } }
作者:绍舒
#技术探索 #功能特性

2022年8月10日

云原生事件驱动引擎(Apache RocketMQ-EventBridge)应用场景与技术解析
在刚刚过去的 RocketMQ Summit 2022 全球开发者峰会上,我们对外正式开源了我们的新产品 RocketMQEventbridge 事件驱动引擎。 RocketMQ 给人最大的印象一直是一个消息引擎。那什么是事件驱动引擎?为什么我们这次要推出事件驱动引擎这个产品?他有哪些应用场景,以及对应的技术方案是什么? 今天我们就一起来看下,整篇文章包含三部分: 第一部分,我们一起看下什么是事件。 第二部分,和大家一起看看,事件有哪些不一样的“超能力”,使用这些“超能力”呢,我们又能干些什么? 第三部分,我们讲一下 RocketMQ 给出的关于事件的解决方案,也是我们这次开源的项目:RocketMQEventBridge。 什么是事件 大家自己可以先在脑袋里想一下,什么是事件?我们给事件下的一个定义是: 过去已经发生的事,尤其是比较重要的事。 A thing that happens, especially one of importance. 这个很好理解。比如说,昨天下午我做了一次核酸检测;今天上午又吃了一个冰激淋。这些都是过去已经发生的事件。但是,如果我再问:事件跟消息有什么区别?这个时候,大家是不是觉得事件这个定义,好像又不那么清晰? 刚才说的那些事件,是不是也可以理解为消息啊?如果,老张给我发送了一条短信,这个算是事件,还是消息啊?平常开发过程中,“什么时候使用消息,什么时候使用事件?” 不过,在回答这个问题之前,我们一起来看一个典型的微服务。 一个微服务系统和外部系统的交互,可以简单分为两部分:一是接收外部请求(就是图中上面黄色的部分);二是是调用外部服务(就是图中下面绿色的部分)。 接收外部请求,我们有两种方式:一种是提供 API,接收外部发过来的 Query 请求和 Command 请求;另外一种是主动订阅外部 Command 消息。这两类操作,进入系统内部之后呢,我们常常还会,调用其他为微服务系统,一起协同处理,来完成一个具体的操作。当这些操作,使得系统状态发生改变时,就会产生事件。 这里呢,我们把从外部接收到的 Command 消息,和系统内部产生的事件,都称之为消息。 我们总结一下,消息和事件的关系是这样的:消息包含两部分,Command 消息和 Event 消息 1、看图中左半部分,Command 是外部系统发送给本系统的一条操作命令; 2、再看图中右半部分,Event 则是本系统收到 Command 操作请求,系统内部发生改变之后,随之而产生了事件; 所以,事件和消息是不同的,事件可以理解为是一种特殊的消息。其特殊的点,主要在 4 个地方: 已发生、且不可变 事件,一定是“已发的”。“已发生”的代表什么呢?不可变的。我们不可能改变过去。这个特性非常重要,在我们处理事件、分析事件的时候,这就意味着,我们绝对可以相信这些事件,只要是收到的事件,一定是系统真实发生过的行为。而且是不可修改。 对比 Command 和 Query。Command 的中文是什么?命令。很显然,它是还没有发生的,只是表达了一种期望。我们知道“期望的”,不一定会成功发生。 比如:把厨房的灯打开、去按下门铃、转给 A 账户 10w…… 这些都是 Command,都是期望发生的行为。但是,最终有没有发生?并不知道。 Event 则是明确已经发生的事情。比如:厨房灯被打开了、有人按了门铃、A 账户收到了 10w…… 再对比 Query,它则是查询系统当前状态的一种请求,比如:厨房的灯是打开着的、门铃正在响、查下账户显示余额 11w…… 无期望的 这个怎么理解?事件是客观的描述一个事物的状态或属性值的变化,但对于如何处理事件本身并没有做任何期望。 相比之下,Command 和 Query 则都是有期望的,他们希望系统做出改变或则返回结果,但是 Event 呢,它只是客观描述系统的一个变化。 我们看一个例子:交通信号灯,从绿灯变成红灯,事件本身并没有要求行人或汽车禁止通行,而是交通法规需要红绿灯,并赋予了其规则。 所以,系统一般不会定向的、单独向另外一个系统发送事件,而是统一的告诉“事件中心”,“事件中心”呢,那里面有各个系统上报上来的,各式各样的事件。系统会向事件中心说明:自己这个系统,会产生哪些事件呀,这些事件的格式是怎么样的呀。 别的系统如果感兴趣呢,就可以来主动订阅这些事件。真正赋予事件价值的,是事件消费者。事件消费者想看看,某个系统发生了什么变化呀?OK,那他就去订阅这些事件,所以事件是消费者驱动的。 这跟消息有什么区别呢?Command 消息的发送和订阅,是双方约定好的,外人不知道,往往是以文档或代码的形式,大家按约定好的协议,发送和订阅消费,所以消息是生产者驱动的。 我们打个比喻,事件就像市场经济,商品被生产出来,具体有什么价值,有多大价值,很大程度上看其消费者。我们能看到系统中各种各样的事件,就像橱窗里摆放了各种各样的商品。而 Command 消息呢,有点像计划经济,一出生就带着很强的目的性,我就是要“分配”给谁消费。 天然有序 事件的第三个特性是:“天然有序”。含义:同一个实体,不能同时发生 A 又发生 B,必有先后关系;如果是,则这两个事件必属于不同的事件类型。 比如:针对同一个交通信号灯,不能既变成绿灯,又变成红灯,同一时刻,只能变成一种状态。 大家可能发现了一点,这里其实隐藏了事件的一个额外属性:因为天然有序,跟时间轴上的某一时刻强绑定,且不能同时发生,所以它一定是唯一的。 如果我们看到了两个内容一样的事件,那么一定是发生了两次,而且一次在前,一次在后。(这对于我们处理数据最终一致性、以及系统行为分析都很有价值:我们看到的,不光光是系统的一个最终结果,而是看到变成这个结果之前的,一系列中间过程) 具像化 事件的第四个特性是:“具象化”的。 事件会尽可能的把“案发现场”完整的记录下来,因为它也不知道消费者会如何使用它,所以它会做到尽量的详尽,比如: ●是由谁产生的事件?Subject ●是什么类型的事件?Type ●是谁发送的事件?Source ●事件的唯一性标志是什么?Id ●什么时候发生?Time ●事件的内容是什么?Data ●事件的内容有哪些信息?Dataschema 我们还是以交通信号灯举例子: 对比我们常见的消息,因为上下游一般是确定的,常常为了性能和传输效率,则会做到尽可能的精简,只要满足“计划经济”指定安排的消费者需求即可。 总结一下,事件上面的 4 个特性,是对事件巨大的一个属性加成,让事件拥有了跟普通消息不一样的“超能力”。使事件,常常被用到 4 个典型场景:事件通知、事件溯源、系统间集成和 CQRS。 下面让我们一个个展开,具体看看这些应用场景。 事件的典型应用场景 事件通知 事件通知是我们系统中很常见的一个场景。比如:用户下单事件通知给支付系统;用户付款事件通知给交易系统。 这里,让我们回到一开始信号灯那个例子。当交通信号灯,从红灯变成绿灯时,可能存在很多系统都需要这个信息。 方式 1:发送方主动调用,适配接收方 一种最简单的方式是,我们依次 call 每个系统,并把信息传递出去。比如:信号灯系统,主动调用地图导航的 API 服务、调用交警中控的 API 服务,调用城市大脑的 API 服务,把红绿灯变化信号发送出去。 但我们都知道,这个设计非常糟糕。尤其当系统越来越多时,这无疑是灾难的,不仅开发成本高,而且其中一个系统出现问题,可能会 hang 住整个服务,则导致调用其他系统都会受到影响。 方式 2:接收方主动订阅,适配发送方 一个很自然的解决方案是,我们将这些信息发送到中间消息服务 Broker,其他系统如果有需要,则主动去订阅这些消息即可。 这个时候,信号灯系统与其他系统并没有直接的调用依赖,交警中控服务、地图导航服务、城市大脑服务,只要按照约定的协议,去订阅信号灯的消息,并解析这些信息即可。 但是,这里同样存在一个问题:这个架构中,是以“信号灯”为中心。消费者需要理解发送者的业务领域,并主动添加适配层,(就是图中白色回旋镖部分),将消息转化为自己业务领域内的语言。但对于每一个微服务来说,他都希望都是高内聚低耦合的。 如果交警中控需要全国的信号灯数据,但是每个地域的消息格式又不一样,这就意味着,交警中控需要适配每一个地域的协议,做一层转换。而且万一后面变化了怎么办?想想就知道这个运维成本有多可怕。 那是否交警中控系统,可以要求全国所有红绿灯系统,都按同一种数据协议给到自己呢?不好意思,这些信号灯数据地图服务也在用,城市大脑也在用,不能更改。 方式 3:引入事件,Borker 根据接收方协议,进行灵活适配 但如果使用事件,就不一样了。因为事件是“无期望的”,“具像化的”,天然的保留了案发现场尽可能多的信息,且更加规范标准,对于消费者(也就是交警中空)来说,可以轻易将不同省份,收集上来的事件,轻易组装成,符合自己业务要求的格式。 而且,这一组装,是在中间层 Broker 发生的。对于交警中控来说,它只需要,按照自己业务领域的设计,提供一个接收事件的 API,然后其他事件,通过 Broker,主动投递到这个 API 上即可。从头到尾,对交警中控系统,没有一行适配外部业务的代码。 所以,这种方式有 3 个明显的优势: 1、只关注自己业务领域本身,不需要做适配外部的代码; 2、所有对系统的变更,收敛到 API,为唯一入口;同一个 API,可能既是用来接收事件的,也可能同时用于控制台操作; 3、因为事件是推送过来的,所以,也不需要像之前一样,引入一个 SDK,和 Broker 发生连接,获取消息,降低了系统的复杂度。 这样,我们一开始的图,就会变成这个样子:交通信号灯产生事件,投递到事件中心,其他需要这些事件的消费者,在事件中心订阅,再由事件中心,按照他们期望的事件格式,主动投递过去。 让我们再来回顾下整个过程: 第 1 幅图:一开始,我们通过强依赖的方式,让信号灯系统,主动将信息发送给各个系统。那这张图里,我们是以各个下游服务为中心,信号灯系统去适配各个下游服务。 第 2 幅图:后来,我们采用传统消息的方式,对调用链路进行了解耦,两边系统不再直接依赖了,但是依旧会存在业务上的依赖。消费者需要去理解生产者的消息格式,并在自己系统内部,进行转换适配。所以,这里其实是以生产者为中心。 第 3 幅图:最后,我们引入了事件通知的方式,对于这种方式,生产者和消费者,他们都只需要关注自己系统本身就可以了。生产者,生产什么样的事件,消费者,消费什么样的数据格式,都各自以自己的业务为中心,不需要为对方做适配。真正做到我们说的高内聚低耦合,实现彻底的完全解耦。 现在,回到我们一开始提到的典型微服务模型,对于有些场景,我们就可以变为下面这种方式:对微服务的变更操作,统一收敛到 API 操作入口,去掉 Command 消息入口。收敛入口,对于我们维护微服务,保障系统稳定性,常常非常有好处的。 事件溯源 事件溯源是什么?事件溯源简单理解就是让系统回到过去任意时刻。那怎么样,才能让系统可以回到过去呢?很简单,首先系统所有发生的变化,都得以事件的方式记录下来;然后,我们就可以通过回放事件的方式,回到过去任何一个时刻。 那为什么只有事件才能做这个事,其他普通消息不行呢?这个还是要回到我们刚才说的几个事件特性:已发生不可变的、天然有序且唯一的、而且是非常详细具体的,完整的记录了事件的案发现场。所以,对于事件溯源这个场景,事件可以说是系统的一等一的公民。 举个例子:比如说,如果我们能够完整地收集路上的各种事件信息,包括信号灯、车量、天气、拥堵路况等等,那么,我们就可以“穿越时间”,回到交通现场,重新做一次决策。比如,在智慧交通场景,当我们想去验证一个调度算法的时候,我们就可以回放当时发生的所有事件,来重现现场。 大家可能觉得这个很神奇,但是,其实我们平常一直有接触,大家知道是什么吗?就是我们常用的代码版本管理系统,比如:github。 这里有大家可能会问,如果一个系统积赞了很多事件,想重放是不是得很久?比如在一些交易场景,每天都会产生大量的事件,那应该怎么处理呢?这里呢,系统一般每天晚上都会打一份快照。如果系统意外宕机,想回到某一个时刻,就可以把前一天的快照取出,然后再重新跑下当天的事件,即可恢复。而白天呢,所有的事件都是在内存中进行处理,不会跟数据库交互,所以系统性能非常快,只有事件会落盘。 当然,事件溯源也不是适合所有场景,它有优点也有缺点,详细看上图。 系统间集成 刚才讲的第1个场景:事件通知,一般涉及到两个上下游团队的协作开发;讲的第 2 个场景:事件溯源,则一般是 1 个团队内的开发;但系统间集成,则往往面对的是三个业务团队的协作开发。这个怎么理解呢? 其实这个也很常见:比如公司里购买了 ERP 系统,同时也购买了外部考勤系统、外部营销系统服务等等。这些系统都有一个共同点,是什么?都不是我们自己开发的,是而买来的。 如果我们想把 ERP 系统的人员信息,实时且自动同步到考勤系统中去怎么办?其实这个是有点麻烦的,因为这些都不是我们自己开发的。 1、我们不能修改 ERP 系统的代码,主动去调用考勤系统,把人员变更信息发送过去; 2、也不能修改考情系统的代码,主动去调用外部 ERP 系统的 API; 但是我们可以通过事件总线,借助 webhook 或则标准 API 等等方式,收集上游的 ERP 系统产生的人员变更事件,然后进行过滤和转换,推送到下游考勤系统中去,当然,这里也可以是内部自研服务。 所以,现在的研发模式变成了:事件中心管理了所有 SaaS 服务,包括内部自研系统产生的所有事件。然后呢,我们只需要在事件中心,寻找我们需要的事件,进行订阅,对 SaaS 服务和内部自研系统,进行简单服务编排,即可完成开发。 CQRS CQRS 中的 C 代表 Command,Command 什么意思?就是明令,一般包含:Create/Update/Delete,Q 代表 Query,是指查询。所以 CQRS 本质是读写分离:所有的写操作,在图中左边的系统中完成,然后将系统因为 Command 产生变化的事件,同步到右边的查询系统。 这里同学可能有疑问,这跟数据库的读写分离有什么区别?数据库读写分离也是提供一个写的 DB,一个读的 DB,两边做同步。对吧… 那这里很大的一个区别是:对于数据库的读写分离,是以数据库为中心,两边的数据库是一模一样的,甚至数据的存储结构也是一模一样的。 但是对于 CQRS 的读写分离场景,是以业务为中心,两边存储的数据结构格式,往往是不一样的,甚至数据库都不是同一种。完全围绕各自的读写业务逻辑,设计最佳技术选型。对于写场景,为了保障事务,我们可能使用关系性数据库;对于读的场景,我们为了提高性能,我们可能会使用 Redis、HBase 等 Nosql 数据库。 当然 CQRS 也不是适合所有场景,他往往比较适合: ●希望同时满足高并发的写、高并发的读; ●写模型和读模型差别比较大时; ●读/写比非常高时; 我们刚才讲了事件的 4 个应用场景,但是,事件不是万能的,就像软件研发也没有银弹,有很多场景也并不适合使用事件。包括: 1. 强依赖 Response 的同步调用场景; 2. 要求服务调用保持事务强一致性的场景。 RocketMQ 关于事件的解决方案 需要什么样的能力? 首先,按照之前讲到的事件应用场景,我们整理下,如果我们做好事件驱动这块,我们的系统,需要具备什么样的能力呢? 第一,我们肯定得有一个事件标准,对吧…因为,事件不是给自己看的,也不是给他看的,而是给所有人看的。刚才,我们也讲到事件是无期望的,它没有明确的消费者,所有都是潜在的消费者,所以,我们得规范化事件的定义,让所有人都能看得懂,一目了然。 第二,我们得有一个事件中心,事件中心里面有所有系统,注册上来的各种事件,(这个跟消息不一样,我们没有消息中心,因为消息一般是定向的,是生产者和消费者约定的,有点像计划经济,消息生产出来的时候,带着很强的目的性,是给谁谁消费的。而事件有点像市场经济,事件中心呢,)这个有点类似市场经济大卖场,玲琅满目,里面分类摆放了各种各样的事件,所有人即使不买,也都可以进来瞧一瞧,看一看,有哪些事件,可能是我需要的,那就可以买回去。 第三,我们得有一个事件格式,用来描述事件的具体内容。这相当于市场经济的一个买卖契约。生产者发送的事件格式是什么,得确定下来,不能总是变;消费者以什么格式接收事件也得确定下来,不然整个市场就乱套了。 第四,我们得给消费者一个,把投递事件到目标端的能力。并且投递前,可以对事件进行过滤和转换,让它可以适配目标端 API 接收参数的格式,我们把这个过程呢,统一叫做订阅规则。 第五,我们还得有一个存储事件的地方,就是最中间的事件总线。 事件标准 关于刚才提到的第一点事件标准,我们选取了 CNCF 旗下的开源项目 CloudEvents,目前已被广泛集成,算是一个事实上的标准。 它的协议也很简单,主要规范了 4 个必选字段:id,source、type、specversion;以及多个可选字段:subject、time、dataschema、datacontenttype和data。上图右边,我们有一个简单的例子,大家可以看下,这里就不具体展开了。 另外,事件的传输也需要定义一种协议,方便不同系统之间的沟通,默认支持三种 HTTP 的传输方式:Binary Content Mode、Structured Content Mode 和 Batched Content Mode。通过 HTTP 的 ContentType,就可以区分这三种不同的模式。其中前两种,都是传递单个事件;第三种则是传递批量事件。 事件 Schema 事件的 Schema,用来描述事件中有哪些属性、对应的含义、约束等等信息。目前我们选取了 Json Schema. 和 OpenAPI 3.0,根据事件的 Schema 描述,我们可以对事件进行合法性校验。,当然 Schema 本身的修改,也需要符合兼容性原则,这里不作具体展开。 事件过滤和转换 关于事件的过滤和转换,我们提供了 7 种事件过滤方式和 4 种事件转换方式,详细可以下图描述: 技术架构 我们 RocketMQ 围绕事件驱动推出的产品,叫做 EventBridge,也是我们这次要开源的新产品。 他的整个架构可以分为两部分:上面是我们的控制面、下面是我们的数据面。 控制面中最上面的 EventSource 是各个系统注册上来的事件源,这些事件可以通过 APIGateway 发送事件到事件总线,也可以通过配置的 EventSource,生成 SouceRuner,主动从我们的系统中,去拉取事件。事件到达事件总线 EventBus 之后,我们就可以配置订阅规则了 EventRule,在规则 EventRule 里我们设置了事件怎么过滤,以及投递到目标端前,做哪些转换。系统基于创建的规则会生成 TargetRunner,就可以将事件推送到指定的目标端。 那这里 SouceRuner 和 TargetRunner 是什么呢?我们具体能对接哪些上下游 Source 和 Target? 这些我们都可以在下面的 SourceRegister 和 TargetRegister 提前进行注册。 所以 EventBridge 的数据面是一个开放的架构,他定义了事件处理的SPI,底下可以有多种实现。比如,我们把 RocketMQ 的 HTTPConnector 注册到 EventBridge 中,那我们就可以把事件推送到 HTTP 服务端。 如果我们把 Kafka 的 JDBC Connector 注册到 EventBridge 中,我们就可以把事件推送到数据库。 当然,如果你的系统不是通用的像 HTTP/JDPC 等协议,也可以开发自己的 Connector,这样就能将事件实时同步到 EventBridge,或则接收来自 EventBridge 的事件。 除此之外,我们还会有一些附加的运维能力,包括:事件追踪、事件回放、事件分析、事件归档。 RocketMQEventBridge 与云上 在所有开源的,与其他上下游系统做集成的 Connector 当中,我们有一个特殊的 Connector,叫:EventBridgeConnector,通过它可以方便的和阿里云云上的事件总线进行集成。这里有两个典型的应用场景: 第一个场景是:IDC 系统内部产生的事件,不仅可以用来做内部系统间的解耦,还可以实时同步到云上,驱动云上的一些计算服务,比如通过云上 Maxcompute 对内部产生的事件进行离线分析,或则驱动云上的图像识别服务,实时分析事件中标注的图片。 第二个场景是:如果 IDC 内部使用到了自建 MQ,我们同样可以通过 MQConnector 和 EventBridgeConnector,实时同步事件到云上,逐步将内部自建 MQ,迁移到云上MQ。 生态发展 关于 EventBridge 的未来方向,我们希望是在开源,构建一个支持多云架构的事件总线生态。这个怎么理解?简单来说,我们希望在不同云厂商之间,包括云厂商和内部 IDC 系统之间,可以通过事件,来打破围墙,实现互通。虽然,这几年云计算发展很快,但是对于一些特别大的客户来讲,有时候并不希望跟某家云厂商强绑定。这不光是市场充分竞争的结果,也是大客户一种降低风险的手段。所以,这个时候,如何在不同云厂商之间,包括云厂商系统和自己内部 IDC 系统之间,灵活的交互,甚至灵活的迁移,是企业非常重要的一个诉求。 当然,实现这个是有一定难度的。不过如果我们在进行企业架构设计的时候,是基于事件驱动架构进行设计开发——不同系统之间的交互,围绕事件展开,就会容易很多。 事件,在这里,就好比一种通用语言,通过这个通用语言,就可以实现和不同系统之间的沟通交流。比如:用 IDC 系统内部的事件,去驱动阿里云上服务;甚至用阿里云上的事件,去驱动 AWS 上的服务运行; 为了实现这个目标,我们在和不同云厂商,不同 SaaS 系统服务商,进行系统间集成的时候,需要开发与之对应的连接器。 也欢迎大家,一起来共建 RocketMQEventBridge 的生态。 源码地址:
作者:静罗
#技术探索 #强力推荐

2022年8月3日

解析 RocketMQ 业务消息——事务消息
引言:在分布式系统调用场景中存在这样一个通用问题,即在执行一个核心业务逻辑的同时,还需要调用多个下游做业务处理,而且要求多个下游业务和当前核心业务必须同时成功或者同时失败,进而避免部分成功和失败的不一致情况出现。简单来说,消息队列中的“事务”,主要解决的是消息生产者和消费者的数据一致性问题。本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。 点击下方链接,查看视频讲解: 场景:为什么需要事务消息 以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括: 主分支订单系统状态更新:由未支付变更为支付成功; 物流系统状态新增:新增待发货物流记录,创建订单物流记录; 积分系统状态变更:变更用户积分,更新用户积分表; 购物车系统状态变更:清空购物车,更新用户购物车记录。  分布式系统调用的特点是:一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。 传统 XA 事务方案:性能不足 为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务,基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。 基于普通消息方案:一致性保障困难 将上述基于 XA 事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。 该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如: 消息发送成功,订单没有执行成功,需要回滚整个事务; 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致; 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。 基于RocketMQ分布式事务消息:支持最终一致性 上述普通消息方案中,普通消息和订单事务无法保证一致的本质原因是普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于消息队列 RocketMQ 版实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 消息队列 RocketMQ 版事务消息的方案,具备高性能、可扩展、业务开发简单的优势。 基本原理 概念介绍 事务消息:RocketMQ 提供类似 XA 或 Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致;  半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了 RocketMQ 服务端,但是 RocketMQ 服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息;  消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。 事务消息生命周期 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态;  事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见;  消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止;  提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费;  消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。具体信息,请参见消息重试;  消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败);RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。  消息删除:当消息存储时长到期或存储空间不足时,RocketMQ 会按照滚动机制清理最早保存的消息数据,将消息从物理文件中删除。 事务消息基本流程 事务消息交互流程如下图所示: 1. 生产者将消息发送至 RocketMQ 服务端;  2. RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息;  3. 生产者开始执行本地事务逻辑;  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下: 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者; 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;   6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果;  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。 实现细节:RocketMQ 事务消息如何实现 根据发送事务消息的基本流程的需要,实现分为三个主要流程:接收处理 Half 消息、Commit 或 Rollback 命令处理、事务消息 check。 处理 Half 消息 发送方第一阶段发送 Half 消息到 Broker 后,Broker 处理 Half 消息。Broker 流程参考下图: 具体流程是首先把消息转换 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,其余消息内容不变,写入 Half 队列。具体实现参考 SendMessageProcessor 的逻辑处理。 Commit 或 Rollback 命令处理 发送方完成本地事务后,继续发送 Commit 或 Rollback 到 Broker。由于当前事务已经完结,Broker 需要删除原有的 Half 消息,由于 RocketMQ 的 appendOnly 特性,Broker通过 OP 消息实现标记删除。Broker 流程参考下图: Commit。Broker 写入 OP 消息,OP 消息的 body 指定 Commit 消息的 queueOffset,标记之前 Half 消息已被删除;同时,Broker 读取原 Half 消息,把 Topic 还原,重新写入 CommitLog,消费者则可以拉取消费;  Rollback。Broker 同样写入 OP 消息,流程和 Commit 一样。但后续不会读取和还原 Half 消息。这样消费者就不会消费到该消息。  具体实现在 EndTransactionProcessor 中。 事务消息 check 如果发送端事务时间执行过程,发送 UNKNOWN 命令,或者 Broker/发送端重启发布等原因,流程 2 的标记删除的 OP 消息可能会缺失,因此增加了事务消息 check 流程,该流程是在异步线程定期执行(transactionCheckInterval 默认 30s 间隔),针对这些缺失 OP 消息的 Half 消息进行 check 状态。具体参考下图: 事务消息 check 流程扫描当前的 OP 消息队列,读取已经被标记删除的 Half 消息的 queueOffset。如果发现某个 Half 消息没有 OP 消息对应标记,并且已经超时(transactionTimeOut 默认 6 秒),则读取该 Half 消息重新写入 half 队列,并且发送 check 命令到原发送方检查事务状态;如果没有超时,则会等待后读取 OP 消息队列,获取新的 OP 消息。 另外,为了避免发送方的异常导致长期无法确定事务状态,如果某个 Half 消息的 bornTime 超过最大保留时间(transactionCheckMaxTimeInMs 默认 12 小时),则会自动跳过此消息,不再 check。 具体实现参考: TransactionalMessageServiceImplcheck 方法。 实战:使用事务消息 了解了 RocketMQ 事务消息的原理后,我们看下如何使用事务。首先,我们需要创建一个 “事务消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 事务消息相比普通消息发送时需要修改以下几点: 发送事务消息前,需要开启事务并关联本地的事务执行。 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。  当事务消息 commit 之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意: 1. 避免大量未决事务导致超时:在事务提交阶段异常的情况下发起事务回查,保证事务一致性;但生产者应该尽量避免本地事务返回未知结果;大量的事务检查会导致系统性能受损,容易导致事务处理延迟; 2. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据 Group ID 去查询生产者客户端; 3. 事务超时机制:半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。 
作者:合伯
#技术探索 #功能特性

2022年7月27日

RocketMQ 消息集成:多类型业务消息——定时消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了 100% 阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。 本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。 点击下方链接,查看直播讲解: 概念:什么是定时消息 在业务消息集成场景中,定时消息是,生产者将一条消息发送到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间,消费者才可以消费到。 相似地,延迟消息其实是对于定时消息的另外一种解释,指的是生产者期望消息延迟一定时间,消费者才可以消费到。可以理解为定时到当前时间加上一定的延迟时间。 对比一下定时消息和普通消息的流程。普通消息,可以粗略的分为消息发送,消息存储和消息消费三个过程。当一条消息发送到 Topic 之后,那么这条消息就可以马上处于等待消费者消费的状态了。 而对于定时/延时消息来说,其可以理解为在普通消息的基础上叠加了定时投递到消费者的特性。生产者发送了一条定时消息之后,消息并不会马上进入用户真正的Topic里面,而是会被 RocketMQ 暂存到一个系统 Topic 里面,当到了设定的时间之后,RocketMQ 才会将这条消息投递到真正的 Topic 里面,让消费者可以消费到。 场景:为什么需要使用定时消息 在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。往往这类定时事件触发都会存在以下诉求: 高性能吞吐:需要大量事件触发,不能有性能瓶颈。 高可靠可重试:不能丢失事件触发。 分布式可扩展:定时调度不能是单机系统,需要能够均衡的调度到多个服务负载。  传统的定时调度方案,往往基于数据库的任务表扫描机制来实现。大概的思路就是将需要定时触发的任务放到数据库,然后微服务应用定时触发扫描数据库的操作,实现任务捞取处理。 这类方案虽然可以实现定时调度,但往往存在很多不足之处: 重复扫描:在分布式微服务架构下,每个微服务节点都需要去扫描数据库,带来大量冗余的任务处理,需要做去重处理。 定时间隔不准确:基于定时扫描的机制无法实现任意时间精度的延时调度。 横向扩展性差:为规避重复扫描的问题,数据库扫表的方案里往往会按照服务节点拆分表,但每个数据表只能被单节点处理,这样会产生性能瓶颈。 在这类定时调度类场景中,使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。消息队列 RocketMQ 版的定时消息具有高并发和水平扩展的能力。 案例:使用定时消息实现金融支付超时需求 利用定时消息可以实现在一定的时间之后才进行某些操作而业务系统不用管理定时的状态。下面介绍一个典型的案例场景:金融支付超时。现在有一个订单系统,希望在用户下单 30 分钟后检查用户的订单状态,如果用户还没有支付,那么就自动取消这笔订单。 基于 RocketMQ 定时消息,我们可以在用户下单之后发送一条定时到 30 分钟之后的定时消息。同时,我们可以使用将订单 ID 设置为 MessageKey。当 30 分钟之后,订单系统收到消息之后,就可以通过订单 ID 检查订单的状态。如果用户超时未支付,那么就自动的将这笔订单关闭。 原理:RocketMQ 定时消息如何实现 固定间隔定时消息 如前文介绍,定时消息的核心是如何在特定的时间把处于系统定时 Topic 里面的消息转移到用户的 Topic 里面去。 Apache RocketMQ 4.x 的版本的定时消息是先将定时消息放到按照 DelayLevel 放到 SCHEDULE_TOPIC_XXXX 这个系统的不同 Queue 里面,然后为每一个 Queue 启动一个定时任务,定时的拉取消息并将到了时间的消息转投到用户的 Topic 里面去。这样虽然实现简单,但也导致只能支持特定 DelayLevel 的定时消息。 当下,支持定时到任意秒级时间的定时消息的实现的 pr 提出到了社区,下面简单的介绍一下其基本的实现原理。 时间轮算法 在介绍具体的实现原理之前,先介绍一下经典的时间轮算法,这是定时消息实现的核心算法。 如上所示,这是一个一圈定时为 7 秒的时间轮,定时的最小精度的为秒。同时,时间轮上面会有一个指向当前时间的指针,其会定时的移向下一个刻度。 现在我们想定时到 1 秒以后,那么就将数据放到 “1” 这个刻度里面,同时如果有多个数据需要定时到同一个时间, 那么会以链表的方式添加到后面。当时间轮转到 “1” 这个刻度之后,就会将其读取并从链表出队。那如果想定到超过时间轮一圈的时间怎么处理呢?例如我们想定时到 14 秒,由于一圈的时间是 7 秒,那么我们将其放在“6”这个刻度里面。当第一次时间轮转到“6” 时,发现当前时间小于期望的时间,那么忽略这条数据。当第二次时间轮转到“6”时,这个时候就会发现已经到了我们期望的 14 秒了。 任意秒级定时消息 在 RocketMQ 中,使用 TimerWheel 对于时间轮进行描述和存储,同时使用一个 AppendOnly 的 TimerLog 记录时间轮上面每一个刻度所对应的所有的消息。 TimerLog 记录了一条定时消息的一些重要的元数据,用于后面定时的时间到了之后,将消息转移到用户的 Topic 里面去。其中几个重要的属性如下: 对于 TimerWheel 来说,可以抽象的认为是一个定长的数组,数组中的每一格代表时间轮上面的一个“刻度”。TimerWheel 的一个“刻度”拥有以下属性。 TimerWheel 和 TimerLog 直接的关系如下图所示: TimerWheel 中的每一格代表着一个时间刻度,同时会有一个 firstPos 指向这个刻度下所有定时消息的首条 TimerLog 记录的地址,一个 lastPos 指向这个刻度下所有定时消息最后一条 TimerLog 的记录的地址。并且,对于所处于同一个刻度的的消息,其 TimerLog 会通过 prevPos 串联成一个链表。 当需要新增一条记录的时候,例如现在我们要新增一个 “14”。那么就将新记录的 prevPos 指向当前的 lastPos,即 “13”,然后修改 lastPos 指向 “14”。这样就将同一个刻度上面的 TimerLog 记录全都串起来了。 有了 TimerWheel 和 TimerLog 之后,我们再来看一下一条定时消息从发送到 RocketMQ 之后是怎么最终投递给用户的。 首先,当发现用户发送的是一个定时消息过后,RocketMQ 实际上会将这条消息发送到一个专门用于处理定时消息的系统 Topic 里面去 然后在 TimerMessageStore 中会有五个 Service 进行分工合作,但整体可以分为两个阶段:入时间轮和出时间轮 对于入时间轮: TimerEnqueueGetService 负责从系统定时 Topic 里面拉取消息放入 enqueuePutQueue 等待 TimerEnqueuePutService 的处理 TimerEnqueuePutService 负责构建 TimerLog 记录,并将其放入时间轮的对应的刻度中  对于出时间轮: TimerDequeueGetService 负责转动时间轮,并取出当前时间刻度的所有 TimerLog 记录放入 dequeueGetQueue TimerDequeueGetMessageService 负责根据 TimerLog 记录,从 CommitLog 中读取消息 TimerDequeuePutMessageService 负责判断队列中的消息是否已经到期,如果已经到期了,那么将其投入用户的 Topic 中,等待消费消费;如果还没有到期,那么重新投入系统定时 Topic,等待重新进入时间轮。  实战:使用定时消息 了解了 RocketMQ 秒级定时消息的原理后,我们看下如何使用定时消息。首先,我们需要创建一个 “定时/延时消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 从前面可以看出,对于定时消息来说,是在发送消息的时候 “做文章”。所以,对于生产者,相对于发送普通消息,我们可以在发送的时候设置期望的投递时间。 当定时的时间到了之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大。所以一般建议尽量不要设置大量相同触发时刻的消息。
作者:凯易、明锻
#技术探索 #功能特性

2022年7月7日

RocketMQ 消息集成:多类型业务消息-普通消息
引言 CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA 天然契合 CDC 的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成一些列业务驱动。阿里云 EventBridge 是阿里云推出的一款无服务器事件总线服务,能够帮助用户轻松快捷地搭建基于 EDA 架构的应用。近期,EventBridge 事件流已经支持了基于阿里云 DTS[1]服务的 CDC 能力。本文将从 CDC、CDC 在 EventBridge 上的应用以及若干最佳实践场景等方面,为大家介绍如何利用 EventBridge 轻松构建 CDC 应用。 CDC 概述 基本原理与应用场景 CDC 从源数据库捕获增量的数据以及数据模式变更,以高可靠、低延时的数据传输将这些变更有序地同步到目标数据库、数据湖或者其他数据分析服务。目前业界主流的开源 CDC 工具包括 Debezium[2]、Canal[3] 以及 Maxwell[4]。 图片来源: 目前业界主要有以下几类 CDC 的实现: 1. 基于时间戳或版本号 基于时间戳的方式要求数据库表有一个字段代表更新时间戳,当存在数据插入或更新时,对应时间戳字段就会随之更新。CDC 组件周期性检索更新时间大于上次同步时间的数据记录,即可捕获本周期内数据的变更。基于版本号跟踪和基于时间戳跟踪原理基本一致,要求开发者变更数据时必须更新数据的版本号。 2. 基于快照 基于快照的 CDC 实现在存储层面使用到了数据源 3 份副本,分别是原始数据、先前快照和当前快照。通过对比 2 次快照之间的差异来获取这之间的数据变更内容。 3. 基于触发器 基于触发器的 CDC 实现方式事实上是在源表上建立触发器将对数据的变更操作(INSERT、UPDATE、DELETE)记录存储下来。例如专门建立一张表记录用户的变更操作,随后创建 INSERT、UPDATE、DELETE 三种类型的触发器将用户变更同步到此表。 4. 基于日志 以上三种方式都对源数据库存在一定侵入性,而基于日志的方式则是一种非侵入性的 CDC 方式。数据库利用事务日志实现灾备,例如 MySQL 的 binlog 就记录了用户对数据库的所有变更操作。基于日志的 CDC 通过持续监听事务日志来实时获取数据库的变化情况。 CDC 的应用场景广泛,包括但不限于这些方面:异地机房数据库同步、异构数据库数据同步、微服务解耦、缓存更新与 CQRS 等。 基于阿里云的 CDC 解决方案:DTS 数据传输服务 DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体。其中,DTS 数据订阅[5]功能可以帮助用户获取自建 MySQL、RDS MySQL、Oracle 等数据库的实时增量数据。 CDC 在EventBrige上的应用 阿里云 EventBridge 提供了事件总线[6]与事件流[7] 2 款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件路由到多个事件目标中。 事件流适用于端到端的流式数据处理场景,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便。 为了更好地支持用户在 CDC 场景下的需求,EventBridge 在事件流源端支持了阿里云 DTS 的数据订阅功能,用户仅需简单配置,即可将数据库变更信息同步到 EventBridge 事件流。 EventBridge 定制了基于 DTS sdk 的 DTS Source Connector。当用户配置事件提供方为 DTS 的事件流时,source connector 会实时地从 DTS 服务端拉取 DTS record 数据。数据拉取到本地后,会进行一定的结构封装,保留 id、operationType、topicPartition、beforeImage、afterImage 等数据,同时增加 streaming event 所需要的一些系统属性。 DTS Event 样例可参考 EventBridge 官方文档 EventBridge Streaming 保证了 DTS 事件的顺序性,但存在事件重复投递的可能性,EventId 在保证了和每条 DTS record 的一一映射关系,用户可依据此字段来对事件做幂等处理。 创建源为 DTS 的 EventBridge 事件流 下面展示如何在 EventBridge 控制台创建源为 DTS 的事件流 前期准备   1. 开通 EventBridge 服务; 2. 创建 DTS 数据订阅任务; 3. 创建用于消费订阅数据的消费组账号信息。 创建事件流   1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“数据库 DTS”; 4. 在“数据订阅任务”一栏中选择已创建的 DTS 数据订阅任务。在消费组一栏,选择要使用哪个消费组消费订阅数据,同时填写消费组密码与初始消费时间。 5. 事件流规则与目标按照需要填写,保存启动即可创建以 DTS 数据订阅为事件源的事件流。 注意事项 使用时有以下几点需要注意: 1. EventBridge 使用的是 SUBSCRIBE 消费模式[8],所以请保证当前 DTS 消费组没有其他客户端实例在运行。如果设置的消费组在之前有运行,则传入的位点失效,会基于此消费组上次消费过的位点继续消费; 2. 创建 DTS 事件源时传入的位点仅在新消费组第一次运行时起效,后续任务重启后会基于上次消费位点继续消费; 3. EventBridge 事件流订阅 OperationType 为 INSERT、DELETE、UPDATE、DDL 类型的 DTS 数据; 4. 使用 DTS  事件源可能会有消息重复,即保证消息不丢,但无法保证仅投递一次,建议用户做好幂等处理; 5.用户如果需要保证顺序消费,则需要将异常容忍策略设置为“NONE”,即不容忍异常。在这种情况下,如果事件流目标端消费消息异常,整个事件流将暂停,直至恢复目标端正常。 最佳实践示例 基于EventBridge 实现 CQRS 在 CQRS(Command Query Responsibility Segregation)模型中,命令模型用于执行写以及更新操作,查询模型用于支持高效的读操作。读操作和写操作使用的数据模型存在一定区别,需要使用一定方式保证数据的同步,基于 EventBridge 事件流的 CDC 可以满足这样的需求。 基于云上服务,用户可以使用如下方式轻松构建基于 EventBridge 的 CQRS: 1. 命令模型操作数据库进行变更,查询模型读取 elasticsearch 获取数据; 2. 开启 DTS 数据订阅任务,捕获 DB 变更内容; 3.配置 EventBridge 事件流,事件提供方为 DTS 数据订阅任务,事件接收方为函数计算 FC; 4. FC 中的服务即为更新 elasticsearch 数据操作。 微服务解耦 CDC 也可以用于微服务解耦。例如下文是一个电商平台的订单处理系统,当有新建的未付款订单产生时,数据库会有一条 INSERT 操作,而当某笔订单状态由“未付款”变为“已付款”时,数据库会有一条 UPDATE 操作。根据订单状态变化的不同,后端会有不同的微服务来对此进行处理。 1. 用户下单/付款,订单系统进行业务处理,将数据变更写入 DB; 2. 新建 DTS 订阅任务捕获 DB 数据变更; 3. 搭建 EventBridge 事件流。事件提供方为 DTS 数据订阅任务,事件接收方为 RocketMQ; 4. 在消费 RocketMQ 数据时,同一个 topic 下启用 3 个 group 代表不同的业务消费逻辑; a. GroupA 将捕获到的 DB 变更用户缓存更新,便于用户查询订单状态; b. GroupB 下游关联财务系统,仅处理新建订单,即处理 DB 操作类型为 INSERT 的事件,丢弃其余类型事件; c. GroupC 仅关心订单状态由“未付款”变为“已付款”的事件,当有符合条件事件到达时,调用下游物流、仓储系统,对订单进行进一步处理。 如果采用接口调用方式,那么用户在下单之后订单系统将分别需要调用缓存更新接口、新建订单接口以及订单付款接口,业务耦合性过高。除此之外,这种模式使得数据消费端不用担心上游订单处理接口返回内容的语义信息,在存储模型不变的情况下,直接从数据层面判断此次数据变更是否需要处理以及需要怎样的处理。同时,消息队列天然的消息堆积能力也可以帮助用户在订单峰值到来时实现业务削峰填谷。 事实上,目前 EventBridge Streaming 支持的消息产品还包括 RabbitMQ、Kafka、MNS 等,在实际操作中用户可以根据自己的需要进行选择。 数据库备份&异构数据库同步 数据库灾备和异构数据库数据同步也是 CDC 重要的应用场景。使用阿里云 EventBridge 亦可以快速搭建此类应用。 1. 新建 DTS 数据订阅任务,捕获用户 MySQL 数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS 数据订阅任务; 3. 使用 EventBridge 在目的数据库执行指定 sql,实现数据库备份; 4. 数据变更事件投递到函数计算,用户业务根据数据变化内容更新对应异构数据库。 自建 SQL 审计 对于用户有自建 SQL 审计的需求,使用 EventBridge 也可以轻松实现。 1. 新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 总结 本文介绍了 CDC 的一些概念、CDC 在 EventBridge 上的应用以及若干最佳实践场景。随着支持产品的不断增加,EventBridge 所承载的生态版图也不断扩大,从消息生态到数据库生态,从日志生态到大数据生态,EventBridge 不断扩大其适用领域,巩固云上事件枢纽的地位,此后也将按照这个方向继续发展,技术做深,生态做广。 _参考链接:_ _[1] DTS:_ _[2] Debezium:_ _[3] Canal:_ _[4] Maxwell:_ _[5] DTS 数据订阅:_ _[6] 事件总线:_ _[7] 事件流:_ _[8] SUBSCRIBE 消费模式:_
作者:昶风
#技术探索 #功能特性