• Kafka review (25): Kafka Streams


    一、Java code:

    package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.*;
    
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    
    public class KafkaTest25 {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "k8s-master:9092");
            // Disable record caching so every count update is forwarded downstream immediately
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            StreamsBuilder builder = new StreamsBuilder();
            // Parameterized types are required here: with a raw KStream the
            // value.toLowerCase() call below would not compile
            KStream<String, String> source = builder.stream("streams-plaintext-input");
    
            // Split each line into lowercase words, re-key each record by its word,
            // and count occurrences per word
            KTable<String, Long> counts = source
                    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split(" ")))
                    .groupBy((key, value) -> value)
                    .count();
    
            // Stream the changelog of the count table to the output topic as <word, count>
            counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
    
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            final CountDownLatch latch = new CountDownLatch(1);
            // Close the Streams topology cleanly on Ctrl-C
            Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });
    
            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
    
            System.exit(0);
        }
    }
    
    
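    Before starting the application, make sure the input and output topics exist. A minimal sketch, assuming Kafka 2.2+ (older releases take --zookeeper instead of --bootstrap-server) and that broker-side auto topic creation is disabled; adjust partitions and replication factor to your cluster:

    bin/kafka-topics.sh --create --bootstrap-server k8s-master:9092 \
        --topic streams-plaintext-input --partitions 1 --replication-factor 1
    bin/kafka-topics.sh --create --bootstrap-server k8s-master:9092 \
        --topic streams-wordcount-output --partitions 1 --replication-factor 1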

    二、Start the console producer:

    bin/kafka-console-producer.sh --bootstrap-server xx.xx.xx.xx:9092 --topic streams-plaintext-input
    
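    For example, once the producer is up, type a couple of space-separated lines at its > prompt (illustrative input; any words work):

    >hello kafka streams
    >hello kafka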

    三、Start the console consumer:

    bin/kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:9092 \
        --topic streams-wordcount-output \
        --from-beginning \
        --formatter kafka.tools.DefaultMessageFormatter \
        --property print.key=true \
        --property print.value=true \
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    

    四、Type space-separated strings on the producer side and watch the consumer output; expected output for the sample input above is sketched below.
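
    Because record caching is disabled (CACHE_MAX_BYTES_BUFFERING_CONFIG = 0), every incoming word triggers an immediate count update, so the sample input above should yield output along these lines (key and count separated by a tab; ordering may vary slightly):

    hello	1
    kafka	1
    streams	1
    hello	2
    kafka	2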

  • Original article: https://blog.csdn.net/amadeus_liu2/article/details/132611972