當前位置:
首頁 > 知識 > Elasticsearch 與 Kafka 整合剖析

Elasticsearch 與 Kafka 整合剖析


1.概述

目前,隨著大數據的浪潮,Kafka 被越來越多的企業所認可,如今的Kafka已發展到0.10.x,其優秀的特性也帶給我們解決實際業務的方案。對於數據分流來說,既可以分流到離線存儲平台(HDFS),離線計算平台(Hive倉庫),也可以分流實時流水計算(Storm,Spark)等,同樣也可以分流到海量數據查詢(HBase),或是及時查詢(ElasticSearch)。而今天筆者給大家分享的就是Kafka 分流數據到 ElasticSearch。


2.內容

我們知道,ElasticSearch是有其自己的套件的,簡稱ELK,即ElasticSearch,Logstash以及Kibana。ElasticSearch負責存儲,Logstash負責收集數據來源,Kibana負責可視化數據,分工明確。想要分流Kafka中的消息數據,可以使用Logstash的插件直接消費,但是需要我們編寫複雜的過濾條件,和特殊的映射處理,比如系統保留的`_uid`欄位等需要我們額外的轉化。今天我們使用另外一種方式來處理數據,使用Kafka的消費API和ES的存儲API來處理分流數據。通過編寫Kafka消費者,消費對應的業務數據,將消費的數據通過ES存儲API,通過創建對應的索引的,存儲到ES中。其流程如下圖所示:

Elasticsearch 與 Kafka 整合剖析

3.實現

下面,我們開始進行實現細節處理,這裡給大家提供實現的核心代碼部分,實現代碼如下所示:

3.1 定義ES格式

我們以插件的形式進行消費,從Kafka到ES的數據流向,只需要定義插件格式,如下所示:

{
"job": {
"content": {
"reader": {
"name": "kafka",
"parameter": {
"topic": "kafka_es_client_error",
"groupid": "es2",
"bootstrapServers": "k1:9094,k2:9094,k3:9094"
},
"threads": 6
},
"writer": {
"name": "es",
"parameter": {
"host": [
"es1:9300,es2:9300,es3:9300"
],
"index": "client_error_%s",
"type": "client_error"
}
}
}
}
}

這裡處理消費存儲的方式,將讀和寫的源分開,配置各自屬性即可。

3.2 數據存儲

這裡,我們通過每天建立索引進行存儲,便於業務查詢,實現細節如下所示:

public class EsProducer {

private final static Logger LOG = LoggerFactory.getLogger(EsProducer.class);
private final KafkaConsumer consumer;
private ExecutorService executorService;
private Configuration conf = null;
private static int counter = 0;

public EsProducer {
String root = System.getProperty("user.dir") + "/conf/";
String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");
conf = Configuration.from(new File(root + path));
Properties props = new Properties;
props.put("bootstrap.servers", conf.getString("job.content.reader.parameter.bootstrapServers"));
props.put("group.id", conf.getString("job.content.reader.parameter.groupid"));
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(conf.getString("job.content.reader.parameter.topic")));
}

public void execute {
executorService = Executors.newFixedThreadPool(conf.getInt("job.content.reader.threads"));
while (true) {
ConsumerRecords records = consumer.poll(100);
if (null != records) {
executorService.submit(new KafkaConsumerThread(records, consumer));
}
}
}

public void shutdown {
try {
if (consumer != null) {
consumer.close;
}
if (executorService != null) {
executorService.shutdown;
}
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.error("Shutdown kafka consumer thread timeout.");
}
} catch (InterruptedException ignored) {
Thread.currentThread.interrupt;
}
}

class KafkaConsumerThread implements Runnable {

private ConsumerRecords records;

public KafkaConsumerThread(ConsumerRecords records, KafkaConsumer consumer) {
this.records = records;
}

@Override
public void run {
String index = conf.getString("job.content.writer.parameter.index");
String type = conf.getString("job.content.writer.parameter.type");
for (TopicPartition partition : records.partitions) {
List> partitionRecords = records.records(partition);
for (ConsumerRecord record : partitionRecords) {
JSONObject json = JSON.parseObject(record.value);
List> list = new ArrayList<>;
Map map = new HashMap<>;
index = String.format(index, CalendarUtils.timeSpan2EsDay(json.getLongValue("_tm") * 1000L));

if (counter < 10) { LOG.info("Index : " + index); counter++; } for (String key : json.keySet) { if ("_uid".equals(key)) { map.put("uid", json.get(key)); } else { map.put(key, json.get(key)); } list.add(map); } EsUtils.write2Es(index, type, list); } } } } }

