技术标签: java-rocketmq 云原生 Docker&K8S&云原生 中间件 消息队列
广播消费: 当使用广播消费模式时, 消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端, 保证消息至少被每台机器消费一次。
相比于集群模式,广播模式的特点为: 每个消费者都会消费所订阅的Topic + Tag下的所有queue中的所有消息。
适用场景&注意事项:
设置成广播模式相关代码如下:
//设置消费模式为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
在集群模式下,RocketMQ 可以保证Topic + Tag下的消息可以肯定会被整个集群至少消费一次。
在广播模式下,RocketMQ 可以保证至少被每台机器消费一次。
类似于数据库的事务操作,消费者未消费完成不返回ack给RocketMQ。官方对于至少一次的解释如下:
官方地址:https://github.com/apache/rocketmq/blob/master/docs/cn/features.md
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
Offset指某个topic下的一条消息在某个MessageQueue里的位置,通过Offset可以进行定位到这条消息。Offset是消息消费进度的核心。
Offset的存储实现分为远程文件类型和本地文件类型两种方式。
默认集群模式clustering,采用远程文件存储Offset。
本质上因为多消费模式,每个Consumer消费所订阅主题的一部分
这种情况需要broker控制offset的值,使用RemoteBrokerOffsetStore。
Pull方式主要做了三件事:
代码案例如下:
DefaultMQPullConsumer拉取:
package com.zjq.rocketmq.consumer.pull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import com.zjq.rocketmq.constants.Const;
public class PullConsumer {
//Map<key, value> key为指定的队列,value为这个队列拉取数据的最后位置
//实际中可以放在redis里面或者持久化记录消费的位置
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
String group_name = "test_pull_consumer_name";
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.start();
System.err.println("consumer start");
// 从TopicTest这个主题去获取所有的队列(默认会有4个队列)
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
// 遍历每一个队列,进行拉取数据
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ: while (true) {
try {
// 从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
System.out.println(pullResult.getPullStatus());
System.out.println();
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for(MessageExt msg : list){
System.out.println(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
System.out.println("没有新的数据啦...");
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}
MQPullConsumerScheduleService定时拉取:
package com.zjq.rocketmq.consumer.pull;
import java.util.List;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import com.zjq.rocketmq.constants.Const;
public class PullScheduleService {
public static void main(String[] args) throws MQClientException {
String group_name = "test_pull_consumer_name";
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------");
try {
// 获取从哪里拉取
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for(MessageExt msg : list){
//消费数据...
System.out.println(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
// 设置再过3000ms后重新拉取
context.setPullNextDelayTimeMillis(3000);
}
catch (Exception e) {
e.printStackTrace();
}
}
});
scheduleService.start();
}
}
参考:
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
本文内容到此结束了,
如有收获欢迎点赞收藏关注️,您的鼓励是我最大的动力。
如有错误疑问欢迎各位大佬指出。
主页:共饮一杯无的博客汇总保持热爱,奔赴下一场山海。
mysql中select和where子句优化的方法有哪些发布时间:2020-12-05 09:47:31来源:亿速云阅读:93作者:小新小编给大家分享一下mysql中select和where子句优化的方法有哪些,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!数据库优化:1.可以在单个SQL语句,整个应用程序,单个数据库服务...
n个结点链结成一个链表,即为线性表的链式存储结构,因为此链表的每个结点中只包含一个指针域,所以叫做单链表。 线性表的单链表存储结构:typedef struct Node{ ElemType data; struct Node *next;}Node;typedef struct Node *LinkList; /* 定义LinkList */ 单链
本连载文章只讨论写病毒的技术,并不讨论危害计算机及网络,所示例的程序只是一个无危害的模板,你可以在技术范围及法律范围内扩充实验.在读本程序前请保证不用此程序进行违法活动,否则,请马上离开.连载1——最基本的病毒.本病毒的功能:1.在C、D、E盘和c:\windows\system、c:\windows中生成本病毒体文件2.在C、D、E盘中生成自动运行文件3.注册c:\w_怎么用c++写病毒
conda create --name tf1.15 python=3.6conda activate tf1.15conda install tensorflow == 1.15_tyflow1.015
级联查询select level||'层',lpad('*',level*5)||id id ,connect_by_isleaffrom teststart with superid = '0' connect by prior id=superid;/*------method one------*/select superid,ltrim(max(sys_connect_by_path(id..._多个库级联如何求和
最近雨笋教育黄佳院长,忙着打比赛、忙着做裁判、忙着上课,难得挤出一点时间给大家做技术分享,一定要认真看哦,跟着大佬学起来!欢迎转发和关注,分享给你身边有需要的朋友0x01在对某目标的渗透中,发现某端口开放了一个SpringBoot服务扫描SpringBoot的常见信息泄漏的路由,如/actuator、/env、/swagger-ui.html等都未果。后续通过其他方式拿到了目标服务的源码,需要进行代码审计。0x02 Code Audit对于Java应用,审计首先看lib目录下_雨笋教育培训教程
http://bbs.cps.com.cn/cps-574624-1-1.html网络摄像机基础知识: a$ }# x# M. @7 [2 @& ], o8 e8 Z6 }4 i目录一,什么是网络摄像机 1二,网络摄像机的工作原理 1三,网络摄像机产品结构: 29 @/ x( Q2 W) f五,如何选择网络摄像机_ip camera 原理
本文主要是对swift字典类型的介绍
公钥由两部分组成:模和公共指数。模数的大小决定键的大小。因此,如果给密钥对生成器的大小是2048位。公共指数可以是任意值,也可以是2048位。然而,它通常很小。现在它通常被设置为65537值,在十六进制中是010001。它是一个特殊的数,叫做费马的第四素数,通常用“F4”表示。在公钥结构应该包含这两个组件。对于任何非对称原语(如RSA),编码的密钥大小通常大于密钥大小。除此之外,is可能包含开销(..._python根据模数和指数生成rsa 4096长度的公钥
function Get-CurrentUserRoles { $SecurityPrinciple = New-Object -TypeName System.Security.Principal.WindowsPrincipal -ArgumentLis...
var flag = false;var fuhao;var op1;var pastrd = 10;function jinzhi(m) {if (pastrd == 10) {document.getElementById("pm").innerHTML = (parseInt(document.getElementById("pm").innerHTML)).toString(m);} el..._android进制转换计算器
安装了xdebug后,发现每次调试都需要从eclipse中先从头启动,然后一步步走到你要调试的页面,而不是说想什么时候调试就什么时候调试。之前用zenddebugger的时候则是可以在任意页面启动调试,直接从浏览器通知开发环境需要调试。而不用先从开发环境启动调试。随时需要调试的时候就可以执行调试。后来发现了chrome浏览器有一款插件叫xdebug helper,火狐下也有easy xdebug,..._xdebug 谷歌浏览器