云原生中间件RocketMQ-消费者消费模式之广播模式、偏移量offset解析_rocketmq广播消费_共饮一杯无的博客-程序员宅基地

技术标签: java-rocketmq  云原生  Docker&K8S&云原生  中间件  消息队列  

PushConsumer消费模式-广播模式

广播消费: 当使用广播消费模式时, 消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端, 保证消息至少被每台机器消费一次。
在这里插入图片描述
相比于集群模式,广播模式的特点为: 每个消费者都会消费所订阅的Topic + Tag下的所有queue中的所有消息。
适用场景&注意事项

  1. 广播消费模式下不支持顺序消息。
  2. 广播消费模式下不支持重置消费位点。
  3. 每条消息都需要被相同逻辑的多台机器处理。
  4. 广播模式下, 消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次, 但是并不会对消费失败的消息进行失败重投, 因此业务方需要关注消费失败的情况。
  5. 广播模式下, 客户端每一次重启都会从最新消息消费。 客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择
  6. 广播模式下, 每条消息都会被大量的客户端重复处理, 因此推荐尽可能使用集群模式。
  7. 目前仅 Java 客户端支持广播模式。
  8. 广播模式下服务端不维护消费进度, 所以消息队列 RocketMQ 控制台不支持消息堆积查询、 消息堆积报警和订阅关系查询功能。
  9. 消费进度在客户端维护, 出现消息重复消费的概率稍大于集群模式。

设置成广播模式相关代码如下:

//设置消费模式为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

至少一次设计理念

在集群模式下,RocketMQ 可以保证Topic + Tag下的消息可以肯定会被整个集群至少消费一次。
在广播模式下,RocketMQ 可以保证至少被每台机器消费一次。
类似于数据库的事务操作,消费者未消费完成不返回ack给RocketMQ。官方对于至少一次的解释如下:
在这里插入图片描述
官方地址:https://github.com/apache/rocketmq/blob/master/docs/cn/features.md

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。

消息存储核心-偏移量Offset

Offset指某个topic下的一条消息在某个MessageQueue里的位置,通过Offset可以进行定位到这条消息。Offset是消息消费进度的核心。

  • message queue 是无限长的数组,一条消息进来下标就会加1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
  • message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
  • fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费。

Offset的存储实现分为远程文件类型和本地文件类型两种方式。

集群模式-RemoteBrokerOffsetStore解析

默认集群模式clustering,采用远程文件存储Offset。
本质上因为多消费模式,每个Consumer消费所订阅主题的一部分
这种情况需要broker控制offset的值,使用RemoteBrokerOffsetStore。

广播模式-LocalFileOffsetStore解析

  1. 广播模式下,由于每个Consumer都会收到消息且消费
  2. 各个Consumer之间没有任何干扰,独立线程消费
  3. 所以使用LocalFileOffsetStore,也就是把Offset存储到本地

RocketMQ消费者拉取模式-PullConsumer使用

Pull方式主要做了三件事:

  • 获取Message Queue并遍历
  • 维护OffsetStore
  • 根据不同的消息状态做不同的处理

代码案例如下:
DefaultMQPullConsumer拉取:

package com.zjq.rocketmq.consumer.pull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import com.zjq.rocketmq.constants.Const;
 

public class PullConsumer {
    
	//Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
    //实际中可以放在redis里面或者持久化记录消费的位置
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
    
    	
    	String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        System.err.println("consumer start");
        //	从TopicTest这个主题去获取所有的队列(默认会有4个队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
        //	遍历每一个队列,进行拉取数据
        for (MessageQueue mq : mqs) {
    
            System.out.println("Consume from the queue: " + mq);
            
            SINGLE_MQ: while (true) {
    
                try {
    
                	//	从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    System.out.println(pullResult.getPullStatus());
                    System.out.println();
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
    
	                    case FOUND:
	                    	List<MessageExt> list = pullResult.getMsgFoundList();
	                    	for(MessageExt msg : list){
    
	                    		System.out.println(new String(msg.getBody()));
	                    	}
	                        break;
	                    case NO_MATCHED_MSG:
	                        break;
	                    case NO_NEW_MSG:
	                    	System.out.println("没有新的数据啦...");
	                        break SINGLE_MQ;
	                    case OFFSET_ILLEGAL:
	                        break;
	                    default:
	                        break;
                    }
                }
                catch (Exception e) {
    
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
 
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
    
        offseTable.put(mq, offset);
    }
 
 
    private static long getMessageQueueOffset(MessageQueue mq) {
    
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
 
}

MQPullConsumerScheduleService定时拉取:

package com.zjq.rocketmq.consumer.pull;

import java.util.List;

import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.zjq.rocketmq.constants.Const;
 
public class PullScheduleService {
    

    public static void main(String[] args) throws MQClientException {
    
    	
    	String group_name = "test_pull_consumer_name";
    	
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
        
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        
        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        
        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
    
 
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
    
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: " + mq.getQueueId()  + "-------------");
                try {
    
                    // 获取从哪里拉取
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;
 
                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
    
                    case FOUND:
                    	List<MessageExt> list = pullResult.getMsgFoundList();
                    	for(MessageExt msg : list){
    
                    		//消费数据...
                    		System.out.println(new String(msg.getBody()));
                    	}
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    // 设置再过3000ms后重新拉取
                    context.setPullNextDelayTimeMillis(3000);
          
                }
                catch (Exception e) {
    
                    e.printStackTrace();
                }
            }
        });
 
