當前位置:
首頁 > 最新 > RabbitMQ 實戰教程 路由

RabbitMQ 實戰教程 路由

在上一個教程中,我們構建了一個簡單的日誌記錄系統。我們能夠向許多消費者廣播日誌消息。

在本教程中,我們將為其添加一個功能 - 我們將可以僅訂閱一部分消息。例如,我們將能夠僅將關鍵的錯誤消息寫入到日誌文件(以節省磁碟空間),同時仍然能夠在控制台上列印所有日誌消息。

翻譯自「RabbitMQ Tutorials」,部分內容增減。

原文地址 :https://www.rabbitmq.com/tutorials/tutorial-four-java.html

綁定(Bindings)

在上一個教程中,我們已經使用過綁定。你可能會記得如下代碼:

channel.queueBind(queueName, EXCHANGE_NAME, "");

綁定是建立交換器和隊列之間的關係。這可以簡單地理解:隊列對該交換器上的消息感興趣。

為了避免與 basicPublish 方法的參數混淆,我們將其稱為綁定鍵。下面是我們如何用一個綁定鍵創建一個綁定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

綁定鍵的意義依賴於交換器的類型。以前我們以前使用的 fanout 類型的交換器可以忽略此參數。

直接交換(Direct exchange)

我們從上一個教程的日誌記錄系統向所有消費者廣播所有消息。現在,我們需要一個將錯誤日誌消息寫入磁碟,而不會把硬碟空間浪費警告或消息類型日誌消息上。

其中,第一個隊列與綁定鍵 orange 綁定,第二個隊列有兩個綁定,一個綁定鍵為 black,另一個綁定為 green。在這樣的設置中,具有 orange 的交換器的消息將被路由到隊列 Q1。具有 black 或 green 的交換器的消息將轉到 Q2。所有其他消息將被丟棄。

多重綁定(Multiple bindings)

此外,使用相同的綁定鍵綁定多個隊列是完全合法的。在我們的示例中,我們可以在 X 和 Q1 之間添加綁定鍵 black。在這種情況下,direct 類型的交換器將消息廣播到所有匹配的隊列 Q1 和 Q2。

發送日誌(Emitting logs)

我們將使用 direct 類型的交換器進行日誌記錄系統。

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

現在,我們準備發送一條消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

為了簡化代碼,我們假定 『severity』 是 『info』, 『warning』, 『error』 中的一個。

訂閱(Subscribing)

接收消息將像上一個教程一樣工作,除了一個例外 - 我們給我們所感興趣的嚴重性類型的日誌創建一個綁定。

String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }案例實戰

發送端

發送端,連接到 RabbitMQ,發送一條數據,然後退出。

public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"}; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 發送消息 for (int i = 0; i

接受端,不斷等待伺服器推送消息,然後在控制台輸出。

public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"}; public static void main(String[] args) throws IOException, TimeoutException { // 創建連接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 創建一個連接 Connection connection = factory.newConnection(); // 創建一個通道 Channel channel = connection.createChannel(); // 指定一個交換器 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 設置日誌級別 int rand = new Random().nextInt(3); String severity = LOG_LEVEL_ARR[rand]; // 創建一個非持久的、唯一的、自動刪除的隊列 String queueName = channel.queueDeclare().getQueue(); // 綁定交換器和隊列 // queueBind(String queue, String exchange, String routingKey) // 參數1 queue :隊列名 // 參數2 exchange :交換器名 // 參數3 routingKey :路由鍵名 channel.queueBind(queueName, EXCHANGE_NAME, severity); // 創建隊列消費者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received " + message + " "); } }; channel.basicConsume(queueName, true, consumer); } }

現在,做一個實驗,我們開啟三個 ReceiveLogsDirect 工作程序:ReceiveLogsDirect1 、ReceiveLogsDirect2 與 ReceiveLogsDirect3。

ReceiveLogsDirect1

ReceiveLogsDirect2

[*] Waiting for messages. To exit press CTRL+C [*] LOG LEVEL : error [x] Received Liang-MSG log : [error]493dce2a-7ce1-4111-953c-99ab2564a2d0 [x] Received Liang-MSG log : [error]2446dd80-d5f0-4d39-888f-31579b9d2724 [x] Received Liang-MSG log : [error]fe8219e0-5548-40ba-9810-d922d1b03dd8 [x] Received Liang-MSG log : [error]797b6d0e-9928-4505-9c76-56043322b1f0

ReceiveLogsDirect3

[*] Waiting for messages. To exit press CTRL+C [*] LOG LEVEL : debug [x] Received Liang-MSG log : [debug]c05eee3e-b820-4b69-9c3f-c2bbded85195 [x] Received Liang-MSG log : [debug]4645c9ba-4070-41d7-adc9-7f8b2df1e3c8 [x] Received Liang-MSG log : [debug]d3d3ad5c-8f97-49ea-8fd6-c434790e40eb

此時,ReceiveLogsDirect1 、ReceiveLogsDirect2 與 ReceiveLogsDirect3 同時收到了屬於自己級別的消息。

(完)

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 推酷 的精彩文章:

AtomicInteger 與樂觀鎖
EA 商業遊戲新作 NBA LIVE 2018 正式出爐,籃球遊戲又迎來血戰
原來,中國的設計師一直缺一個像樣的協同工具
13 年來,我寫了這些糟糕的遊戲代碼

TAG:推酷 |

您可能感興趣

Apple停止使用AirPort WiFi路由器
網關 Spring-Cloud-Gateway 源碼解析——路由之RouteDefinitionLocator一覽
Spring Cloud Gateway的After路由斷言工廠
PCIe掃盲——TLP路由之Implicit Routing
Roaming Mantis:通過Wi-Fi路由器感染智能手機
使用Istio控制Serverless架構Fn Project中的函數間流量路由
Wi-Fi EasyMesh協議讓路由器變身網狀網
Android項目解耦-路由框架ARouter源碼解析
Kotlin打造Android路由框架
摸索:Istio 路由規則 Alpha v3
使用 Quagga 實現 Linux 動態路由
OpenFlow下的「路由技術」
Mozilla發布新版Things Gateway 用樹莓派3打造物聯網路由器
關於 DrayTek Vigor系列路由器跨站請求偽造漏洞的情況通報
基於 node.js 的自動路由組件-HttpPostman
Cisco Packet Tracer中配置單臂路由
Cradlepoint為AT&T提供5G路由器設備
實現多樓層WiFi無縫漫遊的「奧秘」-網件Orbi分體式MESH路由器
Cisco路由器Easy VPN的配置
Verizon BGP路由泄漏,亞馬遜、Facebook及眾多區塊鏈交易所受到影響