2022年10月31日

RocketMQ 重试机制详解及最佳实践
引言 本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。 RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试。本文中仅讨论生产者重试和消费者消费重试两种面向用户侧的实现。 生产者发送重试 RocketMQ 的生产者在发送消息到服务端时,可能会因为网络问题,服务异常等原因导致调用失败,这时候应该怎么办?如何尽可能的保证消息不丢失呢? 1. 生产者重试次数 RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试。 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。 异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。  2. 生产者重试间隔 在介绍生产者重试前,我们先来了解下流控的概念,流控一般是指服务端压力过大,容量不足时服务端会限制客户端收发消息的行为,是服务端自我保护的一种设计。RocketMQ 会根据当前是否触发了流控而采用不同的重试策略: 非流控错误场景:其他触发条件触发重试后,均会立即进行重试,无等待间隔。 流控错误场景:系统会按照预设的指数退避策略进行延迟重试。 为什么要引入退避和随机抖动?  如果故障是由过载流控引起的,重试会增加服务端负载,导致情况进一步恶化,因此客户端在遇到流控时会在两次尝试之间等待一段时间。每次尝试后的等待时间都呈指数级延长。指数回退可能导致很长的回退时间,因为指数函数增长很快。指数退避算法通过以下参数控制重试行为,更多信息,请参见 connectionbackoff.md。 INITIAL_BACKOFF:第一次失败重试前后需等待多久,默认值:1 秒; MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6; JITTER :随机抖动因子,默认值:0.2; MAX_BACKOFF :等待间隔时间上限,默认值:120 秒; MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。 ConnectWithBackoff() current_backoff = INITIAL_BACKOFF current_deadline = now() + INITIAL_BACKOFF while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS) SleepUntil(current_deadline) current_backoff = Min(current_backoff MULTIPLIER, MAX_BACKOFF) current_deadline = now() + current_backoff + UniformRandom(JITTER current_backoff, JITTER current_backoff) 特别说明:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。 3. 重试带来的副作用 不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大 远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息。  较多的重试次数也会增大服务端的处理压力。  4. 用户的最佳实践是什么 1)合理设置发送超时时间,发送的最大次数 发送的最大次数在初始化客户端时配置在 ClientConfiguration;对于某些实时调用类场景,可能会导致消息发送请求链路被阻塞导致业务请求整体耗时高或耗时;需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。 2)如何保证发送消息不丢失 由于分布式环境的复杂性,例如网络不可达时 RocketMQ 客户端发送请求重试机制并不能保证消息发送一定成功。业务方需要捕获异常,并做好冗余保护处理,常见的解决方案有两种: 1. 向调用方返回业务处理失败; 2. 尝试将失败的消息存储到数据库,然后由后台线程定时重试,保证业务逻辑的最终一致性。  3)关注流控异常导致无法重试 触发流控的根本原因是系统容量不足,如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议执行服务端扩容,将请求调用临时替换到其他系统进行应急处理。 4)早期版本客户端如何使用故障延迟机制进行发送重试? 对于 RocketMQ 4.x 和 3.x 以下客户端开启故障延迟机制可以用: producer.setSendLatencyFaultEnable(true) 配置重试次数使用: producer.setRetryTimesWhenSendFailed() producer.setRetryTimesWhenSendAsyncFailed() 消费者消费重试 消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,那应该怎么做呢? RocketMQ 的消息确认机制以及消费重试策略可以帮助分析如下问题: 如何保证业务完整处理消息? 消费重试策略可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息消费异常导致业务状态不一致。 业务应用异常时处理中的消息状态如何恢复? 当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,消费重试具体行为是什么。 1. 什么是消费重试? 什么时候认为消费失败? 消费者在接收到消息后将调用用户的消费函数执行业务逻辑。如果客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),只要服务端服务端一定时间内没收到响应,将认为消费失败。  消费重试是什么? 消费者在消费某条消息失败后,服务端会根据重试策略重新向客户端投递该消息。超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中;  重试过程状态机:消息在重试流程中的状态和变化逻辑;  重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间;  最大重试次数:消息可被重试消费的最大次数。   2. 消息重试的场景 需要注意重试是应对异常情况,给予程序再次消费失败消息的机会,不应该被用作常态化的链路。 推荐使用场景: 业务处理失败,失败原因跟当前的消息内容相关,预期一段时间后可执行成功; 是一个小概率事件,对于大批的消息只有很少量的失败,后面的消息大概率会消费成功,是非常态化的。   正例:消费逻辑是扣减库存,极少量商品因为乐观锁版本冲突导致扣减失败,重试一般立刻成功。 错误使用场景: 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的。  反例:订单在数据库中状态已经是已取消,此时如果收到发货的消息,处理时不应返回消费失败,而应该返回成功并标记不用发货。 消费处理中使用消费失败来做处理速率限流,是不合理的。 限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。 这种做法会让消息反复在服务端和客户端之间传递,增大了系统的开销,主要包括以下方面: RocketMQ 内部重试涉及写放大,每一次重试将生成新的重试消息,大量重试将带来严重的 IO 压力; 重试有复杂的退避逻辑,内部实现为梯度定时器,该定时器本身不具备高吞吐的特性,大量重试将导致重试消息无法及时出队。重试的间隔将不稳定,将导致大量重试消息延后消费,即削峰的周期被大幅度延长。  3. 不要以重试替代限流 上述误用的场景实际上是组合了限流和重试能力来进行削峰,RocketMQ 推荐的削峰最佳手段为组合限流和堆积,业务以保护自身为前提,需要对消费流量进行限流,并利用 RocketMQ 提供的堆积能力将超出业务当前处理的消息滞后消费,以达到削峰的目的。下图中超过处理能力的消息都应该被堆积在服务端,而不是通过消费失败进行重试。 如果不想依赖额外的产品/组件来完成该功能,也可以利用一些本地工具类,比如 Guava 的 RateLimiter 来完成单机限流。如下所示,声明一个 50 QPS 的 RateLimiter,在消费前以阻塞的方式 acquire 一个令牌,获取到即处理消息,未获取到阻塞。 RateLimiter rateLimiter = RateLimiter.create(50); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置订阅组名称 .setConsumerGroup(consumerGroup) // 设置订阅的过滤器 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // 阻塞直到获得一个令牌,也可以配置一个超时时间 rateLimiter.acquire(); LOGGER.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); 4. PushConsumer 消费重试策略 PushConsumer 消费消息时,消息的几个主要状态如下: Ready:已就绪状态。消息在消息队列RocketMQ版服务端已就绪,可以被消费者消费; Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态; Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机; DLQ:死信状态 消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试。 该消息会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。 最大重试次数   PushConsumer 的最大重试次数由创建时决定。 例如,最大重试次数为 3 次,则该消息最多可被投递 4 次,1 次为原始消息,3 次为重试投递次数。 重试间隔时间 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下: 说明:若重试次数超过 16 次,后面每次重试间隔都为 2 小时。 顺序消息:重试间隔为固定时间,默认为 3 秒。  5. SimpleConsumer 消费重试策略 和 PushConsumer 消费重试策略不同,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。 由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,可以通过 API 接口修改不可见时间。 例如,预设消息处理耗时最多 20 ms,但实际业务中 20 ms内消息处理不完,可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。 修改消息不可见时间需要满足以下条件: 消息处理未超时 消息处理未提交消费状态  如下图所示,消息不可见时间修改后立即生效,即从调用 API 时刻开始,重新计算消息不可见时间。 最大重试次数 与 PushConsumer 相同。 消息重试间隔   消息重试间隔 = 不可见时间 - 消息实际处理时长 例如:消息不可见时间为 30 ms,实际消息处理用了 10 ms 就返回失败响应,则距下次消息重试还需要 20 ms,此时的消息重试间隔即为 20 ms;若直到 30 ms 消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为 0 ms。 SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。 //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。 ClientServiceProvider provider1 = ClientServiceProvider.loadService(); String topic1 = "Your Topic"; FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG); SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder() //设置消费者分组。 .setConsumerGroup("Your ConsumerGroup") //设置接入点。 .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build()) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); List messageViewList = null; try { //SimpleConsumer需要主动获取消息,并处理。 messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 //没有ack会被认为消费失败 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); } 修改消息的不可见时间   案例:某产品使用消息队列来发送解耦“视频渲染”的业务逻辑,发送方发送任务编号,消费方收到编号后处理任务。由于消费方的业务逻辑耗时较长,消费者重新消费到同一个任务时,该任务未完成,只能返回消费失败。在这种全新的 API 下,用户可以调用可以通过修改不可见时间给消息续期,实现对单条消息状态的精确控制。 simpleConsumer.changeInvisibleDuration(); simpleConsumer.changeInvisibleDurationAsync(); 6. 功能约束与最佳实践 设置消费的最大超时时间和次数   尽快明确的向服务端返回成功或失败,不要以超时(有时是异常抛出)代替消费失败。 不要用重试机制来进行业务限流  错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。 正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。 发送重试和消费重试会导致相同的消息重复消费,消费方应该有一个良好的幂等设计  正确示例:某系统中消费的逻辑是为某个用户发送短信,该短信已经发送成功了,当消费者应用重复收到该消息,此时应该返回消费成功。 总结 本文主要介绍重试的基本概念,生产者消费者收发消息时触发重试的条件和具体行为,以及 RocketMQ 收发容错的最佳实践。 重试策略帮助我们从随机的、短暂的瞬态故障中恢复,是在容忍错误时,提高可用性的一种强大机制。但请谨记 “重试是对于分布式系统来说自私的”,因为客户端认为其请求很重要,并要求服务端花费更多资源来处理,盲目的重试设计不可取,合理的使用重试可以帮助我们构建更加弹性且可靠的系统。
作者: 斜阳
#行业实践 #功能特性

2022年10月24日

