2022年10月12日

RocketMQ 5.0 API 与 SDK 的演进
RocketMQ 5.0 SDK 采用了全新的 API,使用 gRPC 作为通信层的实现,并在可观测性上做了很大幅度的提升。 全新统一的 API 此处的 API 并不单单只是接口上的定义,同时也规定了各个接口不同的方法和行为,明确了整个消息模型。 RocketMQ 过去的 API 从第一版开始,至今已经过了很长时间,长期依赖是一个缺乏变革的状态,对于一些中途打算废弃或者变更的 API 也没有进行后续的迭代。此外,接口的定义也不够清晰。因此,RocketMQ 希望在 5.0 中能够建立一个统一的规范,精简整个 API,通过引入 builder 模式来引入更多的不变性,同时做好异常管理,给开发者和用户一个更加清爽的面貌。 目前 C++ 和 Java 已经进行了 5.0 API 的定义与实现,更多语言的支持也陆续在路上了。我们也欢迎更多的开发者可以参与到社区的工作中来。这里给出 5.0 客户端的仓库链接: 除了在上述接口上做的一些修改之外, RocketMQ 5.0 还规定了四种新的不同的客户端类型,即 Producer/Push Consumer/Simple Consumer/Pull Consumer。 其中 Pull Consumer 还在开发中;Producer 主要还是做了接口裁剪,规范了异常管理。在功能上其实并没有做一些颠覆性的改变。Push Consumer 也是比较类似的;Simple consumer 将更多的权利将下发给用户,是一种用户可以主动控制消息接收与处理过程的消费者,特别的,5.0 的 SDK 中,Push Consumer 和 Simple Consumer 都采用 RocketMQ 的 pop 机制进行实现,一些社区的同学可能已经熟悉了。 如果用户并不一定想控制或者关心整个消息的接收过程,只在乎消息的消费过程的话,这个时候 Push Consumer 可能是一个更好的选择。 RocketMQ 5.0 定义了四种不同的消息类型。过去的开源版本中其实我们并没有去突出消息类型这样一个概念,后续出于维护及运维方面的需要以及模型定义的完备,才让今天的 5.0 有了消息类型的这样一个概念。 1、NORMAL:普通消息。 2、FIFO:满足先入先出的语义。用户可以通过定义 message group 来控制消息间的消费顺序。例如图中的 fruit 这个 topic 下,可以定义不同的 message group,在 apple 这个 message group 下,会按照发送顺序决定消息被消费的顺序,并且不同的 message group 之间不会互相干扰。 3、TRANSACTIONAL:可以将一条或多条消息包成一个事务,最终用户可以根据自己的业务结果选择提交或者回滚。 4、DELAY:用户可以自主地设置消息的定时时间,相比之前开源版本仅允许用户设置定时/延迟级别,5.0 的实现中还讲允许用户设置更精确的时间戳。 以上四种消息是互斥的,我们会在 topic 的元数据去标识它的类型。实际在消息发送的时候如果如果出现了尝试发送的消息类型与 topic 类型不匹配的情况,也会做一些限制。 实现 RocketMQ 5.0 在客户端的启动过程中提前进行了更多的准备工作。比如用户提前设置要发送消息的 topic 时,Producer 会在启动过程中尝试获取对应 topic 的路由。在过去的客户端实现中,在针对于某个 topic 第一次发送消息时,需要先获取路由,这里就会有一个类似冷启动的过程。 提前获取 Topic 的路由信息有两点好处: 1. 不阻塞后面的发送,让消息的发送仅仅触发发送这一个动作。 2. 错误前置,比如用户要往一个不存在 Topic 发送消息时,因为路由的获取参与到整个客户端的启动过程,获取路由不成功,那整个客户端启动可能就会失败,用户也是拿不到对应的 Producer 对象的。 类似的,Consumer 的启动也会有这样的一个过程。 除此之外,我们在客户端和服务端之间增加了一个 Telemetry 的部分,它会在客户端和服务端之间建立起了一个进行双向数据通讯的通道,客户端和服务端会在这个过程中沟通配置,比如服务端可以实现对客户端配置的下发,更好地管理客户端。此外,Telemetry 也可以将本地配置主动上报给服务端,让服务端也可以对客户端的设置有更好的了解。Telemetry 通道也会在客户端启动时尝试建立,如果这个通道没有建立成功,也会影响客户端的启动。 总的来说,客户端的启动过程会尽可能将所有准备工作做好。同时在客户端和服务端之间建立 Telemetry 这样一个通讯通道。 客户端内部存在一些周期性的任务,比如路由的定时更新以及客户端往服务端发送心跳等。对于上文中提到的 Telemetry 过程中,客户端的配置上报也是周期性的。 Producer 在 RocketMQ 5.0 中的具体工作流程 消息在发送时,会检查是否已经获取对应 topic 的路由信息。如果已经获取,则尝试在路由中选取队列,随即查看要发送的消息的类型是否与 topic 类型匹配,如果匹配,则进行消息发送。如果发送成功,则返回;否则,判断当前重试次数是否超出用户设置的上限,如果超出,则返回失败;否则轮转到下一个队列,然后对新的队列进行重试直到消费次数超出上线。而如果启动过程中没有提前获取路由,那么消息发送时依然会先尝试获取路由,然后再进行下一步操作。 另外一点相对于老客户端较大的改变在于,客户端从底层 RPC 交互到上层的业务逻辑全部采用异步实现。Producer 依然会提供一个同步发送接口和异步发送接口,但同步的方法也是使用异步来实现,整个逻辑非常统一和清爽。 Push Consumer 分为两部分,消息的接收和消费。 消息接收流程为:客户端需要不断地从服务端拉取消息,并将消息缓存。Push Consumer 会将消息先缓存到客户端的本地,再进行消费,因此它会判断客户端本地的 Cache 是否已满,如果已满,则隔一段时间再判断,直到消息被客户端消费,Cache 尚有余量时再进行消息拉取。为了避免出现一些内存问题,Cache 的大小也是被严格限制的。 消息消费过程分为两个类型,顺序类型和非顺序类型。 其中非顺序类型即并发消费。消费者会先从 Cache 中获取消息,然后尝试消费消息,消费后再将消息从 Cache 中移除。消息消费成功时,会尝试将消息 ACK ,消费流程结束;如果消费失败,则尝试修改消息的可见时间,即决定下一次什么时候可见。 顺序消费指对于同一个 Group 的消息,最终消费时一定是前一条消息被消费过并且得到确认后,后面的消息才能够继续消费。而消费过程与非顺序消费类似,首先尝试从 Cache 中拉取消息,如果消费成功,则将消息 ACK。ACK 成功后,将其从 Cache 中移除。特别地,如果消费失败,会 suspend 一段时间,然后继续尝试对消息进行消费。此时会判断消费次数是否超限,如果超限,则会尝试将消息放入死信队列中。 相对于非顺序消费,顺序消费更复杂,因为其需要保证前一个消息消费成功后才能对后面的消息进行消费。顺序消费的消费逻辑是基于 message group 隔离的。message group 会在发送时做哈希,从而保证 message group 的消息最终会落在一个队列上,顺序消费模式本质上保证队列内部消费的顺序。 此外,因为不同 message group 的顺序消息最终可能会映射到同一个队列上,这可能会导致不同的 message group 之间的消费形成阻塞,因此服务端未来会实现一个虚拟队列,让不同的 message group 映射到客户端的虚拟队列,保证他们之间没有任何阻塞,从而加速数据消息的消费过程。 对于 Simple Consumer,用户可以主动控制消息接收和确认的流程。比如用户收到消息后,可以根据业务决定是否过一段时间再消费该消息,或者不需要再收到该消息。消费成功后将消息 ACK 掉,如果失败则主动修改可见时间,选择该消息下一次什么时候可见,即由用户自发地控制整个过程。 可观测性 Shaded Logback 因为历史原因,RocketMQ 的老客户端并不是面向 SLF4J 进行编程的,而是面向 logback 的。这么做的目的其实是为了方便快捷地获取日志,不需要让用户自己去手动配置。 RocketMQ 中专门有一个 logging 模块是负责日志部分的,像用户自己使用了 logback ,RocketMQ SDK 如果也直接去使用 logback,两者就会产生各种各样的冲突,这个 logging 模块就是用来保证这一层隔离性的。 但是 logging 模块本身的实现并不是很优雅,也带来了一定的维护成本。因此我们采用了 shade logback 的方式来达到上文提到的隔离性。shaded logback 不仅能够避免用户的 logback 与 RocketMQ 自己的 logback 冲突,还能保持较好的可维护性,将来要想在日志上去做一些修改,也来得容易的多。 具体来说,用户的 logback 会采用 logback.xml 的配置文件,通过 shade logback, RocketMQ 5.0 的客户端会使用 rocketmq.logback.xml 的配置文件,因此在配置部分就已经完全隔离了,同时在 shade 的过程中,还对原生 logback 中使用到的一些环境变量和系统变量也进行了修改,这样就保证了两者的彻底隔离。 另外,使用 shadeed logback 之后,RocketMQ 5.0 客户端中的日志部分就全都是面向 SLF4J 来进行编程的了,这样一来,如果我们未来想让用户自己去完全控制日志的话,提供一个去除 logback 的 SDK 就可以了,非常方便。 Trace 5.0 的消息轨迹基于 OpenTelemetry 模型进行定义与实现,消息发送或接收消息的流程被定义为一个个独立的 span ,这一套 span 规范参照了 OpenTelemetry 关于 Messaging 的定义。图中这里 Process P 表示 Producer ,Process C 表示 Consumer。消息的全生命周期,从发送到接收到消费,就可以具象化为这样一个个的 span。 比如,针对 Push Consumer 而言,先会有一个 receive 的 span 来表示从服务端获取消息的过程,收到消息后到会先等待消息被处理,这个也就是 await span 表示的过程,消息被处理则对应图中的 process span,消息消费结束之后,向服务端反馈消息处理结果也会有专门的 span 进行描述。 我们通过 parent 和 link 来讲所有的这些 span 关联起来,这样通过一条消息的任意一个 span,就可以获得这条消息全生命周期的所有 span。 不仅如此,用户还将允许可以设置一个 span context 与自己的业务链路进行关联,将 RocketMQ 5.0 的消息轨迹本身嵌入进自己的全链路可观测系统中去。 Metrics Tracing 相对来说成本是比较高的,因为一条消息从发送到接收,可能会有很多流程,这就伴随着很多的 span,这就导致相对来说,tracing 数据的存储查询成本相对来说比较高。我们希望诊断整个 SDK 的健康状况,同时又不希望收集太多的 tracing 信息提高成本,此时提供一份 metrics 数据就能比较好地满足我们的需求。 在 SDK 的 metrics 中我们新增了诸多指标,包括不限于 Producer 中消息发送延时,Push Consumer 中消息的消费耗时和消息缓存量,可以帮助用户和运维者更快更好地发现异常。 5.0 中 SDK 的 metrics 也是基于 OpenTelemetry 进行实现的。以 Java程序为例,OpenTelemetry 对于 Java 的实现本身提供了一个 agent,agent 在运行时会打点采集 SDK 的一些 tracing/metrics 信息,并将它上报到对应的 metric collector 中,这种通过 agent 来降低无侵入式数据采集的方式被称之为 automatic instrumentation,而手动在代码中实现打点采集的方式则被称之 manual instrumentation。对于 metrics 我们目前还是采用 manual instrumentation 的方式来进行数据的采集和上报的。服务端会告知客户端对应的 collector 的地址,然后客户端将 Metrics 数据上传到对应的 collector 当中去。 _作者介绍:_ _艾阳坤,Apache RocketMQ 5.0 Java SDK 作者,CNCF Envoy Contributor,CNCF OpenTelemetry Contributor,阿里云智能高级开发工程师。_
作者:艾阳坤
#技术探索 #云原生

2022年9月30日

