0

I need a Serde for an ArrayList, and searching the web for ArrayListSerde finds references to such a thing, but I can't find it in either the documentation or the library of the version of Kafka Streams that I'm using. Where can I find it please?

Tim Ward
  • 45
  • 1
  • 4
  • 1
    Perhaps this https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api can help. – Val Bonn Aug 01 '19 at 11:22
  • I saw that, as a roll-your-own example, which I thought the post said didn't actually work. But I've seen other references to such a thing as if it were a standard class that you expected to exist ... and I can't find it. – Tim Ward Aug 01 '19 at 11:29
  • I did not find any standard implementation of that. But, if you apply the fix suggested by Matthias J. Sax's answer (fix the constructor in the Serde), the code should work fine. – Val Bonn Aug 01 '19 at 11:34
  • Yes, I'm working on a variant of that, ta. – Tim Ward Aug 01 '19 at 11:42
  • A standard implementation is WIP and should be included in 2.4 release: https://issues.apache.org/jira/browse/KAFKA-8326 – Matthias J. Sax Aug 01 '19 at 21:39

2 Answers2

1

There is no official implementation for ArrayListSerde provided by KStream library. You need to implement custom Serde with Serializer and Deserializer interfaces.

https://kafka.apache.org/20/documentation/streams/developer-guide/datatypes.html#implementing-custom-serdes

Also referred in the below post:

Issue with ArrayList Serde in Kafka Streams API

Nishu Tayal
  • 20,106
  • 8
  • 49
  • 101
0

I'm trying something like this. It looks like it's doing sensible things, although I'm not yet convinced I'm doing the right thing with a null parameter to serialize (and don't yet know why it's getting called with null).

public class ArrayListSerde<T> implements Serde<ArrayList<T>> {

    private final Serializer  <T> innerSerialiser;
    private final Deserializer<T> innerDeserialiser;

    public ArrayListSerde(Serde<T> inner) {
        innerSerialiser   = inner.serializer ();
        innerDeserialiser = inner.deserializer();
    }

    @Override
    public Serializer<ArrayList<T>> serializer() {
        return new Serializer<ArrayList<T>>() {
            @Override
            public byte[] serialize(String topic, ArrayList<T> data) {
                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                if (data != null ) {
                    final int size = data.size();
                    final DataOutputStream dos = new DataOutputStream(baos);
                    final Iterator<T> iterator = data.iterator();
                    try {
                        dos.writeInt(size);
                        while (iterator.hasNext()) {
                            final byte[] bytes = innerSerialiser.serialize(topic, iterator.next());
                            dos.writeInt(bytes.length);
                            dos.write(bytes);
                        }
                    } catch (IOException e) {
                        throw new RuntimeException("Unable to serialize ArrayList", e);
                    }
                }
                return baos.toByteArray();
            }
        };
    }

    @Override
    public Deserializer<ArrayList<T>> deserializer() {
        return new Deserializer<ArrayList<T>>() {
            @Override
            public ArrayList<T> deserialize(String topic, byte[] data) {
                if (data == null || data.length == 0) {
                    return null;
                }

                final ArrayList<T> arrayList = new ArrayList<>();
                final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(data));

                try {
                    final int records = dataInputStream.readInt();
                    for (int i = 0; i < records; i++) {
                        final byte[] valueBytes = new byte[dataInputStream.readInt()];
                        dataInputStream.read(valueBytes);
                        arrayList.add(innerDeserialiser.deserialize(topic, valueBytes));
                    }
                } catch (IOException e) {
                    throw new RuntimeException("Unable to deserialize ArrayList", e);
                }

                return arrayList;
            }
        };
    }
}
Tim Ward
  • 45
  • 1
  • 4
  • Hey Tim, Im facing the same issue. Even with adding no arg constructor, only no arg constructor is getting called and no object is getting initialized and im getting npe. when will ArrayListSerde(Serde inner) be called? – MrTambourineMan Dec 08 '19 at 21:39
  • Got it..it was the issue with my StreamConfig value serde – MrTambourineMan Dec 09 '19 at 04:30