我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留消息更长的时间,即,如果它是“compact”,你必须指定delete.retention.ms。

$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
            
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*

$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'

26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'

26599762

另一个问题是,你必须先获得当前配置,以便在删除成功后记得恢复: ./bin/kafka-config .sh——zookeeper localhost:2181——describe——entity-name test-topic-3-100——entity-type topic

其他回答

下面的命令可以删除kafka topic中所有已有的消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

删除的结构。Json文件应该如下:

{ “分区”:( { “主题”:“foo”, “分区”:1、 “抵消”:1 } ), “版本”:1 }

offset:-1将删除所有记录的地方 这个命令已经在kafka 2.0.1中测试过了

虽然公认的答案是正确的,但该方法已被弃用。主题配置现在应该通过kafka-configs来完成。

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

通过该方法设置的配置可以通过命令显示

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic

使用应用程序组(GroupName应该与应用程序kafka组名相同)清理来自特定主题的所有消息。

./kafka-path/bin/kafka-console-consumer.sh——zookeeper localhost:2181——topic topicName——from-beginning——group application-group .sh

user644265在这个答案中建议的临时减少主题的保留时间的解决方法仍然有效,但最新版本的kafka-configs会警告——zookeeper选项已被弃用:

警告:——zookeeper已弃用,并将在Kafka的未来版本中删除

使用——bootstrap-server代替;例如

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100

and

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms

这里有很多很棒的答案,但在其中,我没有找到一个关于docker的答案。我花了一些时间来弄清楚在这种情况下使用代理容器是错误的(显然!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

我应该使用zookeeper:2181而不是——zookeeper localhost:2181作为我的撰写文件

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

正确的命令应该是

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

希望能节省一些人的时间。

另外,请注意消息不会立即删除,而是在关闭日志段时删除。