2022年11月9日

消息队列Apache RocketMQ 5.0:从消息服务到云原生事件流平台
前言 回顾 RocketMQ 的发展历程,至今已十年有余。2022 年 RocketMQ 5.0 正式发布,全面迈进云原生时代。 11 月 5 日,2022 杭州 · 云栖大会上,阿里云智能高级产品专家杨秋弟在云原生峰会上发表主题演讲,发布消息队列 RocketMQ 5.0:从消息服务到云原生事件流处理平台。 阿里云智能高级产品专家&Apache RocketMQ 联合创始人 杨秋弟 Apache RocketMQ 发展史 回顾 Apache RocketMQ 过去十年的发展历程,可分为“诞生于互联网”与“成长于云计算”两大阶段。 第一个阶段是 RocketMQ 的从 0 到 1,在阿里内部规模化落地。2012 年,为了支撑超大规模电商互联网架构,阿里中间件研发了 RocketMQ,并在产品诞生初期开源,2017 年 RocketMQ 统一了阿里消息技术体系。 第二个阶段是云计算。2015 年 RocketMQ 上云,这也是业界首个提供公共云 SaaS 形态的开源消息队列;2016 年,阿里把 RocketMQ 捐赠给 Apache,2017 年孵化毕业,成为国内首个 TLP 的互联网中间件。 十年磨一剑,出鞘必锋芒。在这十年的过程中,通过集团打磨稳定性,依托云计算孵化创新,开源共建加速标准化建立与生态连接,RocketMQ 始终坚持开源、集团、商业三位一体的发展思路,内核演进和产品迭代齐头并进。2022 年 RocketMQ 5.0 正式发布宣告着全面迈进云原生时代。 RocketMQ 5.0:从消息服务到云原生事件流平台 回顾过去这十年,RocketMQ 服务于集团几乎所有的业务,在阿里云上更是累计服务了 10 万余家企业客户,覆盖互联网、零售、金融、汽车等 20 多个行业,大规模的生产实践持续累积产品的核心优势。 多样性。企业级应用复杂的业务诉求,催生 RocketMQ 提供丰富的消息类型,比如定时消息、事务消息、顺序消息等等。此外,也提供了像消息轨迹、消息回溯等一系列的消息治理能力。 一致性。无论是淘宝交易还是蚂蚁支付都天然对数据一致性有着极高的要求,RocketMQ 提供的分布式事务消息是业内第一个提供该特性的消息产品,将异步解耦与数据一致性完美融合,是金融客户中不可或缺的产品能力。 稳定性。稳定性是产品的根基,更是一个系统工程,RocketMQ 长期在电商和金融领域中打磨,不仅提供最高达 99.99% SLA,更是帮助客户提供全方位的健康巡检与故障排查能力,如消息轨迹、消息回溯、死信机制等等,提供多样化的稳定性兜底手段。 高性能。在双十一的极限流量下,RocketMQ 具备无限扩展能力,支持千万级并发与万亿级消息洪峰,P9999 写延迟在 1ms 内,100%在 100ms 内。 可以说,在消息领域,尤其在业务消息领域,RocketMQ 在国内已经做到顶尖,成为企业客户的首选。 而随着云原生以及数字化时代的到来,RocketMQ 也在快速的发生着变化,那么变化主要体现在哪些方面呢? 首先,全面拥抱云原生。向下,消息系统自身实现云原生架构的演进,充分释放云基础设施的池化能力,全方位提高消息的核心技术指标。向上,消息产品形态持续演进,成为云原生应用架构的核心引擎。比如微服务、事件驱动、Serverless 等现代化应用架构。 其次,全面拥抱实时数据。企业的数字化转型从原来的业务数字化迈向了数字业务化。对业务数据的实时洞察、实时决策成为指导业务成功的关键要素。消息队列也将从在线业务架构的基础设施,延伸到实时数据架构的基础设施,从而实现事务分析一体化。 随着 5.0 的发布,RocketMQ也正式从消息服务升级到云原生事件流处理平台。 RocketMQ 5.0:云原生架构升级 首先来看 RocketMQ 自身的云原生架构演进。从下面的全景图可以看出,RocketMQ 从客户端到服务端都进行了全方位的改造,具体体现在以下几个方面: 轻量化。RocketMQ 4.0 诞生于 2012 年的淘宝电商,当时大部分的业务还跑在物理机上,单节点计算能力强,客户端节点数少且较为稳定,因此,富客户端的接入方式不仅更加高效,更可以提供诸如客户端侧负载均衡、消息缓存、故障转移等一系列企业级特性。但这种模式在云原生时代发生了改变,轻量客户端更容易被云原生技术栈所集成。因此,RocketMQ 5.0 客户端采用轻量 SDK 设计理念,将原来富客户端的逻辑下沉到服务端,满足现代化应用轻量化、Serverless 化以及 Mesh 化的趋势,更容易被集成;同时也正是因为轻量化,使得 SDK 多语言开发成本低了很多,快速覆盖当下主流的多语言版本。 弹性。存算分离架构让无状态计算节点可以快速伸缩,而分级存储以及冷热分离架构更是让消息存储具备更强的弹性能力。 高可用。基于全新的 Leaderless 架构,去 ZK 依赖的同时,可以做到副本数灵活选择,同步异步自动升降级,实现秒级故障转移;面向云的多可用区、多地域组建全局高可用能力。 基础设施云原生化。RocketMQ 整体架构走向 Kubernetes 化,拥抱 OpenTelemetry,依托于阿里云提供的 ARMS、Prometheus 以及Grafana 实现可观测能力的云原生化。 而 RocketMQ 5.0 本次的升级,除了在技术架构云原生化之外,在产品能力以及成本优化方面都有着重大的提升,我们来逐一分解。 轻量无状态消费模型 RocketMQ 4.0 采用按队列消费模型,消费者完全按照队列负载均衡,非常适合批量拉取快速消费,对单一消息状态不敏感的场景,比如流计算。然而在业务消息领域,尤其是金融场景以及事件驱动架构之下,每一条消息状态都是极为重要的。再加上不同业务类型的消息处理耗时也是各不相同,从毫秒级、到秒级甚至到分钟级,队列的负载不均衡或者短暂的 Block 都可能会引发消息的局部堆积,从而影响到最终用户的体验。因此,RocketMQ 5.0 全新推出按消息负载的轻量无状态消费模型,通过 PoP 机制巧妙地在队列模型之上构建了消息模型,业务只需要关心消息而无需关心队列,所有 API 都能够支持单条消息级别控制,如消息的消费、重试、删除等。而基于消息消费模型,客户端、连接和消费都是无状态的,可在任意 Proxy 节点上飘移,真正做到轻量化。 RocketMQ 5.0 提供按队列消费模型以及按消息消费模型,更好的满足事件与流的业务场景,正可谓鱼与熊掌兼得。 海量消息分级存储 RocketMQ 5.0 的另一个重大升级则是海量消息的分级存储。对消息队列了解的同学都知道,消息通常都是流动的短时间的存储,业内大部分消息产品对消息的保留时间都比较优先,3 天,7 天,最长 15 天不等。有限的存储空间使不仅限制了消息的保留时长,更在某些场景下可能会导致业务资损,比如在消息还没有被消费的时候,因为磁盘空间不足或者消息过期而被清除,这在金融等领域都是不可接受的。所以,RocketMQ 一直想要解决这样的问题,让存储变得更有弹性。 RocketMQ 5.0 基于ESSD、对象存储打造冷热分离以及分级存储架构,提供低成本的无限存储能力,确保消息不会因为本地磁盘空间不足而提前被清除,造成业务资损。我们提供消息存储的 Serverless,客户只需按实际存储使用量付费,而无需预购存储空间。 此外,流量削峰是消息队列极为常见的场景,而由此带来的海量消息堆积能力以及在堆积情况下的性能稳定性成为衡量产品性能的核心指标。RocketMQ 5.0 基于冷热数据分离架构进一步做到读写隔离,避免在堆积的场景下影响热数据的写入性能。分级存储的冷数据碎片规整能力更是提高了冷数据的读取性能,为用户提供一致的冷读 SLA。 售卖系列全线升级,最高降本 50% 从前面的介绍我们已经了解到,RocketMQ 5.0 在技术架构以及产品能力上都有着明显提升。 而 5.0 推出全新的售卖形态与计费策略相比 4.0 更简单、更灵活也更为普惠。实例的综合成本最高可降低 50%。接入门槛最低可降至 390 元/月,远低于自建成本。消息存储支持 Serverless 弹性能力,按需付费可大幅降低闲置成本。结合冷热分离的多级存储能力,相比开源自建可降低 67%,大幅降低消息队列的使用成本。 EventBridge:云上事件枢纽 事件驱动是一个起源很早的概念,早在几十年前,无论是操作系统内核的设计、还是客户端编程框架都大量采用了事件驱动技术。RocketMQ PushConsumer 提供的 API 其实就是一种事件驱动的编程范式,但在微服务应用架构当中,集成与通信才是刚需,事件驱动的价值并没有那么明显的体现。 而随着云原生时代的到来,计算力的构成越来越多样化。作为云原生的代表技术,Serverless 架构范式也是事件驱动的。无论是阿里云的函数计算、还是 AWS 的 Lambda,它们的主要触发源都是各种形态的事件,云产品事件,如 OSS 文件上传触发用户基于函数进行文件加工处理;用户业务事件,如 RocketMQ 触发函数运行消费逻辑处理等等。 以事件驱动为核心理念,阿里云推出了 EventBridge 产品,其使命就是打造云上的事件枢纽。通过EventBridge 可以兑现四大业务价值: 统一事件枢纽。阿里云从 IaaS、PaaS到第三方的 SaaS,每天都有数以亿计的事件产生,但却没有一种简单和统一的方式来触达这些事件;这些事件散落在各个地方形成『事件孤岛』,很难挖掘出有用的业务价值。只有充分发挥数据的规模效应,建立起数据之间的血缘关系,我们才能更好的发掘出数据的价值;所以 EventBridge 首要任务便是统一阿里云上的事件规范,拥抱CloudEvents 事件标准,打造阿里云统一的事件枢纽。 事件驱动引擎。当 EventBridge 连接了海量的事件源后,基于 RocketMQ 毫秒级的事件触发能力,必将加速企业 EDA/Serverless 的架构升级。 开放与集成。EventBridge 提供丰富的跨云、跨平台、跨产品、跨地域以及跨账号的连接能力,能够促进云产品、应用程序、SaaS 服务的相互集成。 低代码。EventBridge 借助Serverless 的应用中心,通过简单的规则匹配以及丰富的模板,即可实现事件的分发、过滤、转换等处理,进一步提升事件驱动的效率。 让消息无处不在,让事件无所不及 依托于 EventBridge、RocketMQ 以及函数计算 FC 的强强联合,目前 EventBridge 的事件生态已初具规模。 在云产品事件集成方面,目前已经集成 200+云产品事件源,3000 多种事件类型。 在数据集成与处理方面,EventBridge 与微服务应用、大数据、IoT 等场景深度集成。比如与消息生态的融合,阿里云 6 款消息队列产品通过 EventBridge 实现消息数据的互联互通,并且通过 EventBridge 的全球事件网络赋予消息全球消息路由的能力,同时也可以通过EventBridge 提供的丰富的模板,实现 ETL 数据处理能力。 在 SaaS 应用集成方面,包括钉钉、聚石塔以及云上 50 多个 SaaS 服务都可以通过 EventBridgehook 方式连接到 EventBridge。 在事件触发方面,EventBridge 目前已经触达 10 多个事件目标,海量的事件源都可以通过 EventBridge 触发包括 FC/SAE 等在内的 10 多款事件目标云产品。除此之外,目前 EventBridge 已经对接了阿里云全量的云产品 API,任何一个事件都可以通过云产品 API 的方式进行触达。 未来还有会更多的事件源以及事件目标接入到 EventBridge 上来。 RocketMQ Streams:轻量级计算的新选择 正如开篇所提到的,基于云原生架构的全面升级,RocketMQ 5.0 也将从在线业务架构的基础设施,延伸到实时数据架构的基础设施,实现事务分析一体化。将 RocketMQ Streams 打造成为轻量级计算的新选择。 业内最常见如 Flink、Spark 等大数据框架大多采用中心化的 MasterSlave 架构,依赖和部署比较重,每个任务的执行都需要很大的开销,有较高的使用成本。而与之不同的是,RocketMQ Streams 着重打造轻资源,高性能的轻量计算引擎,无额外依赖,最低1core,1g 即可完成部署,适用于大数据量、高过滤、轻窗口计算的场景,在资源敏感型场景,如消息队列流计算、安全风控,边缘计算等,RocketMQ Streams 具有有很大优势。阿里云消息团队与安全团队合作,通过对过滤场景做大量优化,性能提升 35 倍,资源节省 50%80%。 目前,RocketMQ Streams 已经在开源社区发布,未来计划在 2023 年 4 月在阿里云完成商业化。 RocketMQ 这十年,我们一同向前 RocketMQ历经十余年的打磨,已经取得了众多成果。全球拥有 700+的贡献者,1.8w Star 数,超过 80%的主流云厂商提供了 RocketMQ 的商业托管服务,Apache RocketMQ 社区始终保持着极高的活跃度,因此,也荣获了科创中国“开源创新榜”,中日韩开源软件优秀技术奖等十多个国内外开源奖项。 而阿里云作为 RocketMQ 的起源和核心贡献者,不仅 100%覆盖全集团业务,承载千万级并发万亿级消息洪峰。十多年以来更是累计服务 10w+万企业客户,覆盖互联网、零售、汽车等 20 多个行业,超过 75%的头部企业选择使用 RocketMQ。期望阿里云的消息队列 RocketMQ 可以成为广大企业客户的心之所选。也诚邀更广大的开发者来积极参与RocketMQ 的开源社区建设,一起将 RocketMQ 打造为消息领域的引领者。
#社区动态

2022年10月31日

