SpringBoot整合ActiveMQ实现延时消息队列_activemq延时队列-程序员宅基地

技术标签: spring boot  SpringBoot  

1、修改mq配置文件 activemq.xml

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

添加上

schedulerSupport="true"

2、pom文件添加

 <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.16.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.16.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

3、springboot配置文件添加

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
spring.activemq.password=admin
spring.activemq.user=admin

4、添加MQ代码配置  必须步骤

package com.zsj.daka.daka.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * @Description TODO
 * @Date 2020/9/2 18:04
 * @Author zsj
 */
@Configuration
public class ActiveMqConfig {

    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        // 设置信任序列化包集合
        List<String> models = new ArrayList<>();
        models.add("com.zsj.daka.daka.model");
        factory.setTrustedPackages(models);

        return factory;
    }

}

5、消息类

package com.zsj.daka.daka.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * @Description TODO
 * @Date 2020/9/2 17:56
 * @Author zsj
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageModel implements Serializable {

    private String message;

    private String title;
}

6、消息生产者

package com.zsj.daka.daka.quartz;

/**
 * @Description TODO
 * @Date 2020/9/2 17:43
 * @Author zsj
 */

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;

@Slf4j
@Service("producer")
public class Producer {
    @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
    private JmsMessagingTemplate jmsTemplate;


