我对卡夫卡还比较陌生。我已经做了一些试验,但一些事情是不清楚的,我对消费者抵消。根据我目前的理解,当消费者启动时,它将开始读取的偏移量由配置设置auto.offset.reset决定(如果我错了,请纠正我)。

现在假设主题中有10条消息(偏移量从0到9),一个消费者碰巧在它关闭之前(或者在我杀死消费者之前)使用了其中的5条消息。然后重新启动该使用者进程。我的问题是:

如果auto.offset.reset设置为最早,它总是从偏移量0开始消费吗? 如果auto.offset.reset设置为latest,它会从偏移量5开始消费吗? 这种情况下的行为总是确定的吗?

如果我的问题有不清楚的地方,请不要犹豫评论。


当前回答

只是一个更新:从Kafka 0.9开始,Kafka使用了一个新的Java版本的消费者,并且auto.offset.reset参数名已经更改;摘自手册:

当Kafka中没有初始偏移量或如果当前 偏移量不再存在于服务器上(例如,因为该数据 已删除): 最早:自动将偏移量重置为最早的偏移量 Latest:自动将偏移量重置为最近的偏移量 None:如果没有发现之前的偏移量,则向消费者抛出异常 针对消费者群体 其他情况:向使用者抛出异常。

在检查了被接受的答案后,我花了一些时间找到了这个答案,所以我认为它可能对社区发布有用。

其他回答

此外,还有补偿。留存。分钟。如果距离上次提交的时间为> offset .retention。分钟,然后auto.offset.reset也开始生效

只是一个更新:从Kafka 0.9开始,Kafka使用了一个新的Java版本的消费者,并且auto.offset.reset参数名已经更改;摘自手册:

当Kafka中没有初始偏移量或如果当前 偏移量不再存在于服务器上(例如,因为该数据 已删除): 最早:自动将偏移量重置为最早的偏移量 Latest:自动将偏移量重置为最近的偏移量 None:如果没有发现之前的偏移量,则向消费者抛出异常 针对消费者群体 其他情况:向使用者抛出异常。

在检查了被接受的答案后,我花了一些时间找到了这个答案,所以我认为它可能对社区发布有用。

这比你描述的要复杂一点。 auto.offset.reset配置只在你的消费组没有提交有效的偏移量时才生效(现在支持的偏移量存储是Kafka和Zookeeper),这也取决于你使用的消费组类型。

如果你使用高级java消费者,那么想象以下场景:

You have a consumer in a consumer group group1 that has consumed 5 messages and died. Next time you start this consumer it won't even use that auto.offset.reset config and will continue from the place it died because it will just fetch the stored offset from the offset storage (Kafka or ZK as I mentioned). You have messages in a topic (like you described) and you start a consumer in a new consumer group group2. There is no offset stored anywhere and this time the auto.offset.reset config will decide whether to start from the beginning of the topic (earliest) or from the end of the topic (latest)

影响偏移值与最早和最新配置对应的另一个因素是日志保留策略。假设您有一个主题,保留时间配置为1小时。你产生5条消息,然后一个小时后你再发布5条消息。最新的偏移量仍然和前面的例子一样,但是最早的偏移量不能为0,因为Kafka已经删除了这些消息,因此最早可用的偏移量将是5。

上面提到的所有内容都与SimpleConsumer无关,每次运行它时,它将决定从哪里开始使用auto.offset.reset配置。

如果你使用的Kafka版本高于0.9,你必须将最早的,最新的替换为最小的,最大的。