RocketMQ 重试机制详解及最佳实践
引言 本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。 RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试。本文中仅讨论生产者重试和消费者消费重试两种面向用户侧的实现。 生产者发送重试 RocketMQ 的生产者在发送消息到服务端时,可能会因为网络问题,服务异常等原因导致调用失败,这时候应该怎么办?如何尽可能的保证消息不丢失呢? 1. 生产者重试次数 RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试。 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。 异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。  2. 生产者重试间隔 在介绍生产者重试前,我们先来了解下流控的概念,流控一般是指服务端压力过大,容量不足时服务端会限制客户端收发消息的行为,是服务端自我保护的一种设计。RocketMQ 会根据当前是否触发了流控而采用不同的重试策略: 非流控错误场景:其他触发条件触发重试后,均会立即进行重试,无等待间隔。 流控错误场景:系统会按照预设的指数退避策略进行延迟重试。 为什么要引入退避和随机抖动?  如果故障是由过载流控引起的,重试会增加服务端负载,导致情况进一步恶化,因此客户端在遇到流控时会在两次尝试之间等待一段时间。每次尝试后的等待时间都呈指数级延长。指数回退可能导致很长的回退时间,因为指数函数增长很快。指数退避算法通过以下参数控制重试行为,更多信息,请参见 connectionbackoff.md。 INITIAL_BACKOFF:第一次失败重试前后需等待多久,默认值:1 秒; MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6; JITTER :随机抖动因子,默认值:0.2; MAX_BACKOFF :等待间隔时间上限,默认值:120 秒; MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。 ConnectWithBackoff() current_backoff = INITIAL_BACKOFF current_deadline = now() + INITIAL_BACKOFF while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS) SleepUntil(current_deadline) current_backoff = Min(current_backoff MULTIPLIER, MAX_BACKOFF) current_deadline = now() + current_backoff + UniformRandom(JITTER current_backoff, JITTER current_backoff) 特别说明:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。 3. 重试带来的副作用 不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大 远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息。  较多的重试次数也会增大服务端的处理压力。  4. 用户的最佳实践是什么 1)合理设置发送超时时间,发送的最大次数 发送的最大次数在初始化客户端时配置在 ClientConfiguration;对于某些实时调用类场景,可能会导致消息发送请求链路被阻塞导致业务请求整体耗时高或耗时;需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。 2)如何保证发送消息不丢失 由于分布式环境的复杂性,例如网络不可达时 RocketMQ 客户端发送请求重试机制并不能保证消息发送一定成功。业务方需要捕获异常,并做好冗余保护处理,常见的解决方案有两种: 1. 向调用方返回业务处理失败; 2. 尝试将失败的消息存储到数据库,然后由后台线程定时重试,保证业务逻辑的最终一致性。  3)关注流控异常导致无法重试 触发流控的根本原因是系统容量不足,如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议执行服务端扩容,将请求调用临时替换到其他系统进行应急处理。 4)早期版本客户端如何使用故障延迟机制进行发送重试? 对于 RocketMQ 4.x 和 3.x 以下客户端开启故障延迟机制可以用: producer.setSendLatencyFaultEnable(true) 配置重试次数使用: producer.setRetryTimesWhenSendFailed() producer.setRetryTimesWhenSendAsyncFailed() 消费者消费重试 消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,那应该怎么做呢? RocketMQ 的消息确认机制以及消费重试策略可以帮助分析如下问题: 如何保证业务完整处理消息? 消费重试策略可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息消费异常导致业务状态不一致。 业务应用异常时处理中的消息状态如何恢复? 当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,消费重试具体行为是什么。 1. 什么是消费重试? 什么时候认为消费失败? 消费者在接收到消息后将调用用户的消费函数执行业务逻辑。如果客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),只要服务端服务端一定时间内没收到响应,将认为消费失败。  消费重试是什么? 消费者在消费某条消息失败后,服务端会根据重试策略重新向客户端投递该消息。超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中;  重试过程状态机:消息在重试流程中的状态和变化逻辑;  重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间;  最大重试次数:消息可被重试消费的最大次数。   2. 消息重试的场景 需要注意重试是应对异常情况,给予程序再次消费失败消息的机会,不应该被用作常态化的链路。 推荐使用场景: 业务处理失败,失败原因跟当前的消息内容相关,预期一段时间后可执行成功; 是一个小概率事件,对于大批的消息只有很少量的失败,后面的消息大概率会消费成功,是非常态化的。   正例:消费逻辑是扣减库存,极少量商品因为乐观锁版本冲突导致扣减失败,重试一般立刻成功。 错误使用场景: 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的。  反例:订单在数据库中状态已经是已取消,此时如果收到发货的消息,处理时不应返回消费失败,而应该返回成功并标记不用发货。 消费处理中使用消费失败来做处理速率限流,是不合理的。 限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。 这种做法会让消息反复在服务端和客户端之间传递,增大了系统的开销,主要包括以下方面: RocketMQ 内部重试涉及写放大,每一次重试将生成新的重试消息,大量重试将带来严重的 IO 压力; 重试有复杂的退避逻辑,内部实现为梯度定时器,该定时器本身不具备高吞吐的特性,大量重试将导致重试消息无法及时出队。重试的间隔将不稳定,将导致大量重试消息延后消费,即削峰的周期被大幅度延长。  3. 不要以重试替代限流 上述误用的场景实际上是组合了限流和重试能力来进行削峰,RocketMQ 推荐的削峰最佳手段为组合限流和堆积,业务以保护自身为前提,需要对消费流量进行限流,并利用 RocketMQ 提供的堆积能力将超出业务当前处理的消息滞后消费,以达到削峰的目的。下图中超过处理能力的消息都应该被堆积在服务端,而不是通过消费失败进行重试。 如果不想依赖额外的产品/组件来完成该功能,也可以利用一些本地工具类,比如 Guava 的 RateLimiter 来完成单机限流。如下所示,声明一个 50 QPS 的 RateLimiter,在消费前以阻塞的方式 acquire 一个令牌,获取到即处理消息,未获取到阻塞。 RateLimiter rateLimiter = RateLimiter.create(50); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置订阅组名称 .setConsumerGroup(consumerGroup) // 设置订阅的过滤器 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // 阻塞直到获得一个令牌,也可以配置一个超时时间 rateLimiter.acquire(); LOGGER.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); 4. PushConsumer 消费重试策略 PushConsumer 消费消息时,消息的几个主要状态如下: Ready:已就绪状态。消息在消息队列RocketMQ版服务端已就绪,可以被消费者消费; Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态; Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机; DLQ:死信状态 消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试。 该消息会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。 最大重试次数   PushConsumer 的最大重试次数由创建时决定。 例如,最大重试次数为 3 次,则该消息最多可被投递 4 次,1 次为原始消息,3 次为重试投递次数。 重试间隔时间 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下: 说明:若重试次数超过 16 次,后面每次重试间隔都为 2 小时。 顺序消息:重试间隔为固定时间,默认为 3 秒。  5. SimpleConsumer 消费重试策略 和 PushConsumer 消费重试策略不同,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。 由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,可以通过 API 接口修改不可见时间。 例如,预设消息处理耗时最多 20 ms,但实际业务中 20 ms内消息处理不完,可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。 修改消息不可见时间需要满足以下条件: 消息处理未超时 消息处理未提交消费状态  如下图所示,消息不可见时间修改后立即生效,即从调用 API 时刻开始,重新计算消息不可见时间。 最大重试次数 与 PushConsumer 相同。 消息重试间隔   消息重试间隔 = 不可见时间 - 消息实际处理时长 例如:消息不可见时间为 30 ms,实际消息处理用了 10 ms 就返回失败响应,则距下次消息重试还需要 20 ms,此时的消息重试间隔即为 20 ms;若直到 30 ms 消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为 0 ms。 SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。 //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。 ClientServiceProvider provider1 = ClientServiceProvider.loadService(); String topic1 = "Your Topic"; FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG); SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder() //设置消费者分组。 .setConsumerGroup("Your ConsumerGroup") //设置接入点。 .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build()) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); List messageViewList = null; try { //SimpleConsumer需要主动获取消息,并处理。 messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 //没有ack会被认为消费失败 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); } 修改消息的不可见时间   案例:某产品使用消息队列来发送解耦“视频渲染”的业务逻辑,发送方发送任务编号,消费方收到编号后处理任务。由于消费方的业务逻辑耗时较长,消费者重新消费到同一个任务时,该任务未完成,只能返回消费失败。在这种全新的 API 下,用户可以调用可以通过修改不可见时间给消息续期,实现对单条消息状态的精确控制。 simpleConsumer.changeInvisibleDuration(); simpleConsumer.changeInvisibleDurationAsync(); 6. 功能约束与最佳实践 设置消费的最大超时时间和次数   尽快明确的向服务端返回成功或失败,不要以超时(有时是异常抛出)代替消费失败。 不要用重试机制来进行业务限流  错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。 正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。 发送重试和消费重试会导致相同的消息重复消费,消费方应该有一个良好的幂等设计  正确示例:某系统中消费的逻辑是为某个用户发送短信,该短信已经发送成功了,当消费者应用重复收到该消息,此时应该返回消费成功。 总结 本文主要介绍重试的基本概念,生产者消费者收发消息时触发重试的条件和具体行为,以及 RocketMQ 收发容错的最佳实践。 重试策略帮助我们从随机的、短暂的瞬态故障中恢复,是在容忍错误时,提高可用性的一种强大机制。但请谨记 “重试是对于分布式系统来说自私的”,因为客户端认为其请求很重要,并要求服务端花费更多资源来处理,盲目的重试设计不可取,合理的使用重试可以帮助我们构建更加弹性且可靠的系统。
作者: 斜阳
#行业实践 #功能特性

2022年10月24日

