我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
另一种手动清除主题的方法是:
在经纪人中:
停止kafka broker Sudo服务kafka停止 删除所有分区日志文件(应该在所有代理上执行) sudo rm - r /kafka-storage/kafka-logs/<some_topic_name>-* .
动物园管理员:
运行zookeeper命令行界面 sudo /usr/lib/zookeeper/bin/zkCli.sh 使用zkCli删除主题元数据 rmr /代理/主题/ < some_topic_name >
在经纪人那里:
重新启动代理服务 Sudo服务kafka启动
其他回答
来自kafka 1.1
清除一个主题
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100
等待至少1分钟,以确保卡夫卡清除主题 删除配置,然后转到默认值
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留消息更长的时间,即,如果它是“compact”,你必须指定delete.retention.ms。
$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
另一个问题是,你必须先获得当前配置,以便在删除成功后记得恢复: ./bin/kafka-config .sh——zookeeper localhost:2181——describe——entity-name test-topic-3-100——entity-type topic
下面的命令可以删除kafka topic中所有已有的消息:
kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json
删除的结构。Json文件应该如下:
{ “分区”:( { “主题”:“foo”, “分区”:1、 “抵消”:1 } ), “版本”:1 }
offset:-1将删除所有记录的地方 这个命令已经在kafka 2.0.1中测试过了
要清除队列,可以删除主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
然后重新创造它:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
如果您希望在Java应用程序中以编程方式完成此操作,可以使用AdminClient的API deleterrecords。使用AdminClient可以删除分区和偏移量级别上的记录。
根据JavaDocs, 0.11.0.0或更高版本的代理支持此操作。
这里有一个简单的例子:
String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);
Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);
// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);
try {
adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
adminClient.close();
}