我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
你是否考虑过让你的应用只使用一个新的重命名主题?(例如,一个主题的名称与原始主题相同,但在结尾追加了“1”)。
这也会给你的应用一个新鲜干净的主题。
其他回答
来自kafka 1.1
清除一个主题
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100
等待至少1分钟,以确保卡夫卡清除主题 删除配置,然后转到默认值
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic
这将提高用户留存率。女士配置。然后您可以使用上面的alter命令将其更改为1秒(稍后恢复为默认值)。
Topic:myTopic PartitionCount:6 ReplicationFactor:1 Configs:retention.ms=86400000
您必须在配置中启用此功能
echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties
sudo systemctl stop kafka
sudo systemctl start kafka
清除主题
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows
创建主题
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
阅读主题
# /opt/kafka/bin/kafka-console-consumer.sh localhost:9092 --topic flows --from-beginning
如果您希望在Java应用程序中以编程方式完成此操作,可以使用AdminClient的API deleterrecords。使用AdminClient可以删除分区和偏移量级别上的记录。
根据JavaDocs, 0.11.0.0或更高版本的代理支持此操作。
这里有一个简单的例子:
String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);
Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);
// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);
try {
adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
adminClient.close();
}
有时,如果您有一个饱和的集群(分区太多,或使用加密的主题数据,或使用SSL,或控制器在一个坏的节点上,或连接不稳定),清除该主题将花费很长时间。
我遵循这些步骤,特别是在使用TLS时。
1:使用kafka工具运行:
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2:运行:
kafka-控制台-消费者——消费者-财产安全。SSL .truststore.location=/etc/schema-registry/secrets/trust。JKS——消费者-属性ssl.truststore。Password = Password——consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity。JKS——消费者-属性ssl.keystore。密码=密码——consumer-property ssl.key。Password = Password——bootstrap-server broker01.kafka.com:9092——topic <topic-name>——new-consumer——from-beginning
3:当主题为空时,将主题保留设置回初始设置。
kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
希望这能帮助到一些人,因为它不容易宣传。