轻松搞定RabbitMQ(六)——主题_weixin_30454481的博客-程序员秘密

技术标签: java  大数据  

转自 http://blog.csdn.net/xiaoxian8023/article/details/48806871

 

   翻译地址:http://www.rabbitmq.com/tutorials/tutorial-five-java.html

       在上一篇博文中,我们进一步改良了日志系统。使用Direct类型的转换器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发,如果你还不了解,请阅读:轻松搞定RabbitMQ(四)——发布/订阅

       虽然使用Direct类型的转换器改进了日志系统。但它仍然有一定的局限性——不能根据多重条件进行路由选择。

       在我们的日志系统中,我们可能不仅仅根据日志严重性订阅日志,也想根据发送源订阅。你可能从unix工具syslog了解过这个概念,它可以根据严重性(info/warning/crit…)和设备(auth/cron/kern…)转发日志。

       这将给我们更多的灵活性——我们可以订阅来自元‘cron’的致命错误日志,同时也可以订阅‘kern’的所有日志。

       为了在我们的日志系统中实现上述需求,我们需要了解更复杂的主题类型的转发器——Topic Exchange。


Topic exchange(主题转发器)

       发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如“socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节。

       绑定键也必须以相同的格式。主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

       举一个简单例子,如下图:


       在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。

       我们创建3个绑定:Q1绑定键是“*.orange.*”,Q2绑定键是“*.*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

       路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而"lazy.brown.fox"则只发往第二个队列。“quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

       如果我们违反约定,发送了只带1个或者4个标识符的选择键,像“orange”或者“quick.orange.male.rabbit”,会发生什么呢?这些消息都不匹配任何绑定,所以将被丢弃。

       另外,“lazy.orange.male.rabbit”,尽管有4个标识符,但是仍然匹配最后一个绑定键,所以会发送到第二个队列中。

       注:主题类型的转发器非常强大,可以实现其他类型的转发器。当队列绑定#绑定键,可以接受任何消息,类似于fanout转发器。当特殊字符*和#不包含在绑定键中,这个主题转发器就像一个direct类型的转发器。


完整实例

       我们将主题类型的转发器应用到日志系统中,路由格式为:“<设备>.<严重级别>”。

发送端(EmitLogTopic.java)

 

[java] view plain copy
 
  1. public class EmitLogDirect {  
  2.     private final static String EXCHANGE_NAME = "topic_logs";  
  3.   
  4.     public static void main(String[] args) throws IOException {  
  5.         /** 
  6.          * 创建连接连接到MabbitMQ 
  7.          */  
  8.         ConnectionFactory factory = new ConnectionFactory();  
  9.         // 设置MabbitMQ所在主机ip或者主机名  
  10.         factory.setHost("127.0.0.1");  
  11.         // 创建一个连接  
  12.         Connection connection = factory.newConnection();  
  13.         // 创建一个频道  
  14.         Channel channel = connection.createChannel();  
  15.         // 指定转发——广播  
  16.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  17.   
  18.         //所有设备和日志级别  
  19.         String[] facilities ={ "auth","cron","kern","auth.A"};  
  20.         String[] severities={ "error","info","warning"};  
  21.           
  22.         for(int i=0;i<4;i++){  
  23.             for(int j=0;j<3;j++){  
  24.             //每一个设备,每种日志级别发送一条日志消息  
  25.             String routingKey = facilities[i]+"."+severities[j%3];  
  26.               
  27.             // 发送的消息  
  28.             String message =" Hello World!"+Strings.repeat(".", i+1);  
  29.             //参数1:exchange name  
  30.             //参数2:routing key  
  31.             channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());  
  32.             System.out.println(" [x] Sent [" + routingKey +"] : '"+ message + "'");  
  33.             }  
  34.         }  
  35.         // 关闭频道和连接  
  36.         channel.close();  
  37.         connection.close();  
  38.     }  
  39. }  

 

消费者1(ReceiveLogs2Console.java)

 

