We are noticing that stream threads in our Kafka Streams apps fail transactions during rolling restarts of our Kafka brokers. The transaction failure fences the stream thread, which in turn causes the thread to be restarted and triggers a rebalance. The rebalance delays processing. Our goal is to make broker restarts as smooth as possible and to avoid these processing delays as much as possible.
For our rolling broker restarts we use the `controlled.shutdown.enable=true` broker configuration, and before each restart we wait for all partitions to be fully in sync across all replicas.
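For reference, the pre-restart check is conceptually equivalent to the following sketch using the Java `Admin` client (the class name and bootstrap address are placeholders, not our actual tooling):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class IsrCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            Map<String, TopicDescription> descriptions = admin.describeTopics(topics).all().get();

            // A partition is fully in sync when its ISR contains every replica.
            boolean allInSync = descriptions.values().stream()
                    .flatMap(d -> d.partitions().stream())
                    .allMatch(p -> p.isr().size() == p.replicas().size());

            System.out.println(allInSync
                    ? "All partitions fully replicated, safe to restart the next broker"
                    : "Under-replicated partitions present, waiting before the next restart");
        }
    }
}
```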
For our Streams apps we have configured `group.instance.id` (static membership) and an appropriate `session.timeout.ms`, so that rolling restarts of the Streams apps themselves are smooth and do not trigger rebalances.
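Concretely, the relevant part of the Streams configuration looks roughly like this (application id, bootstrap servers, instance id and timeout value are placeholders; exactly-once processing is shown because that is where the transactions come from):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StreamsProps {
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder

        // Static membership: a stable instance id per pod/host plus a session timeout
        // large enough to cover an app restart, so bouncing the app does not rebalance.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "app-instance-1"); // placeholder
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), "120000");        // example value

        // Assumption: some exactly-once guarantee is enabled, since the app uses transactions
        // (EXACTLY_ONCE_V2 requires Kafka Streams 3.0+; adjust to your version).
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```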
From the Kafka Streams app logs I have identified a sequence of events leading up to the fencing:
- Broker starts shutting down
- App logs an error producing to a topic due to `NOT_LEADER_OR_FOLLOWER`
- App heartbeats fail because the group coordinator is the broker that is restarting
- App discovers a new group coordinator (this bounces a bit between the restarting broker and the live brokers)
- App stabilizes
- Broker starts up again
- App fails a fetch request to the starting broker due to `FETCH_SESSION_ID_NOT_FOUND`
- App discovers the starting broker as its transaction coordinator
- App transaction fails for one of two reasons:
  - `InvalidProducerEpochException`: Producer attempted to produce with an old epoch.
  - `ProducerFencedException`: There is a newer producer with the same transactionalId which fences the current one.
- Stream threads end up in a fatal error state, are fenced and restarted, which causes a rebalance (see the handler sketch after this list).
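For context, restarting a failed thread is done with a Streams uncaught exception handler along these lines (a simplified sketch, not our exact handler):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ErrorHandling {
    static void configureHandler(KafkaStreams kafkaStreams) {
        kafkaStreams.setUncaughtExceptionHandler(exception -> {
            // Log the full cause chain so InvalidProducerEpochException and
            // ProducerFencedException can be told apart in the logs.
            exception.printStackTrace();
            // Replacing the dead thread is what triggers the rebalance described above.
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}
```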
What could be causing the two exceptions that make the stream thread transactions fail? My intuition is that the broker that is starting up is assigned as transaction coordinator before it has synced its transaction state with the in-sync replicas. That could explain why that broker knows old epochs or different transactional IDs.
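One way to test that intuition would be to watch the `__transaction_state` topic around a restart: which broker leads its partitions (i.e. acts as transaction coordinator), what the ISR looks like, and what its replication-related settings are. A sketch with the Java `Admin` client (bootstrap address is a placeholder):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class TransactionStateCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        String topic = "__transaction_state";

        try (Admin admin = Admin.create(props)) {
            // Leader and ISR per partition of the transaction state log: shows whether the
            // restarting broker becomes leader (i.e. transaction coordinator) while the
            // other replicas are still catching up.
            TopicDescription desc = admin.describeTopics(Collections.singleton(topic)).all().get().get(topic);
            desc.partitions().forEach(p ->
                    System.out.printf("partition %d leader=%s isr=%s%n", p.partition(), p.leader(), p.isr()));

            // Effective topic settings such as min.insync.replicas and unclean.leader.election.enable.
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            Config config = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource);
            System.out.println(config.get("min.insync.replicas"));
            System.out.println(config.get("unclean.leader.election.enable"));
        }
    }
}
```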
How can we further identify what is going wrong here, and how can it be improved?