使KafKa每次读取消息到最新发送消息的解决方案(使用版本0.10.1.1)
使KafKa每次读取消息到最新发送消息,查了很多资料,对kafka的消费组和偏移量也有些研究,但本地与集群,不同版本都有不少不同之处。
第一个方案
考虑直接去掉偏移量,清空偏移量想要reset一个group的offset,是在创建group的时候就告诉Brokers不要记录它的offset:在代码里就是init KafkaConsumer的时候设置参数enable_auto_commit=false。准备试试,但在程序中使用就有问题,程序没有偏移量会报错。
第二个方案
没法干掉偏移量,准备换掉groupid。实现方法是在groupid后加时间戳,代码运行切换group。此时通过初始偏移量offset的配置为kafka.auto.offset.reset=latest,每次都只读最新数据,但不断新建,Consumers consuming from this topic增加,偏移量___consumer_offsets 多次保存。topic删除group_id较困难,也没办法做到定期清理,应该会对性能有影响。
第三个方案,采用 simple consumer api
是调用提供的api:在代码里就是seekTond方法(需要注意的是,如果你是初始化KafkaConsumer的时候指定的topic(high-level consumer 的 api),即使用subscribe方法,然后执行seekTond方法会报错。这是没有指定分区而自动分区,同后面提交分区冲突。
必须在初始化之后调用assign()方法来指定抓哪个topic的哪个partition里的数据,然后再调用seekTond方法)。示例代码如下:
List<TopicPartition> topicPartitionList= new ArrayList<>();
topicPartitionList.add(new TopicPartition(this.consumer_kafka_topic,0));
this.consumer.assign(topicPartitionList);
this.consumer.seekToEnd(topicPartitionList);
因为集群指定分区为零。且仅有一个分区,所以可以较好的满足需求,配置很简单。对于需要多个分区的问题,这方面的设置就会比较复杂。
记录些问题
1.___consumer_offsets的产生速度很快,在集群其他topic没有接入数据时,仍不断刷新消费者的偏移量,从不间断。如此高频次的记录是为了什么?
2.Kafka权威指南 有一句话:如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区... 消费者加入,再均衡,分区 三者之间的关系是什么。
登录 | 立即注册