kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)

06-01 1890阅读

kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)


1.简介

Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。

2.指定消费位置

2.1.从特定偏移量开始消费

使用seek(TopicPartition partition, long offset)指定具体偏移量。

源码分析:

  • seek()方法更新消费者内部的subscriptions对象的position字段,记录目标偏移量。
  • 后续poll()时,Fetcher类根据此位置向Broker发送拉取请求。

    代码示例:

    consumer.subscribe(Collections.singleton("test-topic"));
    Set assignment = new HashSet();
    // 确保分配到分区
    while (assignment.isEmpty()) {
        consumer.poll(Duration.ofMillis(100));
        assignment = consumer.assignment();
    }
    // 设置所有分区从offset=100开始消费
    assignment.forEach(tp -> consumer.seek(tp, 100));
    

    2.2.从时间戳开始消费

    使用offsetsForTimes()获取时间戳对应的偏移量,再调用seek()。

    源码分析:

    offsetsForTimes()向Broker发送ListOffsetRequest,查询满足时间戳条件的最早或最新偏移量。

    代码实例:

    Map timestamps = assignment.stream()
        .collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
    // 获取24小时前的偏移量
    Map offsets = consumer.offsetsForTimes(timestamps);
    offsets.forEach((tp, offsetAndTs) -> {
        if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
    });
    

    2.3.从分区首尾消费

    使用seekToBeginning()或seekToEnd(),或通过beginningOffsets()/endOffsets()获取首尾偏移量后手动设置。

    代码实例:

    // 从分区末尾开始消费(等效于auto.offset.reset=latest)
    Map endOffsets = consumer.endOffsets(assignment);
    assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));
    

    2.4.注意事项

    1. 分区分配与poll()的依赖

      seek()必须在分区分配完成后调用,否则会抛出IllegalStateException。需通过循环poll()确保分配到分区。

    2. 数据过期问题

      若指定偏移量对应的消息已被删除(如日志清理导致),seek()将失效。此时需使用beginningOffsets()获取当前最小有效偏移量。

    3. 异步提交与位移覆盖风险

      异步提交(commitAsync())失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync())保证原子性。

    4. seek()方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。

    3.完整代码实例

    public class SeekToTimestampDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "seek-demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");
            KafkaConsumer consumer = new KafkaConsumer(props);
            consumer.subscribe(Collections.singleton("test-topic"));
            // 等待分区分配
            Set assignment = new HashSet();
            while (assignment.isEmpty()) {
                consumer.poll(Duration.ofMillis(100));
                assignment = consumer.assignment();
            }
            // 获取24小时前的时间戳对应偏移量
            Map timestamps = assignment.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));
            Map offsets = consumer.offsetsForTimes(timestamps);
            // 指定位移
            offsets.forEach((tp, offsetAndTs) -> {
                if (offsetAndTs != null) {
                    consumer.seek(tp, offsetAndTs.offset());
                } else {
                    // 处理无有效偏移量的情况(如从头开始)
                    consumer.seekToBeginning(Collections.singleton(tp));
                }
            });
            while (true) {
                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));
            }
        }
    }
    
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码