In short
I had the same issue, and it took me a night to find a solution that turns out to be very simple: your Kafka listener container must work in batch mode (the default mode is single). You can configure this through the property spring.kafka.listener.type by setting it to batch. If you define your own KafkaListenerContainerFactory bean, just add factory.setBatchListener(true); to the bean definition.
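For the property route, a minimal sketch of the configuration, assuming you rely on the listener container factory that Spring Boot auto-configures (the file name application.properties is just the usual default). If you declare your own factory bean, as in the listings in this answer, this property is most likely not applied to it and you need setBatchListener(true) instead.

```properties
# application.properties
# Switch the auto-configured listener container factory from single to batch mode.
spring.kafka.listener.type=batch
```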
In detail
Steps to reproduce the situation (it is not a bug!).
Create a class representing the Kafka messages and a corresponding deserializer. In your case it is kafka.psmessage.PMessage; I will use CommunicationRequest.
@ToString
public class CommunicationRequest {
    public int id;
    public String action;
}
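@Component
public class CommunicationRequestDeserializer implements Deserializer<CommunicationRequest> {
    @Autowired
    ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public CommunicationRequest deserialize(String topic, byte[] data) {
        // Kafka passes null for records without a value (tombstones); nothing to parse then.
        if (data == null) {
            return null;
        }
        // Let Jackson decode the bytes itself instead of going through the platform default charset.
        return objectMapper.readValue(data, CommunicationRequest.class);
    }
}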
Configure Kafka:
@Slf4j
@Configuration
public class KafkaConfiguration {
    // Field injection cannot target a final field, so the field is non-final here.
    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConsumerFactory<String, CommunicationRequest> consumerFactory(
            CommunicationRequestDeserializer deserializer
    ) {
        return new DefaultKafkaConsumerFactory<>(
                this.kafkaProperties.buildConsumerProperties(),
                new StringDeserializer(),
                deserializer
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
            ConsumerFactory<String, CommunicationRequest> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(false); /* Important line! */
        return factory;
    }

    /* other beans are not shown to make example shorter */
}
Now you can create a class with a @KafkaListener-annotated method to read messages. You have several choices for the method signature (keep only one of these methods at a time!).
@Slf4j
@Component
public class MyListener {
    @KafkaListener(topics = "test-topic")
    public void consume1(Object record) {
        log.info("consume1: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume2(CommunicationRequest record) {
        log.info("consume2: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume3(ConsumerRecord<String, CommunicationRequest> record) {
        log.info("consume3: {}", record.toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume4(List<Object> records) {
        log.info("consume4: size = {}, first = {}", records.size(), records.get(0).toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume5(List<CommunicationRequest> records) {
        log.info("consume5: size = {}, first = {}", records.size(), records.get(0).toString());
    }

    @KafkaListener(topics = "test-topic")
    public void consume6(List<ConsumerRecord<String, CommunicationRequest>> records) {
        log.info("consume6: size = {}, first = {}", records.size(), records.get(0).toString());
    }
}
Put messages into Kafka and you will get the following when the listener type is single, i.e. when setBatchListener(false); was specified:
2021-11-19 01:34:54,029 INFO MyListener - consume1: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:31:53,141 INFO MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:36:40,483 INFO MyListener - consume3: ConsumerRecord(topic = test-topic, ...)
2021-11-19 01:41:23,265 INFO MyListener - consume4: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:42:34,259 INFO MyListener - consume5: size = 1, first = CommunicationRequest(action=abc, id=1)
2021-11-19 01:44:39,999 ERROR SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(...
As you can see, Spring Kafka tries to guess the class the message should be converted to. If you ask for a list of messages, Spring Kafka passes a list of size 1, regardless of how many messages were actually read in the last poll(). Note also that some signatures are invalid and cannot be invoked, such as the last one (consume6).
The above describes the behaviour when the listener container has type single, not batch (see the line marked as important in the listing). Now let's create a batch listener!
/* ... */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> kafkaListenerContainerFactory(
            ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers,
            ConsumerFactory<String, CommunicationRequest> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, CommunicationRequest> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); /* Important line! */
        return factory;
    }
/* ... */
Let's use the same MyListener class and see the results:
2021-11-19 01:57:10,505 INFO MyListener - consume1: io.opentracing.contrib.kafka.TracingKafkaConsumer@6a9f8235
2021-11-19 01:58:18,382 INFO MyListener - consume2: CommunicationRequest(action=abc, id=1)
2021-11-19 01:59:43,188 ERROR KafkaMessageListenerContainer$ListenerConsumer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
2021-11-19 02:00:50,468 INFO MyListener - consume4: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:01:53,214 INFO MyListener - consume5: size = 2, first = CommunicationRequest(action=abc, id=1)
2021-11-19 02:03:04,642 INFO MyListener - consume6: size = 2, first = ConsumerRecord(topic = test-topic, ...)
Again, Spring Kafka is smart enough to recognize almost all signatures. Most importantly, you now receive a list of messages: the whole batch consumed in the last poll() call. In my case there were only 2 messages in the topic.
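To make the difference between the two listener types concrete, here is a plain-Java sketch that only mimics the dispatch behaviour described above. It does not use Spring Kafka at all; the class DispatchModes and the methods dispatchSingle and dispatchBatch are hypothetical names made up for illustration, and the real container does considerably more (conversion, acknowledgments, error handling).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: not Spring Kafka code.
class DispatchModes {

    // Single mode: the listener is invoked once per record returned by a poll.
    static <T> int dispatchSingle(List<T> polled, Consumer<T> listener) {
        int calls = 0;
        for (T record : polled) {
            listener.accept(record);
            calls++;
        }
        return calls;
    }

    // Batch mode: the listener is invoked once with everything the poll returned.
    static <T> int dispatchBatch(List<T> polled, Consumer<List<T>> listener) {
        if (polled.isEmpty()) {
            return 0; // nothing was polled, so there is nothing to deliver
        }
        listener.accept(polled);
        return 1;
    }

    public static void main(String[] args) {
        // Stand-in for the two messages that were in the topic.
        List<String> polled = List.of("request-1", "request-2");
        List<String> log = new ArrayList<>();

        int singleCalls = dispatchSingle(polled, r -> log.add("single saw " + r));
        int batchCalls = dispatchBatch(polled, batch -> log.add("batch saw " + batch.size() + " records"));

        log.forEach(System.out::println);
        System.out.println("single calls = " + singleCalls + ", batch calls = " + batchCalls);
    }
}
```

With two polled records, the single-mode listener runs twice with one record each, while the batch-mode listener runs once with a list of two, which matches the size = 2 lines in the log above.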
Conclusion
You can use different method signatures and combine them with the two listener types, single and batch. Not all combinations are allowed. Yours is not, and that has nothing to do with Apache Kafka itself or with topic recreation: you receive a List of PMessage even though your signature says you expect a List of ConsumerRecord. That's how Spring Kafka works.