JobPlus知识库 IT 大数据 文章
使KafKa每次读取消息到最新发送消息的解决方案

使KafKa每次读取消息到最新发送消息的解决方案(使用版本0.10.1.1)

使KafKa每次读取消息到最新发送消息,查了很多资料,对kafka的消费组和偏移量也有些研究,但本地与集群,不同版本都有不少不同之处。

第一个方案

考虑直接去掉偏移量,清空偏移量想要reset一个group的offset,是在创建group的时候就告诉Brokers不要记录它的offset:在代码里就是init KafkaConsumer的时候设置参数enable_auto_commit=false。准备试试,但在程序中使用就有问题,程序没有偏移量会报错。

第二个方案

没法干掉偏移量,准备换掉groupid。实现方法是在groupid后加时间戳,代码运行切换group。此时通过初始偏移量offset的配置为kafka.auto.offset.reset=latest,每次都只读最新数据,但不断新建,Consumers consuming from this topic增加,偏移量___consumer_offsets 多次保存。topic删除group_id较困难,也没办法做到定期清理,应该会对性能有影响。

第三个方案,采用 simple consumer api

是调用提供的api:在代码里就是seekTond方法(需要注意的是,如果你是初始化KafkaConsumer的时候指定的topic(high-level consumer 的 api),即使用subscribe方法,然后执行seekTond方法会报错。这是没有指定分区而自动分区,同后面提交分区冲突。

必须在初始化之后调用assign()方法来指定抓哪个topic的哪个partition里的数据,然后再调用seekTond方法)。示例代码如下:

  1. List<TopicPartition> topicPartitionList= new ArrayList<>();

  2. topicPartitionList.add(new TopicPartition(this.consumer_kafka_topic,0));

  3. this.consumer.assign(topicPartitionList);

  4. this.consumer.seekToEnd(topicPartitionList);

因为集群指定分区为零。且仅有一个分区,所以可以较好的满足需求,配置很简单。对于需要多个分区的问题,这方面的设置就会比较复杂。

记录些问题

1.___consumer_offsets的产生速度很快,在集群其他topic没有接入数据时,仍不断刷新消费者的偏移量,从不间断。如此高频次的记录是为了什么?

2.Kafka权威指南 有一句话:如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区... 消费者加入,再均衡,分区 三者之间的关系是什么。


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
180人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序