EventBridge 生态实践:融合 SLS 构建一体化日志服务
引言 阿里云日志服务 SLS 是一款优秀的日志服务产品,提供一站式地数据采集、加工、查询与分析、可视化、告警、消费与投递等服务。对于使用 SLS 的用户业务而言,SLS 上存储的日志信息反映着业务的运行状态,通过适当地流转加工即可创建一定价值。 另一方面,阿里云 EventBridge 作为云上事件枢纽,每天承载着大量事件的流转。云上资源的操作事件、消息队列中的数据、用户业务中的自定义事件等,是否有一站式的配置工具来将这些数据统一收敛到 SLS,进而使用 SLS 强大的加工、分析能力也是一个具有价值的问题。 为了支持上述日志、数据流入流出 SLS 的场景,阿里云 EventBridge 在近期支持了 SLS 能力。用户在 EventBridge 上通过简单地配置,即可实现数据写入 SLS 和将 SLS 中日志路由到不同的 EventBridge 目标端。EventBridge 对 SLS 的支持是全面的,用户既可以在事件总线中使用 SLS,也可以在事件流中使用。本文将从 SLS 在 EventBridge上 的使用以及若干最佳实践场景等方面,为大家介绍如何基于 EventBridge 构建 SLS 相关应用。 基于 EventBridge 使用 SLS 阿里云 SLS 日志服务 SLS[1] 是一款云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。 SLS 在 EventBridge 上的应用 阿里云 EventBridge 提供了事件总线[2]与事件流[3]两款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件经事件规则路由到多个目标。而事件流则更轻量化,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便,适用于端到端的流式数据处理场景。SLS 目前对事件总线与事件流均已支持。 针对 SLS 事件源,EventBridge 会构造一个 SLS source connector,其会实时地从 SLS 服务端拉取日志。数据拉取到 EventBridge 后,会进行一定的结构封装,保留用户日志、SLS 系统参数等数据,同时增加 event 所需要的一些系统属性。 SLS Event 样例可参考如下示例。 data 部分代表用户日志内容,其中以“__”开头和结尾的字段表示日志项的 SLS 系统属性。 { "datacontenttype": "application/json;charset=utf8", "aliyunaccountid": "1756789", "data": { "key1": "value1", "key2": "value2", "__topic__": "TopicCategory", "__source__": "SourceCategory", "__client_ip__": "122.231..", "__receive_time__": "1663487595", "__pack_id__": "59b662b2257796280" }, "subject": "acs:log:cnqingdao:1756789:project/demoproject/logstore/logstore1", "aliyunoriginalaccountid": "1756789", "source": "testSLS", "type": "sls:connector", "aliyunpublishtime": "20220918T07:53:15.387Z", "specversion": "1.0", "aliyuneventbusname": "demoBus", "id": "demoprojectlogstore11MTY2MzExODM5ODY4NjAxOTQyMw==0", "time": "20220918T07:53:12Z", "aliyunregionid": "cnqingdao", "aliyunpublishaddr": "10.50.132.112" } 针对 SLS 事件目标,EventBridge 使用 logProducer 将 event 整体作为一个字段投递到 SLS,字段 key 名称为“content”。 使用介绍 SLS 事件源   在使用 SLS 作为事件源时(这里包含了事件总线中的事件源和事件流中的事件源),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) 起始消费位点 调用角色  在创建 SLS 事件源时,EventBridge 会自动在对应 LogStore 下创建一个以“eventbridge”开头的消费组,事件源或事件流被删除时,对应消费组资源也会被清理。 日志项目与日志库参数,用户根据已创建的 Project 和 LogStore 去填写即可。 起始消费位点参数指定了新任务启动时的初始消费位点。这里可以选择“最早位点”、“最新位点”与“指定时间”。“最早位点”即从当前 LogStore 中最早的日志开始消费,会导致大量历史日志被读取,建议结合业务谨慎选择;“最新位点”则表示消费对应 EventBridge 任务启动后的日志;“指定时间”需要用户填写时间戳(以秒为单位),消费从此时刻开始的日志。 针对调用角色,其实是允许 EventBridge 以这个角色的身份去调用读取用户 SLS 日志。用户需要创建一个自定义角色,并将其授信给事件总线 EventBridge。角色的权限方面则可以按照需要去进行设置,在权限最小的原则基础上,权限策略提供的角色应保证事件总线 EventBridge 可以读取对应 LogStore 日志与消费组的增删操作,至少赋予角色 LogStore 消费权限与消费组的增删操作。参考示例: { "Version": "1", "Statement": [ { "Action": [ "log:ListShards", "log:GetCursorOrData", "log:GetConsumerGroupCheckPoint", "log:UpdateConsumerGroup", "log:ConsumerGroupHeartBeat", "log:ConsumerGroupUpdateCheckPoint", "log:ListConsumerGroup", "log:CreateConsumerGroup", "log:DeleteConsumerGroup" ], "Resource": [ "acs:log:::project//logstore/", "acs:log:::project//logstore//" ], "Effect": "Allow" } ] } SLS 事件目标   在使用 SLS 作为事件目标时(这里包含了事件总线中的事件目标和事件流中的事件目标),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) Topic 调用角色  日志项目、日志库参数含义同 SLS 事件源。Topic 即 SLS 日志主题,用户可以根据需要进行设置,非必填内容。 在创建 SLS 事件目标时,确保使用的调用角色有写入给定日志库权限即可。参考示例: { "Version":"1", "Statement":[ { "Effect":"Allow", "Action":[ "log:PostLogStoreLogs" ], "Resource":[ "acs:log:::project//logstore/" ] } ] } 使用示例 SLS 事件源和事件目标,其事件总线与事件流的参数配置相同,这里示例了如何创建  SLS 事件源和事件目标的 EventBridge 事件流。 前期准备   1. 开通 EventBridge 服务; 2. 开通 SLS 服务并创建 Project 与 Store。 创建 SLS 事件源   1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“日志服务 SLS”; 4. 在“日志服务 SLS”一栏中选配置 SLS Project、LogStore、起始消费位点与角色配置。 创建 SLS 事件目标   1. 在创建事件流的事件目标时,服务类型选择“日志服务”; 2. 配置 SLS Project、LogStore、日志主题、日志内容、角色配置等参数。 3. 保存启动即可创建事件流。 最佳实践示例 异步架构完备性校验 在使用消息队列搭建异步应用架构时,会偶发遇到消息丢失的情况,这种情况下的问题排查通常较为麻烦,需要确定问题到底是出在发送端、消费端还是消息队列上,这种场景可以使用 SLS + EventBridge 来进行相关预警和现场保留。 1. 业务 1 发送消息到消息队列,业务 2 异步消费 MQ 中的消息,实现架构解耦; 2. 消息发送端和消费端,在完成消费发送、消费的相关操作后,均将操作日志打印出来,并采集到 SLS 上,日志中可以包含消息 ID 等字段以确保可溯源; 3. 配置 EventBridge 事件流,事件提供方为 SLS,事件接收方为函数计算 FC; 4. FC 中的服务读取 SLS 中日志内容,若发现针对某条消息,若仅有发送日志无消费日志,则说明可能存在漏消息的可能性,需要相关人员及时介入排查。 异常业务异步处理 部分消息队列如 RocketMQ 有死信队列能力,当用户消费失败达到一定次数时,消息会被投递到死信队列。用户也可以使用 SLS + EventBridge 构建业务死信队列,以完成对异常情况的处理。 例如下图是一个电商平台的订单处理系统,当订单处理成功时,相关信息会被写入 DB 或者进行后续操作。但如果订单处理异常用户又不想要阻塞现有订单处理流程,则可以将处理异常订单的流程异步处理。 1. 用户下单/付款,订单系统进行业务处理,处理成功则将数据变更写入 DB; 2. 订单处理异常,记录相关信息日志; 3. 搭建 EventBridge 事件规则。事件源为 SLS,事件目标为函数计算 FC; 4. 当有异常业务日志产生时,日志内容被 SLS 事件源拉取,随后投递到 FC,由专门的服务来处理异常订单。当然,在架构设计时也可以将异常订单信息直接投递到函数计算,但对于大部分业务系统而言,当有异常出现时通常都会进行相关日志的打印,即异常日志大概率是存在的,这个时候使用 SLS + EventBridge 则无需再使用函数计算的发送客户端,仅按需打印日志即可,对业务的侵入性更小。 消息备份 目前阿里云上的消息队列产品种类丰富,用户在使用消息队列实现业务解耦的同时,也会产生对消息内容进行加工分析的需求。SLS 拥有强大的数据加工能力,使用 EventBridge 将消息路由到 SLS,在实现消息备份的同时也可以利用 SLS 的分析加工能力来提升业务的可观测性。 1. 搭建 EventBridge 事件流。事件提供方为各种云上消息队列,事件目标方为日志服务 SLS; 2. 使用 SLS 的能力完成消息的加工、查询、分析与可视化。 自建 SQL 审计 目前 EventBridge 已经支持了 DTS 作为事件源的能力,使用 EventBridge 可以轻松实现构建自定义 SQL 审计的需求。 1. 用户新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 _相关链接_ _[1] 日志服务SLS_ _[2] 事件总线_ _[3] 事件流_
作者:昶风
#行业实践 #生态集成

2022年10月20日

解析 RocketMQ 多样消费功能-消息过滤
什么是消息过滤 在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。 就如上图所描述的,生产者会向主题中写入形形色色的消息,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要消费橙色的消息,第二个消费者只想要消费黄色的和灰色的消息,那么这个效果就需要通过消息过滤来实现。 消息过滤的应用场景 我们以常见的电商场景为例,来看看消息过滤在实际应用过程中起到的作用。 电商平台在设计时,往往存在系统拆分细、功能模块多、调用链路长、系统依赖复杂等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特别是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。 而在消息中间件使用方面,电商平台因为覆盖的领域众多会产生很多的消息主题,消息收发量也随着交易量和订阅系统的增加而增大。随着业务系统的水平拆解和垂直增加,相关的消息呈现出高订阅比和低投递比的状态,比如一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,但是投递比却只有 15:300,即一条消息只有 15 个订阅者需要投递,其他 285 个订阅者全部过滤了这条消息。那解决这些场景,就需要使用到消息过滤。 举例来说,在交易链路中,一个订单的处理流程分为下单、扣减库存、支付等流程,这个流程会涉及订单操作和状态机的变化。下游的系统,如积分、物流、通知、实时计算等,他们会通过消息中间件监听订单的变更消息。但是它们对订单不同操作和状态的消息有着不同的需求,如积分系统只关心下单消息,只要下单就扣减积分。物流系统只关系支付和收货消息,支付就发起物流订单,收货就完成物流订单。实时计算系统会统计订单不同状态的数据,所有消息都要接收。 试想一下如果没有消息过滤这个功能,我们会怎么支持以上消息过滤的功能呢?能想到的一般有以下两个方案: 1. 通过将主题进行拆分,将不同的消息发送到不同主题上。 对于生产者来说,这意味着消费者有多少消费场景,就需要新建多少个 Topic,这无疑会给生产者带来巨大的维护成本。对消费者来说,消费者有可能需要同时订阅多个 Topic,这同样带来了很大的维护成本。另外,消息被主题拆分后,他们之间的消费顺序就无法保证了,比如对于一个订单,它的下单、支付等操作显然是要被顺序处理的。 2. 消费者收到消息后,根据消息体对消息按照规则硬编码自行过滤。 这意味着所有的消息都会推送到消费者端进行计算,这无疑增加了网络带宽,也增加了消费者在内存和 CPU 上的消耗。 有了消息过滤这个功能,生产者只需向一个主题进行投递消息,服务端根据订阅规则进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码维护就非常友好,同时也能很大程度上降低网络带宽,同时减少消费者的内存占用和 CPU 的消耗。 RocketMQ 消息过滤的模式 RocketMQ 是众多消息中间件中为数不多支持消息过滤的系统。这也是其作为业务集成消息首选方案的重要基础之一。 在功能层面,RocketMQ 支持两种过滤方式,Tag 标签过滤和 SQL 属性过滤,下面我来这两个过滤方式使用方式和技术原理进行介绍 Tag 标签过滤 功能介绍 Tag 标签过滤方式是 RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的 Tag 标签进行匹配。生产者在发送消息时,设置消息的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。 过滤语法 1. 单 Tag 匹配:过滤表达式为目标 Tag,表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者; 2. 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费; 3. 全 Tag 匹配:使用星号()作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。 使用方式 1. 发送消息,设置 Tag 标签 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息 .setTag("TagA") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个 Tag 标签 //只订阅消息标签为“TagA”的消息 FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个 Tag 标签 //只订阅消息标签为“TagA”、“TagB”或“TagC”的消息 FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有 Tag 标签,即不过滤 //使用Tag标签过滤消息,订阅所有消息 FilterExpression filterExpression = new FilterExpression("", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 技术原理 RocketMQ 在存储消息的时候,是通过 AppendOnly 的方式将所有主题的消息都写在同一个 CommitLog 文件中,这可以有效的提升了消息的写入速率。为了消费时能够快速检索消息,它会在后台启动异步方式将消息所在位点、消息的大小,以及消息的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行消息过滤的时候,可以在索引层面就可以获取到消息的标签,不需要从 CommitLog 文件中读取,这样就减少消息读取产生的系统 IO 和内存开销。标签存储哈希值,主要是为了保证 ConsumeQueue 索引文件能够定长处理,这样可以有效较少存储空间,提升这个索引文件的读取效率。 整个 Tag 标签过滤的流程如下: 1. 生产者对消息打上自己的业务标签,发送给我们的服务端 Broker; 2. Broker 将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中; 3. 消费者启动后,定时向 Broker 发送心跳请求,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保存在内存中; 4. 消费者向 Broker 拉取消息,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索消息,将订阅关系中的标签哈希值和消息中的标签哈希值做比较,如果匹配就返回给消费者; 5. 消费者收到消息后,会将消息中的标签值和本地订阅关系中标签值做精确匹配,匹配成功才会交给消费线程进行消费。 SQL 属性过滤 功能介绍 SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置S QL 语法的过滤表达式过滤多个属性。 过滤语法 1. 数值比较:, =, , IN 3. 判空运算:IS NULL or IS NOT NULL 4. 逻辑运算:AND, OR, NOT 使用方式 1. 发送消息,设置属性 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息属性,用于消费端根据指定属性过滤消息。 .addProperty("Channel", "TaoBao") .addProperty("Price", "5999") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price5000", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有属性 FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 技术原理   由于 SQL 过滤需要将消息的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 增加很大的开销。为了降低这个开销,RocketMQ 采用了布隆过滤器进行优化。当 Broker 在收到消息后,会预先对所有的订阅者进行 SQL 匹配,并将匹配结果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩展文件中。在消费时,Broker 就会使用使用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这可以避免消息在一定不匹配的时候,不需要去 CommitLog 中将消息的属性拉取到内存进行计算,可以有效地降低属性和 SQL 进行匹配的消息量,减少服务端的内存和 CPU 开销。 整个 SQL 过滤的处理流程如下: 1. 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数;  2. 生产者对消息设置上自己的业务属性,发送给我们的服务端 Broker;  3. Broker 收到后将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中。在写入之前,会将这条消息的属性和当前所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个完整的布隆过滤位图;  4. 消费者消费消息的时候,Broker 会先获取预先生成的布隆过滤匹配参数,然后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配;  5. 布隆过滤器返回匹配成功只能说明消息属性和 SQL 可能匹配,Broker 还需要从 CommitLog 中将消息属性取出来,再做一次和 SQL 的精确匹配,这个时候匹配成功才会将消息投递给消费者  差异及对比 最佳实践 主题划分及消息定义 主题和消息背后的本质其实就是业务实体的属性、行为或状态发生了变化。只有发生了变化,生产者才会往主题里面发送消息,消费者才需要监听这些的消息,去完成自身的业务逻辑。 那么如何做好主题划分和消息定义呢,我们以订单实体为例,来看看主题划分和消息定义的原则。 主题划分的原则   1. 业务领域是否一致 不同的业务领域背后有不同的业务实体,其属性、行为及状态的定义天差地别。比如商品和订单,他们属于两个完全独立且不同的领域,就不能定义成同一个主题。 2. 业务场景是否一致 同一个业务领域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,但是订单缓存刷新可能需要被不同的流程触发,放在一起就会导致部分场景订单缓存不刷新的情况。 3. 消息类型是否一致 同一个业务领域和业务场景,对消息类型有不同需求,比如订单处理过程中,我们需要发送一个事务消息,同时也需要发送一个定时消息,那么这两个消息就不能共用一个主题。 消息定义的原则   1. 无标签无属性 对于业务实体极其简单的消息,是可以不需要定义标签和属性,比如 MySQLBinlog 的同步。所有的消费者都没有消息过滤需求的,也无需定义标签和属性。 2. 如何定义标签 标签过滤是 RocketMQ 中使用最简单,且过滤性能最好的一种过滤方式。为了发挥其巨大的优势,可以考虑优先使用。在使用时,我们需要确认这个字段在业务实体和业务流程中是否是唯一定义的,并且它是被绝大多数消费者作为过滤条件的,那么可以将它作为标签来定义。比如订单中有下单渠道和订单操作这两个字段,并且在单次消息发送过程中都是唯一定义,但是订单操作被绝大多数消费者应用为过滤条件,那么它最合适作为标签。 3. 如何定义属性 属性过滤的开销相对比较大,所以只有在标签过滤无法满足时,才推荐使用。比如标签已经被其他字段占用,或者过滤条件不可枚举,需要支持多属性复杂逻辑的过滤,就只能使用属性过滤了。 保持订阅关系一致 订阅关系一致是指同一个消费者组下面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。 正如上图所示,一个消费者组包含两个消费者,他们同时订阅了 TopicA 这个主题,但是消费者一订阅的是 TagA 这个标签的消息,消费者二订阅的是 TagB 这个标签的消息,那么他们两者的订阅关系就存在不一致。 导致的问题: 那么订阅关系不一致会导致什么问题呢? 1. 频繁复杂均衡 在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变化了就进行订阅关系覆盖,同时会触发客户端进行负载均衡。那么订阅关系不一致的两个客户端会交叉上传自己的订阅关系,从而导致客户端频繁进行负载均衡。 2. 消费速率下降 客户端触发了负载均衡,会导致消费者所持有的消费队列发生变化,出现间断性暂停消息拉取,导致整体消费速率下降,甚至出现消息积压。 3. 消息重复消费 客户端触发了负载均衡,会导致已经消费成功的消息因为消费队列发生变化而放弃向 Broker 提交消费位点。Broker 会认为这条消息没有消费成功而重新向消费者发起投递,从而导致消息重复消费。 4. 消息未消费 订阅关系的不一致,会有两种场景会导致消息未消费。第一种是消费者的订阅关系和 Broker 当前订阅关系不一致,导致消息在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 当前的虽然一致,但是 Broker 投递给了其他的消费者,被其他消费者本地过滤了。 使用的建议 在消息过滤使用中,有以下建议: 1. 不要共用消费者组 不同业务系统千万不要使用同一个消费者组订阅同一个主题的消息。一般不同业务系统由不同团队维护,很容易发生一个团队修改了订阅关系而没有通知到其他团队,从而导致订阅关系不一致的情况。 2. 不频繁变更订阅关系 频繁变更订阅关系这种情况比较少,但也存在部分用户实现在线规则或者动态参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载均衡的情况。 3. 变更做好风险评估 由于业务的发展,需求的变更,订阅关系不可能一直不变,但是变更订阅关系过程中,需要考虑整体发布完成需要的总体时间,以及发布过程中订阅关系不一致而对业务可能带来的风险。 4. 消费做好幂等处理 不管是订阅关系不一致,还是客户端上下线,都会导致消息的重复投递,所以消息幂等处理永远是消息消费的黄金法则。在业务逻辑中,消费者需要保证对已经处理过的消息直接返回成功,避免二次消费对业务造成的损害,如果返回失败就会导致消息一直重复投递直到进死信。 到此,本文关于消息过滤的分享就到此结束了,非常感谢大家能够花费宝贵的时间阅读,有不对的地方麻烦指正,感谢大家对 RocketMQ 的关注。
作者:徒钟
#技术探索 #功能特性