[java] view plain copy
 
  1. public class ReceiveLogs2Console {  
  2.     private static final String EXCHANGE_NAME = "topic_logs";  
  3.   
  4.     public static void main(String[] argv) throws IOException, InterruptedException {  
  5.         ConnectionFactory factory = new ConnectionFactory();  
  6.         factory.setHost("127.0.0.1");  
  7.         // 打开连接和创建频道,与发送端一样  
  8.         Connection connection = factory.newConnection();  
  9.         final Channel channel = connection.createChannel();  
  10.   
  11.         //声明exchange  
  12.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  13.         // 声明一个随机队列  
  14.         String queueName = channel.queueDeclare().getQueue();  
  15.   
  16.         String[] routingKeys ={ "auth.*","*.info","#.warning"};//关注所有的授权日志、所有info和waring级别的日志  
  17.         for (String routingKey : routingKeys) {  
  18.             //关注所有级别的日志(多重绑定)  
  19.             channel.queueBind(queueName, EXCHANGE_NAME, routingKey);  
  20.         }  
  21.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  22.           
  23.         // 创建队列消费者  
  24.         final Consumer consumer = new DefaultConsumer(channel) {  
  25.               @Override  
  26.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  27.                 String message = new String(body, "UTF-8");  
  28.                 System.out.println(" [x] Received ["  + envelope.getRoutingKey() + "] :'" + message + "'");  
  29.               }  
  30.             };  
  31.             channel.basicConsume(queueName, true, consumer);  
  32.     }  
  33. }  

 

消费者2(ReceiveLogs2File.java)

 

[java] view plain copy
 
  1. public class ReceiveLogs2File {  
  2.     private static final String EXCHANGE_NAME = "topic_logs";  
  3.   
  4.     public static void main(String[] argv) throws IOException, InterruptedException {  
  5.         ConnectionFactory factory = new ConnectionFactory();  
  6.         factory.setHost("127.0.0.1");  
  7.         // 打开连接和创建频道,与发送端一样  
  8.         Connection connection = factory.newConnection();  
  9.         final Channel channel = connection.createChannel();  
  10.   
  11.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");  
  12.         // 声明一个随机队列  
  13.         String queueName = channel.queueDeclare().getQueue();  
  14.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
  15.   
  16.         String severity="kern.error";//只关注核心错误级别的日志,然后记录到文件中去。  
  17.         channel.queueBind(queueName, EXCHANGE_NAME, severity);  
  18.           
  19.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  20.           
  21.         // 创建队列消费者  
  22.         final Consumer consumer = new DefaultConsumer(channel) {  
  23.               @Override  
  24.               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
  25.                 String message = new String(body, "UTF-8");  
  26.                 //记录日志到文件:  
  27.                 print2File( "["+ envelope.getRoutingKey() + "] "+message);  
  28.               }  
  29.             };  
  30.             channel.basicConsume(queueName, true, consumer);  
  31.     }  
  32.       
  33.     private static void print2File(String msg) {  
  34.         try {  
  35.             String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();  
  36.             String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());  
  37.             File file = new File(dir, logFileName + ".log");  
  38.             FileOutputStream fos = new FileOutputStream(file, true);  
  39.             fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());  
  40.             fos.flush();  
  41.             fos.close();  
  42.         } catch (FileNotFoundException e) {  
  43.             e.printStackTrace();  
  44.         } catch (IOException e) {  
  45.             e.printStackTrace();  
  46.         }  
  47.     }    
  48. }  

 

       最终结果:

转载于:https://www.cnblogs.com/Damon-Luo/p/7838120.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_30454481/article/details/95290825

智能推荐

java中Class对象详解、类名.class, class.forName(), getClass()区别_路易斯睿齐的博客-程序员秘密

Java反射学习所谓反射,可以理解为在运行时期获取对象类型信息的操作。传统的编程方法要求程序员在编译阶段决定使用的类型,但是在反射的帮助下,编程人员可以动态获取这些信息,从而编写更加具有可移植性的代码。严格地说,反射并非编程语言的特性,因为在任何一种语言都可以实现反射机制,但是如果编程语言本身支持反射,那么反射的实现就会方便很多。1,获得类型类我们知道在Java中一切都是对象,我们一般所使用的对象都

Python读取excel 日期 时分秒_python 把一列 时分秒_amberom的博客-程序员秘密

import xlrdimport datetimefile=u"伏特加.xls"#注意读中文文件名稍微处理一下data=xlrd.open_workbook(file)table = data.sheet_by_index(0)#按照索引读Excel文件colContent=table.col_values(1)#读某一列,日期在第二列nrows=table.nrows #行数print nrowsncols = table.ncols#列数print "有%s列"%ncols #只.

Result Maps collection does not contain value for java.util.HashMap_tornado430的博客-程序员秘密

Result Maps collection does not contain value for java.util.HashMap出现上述错误 主要是因为select标签内部的resultMap、resultType属性指向的不正确注意:在mybatis .xml文件中只要有任何一个resultMap或resultType属性指向错误,则在这个文件中其余正确的语句也不能执行,con...

uni-app 安卓打包流程; 未获取AppKey或配置错误_dcloud_appkey__套码的汉子的博客-程序员秘密

