4

We have multiple microservices written in Go exchanging messages over a Kafka message bus. A microservice writes to a Kafka topic with a partition count of 3 and a replication factor of 2. We use AWS MSK as the Kafka broker, and the Shopify (sarama) Kafka client to connect to the brokers.

Here is my Producer code -

package kf

import (
    "fmt"
    "github.com/Shopify/sarama"
    "github.com/segmentio/kafka-go"
    "net"
    "strconv"
)

// Producer wraps a sarama SyncProducer that is bound to a single topic.
// Construct it with InitProducer; the zero value is not usable.
type Producer struct {
    flowEventProducer sarama.SyncProducer // synchronous producer used by WriteMessage
    topic             string              // destination topic for all messages
}

// InitProducer ensures the topic exists on the cluster (via the first
// bootstrap broker) and returns a Producer connected to the given brokers.
// It panics if the producer cannot be created; this is intended to run once
// at service startup.
func InitProducer(brokers []string, topic string) *Producer {
    CreateKafkaTopic(brokers[0], topic)
    prod, err := newFlowWriter(brokers)
    if err != nil {
        // Include the underlying error: the original message dropped it,
        // which made startup failures hard to diagnose.
        panic(fmt.Sprintf("failed to connect to producer: %v", err))
    }
    return &Producer{
        flowEventProducer: prod,
        topic:             topic,
    }
}

// CreateKafkaTopic ensures the given topic exists: it dials the bootstrap
// broker at kafkaURL, locates the cluster controller, and issues a
// CreateTopics request with 3 partitions and a replication factor of 2.
// Any failure panics; this is intended to run once at startup.
func CreateKafkaTopic(kafkaURL, topic string) {
    conn, err := kafka.Dial("tcp", kafkaURL)
    if err != nil {
        panic(err.Error())
    }
    // Register the close immediately after a successful dial. The original
    // code deferred this at the very end of the function, so any panic in
    // the steps below leaked the bootstrap connection.
    defer conn.Close()

    // Topic creation must go to the controller broker, not an arbitrary one.
    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }

    controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer controllerConn.Close()

    topicConfigs := []kafka.TopicConfig{
        {
            Topic:             topic,
            NumPartitions:     3,
            ReplicationFactor: 2,
        },
    }
    if err := controllerConn.CreateTopics(topicConfigs...); err != nil {
        panic(err.Error())
    }
}

// newFlowWriter builds a synchronous sarama producer for the given brokers,
// keyed with the hash partitioner so messages with the same key land on the
// same partition. Returns the producer or an error.
func newFlowWriter(brokers []string) (sarama.SyncProducer, error) {
    kafkaVer, err := sarama.ParseKafkaVersion("2.6.2")
    if err != nil {
        // Return the error rather than panicking: this function already has
        // an error result and its caller (InitProducer) handles failures.
        return nil, fmt.Errorf("parsing kafka version: %w", err)
    }
    config := sarama.NewConfig()
    config.Producer.Partitioner = sarama.NewHashPartitioner
    config.Net.MaxOpenRequests = 10
    config.Producer.RequiredAcks = sarama.WaitForLocal
    config.Producer.Return.Successes = true // required by SyncProducer
    config.Version = kafkaVer
    return sarama.NewSyncProducer(brokers, config)
}

// WriteMessage publishes data to the producer's topic, keyed by uuid (the
// hash partitioner maps equal keys to the same partition). It blocks until
// the broker acknowledges the write and returns any send error.
func (p *Producer) WriteMessage(uuid string, data []byte) error {
    msg := &sarama.ProducerMessage{
        Topic: p.topic,
        Key:   sarama.ByteEncoder(uuid),
        Value: sarama.ByteEncoder(data),
    }

    part, off, err := p.flowEventProducer.SendMessage(msg)
    if err != nil {
        return err
    }
    // Fixed typo ("wriiten") and added a trailing newline so successive log
    // lines do not run together.
    fmt.Printf("message written on part:%d and offset: %d\n", part, off)
    return nil
}


Here is my consumer -

package kf

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/Shopify/sarama"
)

// Consumer wraps a sarama consumer group that reads a single topic.
// Construct it with InitConsumer; the zero value is not usable.
type Consumer struct {
    flowEventReader sarama.ConsumerGroup // group client created by InitConsumer
    topic           string               // topic consumed by HandleMessages
    brokerUrls      []string             // bootstrap broker addresses
}

// data is the expected JSON payload of a consumed message
// (decoded in logMessage).
type data struct {
    Name     string `json:"name"`
    Employee string `json:"employee"`
}

