
I have a consumer, and it often takes a long time to finish processing and respond to the queue, because the consumer handles big files. The execution time ends up longer than the queue's consumer_timeout, and in that situation the message stays in the queue forever. We have been solving it by restarting the RabbitMQ service, but I want to know: how can I avoid this?

This is my consumer:

public void run() {
        try {
            Channel channel = connection.createChannel();

            // At most one unacknowledged message per consumer at a time.
            channel.basicQos(1);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();

                    String response = "";
                    String inputMsgBody = null;
                    try {
                        inputMsgBody = new String(body, "UTF-8");
                        if (QWorkerUtil.checkAndUpdate(envelope, properties)) {
                            // Long-running work: this is what exceeds consumer_timeout.
                            response = doJob(inputMsgBody);
                        } else {
                            QWorkerUtil.CallFailureApi(inputMsgBody);
                        }
                    } catch (Throwable t) {
                        QWorkerUtil.CallFailureApi(inputMsgBody);
                        logger.error("Exception or error occurred: " + inputMsgBody);
                        logger.error("Exception", t);
                    } finally {
                        // Ack and reply only after the work is done.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    }
                }
            };

            // Manual acknowledgements (autoAck = false).
            channel.basicConsume(queueName, false, consumer);

        } catch (IOException e) {
            logger.error("Exception", e);
        }
    }
  • could you show some code? is it possible to share some [minimal reproducible example](https://stackoverflow.com/help/minimal-reproducible-example)? the article [how to ask](https://stackoverflow.com/help/how-to-ask) might help to improve your question – Gastón Schabas Aug 11 '23 at 16:42
  • get an average of the usual processing time and increase the `consumer_timeout` ? you might even be able to change the timeout dynamically for this use case only. (something similar on this thread: https://stackoverflow.com/questions/70957962/rabbitmq-consumer-timeout-behavior-not-working-as-expected) – Jorge Campos Aug 11 '23 at 16:52
  • @GastónSchabas It is my consumer – Eder Armando Anillo Lora Aug 11 '23 at 17:18
  • @JorgeCampos I can do that; however, the consumers work in parallel, and a thread can wait for a resource even after consuming the message, so maybe it will happen again. The processing time depends on access to certain resources from Java (big PDF files, gigabytes in size). We can't solve it from the Java side at this moment because it requires too many modifications. – Eder Armando Anillo Lora Aug 11 '23 at 17:36
  • What about making the code that processes big files run async? You check whether the incoming message is one of the big ones, and if so you start a task in a separate thread; this frees up the consumer and the file is processed in the background. If you want to prevent multiple files from running in parallel, you can create a simple table that the job checks... and the consumer's job is just to save the resource somewhere for the job to grab it – Jorge Campos Aug 11 '23 at 20:05

1 Answer


After investigating, I discovered the problem. When the execution time is greater than consumer_timeout, RabbitMQ starts another consumer in a different thread while the first one hasn't even finished, and it repeats the process until the maximum number of threads is in use. So we end up with n threads processing the same file.

This causes the message to go back to the queue without ever being acknowledged. RabbitMQ closes the channel, which produces an error when you try to call basicAck or basicPublish. After that, new incoming messages are not processed, so it is necessary to restart the service.
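
As a defensive measure (it does not fix the timeout itself), the ack and reply in the finally block can be guarded so that a channel the broker has already closed does not throw. This is a sketch against the consumer in the question, using isOpen() from the Java client's Channel; note the check is best-effort, since the channel can still close between the check and the call:

                    } finally {
                        // If the broker already closed the channel (e.g. after
                        // consumer_timeout expired), basicAck/basicPublish would
                        // throw, so check the channel state first.
                        if (channel.isOpen()) {
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            channel.basicPublish("", properties.getReplyTo(), replyProps,
                                    response.getBytes("UTF-8"));
                        } else {
                            logger.error("Channel already closed, cannot ack: " + inputMsgBody);
                        }
                    }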

I suggest the following options to implement on the backend side:

  • Send the ack on the current queue as soon as the message is received, and use a new queue/notification to check when the process finishes. Another thread can then check whether the process failed or succeeded (see the sketch after this list).
  • Save a notification in an in-memory object for checking the status, so that another thread does not process the same file. If another thread tries to consume the message, mark it as already acknowledged.
  • Disable the consumer_timeout (not recommended; see the config note at the end).
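
A minimal sketch of the first option, adapted from the consumer in the question (connection, queueName, doJob, QWorkerUtil and logger are the same members as there): the message is acknowledged as soon as it arrives, and the heavy work runs on a single-threaded executor, so the broker never waits past consumer_timeout. The synchronized blocks are there because a Channel should not be used from several threads at once.

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public void run() {
    try {
        final Channel channel = connection.createChannel();
        channel.basicQos(1);

        // One worker thread keeps files processed sequentially, as before.
        final ExecutorService worker = Executors.newSingleThreadExecutor();

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Ack immediately, so consumer_timeout can never fire for this delivery.
                synchronized (channel) {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
                final String inputMsgBody = new String(body, "UTF-8");
                worker.submit(() -> {
                    String response = "";
                    try {
                        if (QWorkerUtil.checkAndUpdate(envelope, properties)) {
                            response = doJob(inputMsgBody); // long-running work
                        } else {
                            QWorkerUtil.CallFailureApi(inputMsgBody);
                        }
                    } catch (Throwable t) {
                        QWorkerUtil.CallFailureApi(inputMsgBody);
                        logger.error("Exception", t);
                    }
                    try {
                        AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                                .correlationId(properties.getCorrelationId()).build();
                        // Completion notification: a separate consumer can watch the
                        // reply queue to learn whether the job succeeded or failed.
                        synchronized (channel) {
                            channel.basicPublish("", properties.getReplyTo(), replyProps,
                                    response.getBytes("UTF-8"));
                        }
                    } catch (IOException e) {
                        logger.error("Failed to publish completion", e);
                    }
                });
            }
        };

        channel.basicConsume(queueName, false, consumer);
    } catch (IOException e) {
        logger.error("Exception", e);
    }
}

The trade-off is that the message is acknowledged before the work is done: jobs now queue inside the JVM (the early ack means the broker will push the next message right away, despite basicQos(1)), and an acknowledged-but-unprocessed message is lost if the process crashes. The completion notification on the reply queue is what lets you detect and retry those cases; the second option's in-memory status object plays the same role inside a single JVM.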
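
For the last option: consumer_timeout is a broker setting, so it cannot be changed from the Java client. In recent RabbitMQ versions it can be raised in rabbitmq.conf (the value is in milliseconds; the default is 30 minutes), for example:

# rabbitmq.conf: raise the delivery acknowledgement timeout to 2 hours
consumer_timeout = 7200000

Disabling it entirely is only possible via advanced.config:

%% advanced.config: remove the acknowledgement timeout (not recommended)
[
  {rabbit, [
    {consumer_timeout, undefined}
  ]}
].

Depending on your broker version, a per-queue x-consumer-timeout queue argument may also be available, which would let you raise the limit only for the big-file queue.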