I am trying to extend FlinkKafkaConsumer to throttle the Kafka consumer in my Flink job, using Flink version 1.12. To do so, I tried to follow the thread below, but I am ending up with compilation errors in the createFetcher method when creating the AbstractFetcher.
How to use Ratelimiter on flink?
Is there a method named emitRecord? I could not find it in either the KafkaFetcher or the AbstractFetcher class.
Below is the code snippet:
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> partitionsWithOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup, boolean useMetrics)
            throws Exception {
        return new KafkaFetcher<T>(
                sourceContext,
                partitionsWithOffsets,
                watermarksPeriodic,
                watermarksPunctuated,
                runtimeContext.getProcessingTimeService(),
                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                runtimeContext.getUserCodeClassLoader(),
                runtimeContext.getTaskNameWithSubtasks(),
                deserializer,
                properties,
                pollTimeout,
                runtimeContext.getMetricGroup(),
                consumerMetricGroup,
                useMetrics) {
            protected void emitRecord(T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset) throws Exception {
                subtaskRateLimiter.acquire();
                if (record == null) {
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecord(record, partitionState, offset);
            }
            @Override
            protected void emitRecordWithTimestamp(T record,
                    KafkaTopicPartitionState<TopicPartition> partitionState,
                    long offset, long timestamp) throws Exception {
                subtaskRateLimiter.acquire();
                if (record == null) {
                    consumerMetricGroup.counter("invalidRecord").inc();
                }
                super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
            }
        };
    }
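The snippet does not show how subtaskRateLimiter is defined. As a rough, dependency-free sketch of the blocking acquire() behaviour it is assumed to have, something like the following could stand in for it. The class name SimpleRateLimiter, its constructor, and the reserve helper are hypothetical and are not part of the Flink or Guava API.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for the subtaskRateLimiter used above.
 * Not a Flink or Guava class; it only illustrates a blocking acquire().
 */
public class SimpleRateLimiter {
    // Minimum spacing between two consecutive permits, in nanoseconds.
    private final long intervalNanos;
    // Earliest point in time at which the next permit may be granted.
    private long nextFreeNanos;

    public SimpleRateLimiter(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    /** Reserves the next permit and returns how long the caller has to wait, in nanoseconds. */
    synchronized long reserve(long nowNanos) {
        // Compare via subtraction, as recommended for System.nanoTime() values.
        long grantedAt = (nowNanos - nextFreeNanos >= 0) ? nowNanos : nextFreeNanos;
        nextFreeNanos = grantedAt + intervalNanos;
        return grantedAt - nowNanos;
    }

    /** Blocks the calling thread until a permit is available. */
    public void acquire() throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

With a limiter like this, each parallel subtask would presumably be given its share of the overall rate, for example the desired global rate divided by getRuntimeContext().getNumberOfParallelSubtasks(), before acquire() is called once per emitted record.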
Any suggestions on how to resolve this issue and get a custom rate limit working for the Flink Kafka consumer?