I am trying to extend FlinkKafkaConsumer to throttle the Kafka consumer in my Flink job, using Flink version 1.12. To do this I tried to follow the thread below, but I am ending up with compilation issues in the createFetcher method when creating the AbstractFetcher.

How to use Ratelimiter on flink?

Is there a method named emitRecord? I could not find it in either the KafkaFetcher or the AbstractFetcher class.

Below is the code snippet:

protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> partitionsWithOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup, boolean useMetrics)
        throws Exception {

    return new KafkaFetcher<T>(
            sourceContext,
            partitionsWithOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics) {
        protected void emitRecord(T record,
                                  KafkaTopicPartitionState<TopicPartition> partitionState,
                                  long offset) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecord(record, partitionState, offset);
        }

        @Override
        protected void emitRecordWithTimestamp(T record,
                                               KafkaTopicPartitionState<TopicPartition> partitionState,
                                               long offset, long timestamp) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
        }
    };

}

Any suggestions for resolving this issue and achieving a custom rate limit for the Flink Kafka consumer?

Sonu Agarwal

1 Answer

emitRecord is now implemented by org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter#emitRecord.
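
For illustration only, here is a minimal sketch of the throttling idea itself: acquire a permit, then hand the record to whatever emits it. It is plain Java and deliberately does not touch any Flink class, so SimpleRateLimiter, ThrottledEmitter and ThrottleDemo are hypothetical names of my own, not part of Flink or Guava, and how you hook the acquire() call into your Flink version's emit path is left open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not a Flink or Guava class: grants permits at a fixed rate.
final class SimpleRateLimiter {
    private final long intervalNanos; // minimum spacing between two permits
    private long nextFreeNanos;       // earliest time the next permit may be granted

    SimpleRateLimiter(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    // Blocks the calling thread until the next permit is due.
    synchronized void acquire() throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = nextFreeNanos - now;
        if (waitNanos > 0) {
            Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            nextFreeNanos += intervalNanos;
        } else {
            // The limiter was idle; restart the spacing from the current time.
            nextFreeNanos = now + intervalNanos;
        }
    }
}

// Hypothetical wrapper: throttles any per-record emit callback.
final class ThrottledEmitter<T> {
    private final SimpleRateLimiter limiter;
    private final Consumer<T> downstream;

    ThrottledEmitter(SimpleRateLimiter limiter, Consumer<T> downstream) {
        this.limiter = limiter;
        this.downstream = downstream;
    }

    void emit(T record) throws InterruptedException {
        limiter.acquire();         // wait for a permit first
        downstream.accept(record); // then pass the record on
    }
}

final class ThrottleDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> sink = new ArrayList<>();
        // Roughly 100 records per second; the rate is an arbitrary example value.
        ThrottledEmitter<String> emitter =
                new ThrottledEmitter<>(new SimpleRateLimiter(100.0), sink::add);
        long start = System.nanoTime();
        for (int i = 0; i < 5; i++) {
            emitter.emit("record-" + i);
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        System.out.println("emitted " + sink.size() + " records in " + elapsedMs + " ms");
    }
}
```

The first permit is granted immediately and each later one waits for the configured spacing, which is the same effect the subtaskRateLimiter.acquire() call in the question is aiming for.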

David Anderson