EventBridge 生态实践:融合 SLS 构建一体化日志服务
引言 阿里云日志服务 SLS 是一款优秀的日志服务产品,提供一站式地数据采集、加工、查询与分析、可视化、告警、消费与投递等服务。对于使用 SLS 的用户业务而言,SLS 上存储的日志信息反映着业务的运行状态,通过适当地流转加工即可创建一定价值。 另一方面,阿里云 EventBridge 作为云上事件枢纽,每天承载着大量事件的流转。云上资源的操作事件、消息队列中的数据、用户业务中的自定义事件等,是否有一站式的配置工具来将这些数据统一收敛到 SLS,进而使用 SLS 强大的加工、分析能力也是一个具有价值的问题。 为了支持上述日志、数据流入流出 SLS 的场景,阿里云 EventBridge 在近期支持了 SLS 能力。用户在 EventBridge 上通过简单地配置,即可实现数据写入 SLS 和将 SLS 中日志路由到不同的 EventBridge 目标端。EventBridge 对 SLS 的支持是全面的,用户既可以在事件总线中使用 SLS,也可以在事件流中使用。本文将从 SLS 在 EventBridge上 的使用以及若干最佳实践场景等方面,为大家介绍如何基于 EventBridge 构建 SLS 相关应用。 基于 EventBridge 使用 SLS 阿里云 SLS 日志服务 SLS[1] 是一款云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。 SLS 在 EventBridge 上的应用 阿里云 EventBridge 提供了事件总线[2]与事件流[3]两款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件经事件规则路由到多个目标。而事件流则更轻量化,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便,适用于端到端的流式数据处理场景。SLS 目前对事件总线与事件流均已支持。 针对 SLS 事件源,EventBridge 会构造一个 SLS source connector,其会实时地从 SLS 服务端拉取日志。数据拉取到 EventBridge 后,会进行一定的结构封装,保留用户日志、SLS 系统参数等数据,同时增加 event 所需要的一些系统属性。 SLS Event 样例可参考如下示例。 data 部分代表用户日志内容,其中以“__”开头和结尾的字段表示日志项的 SLS 系统属性。 { "datacontenttype": "application/json;charset=utf8", "aliyunaccountid": "1756789", "data": { "key1": "value1", "key2": "value2", "__topic__": "TopicCategory", "__source__": "SourceCategory", "__client_ip__": "122.231..", "__receive_time__": "1663487595", "__pack_id__": "59b662b2257796280" }, "subject": "acs:log:cnqingdao:1756789:project/demoproject/logstore/logstore1", "aliyunoriginalaccountid": "1756789", "source": "testSLS", "type": "sls:connector", "aliyunpublishtime": "20220918T07:53:15.387Z", "specversion": "1.0", "aliyuneventbusname": "demoBus", "id": "demoprojectlogstore11MTY2MzExODM5ODY4NjAxOTQyMw==0", "time": "20220918T07:53:12Z", "aliyunregionid": "cnqingdao", "aliyunpublishaddr": "10.50.132.112" } 针对 SLS 事件目标,EventBridge 使用 logProducer 将 event 整体作为一个字段投递到 SLS,字段 key 名称为“content”。 使用介绍 SLS 事件源   在使用 SLS 作为事件源时(这里包含了事件总线中的事件源和事件流中的事件源),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) 起始消费位点 调用角色  在创建 SLS 事件源时,EventBridge 会自动在对应 LogStore 下创建一个以“eventbridge”开头的消费组,事件源或事件流被删除时,对应消费组资源也会被清理。 日志项目与日志库参数,用户根据已创建的 Project 和 LogStore 去填写即可。 起始消费位点参数指定了新任务启动时的初始消费位点。这里可以选择“最早位点”、“最新位点”与“指定时间”。“最早位点”即从当前 LogStore 中最早的日志开始消费,会导致大量历史日志被读取,建议结合业务谨慎选择;“最新位点”则表示消费对应 EventBridge 任务启动后的日志;“指定时间”需要用户填写时间戳(以秒为单位),消费从此时刻开始的日志。 针对调用角色,其实是允许 EventBridge 以这个角色的身份去调用读取用户 SLS 日志。用户需要创建一个自定义角色,并将其授信给事件总线 EventBridge。角色的权限方面则可以按照需要去进行设置,在权限最小的原则基础上,权限策略提供的角色应保证事件总线 EventBridge 可以读取对应 LogStore 日志与消费组的增删操作,至少赋予角色 LogStore 消费权限与消费组的增删操作。参考示例: { "Version": "1", "Statement": [ { "Action": [ "log:ListShards", "log:GetCursorOrData", "log:GetConsumerGroupCheckPoint", "log:UpdateConsumerGroup", "log:ConsumerGroupHeartBeat", "log:ConsumerGroupUpdateCheckPoint", "log:ListConsumerGroup", "log:CreateConsumerGroup", "log:DeleteConsumerGroup" ], "Resource": [ "acs:log:::project//logstore/", "acs:log:::project//logstore//" ], "Effect": "Allow" } ] } SLS 事件目标   在使用 SLS 作为事件目标时(这里包含了事件总线中的事件目标和事件流中的事件目标),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) Topic 调用角色  日志项目、日志库参数含义同 SLS 事件源。Topic 即 SLS 日志主题,用户可以根据需要进行设置,非必填内容。 在创建 SLS 事件目标时,确保使用的调用角色有写入给定日志库权限即可。参考示例: { "Version":"1", "Statement":[ { "Effect":"Allow", "Action":[ "log:PostLogStoreLogs" ], "Resource":[ "acs:log:::project//logstore/" ] } ] } 使用示例 SLS 事件源和事件目标,其事件总线与事件流的参数配置相同,这里示例了如何创建  SLS 事件源和事件目标的 EventBridge 事件流。 前期准备   1. 开通 EventBridge 服务; 2. 开通 SLS 服务并创建 Project 与 Store。 创建 SLS 事件源   1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“日志服务 SLS”; 4. 在“日志服务 SLS”一栏中选配置 SLS Project、LogStore、起始消费位点与角色配置。 创建 SLS 事件目标   1. 在创建事件流的事件目标时,服务类型选择“日志服务”; 2. 配置 SLS Project、LogStore、日志主题、日志内容、角色配置等参数。 3. 保存启动即可创建事件流。 最佳实践示例 异步架构完备性校验 在使用消息队列搭建异步应用架构时,会偶发遇到消息丢失的情况,这种情况下的问题排查通常较为麻烦,需要确定问题到底是出在发送端、消费端还是消息队列上,这种场景可以使用 SLS + EventBridge 来进行相关预警和现场保留。 1. 业务 1 发送消息到消息队列,业务 2 异步消费 MQ 中的消息,实现架构解耦; 2. 消息发送端和消费端,在完成消费发送、消费的相关操作后,均将操作日志打印出来,并采集到 SLS 上,日志中可以包含消息 ID 等字段以确保可溯源; 3. 配置 EventBridge 事件流,事件提供方为 SLS,事件接收方为函数计算 FC; 4. FC 中的服务读取 SLS 中日志内容,若发现针对某条消息,若仅有发送日志无消费日志,则说明可能存在漏消息的可能性,需要相关人员及时介入排查。 异常业务异步处理 部分消息队列如 RocketMQ 有死信队列能力,当用户消费失败达到一定次数时,消息会被投递到死信队列。用户也可以使用 SLS + EventBridge 构建业务死信队列,以完成对异常情况的处理。 例如下图是一个电商平台的订单处理系统,当订单处理成功时,相关信息会被写入 DB 或者进行后续操作。但如果订单处理异常用户又不想要阻塞现有订单处理流程,则可以将处理异常订单的流程异步处理。 1. 用户下单/付款,订单系统进行业务处理,处理成功则将数据变更写入 DB; 2. 订单处理异常,记录相关信息日志; 3. 搭建 EventBridge 事件规则。事件源为 SLS,事件目标为函数计算 FC; 4. 当有异常业务日志产生时,日志内容被 SLS 事件源拉取,随后投递到 FC,由专门的服务来处理异常订单。当然,在架构设计时也可以将异常订单信息直接投递到函数计算,但对于大部分业务系统而言,当有异常出现时通常都会进行相关日志的打印,即异常日志大概率是存在的,这个时候使用 SLS + EventBridge 则无需再使用函数计算的发送客户端,仅按需打印日志即可,对业务的侵入性更小。 消息备份 目前阿里云上的消息队列产品种类丰富,用户在使用消息队列实现业务解耦的同时,也会产生对消息内容进行加工分析的需求。SLS 拥有强大的数据加工能力,使用 EventBridge 将消息路由到 SLS,在实现消息备份的同时也可以利用 SLS 的分析加工能力来提升业务的可观测性。 1. 搭建 EventBridge 事件流。事件提供方为各种云上消息队列,事件目标方为日志服务 SLS; 2. 使用 SLS 的能力完成消息的加工、查询、分析与可视化。 自建 SQL 审计 目前 EventBridge 已经支持了 DTS 作为事件源的能力,使用 EventBridge 可以轻松实现构建自定义 SQL 审计的需求。 1. 用户新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 _相关链接_ _[1] 日志服务SLS_ _[2] 事件总线_ _[3] 事件流_
作者:昶风
#行业实践 #生态集成

2022年9月30日

行业实践:Apache RocketMQ 业务集成典型行业应用和实践
消息典型应用场景 阿里云拥有丰富的消息产品家族,除了 RocketMQ 以外,还有大家熟知的对标开源的云 Kafka、支持 AMQP 协议的开源消息队列 RabbitMQ、物联网通信网关 MQTT、 对标 AWS SQS/SNS 的 Serverless 版消息 MNS(现在也是轻量版 RocketMQ)以及云上事件总线、事件中心 EeventBridge 。 阿里云所有消息产品均采用 RocketMQ 作为底层存储引擎,为用户提供稳定、可靠、高性能的消息通信能力,比如百万 TPS、百万队列、毫秒级通信延迟、分级存储、 Serverless 弹性等众多的消息产品。也带来了丰富的应用场景,分为应用集成和数据集成两大类。 应用集成以 RocketMQ 为主,应用最为广泛,本文也将分享 RocketMQ 在微服务解耦、电商交易、金融支付等场景下的最佳实践,比如银行的交易流水、保单的支付流转等。RabbitMQ 、MQTT 也主要用于应用集成场景,比如物联网、 IoT 双向通信、云产品的事件通知以及后处理等。对于新建的业务场景,一般首推 RocketMQ 作为消息选型,因为 RocketMQ 拥有最丰富的功能特性;而对于存量的业务迁移,则可以根据具体使用的消息产品来进行选择,以降低迁移成本。 数据集成以云 Kafka 为主,在大数据分析、日志采集分析等场景下应用最为广泛,比如游戏的玩家操作、广告埋点、数据分析、应用数据监控等。各种 SaaS 类的集成、聚石塔、电商数据打通等场景,则主要使用 EventBridge。更多的产品选型对比,可以参考专题页中的消息队列产品选型。 业务消息使用场景 RocketMQ 经过阿里集团内部十年锤炼,经过双 11 大促等万亿级规模的实时场景验证,SLA 最高可支持 4 个9,数据可靠性支持 9 个9。 以微服务解耦、订单交易、金融支付等场景为例,在互联网、电商、零售、金融等行业,消息的使用量非常大。尤其是在秒杀大促时,为了保障系统的稳定运行,需要 RocketMQ 进行削峰填谷。另外金融客户对每笔交易、每个订单也都要求数据不能丢失。因此在此类场景普遍对消息的可靠传输、海量并发、低延迟、稳定性与容灾等有着非常高的要求。RocketMQ 提供了丰富的消息类型,比如事务消息、定时消息、顺序消息等。 在交易系统里,为了简化交易流程,一般使用事务消息和定时消息。同时 RocketMQ 也提供了消息轨迹查询、消息 dashboard ,可以非常方便地对每个消息进行回溯,对每个 topic 或者 group 进行监测。RocketMQ 5.0 也提供了丰富的实例规格,从百级别 QPS 到百万级 QPS ,可以覆盖大部分应用场景。RocketMQ 默认提供多副本、多可用区部署,也提供了跨地域消息路由能力,支持客户构建高可用容灾或多活,且 RocketMQ 能够支持 99.9%的消息 RT 在 10ms 传输。 RocketMQ 事务消息举例 实现订单状态机异步流转 以订单状态机异步流转为例。此前,如果收到一笔订单交易,需要逐个通知下游的服务模块,比如需要更新购物车、更新积分等。每个业务模块耦合在一起会导致大促时的流量峰值非常大,需要每个服务模块保障其处理性能。而基于 RocketMQ 的事务消息能力,即可轻松实现订单子流程系统的异步解耦和流量的削峰填谷,事务消息可以确保数据库订单状态持久化和下游通知的事务性。 收到 LBL 订单交易时,可以先向 RocketMQ 发送一条半事务消息,此时 RocketMQ 会 hold 住消息,等核心交易事务完成后再向 MQ 提交确认半事务消息的状态,并执行下游服务模块的通知。假设核心交易模块失败,则会废弃之前提交的半事务消息,不通知下游。 对比此前的传统事务模块,使用 RocketMQ 可以大幅简化交易链路,缩短下单耗时。尤其是在大促场景下,可以解耦下游的服务模块,提供削峰填谷的能力。 超时中心 RocketMQ 的定时消息场景也是常见的使用方式。 比如双 11 等大促场景存在大量预售订单、定点尾款等,会带来大量定时任务。在电商交易过程中,订单流转也存在多个超时状态的任务,处理超时状态的任务需要确保可靠及时。以传统的方案进行构建分布式调度机制实现的时候,比如基于定时器调度延迟大,可能会存在性能瓶颈。 而采用 RocketMQ 的定时消息,实现将变得非常简单。定时任务只需提交一条延迟消息到 RocketMQ ,由 RocketMQ 保障定时消息达到秒级的精度,最高可支持百万级别的 TPS 能力,同时也能支持消息的消费重试,保障任务可靠触发,相比传统的使用方式大大简化了定时的复杂度。 RocketMQ 灰度策略举例 微服务全链路灰度 微服务场景下,精准地控制灰度流量并进行灰度版本验证,是保障线上业务稳定运行的关键。大部分情况下,用户通过划分不同的环境来进行灰度发布,对应 RocketMQ 的不同实例。但是很多用户希望能够简化环境管理,尽可能复用线上资源,结合消息来提供微服务全链路灰度能力。 如上图所示,线上已经在运行的微服务模块游 A、B、C,C 模块会产生消息,并由 A 模块进行消费。此时对服务模块 A 和 C 做灰度发布,则线上会存在两条泳道,一条是正常的业务流量,一条是灰度链路。我们希望线上版本 C 模块生产的消息能够被线上版本 A 模块进行消费,灰度版本 C 模块生产的消息能够被灰度版本 A 模块进行消费。 RocketMQ 支持透传环境标签,可在生产端给消息属性添加标签,然后开启 RocketMQ 的 SQL 92 语法过滤,服务端即可完成消息的过滤和路由,从而降低客户端的压力。 本文讲述了 RocketMQ 的业务消息场景、一些功能特性的使用方法,包括事务消息、定时消息、消息全链路灰度等,欢迎大家尝试使用。
作者:洛浩
#行业实践