    /**
     * 发送消息
     *
     * @param destination destination是发送到的队列
     * @param message     message是待发送的消息
     */
    public <T extends Serializable> void send(Destination destination, T message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * 延时发送
     *
     * @param destination 发送的队列
     * @param data        发送的消息
     * @param time        延迟时间
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        // 获取连接工厂
        ConnectionFactory connectionFactory = jmsTemplate.getConnectionFactory();
        try {
            // 获取连接
            connection = connectionFactory.createConnection();
            connection.start();
            // 获取session,true开启事务,false关闭事务
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            ObjectMessage message = session.createObjectMessage(data);
            //设置延迟时间
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            // 发送消息
            producer.send(message);
            log.info("发送消息:{}", data);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

7、消息消费者

package com.zsj.daka.daka.quartz;

/**
 * @Description TODO
 * @Date 2020/9/2 17:44
 * @Author zsj
 */

import com.zsj.daka.daka.model.MessageModel;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.util.Date;


@Component
public class Consumer {

    // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue")
    public void receiveQueue(MessageModel messageModel) throws InterruptedException {
        System.out.println("consumer接收到"+messageModel.getTitle()+"的请求并处理完毕,时间是"+new Date());
    }


}

8、生产消息入口

package com.zsj.daka.daka.quartz;

import com.zsj.daka.daka.model.MessageModel;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.jms.Destination;

/**
 * @Description TODO
 * @Date 2020/9/2 17:44
 * @Author zsj
 */
@Controller
public class UserController {
    @Autowired
    private Producer producer;

    /**
     * 消息队列实现方式
     */
    @RequestMapping("/queue")
    @ResponseBody
    public String queue() {
        Destination destination = new ActiveMQQueue("mytest.queue");

        for (int i = 0; i < 10; i++) {
            MessageModel messageModel = MessageModel.builder()
                    .message("测试消息" + i)
                    .title("消息" + i)
                    .build();
            // 发送消息
            producer.send(destination, messageModel);
        }

        return "queue 发送成功";
    }

    @RequestMapping("/delay")
    @ResponseBody
    public String delay() {
        Destination destination = new ActiveMQQueue("mytest.queue");

        for (int i = 0; i < 10; i++) {
            MessageModel messageModel = MessageModel.builder()
                    .message("测试消息" + i)
                    .title("消息" + i)
                    .build();
            // 延时发送消息 单位毫秒
            producer.delaySend(destination, messageModel,60*1000L);
        }

        return "delay 发送成功";
    }

}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zsj777/article/details/108367504

智能推荐

CESM2:基本框架_clm5_incesm221.tar.gz-程序员宅基地

文章浏览阅读2.1k次,点赞5次,收藏12次。文章目录1. 基本框架2. CESM components3. grid4. compset4.1 命名规范:4.2 分类4.3 options4.4 Examples of simulations using different compsets:1. 基本框架CouplerAtmosphere ModelsCAM,CAM-CHEM,WACCM Community Atmosphere ModelDATM Climatological Data ModelLand Mo_clm5_incesm221.tar.gz

python定时框架APScheduler使用_python如何正确关闭调度器-程序员宅基地

文章浏览阅读674次,点赞2次,收藏2次。文章目录APScheduler安装apscheduler的组成常见调度器配置调度器启动/关闭调度器事件监听作业及作业存储Job是框架承接目前需要执行的工作和任务,我们可以在系统运行过程中进行动态的增加、修改、删除、查询等操作。1.添加作业2.移除作业3.暂停,恢复作业4.获得调度作业的列表作业控制APScheduler安装pip install apschedulerapschedule..._python如何正确关闭调度器

控制Android充电震动的代码,Android手机使用Tasker控制充电-程序员宅基地

文章浏览阅读667次。起因看到网上有人说,备用机长时间插在电源上会导致电池鼓包,虽然不确定真假,但还是“宁可信其有,不可信其无”,安全第一嘛。工具taskeresp8266开发板带光耦的继电器usb公头,母头,线材若干电烙铁(非必需,如果直接截断一根usb线接到继电器上的话,4也可以省了)操作基本流程tasker在电量变化时通知单片机当前电量,单片机收到手机发来的http post包后查看当前电量,并决定是否操作继电器..._tasker 震动

使用 matplotlib 绘制简单图表(折线图、柱形图、条形图、堆积面积图、直方图、饼图、散点图、箱形图、雷达图、误差棒图)_用matplotlib绘制图-程序员宅基地

文章浏览阅读2.1k次,点赞4次,收藏23次。X轴代表日期,y轴代表温度,由图可以清晰的看出北京市未来15天的最高气温和最低气温都呈现逐步下降后反弹的趋势。————2013-2019财年某电商平台的GMV。中的pyplot模块、numpy模块并设置中文。中的pyplot模块、numpy模块并设置中文。中的pyplot模块、numpy模块并设置中文。中的pyplot模块、numpy模块并设置中文。中的pyplot模块、numpy模块。中的pyplot模块、numpy模块。中的pyplot模块、numpy模块。中的pyplot模块、numpy模块。_用matplotlib绘制图

WebView内存泄漏解决方法_web-view memory access out-程序员宅基地

文章浏览阅读292次。##1.新开进程在WebView所在的activity新开辟一个进程,在onDestroy中杀死WebView所在的进程@Overridepublic void onDestroy() { android.os.Process.killProcess(android.os.Process.myPid()); super.onDestroy();}清单文..._web-view memory access out

Springboot整合PageHelper分页插件,java分布式开发面试题-程序员宅基地

文章浏览阅读227次,点赞3次,收藏7次。可能有人会问我为什么愿意去花时间帮助大家实现求职梦想,因为我一直坚信时间是可以复制的。我牺牲了自己的大概十个小时写了这片文章,换来的是成千上万的求职者节约几天甚至几周时间浪费在无用的资源上。上面的这些(算法与数据结构)+(Java多线程学习手册)+(计算机网络顶级教程)等学习资源6072290)][外链图片转存中…(img-S0YU0wqO-1711156072290)]上面的这些(算法与数据结构)+(Java多线程学习手册)+(计算机网络顶级教程)等学习资源本文已被。

随便推点

闵可夫斯基引擎Minkowski Engine_minkowskiengine-程序员宅基地

文章浏览阅读4.7k次。闵可夫斯基引擎Minkowski EngineMinkowski引擎是一个用于稀疏张量的自动微分库。它支持所有标准神经网络层,例如对稀疏张量的卷积,池化,解池和广播操作。有关更多信息,请访问文档页面。闵可夫斯基引擎Minkowski Enginepip install git+https://github.com/NVIDIA/MinkowskiEngine.git稀疏张量网络:空间稀疏张量的神经网络压缩神经网络以加快推理速度并最小化内存占用已被广泛研究。用于模型压缩的流行技术之一是修剪卷积网络中_minkowskiengine

LayUI-Table-添加禁止选中-程序员宅基地

文章浏览阅读1.6k次。LayUI这几年比较流行,里面的Table组件也比较强大,但是前面的CheckBox没有禁止选中功能,今天就来试试,看看能不能给添加一个禁止选中功能。Fork LayUI源码LayUI项目地址Clone到本地找到src里面lay下面的modules文件夹里面的table.js添加字段标记参照这里我们也添加一个"IS_DISABLE"的标记添加完如下 config: { ..._layui 行不允许选中

大数问题——26进制_z-26进制数是一个每位都是大写字母的数字。 a、b、c、…、x、y、z 分别依次代表一-程序员宅基地

文章浏览阅读4.3k次。杭电2100LovekeyProblem Description XYZ-26进制数是一个每位都是大写字母的数字。 A、B、C、…、X、Y、Z 分别依次代表一个0 ~ 25 的数字,一个 n 位的26进制数转化成是10进制的规则如下 A0A1A2A3…An-1 的每一位代表的数字为a0a1a2a3…an-1 ,则该XYZ-26进制数的10进制值就为 m = a0 * 26^(n-1) + a_z-26进制数是一个每位都是大写字母的数字。 a、b、c、…、x、y、z 分别依次代表一

程序员必知的8大排序(一)-------直接插入排序,希尔排序(java实现)_直接插入排序34,65,12,56,78,10-程序员宅基地

文章浏览阅读129次。程序员必知的8大排序(一)-------直接插入排序,希尔排序(java实现)原创 2012年05月12日 11:18:05标签:java /数据结构 /算法50847前几天,看到一篇前辈的博文“程序员必知的8大排序”,不禁的手痒起来,重新翻开严蔚敏老师的《数据结构》复习了一遍,然后一一的用java去实现,其中有不足之处,还望各位道友指正_直接插入排序34,65,12,56,78,10

Android:Json数据转换成Map_android json to hashmap-程序员宅基地

文章浏览阅读1.6k次。本文利用Gson来做实现,先导入:implementation 'com.google.code.gson:gson:2.8.6'主要利用的是JsonObject里的entrySet()方法,相关Demo代码如下:import com.google.gson.Gson;import com.google.gson.JsonElement;import com.google.gson.JsonObject;import java.util.HashMap;import java.ut_android json to hashmap

R语言 quantmod下载股票代码无法访问Yahoo的唯一解决方法_r语言下载不了雅虎数据-程序员宅基地

文章浏览阅读394次。总结而言,当无法访问Yahoo时,使用quantmod包下载股票代码的一个可行解决方法是选择替代数据源,例如Alpha Vantage,并使用其提供的API密钥来获取数据。值得注意的是,使用Alpha Vantage作为替代数据源可能会有一些限制,例如每分钟请求的次数有限制,并且可能无法提供与Yahoo相同的数据范围和质量。getSymbols函数用于从指定的数据源获取股票数据,参数src用于指定数据源,api.key用于传递API密钥。在获得API密钥后,我们可以使用以下代码来获取股票代码。_r语言下载不了雅虎数据