我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

根据@steven appleyard的回答,我在Kafka 2.2.0上执行了以下命令,它们对我有用。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms

其他回答

要清除队列,可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创造它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

这里有很多很棒的答案,但在其中,我没有找到一个关于docker的答案。我花了一些时间来弄清楚在这种情况下使用代理容器是错误的(显然!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

我应该使用zookeeper:2181而不是——zookeeper localhost:2181作为我的撰写文件

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

正确的命令应该是

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

希望能节省一些人的时间。

另外,请注意消息不会立即删除,而是在关闭日志段时删除。

您必须在配置中启用此功能

echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 

清除主题

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows

创建主题

# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test

阅读主题

# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning

下面的命令可以删除kafka topic中所有已有的消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

删除的结构。Json文件应该如下:

{ “分区”:( { “主题”:“foo”, “分区”:1、 “抵消”:1 } ), “版本”:1 }

offset:-1将删除所有记录的地方 这个命令已经在kafka 2.0.1中测试过了

如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功的回应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.