我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
另一种手动清除主题的方法是:
在经纪人中:
停止kafka broker Sudo服务kafka停止 删除所有分区日志文件(应该在所有代理上执行) sudo rm - r /kafka-storage/kafka-logs/<some_topic_name>-* .
动物园管理员:
运行zookeeper命令行界面 sudo /usr/lib/zookeeper/bin/zkCli.sh 使用zkCli删除主题元数据 rmr /代理/主题/ < some_topic_name >
在经纪人那里:
重新启动代理服务 Sudo服务kafka启动
其他回答
Thomas的建议很好,但不幸的是zkCli在旧版本的Zookeeper(例如3.3.6)中似乎不支持rmr。例如,比较现代Zookeeper 3.3版本中的命令行实现。
如果你面对的是旧版本的Zookeeper,一个解决方案是使用客户端库,比如zc。zk代表Python。对于不熟悉Python的人,您需要使用pip或easy_install安装它。然后启动一个Python shell (Python),你可以做:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
甚至
zk.delete_recursive('brokers')
如果你想从卡夫卡中删除所有的主题。
使用应用程序组(GroupName应该与应用程序kafka组名相同)清理来自特定主题的所有消息。
./kafka-path/bin/kafka-console-consumer.sh——zookeeper localhost:2181——topic topicName——from-beginning——group application-group .sh
kafka没有清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。
首先,确保服务器。属性文件有,如果没有添加delete.topic.enable=true
然后,删除主题 bin/kafka-topics.sh——zookeeper localhost:2181——delete——topic myTopic
然后重新创建它。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
这里有很多很棒的答案,但在其中,我没有找到一个关于docker的答案。我花了一些时间来弄清楚在这种情况下使用代理容器是错误的(显然!!)
## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
我应该使用zookeeper:2181而不是——zookeeper localhost:2181作为我的撰写文件
## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
正确的命令应该是
docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000
希望能节省一些人的时间。
另外,请注意消息不会立即删除,而是在关闭日志段时删除。
要清除队列,可以删除主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创造它:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test