How Flink Pulls Data from Kafka
Let's start with the `FlinkKafkaConsumerBase.run` method, which is effectively the entry point for Flink pulling data from Kafka:
```java
// entry point: start a source
public void run(SourceContext<T> sourceContext) throws Exception {
    ......
    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)

    // create the fetcher that pulls data from Kafka
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            periodicWatermarkAssigner,
            punctuatedWatermarkAssigner,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);

    if (!running) {
        return;
    }

    // depending on whether we were restored with the current state version (1.3),
    // remaining logic branches off into 2 paths:
    //   1) New state - partition discovery loop executed as separate thread, with this
    //      thread running the main fetcher loop
    //   2) Old state - partition discovery is disabled and only the main fetcher loop is executed
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        // KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS was not configured
        kafkaFetcher.runFetchLoop();
    } else {
        // this path still ends up calling kafkaFetcher.runFetchLoop()
        runWithPartitionDiscovery();
    }
}
```
The `createFetcher` method:
```java
@Override
protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    ......
    return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
}
```
It returns a `KafkaFetcher` object. Let's step into it.
The `KafkaFetcher` constructor creates a `KafkaConsumerThread` object:
```java
public KafkaFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        String taskNameWithSubtasks,
        KafkaDeserializationSchema<T> deserializer,
        Properties kafkaProperties,
        long pollTimeout,
        MetricGroup subtaskMetricGroup,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
    ......
    this.consumerThread = new KafkaConsumerThread(
            LOG,
            // arguments of the KafkaConsumerThread constructor
            handover,
            kafkaProperties,
            // what exactly is unassignedPartitionsQueue? It is covered in detail in
            // "how Flink's startup mode takes effect"
            unassignedPartitionsQueue,
            getFetcherName() + " for " + taskNameWithSubtasks,
            pollTimeout,
            useMetrics,
            consumerMetricGroup,
            subtaskMetricGroup);
}
```
That covers `createFetcher`, which can be seen as the preparation work for pulling data. Next, let's look at `kafkaFetcher.runFetchLoop()`.
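Before diving into the fetch loop itself, it helps to picture the two-thread structure that `runWithPartitionDiscovery` sets up: a discovery thread periodically finds new partitions and enqueues them, while the main thread runs the fetch loop and drains the queue. Below is a minimal, self-contained sketch of that structure; the class and partition names are illustrative and not Flink's actual API (Flink uses its own `ClosableBlockingQueue`, modeled here with a `LinkedBlockingQueue`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of runWithPartitionDiscovery: one thread discovers new
// partitions and enqueues them; the main thread picks them up from the shared
// queue inside its fetch loop. Names are illustrative, not Flink's actual API.
public class DiscoveryLoopSketch {
    private final BlockingQueue<String> unassignedPartitions = new LinkedBlockingQueue<>();
    private final List<String> assignedPartitions = new ArrayList<>();

    public List<String> run(int expectedPartitions) {
        // discovery "loop": in real Flink this fires every discoveryIntervalMillis;
        // here it runs once so the example is deterministic
        Thread discoveryThread = new Thread(() -> {
            for (int i = 0; i < expectedPartitions; i++) {
                unassignedPartitions.add("topic-partition-" + i);
            }
        });
        discoveryThread.start();

        try {
            // main fetch loop: block for newly discovered partitions and "assign" them
            while (assignedPartitions.size() < expectedPartitions) {
                String partition = unassignedPartitions.take(); // blocks, no hot spinning
                assignedPartitions.add(partition);              // stands in for reassignPartitions
            }
            discoveryThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return assignedPartitions;
    }
}
```

The key design point mirrored here is that the discovery thread never touches the `KafkaConsumer` directly; it only communicates through the queue, so the consumer stays single-threaded.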
`KafkaFetcher`'s `runFetchLoop` method is where messages actually start being pulled from Kafka:
```java
// fetch messages from Kafka
public void runFetchLoop() throws Exception {
    try {
        // one of the arguments passed to the KafkaConsumerThread constructor
        final Handover handover = this.handover;

        // kick off the actual Kafka consumer
        // this is where data is actually pulled from Kafka
        consumerThread.start();

        while (running) {
            // this blocks until we get the next records
            // it automatically re-throws exceptions encountered in the consumer thread
            // take records from the handover, then process them
            final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

            // get the records for each topic partition
            for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());

                for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                    final T value = deserializer.deserialize(record);

                    if (deserializer.isEndOfStream(value)) {
                        // end of stream signaled
                        running = false;
                        break;
                    }

                    // emit the actual record. this also updates offset state atomically
                    // and deals with timestamps and watermark generation
                    emitRecord(value, partition, record.offset(), record);
                }
            }
        }
    } finally {
        // this signals the consumer thread that no more work is to be done
        consumerThread.shutdown();
    }

    // on a clean exit, wait for the runner thread
    try {
        consumerThread.join();
    } catch (InterruptedException e) {
        // may be the result of a wake-up interruption after an exception.
        // we ignore this here and only restore the interruption state
        Thread.currentThread().interrupt();
    }
}
```
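The `Handover` that `pollNext()` blocks on is essentially a one-element "mailbox" between the consumer thread and the Flink main thread. The following is a minimal sketch of that idea using plain `wait`/`notifyAll`; Flink's real `Handover` additionally supports wake-ups and error reporting, so treat this as an illustration of the pattern, not a reimplementation.

```java
// Minimal sketch of the Handover idea: a one-element exchange between the
// Kafka consumer thread (produce) and the Flink main thread (pollNext).
// Flink's real Handover also handles wake-ups and error propagation.
public class HandoverSketch<T> {
    private T element;
    private boolean closed;

    // called by the consumer thread; blocks while the previous batch is unconsumed
    public synchronized void produce(T next) throws InterruptedException {
        while (element != null && !closed) {
            wait();
        }
        if (closed) {
            throw new IllegalStateException("handover closed");
        }
        element = next;
        notifyAll(); // wake the main thread blocked in pollNext
    }

    // called by the main thread; blocks until the consumer thread hands a batch over
    public synchronized T pollNext() throws InterruptedException {
        while (element == null && !closed) {
            wait();
        }
        if (element == null) {
            throw new IllegalStateException("handover closed");
        }
        T next = element;
        element = null;
        notifyAll(); // wake the consumer thread blocked in produce
        return next;
    }

    public synchronized void close() {
        closed = true;
        notifyAll();
    }
}
```

Because the slot holds at most one batch, the consumer thread can poll Kafka at most one batch ahead of what the main thread has processed, which gives natural backpressure between the two threads.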
Since `consumerThread.start()` kicks off the actual Kafka consumer, let's look at the consumer thread's `run` method:
```java
@Override
public void run() {
    // early exit check
    if (!running) {
        return;
    }

    // this is the means to talk to FlinkKafkaConsumer's main thread
    // (a constructor argument; the main thread consumes via handover.pollNext() above)
    final Handover handover = this.handover;

    // This method initializes the KafkaConsumer and guarantees it is torn down properly.
    // This is important, because the consumer has multi-threading issues,
    // including concurrent 'close()' calls.
    try {
        // obtain the consumer
        this.consumer = getConsumer(kafkaProperties);
    } catch (Throwable t) {
        handover.reportError(t);
        return;
    }

    // from here on, the consumer is guaranteed to be closed properly
    try {
        ......
        // early exit check
        if (!running) {
            return;
        }

        // the latest bulk of records. May carry across the loop if the thread is woken up
        // from blocking on the handover
        ConsumerRecords<byte[], byte[]> records = null;

        // reused variable to hold found unassigned new partitions.
        // found partitions are not carried across loops using this variable;
        // they are carried across via re-adding them to the unassigned partitions queue
        List<KafkaTopicPartitionState<TopicPartition>> newPartitions;

        // main fetch loop
        while (running) {

            // check if there is something to commit
            // default false
            if (!commitInProgress) {
                // get and reset the work-to-be committed, so we don't repeatedly commit the same
                // see also: how Flink stores offsets (https://www.jianshu.com/p/ee4fe63f0182)
                final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
                        nextOffsetsToCommit.getAndSet(null);

                if (commitOffsetsAndCallback != null) {
                    log.debug("Sending async offset commit request to Kafka broker");

                    // also record that a commit is already in progress
                    // the order here matters! first set the flag, then send the commit command.
                    commitInProgress = true;
                    consumer.commitAsync(commitOffsetsAndCallback.f0,
                            new CommitCallback(commitOffsetsAndCallback.f1));
                }
            }

            try {
                // hasAssignedPartitions defaults to false
                // when a new partition is discovered, it is added to unassignedPartitionsQueue
                // and subscribed; see "how Flink's startup mode takes effect" for details
                if (hasAssignedPartitions) {
                    newPartitions = unassignedPartitionsQueue.pollBatch();
                } else {
                    // if no assigned partitions block until we get at least one
                    // instead of hot spinning this loop. We rely on a fact that
                    // unassignedPartitionsQueue will be closed on a shutdown, so
                    // we don't block indefinitely
                    newPartitions = unassignedPartitionsQueue.getBatchBlocking();
                }
                if (newPartitions != null) {
                    reassignPartitions(newPartitions);
                }
            } catch (AbortedReassignmentException e) {
                continue;
            }

            if (!hasAssignedPartitions) {
                // Without assigned partitions KafkaConsumer.poll will throw an exception
                continue;
            }

            // get the next batch of records, unless we did not manage to hand the old batch over
            if (records == null) {
                try {
                    // pull data through the Kafka API
                    records = consumer.poll(pollTimeout);
                } catch (WakeupException we) {
                    continue;
                }
            }

            try {
                // hand the records over for the FlinkKafkaConsumer main thread to consume
                handover.produce(records);
                records = null;
            } catch (Handover.WakeupException e) {
                // fall through the loop
            }
        } // end main fetch loop
    } catch (Throwable t) {
        // let the main thread know and exit
        // it may be that this exception comes because the main thread closed the handover, in
        // which case the below reporting is irrelevant, but does not hurt either
        handover.reportError(t);
    } finally {
        // make sure the handover is closed if it is not already closed or has an error
        handover.close();

        // make sure the KafkaConsumer is closed
        try {
            consumer.close();
        } catch (Throwable t) {
            log.warn("Error while closing Kafka consumer", t);
        }
    }
}
```
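One detail worth isolating is `nextOffsetsToCommit.getAndSet(null)`: the checkpoint thread publishes the offsets to commit into an `AtomicReference`, and the fetch loop atomically claims them, so the same offsets are never committed twice and no locking is needed. A small standalone illustration of that pattern follows; the class and method names here are ours, not Flink's.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Illustrates the nextOffsetsToCommit.getAndSet(null) pattern: one thread
// publishes the offsets to commit, another claims them atomically, so each
// published batch of offsets is committed at most once.
public class OffsetsToCommit {
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

    // called from the checkpoint thread (cf. notifyCheckpointComplete)
    public void publish(Map<String, Long> offsets) {
        nextOffsetsToCommit.set(offsets);
    }

    // called from the fetch loop; returns null if nothing new was published
    public Map<String, Long> claim() {
        return nextOffsetsToCommit.getAndSet(null);
    }
}
```

If a new checkpoint completes before the previous offsets were claimed, the newer offsets simply overwrite the older ones, which is safe because committing the latest offsets subsumes the earlier commit.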
That concludes the walkthrough of how Flink pulls data from Kafka.