我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
临时更新主题的保留时间为1秒:
kafka-topics.sh \
--zookeeper <zkhost>:2181 \
--alter \
--topic <topic name> \
--config retention.ms=1000
在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题
kafka-configs.sh \
--zookeeper <zkhost>:2181 \
--entity-type topics \
--alter \
--entity-name <topic name> \
--add-config retention.ms=1000
然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。
其他回答
从Java,使用新的AdminZkClient代替已弃用的AdminUtils:
public void reset() {
try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {
for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
deleteTopic(entry.getKey(), zkClient);
}
}
}
private void deleteTopic(String topic, KafkaZkClient zkClient) {
// skip Kafka internal topic
if (topic.startsWith("__")) {
return;
}
System.out.println("Resetting Topic: " + topic);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);
// deletions are not instantaneous
boolean success = false;
int maxMs = 5_000;
while (maxMs > 0 && !success) {
try {
maxMs -= 100;
adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
success = true;
} catch (TopicExistsException ignored) {
}
}
if (!success) {
Assert.fail("failed to create " + topic);
}
}
private Map<String, List<PartitionInfo>> listTopics() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("group.id", "test-container-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.close();
return topics;
}
虽然公认的答案是正确的,但该方法已被弃用。主题配置现在应该通过kafka-configs来完成。
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
通过该方法设置的配置可以通过命令显示
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
如果您希望在Java应用程序中以编程方式完成此操作,可以使用AdminClient的API deleterrecords。使用AdminClient可以删除分区和偏移量级别上的记录。
根据JavaDocs, 0.11.0.0或更高版本的代理支持此操作。
这里有一个简单的例子:
String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);
Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);
// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);
try {
adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
adminClient.close();
}
Thomas的建议很好,但不幸的是zkCli在旧版本的Zookeeper(例如3.3.6)中似乎不支持rmr。例如,比较现代Zookeeper 3.3版本中的命令行实现。
如果你面对的是旧版本的Zookeeper,一个解决方案是使用客户端库,比如zc。zk代表Python。对于不熟悉Python的人,您需要使用pip或easy_install安装它。然后启动一个Python shell (Python),你可以做:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
甚至
zk.delete_recursive('brokers')
如果你想从卡夫卡中删除所有的主题。
我使用的是Kafka 2.13工具。现在——zookeeper是kafka-topics.sh的不可识别选项。删除主题。
bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]
只是要考虑到,如果你在删除的主题中有很多数据,再次创建相同的主题可能需要一段时间。当你尝试创建相同的主题时,你可能会得到这样的错误:
Topic: Topic “[主题名]”被标记为删除。