I have a consumer that often takes a very long time to finish its work and acknowledge the message, because it processes big files. As a result, the execution time is longer than the consumer_timeout
of the queue. When that happens, the queue keeps the message forever. We worked around it by restarting the RabbitMQ service, but I want to know how I can avoid this in the first place.
This is my consumer:
public void run() {
    try {
        Channel channel = connection.createChannel();
        channel.basicQos(1);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                        .correlationId(properties.getCorrelationId()).build();
                String response = "";
                String inputMsgBody = null;
                try {
                    inputMsgBody = new String(body, "UTF-8");
                    if (QWorkerUtil.checkAndUpdate(envelope, properties)) {
                        response = doJob(inputMsgBody);
                    } else {
                        QWorkerUtil.CallFailureApi(inputMsgBody);
                    }
                } catch (Throwable t) {
                    QWorkerUtil.CallFailureApi(inputMsgBody);
                    logger.error("Exception or error occurred :" + inputMsgBody);
                    logger.error("Exception", t);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                }
            }
        };
        channel.basicConsume(queueName, false, consumer);
    } catch (IOException e) {
        logger.error("Exception", e);
    }
}
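
One direction I am considering is to acknowledge the delivery before the long job starts, so that the time spent in doJob no longer counts against the acknowledgement timeout. Below is a minimal sketch of that ordering, not a tested fix. To keep it self-contained, `ReplyChannel` is a hypothetical stand-in for the two `Channel` calls my consumer uses (it is not part of the RabbitMQ client), and `AckEarlySketch` and `handle` are names I made up for illustration. The obvious trade-off is that if the worker dies in the middle of the job, the broker will not redeliver the message.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class AckEarlySketch {

    /** Hypothetical stand-in for the Channel calls used by the consumer (assumption, not the real API). */
    interface ReplyChannel {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
        void basicPublish(String replyTo, String correlationId, byte[] body) throws IOException;
    }

    /**
     * Acknowledge first, then run the slow job, then publish the reply.
     * Trade-off: a crash during the job loses the message, because it was already acked.
     */
    static void handle(ReplyChannel channel, long deliveryTag, String replyTo,
                       String correlationId, byte[] body,
                       Function<String, String> job) throws IOException {
        // Ack before the long-running work so the acknowledgement timeout cannot expire.
        channel.basicAck(deliveryTag, false);

        String response = "";
        try {
            response = job.apply(new String(body, StandardCharsets.UTF_8));
        } catch (RuntimeException e) {
            // Failure reporting (CallFailureApi in my consumer) would go here.
            System.err.println("Job failed: " + e);
        }
        // The reply is published even when the job failed, as in the original finally block.
        channel.basicPublish(replyTo, correlationId, response.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        List<String> calls = new ArrayList<>();
        // Fake channel that only records the order of calls.
        ReplyChannel fake = new ReplyChannel() {
            @Override public void basicAck(long tag, boolean multiple) {
                calls.add("ack:" + tag);
            }
            @Override public void basicPublish(String replyTo, String corr, byte[] body) {
                calls.add("publish:" + replyTo + ":" + new String(body, StandardCharsets.UTF_8));
            }
        };
        handle(fake, 7L, "reply-q", "corr-1", "file.bin".getBytes(StandardCharsets.UTF_8),
               input -> { calls.add("job:" + input); return "done"; });
        System.out.println(calls); // [ack:7, job:file.bin, publish:reply-q:done]
    }
}
```

In the real consumer the same ordering would mean moving channel.basicAck out of the finally block to the top of handleDelivery. The other option I am aware of is raising consumer_timeout (a value in milliseconds) in rabbitmq.conf, but I am not sure which of the two is the recommended approach for jobs this long.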