集群管理工具KafkaAdminClient——原理與示例
1
前言
一般情況下,我們都習慣使用Kafka中bin目錄下的腳本工具來管理查看Kafka,但是有些時候需要將某些管理查看的功能集成到系統(比如Kafka Manager)中,那麼就需要調用一些API來直接操作Kafka了。在Kafka0.11.0.0版本之前,可以通過kafka-core包(Kafka的服務端代碼,採用Scala編寫)下的AdminClient和AdminUtils來實現部分的集群管理操作,比如筆者之前在Kafka解析之topic創建(1)和Kafka解析之topic創建(2)兩篇文章中所講解的Topic的創建就用到了AdminUtils類。而在Kafka0.11.0.0版本之後,又多了一個AdminClient,這個是在kafka-client包下的,這是一個抽象類,具體的實現是org. apache. kafka. clients. admin. KafkaAdminClient,這個就是本文所要陳述的重點了。
功能與原理介紹
在Kafka官網中這麼描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):
創建Topic:createTopics(Collection newTopics)
刪除Topic:deleteTopics(Collection topics)
羅列所有Topic:listTopics()
查詢Topic:describeTopics(Collection topicNames)
查詢集群信息:describeCluster()
查詢ACL信息:describeAcls(AclBindingFilter filter)
創建ACL信息:createAcls(Collection acls)
刪除ACL信息:deleteAcls(Collection filters)
查詢配置信息:describeConfigs(Collection resources)
修改配置信息:alterConfigs(Map configs)
修改副本的日誌目錄:alterReplicaLogDirs(Map replicaAssignment)
查詢節點的日誌目錄信息:describeLogDirs(Collection brokers)
查詢副本的日誌目錄信息:describeReplicaLogDirs(Collection replicas)
增加分區:createPartitions(Map newPartitions)
其內部原理是使用Kafka自定義的一套二進位協議來實現,詳細可以參見Kafka協議。主要實現步驟:
示例
下面就以創建Topic來舉一個簡單的KafkaAdminClient的使用案例,【代碼清單1】:
示例中的createTopics()方法就創建了一個分區數為4,副本因子為1的「topic-test2」的Topic。
代碼剖析
下面來詳細介紹一下KafkaAdminClient中現有的listTopics()方法(這個方法的實現相對乾淨利落,代碼量少、易於講解)的實現方式,以便可以了解KafkaAdminClient中的大體脈絡。listTopics()方法的具體代碼如【代碼清單2】所示:
不過ListTopicsOptions擴展了一個成員變數listInternal,用來指明是否需要羅列內部Topic,比如在Kafka解析之topic創建(1)中提及的「__consumer_offsets」和「transaction_state」就是兩個內部Topic。ListTopicsOptions的代碼如【代碼清單4】所示:
listInternal的值默認為false,如果同時要羅列出目前的內部Topic的話就需要將這個listInternal設置為true,示例代碼如【代碼清單5】所示:
接下去繼續講解listTopics()方法,其返回值為ListTopicResult類型。與ListTopicsOptions對應,KafkaAdminClient中基本所有的應用類方法都有一個類似XXXResult類型的返回值,其內部一般包含一個KafkaFuture,用於非同步發送請求之後等待操作結果。KafkaFuture實現了Java中的Future介面,用來支持鏈式調用以及其他非同步編程模型,可以看成是Java8中CompletableFuture的一個小型版本,其中也有類似thenApply、complete、completeExceptionally的方法。
再來看代碼清單2中的 runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) 這行代碼,runnable的類型是AdminClientRunnable,其是KafkaAdminClient負責處理與服務端交互請求的服務線程。AdminClientRunnable中的call方法用作入隊一個Call請求,進而對其處理。Call請求代表與服務端的一次請求交互,比如listTopics和createTopics都是一次Call請求,AdminClientRunnable線程負責處理這些Call請求。
Call類是一個抽象類,構造方法接收三個參數:本次請求的名稱callName、超時時間deadlineMs、以及節點提供器nodeProvider。nodeProvider是NodeProvider類型,用來提供本次請求所交互的Broker節點。Call類中還有3個抽象方法:createRequest()、handleResponse()、handleFailure(),分別用來創建請求、處理回執和處理失敗。在代碼清單2中,對於listTopics()方法而言,其內部原理就是發送MetadataRequest請求然後處理MetadataResponse,其處理邏輯峰封裝在createRequest()、handleResponse()、handleFailure()這三個方法之中了。
綜上,如果要自定義實現一個功能,只需要三個步驟:1.自定義XXXOptions;2.自定義XXXResult返回值;3.自定義Call,然後挑選合適的XXXRequest和XXXResponse來實現Call類中的3個抽象方法。
KafkaAdminClient目前而言尚未形成一個完全體,裡面還可以擴展很多功能,就拿上一篇文章《如何獲取Kafka的消費者詳情——從Scala到Java的切換》中介紹的而言,目前KafkaAdminClient尚未實現describeConsumerGroup和listGroupOffsets的功能,所以需要進一步的升級改造。篇幅限制,這部分內容將在下一篇文章進行介紹,如果想要先睹為快,可以參考下代碼實現,詳細的邏輯解析敬請期待….
END
下
期
預
告
《集群管理工具KafkaAdminClient——改造》
TAG:朱小廝的博客 |