// InitConsumer creates a Consumer joined to the consumer group "myconf" on
// the given brokers for the given topic. It panics if the group cannot be
// created; this is intended to run once at service startup.
func InitConsumer(brokers []string, topic string) *Consumer {
    c := &Consumer{
        topic:      topic,
        brokerUrls: brokers,
    }
    conf := createSaramaKafkaConf()
    group, err := sarama.NewConsumerGroup(c.brokerUrls, "myconf", conf)
    if err != nil {
        // Include the underlying error: the original message dropped it.
        panic(fmt.Sprintf("failed to create consumer group on kafka cluster: %v", err))
    }
    c.flowEventReader = group
    return c
}

// KafkaConsumerGroupHandler implements sarama.ConsumerGroupHandler and
// forwards each claimed message to the owning Consumer for logging.
type KafkaConsumerGroupHandler struct {
    Cons *Consumer
}

// HandleMessages runs the consume loop until the consumer group is closed.
// Consume blocks for the lifetime of one session, so it must be re-invoked
// in a loop to rejoin the group after every rebalance.
func (c *Consumer) HandleMessages() {
    for {
        err := c.flowEventReader.Consume(context.Background(), []string{c.topic}, &KafkaConsumerGroupHandler{Cons: c})
        if err != nil {
            if err == sarama.ErrClosedConsumerGroup {
                // The group client was closed; retrying would spin forever.
                return
            }
            // Log the actual error — the original printed only "FAILED",
            // discarding the cause — and retry with a fresh session.
            fmt.Println("consume failed:", err)
        }
    }
}
// Setup is a no-op hook run once when a new session begins, before claims are consumed.
func (*KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }

// Cleanup is a no-op hook run once after all ConsumeClaim loops have exited.
func (*KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim drains a single partition claim: each received message is
// logged and then marked so its offset is committed for the group.
func (h *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // The Messages channel is closed by sarama when the session ends,
    // which terminates this loop.
    for message := range claim.Messages() {
        h.Cons.logMessage(message)
        sess.MarkMessage(message, "")
    }
    return nil
}

// logMessage decodes a consumed message's JSON payload into data and prints
// the key/value pair.
func (c *Consumer) logMessage(msg *sarama.ConsumerMessage) {
    d := &data{}
    if err := json.Unmarshal(msg.Value, d); err != nil {
        // Return early: the original fell through and printed a
        // half-populated struct after a decode failure.
        fmt.Println(err)
        return
    }
    // Trailing newline added so successive log lines do not run together.
    fmt.Printf("messages: key: %s and val:%+v\n", string(msg.Key), d)
}

// createSaramaKafkaConf builds the consumer-group configuration: Kafka
// protocol version 2.6.2, consumption starting from the oldest available
// offset, and round-robin partition assignment. It panics if the hard-coded
// version string fails to parse (a programmer error).
func createSaramaKafkaConf() *sarama.Config {
    conf := sarama.NewConfig()
    kafkaVer, err := sarama.ParseKafkaVersion("2.6.2")
    if err != nil {
        // "executor" in the original message was a copy-paste from another
        // service; name this component and include the parse error.
        panic(fmt.Sprintf("failed to parse kafka version, consumer will not run: %v", err))
    }
    conf.Version = kafkaVer
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
    return conf
}


When we put load on the microservices and the producer starts producing on the order of 500 events, each ~1 KB in size, we encounter a delay of about 30 seconds in message delivery. We want near-instant delivery once a message is produced. I believe Kafka is more than capable of handling my use case. Please help me figure out the cause of this delay.

  • 1
    Kafka Producers don't send immediately. They batch requests until a buffer is filled, and only will flush after a certain time/size. Also, `SyncProducer` will block, so if you want speed, then you will want async requests and/or goroutines – OneCricketeer Dec 12 '22 at 21:42
  • Can you check for Lags in your topic? Is your consumer-group able to consume as fast as the producer is producing the messages? – hacker315 Dec 13 '22 at 10:32
  • @OneCricketeer Thanks for valuable input, I did tried by making producer and consumer async along with setting batch timeout of 10 ms. But I didn't have any luck. – Ashutosh Pandey Dec 14 '22 at 05:04
  • I haven't used either Sarama, or kafka-go, so not really sure, but do you really need both? Sarama can create topics, I thought. – OneCricketeer Dec 14 '22 at 16:15
  • @OneCricketeer Sure that can be done, but that won't be any hindrance in performance as It is just a one-time process when producer starts. – Ashutosh Pandey Dec 17 '22 at 10:46

0 Answers