我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
临时更新主题的保留时间为1秒:
kafka-topics.sh \
--zookeeper <zkhost>:2181 \
--alter \
--topic <topic name> \
--config retention.ms=1000
在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题
kafka-configs.sh \
--zookeeper <zkhost>:2181 \
--entity-type topics \
--alter \
--entity-name <topic name> \
--add-config retention.ms=1000
然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。
其他回答
Thomas的建议很好,但不幸的是zkCli在旧版本的Zookeeper(例如3.3.6)中似乎不支持rmr。例如,比较现代Zookeeper 3.3版本中的命令行实现。
如果你面对的是旧版本的Zookeeper,一个解决方案是使用客户端库,比如zc。zk代表Python。对于不熟悉Python的人,您需要使用pip或easy_install安装它。然后启动一个Python shell (Python),你可以做:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
甚至
zk.delete_recursive('brokers')
如果你想从卡夫卡中删除所有的主题。
临时更新主题的保留时间为1秒:
kafka-topics.sh \
--zookeeper <zkhost>:2181 \
--alter \
--topic <topic name> \
--config retention.ms=1000
在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题
kafka-configs.sh \
--zookeeper <zkhost>:2181 \
--entity-type topics \
--alter \
--entity-name <topic name> \
--add-config retention.ms=1000
然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。
虽然公认的答案是正确的,但该方法已被弃用。主题配置现在应该通过kafka-configs来完成。
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
通过该方法设置的配置可以通过命令显示
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
user644265在这个答案中建议的临时减少主题的保留时间的解决方法仍然有效,但最新版本的kafka-configs会警告——zookeeper选项已被弃用:
警告:——zookeeper已弃用,并将在Kafka的未来版本中删除
使用——bootstrap-server代替;例如
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100
and
kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms
从Java,使用新的AdminZkClient代替已弃用的AdminUtils:
public void reset() {
try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
deleteTopic(entry.getKey(), zkClient);
}
}
}
private void deleteTopic(String topic, KafkaZkClient zkClient) {
// skip Kafka internal topic
if (topic.startsWith("__")) {
return;
}
System.out.println("Resetting Topic: " + topic);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);
// deletions are not instantaneous
boolean success = false;
int maxMs = 5_000;
while (maxMs > 0 && !success) {
try {
maxMs -= 100;
adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
success = true;
} catch (TopicExistsException ignored) {
}
}
if (!success) {
Assert.fail("failed to create " + topic);
}
}
private Map<String, List<PartitionInfo>> listTopics() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("group.id", "test-container-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.close();
return topics;
}