2022年1月23日

平安保险基于 SPI 机制的 RocketMQ 定制化应用
为什么选用 RocketMQ 首先跟大家聊聊我们为什么会选用 RocketMQ,在做技术选型的过程中,应用场景应该是最先考虑清楚的,只有确定好了应用场景在做技术选型的过程中才有明确的目标和衡量的标准。像异步、解耦、削峰填谷这些消息中间件共有的特性就不一一介绍了,这些特性是决定你的场景需不需要使用消息中间件,这里主要讲述下在确定使用消息中间件后,又是如何去选择哪款消息中间件的。 同步双写,确保业务数据安全可靠不丢失 我们在搭建消息中间件平台时的定位是给业务系统做业务数据的传输使用,对业务数据的很重要的一个要求就是不允许丢数据,所以选用 RocketMQ 的第一点就是他有同步双写机制,数据在主从服务器上都刷盘成功才算发送成功。同步双写条件下,MQ 的写入性能与异步刷盘异步赋值相比肯定会有所下降,与异步条件下大约会有 20% 左右的下降,单主从架构下,1K 的消息写入性能还是能达到 8W+ 的 TPS,对大部分业务场景而言性能是能完全满足要求的,另外对下降的这部分性能可以通过 broker 的横向扩招来弥补,所以在同步双写条件下,性能是能满足业务需求的。 多 topic 应用场景下,性能依旧强悍 第二点,业务系统的使用场景会特别多,使用场景广泛带来的问题就是会创建大量的 topic,所以这时候就得去衡量消息中间件在多 topic 场景下性能是否能满足需求。我自己在测试的时候呢,用 1K 的消息随机往 1 万个 topic 写数据,单 broker 状态下能达到2W左右的 TPS,这一点比 Kafka 要强很多。所以多 topic 应用场景下,性能依旧强悍是我们选用 topic 的第二个原因。这点也是由底层文件存储结构决定的,像 Kafka、RocketMQ 这类消息中间件能做到接近内存的读写能力,主要取决于文件的顺序读写和内存映射。RocketMQ 中的所有 topic 的消息都是写在同一个 commitLog 文件中的,但是 Kafka 中的消息是以 topic 为基本单位组织的,不同的 topic 之间是相互独立的。在多 topic 场景下就造成了大量的小文件,大量的小文件在读写时存在一个寻址的过程,就有点类似随机读写了,影响整体的性能。 支持事务消息、顺序消息、延迟消息、消息消费失败重试等 RocketMQ 支持事务消息、顺序消息、消息消费失败重试、延迟消息等,功能比较丰富,比较适合复杂多变的业务场景使用 社区建设活跃,阿里开源系统 另外,在选用消息中间件时也要考虑下社区的活跃度和源码所使用的开发语言,RocketMQ 使用 Java 开发,对 Java 开发人员就比较友好,不管是阅读源码排查问题还是在 MQ 的基础上做二次开发都比较容易一点。社区里同学大都是国内的小伙伴,对大家参与 RocketMQ 开源贡献也是比较亲近的,这里呢也是希望更多的小伙伴能参与进来,为国内开源项目多做贡献。 SPI 机制简介及应用 介绍完为什么选用 RocketMQ 后,接下来给大家介绍下我们是如何基于 SPI 机制应用 RocketMQ 的。SPI 全称为 (Service Provider Interface) ,是 JDK 内置的一种服务提供发现机制,我个人简单理解就是面向接口编程,留给使用者一个扩展的点,像 springBoot 中的 spring.factories 也是 SPI 机制的一个应用。如图给大家展示的是 RocketMQ 中 SPI 的一个应用。我们基于 SPI 机制的 RocketMQ 客户端的应用的灵感也是来自于 MQ 中 SPI 机制的应用。RocketMQ 在实现 ACL 权限校验的时候,是通过实现 AccessValidator 接口,PlainAccessValidator 是 MQ 中的默认实现。权限校验这一块,可能因为组织架构的不一样会有不同的实现方式,通过 SPI 机制提供一个接口,为开发者定制化开发提供扩展点。在有定制化需求时只需要重新实现 AccessValidator 接口,不需要对源码大动干戈。 接下来先给大家介绍下我们配置文件的一个简单模型,在这个配置文件中除了 sendMsgService、consumeMsgConcurrently、consumeMsgOrderly 这三个配置项外其余的都是 RocketMQ 原生的配置文件,发送消息和消费消息这三个配置项呢就是 SPI 机制的应用,是为具体实现提供的接口。可能有的同学会有疑问,SPI 的配置文件不是应该放在 METAINF.service 路径下么?这里呢我们是为了方便配置文件的管理,索性就跟 MQ 配置文件放在了一起。前面也提到了,METAINF.service 只是一个默认的路径而已,为了方便管理做相应的修改也没有违背SPI机制的思想。 我们再看下这个配置文件模型,这里的配置项呢囊括了使用 MQ 时所要配置的所有选项,proConfigs 支持所有的 MQ 原生配置,这样呢也就实现了配置与应用实现的解耦,应用端只需呀关注的具体的业务逻辑即可,生产者消费者的实现和消费者消费的 topic 都可以通过配置文件来指定。另外该配置文件也支持多 nameserver 的多环境使用,在较复杂的应用中支持往多套 RocketMQ 环境发送消息和消费多套不同环境下的消息。消费者提供了两个接口主要是为了支持 RocketMQ 的并发消费和顺序消费。接下来呢给大家分享下如何根据这个配置文件来初始化生产者消费者。首先给大家先介绍下我们抽象出来的客户端加载的一个核心流程。 客户端核心流程详情 图中大家可以看到,客户端的核心流程我们抽象成了三部分,分别是启动期、运行期和终止期。首先加载配置文件呢就是加载刚刚介绍的那个配置文件模型,在配置与应用完全解耦的状态下,必须先加载完配置文件才能初始化后续的流程。在初始化生产者和消费者之前应当先创建好应用实现的生产者和消费者的业务逻辑对象 供生产者和消费者使用。在运行期监听配置文件的变化,根据变化动态的调整生产者和消费者实例。这里还是要再强调下配置与应用的解耦为动态调整提供了可能。终止期就比较简单了,就是关闭生产者和消费者,并从容器中移除。这里的终止期指的生产者和消费者的终止,并不是整个应用的终止,生产者和消费者的终止可能出现在动态调整的过程中,所以终止了的实例一定要从容器中移除,方便初始化后续的生产者和消费者。介绍完基本流程后,接下来给大家介绍下配置文件的加载过程。 如何加载配置文件 配置文件加载这一块的话,流程是比较简单的。这里主要讲的是如何去兼容比较老的项目。RocketMQ 客户端支持的 JDK 最低版本是 1.6,所以在封装客户端时应该要考虑到新老项目兼容的问题。在这里呢我们客户端的核心包是支持 JDK1.6 的,spring 早期的项目配置文件一般都是放在在 resources 路径下,我们是自己实现了一套读取配置文件的和监听配置文件的方法,具体的大家可以参考 acl 中配置文件的读取和监听。在核心包的基础上用 springBoot 又封装了一套自动加载配置文件的包供微服务项目使用,配置文件的读取和监听都用的 spring 的那一套。配置文件加载完之后, 配置文件中应用实现的生产者和消费者是如何与 RocketMQ 的生产者和消费者相关联的呢?接下来给大家分享下这方面的内容。 如何将生产消费者与业务实现关联 首先先看下消费者是如何实现关联的,上图是 MQ 消费者的消息监听器,需要我们去实现具体的业务逻辑处理。通过将配置文件中实现的消费逻辑关联到这里就能实现配置文件中的消费者与 RocketMQ 消费者的关联。消费者的接口定义也是很简单,就是去消费消息。消费消息的类型可以通过泛型指定,在初始化消费者的时候获取具体实现的参数类型,并将 MQ 接受到的消息转换为具体的业务类型数据。由客户端统一封装好消息类型的转换。对消费消息的返回值大家可以根据需要与 MQ 提供的 status 做一个映射,这里的 demo 只是简单显示了下。在获取具体的应用消费者实例的时候,如果你的消费逻辑里使用了 spring 管理的对象,那么你实现的消费逻辑对象也要交给 spring 管理,通过 spring 上下文获取初始化好的对象;如果你的消费逻辑里没有使用 spring 进行管理,可以通过反射的方式自己创建具体的应用实例。 与消费者不一样的是生产者需要将初始化好的 producer 对象传递到应用代码中去,而消费者是去获取应用中实现的逻辑对象,那如何将 producer 传递到业务应用中去呢? 业务代码中实现的生产者需要继承 SendMessage,这样业务代码就获得了 RmqProducer 对象,这是一个被封装后的生产者对象,该对象对发送消息的方法进行的规范化定义,使之符合公司的相应规范制度,该对象中的方法也会对 topic 的命名规范进行检查,规范 topic 有一个统一的命名规范。 如何动态调整生产消费者 首先谈到动态调整就需要谈一下动态调整发生的场景,如果没有合适的使用场景的话实现动态调整就有点华而不实了。这里我列举了四个配置文件发生变化的场景: nameserver发生变化的时候,需要重新初始化所有的生产者和消费者,这个一般是在 MQ 做迁移或者当前 MQ 集群不可用是需要紧急切换 MQ; 增减实例的场景只要启动或关闭相应的实例即可,增加应用实例的场景一般是在需要增加一个消费者来消费新的 topic 的,减少消费者一般是在某个消费者发生异常时需要紧急关闭这个消费者,及时止损。 调整消费者线程的场景中我们对源码进行了一点修改,让应用端能获取到消费者的线程池对象,以便对线程池的核心线程数进行动态调整。这个的应用场景一般是在当某个消费者消费的数据比较多,占用过多的 CPU 资源时,导致优先级更高的消息得不到及时处理,可以先将该消费者的线程调小一些。 应用的优点
作者:孙园园
#行业实践

2022年1月20日