2022年9月23日

Apache RocketMQ 在阿里云大规模商业化实践之路
阿里云消息队列 RocketMQ 商业化历程 RocketMQ 诞生于 2012 年,诞生即开源。2012~2015 年,RocketMQ 一直在通过内部电商业务打磨自身服务能力,并在 2015 年于阿里云上线公测。2016 年,阿里云 RocketMQ 完成商业化,同时被捐赠给 Apache 基金会,同年获得了年度受欢迎中国开源软件荣誉。 在 Apache 孵化期间,Apache RocketMQ 经历了快速发展,2017 年即毕业成为了 Apache 顶级项目。同年,Apache RocketMQ TLP RocketMQ 4.0 正式发布。此后,RocketMQ 4.0 经历了长足发展,期间阿里云商业和开源相辅相成、齐头并进,直到今天,共同迈入 RocketMQ 5.0 时代。 RocketMQ 5.0 发布后,阿里云商业会持续采取 OpenCore 的发展模式,秉承上游优先的社区发展原则,与社区一起将 RocketMQ 打造为一个超融合的数据处理平台。 阿里云消息队列产品矩阵 阿里云基于 RocketMQ 消息底座,构建了多元化的消息产品系列。 RocketMQ 是阿里云主打的消息品牌,互联网新兴业务领域首选的数据通道。消息队列 Kafka 是大数据的首选数据通道,微消息队列 MQTT 是移动互联网和物联网的数据通道,消息队列 RocketMQ 是传统业务领域的数据通道。消息服务 MNS 是 RocketMQ 轻量版,主要应用于应用集成领域,为平台型应用提供简单的队列服务。事件总线 Event Bridge 定位为云上事件枢纽,旨在阿里云上构建统一的事件中心。 阿里云消息队列产品矩阵完全构建在 RocketMQ 之上,基本实现了应用场景全覆盖,包括微服务解耦、SaaS 集成、物联网、大数据或日志收集生态,同时也在内部覆盖了阿里巴巴所有业务,在云上为数万阿里云企业提供了优质的消息服务。阿里云的消息产品矩阵涵盖了互联网、大数据、移动互联网等领域业务场景,为云原生客户提供不可或缺的一站式解决方案。 RocketMQ 在阿里云商业化历程中,一直致力于探索业务消息实践,也孵化了大量业务消息特性,并持续反哺到开源社区。 RocketMQ 4.0 业务消息探索之路 RocketMQ 在商业化过程中,陆续推出了四种消息类型来满足丰富的业务场景。 普通消息:普通消息提供极致弹性、海量堆积能力,内置重试与死信队列来满足业务对失败重试的需求,同时具备高吞吐、高可用、低延迟等特性,广泛应用于应用集成、异步解耦、削峰填谷等场景。 定时消息:提供秒级定时精度, 40 天超长定时,主要面向分布式定时调度、任务超时处理等场景,目前正在开源中。  顺序消息:支持全局与局部严格有序,从发送、存储到消费,保证端到端有序。面向有序事件处理、撮合交易、数据实时增量同步等场景。 事务消息:分布式、高性能、高可用的最终一致性事务解决方案,广泛应用于电商交易系统中服务的一致性协调场景并且已经开源。  RocketMQ 4.0 期间,商业和开源都致力于全方位拓展消息接入能力,使 RocketMQ 能够非常轻松地连接应用开源和云产品生态。比如商业上提供了多语言 SDK ,开源也有相应的 SDK 能够覆盖 Java、Go、Python 、C++使用 RocketMQ。同时支持 Spring 生态,能够通过 Spring Cloud 的方式使用 RocketMQ。商业上提供了一组非常简单易用的 HTTP API,提供了 67 种语言的实现。 除了 SDK 接入,RocketMQ 也在积极拥抱社区标准,在云产品侧提供了 AMQP 和 MQTT 的接入能力,其中 MQTT 已开源。 RocketMQ 也大力在发展 connector 生态,能够通过 RocketMQ connector 接入很多数据源,包括 Redis、MongoDB、Hudi 等大数据系统。 另外,阿里云构建的事件总线 EventBridge 也已开源,通过该产品能够将阿里云的云产品、SaaS 应用、自建数据平台的数据引入 RocketMQ。 RocketMQ 4.0 版本做了大量尝试,提供了全方位的消息接入能力。 RocketMQ 在服务阿里集团用户和商业化历程中,沉淀了大量领先的业务消息处理与服务能力。比如消息订阅方面,RocketMQ 支持集群分布式消费能力,也支持广播消费。在消息处理方面支持基于 Tag 和 SQL 做灵活过滤,其中基于 SQL 过滤是电商交易中非常重要的特性,能够支持在非常订阅比的情况下实现较低的投递比。 全球消息路由能力具备性能高、实时性强的特点。在云时代,数据中心天然分布在各个地域,各个地域之间还有 VPC 网络隔离。但是通过全球消息路由功能可以将地域与网络打通,能够满足更多业务场景。比如在阿里内部基于该能力实现了异地多活、异地容灾等企业级特性。 另外,全球消息路由具备非常高的易用性,提供了可视化任务管理界面,通过简单配置即可创建复制链路。 消息治理方面,RocketMQ 提供了访问控制、命名空间、实例限流、消息回放、重试消息、死信消息、堆积治理等能力。 服务能力方面,RocketMQ 经历了非常多沉淀,它在为交易链路服务了 12  年,参加了 10 年双 11,这也保证了 RocketMQ 能够在阿里云上提供非常高的可靠性。双 11 消息收发 TPS 峰值过亿,日消息收发总量超过 3 万亿。而即使在双十一万亿级数据洪峰下,消息也能做到 99.996% 毫秒级响应能力,消息发布平均响应时间不超过 3 毫秒,最大不超过 20 毫秒,真正实现了低延迟消息发布。 商业化初期,客户遇到最大难题是在分布式环境下如何完整地追踪异步消息链路。基于此背景,我们打造了可视化全生命周期消息轨迹追踪系统,能够提供丰富的消息查询、消息下载、定点重投、轨迹追踪能力,通过可观测系统帮助用户解决分布式环境中不可观测的问题。 如上图所示,一条消息从产生、发送至服务端存储到最终投递到消费者,整个发送和消费轨迹都有迹可循,包括投递给哪些消费者、哪些消费者在什么地方成功消费或者消费失败、何时进行重投,真正帮助客户解决了分布式观测难题。 除了功能特性,RocketMQ 在稳定性方面也做了很多建设。我们始终坚持,SLA 是云原生的根本,因此整个研发运维链路都有严格的稳定性保障措施: 架构开发:每个方案设计都会面向失败设计,代码开发阶段会有严格 Code Review 阶段,也会完整经历单元测试、集成测试、性能测试和容灾测试流程。 变更管理:有着非常严格的变更制度,要做到每个变更可灰度、可监控、可回滚、可降级。 稳定性防护:提供了限流、降级、容量评估、应急方案、大促保障等能力,会定期进行故障和预案演练,定期进行风险梳理。 体系化巡检:在云上有全方位的生产环境黑盒巡检。基于用户视角,会对全地域所有功能做全功能扫描,包含高达 50 多项检测项,任意项功能出问题都能立刻被监测到。在白盒巡检方面,会对 JVM 运行时指标、内核系统、集群指标进行巡检。 故障应急:有完整地故障应急流程,包括监控报警、故障发生、快速止血、排查根因、故障复盘。 RocketMQ 5.0 云原生架构升级之路 云原生时代,云上用户对云产品服务化程度、弹性能力、可控制性能力以及韧性都有了更高的要求。在此背景之下,我们对 RocketMQ 进行了云原生架构升级,这也是 RocketMQ 5.0 的诞生背景。 轻量级 SDK:基于云原生通信标准 gRPC 开发了一组轻量级 SDK,能够与当前富客户端优势互补。   无状态消息网关:在核心数据链路推出了无状态消息网关。通过搭建无状态服务节点Proxy,再通过 LB 进行服务暴露,将存储节点数据分离来独立负责核心消息存储和高可用。Proxy 与 Store 节点分离部署,独立弹性。  Leaderless 高可用架构:Store 节点身份完全对等,完全 Leaderless 化,去 ZK 和 HA 管控节点,能够做到非常高的可用性。同时相比传统的 Raft 一致性协议,该 Leaderless 架构能够做到副本数灵活选择,同步异步自动升降级,实现秒级故障转移。高可用架构目前已经完成开源并与 Dledger 进行了融合。  云原生基础设施:可观测验能力云原生化,OpenTelemetry 标准化。整体架构走向 Kubernetes 化,能够充分利用售卖区的资源弹性能力。 RocketMQ 4.0 推荐的接入方式主要是富客户端。富客户端提供了诸如客户端侧负载均衡、消息缓存、故障转移等一系列企业级特性。但在云原生时代,轻量级、高性能的客户端更容易被云原生技术栈所集成。 因此,RocketMQ 5.0 重磅推出了全新多语言轻量级 SDK,具有以下优势: 全新极简 API 设计:不可变 API,有完善的错误处理。多语言 SDK 保障 API 在 Native 层面对齐。同时引入了全新的 Simple Consumer,能够支持按消息模型进行消费,用户不再需要关心消息队列,只需要关注消息。  通信层采用 gRPC 协议:拥抱云原生通信标准,gRPC 能够使服务更易被集成。多语言 SDK 通信代码也可以通过 gRPC 快速生成,更 Native 。  轻量级实现:采用无状态消费模式,能够大幅降低客户端的实现复杂度。客户端更轻量,采用的应用也更容易被 Serverless化、Mesh 化。  云原生可观测性:客户端实现了 OpenTelemetry 标准,能够支持以 OpenTelemetry 形式导出 Metrics 与 Tracing。 RocketMQ 5.0 的另一个重大升级是引入了全新的无状态消费模型。该消费模型完全构建在原先的队列模型之上。队列模型是与存储模型一致的消费模型,消费者完全按照队列做负载均衡,也按照队列做消息拉取,非常适合批量高速拉取以及对单条消息状态不敏感的场景,比如流计算等。 RocketMQ 5.0 推出了 PoP 机制,巧妙地在队列模型之上构建了消息模型,实现了鱼与熊掌兼得。在此消息模型的设计上,业务可以只关心消息而无需关心队列,所有 API 都能够支持单条消息级别的消费、重试、修改不可见时间、删除。 在消息模型下,消息发送过来被存储后,即对消费者可见。消费者通过 Receive Message API 对消息进行消费后,消息进入定时不可见状态。消息超时过后又会重新处于可见状态,能被其他消费者继续消费。某消费者确认消息后,服务端会对该消息进行删除,随即不可见。 基于消息系模型的消费流程下,API 完全面向消息而不是面向队列。而当 PoP 机制遇见了无状态 Proxy,除了存储层,其他节点都是无状态的;客户端、连接和消费也是无状态的,可任意在 Proxy 节点上飘移,真正做到轻量级。 经过重构,RocketMQ 5.0 的可观测性也走向了云原生标准。 Metrics 侧: 指标涵盖丰富:设计了更丰富的指标,包含消息量、堆积量、各个阶段耗时等指标,每个指标从实例、Topic、消费 GroupID 多维度做聚合和展示。 消息团队实践模板:为用户提供实践模板,并持续迭代更新。 Prometheus + Grafana:Prometheus 标准数据格式,利用 Grafana 展示。除了模板,用户也可以自定义展示大盘。 Tracing 侧: OpenTelemetry Tracing 标准:RocketMQ Tracing 标准已经合并到 OpenTelemetry 开源标准,提供了规范和丰富的 messaging tracing 场景定义。 消息领域定制化展示:按照消息维度重新组织抽象的请求 span数据,展示一对多的消费,多次消费信息直观且方便理解。 可衔接 tracing 链路上下游:消息的 tracing 可继承调用上下文,补充到完整的调用链路中,消息链路信息串联了异步链路的上游和下游链路信息。 Logging 侧: Error Code 标准化:不同的错误有唯一的 Error Code。 Error Message 完整:包含完整的错误信息和排序所需要的资源信息。 Error Level 标准化:细化了各种不同错误信息的日志级别,用户可根据 Error、Warn 等级别配置更适合的监控告警。 弹性方面,RocketMQ 5.0 商业版能够充分撬动云的计算、存储和网络的池化资源。比如在计算方面,RocketMQ 5.0 所有工作负载完全部署在 ACK 之上,充分利用了 ACK 弹性能力,撬动 ACK 弹性资源。主要依赖 ACK 的两项技术,一是弹性资源池,另一个是 HPA 支持计算能力快速弹性。同时也会在 ACK 之上做跨可用区部署以提供高可用保障。 网络层面,RocketMQ 5.0 也会充分利用阿里云网络设施,为用户提供更便捷的网络访问能力。比如 RocketMQ 5.0 实例能够支持公网随开随用,需要依赖公网做测试的时候即开即用,测试完立即关闭,安全与方便兼具。同时支持多种私网类型的网络形态,包括 Single Tunnel、Private Link,另外也基于 CEN 构建了全球互通设计网络。 存储方面,RocketMQ 5.0 商业版率先引入多级存储概念,基于 OSS 构建二级存储,能够充分利用 OSS 存储的弹性能力,存储计费也转向了按量付费。而用户能够在 RocketMQ 之上自定义消息存储时长,比如将消息从 3 天有效时长延长至 30 天,能够真正将消息变为数据资产。同时利用二级存储能力,将冷热数据分离,为用户提供一致的冷读 SLA 。 RocketMQ 5.0 商业版发布预告 RocketMQ 4.0 历经了五年发展,开源和商业版本共同迈入了 5.0 时代。7 月底,阿里云消息队列将会基于开源版发布全新的 5.0 商业化版本。注:截止发稿前,RocketMQ 5.0 已经在阿里云消息队列 RocketMQ 产品上全新发布,目前支持国内主要地域。 RocketMQ 5.0 版相对于 4.0 版实例主要有以下几大改变: 第一,新版本、新售卖,更便宜。新版本采取了全新计量方式,有包年、包月型,也有按量付费和公网流量弹性计费。也有更全的售卖体系,比如新增专业版实例,能够满足部分用户需求。同时每个商品系列都新增了测试环境专用实例,能够方便用户以低成本的方式搭建自己的开发环境。 第二,更强弹性,降本提效利器。存储完全走向弹性,能够通过 Serverless 按需使用,按量付费。预留弹性,实例基础规格支持实时升降配,用户可以很方便地在流量到来之前做弹性。此外,专业版支持突发流量弹性,能够解决线上稳定性风险。 第三,全新架构,增强可观测运维。无状态消息消费模型能够解决一些老版本的痛点。同时在可观测上全面采取了云原生接入栈。 消息的全新形态:事件总线 EventBridge 事件总线 EventBridge 已经开源到 RocketMQ 社区中。云原生时代,事件无处不在,云计算资源散落在各地,各类生态孤岛随处可见。因此,以事件和事件驱动的方式来集成这一切是大势所趋。 基于此,阿里云推出了全新事件型产品 EventBridge。该产品构建在 RocketMQ 之上,是 RocketMQ 之上的一个事件驱动架构实践。 EventBridge 的事件源包括阿里云服务的管控事件比如资源变更事件、审计事件、配置变更事件,阿里云服务的数据事件,也包括自定义应用、SaaS 应用、自建数据平台、其他云厂商服务等。 事件经过 EventBridge 处理后会投递到事件目标,事件目标包括函数计算、消息服务、自建网关、HTTP(S)、短信、邮箱、钉钉等。 事件源到事件目标之间会经历完整的事件处理,包括事件源接入到 EB 后,可以对事件进行过滤、转换、归档、回放等。事件在 EventBridge 整个流程中也有完善的可观测性设计,包括事件查询、链路追踪。事件的接入方式非常丰富,可以通过 OpenAPI 来接入、7 种多语言 SDK、CloudEvents SDK、Web Console 和 Webhook 。 EventBridge 具有如下特点: 能够大幅度减少用户开发成本,用户无需额外开发,通过创建 EventBridge 源、事件目标、事件规则等资源即可实现事件架构。用户可以编写事件规则,对事件做过滤、转换。  提供原生 CloudEvents 支持,拥抱 CNCF 社区,能够无缝对接社区 SDK 。标准协议也能统一个阿里云事件规范。  事件 Schema 支持:能够支持事件 Schema 自动探测和校验,支持 Source 和 Target 的 Schema 绑定。  全球事件任意互通:组建了全球事件任意互通网络,组件了跨地域、跨账户的事件网络,能够支持跨云、跨数据中心的事件路由。 EventBridge在云上生态已经初具规模,已经集成了 255+ 云产品事件源和 1000+ 事件类型。 EventBridge率先对消息生态做了融合。阿里云的消息产品矩阵生态均通过 EventBridge 做了完全融合。任何一款消息产品与另一款消息产品的数据都能互通。同时,依靠 EventBridge 的全球事件网络,能够为所有消息产品赋予全球消息路由的能力。 EventBridge 目前已经在内部接入钉钉 ISV、聚石塔 ISV,外部也有 50+ SaaS 系统可以通过 Webhook 的方式接入。另外,海量事件源可以触达 10 多种事件目标,已经对接了全系云产品 API ,任何事件都可以驱动全量云产品 API。 作者介绍: 周新宇 Apache Member,Apache RocketMQ PMC Member,阿里云消息队列 RocketMQ 研发负责人。
作者:周新宇
#行业实践 #云原生