行业实践:Apache RocketMQ 业务集成典型行业应用和实践
消息典型应用场景 阿里云拥有丰富的消息产品家族,除了 RocketMQ 以外,还有大家熟知的对标开源的云 Kafka、支持 AMQP 协议的开源消息队列 RabbitMQ、物联网通信网关 MQTT、 对标 AWS SQS/SNS 的 Serverless 版消息 MNS(现在也是轻量版 RocketMQ)以及云上事件总线、事件中心 EeventBridge 。 阿里云所有消息产品均采用 RocketMQ 作为底层存储引擎,为用户提供稳定、可靠、高性能的消息通信能力,比如百万 TPS、百万队列、毫秒级通信延迟、分级存储、 Serverless 弹性等众多的消息产品。也带来了丰富的应用场景,分为应用集成和数据集成两大类。 应用集成以 RocketMQ 为主,应用最为广泛,本文也将分享 RocketMQ 在微服务解耦、电商交易、金融支付等场景下的最佳实践,比如银行的交易流水、保单的支付流转等。RabbitMQ 、MQTT 也主要用于应用集成场景,比如物联网、 IoT 双向通信、云产品的事件通知以及后处理等。对于新建的业务场景,一般首推 RocketMQ 作为消息选型,因为 RocketMQ 拥有最丰富的功能特性;而对于存量的业务迁移,则可以根据具体使用的消息产品来进行选择,以降低迁移成本。 数据集成以云 Kafka 为主,在大数据分析、日志采集分析等场景下应用最为广泛,比如游戏的玩家操作、广告埋点、数据分析、应用数据监控等。各种 SaaS 类的集成、聚石塔、电商数据打通等场景,则主要使用 EventBridge。更多的产品选型对比,可以参考专题页中的消息队列产品选型。 业务消息使用场景 RocketMQ 经过阿里集团内部十年锤炼,经过双 11 大促等万亿级规模的实时场景验证,SLA 最高可支持 4 个9,数据可靠性支持 9 个9。 以微服务解耦、订单交易、金融支付等场景为例,在互联网、电商、零售、金融等行业,消息的使用量非常大。尤其是在秒杀大促时,为了保障系统的稳定运行,需要 RocketMQ 进行削峰填谷。另外金融客户对每笔交易、每个订单也都要求数据不能丢失。因此在此类场景普遍对消息的可靠传输、海量并发、低延迟、稳定性与容灾等有着非常高的要求。RocketMQ 提供了丰富的消息类型,比如事务消息、定时消息、顺序消息等。 在交易系统里,为了简化交易流程,一般使用事务消息和定时消息。同时 RocketMQ 也提供了消息轨迹查询、消息 dashboard ,可以非常方便地对每个消息进行回溯,对每个 topic 或者 group 进行监测。RocketMQ 5.0 也提供了丰富的实例规格,从百级别 QPS 到百万级 QPS ,可以覆盖大部分应用场景。RocketMQ 默认提供多副本、多可用区部署,也提供了跨地域消息路由能力,支持客户构建高可用容灾或多活,且 RocketMQ 能够支持 99.9%的消息 RT 在 10ms 传输。 RocketMQ 事务消息举例 实现订单状态机异步流转 以订单状态机异步流转为例。此前,如果收到一笔订单交易,需要逐个通知下游的服务模块,比如需要更新购物车、更新积分等。每个业务模块耦合在一起会导致大促时的流量峰值非常大,需要每个服务模块保障其处理性能。而基于 RocketMQ 的事务消息能力,即可轻松实现订单子流程系统的异步解耦和流量的削峰填谷,事务消息可以确保数据库订单状态持久化和下游通知的事务性。 收到 LBL 订单交易时,可以先向 RocketMQ 发送一条半事务消息,此时 RocketMQ 会 hold 住消息,等核心交易事务完成后再向 MQ 提交确认半事务消息的状态,并执行下游服务模块的通知。假设核心交易模块失败,则会废弃之前提交的半事务消息,不通知下游。 对比此前的传统事务模块,使用 RocketMQ 可以大幅简化交易链路,缩短下单耗时。尤其是在大促场景下,可以解耦下游的服务模块,提供削峰填谷的能力。 超时中心 RocketMQ 的定时消息场景也是常见的使用方式。 比如双 11 等大促场景存在大量预售订单、定点尾款等,会带来大量定时任务。在电商交易过程中,订单流转也存在多个超时状态的任务,处理超时状态的任务需要确保可靠及时。以传统的方案进行构建分布式调度机制实现的时候,比如基于定时器调度延迟大,可能会存在性能瓶颈。 而采用 RocketMQ 的定时消息,实现将变得非常简单。定时任务只需提交一条延迟消息到 RocketMQ ,由 RocketMQ 保障定时消息达到秒级的精度,最高可支持百万级别的 TPS 能力,同时也能支持消息的消费重试,保障任务可靠触发,相比传统的使用方式大大简化了定时的复杂度。 RocketMQ 灰度策略举例 微服务全链路灰度 微服务场景下,精准地控制灰度流量并进行灰度版本验证,是保障线上业务稳定运行的关键。大部分情况下,用户通过划分不同的环境来进行灰度发布,对应 RocketMQ 的不同实例。但是很多用户希望能够简化环境管理,尽可能复用线上资源,结合消息来提供微服务全链路灰度能力。 如上图所示,线上已经在运行的微服务模块游 A、B、C,C 模块会产生消息,并由 A 模块进行消费。此时对服务模块 A 和 C 做灰度发布,则线上会存在两条泳道,一条是正常的业务流量,一条是灰度链路。我们希望线上版本 C 模块生产的消息能够被线上版本 A 模块进行消费,灰度版本 C 模块生产的消息能够被灰度版本 A 模块进行消费。 RocketMQ 支持透传环境标签,可在生产端给消息属性添加标签,然后开启 RocketMQ 的 SQL 92 语法过滤,服务端即可完成消息的过滤和路由,从而降低客户端的压力。 本文讲述了 RocketMQ 的业务消息场景、一些功能特性的使用方法,包括事务消息、定时消息、消息全链路灰度等,欢迎大家尝试使用。
作者:洛浩
#行业实践

2022年9月23日

