Kafka Consumer

基本消费者代码



        // 配置属性
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        try(KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 订阅
            consumer.subscribe(Arrays.asList("quickstart-events"));
            while (true) {
                // 拉取
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }


更详细的消费控制


        
        Properties props = new Properties();

        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("quickstart-events"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                for (TopicPartition part:collection) {
                    // 获取offset 可以自行处理
                    Long currOffset = consumer.position(part);
                    // we can save the offset in own db
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {

//                consumer.seekToBeginning(collection);
                for (TopicPartition part:collection) {
                    // get offset from own db
                    // 可以从自己的系统取得offset 以达将offset与业务系统进行关联
                    Long currOffset = 0L;
                    consumer.seek(part,currOffset);
                }

            }
        });
        // 按照批去
        final int minBatchSize = 10;

        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                printThis(buffer);
                // 手动提交
                consumer.commitAsync();
                buffer.clear();
            }
        }


         // 指定分区
        String topic = "foo";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0, partition1));