为什么会产生消息队列?有几个原因:
《Java帝国之消息队列》
《一个故事告诉你什么是消息队列》
《到底什么时候该使用MQ》
代码:https://download.csdn.net/download/zpcandzhj/10585077
教程:https://download.csdn.net/download/zpcandzhj/10585092
开发语言:Erlang – 面向并发的编程语言。
AMQP是消息队列的一个协议。
下载地址:http://www.rabbitmq.com/download.html
下载:https://www.erlang.org/downloads (OTP 22.0 Windows 64-bit Binary File (94094976))
安装:
安装完成。
安装完成。
开始菜单里出现如下选项:
启动、停止、重新安装等。
3.2.3.启用管理工具
1、双击
2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:rabbitmq-plugins enable rabbitmq_management
这样就启动了管理工具,可以试一下命令:
停止:net stop RabbitMQ
启动:net start RabbitMQ
3、在浏览器中输入地址查看:http://127.0.0.1:15672/
4、使用默认账号登录:guest/ guest
3.3.1.安装Erlang
3.3.2.添加yum支持
cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
使用yum安装:
sudo yum install erlang
3.3.3.安装RabbitMQ
上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
3.3.4.启动、停止
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
3.3.5.设置开机启动
chkconfig rabbitmq-server on
3.3.6.设置配置文件
cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
3.3.7.开启用户远程访问
vi /etc/rabbitmq/rabbitmq.config
注意要去掉后面的逗号。
3.3.8.开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
3.3.9.防火墙开放15672端口
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
1、推荐使用默认的安装路径
2、系统用户名必须是英文
Win10改名字非常麻烦,具体方法百度
3、计算机名必须是英文
4、系统的用户必须是管理员
如果安装失败应该如何解决:
1、重装系统 – 不推荐
2、将RabbitMQ安装到linux虚拟机中
a)推荐
3、使用别人安装好的RabbitMQ服务
a)只要给你开通一个账户即可。
b)使用公用的RabbitMQ服务,在192.168.50.22
c)推荐
常见错误:
1、系统服务中有RabbitMQ服务,停止、启动、重启
2、打开命令行工具
如果找不到命令行工具,直接cd到相应目录:
输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件
查看管理页面
通过默认账户 guest/guest 登录
如果能够登录,说明安装成功。
如果添加角色时报错–>用户管理(Not management user )
执行命令:
[[email protected]_0_3_centos ~]# rabbitmqctl list_users
Listing users ...
user tags
guest []
[[email protected]_0_3_centos ~]# rabbitmqctl set_user_tags guest administrator
Setting tags for user "guest" to [administrator] ...
[[email protected]_0_3_centos ~]# rabbitmqctl list_users
Listing users ...
user tags
guest [administrator]
1、超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
选中Admin用户,设置权限:
看到权限已加:
项目下载地址:
https://download.csdn.net/download/zpcandzhj/10585077
5.2.1.图示
P:消息的生产者
C:消息的消费者
红色:队列
生产者将消息发送到队列,消费者从队列中获取消息。
5.2.2.导入RabbitMQ的客户端依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
5.2.3.获取MQ的连接
package com.zpc.rabbitmq.util;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("testhost");
factory.setUsername("admin");
factory.setPassword("admin");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
5.2.4.生产者发送消息到队列
package com.zpc.rabbitmq.simple;
import com.zpc.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
}
5.2.5.管理工具中查看消息
点击上面的队列名称,查询具体的队列中的信息:
5.2.6.消费者从队列中获取消息
package com.zpc.rabbitmq.simple;
import com.zpc.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
5.3.1.图示
一个生产者、2个消费者。
一个消息只能被一个消费者获取。
5.3.2.消费者1
package com.zpc.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zpc.rabbitmq.util.ConnectionUtil;
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [y] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 返回确认状态,注释掉表示使用自动确认模式
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5.3.3.消费者2
package com.zpc.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zpc.rabbitmq.util.ConnectionUtil;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
//下面这行注释掉表示使用自动确认模式
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5.3.4.生产者
向队列中发送100条消息。
package com.zpc.rabbitmq.work;
import com.zpc.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
5.3.5.测试
测试结果:
1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。
打开上述代码的注释:
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
同时改为手动确认:
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);
测试:
消费者1比消费者2获取的消息更多。
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
手动模式:
自动模式:
5.6.1.图示
解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
5.6.2.消息的生产者(看作是后台系统)
向交换机中发送消息。
package com.zpc.rabbitmq.subscribe;
import com.zpc.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
5.6.3.消费者1(看作是前台系统)
package com.zpc.rabbitmq.subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zpc.rabbitmq.util.ConnectionUtil;
public class Recv {
private final static String QUEUE_NAME = "test_queue_work1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5.6.4.消费者2(看作是搜索系统)
package com.zpc.rabbitmq.subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zpc.rabbitmq.util.ConnectionUtil;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv2] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5.6.5.测试
测试结果:
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
在管理工具中查看队列和交换机的绑定关系:
5.7.1.图示
5.7.2.生产者
package com.mq.rabbitmq.route;
import com.mq.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 路由模式(跟订阅模式差不多)
* 生产者发往路由器,路由器根据条件发往不同的队列
*
*/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机,direct ->交换机类型
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String message = "插入商品";
// delete 消息key(关键字)
channel.basicPublish(EXCHANGE_NAME,"insert",null,message.getBytes());
System.out.println("[X] Send '" + message + "' ");
channel.close();
connection.close();
}
}
5.7.3.消费者1(假设是前台系统)
package com.mq.rabbitmq.route;
import com.mq.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* 路由模式(跟订阅模式差不多)
* 生产者发往路由器,路由器根据条件发往不同的队列
*
*/
public class Recv {
// 队列名称
private final static String QUEUE_NAME = "test_queue_direct1";
// 交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args)throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 队列绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
// 同一时刻服务器只发一条信息
channel.basicQos(1);
// 定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回
channel.basicConsume(QUEUE_NAME,false,consumer);
// 循环获取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
String message = new String(body);
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
5.7.4.消费2(假设是搜索系统)
package com.mq.rabbitmq.route;
import com.mq.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
/**
* 路由模式(跟订阅模式差不多)
* 生产者发往路由器,路由器根据条件发往不同的队列
*
*/
public class Recv2 {
// 队列名称
private final static String QUEUE_NAME = "test_queue_direct2";
// 交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args)throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 队列绑定交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
// 同一时刻服务器只发一条信息
channel.basicQos(1);
// 定义消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回
channel.basicConsume(QUEUE_NAME,false,consumer);
// 循环获取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
String message = new String(body);
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
5.8.1.图示
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
5.8.2.生产者
package com.zpc.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zpc.rabbitmq.util.ConnectionUtil;
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "Hello World!!";
channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
5.8.3.消费者1(前台系统)
package com.zpc.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zpc.rabbitmq.util.ConnectionUtil;
public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_work_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv_x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5.8.4.消费者2(搜索系统)
package com.zpc.rabbitmq.topic;
import com.zpc.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_work_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv2_x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
6.3.1.消费者
package com.zpc.rabbitmq.spring;
/**
* 消费者
*
* @author Evan
*/
public class Foo {
//具体执行业务的方法
public void listen(String foo) {
System.out.println("\n消费者: " + foo + "\n");
}
}
6.3.2.生产者
package com.zpc.rabbitmq.spring;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringMain {
public static void main(final String... args) throws Exception {
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
"classpath:spring/rabbitmq-context.xml");
//RabbitMQ模板
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
//发送消息
template.convertAndSend("Hello, 鸟鹏!");
Thread.sleep(1000);// 休眠1秒
ctx.destroy(); //容器销毁
}
}
6.3.3.配置文件
1、定义连接工厂
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="127.0.0.1" port="5672" username="admin" password="admin"
virtual-host="testhost" />
2、定义模板(可以指定交换机或队列)
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory" exchange="fanoutExchange" />
3、定义队列、交换机、以及完成队列和交换机的绑定
<!-- 定义队列,自动声明 -->
<rabbit:queue name="zpcQueue" auto-declare="true"/>
<!-- 定义交换器,把Q绑定到交换机,自动声明 -->
<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="zpcQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
4、定义监听
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
</rabbit:listener-container>
<bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
5、定义管理,用于管理队列、交换机等:
<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory" />
完整配置文件rabbitmq-context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="127.0.0.1" port="5672" username="admin" password="admin"
virtual-host="testhost" />
<!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
<!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="fanoutExchange" routing-key="foo.bar" /> -->
<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 定义队列,自动声明 -->
<rabbit:queue name="zpcQueue" auto-declare="true"/>
<!-- 定义交换器,把Q绑定到交换机,自动声明 -->
<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="zpcQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- <rabbit:topic-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="foo.*" />
</rabbit:bindings>
</rabbit:topic-exchange> -->
<!-- 队列监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="foo" method="listen" queue-names="zpcQueue" />
</rabbit:listener-container>
<bean id="foo" class="com.zpc.rabbitmq.spring.Foo" />
</beans>
持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。
非持久化的性能高于持久化。
如何选择持久化?非持久化? – 看需求。
创建三个系统A,B,C
A作为生产者,B、C作为消费者(B,C作为web项目启动)
项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077
7.1.1.导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zpc</groupId>
<artifactId>myrabbitA</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>myrabbit</name>
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
</project>
7.1.2.队列和交换机的绑定关系
实现:
1、在配置文件中将队列和交换机完成绑定
2、可以在管理界面中完成绑定
a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
b)管理更加灵活
c)更容易对绑定关系的权限管理,流程管理
本例选择第2种方式
7.1.3.配置
rabbitmq-context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="127.0.0.1" port="5672" username="admin" password="admin"
virtual-host="testhost" />
<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 定义交换器,暂时不把Q绑定到交换机,在管理界面去绑定 -->
<!--<rabbit:topic-exchange name="topicExchange" auto-declare="true" ></rabbit:topic-exchange>-->
<rabbit:direct-exchange name="directExchange" auto-declare="true" ></rabbit:direct-exchange>
<!--<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" ></rabbit:fanout-exchange>-->
<!-- 定义Rabbit模板,指定连接工厂以及定义exchange(exchange要和上面的一致) -->
<!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="topicExchange" />-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchange" />
<!--<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />-->
</beans>
7.1.4.消息内容
方案:
1、消息内容使用对象做json序列化发送
a)数据大
b)有些数据其他人是可能用不到的
2、发送特定的业务字段,如id、操作类型
7.1.5.实现
生产者MsgSender.java:
package com.zpc.myrabbit;
import com.alibaba.fastjson.JSON;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 消息生产者
*/
public class MsgSender {
public static void main(String[] args) throws Exception {
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
"classpath:spring/rabbitmq-context.xml");
//RabbitMQ模板
RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
//发送消息
Map<String, Object> msg = new HashMap<String, Object>();
msg.put("type", "1");
msg.put("date", date);
template.convertAndSend("type2", JSON.toJSONString(msg));
Thread.sleep(1000);// 休眠1秒
ctx.destroy(); //容器销毁
}
}
7.2.1.导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zpc</groupId>
<artifactId>myrabbitB</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>myrabbit</name>
<properties>
<spring.version>4.1.3.RELEASE</spring.version>
<fastjson.version>1.2.46</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<!-- web层需要配置Tomcat插件 -->
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<path>/testRabbit</path>
<uriEncoding>UTF-8</uriEncoding>
<port>8081</port>
</configuration>
</plugin>
</plugins>
</build>
</project>
7.2.2.配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="127.0.0.1" port="5672" username="admin" password="admin"
virtual-host="testhost" />
<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 定义B系统需要监听的队列,自动声明 -->
<rabbit:queue name="q_topic_testB" auto-declare="true"/>
<!-- 队列监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testB" />
</rabbit:listener-container>
<bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
</beans>
7.2.3.具体处理逻辑
public class Listener {
//具体执行业务的方法
public void listen(String msg) {
System.out.println("\n消费者B开始处理消息: " + msg + "\n");
}
}
7.2.4.在界面管理工具中完成绑定关系
选中定义好的交换机(exchange)
1)direct
2)fanout
3)topic
(和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)
7.3.1.配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="127.0.0.1" port="5672" username="admin" password="admin"
virtual-host="testhost" />
<!-- MQ的管理,包括队列、交换器等 -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 定义C系统需要监听的队列,自动声明 -->
<rabbit:queue name="q_topic_testC" auto-declare="true"/>
<!-- 队列监听 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="myMQlistener" method="listen" queue-names="q_topic_testC" />
</rabbit:listener-container>
<bean id="myMQlistener" class="com.zpc.myrabbit.listener.Listener" />
</beans>
7.3.2.处理业务逻辑
public class Listener {
//具体执行业务的方法
public void listen(String msg) {
System.out.println("\n消费者C开始处理消息: " + msg + "\n");
}
}
7.3.3.在管理工具中绑定队列和交换机
见7.2.4
7.3.4.测试
分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型
1、配置pom文件,主要是添加spring-boot-starter-amqp的支持
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置application.properties文件
配置rabbitmq的安装地址、端口以及账户信息
spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=testhost
3、配置队列
package com.zpc.rabbitmq;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("q_hello");
}
}
4、发送者
package com.zpc.rabbitmq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
String context = "hello " + date;
System.out.println("Sender : " + context);
//简单对列的情况下routingKey即为Q名
this.rabbitTemplate.convertAndSend("q_hello", context);
}
}
5、接收者
package com.zpc.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
6、测试
package com.zpc.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}
注册两个Receiver:
package com.zpc.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_hello")
public class HelloReceive1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
package com.zpc.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver2 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
生产者
public void send(int i) {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
String context = "hello " + i + " " + date;
System.out.println("Sender : " + context);
//简单对列的情况下routingKey即为Q名
this.rabbitTemplate.convertAndSend("q_hello", context);
}
测试发送100条消息
@Test
public void oneToMany() throws Exception {
for (int i=0;i<100;i++){
helloSender.send(i);
Thread.sleep(300);
}
}
首先对topic规则配置,这里使用两个队列(消费者)来演示。
1)配置队列,绑定交换机
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
final static String message = "q_topic_message";
final static String messages = "q_topic_messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
/**
* 声明一个Topic类型的交换机
* @return
*/
@Bean
TopicExchange exchange() {
return new TopicExchange("mybootexchange");
}
/**
* 绑定Q到交换机,并且指定routingKey
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
2)创建2个消费者
q_topic_message 和q_topic_messages
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_topic_message")
public class Receiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_topic_messages")
public class Receiver2 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
3)消息发送者(生产者)
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MsgSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
}
}
send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
4)测试
package com.zpc.rabbitmq.topic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTopicTest {
@Autowired
private MsgSender msgSender;
@Test
public void send1() throws Exception {
msgSender.send1();
}
@Test
public void send2() throws Exception {
msgSender.send2();
}
}
1)配置队列,绑定交换机
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue aMessage() {
return new Queue("q_fanout_A");
}
@Bean
public Queue bMessage() {
return new Queue("q_fanout_B");
}
@Bean
public Queue cMessage() {
return new Queue("q_fanout_C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("mybootfanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(aMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(bMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(cMessage).to(fanoutExchange);
}
}
2)创建3个消费者
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_fanout_A")
public class ReceiverA {
@RabbitHandler
public void process(String hello) {
System.out.println("AReceiver : " + hello + "/n");
}
}
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_fanout_B")
public class ReceiverB {
@RabbitHandler
public void process(String hello) {
System.out.println("BReceiver : " + hello + "/n");
}
}
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_fanout_C")
public class ReceiverC {
@RabbitHandler
public void process(String hello) {
System.out.println("CReceiver : " + hello + "/n");
}
}
3)生产者
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MsgSenderFanout {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
}
}
4)测试
package com.zpc.rabbitmq.fanout;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitFanoutTest {
@Autowired
private MsgSenderFanout msgSender;
@Test
public void send1() throws Exception {
msgSender.send();
}
}
结果如下,三个消费者都收到消息:
AReceiver : hi, fanout msg
CReceiver : hi, fanout msg
BReceiver : hi, fanout msg
1、降低系统间耦合度
2、便于管理数据的同步(数据一致性)
《RabbitMQ详解》
《大型网站技术架构:核心原理与案例分析》
本文转载至
作者:niaobirdfly
原文:https://blog.csdn.net/hellozpc/article/details/81436980
本文阐述了计算智能算法的四种模型:模糊逻辑理论(FL)、人工神经网络(ANNs)、进化计算(EC)和群体智能(SI)。近年来,与计算智能理论相关的各种研究工作取得了重大进展,本文也介绍了包括群体智能(SI)、人工免疫系统(AIS)、量子计算(QC)和DNA计算(DNAC)等新发展的方法。
2019独角兽企业重金招聘Python工程师标准>>> ...
算法实现其中: Num_subc子载波数 Pt功率 gain_subc子载波增益 noise_var噪声功率 gap信噪比间隔B带宽 K最大分配子载波数 Rt数据速率%---------Water filling algo--------------function [bit_alloc,power_alloc,bit_theory]=waterfill(Num_subc,Pt,gain_...
UnitTest 一、UnitTest 基本使用 1. 什么是 UnitTest 框架? 概念:UnitTest 是 Python 自带的一个单元测试框架,用它来做单元测试。 2. 为什么使用 UnitTest 框架? 能够组织多个用例去执行; 提供丰富的断言方法; 能够生成测试报告; 3. UnitTest 核心要素 TestCase; TestSuite; TestRunner; TestLoader; Fixture;4. TestCase .
为什么80%的码农都做不了架构师?>>> ...
问题描述:idea中启动多个服务会在services中展示服务的信息和控制台,但是经常有一些启动类会变成灰色的,而且经常会自动消失,下次启动时需要手动再去启动,很麻烦。这是因为默认最多保存五个启动类,多余的会变成灰色,而且会被idea清理掉解决方法:1.首先打开启动配置2.选中灰色的配置类,然后点击下图中箭头所示的保存按钮,就会发现灰色的启动类变成了正常颜色,而且后面不会消失了,最后点击ok保存...
在使用解决方法一for循环处理用例读取的时候,如果新增测试用例文件testa.py,那么需要在__init__.py文件中编写import testa,还需要在测试用例列表文件的数据中增加相应测试用例名称,这样才能使新的测试用例添加到测试套件中执行,这样做显然不方便。在实际过程中,可能我们需要组织成百上千条测试用例,虽然我们可以通过导入包文件的方式添加测试用例,但每创建一个新的测试用例都需要在测试套件中增加一条add Test语句,随着用例的增加,不便于管理和维护。不然会影响测试用例的执行。
FastDFS踩坑1:报错链接超时22122或23000或Unable to borrow buffer from pool2:com.github.tobato.fastdfs.exception.FdfsServerException提示错误码2错误信息找不到节点或文件的解决方法强力参考:http://bbs.chinaunix.net/thread-1920470-1-1.html1:报错链接超时22122或23000或Unable to borrow buffer from pool原因:可能
Twisted是用Python实现的基于事件驱动的网络引擎框架。Twisted诞生于2000年初,一个可扩展性高、基于事件驱动、跨平台的网络开发框架。Twisted支持许多常见的传输及应用层协议,包括TCP、UDP、SSL/TLS、HTTP、IMAP、SSH、IRC以及FTP。就像python一样,Twisted也具有“内置电池”(batteries-included)(有很多封装好、有用的库...
电脑的稳定性测试被一些电脑爱好者叫做“拷机”,顾名思义,烤机测试会带来相当高的温度,实际上电脑的稳定性测试指的就是让电脑满负荷运行一段时间,观察电脑系统是否稳定运行,还有一些人通过烤机测试来测试出电脑的极限温度等等。既然电脑的稳定性测试会带来高温,那么电脑烤机是否会对电脑造成损伤,一般烤机的时长又应该以多长时间为准呢?快来一起看看吧!
unittest是Python自带的一个单元测试框架,unittest又被称为PyUnit,是由Java的JUnit衍生而来,基本结构是类似的。对于单元测试,需要设置预先条件,对比预期结果和实际结果。由于unittest是Python自带的标准模块,所以不需要单独再去安装。引入包i即可使用。在执行测试用例的过程中,最终用例是否执行通过,是通过判断测试得到的实际结果和预期结果是否相等决定的,这时会用到断言方法。
object 表达式创建匿名内部类的形式:object: ClassName {...} val handler:Handler=object: Handler(){ override fun handleMessage(msg: Message?) { super.handleMessage(msg) when(...