Apache RocketMQ 在阿里云大规模商业化实践之路
阿里云消息队列 RocketMQ 商业化历程 RocketMQ 诞生于 2012 年,诞生即开源。2012~2015 年,RocketMQ 一直在通过内部电商业务打磨自身服务能力,并在 2015 年于阿里云上线公测。2016 年,阿里云 RocketMQ 完成商业化,同时被捐赠给 Apache 基金会,同年获得了年度受欢迎中国开源软件荣誉。 在 Apache 孵化期间,Apache RocketMQ 经历了快速发展,2017 年即毕业成为了 Apache 顶级项目。同年,Apache RocketMQ TLP RocketMQ 4.0 正式发布。此后,RocketMQ 4.0 经历了长足发展,期间阿里云商业和开源相辅相成、齐头并进,直到今天,共同迈入 RocketMQ 5.0 时代。 RocketMQ 5.0 发布后,阿里云商业会持续采取 OpenCore 的发展模式,秉承上游优先的社区发展原则,与社区一起将 RocketMQ 打造为一个超融合的数据处理平台。 阿里云消息队列产品矩阵 阿里云基于 RocketMQ 消息底座,构建了多元化的消息产品系列。 RocketMQ 是阿里云主打的消息品牌,互联网新兴业务领域首选的数据通道。消息队列 Kafka 是大数据的首选数据通道,微消息队列 MQTT 是移动互联网和物联网的数据通道,消息队列 RocketMQ 是传统业务领域的数据通道。消息服务 MNS 是 RocketMQ 轻量版,主要应用于应用集成领域,为平台型应用提供简单的队列服务。事件总线 Event Bridge 定位为云上事件枢纽,旨在阿里云上构建统一的事件中心。 阿里云消息队列产品矩阵完全构建在 RocketMQ 之上,基本实现了应用场景全覆盖,包括微服务解耦、SaaS 集成、物联网、大数据或日志收集生态,同时也在内部覆盖了阿里巴巴所有业务,在云上为数万阿里云企业提供了优质的消息服务。阿里云的消息产品矩阵涵盖了互联网、大数据、移动互联网等领域业务场景,为云原生客户提供不可或缺的一站式解决方案。 RocketMQ 在阿里云商业化历程中,一直致力于探索业务消息实践,也孵化了大量业务消息特性,并持续反哺到开源社区。 RocketMQ 4.0 业务消息探索之路 RocketMQ 在商业化过程中,陆续推出了四种消息类型来满足丰富的业务场景。 普通消息:普通消息提供极致弹性、海量堆积能力,内置重试与死信队列来满足业务对失败重试的需求,同时具备高吞吐、高可用、低延迟等特性,广泛应用于应用集成、异步解耦、削峰填谷等场景。 定时消息:提供秒级定时精度, 40 天超长定时,主要面向分布式定时调度、任务超时处理等场景,目前正在开源中。  顺序消息:支持全局与局部严格有序,从发送、存储到消费,保证端到端有序。面向有序事件处理、撮合交易、数据实时增量同步等场景。 事务消息:分布式、高性能、高可用的最终一致性事务解决方案,广泛应用于电商交易系统中服务的一致性协调场景并且已经开源。  RocketMQ 4.0 期间,商业和开源都致力于全方位拓展消息接入能力,使 RocketMQ 能够非常轻松地连接应用开源和云产品生态。比如商业上提供了多语言 SDK ,开源也有相应的 SDK 能够覆盖 Java、Go、Python 、C++使用 RocketMQ。同时支持 Spring 生态,能够通过 Spring Cloud 的方式使用 RocketMQ。商业上提供了一组非常简单易用的 HTTP API,提供了 67 种语言的实现。 除了 SDK 接入,RocketMQ 也在积极拥抱社区标准,在云产品侧提供了 AMQP 和 MQTT 的接入能力,其中 MQTT 已开源。 RocketMQ 也大力在发展 connector 生态,能够通过 RocketMQ connector 接入很多数据源,包括 Redis、MongoDB、Hudi 等大数据系统。 另外,阿里云构建的事件总线 EventBridge 也已开源,通过该产品能够将阿里云的云产品、SaaS 应用、自建数据平台的数据引入 RocketMQ。 RocketMQ 4.0 版本做了大量尝试,提供了全方位的消息接入能力。 RocketMQ 在服务阿里集团用户和商业化历程中,沉淀了大量领先的业务消息处理与服务能力。比如消息订阅方面,RocketMQ 支持集群分布式消费能力,也支持广播消费。在消息处理方面支持基于 Tag 和 SQL 做灵活过滤,其中基于 SQL 过滤是电商交易中非常重要的特性,能够支持在非常订阅比的情况下实现较低的投递比。 全球消息路由能力具备性能高、实时性强的特点。在云时代,数据中心天然分布在各个地域,各个地域之间还有 VPC 网络隔离。但是通过全球消息路由功能可以将地域与网络打通,能够满足更多业务场景。比如在阿里内部基于该能力实现了异地多活、异地容灾等企业级特性。 另外,全球消息路由具备非常高的易用性,提供了可视化任务管理界面,通过简单配置即可创建复制链路。 消息治理方面,RocketMQ 提供了访问控制、命名空间、实例限流、消息回放、重试消息、死信消息、堆积治理等能力。 服务能力方面,RocketMQ 经历了非常多沉淀,它在为交易链路服务了 12  年,参加了 10 年双 11,这也保证了 RocketMQ 能够在阿里云上提供非常高的可靠性。双 11 消息收发 TPS 峰值过亿,日消息收发总量超过 3 万亿。而即使在双十一万亿级数据洪峰下,消息也能做到 99.996% 毫秒级响应能力,消息发布平均响应时间不超过 3 毫秒,最大不超过 20 毫秒,真正实现了低延迟消息发布。 商业化初期,客户遇到最大难题是在分布式环境下如何完整地追踪异步消息链路。基于此背景,我们打造了可视化全生命周期消息轨迹追踪系统,能够提供丰富的消息查询、消息下载、定点重投、轨迹追踪能力,通过可观测系统帮助用户解决分布式环境中不可观测的问题。 如上图所示,一条消息从产生、发送至服务端存储到最终投递到消费者,整个发送和消费轨迹都有迹可循,包括投递给哪些消费者、哪些消费者在什么地方成功消费或者消费失败、何时进行重投,真正帮助客户解决了分布式观测难题。 除了功能特性,RocketMQ 在稳定性方面也做了很多建设。我们始终坚持,SLA 是云原生的根本,因此整个研发运维链路都有严格的稳定性保障措施: 架构开发:每个方案设计都会面向失败设计,代码开发阶段会有严格 Code Review 阶段,也会完整经历单元测试、集成测试、性能测试和容灾测试流程。 变更管理:有着非常严格的变更制度,要做到每个变更可灰度、可监控、可回滚、可降级。 稳定性防护:提供了限流、降级、容量评估、应急方案、大促保障等能力,会定期进行故障和预案演练,定期进行风险梳理。 体系化巡检:在云上有全方位的生产环境黑盒巡检。基于用户视角,会对全地域所有功能做全功能扫描,包含高达 50 多项检测项,任意项功能出问题都能立刻被监测到。在白盒巡检方面,会对 JVM 运行时指标、内核系统、集群指标进行巡检。 故障应急:有完整地故障应急流程,包括监控报警、故障发生、快速止血、排查根因、故障复盘。 RocketMQ 5.0 云原生架构升级之路 云原生时代,云上用户对云产品服务化程度、弹性能力、可控制性能力以及韧性都有了更高的要求。在此背景之下,我们对 RocketMQ 进行了云原生架构升级,这也是 RocketMQ 5.0 的诞生背景。 轻量级 SDK:基于云原生通信标准 gRPC 开发了一组轻量级 SDK,能够与当前富客户端优势互补。   无状态消息网关:在核心数据链路推出了无状态消息网关。通过搭建无状态服务节点Proxy,再通过 LB 进行服务暴露,将存储节点数据分离来独立负责核心消息存储和高可用。Proxy 与 Store 节点分离部署,独立弹性。  Leaderless 高可用架构:Store 节点身份完全对等,完全 Leaderless 化,去 ZK 和 HA 管控节点,能够做到非常高的可用性。同时相比传统的 Raft 一致性协议,该 Leaderless 架构能够做到副本数灵活选择,同步异步自动升降级,实现秒级故障转移。高可用架构目前已经完成开源并与 Dledger 进行了融合。  云原生基础设施:可观测验能力云原生化,OpenTelemetry 标准化。整体架构走向 Kubernetes 化,能够充分利用售卖区的资源弹性能力。 RocketMQ 4.0 推荐的接入方式主要是富客户端。富客户端提供了诸如客户端侧负载均衡、消息缓存、故障转移等一系列企业级特性。但在云原生时代,轻量级、高性能的客户端更容易被云原生技术栈所集成。 因此,RocketMQ 5.0 重磅推出了全新多语言轻量级 SDK,具有以下优势: 全新极简 API 设计:不可变 API,有完善的错误处理。多语言 SDK 保障 API 在 Native 层面对齐。同时引入了全新的 Simple Consumer,能够支持按消息模型进行消费,用户不再需要关心消息队列,只需要关注消息。  通信层采用 gRPC 协议:拥抱云原生通信标准,gRPC 能够使服务更易被集成。多语言 SDK 通信代码也可以通过 gRPC 快速生成,更 Native 。  轻量级实现:采用无状态消费模式,能够大幅降低客户端的实现复杂度。客户端更轻量,采用的应用也更容易被 Serverless化、Mesh 化。  云原生可观测性:客户端实现了 OpenTelemetry 标准,能够支持以 OpenTelemetry 形式导出 Metrics 与 Tracing。 RocketMQ 5.0 的另一个重大升级是引入了全新的无状态消费模型。该消费模型完全构建在原先的队列模型之上。队列模型是与存储模型一致的消费模型,消费者完全按照队列做负载均衡,也按照队列做消息拉取,非常适合批量高速拉取以及对单条消息状态不敏感的场景,比如流计算等。 RocketMQ 5.0 推出了 PoP 机制,巧妙地在队列模型之上构建了消息模型,实现了鱼与熊掌兼得。在此消息模型的设计上,业务可以只关心消息而无需关心队列,所有 API 都能够支持单条消息级别的消费、重试、修改不可见时间、删除。 在消息模型下,消息发送过来被存储后,即对消费者可见。消费者通过 Receive Message API 对消息进行消费后,消息进入定时不可见状态。消息超时过后又会重新处于可见状态,能被其他消费者继续消费。某消费者确认消息后,服务端会对该消息进行删除,随即不可见。 基于消息系模型的消费流程下,API 完全面向消息而不是面向队列。而当 PoP 机制遇见了无状态 Proxy,除了存储层,其他节点都是无状态的;客户端、连接和消费也是无状态的,可任意在 Proxy 节点上飘移,真正做到轻量级。 经过重构,RocketMQ 5.0 的可观测性也走向了云原生标准。 Metrics 侧: 指标涵盖丰富:设计了更丰富的指标,包含消息量、堆积量、各个阶段耗时等指标,每个指标从实例、Topic、消费 GroupID 多维度做聚合和展示。 消息团队实践模板:为用户提供实践模板,并持续迭代更新。 Prometheus + Grafana:Prometheus 标准数据格式,利用 Grafana 展示。除了模板,用户也可以自定义展示大盘。 Tracing 侧: OpenTelemetry Tracing 标准:RocketMQ Tracing 标准已经合并到 OpenTelemetry 开源标准,提供了规范和丰富的 messaging tracing 场景定义。 消息领域定制化展示:按照消息维度重新组织抽象的请求 span数据,展示一对多的消费,多次消费信息直观且方便理解。 可衔接 tracing 链路上下游:消息的 tracing 可继承调用上下文,补充到完整的调用链路中,消息链路信息串联了异步链路的上游和下游链路信息。 Logging 侧: Error Code 标准化:不同的错误有唯一的 Error Code。 Error Message 完整:包含完整的错误信息和排序所需要的资源信息。 Error Level 标准化:细化了各种不同错误信息的日志级别,用户可根据 Error、Warn 等级别配置更适合的监控告警。 弹性方面,RocketMQ 5.0 商业版能够充分撬动云的计算、存储和网络的池化资源。比如在计算方面,RocketMQ 5.0 所有工作负载完全部署在 ACK 之上,充分利用了 ACK 弹性能力,撬动 ACK 弹性资源。主要依赖 ACK 的两项技术,一是弹性资源池,另一个是 HPA 支持计算能力快速弹性。同时也会在 ACK 之上做跨可用区部署以提供高可用保障。 网络层面,RocketMQ 5.0 也会充分利用阿里云网络设施,为用户提供更便捷的网络访问能力。比如 RocketMQ 5.0 实例能够支持公网随开随用,需要依赖公网做测试的时候即开即用,测试完立即关闭,安全与方便兼具。同时支持多种私网类型的网络形态,包括 Single Tunnel、Private Link,另外也基于 CEN 构建了全球互通设计网络。 存储方面,RocketMQ 5.0 商业版率先引入多级存储概念,基于 OSS 构建二级存储,能够充分利用 OSS 存储的弹性能力,存储计费也转向了按量付费。而用户能够在 RocketMQ 之上自定义消息存储时长,比如将消息从 3 天有效时长延长至 30 天,能够真正将消息变为数据资产。同时利用二级存储能力,将冷热数据分离,为用户提供一致的冷读 SLA 。 RocketMQ 5.0 商业版发布预告 RocketMQ 4.0 历经了五年发展,开源和商业版本共同迈入了 5.0 时代。7 月底,阿里云消息队列将会基于开源版发布全新的 5.0 商业化版本。注:截止发稿前,RocketMQ 5.0 已经在阿里云消息队列 RocketMQ 产品上全新发布,目前支持国内主要地域。 RocketMQ 5.0 版相对于 4.0 版实例主要有以下几大改变: 第一,新版本、新售卖,更便宜。新版本采取了全新计量方式,有包年、包月型,也有按量付费和公网流量弹性计费。也有更全的售卖体系,比如新增专业版实例,能够满足部分用户需求。同时每个商品系列都新增了测试环境专用实例,能够方便用户以低成本的方式搭建自己的开发环境。 第二,更强弹性,降本提效利器。存储完全走向弹性,能够通过 Serverless 按需使用,按量付费。预留弹性,实例基础规格支持实时升降配,用户可以很方便地在流量到来之前做弹性。此外,专业版支持突发流量弹性,能够解决线上稳定性风险。 第三,全新架构,增强可观测运维。无状态消息消费模型能够解决一些老版本的痛点。同时在可观测上全面采取了云原生接入栈。 消息的全新形态:事件总线 EventBridge 事件总线 EventBridge 已经开源到 RocketMQ 社区中。云原生时代,事件无处不在,云计算资源散落在各地,各类生态孤岛随处可见。因此,以事件和事件驱动的方式来集成这一切是大势所趋。 基于此,阿里云推出了全新事件型产品 EventBridge。该产品构建在 RocketMQ 之上,是 RocketMQ 之上的一个事件驱动架构实践。 EventBridge 的事件源包括阿里云服务的管控事件比如资源变更事件、审计事件、配置变更事件,阿里云服务的数据事件,也包括自定义应用、SaaS 应用、自建数据平台、其他云厂商服务等。 事件经过 EventBridge 处理后会投递到事件目标,事件目标包括函数计算、消息服务、自建网关、HTTP(S)、短信、邮箱、钉钉等。 事件源到事件目标之间会经历完整的事件处理,包括事件源接入到 EB 后,可以对事件进行过滤、转换、归档、回放等。事件在 EventBridge 整个流程中也有完善的可观测性设计,包括事件查询、链路追踪。事件的接入方式非常丰富,可以通过 OpenAPI 来接入、7 种多语言 SDK、CloudEvents SDK、Web Console 和 Webhook 。 EventBridge 具有如下特点: 能够大幅度减少用户开发成本,用户无需额外开发,通过创建 EventBridge 源、事件目标、事件规则等资源即可实现事件架构。用户可以编写事件规则,对事件做过滤、转换。  提供原生 CloudEvents 支持,拥抱 CNCF 社区,能够无缝对接社区 SDK 。标准协议也能统一个阿里云事件规范。  事件 Schema 支持:能够支持事件 Schema 自动探测和校验,支持 Source 和 Target 的 Schema 绑定。  全球事件任意互通:组建了全球事件任意互通网络,组件了跨地域、跨账户的事件网络,能够支持跨云、跨数据中心的事件路由。 EventBridge在云上生态已经初具规模,已经集成了 255+ 云产品事件源和 1000+ 事件类型。 EventBridge率先对消息生态做了融合。阿里云的消息产品矩阵生态均通过 EventBridge 做了完全融合。任何一款消息产品与另一款消息产品的数据都能互通。同时,依靠 EventBridge 的全球事件网络,能够为所有消息产品赋予全球消息路由的能力。 EventBridge 目前已经在内部接入钉钉 ISV、聚石塔 ISV,外部也有 50+ SaaS 系统可以通过 Webhook 的方式接入。另外,海量事件源可以触达 10 多种事件目标,已经对接了全系云产品 API ,任何事件都可以驱动全量云产品 API。 作者介绍: 周新宇 Apache Member,Apache RocketMQ PMC Member,阿里云消息队列 RocketMQ 研发负责人。
作者:周新宇
#行业实践 #云原生

2022年8月29日

