當前位置:
首頁 > 最新 > 集群管理工具KafkaAdminClient——原理與示例

集群管理工具KafkaAdminClient——原理與示例

1

前言

一般情況下,我們都習慣使用Kafka中bin目錄下的腳本工具來管理查看Kafka,但是有些時候需要將某些管理查看的功能集成到系統(比如Kafka Manager)中,那麼就需要調用一些API來直接操作Kafka了。在Kafka0.11.0.0版本之前,可以通過kafka-core包(Kafka的服務端代碼,採用Scala編寫)下的AdminClient和AdminUtils來實現部分的集群管理操作,比如筆者之前在Kafka解析之topic創建(1)和Kafka解析之topic創建(2)兩篇文章中所講解的Topic的創建就用到了AdminUtils類。而在Kafka0.11.0.0版本之後,又多了一個AdminClient,這個是在kafka-client包下的,這是一個抽象類,具體的實現是org. apache. kafka. clients. admin. KafkaAdminClient,這個就是本文所要陳述的重點了。


在Kafka官網中這麼描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):

創建Topic:createTopics(Collection newTopics)

刪除Topic:deleteTopics(Collection topics)

羅列所有Topic:listTopics()

查詢Topic:describeTopics(Collection topicNames)

查詢集群信息:describeCluster()

查詢ACL信息:describeAcls(AclBindingFilter filter)

創建ACL信息:createAcls(Collection acls)

刪除ACL信息:deleteAcls(Collection filters)

查詢配置信息:describeConfigs(Collection resources)

修改配置信息:alterConfigs(Map configs)

修改副本的日誌目錄:alterReplicaLogDirs(Map replicaAssignment)

查詢節點的日誌目錄信息:describeLogDirs(Collection brokers)

查詢副本的日誌目錄信息:describeReplicaLogDirs(Collection replicas)

增加分區:createPartitions(Map newPartitions)

其內部原理是使用Kafka自定義的一套二進位協議來實現,詳細可以參見Kafka協議。主要實現步驟:

下面就以創建Topic來舉一個簡單的KafkaAdminClient的使用案例,【代碼清單1】:

示例中的createTopics()方法就創建了一個分區數為4,副本因子為1的「topic-test2」的Topic。


下面來詳細介紹一下KafkaAdminClient中現有的listTopics()方法(這個方法的實現相對乾淨利落,代碼量少、易於講解)的實現方式,以便可以了解KafkaAdminClient中的大體脈絡。listTopics()方法的具體代碼如【代碼清單2】所示:

不過ListTopicsOptions擴展了一個成員變數listInternal,用來指明是否需要羅列內部Topic,比如在Kafka解析之topic創建(1)中提及的「__consumer_offsets」和「transaction_state」就是兩個內部Topic。ListTopicsOptions的代碼如【代碼清單4】所示:

listInternal的值默認為false,如果同時要羅列出目前的內部Topic的話就需要將這個listInternal設置為true,示例代碼如【代碼清單5】所示:

接下去繼續講解listTopics()方法,其返回值為ListTopicResult類型。與ListTopicsOptions對應,KafkaAdminClient中基本所有的應用類方法都有一個類似XXXResult類型的返回值,其內部一般包含一個KafkaFuture,用於非同步發送請求之後等待操作結果。KafkaFuture實現了Java中的Future介面,用來支持鏈式調用以及其他非同步編程模型,可以看成是Java8中CompletableFuture的一個小型版本,其中也有類似thenApply、complete、completeExceptionally的方法。

再來看代碼清單2中的 runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) 這行代碼,runnable的類型是AdminClientRunnable,其是KafkaAdminClient負責處理與服務端交互請求的服務線程。AdminClientRunnable中的call方法用作入隊一個Call請求,進而對其處理。Call請求代表與服務端的一次請求交互,比如listTopics和createTopics都是一次Call請求,AdminClientRunnable線程負責處理這些Call請求。

Call類是一個抽象類,構造方法接收三個參數:本次請求的名稱callName、超時時間deadlineMs、以及節點提供器nodeProvider。nodeProvider是NodeProvider類型,用來提供本次請求所交互的Broker節點。Call類中還有3個抽象方法:createRequest()、handleResponse()、handleFailure(),分別用來創建請求、處理回執和處理失敗。在代碼清單2中,對於listTopics()方法而言,其內部原理就是發送MetadataRequest請求然後處理MetadataResponse,其處理邏輯峰封裝在createRequest()、handleResponse()、handleFailure()這三個方法之中了。

綜上,如果要自定義實現一個功能,只需要三個步驟:1.自定義XXXOptions;2.自定義XXXResult返回值;3.自定義Call,然後挑選合適的XXXRequest和XXXResponse來實現Call類中的3個抽象方法。

KafkaAdminClient目前而言尚未形成一個完全體,裡面還可以擴展很多功能,就拿上一篇文章《如何獲取Kafka的消費者詳情——從Scala到Java的切換》中介紹的而言,目前KafkaAdminClient尚未實現describeConsumerGroup和listGroupOffsets的功能,所以需要進一步的升級改造。篇幅限制,這部分內容將在下一篇文章進行介紹,如果想要先睹為快,可以參考下代碼實現,詳細的邏輯解析敬請期待….

END

《集群管理工具KafkaAdminClient——改造》


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 朱小廝的博客 的精彩文章:

Kafka的Lag計算誤區及正確實現

TAG:朱小廝的博客 |