2025年6月18日

乐刻运动:基于 Apache RocketMQ + MQTT 实现健身产业数字化升级
乐刻运动:助推数字经济与健身产业深度融合发展 乐刻运动,2015 年创立于杭州的健身产业互联网平台,以让每个人平等享有运动健康的资源和权利为使命,以每天响应 1 亿人次的运动健康需求为愿景。乐刻以用户运营为核心,构建数智中台,打通场景、用户、教练、服务,对健身产业进行数字化升级改造,提高运营效率和供应链管理能力,搭建健身服务新零售生态,助推数字经济与健身产业深度融合发展。截至 2024 年 12月,乐刻运动已在全国 30 多个城市开设超过 1700 家门店。 乐刻运动在数字化升级中的挑战 在数字化升级过程中,乐刻运动计划在各门店部署物联网设备,覆盖用户进门、签到以及运动器材等场景,以便实时、持续地收集相关信息并对其进行维护管理,从而进一步提升门店运营效率和服务质量。然而,随着业务规模的持续扩大,终端设备数量也在不断增加,带来了以下挑战: 1. 高并发连接与实时监控:鉴于健身房门店众多,且设备种类和数量繁多,需要一个能够支持大规模并发连接的可靠通信架构,确保所有终端设备的状态被及时监测,并迅速响应任何异常情况。 2. 轻量低带宽的消息传输:由于网络资源的限制,在客户端设备与服务端之间的连接上,需要采用一种轻量级、低带宽的消息传输协议,以优化数据传输效率并减少对现有网络基础设施的压力。 3. 高效的消息处理机制:在业务高峰期时,客户端上报数据量大且频率高,需要一套高效的消息处理机制,来避免因服务器应用有限,无法及时消费,而造成消息堆积的问题。 结合 RocketMQ 与 MQTT 的高效解决方案 为应对上述挑战,乐刻运动采用 RocketMQ 与 MQTT 协议相结合的解决方案,显著提升了整体架构的稳定性和可扩展性,提高了消息处理效率,确保了高并发场景下的业务连续性,最终优化了用户体验。 1. MQTT 海量终端数据实时收集:通过 MQTT 协议,系统能够实时收集健身房内各类物联网设备上报的数据,满足高并发需求,确保数据传输的高效与可靠。 2. RocketMQ 消息缓存与负载均衡:将 MQTT 的消息流出挂载到 RocketMQ,通过 RocketMQ 对客户端采集到的大量消息进行消息缓存和负载均衡,从而有效缓解服务端的压力,确保系统的稳定运行。 云消息队列助力乐刻运动数字化升级 在实施上述方案的过程中,乐刻运动选择了阿里云的云消息队列 RocketMQ 版和云消息队列 MQTT 版作为核心消息中间件,这两个产品在实际生产环境中展现出显著的优势和价值。 1. 产品简介 + 云消息队列 RocketMQ 版:云消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台。 + 云消息队列 MQTT 版:云消息队列 MQTT 版是专为移动互联网(MI)、物联网(IoT)领域设计的消息产品,覆盖直播互动、金融支付、智能餐饮、即时聊天、移动 Apps、智能设备、车联网等多种应用场景;通过对 MQTT、WebSocket 等协议的全面支持,连接端云之间的双向通信,实现 C2C、C2B、B2C 等业务场景之间的消息通信,可支撑千万级设备与消息并发。 2. 实际生产环境中的优势和价值 + 实时数据处理:云消息队列 MQTT 版通过 MQTT 协议实现终端设备与服务器之间的实时通信,利用发布/订阅模式,确保设备状态和用户行为的及时监控和响应,从而实现高效的实时数据处理能力。 + 高度可扩展性:云消息队列 MQTT 版具备强大的横向扩展能力,能够轻松支持海量终端设备和传感器的接入,能够轻松应对健身房规模不断扩大带来的设备接入需求。 + 可靠性和稳定性:云消息队列 MQTT 版的 MQTT 协议支持多种服务质量(QoS)级别,可以根据业务需求选择合适的级别,确保消息可靠传递,防止数据丢失。 + 性能表现卓越:服务端应用之间隐含着对等和任务分摊的关系,云消息队列 RocketMQ 版的集群消费模式提供原生的负载均衡机制,能够提升系统的整体性能,确保高并发场景下的高效稳定运行。 乐刻运动通过采用阿里云的云消息队列 RocketMQ 版和云消息队列 MQTT 版,不仅提升了系统的实时数据处理能力,还增强了系统的可扩展性、可靠性和性能,为业务的持续发展和流畅的用户体验,提供了坚实的技术支持,进一步推动了数字经济与健身产业的深度融合。
#行业实践

2025年6月18日

