// Consumer setup: local broker, consumer group "test", String keys/values.
// Auto-commit is disabled — offsets are committed explicitly by the poll loop.
String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer", stringDeserializer);
props.setProperty("value.deserializer", stringDeserializer);
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe with a rebalance listener so offsets can be saved to / restored
// from an external system, tying consumption progress to the business data.
consumer.subscribe(Arrays.asList("quickstart-events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        // Invoked before partitions are taken away: snapshot the current
        // position for each partition so it can be persisted externally.
        for (TopicPartition part : collection) {
            // primitive long — offsets never need the boxed Long wrapper
            long currOffset = consumer.position(part);
            // TODO: save currOffset for 'part' in our own database.
        }
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        // consumer.seekToBeginning(collection);
        // Invoked after partitions are (re)assigned: restore each partition's
        // position from our own store so consumption resumes where we left off.
        for (TopicPartition part : collection) {
            // NOTE(review): placeholder — this always seeks to offset 0.
            // Replace 0L with the offset saved in onPartitionsRevoked.
            long currOffset = 0L;
            consumer.seek(part, currOffset);
        }
    }
});
// Batch consumption: accumulate records until a minimum batch size is
// reached, process the batch, then commit offsets manually.
final int minBatchSize = 10;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        printThis(buffer);
        // Manual commit. The callback surfaces commit failures, which a bare
        // commitAsync() would otherwise silently swallow.
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Offset commit failed for " + offsets + ": " + exception);
            }
        });
        buffer.clear();
    }
}
// Manually assign specific partitions (instead of group-managed subscribe()).
// NOTE(review): unreachable as written — the while(true) loop above never
// exits, so this code never runs. Also, assign() and the earlier subscribe()
// are mutually exclusive modes on the same consumer instance — confirm this
// is meant as a separate standalone example.
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));