import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCountDemo {

    public static void main(final String[] args) {
        // Configuration properties
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Stream builder: builder.stream("Source-Topic")...to("Sink-Topic")
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("streams-plaintext-input")
                // Split each line into lowercase words
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                // Re-key each record by the word so identical words are grouped together
                .groupBy((key, value) -> value)
                // Count occurrences per word, materialized in the "counts-store" state store
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach a shutdown handler to catch Ctrl-C
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}
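// To exercise the word-count app end to end, records can be written to the input
// topic with a plain kafka-clients producer. A minimal sketch, assuming a broker on
// localhost:9092 and that both topics already exist (class name is illustrative):
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PlaintextInputProducer {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record value is one line of text for the topology to split and count
            producer.send(new ProducerRecord<>("streams-plaintext-input", "all streams lead to kafka"));
            producer.send(new ProducerRecord<>("streams-plaintext-input", "hello kafka streams"));
        }
    }
}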
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Duration;
import java.util.Locale;
public class WordCountProcessor implements Processor<String, String, String, String> {

    private KeyValueStore<String, Integer> keyValueStore;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        // Every second (in stream time), emit the current count for each word
        context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> iter = keyValueStore.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Integer> entry = iter.next();
                    context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
                }
            }
        });
        keyValueStore = context.getStateStore("Counts");
    }

    @Override
    public void process(final Record<String, String> record) {
        // Split the record value into words and update each word's count in the store
        final String[] words = record.value().toUpperCase(Locale.ROOT).split("\\W+");

        for (final String word : words) {
            final Integer oldValue = keyValueStore.get(word);
            if (oldValue == null) {
                keyValueStore.put(word, 1);
            } else {
                keyValueStore.put(word, oldValue + 1);
            }
        }
    }

    @Override
    public void close() {
        // Nothing to clean up; state stores are closed by the library
    }
}
// Build a persistent key-value store named "Counts"; the Integer serde matches
// the KeyValueStore<String, Integer> used inside WordCountProcessor
final StoreBuilder<KeyValueStore<String, Integer>> countsStoreBuilder =
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("Counts"),
                Serdes.String(),
                Serdes.Integer());

// Wire the topology: source topic -> "Process" node (with its state store) -> sink topic
final Topology builder = new Topology();
builder.addSource("Source", "source-topic")
        .addProcessor("Process", () -> new WordCountProcessor(), "Source")
        .addStateStore(countsStoreBuilder, "Process")
        .addSink("Sink", "sink-topic", "Process");
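// The hand-built Topology runs exactly like the DSL one: hand it to KafkaStreams.
// A minimal sketch, assuming `props` is configured as in the first listing:
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
// Close cleanly on Ctrl-C
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));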
// For unit testing these topologies without a broker, Kafka Streams ships a test driver
// (sketch below): https://kafka.apache.org/35/documentation/streams/developer-guide/testing
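// A minimal TopologyTestDriver sketch for the DSL word-count topology above.
// The driver processes records synchronously in-process, so no broker is needed
// (class name and sample inputs are illustrative; requires the
// kafka-streams-test-utils dependency):
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

public class WordCountTopologyTest {

    public static void main(final String[] args) {
        // Rebuild the same DSL topology as in the first listing
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("streams-plaintext-input")
                .flatMapValues(v -> Arrays.asList(v.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((k, v) -> v)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input = driver.createInputTopic(
                    "streams-plaintext-input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, Long> output = driver.createOutputTopic(
                    "streams-wordcount-output", new StringDeserializer(), new LongDeserializer());

            input.pipeInput("hello kafka streams");
            input.pipeInput("kafka streams");

            // Each count update is emitted; expect hello=1, kafka=1, streams=1, kafka=2, streams=2
            while (!output.isEmpty()) {
                System.out.println(output.readKeyValue());
            }
        }
    }
}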