
Is it possible to create a Kafka topic with Sarama? I know the Java API lets you create topics, but I couldn't find any information on how to do that in Sarama. If it is possible, an example or an explanation of which API I should use would be great. Thanks in advance.

Mohsen Shakiba

3 Answers


Indeed, in newer versions of Sarama you can use ClusterAdmin to create topics. Below you can find the sample code:

package main

import (
    "github.com/Shopify/sarama" // Sarama 1.22.0
    "log"
)

func main() {
    brokerAddrs := []string{"localhost:9092"}
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0
    admin, err := sarama.NewClusterAdmin(brokerAddrs, config)
    if err != nil {
        log.Fatal("Error while creating cluster admin: ", err.Error())
    }
    defer func() { _ = admin.Close() }()
    err = admin.CreateTopic("topic.test.1", &sarama.TopicDetail{
        NumPartitions:     1,
        ReplicationFactor: 1,
    }, false)
    if err != nil {
        log.Fatal("Error while creating topic: ", err.Error())
    }
}
Daniel Pacak

EDIT: The answer below is old but still works. When it was written, the Sarama admin APIs were still under development. Since then the ClusterAdmin APIs have matured and should be treated as the preferred way to solve this problem. Refer to the other two answers if you are looking to solve this in 2020 or later.


It is possible to use Sarama to manage topics in Kafka. I am writing a Terraform provider for managing Kafka topics and use Sarama to do the heavy lifting in the backend.

You need to use the sarama.Broker APIs to do this. For example:

package main

import (
    "log"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    // Set broker configuration
    broker := sarama.NewBroker("localhost:9092")

    // Additional configurations. Check sarama doc for more info
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0

    // Open broker connection with configs defined above
    if err := broker.Open(config); err != nil {
        log.Fatalf("Error while opening broker connection: %v", err)
    }
    // Close the connection to the broker when main returns
    defer func() { _ = broker.Close() }()

    // Check if the connection was OK
    connected, err := broker.Connected()
    if err != nil {
        log.Printf("Error while connecting to broker: %v", err)
        return
    }
    log.Print(connected)

    // Setup the Topic details in CreateTopicRequest struct
    topic := "blah25s"
    topicDetail := &sarama.TopicDetail{}
    topicDetail.NumPartitions = int32(1)
    topicDetail.ReplicationFactor = int16(1)
    topicDetail.ConfigEntries = make(map[string]*string)

    topicDetails := make(map[string]*sarama.TopicDetail)
    topicDetails[topic] = topicDetail

    request := sarama.CreateTopicsRequest{
        Timeout:      time.Second * 15,
        TopicDetails: topicDetails,
    }

    // Send request to Broker
    response, err := broker.CreateTopics(&request)
    if err != nil {
        // The response is not usable when the request itself failed
        log.Printf("Error while creating topic: %v", err)
        return
    }

    // Report the per-topic result returned by the broker
    for name, topicErr := range response.TopicErrors {
        log.Printf("Topic is %s", name)
        log.Printf("Error is %s", topicErr.Err.Error())
        if topicErr.ErrMsg != nil {
            log.Printf("Error message is %s", *topicErr.ErrMsg)
        }
    }
    log.Printf("the response is %#v", response)
}

You can have a look at working code on GitHub. Remember to start the Kafka broker and import all Go dependencies before running the code.
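The ConfigEntries field used above is a map[string]*string, so every value has to be a pointer to a string. The following is a minimal sketch of one way to fill it from plain strings. The helper name toConfigEntries is hypothetical (it is not part of Sarama), and the two settings are only illustrative Kafka topic-level configs.

```go
package main

import "fmt"

// toConfigEntries is a hypothetical helper, not part of Sarama. It converts
// plain string settings into the map[string]*string shape that the
// ConfigEntries field of sarama.TopicDetail expects.
func toConfigEntries(settings map[string]string) map[string]*string {
	entries := make(map[string]*string, len(settings))
	for key, value := range settings {
		v := value // copy, so that every entry points at its own string
		entries[key] = &v
	}
	return entries
}

func main() {
	// Illustrative topic-level settings; pick the ones your topic needs.
	entries := toConfigEntries(map[string]string{
		"retention.ms":   "86400000",
		"cleanup.policy": "delete",
	})

	// In the snippet above this would be assigned as:
	//   topicDetail.ConfigEntries = entries
	fmt.Println(*entries["retention.ms"], *entries["cleanup.policy"])
}
```

Copying the value into a fresh variable before taking its address keeps every entry pointing at its own string, regardless of the Go version's loop-variable semantics.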

Prakhar

It is better to use ClusterAdmin (https://github.com/Shopify/sarama/blob/master/admin.go) for this instead of connecting directly to a single broker.

This handles a lot of cases, such as:

  1. You can supply multiple broker addresses in the cluster config.
  2. It automatically identifies which broker is acting as the controller.
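As a small illustration of the first point, sarama.NewClusterAdmin takes a slice of broker addresses, so a comma-separated setting has to be split before it is passed in. This is only a sketch: the helper parseBrokerList and the host names are hypothetical, and the block uses only the standard library so it runs without a Kafka cluster.

```go
package main

import (
	"fmt"
	"strings"
)

// parseBrokerList is a hypothetical helper, not part of Sarama. It splits a
// comma-separated broker string into the []string form that
// sarama.NewClusterAdmin accepts, dropping whitespace and empty items.
func parseBrokerList(raw string) []string {
	var addrs []string
	for _, part := range strings.Split(raw, ",") {
		if addr := strings.TrimSpace(part); addr != "" {
			addrs = append(addrs, addr)
		}
	}
	return addrs
}

func main() {
	// Hypothetical broker addresses, e.g. read from an environment variable.
	addrs := parseBrokerList("kafka-1:9092, kafka-2:9092,kafka-3:9092")

	// With Sarama these would be passed as:
	//   admin, err := sarama.NewClusterAdmin(addrs, config)
	fmt.Println(addrs)
}
```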
Nitin K