• [平台运维、Hadoop]kafka streams概述


    9633f3bb7c3643d0a6989e51c0470ac6.gif#pic_center

    目录

    一、 kafka streams概述

    二、kafka streams开发单词计数应用


    一、 kafka streams概述

    Kafka Streams是Apache Kafka开源项目的一个流处理框架,它是基于Kafka的生产者和消费者,为开发者提供了流式处理的能力,具有低延迟性.高扩展性、高弹性、高容错性的特点,易于集成到现有的应用程序中。

    KafkaStreams是一套处理分析Kafka中存储数据的客户端类库,处理完的数据可以重新写回Kafka,也可以发送给外部存储系统。作为类库,可以非常方便地嵌人到应用程序中,直接提供具体的类供开发者调用,而且在打包和部署的过程中基本没有任何要求,整个应用的运行方式主要由开发者控制,方便使用和调试。

    二、kafka streams开发单词计数应用

    (步骤一)编写代码

    ①创建名为LogProcessor的Java class

    be560a569b5043269ad807bc530e3203.png

    ② 编写LogProcessor.java代码

    1. package cn.itcast;
    2. import org.apache.kafka.streams.processor.Processor;
    3. import org.apache.kafka.streams.processor.ProcessorContext;
    4. import java.util.HashMap;
    5. public class LogProcessor implements Processor<byte[],byte[]> {
    6.     //上下文对象
    7.     private ProcessorContext processorContext;
    8.     @Override
    9.     public void init(ProcessorContext processorContext) {
    10.         //初始化方法
    11.         this.processorContext=processorContext;
    12.     }
    13.     @Override
    14.     public void process(byte[] key, byte[] value) {
    15.         //处理一条消息
    16.         String inputOri = new String(value);
    17.         HashMap map = new HashMap();
    18.         int times = 1;
    19.         if(inputOri.contains(" ")){
    20.             //截取字段
    21.             String [] words = inputOri.split(" ");
    22.             for (String word : words){
    23.                 if(map.containsKey(word)){
    24.                     map.put(word,map.get(word)+1);
    25.                 }else{
    26.                     map.put(word,times);
    27.                 }
    28.             }
    29.         }
    30.         inputOri = map.toString();
    31.         processorContext.forward(key,inputOri.getBytes());
    32.     }
    33.     @Override
    34.     public void close() {}
    35. }

    686d4c9beaab4971b39036648f1cc487.png

    ③ 创建名为App的Java class

    567d718ee2494d77a90ffd1440988f55.png

     编写App.java代码

    1. import org.apache.kafka.streams.KafkaStreams;
    2. import org.apache.kafka.streams.StreamsConfig;
    3. import org.apache.kafka.streams.Topology;
    4. import org.apache.kafka.streams.processor.Processor;
    5. import org.apache.kafka.streams.processor.ProcessorSupplier;
    6. import java.util.Properties;
    7. public class App {
    8.     public static void main(String[] args) {
    9.         //声明来源主题
    10.         String fromTopic = "testStreams1";
    11.         //声明目标主题
    12.         String toTopic = "testStreams2";
    13.         //设置参数
    14.         Properties props = new Properties();
    15.         props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");
    16.         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092,slave2:9092");
    17.         //实例化StreamsConfig
    18.         StreamsConfig config = new StreamsConfig(props);
    19.         //构建拓扑结构
    20.         Topology topology = new Topology();
    21.         //添加源处理节点,为源处理节点指定名称和它订阅的主题
    22.         topology.addSource("SOURCE",fromTopic)
    23.                 //添加自定义处理节点,指定名称,处理器类和上一个节点的名称
    24.                 .addProcessor("PROCESSOR", new ProcessorSupplier() {
    25.                     @Override
    26.                     public Processor get() {//调用这个方法,就知道这条数据用哪个process处理,
    27.                         return new LogProcessor();
    28.                     }
    29.                 },"SOURCE")
    30.                 //添加目标处理节点,需要指定目标处理节点的名称,和上一个节点名称。
    31.                 .addSink("SINK",toTopic,"PROCESSOR");//最后给SINK
    32.         //实例化KafkaStreams
    33.         KafkaStreams streams = new KafkaStreams(topology,config);
    34.         streams.start();
    35.     }
    36. }

    8e4b1945497e41cc9ef8d40c01a4ba2b.png

    (步骤二)执行测试

    ① 在master节点创建testStreams1和testStreams2主题

    $ bin/kafka-topics.sh --create --topic testStreams1 --partitions 3 --replication-factor 1 --zookeeper master:2181,slave1:2181,slave2:2181
    $ bin/kafka-topics.sh --create --topic testStreams2 --partitions 3 --replication-factor 1 --zookeeper master:2181,slave1:2181,slave2:2181

    875f6f8093324670be6a10b2ce822e20.png

     启动生产者服务命令

    bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic testStreams1

    0bd4d287b2da4e358e2b584687037122.png

     启动消费者服务命令

    bin/kafka-console-consumer.sh --from-beginning --topic testStreams2 --bootstrap-server master:9092,slave1:9092,slave2:9092

    2926eee5410141e3909f7559be233bb8.png

     再运行App.java程序

    d316ba8e7b6a4a619846748ad8642c3b.png

     在master节点输入任意数据,按enter键发送,在slave1节点上可以查看到消息

    输入内容如下:Hello itcast hello spark hello kafka,结果如下图

    1b5ec4cba6644892a6eb077346761c75.png

     438785ec166549cb937f68ac491be72b.png

  • 相关阅读:
    为什么别人家的ChatGPT比我家的更聪明?
    【JS】Chapter11-正则&阶段案例
    Hadoop生态之hive
    MySQL高级学习笔记(二)
    【PPT制作】基础篇
    halcon自定义函数基本操作
    最长算术(暑假每日一题 11)
    性能测试常见问题总结
    在 VMware vSphere 中构建 Kubernetes 存储环境
    代码随想录-015-剑指Offer206. 反转链表
  • 原文地址:https://blog.csdn.net/m0_57781407/article/details/127131830