消息队列 RocketMQ 遇上可观测:业务核心链路可视化
引言:本篇文章主要介绍 RocketMQ 的可观测性工具在线上生产环境的最佳实践。RocketMQ的可观测性能力领先业界同类产品,RocketMQ 的 Dashboard 和消息轨迹等功能为业务核心链路保驾护航,有效应对线上大规模生产使用过程中遇到的容量规划、消息收发问题排查以及自定义监控等场景。 消息队列简介 进入主题之前,首先简要介绍下什么是阿里云的消息队列? 阿里云提供了丰富的消息产品家族,消息产品矩阵涵盖了互联网、大数据、物联网等各个业务场景的领域,为云上客户提供了多维度可选的消息解决方案。无论哪一款消息队列产品,核心都是帮助用户解决业务和系统的异步、解耦以及应对流量洪峰时的削峰填谷,同时具备分布式、高吞吐、低延迟、高可扩展等特性。 但是不同的消息产品在面向客户业务的应用中也有不同的侧重。简单来做,消息队列 RocketMQ 是业务领域的首选消息通道;Kafka 是大数据领域不可或缺的消息产品;MQTT 是物联网领域的消息解决方案;RabbitMQ 侧重于传统业务消息领域;云原生的产品集成和事件流通道是通过消息队列 MNS 来完成;最后事件总线 EventBridge 是一个阿里云上的一个事件枢纽,统一构建事件中心。 本篇主要讲的是业务领域的消息首选通道:消息队列 RocketMQ。RocketMQ 诞生于阿里的电商系统,具有高性能、低延迟、削峰填谷等能力,并且提供了丰富的在业务和消息场景上应对瞬时流量洪峰的功能,被集成在用户的核心业务链路上。 作为一个核心业务链路上的消息,就要求 RocketMQ 具备非常高的可观测性能力,用户能通过可观测性能力及时的监控定位异常波动,同时对具体的业务数据问题进行排查。由此,可观测性能力逐步成为消息队列 RocketMQ 的核心能力之一。 那么什么是可观测能力呢?下面简单对可观测能力进行介绍。 可观测能力 提到可观测能力,大家可能最先想到的是可观测的三要素:Metrics(指标)、Tracing(追踪)和 Logging(日志)。 结合消息队列的理解,可观测能力三要素的细化解释如下: Metrics:Dashborad 大盘 1)指标涵盖丰富:包含消息量、堆积量、各个阶段耗时等指标,每个指标从实例、Topic、消费 GroupID 多维度做聚合和展示; 2)消息团队最佳实践模板:为用户提供最佳模板,特别是在复杂的消费消息场景,提供了丰富的指标帮助快速定位问题,并持续迭代更新; 3)Prometheus + Grafana:Prometheus标准数据格式、利用Grafana展示,除了模板,用户也可以自定义展示大盘。 Tracing:消息轨迹 1)OpenTelemetry tracing标准:RocketMQ tracing 标准已经合并到 OpenTelemetry 开源标准,规范和丰富 messaging tracing 场景定义; 2)消息领域定制化展示:按照消息维度重新组织抽象的请求 span 数据,展示一对多的消费,多次消费信息,直观、方便理解; 3)可衔接 tracing链路上下游:消息的 tracing 可继承调用上下文,补充到完整调用链路中,消息链路信息串联了异步链路的上游和下游链路信息。 Logging:客户端日志标准化 1)Error Code标准化:不同的错误有唯一的 error code; 2)Error Message 完整:包含完整的错误信息和排序所需要的资源信息; 3)Error Level 标准化:细化了各种不同错误信息的日志级别,让用户根据 Error、Warn 等级别配置更合适和监控告警。 了解消息队列和可观测能力的基础概念,让我们来看看当消息队列 RocketMQ 遇到可观测,会产生什么样的火花? RocketMQ 的可观测性工具的概念介绍 从上文的介绍中可以看到 RocketMQ 的可观测能力能够帮助用户根据错误信息排查消息在生产和消费过程中哪些环节出了问题,为了帮助大家更好的理解功能的应用,先简要介绍下消息生产消费流程过程中的一些概念。 消息生产和消费流程概念 首先我们先明确以下几个概念: Topic:消息主题,一级消息类型,通过Topic对消息进行分类; 消息(Message):消息队列中信息传递的载体; Broker:消息中转角色,负责存储消息,转发消息; Producer:消息生产者,也称为消息发布者,负责生产并发送消息; Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。 消息生产和消费的流程简单来说就是生产者将消息发送到 topic 的 MessageQueue 上进行存储,然后消费者去消费这些 MessageQueue 上的消息,如果有多个消费者,那么一个完整的一次消息生产发生的生命周期是什么样子的? 这里我们以定时消息为例,生产者 Producer 发送消息经过一定的耗时到达 MQ Server,MQ 将消息存储在 MessageQueue,这时队列中有一个存储时间,如果是定时消息,还需要经过一定的定时时间之后才能被消费者消费,这个时间就是消息就绪的时间;经过定时的时间后消费者 Consumer 开始消费,消费者从 MessageQueue 中拉取消息,然后经过网络的耗时之后到达消费者客户端,这时候不是低码进行消费的,会有一个等待消费者资源线程的过程,等到消费者的线程资源后才开始进行真正的业务消息处理。 从上面的介绍中可以看出,业务消息有一定的耗时处理,完成之后才会向服务端返回ack的结果,在整个生产和消费的过程中,最复杂的便是消费的过程,因为耗时等原因,会经常有消息堆积的场景,下面来重点看一下在消息堆积场景下各个指标表示的含义。 消息堆积场景 如上图,消息队列中,灰色部分的消息表示是已完成的消息量,就是消费者已处理完成并返回 ack 的消息;橙色部分的消息表示这些消息已经被拉取到消费者客户端,正在被处理中,但是还没有返回处理结果的消息,这个消息其实有一个非常重要的指标,就是消息处理耗时;最后绿色的消息表示这些消息在已经发生的 MQ 队列中已存储完成,并且已经是可被消费者消费的一个状态,称为已就绪的消息。 _已就绪消息量(Ready messages):_ _含义:已就绪消息的消息的条数。_ _作用:消息量的大小反映还未被消费的消息规模,在消费者异常情况下,就绪消息量会变多。_ _消息排队时间(Queue time)_ _含义:最早一条就绪消息的就绪时间和当前时间差。_ _作用:这个时间大小反映了还未被处理消息的时间延迟情况,对于时间敏感的业务来说是非常重要的度量指标。_ RocketMQ 的可观测性工具的功能介绍 结合上文介绍的消息队列 RocketMQ 可观测概念,下面具体对 RocketMQ 的可观测性工具的两个核心功能进行介绍。 可观测功能介绍 Dashboard Dashboard 大盘可以根据各种参数查看指定的指标数据,主要的指标数据包含下面三点: 1)Overview(概览): 查看实例据总的消息收发量、TPS、消息类型分布情况。 查看是的各个指标当前的分布和排序情况:发送消息量最多的 Topic、消费消息量最多的 GroupID、堆积消息量最多的 GroupID、排队时间最长的 GroupID 等。 2)Topic(消息发送): 查看指定 Topic 的发送消息量曲线图。 查看指定 Topic 的发送成功率曲线图。 查看指定 Topic 的发送耗时曲线图。 3)GroupID(消息消费): 查看指定 Group 订阅指定 Topic 的消息量曲线图。 查看指定 Group 订阅指定 Topic 的消费成功率。 查看指定 Group 订阅指定 Topic 的消费耗时等指标。 查看指定 Group 订阅指定 Topic 的消息堆积相关指标。 可观测功能介绍 消息轨迹 在 Tracing 方面提供了消息轨迹功能,主要包含以下三方面能力: 1)便捷的查询能力:可根据消息基本信息查询相关的轨迹;二期还可以根据结果状态、耗时时长来过滤查询,过滤出有效轨迹快速定位问题。 2)详细的 tracing 信息:除了各个生命周期的时间和耗时数据,还包含了生产者、消费者的账号和机器信息。 3)优化展示效果:不同的消息类型轨迹;多个消费 GroupID 的场景;同个消费 GroupID 多次重投的场景等。 最佳实践 场景一:问题排查 1)目标:消息生产消费健康情况 2)原则 一级指标:用来报警的指标,公认的没有异议的指标。 二级指标:一级指标发生变化的时候,通过查看二级指标,能够快速定位问题的原因所在。 三级指标:定位二级指标波动原因。根据各自业务的特点和经验添加。 基于目标和原则,生产者用户和消费者用户问题排查和分析方式如下: 场景二:容量规划 容量规划场景下只要解决下面三个问题: 1)问题一:怎样评估实例容量? 解决方法: 实例详情页》查看指定实例数据统计,可以看到所选时间段内的最大消息收发的 TPS 峰值。 铂金版实例可以根据这个数据来添加报警监控和判断业务。 2)问题二:怎样查看标准版实例的消耗 解决方法: 可以查看概览总消息量模块 3)问题三:有哪些已下线,需要清理资源? 解决方法: 指定一段时间内(例如近一周),按 Topic 的消息发送量由小到大排序,查看是否有消息发送量为 0 的 Topic,这些 Topic 相关的业务或许已下线。 指定一段时间内(例如近一周),按 GroupID 的消息消费量由小到大排序,查看是否有消息消费量为 0 的 GroupID,这些 GroupID 相关的业务或许已下线。 场景三:业务规划 业务规划场景下主要解决以下三个问题: 1)问题一:如何查看业务峰值分布情况? 解决方法: 查看 Topic 消息接收量的每天的高峰时间段。 查看 Topic 消息接收量周末和非周某的消息量差别。 查看 Topic 消息接收量节假日的变化情况。 2)问题二:如何判断目前哪些业务有上升趋势? 解决方法: 查看消息量辅助判断业务量变化趋势。 3)问题三 :怎样优化消费者系统性能? 解决方法: 查看消息处理耗时,判断是否在合理范围内有提升的空间。 本篇文章通过消息队列、可观测能力、RocketMQ 可观测概念及功能和最佳实践的介绍,呈现了 RocketMQ 的可观测性工具在业务核心链路上的可视化能力,希望给大家在日常的线上的一些问题排查和运维过程中带来一些帮助。
作者:文婷、不周
#行业实践 #可观测

2022年1月14日

