2

Am trying to enable 'idempotent' option on kafka-console-producer. Referring to the following links:

Command used:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true

Following exception is observed:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:433) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:291) at kafka.producer.NewShinyProducer.(BaseProducer.scala:40) at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:50) at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala) Caused by: org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence. at org.apache.kafka.clients.producer.KafkaProducer.configureAcks(KafkaProducer.java:510) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:375)

Though acks is already set to 'all', we observe this exception. What am I missing?

Following are the versions versions used:

  • broker - 1.0.0
  • client - The console producer bundled with broker 1.0.0

Update

I could enable idempotence on console producer using the --request-required-acks -1 option as suggested in reply.

However, I get ClusterAuthorizationException.

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true  --request-required-acks -1  --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.

This exception occurs only when idempotence option is enabled. It is possible to produce messages with out this option.

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2

What am I missing?

Sarvesh
  • 519
  • 1
  • 8
  • 16

1 Answers1

3

You cannot set acks through producer-property for ConsoleProducer. Use request-required-acks instead, as shown below:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer-property enable.idempotence=true --request-required-acks -1

amethystic
  • 6,821
  • 23
  • 25