阿里云基于全新 RocketMQ 5.0 内核的落地实践
前言 在上个月结束的 RocketMQ Summit 全球开发者峰会中,Apache RocketMQ 社区发布了新一代 RocketMQ 的能力全景图,为众多开发者阐述 RocketMQ 5.0 这一大版本的技术定位与发展方向。 在过去七年大规模云计算实践中,RocketMQ 不断自我演进,今天,RocketMQ 正式迈进 5.0 时代。 从社区关于 5.0 版本的解读可以看到,在云原生以及企业全面上云的大潮下,为了更好地匹配业务开发者的诉求,Apache RocketMQ 做了很多的架构升级和产品化能力的适配。那么如何在企业的生产实践中落地 RocketMQ 5.0 呢?本篇文章的核心就消息架构以及产品能力的云原生化,介绍了阿里云是如何基于全新的 RocketMQ 5.0 内核做出自己的判断和演进,以及如何适配越来越多的企业客户在技术和能力方面的诉求。 云原生消息服务的演进方向 首先我们来看下云原生消息服务有哪些演进? 面向未来,适应云原生架构的消息产品能力应该在以下方面做出重要突破: 大规模弹性:企业上云的本质是解放资源供给的负担和压力,专注于业务的集成和发展。作为消息服务的运维方,应该为上层业务提供与模型匹配的资源供给能力,伴随业务流量的发展提供最贴合的弹性能力。一方面可以解决面向不确定突发流量的系统风险,另一方面也可以实现资源利用率的提升。  易用性:易用性是集成类中间件的重要能力,消息服务应该从 API 设计到集成开发、再到配置运维,全面地降低用户的负担,避免犯错。低门槛才能打开市场,扩大心智和群体。   可观测性:可观测性对于消息服务的所有参与方来说都很重要,服务提供方应提供边界清晰、标准开放的观测诊断能力,这样才能解放消息运维方的负担,实现使用者自排查和边界责任的清晰化。  稳定性高 SLA:稳定性是生产系统必备的核心能力,消息来说往往集成在核心交易链路,消息系统应该明确服务的可用性、可靠性指标。使用方应基于明确的 SLA 去设计自己的故障兜底和冗余安全机制。 立足于这个四个关键的演进方向,下面为大家整体介绍一下阿里云 RocketMQ 5.0 在这些方面是如何落地实践的。 大规模弹性:提供匹配业务模型的最佳资源供给能力 消息服务一般集成在业务的核心链路,比如交易、支付等场景,这一类场景往往存在波动的业务流量,例如大促、秒杀、早高峰等。 面对波动的业务场景,阿里云 RocketMQ 5.0 的消息服务可以伴随业务的诉求进行自适应实现资源扩缩。一方面在比较稳定的业务处理基线范围内,按照最低的成本预留固定的资源;另一方面在偶尔存在的突发流量毛刺时,支持自适应弹性,按量使用,按需付费。两种模式相互结合,可以实现稳定安全的高水位运行,无需一直为不确定的流量峰值预留大量资源。 除了消息处理流量的弹性适应外,消息系统也是有状态的系统,存储了大量高价值的业务数据。当系统调用压力变化时,存储本身也需要具备弹性能力,一方面需要保障数据不丢失,另一方面还需要节省存储的成本,避免浪费。传统的基于本地磁盘的架构天然存在扩缩容问题,其一本地磁盘容量有限,当需要扩大容量时只能加节点,带来计算资源的浪费;其二本地磁盘无法动态缩容,只能基于业务侧流量的隔离下线才能缩减存储成本,操作非常复杂。 阿里云 RocketMQ 5.0 的消息存储具备天然的 Serverless 能力,存储空间按需使用,按量付费,业务人员只需要按照需求设置合理的 TTL 时间,即可保障长时间存储时的数据完整性。 集成易用性:简化业务开发,降低心智负担和理解成本 集成易用性是一种系统设计约束,要求消息服务应该从 API 设计到集成开发、再到配置运维,全面地降低用户的负担,避免犯错。举个典型场景,在消息队列例如 RocketMQ 4.x 版本或 Kafka 中,业务消费消息时往往被负载均衡策略所困扰,业务方需要关注当前消息主题的队列数(分区数)以及当前消费者的数量。因为消费者是按照队列粒度做负载均衡和任务分配,只要消费者能力不对等,或者数量不能平均分配,必然造成部分消费者堆积、无法恢复的问题。 在典型的业务集成场景,客户端其实只需要以无状态的消息模型进行消费,业务只需关心消息本身是否处理即可,而不应该关心内部的存储模型和策略。 阿里云 RocketMQ 5.0 正是基于这种思想提供了全新的 SimpleConsumer 模型,支持任意单条消息粒度的消费、重试和提交等原子能力。 可观测性:提供边界清晰、标准开放的自助诊断能力 有运维消息队列经验的同学都会发现,消息系统耦合了业务的上游生产和下游消费处理,往往业务侧出问题时无法清晰地界定是消息服务异常还是业务处理逻辑的异常。 阿里云 RocketMQ 5.0 的可观测性就是为这种模糊不确定的边界提供解法,以事件、轨迹、指标这三个方面为基础,依次从点、线、面的纬度覆盖链路中的所有细节。关于事件、轨迹、指标的定义涵盖如下内容: 事件:覆盖服务端的运维事件,例如宕机、重启、变更配置;客户端侧的变更事件,例如触发订阅、取消订阅、上线、下线等;   轨迹:覆盖消息或者调用链的生命周期,展示一条消息从生产到存储,最后到消费完成的整个过程,按时间轴抓出整个链路的所有参与方,锁定问题的范围;  指标:指标则是更大范围的观测和预警,量化消息系统的各种能力,例如收发 TPS、吞吐、流量、存储空间、失败率和成功率等。 阿里云 RocketMQ 在可观测性方面也是积累良多,不仅率先支持了完善的消息轨迹链路查询,而且在 5.0 新版本中还支持将客户端和服务端的 Trace、Metrics 信息以标准的 OpenTelemetry协议上报到第三方Trace、Metrics中存储,借助开源的 Prometheus 和 Grafana 等产品可以实现标准化的展示和分析。 稳定性 SLA:提供可评估、可量化、边界明确的服务保障能力 稳定性是生产系统必备的核心能力,消息系统往往集成在核心交易链路,消息系统是否稳定直接影响了业务是否完整和可用。但稳定性的保障本身并不只是运维管理,而是要从系统架构的设计阶段开始梳理,量化服务边界和服务指标,只有明确了服务的可用性和可靠性指标,使用方才能设计自己的故障兜底和冗余安全机制。 传统的基于运维手段的被动保障方式,只能做基本的扩缩容和系统指标监控,对于消息的各种复杂边界场景,例如消息堆积、冷读、广播等并不能很好的提供量化服务能力。一旦上层业务方触发这些场景,系统则会被打穿,从而丧失服务能力。 阿里云 RocketMQ 5.0 体系化的稳定性建设,是从系统设计阶段就提供对消息堆积、冷读等场景量化服务的能力,确定合理的消息发送 RT、端到端延迟和收发吞吐 TPS 能力等,一旦系统触发这些情况,可在承受范围内做限制和保护。 本篇文章从大规模弹性、集成易用性、可观测性和稳定性 SLA 等方面介绍了 RocketMQ 5.0 的演进和方向,同时针对性介绍了阿里云消息队列 RocketMQ 5.0 在这些方面的实践和落地。 阿里云消息队列 RocketMQ 5.0 目前已正式商业化,在功能、弹性、易用性和运维便捷性等方面进行了全面增强,同时定价相比上一代实例最高降低 50%,助力企业降本增效,以更低的门槛实现业务开发和集成。新一代实例支持 0~100 万 TPS 规模自伸缩、支持突发流量弹性和存储 Serverless;在可观测性方面,支持全链路轨迹集成和自定义 Metrics 集成;在集成易用性方面,支持新一代轻量原生多语言 SDK,更加稳定和易用。
#行业实践 #云原生

2022年8月18日

解析 RocketMQ 业务消息--顺序消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。 简介 顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,同一 MessageGroup 的消息按照严格的先进先出(FIFO)原则进行发布和消费,即先发布的消息先消费,后发布的消息后消费,服务端严格按照发送顺序进行存储、消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求,因此需做到两点,发送的顺序性和消费的顺序性。 功能原理 在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?接下来就会围绕这个问题,对比普通消息和顺序消息进行阐述。 顺序发送 在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条消息 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。 同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。  因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式相比较普通消息,实际上降低了消息的最大吞吐。 顺序消费 与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。当然也需要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的消息之间并不存在顺序性的关联,是可以进行并发消费的。因此全文中提到的顺序实际上是一种偏序。 小结 无论对于发送还是消费,我们通过 MessageGroup 的方式将消息分组,即并发的基本单元是 MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度具备了可拓展性,支持多队列存储、水平拆分、并发消费,且不受影响。回顾普通消息,站在顺序消息的视角,可以认为普通消息的并发基本单元是单条消息,即每条消息均拥有不同的 MessageGroup。 我们回到开头那个问题: 既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢? 现在大家对于这个问题可能有一个基本的印象了,消息的顺序性当然很好,但是为了实现顺序性也是有代价的。 下述是一个表格,简要对比了顺序消息和普通消息。 最佳实践 合理设置 MessageGroup MessageGroup 会有很多错误的选择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为部分规模较大的商家会产出较多订单,由于下游消费能力的限制,因此这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背后的业务逻辑上来说,同一订单才有顺序性的要求。即选择 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量应当尽量相同且均匀。 同步发送和发送重试 如之前章节所述,需使用同步发送和发送重试来保证发送的顺序性。 消费幂等 消息传输链路在异常场景下会有少量重复,业务消费是需要做消费幂等,避免重复处理带来的风险。 应用案例 用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。   电商的订单创建,以订单 ID 作为 MessageGroup,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。   实战 发送 可以看到,该发送案例设置了 MessageGroup 并且使用了同步发送,发送的代码如下: public class ProducerFifoMessageExample { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class); private ProducerFifoMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String topic = "yourFifoTopic"; final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before // message publishing. .setTopics(topic) // May throw {@link ClientException} if the producer is not initialized. .build(); // Define your message body. byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey1ff69ada8e0e") // Message group decides the message delivery order. .setMessageGroup("youMessageGroup0") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { LOGGER.error("Failed to send message", t); } // Close the producer when you don't need it anymore. producer.close(); } } 消费 消费的代码如下: public class SimpleConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class); private SimpleConsumerExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String consumerGroup = "yourConsumerGroup"; Duration awaitDuration = Duration.ofSeconds(30); String tag = "yourMessageTagA"; String topic = "yourTopic"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for longpolling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(5); final List messages = consumer.receive(maxMessageNum, invisibleDuration); for (MessageView message : messages) { try { consumer.ack(message); } catch (Throwable t) { LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t); } } // Close the simple consumer when you don't need it anymore. consumer.close(); } }
作者:绍舒
#技术探索 #功能特性

2022年8月10日

