
I am really new to the Kafka Streams world and have only beginner-level knowledge in this area. In our current application we receive JSON messages from a Kafka topic in the format below:

Message1

{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "27",
      "category": "FA"
    }
  ]
}

Message2

{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "28",
      "category": "FA"
    }
  ]
}

Message3

{
  "projectId": "Project2",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "29",
      "category": "PD"
    }
  ]
}

Now we want to push them to a destination topic in this format:

Desired destination output message

[
  {
    "projectId": "Project1",
    "data": [
      {
        "APP_MESSAGE_ID": "27",
        "category": "FA"
      },
      {
        "APP_MESSAGE_ID": "28",
        "category": "FA"
      }
    ]
  },
  {
    "projectId": "Project2",
    "data": [
      {
        "APP_MESSAGE_ID": "29",
        "category": "PD"
      }
    ]
  }
]

I have managed to write the following for my input topic:

KStreamBuilder builder = new KStreamBuilder();
Pattern pattern = Pattern.compile("Input-Message.*");
KStream<String, Object> sourceStream = builder.stream(pattern);
KTable aggregatedTable =
            sourceStream
                .filter((k, v) -> v != null)
                .mapValues(v -> getAMModelFromMessage(v))
                .groupBy((k, v) -> KeyValue.pair(v != null ? v.getProjectId() : null, v))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
                .aggregate(ArrayList::new, (k, v, list) -> {
                        list.add(v);
                        return list;
                    }
                    , Materialized.as("NewStore").withValueSerde(new ArrayListSerde(Serdes.String()))
                        .withKeySerde(Serdes.String()));

KStream<Object, ArrayList<AMKafkaMessage>> outputStream = aggregatedTable.toStream();

outputStream.to("destination_topic");

I have implemented (used) the ArrayListSerde, ArrayListSerializer and ArrayListDeserializer classes as per this question: Issue with ArrayList Serde in Kafka Streams API.
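For reference, a minimal sketch of what such a list Serde can look like, delegating to an inner Serde per element (illustrative only, not the exact code from the linked question):

// Minimal sketch of an ArrayList Serde that delegates to an inner Serde per element.
// The class layout is an assumption modelled after the approach in the linked question.
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;

public class ArrayListSerde<T> implements Serde<ArrayList<T>> {

    private final Serde<T> inner;

    public ArrayListSerde(Serde<T> inner) {
        this.inner = inner;
    }

    @Override
    public Serializer<ArrayList<T>> serializer() {
        // length-prefix every element so the deserializer can split them again
        return (topic, list) -> {
            if (list == null) {
                return null;
            }
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(baos)) {
                out.writeInt(list.size());
                for (T element : list) {
                    byte[] bytes = inner.serializer().serialize(topic, element);
                    out.writeInt(bytes.length);
                    out.write(bytes);
                }
                out.flush();
                return baos.toByteArray();
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize ArrayList", e);
            }
        };
    }

    @Override
    public Deserializer<ArrayList<T>> deserializer() {
        return (topic, bytes) -> {
            if (bytes == null) {
                return null;
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
                int size = in.readInt();
                ArrayList<T> list = new ArrayList<>(size);
                for (int i = 0; i < size; i++) {
                    byte[] elementBytes = new byte[in.readInt()];
                    in.readFully(elementBytes);
                    list.add(inner.deserializer().deserialize(topic, elementBytes));
                }
                return list;
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize ArrayList", e);
            }
        };
    }
}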

AMKafkaMessage is my model class:

import lombok.Data;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;

@Data
@JsonIgnoreProperties
public class AMKafkaMessage {

    private String projectId;
    private String userId;
}

and I have written the below method to get an instance of it by parsing the JSON:

private AMKafkaMessage getAMModelFromMessage(Object message) {
    try {
        Gson gson = new Gson();
        // the incoming value is the raw JSON payload
        AMKafkaMessage amKafkaMessage = gson.fromJson(message.toString(),
            AMKafkaMessage.class);
        return amKafkaMessage;
    } catch (Exception e) {
        logger.error("Exception occurred while parsing message: {}", e.toString());
    }
    return null;
}

And I want to write to a topic named "destination_topic".

Any help to achieve this would be really appreciated. Also, please let me know if I am thinking about this the wrong way in terms of how streams should be utilised. Thanks in advance.

The above code is giving me exceptions like:

   org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic kafka-stream-NewStore-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.KeyValue / value type: com.admin.model.kafkaModel.AMKafkaMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:152) ~[kafka-streams-2.6.2.jar:?]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:129) ~[kafka-streams-2.6.2.jar:?]


1 Answer


The error message means that the specified Serdes do not match the actual data types. You specified StringSerde / StringSerde for key and value (it seems those come from StreamsConfig), but the actual data types are KeyValue / AMKafkaMessage.

Given your program this makes sense.

.mapValues(v -> getAMModelFromMessage(v))

This transforms the value into an AMKafkaMessage. When you do the groupBy you trigger a repartitioning, so you will need to set a corresponding serde for the value using the Grouped parameter (otherwise, Kafka Streams falls back to the serdes from the config, which seem to be set to StringSerde).

.groupBy(..., Grouped.valueSerde(/* Serde for the AMKafkaMessage type */))
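For example, a Gson-based Serde for AMKafkaMessage could be sketched like this (the helper class below is illustrative, not an existing Kafka API) and then passed in via Grouped.with(...):

// Sketch of a Gson-based Serde for AMKafkaMessage (helper class name is illustrative).
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.nio.charset.StandardCharsets;

public class AMKafkaMessageSerde {

    private static final Gson GSON = new Gson();

    public static Serde<AMKafkaMessage> instance() {
        return Serdes.serdeFrom(
            // serializer: model -> JSON bytes
            (topic, message) -> message == null
                ? null
                : GSON.toJson(message).getBytes(StandardCharsets.UTF_8),
            // deserializer: JSON bytes -> model
            (topic, bytes) -> bytes == null
                ? null
                : GSON.fromJson(new String(bytes, StandardCharsets.UTF_8), AMKafkaMessage.class));
    }
}

This would be wired in as .groupBy((k, v) -> ..., Grouped.with(Serdes.String(), AMKafkaMessageSerde.instance())).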

Furthermore, you do

.groupBy((k, v) -> KeyValue.pair(v != null ? v.getProjectId() : null, v))

This returns a KeyValue pair as the new key. Note that groupBy does not change the input record value; the specified KeyValueMapper is supposed to return only the new key. I.e., for input <k1,v1> the resulting key-value pair for your code would be <<p1,v1>, v1> (assuming p1 is extracted from the value).

I assume you actually want to do:

.groupBy((k, v) -> v != null ? v.getProjectId() : null)

to set the projectId as the new key and get <p1,v1> as the result?
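Putting both corrections together, the grouping/aggregation part might look roughly like the sketch below. Note this is a sketch only: the amSerde / listSerde helpers (e.g. the Gson-based Serde sketched above plus the list Serde from the linked question) and the windowed key serde for the sink are assumptions about your setup.

// Sketch only; assumed imports: org.apache.kafka.streams.kstream.*,
// org.apache.kafka.streams.state.WindowStore, org.apache.kafka.common.utils.Bytes,
// org.apache.kafka.common.serialization.*, java.time.Duration, java.util.ArrayList
Serde<AMKafkaMessage> amSerde = AMKafkaMessageSerde.instance();              // assumed helper
Serde<ArrayList<AMKafkaMessage>> listSerde = new ArrayListSerde<>(amSerde);  // assumed helper

KTable<Windowed<String>, ArrayList<AMKafkaMessage>> aggregatedTable =
    sourceStream
        .filter((k, v) -> v != null)
        .mapValues(v -> getAMModelFromMessage(v))
        .filter((k, v) -> v != null)
        // the KeyValueMapper returns only the new key;
        // Grouped supplies the serdes for the repartition topic
        .groupBy((k, v) -> v.getProjectId(), Grouped.with(Serdes.String(), amSerde))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
        .aggregate(
            ArrayList::new,
            (key, value, list) -> { list.add(value); return list; },
            Materialized.<String, ArrayList<AMKafkaMessage>, WindowStore<Bytes, byte[]>>as("NewStore")
                .withKeySerde(Serdes.String())
                .withValueSerde(listSerde));

// After a windowed aggregation the key type is Windowed<String>,
// so the sink needs a windowed key serde as well:
aggregatedTable.toStream()
    .to("destination_topic",
        Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), listSerde));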

Matthias J. Sax
  • Thanks for the suggestion, I managed to get it working by casting the list of objects to a String and ended up using the String serde for communication. – Vishal Vyavahare Jun 09 '21 at 13:29
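For completeness, the workaround described in this comment (serializing the aggregated list to a JSON string and producing it with the plain String serde) might look roughly like the sketch below; the exact mapping is an assumption based on the comment, not the commenter's actual code, and it assumes aggregatedTable is the windowed KTable from the sketch above.

// Sketch of the workaround from the comment: render the aggregated list as a JSON
// string so that the built-in String serde can be used for the output topic.
Gson gson = new Gson();

aggregatedTable.toStream()
    // unwrap the windowed key back to the plain project id and render the list as JSON
    .map((windowedKey, list) -> KeyValue.pair(windowedKey.key(), gson.toJson(list)))
    .to("destination_topic", Produced.with(Serdes.String(), Serdes.String()));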