Flink與Spark Streaming在與kafka結合的區別!
本文主要是想聊聊flink與kafka結合。當然,單純的介紹flink與kafka的結合呢,比較單調,也沒有可對比性,所以的準備順便幫大家簡單回顧一下Spark Streaming與kafka的結合。
看懂本文的前提是首先要熟悉kafka,然後了解spark Streaming的運行原理及與kafka結合的兩種形式,然後了解flink實時流的原理及與kafka結合的方式。
kafka
kafka作為一個消息隊列,在企業中主要用於緩存數據,當然,也有人用kafka做存儲系統,比如存最近七天的數據。kafka的基本概念請參考:kafka入門介紹
更多kafka的文章請關注浪尖公眾號,閱讀。
首先,我們先看下圖,這是一張生產消息到kafka,從kafka消費消息的結構圖。
當然, 這張圖很簡單,拿這張圖的目的是從中可以得到的跟本節文章有關的消息,有以下兩個:
1,kafka中的消息不是kafka主動去拉去的,而必須有生產者往kafka寫消息。
2,kafka是不會主動往消費者發布消息的,而必須有消費者主動從kafka拉取消息。
spark Streaming結合kafka
Spark Streaming現在在企業中流處理也是用的比較廣泛,但是大家都知道其不是真正的實時處理,而是微批處理。
在spark 1.3以前,SPark Streaming與kafka的結合是基於Receiver方式,顧名思義,我們要啟動1+個Receiver去從kafka裡面拉去數據,拉去的數據會每隔200ms生成一個block,然後在job生成的時候,取出該job處理時間範圍內所有的block,生成blockrdd,然後進入Spark core處理。
自Spark1.3以後,增加了direct Stream API,這種呢,主要特點是去掉了Receiver,在生成job,去取rdd的時候,計算每個partition要取數據的offset範圍,然後生成一個kafkardd,該rdd特點是與kafka的分區是一一對應的。
有上面的特點可以看出,Spark Streaming是要生成rdd,然後進行處理的,rdd數據集我們可以理解為靜態的,然每個批次,都會生成一個rdd,該過程就體現了批處理的特性,由於數據集時間段小,數據小,所以又稱微批處理,那麼就說明不是真正的實時處理。
還有一點,spark Streaming與kafka的結合是不會發現kafka動態增加的topic或者partition。
Spark的詳細教程,請關注浪尖公眾號,查看歷史推文。
Spark Streaming與kafka結合源碼講解,請加入知識星球,獲取。
flink結合kafka
大家都知道flink是真正的實時處理,他是基於事件觸發的機制進行處理,而不是像spark Streaming每隔若干時間段,生成微批數據,然後進行處理。那麼這個時候就有了個疑問,在前面kafka小節中,我們說到了kafka是不會主動往消費者裡面吐數據的,需要消費者主動去拉去數據來處理。那麼flink是如何做到基於事件實時處理kafka的數據呢?在這裡浪尖帶著大家看一下源碼,flink1.5.0為例。
1,flink與kafka結合的demo。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new SimpleStringSchema,
params.getProperties)
val messageStream = env
.addSource(kafkaConsumer)
.map(in => prefix + in)
// create a Kafka producer for Kafka 0.10.x
val kafkaProducer = new FlinkKafkaProducer010(
params.getRequired("output-topic"),
new SimpleStringSchema,
params.getProperties)
// write data into Kafka
messageStream.addSink(kafkaProducer)
env.execute("Kafka 0.10 Example")
從上面的demo可以看出,數據源的入口就是FlinkKafkaConsumer010,當然這裡面只是簡單的構建了一個對象,並進行了一些配置的初始化,真正source的啟動是在其run方法中run方法的調用過程在這裡不講解,後面會出教程講解。
首先看一下類的繼承關係
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
其中,run方法就在FlinkKafkaConsumerBase里,當然其中open方法裡面對kafka相關內容進行里初始化。
從輸入到計算到輸出完整的計算鏈條的調用過程,後面浪尖會出文章介紹。在這裡只關心flink如何從主動消費數據,然後變成事件處理機制的過程。
由於其FlinkKafkaConsumerBase的run比較長,我這裡只看重要的部分,首先是會創建Kafka09Fetcher。
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
接著下面有段神器,flink嚴重優越於Spark Streaming的,代碼如下:
final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
this.discoveryLoopThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// --------------------- partition discovery loop ---------------------
List<KafkaTopicPartition> discoveredPartitions;
// throughout the loop, we always eagerly check if we are still running before
// performing the next operation, so that we can escape the loop as soon as possible
while (running) {
if (LOG.isDebugEnabled()) {
LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
}
try {
discoveredPartitions = partitionDiscoverer.discoverPartitions();
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
// the partition discoverer may have been closed or woken up before or during the discovery;
// this would only happen if the consumer was canceled; simply escape the loop
break;
}
// no need to add the discovered partitions if we were closed during the meantime
if (running && !discoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
}
// do not waste any time sleeping if we"re not running anymore
if (running && discoveryIntervalMillis != 0) {
try {
Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException iex) {
// may be interrupted if the consumer was canceled midway; simply escape the loop
break;
}
}
}
} catch (Exception e) {
discoveryLoopErrorRef.set(e);
} finally {
// calling cancel will also let the fetcher loop escape
// (if not running, cancel() was already called)
if (running) {
cancel();
}
}
}
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
它定義了一個線程池對象,去動態發現kafka新增的topic(支持正則形式指定消費的topic),或者動態發現kafka新增的分區。
接著肯定是啟動動態發現分區或者topic線程,並且啟動kafkaFetcher。
discoveryLoopThread.start();
kafkaFetcher.runFetchLoop();
// --------------------------------------------------------------------
// make sure that the partition discoverer is properly closed
partitionDiscoverer.close();
discoveryLoopThread.join();
接著,我們進入kafkaFetcher的runFetchLoop方法,映入眼帘的是
// kick off the actual Kafka consumer
consumerThread.start();
這個線程是在構建kafka09Fetcher的時候創建的
this.consumerThread = new KafkaConsumerThread(
LOG,
handover,
kafkaProperties,
unassignedPartitionsQueue,
createCallBridge(),
getFetcherName() + " for " + taskNameWithSubtasks,
pollTimeout,
useMetrics,
consumerMetricGroup,
subtaskMetricGroup);
KafkaConsumerThread 繼承自Thread,然後在其run方法里,首先看到的是
// this is the means to talk to FlinkKafkaConsumer"s main thread
final Handover handover = this.handover;
這個handover的作用呢暫且不提,接著分析run方法裡面內容
1,獲取消費者
try {
this.consumer = getConsumer(kafkaProperties);
}
2,檢測分區並且會重分配新增的分區
try {
if (hasAssignedPartitions) {
newPartitions = unassignedPartitionsQueue.pollBatch();
}
else {
// if no assigned partitions block until we get at least one
// instead of hot spinning this loop. We rely on a fact that
// unassignedPartitionsQueue will be closed on a shutdown, so
// we don"t block indefinitely
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
reassignPartitions(newPartitions);
}
3,消費數據
// get the next batch of records, unless we did not manage to hand the old batch over
if (records == null) {
try {
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}
4,通過handover將數據發出去
try {
handover.produce(records);
records = null;
}
由於被kafkaConsumerThread打斷了kafkaFetcher的runFetchLoop方法的分析,我們在這裡繼續
1,拉取handover.producer生產的數據
while (running) {
// this blocks until we get the next records
// it automatically re-throws exceptions encountered in the consumer thread
final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
2,數據格式整理,並將數據整理好後,逐個Record發送,將循環主動批量拉取kafka數據,轉化為事件觸發。
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
List<ConsumerRecord<byte[], byte[]>> partitionRecords =
records.records(partition.getKafkaPartitionHandle());
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
final T value = deserializer.deserialize(
record.key(), record.value(),
record.topic(), record.partition(), record.offset());
if (deserializer.isEndOfStream(value)) {
// end of stream signaled
running = false;
break;
}
// emit the actual record. this also updates offset state atomically
// and deals with timestamps and watermark generation
emitRecord(value, partition, record.offset(), record);
}
}
肯定會注意到這行代碼emitRecord(value, partition, record.offset(), record);,從這裡開始flink變成事件觸發的流引擎。
handover-樞紐
handover是在構建kafkaFetcher的時候構建的
this.handover = new Handover();
handover是一個工具,將一組數據或者異常從生產者線程傳輸到消費者線程。它高效的扮演了一個阻塞隊列的特性。該類運行於flink kafka consumer,用來在kafkaConsumer 類和主線程之間轉移數據和異常。
handover有兩個重要方法,分別是:
1,producer
producer是將kafkaConusmer獲取的數據發送出去,在KafkaConsumerThread中調用。代碼如上
2,pollnext
從handover裡面拉去下一條數據,會阻塞的,行為很像是從一個阻塞隊列裡面拉去數據。
綜述
kafkaConsumer批量拉去數據,flink將其經過整理之後變成,逐個Record發送的事件觸髮式的流處理。這就是flink與kafka結合事件觸發時流處理的基本思路。
重要的事情再說一遍:Flink支持動態發現新增topic或者新增partition哦。具體實現思路,前面有代碼為證,後面會對比spark Streaming的這塊(不支持動態發現新增kafka topic或者partition),來詳細講解。
打開今日頭條,查看更多精彩圖片※推薦 9 個樣式化組件的 React UI 庫
※spark中dataFrame的一些方法回顧
TAG:程序員小新人學習 |