2022年8月29日

阿里云基于全新 RocketMQ 5.0 内核的落地实践
前言 在上个月结束的 RocketMQ Summit 全球开发者峰会中,Apache RocketMQ 社区发布了新一代 RocketMQ 的能力全景图,为众多开发者阐述 RocketMQ 5.0 这一大版本的技术定位与发展方向。 在过去七年大规模云计算实践中,RocketMQ 不断自我演进,今天,RocketMQ 正式迈进 5.0 时代。 从社区关于 5.0 版本的解读可以看到,在云原生以及企业全面上云的大潮下,为了更好地匹配业务开发者的诉求,Apache RocketMQ 做了很多的架构升级和产品化能力的适配。那么如何在企业的生产实践中落地 RocketMQ 5.0 呢?本篇文章的核心就消息架构以及产品能力的云原生化,介绍了阿里云是如何基于全新的 RocketMQ 5.0 内核做出自己的判断和演进,以及如何适配越来越多的企业客户在技术和能力方面的诉求。 云原生消息服务的演进方向 首先我们来看下云原生消息服务有哪些演进? 面向未来,适应云原生架构的消息产品能力应该在以下方面做出重要突破: 大规模弹性:企业上云的本质是解放资源供给的负担和压力,专注于业务的集成和发展。作为消息服务的运维方,应该为上层业务提供与模型匹配的资源供给能力,伴随业务流量的发展提供最贴合的弹性能力。一方面可以解决面向不确定突发流量的系统风险,另一方面也可以实现资源利用率的提升。  易用性:易用性是集成类中间件的重要能力,消息服务应该从 API 设计到集成开发、再到配置运维,全面地降低用户的负担,避免犯错。低门槛才能打开市场,扩大心智和群体。   可观测性:可观测性对于消息服务的所有参与方来说都很重要,服务提供方应提供边界清晰、标准开放的观测诊断能力,这样才能解放消息运维方的负担,实现使用者自排查和边界责任的清晰化。  稳定性高 SLA:稳定性是生产系统必备的核心能力,消息来说往往集成在核心交易链路,消息系统应该明确服务的可用性、可靠性指标。使用方应基于明确的 SLA 去设计自己的故障兜底和冗余安全机制。 立足于这个四个关键的演进方向,下面为大家整体介绍一下阿里云 RocketMQ 5.0 在这些方面是如何落地实践的。 大规模弹性:提供匹配业务模型的最佳资源供给能力 消息服务一般集成在业务的核心链路,比如交易、支付等场景,这一类场景往往存在波动的业务流量,例如大促、秒杀、早高峰等。 面对波动的业务场景,阿里云 RocketMQ 5.0 的消息服务可以伴随业务的诉求进行自适应实现资源扩缩。一方面在比较稳定的业务处理基线范围内,按照最低的成本预留固定的资源;另一方面在偶尔存在的突发流量毛刺时,支持自适应弹性,按量使用,按需付费。两种模式相互结合,可以实现稳定安全的高水位运行,无需一直为不确定的流量峰值预留大量资源。 除了消息处理流量的弹性适应外,消息系统也是有状态的系统,存储了大量高价值的业务数据。当系统调用压力变化时,存储本身也需要具备弹性能力,一方面需要保障数据不丢失,另一方面还需要节省存储的成本,避免浪费。传统的基于本地磁盘的架构天然存在扩缩容问题,其一本地磁盘容量有限,当需要扩大容量时只能加节点,带来计算资源的浪费;其二本地磁盘无法动态缩容,只能基于业务侧流量的隔离下线才能缩减存储成本,操作非常复杂。 阿里云 RocketMQ 5.0 的消息存储具备天然的 Serverless 能力,存储空间按需使用,按量付费,业务人员只需要按照需求设置合理的 TTL 时间,即可保障长时间存储时的数据完整性。 集成易用性:简化业务开发,降低心智负担和理解成本 集成易用性是一种系统设计约束,要求消息服务应该从 API 设计到集成开发、再到配置运维,全面地降低用户的负担,避免犯错。举个典型场景,在消息队列例如 RocketMQ 4.x 版本或 Kafka 中,业务消费消息时往往被负载均衡策略所困扰,业务方需要关注当前消息主题的队列数(分区数)以及当前消费者的数量。因为消费者是按照队列粒度做负载均衡和任务分配,只要消费者能力不对等,或者数量不能平均分配,必然造成部分消费者堆积、无法恢复的问题。 在典型的业务集成场景,客户端其实只需要以无状态的消息模型进行消费,业务只需关心消息本身是否处理即可,而不应该关心内部的存储模型和策略。 阿里云 RocketMQ 5.0 正是基于这种思想提供了全新的 SimpleConsumer 模型,支持任意单条消息粒度的消费、重试和提交等原子能力。 可观测性:提供边界清晰、标准开放的自助诊断能力 有运维消息队列经验的同学都会发现,消息系统耦合了业务的上游生产和下游消费处理,往往业务侧出问题时无法清晰地界定是消息服务异常还是业务处理逻辑的异常。 阿里云 RocketMQ 5.0 的可观测性就是为这种模糊不确定的边界提供解法,以事件、轨迹、指标这三个方面为基础,依次从点、线、面的纬度覆盖链路中的所有细节。关于事件、轨迹、指标的定义涵盖如下内容: 事件:覆盖服务端的运维事件,例如宕机、重启、变更配置;客户端侧的变更事件,例如触发订阅、取消订阅、上线、下线等;   轨迹:覆盖消息或者调用链的生命周期,展示一条消息从生产到存储,最后到消费完成的整个过程,按时间轴抓出整个链路的所有参与方,锁定问题的范围;  指标:指标则是更大范围的观测和预警,量化消息系统的各种能力,例如收发 TPS、吞吐、流量、存储空间、失败率和成功率等。 阿里云 RocketMQ 在可观测性方面也是积累良多,不仅率先支持了完善的消息轨迹链路查询,而且在 5.0 新版本中还支持将客户端和服务端的 Trace、Metrics 信息以标准的 OpenTelemetry协议上报到第三方Trace、Metrics中存储,借助开源的 Prometheus 和 Grafana 等产品可以实现标准化的展示和分析。 稳定性 SLA:提供可评估、可量化、边界明确的服务保障能力 稳定性是生产系统必备的核心能力,消息系统往往集成在核心交易链路,消息系统是否稳定直接影响了业务是否完整和可用。但稳定性的保障本身并不只是运维管理,而是要从系统架构的设计阶段开始梳理,量化服务边界和服务指标,只有明确了服务的可用性和可靠性指标,使用方才能设计自己的故障兜底和冗余安全机制。 传统的基于运维手段的被动保障方式,只能做基本的扩缩容和系统指标监控,对于消息的各种复杂边界场景,例如消息堆积、冷读、广播等并不能很好的提供量化服务能力。一旦上层业务方触发这些场景,系统则会被打穿,从而丧失服务能力。 阿里云 RocketMQ 5.0 体系化的稳定性建设,是从系统设计阶段就提供对消息堆积、冷读等场景量化服务的能力,确定合理的消息发送 RT、端到端延迟和收发吞吐 TPS 能力等,一旦系统触发这些情况,可在承受范围内做限制和保护。 本篇文章从大规模弹性、集成易用性、可观测性和稳定性 SLA 等方面介绍了 RocketMQ 5.0 的演进和方向,同时针对性介绍了阿里云消息队列 RocketMQ 5.0 在这些方面的实践和落地。 阿里云消息队列 RocketMQ 5.0 目前已正式商业化,在功能、弹性、易用性和运维便捷性等方面进行了全面增强,同时定价相比上一代实例最高降低 50%,助力企业降本增效,以更低的门槛实现业务开发和集成。新一代实例支持 0~100 万 TPS 规模自伸缩、支持突发流量弹性和存储 Serverless;在可观测性方面,支持全链路轨迹集成和自定义 Metrics 集成;在集成易用性方面,支持新一代轻量原生多语言 SDK,更加稳定和易用。
#行业实践 #云原生