乐言科技:云原生加速电商行业赋能,基于 Apache RocketMQ 云服务降本 37%
深耕 AI SaaS+,助力数万电商客户数智化转型 上海乐言科技股份有限公司( 以下简称“乐言科技”,官网:https://www.leyantech.com/ )自 2016 年成立以来,专注于利用自然语言处理和深度学习等核心 AI 技术,为电商、金融、医疗、科学等多个垂直领域提供整体解决方案。公司在杭州、广州等地设有分支机构,已成为国内领先的人工智能企业。 深耕行业八年,乐言科技形成了完整的能力栈,发布了“乐言 GPT 大模型”,推进大模型解决方案赋能行业,并与头部品牌合作探索创新应用。公司已申报多个重大项目,获得多项荣誉和认证,并积极参与标准编制,以“引领人工智能技术,为客户创造价值”为使命,持续推动 AI 技术与行业的深度融合。 自研智能客服机器人“乐语助人”日均服务超千万人次 乐言科技致力于提升行业服务效率,核心业务之一是智能客服机器人,面向电商企业提供 AI SaaS+ 服务。其自主研发的电商智能客服机器人“乐语助人”( 官网介绍视频:https://www.leyantech.com/themes/leyan/public/assets/video.mp4 )适用于天猫、淘宝、京东等国内主流电商平台,基于自然语言处理、知识图谱、深度学习等领先的人工智能技术,具备充分的语言理解能力,可以模拟金牌客服的回复逻辑,进行买家咨询接待、业务问题处理、智能推荐、客情维系等工作。在降低人工客服团队营运开支的同时,大幅提升了客服人均接待效率与营销转化率,为电商商家创造了更多利润。 目前,“乐语助人”每天服务超过 2000 万人次,与六万余家电商客户合作,提供 AI SaaS+ 全链路数智化解决方案,助力企业完成数智化转型。上海乐言科技股份有限公司累计 SaaS 软件年收入约十亿元,并积极探索海外市场,推出跨境电商 AIGC 解决方案,服务 400 多万海外店铺。 智能客服机器人业务量激增,自建消息队列面临诸多痛点 在智能客服机器人系统中,“对话消息分发”是核心功能之一,对提高回复效率和处理高并发请求等起到关键作用。 在系统建设初期,由于业务规模较小,开发与运维团队的规模及技术能力有限,乐言科技统一采用自建 Apahce Kafka 作为消息中间件,以实现业务解耦与流量削峰,增强系统的灵活性和可扩展性。同时,Apache Kafka 还作为各数据系统(如 AI、大数据等)之间的数据通道。因此,确保其消息服务流程的顺畅至关重要。 然而,随着业务规模增长和系统复杂度增加,消息处理的精细化需求日益凸显,单一消息中间件架构需额外投入更多技术资源以维持效能,其扩展性与灵活性也逐渐成为系统演进的约束条件。同时,自建 Apache Kafka 集群的运维成本持续攀升,还逐渐暴露出系统稳定性不足、精准投递功能笨重等问题,导致运维压力倍增。 核心痛点如下: + 稳定性和弹性问题:公司核心业务系统共用 Apache Kafka 大集群。不同业务系统对集群的 IO 压力重叠,会造成彼此影响,例如:侧重高吞吐量系统可能会对延迟敏感的系统造成影响。而集群的扩容和缩容需要对分区进行重新均衡,也会对延迟敏感的对话消息造成稳定性影响。 + 运维成本过高:为了应对共用集群带来的影响,公司对 Apache Kafka 集群进行了拆分。然而不同集群每天业务消息量的波峰波谷明显且差值较大,波谷期资源利用率偏低,容易导致资源浪费,造成成本冗余。此外,临时扩容周期长且需大量人力投入。 + 无法精细化消息处理:Apache Kafka 仅充当消息管道,无法根据消息 Tag 进行精准消费和 SQL 过滤。业务系统为满足精准消费的需求,需要增加研发成本,基于 Apache Kafka Topic 进行额外开发,容易出错且灵活性很差,制约了我们新业务模式的展开速度。这在对接大客户的定制化需求时,尤为迫切。 + 消息级别可观测性差:Apache Kafka 无法直接查看每条消息的详情和消费状态,无法满足问题排查和运营支持的需求,需要开发额外工具或系统进行支持。 精准破局:从自建开源消息队列到阿里云消息队列 因此,乐言科技基于消息类型特征与业务逻辑复杂度拆分业务,并精准匹配消息队列选型策略: + 业务解耦与强一致性场景:针对侧重于业务解耦、涉及较多后置逻辑处理的场景(如强一致性、顺序消息等),采用阿里云消息队列 RocketMQ 版 Serverless 系列,以满足高可靠性与确定性需求。 + 实时流处理场景:大数据及日志类实时流处理业务沿用 Apache Kafka 架构,并计划迁移至阿里云消息队列 Kafka 版,以提升资源弹性与成本效益,持续优化技术架构。 对于业务解耦场景,采用云消息队列 RocketMQ 版 Serverless 系列替换自建开源 Apache Kafka,可以实现更高效的精细化消息处理,具体优势如下: 1. 高效实现分布式顺序消息:仅需按照顺序消息的投递 API 和定义顺序消费 Group 组,即可实现分布式顺序消息,相比 Kafka 指定 Partition 投递和消费扩展性强,业务仅需按照所需设置 MessageGroup,实现更灵活,与服务端绑定低。 2. 支持服务端消息过滤:在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务处理,各下游业务的处理逻辑不同,且只关注自身逻辑需要的消息子集。云消息队列 RocketMQ 版支持 Tag 标签过滤和 SQL 属性过滤,使用云消息队列 RocketMQ 版的消息过滤功能,可以帮助消费者更高效地过滤自己需要的消息集合,避免大量无效消息投递给消费者,降低下游系统处理压力。实现降低客户端的开发工作量和处理流量。 3. Serverless 系列弹性降本:云消息队列 RocketMQ 版 Serverless 系列能够通过资源动态伸缩,实现资源使用量与实际业务负载贴近,并支持按照实际使用量计费,无需按照最高峰值预留资源,有效降低运维的压力和使用成本。 采用云消息队列 RocketMQ 版 Serverless 系列,整体降本 37% 1. 保障业务稳定 通过使用云消息队列 RocketMQ 版 Serverless 系列替换自建开源 Apache Kafka,成功实现业务拆分解耦与流量隔离,有效避免了业务流量冲突导致的中间件并发问题。云消息队列 RocketMQ 版提供 99.99% 服务可用性和容灾保障,显著提升了整体业务的稳定性和连续性。 2. 降低开发成本 借助云消息队列 RocketMQ 版 Serverless 系列的顺序消息与消息过滤能力,将复杂的分布式顺序消息场景简化,有效减少了业务逻辑的复杂性,降低了开发成本。 3. 提升运维效率 基于云消息队列 RocketMQ 版提供的丰富的 Metrics 和 Trace 可观测工具,构建了完整的运维体系,极大提升了日常问题排查和巡检效率。 4. 资源弹性降本 云消息队列 RocketMQ 版 Serverless 系列采用动态资源调整策略,根据实时业务负载自动弹性伸缩,按量付费,无需预先估算并配置实例规格。通过将对话引擎、基础数据服务等业务迁移至云消息队列 RocketMQ 版 Serverless 系列,整体成本相较于之前降低了 37%。 云原生生态深度赋能乐言科技架构升级与创新突破 乐言科技依托云原生架构及阿里云云原生产品体系,实现基础设施与业务解耦以及弹性调度,在提升业务稳定性的同时,显著增加研发效能并降低运维成本,加速电商客户定制化需求交付,推动云计算与 AI 技术在电商领域的深度融合。 + 在大促等流量突增场景中,云原生架构通过秒级自适应弹性扩容,保障业务连续性,结合微服务引擎 MSE Nacos 的自动扩缩容和节点自愈能力,系统抗风险能力显著提升。MSE Nacos 团队基于双版本(社区与商业)维护经验持续优化商业产品的核心能力,比如性能提升、配置标签灰度、推空保护、配置中心的传输和存储加密,进一步提升微服务可用性与安全性。 + 在智能客服场景中,乐言科技采用日志服务 SLS 替代原有的自建日志系统,统一采集与存储多平台的客服沟通记录,以便用于数据分析驱动产品演进。相比自建日志系统,SLS 凭借高可用性与高吞吐量优势,有效解决了业务增长带来的存储成本激增、稳定性不足及人力投入过高等问题,显著降低综合运维成本。同时,为了进一步观测云上资源使用情况,使用企业云监控导出云上监控数据,与实际业务需求相结合,为构建智能化运维体系提供强有力的支撑。 面对 AI 技术发展与海外市场拓展等机遇,乐言科技将深化与阿里云的合作,基于业务需求迭代云原生架构,深度应用云原生产品,助力电商客户实现数智化转型,持续推动 AI 技术在行业应用中的创新突破。 :
#行业实践

2025年6月16日

EventBridge 构建智能化时代的企业级云上事件枢纽
产品演进历程:在技术浪潮中的成长之路 早在 2018 年,Gartner 评估报告便将事件驱动模型(EventDriven Model)列为十大战略技术趋势之一,指出事件驱动架构(EDA,Eventdriven Architectures)将成为微服务架构未来的演进方向。 随着云原生与 Serverless 技术的迅猛发展,2020 年,阿里云重磅发布事件总线 EventBridge,构建了云原生环境下的统一事件枢纽。事件总线 EventBridge 支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助企业轻松构建松耦合、分布式的事件集成驱动架构。 自 6 月 3 日起,阿里云事件总线 EventBridge 正式商业化。历经五年迭代升级,事件总线 EventBridge 在产品功能和用户体验方面不断优化,积累了丰富的规模化生产实践经验。在数据智能化时代,事件总线 EventBridge 持续深耕企业云上事件集成,适用于各种规模和行业的事件集成场景。通过 Event 桥接各个系统,满足 AI 和企业集成等领域的各种数据集成需求,为企业提供便捷且创新的数据集成解决方案。 产品核心特性:构建企业级事件中枢的关键 从诞生到商业化,事件总线 EventBridge 不断优化产品核心特性,致力于定义企业级事件集成标准,为企业构建云上事件枢纽提供坚实可靠的支撑和保障。 + 稳定与安全:依托海量数据传输及运维经验,提供高稳定性且安全合规的企业集成服务; + 性能与成本:提供高性能且性价比高的企业集成方案,显著降低用户数据集成成本; + 开放与集成:提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS 服务相互集成; + 统一事件枢纽:统一事件界面,定义事件标准,打破云产品事件孤岛; + 事件驱动引擎:海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级; + 流式事件通道:事件流提供轻量、实时、端到端的流式数据处理,对源端产生的事件进行实时抽取、转换和分析并加载至目标端。 多元应用场景:覆盖各类企业事件集成场景 EDA 事件驱动场景 事件总线 EventBridge 通过事件连接应用程序、云服务和 Serverless 服务,构建事件驱动架构(EDA,Eventdriven Architectures),实现应用与应用、应用与云服务之间的高效连接。 流式 ETL 场景 在企业集成场景中,事件总线 EventBridge 可以作为流式数据管道,提供基础的过滤与转换功能,支持不同数据仓库、数据处理程序、数据分析与处理系统之间的数据同步和跨地域备份,连接不同系统与服务。 AI 数据集成场景 事件总线 EventBridge 提供非结构化数据到结构化数据的链路集成,可处理多种数据源,如关系型数据库、API 数据、文件、ODPS 等,并支持将数据向量化后存储至向量数据库或其他数仓,同时支持数据清洗、转换和规范化。为 RAG 和模型数据准备等场景,提供一站式数据集成服务。 统一事件通知服务 事件总线 EventBridge 提供丰富的云产品事件源与事件的全生命周期管理工具,用户可以直接监听云产品产生的数据,并上报至监控和通知等下游服务,实现高效的事件管理和响应。 商用计费体系:计费模式灵活匹配企业需求 事件总线 EventBridge 已于 6 月 3 日起正式商业化,分为事件总线、事件流两类资源进行计费,计费模式如下: 计费组成说明 事件总线 EventBridge 各资源的计费组成信息请参见下图。 事件总线计费说明 事件总线【1】分为云服务专用事件总线和自定义事件总线。 + 云服务专用事件总线:是系统自动创建用于接收阿里云官方事件源的事件且不可修改的内置事件总线。 + 自定义事件总线:需要你自行创建并管理的事件总线。 事件源单价 | 计费项 | 计费单价 | | | | | 云服务专用总线事件发布 | 免费 | | 自定义总线事件发布 | 5.64元/百万次事件发布 | 每个 64KB 大小的事件计为 1 个事件。 事件目标单价 | 计费项 | 计费单价 | | | | | 阿里云服务目标事件推送/通知 | 免费 | | 自定义目标事件推送 | 1.29元/百万次事件推送 | 每个 64KB 大小的事件计为 1 个事件。 事件流计费说明 事件流【2】是端到端的流式事件通道,适用于端到端的流式数据处理场景。事件流属于收费服务,并支持按事件量计费和按 CU 额度计费,这两种计费【3】方式你只需二选一即可。 按事件量计费 | 计费项 | 计费单价 | | | | | 事件量 | 2.8元/百万条 | | 空置资源占用费 | 1元/天 | + 事件量:事件流拉取上游数据源的事件总量。每个 64KB 大小的事件计为 1 个事件。计算公式:事件量(向上取整)=单个事件大小/64KB。例如:批量投递了 300KB 的单个事件,则按照 5 个事件进行计费。若事件发生重试,则按照配置的重试规则每次重试计为 1 个事件。 + 空置资源占用量:若单个事件流在一个月内没有任何数据流入将会按照计费标准收取空置资源占用费,空置资源占用费将在有数据流入或者删除该事件流后取消计费。 按 CU 配额计费 名词介绍 CU 配额: CU(Capacity Unit)是事件总线任务的容量单位。若采用 CU 规格计费,则每个任务必须至少分配 1CU 作为最小配额。亦可在任务创建时指定允许弹性的最大 CU 规格和最小 CU 规格。 计费项和计费单价 + 计费项:CU 配额 + 计费单价:0.24元/小时/CU 说明 单个 CU 可支持的条件(条件为或): 1. 最大支持每秒事件量(EPS)5000Event/s(实际情况受限于链路上下游性能)。 2. 最大支持的峰值吞吐量(BPS)50MB/s 的容量。 为了方便你快速评估费用,事件总线(EventBridge)提供了价格计算器:事件总线(EventBridge)价格计算器【4】。 如果你在使用事件总线 EventBridge 的过程中有任何反馈或疑问,欢迎加入钉钉用户群(钉钉群号:31481771)与阿里云研发团队即时沟通。 【1】事件总线 https://help.aliyun.com/zh/eventbridge/userguide/eventbusoverview 【2】事件流 https://help.aliyun.com/zh/eventbridge/userguide/eventstreamoverview 【3】计费 https://help.aliyun.com/zh/eventbridge/productoverview/billingofeventstreams 【4】事件总线(EventBridge)价格计算器 https://eventbridge.console.aliyun.com/calculator
#社区动态

2025年6月11日

Apache RocketMQ + “太乙” = 开源贡献新体验
Apache RocketMQ 是 Apache 基金会托管的顶级项目,自 2012 年诞生于阿里巴巴,服务于淘宝等核心交易系统,历经多次双十一万亿级数据洪峰稳定性验证,至今已有十余年发展历程。RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 Apache RocketMQ 的茁壮成长离不开全球 800 多位开发者的积极参与和贡献。如今,Apache RocketMQ 开源社区将携手"太乙"平台,共同开启一场开源贡献竞赛,为广大开发者提供一个全新的体验平台和参与机会!新一轮开源竞赛于 6 月 1 日正式启动。 关于“太乙”平台 "太乙"是特色化示范性软件学院年度质量检测指标开源数据的官方唯一指定获取平台,服务于示范性软件学院联盟( https://www.pses.com.cn/home/publicresource )204 家高校成员单位。 “太乙”平台( https://www.taiyi.top/ )是浙江大学软件学院自主研发的开源能力评价与服务系统,提供开发者开源贡献价值评价与开源竞赛等服务,旨在精准衡量开发者的贡献价值、影响力和技能水平。 平台通过系统化分析开发者在开源社区中的各类可量化贡献,构建起定性与定量相结合的全维度评价体系,对开发者进行全面刻画。 + 在定性评价方面,平台从影响力、贡献度、语言能力、项目经验和活跃度五个维度对开发者进行宏观审视; + 在定量评价方面,则依据项目重要性、贡献类型、内容关键性、贡献体量及复杂度等指标,并结合程序语言分析与自然语言处理等技术,提供精准、客观、自动化的价值评估,充分认可每一份贡献。 “太乙”平台联合各大公司与知名社区,全年不间断、滚动式发布开源竞赛。基于科学的价值自动化评价系统,参赛者可根据贡献价值大小等比例获得奖金,甚至获得头部企业的实习与就业机会,太乙系统能够支持低成本、高效率的长周期开源竞赛组织,激发开发者的积极性,推动中国开源生态的发展。 Apache RocketMQ x “太乙” 开源竞赛 __ 本轮竞赛于 2025 年 6 月 1 日 启动,竞赛链接: https://www.taiyi.top/competitiondetails?id=6836d00651c0e4a2bd63770c 我们联合太乙平台,为 Apache RocketMQ 的开发者和学习者提供了一站式引导服务。我们不仅提供了详尽的原理介绍、文档说明以及部署教程,还特别设计了一键式自动搭建体验环境,帮助开发者轻松体验 Apache RocketMQ 的部署流程和消息收发过程。 为了进一步促进开发者融入社区生态,我们还精心准备了一份贡献指南:涵盖了社区生态、入门指引、issue 推荐等等。相信通过参与本次开源竞赛,开发者们能够快速掌握如何在 Apache RocketMQ 开源社区作出贡献,顺利成为一名 Contributor!欢迎开发者们踊跃参与~ 你可以获得的 参与 Apache RocketMQ x “太乙”开源竞赛,你将获得宝贵的开源社区贡献经历、可量化的贡献奖金以及成为 Apache 顶级社区 Contributor/Committer/PMC 的成长机会,这些都将为你未来的职业发展提供强大助力! 为确保开源竞赛奖金分配机制的公平性与合理性,兼顾开发者与项目发起方的共同利益,“太乙”平台采用了一套科学的公式来计算可分配的奖金额度。随着开发者提交贡献的增加,可分配的奖金总额将随之上涨,最高达 5000 元。 此次竞赛活动将进一步促进 Apache RocketMQ 技术生态的繁荣与发展,为参与者创造更多学习、交流与成长的机会。我们诚邀每一位热衷于分布式消息处理技术探索与实践的开发者加入,在实践中不断提升自我,共同推动 Apache RocketMQ 的技术进步与社区发展。 让我们在开源的道路上,携手共进,创造不凡! 点击:了解太乙开源服务系统更多详情
#社区动态

2025年5月29日

Apache RocketMQ 源码解析 —— 秒级定时消息介绍
背景 如今rocketmq的应用场景日益拓宽,延时消息的需求也在增加。原本的特定级别延时消息已经不足以支撑rocketmq灵活的使用场景。因此,我们需要一个支持任意时间的延迟消息feature。 支持任意时间延迟的feature能够让使用者在消息发出时指定其消费时间,在生活与生产中具有非常重要的意义。 目标 1. 支持任意时延的延迟消息。 2. 提供延迟消息可靠的存储方式。 3. 保证延迟消息具有可靠的收发性能。 4. 提供延迟消息的可观测性排查能力。 架构 存储数据结构 本方案主要通过时间轮实现任意时延的定时消息。在此过程中,涉及两个核心的数据结构:TimerLog(存储消息索引)和TimerWheel(时间轮,用于定时消息到时)。 TimerLog,为本RIP中所设计的定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息: | 名称 | 大小 | 备注 | | | | | | size | 4B | 保存记录的大小 | | prev_pos | 8B | 前一条记录的位置 | | next_Pos | 8B | 后一条记录的位置,暂时为1,作为保留字段 | | magic | 4B | magic value | | delayed_time | 4B | 该条记录的定时时间 | | offset_real | 8B | 该条消息在commitLog中的位置 | | size_real | 4B | 该条消息在commitLog中的大小 | | hash_topic | 4B | 该条消息topic的hash code | | varbody | | 存储可变的body,暂时没有 | TimerWheel是对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。 时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。时间轮的每一格设计如下: | delayed_time(8B) 延迟时间 | first_pos(8B) 首条位置 | last_pos(8B) 最后位置 | num(4B)消息条数 | | | | | | 上述设计的TimerLog与TimerWheel的协作如下图所示。 pipeline 在存储方面,采用本地文件系统作为可靠的延时消息存储介质。延时消息另存TimerLog文件中。通过时间轮对定时消息进行定位以及存取。针对长时间定时消息,通过消息滚动的方式避免过大的消息存储量。其具体架构如下所示: 从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下: 1. 针对放置定时消息的service,每50ms从commitLog读取指定主题(TIMER_TOPIC)的定时消息。 1. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。 2. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。 2. 针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。 1. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。 2. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。 3. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。 代码实现 TimerLog与TimerWheel的协作实现 定时消息的核心存储由TimerLog和TimerWheel协同完成。TimerLog作为顺序写入的日志文件,每条记录包含消息在CommitLog中的物理偏移量(offsetPy)和延迟时间(delayed_time)。当消息到达时,TimerEnqueuePutService会将其索引信息追加到TimerLog,并通过prev_pos字段构建链表结构,确保同一时刻的多个消息可被快速遍历。 ```java // TimerLog.java 核心写入逻辑 public long append(byte[] data, int pos, int len) { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // 处理文件切换:当当前文件剩余空间不足时,填充空白段并创建新文件 if (len + MIN_BLANK_LEN mappedFile.getFileSize() mappedFile.getWrotePosition()) { ByteBuffer blankBuffer = ByteBuffer.allocate(MIN_BLANK_LEN); blankBuffer.putInt(mappedFile.getFileSize() mappedFile.getWrotePosition()); blankBuffer.putLong(0); // prev_pos置空 blankBuffer.putInt(BLANK_MAGIC_CODE); // 标记空白段 mappedFile.appendMessage(blankBuffer.array()); mappedFile = this.mappedFileQueue.getLastMappedFile(0); // 切换到新文件 } // 写入实际数据并返回物理偏移量 long currPosition = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(); mappedFile.appendMessage(data, pos, len); return currPosition; } ``` 此代码展示了消息追加的核心流程: 1. 检查当前文件的剩余空间,不足时填充空白段并创建新文件 2. 将消息索引数据写入内存映射文件 3. 返回写入位置的全局偏移量,供时间轮记录 时间轮槽位管理 TimerWheel通过数组结构管理时间槽位,每个槽位记录该时刻的首尾指针和消息数量。当消息入队时,putSlot方法会更新对应槽位的链表结构: ```java // TimerWheel.java 槽位更新逻辑 public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) { localBuffer.get().position(getSlotIndex(timeMs) Slot.SIZE); localBuffer.get().putLong(timeMs / precisionMs); // 标准化时间戳 localBuffer.get().putLong(firstPos); // 链表头指针 localBuffer.get().putLong(lastPos); // 链表尾指针 localBuffer.get().putInt(num); // 当前槽位消息总数 localBuffer.get().putInt(magic); // 特殊标记(如滚动/删除) } ``` 该方法的实现细节: + getSlotIndex(timeMs)将时间戳映射到环形数组的索引 + 同时记录首尾指针以实现O(1)复杂度插入 消息入队流程 TimerEnqueueGetService:消息扫描服务 该服务作为定时消息入口,持续扫描CommitLog中的TIMER_TOPIC消息。其核心逻辑通过enqueue方法实现: ```java // TimerMessageStore.java public boolean enqueue(int queueId) { ConsumeQueueInterface cq = this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId); ReferredIterator iterator = cq.iterateFrom(currQueueOffset); while (iterator.hasNext()) { CqUnit cqUnit = iterator.next(); MessageExt msgExt = getMessageByCommitOffset(cqUnit.getPos(), cqUnit.getSize()); // 构造定时请求对象 TimerRequest timerRequest = new TimerRequest( cqUnit.getPos(), cqUnit.getSize(), Long.parseLong(msgExt.getProperty(TIMER_OUT_MS)), System.currentTimeMillis(), MAGIC_DEFAULT, msgExt ); // 放入入队队列(阻塞式) while (!enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) { if (!isRunningEnqueue()) return false; } currQueueOffset++; // 更新消费进度 } } ``` 关键设计点: 1. 增量扫描:通过currQueueOffset记录消费位移,避免重复处理 2. 消息转换:将ConsumeQueue中的索引转换为包含完整元数据的TimerRequest` TimerEnqueuePutService:时间轮写入服务 从队列获取请求后,该服务执行核心的定时逻辑: ```java // TimerMessageStore.java public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) { // 计算目标时间槽位 Slot slot = timerWheel.getSlot(delayedTime); // 构造TimerLog记录 ByteBuffer buffer = ByteBuffer.allocate(TimerLog.UNIT_SIZE); buffer.putLong(slot.lastPos); // 前驱指针指向原槽位尾 buffer.putLong(offsetPy); // CommitLog物理偏移 buffer.putInt(sizePy); // 消息大小 buffer.putLong(delayedTime); // 精确到毫秒的延迟时间 // 写入TimerLog并更新时间轮 long pos = timerLog.append(buffer.array()); timerWheel.putSlot(delayedTime, slot.firstPos, pos, slot.num + 1); } ``` 写入优化策略: + 空间预分配:当检测到当前MappedFile剩余空间不足时,自动填充空白段并切换文件 + 链表式存储:通过prev_pos字段构建时间槽位的倒序链表,确保新消息快速插入 + 批量提交:积累多个请求后批量写入,减少文件I/O次数 TimerEnqueuePutService从队列获取消息请求,处理消息滚动逻辑。当检测到延迟超过时间轮窗口时,将消息重新写入并标记为滚动状态: ```java // TimerMessageStore.java 消息滚动处理 boolean needRoll = delayedTime tmpWriteTimeMs = (long) timerRollWindowSlots precisionMs; if (needRoll) { magic |= MAGIC_ROLL; // 调整延迟时间为时间轮窗口中间点,确保滚动后仍有处理时间 delayedTime = tmpWriteTimeMs + (long) (timerRollWindowSlots / 2) precisionMs; } ``` 此逻辑的关键设计: 1. 当延迟时间超过当前时间轮容量时触发滚动 2. 将消息的delayed_time调整为窗口中间点,避免频繁滚动 3. 设置MAGIC_ROLL标记,出队时识别滚动消息 消息出队处理 TimerDequeueGetService:到期扫描服务 该服务以固定频率推进时间指针,触发到期消息处理: ```java // TimerMessageStore.java public int dequeue() throws Exception { // 获取当前时间槽位 Slot slot = timerWheel.getSlot(currReadTimeMs); // 遍历TimerLog链表 long currPos = slot.lastPos; while (currPos != 1) { SelectMappedBufferResult sbr = timerLog.getTimerMessage(currPos); ByteBuffer buf = sbr.getByteBuffer(); // 解析记录元数据 long prevPos = buf.getLong(); long offsetPy = buf.getLong(); int sizePy = buf.getInt(); long delayedTime = buf.getLong(); // 分类处理(普通消息/删除标记) if (isDeleteMarker(buf)) { deleteMsgStack.add(new TimerRequest(offsetPy, sizePy, delayedTime)); } else { normalMsgStack.addFirst(new TimerRequest(offsetPy, sizePy, delayedTime)); } currPos = prevPos; // 前向遍历链表 } // 分发到处理队列 splitAndDispatch(normalMsgStack); splitAndDispatch(deleteMsgStack); moveReadTime(); // 推进时间指针 } ``` TimerDequeuePutMessageService:消息投递服务 TimerDequeuePutMessageService将到期消息重新写入CommitLog。对于滚动消息,会修改主题属性并增加重试计数: ```java // TimerMessageStore.java 消息转换逻辑 MessageExtBrokerInner convert(MessageExt messageExt, long enqueueTime, boolean needRoll) { if (needRoll) { // 增加滚动次数标记 MessageAccessor.putProperty(messageExt, TIMER_ROLL_TIMES, Integer.parseInt(messageExt.getProperty(TIMER_ROLL_TIMES)) + 1 + ""); } // 恢复原始主题和队列ID messageInner.setTopic(messageInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC)); messageInner.setQueueId(Integer.parseInt( messageInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); return messageInner; } ``` 此转换过程确保: + 滚动消息保留原始主题信息 + 每次滚动增加TIMER_ROLL_TIMES属性 该服务最终将到期消息重新注入CommitLog: ```java // TimerMessageStore.java public void run() { while (!stopped) { TimerRequest req = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS); MessageExtBrokerInner innerMsg = convert(req.getMsg(), req.needRoll()); // 消息重投递逻辑 int result = doPut(innerMsg, req.needRoll()); switch (result) { case PUT_OK: // 成功:更新监控指标 timerMetrics.recordDelivery(req.getTopic()); break; case PUT_NEED_RETRY: // 重试:重新放回队列头部 dequeuePutQueue.putFirst(req); break; case PUT_NO_RETRY: // 丢弃:记录错误日志 log.warn("Discard undeliverable message:{}", req); } } } ``` 故障恢复机制 系统重启时通过recover方法重建时间轮状态。关键步骤包括遍历TimerLog文件并修正槽位指针: ```java // TimerMessageStore.java 恢复流程 private long recoverAndRevise(long beginOffset, boolean checkTimerLog) { List mappedFiles = timerLog.getMappedFileQueue().getMappedFiles(); for (MappedFile mappedFile : mappedFiles) { SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0); ByteBuffer bf = sbr.getByteBuffer(); while (position Google Doc: + Shimo:
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— Controller 高可用切换架构
一、原理及核心概念浅述 1.1 核心架构 1.2 核心概念 1. controller:负责管理broker间的主备关系,可以挂在namesrv中,不影响namesrv能力,支持独立部署。 2. master/slave:主备身份。 3. syncStateSet:字面意思为“同步状态集合”。当备节点能够及时跟上主节点,则会纳入syncStateSet。 4. epoch:用于记录每一次主备切换时的状态,避免切换后产生数据丢失或者不一致的情况。 为方便理解,在某些过程中可以把controller当作班主任,master作为小组长,slave作为小组成员。同步过程是各位同学向小组长抄作业的过程,位于syncStateSet中的是优秀作业。 二、相关代码文件及说明 核心是“controller+broker+复制过程”,因此分三块进行叙述。 2.1 Controller 该部分代码主要集中在rocketmqcontroller模块下,主要有如下代码文件: + ControllerManager: 负责管理controller,其中存储了许多controller相关配置,并负责了心跳管理等核心功能。(班主任管理条例) + DLederController: Controller的DLedger实现。包含了controller的基本功能。在其中实现了副本信息管理、broker存活情况探测、选举Master等核心功能。(某种班主任) + DefaultBrokerHeartbeatManager: 负责管理broker心跳,其中包含了broker存活情况表,以及在broker下线时的listeners,当副本掉线时,触发重新选举。(点名册) + ReplicasInfoManager: 负责controller中事件的处理。即各种选举事件、更换SyncStateSet事件等等。(小组登记册) + ControllerRequestProcessor: 处理向controller发送的requests,例如让controller选举、向controller注册broker、心跳、更换SyncStateSet等等。(班主任信箱) + DefaultElectPolicy: 选举Master的策略。可以选择从sync状态的副本中选,也可以支持从所有副本中(无论是否同步)的unclean选举。(班规) + ...... 2.2 Broker 该部分代码主要集中在rocketmqbroker模块中,可进入org/apache/rocketmq/broker/controller进行查看: + ReplicasManager: 完成自己作为一个replica的使命——找controller,角色管理,Master更新(Expand/Shrink)SyncStateSet等等。 2.3 复制模块 该部分代码主要集中在rocketmqstore模块中的ha文件夹下: + HAService: 每个Replica必备的的service,负责管理作为主、备的同步任务。 + HAClient: 每个Slave 的HAService中必备的client,负责管理同步任务中的读、写操作。 + HAConnection: 代表在Master中的HA连接,每个connection理论上对应一个slave。在该connection类中存储了传输过程中的诸多内容,包括channel、传输状态、当前传输位点等等信息。 三、核心流程 3.1 心跳 核心CODE:BROKER_HEARTBEAT Broker端: 该部分较简单,带上code向controller发request,不再赘述: BrokerController.sendHeartbeat() brokerOuterAPI.sendHeartbeat() Controller端: 1. 首先由ControllerRequestProcessor接收到code,进入处理逻辑: ```java private RemotingCommand handleBrokerHeartbeat(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final BrokerHeartbeatRequestHeader requestHeader = (BrokerHeartbeatRequestHeader) request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class); if (requestHeader.getBrokerId() == null) { return RemotingCommand.createResponseCommand(ResponseCode.CONTROLLER_INVALID_REQUEST, "Heart beat with empty brokerId"); } this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerId(), requestHeader.getHeartbeatTimeoutMills(), ctx.channel(), requestHeader.getEpoch(), requestHeader.getMaxOffset(), requestHeader.getConfirmOffset(), requestHeader.getElectionPriority()); return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "Heart beat success"); } ``` 2. 之后在onBrokerHeartbeat()中,主要更新controller brokerHeartbeatManager中的brokerLiveTable: ```java public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId, Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) { BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId); BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo); ...... if (null == prev) { this.brokerLiveTable.put(...); log.info("new broker registered, {}, brokerId:{}", brokerIdentityInfo, realBrokerId); } else { prev.setXXX(......) } } ``` 3.2 选举 相关CODE: CONTROLLER_ELECT_MASTER 有如下几种情形可能触发选举: 1. controller主动发起,通过triggerElectMaster(): 1. HeartbeatManager监听到有broker心跳失效。 (班主任发现有小组同学退学了) 2. Controller检测到有一组Replica Set不存在master。(班主任发现有组长虽然在名册里,但是挂了) 2. broker发起将自己选为master,通过ReplicaManager.brokerElect(): 1. Broker向controller查metadata时,没找到master信息。(同学定期检查小组情况,问班主任为啥没小组长) 2. Broker向controller注册完后,仍未从controller获取到master信息。(同学报道后发现没小组长,汇报) 3. 通过tools发起: 1. 通过选举命令ReElectMasterSubCommand发起。(校长直接任命) 上述所有过程,最终均触发: controller.electMaster() replicasInfoManager.electMaster() // 即,所有小组长必须通过班主任任命 ```java public ControllerResult electMaster(final ElectMasterRequestHeader request, final ElectPolicy electPolicy) { ... // 从request中取信息 ... if (syncStateInfo.isFirstTimeForElect()) { // 从未注册,直接任命 newMaster = brokerId; } // 按选举政策选主 if (newMaster == null) { // we should assign this assignedBrokerId when the brokerAddress need to be elected by force Long assignedBrokerId = request.getDesignateElect() ? brokerId : null; newMaster = electPolicy.elect(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), syncStateSet, allReplicaBrokers, oldMaster, assignedBrokerId); } if (newMaster != null && newMaster.equals(oldMaster)) { // 老主 == 新主 // old master still valid, change nothing String err = String.format("The old master %s is still alive, not need to elect new master for broker %s", oldMaster, brokerReplicaInfo.getBrokerName()); LOGGER.warn("{}", err); // the master still exist response.setXXX() result.setBody(new ElectMasterResponseBody(syncStateSet).encode()); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_STILL_EXIST, err); return result; } // a new master is elected if (newMaster != null) { // 出现不一样的新主 final int masterEpoch = syncStateInfo.getMasterEpoch(); final int syncStateSetEpoch = syncStateInfo.getSyncStateSetEpoch(); final HashSet newSyncStateSet = new HashSet<(); //设置新的syncStateSet newSyncStateSet.add(newMaster); response.setXXX()... ElectMasterResponseBody responseBody = new ElectMasterResponseBody(newSyncStateSet); } result.setBody(responseBody.encode()); final ElectMasterEvent event = new ElectMasterEvent(brokerName, newMaster); result.addEvent(event); return result; } // 走到这里,说明没有主,选举失败 // If elect failed and the electMaster is triggered by controller (we can figure it out by brokerAddress), // we still need to apply an ElectMasterEvent to tell the statemachine // that the master was shutdown and no new master was elected. if (request.getBrokerId() == null) { final ElectMasterEvent event = new ElectMasterEvent(false, brokerName); result.addEvent(event); result.setCodeAndRemark(ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE, "Old master has down and failed to elect a new broker master"); } else { result.setCodeAndRemark(ResponseCode.CONTROLLER_ELECT_MASTER_FAILED, "Failed to elect a new master"); } return result; } ``` 3.3 更新SyncStateSet 核心CODE: CONTROLLER_ALTER_SYNC_STATE_SET 1. 由master发起,主动向controller更换syncStateSet(等价于小组长汇报优秀作业) 2. controllerRequestProcessor接收更换syncStateSet的请求,进入handleAlterSyncStateSet()方法: ```java private RemotingCommand handleAlterSyncStateSet(ChannelHandlerContext ctx, RemotingCommand request) throws Exception { final AlterSyncStateSetRequestHeader controllerRequest = (AlterSyncStateSetRequestHeader) request.decodeCommandCustomHeader(AlterSyncStateSetRequestHeader.class); final SyncStateSet syncStateSet = RemotingSerializable.decode(request.getBody(), SyncStateSet.class); final CompletableFuture future = this.controllerManager.getController().alterSyncStateSet(controllerRequest, syncStateSet); if (future != null) { return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); } return RemotingCommand.createResponseCommand(null); } ``` 3. 之后进入Controller.alterSyncStateSet() replicasInfoManager.alterSyncStateSet()方法: ```java public ControllerResult alterSyncStateSet( final AlterSyncStateSetRequestHeader request, final SyncStateSet syncStateSet, final BrokerValidPredicate brokerAlivePredicate) { final String brokerName = request.getBrokerName(); ... final Set newSyncStateSet = syncStateSet.getSyncStateSet(); final SyncStateInfo syncStateInfo = this.syncStateSetInfoTable.get(brokerName); final BrokerReplicaInfo brokerReplicaInfo = this.replicaInfoTable.get(brokerName); // 检查syncStateSet是否有变化 final Set oldSyncStateSet = syncStateInfo.getSyncStateSet(); if (oldSyncStateSet.size() == newSyncStateSet.size() && oldSyncStateSet.containsAll(newSyncStateSet)) { String err = "The newSyncStateSet is equal with oldSyncStateSet, no needed to update syncStateSet"; ... } // 检查是否是master发起的 if (!syncStateInfo.getMasterBrokerId().equals(request.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the current leader is:{%s}, not {%s}", syncStateInfo.getMasterBrokerId(), request.getMasterBrokerId()); ... } // 检查master的任期epoch是否一致 if (request.getMasterEpoch() != syncStateInfo.getMasterEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current master epoch is:{%d}, not {%d}", syncStateInfo.getMasterEpoch(), request.getMasterEpoch()); ... } // 检查syncStateSet的epoch if (syncStateSet.getSyncStateSetEpoch() != syncStateInfo.getSyncStateSetEpoch()) { String err = String.format("Rejecting alter syncStateSet request because the current syncStateSet epoch is:{%d}, not {%d}", syncStateInfo.getSyncStateSetEpoch(), syncStateSet.getSyncStateSetEpoch()); ... } // 检查新的syncStateSet的合理性 for (Long replica : newSyncStateSet) { // 检查replica是否存在 if (!brokerReplicaInfo.isBrokerExist(replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't exist", replica); ... } // 检查broker是否存活 if (!brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerReplicaInfo.getBrokerName(), replica)) { String err = String.format("Rejecting alter syncStateSet request because the replicas {%s} don't alive", replica); ... } } // 检查是否包含master if (!newSyncStateSet.contains(syncStateInfo.getMasterBrokerId())) { String err = String.format("Rejecting alter syncStateSet request because the newSyncStateSet don't contains origin leader {%s}", syncStateInfo.getMasterBrokerId()); ... } // 更新epoch int epoch = syncStateInfo.getSyncStateSetEpoch() + 1; ... // 生成事件,替换syncStateSet final AlterSyncStateSetEvent event = new AlterSyncStateSetEvent(brokerName, newSyncStateSet); ... } ``` 4. 最后通过syncStateInfo.updateSyncStateSetInfo(),更新syncStateSetInfoTable.get(brokerName)得到的syncStateInfo信息(该过程可以理解为班主任在班级分组册上找到了组长的名字,拿出组员名单,更新)。 3.4 复制 该部分较复杂,其中HAService/HAClient/HAConnection以及其中的各种Service/Reader/Writer容易产生混淆,对阅读造成阻碍。因此绘制本图帮助理解(可在粗读源码后回头理解): 下面对HA复制过程作拆解,分别讲解: 1. 在各个replica的DefaultMessageStore中均注册了HAService,负责管理HA的复制。 2. 在Master的 HAService中有一个AcceptSocketService, 负责自动接收各个slave的连接: ```java protected abstract class AcceptSocketService extends ServiceThread { ... / Starts listening to slave connections. @throws Exception If fails. / public void beginAccept() throws Exception { ... } @Override public void shutdown(final boolean interrupt) { ... } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); Set selected = this.selector.selectedKeys(); if (selected != null) { for (SelectionKey k : selected) { if (k.isAcceptable()) { SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); if (sc != null) { DefaultHAService.log.info("HAService receive new connection, " + sc.socket().getRemoteSocketAddress()); try { HAConnection conn = createConnection(sc); conn.start(); DefaultHAService.this.addConnection(conn); } catch (Exception e) { log.error("new HAConnection exception", e); sc.close(); } } } ... } } ``` 3. 在各个Slave 的HAService中存在一个HAClient,负责向master发起连接、传输请求。 ```java public class AutoSwitchHAClient extends ServiceThread implements HAClient { ... } public interface HAClient { void start(); void shutdown(); void wakeup(); void updateMasterAddress(String newAddress); void updateHaMasterAddress(String newAddress); String getMasterAddress(); String getHaMasterAddress(); long getLastReadTimestamp(); long getLastWriteTimestamp(); HAConnectionState getCurrentState(); void changeCurrentState(HAConnectionState haConnectionState); void closeMaster(); long getTransferredByteInSecond(); } ``` 4. 当master收到slave的连接请求后,将会创建一个HAConnection,负责收发内容。 ```java public interface HAConnection { void start(); void shutdown(); void close(); SocketChannel getSocketChannel(); HAConnectionState getCurrentState(); String getClientAddress(); long getTransferredByteInSecond(); long getTransferFromWhere(); long getSlaveAckOffset(); } ``` 5. Master的HAConnection会与Slave的HAClient建立连接,二者均通过HAWriter(较简单,不解读,位于HAWriter类)往socket中写内容,再通过HAReader读取socket中的内容。只不过一个是HAServerReader,一个是HAClientReader: ```java public abstract class AbstractHAReader { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); protected final List readHookList = new ArrayList<(); public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) { int readSizeZeroTimes = 0; while (byteBufferRead.hasRemaining()) { ... boolean result = processReadResult(byteBufferRead); ... } } ... protected abstract boolean processReadResult(ByteBuffer byteBufferRead); } ``` 6. 两种HAReader均实现了processReadResult()方法,负责处理从socket中得到的数据。client需要详细阐述该方法,因为涉及到如何将读进来的数据写入commitlog,client的processReadResult(): ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { int readSocketPos = byteBufferRead.position(); try { while (true) { ... switch (AutoSwitchHAClient.this.currentState) { case HANDSHAKE: { ... // 握手阶段,先检查commitlog完整性,截断 } break; case TRANSFER: { // 传输阶段,将body写入commitlog ... byte[] bodyData = new byte[bodySize]; ... if (bodySize 0) { // 传输阶段,将body写入commitlog AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length); } haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset())); ... break; } default: break; } if (isComplete) { continue; } } // 检查buffer中是否还有数据, 如果有, compact() ... break; } } ... } ``` 7. server的processReadResult()主要用于接收client的握手等请求,较简单。更需要解释其WriteSocketService如何向socket中调用HAwriter去写数据: ```java abstract class AbstractWriteSocketService extends ServiceThread { ... private void transferToSlave() throws Exception { ... int size = this.getNextTransferDataSize(); if (size 0) { ... buildTransferHeaderBuffer(this.transferOffset, size); this.lastWriteOver = this.transferData(size); } else { // 无需传输,直接更新caught up的时间 AutoSwitchHAConnection.this.haService.updateConnectionLastCaughtUpTime(AutoSwitchHAConnection.this.slaveId, System.currentTimeMillis()); haService.getWaitNotifyObject().allWaitForRunning(100); } } @Override public void run() { AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { this.selector.select(1000); switch (currentState) { case HANDSHAKE: // Wait until the slave send it handshake msg to master. // 等待slave的握手请求,并进行回复 break; case TRANSFER: ... transferToSlave(); break; default: ... } } catch (Exception e) { ... } } ... // 在service结束后的一些事情 } ... } ``` 此处同样附上server实现processReadResult(),读socket中数据的代码: ```java @Override protected boolean processReadResult(ByteBuffer byteBufferRead) { while (true) { ... HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)]; switch (slaveState) { case HANDSHAKE: // 收到了client的握手 ... LOGGER.info("Receive slave handshake, slaveBrokerId:{}, isSyncFromLastFile:{}, isAsyncLearner:{}", AutoSwitchHAConnection.this.slaveId, AutoSwitchHAConnection.this.isSyncFromLastFile, AutoSwitchHAConnection.this.isAsyncLearner); break; case TRANSFER: // 收到了client的transfer状态 ... // 更新client状态信息 break; default: ... } ... } ``` 3.5 Active Controller的选举 该选举主要通过DLedger实现,在DLedgerController中通过RoleChangeHandler.handle()更新自身身份: ```java class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler { private final String selfId; private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_")); private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER; public RoleChangeHandler(final String selfId) { this.selfId = selfId; } @Override public void handle(long term, MemberState.Role role) { Runnable runnable = () { switch (role) { case CANDIDATE: this.currentRole = MemberState.Role.CANDIDATE; // 停止扫描inactive broker任务 ... case FOLLOWER: this.currentRole = MemberState.Role.FOLLOWER; // 停止扫描inactive broker任务 ... case LEADER: { log.info("Controller {} change role to leader, try process a initial proposal", this.selfId); int tryTimes = 0; while (true) { // 将会开始扫描inactive brokers ... break; } } }; this.executorService.submit(runnable); } ... } ```
#技术探索

2025年5月29日

Apache RocketMQ 源码解析 —— POP 消费模式逻辑介绍
一、背景 在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。 1. Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主动从Broker拉取消息。 2. Push模式:需要注册消息监听器,当有消息到达时会通过回调函数进行消息消费,从表面上看就像是Broker主动推送给消费者一样,所以叫做推模式,底层依旧是消费者从Broker拉取数据然后触发回调函数进行消息消费,只不过不需要像Pull模式一样不断判断是否有消息到来。 不过不管是Pull模式还是Push模式,在集群模式下,一个消息队列只能分配给同一个消费组内的某一个消费者进行消费,所以需要进行Rebalance负载均衡为每个消费者分配消息队列之后才可以进行消息消费。 Rebalance的工作是在每个消费者端进行的,消费端负责的工作太多,除了负载均衡还有消费位点管理等功能,如果新增一种语言的支持,就需要重新实现一遍对应的业务逻辑代码。 除此以外,在RocketMQ 5.0以前负载均衡是以消息队列为维度为每个消费者分配的,一个消息队列只能分给组内一个消费者消费,所以会存在以下问题: (1)队列一次只能分给组内一个消费者消费,扩展能力有限。 (2)消息队列数量与消费者数量比例不均衡时,可能会导致某些消费者没有消息队列可以分配或者某些消费者承担过多的消息队列,分配不均匀; (3)如果某个消费者hang住,会导致分配到该消费者的消息队列中的消息无法消费,导致消息积压; 在RocketMQ 5.0增加了Pop模式消费,将负载均衡、消费位点管理等功能放到了Broker端,减少客户端的负担,使其变得轻量级,并且5.0之后支持消息粒度的负载均衡。当前支持任意时延的定时消息已在开源社区开源,而其支持的任意时延定时特性可以帮助POP进行一定优化。主要优化点在于InvisibleTime的设置。 二、原理 POP模式下,消息的消费有如下几个过程: 客户端发起POP消费请求Broker处理POP请求,返回消息客户端消费完成,返回ACKBroker处理Ack请求,在内部进行消费完成的位点更新。 可以用下图表示: 三、核心代码 3.1 核心Processor 3.1.1 PopMessageProcessor 该线程用于处理客户端发出的POP请求。在收到POP请求后,将处理该请求,找到待POP的队列: ```java int index = (PermName.isPriority(topicConfig.getPerm()) ? i : randomQ + i) % queueIdList.size(); int queueId = queueIdList.get(index); if (brokerController.getBrokerConfig().isMarkTaintWhenPopQueueList()) { this.brokerController.getPullMessageProcessor().getPullRebalanceManager() .addOrUpdateTaint(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId); } if (queueId = 0 && queueId 1) { // 通过位运算,标记指定下标的CK消息被ACK标记。 markBitCAS(pointWrapper.getBits(), indexOfAck); } ``` 在上述存储完成后,后续通过其它线程进行处理。处理包括CK、AK的存储,以及抵消等操作。主要在3.2节中阐述。 3.1.3 ChangeInvisibleTimeProcessor 该Processor主要用于修改消息不可见时间。此外,其中还定义了一些实用的方法,例如CK的buffer存储方法,以及持久化CK的方法。此处不过多展开。 3.2 核心Service 3.2.1 PopReviveService 该Service被AckMessageProcessor启动。该Service主要对重试队列中的消息进行merge,尝试消除此前没有CK、ACK的消息。 在该Service中,会定期消费Revive队列中的CK、ACK对。在其中会维护一个map,将CK和ACK进行存放与检索: ```java if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) { ...... // 将CK存放入map map.put(point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt(), point); point.setRo(messageExt.getQueueOffset()); ...... } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) { ...... // 从map中检索是否有CK AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); String mergeKey = ackMsg.getT() + ackMsg.getC() + ackMsg.getQ() + ackMsg.getSo() + ackMsg.getPt(); PopCheckPoint point = map.get(mergeKey); ...... } ``` 在完成消息的取出后,会进行revive过程中未ACK消息的重试,以及位点提交: 3.2.2 PopBufferMergeService 该Service主要对Buffer中的CK、ACK进行检查。根据检查结果将展开不同的操作。此外,在该Service中还定义了许多实用的CK、ACK操作方法,能够对CK、ACK进行检查以及存储等相关操作。 该Service会启动一个线程,定期执行`scan()`方法。此外,在扫描达到一定次数之后,将执行`scanGarbage()`方法清理数据。 在`scan()`中将会执行如下操作: 1. 获取buffer中所有键值对,作为CK集合。 ```java Iterator iterator = buffer.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); PopCheckPointWrapper pointWrapper = entry.getValue(); ``` 2. 针对每一个CK集合,检查其中的每个CK是否均被ACK,如果已经被ACK,则删除该CK集合。 ```java if ((pointWrapper.isJustOffset() && pointWrapper.isCkStored()) || isCkDone(pointWrapper) || (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored())) { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]ck done, {}", pointWrapper); } iterator.remove(); counter.decrementAndGet(); continue; } ``` 3. 针对未被ACK的CK,将CK、ACK信息进行存储,待一定时间后进行重试。 ```java if (removeCk) { // put buffer ak to store if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, false); countCk++; } if (!pointWrapper.isCkStored()) { continue; } for (byte i = 0; i < point.getN(); i++) { // reput buffer ak to store if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) { if (putAckToStore(pointWrapper, i)) { count++; markBitCAS(pointWrapper.getToStoreBits(), i); } } } ``` 在上述操作中,CK与ACK能够成对存储,且定时重试。直到消费者发送回成对的ACK、CK请求,这对请求方可抵消,也象征该消息被消费成功。 代码详解 可参考如下PPT中的介绍内容:
#技术探索

2025年5月12日

开源之夏 2025|Apache RocketMQ 社区项目期待你的参与!
开源之夏 2025 开源之夏是由中国科学院软件研究所“开源软件供应链点亮计划”发起并长期支持的一项暑期开源活动,旨在鼓励在校学生积极参与开源软件的开发维护,培养和发掘更多优秀的开发者,促进优秀开源软件社区的蓬勃发展,助力开源软件供应链建设。开源之夏于 2020 年正式发起,开源之夏 2025 是第六届活动。 活动联合各大开源社区,针对重要开源软件的开发与维护提供项目开发任务,并向全球高校学生开放报名。学生可自主选择感兴趣的项目进行申请,中选后在项目开发者(社区导师)的指导下进行开发。 通过参与活动,不仅可以结识开源界小伙伴和技术大牛,获得社区导师的专业指导,与开源项目开发者深度交流,还能获得丰富的项目实践经验,提升项目开发技能,为学习深造提供助力,为职业发展积攒履历。 此外,根据项目的难易程度和完成情况,结项者将获取开源之夏活动劳务报酬和结项证书。项目难度分为基础和进阶两档,对应结项劳务报酬分别为:税前 8000 元人民币和税前 12000 元人民币。 Apache RocketMQ 社区项目 Apache RocketMQ 于 2012 年诞生于阿里巴巴核心电商系统,于 2016 年捐赠给 Apache 基金会,于 2017 年成为 Apache 顶级项目。现 Apache RocketMQ 致力于构建低延迟、低成本、高可用的分布式“消息、事件、流”统一处理平台,覆盖“云、边、端”⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生 / AI 原生应用。 本次开源之夏共提供 2 个课题项目: 1. RocketMQ 主备副本利用系统内置 Topic 完成元数据增量同步,项目社区导师:金融通 2. RocketMQ 路由反向更新机制,项目社区导师:ShannonDing 如何参与项目? 欢迎扫描上方海报二维码,查看 Apache RocketMQ 社区项目详情,其中有项目导师的姓名与联系邮箱,可通过邮件与导师进行沟通,并准备项目申请材料、提交项目申请,每位同学可以申请一个项目。 以下是开源之夏的活动流程,更多参与指南请查看
#社区动态

2024年12月20日

Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。 2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。 在评审出的 10 个年度开源项目中,Apache RocketMQ 成功入选。 Apache RocketMQ 社区近况 Apache RocketMQ 创新论文连续被软件工程顶级会议录用 (1)2024 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 锁性能优化论文《Beyond the Bottleneck: Enhancing HighConcurrency Systems with Lock Tuning》被 CCFA 类软件工程顶级会议 FM 2024 录用。 高并发系统常常面临性能瓶颈,主要是由于线程间激烈竞争锁导致的等待和上下文切换。作为一家云计算公司,我们非常重视性能的最大化。为此,我们对轻量级自旋锁进行了改进,并提出了一种简洁的参数微调策略,能够在最低风险条件下突破系统性能瓶颈。该策略在高吞吐量消息队列系统 Apache RocketMQ 中得到了验证,实现了 X86 CPU 性能提升 37.58% 和 ARM CPU 性能提升 32.82%。此外,我们还确认了这种方法在不同代码版本和 IO 刷新策略下的一致有效性,显示出其在实际应用中的广泛适用性。这项工作不仅为解决高并发系统的性能问题提供了实用工具,还突显了形式化技术在工程问题解决中的实际价值。 (2)2023 年 9 月,由阿里云消息队列团队发表的关于 RocketMQ 高可用范式设计论文《RocketHA: A Logbased Storage High Availability Paradigm for Messaging and Streaming Scenarios》被软件工程 CCFA 类顶级会议 ASE 2023 录用。 该论文详细探讨了 RocketMQ 在其发展历程中所蕴含的高可用性设计理念,凝聚了团队在行业应用中积累的宝贵经验。为了应对分布式系统中常见的故障,如崩溃和网络分区,RocketHA 提出了一种基于日志存储的高可用性设计框架。该框架由六个基本组件构成,旨在实现系统在面对各种故障时的自动集群恢复。具体而言,RocketHA 通过模块化设计,实现了消息、事件及流场景的高可用性,确保系统能够在发生意外故障时迅速且有效地恢复。此外,该设计还优先考虑了高吞吐量与数据丢失防护,以保障系统在进行大规模数据处理时的稳定性和可靠性。评估结果表明,RocketMQ 在多种负载和故障场景下都表现出卓越的高可用性和快速恢复能力。本文提出的 RocketHA 的设计理念可为其他基于日志存储的系统提供参考和借鉴,推动相关领域的研究与开发。 GSoC(Google Summer of Code) 2024 在谷歌主办的 GSoC 2024 中,Apache RocketMQ 开源社区共提报通过两个选题: 1. RocketMQ Dashboard Supports RocketMQ 5.0 Architecture and Enhances Usability:该题目旨在强化 RocketMQ 的开源控制台能力。 2. Optimizing Lock Mechanisms in Apache RocketMQ:该题目旨在优化锁行为,优化 RocketMQ 的性能以及资源占用。 两个题目均成功结项,第一个题目为 Apache RocketMQ 发布了 rocketmqdashboard 2.0.0,自此RocketMQ Dashboard 支持 Apache RocketMQ 5.0 。第二个题目创新性地提出了 ABS 锁,为轻量化的自旋锁提供了一套退避策略,从而实现低成本、有限制的锁自旋行为,同时适应不同强度的资源争抢情况 Apache RocketMQ 社区 5.3.0、5.3.1 版本发布 Apache RocketMQ 社区近期发布了 5.3.0 和 5.3.1 两个版本,两个版本主要修复现有的 bug 并提升系统的整体稳定性和性能。值得一提的是,Apache RocketMQ 5.3.0 引入了 Apache RocketMQ ACL 2.0 支持,为用户带来了更加灵活和安全的访问控制机制。这些改进和新增功能将显著提升 Apache RocketMQ 在生产环境中的稳定性和安全性,进一步满足用户的业务需求。 Apache RocketMQ 中文社区全新升级 2024 年 7 月,Apache RocketMQ 中文社区(https://rocketmq.io)全新升级,致力于为每一位热衷于 RocketMQ 技术探索与实践的开发者,打造一个集时效性、全面性、深度于一体的一站式学习平台。 最全最新资讯: Apache RocketMQ 中文社区提供从基础到深入的全面学习资料,涵盖原理介绍、架构解读、源码分析等基础知识,高级性能使用、技术前沿探索、场景最佳实践等博客文章,用户反馈的真实答疑样例等,并及时更新版本发布、架构演进和功能迭代等社区动态,以及社区相关活动和会议信息,为您提供更多学习和交流的机会。 智能专家答疑: Apache RocketMQ 中文社区基于 Apache RocketMQ 领域专业知识库,并结合先进的大模型技术进行优化,为您提供 AI 问答助手,作为您的智能学习伴侣。通过自然语言问答,让您的疑问得到迅速解答,使您的学习之旅更加轻松有趣。 关于 Apache RocketMQ RocketMQ 致力于构建低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,覆盖云边端⼀体化数据处理场景,帮助企业和开发者在智能化时代,轻松构建事件驱动架构的云原生应用。 RocketMQ 自 2012 年诞生于阿里巴巴集团的核心交易链路,至今已经历十余年“双十一”的万亿级数据洪峰验证。2015 年,阿里云面向企业提供商业化的消息队列服务,其中包括云消息队列 RocketMQ 版。2016 年,阿里巴巴向 Apache 软件基金会捐赠了 RocketMQ 项目,RocketMQ 进入 Apache 孵化器。2017 年,Apache RocketMQ 成为 Apache 顶级项目,在开源消息中间件领域占据领导地位。2022 年,Apache RocketMQ 5.0 正式发布,全面拥抱云原生架构、超融合架构,进一步拓展事件驱动、物联网等场景。
#社区动态

2024年11月18日

基于Apache RocketMQ 事件驱动架构的 AI 应用实践
AI 应用在商业化服务的阶段会面临诸多挑战,比如更快的服务交付速度,更实时、精准的结果以及更人性化的体验等,传统架构限制于同步交互,无法满足上述需求,本篇文章给大家分享一下如何基于事件驱动架构应对上述挑战。 盘点 AI 应用场景 在深入探讨事件驱动架构跟 AI 结合前,我们先梳理一下 AI 应用的现状。 从应用架构层面,大致可以把 AI 应用分为以下三类: 1)基于基础模型的扩展应用,典型的如 ChatGpt(文本生成)、StableDiffusion(图像生成)、CosyVoice(声音生成)等,这类应用通常会以模型能力为核心,提供相对原子化的服务。 2)智能知识库应用,如 Langchain chatchat,这类应用是以 LLM 为核心,基于 RAG(增强检索技术)构建的具有广泛的业务场景的应用。 3)智能体应用,智能体应用核心要点是应用以 LLM 为交互中枢,能够通过工具的调用联通外部世界,复杂的表现形式如多智能体协作等,是企业 AI 应用落地最具想象空间的一类应用。 浅析 AI “原生” 说到“原生”二字,它代表的是对某种概念的广泛认知,比如提移动原生应用立马可以联想到手机端的 APP,提云原生应用很多开发者立马可以想到容器化等,而对于 AI “原生”,除了 ChatGpt,Midjourney 等几款头部 AI 应用,我们似乎还没有看到像移动应用那样广泛的“原生”应用被定义出来,当然今天也没有办法给出明确的结论,只是通过一些事实,帮大家推演 AI “原生”的方向,希望能够帮助慢慢凝聚在内心中那个对“AI 原生”的影像。 AI 给应用架构带来的变化 当 AI 能力加入后,我们的应用架构发生了较大的变化。RAG,Agent 等编程范式被引入,传统的工作流也因为有了 AI 节点,变得与以往有所不同。 AI 应用架构RAG AI 应用架构Agent 加入 AI 节点的工作流 AI 应用的变化趋势 从观察知名 AI 厂商的产品形态演进看,AI 应用由前面提到的基础模型扩展、智能知识库、智能体三类叠加又相对分离,在慢慢向由智能体统一管控约束的方向发展。 比如 Open AI 的 Canvas,Claude Artifacts,Vercel v0 等产品特性。它们都表现出了一系列的共性:智能内核,多模态,LUI 交互。 从另外一个角度理解,AI 原生的应用只有突破之前的用户体验才有可能让用户买单。分散的基础模型能力,多模态能力都只能在某些场景下有体验提升,某些方面甚至不如传统应用的用户体验。所以需要整合,将对话式交互,智能模型和多模态叠加从而构建出超越传统应用的使用体验。 使用事件驱动构建 AI 原生 这里并不是单纯为了追求技术的先进性而使用事件驱动架构,是因为实践中顺序式的架构有时候无法满足业务需求。 传统顺序式的架构在构建 AI 原生的挑战 顺序调用无法保障推理体验 模型服务的推理耗时远高于传统意义的网络服务调用,比如在文生图这个场景下使用 StableDiffusion 服务,即使经过算法优化后最快也是秒级,并发量较大的时候,会很容易导致服务器宕机。此外如声音的合成,数字人的合成等耗时可能是分钟级的,此时顺序调用明显就不太合适。选择事件驱动的架构可以快速响应用户,推理服务按需执行,这样既能够保障用户体验,同时也降低系统宕机风险。 顺序调用无法支持实时数据构建的需求 在智能问答系统中,结果的好坏跟数据有很大的关系。问答召回数据的实时性和准确性很大程度影响着智能问答系统的用户体验,从系统架构层面,问答和数据的更新是分开的。靠人工去更新海量数据不现实,通过设置定时任务以及构建知识库数据更新的工作流能够更加有效的解决数据实时更新的问题,事件驱动架构在这个场景下优势非常明显。 双向互动场景无法实现 在问答服务场景下,拟人化的行为能够得到用户好感从而扩展商机,传统的问答式应用架构相对机械死板,而使用消息队列作为信息传输可以有效主动触达用户,通过合理的意图判断,主动向用户问好,是有效的留存手段。 事件驱动构建 AI 原生的实践 接下来分享一下基于事件驱动架构构建的 AI 应用的一些实践。 StableDiffusion 异步推理 前面提到了关于文生图模型 StableDiffusion 在服务客户中遇到的问题,我们利用事件驱动架构,使用函数计算和轻量消息队列(原 MNS)构建了 StableDiffusion 的异步推理架构,用户请求到来时经过函数计算网关到达 API 代理函数,API 代理函数对请求进行打标鉴权,之后将请求发送到 MNS 队列,同时记录请求的元数据和推理信息到表格存储 TableStore,推理函数根据任务队列进行消费,调度 GPU 实例启动 StableDiffusion 进行服务,结束后返回图片结果以及更新请求状态,端侧通过页面上的轮询告知用户。 VoiceAgent 实时对话 这是一个相对复杂的应用,使用者可以通过语音跟背后的智能问答服务实时对话,同时还能够接收到来自智能服务的主动询问。 整体依然采用事件驱动架构,其 RTC Server 部分安装 rocketmqclient,订阅中心化的服务 topic,由定时任务(主要是意图分析)触发向队列 topic 生产消息内容,然后由 rocketmqclient 消费,进行主动询问。 VoiceAgent 知识库实时数据流 对于问答的另外一端,知识库的自动更新,则是通过 Catch Data Capture 策略,比如由外部系统数据源触发,或者通过将文档上传 OSS 触发。数据经过切片,向量化之后存储到向量数据库以及全文检索数据库。 面向 AI 原生应用的事件驱动架构 最后分享一下作为 AI 应用开发者的一套组合方案:通过 阿里云云应用平台 CAP(Cloud Application Platform) 选出基础模型服务,如 Ollama,ComfyUI,Cosyvoice,Embedding 等进行快速托管,使用 RcoketMQ,Kafka,MNS, Eventbridge 等搭建数据流管道和消息中心,本地利用 Spring AI Alibaba 等框架开发后端服务,实现 RAG,Agent 等能力。前端使用 Nextjs 框架构建界面,之后将开发好的前后端通过 Serverless Devs 工具部署到 CAP 平台,进行线上调用访问,最终上生产采用云原生网关保驾护航,对于长期的知识库或者智能体的运维则通过 ARMS 进行指标监控。
作者:寒斜
#技术探索