我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

临时更新主题的保留时间为1秒:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。

其他回答

如果您希望在Java应用程序中以编程方式完成此操作,可以使用AdminClient的API deleterrecords。使用AdminClient可以删除分区和偏移量级别上的记录。

根据JavaDocs, 0.11.0.0或更高版本的代理支持此操作。

这里有一个简单的例子:

String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);

Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
} finally {
  adminClient.close();
}

临时更新主题的保留时间为1秒:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。

下面是删除名为MyTopic的主题的步骤:

描述主题,并记下代理id 为列出的每个代理ID停止Apache Kafka守护进程。 连接到每个代理(从步骤1开始),并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0。对其他分区和所有副本重复此操作 删除主题元数据:zkCli.sh,然后删除rmr /brokers/MyTopic 为每台停止的机器启动Apache Kafka守护进程


如果你错过了第3步,Apache Kafka将继续报告当前的主题(例如当你运行Kafka -list-topic.sh时)。

使用Apache Kafka 0.8.0测试。

在Kafka 0.8.2中测试,作为快速启动的例子: 首先,添加一行到服务器。配置文件夹下的属性文件:

delete.topic.enable=true

然后,执行以下命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建它,以便客户端继续对空主题进行操作

以防有人正在寻找一个更新的答案(在2022年),我发现以下将适用于Kafka 3.3.1版本。这将改变“your-topic”的配置,使消息保留1000毫秒。在清除消息之后,可以将其设置为不同的值。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic  --alter --add-config retention.ms=1000