
My Kafka consumer throws an exception when it tries to process messages in a batch (i.e. as a list of messages).

Error Message

java.lang.ClassCastException: class kafka.psmessage.PMessage cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (kafka.psmessage.PMessage and org.apache.kafka.clients.consumer.ConsumerRecord are in unnamed module of loader 'app'); nested exception is java.lang.ClassCastException: class kafka.psmessage.PMessage cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (kafka.psmessage.PMessage and org.apache.kafka.clients.consumer.ConsumerRecord are in unnamed module of loader 'app')

Code snippet

    public void receive(List<ConsumerRecord<String, PMessage>> records) {
        List<PMessage> msgList = records.stream().map(message -> message.value()).collect(Collectors.toList());

        PMessage test = records.get(0).value();
        ConsumerRecord<String, PMessage> firstMessage = records.get(0);
    }

Each of the three statements above throws the ClassCastException.

@Override
    protected DefaultKafkaConsumerFactory<String, PMessage> createConsumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.putAll(kafkaProperties.buildConsumerProperties());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serviceInfo.getBrokersAuthUrl());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.putAll(KafkaSaslUtils.getSaslProperties(serviceInfo));

        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(),
                new MessageDeserializer());
    }

MessageDeserializer:

@Override
    public PMessage deserialize(String topic, byte[] data) {
        if (data == null) {
            throw new SerializationException("Can not deserialize. data is null for topic: '" + topic + "'");
        }

        try {
            SeekableByteArrayInput seekableByteArrayInput = new SeekableByteArrayInput(data);
            // Parameterized types instead of raw types, so the compiler can check the element type.
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
            DataFileReader<GenericRecord> dataFileReader =
                    new DataFileReader<>(seekableByteArrayInput, reader);
            GenericRecord genericRecord = extractGenericRecord(dataFileReader);

Any pointers would be appreciated. Thanks!

OneCricketeer
ray work

1 Answer


In short

I had the same issue, and it took me a night to find a solution that turns out to be very simple: your Kafka listener container must run in batch mode (the default mode is single). With Spring Boot you can configure this through the property spring.kafka.listener.type (set it to batch). If you define your own KafkaListenerContainerFactory bean, add factory.setBatchListener(true); to the bean definition.
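For the property-based route, a minimal configuration fragment could look like the following. It assumes a Spring Boot application that relies on the auto-configured listener container factory; the file name is the Spring Boot default and may differ in your project.

```properties
# application.properties
# Listener type: "single" (default) or "batch"
spring.kafka.listener.type=batch
```

If you declare your own container factory bean, this property is not applied to it, and setBatchListener(true) on that bean is the setting that matters.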

In detail

Steps to reproduce the situation (it is not a bug!).

Create a class representing the Kafka messages and a corresponding deserializer. In your case the message class is kafka.psmessage.PMessage; I will use CommunicationRequest.

@ToString
public class CommunicationRequest {
    public int id;
    public String action;
}

@Component
public class CommunicationRequestDeserializer implements Deserializer<CommunicationRequest> {
    @Autowired
    ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public CommunicationRequest deserialize(String topic, byte[] data) {
        // Let Jackson decode the bytes itself, which avoids depending on the platform default charset.
        return objectMapper.readValue(data, CommunicationRequest.class);
    }
}
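A side note on deserializers like the one above: Kafka may pass null as the data (for example for tombstone records), and bytes decoded to text by hand should name their charset explicitly. The helper below is a minimal sketch of both points; the class and method names are hypothetical and not part of Kafka or Spring.

```java
import java.nio.charset.StandardCharsets;

final class PayloadDecoding {
    private PayloadDecoding() {
    }

    /**
     * Decodes a record payload as UTF-8 text.
     * Returns null for a null payload instead of throwing.
     */
    static String decodeUtf8(byte[] data) {
        if (data == null) {
            return null;
        }
        // An explicit charset keeps the result independent of the JVM's default encoding.
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

Whether a null payload should be skipped, mapped to null, or rejected depends on the topic's semantics, so treat the null branch as one possible choice.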

Configure Kafka:

@Slf4j
@Configuration
public class KafkaConfiguration {
    @Autowired
    private KafkaProperties kafkaProperties; // not final: field injection cannot assign a final field

    @Bean
    public ConsumerFactory<String, CommunicationRequest> consumerFactory(
        CommunicationRequestDeserializer deserializer
    ) {
        return new DefaultKafkaConsumerFactory<>(
            this.kafkaProperties.buildConsumerProperties(),
            new StringDeserializer(),
            deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
        ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
        ConsumerFactory<String, CommunicationRequest> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(false); /* Important line! */
        return factory;
    }

    /* other beans are not shown to make example shorter */
}

Now create a class with a @KafkaListener-annotated method to read messages. There are several possible method signatures (keep only one of the methods below at a time!).

@Slf4j
@Component
public class MyListener {
    @KafkaListener(topics = "test-topic")
    public void consume1(Object record) {
        log.info("consume1: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume2(CommunicationRequest record) {
        log.info("consume2: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume3(ConsumerRecord<String, CommunicationRequest> record) {
        log.info("consume3: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume4(List<Object> records) {
        log.info("consume4: size = {}, first = {}", records.size(), records.get(0).toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume5(List<CommunicationRequest> records) {
        log.info("consume5: size = {}, first = {}", records.size(), records.get(0).toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume6(List<ConsumerRecord<String, CommunicationRequest>> records) {
        log.info("consume6: size = {}, first = {}", records.size(), records.get(0).toString());
    }
}

Put messages into Kafka. With listener type single, i.e. with setBatchListener(false), you get the following:

2021-11-19 01:34:54,029 INFO  MyListener - consume1: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:31:53,141 INFO  MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:36:40,483 INFO  MyListener - consume3: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:41:23,265 INFO  MyListener - consume4: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:42:34,259 INFO  MyListener - consume5: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:44:39,999 ERROR SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(...

As you can see, Spring Kafka tries to guess which class to convert the message to. If you ask for a list of messages, Spring Kafka passes a list of size 1 regardless of how many messages were actually read in the last poll(). Note that some signatures are invalid in this mode and cannot be invoked, such as the last one (consume6).
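One way to picture this guessing is a dispatcher that inspects the listener's parameter type and passes either the whole record or only its payload. The sketch below is a toy model in plain Java with hypothetical names (Rec, Listeners, dispatch); Spring Kafka's actual argument resolution is considerably more involved.

```java
import java.lang.reflect.Method;

final class ArgumentResolutionSketch {
    /** Hypothetical stand-in for a consumed record that wraps a String payload. */
    static final class Rec {
        final String value;

        Rec(String value) {
            this.value = value;
        }
    }

    /** Two hypothetical listener methods with different parameter types. */
    static final class Listeners {
        static String takesRecord(Rec record) {
            return "record:" + record.value;
        }

        static String takesPayload(String payload) {
            return "payload:" + payload;
        }
    }

    /** Picks the argument by parameter type: the record itself if it fits, otherwise its payload. */
    static String dispatch(String methodName, Rec record) {
        for (Method method : Listeners.class.getDeclaredMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            Class<?> parameterType = method.getParameterTypes()[0];
            Object argument = parameterType.isInstance(record) ? record : record.value;
            try {
                return (String) method.invoke(null, argument);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Listener method could not be invoked", e);
            }
        }
        throw new IllegalArgumentException("No such listener method: " + methodName);
    }
}
```

Because generic type arguments such as the element type of a List are harder to honour than a plain parameter class, a dispatcher of this kind can end up passing something other than what the signature promises, which is the situation in the question.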


The above describes the behaviour when the listener container has type single, not batch (see the line marked as important in the listing). Now let's create a batch listener!

/* ... */

@Bean
public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
    ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
    ConsumerFactory<String, CommunicationRequest> consumerFactory
) {
    ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); /* Important line! */
    return factory;
}

/* ... */

Let's use the same MyListener class and look at the results:

2021-11-19 01:57:10,505 INFO  MyListener - consume1: io.opentracing.contrib.kafka.TracingKafkaConsumer@6a9f8235
2021-11-19 01:58:18,382 INFO  MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:59:43,188 ERROR KafkaMessageListenerContainer$ListenerConsumer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
2021-11-19 02:00:50,468 INFO  MyListener - consume4: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:01:53,214 INFO  MyListener - consume5: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:03:04,642 INFO  MyListener - consume6: size = 2, first = ConsumerRecord(topic = test-topic, ...)

Again, Spring Kafka recognizes almost all of the signatures. Most importantly, you now receive a list of messages: the whole batch that was consumed in the last poll() call. In my case there were only 2 messages in the topic.
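The difference between the two runs can be modelled in a few lines of plain Java. This is only a sketch of the observed behaviour with hypothetical names (dispatchSingle, dispatchBatch, sizesSeen), not Spring Kafka's implementation: in single mode the listener is called once per record with a one-element list, in batch mode once with everything the poll returned.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

final class DispatchModel {
    /** Single mode: one listener call per record, each with a list of size 1. */
    static <T> void dispatchSingle(List<T> polled, Consumer<List<T>> listener) {
        for (T record : polled) {
            listener.accept(Collections.singletonList(record));
        }
    }

    /** Batch mode: one listener call with all records from the poll. */
    static <T> void dispatchBatch(List<T> polled, Consumer<List<T>> listener) {
        listener.accept(polled);
    }

    /** Returns the list sizes the listener observed, one entry per invocation. */
    static List<Integer> sizesSeen(boolean batch, List<String> polled) {
        List<Integer> sizes = new ArrayList<>();
        Consumer<List<String>> listener = records -> sizes.add(records.size());
        if (batch) {
            dispatchBatch(polled, listener);
        } else {
            dispatchSingle(polled, listener);
        }
        return sizes;
    }
}
```

With two polled messages, the model's listener sees two lists of size 1 in single mode and one list of size 2 in batch mode, matching the "size = 1" and "size = 2" lines in the logs above.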

Conclusion

You can combine different method signatures with the two listener types, single and batch, but not all combinations are allowed. Yours is not: with a single-mode listener you receive a List of PMessage even though your signature declares a List of ConsumerRecord. This has nothing to do with Apache Kafka itself or with topic recreation. That's how Spring Kafka works.
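Why the exception appears only when an element is read, and not when the method is called, follows from Java's type erasure: at runtime a List carries no element type, so a list of payloads can be handed to a parameter declared as a list of records. The sketch below reproduces this in plain Java; Rec and Payload are hypothetical stand-ins for ConsumerRecord and PMessage.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {
    /** Hypothetical stand-in for ConsumerRecord. */
    static final class Rec<K, V> {
        private final K key;
        private final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }

        V value() {
            return value;
        }
    }

    /** Hypothetical stand-in for the payload class (PMessage in the question). */
    static final class Payload {
        final int id;

        Payload(int id) {
            this.id = id;
        }
    }

    /** Mimics a framework passing a list of payloads where a list of records is declared. */
    @SuppressWarnings("unchecked")
    static List<Rec<String, Payload>> payloadsDisguisedAsRecords() {
        List<Object> raw = new ArrayList<>();
        raw.add(new Payload(1));
        // Element types are erased at runtime, so this unchecked cast succeeds.
        return (List<Rec<String, Payload>>) (List<?>) raw;
    }

    /** Returns true if reading the first element as a Rec throws ClassCastException. */
    static boolean firstAccessFails() {
        List<Rec<String, Payload>> records = payloadsDisguisedAsRecords();
        try {
            // The compiler inserts a cast to Rec here; this is where the failure surfaces.
            Rec<String, Payload> first = records.get(0);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

This matches the question: the listener method is entered without complaint, and each statement that treats an element as a ConsumerRecord fails.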