2022年8月3日

解析 RocketMQ 业务消息——事务消息
引言:在分布式系统调用场景中存在这样一个通用问题,即在执行一个核心业务逻辑的同时,还需要调用多个下游做业务处理,而且要求多个下游业务和当前核心业务必须同时成功或者同时失败,进而避免部分成功和失败的不一致情况出现。简单来说,消息队列中的“事务”,主要解决的是消息生产者和消费者的数据一致性问题。本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。 点击下方链接,查看视频讲解: 场景:为什么需要事务消息 以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括: 主分支订单系统状态更新:由未支付变更为支付成功; 物流系统状态新增:新增待发货物流记录,创建订单物流记录; 积分系统状态变更:变更用户积分,更新用户积分表; 购物车系统状态变更:清空购物车,更新用户购物车记录。  分布式系统调用的特点是:一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。 传统 XA 事务方案:性能不足 为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务,基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。 基于普通消息方案:一致性保障困难 将上述基于 XA 事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。 该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如: 消息发送成功,订单没有执行成功,需要回滚整个事务; 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致; 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。 基于RocketMQ分布式事务消息:支持最终一致性 上述普通消息方案中,普通消息和订单事务无法保证一致的本质原因是普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于消息队列 RocketMQ 版实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 消息队列 RocketMQ 版事务消息的方案,具备高性能、可扩展、业务开发简单的优势。 基本原理 概念介绍 事务消息:RocketMQ 提供类似 XA 或 Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致;  半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了 RocketMQ 服务端,但是 RocketMQ 服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息;  消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。 事务消息生命周期 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态;  事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见;  消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止;  提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费;  消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。具体信息,请参见消息重试;  消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败);RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。  消息删除:当消息存储时长到期或存储空间不足时,RocketMQ 会按照滚动机制清理最早保存的消息数据,将消息从物理文件中删除。 事务消息基本流程 事务消息交互流程如下图所示: 1. 生产者将消息发送至 RocketMQ 服务端;  2. RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息;  3. 生产者开始执行本地事务逻辑;  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下: 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者; 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;   6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果;  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。 实现细节:RocketMQ 事务消息如何实现 根据发送事务消息的基本流程的需要,实现分为三个主要流程:接收处理 Half 消息、Commit 或 Rollback 命令处理、事务消息 check。 处理 Half 消息 发送方第一阶段发送 Half 消息到 Broker 后,Broker 处理 Half 消息。Broker 流程参考下图: 具体流程是首先把消息转换 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,其余消息内容不变,写入 Half 队列。具体实现参考 SendMessageProcessor 的逻辑处理。 Commit 或 Rollback 命令处理 发送方完成本地事务后,继续发送 Commit 或 Rollback 到 Broker。由于当前事务已经完结,Broker 需要删除原有的 Half 消息,由于 RocketMQ 的 appendOnly 特性,Broker通过 OP 消息实现标记删除。Broker 流程参考下图: Commit。Broker 写入 OP 消息,OP 消息的 body 指定 Commit 消息的 queueOffset,标记之前 Half 消息已被删除;同时,Broker 读取原 Half 消息,把 Topic 还原,重新写入 CommitLog,消费者则可以拉取消费;  Rollback。Broker 同样写入 OP 消息,流程和 Commit 一样。但后续不会读取和还原 Half 消息。这样消费者就不会消费到该消息。  具体实现在 EndTransactionProcessor 中。 事务消息 check 如果发送端事务时间执行过程,发送 UNKNOWN 命令,或者 Broker/发送端重启发布等原因,流程 2 的标记删除的 OP 消息可能会缺失,因此增加了事务消息 check 流程,该流程是在异步线程定期执行(transactionCheckInterval 默认 30s 间隔),针对这些缺失 OP 消息的 Half 消息进行 check 状态。具体参考下图: 事务消息 check 流程扫描当前的 OP 消息队列,读取已经被标记删除的 Half 消息的 queueOffset。如果发现某个 Half 消息没有 OP 消息对应标记,并且已经超时(transactionTimeOut 默认 6 秒),则读取该 Half 消息重新写入 half 队列,并且发送 check 命令到原发送方检查事务状态;如果没有超时,则会等待后读取 OP 消息队列,获取新的 OP 消息。 另外,为了避免发送方的异常导致长期无法确定事务状态,如果某个 Half 消息的 bornTime 超过最大保留时间(transactionCheckMaxTimeInMs 默认 12 小时),则会自动跳过此消息,不再 check。 具体实现参考: TransactionalMessageServiceImplcheck 方法。 实战:使用事务消息 了解了 RocketMQ 事务消息的原理后,我们看下如何使用事务。首先,我们需要创建一个 “事务消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 事务消息相比普通消息发送时需要修改以下几点: 发送事务消息前,需要开启事务并关联本地的事务执行。 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。  当事务消息 commit 之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意: 1. 避免大量未决事务导致超时:在事务提交阶段异常的情况下发起事务回查,保证事务一致性;但生产者应该尽量避免本地事务返回未知结果;大量的事务检查会导致系统性能受损,容易导致事务处理延迟; 2. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据 Group ID 去查询生产者客户端; 3. 事务超时机制:半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。 
作者:合伯
#技术探索 #功能特性

2022年7月27日

RocketMQ 消息集成:多类型业务消息——定时消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了 100% 阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。 本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。 点击下方链接,查看直播讲解: 概念:什么是定时消息 在业务消息集成场景中,定时消息是,生产者将一条消息发送到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间,消费者才可以消费到。 相似地,延迟消息其实是对于定时消息的另外一种解释,指的是生产者期望消息延迟一定时间,消费者才可以消费到。可以理解为定时到当前时间加上一定的延迟时间。 对比一下定时消息和普通消息的流程。普通消息,可以粗略的分为消息发送,消息存储和消息消费三个过程。当一条消息发送到 Topic 之后,那么这条消息就可以马上处于等待消费者消费的状态了。 而对于定时/延时消息来说,其可以理解为在普通消息的基础上叠加了定时投递到消费者的特性。生产者发送了一条定时消息之后,消息并不会马上进入用户真正的Topic里面,而是会被 RocketMQ 暂存到一个系统 Topic 里面,当到了设定的时间之后,RocketMQ 才会将这条消息投递到真正的 Topic 里面,让消费者可以消费到。 场景:为什么需要使用定时消息 在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。往往这类定时事件触发都会存在以下诉求: 高性能吞吐:需要大量事件触发,不能有性能瓶颈。 高可靠可重试:不能丢失事件触发。 分布式可扩展:定时调度不能是单机系统,需要能够均衡的调度到多个服务负载。  传统的定时调度方案,往往基于数据库的任务表扫描机制来实现。大概的思路就是将需要定时触发的任务放到数据库,然后微服务应用定时触发扫描数据库的操作,实现任务捞取处理。 这类方案虽然可以实现定时调度,但往往存在很多不足之处: 重复扫描:在分布式微服务架构下,每个微服务节点都需要去扫描数据库,带来大量冗余的任务处理,需要做去重处理。 定时间隔不准确:基于定时扫描的机制无法实现任意时间精度的延时调度。 横向扩展性差:为规避重复扫描的问题,数据库扫表的方案里往往会按照服务节点拆分表,但每个数据表只能被单节点处理,这样会产生性能瓶颈。 在这类定时调度类场景中,使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。消息队列 RocketMQ 版的定时消息具有高并发和水平扩展的能力。 案例:使用定时消息实现金融支付超时需求 利用定时消息可以实现在一定的时间之后才进行某些操作而业务系统不用管理定时的状态。下面介绍一个典型的案例场景:金融支付超时。现在有一个订单系统,希望在用户下单 30 分钟后检查用户的订单状态,如果用户还没有支付,那么就自动取消这笔订单。 基于 RocketMQ 定时消息,我们可以在用户下单之后发送一条定时到 30 分钟之后的定时消息。同时,我们可以使用将订单 ID 设置为 MessageKey。当 30 分钟之后,订单系统收到消息之后,就可以通过订单 ID 检查订单的状态。如果用户超时未支付,那么就自动的将这笔订单关闭。 原理:RocketMQ 定时消息如何实现 固定间隔定时消息 如前文介绍,定时消息的核心是如何在特定的时间把处于系统定时 Topic 里面的消息转移到用户的 Topic 里面去。 Apache RocketMQ 4.x 的版本的定时消息是先将定时消息放到按照 DelayLevel 放到 SCHEDULE_TOPIC_XXXX 这个系统的不同 Queue 里面,然后为每一个 Queue 启动一个定时任务,定时的拉取消息并将到了时间的消息转投到用户的 Topic 里面去。这样虽然实现简单,但也导致只能支持特定 DelayLevel 的定时消息。 当下,支持定时到任意秒级时间的定时消息的实现的 pr 提出到了社区,下面简单的介绍一下其基本的实现原理。 时间轮算法 在介绍具体的实现原理之前,先介绍一下经典的时间轮算法,这是定时消息实现的核心算法。 如上所示,这是一个一圈定时为 7 秒的时间轮,定时的最小精度的为秒。同时,时间轮上面会有一个指向当前时间的指针,其会定时的移向下一个刻度。 现在我们想定时到 1 秒以后,那么就将数据放到 “1” 这个刻度里面,同时如果有多个数据需要定时到同一个时间, 那么会以链表的方式添加到后面。当时间轮转到 “1” 这个刻度之后,就会将其读取并从链表出队。那如果想定到超过时间轮一圈的时间怎么处理呢?例如我们想定时到 14 秒,由于一圈的时间是 7 秒,那么我们将其放在“6”这个刻度里面。当第一次时间轮转到“6” 时,发现当前时间小于期望的时间,那么忽略这条数据。当第二次时间轮转到“6”时,这个时候就会发现已经到了我们期望的 14 秒了。 任意秒级定时消息 在 RocketMQ 中,使用 TimerWheel 对于时间轮进行描述和存储,同时使用一个 AppendOnly 的 TimerLog 记录时间轮上面每一个刻度所对应的所有的消息。 TimerLog 记录了一条定时消息的一些重要的元数据,用于后面定时的时间到了之后,将消息转移到用户的 Topic 里面去。其中几个重要的属性如下: 对于 TimerWheel 来说,可以抽象的认为是一个定长的数组,数组中的每一格代表时间轮上面的一个“刻度”。TimerWheel 的一个“刻度”拥有以下属性。 TimerWheel 和 TimerLog 直接的关系如下图所示: TimerWheel 中的每一格代表着一个时间刻度,同时会有一个 firstPos 指向这个刻度下所有定时消息的首条 TimerLog 记录的地址,一个 lastPos 指向这个刻度下所有定时消息最后一条 TimerLog 的记录的地址。并且,对于所处于同一个刻度的的消息,其 TimerLog 会通过 prevPos 串联成一个链表。 当需要新增一条记录的时候,例如现在我们要新增一个 “14”。那么就将新记录的 prevPos 指向当前的 lastPos,即 “13”,然后修改 lastPos 指向 “14”。这样就将同一个刻度上面的 TimerLog 记录全都串起来了。 有了 TimerWheel 和 TimerLog 之后,我们再来看一下一条定时消息从发送到 RocketMQ 之后是怎么最终投递给用户的。 首先,当发现用户发送的是一个定时消息过后,RocketMQ 实际上会将这条消息发送到一个专门用于处理定时消息的系统 Topic 里面去 然后在 TimerMessageStore 中会有五个 Service 进行分工合作,但整体可以分为两个阶段:入时间轮和出时间轮 对于入时间轮: TimerEnqueueGetService 负责从系统定时 Topic 里面拉取消息放入 enqueuePutQueue 等待 TimerEnqueuePutService 的处理 TimerEnqueuePutService 负责构建 TimerLog 记录,并将其放入时间轮的对应的刻度中  对于出时间轮: TimerDequeueGetService 负责转动时间轮,并取出当前时间刻度的所有 TimerLog 记录放入 dequeueGetQueue TimerDequeueGetMessageService 负责根据 TimerLog 记录,从 CommitLog 中读取消息 TimerDequeuePutMessageService 负责判断队列中的消息是否已经到期,如果已经到期了,那么将其投入用户的 Topic 中,等待消费消费;如果还没有到期,那么重新投入系统定时 Topic,等待重新进入时间轮。  实战:使用定时消息 了解了 RocketMQ 秒级定时消息的原理后,我们看下如何使用定时消息。首先,我们需要创建一个 “定时/延时消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 从前面可以看出,对于定时消息来说,是在发送消息的时候 “做文章”。所以,对于生产者,相对于发送普通消息,我们可以在发送的时候设置期望的投递时间。 当定时的时间到了之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大。所以一般建议尽量不要设置大量相同触发时刻的消息。
作者:凯易、明锻
#技术探索 #功能特性

