我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
user644265在这个答案中建议的临时减少主题的保留时间的解决方法仍然有效,但最新版本的kafka-configs会警告——zookeeper选项已被弃用:
警告:——zookeeper已弃用,并将在Kafka的未来版本中删除
使用——bootstrap-server代替;例如
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100
and
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
其他回答
更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。
是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。
kafka没有清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。
首先,确保服务器。属性文件有,如果没有添加delete.topic.enable=true
然后,删除主题 bin/kafka-topics.sh——zookeeper localhost:2181——delete——topic myTopic
然后重新创建它。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
我使用的是Kafka 2.13工具。现在——zookeeper是kafka-topics.sh的不可识别选项。删除主题。
bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]
只是要考虑到,如果你在删除的主题中有很多数据,再次创建相同的主题可能需要一段时间。当你尝试创建相同的主题时,你可能会得到这样的错误:
Topic: Topic “[主题名]”被标记为删除。
下面是删除名为MyTopic的主题的步骤:
描述主题,并记下代理id 为列出的每个代理ID停止Apache Kafka守护进程。 连接到每个代理(从步骤1开始),并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0。对其他分区和所有副本重复此操作 删除主题元数据:zkCli.sh,然后删除rmr /brokers/MyTopic 为每台停止的机器启动Apache Kafka守护进程
如果你错过了第3步,Apache Kafka将继续报告当前的主题(例如当你运行Kafka -list-topic.sh时)。
使用Apache Kafka 0.8.0测试。
有时,如果您有一个饱和的集群(分区太多,或使用加密的主题数据,或使用SSL,或控制器在一个坏的节点上,或连接不稳定),清除该主题将花费很长时间。
我遵循这些步骤,特别是在使用TLS时。
1:使用kafka工具运行:
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2:运行:
kafka-控制台-消费者——消费者-财产安全。SSL .truststore.location=/etc/schema-registry/secrets/trust。JKS——消费者-属性ssl.truststore。Password = Password——consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity。JKS——消费者-属性ssl.keystore。密码=密码——consumer-property ssl.key。Password = Password——bootstrap-server broker01.kafka.com:9092——topic <topic-name>——new-consumer——from-beginning
3:当主题为空时,将主题保留设置回初始设置。
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
希望这能帮助到一些人,因为它不容易宣传。