云原生事件驱动引擎(Apache RocketMQ-EventBridge)应用场景与技术解析
在刚刚过去的 RocketMQ Summit 2022 全球开发者峰会上,我们对外正式开源了我们的新产品 RocketMQEventbridge 事件驱动引擎。 RocketMQ 给人最大的印象一直是一个消息引擎。那什么是事件驱动引擎?为什么我们这次要推出事件驱动引擎这个产品?他有哪些应用场景,以及对应的技术方案是什么? 今天我们就一起来看下,整篇文章包含三部分: 第一部分,我们一起看下什么是事件。 第二部分,和大家一起看看,事件有哪些不一样的“超能力”,使用这些“超能力”呢,我们又能干些什么? 第三部分,我们讲一下 RocketMQ 给出的关于事件的解决方案,也是我们这次开源的项目:RocketMQEventBridge。 什么是事件 大家自己可以先在脑袋里想一下,什么是事件?我们给事件下的一个定义是: 过去已经发生的事,尤其是比较重要的事。 A thing that happens, especially one of importance. 这个很好理解。比如说,昨天下午我做了一次核酸检测;今天上午又吃了一个冰激淋。这些都是过去已经发生的事件。但是,如果我再问:事件跟消息有什么区别?这个时候,大家是不是觉得事件这个定义,好像又不那么清晰? 刚才说的那些事件,是不是也可以理解为消息啊?如果,老张给我发送了一条短信,这个算是事件,还是消息啊?平常开发过程中,“什么时候使用消息,什么时候使用事件?” 不过,在回答这个问题之前,我们一起来看一个典型的微服务。 一个微服务系统和外部系统的交互,可以简单分为两部分:一是接收外部请求(就是图中上面黄色的部分);二是是调用外部服务(就是图中下面绿色的部分)。 接收外部请求,我们有两种方式:一种是提供 API,接收外部发过来的 Query 请求和 Command 请求;另外一种是主动订阅外部 Command 消息。这两类操作,进入系统内部之后呢,我们常常还会,调用其他为微服务系统,一起协同处理,来完成一个具体的操作。当这些操作,使得系统状态发生改变时,就会产生事件。 这里呢,我们把从外部接收到的 Command 消息,和系统内部产生的事件,都称之为消息。 我们总结一下,消息和事件的关系是这样的:消息包含两部分,Command 消息和 Event 消息 1、看图中左半部分,Command 是外部系统发送给本系统的一条操作命令; 2、再看图中右半部分,Event 则是本系统收到 Command 操作请求,系统内部发生改变之后,随之而产生了事件; 所以,事件和消息是不同的,事件可以理解为是一种特殊的消息。其特殊的点,主要在 4 个地方: 已发生、且不可变 事件,一定是“已发的”。“已发生”的代表什么呢?不可变的。我们不可能改变过去。这个特性非常重要,在我们处理事件、分析事件的时候,这就意味着,我们绝对可以相信这些事件,只要是收到的事件,一定是系统真实发生过的行为。而且是不可修改。 对比 Command 和 Query。Command 的中文是什么?命令。很显然,它是还没有发生的,只是表达了一种期望。我们知道“期望的”,不一定会成功发生。 比如:把厨房的灯打开、去按下门铃、转给 A 账户 10w…… 这些都是 Command,都是期望发生的行为。但是,最终有没有发生?并不知道。 Event 则是明确已经发生的事情。比如:厨房灯被打开了、有人按了门铃、A 账户收到了 10w…… 再对比 Query,它则是查询系统当前状态的一种请求,比如:厨房的灯是打开着的、门铃正在响、查下账户显示余额 11w…… 无期望的 这个怎么理解?事件是客观的描述一个事物的状态或属性值的变化,但对于如何处理事件本身并没有做任何期望。 相比之下,Command 和 Query 则都是有期望的,他们希望系统做出改变或则返回结果,但是 Event 呢,它只是客观描述系统的一个变化。 我们看一个例子:交通信号灯,从绿灯变成红灯,事件本身并没有要求行人或汽车禁止通行,而是交通法规需要红绿灯,并赋予了其规则。 所以,系统一般不会定向的、单独向另外一个系统发送事件,而是统一的告诉“事件中心”,“事件中心”呢,那里面有各个系统上报上来的,各式各样的事件。系统会向事件中心说明:自己这个系统,会产生哪些事件呀,这些事件的格式是怎么样的呀。 别的系统如果感兴趣呢,就可以来主动订阅这些事件。真正赋予事件价值的,是事件消费者。事件消费者想看看,某个系统发生了什么变化呀?OK,那他就去订阅这些事件,所以事件是消费者驱动的。 这跟消息有什么区别呢?Command 消息的发送和订阅,是双方约定好的,外人不知道,往往是以文档或代码的形式,大家按约定好的协议,发送和订阅消费,所以消息是生产者驱动的。 我们打个比喻,事件就像市场经济,商品被生产出来,具体有什么价值,有多大价值,很大程度上看其消费者。我们能看到系统中各种各样的事件,就像橱窗里摆放了各种各样的商品。而 Command 消息呢,有点像计划经济,一出生就带着很强的目的性,我就是要“分配”给谁消费。 天然有序 事件的第三个特性是:“天然有序”。含义:同一个实体,不能同时发生 A 又发生 B,必有先后关系;如果是,则这两个事件必属于不同的事件类型。 比如:针对同一个交通信号灯,不能既变成绿灯,又变成红灯,同一时刻,只能变成一种状态。 大家可能发现了一点,这里其实隐藏了事件的一个额外属性:因为天然有序,跟时间轴上的某一时刻强绑定,且不能同时发生,所以它一定是唯一的。 如果我们看到了两个内容一样的事件,那么一定是发生了两次,而且一次在前,一次在后。(这对于我们处理数据最终一致性、以及系统行为分析都很有价值:我们看到的,不光光是系统的一个最终结果,而是看到变成这个结果之前的,一系列中间过程) 具像化 事件的第四个特性是:“具象化”的。 事件会尽可能的把“案发现场”完整的记录下来,因为它也不知道消费者会如何使用它,所以它会做到尽量的详尽,比如: ●是由谁产生的事件?Subject ●是什么类型的事件?Type ●是谁发送的事件?Source ●事件的唯一性标志是什么?Id ●什么时候发生?Time ●事件的内容是什么?Data ●事件的内容有哪些信息?Dataschema 我们还是以交通信号灯举例子: 对比我们常见的消息,因为上下游一般是确定的,常常为了性能和传输效率,则会做到尽可能的精简,只要满足“计划经济”指定安排的消费者需求即可。 总结一下,事件上面的 4 个特性,是对事件巨大的一个属性加成,让事件拥有了跟普通消息不一样的“超能力”。使事件,常常被用到 4 个典型场景:事件通知、事件溯源、系统间集成和 CQRS。 下面让我们一个个展开,具体看看这些应用场景。 事件的典型应用场景 事件通知 事件通知是我们系统中很常见的一个场景。比如:用户下单事件通知给支付系统;用户付款事件通知给交易系统。 这里,让我们回到一开始信号灯那个例子。当交通信号灯,从红灯变成绿灯时,可能存在很多系统都需要这个信息。 方式 1:发送方主动调用,适配接收方 一种最简单的方式是,我们依次 call 每个系统,并把信息传递出去。比如:信号灯系统,主动调用地图导航的 API 服务、调用交警中控的 API 服务,调用城市大脑的 API 服务,把红绿灯变化信号发送出去。 但我们都知道,这个设计非常糟糕。尤其当系统越来越多时,这无疑是灾难的,不仅开发成本高,而且其中一个系统出现问题,可能会 hang 住整个服务,则导致调用其他系统都会受到影响。 方式 2:接收方主动订阅,适配发送方 一个很自然的解决方案是,我们将这些信息发送到中间消息服务 Broker,其他系统如果有需要,则主动去订阅这些消息即可。 这个时候,信号灯系统与其他系统并没有直接的调用依赖,交警中控服务、地图导航服务、城市大脑服务,只要按照约定的协议,去订阅信号灯的消息,并解析这些信息即可。 但是,这里同样存在一个问题:这个架构中,是以“信号灯”为中心。消费者需要理解发送者的业务领域,并主动添加适配层,(就是图中白色回旋镖部分),将消息转化为自己业务领域内的语言。但对于每一个微服务来说,他都希望都是高内聚低耦合的。 如果交警中控需要全国的信号灯数据,但是每个地域的消息格式又不一样,这就意味着,交警中控需要适配每一个地域的协议,做一层转换。而且万一后面变化了怎么办?想想就知道这个运维成本有多可怕。 那是否交警中控系统,可以要求全国所有红绿灯系统,都按同一种数据协议给到自己呢?不好意思,这些信号灯数据地图服务也在用,城市大脑也在用,不能更改。 方式 3:引入事件,Borker 根据接收方协议,进行灵活适配 但如果使用事件,就不一样了。因为事件是“无期望的”,“具像化的”,天然的保留了案发现场尽可能多的信息,且更加规范标准,对于消费者(也就是交警中空)来说,可以轻易将不同省份,收集上来的事件,轻易组装成,符合自己业务要求的格式。 而且,这一组装,是在中间层 Broker 发生的。对于交警中控来说,它只需要,按照自己业务领域的设计,提供一个接收事件的 API,然后其他事件,通过 Broker,主动投递到这个 API 上即可。从头到尾,对交警中控系统,没有一行适配外部业务的代码。 所以,这种方式有 3 个明显的优势: 1、只关注自己业务领域本身,不需要做适配外部的代码; 2、所有对系统的变更,收敛到 API,为唯一入口;同一个 API,可能既是用来接收事件的,也可能同时用于控制台操作; 3、因为事件是推送过来的,所以,也不需要像之前一样,引入一个 SDK,和 Broker 发生连接,获取消息,降低了系统的复杂度。 这样,我们一开始的图,就会变成这个样子:交通信号灯产生事件,投递到事件中心,其他需要这些事件的消费者,在事件中心订阅,再由事件中心,按照他们期望的事件格式,主动投递过去。 让我们再来回顾下整个过程: 第 1 幅图:一开始,我们通过强依赖的方式,让信号灯系统,主动将信息发送给各个系统。那这张图里,我们是以各个下游服务为中心,信号灯系统去适配各个下游服务。 第 2 幅图:后来,我们采用传统消息的方式,对调用链路进行了解耦,两边系统不再直接依赖了,但是依旧会存在业务上的依赖。消费者需要去理解生产者的消息格式,并在自己系统内部,进行转换适配。所以,这里其实是以生产者为中心。 第 3 幅图:最后,我们引入了事件通知的方式,对于这种方式,生产者和消费者,他们都只需要关注自己系统本身就可以了。生产者,生产什么样的事件,消费者,消费什么样的数据格式,都各自以自己的业务为中心,不需要为对方做适配。真正做到我们说的高内聚低耦合,实现彻底的完全解耦。 现在,回到我们一开始提到的典型微服务模型,对于有些场景,我们就可以变为下面这种方式:对微服务的变更操作,统一收敛到 API 操作入口,去掉 Command 消息入口。收敛入口,对于我们维护微服务,保障系统稳定性,常常非常有好处的。 事件溯源 事件溯源是什么?事件溯源简单理解就是让系统回到过去任意时刻。那怎么样,才能让系统可以回到过去呢?很简单,首先系统所有发生的变化,都得以事件的方式记录下来;然后,我们就可以通过回放事件的方式,回到过去任何一个时刻。 那为什么只有事件才能做这个事,其他普通消息不行呢?这个还是要回到我们刚才说的几个事件特性:已发生不可变的、天然有序且唯一的、而且是非常详细具体的,完整的记录了事件的案发现场。所以,对于事件溯源这个场景,事件可以说是系统的一等一的公民。 举个例子:比如说,如果我们能够完整地收集路上的各种事件信息,包括信号灯、车量、天气、拥堵路况等等,那么,我们就可以“穿越时间”,回到交通现场,重新做一次决策。比如,在智慧交通场景,当我们想去验证一个调度算法的时候,我们就可以回放当时发生的所有事件,来重现现场。 大家可能觉得这个很神奇,但是,其实我们平常一直有接触,大家知道是什么吗?就是我们常用的代码版本管理系统,比如:github。 这里有大家可能会问,如果一个系统积赞了很多事件,想重放是不是得很久?比如在一些交易场景,每天都会产生大量的事件,那应该怎么处理呢?这里呢,系统一般每天晚上都会打一份快照。如果系统意外宕机,想回到某一个时刻,就可以把前一天的快照取出,然后再重新跑下当天的事件,即可恢复。而白天呢,所有的事件都是在内存中进行处理,不会跟数据库交互,所以系统性能非常快,只有事件会落盘。 当然,事件溯源也不是适合所有场景,它有优点也有缺点,详细看上图。 系统间集成 刚才讲的第1个场景:事件通知,一般涉及到两个上下游团队的协作开发;讲的第 2 个场景:事件溯源,则一般是 1 个团队内的开发;但系统间集成,则往往面对的是三个业务团队的协作开发。这个怎么理解呢? 其实这个也很常见:比如公司里购买了 ERP 系统,同时也购买了外部考勤系统、外部营销系统服务等等。这些系统都有一个共同点,是什么?都不是我们自己开发的,是而买来的。 如果我们想把 ERP 系统的人员信息,实时且自动同步到考勤系统中去怎么办?其实这个是有点麻烦的,因为这些都不是我们自己开发的。 1、我们不能修改 ERP 系统的代码,主动去调用考勤系统,把人员变更信息发送过去; 2、也不能修改考情系统的代码,主动去调用外部 ERP 系统的 API; 但是我们可以通过事件总线,借助 webhook 或则标准 API 等等方式,收集上游的 ERP 系统产生的人员变更事件,然后进行过滤和转换,推送到下游考勤系统中去,当然,这里也可以是内部自研服务。 所以,现在的研发模式变成了:事件中心管理了所有 SaaS 服务,包括内部自研系统产生的所有事件。然后呢,我们只需要在事件中心,寻找我们需要的事件,进行订阅,对 SaaS 服务和内部自研系统,进行简单服务编排,即可完成开发。 CQRS CQRS 中的 C 代表 Command,Command 什么意思?就是明令,一般包含:Create/Update/Delete,Q 代表 Query,是指查询。所以 CQRS 本质是读写分离:所有的写操作,在图中左边的系统中完成,然后将系统因为 Command 产生变化的事件,同步到右边的查询系统。 这里同学可能有疑问,这跟数据库的读写分离有什么区别?数据库读写分离也是提供一个写的 DB,一个读的 DB,两边做同步。对吧… 那这里很大的一个区别是:对于数据库的读写分离,是以数据库为中心,两边的数据库是一模一样的,甚至数据的存储结构也是一模一样的。 但是对于 CQRS 的读写分离场景,是以业务为中心,两边存储的数据结构格式,往往是不一样的,甚至数据库都不是同一种。完全围绕各自的读写业务逻辑,设计最佳技术选型。对于写场景,为了保障事务,我们可能使用关系性数据库;对于读的场景,我们为了提高性能,我们可能会使用 Redis、HBase 等 Nosql 数据库。 当然 CQRS 也不是适合所有场景,他往往比较适合: ●希望同时满足高并发的写、高并发的读; ●写模型和读模型差别比较大时; ●读/写比非常高时; 我们刚才讲了事件的 4 个应用场景,但是,事件不是万能的,就像软件研发也没有银弹,有很多场景也并不适合使用事件。包括: 1. 强依赖 Response 的同步调用场景; 2. 要求服务调用保持事务强一致性的场景。 RocketMQ 关于事件的解决方案 需要什么样的能力? 首先,按照之前讲到的事件应用场景,我们整理下,如果我们做好事件驱动这块,我们的系统,需要具备什么样的能力呢? 第一,我们肯定得有一个事件标准,对吧…因为,事件不是给自己看的,也不是给他看的,而是给所有人看的。刚才,我们也讲到事件是无期望的,它没有明确的消费者,所有都是潜在的消费者,所以,我们得规范化事件的定义,让所有人都能看得懂,一目了然。 第二,我们得有一个事件中心,事件中心里面有所有系统,注册上来的各种事件,(这个跟消息不一样,我们没有消息中心,因为消息一般是定向的,是生产者和消费者约定的,有点像计划经济,消息生产出来的时候,带着很强的目的性,是给谁谁消费的。而事件有点像市场经济,事件中心呢,)这个有点类似市场经济大卖场,玲琅满目,里面分类摆放了各种各样的事件,所有人即使不买,也都可以进来瞧一瞧,看一看,有哪些事件,可能是我需要的,那就可以买回去。 第三,我们得有一个事件格式,用来描述事件的具体内容。这相当于市场经济的一个买卖契约。生产者发送的事件格式是什么,得确定下来,不能总是变;消费者以什么格式接收事件也得确定下来,不然整个市场就乱套了。 第四,我们得给消费者一个,把投递事件到目标端的能力。并且投递前,可以对事件进行过滤和转换,让它可以适配目标端 API 接收参数的格式,我们把这个过程呢,统一叫做订阅规则。 第五,我们还得有一个存储事件的地方,就是最中间的事件总线。 事件标准 关于刚才提到的第一点事件标准,我们选取了 CNCF 旗下的开源项目 CloudEvents,目前已被广泛集成,算是一个事实上的标准。 它的协议也很简单,主要规范了 4 个必选字段:id,source、type、specversion;以及多个可选字段:subject、time、dataschema、datacontenttype和data。上图右边,我们有一个简单的例子,大家可以看下,这里就不具体展开了。 另外,事件的传输也需要定义一种协议,方便不同系统之间的沟通,默认支持三种 HTTP 的传输方式:Binary Content Mode、Structured Content Mode 和 Batched Content Mode。通过 HTTP 的 ContentType,就可以区分这三种不同的模式。其中前两种,都是传递单个事件;第三种则是传递批量事件。 事件 Schema 事件的 Schema,用来描述事件中有哪些属性、对应的含义、约束等等信息。目前我们选取了 Json Schema. 和 OpenAPI 3.0,根据事件的 Schema 描述,我们可以对事件进行合法性校验。,当然 Schema 本身的修改,也需要符合兼容性原则,这里不作具体展开。 事件过滤和转换 关于事件的过滤和转换,我们提供了 7 种事件过滤方式和 4 种事件转换方式,详细可以下图描述: 技术架构 我们 RocketMQ 围绕事件驱动推出的产品,叫做 EventBridge,也是我们这次要开源的新产品。 他的整个架构可以分为两部分:上面是我们的控制面、下面是我们的数据面。 控制面中最上面的 EventSource 是各个系统注册上来的事件源,这些事件可以通过 APIGateway 发送事件到事件总线,也可以通过配置的 EventSource,生成 SouceRuner,主动从我们的系统中,去拉取事件。事件到达事件总线 EventBus 之后,我们就可以配置订阅规则了 EventRule,在规则 EventRule 里我们设置了事件怎么过滤,以及投递到目标端前,做哪些转换。系统基于创建的规则会生成 TargetRunner,就可以将事件推送到指定的目标端。 那这里 SouceRuner 和 TargetRunner 是什么呢?我们具体能对接哪些上下游 Source 和 Target? 这些我们都可以在下面的 SourceRegister 和 TargetRegister 提前进行注册。 所以 EventBridge 的数据面是一个开放的架构,他定义了事件处理的SPI,底下可以有多种实现。比如,我们把 RocketMQ 的 HTTPConnector 注册到 EventBridge 中,那我们就可以把事件推送到 HTTP 服务端。 如果我们把 Kafka 的 JDBC Connector 注册到 EventBridge 中,我们就可以把事件推送到数据库。 当然,如果你的系统不是通用的像 HTTP/JDPC 等协议,也可以开发自己的 Connector,这样就能将事件实时同步到 EventBridge,或则接收来自 EventBridge 的事件。 除此之外,我们还会有一些附加的运维能力,包括:事件追踪、事件回放、事件分析、事件归档。 RocketMQEventBridge 与云上 在所有开源的,与其他上下游系统做集成的 Connector 当中,我们有一个特殊的 Connector,叫:EventBridgeConnector,通过它可以方便的和阿里云云上的事件总线进行集成。这里有两个典型的应用场景: 第一个场景是:IDC 系统内部产生的事件,不仅可以用来做内部系统间的解耦,还可以实时同步到云上,驱动云上的一些计算服务,比如通过云上 Maxcompute 对内部产生的事件进行离线分析,或则驱动云上的图像识别服务,实时分析事件中标注的图片。 第二个场景是:如果 IDC 内部使用到了自建 MQ,我们同样可以通过 MQConnector 和 EventBridgeConnector,实时同步事件到云上,逐步将内部自建 MQ,迁移到云上MQ。 生态发展 关于 EventBridge 的未来方向,我们希望是在开源,构建一个支持多云架构的事件总线生态。这个怎么理解?简单来说,我们希望在不同云厂商之间,包括云厂商和内部 IDC 系统之间,可以通过事件,来打破围墙,实现互通。虽然,这几年云计算发展很快,但是对于一些特别大的客户来讲,有时候并不希望跟某家云厂商强绑定。这不光是市场充分竞争的结果,也是大客户一种降低风险的手段。所以,这个时候,如何在不同云厂商之间,包括云厂商系统和自己内部 IDC 系统之间,灵活的交互,甚至灵活的迁移,是企业非常重要的一个诉求。 当然,实现这个是有一定难度的。不过如果我们在进行企业架构设计的时候,是基于事件驱动架构进行设计开发——不同系统之间的交互,围绕事件展开,就会容易很多。 事件,在这里,就好比一种通用语言,通过这个通用语言,就可以实现和不同系统之间的沟通交流。比如:用 IDC 系统内部的事件,去驱动阿里云上服务;甚至用阿里云上的事件,去驱动 AWS 上的服务运行; 为了实现这个目标,我们在和不同云厂商,不同 SaaS 系统服务商,进行系统间集成的时候,需要开发与之对应的连接器。 也欢迎大家,一起来共建 RocketMQEventBridge 的生态。 源码地址:
作者:静罗
#技术探索 #强力推荐

