
I have a Kafka Streams application written in Kotlin. In its current form, the Kafka StateStore in the application is designed to store key-value entries of type <String, CustomAVROSchema>. The StateStore, being a PersistentTimestampedKeyValueStore, is set up to use a SpecificAvroSerde for the value part of the key-value entries, which works like a charm.

The problem with this approach is that I want to keep the header and timestamp values of the incoming record, but the current solution only stores the value. I've done some research, but I cannot find any decent examples of how I can keep the header and timestamp fields.

So, how can I keep the header and timestamp fields through a StateStore operation?

My main approach to this problem was to rewrite the application to store the record itself instead of only the value, meaning that each entry in the StateStore would be of type <String, Record<String, CustomAVROSchema>> (or <String, ConsumerRecord<String, CustomAVROSchema>>, I don't fully know the difference).

These application changes worked out ok, except when I needed to change the Serde options for the StateStore. I could not find any good replacement for the SpecificAvroSerde used earlier.

As the concept of Records is so integrated into Kafka, I was surprised when I came up short trying to find examples of what such a SerDe configuration could look like. Does Kafka not support this feature out of the box? When reading the documentation, I learned that it's possible to create custom serialization and deserialization classes, but I had problems implementing this. Is this approach the only way to resolve my problem?

Any help would be appreciated.

HaavardG
  • See comments on https://forum.confluent.io/t/message-headers-and-state-stores/7458/2 – Lucas Brutschy Mar 21 '23 at 12:54
  • I don't think Serde is what you need to be looking at. That is only for keys/values... The ProcessorContext should be able to get record headers, and maybe the timestamp - https://stackoverflow.com/questions/61270063/kafka-streams-how-to-get-the-kafka-headers – OneCricketeer Mar 22 '23 at 00:24
  • @OneCricketeer I am already using a ProcessorContext. One part of the topology uses `.process()` to call on a class that implements the `ContextualProcessor` interface. This class uses the `process` method to store records in the statestore, and the `schedule` method to retrieve records every `n` seconds. But I am having difficulties storing `Record<String, CustomAVROSchema>` in the statestore. The `ContextualProcessor` gives me access to the ProcessorContext you are mentioning. – HaavardG Mar 22 '23 at 06:25
  • @LucasBrutschy if I understand your answer correctly, you want me to create a new Avro schema (or something similar through another technology), and use this as a serde? Because this is something I'd rather avoid. I do not want to create any more schemas. I tried creating a `data class` containing the relevant information, but that did not work. – HaavardG Mar 22 '23 at 06:29
  • As mentioned, Statestore only keeps keys/values. You'll need to create a new serialized object format to preserve extra data (which you can at least get headers from the example code I linked to). Then you will pass a new serde type to `Materialized` class when you create the store – OneCricketeer Mar 22 '23 at 13:50
  • If you want to go schemaless, you can consider using JSON. See this example how to do it conveniently with the jackson JSON library: https://github.com/apache/kafka/blob/3.3/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java#L83 Obviously the serialization will be less efficient than avro or protobuf – Lucas Brutschy Mar 22 '23 at 16:31
  • @OneCricketeer Yes, I believe I indirectly mentioned that Statestores only store key-value pairs, which is why I wanted to update the value part to support a Record. I do not understand the `Materialized` part, but I believe I came up with a solution that works fairly decent – HaavardG Mar 28 '23 at 10:40
  • @LucasBrutschy I noticed your answer a bit late, but I see that my solution draws a lot of similarities from the link you provided, thanks! – HaavardG Mar 28 '23 at 10:42

1 Answer


After going back and forth on my problem, I decided to try making my own SerDe.

I made a `StatestoreSerializer` class that implements Kafka's `Serializer` interface.

override fun serialize(topic: String, data: Record<String, CustomAVROSchema>): ByteArray? {
    return try {
        // Serialize the Avro value with the existing Avro serializer
        val serializedAvro = ...serializer().serialize(data.key(), data.value()).asList()
        // Encode the record headers as a Base64 string map
        val byteHeaders = headersToString(data.headers())
        // Wrap both parts in a data class and serialize it with Jackson
        objectMapper.writeValueAsBytes(StateStoreObject(...))
    } catch (e: Exception) {
        throw SerializationException("Error when serializing record", e)
    }
}

private fun headersToString(headers: Headers): String {
    // Base64-encode each header value so the raw bytes survive as text
    val map = headers
        .associate { it.key() to it.value() }
        .mapValues { Base64.getEncoder().encodeToString(it.value) }
    return map.toString()
}

Things to note include the following:

  • Using ObjectMapper().registerKotlinModule() to enable the use of Kotlin data classes.
  • Encoding the avro object and the record headers independently, placing them into a data class, and encoding the data class.
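To illustrate the header-encoding idea in isolation, here is a framework-free sketch of the Base64 round trip. It uses a plain `Map<String, ByteArray>` in place of Kafka's `Headers` type so the snippet has no Kafka dependency; the function names are my own stand-ins:

```kotlin
import java.util.Base64

// Encode raw header bytes as Base64 strings so they survive JSON serialization.
fun encodeHeaders(headers: Map<String, ByteArray>): Map<String, String> =
    headers.mapValues { Base64.getEncoder().encodeToString(it.value) }

// Reverse operation for the deserializer side.
fun decodeHeaders(encoded: Map<String, String>): Map<String, ByteArray> =
    encoded.mapValues { Base64.getDecoder().decode(it.value) }

fun main() {
    val headers = mapOf("trace-id" to "abc-123".toByteArray())
    val restored = decodeHeaders(encodeHeaders(headers))
    println(String(restored.getValue("trace-id"))) // prints "abc-123"
}
```

Encoding the header bytes as Base64 avoids any issues with non-UTF-8 bytes ending up inside a JSON string.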

The deserializer essentially does the same thing, only in reverse.
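For completeness, here is a simplified, dependency-free sketch of the container idea and its reverse. It uses a length-prefixed byte layout instead of the Jackson JSON wrapper above, and the `StateStoreObject` shown here is a stand-in for the real data class (which also carries the serialized Avro value and headers):

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// Stand-in for the StateStoreObject data class: the serialized value plus its headers.
data class StateStoreObject(val payload: ByteArray, val headers: Map<String, ByteArray>)

// Serializer side: length-prefix the payload, then write each header key/value pair.
fun toBytes(obj: StateStoreObject): ByteArray {
    val bytes = ByteArrayOutputStream()
    DataOutputStream(bytes).use { out ->
        out.writeInt(obj.payload.size)
        out.write(obj.payload)
        out.writeInt(obj.headers.size)
        obj.headers.forEach { (key, value) ->
            out.writeUTF(key)
            out.writeInt(value.size)
            out.write(value)
        }
    }
    return bytes.toByteArray()
}

// Deserializer side: the same steps in reverse order.
fun fromBytes(data: ByteArray): StateStoreObject {
    DataInputStream(ByteArrayInputStream(data)).use { input ->
        val payload = ByteArray(input.readInt()).also { input.readFully(it) }
        val headers = (0 until input.readInt()).associate {
            val key = input.readUTF()
            val value = ByteArray(input.readInt()).also { b -> input.readFully(b) }
            key to value
        }
        return StateStoreObject(payload, headers)
    }
}
```

Whether you pick JSON or a binary layout like this, the key point is the same: the value stored in the StateStore is a container holding both the original serialized value and the metadata you want to keep.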

Although unoptimized, the custom SerDe configuration did not add any significant delay compared to using only the Avro SerDe.

HaavardG