當前位置:
首頁 > 知識 > Flink與Spark Streaming在與kafka結合的區別!

Flink與Spark Streaming在與kafka結合的區別!

本文主要是想聊聊flink與kafka結合。當然,單純的介紹flink與kafka的結合呢,比較單調,也沒有可對比性,所以的準備順便幫大家簡單回顧一下Spark Streaming與kafka的結合。

看懂本文的前提是首先要熟悉kafka,然後了解spark Streaming的運行原理及與kafka結合的兩種形式,然後了解flink實時流的原理及與kafka結合的方式。

kafka

kafka作為一個消息隊列,在企業中主要用於緩存數據,當然,也有人用kafka做存儲系統,比如存最近七天的數據。kafka的基本概念請參考:kafka入門介紹

更多kafka的文章請關注浪尖公眾號,閱讀。

首先,我們先看下圖,這是一張生產消息到kafka,從kafka消費消息的結構圖。

當然, 這張圖很簡單,拿這張圖的目的是從中可以得到的跟本節文章有關的消息,有以下兩個:

1,kafka中的消息不是kafka主動去拉去的,而必須有生產者往kafka寫消息。

2,kafka是不會主動往消費者發布消息的,而必須有消費者主動從kafka拉取消息。

spark Streaming結合kafka

Spark Streaming現在在企業中流處理也是用的比較廣泛,但是大家都知道其不是真正的實時處理,而是微批處理。

在spark 1.3以前,SPark Streaming與kafka的結合是基於Receiver方式,顧名思義,我們要啟動1+個Receiver去從kafka裡面拉去數據,拉去的數據會每隔200ms生成一個block,然後在job生成的時候,取出該job處理時間範圍內所有的block,生成blockrdd,然後進入Spark core處理。

自Spark1.3以後,增加了direct Stream API,這種呢,主要特點是去掉了Receiver,在生成job,去取rdd的時候,計算每個partition要取數據的offset範圍,然後生成一個kafkardd,該rdd特點是與kafka的分區是一一對應的。

有上面的特點可以看出,Spark Streaming是要生成rdd,然後進行處理的,rdd數據集我們可以理解為靜態的,然每個批次,都會生成一個rdd,該過程就體現了批處理的特性,由於數據集時間段小,數據小,所以又稱微批處理,那麼就說明不是真正的實時處理。

還有一點,spark Streaming與kafka的結合是不會發現kafka動態增加的topic或者partition。

Spark的詳細教程,請關注浪尖公眾號,查看歷史推文。

Spark Streaming與kafka結合源碼講解,請加入知識星球,獲取。

flink結合kafka

大家都知道flink是真正的實時處理,他是基於事件觸發的機制進行處理,而不是像spark Streaming每隔若干時間段,生成微批數據,然後進行處理。那麼這個時候就有了個疑問,在前面kafka小節中,我們說到了kafka是不會主動往消費者裡面吐數據的,需要消費者主動去拉去數據來處理。那麼flink是如何做到基於事件實時處理kafka的數據呢?在這裡浪尖帶著大家看一下源碼,flink1.5.0為例。

1,flink與kafka結合的demo。

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.getConfig.disableSysoutLogging

env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))

// create a checkpoint every 5 seconds

env.enableCheckpointing(5000)

// make parameters available in the web interface

env.getConfig.setGlobalJobParameters(params)

// create a Kafka streaming source consumer for Kafka 0.10.x

val kafkaConsumer = new FlinkKafkaConsumer010(

params.getRequired("input-topic"),

new SimpleStringSchema,

params.getProperties)

val messageStream = env

.addSource(kafkaConsumer)

.map(in => prefix + in)

// create a Kafka producer for Kafka 0.10.x

val kafkaProducer = new FlinkKafkaProducer010(

params.getRequired("output-topic"),

new SimpleStringSchema,

params.getProperties)

// write data into Kafka

messageStream.addSink(kafkaProducer)

env.execute("Kafka 0.10 Example")

從上面的demo可以看出,數據源的入口就是FlinkKafkaConsumer010,當然這裡面只是簡單的構建了一個對象,並進行了一些配置的初始化,真正source的啟動是在其run方法中run方法的調用過程在這裡不講解,後面會出教程講解。

首先看一下類的繼承關係

public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>

public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>

其中,run方法就在FlinkKafkaConsumerBase里,當然其中open方法裡面對kafka相關內容進行里初始化。

從輸入到計算到輸出完整的計算鏈條的調用過程,後面浪尖會出文章介紹。在這裡只關心flink如何從主動消費數據,然後變成事件處理機制的過程。

由於其FlinkKafkaConsumerBase的run比較長,我這裡只看重要的部分,首先是會創建Kafka09Fetcher。

this.kafkaFetcher = createFetcher(

sourceContext,

subscribedPartitionsToStartOffsets,

periodicWatermarkAssigner,

punctuatedWatermarkAssigner,

(StreamingRuntimeContext) getRuntimeContext(),

offsetCommitMode,

getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),

useMetrics);

接著下面有段神器,flink嚴重優越於Spark Streaming的,代碼如下:

final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();

