Kafka Productor

基本生产者



        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        try(Producer<String, String> producer = new KafkaProducer<>(props)){
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>("quickstart-events", Integer.toString(i), Integer.toString(i)));
            }
            producer.flush();
        }


事务



        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        try(Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.initTransactions();

            producer.beginTransaction();
            for (int i = 0;i<10;i++){
                producer.send(new ProducerRecord<>("quickstart-events",Integer.toString(i),Integer.toString(i)));
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {

        } catch (KafkaException e) {

        }