• SpringBoot集成flink


    Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。
    最大亮点是流处理,最适合的应用场景是低时延的数据处理。
    场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

    环境搭建:

    ①、安装flink

    https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

    ②、安装Netcat

    Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。
    用于测试网络中的端口,发送文件等操作。
    进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作

    yum install -y nc # 安装nc命令
    
    nc -lk 8888 # 启动socket端口
    
    • 1
    • 2
    • 3

    无界流之读取socket文本流

    一、依赖

    
    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>springboot-demoartifactId>
            <groupId>com.etgroupId>
            <version>1.0-SNAPSHOTversion>
        parent>
        <modelVersion>4.0.0modelVersion>
    
        <artifactId>flinkartifactId>
    
        <properties>
            <maven.compiler.source>8maven.compiler.source>
            <maven.compiler.target>8maven.compiler.target>
        properties>
        <dependencies>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-webartifactId>
            dependency>
    
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-autoconfigureartifactId>
            dependency>
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
                <scope>testscope>
            dependency>
            
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-streaming-javaartifactId>
                <version>1.17.0version>
            dependency>
            
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-javaartifactId>
                <version>1.17.0version>
            dependency>
    
            
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-clientsartifactId>
                <version>1.17.0version>
            dependency>
    
            
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-baseartifactId>
                <version>1.17.0version>
            dependency>
    
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-filesartifactId>
                <version>1.17.0version>
            dependency>
            
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-kafkaartifactId>
                <version>1.17.0version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-runtime-webartifactId>
                <version>1.17.0version>
            dependency>
    
    
        dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-shade-pluginartifactId>
                    <executions>
                        <execution>
                            <phase>packagephase>
                            <goals>
                                <goal>shadegoal>
                            goals>
                            <configuration>
                                <transformers>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.handlersresource>
                                    transformer>
                                    <transformer
                                            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                        <resource>META-INF/spring.factoriesresource>
                                    transformer>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.schemasresource>
                                    transformer>
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.et.flink.job.SocketJobmainClass>
                                    transformer>
                                transformers>
                            configuration>
                        execution>
                    executions>
                plugin>
            plugins>
        build>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120

    二、SoketJob

    public class SocketJob{
    	
    	public static void main(String[] args)throws Exception{
    		
    		// 创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 指定并行度,默认电脑线程数
            env.setParallelism(3);
            // 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
            DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);
    
            // 处理数据: 切换、转换、分组、聚合 得到统计结果
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                    .flatMap(
                            (String value, Collector<Tuple2<String, Integer>> out) -> {
                                String[] words = value.split(" ");
                                for (String word : words) {
                                    out.collect(Tuple2.of(word, 1));
                                }
                            }
                    )
                    .setParallelism(2)
                    // // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2。只有显式设置系统当前返回类型,才能正确解析出完整数据
                    .returns(new TypeHint<Tuple2<String, Integer>>() {
                    })
    //                .returns(Types.TUPLE(Types.STRING,Types.INT))
                    .keyBy(value -> value.f0)
                    .sum(1);
    
    
            // 输出
            sum.print();
    
            // 执行
            env.execute();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    测试:

    启动socket流:

    nc -l 8888
    
    • 1

    本地执行:直接ideal启动main程序,在socket流中输入

    abc bcd cde
    bcd cde fgh
    cde fgh hij
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    集群执行:
    执行maven打包,将打包的jar上传到集群中
    在这里插入图片描述

  • 相关阅读:
    java小游戏-超级玛丽
    各位程序员们,睡眠不足产生的后果超出你想象!
    树结构处理,list和tree互转
    不懂Hybird开发,感觉错过一个亿~
    c++视觉处理----分水岭算法
    吹爆这款制作电子图册的工具,真是太绝了
    Go 使用Viper处理Go应用程序的配置
    权重衰退(PyTorch)
    css_易忘点总结
    说好的女程序员做测试有优势?面试十几家,被面试官虐哭~~
  • 原文地址:https://blog.csdn.net/usa_washington/article/details/136492798