2022年7月17日

融合数据库生态:利用 EventBridge 构建 CDC 应用
引言 CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA 天然契合 CDC 的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成一些列业务驱动。阿里云 EventBridge 是阿里云推出的一款无服务器事件总线服务,能够帮助用户轻松快捷地搭建基于 EDA 架构的应用。近期,EventBridge 事件流已经支持了基于阿里云 DTS[1]服务的 CDC 能力。本文将从 CDC、CDC 在 EventBridge 上的应用以及若干最佳实践场景等方面,为大家介绍如何利用 EventBridge 轻松构建 CDC 应用。 CDC 概述 基本原理与应用场景 CDC 从源数据库捕获增量的数据以及数据模式变更,以高可靠、低延时的数据传输将这些变更有序地同步到目标数据库、数据湖或者其他数据分析服务。目前业界主流的开源 CDC 工具包括 Debezium[2]、Canal[3] 以及 Maxwell[4]。 图片来源: 目前业界主要有以下几类 CDC 的实现: 1. 基于时间戳或版本号 基于时间戳的方式要求数据库表有一个字段代表更新时间戳,当存在数据插入或更新时,对应时间戳字段就会随之更新。CDC 组件周期性检索更新时间大于上次同步时间的数据记录,即可捕获本周期内数据的变更。基于版本号跟踪和基于时间戳跟踪原理基本一致,要求开发者变更数据时必须更新数据的版本号。 2. 基于快照 基于快照的 CDC 实现在存储层面使用到了数据源 3 份副本,分别是原始数据、先前快照和当前快照。通过对比 2 次快照之间的差异来获取这之间的数据变更内容。 3. 基于触发器 基于触发器的 CDC 实现方式事实上是在源表上建立触发器将对数据的变更操作(INSERT、UPDATE、DELETE)记录存储下来。例如专门建立一张表记录用户的变更操作,随后创建 INSERT、UPDATE、DELETE 三种类型的触发器将用户变更同步到此表。 4. 基于日志 以上三种方式都对源数据库存在一定侵入性,而基于日志的方式则是一种非侵入性的 CDC 方式。数据库利用事务日志实现灾备,例如 MySQL 的 binlog 就记录了用户对数据库的所有变更操作。基于日志的 CDC 通过持续监听事务日志来实时获取数据库的变化情况。 CDC 的应用场景广泛,包括但不限于这些方面:异地机房数据库同步、异构数据库数据同步、微服务解耦、缓存更新与 CQRS 等。 基于阿里云的 CDC 解决方案:DTS 数据传输服务 DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体。其中,DTS 数据订阅[5]功能可以帮助用户获取自建 MySQL、RDS MySQL、Oracle 等数据库的实时增量数据。 CDC 在EventBrige上的应用 阿里云 EventBridge 提供了事件总线[6]与事件流[7] 2 款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件路由到多个事件目标中。 事件流适用于端到端的流式数据处理场景,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便。 为了更好地支持用户在 CDC 场景下的需求,EventBridge 在事件流源端支持了阿里云 DTS 的数据订阅功能,用户仅需简单配置,即可将数据库变更信息同步到 EventBridge 事件流。 EventBridge 定制了基于 DTS sdk 的 DTS Source Connector。当用户配置事件提供方为 DTS 的事件流时,source connector 会实时地从 DTS 服务端拉取 DTS record 数据。数据拉取到本地后,会进行一定的结构封装,保留 id、operationType、topicPartition、beforeImage、afterImage 等数据,同时增加 streaming event 所需要的一些系统属性。 DTS Event 样例可参考 EventBridge 官方文档 EventBridge Streaming 保证了 DTS 事件的顺序性,但存在事件重复投递的可能性,EventId 在保证了和每条 DTS record 的一一映射关系,用户可依据此字段来对事件做幂等处理。 创建源为 DTS 的 EventBridge 事件流 下面展示如何在 EventBridge 控制台创建源为 DTS 的事件流 前期准备  1. 开通 EventBridge 服务; 2. 创建 DTS 数据订阅任务; 3. 创建用于消费订阅数据的消费组账号信息。 创建事件流  1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“数据库 DTS”; 4. 在“数据订阅任务”一栏中选择已创建的 DTS 数据订阅任务。在消费组一栏,选择要使用哪个消费组消费订阅数据,同时填写消费组密码与初始消费时间。 5. 事件流规则与目标按照需要填写,保存启动即可创建以 DTS 数据订阅为事件源的事件流。 注意事项 使用时有以下几点需要注意: 1. EventBridge 使用的是 SUBSCRIBE 消费模式[8],所以请保证当前 DTS 消费组没有其他客户端实例在运行。如果设置的消费组在之前有运行,则传入的位点失效,会基于此消费组上次消费过的位点继续消费; 2. 创建 DTS 事件源时传入的位点仅在新消费组第一次运行时起效,后续任务重启后会基于上次消费位点继续消费; 3. EventBridge 事件流订阅 OperationType 为 INSERT、DELETE、UPDATE、DDL 类型的 DTS 数据; 4. 使用 DTS  事件源可能会有消息重复,即保证消息不丢,但无法保证仅投递一次,建议用户做好幂等处理; 5.用户如果需要保证顺序消费,则需要将异常容忍策略设置为“NONE”,即不容忍异常。在这种情况下,如果事件流目标端消费消息异常,整个事件流将暂停,直至恢复目标端正常。 最佳实践示例 基于EventBridge 实现 CQRS 在 CQRS(Command Query Responsibility Segregation)模型中,命令模型用于执行写以及更新操作,查询模型用于支持高效的读操作。读操作和写操作使用的数据模型存在一定区别,需要使用一定方式保证数据的同步,基于 EventBridge 事件流的 CDC 可以满足这样的需求。 基于云上服务,用户可以使用如下方式轻松构建基于 EventBridge 的 CQRS: 1. 命令模型操作数据库进行变更,查询模型读取 elasticsearch 获取数据; 2. 开启 DTS 数据订阅任务,捕获 DB 变更内容; 3.配置 EventBridge 事件流,事件提供方为 DTS 数据订阅任务,事件接收方为函数计算 FC; 4. FC 中的服务即为更新 elasticsearch 数据操作。 微服务解耦 CDC 也可以用于微服务解耦。例如下文是一个电商平台的订单处理系统,当有新建的未付款订单产生时,数据库会有一条 INSERT 操作,而当某笔订单状态由“未付款”变为“已付款”时,数据库会有一条 UPDATE 操作。根据订单状态变化的不同,后端会有不同的微服务来对此进行处理。 1. 用户下单/付款,订单系统进行业务处理,将数据变更写入 DB; 2. 新建 DTS 订阅任务捕获 DB 数据变更; 3. 搭建 EventBridge 事件流。事件提供方为 DTS 数据订阅任务,事件接收方为 RocketMQ; 4. 在消费 RocketMQ 数据时,同一个 topic 下启用 3 个 group 代表不同的业务消费逻辑; a. GroupA 将捕获到的 DB 变更用户缓存更新,便于用户查询订单状态; b. GroupB 下游关联财务系统,仅处理新建订单,即处理 DB 操作类型为 INSERT 的事件,丢弃其余类型事件; c. GroupC 仅关心订单状态由“未付款”变为“已付款”的事件,当有符合条件事件到达时,调用下游物流、仓储系统,对订单进行进一步处理。 如果采用接口调用方式,那么用户在下单之后订单系统将分别需要调用缓存更新接口、新建订单接口以及订单付款接口,业务耦合性过高。除此之外,这种模式使得数据消费端不用担心上游订单处理接口返回内容的语义信息,在存储模型不变的情况下,直接从数据层面判断此次数据变更是否需要处理以及需要怎样的处理。同时,消息队列天然的消息堆积能力也可以帮助用户在订单峰值到来时实现业务削峰填谷。 事实上,目前 EventBridge Streaming 支持的消息产品还包括 RabbitMQ、Kafka、MNS 等,在实际操作中用户可以根据自己的需要进行选择。 数据库备份&异构数据库同步 数据库灾备和异构数据库数据同步也是 CDC 重要的应用场景。使用阿里云 EventBridge 亦可以快速搭建此类应用。 1. 新建 DTS 数据订阅任务,捕获用户 MySQL 数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS 数据订阅任务; 3. 使用 EventBridge 在目的数据库执行指定 sql,实现数据库备份; 4. 数据变更事件投递到函数计算,用户业务根据数据变化内容更新对应异构数据库。 自建 SQL 审计 对于用户有自建 SQL 审计的需求,使用 EventBridge 也可以轻松实现。 1. 新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 总结 本文介绍了 CDC 的一些概念、CDC 在 EventBridge 上的应用以及若干最佳实践场景。随着支持产品的不断增加,EventBridge 所承载的生态版图也不断扩大,从消息生态到数据库生态,从日志生态到大数据生态,EventBridge 不断扩大其适用领域,巩固云上事件枢纽的地位,此后也将按照这个方向继续发展,技术做深,生态做广。 _参考链接:_ _ _ _ _ _ _ _ _ 感兴趣的小伙伴们可以扫描下方二维码加入钉钉群讨论(群号:44552972) 点击,进入 EventBridge 官网了解更多信息~
作者:昶风
#行业实践 #生态集成 #事件驱动架构