全链路灰度之 RocketMQ 灰度
之前的系列文章中,我们已经通过全链路金丝雀发布这个功能来介绍了 MSE 对于全链路流量控制的场景,我们已经了解了 Spring Cloud 和 Dubbo 这一类 RPC 调用的全链路灰度应该如何实现,但是没有涉及到消息这类异步场景下的流量控制,今天我们将以上次介绍过的《》中的场景为基础,来进一步介绍消息场景的全链路灰度。 虽然绝大多数业务场景下对于消息的灰度的要求并不像 RPC 的要求得这么严格,但是在以下两个场景下,还是会对消息的全链路有一定的诉求的。 1、第一种场景是在消息消费时,可能会产生新的 RPC 调用,如果没有在消息这一环去遵循之前设定好的全链路流量控制的规则,会导致通过消息产生的这部分流量“逃逸”,从而导致全链路灰度的规则遭到破坏,导致出现不符合预期的情况。 为了防止出现这个情况,我们需要在消费时候将消息里原来的流量标复原,并在 RPC 调用的时候遵循原来的规则。我们通过架构图来详细描述一下,满足这个逻辑之后,调用链路是怎样的,从下图中我们可以看到,灰度和基线环境生产出来的消息,虽然在消息推送的时候是随机的,但是在消费过程中,产生的新的 RPC 调用,还是能够回到流量原来所属的环境。 2、第二种场景需要更加严格的消息灰度隔离。比如当消息的消费逻辑进行了修改时,这时候希望通过小流量的方式来验证新的消息消费逻辑的正确性,要严格地要求灰度的消息只能被推送给灰度的消息消费者。 今天我们就来实操一下第二种场景消息的全链路灰度,目前 MSE 仅支持 RocketMQ 消息的灰度。若您使用的是开源版 RocketMQ,那么版本需要在 4.5.0 及以上,若您使用的是阿里云商业版 RocketMQ,那么需要使用铂金版,且 Ons Client 版本在 1.8.0.Final 及以上。如果只是想使用第一种场景,只需要给 B 应用开启全链路灰度的功能即可,不需要做额外的消息灰度相关的配置。 在这次最佳实践的操作中,我们是将应用部署在阿里云容器服务 Kubernetes 版本,即 ACK 集群来演示,但是事实上,消息灰度对于应用的部署模式是没有限制性要求的,您可以参考 MSE 帮助文档,找到自己所使用的部署模式对应的接入方式,也能使用消息全链路灰度。 前提条件 1. 开通 MSE 专业版,请参见开通 MSE 微服务治理专业版[1]。 2. 创建 ACK 集群,请参见创建 Kubernetes 集群[2]。 操作步骤 步骤一:接入 MSE 微服务治理 1、安装 mseackpilot 1. 登录容器服务控制台[3]。 2. 在左侧导航栏单击市场 应用目录。 3. 在应用目录页面点击阿里云应用,选择微服务,并单击 ackmsepilot。 4. 在 ackmsepilot 页面右侧集群列表中选择集群,然后单击创建。 安装 MSE 微服务治理组件大约需要 2 分钟,请耐心等待。 创建成功后,会自动跳转到目标集群的 Helm 页面,检查安装结果。如果出现以下页面,展示相关资源,则说明安装成功。 2、为 ACK 命名空间中的应用开启 MSE 微服务治理 1. 登录 MSE 治理中心控制台[4],如果您尚未开通 MSE 微服务治理,请根据提示开通。 2. 在左侧导航栏选择微服务治理中心 Kubernetes 集群列表。 3. 在 Kubernetes 集群列表页面搜索框列表中选择集群名称或集群 ID,然后输入相应的关键字,单击搜索图标。 4. 单击目标集群操作列的管理。 5. 在集群详情页面命名空间列表区域,单击目标命名空间操作列下的开启微服务治理。 6. 在开启微服务治理对话框中单击确认。 步骤二:还原线上场景 首先,我们将分别部署  springcloudzuul、springclouda、springcloudb、springcloudc 这四个业务应用,以及注册中心 Nacos Server 和消息服务 RocketMQ Server,模拟出一个真实的调用链路。 Demo 应用的结构图下图,应用之间的调用,既包含了 Spring Cloud 的调用,也包含了 Dubbo 的调用,覆盖了当前市面上最常用的两种微服务框架。其中 C 应用会生产出 RocketMQ 消息,由 A 应用进行消费,A 在消费消息时,也会发起新的调用。这些应用都是最简单的 Spring Cloud 、 Dubbo 和 RocketMQ 的标准用法,您也可以直接在 项目上查看源码。 部署之前,简单介绍一下这个调用链路 springcloudzuul 应用在收到 “/A/dubbo” 的请求时,会把请求转发给 springclouda ,然后 springclouda 通过 dubbo 协议去访问 springcloudb, springcloudb 也通过 dubbo 协议去访问 springcloudc,springcloudc 在收到请求后,会生产一个消息,并返回自己的环境标签和 ip。这些生产出来的消息会由 springclouda 应用消费,springclouda 应用在消费消息的时候,会通过 spring cloud 去调用 B,B 进而通过 spring cloud 去调用 C,并且将结果输出到自己的日志中。 当我们调用 /A/dubbo 的时候 返回值是这样 A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 同时,A 应用在接收到消息之后,输出的日志如下 20211228 10:58:50.301 INFO 1 [essageThread_15] c.a.mse.demo.service.MqConsumer : topic:TEST_MQ,producer:C[10.25.0.30],invoke result:A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 熟悉了调用链路之后,我们继续部署应用,您可以使用 kubectl 或者直接使用 ACK 控制台来部署应用。部署所使用的 yaml 文件如下,您同样可以直接在 上获取对应的源码。 部署 Nacos Server apiVersion: apps/v1 kind: Deployment metadata: name: nacosserver spec: selector: matchLabels: app: nacosserver template: metadata: annotations: labels: app: nacosserver spec: containers: env: name: MODE value: "standalone" image: registry.cnshanghai.aliyuncs.com/yizhan/nacosserver:latest imagePullPolicy: IfNotPresent name: nacosserver ports: containerPort: 8848 apiVersion: v1 kind: Service metadata: name: nacosserver spec: type: ClusterIP selector: app: nacosserver ports: name: http port: 8848 targetPort: 8848 部署业务应用 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudzuul spec: selector: matchLabels: app: springcloudzuul template: metadata: annotations: msePilotCreateAppName: springcloudzuul labels: app: springcloudzuul spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre name: enable.mq.invoke value: 'true' image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudzuul:1.0.0 imagePullPolicy: Always name: springcloudzuul ports: containerPort: 20000 apiVersion: v1 kind: Service metadata: annotations: service.beta.kubernetes.io/alibabacloudloadbalancerspec: slb.s1.small service.beta.kubernetes.io/alicloudloadbalanceraddresstype: internet name: zuulslb spec: ports: port: 80 protocol: TCP targetPort: 20000 selector: app: springcloudzuul type: LoadBalancer status: loadBalancer: {} apiVersion: apps/v1 kind: Deployment metadata: name: springclouda spec: selector: matchLabels: app: springclouda template: metadata: annotations: msePilotCreateAppName: springclouda labels: app: springclouda spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springclouda:1.0.0 imagePullPolicy: Always name: springclouda ports: containerPort: 20001 livenessProbe: tcpSocket: port: 20001 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudb spec: selector: matchLabels: app: springcloudb template: metadata: annotations: msePilotCreateAppName: springcloudb labels: app: springcloudb spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudb:1.0.0 imagePullPolicy: Always name: springcloudb ports: containerPort: 20002 livenessProbe: tcpSocket: port: 20002 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudc spec: selector: matchLabels: app: springcloudc template: metadata: annotations: msePilotCreateAppName: springcloudc labels: app: springcloudc spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudc:1.0.0 imagePullPolicy: Always name: springcloudc ports: containerPort: 20003 livenessProbe: tcpSocket: port: 20003 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: rockectmqbroker spec: selector: matchLabels: app: rockectmqbroker template: metadata: labels: app: rockectmqbroker spec: containers: command: sh mqbroker 'n' 'mqnamesrv:9876' 'c /home/rocketmq/rocketmq4.5.0/conf/broker.conf' env: name: ROCKETMQ_HOME value: /home/rocketmq/rocketmq4.5.0 image: registry.cnshanghai.aliyuncs.com/yizhan/rocketmq:4.5.0 imagePullPolicy: Always name: rockectmqbroker ports: containerPort: 9876 protocol: TCP containerPort: 10911 protocol: TCP containerPort: 10912 protocol: TCP containerPort: 10909 apiVersion: apps/v1 kind: Deployment metadata: name: rocketmqnameserver spec: selector: matchLabels: app: rocketmqnameserver template: metadata: labels: app: rocketmqnameserver spec: containers: command: sh mqnamesrv env: name: ROCKETMQ_HOME value: /home/rocketmq/rocketmq4.5.0 image: registry.cnshanghai.aliyuncs.com/yizhan/rocketmq:4.5.0 imagePullPolicy: Always name: rocketmqnameserver ports: containerPort: 9876 protocol: TCP containerPort: 10911 protocol: TCP containerPort: 10912 protocol: TCP containerPort: 10909 protocol: TCP apiVersion: v1 kind: Service metadata: name: mqnamesrv spec: type: ClusterIP selector: app: rocketmqnameserver ports: name: mqnamesrv98769876 port: 9876 targetPort: 9876 安装成功后,示例如下: ➜ ~ kubectl get svc,deploy NAME TYPE CLUSTERIP EXTERNALIP PORT(S) AGE service/kubernetes ClusterIP 192.168.0.1 443/TCP 7d service/mqnamesrv ClusterIP 192.168.213.38 9876/TCP 47h service/nacosserver ClusterIP 192.168.24.189 8848/TCP 47h service/zuulslb LoadBalancer 192.168.189.111 123.56.253.4 80:30260/TCP 47h NAME READY UPTODATE AVAILABLE AGE deployment.apps/nacosserver 1/1 1 1 4m deployment.apps/rockectmqbroker 1/1 1 1 4m deployment.apps/rocketmqnameserver 1/1 1 1 5m deployment.apps/springclouda 1/1 1 1 5m deployment.apps/springcloudb 1/1 1 1 5m deployment.apps/springcloudc 1/1 1 1 5m deployment.apps/springcloudzuul 1/1 1 1 5m 同时这里我们可以通过 zuulslb 来验证一下刚才所说的调用链路 ➜ ~ curl http://123.56.253.4/A/dubbo A[10.25.0.32] B[10.25.0.152] C[10.25.0.30] 步骤三:开启消息灰度功能 现在根据控制台的提示,在消息的生产者 springcloudc 和消息的消费者 springclouda 都开启消息的灰度。我们直接通过 MSE 的控制台开启,点击进入应用的详情页,选择“消息灰度”标签。 可以看到,在未打标环境忽略的标签中,我们输入了 gray,这里意味着,带着 gray 环境标的消息,只能由 springcloudagray 消费,不能由 springclouda 来消费。 _1、这里需要额外说明一下,因为考虑到实际场景中,springcloudc 应用和 springclouda  应用的所有者可能不是同一个人,不一定能够做到两者同时进行灰度发布同步的操作,所以在消息的灰度中,未打标环境默认的行为是消费所有消息。这样 springcloudc 在进行灰度发布的时候,可以不需要强制 springclouda 应用也一定要同时灰度发布。_ _2、我们把未打标环境消费行为的选择权交给 springclouda 的所有者,如果需要实现未打标环境不消费 cgray 生产出来的消息,只需要在控制台进行配置即可,配置之后实时生效。_ 使用此功能您无需修改应用的代码和配置。 消息的生产者和消息的消费者,需要同时开启消息灰度,消息的灰度功能才能生效。 消息类型目前只支持 RocketMQ,包含开源版本和阿里云商业版。 如果您使用开源 RocketMQ,则 RocketMQ Server 和 RocketMQ Client 都需要使用 4.5.0 及以上版本。 如果您使用阿里云 RocketMQ,需要使用铂金版,且 Ons Client 使用 1.8.0.Final 及以上版本。 开启消息灰度后,MSE 会修改消息的 Consumer Group。例如原来的 Consumer Group 为 group1,环境标签为 gray,开启消息灰度后,则 group 会被修改成 group1_gray,如果您使用的是阿里云 RocketMQ ,请提前创建好 group。 默认使用 SQL92 的过滤方式,如果您使用的开源 RocketMQ,需要在服务端开启此功能(即在 broker.conf 中配置 enablePropertyFilter=true)。 默认情况下,未打标节点将消费所有环境的消息,若需要指定 未打标环节点 不消费 某个标签环境生产出来的消息,请配置“未打标环境忽略的标签”,修改此配置后动态生效,无需重启应用。 步骤四:重启节点,部署新版本应用,并引入流量进行验证 首先,因为开启和关闭应用的消息灰度功能后都需要重启节点才能生效,所以首先我们需要重启一下 springclouda 和 springcloudc 应用,重启的方式可以在控制台上选择重新部署,或者直接使用 kubectl 命令删除现有的 pod。 然后,继续使用 yaml 文件的方式在 Kubernetes 集群中部署新版本的 springcloudagray、springcloudbgray 和 springcloudcgray apiVersion: apps/v1 kind: Deployment metadata: name: springcloudagray spec: selector: matchLabels: app: springcloudagray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springclouda labels: app: springcloudagray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springclouda:1.0.0 imagePullPolicy: Always name: springcloudagray ports: containerPort: 20001 livenessProbe: tcpSocket: port: 20001 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudbgray spec: selector: matchLabels: app: springcloudbgray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springcloudb labels: app: springcloudbgray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudb:1.0.0 imagePullPolicy: Always name: springcloudbgray ports: containerPort: 20002 livenessProbe: tcpSocket: port: 20002 initialDelaySeconds: 10 periodSeconds: 30 apiVersion: apps/v1 kind: Deployment metadata: name: springcloudcgray spec: selector: matchLabels: app: springcloudcgray template: metadata: annotations: alicloud.service.tag: gray msePilotCreateAppName: springcloudc labels: app: springcloudcgray spec: containers: env: name: JAVA_HOME value: /usr/lib/jvm/java1.8openjdk/jre image: registry.cnshanghai.aliyuncs.com/yizhan/springcloudc:1.0.0 imagePullPolicy: Always name: springcloudcgray ports: containerPort: 20003 livenessProbe: tcpSocket: port: 20003 initialDelaySeconds: 10 periodSeconds: 30 部署完成之后,我们引入流量,并进行验证 1. 登录 MSE 治理中心控制台[4],选择应用列表。 2. 单击应用 springclouda 应用详情菜单,此时可以看到,所有的流量请求都是去往 springclouda 应用未打标的版本,即稳定版本。 3. 点击页面下方的 标签路由中的添加按钮,为 springclouda 应用的 gray 版本设置灰度规则。 4. 发起流量调用,我们通过 zuulslb,分别发起流量调用,并查看灰度的情况。 我们通过 springclouda 和 springcloudagray 的日志去查看消息消费的情况。可以看到,消息的灰度功能已经生效, springcloudagray 这个环境,只会消费带有 gray 标的消息,springclouda 这个环境,只会消费未打标的流量生产出来的消息。 在截图中我们可以看见,springcloudagray 环境输出的日志  topic:TEST_MQ, producer: Cgray [10.25.0.102] , invoke result: Agray[10.25.0.101] Bgray[10.25.0.25] Cgray[10.25.0.102], springcloudagray 只会消费 Cgray 生产出来的消息,而且消费消息过程中发起的 Spring Cloud 调用,结果也是 Agray[10.25.0.101] Bgray[10.25.0.25] Cgray[10.25.0.102],即在灰度环境闭环。 而 springclouda 这个环境,输出的日志为 topic:TEST_MQ,producer:C[10.25.0.157],invoke result:A[10.25.0.100] B[10.25.0.152] C[10.25.0.157],只会消费 C 的基线环境生产出来的消息,且在这个过程中发起的 Spring Cloud 调用,也是在基线环境闭环。 步骤五:调整消息的标签过滤规则,并进行验证 因为考虑到实际场景中,springcloudc 应用和 springclouda  应用的所有者可能不是同一个人,不一定能够做到两者同时进行灰度发布同步的操作,所以在消息的灰度中,未打标环境默认的行为是消费所有消息。这样 springcloudc 在进行灰度发布的时候,可以不需要强制 springclouda 应用也一定要同时灰度发布,且使用相同的环境标。 springclouda 在消费时候,未打标环境的行为的选择权是交给 springclouda 的所有者,如果需要实现未打标环境不消费 cgray 生产出来的消息,只需要在控制台进行配置即可,配置之后实时生效。 1. 调整 springclouda 未打标环境的过滤规则。比如这里我们要选择未打标环境不再消费 gray 环境生产出来的消息,只需要在“未打标环境忽略的标签”里面选择 gray,然后点击确定即可。 2. 调整规则之后,规则是可以动态地生效,不需要进行重启的操作,我们直接查看 springclouda 的日志,验证规则调整生效。 从这个日志中,我们可以看到,此时基线环境可以同时消费 gray 和 基线环境生产出来的消息,而且在消费对应环境消息时产生的 Spring Cloud 调用分别路由到 gray 和 基线环境中。 操作总结 1. 全链路消息灰度的整个过程是不需要修改任何代码和配置的。 2. 目前仅支持 RocketMQ,Client 版本需要在 4.5.0 之后的版本。RocketMQ Server 端需要支持 SQL92 规则过滤,即开源 RocketMQ 需要配置 enablePropertyFilter=true,阿里云 RocketMQ 需要使用铂金版。 3. 开启消息灰度后,MSE Agent 会修改消息消费者的 group,如原来的消费 group 为 group1,环境标签为 gray,则 group 会被修改成 group1_gray,如果使用的是阿里云 RocketMQ,需要提前创建好修改后的 group。 4. 开启和关闭消息灰度后,应用需要重启才能生效;修改未打标环境忽略的标签功能可以动态生效,不需要重启。 相关链接 [1] MSE 微服务治理专业版: [2] Kubernetes 集群: [3] 容器服务控制台: [4] MSE 治理中心控制台
作者:亦盏
#行业实践 #功能特性

2021年12月15日

