當前位置:
首頁 > 知識 > 在kafka單伺服器的情況下消費者端如何確定伺服器是否啟動

在kafka單伺服器的情況下消費者端如何確定伺服器是否啟動

在kafka集群的單伺服器情況下,如何通過consumer消費者確定kafka伺服器或者zookeeper伺服器是否啟動(因為消費者目前是無法判斷伺服器是否啟動的,它只是去輪詢獲取伺服器數據而不報錯),如果沒有啟動,消費者端做出相應的操作來提醒消費者端使用人員進行維護,在這裡我提供一個簡單的解決方案,可能並不是非常通用,提供一個簡單的思路而已。

如果kafka伺服器或者zookeeper伺服器沒有啟動,在producer生產者端向伺服器發送信息的時候會出現錯誤(org.apache.kafka.common.errors.TimeoutException),我使用的kafka的maven版本是0.11.0.0的,當有異常錯誤出現的時候我們可以在生產者端使用一個靜態變數來變更記錄這個狀態,同時在生產者端提供一個介面以供消費者端調用;在consumer消費者端我們提供一個定時任務,如果消費者端在規定的時間裡面沒有從kafka伺服器獲取到數據,定義一個靜態變數去記錄獲取數據狀態,那麼在定時任務裡面就會根據這個靜態變數的值決定是否去調用producer消費者端的介面判斷介面返回的狀態情況,如果返回的狀態表示伺服器沒有啟動則消費者端做出相應的操作。如果在集群多伺服器的情況下,客戶端可以根據訂閱的主題topic判斷哪些伺服器是否存活的,下面提供了生產者producer和消費者consumer的main方法,並沒有按照上面的敘述去完成介面,如果需要自己去實現。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.0</version>
</dependency>

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* @author liaoyubo
* @version 1.0 2017/7/25
* @description
*/
public class KafkaProducerTest {

public static void main(String [] args){
Properties properties = new Properties;
//properties.put("zookeeper.connect","localhost:2181");
properties.put("bootstrap.servers", "192.168.201.190:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

for(int i = 0;i < 10;i++){
Future<RecordMetadata> futureRecordMetadata = producer.send(new ProducerRecord<String, String>("myTopic",Integer.toString(i),Integer.toString(i)));
try {
futureRecordMetadata.get(3, TimeUnit.SECONDS);
System.out.println("發送的message:"+i);
} catch (InterruptedException e) {
e.printStackTrace;
} catch (ExecutionException e) {
if(e.getMessage.split(":")[0].split("\.")[5].equals("TimeoutException")){
System.out.println("無法連接到伺服器");
}
e.printStackTrace;
} catch (TimeoutException e) {
System.out.println("無法連接到伺服器");
e.printStackTrace;
}
}

producer.close;

}

}

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
* @author liaoyubo
* @version 1.0 2017/7/26
* @description
*/
public class KafkaConsumerClientTest {

public static void main(String [] args){

Properties properties = new Properties;
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.201.190:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

consumer.subscribe(Arrays.asList("myTopic","myTest"));
while (true){
ConsumerRecords<String,String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
//int partition = record.partition;
String topic = record.topic;
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfoList){
Node node = partitionInfo.leader;
System.out.println(node.host);
//獲取存活的伺服器
Node nodes = partitionInfo.replicas;
}
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value);
}
}
}

}

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 達人科技 的精彩文章:

Android 基於 Speex 的高度封裝語音庫,0 耦合,沒三方jar包
echarts添加點擊事件
初始jvm(一)——jvm內存區域與溢出
Python判斷文件是否存在的三種方法
《HelloGitHub》第 16 期

TAG:達人科技 |

您可能感興趣

蘋果三款iPhone真機和命名已確定,可能會讓你失望!
新iPhone處理器確定!性能讓高通崩潰
這麼可愛的hypekid,你確定不看一下?
最神秘的iPhone新機 幾款原型機里蘋果也不確定用誰
iPhone Xs支持更快無線充電 但細節不確定
Youtube總部有槍手闖入!不確定是否和其禁槍的相關規定有關
Burberry 將不再直接銷毀滯銷產品,也確定了取消天然皮草的時間
iPhone9基本確定,雙卡雙待終於實錘,價格讓果粉很激動
蘋果「終於覺醒」:iPhoneX Plus又被確定,「回來吧,老用戶」
「杠精」還真不是一般人能叫的,導師李健你確定不要pick一下嗎?
iPhoneX Plus已確定,價格「涼心」,真心買不起!
果粉該睡不僅僅著了,iPhone9正式確定
iPhoneXs Plus已確定,價格心碎,真心買不起!
蘋果智能音箱HomePod確定開售時間,權威人士質疑:是否太晚?
新款LCD版iPhone價格出乎意料 你確定能手持6s而不心動?
iPhone一直不升級系統,你確定沒問題?
阿迪boost真的有踩屎感嗎!怎麼樣確定是真boost
Uber確定無人車死亡事故原因:或與自動駕駛系統軟體有關
外形確定 廉價iPhone X配置曝光 處理器/內存狠縮水
終於來啦!Wanna One確定完整體出擊《認識的哥哥》,這也是他們最想出演的節目呢!