
I am trying to consume from the __consumer_offsets topic, since it seemed like the easiest way to retrieve Kafka metrics about consumers, such as message lag. The ideal way is to read them through JMX, but I wanted to try this first. The messages that come back seem to be encrypted or otherwise unreadable. I also tried setting the StringDeserializer properties. Does anyone have suggestions on how to correct this?

    // Property values must be Strings; a Boolean value is not read back by Properties.getProperty()
    consumerProps.put("exclude.internal.topics", "false");
    consumerProps.put("group.id" , groupId);
    consumerProps.put("zookeeper.connect", zooKeeper);


    consumerProps.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");  
    consumerProps.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");

    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    ConsumerConnector consumer = 
    kafka.consumer.Consumer.createJavaConsumerConnector(
           consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
       consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    for (KafkaStream<byte[], byte[]> stream : streams) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

        while (it.hasNext()) {
            try {
                // message() returns the raw value bytes of the record
                String mesg = new String(it.next().message());
                System.out.println(mesg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

EDIT:

I don't think this is a duplicate, as it is specific to Java and the actual message format. I can consume from the topic but cannot view the message contents. How can I convert them to a string or another readable value?


vbNewbie
  • They are not encrypted – OneCricketeer Sep 10 '18 at 22:01
  • https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1327-L1345 – OneCricketeer Sep 10 '18 at 22:05
  • If you want other code/tools that checks lag, you can try Burrow https://github.com/linkedin/Burrow – OneCricketeer Sep 10 '18 at 22:06
  • So is the lag metric not available in this topic? – vbNewbie Sep 11 '18 at 11:52
  • You say it's not encrypted, so why is the ConsumerIterator not displaying a text value after a string message stream is created? – vbNewbie Sep 11 '18 at 12:33
  • You *compute* lag from the topic (like Burrow does), it's not *stored* there. In other words, there's additional math involved... You, of course, need to know what offsets are being actively read by a consumer group, along with the latest offsets for a given topic of the group, not only those committed values stored in the offsets topic. And *encryption* is different from *encoding*. For example, it's not UTF8 encoded, so string deserializer won't work. I wrote sample code in the other post https://stackoverflow.com/a/52266219/2308683 – OneCricketeer Sep 11 '18 at 14:51
  • Thanks for responding. Yes, I realized I need to compute lag, but I was told that one can read the consumer offsets and a max lag value for each group. I included the UTF-8 encoding in the consumer properties and was therefore surprised to see the output format. Thanks, I will try to adapt your code and see if that helps. – vbNewbie Sep 11 '18 at 15:01
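
To illustrate the point made in the comments (the records are binary-encoded rather than encrypted, and lag is computed rather than stored), here is a minimal sketch in plain Java. It assumes the offset-commit layout that the linked GroupMetadataManager code appears to use: a 2-byte version, then strings stored as a 2-byte length followed by UTF-8 bytes, a 4-byte partition in the key, and an 8-byte offset as the first field of the value. The class name `OffsetRecordSketch`, all of its methods, and the sample records are hypothetical; the layout can differ between Kafka versions, so Kafka's own parsing code is the safer choice for real use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch only: hand-decodes an offset-commit record under the layout assumed above. */
class OffsetRecordSketch {

    /** Decoded key: which group committed an offset for which topic partition. */
    static final class OffsetKey {
        final String group;
        final String topic;
        final int partition;

        OffsetKey(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Reads a string stored as a 2-byte length followed by UTF-8 bytes. */
    static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Returns the decoded key, or null when the key is not an offset commit. */
    static OffsetKey decodeKey(byte[] key) {
        ByteBuffer buf = ByteBuffer.wrap(key); // big-endian by default
        short version = buf.getShort();
        if (version != 0 && version != 1) {
            return null; // assumption: other versions carry group metadata instead
        }
        String group = readString(buf);
        String topic = readString(buf);
        return new OffsetKey(group, topic, buf.getInt());
    }

    /** Returns the committed offset, or null for a tombstone (null value). */
    static Long decodeCommittedOffset(byte[] value) {
        if (value == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(value);
        buf.getShort(); // skip the value schema version
        return buf.getLong();
    }

    /** Lag is derived: latest offset in the partition minus the committed offset. */
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    /** Builds a made-up key in the assumed layout, for demonstration only. */
    static byte[] sampleKey(String group, String topic, int partition) {
        byte[] g = group.getBytes(StandardCharsets.UTF_8);
        byte[] t = topic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + g.length + 2 + t.length + 4);
        buf.putShort((short) 1).putShort((short) g.length).put(g)
           .putShort((short) t.length).put(t).putInt(partition);
        return buf.array();
    }

    /** Builds a made-up, truncated value (version and offset only). */
    static byte[] sampleValue(long offset) {
        return ByteBuffer.allocate(10).putShort((short) 1).putLong(offset).array();
    }

    public static void main(String[] args) {
        OffsetKey k = decodeKey(sampleKey("my-group", "orders", 3));
        long committed = decodeCommittedOffset(sampleValue(100L));
        // The log end offset (120 here) must be fetched separately from the broker.
        System.out.println(k.group + " " + k.topic + "-" + k.partition
                + " committed=" + committed + " lag=" + lag(120L, committed));
    }
}
```

In the consumer loop from the question, the same idea would mean passing the record's raw key and value bytes to a decoder like this instead of wrapping them in `new String(...)`.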

0 Answers