2022年6月26日

EventBridge 在 SaaS 企业集成领域的探索与实践
当下降本增效是各行各业的主题,而 SaaS 应用作为更快触达和服务业务场景的方式则被更多企业熟知和采用。随着国内 SaaS 商业环境的逐渐成熟,传统企业中各个部门的工程师和管理者,能迅速决定采购提升效率的 SaaS 产品,然后快速投入生产和使用。但是随着行业 SaaS 越来越多,如何解决各个 SaaS 系统的数据孤岛,如何将SaaS 应用数据与现有系统数据进行打通,已然变成了企业使用 SaaS 的瓶颈。因此,业内也广泛提出 B2B integration 企业集成的概念。 本文将结合实际业务场景讲述在 SaaS 行业的洞察与 SaaS 集成的探索实践。 什么是 SaaS SaaS 概述 SaaS(SoftwareasaService,软件即服务)源自于美国 Salesforce 公司(1999 年创立)创造的新软件服务模式。相比于传统软件,用户使用的 SaaS 软件,其数据保存在云端(国内有很多行业 SaaS 由于其数据敏感会单独部署在客户 IDC)。而且,SaaS 公司提供给客户的服务通常是按需租用的,比如按年缴纳使用费5年,第二年再续费,如果不满意也可以不续费,这会大大激发 SaaS 创业公司持续的打磨产品、持续的为客户提供更大价值的动力。 SaaS 典型分类 SaaS 分类比较繁琐,一般有两个分类维度。一个维度按照使用场景,另一个维度按照商业价值。 SaaS 一般分类为 通用 SaaS 和 行业 SaaS 两个基础类。 通用 SaaS 顾名思义是通用的,跨行业的,比如钉钉即时通讯或者某司的 HR 产品,由于使用场景更广,因而客群也会更多。 行业 SaaS 是在某个行业内使用的产品,比如餐饮企业 SaaS、电商 SaaS 等。 当然,还有第二个维度是工具 SaaS 和 商业 SaaS。 工具 SaaS,为客户企业提供一个提高管理效率的工具;商业 SaaS,除了提供一部分“工具”价值外,还能为客户企业提供增值价值,比如增加营收、获得资金等。 商业 SaaS 产品虽然风险更大,但在国内特色的商业环境、管理水平及人才结构下,更容易快速实现客户价值和自我价值。 SaaS 在中国的发展历程 对于 SaaS 领域来讲,云服务的普及提振了大家对 SaaS 服务稳定性和数据安全性的信心。同时,人口红利消退使得 SaaS 成本优势凸显。当下疫情环境也加快了市场教育,企业主转变思路,降本增效的需求显著上升。随着整个行业的渗透率加快,SaaS 场景和行业越做越深,SaaS 市场可以遇见在未来会有高速的增长。很多企业会在新业务场景使用 SaaS 服务,小步快跑试错,解决活下来的问题,而不是重复造轮子。 什么是 B2B ? B2B 即 BusinesstoBusiness (B2B) integration 是指将两个或者多个组织之间的业务流程和通讯自动化,通过自动化关键业务流程,实现不用应用和组织关系的打通,有效促进应用提供方和客户之间的数据打通与合作。 可以断言,随着 SaaS 行业逐渐渗透,企业集成的诉求会逐渐增多。数据同步、用户同步、接口同步的诉求会逐步增多,包括自建服务与 SaaS 服务的打通,SaaS 服务与 SaaS 服务的打通等。 SaaS 集成领域场景分析 随着行业类 SaaS 的逐渐丰富,在企业生产实践中,应用和应用的数据集成和互通变得至关重要。包括 API 集成,数据集成,流程集成等场景。 API 集成 通过 API 将 SaaS 应用的业务流程串联,现阶段大部分 SaaS 集成对接都是通过标准 API 协议实现的。源端采用 WebHook 机制推送到指定 HTTP 端点,目标端则采用类似 API 接口调用的方式,主动调取执行动作。 实现结构如下: 业内通用方案通几乎均为同步方案,通过 API + 中间网关对调用做解耦和映射。该方案的主要问题是调用追溯难;其次如出现上下游接口限制不一致问题,会导致下游调用大量失败。 数据集成 数据集成场景主要是企业自建系统和 SaaS 系统的打通。当企业使用的行业 SaaS 逐渐增多,数据一致性问题就会变的迫在眉睫。 大部分企业通常会面临云上数据导入/同步到 SaaS 应用的场景,而业内对 SaaS 应用的数据集成方案并没有类似 CDC 场景下的 Debezium 那么标准和通用。 企业在 SaaS 集成领域的痛点 接入成本高 对大量使用 SaaS 应用的企业来讲, SaaS 集成是必须做的基础建设。但是该部分基础建设通常会消耗大量人力,由于各个行业的 SaaS 百花齐放,通常很难使用一套架构满足全部集成场景。意味着通常情况下,企业使用每一款 SaaS 都会面临 SaaS 系统与自身系统集成的困难。 异构数据多 异构数据多是集成领域又一个比较典型的特点,异构数据通常有结构化数据,非机构化数据,半结构化数据。比如企业自建关系型数据库就是典型的结构化数据,但是要被其他 SaaS 系统集成通常是 Json 这种半结构数据入参。当然这部分内容可以通过定制代码搞定,但这个思路一定不是做消息枢纽的思路; 异构数据如何高效的统一处理其实是当前 SaaS 集成亟待解决的问题,也是最大的冲突点。 分发/路由困难 当很多集成需求同时涌现时,如何对已集成数据进行合理分发,会变成集成领域又一个难以解决的问题。每个细分场景甚至每个集成链路所需要的数据内容甚至数据类型都不一样。如果路由/分发无法完成,那么企业统一集成将无法实现。 集成追踪困难 当全部采用同步链路时,这里的集成状态追踪就会变成玄学,除非将链路接入 Tracing ,但是这部分又回产生高额的改造成本,同时多源 Tracing 的复杂相对于单链路会呈几何倍数的增加。 老系统迁移困难 老系统迁移主要是数据集成部分,如果将新老系统对接,并构建统一的应用网是当下企业构建 SaaS 建设的难点。企业迫切的需要一种能将"新"“老”应用联接起来的方式,打破企业应用发展的时间与空间界限,协同企业原有核心数据资产及创新应用,构建可平滑演进的企业IT架构。 EventBridge 一站式企业 SaaS 集成方案 针对业内 SaaS 系统集成的种种痛点,EventBridge 推出一站式企业 SaaS 集成方案。通过收敛 SaaS 集成痛点诉求,EventBridge 推出 API 集成方案和数据集成方案,打通应用与应用,云与应用的连接。 低代码集成平台 提供完全托管的集成平台服务,仅需在控制台进行简单配置即可在几分钟内完成应用集成。客户无需关心底层资源及实现细节即可打通云下到云上,SaaS 到 SaaS 的集成与连接,轻松完成异构数据接入。 金融级稳定性 满足不同客户企业级集成项目的要求,提供高可用性、灾难恢复、安全性、服务等级协议(SLA)和全方面的技术支持。 全方位的集成能力 支持各种集成场景,打通云上云下企业应用、物联网、设备及合作伙伴之间的信息孤岛。支持事件规则,事件路由等多种路由方式,实现跨云跨地域互通和信息共享。同时强大的链路追踪能力可以帮助企业快速排障。 开放的平台 拥抱 CloudEvents 社区,提供标准化的事件集成方案。提供丰富的开发者工具,拥有海量的生态伙伴及开发者,丰富开箱即用的连接器和应用组件可以帮助加速企业业务创新。 EventBridge 在 SaaS 领域的典型应用场景 SaaS 应用同步 应用同步是指在特定时间点将一组特定的事件从一个系统迁移到另一个系统的行为。事件同步模式允许开发人员创建数据自动迁移集成服务;业务人员和开发人员可以通过配置集成应用,自动化的将特定范围内的数据传递到下游应用;创建可重用的服务可以为开发和运营团队节省大量时间。 例如: 把销售机会数据从一个旧式 CRM (客户关系管理) 系统迁移到新的 CRM 实例; 把销售订单数据从一个 CRM 组织迁移到另一个组织; 从 ERP (企业资源计划) 同步产品主数据到 CRM 系统中。 事件广播 事件广播是在连续的、近实时或实时的基础上将事件从单个源系统移动到多个目标系统的行为。本质上,它是一对多的单向同步。通常,“单向同步”表示 1:1 关系。但是,广播模式也可以是 1:n(n 大于 1)的关系。 例如: 当一个销售机会在 CRM 中被标记为成功关单的时候,应在 ERP 中创建销售订单。 SaaS 应用通知 事件通知是指当 SaaS 应用发生某个类型的事件,可以通过钉钉,短信等通知方式告知用户。用户可及时获取到关键事件信息。 例如: 当一个销售机会在 CRM 中被标记为重要商机的时候,会及时通知给其他同事进行跟进并关注。 自建系统到云上迁移 EventBridge 支持云上数据库、云上消息队列、云产品事件对接 SaaS 系统,完善企业用户建设应用一张网的诉求,打破企业应用发展的时间和空间界限,协同企业原有核心资产与 SaaS 系统,构建可演进的企业 IT 架构。 例如: 当引入一个新的 SaaS 应用时,可通过 EventBridge 将数据库/大数据平台的核心资产(如人员信息等)同步至 SaaS 应用。
作者:肯梦
#行业实践 #生态集成

