我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

以防有人正在寻找一个更新的答案(在2022年),我发现以下将适用于Kafka 3.3.1版本。这将改变“your-topic”的配置,使消息保留1000毫秒。在清除消息之后,可以将其设置为不同的值。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic  --alter --add-config retention.ms=1000

其他回答

要清除队列,可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创造它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

有时,如果您有一个饱和的集群(分区太多,或使用加密的主题数据,或使用SSL,或控制器在一个坏的节点上,或连接不稳定),清除该主题将花费很长时间。

我遵循这些步骤,特别是在使用TLS时。

1:使用kafka工具运行:

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2:运行:

kafka-控制台-消费者——消费者-财产安全。SSL .truststore.location=/etc/schema-registry/secrets/trust。JKS——消费者-属性ssl.truststore。Password = Password——consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity。JKS——消费者-属性ssl.keystore。密码=密码——consumer-property ssl.key。Password = Password——bootstrap-server broker01.kafka.com:9092——topic <topic-name>——new-consumer——from-beginning

3:当主题为空时,将主题保留设置回初始设置。

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

希望这能帮助到一些人,因为它不容易宣传。

另一种手动清除主题的方法是:

在经纪人中:

停止kafka broker Sudo服务kafka停止 删除所有分区日志文件(应该在所有代理上执行) sudo rm - r /kafka-storage/kafka-logs/<some_topic_name>-* .

动物园管理员:

运行zookeeper命令行界面 sudo /usr/lib/zookeeper/bin/zkCli.sh 使用zkCli删除主题元数据 rmr /代理/主题/ < some_topic_name >

在经纪人那里:

重新启动代理服务 Sudo服务kafka启动

你是否考虑过让你的应用只使用一个新的重命名主题?(例如,一个主题的名称与原始主题相同,但在结尾追加了“1”)。

这也会给你的应用一个新鲜干净的主题。

我使用的是Kafka 2.13工具。现在——zookeeper是kafka-topics.sh的不可识别选项。删除主题。

bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]

只是要考虑到,如果你在删除的主题中有很多数据,再次创建相同的主题可能需要一段时间。当你尝试创建相同的主题时,你可能会得到这样的错误:

Topic: Topic “[主题名]”被标记为删除。