多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內最全IT社區平臺 聯系我們 | 收藏本站
阿里云優惠2
您當前位置:首頁 > php開源 > 綜合技術 > 深入學習RabbitMQ(一):mandatory標志的作用

深入學習RabbitMQ(一):mandatory標志的作用

來源:程序員人生   發布時間:2017-02-23 09:30:07 閱讀次數:2784次

        在生產者通過channel的basicPublish方法發布消息時,通常有幾個參數需要設置,為此我們有必要了解清楚這些參數代表的具體含義及其作用,查看Channel接口,會發現存在3個重載的basicPublish方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;
        他們共有的參數分別是:
        exchange:交換機名稱
        routingKey:路由鍵
        props:消息屬性字段,比如消息頭部信息等等
        body:消息主體部份
        除此以外,還有mandatory和immediate這兩個參數,鑒于RabbitMQ3.0不再支持immediate標志,因此我們重點討論mandatory標志
        mandatory的作用:

        當mandatory標志位設置為true時,如果exchange根據本身類型和消息routingKey沒法找到1個適合的queue存儲消息,那末broker會調用basic.return方法將消息返還給生產者;當mandatory設置為false時,出現上述情況broker會直接將消息拋棄;通俗的講,mandatory標志告知broker代理服務器最少將消息route到1個隊列中,否則就將消息return給發送者;

       下面我們通過幾個實例測試下mandatory標志的作用:
        測試1:設置mandatory標志,且exchange未綁定隊列

public class ProducerTest {
	public static void main(String[] args) {
		String exchangeName = "confirmExchange";
		String queueName = "confirmQueue";
		String routingKey = "confirmRoutingKey";
		String bindingKey = "confirmBindingKey";
		int count = 3;
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("172.16.151.74");
		factory.setUsername("test");
		factory.setPassword("test");
		factory.setPort(5672);
		
		//創建生產者
		Sender producer = new Sender(factory, count, exchangeName, routingKey);
		producer.run();
	}
}

class Sender
{
	private ConnectionFactory factory;
	private int count;
	private String exchangeName;
	private String routingKey;
	