2022年7月22日

生于云、长于云,RocketMQ 5.0再出发
7 月 21 日7 月 22 日,由 Apache RocketMQ 社区主办,阿里云天池平台、云原生应用平台承办的首届 RocketMQ Summit 全球开发者峰会拉开帷幕。Apache RocketMQ 联合创始人林清山发布 RocketMQ 能力全景图,为众多开发者阐述 RocketMQ 5.0 的技术定位与发展方向,来自快手、小米、字节跳动等互联网头部企业的 40 位演讲嘉宾与众多开发者分享各自行业的最佳实践与技术探索经验。 阿里云云原生应用平台负责人丁宇表示,开源让云计算更加的标准化、云计算让开源产品化和规模化,未来的数字世界,将构建在云计算和开源之上。阿里巴巴将以开源的方式,践行开放共享好科技理念,把开源作为技术战略的重要组成部分。 今天,阿里巴巴的开源项目总数超过 3000 个,涵盖云计算、大数据、AI、中间件、数据库、容器、Serverless、高可用等领域,拥有超过 30000 名 Contributor,超过百万 Star,位列中国企业社区贡献榜首,连续十年蝉联中国厂商开源活跃度第一、影响力第一。未来,阿里云也将会持续投入 RocketMQ 的开源建设,构建更加繁荣的社区生态。希望与更多的开发者、贡献者一起,追求极致、开放共享,实现开源技术的普惠。 基于自身实践,RocketMQ 社区对于消息队列演进趋势的洞察 消息队列作为最经典的中间件之一,已经有三十多年历史。伴随着技术发展,消息队列领域不断扩展,迸发新生命力,作为国内大规模实践先行者,RocketMQ 社区认为消息领域将迎来以下趋势变化: 1. 全面拥抱云原生 消息队列将向上演进消息型的产品形态,更好去支撑微服务、事件驱动、Serverless 化等云原生应用架构;向下演进消息系统自身云原生架构,通过系统重构充分释放基础设施的弹性计算、存储、网络等能力,全方位提升消息技术指标,降低消息成本,提高消息队列弹性能力。 2. 全面拥抱物联网 物联网技术将更广泛的落地到各行各业,万物互联、边缘计算进一步拓展消息的边界。面向物联网的消息队列要海量异构设备接入,海量消息队列存储,能够随处运行,具备云边端一体的无边界部署能力。 3. 全面拥抱实时数据 企业的数字化转型的步伐不断加速,从业务数字化迈向数字业务化。数字化企业持续产生业务数据,对业务数据实时洞察与决策,才能帮助企业快速响应商机、把握商机,使得业务获得更大成功。同时,消息队列也将从在线业务架构的基础设施延伸到实时数据架构的基础设施,达到事务分析一体化。 四大方向,全面解读 RocketMQ 5.0 架构演进 1. 面向微服务 引入微服务架构,数字化企业以“高内聚、低耦合”的方式高效协作。微服务架构也带来新问题,比如大量同步微服务会面临延迟增大、可用性降低等风险。为了解决这个问题,越来越多的企业引入消息队列建设异步微服务体系,进一步提高微服务的韧性,降低响应延迟。 业界的微服务技术趋势,一方面是已经形成了事实标准,比如说像 Spring Cloud 体系,Dubbo 体系,通信协议有 HTTP、AMQP 等,另一方面下一代的微服务体系也在快速发展中,主要体现在基础设施下沉,比如 Servicemesh,Serverless 等技术。 在这个趋势下,RocketMQ 5.0, 在 SDK 层面将原来的重型客户端往轻量客户端演进,基于标准 gRPC 作为 remoting 层实现 SDK,同时也将更多客户端逻辑下沉到服务端,比如消息重试、负载均衡等,大幅度降低多语言 SDK 的实现成本。轻量客户端更好的匹配了 ServiceMesh 的需求,RocketMQ 的 Mesh 能力已正式合入 CNCF Envoy 官方社区。 在负载均衡方面,RocketMQ 从原来的队列粒度负载均衡演进到了消息粒度负载均衡模式,消息粒度负载均衡更加匹配 Serverless 应用的场景,无状态 Serverless 应用弹性伸缩过程不会触发频繁的队列重平衡,降低消息重复率和端到端延迟。 RocketMQ 5.0 提供无状态 proxy,通过 proxy 可以很方便的扩展更多标准消息协议以及流量治理功能。无状态 proxy 也具备良好的的网络穿透能力,可以灵活应对企业在上云过程中面临复杂跨网络访问场景。 今天我们以 RocketMQ 5.0 核心能力为基础,支撑了阿里云 RocketMQ、MNS、RabbitMQ 等多款云消息产品。其中阿里云 RabbitMQ 是一款兼容 AMQP 协议、RabbitMQ SDK 的消息服务,可以帮助开源存量用户无缝上云。同时它也充分释放了底层 RocketMQ 云原生架构的技术红利,具备和 RocketMQ 一致的高性能、无限扩展、高可用等特点,是云原生的 RabbitMQ。 2. 事件驱动(EDA) 事件驱动在 18 年被 Gartner 评为年度十大技术趋势。在未来新型的数字化商业解决方案中,会有 60% 以上的商业数字化解决方案采纳 EDA 架构。EDA 为软件架构带来彻底解耦,实现更灵活的业务扩展和业务敏捷能力,不仅可以用于单一业务领域的微服务解耦,还可以用于跨部门、跨组织、跨业务领域的事件集成。消息队列是 EDA 架构中最核心的组件,承担 eventbroker 的职责。随着 EDA 架构被大规模跨组织的落地,要进一步提高行业级生产力,标准化也迫在眉睫。为此 CNCF 推出了 CloudEvent 规范,基于统一的规范,跨系统、跨组织的数字化协同有了共同的“语言”,能够实现更高效的系统集成,有了规范也方便沉淀面向事件的统一基础软件设施,提高研发效率。 面向 EDA 趋势,RocketMQ 5.0 发布全新产品形态——Eventbridge。整个领域模型以事件为中心,并拥抱 CloudEvent 规范,CloudEvent 社区开源 SDK 可无缝接入 Eventbridge。同时,还提供各种低代码事件编排、过滤、路由能力,灵活实现各种事件集成。 今天我们以 RocketMQ 5.0 核心能力为基础,支撑了阿里云 EventBridge 产品,助力云客户实现事件驱动、事件集成的商业生态。 3. 物联网 全球的 IoT 设备爆发式增长,预计到了 2025 年将达到 200 多亿台,。并且物联网也带来了边缘计算的兴起,未来将有 75% 的数据将在传统数据中心或云环境之外进行处理。目前物联网行业已经形成了多个标准协议,其中最流行莫过于 MQTT,这是"发布订阅"模式的消息协议,除此之外还有各种车联网协议、工业协议等等,物联网消息队列要具备多样化异构海量设备接入能力。RocketMQ 可作为物联网应用的基础通信设施,用于 IoT、移动设备的数据上报,还有指令下行,为 IoT 业务连接云边端。 面向 IoT 的趋势,RocketMQ 5.0 发布轻量级百万队列引擎,轻量元数据服务。在新存储内核之上,建设物联网形态消息队列 MQTT,支持标准物联网协议,支持海量物联网设备接入和海量队列存储。 RocketMQ 5.0 遵循零外部依赖的精简架构原则,新 HA 架构为低资源消耗场景提供更多选择,用户可以权衡可靠性、成本、可用性,选择最优副本策略。比如边缘场景由于资源受限,RocketMQ 不一定需要提供三副本存储,可以是 2 副本,甚至是单副本就能满足业务需求。 今天以 RocketMQ 5.0 核心能力为基础,支撑了阿里云微消息队列 MQTT,为客户提供云端一体化消息解决方案,实现万物互联、云端互联。 4. 实时大数据 未来大数据将走向实时化,预测在 2025 年实时大数据的比例将达到 30%。数字化企业通过实时感知、实时分析、实时决策,能够抓住商机、快速响应用户,实时大数据的重要性愈发突出。消息队列是实时大数据的关键技术之一,作为事件流的核心存储,它承担数据的分发,数据的缓冲,还有轻量的流处理的作用。 事件流技术越来越多的在 IoT 场景进行使用,IDC 预测未来 95% 的实时事件流将来自IoT场景;另外有越来越多的交易事件需要进行实时分析,挖掘更多业务价值,事件流技术也开始呈现事务分析一体化的趋势。 面向事件流的趋势,RocketMQ 5.0 在流存储和流分析能力进行重点打造: 流存储方面,支持批量索引,大幅度提高 RocketMQ 吞吐量。支持 compacttopic,用于实现流处理过程中的状态存储,零外部依赖。除了功能特性之外,RocketMQ 5.0 的流存储同时进行了云原生架构改造,引入逻辑队列的概念,解耦了数据逻辑分区跟物理存储之间的绑定关系,能够实现全局固定分区前提下进行无缝扩缩容,零数据迁移。 流分析方面,RocketMQ 5.0 全新发布了轻量的流计算引擎,它可以兼容 flink SQL,方便用户在不同场景无缝切换。如果用户需要大而全流计算能力,可以使用大型计算平台。如果用户有边缘计算、资源受限、简单流处理场景可以直接使用 RocketMQ 的 RSQL 来支持。 今天我们以 RocketMQ 5.0 事件流能力为基础,支撑了阿里云 Kafka 产品,存量 Kafka 用户实现无缝上云。同时基于 RocketMQ 的逻辑队列能力,阿里云 Kafka 具备快速弹性伸缩能力,提供了 Serverless 化的产品形态。让存量 Kafka 用户也能够体验到云原生架构的红利。 不断演进,RocketMQ 正式迈进 5.0 时代 在过去七年大规模云计算实践中,RocketMQ 不停自我演进。今天 RocketMQ 正式迈进了 5.0 的时代。从互联网业务消息中间扩展到“消息、事件、流”超融合处理平台,解锁了更全面能力。 在消息领域,全面拥抱云原生技术,以获得更好的弹性伸缩。在事件领域产品形态进行全面升级,拥抱行业标准,让事件驱动的架构无处不在,从单一业务的数字化系统扩展到跨组织跨业务的数字化商业生态事件驱动的架构,也同时让云计算原生的技术能够更大规模的落地,提高云产品跟用户业务的集中度。让 Serverless 的技术能够被更大范围的采纳,帮助企业客户去降本增效。在流存储和流计算领域,流存储增强批量的特性,大幅度提高 RocketMQ 数据吞吐量,新增逻辑队列能力,解耦逻辑资源跟物理资源,在流场景也具备无缝伸缩能力;新增轻量流处理引擎也提供了实时事件流处理、流分析能力。 RocketMQ 基于端云一体化架构,实现完整物联网消息队列能力,从原来连接应用扩展到连接物联网设备。同时 RocketMQ 5.0 也继续保持着极简架构原则,即便产品能力全面提升,也依然能够以最低资源消耗,最低运维代价去搭建服务。 现在 RocketMQ 已经真正具备连接一切,随处运行的能力,提供云、边端一体化实时数据解决方案。物联网设备持续的产生的数据,边缘 RocketMQ 可以进行实时数据分析,快速响应业务。通过实时 ETL,实时决策产生的高价值事件,或者数据可以传输到云端,通过 RocketMQ  eventing 能力连接更强大的公有云平台,利用云的一站式平台技术,进一步放大每份数据的价值。 不止于开源,RocketMQ 赋能海量行业客户 今天,基于 RocketMQ 5.0 为内核,阿里云也打造一站式消息平台,在统一云原生消息内核基础上,提供 6 种消息产品形态,有面向 IoT 场景的微消息队列 MQTT,有面向 EDA 场景的 EventBridge,有面向开源用户无缝上云的托管开源产品如 Kafka、RabbitMQ、RocketMQ。 通过多样化产品形态,RocketMQ 在阿里云上面已服务数万个企业用户,帮助其完成数字化转型的同时。RocketMQ 也得到业界的广泛认可。近期获得多个奖项,包括 OSCHINA 优秀技术团队奖、中国开源云联盟优秀基础软件、中国科学技术协会颁布的科创中国开源创新榜等,并进入 Apache 中国开源项目领导者象限。RocketMQ 成为第一个通过信通院可信云分布式消息队列服务的“增强级”认证,第一个通过信通院金融级稳定性评测的“先进级”认证。 随着潜在用户数的增大,RocketMQ 的商业价值也被进一步的放大。目前已经有十家的云厂商提供 Apache RocketMQ 的商业服务,它几乎覆盖了国内主流的公共云厂商。这样 RocketMQ 的用户就有了更多的选择,真正实现无厂商锁定,RocketMQ 已经成为原生消息的事实标准。 万物皆云的时代,RocketMQ 让数字化转型更简单高效,也将消息、事件、流的价值最大程度释放。Apache RocketMQ 将不断推动技术演进与落地实践,帮助企业真正实现高质量数字化转型与创新。
#社区动态

