我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

我使用的是Kafka 2.13工具。现在——zookeeper是kafka-topics.sh的不可识别选项。删除主题。

bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]

只是要考虑到,如果你在删除的主题中有很多数据,再次创建相同的主题可能需要一段时间。当你尝试创建相同的主题时,你可能会得到这样的错误:

Topic: Topic “[主题名]”被标记为删除。

其他回答

临时更新主题的保留时间为1秒:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。

要清除队列,可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创造它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功的回应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

最简单的方法是将各个日志文件的日期设置为比保留期更早的日期。然后经纪人会在几秒钟内为你清理并移除它们。这有几个优点:

不需要关闭代理,这是一个运行时操作。 避免出现无效偏移异常的可能性(下文将详细介绍)。

In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.

还可以通过清除zookeeper中针对该主题的消费者偏移量来缓解这一问题。但如果你不需要一个处女主题,只是想删除现有的内容,那么简单地“触摸”一些主题日志要比停止代理、删除主题日志和清除某些zookeeper节点容易得多,也更可靠。

您必须在配置中启用此功能

echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties 
sudo systemctl stop kafka 
sudo systemctl start kafka 

清除主题

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows

创建主题

# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test

阅读主题

# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning