當前位置:
首頁 > 知識 > 一種基於kafka+storm實現的日誌記錄方法(二)

一種基於kafka+storm實現的日誌記錄方法(二)

引言

上一篇分享博文《一種基於kafka+storm實現的日誌記錄方法》,講述了一種基於大數據實時運算實現的日誌記錄方式。在文中只是提出了一種技術實現思路,以及整體架構,並且在我所在的項目中已經進行了實踐,感興趣的朋友,可以進一步完善,比如添加許可權等,實現一種新日誌平台的搭建。

博文發布後,有網友留言希望公開部分源碼。今天準備整理下我們已經實現的代碼,去掉公司業務部分,做一個簡單share,以回應網友要求。本文不再對整體實現流程進行講解,感興趣的朋友請直接前往上一遍博文。

代碼實現主要分兩部分:第一部分是java客戶端往kafka寫日誌消息(生產者);第二部分是storm消費kafka日誌消息,歸類,批量寫入hbase。從hbase查詢日誌部分比較簡單,代碼就不提供了。

由於這周末還要準備一個「晉陞答辯」,本次分享只整理出來第一部分「java客戶端往kafka寫日誌消息」。

Java寫日誌消息到kafka

我實現的第一版發送日誌消息到kafka是復用的「點擊流」日誌上報流程,即用nginx+lua實現的http介面,往kafka寫消息,當然也有採用nginx+go語言實現的。這種方式適用於做頁面埋點,當用戶瀏覽頁面產生點擊操作時調用該http介面,往kafka寫日誌,當時主要是想通過這種方式實現點擊熱力圖、注意力熱圖等。這種方式實現的http介面性能相當優異,而且在支持高並發、高吞吐量方面表現優異,現在在各大電商網站廣泛的運用,用戶收集用戶的行為數據,這些都是做大數據計算、分析、以及智能推薦的基礎。

好吧不扯遠了,後面有時間再分享下我們做大數據實時計算、以及智能推薦相關實現。既然這種nginx+lua+kafka的方式實現的http介面能支持每天海量的「點擊流」日誌上報,那它同樣能滿足「伺服器」端的日誌記錄,而且這點日誌量對於該http介面來說簡直毫無壓力。我的第一版實現很簡單,直接在java服務端適用httpclient構造http請求,調用該http介面進行「伺服器」端的日誌上報。而且正如料想的一樣,毫無壓力。

這僅僅是我們的第一次嘗試,但每次列印日誌都需要調用一個http介面,我還是覺得很彆扭,而且http介面還是有一定的網路開銷。既然這種方式可行,那就可以放棄http介面,直接在java應用伺服器端直連kafka發送日誌消息,如果是http介面還有一點網路開銷的話(10ms-50ms),這種方式對「應用伺服器」來說毫無感知(1-2ms),這也是我想要的效果,畢竟只是列印一條日誌。我把這個想法告訴我的同事「丹哥」(外號甄子丹),最後把這部分代碼實現做成一個jar包,在需要採用這種方式列印日誌的系統引入這個jar包,再做一些配置即可。

核心代碼講解

下面我們來看下該jar包的核心代碼LogCollectorClient類:

@Component
public class LogCollectorClient {
private static final Log log = LogFactory.getLog(LogCollectorClient.class);

//kafka生產者(京東對kafka做了一些簡單封裝,簡稱JDQ)
private JDQProducerClient<String, byte> producer = null;

private boolean HASAUTH = false;

//每一批日誌量,批量上報日誌使用
protected int OFFSET = 500;

//spring 讀取properties配置文件
@Resource
private Environment env;

//初始化方法
@PostConstruct
private void init {
try {

//step1:連接kafka許可權驗證,公司對kafka做的許可權封裝,可以根據自己公司kafka具體情況調整
Authentication e = new Authentication(env.getProperty("kafka_key"), env.getProperty("test_token"));//開發、測試環境kafka

//step2 設置kafka生成者相關配置屬性
Properties pros = new Properties;
pros.setProperty("partitioner.class", env.getProperty("partitioner"));//指定分片策略
pros.setProperty("producer.type", env.getProperty("producer.type"));
pros.setProperty("compression.codec", env.getProperty("compression.codec"));
pros.setProperty("request.required.acks", env.getProperty("request.required.acks"));

//step3 初始化kafka生產者客戶端
this.producer = new JDQProducerClient(e, pros);
} catch (Exception var4) {
log.info("kafaka鑒權初始化失敗!");
}

}

/**
* 上報一條單條日誌
* @param key
* @param type
* @param logMap
* @throws JDQOverSpeedException
* @throws JDQException
*/
public void sendLogInfo(String key, String type, Map<String, String> logMap) throws JDQOverSpeedException, JDQException {
if(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(type) && null != logMap && !logMap.isEmpty) {
this.producer.send(new JDQMessage(key + "_" + type, this.assembleJsonStr(key, type, logMap).getBytes));
}

}

/**
* 轉換成json格式上報
* @param key
* @param type
* @param logMap
* @return
*/
private String assembleJsonStr(String key, String type, Map<String, String> logMap) {
StringBuffer valueStr = new StringBuffer;
Iterator logInfo = logMap.entrySet.iterator;

while(logInfo.hasNext) {
Map.Entry entry = (Map.Entry)logInfo.next;
valueStr.append((StringUtils.isNotBlank((String)entry.getKey)?((String)entry.getKey).replaceAll("&", " "):"") + "=").append(StringUtils.isNotBlank((String)entry.getValue)?((String)entry.getValue).replaceAll("&", " "):"").append("&");
}

LogAssembleInfo logInfo1 = new LogAssembleInfo("key=" + key + "&type=" + type + "&" + valueStr.toString, DateUtil.getTime);
return JsonUtil.write2JsonStr(logInfo1);
}

@PreDestroy
private void destroy {
if(null != this.producer) {
this.producer.close;
}
}
}