this.discoveryLoopThread = new Thread(new Runnable() {

@Override

public void run() {

try {

// --------------------- partition discovery loop ---------------------

List<KafkaTopicPartition> discoveredPartitions;

// throughout the loop, we always eagerly check if we are still running before

// performing the next operation, so that we can escape the loop as soon as possible

while (running) {

if (LOG.isDebugEnabled()) {

LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());

}

try {

discoveredPartitions = partitionDiscoverer.discoverPartitions();

} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {

// the partition discoverer may have been closed or woken up before or during the discovery;

// this would only happen if the consumer was canceled; simply escape the loop

break;

}

// no need to add the discovered partitions if we were closed during the meantime

if (running && !discoveredPartitions.isEmpty()) {

kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);

}

// do not waste any time sleeping if we"re not running anymore

if (running && discoveryIntervalMillis != 0) {

try {

Thread.sleep(discoveryIntervalMillis);

} catch (InterruptedException iex) {

// may be interrupted if the consumer was canceled midway; simply escape the loop

break;

}

}

}

} catch (Exception e) {

discoveryLoopErrorRef.set(e);

} finally {

// calling cancel will also let the fetcher loop escape

// (if not running, cancel() was already called)

if (running) {

cancel();

}

}

}

}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

它定義了一個線程池對象,去動態發現kafka新增的topic(支持正則形式指定消費的topic),或者動態發現kafka新增的分區。

接著肯定是啟動動態發現分區或者topic線程,並且啟動kafkaFetcher。

discoveryLoopThread.start();

kafkaFetcher.runFetchLoop();

// --------------------------------------------------------------------

// make sure that the partition discoverer is properly closed

partitionDiscoverer.close();

discoveryLoopThread.join();

接著,我們進入kafkaFetcher的runFetchLoop方法,映入眼帘的是

// kick off the actual Kafka consumer

consumerThread.start();

這個線程是在構建kafka09Fetcher的時候創建的

this.consumerThread = new KafkaConsumerThread(

LOG,

handover,

kafkaProperties,

unassignedPartitionsQueue,

createCallBridge(),

getFetcherName() + " for " + taskNameWithSubtasks,

pollTimeout,

useMetrics,

consumerMetricGroup,

subtaskMetricGroup);

KafkaConsumerThread 繼承自Thread,然後在其run方法里,首先看到的是

// this is the means to talk to FlinkKafkaConsumer"s main thread

final Handover handover = this.handover;

這個handover的作用呢暫且不提,接著分析run方法裡面內容

1,獲取消費者

try {

this.consumer = getConsumer(kafkaProperties);

}

2,檢測分區並且會重分配新增的分區

try {

if (hasAssignedPartitions) {

newPartitions = unassignedPartitionsQueue.pollBatch();

}

else {

// if no assigned partitions block until we get at least one

// instead of hot spinning this loop. We rely on a fact that

// unassignedPartitionsQueue will be closed on a shutdown, so

// we don"t block indefinitely

newPartitions = unassignedPartitionsQueue.getBatchBlocking();

}

if (newPartitions != null) {

reassignPartitions(newPartitions);

}

3,消費數據

// get the next batch of records, unless we did not manage to hand the old batch over

if (records == null) {

try {

records = consumer.poll(pollTimeout);

}

catch (WakeupException we) {

continue;

}

}

4,通過handover將數據發出去

try {

handover.produce(records);

records = null;

}

由於被kafkaConsumerThread打斷了kafkaFetcher的runFetchLoop方法的分析,我們在這裡繼續

1,拉取handover.producer生產的數據

while (running) {

// this blocks until we get the next records

// it automatically re-throws exceptions encountered in the consumer thread

final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

2,數據格式整理,並將數據整理好後,逐個Record發送,將循環主動批量拉取kafka數據,轉化為事件觸發。

// get the records for each topic partition

for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

List<ConsumerRecord<byte[], byte[]>> partitionRecords =

records.records(partition.getKafkaPartitionHandle());

for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {

final T value = deserializer.deserialize(

record.key(), record.value(),

record.topic(), record.partition(), record.offset());

if (deserializer.isEndOfStream(value)) {

// end of stream signaled

running = false;

break;

}

// emit the actual record. this also updates offset state atomically

// and deals with timestamps and watermark generation

emitRecord(value, partition, record.offset(), record);

}

}

肯定會注意到這行代碼emitRecord(value, partition, record.offset(), record);,從這裡開始flink變成事件觸發的流引擎。

handover-樞紐

handover是在構建kafkaFetcher的時候構建的

this.handover = new Handover();

handover是一個工具,將一組數據或者異常從生產者線程傳輸到消費者線程。它高效的扮演了一個阻塞隊列的特性。該類運行於flink kafka consumer,用來在kafkaConsumer 類和主線程之間轉移數據和異常。

handover有兩個重要方法,分別是:

1,producer

producer是將kafkaConusmer獲取的數據發送出去,在KafkaConsumerThread中調用。代碼如上

2,pollnext

從handover裡面拉去下一條數據,會阻塞的,行為很像是從一個阻塞隊列裡面拉去數據。

綜述

kafkaConsumer批量拉去數據,flink將其經過整理之後變成,逐個Record發送的事件觸髮式的流處理。這就是flink與kafka結合事件觸發時流處理的基本思路。

重要的事情再說一遍:Flink支持動態發現新增topic或者新增partition哦。具體實現思路,前面有代碼為證,後面會對比spark Streaming的這塊(不支持動態發現新增kafka topic或者partition),來詳細講解。

Flink與Spark Streaming在與kafka結合的區別!

打開今日頭條,查看更多精彩圖片
喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

推薦 9 個樣式化組件的 React UI 庫
spark中dataFrame的一些方法回顧

TAG:程序員小新人學習 |