I have a Kafka Streams application written in Kotlin. In its current form, the Kafka StateStore in the application is designed to store key-value entries of type <String, CustomAVROSchema>. The StateStore, being a PersistentTimestampedKeyValueStore, is set up to use a SpecificAvroSerde for the value part of the key-value entries, which works like a charm.
The problem with this approach is that I want to keep the header and timestamp values of the incoming record, but the current solution only stores the value. I've done some research, but I cannot find any decent examples of how I can keep the header and timestamp fields.
So, how can I keep the header and timestamp fields through a StateStore operation?
My main approach to this problem was to rewrite the application to store the record itself instead of only the value, meaning that each entry in the StateStore would be of type <String, Record<String, CustomAVROSchema>> (or <String, ConsumerRecord<String, CustomAVROSchema>>; I don't fully know the difference).
These application changes worked out ok, except when I needed to change the Serde options for the StateStore. I could not find any good replacement for the SpecificAvroSerde used earlier.
As the concept of Records is so integrated into Kafka, I was surprised that I came up short when looking for examples of what such a Serde configuration could look like. Does Kafka not support this feature out of the box? When reading the documentation, I learned that it's possible to create custom serializer and deserializer classes, but I had problems implementing this. Is this approach the only way to resolve my problem?
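To make the custom serialization idea concrete, here is a minimal sketch of the byte layout such a class could use. It is not taken from any Kafka library: StoredRecord, encode and decode are hypothetical names, and the sketch assumes the Avro value has already been turned into bytes (for example by the serializer of the existing SpecificAvroSerde). It only depends on the JDK, so the wiring into Kafka's Serializer and Deserializer interfaces is left out.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// Hypothetical wrapper for everything we want to keep from the incoming record.
// valueBytes is assumed to be the already-serialized Avro payload.
class StoredRecord(
    val timestamp: Long,
    val headers: List<Pair<String, ByteArray?>>, // header values may be null
    val valueBytes: ByteArray
)

// Layout: timestamp, header count, then (key, length, bytes) per header, then the payload.
fun encode(record: StoredRecord): ByteArray {
    val bytes = ByteArrayOutputStream()
    DataOutputStream(bytes).use { out ->
        out.writeLong(record.timestamp)
        out.writeInt(record.headers.size)
        for ((key, value) in record.headers) {
            out.writeUTF(key)               // note: writeUTF caps keys at 65535 encoded bytes
            out.writeInt(value?.size ?: -1) // -1 marks a null header value
            if (value != null) out.write(value)
        }
        out.writeInt(record.valueBytes.size)
        out.write(record.valueBytes)
    }
    return bytes.toByteArray()
}

// Reads the fields back in exactly the order encode() wrote them.
fun decode(data: ByteArray): StoredRecord {
    DataInputStream(ByteArrayInputStream(data)).use { input ->
        val timestamp = input.readLong()
        val headerCount = input.readInt()
        val headers = List(headerCount) {
            val key = input.readUTF()
            val length = input.readInt()
            val value = if (length < 0) null else ByteArray(length).also { buf -> input.readFully(buf) }
            key to value
        }
        val valueBytes = ByteArray(input.readInt()).also { buf -> input.readFully(buf) }
        return StoredRecord(timestamp, headers, valueBytes)
    }
}
```

Under these assumptions, a Serializer implementation would call encode, a Deserializer implementation would call decode, and the two could be combined with Serdes.serdeFrom(serializer, deserializer) to get a Serde for the StateStore. Whether this is the only option is exactly what I am unsure about.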
Any help would be appreciated.