我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

从Java,使用新的AdminZkClient代替已弃用的AdminUtils:

  public void reset() {
    try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
        5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {

      for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
        deleteTopic(entry.getKey(), zkClient);
      }
    }
  }

  private void deleteTopic(String topic, KafkaZkClient zkClient) {

    // skip Kafka internal topic
    if (topic.startsWith("__")) {
      return;
    }

    System.out.println("Resetting Topic: " + topic);
    AdminZkClient adminZkClient = new AdminZkClient(zkClient);
    adminZkClient.deleteTopic(topic);

    // deletions are not instantaneous
    boolean success = false;
    int maxMs = 5_000;
    while (maxMs > 0 && !success) {
      try {
        maxMs -= 100;
        adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
        success = true;
      } catch (TopicExistsException ignored) {
      }
    }

    if (!success) {
      Assert.fail("failed to create " + topic);
    }
  }

  private Map<String, List<PartitionInfo>> listTopics() {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
    props.put("group.id", "test-container-consumer-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Map<String, List<PartitionInfo>> topics = consumer.listTopics();
    consumer.close();

    return topics;
  }

其他回答

kafka没有清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。

首先,确保服务器。属性文件有,如果没有添加delete.topic.enable=true

然后,删除主题 bin/kafka-topics.sh——zookeeper localhost:2181——delete——topic myTopic

然后重新创建它。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2

更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。

是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。

除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留消息更长的时间,即,如果它是“compact”,你必须指定delete.retention.ms。

$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
            
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*

$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'

26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'

26599762

另一个问题是,你必须先获得当前配置,以便在删除成功后记得恢复: ./bin/kafka-config .sh——zookeeper localhost:2181——describe——entity-name test-topic-3-100——entity-type topic

user644265在这个答案中建议的临时减少主题的保留时间的解决方法仍然有效,但最新版本的kafka-configs会警告——zookeeper选项已被弃用:

警告:——zookeeper已弃用,并将在Kafka的未来版本中删除

使用——bootstrap-server代替;例如

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100

and

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms

下面是删除名为MyTopic的主题的步骤:

描述主题,并记下代理id 为列出的每个代理ID停止Apache Kafka守护进程。 连接到每个代理(从步骤1开始),并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0。对其他分区和所有副本重复此操作 删除主题元数据:zkCli.sh,然后删除rmr /brokers/MyTopic 为每台停止的机器启动Apache Kafka守护进程


如果你错过了第3步,Apache Kafka将继续报告当前的主题(例如当你运行Kafka -list-topic.sh时)。

使用Apache Kafka 0.8.0测试。