2022年10月12日

RocketMQ 5.0 API 与 SDK 的演进
RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。 全新统一的 API 此处的 API 并不单单只是接口上的定义,同时也规定了各个接口不同的方法和行为,明确了整个消息模型。 RocketMQ 过去的 API 从第一版开始,至今已经过了很长时间,长期依赖是一个缺乏变革的状态,对于一些中途打算废弃或者变更的 API 也没有进行后续的迭代。此外,接口的定义也不够清晰。因此,RocketMQ 希望在 5.0 中能够建立一个统一的规范,精简整个 API,通过引入 builder 模式来引入更多的不变性,同时做好异常管理,给开发者和用户一个更加清爽的面貌。 目前 C++ 和 Java 已经进行了 5.0 API 的定义与实现,更多语言的支持也陆续在路上了。我们也欢迎更多的开发者可以参与到社区的工作中来。这里给出 5.0 客户端的仓库链接: 除了在上述接口上做的一些修改之外, RocketMQ 5.0 还规定了四种新的不同的客户端类型,即 Producer/Push Consumer/Simple Consumer/Pull Consumer。 其中 Pull Consumer 还在开发中;Producer 主要还是做了接口裁剪,规范了异常管理。在功能上其实并没有做一些颠覆性的改变。Push Consumer 也是比较类似的;Simple consumer 将更多的权利将下发给用户,是一种用户可以主动控制消息接收与处理过程的消费者,特别的,5.0 的 SDK 中,Push Consumer 和 Simple Consumer 都采用 RocketMQ 的 pop 机制进行实现,一些社区的同学可能已经熟悉了。 如果用户并不一定想控制或者关心整个消息的接收过程,只在乎消息的消费过程的话,这个时候 Push Consumer 可能是一个更好的选择。 RocketMQ 5.0 定义了四种不同的消息类型。过去的开源版本中其实我们并没有去突出消息类型这样一个概念,后续出于维护及运维方面的需要以及模型定义的完备,才让今天的 5.0 有了消息类型的这样一个概念。 1、NORMAL:普通消息。 2、FIFO:满足先入先出的语义。用户可以通过定义 message group 来控制消息间的消费顺序。例如图中的 fruit 这个 topic 下,可以定义不同的 message group,在 apple 这个 message group 下,会按照发送顺序决定消息被消费的顺序,并且不同的 message group 之间不会互相干扰。 3、TRANSACTIONAL:可以将一条或多条消息包成一个事务,最终用户可以根据自己的业务结果选择提交或者回滚。 4、DELAY:用户可以自主地设置消息的定时时间,相比之前开源版本仅允许用户设置定时/延迟级别,5.0 的实现中还讲允许用户设置更精确的时间戳。 以上四种消息是互斥的,我们会在 topic 的元数据去标识它的类型。实际在消息发送的时候如果如果出现了尝试发送的消息类型与 topic 类型不匹配的情况,也会做一些限制。 实现 RocketMQ 5.0 在客户端的启动过程中提前进行了更多的准备工作。比如用户提前设置要发送消息的 topic 时,Producer 会在启动过程中尝试获取对应 topic 的路由。在过去的客户端实现中,在针对于某个 topic 第一次发送消息时,需要先获取路由,这里就会有一个类似冷启动的过程。 提前获取 Topic 的路由信息有两点好处: 1. 不阻塞后面的发送,让消息的发送仅仅触发发送这一个动作。 2. 错误前置,比如用户要往一个不存在 Topic 发送消息时,因为路由的获取参与到整个客户端的启动过程,获取路由不成功,那整个客户端启动可能就会失败,用户也是拿不到对应的 Producer 对象的。 类似的,Consumer 的启动也会有这样的一个过程。 除此之外,我们在客户端和服务端之间增加了一个 Telemetry 的部分,它会在客户端和服务端之间建立起了一个进行双向数据通讯的通道,客户端和服务端会在这个过程中沟通配置,比如服务端可以实现对客户端配置的下发,更好地管理客户端。此外,Telemetry 也可以将本地配置主动上报给服务端,让服务端也可以对客户端的设置有更好的了解。Telemetry 通道也会在客户端启动时尝试建立,如果这个通道没有建立成功,也会影响客户端的启动。 总的来说,客户端的启动过程会尽可能将所有准备工作做好。同时在客户端和服务端之间建立 Telemetry 这样一个通讯通道。 客户端内部存在一些周期性的任务,比如路由的定时更新以及客户端往服务端发送心跳等。对于上文中提到的 Telemetry 过程中,客户端的配置上报也是周期性的。 Producer 在 RocketMQ 5.0 中的具体工作流程 消息在发送时,会检查是否已经获取对应 topic 的路由信息。如果已经获取,则尝试在路由中选取队列,随即查看要发送的消息的类型是否与 topic 类型匹配,如果匹配,则进行消息发送。如果发送成功,则返回;否则,判断当前重试次数是否超出用户设置的上限,如果超出,则返回失败;否则轮转到下一个队列,然后对新的队列进行重试直到消费次数超出上线。而如果启动过程中没有提前获取路由,那么消息发送时依然会先尝试获取路由,然后再进行下一步操作。 另外一点相对于老客户端较大的改变在于,客户端从底层 RPC 交互到上层的业务逻辑全部采用异步实现。Producer 依然会提供一个同步发送接口和异步发送接口,但同步的方法也是使用异步来实现,整个逻辑非常统一和清爽。 Push Consumer 分为两部分,消息的接收和消费。 消息接收流程为:客户端需要不断地从服务端拉取消息,并将消息缓存。Push Consumer 会将消息先缓存到客户端的本地,再进行消费,因此它会判断客户端本地的 Cache 是否已满,如果已满,则隔一段时间再判断,直到消息被客户端消费,Cache 尚有余量时再进行消息拉取。为了避免出现一些内存问题,Cache 的大小也是被严格限制的。 消息消费过程分为两个类型,顺序类型和非顺序类型。 其中非顺序类型即并发消费。消费者会先从 Cache 中获取消息,然后尝试消费消息,消费后再将消息从 Cache 中移除。消息消费成功时,会尝试将消息 ACK ,消费流程结束;如果消费失败,则尝试修改消息的可见时间,即决定下一次什么时候可见。 顺序消费指对于同一个 Group 的消息,最终消费时一定是前一条消息被消费过并且得到确认后,后面的消息才能够继续消费。而消费过程与非顺序消费类似,首先尝试从 Cache 中拉取消息,如果消费成功,则将消息 ACK。ACK 成功后,将其从 Cache 中移除。特别地,如果消费失败,会 suspend 一段时间,然后继续尝试对消息进行消费。此时会判断消费次数是否超限,如果超限,则会尝试将消息放入死信队列中。 相对于非顺序消费,顺序消费更复杂,因为其需要保证前一个消息消费成功后才能对后面的消息进行消费。顺序消费的消费逻辑是基于 message group 隔离的。message group 会在发送时做哈希,从而保证 message group 的消息最终会落在一个队列上,顺序消费模式本质上保证队列内部消费的顺序。 此外,因为不同 message group 的顺序消息最终可能会映射到同一个队列上,这可能会导致不同的 message group 之间的消费形成阻塞,因此服务端未来会实现一个虚拟队列,让不同的 message group 映射到客户端的虚拟队列,保证他们之间没有任何阻塞,从而加速数据消息的消费过程。 对于 Simple Consumer,用户可以主动控制消息接收和确认的流程。比如用户收到消息后,可以根据业务决定是否过一段时间再消费该消息,或者不需要再收到该消息。消费成功后将消息 ACK 掉,如果失败则主动修改可见时间,选择该消息下一次什么时候可见,即由用户自发地控制整个过程。 可观测性 Shaded Logback 因为历史原因,RocketMQ 的老客户端并不是面向 SLF4J 进行编程的,而是面向 logback 的。这么做的目的其实是为了方便快捷地获取日志,不需要让用户自己去手动配置。 RocketMQ 中专门有一个 logging 模块是负责日志部分的,像用户自己使用了 logback ,RocketMQ SDK 如果也直接去使用 logback,两者就会产生各种各样的冲突,这个 logging 模块就是用来保证这一层隔离性的。 但是 logging 模块本身的实现并不是很优雅,也带来了一定的维护成本。因此我们采用了 shade logback 的方式来达到上文提到的隔离性。shaded logback 不仅能够避免用户的 logback 与 RocketMQ 自己的 logback 冲突,还能保持较好的可维护性,将来要想在日志上去做一些修改,也来得容易的多。 具体来说,用户的 logback 会采用 logback.xml 的配置文件,通过 shade logback, RocketMQ 5.0 的客户端会使用 rocketmq.logback.xml 的配置文件,因此在配置部分就已经完全隔离了,同时在 shade 的过程中,还对原生 logback 中使用到的一些环境变量和系统变量也进行了修改,这样就保证了两者的彻底隔离。 另外,使用 shadeed logback 之后,RocketMQ 5.0 客户端中的日志部分就全都是面向 SLF4J 来进行编程的了,这样一来,如果我们未来想让用户自己去完全控制日志的话,提供一个去除 logback 的 SDK 就可以了,非常方便。 Trace 5.0 的消息轨迹基于 OpenTelemetry 模型进行定义与实现,消息发送或接收消息的流程被定义为一个个独立的 span ,这一套 span 规范参照了 OpenTelemetry 关于 Messaging 的定义。图中这里 Process P 表示 Producer ,Process C 表示 Consumer。消息的全生命周期,从发送到接收到消费,就可以具象化为这样一个个的 span。 比如,针对 Push Consumer 而言,先会有一个 receive 的 span 来表示从服务端获取消息的过程,收到消息后到会先等待消息被处理,这个也就是 await span 表示的过程,消息被处理则对应图中的 process span,消息消费结束之后,向服务端反馈消息处理结果也会有专门的 span 进行描述。 我们通过 parent 和 link 来讲所有的这些 span 关联起来,这样通过一条消息的任意一个 span,就可以获得这条消息全生命周期的所有 span。 不仅如此,用户还将允许可以设置一个 span context 与自己的业务链路进行关联,将 RocketMQ 5.0 的消息轨迹本身嵌入进自己的全链路可观测系统中去。 Metrics Tracing 相对来说成本是比较高的,因为一条消息从发送到接收,可能会有很多流程,这就伴随着很多的 span,这就导致相对来说,tracing 数据的存储查询成本相对来说比较高。我们希望诊断整个 SDK 的健康状况,同时又不希望收集太多的 tracing 信息提高成本,此时提供一份 metrics 数据就能比较好地满足我们的需求。 在 SDK 的 metrics 中我们新增了诸多指标,包括不限于 Producer 中消息发送延时,Push Consumer 中消息的消费耗时和消息缓存量,可以帮助用户和运维者更快更好地发现异常。 5.0 中 SDK 的 metrics 也是基于 OpenTelemetry 进行实现的。以 Java程序为例,OpenTelemetry 对于 Java 的实现本身提供了一个 agent,agent 在运行时会打点采集 SDK 的一些 tracing/metrics 信息,并将它上报到对应的 metric collector 中,这种通过 agent 来降低无侵入式数据采集的方式被称之为 automatic instrumentation,而手动在代码中实现打点采集的方式则被称之 manual instrumentation。对于 metrics 我们目前还是采用 manual instrumentation 的方式来进行数据的采集和上报的。服务端会告知客户端对应的 collector 的地址,然后客户端将 Metrics 数据上传到对应的 collector 当中去。 _作者介绍:_ _艾阳坤,Apache RocketMQ 5.0 Java SDK 作者,CNCF Envoy Contributor,CNCF OpenTelemetry Contributor,阿里云智能高级开发工程师。_
作者:艾阳坤
#技术探索 #云原生

2022年9月30日

