我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:
kafka.common.InvalidMessageSizeException: invalid message size
增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。
当前回答
以防有人正在寻找一个更新的答案(在2022年),我发现以下将适用于Kafka 3.3.1版本。这将改变“your-topic”的配置,使消息保留1000毫秒。在清除消息之后,可以将其设置为不同的值。
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic --alter --add-config retention.ms=1000
其他回答
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic
这将提高用户留存率。女士配置。然后您可以使用上面的alter命令将其更改为1秒(稍后恢复为默认值)。
Topic:myTopic PartitionCount:6 ReplicationFactor:1 Configs:retention.ms=86400000
这里有很多很棒的答案,但在其中,我没有找到一个关于docker的答案。我花了一些时间来弄清楚在这种情况下使用代理容器是错误的(显然!!)
## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
我应该使用zookeeper:2181而不是——zookeeper localhost:2181作为我的撰写文件
## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
正确的命令应该是
docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000
希望能节省一些人的时间。
另外,请注意消息不会立即删除,而是在关闭日志段时删除。
更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。
是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。
如果您希望在Java应用程序中以编程方式完成此操作,可以使用AdminClient的API deleterrecords。使用AdminClient可以删除分区和偏移量级别上的记录。
根据JavaDocs, 0.11.0.0或更高版本的代理支持此操作。
这里有一个简单的例子:
String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);
Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);
// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);
try {
adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
adminClient.close();
}
临时更新主题的保留时间为1秒:
kafka-topics.sh \
--zookeeper <zkhost>:2181 \
--alter \
--topic <topic name> \
--config retention.ms=1000
在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题
kafka-configs.sh \
--zookeeper <zkhost>:2181 \
--entity-type topics \
--alter \
--entity-name <topic name> \
--add-config retention.ms=1000
然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。