I need a Serde for an ArrayList, and searching the web for ArrayListSerde finds references to such a thing, but I can't find it in either the documentation or the library of the version of Kafka Streams that I'm using. Where can I find it please?
- Perhaps this https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api can help. – Val Bonn Aug 01 '19 at 11:22
- I saw that, as a roll-your-own example, which I thought the post said didn't actually work. But I've seen other references to such a thing as if it were a standard class that you expected to exist ... and I can't find it. – Tim Ward Aug 01 '19 at 11:29
- I did not find any standard implementation of that. But, if you apply the fix suggested by Matthias J. Sax's answer (fix the constructor in the Serde), the code should work fine. – Val Bonn Aug 01 '19 at 11:34
- Yes, I'm working on a variant of that, ta. – Tim Ward Aug 01 '19 at 11:42
- A standard implementation is WIP and should be included in 2.4 release: https://issues.apache.org/jira/browse/KAFKA-8326 – Matthias J. Sax Aug 01 '19 at 21:39
2 Answers
There is no official ArrayListSerde implementation in the Kafka Streams library. You need to implement a custom Serde using the Serializer and Deserializer interfaces.
Also referred in the below post:

Nishu Tayal
I'm trying something like this. It looks like it's doing sensible things, although I'm not yet convinced I'm doing the right thing with a null parameter to serialize (and don't yet know why it's getting called with null).
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Relies on the default configure()/close() methods of Serde, Serializer and
// Deserializer; on Kafka versions without those defaults they must be overridden too.
public class ArrayListSerde<T> implements Serde<ArrayList<T>> {
    private final Serializer<T> innerSerialiser;
    private final Deserializer<T> innerDeserialiser;

    public ArrayListSerde(Serde<T> inner) {
        innerSerialiser = inner.serializer();
        innerDeserialiser = inner.deserializer();
    }

    @Override
    public Serializer<ArrayList<T>> serializer() {
        return new Serializer<ArrayList<T>>() {
            @Override
            public byte[] serialize(String topic, ArrayList<T> data) {
                // serialize may be called with null (for example for a record with no
                // value); return null rather than an empty array so it round-trips as null.
                if (data == null) {
                    return null;
                }
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                final DataOutputStream dos = new DataOutputStream(baos);
                try {
                    // Layout: element count, then a length-prefixed payload per element.
                    dos.writeInt(data.size());
                    for (final T element : data) {
                        // Assumes the inner serializer never returns null for an element.
                        final byte[] bytes = innerSerialiser.serialize(topic, element);
                        dos.writeInt(bytes.length);
                        dos.write(bytes);
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Unable to serialize ArrayList", e);
                }
                return baos.toByteArray();
            }
        };
    }

    @Override
    public Deserializer<ArrayList<T>> deserializer() {
        return new Deserializer<ArrayList<T>>() {
            @Override
            public ArrayList<T> deserialize(String topic, byte[] data) {
                if (data == null || data.length == 0) {
                    return null;
                }
                final ArrayList<T> arrayList = new ArrayList<>();
                final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(data));
                try {
                    final int records = dataInputStream.readInt();
                    for (int i = 0; i < records; i++) {
                        final byte[] valueBytes = new byte[dataInputStream.readInt()];
                        // readFully fills the whole buffer; read() may stop short.
                        dataInputStream.readFully(valueBytes);
                        arrayList.add(innerDeserialiser.deserialize(topic, valueBytes));
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Unable to deserialize ArrayList", e);
                }
                return arrayList;
            }
        };
    }
}
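
For anyone wanting to check the byte layout without a Kafka dependency, here is a minimal sketch of just the framing: a 4-byte element count, then a 4-byte length followed by the payload for each element. `ListFraming` and its two methods are made-up names for illustration, not part of Kafka, and the sketch assumes the elements have already been turned into non-null byte arrays by some inner serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Kafka class): length-prefixed framing of a list. */
final class ListFraming {
    private ListFraming() {}

    /** Writes [int count], then [int length][bytes] for each element. */
    static byte[] encode(List<byte[]> elements) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeInt(elements.size());
            for (byte[] element : elements) {
                dos.writeInt(element.length);
                dos.write(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    /** Reads back exactly what encode() wrote. */
    static List<byte[]> decode(byte[] wire) {
        List<byte[]> elements = new ArrayList<>();
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(wire))) {
            int count = dis.readInt();
            for (int i = 0; i < count; i++) {
                byte[] element = new byte[dis.readInt()];
                // readFully either fills the buffer or throws EOFException.
                dis.readFully(element);
                elements.add(element);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return elements;
    }
}
```

Calling `ListFraming.decode(ListFraming.encode(list))` should give back a list with the same elements in the same order. Empty elements survive the round trip because each length is stored explicitly.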

Tim Ward
- Hey Tim, I'm facing the same issue. Even after adding a no-arg constructor, only the no-arg constructor gets called, no object is initialized, and I'm getting an NPE. When will ArrayListSerde(Serde<T> inner) be called? – MrTambourineMan Dec 08 '19 at 21:39