        scheduleService.start();
    }
}


参考:
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md

本文内容到此结束了,
如有收获欢迎点赞收藏关注️,您的鼓励是我最大的动力。
如有错误疑问欢迎各位大佬指出。
主页共饮一杯无的博客汇总

保持热爱,奔赴下一场山海

在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_35427589/article/details/126448022

智能推荐

mysql的where字句调优_mysql中select和where子句优化的方法有哪些-程序员宅基地

mysql中select和where子句优化的方法有哪些发布时间:2020-12-05 09:47:31来源:亿速云阅读:93作者:小新小编给大家分享一下mysql中select和where子句优化的方法有哪些,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!数据库优化:1.可以在单个SQL语句,整个应用程序,单个数据库服务...

【数据结构基础】线性表的链式存储结构--单链表-程序员宅基地

n个结点链结成一个链表,即为线性表的链式存储结构,因为此链表的每个结点中只包含一个指针域,所以叫做单链表。 线性表的单链表存储结构:typedef struct Node{ ElemType data; struct Node *next;}Node;typedef struct Node *LinkList; /* 定义LinkList */ 单链

c++编写病毒入门_怎么用c++写病毒-程序员宅基地

本连载文章只讨论写病毒的技术,并不讨论危害计算机及网络,所示例的程序只是一个无危害的模板,你可以在技术范围及法律范围内扩充实验.在读本程序前请保证不用此程序进行违法活动,否则,请马上离开.连载1——最基本的病毒.本病毒的功能:1.在C、D、E盘和c:\windows\system、c:\windows中生成本病毒体文件2.在C、D、E盘中生成自动运行文件3.注册c:\w_怎么用c++写病毒

tensorflow1.15安装笔记_tyflow1.015-程序员宅基地

conda create --name tf1.15 python=3.6conda activate tf1.15conda install tensorflow == 1.15_tyflow1.015

oracle中两级联查求和,oracle 级联查询 级联求和 汇总_牡丹一抹红的博客-程序员宅基地

级联查询select level||'层',lpad('*',level*5)||id id ,connect_by_isleaffrom teststart with superid = '0' connect by prior id=superid;/*------method one------*/select superid,ltrim(max(sys_connect_by_path(id..._多个库级联如何求和

雨笋教育渗透项目分享:实战从代码审计到Getshell_雨笋教育培训教程-程序员宅基地

最近雨笋教育黄佳院长,忙着打比赛、忙着做裁判、忙着上课,难得挤出一点时间给大家做技术分享,一定要认真看哦,跟着大佬学起来!欢迎转发和关注,分享给你身边有需要的朋友0x01在对某目标的渗透中,发现某端口开放了一个SpringBoot服务扫描SpringBoot的常见信息泄漏的路由,如/actuator、/env、/swagger-ui.html等都未果。后续通过其他方式拿到了目标服务的源码,需要进行代码审计。0x02 Code Audit对于Java应用,审计首先看lib目录下_雨笋教育培训教程

随便推点

IP Camera 基础知识_ip camera 原理-程序员宅基地

http://bbs.cps.com.cn/cps-574624-1-1.html网络摄像机基础知识: a$ }# x# M. @7 [2 @& ], o8 e8 Z6 }4 i目录一,什么是网络摄像机 1二,网络摄像机的工作原理 1三,网络摄像机产品结构: 29 @/ x( Q2 W) f五,如何选择网络摄像机_ip camera 原理

Swift字典-程序员宅基地

本文主要是对swift字典类型的介绍

python rsa加密长度_RSA公钥大小python-程序员宅基地

公钥由两部分组成:模和公共指数。模数的大小决定键的大小。因此,如果给密钥对生成器的大小是2048位。公共指数可以是任意值,也可以是2048位。然而,它通常很小。现在它通常被设置为65537值,在十六进制中是010001。它是一个特殊的数,叫做费马的第四素数,通常用“F4”表示。在公钥结构应该包含这两个组件。对于任何非对称原语(如RSA),编码的密钥大小通常大于密钥大小。除此之外,is可能包含开销(..._python根据模数和指数生成rsa 4096长度的公钥

PowerShell获取当前用户的权限-程序员宅基地

function Get-CurrentUserRoles { $SecurityPrinciple = New-Object -TypeName System.Security.Principal.WindowsPrincipal -ArgumentLis...

android 计算器的进制转换代码,简易进制转换计算器-程序员宅基地

var flag = false;var fuhao;var op1;var pastrd = 10;function jinzhi(m) {if (pastrd == 10) {document.getElementById("pm").innerHTML = (parseInt(document.getElementById("pm").innerHTML)).toString(m);} el..._android进制转换计算器

linux 谷歌浏览器debugger,chrome浏览器下的xdebug helper使用教程_AI Box专栏小助手的博客-程序员宅基地

安装了xdebug后,发现每次调试都需要从eclipse中先从头启动,然后一步步走到你要调试的页面,而不是说想什么时候调试就什么时候调试。之前用zenddebugger的时候则是可以在任意页面启动调试,直接从浏览器通知开发环境需要调试。而不用先从开发环境启动调试。随时需要调试的时候就可以执行调试。后来发现了chrome浏览器有一款插件叫xdebug helper,火狐下也有easy xdebug,..._xdebug 谷歌浏览器