當前位置:
首頁 > 知識 > Kafka 源碼分析 5 :KafkaConsumer 消費處理

Kafka 源碼分析 5 :KafkaConsumer 消費處理

(點擊

上方公眾號

,可快速關注)




來源:劉正陽 ,


liuzhengyang.github.io/2018/02/21/kafka-source-5-kafka-consumer/




Kafka消費者客戶端從Kafka cluster中讀取消息並處理。




Kafka消費者可以手動綁定自己到某個topic的某些partition上或者通過subscribe方法監聽某個topic自動綁定。Kafka消費者綁定到某個parition後就和這個partition的leader連接,然後發出fetch request, 獲取消息後進行處理。



offset管理




kafka的消費模型是一個partition最多被一個consumer消費,而offset可以有consumer控制,例如通過seek前進或後退到某個offset位置。




首次連接時,可以通過KafkaConsumer配置參數里的auto.offset.reset參數決定是從最新的位置(默認)還是從就早的位置開始消費。




默認情況下, enable.auto.commit參數是true,即KafkaConsumer客戶端會定時commit offset,所有要注意的一點是如果poll函數得到ConsumerRecords後如果處理是非同步的,則可能出現消費處理還沒有完成但是卻commit offset了,這時如果進程掛掉則重啟後則會發生丟消息的情況。這裡有兩種解決方案,1是poll後的處理是同步的,這樣下一次poll會嘗試commit offset,則能保證at least one語義。2是關閉enable.auto.commit, 然後通過KafkaConsumer.commitSync方法來手動commit offset。



max.poll.interval.ms參數用於設置kafka消費者處理一次poll的消費結果的最大時間(默認300s),如果超過了這個時間則consumer被認為掛了會重新rebalance。




Consumer線程相關




消費者多線程處理有幾種方式






  1. 每個consumer只由一個線程處理,優點是能保證partition內有序和實現簡單,缺點是並發能力受限於partition的數量



  2. 將consumption和process過程分離,即consumer拉到一個消息後傳遞給另一個線程或線程池處理,這裡提高了並發能力但是需要注意多線程處理中的順序問題不再保證以及可能出現consumer提交了offset而線程池沒處理完的情況,另外線程池要注意處理慢導致的內存隊列積壓問題。




KafkaConsumer.subscribe




監聽某個topic




subscribe(Collection topics, ConsumerRebalanceListener listener)


當消費者使用kafka cluster來管理group membership時,ConsumerRebalanceListener會在consumer rebalance時調用,consumer rebalance發生在消費者或消費關係變化的時候





  1. 某個消費進程掛掉



  2. 新消費進程加入



  3. partition數量發生變化時




這個Listener的常見用途是保存這個partition的最新消費offset,在void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions)里保存當前的partition和offset到資料庫中。然後reassign完成後,void onPartitionsAssigned(java.util.Collection partitions)中從資料庫讀取之前的消費位置,通過seek方法設置消費位置繼續消費。




Kafka.poll




public ConsumerRecords<K, V> poll(long timeout) {


        // KafkaConsumer不是線程安全的


       acquireAndEnsureOpen();


       try {


           if (timeout < 0)


               throw new IllegalArgumentException("Timeout must not be negative");


           if (this.subscriptions.hasNoSubscriptionOrUserAssignment())


               throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");


           // poll for new data until the timeout expires


           long start = time.milliseconds();


           long remaining = timeout;


           do {


               Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);


               if (!records.isEmpty()) {


                   // before returning the fetched records, we can send off the next round of fetches


                   // and avoid block waiting for their responses to enable pipelining while the user


                   // is handling the fetched records.


                   //


                   // NOTE: since the consumed position has already been updated, we must not allow


                   // wakeups or any other errors to be triggered prior to returning the fetched records.


                   if (fetcher.sendFetches() > 0 || client.hasPendingRequests())


                       client.pollNoWakeup();


                   if (this.interceptors == null)


                       return new ConsumerRecords<>(records);


                   else


                       return this.interceptors.onConsume(new ConsumerRecords<>(records));


               }


               long elapsed = time.milliseconds() - start;


               remaining = timeout - elapsed;


           } while (remaining > 0);


           return ConsumerRecords.empty();


       } finally {


           release();


       }


   }




pollOnce處理





private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {


        client.maybeTriggerWakeup();


        // 協調者進行一次poll,裡面會根據auto.commit.interval.ms決定是否自動提交offset


        coordinator.poll(time.milliseconds(), timeout);


        // fetch positions if we have partitions we"re subscribed to that we


        // don"t know the offset for


        if (!subscriptions.hasAllFetchPositions())


            updateFetchPositions(this.subscriptions.missingFetchPositions());


        // 如果已經有record數據了直接返回


        // if data is available already, return it immediately


        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();


        if (!records.isEmpty())


            return records;


        // 發送一次fetch請求


        // send any new fetches (won"t resend pending fetches)


        fetcher.sendFetches();


        long now = time.milliseconds();


        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);


        // 等待fetch請求結果


        client.poll(pollTimeout, now, new PollCondition() {


            @Override


            public boolean shouldBlock() {


                // since a fetch might be completed by the background thread, we need this poll condition


                // to ensure that we do not block unnecessarily in poll()


                return !fetcher.hasCompletedFetches();


            }


        });


        // after the long poll, we should check whether the group needs to rebalance


        // prior to returning data so that the group can stabilize faster


        if (coordinator.needRejoin())


            return Collections.emptyMap();


        // 返回fetch結果


        return fetcher.fetchedRecords();


    }




看完本文有收穫?請轉發分享給更多人


關注「ImportNew」,提升Java技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 ImportNew 的精彩文章:

為什麼存儲密碼字元數組比字元串更合適?
MySQL 分頁優化中的 「 INNER JOIN方式優化分頁演算法 」 到底在什麼情況下會生效?

TAG:ImportNew |