2022年7月17日

融合数据库生态:利用 EventBridge 构建 CDC 应用
引言 CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA 天然契合 CDC 的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成一些列业务驱动。阿里云 EventBridge 是阿里云推出的一款无服务器事件总线服务,能够帮助用户轻松快捷地搭建基于 EDA 架构的应用。近期,EventBridge 事件流已经支持了基于阿里云 DTS[1]服务的 CDC 能力。本文将从 CDC、CDC 在 EventBridge 上的应用以及若干最佳实践场景等方面,为大家介绍如何利用 EventBridge 轻松构建 CDC 应用。 CDC 概述 基本原理与应用场景 CDC 从源数据库捕获增量的数据以及数据模式变更,以高可靠、低延时的数据传输将这些变更有序地同步到目标数据库、数据湖或者其他数据分析服务。目前业界主流的开源 CDC 工具包括 Debezium[2]、Canal[3] 以及 Maxwell[4]。 图片来源: 目前业界主要有以下几类 CDC 的实现: 1. 基于时间戳或版本号 基于时间戳的方式要求数据库表有一个字段代表更新时间戳,当存在数据插入或更新时,对应时间戳字段就会随之更新。CDC 组件周期性检索更新时间大于上次同步时间的数据记录,即可捕获本周期内数据的变更。基于版本号跟踪和基于时间戳跟踪原理基本一致,要求开发者变更数据时必须更新数据的版本号。 2. 基于快照 基于快照的 CDC 实现在存储层面使用到了数据源 3 份副本,分别是原始数据、先前快照和当前快照。通过对比 2 次快照之间的差异来获取这之间的数据变更内容。 3. 基于触发器 基于触发器的 CDC 实现方式事实上是在源表上建立触发器将对数据的变更操作(INSERT、UPDATE、DELETE)记录存储下来。例如专门建立一张表记录用户的变更操作,随后创建 INSERT、UPDATE、DELETE 三种类型的触发器将用户变更同步到此表。 4. 基于日志 以上三种方式都对源数据库存在一定侵入性,而基于日志的方式则是一种非侵入性的 CDC 方式。数据库利用事务日志实现灾备,例如 MySQL 的 binlog 就记录了用户对数据库的所有变更操作。基于日志的 CDC 通过持续监听事务日志来实时获取数据库的变化情况。 CDC 的应用场景广泛,包括但不限于这些方面:异地机房数据库同步、异构数据库数据同步、微服务解耦、缓存更新与 CQRS 等。 基于阿里云的 CDC 解决方案:DTS 数据传输服务 DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体。其中,DTS 数据订阅[5]功能可以帮助用户获取自建 MySQL、RDS MySQL、Oracle 等数据库的实时增量数据。 CDC 在EventBrige上的应用 阿里云 EventBridge 提供了事件总线[6]与事件流[7] 2 款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件路由到多个事件目标中。 事件流适用于端到端的流式数据处理场景,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便。 为了更好地支持用户在 CDC 场景下的需求,EventBridge 在事件流源端支持了阿里云 DTS 的数据订阅功能,用户仅需简单配置,即可将数据库变更信息同步到 EventBridge 事件流。 EventBridge 定制了基于 DTS sdk 的 DTS Source Connector。当用户配置事件提供方为 DTS 的事件流时,source connector 会实时地从 DTS 服务端拉取 DTS record 数据。数据拉取到本地后,会进行一定的结构封装,保留 id、operationType、topicPartition、beforeImage、afterImage 等数据,同时增加 streaming event 所需要的一些系统属性。 DTS Event 样例可参考 EventBridge 官方文档 EventBridge Streaming 保证了 DTS 事件的顺序性,但存在事件重复投递的可能性,EventId 在保证了和每条 DTS record 的一一映射关系,用户可依据此字段来对事件做幂等处理。 创建源为 DTS 的 EventBridge 事件流 下面展示如何在 EventBridge 控制台创建源为 DTS 的事件流 前期准备  1. 开通 EventBridge 服务; 2. 创建 DTS 数据订阅任务; 3. 创建用于消费订阅数据的消费组账号信息。 创建事件流  1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“数据库 DTS”; 4. 在“数据订阅任务”一栏中选择已创建的 DTS 数据订阅任务。在消费组一栏,选择要使用哪个消费组消费订阅数据,同时填写消费组密码与初始消费时间。 5. 事件流规则与目标按照需要填写,保存启动即可创建以 DTS 数据订阅为事件源的事件流。 注意事项 使用时有以下几点需要注意: 1. EventBridge 使用的是 SUBSCRIBE 消费模式[8],所以请保证当前 DTS 消费组没有其他客户端实例在运行。如果设置的消费组在之前有运行,则传入的位点失效,会基于此消费组上次消费过的位点继续消费; 2. 创建 DTS 事件源时传入的位点仅在新消费组第一次运行时起效,后续任务重启后会基于上次消费位点继续消费; 3. EventBridge 事件流订阅 OperationType 为 INSERT、DELETE、UPDATE、DDL 类型的 DTS 数据; 4. 使用 DTS  事件源可能会有消息重复,即保证消息不丢,但无法保证仅投递一次,建议用户做好幂等处理; 5.用户如果需要保证顺序消费,则需要将异常容忍策略设置为“NONE”,即不容忍异常。在这种情况下,如果事件流目标端消费消息异常,整个事件流将暂停,直至恢复目标端正常。 最佳实践示例 基于EventBridge 实现 CQRS 在 CQRS(Command Query Responsibility Segregation)模型中,命令模型用于执行写以及更新操作,查询模型用于支持高效的读操作。读操作和写操作使用的数据模型存在一定区别,需要使用一定方式保证数据的同步,基于 EventBridge 事件流的 CDC 可以满足这样的需求。 基于云上服务,用户可以使用如下方式轻松构建基于 EventBridge 的 CQRS: 1. 命令模型操作数据库进行变更,查询模型读取 elasticsearch 获取数据; 2. 开启 DTS 数据订阅任务,捕获 DB 变更内容; 3.配置 EventBridge 事件流,事件提供方为 DTS 数据订阅任务,事件接收方为函数计算 FC; 4. FC 中的服务即为更新 elasticsearch 数据操作。 微服务解耦 CDC 也可以用于微服务解耦。例如下文是一个电商平台的订单处理系统,当有新建的未付款订单产生时,数据库会有一条 INSERT 操作,而当某笔订单状态由“未付款”变为“已付款”时,数据库会有一条 UPDATE 操作。根据订单状态变化的不同,后端会有不同的微服务来对此进行处理。 1. 用户下单/付款,订单系统进行业务处理,将数据变更写入 DB; 2. 新建 DTS 订阅任务捕获 DB 数据变更; 3. 搭建 EventBridge 事件流。事件提供方为 DTS 数据订阅任务,事件接收方为 RocketMQ; 4. 在消费 RocketMQ 数据时,同一个 topic 下启用 3 个 group 代表不同的业务消费逻辑; a. GroupA 将捕获到的 DB 变更用户缓存更新,便于用户查询订单状态; b. GroupB 下游关联财务系统,仅处理新建订单,即处理 DB 操作类型为 INSERT 的事件,丢弃其余类型事件; c. GroupC 仅关心订单状态由“未付款”变为“已付款”的事件,当有符合条件事件到达时,调用下游物流、仓储系统,对订单进行进一步处理。 如果采用接口调用方式,那么用户在下单之后订单系统将分别需要调用缓存更新接口、新建订单接口以及订单付款接口,业务耦合性过高。除此之外,这种模式使得数据消费端不用担心上游订单处理接口返回内容的语义信息,在存储模型不变的情况下,直接从数据层面判断此次数据变更是否需要处理以及需要怎样的处理。同时,消息队列天然的消息堆积能力也可以帮助用户在订单峰值到来时实现业务削峰填谷。 事实上,目前 EventBridge Streaming 支持的消息产品还包括 RabbitMQ、Kafka、MNS 等,在实际操作中用户可以根据自己的需要进行选择。 数据库备份&异构数据库同步 数据库灾备和异构数据库数据同步也是 CDC 重要的应用场景。使用阿里云 EventBridge 亦可以快速搭建此类应用。 1. 新建 DTS 数据订阅任务,捕获用户 MySQL 数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS 数据订阅任务; 3. 使用 EventBridge 在目的数据库执行指定 sql,实现数据库备份; 4. 数据变更事件投递到函数计算,用户业务根据数据变化内容更新对应异构数据库。 自建 SQL 审计 对于用户有自建 SQL 审计的需求,使用 EventBridge 也可以轻松实现。 1. 新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 总结 本文介绍了 CDC 的一些概念、CDC 在 EventBridge 上的应用以及若干最佳实践场景。随着支持产品的不断增加,EventBridge 所承载的生态版图也不断扩大,从消息生态到数据库生态,从日志生态到大数据生态,EventBridge 不断扩大其适用领域,巩固云上事件枢纽的地位,此后也将按照这个方向继续发展,技术做深,生态做广。 _参考链接:_ _ _ _ _ _ _ _ _ 感兴趣的小伙伴们可以扫描下方二维码加入钉钉群讨论(群号:44552972) 点击,进入 EventBridge 官网了解更多信息~
作者:昶风
#行业实践 #生态集成 #事件驱动架构

