I configured a route in Spring Integration 5.4.4 that reads from an AMQP queue and writes to an HTTP outbound adapter. I'm not able to control retries when, for example, I programmatically set a wrong hostname for the HTTP outbound adapter (cause: java.net.UnknownHostException).
This seems to cause infinite retries (the message is never acked on the RabbitMQ container), even though I configured a RetryTemplate on the AMQP inbound adapter.
My goal is: requeue the message up to N times when the HTTP outbound adapter fails, then discard the message and don't requeue it again.
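To make the intended behaviour concrete, here is a minimal plain-Java sketch of "try up to N times, then discard". It does not use Spring at all: BoundedRetry, Sender, Result and deliver are hypothetical names made up for illustration, and the lambda merely simulates the failing HTTP call.

```java
import java.net.UnknownHostException;

public class BoundedRetry {

    /** Hypothetical stand-in for the HTTP outbound call. */
    @FunctionalInterface
    public interface Sender {
        void send(String payload) throws Exception;
    }

    /** Outcome of processing one message. */
    public static final class Result {
        public final int attempts;      // how many times send() was called
        public final boolean discarded; // true if every attempt failed

        Result(int attempts, boolean discarded) {
            this.attempts = attempts;
            this.discarded = discarded;
        }
    }

    /** Tries to send the payload at most maxAttempts times, then gives up. */
    public static Result deliver(String payload, Sender sender, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(payload);
                return new Result(attempt, false); // success: message would be acked
            } catch (Exception e) {
                // failure: fall through and try again while attempts remain
            }
        }
        return new Result(maxAttempts, true); // exhausted: discard, no further requeue
    }

    public static void main(String[] args) {
        // Simulates the wrong-hostname case from the question.
        Result r = deliver("{}", p -> { throw new UnknownHostException("bad-host"); }, 3);
        System.out.println(r.attempts + " attempts, discarded=" + r.discarded);
    }
}
```

With N = 3 and a sender that always fails, this prints "3 attempts, discarded=true". That is the outcome I would like from the real flow, instead of the endless redelivery I see now.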
Code here:
Spring Integration route
public IntegrationFlow route(AmqpInboundChannelAdapterSMLCSpec amqpInboundChannelAdapterSMLCSpec) {
    return IntegrationFlows
            .from(amqpInboundChannelAdapterSMLCSpec)
            .filter(validJsonFilter())
            .enrichHeaders(h -> h.header("X-Insert-Key", outboundHttpConfig.outboundHttpToken))
            .enrichHeaders(h -> h.header("Content-Encoding", "gzip"))
            .enrichHeaders(h -> h.header("Content-Type", "application/json"))
            .handle(Http.outboundChannelAdapter(outboundHttpConfig.outboundHttpUrl)
                    .mappedRequestHeaders("X-Insert-Key")
                    .httpMethod(HttpMethod.POST)
            )
            .get();
}
AmqpInboundChannelAdapterSMLCSpec
public AmqpInboundChannelAdapterSMLCSpec gatewayEventInboundAmqpAdapter(ConnectionFactory connectionFactory) {
    RetryTemplate retryTemplate = new RetryTemplate();
    exceptionClassifierRetryPolicy.setPolicyMap(exceptionPolicy);
    retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(1));
    retryTemplate.setThrowLastExceptionOnExhausted(true);
    return Amqp
            .inboundAdapter(connectionFactory, rabbitConfig.inboundQueue())
            .configureContainer(c -> c
                    .concurrentConsumers(3)
                    .maxConcurrentConsumers(5)
                    .receiveTimeout(2000)
                    .alwaysRequeueWithTxManagerRollback(false)
            )
            .retryTemplate(retryTemplate);
}
Any ideas?
Thanks a lot