RabbitMQ 實戰教程 路由
在上一個教程中,我們構建了一個簡單的日誌記錄系統。我們能夠向許多消費者廣播日誌消息。
在本教程中,我們將為其添加一個功能 - 我們將可以僅訂閱一部分消息。例如,我們將能夠僅將關鍵的錯誤消息寫入到日誌文件(以節省磁碟空間),同時仍然能夠在控制台上列印所有日誌消息。
翻譯自「RabbitMQ Tutorials」,部分內容增減。
原文地址 :https://www.rabbitmq.com/tutorials/tutorial-four-java.html
綁定(Bindings)
在上一個教程中,我們已經使用過綁定。你可能會記得如下代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定是建立交換器和隊列之間的關係。這可以簡單地理解:隊列對該交換器上的消息感興趣。
為了避免與 basicPublish 方法的參數混淆,我們將其稱為綁定鍵。下面是我們如何用一個綁定鍵創建一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵的意義依賴於交換器的類型。以前我們以前使用的 fanout 類型的交換器可以忽略此參數。
直接交換(Direct exchange)
我們從上一個教程的日誌記錄系統向所有消費者廣播所有消息。現在,我們需要一個將錯誤日誌消息寫入磁碟,而不會把硬碟空間浪費警告或消息類型日誌消息上。
其中,第一個隊列與綁定鍵 orange 綁定,第二個隊列有兩個綁定,一個綁定鍵為 black,另一個綁定為 green。在這樣的設置中,具有 orange 的交換器的消息將被路由到隊列 Q1。具有 black 或 green 的交換器的消息將轉到 Q2。所有其他消息將被丟棄。
多重綁定(Multiple bindings)
此外,使用相同的綁定鍵綁定多個隊列是完全合法的。在我們的示例中,我們可以在 X 和 Q1 之間添加綁定鍵 black。在這種情況下,direct 類型的交換器將消息廣播到所有匹配的隊列 Q1 和 Q2。
發送日誌(Emitting logs)
我們將使用 direct 類型的交換器進行日誌記錄系統。
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
現在,我們準備發送一條消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
為了簡化代碼,我們假定 『severity』 是 『info』, 『warning』, 『error』 中的一個。
訂閱(Subscribing)
接收消息將像上一個教程一樣工作,除了一個例外 - 我們給我們所感興趣的嚴重性類型的日誌創建一個綁定。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }案例實戰
發送端
發送端,連接到 RabbitMQ,發送一條數據,然後退出。
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"}; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 發送消息 for (int i = 0; i
接受端,不斷等待伺服器推送消息,然後在控制台輸出。
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"}; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 設置日誌級別 int rand = new Random().nextInt(3); String severity = LOG_LEVEL_ARR[rand]; // 創建一個非持久的、唯一的、自動刪除的隊列 String queueName = channel.queueDeclare().getQueue(); // 綁定交換器和隊列 // queueBind(String queue, String exchange, String routingKey) // 參數1 queue :隊列名 // 參數2 exchange :交換器名 // 參數3 routingKey :路由鍵名 channel.queueBind(queueName, EXCHANGE_NAME, severity); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received " + message + " "); } }; channel.basicConsume(queueName, true, consumer); } }
現在,做一個實驗,我們開啟三個 ReceiveLogsDirect 工作程序:ReceiveLogsDirect1 、ReceiveLogsDirect2 與 ReceiveLogsDirect3。
ReceiveLogsDirect1
ReceiveLogsDirect2
[*] Waiting for messages. To exit press CTRL+C [*] LOG LEVEL : error [x] Received Liang-MSG log : [error]493dce2a-7ce1-4111-953c-99ab2564a2d0 [x] Received Liang-MSG log : [error]2446dd80-d5f0-4d39-888f-31579b9d2724 [x] Received Liang-MSG log : [error]fe8219e0-5548-40ba-9810-d922d1b03dd8 [x] Received Liang-MSG log : [error]797b6d0e-9928-4505-9c76-56043322b1f0
ReceiveLogsDirect3
[*] Waiting for messages. To exit press CTRL+C [*] LOG LEVEL : debug [x] Received Liang-MSG log : [debug]c05eee3e-b820-4b69-9c3f-c2bbded85195 [x] Received Liang-MSG log : [debug]4645c9ba-4070-41d7-adc9-7f8b2df1e3c8 [x] Received Liang-MSG log : [debug]d3d3ad5c-8f97-49ea-8fd6-c434790e40eb
此時,ReceiveLogsDirect1 、ReceiveLogsDirect2 與 ReceiveLogsDirect3 同時收到了屬於自己級別的消息。
(完)
※AtomicInteger 與樂觀鎖
※EA 商業遊戲新作 NBA LIVE 2018 正式出爐,籃球遊戲又迎來血戰
※原來,中國的設計師一直缺一個像樣的協同工具
※13 年來,我寫了這些糟糕的遊戲代碼
TAG:推酷 |
※Apple停止使用AirPort WiFi路由器
※網關 Spring-Cloud-Gateway 源碼解析——路由之RouteDefinitionLocator一覽
※Spring Cloud Gateway的After路由斷言工廠
※PCIe掃盲——TLP路由之Implicit Routing
※Roaming Mantis:通過Wi-Fi路由器感染智能手機
※使用Istio控制Serverless架構Fn Project中的函數間流量路由
※Wi-Fi EasyMesh協議讓路由器變身網狀網
※Android項目解耦-路由框架ARouter源碼解析
※Kotlin打造Android路由框架
※摸索:Istio 路由規則 Alpha v3
※使用 Quagga 實現 Linux 動態路由
※OpenFlow下的「路由技術」
※Mozilla發布新版Things Gateway 用樹莓派3打造物聯網路由器
※關於 DrayTek Vigor系列路由器跨站請求偽造漏洞的情況通報
※基於 node.js 的自動路由組件-HttpPostman
※Cisco Packet Tracer中配置單臂路由
※Cradlepoint為AT&T提供5G路由器設備
※實現多樓層WiFi無縫漫遊的「奧秘」-網件Orbi分體式MESH路由器
※Cisco路由器Easy VPN的配置
※Verizon BGP路由泄漏,亞馬遜、Facebook及眾多區塊鏈交易所受到影響