2022年7月7日

RocketMQ 消息集成:多类型业务消息-普通消息
引言 CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA 天然契合 CDC 的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成一些列业务驱动。阿里云 EventBridge 是阿里云推出的一款无服务器事件总线服务,能够帮助用户轻松快捷地搭建基于 EDA 架构的应用。近期,EventBridge 事件流已经支持了基于阿里云 DTS[1]服务的 CDC 能力。本文将从 CDC、CDC 在 EventBridge 上的应用以及若干最佳实践场景等方面,为大家介绍如何利用 EventBridge 轻松构建 CDC 应用。 CDC 概述 基本原理与应用场景 CDC 从源数据库捕获增量的数据以及数据模式变更,以高可靠、低延时的数据传输将这些变更有序地同步到目标数据库、数据湖或者其他数据分析服务。目前业界主流的开源 CDC 工具包括 Debezium[2]、Canal[3] 以及 Maxwell[4]。 图片来源: 目前业界主要有以下几类 CDC 的实现: 1. 基于时间戳或版本号 基于时间戳的方式要求数据库表有一个字段代表更新时间戳,当存在数据插入或更新时,对应时间戳字段就会随之更新。CDC 组件周期性检索更新时间大于上次同步时间的数据记录,即可捕获本周期内数据的变更。基于版本号跟踪和基于时间戳跟踪原理基本一致,要求开发者变更数据时必须更新数据的版本号。 2. 基于快照 基于快照的 CDC 实现在存储层面使用到了数据源 3 份副本,分别是原始数据、先前快照和当前快照。通过对比 2 次快照之间的差异来获取这之间的数据变更内容。 3. 基于触发器 基于触发器的 CDC 实现方式事实上是在源表上建立触发器将对数据的变更操作(INSERT、UPDATE、DELETE)记录存储下来。例如专门建立一张表记录用户的变更操作,随后创建 INSERT、UPDATE、DELETE 三种类型的触发器将用户变更同步到此表。 4. 基于日志 以上三种方式都对源数据库存在一定侵入性,而基于日志的方式则是一种非侵入性的 CDC 方式。数据库利用事务日志实现灾备,例如 MySQL 的 binlog 就记录了用户对数据库的所有变更操作。基于日志的 CDC 通过持续监听事务日志来实时获取数据库的变化情况。 CDC 的应用场景广泛,包括但不限于这些方面:异地机房数据库同步、异构数据库数据同步、微服务解耦、缓存更新与 CQRS 等。 基于阿里云的 CDC 解决方案:DTS 数据传输服务 DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体。其中,DTS 数据订阅[5]功能可以帮助用户获取自建 MySQL、RDS MySQL、Oracle 等数据库的实时增量数据。 CDC 在EventBrige上的应用 阿里云 EventBridge 提供了事件总线[6]与事件流[7] 2 款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件路由到多个事件目标中。 事件流适用于端到端的流式数据处理场景,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便。 为了更好地支持用户在 CDC 场景下的需求,EventBridge 在事件流源端支持了阿里云 DTS 的数据订阅功能,用户仅需简单配置,即可将数据库变更信息同步到 EventBridge 事件流。 EventBridge 定制了基于 DTS sdk 的 DTS Source Connector。当用户配置事件提供方为 DTS 的事件流时,source connector 会实时地从 DTS 服务端拉取 DTS record 数据。数据拉取到本地后,会进行一定的结构封装,保留 id、operationType、topicPartition、beforeImage、afterImage 等数据,同时增加 streaming event 所需要的一些系统属性。 DTS Event 样例可参考 EventBridge 官方文档 EventBridge Streaming 保证了 DTS 事件的顺序性,但存在事件重复投递的可能性,EventId 在保证了和每条 DTS record 的一一映射关系,用户可依据此字段来对事件做幂等处理。 创建源为 DTS 的 EventBridge 事件流 下面展示如何在 EventBridge 控制台创建源为 DTS 的事件流 前期准备   1. 开通 EventBridge 服务; 2. 创建 DTS 数据订阅任务; 3. 创建用于消费订阅数据的消费组账号信息。 创建事件流   1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“数据库 DTS”; 4. 在“数据订阅任务”一栏中选择已创建的 DTS 数据订阅任务。在消费组一栏,选择要使用哪个消费组消费订阅数据,同时填写消费组密码与初始消费时间。 5. 事件流规则与目标按照需要填写,保存启动即可创建以 DTS 数据订阅为事件源的事件流。 注意事项 使用时有以下几点需要注意: 1. EventBridge 使用的是 SUBSCRIBE 消费模式[8],所以请保证当前 DTS 消费组没有其他客户端实例在运行。如果设置的消费组在之前有运行,则传入的位点失效,会基于此消费组上次消费过的位点继续消费; 2. 创建 DTS 事件源时传入的位点仅在新消费组第一次运行时起效,后续任务重启后会基于上次消费位点继续消费; 3. EventBridge 事件流订阅 OperationType 为 INSERT、DELETE、UPDATE、DDL 类型的 DTS 数据; 4. 使用 DTS  事件源可能会有消息重复,即保证消息不丢,但无法保证仅投递一次,建议用户做好幂等处理; 5.用户如果需要保证顺序消费,则需要将异常容忍策略设置为“NONE”,即不容忍异常。在这种情况下,如果事件流目标端消费消息异常,整个事件流将暂停,直至恢复目标端正常。 最佳实践示例 基于EventBridge 实现 CQRS 在 CQRS(Command Query Responsibility Segregation)模型中,命令模型用于执行写以及更新操作,查询模型用于支持高效的读操作。读操作和写操作使用的数据模型存在一定区别,需要使用一定方式保证数据的同步,基于 EventBridge 事件流的 CDC 可以满足这样的需求。 基于云上服务,用户可以使用如下方式轻松构建基于 EventBridge 的 CQRS: 1. 命令模型操作数据库进行变更,查询模型读取 elasticsearch 获取数据; 2. 开启 DTS 数据订阅任务,捕获 DB 变更内容; 3.配置 EventBridge 事件流,事件提供方为 DTS 数据订阅任务,事件接收方为函数计算 FC; 4. FC 中的服务即为更新 elasticsearch 数据操作。 微服务解耦 CDC 也可以用于微服务解耦。例如下文是一个电商平台的订单处理系统,当有新建的未付款订单产生时,数据库会有一条 INSERT 操作,而当某笔订单状态由“未付款”变为“已付款”时,数据库会有一条 UPDATE 操作。根据订单状态变化的不同,后端会有不同的微服务来对此进行处理。 1. 用户下单/付款,订单系统进行业务处理,将数据变更写入 DB; 2. 新建 DTS 订阅任务捕获 DB 数据变更; 3. 搭建 EventBridge 事件流。事件提供方为 DTS 数据订阅任务,事件接收方为 RocketMQ; 4. 在消费 RocketMQ 数据时,同一个 topic 下启用 3 个 group 代表不同的业务消费逻辑; a. GroupA 将捕获到的 DB 变更用户缓存更新,便于用户查询订单状态; b. GroupB 下游关联财务系统,仅处理新建订单,即处理 DB 操作类型为 INSERT 的事件,丢弃其余类型事件; c. GroupC 仅关心订单状态由“未付款”变为“已付款”的事件,当有符合条件事件到达时,调用下游物流、仓储系统,对订单进行进一步处理。 如果采用接口调用方式,那么用户在下单之后订单系统将分别需要调用缓存更新接口、新建订单接口以及订单付款接口,业务耦合性过高。除此之外,这种模式使得数据消费端不用担心上游订单处理接口返回内容的语义信息,在存储模型不变的情况下,直接从数据层面判断此次数据变更是否需要处理以及需要怎样的处理。同时,消息队列天然的消息堆积能力也可以帮助用户在订单峰值到来时实现业务削峰填谷。 事实上,目前 EventBridge Streaming 支持的消息产品还包括 RabbitMQ、Kafka、MNS 等,在实际操作中用户可以根据自己的需要进行选择。 数据库备份&异构数据库同步 数据库灾备和异构数据库数据同步也是 CDC 重要的应用场景。使用阿里云 EventBridge 亦可以快速搭建此类应用。 1. 新建 DTS 数据订阅任务,捕获用户 MySQL 数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS 数据订阅任务; 3. 使用 EventBridge 在目的数据库执行指定 sql,实现数据库备份; 4. 数据变更事件投递到函数计算,用户业务根据数据变化内容更新对应异构数据库。 自建 SQL 审计 对于用户有自建 SQL 审计的需求,使用 EventBridge 也可以轻松实现。 1. 新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 总结 本文介绍了 CDC 的一些概念、CDC 在 EventBridge 上的应用以及若干最佳实践场景。随着支持产品的不断增加,EventBridge 所承载的生态版图也不断扩大,从消息生态到数据库生态,从日志生态到大数据生态,EventBridge 不断扩大其适用领域,巩固云上事件枢纽的地位,此后也将按照这个方向继续发展,技术做深,生态做广。 _参考链接:_ _[1] DTS:_ _[2] Debezium:_ _[3] Canal:_ _[4] Maxwell:_ _[5] DTS 数据订阅:_ _[6] 事件总线:_ _[7] 事件流:_ _[8] SUBSCRIBE 消费模式:_
作者:昶风
#技术探索 #功能特性