重新定义分析 - EventBridge实时事件分析平台发布
对于日志分析大家可能并不陌生,在分布式计算、大数据处理和 Spark 等开源分析框架的支持下,每天可以对潜在的数百万日志进行分析。 事件分析则和日志分析是两个完全不同的领域,事件分析对实时性的要求更高,需要磨平事件领域中从半结构化到结构化的消息转换管道,实现查询检索,可视化等功能。但是目前针对流式的事件做分析的可用工具非常少,这对于期望使用Serverless架构或 EDA(事件驱动)架构的开发者会非常不便。(更多 EDA 架构介绍参考 :) 基于事件的特征,无法追溯事件内容,无法跟踪事件流转,无法对事件做可视化分析成为了事件驱动架构演进的绊脚石。为了解决事件领域中针对流式事件做分析的难题,EventBridge 近日发布了针对事件/消息领域的全新分析工具EventBridge 实时事件分析平台。下面简要对 EventBridge 实时事件分析平台的内容进行介绍。 EventBridge 实时事件分析平台简介_ EventBridge 实时事件分析平台依托基于事件的实时处理引擎,提供数值检索、可视化分析、多组态分析、事件轨迹、事件溯源和 Schema 管理等能力。EventBridge 实时事件分析平台具有无入侵、无需数据上报,低成本,操作快捷等特点,通过简单的引导式交互,即可快速实现基于事件的流式查询与分析。 EventBridge 实时事件分析平台依托基于事件的实时处理引擎,提供数值检索,可视化分析,多组态分析,事件轨迹,事件溯源,Schema 管理等能力。EventBridge 实时事件具有无入侵,无需数据上报,低成本,操作快捷等特点,通过简单的引导式交互,即可快速实现基于事件的流式查询与分析。 核心功能 多场景支持 目前市面上比较流行的是事件查询平台,但是分析和查询还是有些本质区别,分析基于查询,但是查询并不是分析的全部。 EventBridge 构建了一套完整的事件工具链,帮助开发,运维,甚至运营团队更高效的使用分析工具,统一在一个分析平台上无缝整合全部事件,提供高效、可靠、通用的事件分析能力。 Serverless 领域:得益于 Serverless 架构的推广,事件驱动被更多用在企业核心链路。无服务器的定义是不必管理任何基础设施,但是无服务器的不透明且难以调试却是整个架构必需解决的痛点,当我们配置完触发器后不会知道什么数据在什么时刻触发了函数,触发链路是否异常。EventBridge 事件分析能力将彻底解决 Serverless触发数据黑箱的问题,让所有事件触发都清晰可见。 微服务领域:微服务在现代开发架构中比较常见,该架构由小型、松耦合、可独立部署的服务集合而成,这导致微服务架构很难调试,系统中某一部分的小故障可能会导致大规模服务崩溃。很多时候不得不跳过某些正常服务来调试单个请求。EventBridge 事件分析可将全部链路微服务消息通过事件 ID 染色做有效追踪与排障,帮助微服务做可视化排障。 消息领域:在传统消息领域,消息 Schema 管理、消息内容检索一直是无法解决的难题,大部分情况下需要增加订阅者来对消息做离线分析。EventBridge 事件分析平台提供消息 Schema 管理与消息内容查询能力,为消息可视化提供更完全的解决方案。 云产品领域:云产品在极大程度降低了企业对基础设施建设的复杂性,但同样带来了诸多问题,以 ECS 为例,很多情况会因系统错误或云盘性能受损而触发故障类事件,这类事件通常会涉及到周边产品(比如 ACK 等),捕获全部云上事件做基础排障的挑战性比较大。EventBridge 支持全部云服务事件无缝接入,更大程度降低由云产品变更导致的运维故障。 EventBridge 提供更高效、通用的事件分析平台,基于该平台可以解决大部分场景对事件分析、事件查询、事件轨迹的诉求。 开箱即用 支持提供 Schema 管理,数值检索,可视化分析,多组态分析,事件轨迹,事件溯源等核心能力,无需额外部署,即开即用。 数值检索:提供基础数值检索能力,支持键入 key,value ,= ,!= , exists ,AND,OR 等参数,满足事件检索场景的基本诉求。 可视化分析:提供 GROUP BY,ORDER BY 等可视化分析能力,支持多组态,多图表,多维度分析能力。 链路追踪:提供事件轨迹能力,还原事件整体链路状态。帮助开发者快速排障,快速定位链路问题。 低成本接入 EventBridge 支持以事件总线(EventBus)形式接入,分为云服务事件总线和自定义事件总线。云服务总线支持几乎全部阿里云产品事件,无缝支持云服务事件接入事件分析平台;自定义事件总线支持 RocketMQ、Kafka 或其他自定义事件接入(当前版本仅支持少量云服务事件)。 整体接入流程较为简单,对原有业务入侵小,可随时关闭或开启事件分析,同时实现在线配置,且具备实时生效功能。 总结_ EventBridge 提供更便捷高效的事件分析工具,可以帮助开发人员简单定义查询条件,及时进行可视化的事件内容分析。
作者:肯梦
#技术探索 #生态集成

2021年12月11日

“全”事件触发:阿里云函数计算与事件总线产品完成全面深度集成
随着云原生技术的普及和落地,企业在构建业务系统时,往往需要依赖多个云产品和服务,产品互联、系统协同的需求越来越强。事件驱动架构将事件应用于解耦服务之间的触发和交互, 能够帮助用户很好实现产品、系统之间的互联互动。函数计算作为事件驱动架构的最佳选择,需要为用户提供丰富的事件源触发能力。 对于函数计算而言,事件源接入需要清晰地了解上游每一个事件源的诸多细节和鉴权要求,同时事件处理和系统错误追踪变得越加困难,集成效率成为阻碍产品能力的最大障碍。为了加速事件源集成的效率,函数计算需要找到一种统一标准的事件源接入方式,基于通用的接入层进行基础能力和可观测性的建设,为客户提供丰富的事件源触发选择。 在这样的背景和需求下,阿里云函数计算(Function Compute)和阿里云事件总线(EventBridge)产品完成全面深度集成。这意味着函数计算和阿里云生态各产品及业务 SaaS 系统有了统一标准的接入方式,意味着函数计算将具备接入 EventBridge 所有事件源的触发能力,Serverless 函数计算将实现触达阿里云全系产品服务的“最后一公里”,为基于阿里云生态产品提供重要的架构扩展能力。 为什么是 EventBridge? 阿里云事件总线(EventBridge)是一种无服务器事件总线,支持将用户的应用程序、第三方软件即服务(SaaS)数据和阿里云服务的数据通过事件的方式轻松的连接到一起,这里汇聚了来自云产品及 SaaS 服务的丰富事件,EventBridge 具备事件标准化和接入标准化的能力: 事件标准化:EventBridge 遵循业界标准的 CloudEvent 事件规范,汇聚了来自阿里云生态和 EventBridge 合作伙伴丰富事件源的各种事件,同时提供了完善的事件投递机制和消费策略,整个系统事件流转遵循统一的事件格式; 接入标准化:函数计算选择和 EventBridge 集成,无论是产品服务类型众多的阿里云官方事件源,还是第三方 SaaS 系统,EventBridge 都能够为函数计算和其它系统集成提供统一的集成界面,函数计算无需关注上游事件源的具体实现细节,只需要专注于事件处理,将事件的集成和投递全部交给 EventBridge 来处理; EventBridge  + Function Compute 的结合让事件驱动型应用程序的构建变得简单,因为它可以为您完成事件摄取和交付、安全保障、授权以及错误处理工作。允许您构建松散耦合和分布的事件驱动型架构,帮助提高开发人员敏捷性和应用程序弹性。函数计算系统提供了完善的函数创建, 发布和运行体系,灵活的构建能力结合极致的运行时弹性能力将帮助业务构建云原生时代最富显著特征的事件驱动型架构。 同时,EventBridge 能够提供来自事件源(例如 MQ、OSS、RDB等)的实时数据流,并将该数据路由到阿里云函数计算作为目标。您可以设置路由规则来确定发送数据的目的地,以便构建能够实时响应所有数据源的应用程序架构。 函数计算 + EventBridge 带来的变化? 提供 90+ 事件源接入 在和 EventBridge 集成之前, 函数计算已经实现了和阿里云部分核心系统的集成,随着函数计算 EventBridge 的深度集成,阿里云生态大量服务实现了和函数计算集成, 这些服务或产品的事件将作为事件源触发函数;目前函数计算触发器类型已经从原来的 15+ 增加到 90+,并随着 EventBridge 上游接入系统的增加而不断丰富; 控制台享受一站式服务 EventBridge 和函数计算控制台数据互通,用户在 EventBridge 控制台能够以事件为主体选择函数计算作为事件处理目标,在 EventBridge 控制台享受一站式服务;同样在函数计算控制台,用户能够根据不同触发器类型根据对应的事件类型编写函数;用户无需在函数计算控制台和事件总线控制台来回跳转; 保证数据一致性和稳定性 用户无论是在函数计算控制台上通过创建触发器的方式处理指定事件源的事件;还是在 EventBridge 控制台使用函数计算作为事件处理目标,提供统一的资源视图;同时在底层系统实现上,由于后端系统 API 的深度集成,能够保证上层业务逻辑采用统一的 API 及处理逻辑,从技术层面确保了多个入口功能实现的一致性,为客户系统稳定运行奠定坚实的基础; 简化数据消费投递的复杂度 对于数据消费场景,EventBridge 负责了上游系统的对接和数据消费,用户无需关心事件源系统数据具体消费方式,这部分工作统一由 EventBridge 完成;对于函数计算用户,只需要考虑数据投递的逻辑;用户可以直接选择 EventBridge 提供的下游 Target 实现数据投递,也可以在代码层面仅使用 EventBridge 提供的 SDK 实现数据的投递,大大简化了数据投递的复杂度。 触发器业务应用场景 下面就让我们一起探索, 实际的业务生产环境,我们如何利用这两把利器让这一切简单的发生: 自动化运营分析和展示 业务系统会产生大量动态指标数据,需要提取指标数据做运营分析和展示,通过 EventBridge 和 FC 异步化串联实现自动化运营分析和展示。传统方案需要基于实时计算或者离线计算产品做数据提取和分析,整个方案较重,配置复杂。数据分析结果需要做预定义的展示渲染和推送,需要手工对接业务系统,步骤繁琐。 采用新的 EDA 架构,采用 EventBridge 对接业务自定义事件数据,规则驱动过滤逻辑简单。采用 FC 可以轻量化实现常见的数据分析操作,代码编写调试更简单;同时利用EventBridge 丰富的推送能力,可以实现分析结果快速触达受众。 异步解耦 以交易引擎为例,交易系统引擎作为最核心的系统,每笔交易订单数据需要被几十几个下游业务系统关注,包括物品批价、发货、积分、流计算分析等等,多个系统对消息的处理逻辑不一致,单个系统不可能去适配每一个关联业务。结合 EventBridge 事件中心和函数计算灵活的逻辑扩展能力构建业务逻辑。 新零售大促场景 Serverless + EDA 整合 大型新零售场景会伴随不定期大促,平时流量不大的业务在大促场景也会产生系统流量突增,极致弹性和稳定解耦的架构至关重要。基于传统模式开发稳定可靠、高弹性的后台服务人力不足、工期紧张;大促场景保障峰值流量需要预留大量资源,平时低峰期资源闲置浪费。新零售大促场景利用函数计算 + EventBridge + API 网关搭建 Serverless 模式服务中台,支撑海量请求访问, 系统具备极致弹性,无需预留管理 IaaS 资源,极大程度降低闲置成本;同时函数计算提供敏捷开发结合 EventBridge 低代码异步驱动,业务迭代效率大幅提升。 总结 如果说事件背后的服务是阿里云生态服务的积木, 那么 Serverless 函数计算将是能够将这些积木通过轻巧的方式组合起来艺术化的最佳手段;你可以利用函数计算为这些积木涂上更绚丽的色彩,同时能够将他们串联起来,搭建一个具有无比想象空间的 SaaS/PaaS 服务艺术品。 EventBridge 触发器现已在阿里云函数计算控制台所有地域(Region)开放,欢迎大家点击进行使用体验! 关于触发器具体创建,配置,参考阿里云函数计算官方帮助文档:
作者:史明伟(世如)
#行业实践 #生态集成 #云原生

2021年11月17日

