我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

使用应用程序组(GroupName应该与应用程序kafka组名相同)清理来自特定主题的所有消息。

./kafka-path/bin/kafka-console-consumer.sh——zookeeper localhost:2181——topic topicName——from-beginning——group application-group .sh

其他回答

在Kafka 0.8.2中测试,作为快速启动的例子: 首先,添加一行到服务器。配置文件夹下的属性文件:

delete.topic.enable=true

然后,执行以下命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建它,以便客户端继续对空主题进行操作

你是否考虑过让你的应用只使用一个新的重命名主题?(例如,一个主题的名称与原始主题相同,但在结尾追加了“1”)。

这也会给你的应用一个新鲜干净的主题。

下面的命令可以删除kafka topic中所有已有的消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

删除的结构。Json文件应该如下:

{ “分区”:( { “主题”:“foo”, “分区”:1、 “抵消”:1 } ), “版本”:1 }

offset:-1将删除所有记录的地方 这个命令已经在kafka 2.0.1中测试过了

更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。

是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。

如果您希望在Java应用程序中以编程方式完成此操作,可以使用AdminClient的API deleterrecords。使用AdminClient可以删除分区和偏移量级别上的记录。

根据JavaDocs, 0.11.0.0或更高版本的代理支持此操作。

这里有一个简单的例子:

String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);

Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
} finally {
  adminClient.close();
}