消息队列高手课1-程序员宅基地

技术标签: java  kafka  rabbitmq  


提示:以下是本篇文章正文内容,下面案例可供参考

一、模型

1.RabbitMQ 的消息模型


同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。这也可以变相地实现新发布 - 订阅模型中,“一份消息数据可以被多个订阅者来多次消费”这样的功能。

2.RocketMQ 的消息模型

RocketMQ消息模型
RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。

消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。

3.Kafka同RocketMQ相同

二、事务处理

在这里插入图片描述
首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。

半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。

如果你足够细心,可能已经发现了,这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。

Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ 则给出了另外一种解决方案。

RocketMQ 中的分布式事务实现
在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果 Producer 也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功还是失败。

在我们这个例子中,反查本地事务的逻辑也很简单,我们只要根据消息中的订单 ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。

这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。

综合上面讲的通用事务消息的实现和 RocketMQ 的事务反查机制,使用 RocketMQ 事务消息功能实现分布式事务的流程如上图。

几个问题:

  1. 对于上面订单的例子,为什么不等待订单创建成功再向消息队列发送订单数据呢?这样串行的话,对性能影响应该也不大,也不用考虑订单创建失败而发送消息的情况了。
    作者回复: 考虑这样一种情况:订单创建成功了,还没来得及发消息,这个节点突然断电了
  2. 先开启本地事务,然后创建订单,订单创建成功后再发消息,根据发消息是否成功来决定提交还是回滚本地事务。这样不需要事务消息也能解决这个场景的问题了?还是说我考虑的不够全面。
    作者回复: 如果本地事务提交失败,已经发出去的消息是无法撤回的,会导致数据不一致。

三、如何确保消息不会丢失

1.检测消息丢失的方法

我们可以利用消息队列的有序性来验证是否有消息丢失。如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

2.确保消息可靠传递

  1. 在生产阶段,你需要捕获消息发送的错误,并重发消息。
  2. 在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
  3. 在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。

消费做好幂等性来处理重复消费消息的问题。

四、处理消费过程中的重复消息

在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。

At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。

Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

1.消费端幂等性

幂等性:其任意多次执行所产生的影响均与一次执行的影响相同。

1). 利用数据库的唯一约束实现幂等

支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,你可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

2). 为更新的数据设置前置条件

给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

3). 记录并检查操作

给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。分布式锁来辅助全局唯一ID执行保证分布式中的幂等性。

五、消费积压的处理

优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。

对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_44546038/article/details/124687625

智能推荐

理解崩溃和崩溃日志(WWDC 2018 session 414)_4418 崩溃-程序员宅基地

文章浏览阅读1.8k次。WWDC 2018 session 414: Understanding Crashes and Crash Logs每个人在写代码的时候,或多或少都会犯错。有的错误就会导致程序崩溃,这非常影响用户体验。本session主要介绍崩溃的原理,他们为什么会发生,以及如何查看、分析崩溃日志,找到并解决问题。基础知识崩溃是什么?崩溃指的是应用程序在尝试执行不允许的操作时,突然中止的..._4418 崩溃

九度1167 数组排序-程序员宅基地

