I am trying to consume from the __consumer_offsets topic as it seemed this may be the easiest way to retrieve kafka metrics about consumers like message lag etc. The ideal way is accessing it from jmx but wanted to try this first and the messages that come back seem to be encrypted or in unreadable form. Tried adding stringDeserializer property as well. Does anyone have any suggestions on how to correct this?
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id" , groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(
consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for ( KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");
while (it.hasNext())
{
try{
String mesg = new String(it.next().message());
System.out.println( mesg);
EDIT:
I don't think this is a duplicate as it is specific to java and the actual message format. I can consume from the queue but cannot view the message contents. Need guidance on how to convert this to a string or readable value?
39: