我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

根据@steven appleyard的回答,我在Kafka 2.2.0上执行了以下命令,它们对我有用。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms

其他回答

来自kafka 1.1

清除一个主题

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

等待至少1分钟,以确保卡夫卡清除主题 删除配置,然后转到默认值

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms

更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。

是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。

如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功的回应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

你是否考虑过让你的应用只使用一个新的重命名主题?(例如,一个主题的名称与原始主题相同,但在结尾追加了“1”)。

这也会给你的应用一个新鲜干净的主题。

这里有很多很棒的答案,但在其中,我没有找到一个关于docker的答案。我花了一些时间来弄清楚在这种情况下使用代理容器是错误的(显然!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

我应该使用zookeeper:2181而不是——zookeeper localhost:2181作为我的撰写文件

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

正确的命令应该是

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

希望能节省一些人的时间。

另外,请注意消息不会立即删除,而是在关闭日志段时删除。