
Kafka Source Code Analysis 3: Producer





Source: 劉正陽, liuzhengyang.github.io/2017/12/31/kafka-source-3-kafka-producer/




Producer



Producer is the interface that defines a producer. Its most commonly used methods are:





```java
public Future<RecordMetadata> send(ProducerRecord<K, V> record);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

public void flush();

public void close();
```
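For orientation, here is a minimal usage sketch of this interface. It is not from the original article; the broker address, topic name, keys and values are placeholders.

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerUsageSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only enqueues the record and returns a Future immediately
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("demo-topic", "key-1", "value-1"));

            // the callback variant is invoked by the I/O thread once the broker has responded
            producer.send(new ProducerRecord<>("demo-topic", "key-2", "value-2"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.printf("sent to %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                        }
                    });

            producer.flush();                   // block until all buffered records have been sent
            RecordMetadata meta = future.get(); // or wait on an individual record's Future
        }                                       // close() is invoked by try-with-resources
    }
}
```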




KafkaProducer is asynchronous: when send is called, Kafka does not transmit the record to the broker right away. The record is placed in a buffer pool and send returns immediately; a background I/O thread turns the buffered records into requests and sends them to the Kafka cluster.

The buffer size is specified by batch.size, and the producer maintains one such buffer of unsent records per partition.

By default a buffer that is not yet full can still be sent. You can set linger.ms to make the producer wait a little longer and thereby reduce the number of requests, in the same spirit as Nagle's algorithm in TCP.

The producer's total buffer space is controlled by buffer.memory. If records are produced faster than they can be sent and this limit is exceeded, send blocks; the maximum blocking time is max.block.ms, after which a TimeoutException is thrown.

key.serializer and value.serializer control how Java objects are converted to byte arrays for transmission to the Kafka cluster.

acks controls when the producer considers a write successful; its value is the number of acknowledgments the leader must gather. With acks=0 the producer treats the record as successful as soon as it is placed in the socket buffer. With acks=1 the leader only has to write the record locally before responding, without waiting for the followers. With acks=all the write only counts as successful once every in-sync replica has acknowledged it, so the record is not lost as long as at least one in-sync replica survives.
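To make the settings above concrete, here is a hedged configuration sketch. The numeric values are arbitrary illustrations, not recommendations from the article.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

final class ProducerConfigSketch {
    static Properties baseProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch.size: per-partition batch buffer, in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // linger.ms: wait a little to fill batches (Nagle-like)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer.memory: total memory for unsent records
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);     // max.block.ms: how long send() may block before TimeoutException
        props.put(ProducerConfig.ACKS_CONFIG, "all");              // acks: wait for every in-sync replica
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```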




The Partitioner decides which partition each record is written to. If you need particular keys to land on particular partitions, you can plug in your own Partitioner implementation (a sketch of a custom implementation follows the default one below).

For records without a key, the default Partitioner load-balances by cycling through the available partitions starting from a random counter; keyed records are hashed to a partition.





```java
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
```
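As an illustration of the extension point mentioned above, here is a hypothetical custom Partitioner that pins one special key to partition 0 and murmur2-hashes everything else. The class name and routing rule are invented for this sketch; such a class would be registered through the partitioner.class setting (ProducerConfig.PARTITIONER_CLASS_CONFIG).

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical partitioner: a designated key always lands on partition 0, other keys are hashed.
public class PinnedKeyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if ("audit-events".equals(key)) {   // made-up business rule
            return 0;
        }
        byte[] bytes = keyBytes != null ? keyBytes : new byte[0];
        return Utils.toPositive(Utils.murmur2(bytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```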




ProducerRecord




ProducerRecord contains everything needed to send a record to the broker.





```java
class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
```
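For reference, a few ways of constructing a record; topic names and payloads are placeholders. Fields left null (partition, timestamp) are filled in by the producer later, as the doSend walkthrough below shows.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

final class RecordConstructionSketch {
    static void examples() {
        // topic + value only: key, partition and timestamp stay null
        ProducerRecord<String, String> r1 = new ProducerRecord<>("demo-topic", "payload");

        // topic + key + value: the key feeds the default partitioner's hash
        ProducerRecord<String, String> r2 = new ProducerRecord<>("demo-topic", "user-42", "payload");

        // explicit partition and timestamp, bypassing the partitioner for this record
        ProducerRecord<String, String> r3 =
                new ProducerRecord<>("demo-topic", 0, System.currentTimeMillis(), "user-42", "payload");
    }
}
```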




KafkaProducer construction



```java
// create the partitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// configure the serializers
if (keySerializer == null) {
    this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                     Serializer.class));
    this.keySerializer.configure(config.originals(), true);
} else {
    config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
    this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
    this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                       Serializer.class));
    this.valueSerializer.configure(config.originals(), false);
} else {
    config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
    this.valueSerializer = ensureExtended(valueSerializer);
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config);
int retries = configureRetries(config, transactionManager != null);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null);
this.apiVersions = new ApiVersions();
// RecordAccumulator implements the batching and waiting logic
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.totalMemorySize,
        this.compressionType,
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        metrics,
        time,
        apiVersions,
        transactionManager);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
// high-level network handling, wrapping send, poll and related operations
NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channelBuilder),
        this.metadata,
        clientId,
        maxInflightRequests,
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs,
        time,
        true,
        apiVersions,
        throttleTimeSensor);
// the background thread that actually sends requests to the Kafka cluster
this.sender = new Sender(client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        this.metrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
```




KafkaProducer#send




The entry point is doSend(interceptedRecord, callback):





```java
// fetch cluster metadata so we know the nodes serving this topic
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in key.serializer");
}
byte[] serializedValue;
try {
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in value.serializer");
}
// determine the target partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
        compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);
// append the record to the RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;
```




RecordAccumulator




The accumulator stores ProducerBatch objects in a double-ended queue (Deque), one per TopicPartition.
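A simplified sketch of that shape is below; the class and field names are invented for illustration and this is not the real RecordAccumulator.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: one deque of in-progress batches per partition.
final class AccumulatorShape<Batch> {
    private final ConcurrentMap<TopicPartition, Deque<Batch>> batches = new ConcurrentHashMap<>();

    // Mirrors the role of getOrCreateDeque(tp) in the real accumulator.
    Deque<Batch> dequeFor(TopicPartition tp) {
        return batches.computeIfAbsent(tp, ignored -> new ArrayDeque<>());
    }
}
```

New records are appended to the last batch of a partition's deque (dq.addLast below), while the Sender drains batches from the head. The actual append() logic from the source: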





```java
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
    // check if we have an in-progress batch
    // get or create the deque for this TopicPartition
    Deque<ProducerBatch> dq = getOrCreateDeque(tp);
    synchronized (dq) {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // if the record fits into the last batch, append it there and return
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
        if (appendResult != null)
            return appendResult;
    }
    // otherwise a new batch has to be allocated
    // we don't have an in-progress record batch try to allocate a new batch
    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
    log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
    buffer = free.allocate(size, maxTimeToBlock);
    synchronized (dq) {
        // Need to check if producer is closed again after grabbing the dequeue lock.
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // between the two synchronized blocks another thread may already have created the next batch
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
        if (appendResult != null) {
            // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
            return appendResult;
        }
        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
        ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
        FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
        dq.addLast(batch);
        incomplete.add(batch);
        // Don't deallocate this buffer in the finally block as it's being used in the record batch
        buffer = null;
        return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
    }
} finally {
    if (buffer != null)
        free.deallocate(buffer);
    appendsInProgress.decrementAndGet();
}
```




Sender




Sender is a background thread. Leaving transactions aside, each iteration boils down to sendProducerData followed by poll, where poll waits for and handles the responses.





```java
void run(long now) {
    if (transactionManager != null) {
        if (!transactionManager.isTransactional()) {
            // this is an idempotent producer, so make sure we have a producer id
            maybeWaitForProducerId();
        } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
            // as long as there are outstanding transactional requests, we simply wait for them to return
            client.poll(retryBackoffMs, now);
            return;
        }
        // do not continue sending if the transaction manager is in a failed state or if there
        // is no producer id (for the idempotent case).
        if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
            RuntimeException lastError = transactionManager.lastError();
            if (lastError != null)
                maybeAbortBatches(lastError);
            client.poll(retryBackoffMs, now);
            return;
        } else if (transactionManager.hasAbortableError()) {
            accumulator.abortUndrainedBatches(transactionManager.lastError());
        }
    }
    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
```





```java
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // Collect the data that is ready to send, organized per TopicPartition queue. A partition counts as ready
    // when its queue has more than one batch, its first batch is full, the buffer pool is exhausted, the
    // producer is closing, or flush() has been called.
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // drain the batches from the per-partition queues
    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    boolean needsTransactionStateReset = false;
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
        if (transactionManager != null && expiredBatch.inRetry()) {
            needsTransactionStateReset = true;
        }
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    }

    if (needsTransactionStateReset) {
        transactionManager.resetProducerId();
        return 0;
    }

    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    sendProduceRequests(batches, now);

    return pollTimeout;
}
```