2022年8月3日

解析 RocketMQ 业务消息——事务消息
引言:在分布式系统调用场景中存在这样一个通用问题,即在执行一个核心业务逻辑的同时,还需要调用多个下游做业务处理,而且要求多个下游业务和当前核心业务必须同时成功或者同时失败,进而避免部分成功和失败的不一致情况出现。简单来说,消息队列中的“事务”,主要解决的是消息生产者和消费者的数据一致性问题。本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。 点击下方链接,查看视频讲解: 场景:为什么需要事务消息 以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括: 主分支订单系统状态更新:由未支付变更为支付成功; 物流系统状态新增:新增待发货物流记录,创建订单物流记录; 积分系统状态变更:变更用户积分,更新用户积分表; 购物车系统状态变更:清空购物车,更新用户购物车记录。  分布式系统调用的特点是:一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。 传统 XA 事务方案:性能不足 为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务,基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。 基于普通消息方案:一致性保障困难 将上述基于 XA 事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。 该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如: 消息发送成功,订单没有执行成功,需要回滚整个事务; 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致; 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。 基于RocketMQ分布式事务消息:支持最终一致性 上述普通消息方案中,普通消息和订单事务无法保证一致的本质原因是普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于消息队列 RocketMQ 版实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 消息队列 RocketMQ 版事务消息的方案,具备高性能、可扩展、业务开发简单的优势。 基本原理 概念介绍 事务消息:RocketMQ 提供类似 XA 或 Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致;  半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了 RocketMQ 服务端,但是 RocketMQ 服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息;  消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。 事务消息生命周期 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态;  事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见;  消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止;  提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费;  消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。具体信息,请参见消息重试;  消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败);RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。  消息删除:当消息存储时长到期或存储空间不足时,RocketMQ 会按照滚动机制清理最早保存的消息数据,将消息从物理文件中删除。 事务消息基本流程 事务消息交互流程如下图所示: 1. 生产者将消息发送至 RocketMQ 服务端;  2. RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息;  3. 生产者开始执行本地事务逻辑;  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下: 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者; 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;   6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果;  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。 实现细节:RocketMQ 事务消息如何实现 根据发送事务消息的基本流程的需要,实现分为三个主要流程:接收处理 Half 消息、Commit 或 Rollback 命令处理、事务消息 check。 处理 Half 消息 发送方第一阶段发送 Half 消息到 Broker 后,Broker 处理 Half 消息。Broker 流程参考下图: 具体流程是首先把消息转换 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,其余消息内容不变,写入 Half 队列。具体实现参考 SendMessageProcessor 的逻辑处理。 Commit 或 Rollback 命令处理 发送方完成本地事务后,继续发送 Commit 或 Rollback 到 Broker。由于当前事务已经完结,Broker 需要删除原有的 Half 消息,由于 RocketMQ 的 appendOnly 特性,Broker通过 OP 消息实现标记删除。Broker 流程参考下图: Commit。Broker 写入 OP 消息,OP 消息的 body 指定 Commit 消息的 queueOffset,标记之前 Half 消息已被删除;同时,Broker 读取原 Half 消息,把 Topic 还原,重新写入 CommitLog,消费者则可以拉取消费;  Rollback。Broker 同样写入 OP 消息,流程和 Commit 一样。但后续不会读取和还原 Half 消息。这样消费者就不会消费到该消息。  具体实现在 EndTransactionProcessor 中。 事务消息 check 如果发送端事务时间执行过程,发送 UNKNOWN 命令,或者 Broker/发送端重启发布等原因,流程 2 的标记删除的 OP 消息可能会缺失,因此增加了事务消息 check 流程,该流程是在异步线程定期执行(transactionCheckInterval 默认 30s 间隔),针对这些缺失 OP 消息的 Half 消息进行 check 状态。具体参考下图: 事务消息 check 流程扫描当前的 OP 消息队列,读取已经被标记删除的 Half 消息的 queueOffset。如果发现某个 Half 消息没有 OP 消息对应标记,并且已经超时(transactionTimeOut 默认 6 秒),则读取该 Half 消息重新写入 half 队列,并且发送 check 命令到原发送方检查事务状态;如果没有超时,则会等待后读取 OP 消息队列,获取新的 OP 消息。 另外,为了避免发送方的异常导致长期无法确定事务状态,如果某个 Half 消息的 bornTime 超过最大保留时间(transactionCheckMaxTimeInMs 默认 12 小时),则会自动跳过此消息,不再 check。 具体实现参考: TransactionalMessageServiceImplcheck 方法。 实战:使用事务消息 了解了 RocketMQ 事务消息的原理后,我们看下如何使用事务。首先,我们需要创建一个 “事务消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 事务消息相比普通消息发送时需要修改以下几点: 发送事务消息前,需要开启事务并关联本地的事务执行。 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。  当事务消息 commit 之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意: 1. 避免大量未决事务导致超时:在事务提交阶段异常的情况下发起事务回查,保证事务一致性;但生产者应该尽量避免本地事务返回未知结果;大量的事务检查会导致系统性能受损,容易导致事务处理延迟; 2. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据 Group ID 去查询生产者客户端; 3. 事务超时机制:半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。 
作者:合伯
#技术探索 #功能特性

2022年7月27日

RocketMQ 消息集成:多类型业务消息——定时消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了 100% 阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。 本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。 点击下方链接,查看直播讲解: 概念:什么是定时消息 在业务消息集成场景中,定时消息是,生产者将一条消息发送到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间,消费者才可以消费到。 相似地,延迟消息其实是对于定时消息的另外一种解释,指的是生产者期望消息延迟一定时间,消费者才可以消费到。可以理解为定时到当前时间加上一定的延迟时间。 对比一下定时消息和普通消息的流程。普通消息,可以粗略的分为消息发送,消息存储和消息消费三个过程。当一条消息发送到 Topic 之后,那么这条消息就可以马上处于等待消费者消费的状态了。 而对于定时/延时消息来说,其可以理解为在普通消息的基础上叠加了定时投递到消费者的特性。生产者发送了一条定时消息之后,消息并不会马上进入用户真正的Topic里面,而是会被 RocketMQ 暂存到一个系统 Topic 里面,当到了设定的时间之后,RocketMQ 才会将这条消息投递到真正的 Topic 里面,让消费者可以消费到。 场景:为什么需要使用定时消息 在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。往往这类定时事件触发都会存在以下诉求: 高性能吞吐:需要大量事件触发,不能有性能瓶颈。 高可靠可重试:不能丢失事件触发。 分布式可扩展:定时调度不能是单机系统,需要能够均衡的调度到多个服务负载。  传统的定时调度方案,往往基于数据库的任务表扫描机制来实现。大概的思路就是将需要定时触发的任务放到数据库,然后微服务应用定时触发扫描数据库的操作,实现任务捞取处理。 这类方案虽然可以实现定时调度,但往往存在很多不足之处: 重复扫描:在分布式微服务架构下,每个微服务节点都需要去扫描数据库,带来大量冗余的任务处理,需要做去重处理。 定时间隔不准确:基于定时扫描的机制无法实现任意时间精度的延时调度。 横向扩展性差:为规避重复扫描的问题,数据库扫表的方案里往往会按照服务节点拆分表,但每个数据表只能被单节点处理,这样会产生性能瓶颈。 在这类定时调度类场景中,使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。消息队列 RocketMQ 版的定时消息具有高并发和水平扩展的能力。 案例:使用定时消息实现金融支付超时需求 利用定时消息可以实现在一定的时间之后才进行某些操作而业务系统不用管理定时的状态。下面介绍一个典型的案例场景:金融支付超时。现在有一个订单系统,希望在用户下单 30 分钟后检查用户的订单状态,如果用户还没有支付,那么就自动取消这笔订单。 基于 RocketMQ 定时消息,我们可以在用户下单之后发送一条定时到 30 分钟之后的定时消息。同时,我们可以使用将订单 ID 设置为 MessageKey。当 30 分钟之后,订单系统收到消息之后,就可以通过订单 ID 检查订单的状态。如果用户超时未支付,那么就自动的将这笔订单关闭。 原理:RocketMQ 定时消息如何实现 固定间隔定时消息 如前文介绍,定时消息的核心是如何在特定的时间把处于系统定时 Topic 里面的消息转移到用户的 Topic 里面去。 Apache RocketMQ 4.x 的版本的定时消息是先将定时消息放到按照 DelayLevel 放到 SCHEDULE_TOPIC_XXXX 这个系统的不同 Queue 里面,然后为每一个 Queue 启动一个定时任务,定时的拉取消息并将到了时间的消息转投到用户的 Topic 里面去。这样虽然实现简单,但也导致只能支持特定 DelayLevel 的定时消息。 当下,支持定时到任意秒级时间的定时消息的实现的 pr 提出到了社区,下面简单的介绍一下其基本的实现原理。 时间轮算法 在介绍具体的实现原理之前,先介绍一下经典的时间轮算法,这是定时消息实现的核心算法。 如上所示,这是一个一圈定时为 7 秒的时间轮,定时的最小精度的为秒。同时,时间轮上面会有一个指向当前时间的指针,其会定时的移向下一个刻度。 现在我们想定时到 1 秒以后,那么就将数据放到 “1” 这个刻度里面,同时如果有多个数据需要定时到同一个时间, 那么会以链表的方式添加到后面。当时间轮转到 “1” 这个刻度之后,就会将其读取并从链表出队。那如果想定到超过时间轮一圈的时间怎么处理呢?例如我们想定时到 14 秒,由于一圈的时间是 7 秒,那么我们将其放在“6”这个刻度里面。当第一次时间轮转到“6” 时,发现当前时间小于期望的时间,那么忽略这条数据。当第二次时间轮转到“6”时,这个时候就会发现已经到了我们期望的 14 秒了。 任意秒级定时消息 在 RocketMQ 中,使用 TimerWheel 对于时间轮进行描述和存储,同时使用一个 AppendOnly 的 TimerLog 记录时间轮上面每一个刻度所对应的所有的消息。 TimerLog 记录了一条定时消息的一些重要的元数据,用于后面定时的时间到了之后,将消息转移到用户的 Topic 里面去。其中几个重要的属性如下: 对于 TimerWheel 来说,可以抽象的认为是一个定长的数组,数组中的每一格代表时间轮上面的一个“刻度”。TimerWheel 的一个“刻度”拥有以下属性。 TimerWheel 和 TimerLog 直接的关系如下图所示: TimerWheel 中的每一格代表着一个时间刻度,同时会有一个 firstPos 指向这个刻度下所有定时消息的首条 TimerLog 记录的地址,一个 lastPos 指向这个刻度下所有定时消息最后一条 TimerLog 的记录的地址。并且,对于所处于同一个刻度的的消息,其 TimerLog 会通过 prevPos 串联成一个链表。 当需要新增一条记录的时候,例如现在我们要新增一个 “14”。那么就将新记录的 prevPos 指向当前的 lastPos,即 “13”,然后修改 lastPos 指向 “14”。这样就将同一个刻度上面的 TimerLog 记录全都串起来了。 有了 TimerWheel 和 TimerLog 之后,我们再来看一下一条定时消息从发送到 RocketMQ 之后是怎么最终投递给用户的。 首先,当发现用户发送的是一个定时消息过后,RocketMQ 实际上会将这条消息发送到一个专门用于处理定时消息的系统 Topic 里面去 然后在 TimerMessageStore 中会有五个 Service 进行分工合作,但整体可以分为两个阶段:入时间轮和出时间轮 对于入时间轮: TimerEnqueueGetService 负责从系统定时 Topic 里面拉取消息放入 enqueuePutQueue 等待 TimerEnqueuePutService 的处理 TimerEnqueuePutService 负责构建 TimerLog 记录,并将其放入时间轮的对应的刻度中  对于出时间轮: TimerDequeueGetService 负责转动时间轮,并取出当前时间刻度的所有 TimerLog 记录放入 dequeueGetQueue TimerDequeueGetMessageService 负责根据 TimerLog 记录,从 CommitLog 中读取消息 TimerDequeuePutMessageService 负责判断队列中的消息是否已经到期,如果已经到期了,那么将其投入用户的 Topic 中,等待消费消费;如果还没有到期,那么重新投入系统定时 Topic,等待重新进入时间轮。  实战:使用定时消息 了解了 RocketMQ 秒级定时消息的原理后,我们看下如何使用定时消息。首先,我们需要创建一个 “定时/延时消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 从前面可以看出,对于定时消息来说,是在发送消息的时候 “做文章”。所以,对于生产者,相对于发送普通消息,我们可以在发送的时候设置期望的投递时间。 当定时的时间到了之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大。所以一般建议尽量不要设置大量相同触发时刻的消息。
作者:凯易、明锻
#技术探索 #功能特性

