
We are noticing that our Streams app threads fail transactions during rolling restarts of our Kafka brokers. The transaction failures cause stream thread fencing, which in turn causes the threads to restart and trigger rebalancing. The rebalancing causes some delay in processing. Our goal is to make broker restarts as smooth as possible and to prevent processing delays as much as possible.

For our rolling broker restarts we use the controlled.shutdown.enable=true configuration, and before each restart we wait for all partitions to be in sync across all replicas.
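For reference, a minimal sketch of the "wait until all partitions are in sync" check using the Java AdminClient. The bootstrap address is a placeholder and this is not our actual tooling, just an illustration of the gate we apply before restarting the next broker:

```java
// Hypothetical pre-restart check: only report "safe" when every partition's ISR
// matches its full replica set. Bootstrap address is a placeholder.
import org.apache.kafka.clients.admin.Admin;

import java.util.Properties;
import java.util.Set;

public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            boolean allInSync = admin.describeTopics(topics).allTopicNames().get() // Kafka clients 3.0+
                    .values().stream()
                    .flatMap(d -> d.partitions().stream())
                    .allMatch(p -> p.isr().size() == p.replicas().size());
            System.out.println(allInSync
                    ? "All partitions in sync, safe to restart the next broker"
                    : "Under-replicated partitions remain, wait before restarting");
        }
    }
}
```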

For our Streams apps we have configured group.instance.id (static membership) and an appropriate session.timeout.ms so that rolling restarts of the Streams apps themselves are smooth and do not trigger rebalances.
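A minimal sketch of that configuration (the application id, instance id, and concrete values are placeholders, not our real settings; EXACTLY_ONCE_V2 assumes Kafka 3.0+, older versions use EXACTLY_ONCE):

```java
// Sketch of the Streams settings described above; all values are placeholders.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsRestartFriendlyConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        // Transactions are in use, i.e. exactly-once processing.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Static membership: a stable instance id per app instance avoids rebalances on app restarts.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "app-instance-1");
        // Session timeout long enough to cover one app restart without the member being evicted.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 60_000);
        return props;
    }
}
```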

From the Kafka Streams app logs I have identified a sequence of events leading up to the fencing:

  • Broker starts shutting down
  • App logs an error producing to a topic due to NOT_LEADER_OR_FOLLOWER
  • App heartbeats fail because the group coordinator is the restarting broker
  • App discovers a new group coordinator (this bounces a bit between the restarting broker and live brokers)
  • App stabilizes
  • Broker starting up again
  • App's fetch request to the starting broker fails due to FETCH_SESSION_ID_NOT_FOUND
  • App discovers starting broker as transaction coordinator
  • App transaction fails due to one of two reasons:
    1. InvalidProducerEpochException: Producer attempted to produce with an old epoch.
    2. ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one
  • Stream threads end up in a fatal error state, get fenced and restarted, which causes a rebalance.

What could be causing the two exceptions that make the stream thread transactions fail? My intuition is that the broker that is starting up is assigned as transaction coordinator before it has synced its transaction state with the in-sync brokers. That could explain why this broker knows about old epochs or different transactional IDs.

How can we further identify what is going wrong here and how it can be improved?

  • Does this answer your question? [Handling exceptions in Kafka streams](https://stackoverflow.com/questions/51299528/handling-exceptions-in-kafka-streams) – vaibhav shanker Dec 13 '21 at 14:12
  • No that would answer how to handle the exceptions without restarting the streams thread. My question is how to prevent the exceptions in the first place. – Pieter Hameete Dec 13 '21 at 15:35

1 Answer


You can set request.timeout.ms in Kafka Streams, which will make the Streams API wait for a longer period of time. Only if the Kafka broker is not back up within that period will it throw an exception, which can then be handled with a ProductionExceptionHandler as described in [Handling exceptions in Kafka streams](https://stackoverflow.com/questions/51299528/handling-exceptions-in-kafka-streams).
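A rough sketch of that suggestion, assuming a hand-rolled handler class and an illustrative timeout value (neither is spelled out in the original answer):

```java
// Hedged sketch: raise the producer's request.timeout.ms and plug in a custom
// ProductionExceptionHandler. Class name and timeout value are illustrative only.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

import java.util.Map;
import java.util.Properties;

public class LenientProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // Log and keep processing instead of moving the stream thread to a fatal error state.
        System.err.println("Failed to produce to " + record.topic() + ": " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    // Example wiring of the handler and a longer request timeout into the Streams config.
    public static Properties exampleConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60_000);
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LenientProductionExceptionHandler.class);
        return props;
    }
}
```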

vaibhav shanker
  • Hi Vaibhav, thanks for the answer. I think the problem is not a timeout. The broker does send a response. However, according to the broker, the producer epoch is too old, or there is a newer transactionalId. – Pieter Hameete Dec 13 '21 at 16:06