2022年6月26日

EventBridge 在 SaaS 企业集成领域的探索与实践
当下降本增效是各行各业的主题,而 SaaS 应用作为更快触达和服务业务场景的方式则被更多企业熟知和采用。随着国内 SaaS 商业环境的逐渐成熟,传统企业中各个部门的工程师和管理者,能迅速决定采购提升效率的 SaaS 产品,然后快速投入生产和使用。但是随着行业 SaaS 越来越多,如何解决各个 SaaS 系统的数据孤岛,如何将SaaS 应用数据与现有系统数据进行打通,已然变成了企业使用 SaaS 的瓶颈。因此,业内也广泛提出 B2B integration 企业集成的概念。 本文将结合实际业务场景讲述在 SaaS 行业的洞察与 SaaS 集成的探索实践。 什么是 SaaS SaaS 概述 SaaS(SoftwareasaService,软件即服务)源自于美国 Salesforce 公司(1999 年创立)创造的新软件服务模式。相比于传统软件,用户使用的 SaaS 软件,其数据保存在云端(国内有很多行业 SaaS 由于其数据敏感会单独部署在客户 IDC)。而且,SaaS 公司提供给客户的服务通常是按需租用的,比如按年缴纳使用费5年,第二年再续费,如果不满意也可以不续费,这会大大激发 SaaS 创业公司持续的打磨产品、持续的为客户提供更大价值的动力。 SaaS 典型分类 SaaS 分类比较繁琐,一般有两个分类维度。一个维度按照使用场景,另一个维度按照商业价值。 SaaS 一般分类为 通用 SaaS 和 行业 SaaS 两个基础类。 通用 SaaS 顾名思义是通用的,跨行业的,比如钉钉即时通讯或者某司的 HR 产品,由于使用场景更广,因而客群也会更多。 行业 SaaS 是在某个行业内使用的产品,比如餐饮企业 SaaS、电商 SaaS 等。 当然,还有第二个维度是工具 SaaS 和 商业 SaaS。 工具 SaaS,为客户企业提供一个提高管理效率的工具;商业 SaaS,除了提供一部分“工具”价值外,还能为客户企业提供增值价值,比如增加营收、获得资金等。 商业 SaaS 产品虽然风险更大,但在国内特色的商业环境、管理水平及人才结构下,更容易快速实现客户价值和自我价值。 SaaS 在中国的发展历程 对于 SaaS 领域来讲,云服务的普及提振了大家对 SaaS 服务稳定性和数据安全性的信心。同时,人口红利消退使得 SaaS 成本优势凸显。当下疫情环境也加快了市场教育,企业主转变思路,降本增效的需求显著上升。随着整个行业的渗透率加快,SaaS 场景和行业越做越深,SaaS 市场可以遇见在未来会有高速的增长。很多企业会在新业务场景使用 SaaS 服务,小步快跑试错,解决活下来的问题,而不是重复造轮子。 什么是 B2B ? B2B 即 BusinesstoBusiness (B2B) integration 是指将两个或者多个组织之间的业务流程和通讯自动化,通过自动化关键业务流程,实现不用应用和组织关系的打通,有效促进应用提供方和客户之间的数据打通与合作。 可以断言,随着 SaaS 行业逐渐渗透,企业集成的诉求会逐渐增多。数据同步、用户同步、接口同步的诉求会逐步增多,包括自建服务与 SaaS 服务的打通,SaaS 服务与 SaaS 服务的打通等。 SaaS 集成领域场景分析 随着行业类 SaaS 的逐渐丰富,在企业生产实践中,应用和应用的数据集成和互通变得至关重要。包括 API 集成,数据集成,流程集成等场景。 API 集成 通过 API 将 SaaS 应用的业务流程串联,现阶段大部分 SaaS 集成对接都是通过标准 API 协议实现的。源端采用 WebHook 机制推送到指定 HTTP 端点,目标端则采用类似 API 接口调用的方式,主动调取执行动作。 实现结构如下: 业内通用方案通几乎均为同步方案,通过 API + 中间网关对调用做解耦和映射。该方案的主要问题是调用追溯难;其次如出现上下游接口限制不一致问题,会导致下游调用大量失败。 数据集成 数据集成场景主要是企业自建系统和 SaaS 系统的打通。当企业使用的行业 SaaS 逐渐增多,数据一致性问题就会变的迫在眉睫。 大部分企业通常会面临云上数据导入/同步到 SaaS 应用的场景,而业内对 SaaS 应用的数据集成方案并没有类似 CDC 场景下的 Debezium 那么标准和通用。 企业在 SaaS 集成领域的痛点 接入成本高 对大量使用 SaaS 应用的企业来讲, SaaS 集成是必须做的基础建设。但是该部分基础建设通常会消耗大量人力,由于各个行业的 SaaS 百花齐放,通常很难使用一套架构满足全部集成场景。意味着通常情况下,企业使用每一款 SaaS 都会面临 SaaS 系统与自身系统集成的困难。 异构数据多 异构数据多是集成领域又一个比较典型的特点,异构数据通常有结构化数据,非机构化数据,半结构化数据。比如企业自建关系型数据库就是典型的结构化数据,但是要被其他 SaaS 系统集成通常是 Json 这种半结构数据入参。当然这部分内容可以通过定制代码搞定,但这个思路一定不是做消息枢纽的思路; 异构数据如何高效的统一处理其实是当前 SaaS 集成亟待解决的问题,也是最大的冲突点。 分发/路由困难 当很多集成需求同时涌现时,如何对已集成数据进行合理分发,会变成集成领域又一个难以解决的问题。每个细分场景甚至每个集成链路所需要的数据内容甚至数据类型都不一样。如果路由/分发无法完成,那么企业统一集成将无法实现。 集成追踪困难 当全部采用同步链路时,这里的集成状态追踪就会变成玄学,除非将链路接入 Tracing ,但是这部分又回产生高额的改造成本,同时多源 Tracing 的复杂相对于单链路会呈几何倍数的增加。 老系统迁移困难 老系统迁移主要是数据集成部分,如果将新老系统对接,并构建统一的应用网是当下企业构建 SaaS 建设的难点。企业迫切的需要一种能将"新"“老”应用联接起来的方式,打破企业应用发展的时间与空间界限,协同企业原有核心数据资产及创新应用,构建可平滑演进的企业IT架构。 EventBridge 一站式企业 SaaS 集成方案 针对业内 SaaS 系统集成的种种痛点,EventBridge 推出一站式企业 SaaS 集成方案。通过收敛 SaaS 集成痛点诉求,EventBridge 推出 API 集成方案和数据集成方案,打通应用与应用,云与应用的连接。 低代码集成平台 提供完全托管的集成平台服务,仅需在控制台进行简单配置即可在几分钟内完成应用集成。客户无需关心底层资源及实现细节即可打通云下到云上,SaaS 到 SaaS 的集成与连接,轻松完成异构数据接入。 金融级稳定性 满足不同客户企业级集成项目的要求,提供高可用性、灾难恢复、安全性、服务等级协议(SLA)和全方面的技术支持。 全方位的集成能力 支持各种集成场景,打通云上云下企业应用、物联网、设备及合作伙伴之间的信息孤岛。支持事件规则,事件路由等多种路由方式,实现跨云跨地域互通和信息共享。同时强大的链路追踪能力可以帮助企业快速排障。 开放的平台 拥抱 CloudEvents 社区,提供标准化的事件集成方案。提供丰富的开发者工具,拥有海量的生态伙伴及开发者,丰富开箱即用的连接器和应用组件可以帮助加速企业业务创新。 EventBridge 在 SaaS 领域的典型应用场景 SaaS 应用同步 应用同步是指在特定时间点将一组特定的事件从一个系统迁移到另一个系统的行为。事件同步模式允许开发人员创建数据自动迁移集成服务;业务人员和开发人员可以通过配置集成应用,自动化的将特定范围内的数据传递到下游应用;创建可重用的服务可以为开发和运营团队节省大量时间。 例如: 把销售机会数据从一个旧式 CRM (客户关系管理) 系统迁移到新的 CRM 实例; 把销售订单数据从一个 CRM 组织迁移到另一个组织; 从 ERP (企业资源计划) 同步产品主数据到 CRM 系统中。 事件广播 事件广播是在连续的、近实时或实时的基础上将事件从单个源系统移动到多个目标系统的行为。本质上,它是一对多的单向同步。通常,“单向同步”表示 1:1 关系。但是,广播模式也可以是 1:n(n 大于 1)的关系。 例如: 当一个销售机会在 CRM 中被标记为成功关单的时候,应在 ERP 中创建销售订单。 SaaS 应用通知 事件通知是指当 SaaS 应用发生某个类型的事件,可以通过钉钉,短信等通知方式告知用户。用户可及时获取到关键事件信息。 例如: 当一个销售机会在 CRM 中被标记为重要商机的时候,会及时通知给其他同事进行跟进并关注。 自建系统到云上迁移 EventBridge 支持云上数据库、云上消息队列、云产品事件对接 SaaS 系统,完善企业用户建设应用一张网的诉求,打破企业应用发展的时间和空间界限,协同企业原有核心资产与 SaaS 系统,构建可演进的企业 IT 架构。 例如: 当引入一个新的 SaaS 应用时,可通过 EventBridge 将数据库/大数据平台的核心资产(如人员信息等)同步至 SaaS 应用。
作者:肯梦
#行业实践 #生态集成