阿里云 EventBridge 事件驱动架构实践
_审核&校对:白玙、佳佳_ _编辑&排版:雯燕_ _本文内容整理自 中国开源年会 演讲_ 首先做一个自我介绍,我是 RocketMQ 的 PMC member 周新宇,目前负责阿里云 RocketMQ 以及 EventBridge 的产品研发。今天我的分享主要包括以下几部分: 消息与事件、微服务与事件驱动架构 阿里云 EventBridge:事件驱动架构实践 基于 RocketMQ 内核构建阿里云统一的事件枢纽 云原生时代的新趋势:Serverless+ 事件驱动 事件驱动架构的未来展望 消息与事件、微服务与事件驱动架构 首先,我们先讲一下消息跟事件的区别:大家都知道 RocketMQ 里面的消息,它是非常泛化的概念,是一个比事件更加抽象的概念。因为消息的内容体就是 Byte 数组,没有任何一个定义,是个弱 Data,所以它是非常通用的抽象。 与之相反的,事件可能是更加具象化的。一般情况下,它有一个 Schema 来精准描述事件有哪些字段,比如 CloudEvents 就对事件有一个明确的 Schema 定义。事件也往往代表了某个事情的发生、某个状态的变化,所以非常具象化。 从用途来讲,消息往往用于微服务的异步解耦的架构。但这一块的话,事件驱动跟消息是稍微类似的。消息的应用场景往往发生在一个组织内部,消息的生产方知道这个消息要将被如何处理。比如说在一个团队里,消息的生产者跟发送者可能是同一个团队同一块业务,对这个消息内容有一个非常强的约定。相比之下,事件更加松耦合,比如说事件发送方也不知道这个事件将被投递到什么地方,将被谁消费,谁对他感兴趣,对事件被如何处理是没有任何预期的。所以说,基于事件的架构是更加解耦的。消息的应用往往还是脱离不了同一个业务部门,即使一些大公司里最多涉及到跨部门合作。消息的使用通过文档进行约束,事件通过 Schema 进行约束,所以我们认为事件是比消息更加彻底解耦的方式。 接下来,微服务架构跟 EDA 架构有什么区别? 首先是微服务架构,微服务作为从单体应用演进而来的架构,比如说把一个单体应用拆成了很多微服务,微服务之间通过 RPC 进行组织和串联。过去一个业务可能是在本地编排了一堆 function,现在通过一堆 RPC 将之串起来。比如说用户去做一个前端的下单操作,可能后台就是好几个微服务进行订单操作,一个微服务去新建订单,一个微服务去对订单进行处理,处理完再调另一个微服务去把订单已完成的消息通知出去,这是一个典型的 RPC 架构。 但纯粹的 RPC 架构有很多问题,比如所有业务逻辑是耦合在一起的,只是把本地方法调用换成了远程调用。当业务增速达到一定阶段,会发现各个微服务之间的容量可能是不对等的,比如说短信通知可以通过异步化完成,却同步完成。这就导致前端有多大流量,短信通知也需要准备同样规模的流量。当准备资源不充足,上下游流量不对等时,就有可能导致某个微服被打挂,从而影响到上游,进而产生雪崩效应。 在这种情况下,大家一般就会引入消息队列进行异步解耦。这个架构已非常接近于事件驱动架构了,还是以用户前端创建一个订单举例,订单创建的事件就会就发到事件总线、event broker、 event bus 上,下游各个不同订阅方去对这个事件做监听处理。 不同之处在于消息订阅者基于消息中间件厂商提供 SDK 的去做消息处理,业务往往需要进行改造,也会被厂商提供的技术栈绑定;事件驱动架构中订阅者属于泛化订阅,即不要求订阅方基于什么样的技术栈去开发,可以是一个 HTTP 网关,也可以是一个function,甚至可以是历史遗留的存量系统。只要 event broker 兼容业务的协议,就可以把事件推送到不同订阅方。可以看到,泛化订阅的用途更加广泛,更加解耦,改造成本也最低。 阿里云 EventBridge:事件驱动架构实践 Gartner 曾预测, EDA 架构将来会成为微服务主流。在 2022 年它将会成为 60% 的新型数字化商业解决方案,也会有 50% 的商业组织参与其中。 同时, CNCF 基金会也提出了 CloudEvents 规范,旨在利用统一的规范格式来声明事件通信。EventBridge也是遵循这一标准。CloudEvents作为社区标准,解除了大家对于厂商锁定的担忧,提高了各个系统之间的互操作性,相当于说对各个系统约定了统一的语言,这个是非常关键的一步。 事件在开源社区有了统一的规范,但在云上,很多用户购买了云厂商很多云产品,这些云产品每天可能有数以亿计的事件在不停产生,这些事件躺在不同云服务的日志、内部实现里。用户也看不着,也不知道云产品实例在云上发生什么事情。各个厂商对事件的定义也不一样,整体是没有同一类标准。各个云服务之间的事件是孤立的,就是说没有打通,这不利于挖掘事件的价值。在使用开源产品时也有类似问题,用户往往也没有统一标准进行数据互通,想去把这些生态打通时需要付出二次开发成本。 最后,事件驱动在很多场景应用的现状是偏离线的,现在比较少的人把 EDA 架构用于在线场景。一方面是因为没有事件型中间件基础设施,很难做到一个事件被实时获取,被实时推送的同时,能被业务方把整个链路给追踪起来。所以,以上也是阿里云为什么要做这款产品的背景。 因此,我们对 EventBridge 做了定义,它有几个核心价值: 一、统一事件枢纽:统一事件界面,定义事件标准,打破云产品事件孤岛。 二、事件驱动引擎:海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级。 三、开放与集成:提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS服务相互集成。 首先讲一下,EventBridge 基本模型,EventBridge 有四大部分。第一部分是事件源,这其中包括云服务的事件、自定义应用、SaaS应用、自建数据平台。 第二个部分就是事件总线,这是存储实体,事件过来,它要存在某个地方进行异步解耦。类似于说 RocketMQ 里面 topic 的概念,具备一定存储的同时,提供了异步能力。事件总线涵盖两种,一种默认事件总线,用于收集所有云产品的事件,另一种自定义事件总线就是用户自己去管理、去定义、去收发事件,用来实践 EDA 架构概念。第三部分就是规则,规则与 RocketMQ 的消费者、订阅比较类似,但我们赋予规则包括过滤跟转换在内的更多计算能力。第四部分就是事件目标即订阅方,对某事件感兴趣就创建规则关联这个事件,这其中包括函数计算、消息服务、HTTP 网关等等。 这里具体讲一下这个事件规则,虽然类似于订阅,但事件规则拥有事件轻量级处理能力。比如在使用消息时可能需要把这个消息拿到本地,再决定是否消费掉。但基于规则,可以在服务端就把这个消息处理掉。 事件规则支持非常复杂的事件模式过滤,包括对指定值的匹配,比如前缀匹配、后缀匹配、数值匹配、数组匹配,甚至把这些规则组合起来形成复杂的逻辑匹配能力。 另一个,就是转换器能力,事件目标泛化定义,其接受的事件格式可能有很多种,但下游服务不一定。比如说你要把事件推到钉钉,钉钉 API 已经写好了并只接受固定格式。那么,把事件推过去,就需要对事件进行转换。我们提供了包括: 完整事件:不做转换,直接投递原生 CloudEvents。 部分事件:通过 JsonPath 语法从 CloudEvents 中提取部分内容投递至事件目标。 常量:事件只起到触发器的作用,投递内容为常量。 模板转换器:通过定义模板,灵活地渲染自定义的内容投递至事件模板。 函数:通过指定处理函数,对事件进行自定义函数处理,将返回值投递至事件目标。 目前,EventBridge 集成了 80 多种云产品,约 800 多种事件类型,第一时间打通了消息生态,比如说 RocketMQ 作为一个微服务生态,我们去实践消息事件理念,就可以把 RocketMQ 的事件直接投递到 EventBridge,通过事件驱动架构去对这些消息进行处理,甚至 MQTT、KafKa 等消息生态,都进行打通集成。除了阿里云消息产品的打通,下一步也会把一些开源自建的消息系统进行打通。另一个生态就是 ISV 生态,为什么 ISV 需要 EventBridge?以钉钉 6.0 举例,其最近发布了连接器能力。钉钉里面要安装很多软件,这些软件可能是官方提供,也可能是 ISV、第三方开发者提供,这就造成数据的互通性差。因此,我们提供这个能力让 ISV 的数据流通起来。最后就是事件驱动生态,我们当前能够触达到大概 10 多种事件目标,目前也在持续丰富当中。 事件因相对消息更加解耦、离散,所以事件治理也更加困难。所以,我们制作了事件中心并提供三块能力: 事件追踪:对每一个事件能有完整的追踪,它从在哪里产生,什么时候被投递,什么时候被过滤掉了,什么时候被投递到某个目标,什么时候被处理成功了。使整个生命周期完全追踪起来。 事件洞察&分析:让用户从 EDA 编程视角变成用户视角,让用户更加迅速的了解 EventBridge 里面到底有哪些事件,并进行可视化分析。通过 EB 做到就近计算分析,直接把业务消息导入到事件总线中,对消息进行及时分析。 事件大盘:针对云产品,引导云产品对业务事件进行定义,让云产品更加开放,从而提供大盘能力。 基于 RocketMQ 内核构建阿里云统一的事件枢纽 EventBridge 一开始就构建在云原生的容器服务之上。在这之上首先是 RocketMQ 内核,内核在这个产品里扮演的角色有两种,一种就是事件存储,当成存储来用;另一方面是利用订阅能力,把订阅转化成泛化订阅。在 RocketMQ 内核之上就是 connect 集群。EventBridge 比较重要的能力是连接,所以 EventBridge 首先要具备 Source 的能力,把事件 Source 过来,然后再存下来;其核心是 Connect 集群,每个 Connect 集群有很多 Worker。每个 Worker 要负责很多事情,包括事件的摄入,事件过滤,事件转换,事件回放,事件追踪等,同时在 Connect 集群之上有 Connect 控制面,来完成集群的治理,Worker 的调度等。 在更上面一层是 API Server,一个事件的入口网关,EventBridge 的世界里,摄入事件有两种方式,一种是通过 Connect 的 Source Connector,把事件主动的 Source 过来,另一种用户或者云产品可以通过 API server,通过我们的 SDK 把事件给投递过来。投递的方式有很多种,包括有 OpenAPI,有多语言的官方 SDK,同时考虑 CloudEvents 有社区的标准,EventBridge 也完全兼容社区开源的 SDK,用户也可以通过 Webhook 将事件投递过来。 这个架构优点非常明显: (1)减少用户开发成本 用户无需额外开发进行事件处理 编写规则对事件过滤、转换 (2)原生 CloudEvents 支持 拥抱 CNCF 社区,无缝对接社区 SDK 标准协议统一阿里云事件规范 (3)事件 Schema 支持 支持事件 Schema 自动探测和校验 Source 和 Target 的 Schema 绑定 (4)全球事件任意互通 组建了跨地域、跨账户的事件网络 支持跨云、跨数据中心事件路由 云原生时代的新趋势:Serverless+ 事件驱动 我们认为 Serverless 加事件驱动是新的研发方式,各个厂商对 Serverless 理解各有侧重,但是落地方式大道趋同。 首先,Serverless 基础设施把底层 IaaS 屏蔽掉,上层 Serverless 运行时即计算托管,托管的不仅仅是微服务应用、K8s 容器,不仅仅是函数。 EventBridge 首先把这种驱动的事件源连接起来,能够触发这些运行时。因为 Serverless 最需要的就是驱动方,事件驱动带给他这样的能力,即计算入口。EventBridge 驱动 Serverless 运行时,再去连接与后端服务。目前,EventBridge 与 Serverless 结合的场景主要是松耦合场景,比如前端应用、SaaS 服务商小程序,以及音视频编解码等落地场景。 那么,Serverless 的 EDA 架构开发模式到底是怎样的呢?以函数计算为例,首先开发者从应用视角需要转换为函数视角,将各个业务逻辑在一个个函数中进行实现;一个函数代表了一个代码片段,代表了一个具体的业务,当这段代码上传后就变成了一个函数资源,然后 EventBridge 可以通过事件来驱动函数,将函数通过事件编排起来组成一个具体的应用。 这里面 function 还需要做很多事情,大家也知道 function 有很多弊端,它最受诟病的就是冷启动。因为 Serverless 需要 scale to zero 按量付费,在没有请求没有事件去触发时,应该是直接收到 0 的,从 0~1 就是一个冷启动。这个冷启动有些时候可能要秒级等待,因为它可能涉及到下载代码、下载镜像,涉及到 namespace 的构建,存储挂载,root 挂载,这里面很多事情,各个云厂商投入很大精力优化这一块。Serverless 价格优势很明显,它资源利用率特别高,因按量付费的,所以能做到接近百分百的资源利用率,也不需要去做容量规划。 举一个简单的例子,就是基于 Serverless 加 EDA 的极简编程范式,再举一个具体的例子,新零售场景下 EDA 架构对这个业务进行改造。首先来讲,业务中有几个关键资源,可能有 API 网关、函数计算,首先可以去打通一些数据,打通 rds 并把 rds 数据同步过来,兼容一些历史架构,同时去触发计算资源、function、网关。整个架构优势非常明显,所以具备极致弹性能力,不需要去预留资源。 事件驱动的未来展望 我们认为事件驱动的未来有两部分,一是要做好连接,做好云内、跨云的集成,让用户的多元架构更加高效。二是开源生态的集成,我们可以看到开源生态愈发蓬勃,所以也需要把这些开源生态中的数据集成好。此外,还有传统 IDC 计算能力、边缘计算能力这些生态都需要有连接性软件把它连接起来。 EventBridge 是云原生时代新的计算驱动力,这些数据可以去驱动云的计算能力,创造更多业务价值。 往期推荐
作者:周新宇
#行业实践 #事件驱动架构

2021年11月6日