2022年7月22日

生于云、长于云,RocketMQ 5.0再出发
7 月 21 日7 月 22 日,由 Apache RocketMQ 社区主办,阿里云天池平台、云原生应用平台承办的首届 RocketMQ Summit 全球开发者峰会拉开帷幕。Apache RocketMQ 联合创始人林清山发布 RocketMQ 能力全景图,为众多开发者阐述 RocketMQ 5.0 的技术定位与发展方向,来自快手、小米、字节跳动等互联网头部企业的 40 位演讲嘉宾与众多开发者分享各自行业的最佳实践与技术探索经验。 阿里云云原生应用平台负责人丁宇表示,开源让云计算更加的标准化、云计算让开源产品化和规模化,未来的数字世界,将构建在云计算和开源之上。阿里巴巴将以开源的方式,践行开放共享好科技理念,把开源作为技术战略的重要组成部分。 今天,阿里巴巴的开源项目总数超过 3000 个,涵盖云计算、大数据、AI、中间件、数据库、容器、Serverless、高可用等领域,拥有超过 30000 名 Contributor,超过百万 Star,位列中国企业社区贡献榜首,连续十年蝉联中国厂商开源活跃度第一、影响力第一。未来,阿里云也将会持续投入 RocketMQ 的开源建设,构建更加繁荣的社区生态。希望与更多的开发者、贡献者一起,追求极致、开放共享,实现开源技术的普惠。 基于自身实践,RocketMQ 社区对于消息队列演进趋势的洞察 消息队列作为最经典的中间件之一,已经有三十多年历史。伴随着技术发展,消息队列领域不断扩展,迸发新生命力,作为国内大规模实践先行者,RocketMQ 社区认为消息领域将迎来以下趋势变化: 1. 全面拥抱云原生 消息队列将向上演进消息型的产品形态,更好去支撑微服务、事件驱动、Serverless 化等云原生应用架构;向下演进消息系统自身云原生架构,通过系统重构充分释放基础设施的弹性计算、存储、网络等能力,全方位提升消息技术指标,降低消息成本,提高消息队列弹性能力。 2. 全面拥抱物联网 物联网技术将更广泛的落地到各行各业,万物互联、边缘计算进一步拓展消息的边界。面向物联网的消息队列要海量异构设备接入,海量消息队列存储,能够随处运行,具备云边端一体的无边界部署能力。 3. 全面拥抱实时数据 企业的数字化转型的步伐不断加速,从业务数字化迈向数字业务化。数字化企业持续产生业务数据,对业务数据实时洞察与决策,才能帮助企业快速响应商机、把握商机,使得业务获得更大成功。同时,消息队列也将从在线业务架构的基础设施延伸到实时数据架构的基础设施,达到事务分析一体化。 四大方向,全面解读 RocketMQ 5.0 架构演进 1. 面向微服务 引入微服务架构,数字化企业以“高内聚、低耦合”的方式高效协作。微服务架构也带来新问题,比如大量同步微服务会面临延迟增大、可用性降低等风险。为了解决这个问题,越来越多的企业引入消息队列建设异步微服务体系,进一步提高微服务的韧性,降低响应延迟。 业界的微服务技术趋势,一方面是已经形成了事实标准,比如说像 Spring Cloud 体系,Dubbo 体系,通信协议有 HTTP、AMQP 等,另一方面下一代的微服务体系也在快速发展中,主要体现在基础设施下沉,比如 Servicemesh,Serverless 等技术。 在这个趋势下,RocketMQ 5.0, 在 SDK 层面将原来的重型客户端往轻量客户端演进,基于标准 gRPC 作为 remoting 层实现 SDK,同时也将更多客户端逻辑下沉到服务端,比如消息重试、负载均衡等,大幅度降低多语言 SDK 的实现成本。轻量客户端更好的匹配了 ServiceMesh 的需求,RocketMQ 的 Mesh 能力已正式合入 CNCF Envoy 官方社区。 在负载均衡方面,RocketMQ 从原来的队列粒度负载均衡演进到了消息粒度负载均衡模式,消息粒度负载均衡更加匹配 Serverless 应用的场景,无状态 Serverless 应用弹性伸缩过程不会触发频繁的队列重平衡,降低消息重复率和端到端延迟。 RocketMQ 5.0 提供无状态 proxy,通过 proxy 可以很方便的扩展更多标准消息协议以及流量治理功能。无状态 proxy 也具备良好的的网络穿透能力,可以灵活应对企业在上云过程中面临复杂跨网络访问场景。 今天我们以 RocketMQ 5.0 核心能力为基础,支撑了阿里云 RocketMQ、MNS、RabbitMQ 等多款云消息产品。其中阿里云 RabbitMQ 是一款兼容 AMQP 协议、RabbitMQ SDK 的消息服务,可以帮助开源存量用户无缝上云。同时它也充分释放了底层 RocketMQ 云原生架构的技术红利,具备和 RocketMQ 一致的高性能、无限扩展、高可用等特点,是云原生的 RabbitMQ。 2. 事件驱动(EDA) 事件驱动在 18 年被 Gartner 评为年度十大技术趋势。在未来新型的数字化商业解决方案中,会有 60% 以上的商业数字化解决方案采纳 EDA 架构。EDA 为软件架构带来彻底解耦,实现更灵活的业务扩展和业务敏捷能力,不仅可以用于单一业务领域的微服务解耦,还可以用于跨部门、跨组织、跨业务领域的事件集成。消息队列是 EDA 架构中最核心的组件,承担 eventbroker 的职责。随着 EDA 架构被大规模跨组织的落地,要进一步提高行业级生产力,标准化也迫在眉睫。为此 CNCF 推出了 CloudEvent 规范,基于统一的规范,跨系统、跨组织的数字化协同有了共同的“语言”,能够实现更高效的系统集成,有了规范也方便沉淀面向事件的统一基础软件设施,提高研发效率。 面向 EDA 趋势,RocketMQ 5.0 发布全新产品形态——Eventbridge。整个领域模型以事件为中心,并拥抱 CloudEvent 规范,CloudEvent 社区开源 SDK 可无缝接入 Eventbridge。同时,还提供各种低代码事件编排、过滤、路由能力,灵活实现各种事件集成。 今天我们以 RocketMQ 5.0 核心能力为基础,支撑了阿里云 EventBridge 产品,助力云客户实现事件驱动、事件集成的商业生态。 3. 物联网 全球的 IoT 设备爆发式增长,预计到了 2025 年将达到 200 多亿台,。并且物联网也带来了边缘计算的兴起,未来将有 75% 的数据将在传统数据中心或云环境之外进行处理。目前物联网行业已经形成了多个标准协议,其中最流行莫过于 MQTT,这是"发布订阅"模式的消息协议,除此之外还有各种车联网协议、工业协议等等,物联网消息队列要具备多样化异构海量设备接入能力。RocketMQ 可作为物联网应用的基础通信设施,用于 IoT、移动设备的数据上报,还有指令下行,为 IoT 业务连接云边端。 面向 IoT 的趋势,RocketMQ 5.0 发布轻量级百万队列引擎,轻量元数据服务。在新存储内核之上,建设物联网形态消息队列 MQTT,支持标准物联网协议,支持海量物联网设备接入和海量队列存储。 RocketMQ 5.0 遵循零外部依赖的精简架构原则,新 HA 架构为低资源消耗场景提供更多选择,用户可以权衡可靠性、成本、可用性,选择最优副本策略。比如边缘场景由于资源受限,RocketMQ 不一定需要提供三副本存储,可以是 2 副本,甚至是单副本就能满足业务需求。 今天以 RocketMQ 5.0 核心能力为基础,支撑了阿里云微消息队列 MQTT,为客户提供云端一体化消息解决方案,实现万物互联、云端互联。 4. 实时大数据 未来大数据将走向实时化,预测在 2025 年实时大数据的比例将达到 30%。数字化企业通过实时感知、实时分析、实时决策,能够抓住商机、快速响应用户,实时大数据的重要性愈发突出。消息队列是实时大数据的关键技术之一,作为事件流的核心存储,它承担数据的分发,数据的缓冲,还有轻量的流处理的作用。 事件流技术越来越多的在 IoT 场景进行使用,IDC 预测未来 95% 的实时事件流将来自IoT场景;另外有越来越多的交易事件需要进行实时分析,挖掘更多业务价值,事件流技术也开始呈现事务分析一体化的趋势。 面向事件流的趋势,RocketMQ 5.0 在流存储和流分析能力进行重点打造: 流存储方面,支持批量索引,大幅度提高 RocketMQ 吞吐量。支持 compacttopic,用于实现流处理过程中的状态存储,零外部依赖。除了功能特性之外,RocketMQ 5.0 的流存储同时进行了云原生架构改造,引入逻辑队列的概念,解耦了数据逻辑分区跟物理存储之间的绑定关系,能够实现全局固定分区前提下进行无缝扩缩容,零数据迁移。 流分析方面,RocketMQ 5.0 全新发布了轻量的流计算引擎,它可以兼容 flink SQL,方便用户在不同场景无缝切换。如果用户需要大而全流计算能力,可以使用大型计算平台。如果用户有边缘计算、资源受限、简单流处理场景可以直接使用 RocketMQ 的 RSQL 来支持。 今天我们以 RocketMQ 5.0 事件流能力为基础,支撑了阿里云 Kafka 产品,存量 Kafka 用户实现无缝上云。同时基于 RocketMQ 的逻辑队列能力,阿里云 Kafka 具备快速弹性伸缩能力,提供了 Serverless 化的产品形态。让存量 Kafka 用户也能够体验到云原生架构的红利。 不断演进,RocketMQ 正式迈进 5.0 时代 在过去七年大规模云计算实践中,RocketMQ 不停自我演进。今天 RocketMQ 正式迈进了 5.0 的时代。从互联网业务消息中间扩展到“消息、事件、流”超融合处理平台,解锁了更全面能力。 在消息领域,全面拥抱云原生技术,以获得更好的弹性伸缩。在事件领域产品形态进行全面升级,拥抱行业标准,让事件驱动的架构无处不在,从单一业务的数字化系统扩展到跨组织跨业务的数字化商业生态事件驱动的架构,也同时让云计算原生的技术能够更大规模的落地,提高云产品跟用户业务的集中度。让 Serverless 的技术能够被更大范围的采纳,帮助企业客户去降本增效。在流存储和流计算领域,流存储增强批量的特性,大幅度提高 RocketMQ 数据吞吐量,新增逻辑队列能力,解耦逻辑资源跟物理资源,在流场景也具备无缝伸缩能力;新增轻量流处理引擎也提供了实时事件流处理、流分析能力。 RocketMQ 基于端云一体化架构,实现完整物联网消息队列能力,从原来连接应用扩展到连接物联网设备。同时 RocketMQ 5.0 也继续保持着极简架构原则,即便产品能力全面提升,也依然能够以最低资源消耗,最低运维代价去搭建服务。 现在 RocketMQ 已经真正具备连接一切,随处运行的能力,提供云、边端一体化实时数据解决方案。物联网设备持续的产生的数据,边缘 RocketMQ 可以进行实时数据分析,快速响应业务。通过实时 ETL,实时决策产生的高价值事件,或者数据可以传输到云端,通过 RocketMQ  eventing 能力连接更强大的公有云平台,利用云的一站式平台技术,进一步放大每份数据的价值。 不止于开源,RocketMQ 赋能海量行业客户 今天,基于 RocketMQ 5.0 为内核,阿里云也打造一站式消息平台,在统一云原生消息内核基础上,提供 6 种消息产品形态,有面向 IoT 场景的微消息队列 MQTT,有面向 EDA 场景的 EventBridge,有面向开源用户无缝上云的托管开源产品如 Kafka、RabbitMQ、RocketMQ。 通过多样化产品形态,RocketMQ 在阿里云上面已服务数万个企业用户,帮助其完成数字化转型的同时。RocketMQ 也得到业界的广泛认可。近期获得多个奖项,包括 OSCHINA 优秀技术团队奖、中国开源云联盟优秀基础软件、中国科学技术协会颁布的科创中国开源创新榜等,并进入 Apache 中国开源项目领导者象限。RocketMQ 成为第一个通过信通院可信云分布式消息队列服务的“增强级”认证,第一个通过信通院金融级稳定性评测的“先进级”认证。 随着潜在用户数的增大,RocketMQ 的商业价值也被进一步的放大。目前已经有十家的云厂商提供 Apache RocketMQ 的商业服务,它几乎覆盖了国内主流的公共云厂商。这样 RocketMQ 的用户就有了更多的选择,真正实现无厂商锁定,RocketMQ 已经成为原生消息的事实标准。 万物皆云的时代,RocketMQ 让数字化转型更简单高效,也将消息、事件、流的价值最大程度释放。Apache RocketMQ 将不断推动技术演进与落地实践,帮助企业真正实现高质量数字化转型与创新。
#社区动态

