我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

根据@steven appleyard的回答,我在Kafka 2.2.0上执行了以下命令,它们对我有用。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms

其他回答

更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。

是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。

我使用的是Kafka 2.13工具。现在——zookeeper是kafka-topics.sh的不可识别选项。删除主题。

bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]

只是要考虑到,如果你在删除的主题中有很多数据,再次创建相同的主题可能需要一段时间。当你尝试创建相同的主题时,你可能会得到这样的错误:

Topic: Topic “[主题名]”被标记为删除。

您必须在配置中启用此功能

echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 

清除主题

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows

创建主题

# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test

阅读主题

# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning

除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留消息更长的时间,即,如果它是“compact”,你必须指定delete.retention.ms。

$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
            
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*

$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'

26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'

26599762

另一个问题是,你必须先获得当前配置,以便在删除成功后记得恢复: ./bin/kafka-config .sh——zookeeper localhost:2181——describe——entity-name test-topic-3-100——entity-type topic

./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

这将提高用户留存率。女士配置。然后您可以使用上面的alter命令将其更改为1秒(稍后恢复为默认值)。

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000