消息队列RocketMQ应对双十一流量洪峰的“六大武器”
_审核&校对:岁月、明锻_ _编辑&排版:雯燕_ “ 4982 亿,58.3 万笔/秒 ”的背后 在新冠肺炎疫情催化下,数字化生活方式渐成新常态。“4982 亿,58.3 万笔/秒”是 2020 天猫双 11 全球狂欢节(简称:天猫双 11 )对数字经济的先发优势和巨大潜能的直观体现。 面对千万级并发、万亿级的流量洪峰,背后有力支撑的便是双十一交易核心链路的官方指定产品:消息队列 RocketMQ 。 双十一交易场景业务痛点 随着双十一的逐年升温,保障交易场景的稳定性已成为各企业在双十一业务中的关键,每年双十一活动的凌晨,是“万民狂欢”的日子,同时也是各企业交易系统备受考验的时候,保证核心交易系统的业务处理能力、有效应对每秒数十万笔的交易订单成为重中之重,若不能进行流量缓冲将直接引发这些系统的崩溃。避免系统崩溃的核心“秘诀”便是消息队列 RocketMQ。 消息队列 RocketMQ 是如何帮助各企业交易系统扛住瞬间千万级 TPS、万亿级流量洪峰的冲击,并保持各个应用之间的消息通畅的呢?下面为您介绍消息队列 RocketMQ 应对双十一流量洪峰的“六大武器”。 消息队列 RocketMQ 的“六大武器” 双十一的流量洪峰究竟会给用户和商家系统业务带来哪些问题?消息队列 RocketMQ 的“六大武器”是如何解决这些问题的呢?小编带您初探一二: 武器一:“异步解耦” 背景:双十一的夜晚,当用户在手机上“指点江山”时,可曾想,一个小小的购物 APP 背后其实是一个个庞大的系统,从用户选购商品的那一刻起,就要和成百个业务系统打交道,每一笔交易订单数据都会有几百个下游业务系统的关联,包括物流、购物车、积分、直充、流计算分析等等,整个系统庞大而且复杂,架构设计稍有不合理,将直接影响主站业务的连续性。 面对如此复杂且庞大的系统,避免系统业务之间相互耦合影响,便要用到消息队列 RocketMQ 的“异步解耦”功能,通过消息队列 RocketMQ 实现上、下游业务系统松耦合,松耦合可以降低系统的复杂度,缩短用户请求的响应时间(将原多个步骤的所需时间之和压缩到只需一条消息的时间),保证下游某个子系统的故障不影响整个链路。 武器二:“削峰填谷” 背景:在处理完交易业务背后庞大的系统所带来的耦合性问题后,从用户视角出发来看,双十一期间 0 点这个时间有成百上千万的用户在同时点击着购买页面,由于用户海量请求,导致流量激增,面对如此大量的访问流量,下游的通知系统可能无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。 为解决这些问题,就要用到消息队列 RocketMQ 的“削峰填谷”功能,可在应用和下游通知系统之间加入消息队列 RocketMQ,RocketMQ 支持高并发的消息低延迟写入,以及无限的堆积能力,可以避免超高流量的冲击,确保下游业务在安全水位内平滑稳定的运行。 武器三:“分布式事务消息” 背景:通过前面的介绍了解到,通过消息的异步解耦,可实现消息的分布式处理,在传统的分布式事务处理方式中,用户创建了一条新的订单信息,伴着这条订单信息的变更,在整个业务链条中的购物车、用户表、积分等都需要变更,系统需要借助分布式事务协调组件来保证多个业务调用的事务一致性。传统的分布式事务组件追求强一致性,性能吞吐低,系统复杂。那如何才能既实现分布式事务,同时又不使系统过于复杂? 这个时候消息队列 RocketMQ 的“分布式事务消息”的功能便起到了关键作用,通过原创的轻量级订单流转事务协调能力,只需发送一条消息,就可以实现消息最终一致性的分布式事务,同时确保订单状态持久化和下游调用一致。 武器四:“消息过滤” 背景:通过以上介绍会发现从客户下单到客户收到商品这一过程会生产一系列消息,按消息种类可以分为交易消息、物流消息、购物车消息等,如何保证各个种类的消息进行有效投递并被准确消费? 这时候就要用到消息队列 RocketMQ 的“消息过滤”功能,可以通过 Tag 给不同种类的消息定义不同的属性,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。比如给物流消息定义地域属性,按照地域分为杭州和上海: 订单消息 物流消息 物流消息且地域为杭州 物流消息且地域为上海 武器五:“定时消息” 背景:除了以上系统级别中可能出现的问题外,用户自己在购物过程中可能都遇到过一些小细节,比如在点击了购买按钮后,会出现“请您在 30 分钟内完成支付”的提示,如果超过 30 分钟未支付,订单就会自动关闭。 这个业务用到的是消息队列 RocketMQ 的“定时消息”功能,消息队列 RocketMQ 可以实现自定义秒级精度间隔的定时消息,通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息,最终实现海量订单状态变更超时的中心调度。 武器六:“顺序收发” 背景:在双 11 大促中,买家业务侧和交易系统本身会面临诸多问题,卖家侧也会遇到一些难点,比如,买家买了东西,卖家自己却看不到。 为了解决这个问题,一般需要使用消息队列的顺序消息同步能力将买家表的变更订阅同步到卖家表。此时依赖 RocketMQ 的无热点、高性能、高可靠顺序消息可以保障数据库变更的顺序同步,保证买卖家订单同步。 总结 通过以上介绍,带您了解了消息队列 RocketMQ 的六大武器在双十一“战场”上的威力,2021 年“双十一”开战在即,消息队列 RocketMQ 为您双十一的业务保架护航,同时铂金版可提供 99.99% 的服务可用性和 99.99999999% 的数据可靠性,联系我们,期待陪您的业务一起在 2021 双十一中“乘风破浪”。
作者:不周
#行业实践

2021年11月2日

基于消息队列 RocketMQ 的大型分布式应用上云最佳实践
_审核&校对:岁月、佳佳_ _编辑&排版:雯燕_ 前言 消息队列是分布式互联网架构的重要基础设施,在以下场景都有着重要的应用: 应用解耦 削峰填谷 异步通知 分布式事务 大数据处理 并涉及互动直播、移动互联网&物联网,IM 实时通信、Cache 同步、日志监控等多个领域。 而本文主要围绕着商业版本的消息队列 RocketMQ,和开源版本 RocketMQ 进行比较,并结合一些实践中的场景来展示大型分布式应用的上云最佳实践。 核心能力 商业版本消息队列 RocketMQ 相比较开源版本 RocketMQ 和其他竞品,主要有以下几点优势。 1. 开箱即用、功能丰富 2. 高性能、无限扩展能力 3. 可观测、免运维能力 4. 高 SLA 和稳定性保证 开箱即用、功能丰富 消息队列 RocketMQ 提供了定时、事务、顺序等多类型消息的支持,且支持广播、集群两种消费模式;另外在协议层面,提供 TCP/HTTP 多协议支持,还提供了 TAG/SQL 属性过滤功能,极大程度地拓宽了用户的使用场景。 高性能、无限拓展能力 消息队列 RocketMQ 经受了阿里核心电商历年双十一洪峰的考验,支持千万级 TPS 消息收发和亿级消息堆积的能力,并且能够为消息提供毫秒级端到端延迟保障,另外还提供分级存储,支持海量消息的任意保存时间。 可观测、免运维能力 消息队列 RocketMQ 提供了一个可观测性大盘,支持细粒度数据大盘,提供了消息全链路生命周期追踪和查询能力,对各个指标提供了相应的监控报警功能;此外,还提供了消息回溯和死信队列功能,能够保证用户的消息能够随时回溯消费。 高 SLA 和稳定性保障 消息队列 RocketMQ 的稳定性是我们一贯、持续、稳定投入的重要领域,提供了高可用部署和多副本写入功能;另外也支持同城多 AZ 容灾和异地多活。 产品剖面 接下来,我们会从以上的产品核心能力中挑选几个剖面,并且结合具体的场景和实践来做进一步的介绍。 多消息类型支持 高可用顺序消息 商业版本消息队列 RocketMQ 使用的顺序消息我们称之为高可用顺序消息。在介绍高可用顺序消息之前,首先简要介绍下开源版本 RocketMQ 的顺序消息。 顺序消息分为两种类型,全局顺序消息和分区顺序消息。 全局顺序消息:在 RocketMQ 存储层只会分配一个分区,也就是说全局顺序 Topic 的可用性跟单一副本的可用性强相关,且不具备可扩展的能力。 分区顺序消息:所有消息根据 Sharding Key 进行分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段。 下图是分区顺序消息的应用场景,order ID 即为此时顺序消息的 Sharding Key。 可以看到,无论是全局顺序消息还是分区顺序消息,都依赖了单一分区天然的 FIFO 特性来保证顺序,因此顺序性也只能在同一个分区内保证,当此分区所在的副本不可用时,顺序消息并不具备重试到其他副本的能力,此时消息的顺序性就难以得到保证。 为了解决这一问题,我们设计并实现了高可用顺序消息。 高可用顺序消息有以下几个特点: 一个逻辑顺序分区(PartitionGroup)下有多个物理分区。 其中任意一个物理分区是可写的,那么整个逻辑分区是可写且有序的。 我们基于 happenedbefore 的原则设计了一套基于分区位点的排序算法。 根据该算法,消费者在消费某一逻辑分区时,会从其所属的各个物理分区中拉取消息并进行合并排序,得出正确的消息顺序流。 通过这样的设计,高可用顺序消息解决了下列几点问题: 可用性问题:高可用顺序消息将具备与普通消息一致的可用性,在某副本不可用时,可快速重试至其它副本。 可扩展性问题:普通顺序消息,特别是普通全局顺序消息,不具备良好的扩展能力,只能固定在特定的副本中。高可用顺序消息的逻辑顺序分区可以将物理顺序分区分散在多个副本中。 热点问题:普通顺序消息根据 Key 将一类消息 Hash 至同一个分区中,热点 Key 会导致热点分区,高可用顺序消息具备横向扩展能力,可以为逻辑顺序分区添加多个物理分区来消除热点问题。 单点问题:普通全局顺序消息,仅包含单分区,极易出现单点故障,高可用顺序消息可以消除全局顺序消息的单点问题。 尤其需要注意的是热点问题,在阿里巴巴内部某电商业务大促时,因发送到顺序 Topic 的某一特定的 ShardingKey 数量过多,集群中一个副本接收到了大量该 ShardingKey 的消息,导致该副本超出其负荷上限,造成了消息的延迟和堆积,一定程度上影响了业务。在使用了高可用顺序消息之后,由于其在多物理分区中的负载均衡特性,提升了集群顺序消息的承载能力,从而避免了热点问题的出现。 秒级精准定时消息 定时消息,是指客户端当前发送但希望在未来的某个时间内收到的消息。定时消息广泛应用于各类调度系统或者业务系统之中。比如支付订单,产生一个支付消息,系统通常需要在一定时间后处理该消息,判断用户是否支付成功,然后系统做相应处理。 开源版本的 RocketMQ 只支持几个指定的延迟级别,并不支持秒级精度的定时消息。而面向集团内和云上多样化的需求,开源版本的定时消息并不能满足我们的需求,因此我们推出了秒级精准定时消息。 如下图所示,我们基于时间轮设计并实现了支持任意定时时间的秒级精准定时消息,同时满足以下特性: 任意定时时间 超长定时时间 海量定时消息 删除定时消息 高可用 高性能 内部某用户有这样的场景,期望在未来的某一分钟的 30s 时刻处理这样一个定时请求,开源版本的定时消息并不符合其需要,而秒级精准定时消息在保证高可用、高性能的同时,满足了其业务需求。 分布式事务消息 如下图所示,在传统的事务处理中,多个系统之间的交互耦合到一个事务中,造成整体的相应时间长,回滚过程复杂,从而潜在影响了系统的可用性;而 RocketMQ 提供的分布式事务功能,在保证了系统松耦合和数据最终一致性的前提下,实现了分布式事务。 消息队列 RocketMQ 提供的事务消息处理步骤如下: 发送方将半事务消息发送至消息队列 RocketMQ 版服务端。 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。 发送方开始执行本地事务逻辑。 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。 基于这样的实现,我们通过消息实现了分布式事务特性,即本地事务的执行结果会最终反应到订阅方是否能接收到该条消息。 消息队列 RocketMQ 的分布式事务消息广泛地应用于阿里巴巴核心交易链路中,通过分布式事务消息,实现了最小事务单元;交易系统和消息队列之间,组成一个事务处理;下游系统(购物车、积分、其它)相互隔离,并行处理。 分级存储 背景 随着云上客户的不断增多,存储逐渐成为 RocketMQ 运维的重要瓶颈,这包括并且不限于: 1. 内存大小有限,服务端不能将所有用户的数据全部缓存在内存中;在多租户场景下,当有用户拉取冷数据时,会对磁盘造成较大 IO 压力,从而影响共享集群的其他用户,亟需做到数据的冷热分离。 2. 云上有单租户定制化消息存储时长的需求。而 RocketMQ Broker 中所有用户的消息是放在一个连续文件中进行存储的,无法针对任何单一用户定制存储时长,即现有的存储结构无法满足这样的需求。 3. 如果能对海量数据提供更低成本的存储方式,可以大幅降低云上 RocketMQ 的磁盘存储成本。 基于以上现状,分级存储方案应运而生。 架构 分级存储的整体架构如下: 1. connector 节点负责将 broker 上的消息实时同步到 OSS 上 2. historyNode 节点将用户对冷数据的拉取请求转发至 OSS 上 3. 在 OSS 中是按照 Queue 粒度来组织文件结构的,即每个 Queue 会由独立的文件进行存储,从而保证了我们可以针对于租户定义消息的存储时长。 通过这样的设计,我们实现了消息数据的冷热分离。 使用场景 基于分级存储,我们进一步拓展了用户的使用场景: 1. 自定义存储时间:在消息数据的冷热分离之后,我们将冷数据存储到 OSS 这样的存储系统中,能够实现用户自定义的存储时间。 2. 消息审计:在消息的存储之间从数天扩展到自定义后,消息的属性从一个临时性的中转数据变成了用户的数据资产,而消息系统也从数据中枢转变成了数据仓库;用户能够基于数据仓库实现更多样的审计、分析、处理功能。 3. 消息回放:在流计算场景中,消息回放是非常重要的一个场景;通过拓展消息的存储时间之后,流计算能够实现更加丰富的计算分析场景。 稳定性 消息队列 RocketMQ 的稳定性是我们一贯、持续、稳定投入的重要领域。在介绍我们在稳定性的最新工作之前,首先带大家回顾下 RocketMQ 高可用架构的演进路线。 高可用架构演进路线 2012 年,RocketMQ 作为阿里巴巴全新一代的消息引擎问世,并随后开源至社区,第一代 RocketMQ 高可用架构也随之诞生。如下图所示,第一代高可用架构采取当时流行的 MasterSlave 主从架构,写流量经过 Master 节点同步至 Slave 节点,读流量也经过 Master 节点并将消费记录同步至 Slave 节点。当 Master 节点不可用时,整个副本组可读不可写。 2016 年,RocketMQ 云产品正式开始商业化,云时代单点故障频发,云产品需要完全面向失败而设计,因此 RocketMQ 推出了第二代多副本架构,依托于 Zookeeper 的分布式锁和通知机制,引入 Controller 组件负责 Broker 状态的监控以及主备状态机转换,在主不可用时,备自动切换为主。第二代架构是消息云产品规模化进程中的核心高可用架构,为云产品规模化立下了汗马功劳。 2018 年,RocketMQ 社区对 Paxos 和 Raft 引入分布式协议有极大的热情,RocketMQ 研发团队在开源社区推出了基于 Raft 协议的 Dledger 存储引擎,原生支持 Raft 多副本。 RocketMQ 高可用架构已经走过了三代,在集团、公有云和专有云多样场景的实践中,我们发现这三套高可用架构都存在一些弊端: 第一代主备架构只起到了冷备的作用,且主备切换需要人工介入,在大规模场景下有较大的资源浪费以及运维成本。 第二代架构引入了 Zookeeper 和 Controller 节点,架构上更加复杂,在主备切换做到了自动化,但故障转移时间较长,一般是 10 秒左右完成选主。 第三代 Raft 架构目前暂未在云上和阿里集团内大规模应用,且 Raft 协议就决定了需要选主,新主还需要被客户端路由发现,整个故障转移时间依然较长;另外,强一致的 Raft 版本并未支持灵活的降级策略,无法在可用性和可靠性之间做灵活的权衡。 为了应对云上日益增长的业务规模、更严苛的 SLA 要求、复杂多变的专有云部署环境,当前的消息系统需要一种架构简单、运维简单、有基于当前架构落地路径的方案,我们将其称作秒级 RTO 多副本架构。 新一代秒级 RTO 多副本架构 秒级 RTO 多副本架构是消息中间件团队设计实现的新一代高可用架构,包含副本组成机制、Failover 机制、对现有组件的侵入性修改等。 整个副本组有以下特点: Strong Leader/No Election:Leader 在部署时确定,整个生命周期内不会发生切换,但可在故障时被替换。 仅 Leader 支持消息写入:每一个副本组仅 Leader 接受消息写入,Leader 不可用时,整个副本组不可写入。 所有的副本支持消息读取:虽然 Leader 上拥有全量的消息,Follower 上的消息量不对等,但所有的副本都支持消息的读取。 灵活的副本组数量:可以基于可靠性、可用性和成本自由选择副本组的数量。 灵活的 Quorum 数量:最终所有的消息都会同步到整个副本组上,但副本组内可以灵活配置写成功最小副本数。例如 23 模式,3 副本情况下,2 副本成功即为写成功。同时,在副本不可用的情况下,Quorum 数量也可以动态自行降级。 在上述副本组的概念下,故障转移可以复用当前 RocketMQ 客户端的机制来完成。如下图所示: Producer 在主不可用时,灵活快速地切换至另一个副本组。 Consumer 在某个副本不可用时可快速切换至同副本组另一个副本上进行消息消费。 可观测性 健康大盘 我们在可观测性方面也做了大量的工作,为用户提供了一个消息系统的可观测性健康数据大盘。如下图所示,用户能够清晰的看到实例级别、topic 级别、group 级别的各种监控数据,能够全方面地监控、诊断问题。 消息链路追踪 另外我们还基于消息轨迹提供了消息全链路轨迹追踪功能。如下图所示,用户能够在控制台上看到完整的消息生命周期、从消息的发送、存储、到消费,整个链路都能被完整地记录下来。 应用场景 客户痛点:业务出现消费堆积的用户需要根据消息轨迹抽样数据,综合分析后才能大致判断引起问题原因,排查困难。 核心价值:提高线上运行问题排查的效率,和问题定位的准确性。直接在健康大盘上快速发现风险最高的 Topic 和 Group,并根据各个指标的变化情况快速定位原因。例如消息处理时间过长可以扩容消费者机器或优化消费业务逻辑,如果是失败率过高可以快速查看日志排除错误原因。 事件驱动 大家一定非常熟悉 Gartner,在2018年的一个评估报告里,Gartner 将 EventDriven Model,列为了未来10大战略技术趋势之一,并且,做出了两个预测: 2022年,超过 60% 的新型数字化商业解决方案,都会采用事件通知的软件模型。 2022年,超过 50% 的商业组织,将会参与到EDA生态系统当中去。 同一年,CNCF 基金会也提出了 CloudEvents,意在规范不同云服务之间的事件通讯协议标准。到目前为止,CloudEvents也已经发布了多个消息中间件的绑定规范。 可见事件驱动是未来业务系统的一个重要趋势,而消息天然具备和事件的亲近性,因此消息队列 RocketMQ,是坚决拥抱事件驱动的。 谈到消息和事件,这里做一个简单的阐述:消息和事件是两种不同形态的抽象,也意味着满足不同的场景: 消息:消息是比事件更通用的抽象,常用于微服务调用之间的异步解耦,微服务调用之间往往需要等到服务能力不对等时才会去通过消息对服务调用进行异步化改造;消息的内容往往绑定了较强的业务属性,消息的发送方对消息处理逻辑是有明确的预期的。 事件:事件相对于消息更加具像化,代表了事情的发送、条件和状态的变化;事件源来自不同的组织和环境,所以事件总线天然需要跨组织;事件源对事件将被如何响应没有任何预期的,所以采用事件的应用架构是更彻底的解耦,采用事件的应用架构将更加具备可扩展性和灵活性。 在2020年,阿里云发布了事件总线 EventBridge 这一产品,其使命是作为云事件的枢纽,以标准化的 CloudEvents 1.0 协议连接云产品和云应用,提供中心化的事件治理和驱动能力,帮助用户轻松构建松耦合、分布式的事件驱动架构;另外,在阿里云之外的云市场上有海量垂直领域的 SaaS 服务,EventBridge 将以出色的跨产品、跨组织以及跨云的集成与被集成能力,助力客户打造一个完整的、事件驱动的、高效可控的上云新界面。 而借助事件总线 EventBridge 提供的事件源功能,我们能够打通消息到事件的链路,使得消息队列 RocketMQ 具备事件驱动的动力,从而拥抱整个事件生态。接下来我们将借助一个案例,如下图所示,为大家展示这一功能。 创建消息队列 RocketMQ 主题 创建目标服务 我们基于容器服务快速创建一个事件驱动的服务,计算负载 Deployment 的 yaml 如下,该服务能够响应事件并将结果打印到标准输出中。 apiVersion: apps/v1 for versions before 1.8.0 use apps/v1beta1 kind: Deployment metadata: name: eventbridgehttptargetdeployment labels: app: eventbridgehttptarget spec: replicas: 2 selector: matchLabels: app: eventbridgehttptarget template: metadata: labels: app: eventbridgehttptarget spec: containers: name: ebhttptarget 下述镜像暴露了一个 HTTP 地址(/cloudevents)用于接收 CloudEvents,源码参考:https://github.com/aliyuneventbridge/simplehttptarget image: registry.cnhangzhou.aliyuncs.com/eventbridgepublic/simplehttptarget:latest ports: containerPort: 8080 前往容器服务控制台,进入服务与路由的服务页面,创建一个私网访问类型的 Service,并做好端口映射。 创建事件总线 EventBridge 自定义总线 我们来到事件总线 EventBridge 控制台,创建一个自定义总线 demowithk8s。 创建事件总线 EventBridge 自定义总线规则 我们为总线 demowithk8s 创建一个规则,并选择 HTTP 作为事件目标,选择专有网络类型,选中对应的 VPC、 VSwitch 以及安全组,并指定目标URL,如下图所示: 创建事件总线 EventBridge 事件源 我们为该自定义事件总线添加消息队列 RocketMQ 版的自定义事件源。 发送 RocketMQ 消息 接下来我们回到消息队列 RocketMQ 控制台,通过控制台的快速体验消息生产功能发送一条内容为 hello eventbridge 的消息到对应的主题中去。 接下来我们就可以发现,这条 RocketMQ 消息,以 CloudEvent 的形式被投递到了对应的服务中去,我们从而打通了消息到事件的链路。同时,基于我们上述提到的分级存储功能,消息队列 RocketMQ 转变成了一个能够源源不断提供事件的数据仓库,为整个事件生态提供了更加广阔的场景。 事件驱动是未来商业组织和业务系统的重要趋势,而消息队列 RocketMQ 会坚定地拥抱这一趋势,将消息融入到事件的生态中。 总结 我们选取了消息队列 RocketMQ 的几个产品剖面,从多消息类型、分级存储到稳定性、可观测性,再到面向未来的事件驱动,并结合与开源 RocketMQ 的对比,及具体应用场景的分析,为大家展示了基于消息队列 RocketMQ 的大型分布式应用上云最佳实践。
作者:绍舒
#行业实践

