我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
user644265在这个答案中建议的临时减少主题的保留时间的解决方法仍然有效,但最新版本的kafka-configs会警告——zookeeper选项已被弃用:
警告:——zookeeper已弃用,并将在Kafka的未来版本中删除
使用——bootstrap-server代替;例如
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100
and
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
其他回答
如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。
docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>
成功的回应:
Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
您必须在配置中启用此功能
echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties
sudo systemctl stop kafka
sudo systemctl start kafka
清除主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows
创建主题
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
阅读主题
# /opt/kafka/bin/kafka-console-consumer.sh localhost:9092 --topic flows --from-beginning
除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留消息更长的时间,即,如果它是“compact”,你必须指定delete.retention.ms。
$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
另一个问题是,你必须先获得当前配置,以便在删除成功后记得恢复: ./bin/kafka-config .sh——zookeeper localhost:2181——describe——entity-name test-topic-3-100——entity-type topic
另一种手动清除主题的方法是:
在经纪人中:
停止kafka broker Sudo服务kafka停止 删除所有分区日志文件(应该在所有代理上执行) sudo rm - r /kafka-storage/kafka-logs/<some_topic_name>-* .
动物园管理员:
运行zookeeper命令行界面 sudo /usr/lib/zookeeper/bin/zkCli.sh 使用zkCli删除主题元数据 rmr /代理/主题/ < some_topic_name >
在经纪人那里:
重新启动代理服务 Sudo服务kafka启动
从Java,使用新的AdminZkClient代替已弃用的AdminUtils:
public void reset() {
try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
deleteTopic(entry.getKey(), zkClient);
}
}
}
private void deleteTopic(String topic, KafkaZkClient zkClient) {
// skip Kafka internal topic
if (topic.startsWith("__")) {
return;
}
System.out.println("Resetting Topic: " + topic);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);
// deletions are not instantaneous
boolean success = false;
int maxMs = 5_000;
while (maxMs > 0 && !success) {
try {
maxMs -= 100;
adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
success = true;
} catch (TopicExistsException ignored) {
}
}
if (!success) {
Assert.fail("failed to create " + topic);
}
}
private Map<String, List<PartitionInfo>> listTopics() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("group.id", "test-container-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.close();
return topics;
}