打包后安装应用提示: 未获取AppKey或配置错误;移步最下边第7节uni-app 安卓打包流程(预备环节):1. 安装 SDK 和 android studio2. 申请证书获取Appkey3. (第一步和第二步按照uni-app官方文档都可以顺利完成)官方文档 https://nativesupport.dcloud.net.cn/AppDocs/usesdk/android?id=versionCode建议下载官方的SDK 会包含 HBuilder-Integrate-AS 项目(在 an

ue4 设置intellisence_虚幻引擎4设置Visual Studio_weixin_39683598的博客-程序员秘密

转自:http://www.unrealchina.net/portal.php?mod=view&amp;aid=149设置Visual Studio和虚幻引擎4协同工作有利于提高开发人员使用UE4 的效率和整体用户体验。On this page:推荐设置Intellisense(智能编码)、Live Errors(实时错误)和Squiggles(波浪线提示)实现细节UnrealVS 插件调试针...

如何创建一个简单的二叉树(TreeNode)?_怎样创建treenode_南丘xf的博客-程序员秘密

我们都知道,数据结构最典型的就是数组和链表,在《算法图解》一书中,详细介绍了数组和链表的优缺点:数组查询快(下标),但删除或者插入就比较慢(遍历)链表与之相反,删除和插入元素很快,但查找很慢所以二叉树就应运而生了,它结合二者的优点,取二家之长,但是在实际编程中,大多时候,根本用不到二叉树(实际编程中,我从来没用过二叉树,但是面试可能会问啊,所以趁有空学习总结下),网上找了一下二叉树的应用场...

随便推点

中断和异常的处理_ProgrammingRing的博客-程序员秘密

本文为 第17章笔记中断和异常中断和异常概述中断和异常的作用是指示系统中的某个地方发生一些事件, 需要引起处理器(包括正在执行中的程序和任务)的注意. 当中断和异常发生时, 典型的结果是迫使处理器将控制从当前正在执行的程序或任务转移到另一个历程或任务中去. 该例程叫做中断处理程序, 或者异常处理程序. 如果是一个任务, 则发生任务切换.1. 中断(Interrupt)

Linux 生产者消费者问题代码实现_linux生产者消费者源程序代码_猪猪爱芮芮的博客-程序员秘密

进程间通信(Linux):使用多线程和信号量解决生产者/消费者问题:有一个长度为N的缓冲池被生产者和消费者共同使用。只要缓冲池未满,生产者就可以将消息送入缓冲池中;只要缓冲池不空,消费者便可从缓冲池中取走一个消息。生产者向缓冲池放入消息的同时,消费者不能操作缓冲池,反之亦然。设计要求:(1) 说明设置哪些信号量?信号量的含义和初始值是什么?并用信号量和P、V操作写出进程的同步与互斥算法。(注:...

面试问答汇总_靠谱的内推君的博客-程序员秘密

面试问答汇总目 录一、你最大的优点是什么?(回答技巧及范例)二、 “你有什么问题要问我的吗?” 向面试官提问三、自我介绍四、你为什么会选择你目前学习的专业呢?五、你有过和别人合作的经历吗?六、说说您的缺点,好吗?七、在人际沟通上是否曾经有过不和谐?八、能说一说你未来的职业生涯规划吗?九、在被问到家庭情况的时候,应该怎么回答?十、多个问题同时出现时,应该如何解决十一、关于面试礼仪十二、小组面试十三、回答“你的最大缺点是什么”技巧及范例十四、面试.

mongodb导出数据_error validating settings: invalid collection name_benben_2015的博客-程序员秘密

mongoDB通过mongoexport程序将mongodb实例中的数据存储在json或csv文件中导出。例如:mongoexport --host mongodb1.example.net --port 37017 --username user --password &amp;quot;pass&amp;quot; --collection contacts --db marketing --out mdb-example.j...

进阶的阿牛哥之用python向多人发送邮件(带附件)、遍历邮件获取内容(两种方法:imbox、imaplib)_python发送邮件给多人_进阶的阿牛哥的博客-程序员秘密

一、使用的库这个程序涉及两个库:smtplib 和 email这两个库都是Python自带的,所以不需要额外的下载安装。二、思路和步骤总体思路很简单,就像我们平常上网是通过HTTP协议一样,我们发送邮件是通过SMTP(Simple Mail Transfer Protocol,简单邮件传输协议)来传输的,而现在我们需要做的就是:(1)开启邮箱 SMTP 服务:以 QQ 邮箱为例,开启 SMTP 的路径是:邮箱首页 → 设置 → 账户 → POP3/IMAP/SMTP/Exchange/CardD

推荐文章

热门文章

相关标签