// Producer configuration: local broker, String serializers for both key and
// value, and linger.ms set to 1.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", 1);

// The producer is a try-with-resources resource, so it is closed when the
// block exits, whether normally or via an exception.
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    int sequence = 0;
    while (sequence < 100) {
        // Each record uses the same decimal string as both key and value.
        String text = Integer.toString(sequence);
        producer.send(new ProducerRecord<>("quickstart-events", text, text));
        sequence++;
    }
    // Explicit flush after the last send, before the producer is closed.
    producer.flush();
}
// Transactional producer configuration: local broker plus a transactional.id.
// Serializers are passed to the constructor instead of being set as properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");

// The producer is a try-with-resources resource: it is closed on every exit
// path, but it is only in scope INSIDE this block. Any handling that needs the
// producer (such as aborting the transaction) therefore has to live in the
// nested try below, not in a catch clause attached to this statement.
try (Producer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
    // Kept outside the nested try: if initialization fails there is no
    // transaction to abort, so the failure simply propagates to the caller.
    producer.initTransactions();
    try {
        producer.beginTransaction();
        for (int i = 0; i < 10; i++) {
            // Key and value are both the decimal string of the loop index.
            producer.send(new ProducerRecord<>("quickstart-events", Integer.toString(i), Integer.toString(i)));
        }
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // The KafkaProducer documentation treats these as unrecoverable: the
        // only option is to close the producer (done by try-with-resources).
        // Rethrow so the failure is visible instead of being swallowed.
        throw e;
    } catch (KafkaException e) {
        // Any other Kafka failure: abort the open transaction so the caller
        // may retry, then surface the original error.
        try {
            producer.abortTransaction();
        } catch (KafkaException | IllegalStateException abortFailure) {
            // Do not let a failed abort mask the root cause.
            e.addSuppressed(abortFailure);
        }
        throw e;
    }
}