I am really new to the Kafka Streams world and have only beginner-level knowledge in this area. In our current application we receive JSON messages from a Kafka topic in the format below:
Message1
{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "27",
      "category": "FA"
    }
  ]
}
Message2
{
  "projectId": "Project1",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "28",
      "category": "FA"
    }
  ]
}
Message3
{
  "projectId": "Project2",
  "userId": "User1",
  "actions": [
    {
      "APP_MESSAGE_ID": "29",
      "category": "PD"
    }
  ]
}
Now we want to push them to a destination topic in this format:
Desired destination output message
[
  {
    "projectId": "Project1",
    "data": [
      {
        "APP_MESSAGE_ID": "27",
        "category": "FA"
      },
      {
        "APP_MESSAGE_ID": "28",
        "category": "FA"
      }
    ]
  },
  {
    "projectId": "Project2",
    "data": [
      {
        "APP_MESSAGE_ID": "29",
        "category": "PD"
      }
    ]
  }
]
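To make the intended transformation concrete, here is a minimal plain-Java sketch (no Kafka Streams involved) that groups the three example messages by projectId and concatenates their actions into data. The classes Action, InputMessage and ProjectData and the method groupByProject are hypothetical names chosen for illustration. Note that this shape requires the model to carry the actions list, which the AMKafkaMessage class in this question does not currently have.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ProjectGrouping {

    // One element of an "actions" array, e.g. {"APP_MESSAGE_ID": "27", "category": "FA"}
    static final class Action {
        final String appMessageId;
        final String category;

        Action(String appMessageId, String category) {
            this.appMessageId = appMessageId;
            this.category = category;
        }
    }

    // One input message: projectId, userId and its actions
    static final class InputMessage {
        final String projectId;
        final String userId;
        final List<Action> actions;

        InputMessage(String projectId, String userId, List<Action> actions) {
            this.projectId = projectId;
            this.userId = userId;
            this.actions = actions;
        }
    }

    // One element of the desired output array: {"projectId": ..., "data": [...]}
    static final class ProjectData {
        final String projectId;
        final List<Action> data = new ArrayList<>();

        ProjectData(String projectId) {
            this.projectId = projectId;
        }
    }

    // Groups messages by projectId (keeping first-seen order) and appends each message's actions to "data"
    static List<ProjectData> groupByProject(List<InputMessage> messages) {
        Map<String, ProjectData> byProject = new LinkedHashMap<>();
        for (InputMessage m : messages) {
            byProject.computeIfAbsent(m.projectId, ProjectData::new).data.addAll(m.actions);
        }
        return new ArrayList<>(byProject.values());
    }

    public static void main(String[] args) {
        List<InputMessage> input = List.of(
                new InputMessage("Project1", "User1", List.of(new Action("27", "FA"))),
                new InputMessage("Project1", "User1", List.of(new Action("28", "FA"))),
                new InputMessage("Project2", "User1", List.of(new Action("29", "PD"))));

        for (ProjectData p : groupByProject(input)) {
            List<String> ids = p.data.stream().map(a -> a.appMessageId).collect(Collectors.toList());
            System.out.println(p.projectId + " -> " + ids);
        }
    }
}
```

Running main prints `Project1 -> [27, 28]` followed by `Project2 -> [29]`; the LinkedHashMap keeps projects in the order they were first seen.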
I have managed to write the following for my input topic:
KStreamBuilder builder = new KStreamBuilder();
Pattern pattern = Pattern.compile("Input-Message.*");
KStream<String, Object> sourceStream = builder.stream(pattern);
KTable aggregatedTable =
        sourceStream
                .filter((k, v) -> v != null)
                .mapValues(v -> getAMModelFromMessage(v))
                .groupBy((k, v) -> KeyValue.pair(v != null ? v.getProjectId() : null, v))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
                .aggregate(ArrayList::new, (k, v, list) -> {
                            list.add(v);
                            return list;
                        },
                        Materialized.as("NewStore").withValueSerde(new ArrayListSerde(Serdes.String()))
                                .withKeySerde(Serdes.String()));
KStream<Object, ArrayList<AMKafkaMessage>> outputStream = aggregatedTable.toStream();
outputStream.to("destination_topic");
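The windowed aggregation above collects records per projectId into 30-second tumbling windows, which Kafka Streams aligns to the epoch. As a rough plain-Java model of that behaviour (not Kafka Streams itself; Rec, windowStart, aggregate and the timestamps are hypothetical), the sketch below shows that each aggregated result belongs to exactly one projectId and one window. So 27 and 28 end up in different lists if they fall into different windows, and Project1 and Project2 never share a result, unlike the combined array in the desired output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TumblingWindowSketch {

    // Same size as TimeWindows.of(Duration.ofSeconds(30)) in the question
    static final long WINDOW_SIZE_MS = 30_000L;

    // A record with a hypothetical timestamp, its projectId and its APP_MESSAGE_ID
    static final class Rec {
        final long timestampMs;
        final String projectId;
        final String messageId;

        Rec(long timestampMs, String projectId, String messageId) {
            this.timestampMs = timestampMs;
            this.projectId = projectId;
            this.messageId = messageId;
        }
    }

    // Epoch-aligned tumbling window [start, start + size); assumes non-negative timestamps
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Mimics groupBy(projectId) + windowedBy(...) + aggregate(ArrayList::new, add):
    // one list per (projectId, window start), keyed here as "projectId@windowStart"
    static Map<String, List<String>> aggregate(List<Rec> records) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Rec r : records) {
            String key = r.projectId + "@" + windowStart(r.timestampMs);
            result.computeIfAbsent(key, k -> new ArrayList<>()).add(r.messageId);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(
                new Rec(1_000L, "Project1", "27"),
                new Rec(31_000L, "Project1", "28"),
                new Rec(2_000L, "Project2", "29"));
        aggregate(records).forEach((key, ids) -> System.out.println(key + " -> " + ids));
    }
}
```

With these timestamps it prints `Project1@0 -> [27]`, `Project1@30000 -> [28]` and `Project2@0 -> [29]`. In the real topology the key is a `Windowed<String>` rather than a string, and the KTable emits updated lists as records arrive (record caching may coalesce some updates) instead of one final list per window; the DSL's `suppress(Suppressed.untilWindowCloses(...))` operator exists for emitting only final window results.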
I have implemented (reused) the ArrayListSerde, ArrayListSerializer and ArrayListDeserializer classes from this question: Issue with ArrayList Serde in Kafka Streams API
AMKafkaMessage is my model class:
import lombok.Data;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
@Data
@JsonIgnoreProperties
public class AMKafkaMessage {
    private String projectId;
    private String userId;
}
and I wrote the method below to create an instance of it by parsing the JSON:
private AMKafkaMessage getAMModelFromMessage(Object message) {
    try {
        Gson gson = new Gson();
        // Gson.fromJson has no Object overload, so pass the JSON text as a String
        AMKafkaMessage amKafkaMessage = gson.fromJson(String.valueOf(message),
                AMKafkaMessage.class);
        return amKafkaMessage;
    } catch (Exception e) {
        logger.error("Exception occurred while decrypting message: {}", e.toString());
    }
    return null;
}
I want to write the result to the topic named "destination_topic".
Any help achieving this would be really appreciated. Also, please let me know if I am doing (or thinking about) this the wrong way in terms of how streams should be used. Thanks in advance.
The code above is throwing exceptions like this:
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic kafka-stream-NewStore-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.KeyValue / value type: com.admin.model.kafkaModel.AMKafkaMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:152) ~[kafka-streams-2.6.2.jar:?]
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:129) ~[kafka-streams-2.6.2.jar:?]