文章浏览阅读1.2k次。/********************************* * 日期:2013-1-29 * 作者:SJF0115 * 题号: 九度1167 * 题目:数组排序 * 来源:http://ac.jobdu.com/problem.php?pid=1167 * 结果:AC * 题意: * 总结:****************

组合预测模型 | GA-LSTM遗传算法优化长短期记忆网络多输入单输出数据回归预测模型(Matlab程序)-程序员宅基地

文章浏览阅读397次。组合预测模型 | GA-LSTM遗传算法优化长短期记忆网络多输入单输出数据回归预测模型(Matlab完整程序)_ga-lstm

文件操作工具类FileUtil_fileutil依赖-程序员宅基地

文章浏览阅读1.1k次。分享一个文件处理的工具类,依赖如下: <dependency> <groupId>org.apache.ant</groupId> <artifactId>ant</artifactId> <version>1.10.5</versi..._fileutil依赖

(附源码)spring boot火车订票系统 毕业设计 031012_火车购票系统三层数据流-程序员宅基地

文章浏览阅读1.3k次,点赞9次,收藏42次。车订票系统主要功能模块包括系统用户管理、车票中心、购票订票、退票纪录,采取面对对象的开发模式进行软件的开发和硬体的架设,能很好的满足实际使用的需求,完善了对应的软体架设以及程序编码的工作,采取MySQL作为后台数据的主要存储单元,采用Springboot框架、JSP技术、Ajax技术进行业务系统的编码及其开发,实现了本系统的全部功能。_火车购票系统三层数据流

mysql计算上个月,MySQL查询以计算上个月-程序员宅基地

文章浏览阅读95次。I would like to calculate total order amount in the previous month.I got the query for getting the data for the present month from the current date.SELECT SUM(goods_total) AS Total_Amount FROM orders..._mysql last month

随便推点

Meta 的 LLaMa 2 许可证并非开源许可证-程序员宅基地

文章浏览阅读347次。作者有幸受邀参加 Linux 基金会 7 月 27 日在瑞士日内瓦举办的 Open Source Congress,议程如下:https://oscongress2023.sched.com/。我看到了互动讨论环节建议参会者预先阅读的一些报告和文章里,其中有一篇 OSI 的博客《Meta 的 LLaMa 2 许可证不是开源许可证》,特别引起了我的注意,也因此取得了 OSI 的同意,将它翻译出来..._2023年7月20日,开源组织osi(open source initiative)发文指出,llama 2所适用的许

XSS 跨站点脚本漏洞详解_xss变形-程序员宅基地

文章浏览阅读307次。xss攻击手法以及绕过防御_xss变形

关于BISS Key的教程-程序员宅基地

文章浏览阅读3.5k次。网上我们一般查询到这样一些数据,如何识别? 例1:-----------------------------------------------------------------------------------------KBS World Telkom 1 at 108.0°E 3972 H 2100-3/4 DVB-S2/8PSK MPEG-4 SID(In Hex):_biss key

回炉夜话 - 序-程序员宅基地

文章浏览阅读159次。有志足风流,惜诺自可亲 这是我大学时代信奉的格言。转眼年至不惑,回想人生倒也是感慨万千。 在这里,作为一个老码农,我想梳理下自己的技术栈。为继续做一个码农而努力。 一、首先,对于各种技术的掌握程度作出如下定义: 了解: 阅读过相关资料或书籍,有可能..._回炉夜话全集

Android问题解决--“signal 11 (SIGSEGV), code 2 (SEGV_ACCERR), fault addr 0xxxxxxx” 又出现了_to unreadable libraries. for unwinds of apps, only-程序员宅基地

文章浏览阅读1.2w次。今天,调试一个app,又出现“signal 11 (SIGSEGV), code 2 (SEGV_ACCERR), fault addr 0xxxxxx”问题了。而且只在Android10以上版本才会有,导致的现象是app崩溃,这怎么怎?问题log:signal 11 (SIGSEGV), code 2 (SEGV_ACCERR), fault addr 0x739ae8d004全部log如下:05-08 10:21:31.065 D/a.module(1890.._to unreadable libraries. for unwinds of apps, only shared libraries

工件SSMwar exploded 部署工件时出错。请参阅服务器日志了解详细信息_正在构建工件 'ssm0950my8t:war exploded': 正在复制文件…-程序员宅基地

文章浏览阅读1.9k次。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。由于监听器过早的生效时间导致我们自动注入的bean的引用名称还没有生效(实际上bean已经注入了,但是监听器此时识别不到,小写类名首字母也没有用),这时候就要用到自定义bean名称了!仔细想一下,查看我监听器的代码,监听器实现了ServletContextListener接口,是一个全局监听器,也就是项目刚启动是就会生效,于是我添加了一条输出信息,就是“进入监听器”..._正在构建工件 'ssm0950my8t:war exploded': 正在复制文件…