行业实践:Apache RocketMQ 业务集成典型行业应用和实践
消息典型应用场景 阿里云拥有丰富的消息产品家族,除了 RocketMQ 以外,还有大家熟知的对标开源的云 Kafka、支持 AMQP 协议的开源消息队列 RabbitMQ、物联网通信网关 MQTT、 对标 AWS SQS/SNS 的 Serverless 版消息 MNS(现在也是轻量版 RocketMQ)以及云上事件总线、事件中心 EeventBridge 。 阿里云所有消息产品均采用 RocketMQ 作为底层存储引擎,为用户提供稳定、可靠、高性能的消息通信能力,比如百万 TPS、百万队列、毫秒级通信延迟、分级存储、 Serverless 弹性等众多的消息产品。也带来了丰富的应用场景,分为应用集成和数据集成两大类。 应用集成以 RocketMQ 为主,应用最为广泛,本文也将分享 RocketMQ 在微服务解耦、电商交易、金融支付等场景下的最佳实践,比如银行的交易流水、保单的支付流转等。RabbitMQ 、MQTT 也主要用于应用集成场景,比如物联网、 IoT 双向通信、云产品的事件通知以及后处理等。对于新建的业务场景,一般首推 RocketMQ 作为消息选型,因为 RocketMQ 拥有最丰富的功能特性;而对于存量的业务迁移,则可以根据具体使用的消息产品来进行选择,以降低迁移成本。 数据集成以云 Kafka 为主,在大数据分析、日志采集分析等场景下应用最为广泛,比如游戏的玩家操作、广告埋点、数据分析、应用数据监控等。各种 SaaS 类的集成、聚石塔、电商数据打通等场景,则主要使用 EventBridge。更多的产品选型对比,可以参考专题页中的消息队列产品选型。 业务消息使用场景 RocketMQ 经过阿里集团内部十年锤炼,经过双 11 大促等万亿级规模的实时场景验证,SLA 最高可支持 4 个9,数据可靠性支持 9 个9。 以微服务解耦、订单交易、金融支付等场景为例,在互联网、电商、零售、金融等行业,消息的使用量非常大。尤其是在秒杀大促时,为了保障系统的稳定运行,需要 RocketMQ 进行削峰填谷。另外金融客户对每笔交易、每个订单也都要求数据不能丢失。因此在此类场景普遍对消息的可靠传输、海量并发、低延迟、稳定性与容灾等有着非常高的要求。RocketMQ 提供了丰富的消息类型,比如事务消息、定时消息、顺序消息等。 在交易系统里,为了简化交易流程,一般使用事务消息和定时消息。同时 RocketMQ 也提供了消息轨迹查询、消息 dashboard ,可以非常方便地对每个消息进行回溯,对每个 topic 或者 group 进行监测。RocketMQ 5.0 也提供了丰富的实例规格,从百级别 QPS 到百万级 QPS ,可以覆盖大部分应用场景。RocketMQ 默认提供多副本、多可用区部署,也提供了跨地域消息路由能力,支持客户构建高可用容灾或多活,且 RocketMQ 能够支持 99.9%的消息 RT 在 10ms 传输。 RocketMQ 事务消息举例 实现订单状态机异步流转 以订单状态机异步流转为例。此前,如果收到一笔订单交易,需要逐个通知下游的服务模块,比如需要更新购物车、更新积分等。每个业务模块耦合在一起会导致大促时的流量峰值非常大,需要每个服务模块保障其处理性能。而基于 RocketMQ 的事务消息能力,即可轻松实现订单子流程系统的异步解耦和流量的削峰填谷,事务消息可以确保数据库订单状态持久化和下游通知的事务性。 收到 LBL 订单交易时,可以先向 RocketMQ 发送一条半事务消息,此时 RocketMQ 会 hold 住消息,等核心交易事务完成后再向 MQ 提交确认半事务消息的状态,并执行下游服务模块的通知。假设核心交易模块失败,则会废弃之前提交的半事务消息,不通知下游。 对比此前的传统事务模块,使用 RocketMQ 可以大幅简化交易链路,缩短下单耗时。尤其是在大促场景下,可以解耦下游的服务模块,提供削峰填谷的能力。 超时中心 RocketMQ 的定时消息场景也是常见的使用方式。 比如双 11 等大促场景存在大量预售订单、定点尾款等,会带来大量定时任务。在电商交易过程中,订单流转也存在多个超时状态的任务,处理超时状态的任务需要确保可靠及时。以传统的方案进行构建分布式调度机制实现的时候,比如基于定时器调度延迟大,可能会存在性能瓶颈。 而采用 RocketMQ 的定时消息,实现将变得非常简单。定时任务只需提交一条延迟消息到 RocketMQ ,由 RocketMQ 保障定时消息达到秒级的精度,最高可支持百万级别的 TPS 能力,同时也能支持消息的消费重试,保障任务可靠触发,相比传统的使用方式大大简化了定时的复杂度。 RocketMQ 灰度策略举例 微服务全链路灰度 微服务场景下,精准地控制灰度流量并进行灰度版本验证,是保障线上业务稳定运行的关键。大部分情况下,用户通过划分不同的环境来进行灰度发布,对应 RocketMQ 的不同实例。但是很多用户希望能够简化环境管理,尽可能复用线上资源,结合消息来提供微服务全链路灰度能力。 如上图所示,线上已经在运行的微服务模块游 A、B、C,C 模块会产生消息,并由 A 模块进行消费。此时对服务模块 A 和 C 做灰度发布,则线上会存在两条泳道,一条是正常的业务流量,一条是灰度链路。我们希望线上版本 C 模块生产的消息能够被线上版本 A 模块进行消费,灰度版本 C 模块生产的消息能够被灰度版本 A 模块进行消费。 RocketMQ 支持透传环境标签,可在生产端给消息属性添加标签,然后开启 RocketMQ 的 SQL 92 语法过滤,服务端即可完成消息的过滤和路由,从而降低客户端的压力。 本文讲述了 RocketMQ 的业务消息场景、一些功能特性的使用方法,包括事务消息、定时消息、消息全链路灰度等,欢迎大家尝试使用。
作者:洛浩
#行业实践

2022年9月23日