這個類其實很簡單,說明如下:

1、採用@Component註解,說明只是一個簡單的spring 單例 bean,spring容器啟動時注入到容器中。

2、@PostConstruct 註解的init方法,bean初始化時,就會初始化一個kafka生產者對象,我們公司kafka團隊對kafka做了簡單的封裝 JDQProducerClient本質上對應的是kafka的kafka.javaapi.producer.Producer。如果你使用的原生kafka,生產者的初始化方法如下:

public static void main(String[] args) throws Exception {

Properties prop = new Properties;

prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");

prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");

prop.put("serializer.class", StringEncoder.class.getName);

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));

int i = 0;

while(true){

producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++));

Thread.sleep(1000);

}

}

3、關於kafka初始化的相關配置信息放到一個properties的配置文件中,通過spring的Environment環境上下文對象的getProperty方法獲取與連接kafka所以的配置。

4、採用@PreDestroy註解的destroy方法,這裡是應用伺服器tomcat停止之前,優雅的自動關閉kafka連接。

5、最後來看下日誌上報方法sendLogInfo,在需要上報日誌的類中注入LogCollectorClient對象即可,如下:


@Component
public class TestService {

@Resource
private LogCollectorClient log;

public void publish{
//省略業務代碼

//開始上報日誌,日誌內容放到一個map里
Map<String,String> param = new HashMap<String, String>;
param.put("time", DateTimeUtils.getDateTime);
param.put("logs", "xxx發布活動");
log.sendLogInfo(pageId, SystemConstant.APP_ID, param);
}
}

sendLogInfo日誌上報方法需要三個參數:

第一個是查詢key, hbase日誌表中rowkey構成部分。比如:這裡的pageId,發布頁面的id。

第二個是系統id,用於區分hbase日誌表,每個系統對應一個固定的常量。

第三個是需要答應的日誌內容,考慮到列印的日誌可能比較多,這裡用一個map存放,也可以改為一個String。

好了,關於java先kafka上報日誌的核心類LogCollectorClient講解完畢。正如前面所說,把LogCollectorClient類打成一個jar包,在需要日誌列印的應用系統里引入這個jar包,以及一個kafka的properties配置文件即可。

當然,你可以把kafka需要的配置當做常量寫死在jar包中的一個常量類中,這樣應用系統只需要一個jar包即可。

優化

最後你還可以對上述LogCollectorClient類做一些優化:

1、比如加一個線程池,把上報日誌改為非同步上報,在線程中處理kafka·異常。這樣即便kafka·出現問題時,也不會影響正常業務,唯一影響的就是日誌會丟失。

2、另外如果你的日誌量很大,你還可以採用kafka·的批量上報,當日誌量達到一定條數後 才調用一次producer的sender方法。

3、也許你已經發現了這裡上報的日誌內容是json格式,為了更加高效你改為pb格式。

最後需要說明的是:理論上這種日誌記錄方式可以完全代替傳統的日誌列印到文件的方式,比如Log4j。但是沒有必要,個人覺得一些無關緊要的調試日誌還是使用Log4j,對於一些敏感日誌或者重要的流水日誌,採用這種方式。 Log4j列印日誌更簡單,基於kafka+storm的更加安全、永久存放、日誌更集中(一張hbase表中),二者結合使用天衣無縫。

關於第一部分「java客戶端往kafka寫日誌消息(生產者)」就這裡,由於下周還有一個「晉陞答辯」需要準備,預祝自己這次晉陞能成功吧。第二部分「storm消費kafka日誌消息放到hbase」只能緩幾天,才能整理出來啦,忘諒解。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 達人科技 的精彩文章:

使用Spring boot + jQuery上傳文件(kotlin)
一步一步學Vue(四)
誰將新樽辭舊月,今月曾經照古人
python實現發郵件

TAG:達人科技 |

您可能感興趣

有一種Strong,叫Marathon Strong
有一種時髦叫made in Thailand!
另一種選擇 小眾Woolrich Wool Patrol Down 經典 Parka 鵝絨大衣
蜘蛛Rescue Assist&Atlantic Salt橫向測評:同時擁有是一種什麼樣的體驗?
Roaming:一種安全的實現
一種叫Boogie的病 無葯可解 魔王John Lee Hooker特輯!
TeleRAT:一種利用 Telegram 秘密竊取數據的新型 Android 木馬
有一種良師益友叫Balenciaga與Givenchy
Music Never Die——有一種甜美叫王心凌
Android系統中發現了一種新的竊聽病毒-RedDrop
《黑豹》總是給人一種made in China的既視感
Less is more,有一種魅力叫現代簡約!
終於,谷歌發布了一種新量子處理器Bristlecone
Jürgen Mayer H:建築也是一種冒險
PanoMoments是一種沉浸式即時敘事媒介,被稱為「VR的GIF動圖」
谷歌正在測試一種名為「公告」——Bulletin的工具
一種基於TensorFlow的廣告異常流量檢測策略
有一種迷失的美感:來自比利時藝術家 Kaatje Vermeire 繪畫作品
自在也是一種選擇:蘇梅島萬麗酒店 Renaissance
用上 Razer Phone 是一種什麼樣的體驗?