我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加取回。size在这里并不理想,因为我实际上并不想接受那么大的消息。


当前回答

除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“delete”(默认),如果是“compact”,它将保留消息更长的时间,即,如果它是“compact”,你必须指定delete.retention.ms。

$ ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
            
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*

$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'

26599762
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'

26599762

另一个问题是,你必须先获得当前配置,以便在删除成功后记得恢复: ./bin/kafka-config .sh——zookeeper localhost:2181——describe——entity-name test-topic-3-100——entity-type topic

其他回答

Thomas的建议很好,但不幸的是zkCli在旧版本的Zookeeper(例如3.3.6)中似乎不支持rmr。例如,比较现代Zookeeper 3.3版本中的命令行实现。

如果你面对的是旧版本的Zookeeper,一个解决方案是使用客户端库,比如zc。zk代表Python。对于不熟悉Python的人,您需要使用pip或easy_install安装它。然后启动一个Python shell (Python),你可以做:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

甚至

zk.delete_recursive('brokers')

如果你想从卡夫卡中删除所有的主题。

临时更新主题的保留时间为1秒:

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

在更新的Kafka版本中,你也可以使用Kafka -configs——实体类型主题

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复以前的保留。女士的价值。

如果您希望在Java应用程序中以编程方式完成此操作,可以使用AdminClient的API deleterrecords。使用AdminClient可以删除分区和偏移量级别上的记录。

根据JavaDocs, 0.11.0.0或更高版本的代理支持此操作。

这里有一个简单的例子:

String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);

Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
  adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
} finally {
  adminClient.close();
}

kafka没有清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。

首先,确保服务器。属性文件有,如果没有添加delete.topic.enable=true

然后,删除主题 bin/kafka-topics.sh——zookeeper localhost:2181——delete——topic myTopic

然后重新创建它。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2

如果你使用confluentinc/cp-kafka容器,下面是删除主题的命令。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功的回应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.