2021年10月28日

阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台
从“消息”到“消息、事件、流”的大融合 消息队列作为当代应用的通信基础设施,微服务架构应用的核心依赖,通过异步解耦能力让用户更高效地构建分布式、高性能、弹性健壮的应用程序。 从数据价值和业务价值角度来看,消息队列的价值不断深化。消息队列中流动的业务核心数据涉及集成传输、分析计算和处理等不同环节与场景。伴随着不断演进,我们可以预见消息队列势必在数据通道、事件集成驱动、分析计算等场景不断产生新价值,创造新的“化学反应”。 RocketMQ 诞生于阿里巴巴内部电商系统,发展至今日,其核心架构经历了多次关键演进: 早在 2007 年,淘宝电商系统做服务化拆分的时候,就诞生了第一代消息服务 Notify,这是 RocketMQ 最早雏形。Notify 采用了关系型数据库作为存储,使用推模式。在阿里淘宝这种高频交易场景中,具有非常广泛地应用。 在 20072013 年期间,随着阿里集团业务发展,不仅需要交易场景异步调用,同时需要支持大量传输埋点数据、数据同步。此时,内部衍生出 MetaQ 以及 RocketMQ3.0 版本,这两个版本开始探索自研存储引擎,采用了自研专有消息存储,支持了单机海量 Topic,并前瞻性地去除了 Zookeeper 等组件的外部依赖。在十年后的今天,我们看到去各种 keeper 已成为整个消息领域的发展主流。 经历了前三代的内部业务打磨后,阿里巴巴积极参与开源并将 RocketMQ3.0 贡献到开源社区,并于 2017 年从 Apache 孵化器毕业,成为中国首个非 Hadoop 生态体系的 Apache 社区顶级项目。此后,RocketMQ 也开始服务于阿里云企业客户。秉承开源、商业、内部三位一体发展策略,18 年发布的 4.x 版,在高可靠低延迟方面重点优化,构建了全新的低延迟存储引擎和多场景容灾解决方案、并提供了丰富的消息特性。这也使得 RocketMQ 成为金融级的业务消息首选方案。 上个月社区发布了 RocketMQ5.0preview 版,正式宣告 5.0 的到来。RocketMQ5.0 将不再局限于消息解耦的基本场景,更是通过统一内核、存储的优势,提供消息、事件、流一体化的处理能力。 回顾 RocketMQ 发展的十余年,良好的社区环境和商业支持使得大量企业开发者可以很方便的跟进业务特点和诉求进行选型和验证。在社区活跃影响力方面,RocketMQ 社区项目收获 15000+Star,活跃的贡献者有 400+ 位,多语言、生态连接等周边活跃项目 30+ 个,深受社区开发者欢迎。在应用规模方面,RocketMQ 作为金融级业务消息方案,积累了互联网游戏、在线教育、金融证券、银行、政企能源、汽车出行等众多行业数以万计的企业客户。同时,在阿里巴巴内部担负业务核心链路,每天流转万亿级消息流量,扛过了历届双十一的零点峰值。在行业评测方面,RocketMQ 也多次斩获大奖。 官宣:阿里云新一代 RocketMQ “消息、事件、流”融合处理平台 今天发布阿里云消息队列 RocketMQ 版 5.0,我们称之为一站式“消息、事件、流”融合处理平台。 新版本核心诞生两大新亮点,首先是消息核心场景的扩展和布局,RocketMQ 5.0 不再局限于消息解耦场景,将全新布局事件驱动和消息流式处理场景;其次则是一站式融合处理的技术架构和趋势。 “消息、事件、流”一站式融合处理的技术架构可以实现一份消息存储,支持消息的流式计算、异步投递、集成驱动多种场景,极大地降低业务人员运维多套系统的技术复杂度和运维成本。可以说,无论是微服务的指令调用、异步通知,还是 CDC 变更日志、行为埋点数据,亦或是资源运维、审计事件,统一的 RocketMQ5.0 产品栈都能统一处理。 重大发布一: RocketMQ 基础架构全新升级 首先,最重要的升级是阿里云 RocketMQ 的技术架构全面焕新。 全新的 RocketMQ5.0 版将通用的存储逻辑下沉,集中解决消息存储的多副本、低延迟、海量队列分区等技术问题,将上层的消息处理和剥离出完全的无状态计算层,主要完成协议适配、权限管理、消费状态、可观测运维体系支持。得益于存算分离的架构设计,从 SDK 接入到线上运维全链路带来全面提升: 1. 轻量版 SDK 的开放和全链路可观测系统的提升:同时支持 4.x 通信协议和全新的 gRPC 通信协议,并内置 OpenTelemetry 埋点支持,新版本 SDK 新增了 10 余个指标埋点。 2. 消息级负载均衡:新版本 SDK 不再参与实际存储队列的负载均衡,消息负载均衡将更加轻量,以单条消息为调度最小单元。 3. 多网络访问支持:新版本支持单一实例同时暴露公网、内网等访问形式,方便客户多网络接入访问。 4. 海量分级存储:新版本开放分级存储历史消息保存能力,消息低成本无大小限制,最长保存 30 天。冷热数据进行分离设计,极大降低消费历史消息对实例的性能影响。 重大发布二: RocketMQ Streaming 云上最佳实践——消息ETL 消息基础架构的能力提升之外,阿里云 RocketMQ 在 Streaming 流式处理场景推出了轻量级消息 ETL 功能。 用户在数据库变更、终端数据上报、后台埋点日志等场景产生的消息,典型的消费场景就是数据清洗转化,同时再存储到外部的存储和离线分析、在线分析系统中。传统实现方案需要搭建 Flink 等重量级实时计算服务或者自建消费应用做消息处理。而使用商业版 RocketMQ ETL 功能,简单控制台配置即可实现消息的清洗和转化。RocketMQ ETL 功能有三大优势: 1. 轻量无依赖:作为阿里云消息原生功能,使用时不需要部署外部计算服务或消费程序,方案更轻量。 2. 开发门槛低:内置常见清洗转化模板,满足绝大多数消息内容处理需求,并支持用户快速编写自定义函数来支持特殊的业务逻辑。整体开发成本非常低,1 小时即可完成业务上线。 3. Serverless 弹性:无需预先估算容量,采取 Serverless 无服务器模式,实现按需弹性伸缩。 重大发布三: EDA 云上最佳实践——事件中心 EventBridge 本次 RocketMQ 最后一个发布点是在事件驱动的业务场景的布局和演进。早在 2018 年,Gartner 评估报告将 EDA(EventDrivenArchitecture) 列为十大战略技术趋势之一,事件驱动架构将成为未来微服务主流。我们首先下一个定义: 事件驱动其本质是对消息驱动的再升级,是企业IT架构深度演进的下一个必然阶段。 事件驱动架构和消息驱动架构的区别和关联主要集中于以下三点: 1. EDA 更加强调深层次解耦:消息驱动是同一业务、组织系统内不同组件之间在技术架构层面的调用解耦,其信息封装和处理都是有预期、预定义的。事件驱动适配是更宽泛的业务、组织系统,基于事件的解耦上下游之间无需有预期和行为定义,上下游统一遵循标准化的规范,这是更深度的解耦。 2. EDA 更加强调连接能力:消息驱动更多是单一系统内的调用,而事件驱动往往会涉及到不同的地域、账户主体以及三方 SaaS 的协同,事件驱动的一大特征就是生态的强连接能力。 3. EDA 更加强调 Serverless 低代码开发:类比于消息和微服务的协同关系,未来业务架构 Serverless 化的大趋势会推动业务开发模式逐步转向低代码配置化。事件驱动的另一大特征就是低代码开发,基于丰富的工具能力,业务侧不需要像消息驱动一样编写大量的生产消费代码。 因此,阿里云统一事件中心 EventBridge 产品带来如下能力: 1. 统一标准化的事件集成生态:作为阿里云事件中心,集成 80 余款云产品的业务事件,支持 800 多种事件类型,用户使用 EventBridge 可以一次性管理所有云产品资源的变更、操作使用事件,避免对接多个产品接口的重复性劳动。 2. 全球事件互通网络:贯彻事件驱动强连接的属性能力,本次发布了全球事件互通网络,首批支持国内五大地域事件互通。企业客户简单配置即可实现跨账号、跨地域、跨网络的事件聚合和流转。 3. Serverless 低代码开发:内置十余种事件目标和处理模板,涵盖了大多数业务场景,客户简单配置、低代码,无需部署服务即可完成事件的驱动和处理。 面向未来: 坚定推动“消息、事件、流”大融合的发展 RocketMQ5.0 的发布标志着阿里云消息从消息领域正式迈向了“消息、事件、流”场景大融合的新局面。未来阿里云消息产品的演进也将继续围绕消息、事件、流核心场景而开展。消息基础架构本身也必将步伐不断,继续朝着 Serverless 弹性、强容灾能力、可观测免运维方向推进,给客户带来高性能、高可靠、强容灾的高 SLA 服务;并在 Streaming 的场景会基于客户业务诉求,联合生态产品持续推出更多的消息处理计算服务;打造面向未来的企业集成模式,联合生态伙伴和开源社区大力推动事件驱动进一步发展。
#技术探索

