我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

这将提高用户留存率。女士配置。然后您可以使用上面的alter命令将其更改为1秒(稍后恢复为默认值)。

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000

其他回答

下面的命令可以删除kafka topic中所有已有的消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

删除的结构。Json文件应该如下:

{ “分区”:( { “主题”:“foo”, “分区”:1、 “抵消”:1 } ), “版本”:1 }

offset:-1将删除所有记录的地方 这个命令已经在kafka 2.0.1中测试过了

使用应用程序组(GroupName应该与应用程序kafka组名相同)清理来自特定主题的所有消息。

./kafka-path/bin/kafka-console-consumer.sh——zookeeper localhost:2181——topic topicName——from-beginning——group application-group .sh

来自kafka 1.1

清除一个主题

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

等待至少1分钟,以确保卡夫卡清除主题 删除配置,然后转到默认值

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms

更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。

是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。

以防有人正在寻找一个更新的答案(在2022年),我发现以下将适用于Kafka 3.3.1版本。这将改变“your-topic”的配置,使消息保留1000毫秒。在清除消息之后,可以将其设置为不同的值。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic  --alter --add-config retention.ms=1000