Apache RocketMQ 在阿里云大规模商业化实践之路
阿里云消息队列 RocketMQ 商业化历程 RocketMQ 诞生于 2012 年,诞生即开源。2012~2015 年,RocketMQ 一直在通过内部电商业务打磨自身服务能力,并在 2015 年于阿里云上线公测。2016 年,阿里云 RocketMQ 完成商业化,同时被捐赠给 Apache 基金会,同年获得了年度受欢迎中国开源软件荣誉。 在 Apache 孵化期间,Apache RocketMQ 经历了快速发展,2017 年即毕业成为了 Apache 顶级项目。同年,Apache RocketMQ TLP RocketMQ 4.0 正式发布。此后,RocketMQ 4.0 经历了长足发展,期间阿里云商业和开源相辅相成、齐头并进,直到今天,共同迈入 RocketMQ 5.0 时代。 RocketMQ 5.0 发布后,阿里云商业会持续采取 OpenCore 的发展模式,秉承上游优先的社区发展原则,与社区一起将 RocketMQ 打造为一个超融合的数据处理平台。 阿里云消息队列产品矩阵 阿里云基于 RocketMQ 消息底座,构建了多元化的消息产品系列。 RocketMQ 是阿里云主打的消息品牌,互联网新兴业务领域首选的数据通道。消息队列 Kafka 是大数据的首选数据通道,微消息队列 MQTT 是移动互联网和物联网的数据通道,消息队列 RocketMQ 是传统业务领域的数据通道。消息服务 MNS 是 RocketMQ 轻量版,主要应用于应用集成领域,为平台型应用提供简单的队列服务。事件总线 Event Bridge 定位为云上事件枢纽,旨在阿里云上构建统一的事件中心。 阿里云消息队列产品矩阵完全构建在 RocketMQ 之上,基本实现了应用场景全覆盖,包括微服务解耦、SaaS 集成、物联网、大数据或日志收集生态,同时也在内部覆盖了阿里巴巴所有业务,在云上为数万阿里云企业提供了优质的消息服务。阿里云的消息产品矩阵涵盖了互联网、大数据、移动互联网等领域业务场景,为云原生客户提供不可或缺的一站式解决方案。 RocketMQ 在阿里云商业化历程中,一直致力于探索业务消息实践,也孵化了大量业务消息特性,并持续反哺到开源社区。 RocketMQ 4.0 业务消息探索之路 RocketMQ 在商业化过程中,陆续推出了四种消息类型来满足丰富的业务场景。 普通消息:普通消息提供极致弹性、海量堆积能力,内置重试与死信队列来满足业务对失败重试的需求,同时具备高吞吐、高可用、低延迟等特性,广泛应用于应用集成、异步解耦、削峰填谷等场景。 定时消息:提供秒级定时精度, 40 天超长定时,主要面向分布式定时调度、任务超时处理等场景,目前正在开源中。  顺序消息:支持全局与局部严格有序,从发送、存储到消费,保证端到端有序。面向有序事件处理、撮合交易、数据实时增量同步等场景。 事务消息:分布式、高性能、高可用的最终一致性事务解决方案,广泛应用于电商交易系统中服务的一致性协调场景并且已经开源。  RocketMQ 4.0 期间,商业和开源都致力于全方位拓展消息接入能力,使 RocketMQ 能够非常轻松地连接应用开源和云产品生态。比如商业上提供了多语言 SDK ,开源也有相应的 SDK 能够覆盖 Java、Go、Python 、C++使用 RocketMQ。同时支持 Spring 生态,能够通过 Spring Cloud 的方式使用 RocketMQ。商业上提供了一组非常简单易用的 HTTP API,提供了 67 种语言的实现。 除了 SDK 接入,RocketMQ 也在积极拥抱社区标准,在云产品侧提供了 AMQP 和 MQTT 的接入能力,其中 MQTT 已开源。 RocketMQ 也大力在发展 connector 生态,能够通过 RocketMQ connector 接入很多数据源,包括 Redis、MongoDB、Hudi 等大数据系统。 另外,阿里云构建的事件总线 EventBridge 也已开源,通过该产品能够将阿里云的云产品、SaaS 应用、自建数据平台的数据引入 RocketMQ。 RocketMQ 4.0 版本做了大量尝试,提供了全方位的消息接入能力。 RocketMQ 在服务阿里集团用户和商业化历程中,沉淀了大量领先的业务消息处理与服务能力。比如消息订阅方面,RocketMQ 支持集群分布式消费能力,也支持广播消费。在消息处理方面支持基于 Tag 和 SQL 做灵活过滤,其中基于 SQL 过滤是电商交易中非常重要的特性,能够支持在非常订阅比的情况下实现较低的投递比。 全球消息路由能力具备性能高、实时性强的特点。在云时代,数据中心天然分布在各个地域,各个地域之间还有 VPC 网络隔离。但是通过全球消息路由功能可以将地域与网络打通,能够满足更多业务场景。比如在阿里内部基于该能力实现了异地多活、异地容灾等企业级特性。 另外,全球消息路由具备非常高的易用性,提供了可视化任务管理界面,通过简单配置即可创建复制链路。 消息治理方面,RocketMQ 提供了访问控制、命名空间、实例限流、消息回放、重试消息、死信消息、堆积治理等能力。 服务能力方面,RocketMQ 经历了非常多沉淀,它在为交易链路服务了 12  年,参加了 10 年双 11,这也保证了 RocketMQ 能够在阿里云上提供非常高的可靠性。双 11 消息收发 TPS 峰值过亿,日消息收发总量超过 3 万亿。而即使在双十一万亿级数据洪峰下,消息也能做到 99.996% 毫秒级响应能力,消息发布平均响应时间不超过 3 毫秒,最大不超过 20 毫秒,真正实现了低延迟消息发布。 商业化初期,客户遇到最大难题是在分布式环境下如何完整地追踪异步消息链路。基于此背景,我们打造了可视化全生命周期消息轨迹追踪系统,能够提供丰富的消息查询、消息下载、定点重投、轨迹追踪能力,通过可观测系统帮助用户解决分布式环境中不可观测的问题。 如上图所示,一条消息从产生、发送至服务端存储到最终投递到消费者,整个发送和消费轨迹都有迹可循,包括投递给哪些消费者、哪些消费者在什么地方成功消费或者消费失败、何时进行重投,真正帮助客户解决了分布式观测难题。 除了功能特性,RocketMQ 在稳定性方面也做了很多建设。我们始终坚持,SLA 是云原生的根本,因此整个研发运维链路都有严格的稳定性保障措施: 架构开发:每个方案设计都会面向失败设计,代码开发阶段会有严格 Code Review 阶段,也会完整经历单元测试、集成测试、性能测试和容灾测试流程。 变更管理:有着非常严格的变更制度,要做到每个变更可灰度、可监控、可回滚、可降级。 稳定性防护:提供了限流、降级、容量评估、应急方案、大促保障等能力,会定期进行故障和预案演练,定期进行风险梳理。 体系化巡检:在云上有全方位的生产环境黑盒巡检。基于用户视角,会对全地域所有功能做全功能扫描,包含高达 50 多项检测项,任意项功能出问题都能立刻被监测到。在白盒巡检方面,会对 JVM 运行时指标、内核系统、集群指标进行巡检。 故障应急:有完整地故障应急流程,包括监控报警、故障发生、快速止血、排查根因、故障复盘。 RocketMQ 5.0 云原生架构升级之路 云原生时代,云上用户对云产品服务化程度、弹性能力、可控制性能力以及韧性都有了更高的要求。在此背景之下,我们对 RocketMQ 进行了云原生架构升级,这也是 RocketMQ 5.0 的诞生背景。 轻量级 SDK:基于云原生通信标准 gRPC 开发了一组轻量级 SDK,能够与当前富客户端优势互补。   无状态消息网关:在核心数据链路推出了无状态消息网关。通过搭建无状态服务节点Proxy,再通过 LB 进行服务暴露,将存储节点数据分离来独立负责核心消息存储和高可用。Proxy 与 Store 节点分离部署,独立弹性。  Leaderless 高可用架构:Store 节点身份完全对等,完全 Leaderless 化,去 ZK 和 HA 管控节点,能够做到非常高的可用性。同时相比传统的 Raft 一致性协议,该 Leaderless 架构能够做到副本数灵活选择,同步异步自动升降级,实现秒级故障转移。高可用架构目前已经完成开源并与 Dledger 进行了融合。  云原生基础设施:可观测验能力云原生化,OpenTelemetry 标准化。整体架构走向 Kubernetes 化,能够充分利用售卖区的资源弹性能力。 RocketMQ 4.0 推荐的接入方式主要是富客户端。富客户端提供了诸如客户端侧负载均衡、消息缓存、故障转移等一系列企业级特性。但在云原生时代,轻量级、高性能的客户端更容易被云原生技术栈所集成。 因此,RocketMQ 5.0 重磅推出了全新多语言轻量级 SDK,具有以下优势: 全新极简 API 设计:不可变 API,有完善的错误处理。多语言 SDK 保障 API 在 Native 层面对齐。同时引入了全新的 Simple Consumer,能够支持按消息模型进行消费,用户不再需要关心消息队列,只需要关注消息。  通信层采用 gRPC 协议:拥抱云原生通信标准,gRPC 能够使服务更易被集成。多语言 SDK 通信代码也可以通过 gRPC 快速生成,更 Native 。  轻量级实现:采用无状态消费模式,能够大幅降低客户端的实现复杂度。客户端更轻量,采用的应用也更容易被 Serverless化、Mesh 化。  云原生可观测性:客户端实现了 OpenTelemetry 标准,能够支持以 OpenTelemetry 形式导出 Metrics 与 Tracing。 RocketMQ 5.0 的另一个重大升级是引入了全新的无状态消费模型。该消费模型完全构建在原先的队列模型之上。队列模型是与存储模型一致的消费模型,消费者完全按照队列做负载均衡,也按照队列做消息拉取,非常适合批量高速拉取以及对单条消息状态不敏感的场景,比如流计算等。 RocketMQ 5.0 推出了 PoP 机制,巧妙地在队列模型之上构建了消息模型,实现了鱼与熊掌兼得。在此消息模型的设计上,业务可以只关心消息而无需关心队列,所有 API 都能够支持单条消息级别的消费、重试、修改不可见时间、删除。 在消息模型下,消息发送过来被存储后,即对消费者可见。消费者通过 Receive Message API 对消息进行消费后,消息进入定时不可见状态。消息超时过后又会重新处于可见状态,能被其他消费者继续消费。某消费者确认消息后,服务端会对该消息进行删除,随即不可见。 基于消息系模型的消费流程下,API 完全面向消息而不是面向队列。而当 PoP 机制遇见了无状态 Proxy,除了存储层,其他节点都是无状态的;客户端、连接和消费也是无状态的,可任意在 Proxy 节点上飘移,真正做到轻量级。 经过重构,RocketMQ 5.0 的可观测性也走向了云原生标准。 Metrics 侧: 指标涵盖丰富:设计了更丰富的指标,包含消息量、堆积量、各个阶段耗时等指标,每个指标从实例、Topic、消费 GroupID 多维度做聚合和展示。 消息团队实践模板:为用户提供实践模板,并持续迭代更新。 Prometheus + Grafana:Prometheus 标准数据格式,利用 Grafana 展示。除了模板,用户也可以自定义展示大盘。 Tracing 侧: OpenTelemetry Tracing 标准:RocketMQ Tracing 标准已经合并到 OpenTelemetry 开源标准,提供了规范和丰富的 messaging tracing 场景定义。 消息领域定制化展示:按照消息维度重新组织抽象的请求 span数据,展示一对多的消费,多次消费信息直观且方便理解。 可衔接 tracing 链路上下游:消息的 tracing 可继承调用上下文,补充到完整的调用链路中,消息链路信息串联了异步链路的上游和下游链路信息。 Logging 侧: Error Code 标准化:不同的错误有唯一的 Error Code。 Error Message 完整:包含完整的错误信息和排序所需要的资源信息。 Error Level 标准化:细化了各种不同错误信息的日志级别,用户可根据 Error、Warn 等级别配置更适合的监控告警。 弹性方面,RocketMQ 5.0 商业版能够充分撬动云的计算、存储和网络的池化资源。比如在计算方面,RocketMQ 5.0 所有工作负载完全部署在 ACK 之上,充分利用了 ACK 弹性能力,撬动 ACK 弹性资源。主要依赖 ACK 的两项技术,一是弹性资源池,另一个是 HPA 支持计算能力快速弹性。同时也会在 ACK 之上做跨可用区部署以提供高可用保障。 网络层面,RocketMQ 5.0 也会充分利用阿里云网络设施,为用户提供更便捷的网络访问能力。比如 RocketMQ 5.0 实例能够支持公网随开随用,需要依赖公网做测试的时候即开即用,测试完立即关闭,安全与方便兼具。同时支持多种私网类型的网络形态,包括 Single Tunnel、Private Link,另外也基于 CEN 构建了全球互通设计网络。 存储方面,RocketMQ 5.0 商业版率先引入多级存储概念,基于 OSS 构建二级存储,能够充分利用 OSS 存储的弹性能力,存储计费也转向了按量付费。而用户能够在 RocketMQ 之上自定义消息存储时长,比如将消息从 3 天有效时长延长至 30 天,能够真正将消息变为数据资产。同时利用二级存储能力,将冷热数据分离,为用户提供一致的冷读 SLA 。 RocketMQ 5.0 商业版发布预告 RocketMQ 4.0 历经了五年发展,开源和商业版本共同迈入了 5.0 时代。7 月底,阿里云消息队列将会基于开源版发布全新的 5.0 商业化版本。注:截止发稿前,RocketMQ 5.0 已经在阿里云消息队列 RocketMQ 产品上全新发布,目前支持国内主要地域。 RocketMQ 5.0 版相对于 4.0 版实例主要有以下几大改变: 第一,新版本、新售卖,更便宜。新版本采取了全新计量方式,有包年、包月型,也有按量付费和公网流量弹性计费。也有更全的售卖体系,比如新增专业版实例,能够满足部分用户需求。同时每个商品系列都新增了测试环境专用实例,能够方便用户以低成本的方式搭建自己的开发环境。 第二,更强弹性,降本提效利器。存储完全走向弹性,能够通过 Serverless 按需使用,按量付费。预留弹性,实例基础规格支持实时升降配,用户可以很方便地在流量到来之前做弹性。此外,专业版支持突发流量弹性,能够解决线上稳定性风险。 第三,全新架构,增强可观测运维。无状态消息消费模型能够解决一些老版本的痛点。同时在可观测上全面采取了云原生接入栈。 消息的全新形态:事件总线 EventBridge 事件总线 EventBridge 已经开源到 RocketMQ 社区中。云原生时代,事件无处不在,云计算资源散落在各地,各类生态孤岛随处可见。因此,以事件和事件驱动的方式来集成这一切是大势所趋。 基于此,阿里云推出了全新事件型产品 EventBridge。该产品构建在 RocketMQ 之上,是 RocketMQ 之上的一个事件驱动架构实践。 EventBridge 的事件源包括阿里云服务的管控事件比如资源变更事件、审计事件、配置变更事件,阿里云服务的数据事件,也包括自定义应用、SaaS 应用、自建数据平台、其他云厂商服务等。 事件经过 EventBridge 处理后会投递到事件目标,事件目标包括函数计算、消息服务、自建网关、HTTP(S)、短信、邮箱、钉钉等。 事件源到事件目标之间会经历完整的事件处理,包括事件源接入到 EB 后,可以对事件进行过滤、转换、归档、回放等。事件在 EventBridge 整个流程中也有完善的可观测性设计,包括事件查询、链路追踪。事件的接入方式非常丰富,可以通过 OpenAPI 来接入、7 种多语言 SDK、CloudEvents SDK、Web Console 和 Webhook 。 EventBridge 具有如下特点: 能够大幅度减少用户开发成本,用户无需额外开发,通过创建 EventBridge 源、事件目标、事件规则等资源即可实现事件架构。用户可以编写事件规则,对事件做过滤、转换。  提供原生 CloudEvents 支持,拥抱 CNCF 社区,能够无缝对接社区 SDK 。标准协议也能统一个阿里云事件规范。  事件 Schema 支持:能够支持事件 Schema 自动探测和校验,支持 Source 和 Target 的 Schema 绑定。  全球事件任意互通:组建了全球事件任意互通网络,组件了跨地域、跨账户的事件网络,能够支持跨云、跨数据中心的事件路由。 EventBridge在云上生态已经初具规模,已经集成了 255+ 云产品事件源和 1000+ 事件类型。 EventBridge率先对消息生态做了融合。阿里云的消息产品矩阵生态均通过 EventBridge 做了完全融合。任何一款消息产品与另一款消息产品的数据都能互通。同时,依靠 EventBridge 的全球事件网络,能够为所有消息产品赋予全球消息路由的能力。 EventBridge 目前已经在内部接入钉钉 ISV、聚石塔 ISV,外部也有 50+ SaaS 系统可以通过 Webhook 的方式接入。另外,海量事件源可以触达 10 多种事件目标,已经对接了全系云产品 API ,任何事件都可以驱动全量云产品 API。 作者介绍: 周新宇 Apache Member,Apache RocketMQ PMC Member,阿里云消息队列 RocketMQ 研发负责人。
作者:周新宇
#行业实践 #云原生

2022年8月29日

阿里云基于全新 RocketMQ 5.0 内核的落地实践
前言 在上个月结束的 RocketMQ Summit 全球开发者峰会中,Apache RocketMQ 社区发布了新一代 RocketMQ 的能力全景图,为众多开发者阐述 RocketMQ 5.0 这一大版本的技术定位与发展方向。 在过去七年大规模云计算实践中,RocketMQ 不断自我演进,今天,RocketMQ 正式迈进 5.0 时代。 从社区关于 5.0 版本的解读可以看到,在云原生以及企业全面上云的大潮下,为了更好地匹配业务开发者的诉求,Apache RocketMQ 做了很多的架构升级和产品化能力的适配。那么如何在企业的生产实践中落地 RocketMQ 5.0 呢?本篇文章的核心就消息架构以及产品能力的云原生化,介绍了阿里云是如何基于全新的 RocketMQ 5.0 内核做出自己的判断和演进,以及如何适配越来越多的企业客户在技术和能力方面的诉求。 云原生消息服务的演进方向 首先我们来看下云原生消息服务有哪些演进? 面向未来,适应云原生架构的消息产品能力应该在以下方面做出重要突破: 大规模弹性:企业上云的本质是解放资源供给的负担和压力,专注于业务的集成和发展。作为消息服务的运维方,应该为上层业务提供与模型匹配的资源供给能力,伴随业务流量的发展提供最贴合的弹性能力。一方面可以解决面向不确定突发流量的系统风险,另一方面也可以实现资源利用率的提升。  易用性:易用性是集成类中间件的重要能力,消息服务应该从 API 设计到集成开发、再到配置运维,全面地降低用户的负担,避免犯错。低门槛才能打开市场,扩大心智和群体。   可观测性:可观测性对于消息服务的所有参与方来说都很重要,服务提供方应提供边界清晰、标准开放的观测诊断能力,这样才能解放消息运维方的负担,实现使用者自排查和边界责任的清晰化。  稳定性高 SLA:稳定性是生产系统必备的核心能力,消息来说往往集成在核心交易链路,消息系统应该明确服务的可用性、可靠性指标。使用方应基于明确的 SLA 去设计自己的故障兜底和冗余安全机制。 立足于这个四个关键的演进方向,下面为大家整体介绍一下阿里云 RocketMQ 5.0 在这些方面是如何落地实践的。 大规模弹性:提供匹配业务模型的最佳资源供给能力 消息服务一般集成在业务的核心链路,比如交易、支付等场景,这一类场景往往存在波动的业务流量,例如大促、秒杀、早高峰等。 面对波动的业务场景,阿里云 RocketMQ 5.0 的消息服务可以伴随业务的诉求进行自适应实现资源扩缩。一方面在比较稳定的业务处理基线范围内,按照最低的成本预留固定的资源;另一方面在偶尔存在的突发流量毛刺时,支持自适应弹性,按量使用,按需付费。两种模式相互结合,可以实现稳定安全的高水位运行,无需一直为不确定的流量峰值预留大量资源。 除了消息处理流量的弹性适应外,消息系统也是有状态的系统,存储了大量高价值的业务数据。当系统调用压力变化时,存储本身也需要具备弹性能力,一方面需要保障数据不丢失,另一方面还需要节省存储的成本,避免浪费。传统的基于本地磁盘的架构天然存在扩缩容问题,其一本地磁盘容量有限,当需要扩大容量时只能加节点,带来计算资源的浪费;其二本地磁盘无法动态缩容,只能基于业务侧流量的隔离下线才能缩减存储成本,操作非常复杂。 阿里云 RocketMQ 5.0 的消息存储具备天然的 Serverless 能力,存储空间按需使用,按量付费,业务人员只需要按照需求设置合理的 TTL 时间,即可保障长时间存储时的数据完整性。 集成易用性:简化业务开发,降低心智负担和理解成本 集成易用性是一种系统设计约束,要求消息服务应该从 API 设计到集成开发、再到配置运维,全面地降低用户的负担,避免犯错。举个典型场景,在消息队列例如 RocketMQ 4.x 版本或 Kafka 中,业务消费消息时往往被负载均衡策略所困扰,业务方需要关注当前消息主题的队列数(分区数)以及当前消费者的数量。因为消费者是按照队列粒度做负载均衡和任务分配,只要消费者能力不对等,或者数量不能平均分配,必然造成部分消费者堆积、无法恢复的问题。 在典型的业务集成场景,客户端其实只需要以无状态的消息模型进行消费,业务只需关心消息本身是否处理即可,而不应该关心内部的存储模型和策略。 阿里云 RocketMQ 5.0 正是基于这种思想提供了全新的 SimpleConsumer 模型,支持任意单条消息粒度的消费、重试和提交等原子能力。 可观测性:提供边界清晰、标准开放的自助诊断能力 有运维消息队列经验的同学都会发现,消息系统耦合了业务的上游生产和下游消费处理,往往业务侧出问题时无法清晰地界定是消息服务异常还是业务处理逻辑的异常。 阿里云 RocketMQ 5.0 的可观测性就是为这种模糊不确定的边界提供解法,以事件、轨迹、指标这三个方面为基础,依次从点、线、面的纬度覆盖链路中的所有细节。关于事件、轨迹、指标的定义涵盖如下内容: 事件:覆盖服务端的运维事件,例如宕机、重启、变更配置;客户端侧的变更事件,例如触发订阅、取消订阅、上线、下线等;   轨迹:覆盖消息或者调用链的生命周期,展示一条消息从生产到存储,最后到消费完成的整个过程,按时间轴抓出整个链路的所有参与方,锁定问题的范围;  指标:指标则是更大范围的观测和预警,量化消息系统的各种能力,例如收发 TPS、吞吐、流量、存储空间、失败率和成功率等。 阿里云 RocketMQ 在可观测性方面也是积累良多,不仅率先支持了完善的消息轨迹链路查询,而且在 5.0 新版本中还支持将客户端和服务端的 Trace、Metrics 信息以标准的 OpenTelemetry协议上报到第三方Trace、Metrics中存储,借助开源的 Prometheus 和 Grafana 等产品可以实现标准化的展示和分析。 稳定性 SLA:提供可评估、可量化、边界明确的服务保障能力 稳定性是生产系统必备的核心能力,消息系统往往集成在核心交易链路,消息系统是否稳定直接影响了业务是否完整和可用。但稳定性的保障本身并不只是运维管理,而是要从系统架构的设计阶段开始梳理,量化服务边界和服务指标,只有明确了服务的可用性和可靠性指标,使用方才能设计自己的故障兜底和冗余安全机制。 传统的基于运维手段的被动保障方式,只能做基本的扩缩容和系统指标监控,对于消息的各种复杂边界场景,例如消息堆积、冷读、广播等并不能很好的提供量化服务能力。一旦上层业务方触发这些场景,系统则会被打穿,从而丧失服务能力。 阿里云 RocketMQ 5.0 体系化的稳定性建设,是从系统设计阶段就提供对消息堆积、冷读等场景量化服务的能力,确定合理的消息发送 RT、端到端延迟和收发吞吐 TPS 能力等,一旦系统触发这些情况,可在承受范围内做限制和保护。 本篇文章从大规模弹性、集成易用性、可观测性和稳定性 SLA 等方面介绍了 RocketMQ 5.0 的演进和方向,同时针对性介绍了阿里云消息队列 RocketMQ 5.0 在这些方面的实践和落地。 阿里云消息队列 RocketMQ 5.0 目前已正式商业化,在功能、弹性、易用性和运维便捷性等方面进行了全面增强,同时定价相比上一代实例最高降低 50%,助力企业降本增效,以更低的门槛实现业务开发和集成。新一代实例支持 0~100 万 TPS 规模自伸缩、支持突发流量弹性和存储 Serverless;在可观测性方面,支持全链路轨迹集成和自定义 Metrics 集成;在集成易用性方面,支持新一代轻量原生多语言 SDK,更加稳定和易用。
#行业实践 #云原生