2022年5月17日

云钉一体:EventBridge 联合钉钉连接器打通云钉生态
背景 以事件集成阿里云,从 EventBridge 开始”是 EventBridge 这款云产品的愿景和使命。作为一款无服务器事件总线服务,EventBridge 从发布以来,以标准化的 CloudEvents 1.0 协议连接了大量云产品和云事件,用户可以通过 EventBridge 轻松访问云上事件,驱动云上生态。 截止目前为止,EventBridge 已集成 85+ 阿里云产品,提供了 941+ 事件类型,集成 50+ SaaS产品,通过事件规则可轻松驱动 10+ 阿里系一方云产品的计算力。 另一方面,钉钉生态空前繁荣,拥有 4000+ 家的生态伙伴,包括 ISV 生态伙伴、硬件生态伙伴、服务商、咨询生态和交付生态伙伴等。通过事件将钉钉生态与阿里云生态联通,是践行「云钉一体」战略的重要途径,EventBridge 作为阿里云标准化的事件枢纽,其重要性不言而喻。 今天,EventBridge 联合钉钉连接器,打通了钉钉生态和阿里云生态,钉钉的生态伙伴可以通过通道的能力驱动阿里云上海量的计算力。 关键技术 EventBridge 集成阿里云和钉钉生态的方案,核心能力由钉钉连接器和 EventBridge 的 HTTP 事件源能力提供。 钉钉连接器 钉钉连接平台通过可视化拖拽配置、一键订阅等零代码方式,简单高效的实现钉钉、企业内部系统、知名厂商系统(金蝶、用友、SAP 等)、钉钉第三方企业应用之间数据互通和集成。 近期,钉钉连接器在「连接流」中发布了「HTTP Webhook」的执行动作能力,支持将钉钉生态开放给外部生态,EventBridge 正是通过该能力将钉钉生态接入到阿里云生态。 EventBridge HTTP 事件源 事件源是事件驱动的基石,如何获取更多事件源也是 EventBridge 一直在探索和尝试的方向。针对市场上其他云厂商和垂直领域的 Saas 服务,EventBridge 发布了 HTTP 事件源能力,提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 具体而言,HTTP 事件源是 EventBridge 支持的事件源的一种,它以 Webhook 形式暴露了发布事件的 HTTP 请求地址,用户可以在有 URL 回调的场景配置 HTTP  事件源,或者直接使用最简单的 HTTP 客户端来完成事件的发布。HTTP  事件源提供了支持 HTTP 与 HTTPS,公网与阿里云 VPC 等不同请求方式、不同网络环境的 Webhook URL,便于用户将其集成到各类应用中。接入时无需使用客户端,仅需保证应用可以访问到对应 Webhook URL 即可,这使得接入过程变得简单而高效。 在将 HTTP 请求转换为 CloudEvent 的时候,EventBridge 会将请求的头部和消息体部分置于 CloudEvent 字段中,其余字段会依据用户 EventBridge 资源属性以及系统默认规则进行填充。用户可以在事件规则中,对所需的内容进行过滤、提取,最终按照模板拼装成所需的消息内容投递给事件目标。 在安全方面,HTTP 事件源不需要用户进行复杂的签名鉴权,支持 3 种类型开箱即用的安全设置,分别是请求方法、源 IP 以及请求来源域名。 请求方法:用户可以配置当前请求此事件源时合法的 HTTP 请求方法,如果方法类型不满足配置规则,请求将被过滤,不会投递到事件总线。 源 IP:用户可以设置允许访问此事件源时合法的源 IP(支持 IP 段和 IP),当请求源 IP 不在设置的范围内时,请求将被过滤,不会投递到事件总线。 请求来源域名:即 HTTP 请求的 referer 字段,当请求的 referer 与用户配置不相符时,请求被过滤,不会投递到事件总线。 应用场景 钉钉连接器市场有数百款连接器,包含官方连接器和第三方生态连接器。 官方连接器,来源主要是钉钉官方的应用,比如视频会议、日程、通讯录、审批流、钉盘、宜搭等,企业和 SaaS 厂商可以充分利用这些官方应用的事件构建企业级的应用系统,也可以将钉钉的官方数据流与其他系统做深度集成。 第三方连接器,来源主要是钉钉的生态合作伙伴,比如金蝶、行翼云、集简云、用友、易快报、销帮帮等。SaaS 厂商可以通过开放连接器来开放数据,与其它应用互联互通。 如上图所示,借助钉钉连接器,可以将钉钉官方事件源和钉钉 SaaS 事件源连接到阿里云 EventBridge,从而能驱动云上的弹性资源。SaaS 厂商能够借助 EventBridge 连接的能力快速构建云原生的 SaaS 应用,借助云的弹性能力,采用云原生最新的技术栈,快速高效地开发 SaaS 应用,同时利用 EventBridge 获取钉钉和其它 SaaS 应用的数据源,轻松进行业务创新。 当钉钉生态和 EventBridge 联通后,能产生哪些应用场景呢? 分析场景:企业借助 EventBridge 事件分析能力,对钉钉官方事件进行分析,快速洞察企业运转数据。比如审批效率,员工变更趋势、会议效率等。 通知场景:钉钉连接器 + EventBridge  可覆盖绝大多数消息通知场景,帮助企业用户快速感知 审批,员工变动,会议室信息等一些列企业基础支持系统。 集成场景:基于阿里云基础建设,可快速提升钉钉生态和企业内部数据的互通。例如当公司需要对钉钉和企业内部 IT 系统进行数据打通时,EventBridge 解决方案可以毫不费力地将建立在阿里云体系的 IT 系统连通起来,比如函数计算,云数据库,消息队列等连接扩展阿里云生态。 EDA 场景:使用 EventBridge 快速构建 EDA 驱动的自动化业务流程。例如在新员工入职时,获取员工变动信息。并集中推送到邮箱系统,业务支持系统(DB),CRM 系统等。对企业新员工权限账户进行一站式授权,较少重复机械的业务审批流程。 最佳实践:新增员工 0 代码入库 本章节介绍使用钉钉连接器和 EventBridge 的最佳实践,通过一个例子展示如何 0 代码将钉钉的一个新员工入职记录录入到自定义的数据库当中,企业可以根据该数据库搭建各类员工管理系统。 方案简介 整个方案涉及到钉钉、钉钉连接器、EventBridge、阿里云数据库等产品,整个链路如下图所示: 前置条件: 拥有一个钉钉账号,并创建一个团队成为管理员,并能登陆钉钉开放平台。 拥有一个阿里云账号,并开通 EventBridge 和阿里云数据库。 实践步骤 整个实践过程分为以下几个步骤。 1)创建事件总线和 HTTP 事件源 首先登陆 EventBridge 控制台,创建一个事件总线和 HTTP 事件源,如下图所示,可以先跳过规则和目标的创建。 创建完成后,进入事件总线的详情列表,获取 HTTP 事件源的公网「Webhook 地址」,如下图所示: 2)创建钉钉连接流 登陆钉钉开放平台,进入连接平台,在「我的连接」下创建连接流,在创建界面,选择触发器为「官方通讯录通讯录用户增加」。 连接流创建完成后,进入编辑页面,添加一个「HTTP Webhook」的节点,在「请求地址」一栏填入上个步骤获取到的「HTTP 事件源」地址。 3)钉钉触发新增员工事件 打开钉钉,进入团队,邀请另一个账号加入团队,然后进入事件总线的「事件追踪」页面,可以发现该员工新增事件已经投递到了事件总线之上。 该事件被转换成了一个「CloudEvents」格式,其「$.data.body」为事件的详情,包含 dingId, userId, department 等字段。 { "datacontenttype": "application/json", "aliyunaccountid": "1148", "data": { "headers": { }, "path": "/webhook/putEvents", "body": { "syncAction": "user_add_org", "orderInDepts": "{1:1762632512}", "dingId": "$::$5RUQhP/pK+4A==", "active": true, "avatar": "", "isAdmin": false, "userId": "141146379", "isHide": false, "isLeaderInDepts": "{1:false}", "isBoss": false, "isSenior": false, "name": "小明", "department": [ 1 ] }, "httpMethod": "POST", "queryString": {} }, "subject": "acs:eventbridge:cnhangzhou::eventbus//eventsource/my.dingtalk", "aliyunoriginalaccountid": "11848", "source": "my.dingtalk", "type": "eventbridge:Events:HTTPEvent", "aliyunpublishtime": "20220513T07:28:29.505Z", "specversion": "1.0", "aliyuneventbusname": "chenyangbus", "id": "7059131cb232c4c3592120ae", "time": "20220513T15:28:29.504+08:00", "aliyunregionid": "cnhangzhou", "aliyunpublishaddr": "..61.88" } 4)数据库创建员工表 通过 RDS 控制台购买一个实例,并创建好数据库,然后根据上述新增员工事件的格式,提取部分字段对数据库进行建表。 CREATE TABLE 'user_info' ( 'dingId' varchar(256) NULL, 'active' varchar(256) NULL, 'isAdmin' varchar(256) NULL, 'userId' varchar(256) NULL, 'name' varchar(256) NULL ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; 5)创建事件规则 数据库准备好后,返回 EventBridge 控制台,为第一步创建的事件总线创建事件规则,对「新增员工事件」进行转换并投递至数据库当中。 首先创建规则,过滤第一步创建的 HTTP 事件源。 然后选择 RDS 目标,做好参数映射。 6)触发事件入库 第三步触发事件时,因未配置规则和目标,事件没有被消费,故需要通过钉钉重新触发一次事件,然后从 EventBridge 控制台观察推送轨迹。 从轨迹中可以看出推送成功,然后通过 RDS 控制台可以查询到该条记录。 至此,一个钉钉团队新员工入职的记录通过 0 代码的方式入库到企业数据库当中,可以非常低的成本开发企业级管理应用。 _参考链接:_
作者:尘央
#行业实践 #生态集成

2022年4月20日