2022年5月25日

漫画 | 新一代软件架构会影响到谁?
周末的晚上,张大胖照例要去 Hello World 咖啡馆,没想到在这里碰到了好几个老伙计。
#技术探索

2022年5月17日

云钉一体:EventBridge 联合钉钉连接器打通云钉生态
背景 以事件集成阿里云,从 EventBridge 开始”是 EventBridge 这款云产品的愿景和使命。作为一款无服务器事件总线服务,EventBridge 从发布以来,以标准化的 CloudEvents 1.0 协议连接了大量云产品和云事件,用户可以通过 EventBridge 轻松访问云上事件,驱动云上生态。 截止目前为止,EventBridge 已集成 85+ 阿里云产品,提供了 941+ 事件类型,集成 50+ SaaS产品,通过事件规则可轻松驱动 10+ 阿里系一方云产品的计算力。 另一方面,钉钉生态空前繁荣,拥有 4000+ 家的生态伙伴,包括 ISV 生态伙伴、硬件生态伙伴、服务商、咨询生态和交付生态伙伴等。通过事件将钉钉生态与阿里云生态联通,是践行「云钉一体」战略的重要途径,EventBridge 作为阿里云标准化的事件枢纽,其重要性不言而喻。 今天,EventBridge 联合钉钉连接器,打通了钉钉生态和阿里云生态,钉钉的生态伙伴可以通过通道的能力驱动阿里云上海量的计算力。 关键技术 EventBridge 集成阿里云和钉钉生态的方案,核心能力由钉钉连接器和 EventBridge 的 HTTP 事件源能力提供。 钉钉连接器 钉钉连接平台通过可视化拖拽配置、一键订阅等零代码方式,简单高效的实现钉钉、企业内部系统、知名厂商系统(金蝶、用友、SAP 等)、钉钉第三方企业应用之间数据互通和集成。 近期,钉钉连接器在「连接流」中发布了「HTTP Webhook」的执行动作能力,支持将钉钉生态开放给外部生态,EventBridge 正是通过该能力将钉钉生态接入到阿里云生态。 EventBridge HTTP 事件源 事件源是事件驱动的基石,如何获取更多事件源也是 EventBridge 一直在探索和尝试的方向。针对市场上其他云厂商和垂直领域的 Saas 服务,EventBridge 发布了 HTTP 事件源能力,提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 具体而言,HTTP 事件源是 EventBridge 支持的事件源的一种,它以 Webhook 形式暴露了发布事件的 HTTP 请求地址,用户可以在有 URL 回调的场景配置 HTTP  事件源,或者直接使用最简单的 HTTP 客户端来完成事件的发布。HTTP  事件源提供了支持 HTTP 与 HTTPS,公网与阿里云 VPC 等不同请求方式、不同网络环境的 Webhook URL,便于用户将其集成到各类应用中。接入时无需使用客户端,仅需保证应用可以访问到对应 Webhook URL 即可,这使得接入过程变得简单而高效。 在将 HTTP 请求转换为 CloudEvent 的时候,EventBridge 会将请求的头部和消息体部分置于 CloudEvent 字段中,其余字段会依据用户 EventBridge 资源属性以及系统默认规则进行填充。用户可以在事件规则中,对所需的内容进行过滤、提取,最终按照模板拼装成所需的消息内容投递给事件目标。 在安全方面,HTTP 事件源不需要用户进行复杂的签名鉴权,支持 3 种类型开箱即用的安全设置,分别是请求方法、源 IP 以及请求来源域名。 请求方法:用户可以配置当前请求此事件源时合法的 HTTP 请求方法,如果方法类型不满足配置规则,请求将被过滤,不会投递到事件总线。 源 IP:用户可以设置允许访问此事件源时合法的源 IP(支持 IP 段和 IP),当请求源 IP 不在设置的范围内时,请求将被过滤,不会投递到事件总线。 请求来源域名:即 HTTP 请求的 referer 字段,当请求的 referer 与用户配置不相符时,请求被过滤,不会投递到事件总线。 应用场景 钉钉连接器市场有数百款连接器,包含官方连接器和第三方生态连接器。 官方连接器,来源主要是钉钉官方的应用,比如视频会议、日程、通讯录、审批流、钉盘、宜搭等,企业和 SaaS 厂商可以充分利用这些官方应用的事件构建企业级的应用系统,也可以将钉钉的官方数据流与其他系统做深度集成。 第三方连接器,来源主要是钉钉的生态合作伙伴,比如金蝶、行翼云、集简云、用友、易快报、销帮帮等。SaaS 厂商可以通过开放连接器来开放数据,与其它应用互联互通。 如上图所示,借助钉钉连接器,可以将钉钉官方事件源和钉钉 SaaS 事件源连接到阿里云 EventBridge,从而能驱动云上的弹性资源。SaaS 厂商能够借助 EventBridge 连接的能力快速构建云原生的 SaaS 应用,借助云的弹性能力,采用云原生最新的技术栈,快速高效地开发 SaaS 应用,同时利用 EventBridge 获取钉钉和其它 SaaS 应用的数据源,轻松进行业务创新。 当钉钉生态和 EventBridge 联通后,能产生哪些应用场景呢? 分析场景:企业借助 EventBridge 事件分析能力,对钉钉官方事件进行分析,快速洞察企业运转数据。比如审批效率,员工变更趋势、会议效率等。 通知场景:钉钉连接器 + EventBridge  可覆盖绝大多数消息通知场景,帮助企业用户快速感知 审批,员工变动,会议室信息等一些列企业基础支持系统。 集成场景:基于阿里云基础建设,可快速提升钉钉生态和企业内部数据的互通。例如当公司需要对钉钉和企业内部 IT 系统进行数据打通时,EventBridge 解决方案可以毫不费力地将建立在阿里云体系的 IT 系统连通起来,比如函数计算,云数据库,消息队列等连接扩展阿里云生态。 EDA 场景:使用 EventBridge 快速构建 EDA 驱动的自动化业务流程。例如在新员工入职时,获取员工变动信息。并集中推送到邮箱系统,业务支持系统(DB),CRM 系统等。对企业新员工权限账户进行一站式授权,较少重复机械的业务审批流程。 最佳实践:新增员工 0 代码入库 本章节介绍使用钉钉连接器和 EventBridge 的最佳实践,通过一个例子展示如何 0 代码将钉钉的一个新员工入职记录录入到自定义的数据库当中,企业可以根据该数据库搭建各类员工管理系统。 方案简介 整个方案涉及到钉钉、钉钉连接器、EventBridge、阿里云数据库等产品,整个链路如下图所示: 前置条件: 拥有一个钉钉账号,并创建一个团队成为管理员,并能登陆钉钉开放平台。 拥有一个阿里云账号,并开通 EventBridge 和阿里云数据库。 实践步骤 整个实践过程分为以下几个步骤。 1)创建事件总线和 HTTP 事件源 首先登陆 EventBridge 控制台,创建一个事件总线和 HTTP 事件源,如下图所示,可以先跳过规则和目标的创建。 创建完成后,进入事件总线的详情列表,获取 HTTP 事件源的公网「Webhook 地址」,如下图所示: 2)创建钉钉连接流 登陆钉钉开放平台,进入连接平台,在「我的连接」下创建连接流,在创建界面,选择触发器为「官方通讯录通讯录用户增加」。 连接流创建完成后,进入编辑页面,添加一个「HTTP Webhook」的节点,在「请求地址」一栏填入上个步骤获取到的「HTTP 事件源」地址。 3)钉钉触发新增员工事件 打开钉钉,进入团队,邀请另一个账号加入团队,然后进入事件总线的「事件追踪」页面,可以发现该员工新增事件已经投递到了事件总线之上。 该事件被转换成了一个「CloudEvents」格式,其「$.data.body」为事件的详情,包含 dingId, userId, department 等字段。 { "datacontenttype": "application/json", "aliyunaccountid": "1148", "data": { "headers": { }, "path": "/webhook/putEvents", "body": { "syncAction": "user_add_org", "orderInDepts": "{1:1762632512}", "dingId": "$::$5RUQhP/pK+4A==", "active": true, "avatar": "", "isAdmin": false, "userId": "141146379", "isHide": false, "isLeaderInDepts": "{1:false}", "isBoss": false, "isSenior": false, "name": "小明", "department": [ 1 ] }, "httpMethod": "POST", "queryString": {} }, "subject": "acs:eventbridge:cnhangzhou::eventbus//eventsource/my.dingtalk", "aliyunoriginalaccountid": "11848", "source": "my.dingtalk", "type": "eventbridge:Events:HTTPEvent", "aliyunpublishtime": "20220513T07:28:29.505Z", "specversion": "1.0", "aliyuneventbusname": "chenyangbus", "id": "7059131cb232c4c3592120ae", "time": "20220513T15:28:29.504+08:00", "aliyunregionid": "cnhangzhou", "aliyunpublishaddr": "..61.88" } 4)数据库创建员工表 通过 RDS 控制台购买一个实例,并创建好数据库,然后根据上述新增员工事件的格式,提取部分字段对数据库进行建表。 CREATE TABLE 'user_info' ( 'dingId' varchar(256) NULL, 'active' varchar(256) NULL, 'isAdmin' varchar(256) NULL, 'userId' varchar(256) NULL, 'name' varchar(256) NULL ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; 5)创建事件规则 数据库准备好后,返回 EventBridge 控制台,为第一步创建的事件总线创建事件规则,对「新增员工事件」进行转换并投递至数据库当中。 首先创建规则,过滤第一步创建的 HTTP 事件源。 然后选择 RDS 目标,做好参数映射。 6)触发事件入库 第三步触发事件时,因未配置规则和目标,事件没有被消费,故需要通过钉钉重新触发一次事件,然后从 EventBridge 控制台观察推送轨迹。 从轨迹中可以看出推送成功,然后通过 RDS 控制台可以查询到该条记录。 至此,一个钉钉团队新员工入职的记录通过 0 代码的方式入库到企业数据库当中,可以非常低的成本开发企业级管理应用。 _参考链接:_
作者:尘央
#行业实践 #生态集成

