I'm new to Golang and Kafka, so this might seem like a silly question.

After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?

The program prints a line right before ranging over consumer.Messages() and prints another line for each message received. The ~20 sec delay is between the first fmt.Println and the second fmt.Println.

package main

import (
    "fmt"
    "log" // needed for log.Fatal in createConsumer

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

func main() {

    // Create the consumer and listen for new messages
    consumer := createConsumer()

    // Create a signal channel to know when we are done
    done := make(chan bool)

    // Start processing messages
    go func() { 
        fmt.Println("Start consuming Kafka messages")
        for msg := range consumer.Messages() {
            s := string(msg.Value[:])
            fmt.Println("Msg: ", s)
        }
    }()

    <-done

}

func createConsumer() *cluster.Consumer {
    // Define our configuration to the cluster
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = false
    config.Group.Return.Notifications = false
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    // Create the consumer
    brokers := []string{"127.0.0.1:9092"}
    topics := []string{"orders"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        // Include the underlying error so connection problems are diagnosable
        log.Fatalf("Unable to connect consumer to Kafka: %v", err)
    }
    go handleErrors(consumer)
    go handleNotifications(consumer)
    return consumer
}

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.1"
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker-1:
    image: "confluentinc/cp-enterprise-kafka:5.0.1"
    hostname: broker-1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_BROKER_RACK: rack-a
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: 'broker-1'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_CREATE_TOPICS: "orders:1:1"
Nyxynyx
  • How many messages are in your topic? Kafka may take a while to seek to the beginning if there's a lot of data to go through. – Peter Dec 07 '18 at 14:48
  • @Peter Under 100 messages in that topic. There are a total of 2 topics, with the second topic having under 10 messages. Kafka and Zookeeper are running in Docker containers on a 2018 i7 Macbook Pro with 16 GB of memory – Nyxynyx Dec 07 '18 at 14:49
  • @Peter Performed `docker-compose down` and `docker-compose up` to clear all the Kafka messages, and the delay is still just under 20 seconds. – Nyxynyx Dec 07 '18 at 14:55
  • I have used a configuration similar to yours in my application. From what I have seen, whenever I start my application and connect to Kafka, it needs some time for Kafka's GroupCoordinator to rebalance/re-stabilize the consumer group. In my case, that is the reason for the delay, and it's the way Kafka works... I think there is nothing wrong with the code. Hope this helps :) – thanhpham Dec 07 '18 at 16:08

1 Answer

After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?

The message channel itself should not add that much delay. The consumer reads from a message channel that is fed with messages from Kafka: as soon as a message is available in the Kafka queue, it is sent to the message channel, where the consumer can receive it.

Coming to your code implementation:

for msg := range consumer.Messages() {
    s := string(msg.Value[:])
    fmt.Println("Msg: ", s)
}

consumer.Messages() returns a channel, and the for loop ranges over that channel, yielding each message as soon as it is available in the channel.

Refer to the question "How to create a kafka consumer group in Golang?" to connect using sarama. You don't need sarama-cluster for the connection.

Himanshu