我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功的回应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

其他回答

如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功的回应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

下面的命令可以删除kafka topic中所有已有的消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

删除的结构。Json文件应该如下:

{ “分区”:( { “主题”:“foo”, “分区”:1、 “抵消”:1 } ), “版本”:1 }

offset:-1将删除所有记录的地方 这个命令已经在kafka 2.0.1中测试过了

临时更新主题的保留时间为1秒:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。

要清除队列,可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创造它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

有时,如果您有一个饱和的集群(分区太多,或使用加密的主题数据,或使用SSL,或控制器在一个坏的节点上,或连接不稳定),清除该主题将花费很长时间。

我遵循这些步骤,特别是在使用TLS时。

1:使用kafka工具运行:

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2:运行:

kafka-控制台-消费者——消费者-财产安全。SSL .truststore.location=/etc/schema-registry/secrets/trust。JKS——消费者-属性ssl.truststore。Password = Password——consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity。JKS——消费者-属性ssl.keystore。密码=密码——consumer-property ssl.key。Password = Password——bootstrap-server broker01.kafka.com:9092——topic <topic-name>——new-consumer——from-beginning

3:当主题为空时,将主题保留设置回初始设置。

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

希望这能帮助到一些人,因为它不容易宣传。