NET中解決KafKa多線程發送多主題的問題
一般在KafKa消費程序中消費可以設置多個主題,那在同一程序中需要向KafKa發送不同主題的消息,如異常需要發到異常主題,正常的發送到正常的主題,這時候就需要實例化多個主題,然後逐個發送。
在NET中用RdKafka組件來做消息處理,在Nuget中引用。
在程序中初始化Producer,並創建多個Topic
private string comtopic = "topic1";
private string errtopic = "topic2";
private string kfkip = "192.168.80.32:9092";
Topic topic = null;
Topic errTopic = null;
public ExcuteFlow
{
try
{
Producer producer = new Producer(kfkip);
topic = producer.Topic(comtopic);
errTopic = producer.Topic(errtopic);
}
catch (RdKafkaException ex)
{
LogHelper.Error("KafKa初始化KafKa異常 ", ex);
}
catch (Exception ex)
{
LogHelper.Error("KafKa初始化異常", ex);
}
}
在程序中發送其中一個主題:
try
{
if (topic != null)
{
byte datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));
Task<DeliveryReport> deliveryReport = topic.Produce(datas);
var unused = deliveryReport.ContinueWith(task =>
{
LogHelper.Info("內容:{flowCommond.ID} 發送到分區:{task.Result.Partition}, Offset 為: {task.Result.Offset}");
});
}
else
{
throw new Exception("發送消息到KafKa topic 為空");
}
}
catch (RdKafkaException ex)
{
LogHelper.Error("發送消息到KafKa KafKa異常", ex);
}
catch (Exception ex)
{
LogHelper.Error("發送消息到KafKa異常", ex);
}
flowCommond為要發送的對象內容,格式化為Json字元串再發送。
另一個主題一樣處理。
這裡實現一個線程裡面發送多個主題,那下面實現多個線程中如何發送多個主題。
多線程中如果每個線程都new Producer(kfkip) 一次,那KafKa的連接很快會被佔滿。
那這裡就用單例模式來解決這個問題,每次要用到Producer時檢查一下是否已經存在Producer實例,若存在則直接用不用再生成。
/// <summary>
/// 單例模式的實現
/// </summary>
public class SingleProduct : Producer
{
// 定義一個靜態變數來保存類的實例
private static SingleProduct uniqueInstance;
// 定義一個標識確保線程同步
private static readonly object locker = new object;
// 定義私有構造函數,使外界不能創建該類實例
private SingleProduct(string brokerList) : base(brokerList)
{
}
/// <summary>
/// 定義公有方法提供一個全局訪問點,同時你也可以定義公有屬性來提供全局訪問點
/// </summary>
/// <returns></returns>
public static SingleProduct GetInstance
{
// 當第一個線程運行到這裡時,此時會對locker對象 "加鎖",
// 當第二個線程運行該方法時,首先檢測到locker對象為"加鎖"狀態,該線程就會掛起等待第一個線程解鎖
// lock語句運行完之後(即線程運行完之後)會對該對象"解鎖"
if (uniqueInstance == null)
{
lock (locker)
{
// 如果類的實例不存在則創建,否則直接返回
if (uniqueInstance == null)
{
string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];
try
{
uniqueInstance = new SingleProduct(kfkip);
LogHelper.Error("單例模式 實例化 SingleProduct");
}
catch (RdKafkaException ex)
{
LogHelper.Error("單例模式 KafKa初始化KafKa異常 ", ex);
}
catch (Exception ex)
{
LogHelper.Error("單例模式 KafKa初始化異常", ex);
}
}
}
}
return uniqueInstance;
}
}
然後在初始化的代碼中替換Producer producer = new Producer(kfkip);為 Producer producer = SingleProduct.GetInstance;
OK!以上就完成了多線程多主題的消息發送。


※最大流當前弧優化Dinic分層模板
※VMware workstation虛擬集群實踐(1)——配置集群多節點互信
※ASP.NET Core 源碼學習之 Logging「2」:Configure
TAG:科技優家 |
※Supreme? / RIMOWA 聯乘企劃突擊發送
※Python模擬發送Slack消息
※PHP 使用 phpmailer 發送電子郵件
※SpringBoot中發送QQ郵件
※Verizon發送推送通知給iPhone用戶建議升級到Galaxy S9
※用戶現可用Opera的加密貨幣錢包發送CryptoKitties
※Windows 10 Your Phone應用更新 現可查看和發送簡訊
※Facebook計劃讓Messenger,Instagram和WhatsApp用戶互相發送消息
※ESET新發現了一款用通訊錄發送惡意簡訊的Android勒索軟體
※SpaceX剛剛向國際空間站發送的AI機器人CIMON
※SpaceX將向國際空間站發送新設備:AI機器人CIMON
※php中curl同時發送多個請求curl_multi函數集的用法
※Linux Mint 19「Tara」最早5月上線:承諾不收集/發送隱私數據
※讓 Emacs shell 命令發送桌面通知
※Coinbase向美國客戶發送IRS報稅表1099-K
※Localhost環境下使用Django send_mail發送郵件-以QQ和163郵箱為例
※Facebook Messenger將容對發送的消息進行刪除
※英特爾CPU新漏洞「預兆」(L1TF)|VORACLE攻擊可解密通過VPN發送的HTTP流量
※前端周報:npm鬧烏龍,發送錯誤代碼,Node之父發布「下一代Node」項目
※NASA準備將2個CubeSats微型衛星發送到火星