當前位置:
首頁 > 知識 > NET中解決KafKa多線程發送多主題的問題

NET中解決KafKa多線程發送多主題的問題

一般在KafKa消費程序中消費可以設置多個主題,那在同一程序中需要向KafKa發送不同主題的消息,如異常需要發到異常主題,正常的發送到正常的主題,這時候就需要實例化多個主題,然後逐個發送。

在NET中用RdKafka組件來做消息處理,在Nuget中引用。

NET中解決KafKa多線程發送多主題的問題

在程序中初始化Producer,並創建多個Topic

private string comtopic = "topic1";
private string errtopic = "topic2";
private string kfkip = "192.168.80.32:9092";
Topic topic = null;
Topic errTopic = null;

public ExcuteFlow
{
try
{
Producer producer = new Producer(kfkip);
topic = producer.Topic(comtopic);
errTopic = producer.Topic(errtopic);
}
catch (RdKafkaException ex)
{
LogHelper.Error("KafKa初始化KafKa異常 ", ex);
}
catch (Exception ex)
{
LogHelper.Error("KafKa初始化異常", ex);
}

}

在程序中發送其中一個主題:

try
{

if (topic != null)
{
byte datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));
Task<DeliveryReport> deliveryReport = topic.Produce(datas);
var unused = deliveryReport.ContinueWith(task =>
{
LogHelper.Info("內容:{flowCommond.ID} 發送到分區:{task.Result.Partition}, Offset 為: {task.Result.Offset}");
});
}
else
{
throw new Exception("發送消息到KafKa topic 為空");
}
}
catch (RdKafkaException ex)
{
LogHelper.Error("發送消息到KafKa KafKa異常", ex);
}
catch (Exception ex)
{
LogHelper.Error("發送消息到KafKa異常", ex);
}

flowCommond為要發送的對象內容,格式化為Json字元串再發送。

另一個主題一樣處理。

這裡實現一個線程裡面發送多個主題,那下面實現多個線程中如何發送多個主題。

多線程中如果每個線程都new Producer(kfkip) 一次,那KafKa的連接很快會被佔滿。

那這裡就用單例模式來解決這個問題,每次要用到Producer時檢查一下是否已經存在Producer實例,若存在則直接用不用再生成。

/// <summary>
/// 單例模式的實現
/// </summary>
public class SingleProduct : Producer
{
// 定義一個靜態變數來保存類的實例
private static SingleProduct uniqueInstance;
// 定義一個標識確保線程同步
private static readonly object locker = new object;
// 定義私有構造函數,使外界不能創建該類實例
private SingleProduct(string brokerList) : base(brokerList)
{
}

/// <summary>
/// 定義公有方法提供一個全局訪問點,同時你也可以定義公有屬性來提供全局訪問點
/// </summary>
/// <returns></returns>
public static SingleProduct GetInstance
{
// 當第一個線程運行到這裡時,此時會對locker對象 "加鎖",
// 當第二個線程運行該方法時,首先檢測到locker對象為"加鎖"狀態,該線程就會掛起等待第一個線程解鎖
// lock語句運行完之後(即線程運行完之後)會對該對象"解鎖"
if (uniqueInstance == null)
{
lock (locker)
{
// 如果類的實例不存在則創建,否則直接返回
if (uniqueInstance == null)
{
string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];

try
{
uniqueInstance = new SingleProduct(kfkip);
LogHelper.Error("單例模式 實例化 SingleProduct");
}
catch (RdKafkaException ex)
{
LogHelper.Error("單例模式 KafKa初始化KafKa異常 ", ex);
}
catch (Exception ex)
{
LogHelper.Error("單例模式 KafKa初始化異常", ex);
}
}
}
}

return uniqueInstance;
}
}

然後在初始化的代碼中替換Producer producer = new Producer(kfkip);為 Producer producer = SingleProduct.GetInstance;

OK!以上就完成了多線程多主題的消息發送。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 科技優家 的精彩文章:

最大流當前弧優化Dinic分層模板
VMware workstation虛擬集群實踐(1)——配置集群多節點互信
ASP.NET Core 源碼學習之 Logging「2」:Configure

TAG:科技優家 |

您可能感興趣

Supreme? / RIMOWA 聯乘企劃突擊發送
Python模擬發送Slack消息
PHP 使用 phpmailer 發送電子郵件
SpringBoot中發送QQ郵件
Verizon發送推送通知給iPhone用戶建議升級到Galaxy S9
用戶現可用Opera的加密貨幣錢包發送CryptoKitties
Windows 10 Your Phone應用更新 現可查看和發送簡訊
Facebook計劃讓Messenger,Instagram和WhatsApp用戶互相發送消息
ESET新發現了一款用通訊錄發送惡意簡訊的Android勒索軟體
SpaceX剛剛向國際空間站發送的AI機器人CIMON
SpaceX將向國際空間站發送新設備:AI機器人CIMON
php中curl同時發送多個請求curl_multi函數集的用法
Linux Mint 19「Tara」最早5月上線:承諾不收集/發送隱私數據
讓 Emacs shell 命令發送桌面通知
Coinbase向美國客戶發送IRS報稅表1099-K
Localhost環境下使用Django send_mail發送郵件-以QQ和163郵箱為例
Facebook Messenger將容對發送的消息進行刪除
英特爾CPU新漏洞「預兆」(L1TF)|VORACLE攻擊可解密通過VPN發送的HTTP流量
前端周報:npm鬧烏龍,發送錯誤代碼,Node之父發布「下一代Node」項目
NASA準備將2個CubeSats微型衛星發送到火星