這裡消費的數據源就處理好了,接下來,開始ES的存儲,實現代碼如下所示:

public class EsUtils {

private static TransportClient client = null;

static {
if (client == null) {
client = new PreBuiltTransportClient(Settings.EMPTY);
}
String root = System.getProperty("user.dir") + "/conf/";
String path = SystemConfigUtils.getProperty("kafka.x.plugins.exec.path");
Configuration conf = Configuration.from(new File(root + path));
List hosts = conf.getList("job.content.writer.parameter.host");
for (Object object : hosts) {
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(object.toString.split(":")[0]), Integer.parseInt(object.toString.split(":")[1])));
} catch (Exception e) {
e.printStackTrace;
}
}
}

public static void write2Es(String index, String type, List> dataSets) {

BulkRequestBuilder bulkRequest = client.prepareBulk;
for (Map dataSet : dataSets) {
bulkRequest.add(client.prepareIndex(index, type).setSource(dataSet));
}

bulkRequest.execute.actionGet;
// if (client != null) {
// client.close;
// }
}

public static void close {
if (client != null) {
client.close;
}
}
}

這裡,我們利用BulkRequestBuilder進行批量寫入,減少頻繁寫入率。


4.調度

存儲在ES中的數據,如果不需要長期存儲,比如:我們只需要存儲及時查詢數據一個月,對於一個月以前的數據需要清除掉。這裡,我們可以編寫腳本直接使用Crontab來進行簡單調用即可,腳本如下所示:

#!/bin/sh
# : ./delete_es_by_day.sh kafka_error_client logsdate 30
echo ": ./delete_es_by_day.sh kafka_error_client logsdate 30 "

index_name=$1
daycolumn=$2
savedays=$3
format_day=$4

if [ ! -n "$savedays" ]; then
echo "Oops. The args is not right,please input again...."
exit 1
fi

if [ ! -n "$format_day" ]; then
format_day="%Y%m%d"
fi

sevendayago=`date -d "-${savedays} day " +${format_day}`

curl -XDELETE "es1:9200/${index_name}/_query?pretty" -d "
{
"query": {
"filtered": {
"filter": {
"bool": {
"must": {
"range": {
"${daycolumn}": {
"from": null,
"to": ${sevendayago},
"include_lower": true,
"include_upper": true
}
}
}
}
}
}
}
}"

echo "Finished."

然後,在Crontab中進行定時調度即可。


5.總結

這裡,我們在進行數據寫入ES的時候,需要注意,有些欄位是ES保留欄位,比如`_uid`,這裡我們需要轉化,不然寫到ES的時候,會引發衝突導致異常,最終寫入失敗。

6.結束語

這篇博客就和大家分享到這裡,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 科技優家 的精彩文章:

8 面向對象之抽象類+介面+內部類
Docker-compose實戰——Django+PostgreSQL
node.js入門系列(一)——Node.js簡介
HTTP權威指南-HTTP報文

TAG:科技優家 |

您可能感興趣

Facebook計劃整合Messenger、WhatsApp和Instagram,但今年內無望
消息稱 Facebook 計劃整合 WhatsApp、Instagram 和 Messenger 的聊天功能
消息稱 Facebook 計劃整合 WhatsApp、Instagram和Messenger 的聊天功能
Spring整合Hibernate.Final
mybatis與spring整合:Dao層映射配置
node整合webstorm
Spring Cloud Alibaba Sentinel 整合 Feign 的設計實現
Microsoft To-Do新功能確認:Cortana整合、macOS應用
Mybatis與Spring的整合
Springboot2.0 Theamleaf Security整合快速入門
財經詞條 Vol.43 Backward Integration 上游整合
Spring4+hibernate+SpringMvc整合
spring boot 整合shiro 錯誤
Spring Boot 2.1.X整合最新版本Elasticsearch的相關問題
rabbitMQ系列高級整合應用rabbitTemplate
spring-boot 之 使員Druid 整合Mybatis 最簡配置多數據源
Android Messages將整合Google Assistant功能,幫助你約會成功
Facebook將推出PyTorch 1.0,整合Caffe2+PyTorch
Ford整合自動駕駛業務與資源,成立Ford Autonomous Vehicles
雅高集團整合旗下奢華酒店 Fairmont、Raffles和Swiss?tel 忠誠會員計劃