2022年8月18日

解析 RocketMQ 业务消息--顺序消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。 简介 顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,同一 MessageGroup 的消息按照严格的先进先出(FIFO)原则进行发布和消费,即先发布的消息先消费,后发布的消息后消费,服务端严格按照发送顺序进行存储、消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求,因此需做到两点,发送的顺序性和消费的顺序性。 功能原理 在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?接下来就会围绕这个问题,对比普通消息和顺序消息进行阐述。 顺序发送 在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条消息 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。 同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。  因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式相比较普通消息,实际上降低了消息的最大吞吐。 顺序消费 与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。当然也需要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的消息之间并不存在顺序性的关联,是可以进行并发消费的。因此全文中提到的顺序实际上是一种偏序。 小结 无论对于发送还是消费,我们通过 MessageGroup 的方式将消息分组,即并发的基本单元是 MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度具备了可拓展性,支持多队列存储、水平拆分、并发消费,且不受影响。回顾普通消息,站在顺序消息的视角,可以认为普通消息的并发基本单元是单条消息,即每条消息均拥有不同的 MessageGroup。 我们回到开头那个问题: 既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢? 现在大家对于这个问题可能有一个基本的印象了,消息的顺序性当然很好,但是为了实现顺序性也是有代价的。 下述是一个表格,简要对比了顺序消息和普通消息。 最佳实践 合理设置 MessageGroup MessageGroup 会有很多错误的选择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为部分规模较大的商家会产出较多订单,由于下游消费能力的限制,因此这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背后的业务逻辑上来说,同一订单才有顺序性的要求。即选择 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量应当尽量相同且均匀。 同步发送和发送重试 如之前章节所述,需使用同步发送和发送重试来保证发送的顺序性。 消费幂等 消息传输链路在异常场景下会有少量重复,业务消费是需要做消费幂等,避免重复处理带来的风险。 应用案例 用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。   电商的订单创建,以订单 ID 作为 MessageGroup,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。   实战 发送 可以看到,该发送案例设置了 MessageGroup 并且使用了同步发送,发送的代码如下: public class ProducerFifoMessageExample { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class); private ProducerFifoMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String topic = "yourFifoTopic"; final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before // message publishing. .setTopics(topic) // May throw {@link ClientException} if the producer is not initialized. .build(); // Define your message body. byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey1ff69ada8e0e") // Message group decides the message delivery order. .setMessageGroup("youMessageGroup0") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { LOGGER.error("Failed to send message", t); } // Close the producer when you don't need it anymore. producer.close(); } } 消费 消费的代码如下: public class SimpleConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class); private SimpleConsumerExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String consumerGroup = "yourConsumerGroup"; Duration awaitDuration = Duration.ofSeconds(30); String tag = "yourMessageTagA"; String topic = "yourTopic"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for longpolling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(5); final List messages = consumer.receive(maxMessageNum, invisibleDuration); for (MessageView message : messages) { try { consumer.ack(message); } catch (Throwable t) { LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t); } } // Close the simple consumer when you don't need it anymore. consumer.close(); } }
作者:绍舒
#技术探索 #功能特性

2022年8月10日