2022年5月15日

【科普】如果程序员穿越到古代当皇帝,会发生什么?
点击查看科普小视频~ 作为 Gartner 定义的十大战略技术趋势之一,事件驱动架构(EDA)逐渐成为主流技术架构。根据 Gartner 的预估,在新型数字化商业的解决方案中,将有 60% 使用 EDA,在商业组织参与的技术栈中,EDA 有一半的占比。 当下比较成功的企业已然认识到,要想最大限度提升运营效率和客户体验,务必要将业务和技术两方面的举措紧密结合起来。事件或业务形势的变化是时下众多企业关注的焦点,这些变化能够为企业领导者带来切实有用的信息,而架构设计的主旨恰恰是从客户联系人、交易、运营等方面的信息中获取洞见,两者相辅相成。传统技术历来对企业从事件中获取洞见的速度有着诸多限制,比如用于记录、收集和处理此类事件的批处理 ETL(提取、转换、加载)等。基于以上背景,阿里云 EventBridge 应运而生,通过事件的标准化和广泛的事件集成能力,帮助开发者轻松构建松耦合、分布式的事件驱动架构。同时阿里云 EventBridge 即将开源,敬请期待! 想要了解更多 EventBridge 相关信息,扫描下方二维码加入钉钉群~
#技术探索