2022年7月17日

融合数据库生态:利用 EventBridge 构建 CDC 应用
引言 CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA 天然契合 CDC 的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成一些列业务驱动。阿里云 EventBridge 是阿里云推出的一款无服务器事件总线服务,能够帮助用户轻松快捷地搭建基于 EDA 架构的应用。近期,EventBridge 事件流已经支持了基于阿里云 DTS[1]服务的 CDC 能力。本文将从 CDC、CDC 在 EventBridge 上的应用以及若干最佳实践场景等方面,为大家介绍如何利用 EventBridge 轻松构建 CDC 应用。 CDC 概述 基本原理与应用场景 CDC 从源数据库捕获增量的数据以及数据模式变更,以高可靠、低延时的数据传输将这些变更有序地同步到目标数据库、数据湖或者其他数据分析服务。目前业界主流的开源 CDC 工具包括 Debezium[2]、Canal[3] 以及 Maxwell[4]。 图片来源: 目前业界主要有以下几类 CDC 的实现: 1. 基于时间戳或版本号 基于时间戳的方式要求数据库表有一个字段代表更新时间戳,当存在数据插入或更新时,对应时间戳字段就会随之更新。CDC 组件周期性检索更新时间大于上次同步时间的数据记录,即可捕获本周期内数据的变更。基于版本号跟踪和基于时间戳跟踪原理基本一致,要求开发者变更数据时必须更新数据的版本号。 2. 基于快照 基于快照的 CDC 实现在存储层面使用到了数据源 3 份副本,分别是原始数据、先前快照和当前快照。通过对比 2 次快照之间的差异来获取这之间的数据变更内容。 3. 基于触发器 基于触发器的 CDC 实现方式事实上是在源表上建立触发器将对数据的变更操作(INSERT、UPDATE、DELETE)记录存储下来。例如专门建立一张表记录用户的变更操作,随后创建 INSERT、UPDATE、DELETE 三种类型的触发器将用户变更同步到此表。 4. 基于日志 以上三种方式都对源数据库存在一定侵入性,而基于日志的方式则是一种非侵入性的 CDC 方式。数据库利用事务日志实现灾备,例如 MySQL 的 binlog 就记录了用户对数据库的所有变更操作。基于日志的 CDC 通过持续监听事务日志来实时获取数据库的变化情况。 CDC 的应用场景广泛,包括但不限于这些方面:异地机房数据库同步、异构数据库数据同步、微服务解耦、缓存更新与 CQRS 等。 基于阿里云的 CDC 解决方案:DTS 数据传输服务 DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体。其中,DTS 数据订阅[5]功能可以帮助用户获取自建 MySQL、RDS MySQL、Oracle 等数据库的实时增量数据。 CDC 在EventBrige上的应用 阿里云 EventBridge 提供了事件总线[6]与事件流[7] 2 款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件路由到多个事件目标中。 事件流适用于端到端的流式数据处理场景,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便。 为了更好地支持用户在 CDC 场景下的需求,EventBridge 在事件流源端支持了阿里云 DTS 的数据订阅功能,用户仅需简单配置,即可将数据库变更信息同步到 EventBridge 事件流。 EventBridge 定制了基于 DTS sdk 的 DTS Source Connector。当用户配置事件提供方为 DTS 的事件流时,source connector 会实时地从 DTS 服务端拉取 DTS record 数据。数据拉取到本地后,会进行一定的结构封装,保留 id、operationType、topicPartition、beforeImage、afterImage 等数据,同时增加 streaming event 所需要的一些系统属性。 DTS Event 样例可参考 EventBridge 官方文档 EventBridge Streaming 保证了 DTS 事件的顺序性,但存在事件重复投递的可能性,EventId 在保证了和每条 DTS record 的一一映射关系,用户可依据此字段来对事件做幂等处理。 创建源为 DTS 的 EventBridge 事件流 下面展示如何在 EventBridge 控制台创建源为 DTS 的事件流 前期准备  1. 开通 EventBridge 服务; 2. 创建 DTS 数据订阅任务; 3. 创建用于消费订阅数据的消费组账号信息。 创建事件流  1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“数据库 DTS”; 4. 在“数据订阅任务”一栏中选择已创建的 DTS 数据订阅任务。在消费组一栏,选择要使用哪个消费组消费订阅数据,同时填写消费组密码与初始消费时间。 5. 事件流规则与目标按照需要填写,保存启动即可创建以 DTS 数据订阅为事件源的事件流。 注意事项 使用时有以下几点需要注意: 1. EventBridge 使用的是 SUBSCRIBE 消费模式[8],所以请保证当前 DTS 消费组没有其他客户端实例在运行。如果设置的消费组在之前有运行,则传入的位点失效,会基于此消费组上次消费过的位点继续消费; 2. 创建 DTS 事件源时传入的位点仅在新消费组第一次运行时起效,后续任务重启后会基于上次消费位点继续消费; 3. EventBridge 事件流订阅 OperationType 为 INSERT、DELETE、UPDATE、DDL 类型的 DTS 数据; 4. 使用 DTS  事件源可能会有消息重复,即保证消息不丢,但无法保证仅投递一次,建议用户做好幂等处理; 5.用户如果需要保证顺序消费,则需要将异常容忍策略设置为“NONE”,即不容忍异常。在这种情况下,如果事件流目标端消费消息异常,整个事件流将暂停,直至恢复目标端正常。 最佳实践示例 基于EventBridge 实现 CQRS 在 CQRS(Command Query Responsibility Segregation)模型中,命令模型用于执行写以及更新操作,查询模型用于支持高效的读操作。读操作和写操作使用的数据模型存在一定区别,需要使用一定方式保证数据的同步,基于 EventBridge 事件流的 CDC 可以满足这样的需求。 基于云上服务,用户可以使用如下方式轻松构建基于 EventBridge 的 CQRS: 1. 命令模型操作数据库进行变更,查询模型读取 elasticsearch 获取数据; 2. 开启 DTS 数据订阅任务,捕获 DB 变更内容; 3.配置 EventBridge 事件流,事件提供方为 DTS 数据订阅任务,事件接收方为函数计算 FC; 4. FC 中的服务即为更新 elasticsearch 数据操作。 微服务解耦 CDC 也可以用于微服务解耦。例如下文是一个电商平台的订单处理系统,当有新建的未付款订单产生时,数据库会有一条 INSERT 操作,而当某笔订单状态由“未付款”变为“已付款”时,数据库会有一条 UPDATE 操作。根据订单状态变化的不同,后端会有不同的微服务来对此进行处理。 1. 用户下单/付款,订单系统进行业务处理,将数据变更写入 DB; 2. 新建 DTS 订阅任务捕获 DB 数据变更; 3. 搭建 EventBridge 事件流。事件提供方为 DTS 数据订阅任务,事件接收方为 RocketMQ; 4. 在消费 RocketMQ 数据时,同一个 topic 下启用 3 个 group 代表不同的业务消费逻辑; a. GroupA 将捕获到的 DB 变更用户缓存更新,便于用户查询订单状态; b. GroupB 下游关联财务系统,仅处理新建订单,即处理 DB 操作类型为 INSERT 的事件,丢弃其余类型事件; c. GroupC 仅关心订单状态由“未付款”变为“已付款”的事件,当有符合条件事件到达时,调用下游物流、仓储系统,对订单进行进一步处理。 如果采用接口调用方式,那么用户在下单之后订单系统将分别需要调用缓存更新接口、新建订单接口以及订单付款接口,业务耦合性过高。除此之外,这种模式使得数据消费端不用担心上游订单处理接口返回内容的语义信息,在存储模型不变的情况下,直接从数据层面判断此次数据变更是否需要处理以及需要怎样的处理。同时,消息队列天然的消息堆积能力也可以帮助用户在订单峰值到来时实现业务削峰填谷。 事实上,目前 EventBridge Streaming 支持的消息产品还包括 RabbitMQ、Kafka、MNS 等,在实际操作中用户可以根据自己的需要进行选择。 数据库备份&异构数据库同步 数据库灾备和异构数据库数据同步也是 CDC 重要的应用场景。使用阿里云 EventBridge 亦可以快速搭建此类应用。 1. 新建 DTS 数据订阅任务,捕获用户 MySQL 数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS 数据订阅任务; 3. 使用 EventBridge 在目的数据库执行指定 sql,实现数据库备份; 4. 数据变更事件投递到函数计算,用户业务根据数据变化内容更新对应异构数据库。 自建 SQL 审计 对于用户有自建 SQL 审计的需求,使用 EventBridge 也可以轻松实现。 1. 新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 总结 本文介绍了 CDC 的一些概念、CDC 在 EventBridge 上的应用以及若干最佳实践场景。随着支持产品的不断增加,EventBridge 所承载的生态版图也不断扩大,从消息生态到数据库生态,从日志生态到大数据生态,EventBridge 不断扩大其适用领域,巩固云上事件枢纽的地位,此后也将按照这个方向继续发展,技术做深,生态做广。 _参考链接:_ _ _ _ _ _ _ _ _ 感兴趣的小伙伴们可以扫描下方二维码加入钉钉群讨论(群号:44552972) 点击,进入 EventBridge 官网了解更多信息~
作者:昶风
#行业实践 #生态集成 #事件驱动架构