2021年10月17日

基于 RocketMQ 的基金数字化陪伴体系的架构实践
行业背景 基金公司的核心业务主要分为两部分,一部分是投研线业务,即投资管理和行业研究业务,它体现了基金公司核心竞争力。另一部分是市场线业务,即基金公司利用自身渠道和市场能力完成基金销售并做好客户服务。 博时基金管作为中国内地首批成立的五家基金管理公司之一,截至 2021 年 6 月 30 日,博时基金公司共管理 276 只公募基金,管理资产总规模逾 15482 亿元人民币,累计分红逾 1465 亿元人民币。 随着互联网技术发展,基金销售渠道更加多元化,线上成为基金销售重要渠道。相比传统基金客户,线上渠道具有客户基数大,水平参差不齐的特点。对于那些还不成熟的客户,我们需要做好陪伴,让他们理解风险,理解投资。 RocketMQ 在陪伴体系中的应用 1、陪伴场景概述 博时基金建立了一套全方位多层次陪伴体系,从用户层面、市场层面和产品层面为用户提供投前、投中、投后的有温度的投资陪伴体验。 每个陪伴场景的达成,需要公司多个部门不同团队协同配合来完成。依赖与投研、合规、运营、大数据等上下游多个系统。但这些系统可能采用不同技术架构,实现方式各异,如果采用同步调用方式来实现协同,耦合度太高,不利于未来扩展。 2、RocketMQ 解耦异构系统 RocketMQ 提供高效可靠的消息传递特性和发布订阅机制,非常适合用于这种上下游异构系统间的解耦。我们把原来基于文件、邮件的协作方式全部线上化、流程化和机制化,大大提升了陪伴输出效率。对于这种涉及多方系统的协作,需要对消息进行合理地归类,以便进行过滤和索引。RocketMQ 提供的 Topic 和 Tags 就是用来做这件事的。 3、Topic 和 Tags 最佳实践 Topic 与 Tag 作为业务上用来归类的标识,分别属于一级分类和二级分类,这种层次化的分类标识与企业组织架构比较类似,可以结合起来实现消息过滤。举个例子,对于陪伴系统的 Topic,运营系统订阅运营类消息,我们给这类消息打上 TagA 的标签,客服系统订阅客服类消息 TagB,陪伴编排系统订阅编排类消息 TagC,合规系统需要对运营和陪伴消息进行合规审查,因此它需要订阅 TagA 和 TagC,最后是数据中心,所有的消息都要处理,因此它需要监听所有 Tag。 RocketMQ 事务消息的金融应用场景 1、金融场景概述 接下来,我们讲解一下典型的金融场景优惠购。在博时基金 APP 上申购基金可以享受低至 0 折的费率优惠,具体业务怎么样实现?这里有有两种方式,第一种先充值博时钱包,底层是替客户购买了一笔货币基金,然后再用博时钱包购买目标基金。这种方式需要用户操作两次,比较繁琐,容易引起客单流失。另外一种方式就是优惠购,把两步购买基金封装成一次事务操作。对投资者来说,开启优惠购服务后,操作少一步,投资更简单! 2、领域事件理论模型 领域事件是指业务流程的一个步骤将导致进一步的业务操作,比方说登录事件,比方说基金购买事件等。在领域模型里面,领域事件事务采用的是最终一致性,区别于强一致性,它是弱一致性的一种。在领域模型映射到微服务系统架构时,微服务之间的数据不必要求强一致,因此领域事件可以解耦微服务。依据是否跨微服务,可以分为两种场景:  第一种场景:当领域事件发生在同一个微服务。由于大部分事件发生在同一个进程内,自身可以很好地控制事务。但如果一个事件需要同时更新多个聚合,按照 DDD 中一次事务只更新一个聚合的原则,就需要引入事件总线,就是 eventbus 这种模式。  第二种场景:跨微服务。领域事件发生在微服务之间的场景比较多,事件处理的机制也更加复杂。跨微服务的事件可以推动业务流程或者数据在不同的子域或微服务间直接流转,因此需要一个协调者来推进全局事务。跨微服务的事件机制要总体考虑事件构建、发布和订阅、事件数据持久化、消息中间件、分布式事务机制等,其中具备事务消息功能的消息中间件是这个解决方案的核心组件。 3、分布式事务方案对比 在博时基金的业务场景下,需要解决的问题是事务一致性与服务解耦度之间的矛盾,因此我们的目标是让主从事务解耦,保证核心逻辑稳定,同时不因为解耦而牺牲最终一致性。因此,当时做出了几种不同的解决方案:  第一种方案:最常见普通消息+异步对账,这个方案的问题是无法保证主事务的执行和入队同时成功,需要时效性低的对账补偿解决,一致性只是较高。 第二种方案:本地消息表,对比上一种做法,它由业务将写入消息表放到主事务中,把主事务和入队变成一个原子操作,然后业务读取入队记录,自己投递给从事务。它的缺点是主事务和消息表在存储上是耦合的,没有解耦度。 第三种方案:引入 XA 事务,是个两阶段提交的协议,实现难度较大。而且面临两个问题:一是这是一种同步阻塞协议,有锁占用导致并发不会太高,另外就是 XA 事务过程中,在参与者投赞成票后,如果协调者发生故障,节点不清楚应该提交还是中止,只能等待协调者恢复。这时候可能会出现业务中断。 第四种方案:TCC,专门处理分布式事务的 TCC,只侧重于一致性,无解耦度,也是不可行。 第五种方案:事务消息,它能同时兼顾解耦度和一致性,是最合适的模式。 最终我们选择了 RocketMQ 的事务消息作为分布式事务的解决方案。 4、RocketMQ 事务消息核心流程 基于 RocketMQ 的事务消息搭建事务中心,协调分布式事务的推进和回滚。以优惠购为例,核心流程如下: 第一阶段:Prepare 阶段 ,即业务系统将 RocketMQ 的半事务消息发送到事务中心,事务中心不做发布,等待二次确认。这个阶段 RocketMQ 的半消息在消费者端是感知不到的。 第二阶段:业务系统执行主事务,即购买货币基金。 第三阶段:主事务成功后 commit 到事务中心,由事务中心投递消息到从事务。如果主事务失败,就投递 rollback 给事务中心。这里需要两阶段提交的原因是:普通的入队操作无论放在主事务之前还是之后都无法保证最终一致。如果先执行主事务,再入队,那么可能在入队前,业务会宕机,就没有机会再入队了。如果先入队再执行主事务,那么可能主事务没有执行成功,但是从事务执行成功了,业务逻辑就会发生错乱。 由于网络抖动等原因,可能导致事务消息的二次确认丢失。此时需要依赖某种机制恢复整个分布式事务的上下文,RocketMQ 提供的反查机制正是为解决分布式事务中的超时问题而设计的。我们的事务中心的反查机制流程主要是,先检查事务中心的内部状态,再通过反查接口检查本地事务的执行结果,恢复事务上下文后,正常推进后续的流程。 5、RocketMQ 如何保证事务消息在消费端正常消费 消费端消费失败后,MQ 服务端需要进行一定次数的重试,我们需要制定合理的重试策略。因为有消费重试,这要求消费方接口需要实现幂等性;如果重试多次后仍失败,我们会把消息压入死信队列 DLQ,RocketMQ 提供了死信队列的功能,对进入死信队列的消息进行告警处理。   6、事务消息的适用场景 第一类场景:需要同步执行的领域事件,比如说领域事件逻辑失败概率大,业务要及时将返回码告知客户端,自然不能放在异步流程中。举个例子,做过支付系统的小伙伴都知道,支付扣款前要检查余额是否足够,如果余额不足,那在异步流程中重试多少次都是失败。  第二类场景:是事务不可重入场景,例如业务系统发送消息时没有确定一个唯一事务 ID,那后续的业务逻辑就无法保证幂等,假设其中一个事务是创建订单,如果不能保证幂等的话,重试多次就会产生多个订单;所以这里需要使用到事务消息,用来明确一个分布式事务的开始,生成一个唯一事务 ID,让后续的流程能以这个事务 ID 来保证幂等。 未来规划 目前,我们基于 RocketMQ 在客户陪伴体系上解耦了上下游的服务,提升了运营和陪伴的效率。同时,我们在 RocketMQ 事务消息的基础上,搭建了这样一个支持分布式事务的服务协调平台,也就是我们的事务中心,大大提升了对金融场景化的产品包装能力。未来,我们将围绕着事务中心,拓宽更多的金融应用场景,创造更大的业务价值。
作者:伍振河
#行业实践