	public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
		this.factory = factory;
		this.count = count;
		this.exchangeName = exchangeName;
		this.routingKey = routingKey;
	}
	
	public void run() {
		try {
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//創建exchange
			channel.exchangeDeclare(exchangeName, "direct", true, false, null);
			//發送持久化消息
			for(int i = 0;i < count;i++)
			{
				//第1個參數是exchangeName(默許情況下代理服務器端是存在1個""名字的exchange的,因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話我們需要將該參數設置成創建的exchange的名字),第2個參數是路由鍵
				channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

       第45行我們將basicPublish的第3個參數mandatory設置成了true,表示開啟了mandatory標志,但我們沒有為當前exchange綁定任何隊列;

       通過wireshark抓包看到下面輸出: 

       可以看到最后履行了basic.return方法,將發布者發出的消息返還給了發布者,查看協議的Arguments參數部份可以看到,Reply-Text字段值為:NO_ROUTE,表示消息并沒有路由到適合的隊列中;

       那末我們該怎樣獲得到沒有被正確路由到適合隊列的消息呢?這時候候可以通過為channel信道設置ReturnListener監聽器來實現,具體實現代碼見下:

public class ProducerTest {
	public static void main(String[] args) {
		String exchangeName = "confirmExchange";
		String queueName = "confirmQueue";
		String routingKey = "confirmRoutingKey";
		String bindingKey = "confirmBindingKey";
		int count = 3;
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("172.16.151.74");
		factory.setUsername("test");
		factory.setPassword("test");
		factory.setPort(5672);
		
		//創建生產者
		Sender producer = new Sender(factory, count, exchangeName, routingKey);
		producer.run();
	}
}

class Sender
{
	private ConnectionFactory factory;
	private int count;
	private String exchangeName;
	private String routingKey;
	
	public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
		this.factory = factory;
		this.count = count;
		this.exchangeName = exchangeName;
		this.routingKey = routingKey;
	}
	
	public void run() {
		try {
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//創建exchange
			channel.exchangeDeclare(exchangeName, "direct", true, false, null);
			//發送持久化消息
			for(int i = 0;i < count;i++)
			{
				//第1個參數是exchangeName(默許情況下代理服務器端是存在1個""名字的exchange的,
				//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
				//我們需要將該參數設置成創建的exchange的名字),第2個參數是路由鍵
				channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
			}
			channel.addReturnListener(new ReturnListener() {
				
				@Override
				public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
						throws IOException {
					//此處便是履行Basic.Return以后回調的地方
					String message = new String(arg5);
					System.out.println("Basic.Return返回的結果:  "+message);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
       在設置了ReturnListener監聽器以后,broker(代理服務器)發出basic.return方法以后,就會回調第52行的handleReturn方法,在這個方法里面我們就能夠進行消息的重新發布操作啦;

       測試2:設置mandatory標志,且為exchange綁定隊列(路由鍵和綁定鍵1致)

public class ProducerTest {
	public static void main(String[] args) {
		String exchangeName = "confirmExchange";
		String queueName = "confirmQueue";
		String routingKey = "confirmRoutingKey";
		String bindingKey = "confirmRoutingKey";
		//String bindingKey = "confirmBindingKey";
		int count = 3;
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("172.16.151.74");
		factory.setUsername("test");
		factory.setPassword("test");
		factory.setPort(5672);
		
		//創建生產者
		Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
		producer.run();
	}
}

class Sender
{
	private ConnectionFactory factory;
	private int count;
	private String exchangeName;
	private String 	queueName;
	private String routingKey;
	private String bindingKey;
	
	public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
		this.factory = factory;
		this.count = count;
		this.exchangeName = exchangeName;
		this.queueName = queueName;
		this.routingKey = routingKey;
		this.bindingKey = bindingKey;
	}
	
	public void run() {
		try {
			Connection connection = factory.newConnection();
			Channel channel = connection.createChannel();
			//創建exchange
			channel.exchangeDeclare(exchangeName, "direct", true, false, null);
			//創建隊列
			channel.queueDeclare(queueName, true, false, false, null);
			//綁定exchange和queue
			channel.queueBind(queueName, exchangeName, bindingKey);
			//發送持久化消息
			for(int i = 0;i < count;i++)
			{
				//第1個參數是exchangeName(默許情況下代理服務器端是存在1個""名字的exchange的,
				//因此如果不創建exchange的話我們可以直接將該參數設置成"",如果創建了exchange的話
				//我們需要將該參數設置成創建的exchange的名字),第2個參數是路由鍵
				channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"條消息").getBytes());
			}
			channel.addReturnListener(new ReturnListener() {
				
				@Override
				public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
						throws IOException {
					//此處便是履行Basic.Return以后回調的地方
					String message = new String(arg5);
					System.out.println("Basic.Return返回的結果:  "+message);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

        通過抓包發現其實不會有basic.return方法被調用,查看RabbitMQ管理界面發現消息已到達了隊列;



        測試3:設置mandatory標志,且exchange綁定隊列(路由鍵和綁定鍵不1致)

        代碼就是把測試2中第6行注釋,第7行注釋打開,注意到此時的routingKey和bindingKey是不1致的,此時我們運行程序,同時抓包得到下面截圖:


       注意1點,我們發送了3條消息,那末相應的應當履行3次basic.return,其中第1次和第2次basic.return顯示在1行上了,第3次是單唯一行,不要誤認為只履行了兩次,從協議的具體返回內容里我們一樣看到了Reply-Text字段值是NO_ROUTE,這類現象在測試1中已見過了;

       到此,我們明白了mandatory標志的作用:在消息沒有被路由到適合隊列情況下會將消息返還給消息發布者,同時我們測試了哪些情況下消息不會到達適合的隊列,測試1演示的是創建了exchange但是沒有為他綁定隊列致使的消息未到達適合隊列,測試3演示的是創建了exchange同時創建了queue,但是在將二者綁定的時候,使用的bindingKey和消息發布者使用的rountingKey不1致致使的消息未到達適合隊列;

       參考資料:

       RabbitMQ(2) AMQP協議mandatory和immediate標志位區分

       RabbitMQ之mandatory




生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關閉
程序員人生
主站蜘蛛池模板: 欧美性一区 | 国产亚洲精品成人一区看片 | 操你网| 亚洲欧美日韩综合一区久久 | 在线观看的网址 | 欧美天堂久久 | 青草超级碰碰在线视频 | 国产成人综合网亚洲欧美在线 | 亚洲a色 | 欧美专区日韩专区 | 波多野结衣国产一区 | 国产日产亚洲欧美综合另类 | 国产一区二区不卡免费观在线 | www.日本在线播放 | 中文字幕在线2021 | 亚洲高清一区二区三区久久 | 午夜性a一级毛片 | 日本特一级毛片免费视频 | 自拍偷拍第6页 | 日本道色综合久久影院 | 成年人网站在线观看视频 | 亚洲国产精品久久久久久网站 | 日本视频中文字幕一区二区 | 国产毛片a| h小视频在线观看 | 色综合天天综合网国产成人网 | 色94色欧美一区 | 欧美激情一级欧美精品 | 双性h啪啪樱桃动漫直接观看 | 国人精品视频在线观看 | 中日韩一区二区三区 | 宇都宫紫苑(rion)在线播放 | 色淫影院| 久久欧美精品 | 高清视频在线播放 | 欧美24video | 国产未成女年一区二区 | 免费一级做a爰片久久毛片潮 | 91精品欧美一区二区三区 | 一级做a爰片久久毛片图片 一级做a爰片欧美aaaa | 国产成人激情视频 |