云原生事件驱动引擎(Apache RocketMQ-EventBridge)应用场景与技术解析
在刚刚过去的 RocketMQ Summit 2022 全球开发者峰会上,我们对外正式开源了我们的新产品 RocketMQEventbridge 事件驱动引擎。 RocketMQ 给人最大的印象一直是一个消息引擎。那什么是事件驱动引擎?为什么我们这次要推出事件驱动引擎这个产品?他有哪些应用场景,以及对应的技术方案是什么? 今天我们就一起来看下,整篇文章包含三部分: 第一部分,我们一起看下什么是事件。 第二部分,和大家一起看看,事件有哪些不一样的“超能力”,使用这些“超能力”呢,我们又能干些什么? 第三部分,我们讲一下 RocketMQ 给出的关于事件的解决方案,也是我们这次开源的项目:RocketMQEventBridge。 什么是事件 大家自己可以先在脑袋里想一下,什么是事件?我们给事件下的一个定义是: 过去已经发生的事,尤其是比较重要的事。 A thing that happens, especially one of importance. 这个很好理解。比如说,昨天下午我做了一次核酸检测;今天上午又吃了一个冰激淋。这些都是过去已经发生的事件。但是,如果我再问:事件跟消息有什么区别?这个时候,大家是不是觉得事件这个定义,好像又不那么清晰? 刚才说的那些事件,是不是也可以理解为消息啊?如果,老张给我发送了一条短信,这个算是事件,还是消息啊?平常开发过程中,“什么时候使用消息,什么时候使用事件?” 不过,在回答这个问题之前,我们一起来看一个典型的微服务。 一个微服务系统和外部系统的交互,可以简单分为两部分:一是接收外部请求(就是图中上面黄色的部分);二是是调用外部服务(就是图中下面绿色的部分)。 接收外部请求,我们有两种方式:一种是提供 API,接收外部发过来的 Query 请求和 Command 请求;另外一种是主动订阅外部 Command 消息。这两类操作,进入系统内部之后呢,我们常常还会,调用其他为微服务系统,一起协同处理,来完成一个具体的操作。当这些操作,使得系统状态发生改变时,就会产生事件。 这里呢,我们把从外部接收到的 Command 消息,和系统内部产生的事件,都称之为消息。 我们总结一下,消息和事件的关系是这样的:消息包含两部分,Command 消息和 Event 消息 1、看图中左半部分,Command 是外部系统发送给本系统的一条操作命令; 2、再看图中右半部分,Event 则是本系统收到 Command 操作请求,系统内部发生改变之后,随之而产生了事件; 所以,事件和消息是不同的,事件可以理解为是一种特殊的消息。其特殊的点,主要在 4 个地方: 已发生、且不可变 事件,一定是“已发的”。“已发生”的代表什么呢?不可变的。我们不可能改变过去。这个特性非常重要,在我们处理事件、分析事件的时候,这就意味着,我们绝对可以相信这些事件,只要是收到的事件,一定是系统真实发生过的行为。而且是不可修改。 对比 Command 和 Query。Command 的中文是什么?命令。很显然,它是还没有发生的,只是表达了一种期望。我们知道“期望的”,不一定会成功发生。 比如:把厨房的灯打开、去按下门铃、转给 A 账户 10w…… 这些都是 Command,都是期望发生的行为。但是,最终有没有发生?并不知道。 Event 则是明确已经发生的事情。比如:厨房灯被打开了、有人按了门铃、A 账户收到了 10w…… 再对比 Query,它则是查询系统当前状态的一种请求,比如:厨房的灯是打开着的、门铃正在响、查下账户显示余额 11w…… 无期望的 这个怎么理解?事件是客观的描述一个事物的状态或属性值的变化,但对于如何处理事件本身并没有做任何期望。 相比之下,Command 和 Query 则都是有期望的,他们希望系统做出改变或则返回结果,但是 Event 呢,它只是客观描述系统的一个变化。 我们看一个例子:交通信号灯,从绿灯变成红灯,事件本身并没有要求行人或汽车禁止通行,而是交通法规需要红绿灯,并赋予了其规则。 所以,系统一般不会定向的、单独向另外一个系统发送事件,而是统一的告诉“事件中心”,“事件中心”呢,那里面有各个系统上报上来的,各式各样的事件。系统会向事件中心说明:自己这个系统,会产生哪些事件呀,这些事件的格式是怎么样的呀。 别的系统如果感兴趣呢,就可以来主动订阅这些事件。真正赋予事件价值的,是事件消费者。事件消费者想看看,某个系统发生了什么变化呀?OK,那他就去订阅这些事件,所以事件是消费者驱动的。 这跟消息有什么区别呢?Command 消息的发送和订阅,是双方约定好的,外人不知道,往往是以文档或代码的形式,大家按约定好的协议,发送和订阅消费,所以消息是生产者驱动的。 我们打个比喻,事件就像市场经济,商品被生产出来,具体有什么价值,有多大价值,很大程度上看其消费者。我们能看到系统中各种各样的事件,就像橱窗里摆放了各种各样的商品。而 Command 消息呢,有点像计划经济,一出生就带着很强的目的性,我就是要“分配”给谁消费。 天然有序 事件的第三个特性是:“天然有序”。含义:同一个实体,不能同时发生 A 又发生 B,必有先后关系;如果是,则这两个事件必属于不同的事件类型。 比如:针对同一个交通信号灯,不能既变成绿灯,又变成红灯,同一时刻,只能变成一种状态。 大家可能发现了一点,这里其实隐藏了事件的一个额外属性:因为天然有序,跟时间轴上的某一时刻强绑定,且不能同时发生,所以它一定是唯一的。 如果我们看到了两个内容一样的事件,那么一定是发生了两次,而且一次在前,一次在后。(这对于我们处理数据最终一致性、以及系统行为分析都很有价值:我们看到的,不光光是系统的一个最终结果,而是看到变成这个结果之前的,一系列中间过程) 具像化 事件的第四个特性是:“具象化”的。 事件会尽可能的把“案发现场”完整的记录下来,因为它也不知道消费者会如何使用它,所以它会做到尽量的详尽,比如: ●是由谁产生的事件?Subject ●是什么类型的事件?Type ●是谁发送的事件?Source ●事件的唯一性标志是什么?Id ●什么时候发生?Time ●事件的内容是什么?Data ●事件的内容有哪些信息?Dataschema 我们还是以交通信号灯举例子: 对比我们常见的消息,因为上下游一般是确定的,常常为了性能和传输效率,则会做到尽可能的精简,只要满足“计划经济”指定安排的消费者需求即可。 总结一下,事件上面的 4 个特性,是对事件巨大的一个属性加成,让事件拥有了跟普通消息不一样的“超能力”。使事件,常常被用到 4 个典型场景:事件通知、事件溯源、系统间集成和 CQRS。 下面让我们一个个展开,具体看看这些应用场景。 事件的典型应用场景 事件通知 事件通知是我们系统中很常见的一个场景。比如:用户下单事件通知给支付系统;用户付款事件通知给交易系统。 这里,让我们回到一开始信号灯那个例子。当交通信号灯,从红灯变成绿灯时,可能存在很多系统都需要这个信息。 方式 1:发送方主动调用,适配接收方 一种最简单的方式是,我们依次 call 每个系统,并把信息传递出去。比如:信号灯系统,主动调用地图导航的 API 服务、调用交警中控的 API 服务,调用城市大脑的 API 服务,把红绿灯变化信号发送出去。 但我们都知道,这个设计非常糟糕。尤其当系统越来越多时,这无疑是灾难的,不仅开发成本高,而且其中一个系统出现问题,可能会 hang 住整个服务,则导致调用其他系统都会受到影响。 方式 2:接收方主动订阅,适配发送方 一个很自然的解决方案是,我们将这些信息发送到中间消息服务 Broker,其他系统如果有需要,则主动去订阅这些消息即可。 这个时候,信号灯系统与其他系统并没有直接的调用依赖,交警中控服务、地图导航服务、城市大脑服务,只要按照约定的协议,去订阅信号灯的消息,并解析这些信息即可。 但是,这里同样存在一个问题:这个架构中,是以“信号灯”为中心。消费者需要理解发送者的业务领域,并主动添加适配层,(就是图中白色回旋镖部分),将消息转化为自己业务领域内的语言。但对于每一个微服务来说,他都希望都是高内聚低耦合的。 如果交警中控需要全国的信号灯数据,但是每个地域的消息格式又不一样,这就意味着,交警中控需要适配每一个地域的协议,做一层转换。而且万一后面变化了怎么办?想想就知道这个运维成本有多可怕。 那是否交警中控系统,可以要求全国所有红绿灯系统,都按同一种数据协议给到自己呢?不好意思,这些信号灯数据地图服务也在用,城市大脑也在用,不能更改。 方式 3:引入事件,Borker 根据接收方协议,进行灵活适配 但如果使用事件,就不一样了。因为事件是“无期望的”,“具像化的”,天然的保留了案发现场尽可能多的信息,且更加规范标准,对于消费者(也就是交警中空)来说,可以轻易将不同省份,收集上来的事件,轻易组装成,符合自己业务要求的格式。 而且,这一组装,是在中间层 Broker 发生的。对于交警中控来说,它只需要,按照自己业务领域的设计,提供一个接收事件的 API,然后其他事件,通过 Broker,主动投递到这个 API 上即可。从头到尾,对交警中控系统,没有一行适配外部业务的代码。 所以,这种方式有 3 个明显的优势: 1、只关注自己业务领域本身,不需要做适配外部的代码; 2、所有对系统的变更,收敛到 API,为唯一入口;同一个 API,可能既是用来接收事件的,也可能同时用于控制台操作; 3、因为事件是推送过来的,所以,也不需要像之前一样,引入一个 SDK,和 Broker 发生连接,获取消息,降低了系统的复杂度。 这样,我们一开始的图,就会变成这个样子:交通信号灯产生事件,投递到事件中心,其他需要这些事件的消费者,在事件中心订阅,再由事件中心,按照他们期望的事件格式,主动投递过去。 让我们再来回顾下整个过程: 第 1 幅图:一开始,我们通过强依赖的方式,让信号灯系统,主动将信息发送给各个系统。那这张图里,我们是以各个下游服务为中心,信号灯系统去适配各个下游服务。 第 2 幅图:后来,我们采用传统消息的方式,对调用链路进行了解耦,两边系统不再直接依赖了,但是依旧会存在业务上的依赖。消费者需要去理解生产者的消息格式,并在自己系统内部,进行转换适配。所以,这里其实是以生产者为中心。 第 3 幅图:最后,我们引入了事件通知的方式,对于这种方式,生产者和消费者,他们都只需要关注自己系统本身就可以了。生产者,生产什么样的事件,消费者,消费什么样的数据格式,都各自以自己的业务为中心,不需要为对方做适配。真正做到我们说的高内聚低耦合,实现彻底的完全解耦。 现在,回到我们一开始提到的典型微服务模型,对于有些场景,我们就可以变为下面这种方式:对微服务的变更操作,统一收敛到 API 操作入口,去掉 Command 消息入口。收敛入口,对于我们维护微服务,保障系统稳定性,常常非常有好处的。 事件溯源 事件溯源是什么?事件溯源简单理解就是让系统回到过去任意时刻。那怎么样,才能让系统可以回到过去呢?很简单,首先系统所有发生的变化,都得以事件的方式记录下来;然后,我们就可以通过回放事件的方式,回到过去任何一个时刻。 那为什么只有事件才能做这个事,其他普通消息不行呢?这个还是要回到我们刚才说的几个事件特性:已发生不可变的、天然有序且唯一的、而且是非常详细具体的,完整的记录了事件的案发现场。所以,对于事件溯源这个场景,事件可以说是系统的一等一的公民。 举个例子:比如说,如果我们能够完整地收集路上的各种事件信息,包括信号灯、车量、天气、拥堵路况等等,那么,我们就可以“穿越时间”,回到交通现场,重新做一次决策。比如,在智慧交通场景,当我们想去验证一个调度算法的时候,我们就可以回放当时发生的所有事件,来重现现场。 大家可能觉得这个很神奇,但是,其实我们平常一直有接触,大家知道是什么吗?就是我们常用的代码版本管理系统,比如:github。 这里有大家可能会问,如果一个系统积赞了很多事件,想重放是不是得很久?比如在一些交易场景,每天都会产生大量的事件,那应该怎么处理呢?这里呢,系统一般每天晚上都会打一份快照。如果系统意外宕机,想回到某一个时刻,就可以把前一天的快照取出,然后再重新跑下当天的事件,即可恢复。而白天呢,所有的事件都是在内存中进行处理,不会跟数据库交互,所以系统性能非常快,只有事件会落盘。 当然,事件溯源也不是适合所有场景,它有优点也有缺点,详细看上图。 系统间集成 刚才讲的第1个场景:事件通知,一般涉及到两个上下游团队的协作开发;讲的第 2 个场景:事件溯源,则一般是 1 个团队内的开发;但系统间集成,则往往面对的是三个业务团队的协作开发。这个怎么理解呢? 其实这个也很常见:比如公司里购买了 ERP 系统,同时也购买了外部考勤系统、外部营销系统服务等等。这些系统都有一个共同点,是什么?都不是我们自己开发的,是而买来的。 如果我们想把 ERP 系统的人员信息,实时且自动同步到考勤系统中去怎么办?其实这个是有点麻烦的,因为这些都不是我们自己开发的。 1、我们不能修改 ERP 系统的代码,主动去调用考勤系统,把人员变更信息发送过去; 2、也不能修改考情系统的代码,主动去调用外部 ERP 系统的 API; 但是我们可以通过事件总线,借助 webhook 或则标准 API 等等方式,收集上游的 ERP 系统产生的人员变更事件,然后进行过滤和转换,推送到下游考勤系统中去,当然,这里也可以是内部自研服务。 所以,现在的研发模式变成了:事件中心管理了所有 SaaS 服务,包括内部自研系统产生的所有事件。然后呢,我们只需要在事件中心,寻找我们需要的事件,进行订阅,对 SaaS 服务和内部自研系统,进行简单服务编排,即可完成开发。 CQRS CQRS 中的 C 代表 Command,Command 什么意思?就是明令,一般包含:Create/Update/Delete,Q 代表 Query,是指查询。所以 CQRS 本质是读写分离:所有的写操作,在图中左边的系统中完成,然后将系统因为 Command 产生变化的事件,同步到右边的查询系统。 这里同学可能有疑问,这跟数据库的读写分离有什么区别?数据库读写分离也是提供一个写的 DB,一个读的 DB,两边做同步。对吧… 那这里很大的一个区别是:对于数据库的读写分离,是以数据库为中心,两边的数据库是一模一样的,甚至数据的存储结构也是一模一样的。 但是对于 CQRS 的读写分离场景,是以业务为中心,两边存储的数据结构格式,往往是不一样的,甚至数据库都不是同一种。完全围绕各自的读写业务逻辑,设计最佳技术选型。对于写场景,为了保障事务,我们可能使用关系性数据库;对于读的场景,我们为了提高性能,我们可能会使用 Redis、HBase 等 Nosql 数据库。 当然 CQRS 也不是适合所有场景,他往往比较适合: ●希望同时满足高并发的写、高并发的读; ●写模型和读模型差别比较大时; ●读/写比非常高时; 我们刚才讲了事件的 4 个应用场景,但是,事件不是万能的,就像软件研发也没有银弹,有很多场景也并不适合使用事件。包括: 1. 强依赖 Response 的同步调用场景; 2. 要求服务调用保持事务强一致性的场景。 RocketMQ 关于事件的解决方案 需要什么样的能力? 首先,按照之前讲到的事件应用场景,我们整理下,如果我们做好事件驱动这块,我们的系统,需要具备什么样的能力呢? 第一,我们肯定得有一个事件标准,对吧…因为,事件不是给自己看的,也不是给他看的,而是给所有人看的。刚才,我们也讲到事件是无期望的,它没有明确的消费者,所有都是潜在的消费者,所以,我们得规范化事件的定义,让所有人都能看得懂,一目了然。 第二,我们得有一个事件中心,事件中心里面有所有系统,注册上来的各种事件,(这个跟消息不一样,我们没有消息中心,因为消息一般是定向的,是生产者和消费者约定的,有点像计划经济,消息生产出来的时候,带着很强的目的性,是给谁谁消费的。而事件有点像市场经济,事件中心呢,)这个有点类似市场经济大卖场,玲琅满目,里面分类摆放了各种各样的事件,所有人即使不买,也都可以进来瞧一瞧,看一看,有哪些事件,可能是我需要的,那就可以买回去。 第三,我们得有一个事件格式,用来描述事件的具体内容。这相当于市场经济的一个买卖契约。生产者发送的事件格式是什么,得确定下来,不能总是变;消费者以什么格式接收事件也得确定下来,不然整个市场就乱套了。 第四,我们得给消费者一个,把投递事件到目标端的能力。并且投递前,可以对事件进行过滤和转换,让它可以适配目标端 API 接收参数的格式,我们把这个过程呢,统一叫做订阅规则。 第五,我们还得有一个存储事件的地方,就是最中间的事件总线。 事件标准 关于刚才提到的第一点事件标准,我们选取了 CNCF 旗下的开源项目 CloudEvents,目前已被广泛集成,算是一个事实上的标准。 它的协议也很简单,主要规范了 4 个必选字段:id,source、type、specversion;以及多个可选字段:subject、time、dataschema、datacontenttype和data。上图右边,我们有一个简单的例子,大家可以看下,这里就不具体展开了。 另外,事件的传输也需要定义一种协议,方便不同系统之间的沟通,默认支持三种 HTTP 的传输方式:Binary Content Mode、Structured Content Mode 和 Batched Content Mode。通过 HTTP 的 ContentType,就可以区分这三种不同的模式。其中前两种,都是传递单个事件;第三种则是传递批量事件。 事件 Schema 事件的 Schema,用来描述事件中有哪些属性、对应的含义、约束等等信息。目前我们选取了 Json Schema. 和 OpenAPI 3.0,根据事件的 Schema 描述,我们可以对事件进行合法性校验。,当然 Schema 本身的修改,也需要符合兼容性原则,这里不作具体展开。 事件过滤和转换 关于事件的过滤和转换,我们提供了 7 种事件过滤方式和 4 种事件转换方式,详细可以下图描述: 技术架构 我们 RocketMQ 围绕事件驱动推出的产品,叫做 EventBridge,也是我们这次要开源的新产品。 他的整个架构可以分为两部分:上面是我们的控制面、下面是我们的数据面。 控制面中最上面的 EventSource 是各个系统注册上来的事件源,这些事件可以通过 APIGateway 发送事件到事件总线,也可以通过配置的 EventSource,生成 SouceRuner,主动从我们的系统中,去拉取事件。事件到达事件总线 EventBus 之后,我们就可以配置订阅规则了 EventRule,在规则 EventRule 里我们设置了事件怎么过滤,以及投递到目标端前,做哪些转换。系统基于创建的规则会生成 TargetRunner,就可以将事件推送到指定的目标端。 那这里 SouceRuner 和 TargetRunner 是什么呢?我们具体能对接哪些上下游 Source 和 Target? 这些我们都可以在下面的 SourceRegister 和 TargetRegister 提前进行注册。 所以 EventBridge 的数据面是一个开放的架构,他定义了事件处理的SPI,底下可以有多种实现。比如,我们把 RocketMQ 的 HTTPConnector 注册到 EventBridge 中,那我们就可以把事件推送到 HTTP 服务端。 如果我们把 Kafka 的 JDBC Connector 注册到 EventBridge 中,我们就可以把事件推送到数据库。 当然,如果你的系统不是通用的像 HTTP/JDPC 等协议,也可以开发自己的 Connector,这样就能将事件实时同步到 EventBridge,或则接收来自 EventBridge 的事件。 除此之外,我们还会有一些附加的运维能力,包括:事件追踪、事件回放、事件分析、事件归档。 RocketMQEventBridge 与云上 在所有开源的,与其他上下游系统做集成的 Connector 当中,我们有一个特殊的 Connector,叫:EventBridgeConnector,通过它可以方便的和阿里云云上的事件总线进行集成。这里有两个典型的应用场景: 第一个场景是:IDC 系统内部产生的事件,不仅可以用来做内部系统间的解耦,还可以实时同步到云上,驱动云上的一些计算服务,比如通过云上 Maxcompute 对内部产生的事件进行离线分析,或则驱动云上的图像识别服务,实时分析事件中标注的图片。 第二个场景是:如果 IDC 内部使用到了自建 MQ,我们同样可以通过 MQConnector 和 EventBridgeConnector,实时同步事件到云上,逐步将内部自建 MQ,迁移到云上MQ。 生态发展 关于 EventBridge 的未来方向,我们希望是在开源,构建一个支持多云架构的事件总线生态。这个怎么理解?简单来说,我们希望在不同云厂商之间,包括云厂商和内部 IDC 系统之间,可以通过事件,来打破围墙,实现互通。虽然,这几年云计算发展很快,但是对于一些特别大的客户来讲,有时候并不希望跟某家云厂商强绑定。这不光是市场充分竞争的结果,也是大客户一种降低风险的手段。所以,这个时候,如何在不同云厂商之间,包括云厂商系统和自己内部 IDC 系统之间,灵活的交互,甚至灵活的迁移,是企业非常重要的一个诉求。 当然,实现这个是有一定难度的。不过如果我们在进行企业架构设计的时候,是基于事件驱动架构进行设计开发——不同系统之间的交互,围绕事件展开,就会容易很多。 事件,在这里,就好比一种通用语言,通过这个通用语言,就可以实现和不同系统之间的沟通交流。比如:用 IDC 系统内部的事件,去驱动阿里云上服务;甚至用阿里云上的事件,去驱动 AWS 上的服务运行; 为了实现这个目标,我们在和不同云厂商,不同 SaaS 系统服务商,进行系统间集成的时候,需要开发与之对应的连接器。 也欢迎大家,一起来共建 RocketMQEventBridge 的生态。 源码地址:
作者:静罗
#技术探索 #强力推荐