提示:以下是本篇文章正文内容,下面案例可供参考
同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能。
RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。
在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。
首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。
如果你足够细心,可能已经发现了,这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。
Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ 则给出了另外一种解决方案。
RocketMQ 中的分布式事务实现
在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果 Producer 也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。
在我们这个例子中,反查本地事务的逻辑也很简单,我们只要根据消息中的订单 ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。
这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。
综合上面讲的通用事务消息的实现和 RocketMQ 的事务反查机制,使用 RocketMQ 事务消息功能实现分布式事务的流程如上图。
几个问题:
我们可以利用消息队列的有序性来验证是否有消息丢失。如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。
消费做好幂等性来处理重复消费消息的问题。
在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。
At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
幂等性:其任意多次执行所产生的影响均与一次执行的影响相同。
支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。
给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。
给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。分布式锁来辅助全局唯一ID执行保证分布式中的幂等性。
优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。
对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。
文章浏览阅读1.8k次。WWDC 2018 session 414: Understanding Crashes and Crash Logs每个人在写代码的时候,或多或少都会犯错。有的错误就会导致程序崩溃,这非常影响用户体验。本session主要介绍崩溃的原理,他们为什么会发生,以及如何查看、分析崩溃日志,找到并解决问题。基础知识崩溃是什么?崩溃指的是应用程序在尝试执行不允许的操作时,突然中止的..._4418 崩溃
文章浏览阅读1.2k次。/********************************* * 日期:2013-1-29 * 作者:SJF0115 * 题号: 九度1167 * 题目:数组排序 * 来源:http://ac.jobdu.com/problem.php?pid=1167 * 结果:AC * 题意: * 总结:****************
文章浏览阅读397次。组合预测模型 | GA-LSTM遗传算法优化长短期记忆网络多输入单输出数据回归预测模型(Matlab完整程序)_ga-lstm
文章浏览阅读1.1k次。分享一个文件处理的工具类,依赖如下: <dependency> <groupId>org.apache.ant</groupId> <artifactId>ant</artifactId> <version>1.10.5</versi..._fileutil依赖
文章浏览阅读1.3k次,点赞9次,收藏42次。车订票系统主要功能模块包括系统用户管理、车票中心、购票订票、退票纪录,采取面对对象的开发模式进行软件的开发和硬体的架设,能很好的满足实际使用的需求,完善了对应的软体架设以及程序编码的工作,采取MySQL作为后台数据的主要存储单元,采用Springboot框架、JSP技术、Ajax技术进行业务系统的编码及其开发,实现了本系统的全部功能。_火车购票系统三层数据流
文章浏览阅读95次。I would like to calculate total order amount in the previous month.I got the query for getting the data for the present month from the current date.SELECT SUM(goods_total) AS Total_Amount FROM orders..._mysql last month
文章浏览阅读347次。作者有幸受邀参加 Linux 基金会 7 月 27 日在瑞士日内瓦举办的 Open Source Congress,议程如下:https://oscongress2023.sched.com/。我看到了互动讨论环节建议参会者预先阅读的一些报告和文章里,其中有一篇 OSI 的博客《Meta 的 LLaMa 2 许可证不是开源许可证》,特别引起了我的注意,也因此取得了 OSI 的同意,将它翻译出来..._2023年7月20日,开源组织osi(open source initiative)发文指出,llama 2所适用的许
文章浏览阅读307次。xss攻击手法以及绕过防御_xss变形
文章浏览阅读3.5k次。网上我们一般查询到这样一些数据,如何识别? 例1:-----------------------------------------------------------------------------------------KBS World Telkom 1 at 108.0°E 3972 H 2100-3/4 DVB-S2/8PSK MPEG-4 SID(In Hex):_biss key
文章浏览阅读159次。有志足风流,惜诺自可亲 这是我大学时代信奉的格言。转眼年至不惑,回想人生倒也是感慨万千。 在这里,作为一个老码农,我想梳理下自己的技术栈。为继续做一个码农而努力。 一、首先,对于各种技术的掌握程度作出如下定义: 了解: 阅读过相关资料或书籍,有可能..._回炉夜话全集
文章浏览阅读1.2w次。今天,调试一个app,又出现“signal 11 (SIGSEGV), code 2 (SEGV_ACCERR), fault addr 0xxxxxx”问题了。而且只在Android10以上版本才会有,导致的现象是app崩溃,这怎么怎?问题log:signal 11 (SIGSEGV), code 2 (SEGV_ACCERR), fault addr 0x739ae8d004全部log如下:05-08 10:21:31.065 D/a.module(1890.._to unreadable libraries. for unwinds of apps, only shared libraries
文章浏览阅读1.9k次。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。由于监听器过早的生效时间导致我们自动注入的bean的引用名称还没有生效(实际上bean已经注入了,但是监听器此时识别不到,小写类名首字母也没有用),这时候就要用到自定义bean名称了!仔细想一下,查看我监听器的代码,监听器实现了ServletContextListener接口,是一个全局监听器,也就是项目刚启动是就会生效,于是我添加了一条输出信息,就是“进入监听器”..._正在构建工件 'ssm0950my8t:war exploded': 正在复制文件…