Am trying to enable 'idempotent' option on kafka-console-producer. Referring to the following links:
- https://gerardnico.com/dit/kafka/producer#idempotent
- https://gerardnico.com/dit/kafka/kafka-console-producer
Command used:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true
Following exception is observed:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:433) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:291) at kafka.producer.NewShinyProducer.(BaseProducer.scala:40) at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:50) at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala) Caused by: org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence. at org.apache.kafka.clients.producer.KafkaProducer.configureAcks(KafkaProducer.java:510) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:375)
Though acks is already set to 'all', we observe this exception. What am I missing?
Following are the versions versions used:
- broker - 1.0.0
- client - The console producer bundled with broker 1.0.0
Update
I could enable idempotence on console producer using the --request-required-acks -1
option as suggested in reply.
However, I get ClusterAuthorizationException.
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true --request-required-acks -1 --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
This exception occurs only when idempotence option is enabled. It is possible to produce messages with out this option.
bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2
What am I missing?