EventBridge 集成云服务实践
EvenBridge 集成概述 EventBridge 是阿里云所推出了一款无服务器事件总线,其目标是拓展事件生态,打破系统间的数据孤岛,建立事件集成生态。提供统一的事件标准化接入及管理能力,完善集成与被集成通路,帮助客户快速实现事件驱动的核心原子功能,可将 EventBridge 快速集成至 BPM、RPA、CRM 等系统。 EventBridge 通过事件标准化,接入标准化,组件标准化三个方向作为支点拓展 EventBridge 事件生态: 事件标准化:拥抱 CloudEvents 1.0 开源社区标准协议,原生支持 CloudEvents 社区 SDK 和 API,全面拥抱开源社区事件标准生态; 接入标准化:提供标准事件推送协议 PutEvent,并支持 Pull 和 Push 两种事件接入模型,可有效降低事件接入难度,提供云上完善的事件接入标准化流程; 组件标准化:封装标准的事件下游组件工具链体系,包括 Schema 注册、事件分析、事件检索、事件仪表盘等。提供完善的事件工具链生态。 在集成领域 EventBridge 重点打造事件集成和数据集成两类核心场景,下面将围绕这两类场景具体展开描述。 事件集成 目前 EventBridge 已经拥有 80+ 云产品的事件源,800+ 种事件类型。整个事件生态还正在逐步丰富中。 那么,EventBridge 如何实现云产品的事件集成呢? 首先在 EventBridge 控制台可以看见一个名为 default 的事件总线,云产品的事件都会投递到这个总线; 然后点击创建规则,就可以选择所关心的云产品以及它的相关事件进行事件的监听和投递。 下面以两个例子为例,来看下 EventBridge 事件集成的方式。 OSS 事件集成 以 OSS 事件源为例,来讲解一下如何集成 OSS 事件。 OSS 事件现在主要分为 4 类,操作审计相关、云监控相关、配置审计相关、以及云产品相关的事件例如 PutObject 上传文件等等。其他的云产品的事件源也类似,基本都可以分为这几个类型的事件。 下面演示一下事件驱动的在线文件解压服务: 在 OSS Bucket 下面会有一个  zip 文件夹存放需要解压的文件,一个 unzip 文件夹存放解压后的文件; 当上传文件到 OSS Bucket 之后,会触发文件上传的事件并投递到 EventBridge 的云服务专用总线; 然后会使用一个事件规则过滤 zip 这个 bucket 的事件并投递到解压服务的 HTTP Endpoint; 解压服务会在收到事件之后,根据事件里面的文件路径从 OSS 下载文件解压,并在解压之后将文件传到 unzip 目录下; 同时,还会有一个事件规则,监听 unzip 目录的文件上传事件,并将事件转换后推送到钉钉群。 一起来看下是如何实现的: 前往下方链接查看视频: 1)首先创建一个 bucket,下面有一个 zip 目录用于存放上传的压缩文件,一个 unzip 目录用于存放解压后的文件。 2) 部署解压服务,并且暴露公网访问的地址。 解压服务的源码地址为: 也可以使用 ASK 直接部署,yaml 文件地址为: 3)创建一个事件规则监听 zip 目录下的上传文件的事件,并投递到解压服务的 HTTP  Endpoint。 这里使用 subject,匹配 zip 目录。 4)再创建一个事件规则监听 unzip 目录的事件,投递解压事件到钉钉群。 这里同样使用 subject,匹配 unzip 目录。 对于变量和模板的配置可以参考官方文档 : 。 EventBridge 会通过 JSONPath 的方式从事件中提取参数,然后把这些值放到变量中,最后通过模板的定义渲染出最终的输出投递到事件目标。OSS 事件源的事件格式也可以参考官方文档 : _ _,并根据实际的业务需要使用 JSONPath 定义变量。5)最后,通过 oss 控制台上传一个文件进行验证。 可以看到刚刚上传的 eventbridge.zip 已经解压到并上传上来了,也可以在钉钉群里面,收到解压完成的通知。此外,还可以在事件追踪这边查看事件的内容已经投递的轨迹。 可以看到有两个上传事件:一个是通过控制台上传的事件,一个是解压文件后上传的事件。 可以查看轨迹,都成功投递到了解压服务的 HTTP Endpoint 以及钉钉机器人。 以自定义事件源以及云产品事件目标的方式集成云产品 刚才演示的 demo 是集成云服务的事件源,下面再通过一个 demo 看一下如何通过以自定义事件源以及云产品事件目标的方式集成云产品。 前往下方链接查看视频: 这个 demo 的最终效果是通过 EventBridge 自动进行数据的清洗,并投递到 RDS 中去。事件内容是一个 JSON,拥有两个字段一个名字一个年龄,现在希望将把大于 10 岁的用户过滤出来并存储到 RDS 中。 整体的架构如图所示,使用一个 MNS Queue 作为自定义事件源,并通过 EventBridge 过滤并转换事件最终直接输出到 RDS 中去。 1)首先已经创建好了一个 MNS Queue,创建好一个 RDS 实例以及数据库表,表结构如下所示: 2)创建一个自定事件总线,选择事件提供方为 MNS,队列为提前创建好的队列; 创建好了之后,我们就可以在事件源这里看见一个已经正在运行中的事件源; 3)接下来创建规则投递到 RDS 配置的事件模式内容如下: { "source": [ "my.user" ], "data": { "messageBody": { "age": [ { "numeric": [ "", 10 ] } ] } } } 数值匹配可以参考官方文档:   4) 点击下一步,选择事件目标为数据库,填写数据库信息,配置转化规则,完成创建。 5)最后,先用 MNS Queue 发送一个消息,这个的 age 是大于 10 的。 可以看见这条事件就输出到了 RDS 里面了。 下面再发一个小于 10 的消息到 MNS Queue。 这条事件就被过滤掉了,没有输出到 RDS。 也可通过事件追踪查看事件: 可以看到一条事件成功投递到了 RDS,一条事件被过滤掉了,没有进行投递。 数据集成 事件流是 EventBridge 为数据集成提供的一个更为轻量化、实时的端到端的事件流试的通道,主要目标是将事件在两个端点之间进行数据同步,同时提供过滤和转换的功能。目前已经支持阿里云各消息产品之间的事件流转。 不同于事件总线模型,在事件流中,并不需要事件总线,其 1:1 的模型更加的轻量,直接到目标的方式也让事件更加的实时;通过事件流,我们可以实现不同系统之间的协议转换,数据同步,跨地域备份的能力。 下面将通过一个例子讲解如何使用事件流,将 RocketMQ 的消息路由到 MNS Queue,将两个产品集成起来。 整体的结构如图所示,通过EventBridge 将 RocketMQ 中 TAG 为 MNS 的消息路由到 MNQ Queue。 一起看下怎么实现: 前往下方链接查看视频: 首先创建一个事件流,选择源 RocketMQ 实例,填写 Tag 为 mns。 事件模式内容留空表示匹配所有。 目标选择 MNS,选择目标队列完成创建。 完成创建之后,点击启动,启动事件流任务。 事件流启动完成之后,我们就可以通过控制台或者 SDK 发送消息到源 RocketMQ Topic 里面。当有 Tag 为 mns 的时候,我们可以看见消息路由到了 mns;当有 Tag 不为 mns 的时候,消息就不会路由到 mns。 总结 本篇文章主要向大家分享了通过 EventBridge 如何集成云产品事件源,如何集成云产品事件目标以及通过事件流如何集成消息产品.
作者:李凯(凯易)
#行业实践 #生态集成

2022年4月13日

基于 EventBridge 构建数据库应用集成
引言 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持将阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理,然后路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 事件目标(Target)负责事件的处理终端与消费事件,是 EventBridge 的核心模块。针对市场上其他云厂商和垂直领域的 DB 服务,EventBridge 发布基于事件目标模块的数据库 Sink,提供简单且易于集成的 DB 落库能力,帮助开发者更加高效、便捷地实现业务上云。 数据库 Sink 概述 数据库 Sink 事件目标是 EventBridge 支持的事件目标的一种,主要能力是通过 EventBridge 将数据投递至指定数据库表中。 得益于 EventBridge 生态体系,数据库 Sink 支持众多接入方式: 阿里云云产品事件,EventBridge 支持云服务总线,通过简单配置即可直接对云服务相关事件进行入库操作; SaaS 应用事件,EventBridge 支持三方 SaaS 事件接入,支持对 SaaS 触发事件落库、查询; 用户自定义应用,用户可以使用 EventBridge 官方的 API 接口、多语言客户端、HTTP Source 以及 CloudEvents 社区的开源客户端来完成接入。 数据库 Sink 能力重点聚焦在如何将 EventBridge 业务的半结构化 Json 数据转为结构化 SQL 语句,提供 LowCode 交互接入,帮助开发者一站式完成数据入库。 数据库 Sink 最佳实践 典型案例: 希望把一些 MNS 的消费消息或者 RocketMQ 的消费消息存储到指定的数据库表中,方便后面的数据分析和消息排查,也可以通过这种方式把数据新增到数据库表中; 通过 HTTP 的事件源把一些重要的日志或者是埋点数据直接存储到 DB 中,不需要经过用户业务系统,可以方便后续的客户场景分析。 使用介绍: 首先现阶段数据库 Sink For MySQL 支持两种方式:一种是基于阿里云的 RDS MySQL(VPC),另一种是用户自建的 MySQL(公网),可根据业务场景选择的不同方式接入。 步骤一 :点击事件规则并创建事件规则 步骤二 :选择事件源 可以选择阿里云官方或者自定义事件源 步骤三 :选择事件目标 1)在事件目标下面的服务类型选择数据库,这时会有两个选项就是一个是阿里云的 RDS MySQL,一个是自建 MySQL; 2)如果是阿里云 RDS MySQL,需要创建服务的关联角色。 3)授权以后就可以选择用户自己创建的 RDS MySQL 数据库的实例 ID 和数据库名称。 数据库账号和密码需手动填写,并发配置可以根据实际业务需要进行填写。因为 RDS MySQL 涉及到了跨地域访问,所以需要专有网络 VPC 的支持。 步骤四 :入库配置 入库配置支持快速配置与自定义 SQL 两种方式: 1)快速配置,支持 LowCode 方式快速选择入库内容。 2)自定义 SQL,支持自定义高级 SQL 语法。 步骤五:事件发布 当创建成功以后可以通过控制台进行事件发布: 步骤六 :事件状态追踪和查询 可以通过上个步骤中的事件 ID 可看到轨迹的详细信息,包括事件执行成功与否等信息。如果事件执行失败,会在页面展示异常信息。 通过事件追踪也可以看到详细的事件轨迹 : 总结 本文重点介绍 EventBridge 的新特性:数据库 Sink 事件目标。 作为一款无服务器事件总线服务,EventBridge 已经将阿里云云产品管控链路数据和消息产品业务数据整合到事件源生态中,提高了上云用户业务集成的便捷性,满足 Open API 与多语言 sdk 的支持,在此基础之上,通过 EventBridge 将数据投递至指定的数据库表中,为客户自身业务接入 EventBridge 提供了便利。 相关链接 [2] RDS 官方文档 [3] EventBridge 官方文档 想要了解更多 EventBridge 相关信息,扫描下方二维码加入钉钉群~
作者:赵海
#行业实践 #生态集成