3

I'm using kafka with spring cloud streams. My listener code looks like this

    @Bean
    public Consumer<List<Map<String, Object>>> receive() throws MyException {
        return message -> {
            try {
                messageService.processMessage(message);
            } catch (MyException e) {
                throw new MyException(e.getMessage());
            }
        };
    }

I recently changed it to consume messages in batch mode. I need the message to go to a dlq topic if it fails. Right now it goes to the error channel which is simply

    @StreamListener("errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        log.error("exception occurred. errorMessage = {}", errorMessage);
    }

but I get a ClassCastException with error trace:

2020-04-02 15:34:21.841 ERROR 22222 --- [container-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app'), failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking MessageListener#receive[1 args]; nested exception is com.corelogic.idap.idappipelinegenericelasticsink.exception.ElasticClientException: forced, failedMessage=GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}], headers={kafka_data=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], id=0cb01a1e-fd22-f409-bcbc-1f86880efeec, sourceData=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], timestamp=1585856060836}] for original GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1777) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:1501) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1383) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1254) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1007) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:483) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:217) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:201) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:527) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:497) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1475) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1438) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1370) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
    ... 8 common frames omitted
Caused by: java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$8(KafkaMessageChannelBinder.java:1057) ~[spring-cloud-stream-binder-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

My configuration looks like this

spring.cloud.stream.bindings:
  input:
    destination: test
    group: ${application.group}
    consumer:
      max-attempts: 5
      batch-mode: true
  output:
    contentType: application/json

spring.cloud.stream.kafka.bindings:
  input:
    consumer:
      enableDlq: true
      dlqName: dead-topic
      autoCommitOnError: true
      autoCommitOffset: true
      republish-to-dlq: true
      dlqProducerProperties:
        configuration:
          key.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          value.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

There's a similar question question here Kafka Consumer- ClassCastException java but it didn't help much.

Do I need to write my own ListSerde as done in this question? Issue with ArrayList Serde in Kafka Streams API or wait for the standard ListSerde PR to go in?

Giorgos Myrianthous
  • 36,235
  • 20
  • 134
  • 156
afzal
  • 33
  • 4

1 Answers1

1

The DLQ is not currently supported with batch listeners; please open an issue against spring-cloud-stream-binder-kafka on GitHub.

Bear in mind, though, that the framework doesn't know which record in thee batch failed so all the records in the batch will go there, even if some were successfully processed.

It's generally better to handle errors in the listener for batch mode.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Yeah. I've been wondering how batch would work for a mixture of failed and successful records. That was the next thing I wanted to look into. But anyway it makes sense to handle errors in the listener for batch mode. I'll open up an issue with spring-cloud-stream-binder-kafka. Thanks! – afzal Apr 02 '20 at 22:17
  • 1
    @afzal, could you please provide a link to this issue, if any ? It seems to be I'm facing the same error - batch consumer & dlq. Thanks. – daoway Jul 06 '21 at 17:54
  • @daoway I didn't really create an issue for this. I ended up manually doing the retries and manually put the failed messages in the dlq. – afzal Jul 08 '21 at 04:42
  • 1
    I opened the issue today : https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1206 – Sébastien Nussbaumer Mar 18 '22 at 15:05
  • @SébastienNussbaumer That one got closed and replaced with this one: https://github.com/spring-cloud/spring-cloud-stream/issues/2317 – Emmanuel Apr 13 '22 at 15:01
  • This feature is now available in Spring Cloud Stream. See this for details - https://github.com/spring-cloud/spring-cloud-stream/issues/2317#issuecomment-1428716481 – sobychacko Feb 13 '23 at 21:31