Kafka Streams

Stream 基本结构


        // Application configuration: app id (consumer group / state prefix),
        // broker bootstrap list, and default key/value serdes (String/String).
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Streams DSL builder used to assemble the processing topology.
        final StreamsBuilder builder = new StreamsBuilder();

        // General shape: builder.stream("Source-Topic").xxx.xxx.to("Sink-Topic")
        builder.<String, String>stream("streams-plaintext-input")
               // Split each input line into lower-cased words (non-word chars are separators).
               .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
               // Re-key each record by the word itself so identical words group together.
               .groupBy((key, value) -> value)
               // Count occurrences per word, materialized in local store "counts-store".
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
               .toStream()
               // Values are now Long, so an explicit Long serde is required on output.
               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Latch keeps main() alive until the shutdown hook fires.
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();       // stop stream threads and flush state
                latch.countDown();     // release main() so it can exit cleanly
            }
        });

        try {
            streams.start();
            latch.await();             // block until the shutdown hook releases the latch
        } catch (Throwable e) {
            System.exit(1);            // any startup/await failure -> non-zero exit
        }
        System.exit(0);
    }


Processor



import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

import java.time.Duration;
import java.util.Locale;

public class WorldCountProcessor implements Processor<String,String,String,String> {
    private KeyValueStore<String,Integer> keyValueStore;

    @Override
    public void process(Record<String,String> record) {
        
        final String[] words =
                record.value().toUpperCase(Locale.ROOT).split("\\W+");
        for (final String word:words) {
            final Integer oldValue = keyValueStore.get(word);
            if (oldValue == null) {
                keyValueStore.put(word,1);
            }else {
                keyValueStore.put(word,oldValue + 1);
            }
        }

    }

    @Override
    public void close() {

    }

    @Override
    public void init(ProcessorContext context) {
        context.schedule(Duration.ofSeconds(1),
                PunctuationType.STREAM_TIME,
                timestamp ->{
                    try(final KeyValueIterator<String,Integer> iter = keyValueStore.all()) {
                        while (iter.hasNext()) {
                            final KeyValue<String,Integer> entry = iter.next();
                            context.forward(new Record(entry.key,entry.value.toString(),timestamp));
                        }
                    }
                });
        keyValueStore = context.getStateStore("Counts");
    }
}

Topology using the Processor API


                Stores
                        .keyValueStoreBuilder(
                                Stores.persistentKeyValueStore("Counts"),
                                Serdes.String(),
                                Serdes.Long()
                        );
        builder.addSource("Source","source-topic")
                .addProcessor("Processor", () -> new WorldCountProcessor(),"Source")
                .addStateStore(countsStoreBuilder,"Process")
                .addSink("Sink","sink-topic","Process");
    }

    // kafka test driver
//    https://kafka.apache.org/35/documentation/streams/developer-guide/testing
}

Test Driver