
With the configuration below, when we scale the Spring Boot containers to 10 JVMs, the number of consumed events is randomly higher than the number published. For example, when 320000 messages are published, we sometimes see around 320500 events.

// Consumer container bean
    private static final int CONCURRENCY = 1;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
        props.put("enable.auto.commit", "false");

        //props.put("isolation.level", "read_committed");
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setConcurrency(CONCURRENCY);
        return factory;
    }

// Listener
    @KafkaListener(id = "claimserror", topics = "${kafka.topic.dataintakeclaimsdqerrors}", groupId = "topic1", containerFactory = "kafkaListenerContainerFactory")
    public void receiveClaimErrors(String event, Acknowledgment ack) throws JsonProcessingException {
        // save event to table ...
    }

Update: the change below seems to be working fine now. I will just have to add a duplicate check in the consumer to cover a consumer-failure scenario (see the sketch after the configuration).

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "-1");
    //props.put("isolation.level", "read_committed");
    return props;
}
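
For the duplicate check mentioned above, a minimal sketch could look like the listener below. It is only an illustration: extractEventId, processedEventRepository, and saveEvent are hypothetical names, not part of the original code. The idea is to skip any record whose id has already been persisted, so a redelivery after a consumer failure does not insert a second row.

@KafkaListener(id = "claimserror", topics = "${kafka.topic.dataintakeclaimsdqerrors}",
        groupId = "topic1", containerFactory = "kafkaListenerContainerFactory")
public void receiveClaimErrors(String event, Acknowledgment ack) throws JsonProcessingException {
    // Hypothetical: derive a stable id from the payload (e.g. a claim id field)
    String eventId = extractEventId(event);

    // If the event was already stored, a redelivery is simply acknowledged and skipped
    if (processedEventRepository.existsById(eventId)) {
        ack.acknowledge();
        return;
    }

    saveEvent(eventId, event); // save event to table
    ack.acknowledge();         // commit the offset only after the row is persisted
}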

2 Answers


You can try setting ENABLE_IDEMPOTENCE_CONFIG to true on the producer; this helps ensure that exactly one copy of each message is written to the stream by the producer.
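
As an illustration only (the answer does not include code), enabling idempotence on the producer side could look roughly like this; the bean name and the surrounding serializer settings are assumptions:

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Idempotent producer: the broker de-duplicates retried sends
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // enable.idempotence requires acks=all
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    return props;
}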


This approach worked for me.

You have to configure the KafkaListenerContainerFactory like this:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaFactory);
    factory.setConcurrency(10);
    // Offsets are committed immediately when the listener calls Acknowledgment.acknowledge()
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

and use a ConcurrentMessageListenerContainer like this:

@Bean
public IntegrationFlow inboundFlow() {
    final ContainerProperties containerProps = new ContainerProperties(PartitionConfig.TOPIC);
    containerProps.setGroupId(GROUP_ID);

    ConcurrentMessageListenerContainer concurrentListener = new ConcurrentMessageListenerContainer(kafkaFactory, containerProps);
    concurrentListener.setConcurrency(10);
    final KafkaMessageDrivenChannelAdapter kafkaMessageChannel = new KafkaMessageDrivenChannelAdapter(concurrentListener);

    return IntegrationFlows
            .from(kafkaMessageChannel)
            .channel(requestsIn())
            .get();
}
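
Because the factory above uses AckMode.MANUAL_IMMEDIATE, a @KafkaListener method wired to it has to acknowledge each record explicitly. A rough sketch (the topic, group, and method names are placeholders, not from the answer):

@KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaListenerContainerFactory")
public void listen(String event, Acknowledgment ack) {
    // process the record ...
    // With MANUAL_IMMEDIATE the offset is committed as soon as acknowledge() is called
    ack.acknowledge();
}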

For more information, see how-does-kafka-guarantee-consumers-doesnt-read-a-single-message-twice and documentation-ConcurrentMessageListenerContainer.
