我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
您必须在配置中启用此功能
echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties
sudo systemctl stop kafka
sudo systemctl start kafka
清除主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows
创建主题
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
阅读主题
# /opt/kafka/bin/kafka-console-consumer.sh localhost:9092 --topic flows --from-beginning
其他回答
根据@steven appleyard的回答,我在Kafka 2.2.0上执行了以下命令,它们对我有用。
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms
最简单的方法是将各个日志文件的日期设置为比保留期更早的日期。然后经纪人会在几秒钟内为你清理并移除它们。这有几个优点:
不需要关闭代理,这是一个运行时操作。 避免出现无效偏移异常的可能性(下文将详细介绍)。
In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.
还可以通过清除zookeeper中针对该主题的消费者偏移量来缓解这一问题。但如果你不需要一个处女主题,只是想删除现有的内容,那么简单地“触摸”一些主题日志要比停止代理、删除主题日志和清除某些zookeeper节点容易得多,也更可靠。
这里有很多很棒的答案,但在其中,我没有找到一个关于docker的答案。我花了一些时间来弄清楚在这种情况下使用代理容器是错误的(显然!!)
## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
我应该使用zookeeper:2181而不是——zookeeper localhost:2181作为我的撰写文件
## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
正确的命令应该是
docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000
希望能节省一些人的时间。
另外,请注意消息不会立即删除,而是在关闭日志段时删除。
如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。
docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>
成功的回应:
Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Thomas的建议很好,但不幸的是zkCli在旧版本的Zookeeper(例如3.3.6)中似乎不支持rmr。例如,比较现代Zookeeper 3.3版本中的命令行实现。
如果你面对的是旧版本的Zookeeper,一个解决方案是使用客户端库,比如zc。zk代表Python。对于不熟悉Python的人,您需要使用pip或easy_install安装它。然后启动一个Python shell (Python),你可以做:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
甚至
zk.delete_recursive('brokers')
如果你想从卡夫卡中删除所有的主题。