2022年4月20日

EventBridge 集成云服务实践
EvenBridge 集成概述 EventBridge 是阿里云所推出了一款无服务器事件总线,其目标是拓展事件生态,打破系统间的数据孤岛,建立事件集成生态。提供统一的事件标准化接入及管理能力,完善集成与被集成通路,帮助客户快速实现事件驱动的核心原子功能,可将 EventBridge 快速集成至 BPM、RPA、CRM 等系统。 EventBridge 通过事件标准化,接入标准化,组件标准化三个方向作为支点拓展 EventBridge 事件生态: 事件标准化:拥抱 CloudEvents 1.0 开源社区标准协议,原生支持 CloudEvents 社区 SDK 和 API,全面拥抱开源社区事件标准生态; 接入标准化:提供标准事件推送协议 PutEvent,并支持 Pull 和 Push 两种事件接入模型,可有效降低事件接入难度,提供云上完善的事件接入标准化流程; 组件标准化:封装标准的事件下游组件工具链体系,包括 Schema 注册、事件分析、事件检索、事件仪表盘等。提供完善的事件工具链生态。 在集成领域 EventBridge 重点打造事件集成和数据集成两类核心场景,下面将围绕这两类场景具体展开描述。 事件集成 目前 EventBridge 已经拥有 80+ 云产品的事件源,800+ 种事件类型。整个事件生态还正在逐步丰富中。 那么,EventBridge 如何实现云产品的事件集成呢? 首先在 EventBridge 控制台可以看见一个名为 default 的事件总线,云产品的事件都会投递到这个总线; 然后点击创建规则,就可以选择所关心的云产品以及它的相关事件进行事件的监听和投递。 下面以两个例子为例,来看下 EventBridge 事件集成的方式。 OSS 事件集成 以 OSS 事件源为例,来讲解一下如何集成 OSS 事件。 OSS 事件现在主要分为 4 类,操作审计相关、云监控相关、配置审计相关、以及云产品相关的事件例如 PutObject 上传文件等等。其他的云产品的事件源也类似,基本都可以分为这几个类型的事件。 下面演示一下事件驱动的在线文件解压服务: 在 OSS Bucket 下面会有一个  zip 文件夹存放需要解压的文件,一个 unzip 文件夹存放解压后的文件; 当上传文件到 OSS Bucket 之后,会触发文件上传的事件并投递到 EventBridge 的云服务专用总线; 然后会使用一个事件规则过滤 zip 这个 bucket 的事件并投递到解压服务的 HTTP Endpoint; 解压服务会在收到事件之后,根据事件里面的文件路径从 OSS 下载文件解压,并在解压之后将文件传到 unzip 目录下; 同时,还会有一个事件规则,监听 unzip 目录的文件上传事件,并将事件转换后推送到钉钉群。 一起来看下是如何实现的: 前往下方链接查看视频: 1)首先创建一个 bucket,下面有一个 zip 目录用于存放上传的压缩文件,一个 unzip 目录用于存放解压后的文件。 2) 部署解压服务,并且暴露公网访问的地址。 解压服务的源码地址为: 也可以使用 ASK 直接部署,yaml 文件地址为: 3)创建一个事件规则监听 zip 目录下的上传文件的事件,并投递到解压服务的 HTTP  Endpoint。 这里使用 subject,匹配 zip 目录。 4)再创建一个事件规则监听 unzip 目录的事件,投递解压事件到钉钉群。 这里同样使用 subject,匹配 unzip 目录。 对于变量和模板的配置可以参考官方文档 : 。 EventBridge 会通过 JSONPath 的方式从事件中提取参数,然后把这些值放到变量中,最后通过模板的定义渲染出最终的输出投递到事件目标。OSS 事件源的事件格式也可以参考官方文档 : _ _,并根据实际的业务需要使用 JSONPath 定义变量。5)最后,通过 oss 控制台上传一个文件进行验证。 可以看到刚刚上传的 eventbridge.zip 已经解压到并上传上来了,也可以在钉钉群里面,收到解压完成的通知。此外,还可以在事件追踪这边查看事件的内容已经投递的轨迹。 可以看到有两个上传事件:一个是通过控制台上传的事件,一个是解压文件后上传的事件。 可以查看轨迹,都成功投递到了解压服务的 HTTP Endpoint 以及钉钉机器人。 以自定义事件源以及云产品事件目标的方式集成云产品 刚才演示的 demo 是集成云服务的事件源,下面再通过一个 demo 看一下如何通过以自定义事件源以及云产品事件目标的方式集成云产品。 前往下方链接查看视频: 这个 demo 的最终效果是通过 EventBridge 自动进行数据的清洗,并投递到 RDS 中去。事件内容是一个 JSON,拥有两个字段一个名字一个年龄,现在希望将把大于 10 岁的用户过滤出来并存储到 RDS 中。 整体的架构如图所示,使用一个 MNS Queue 作为自定义事件源,并通过 EventBridge 过滤并转换事件最终直接输出到 RDS 中去。 1)首先已经创建好了一个 MNS Queue,创建好一个 RDS 实例以及数据库表,表结构如下所示: 2)创建一个自定事件总线,选择事件提供方为 MNS,队列为提前创建好的队列; 创建好了之后,我们就可以在事件源这里看见一个已经正在运行中的事件源; 3)接下来创建规则投递到 RDS 配置的事件模式内容如下: { "source": [ "my.user" ], "data": { "messageBody": { "age": [ { "numeric": [ "", 10 ] } ] } } } 数值匹配可以参考官方文档:   4) 点击下一步,选择事件目标为数据库,填写数据库信息,配置转化规则,完成创建。 5)最后,先用 MNS Queue 发送一个消息,这个的 age 是大于 10 的。 可以看见这条事件就输出到了 RDS 里面了。 下面再发一个小于 10 的消息到 MNS Queue。 这条事件就被过滤掉了,没有输出到 RDS。 也可通过事件追踪查看事件: 可以看到一条事件成功投递到了 RDS,一条事件被过滤掉了,没有进行投递。 数据集成 事件流是 EventBridge 为数据集成提供的一个更为轻量化、实时的端到端的事件流试的通道,主要目标是将事件在两个端点之间进行数据同步,同时提供过滤和转换的功能。目前已经支持阿里云各消息产品之间的事件流转。 不同于事件总线模型,在事件流中,并不需要事件总线,其 1:1 的模型更加的轻量,直接到目标的方式也让事件更加的实时;通过事件流,我们可以实现不同系统之间的协议转换,数据同步,跨地域备份的能力。 下面将通过一个例子讲解如何使用事件流,将 RocketMQ 的消息路由到 MNS Queue,将两个产品集成起来。 整体的结构如图所示,通过EventBridge 将 RocketMQ 中 TAG 为 MNS 的消息路由到 MNQ Queue。 一起看下怎么实现: 前往下方链接查看视频: 首先创建一个事件流,选择源 RocketMQ 实例,填写 Tag 为 mns。 事件模式内容留空表示匹配所有。 目标选择 MNS,选择目标队列完成创建。 完成创建之后,点击启动,启动事件流任务。 事件流启动完成之后,我们就可以通过控制台或者 SDK 发送消息到源 RocketMQ Topic 里面。当有 Tag 为 mns 的时候,我们可以看见消息路由到了 mns;当有 Tag 不为 mns 的时候,消息就不会路由到 mns。 总结 本篇文章主要向大家分享了通过 EventBridge 如何集成云产品事件源,如何集成云产品事件目标以及通过事件流如何集成消息产品.
作者:李凯(凯易)
#行业实践 #生态集成