• Counting word occurrences with Flink


    java:

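    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    // The class name StreamWordCount is illustrative.
    public class StreamWordCount {
        public static void main(String[] args) throws Exception {
            // 1. Create the streaming execution environment
            // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Read the host name and port from the arguments: --host <host> --port <port>
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            String host = parameterTool.get("host");
            int port = parameterTool.getInt("port");
            // 2. Read the text stream from the socket
            DataStreamSource<String> stringDataStreamSource = env.socketTextStream(host, port);
            // Drop blank lines so they are not counted as empty words
            SingleOutputStreamOperator<String> filter = stringDataStreamSource.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String s) throws Exception {
                    return !s.trim().isEmpty();
                }
            });
            // 3. Transform: split each line into words and emit (word, 1)
            SingleOutputStreamOperator<Tuple2<String, Long>> returns =
                    filter.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                        String[] words = line.split(" ");
                        for (String v : words) {
                            out.collect(Tuple2.of(v, 1L));
                        }
                    }).returns(Types.TUPLE(Types.STRING, Types.LONG));
            // 4. Group by word
            KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = returns.keyBy(data -> data.f0);
            // 5. Sum the counts (tuple field 1)
            SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
            // 6. Print
            sum.print();
            // 7. Start execution
            env.execute();
            // Before starting, you need a Linux server.
            // On Linux, run: nc -lk 7777
            // The host and port passed to socketTextStream must match that listener.
            // On Windows, download netcat
            // and run: nc -l -p 7777
        }
    }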
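
To make the expected output easier to picture, here is a minimal plain-Java sketch of what the flatMap, keyBy and sum(1) steps compute together: one updated running total per incoming word. RunningWordCountSketch and runningCounts are hypothetical names used only for illustration, and no Flink classes are involved. A real job running with parallelism greater than 1 may also prefix each printed line with a subtask index and interleave the output of different keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: imitates flatMap + keyBy + sum(1).
public class RunningWordCountSketch {

    // Returns one "(word,count)" entry per input word, in arrival order,
    // where count is the running total for that word so far.
    public static List<String> runningCounts(List<String> lines) {
        Map<String, Long> totals = new HashMap<>();   // per-key state, like the keyed sum
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {     // same split as the flatMap step
                long updated = totals.merge(word, 1L, Long::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (String s : runningCounts(Arrays.asList("hello flink", "hello world"))) {
            System.out.println(s);
        }
    }
}
```

For the two input lines "hello flink" and "hello world", the sketch emits (hello,1), (flink,1), (hello,2), (world,1): every word produces a new record carrying its updated total, rather than one final count per word.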

    pom:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.flink</groupId>
        <artifactId>flik-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <flink.version>1.13.0</flink.version>
            <java.version>1.8</java.version>
            <scala.binary.version>2.12</scala.binary.version>
            <slf4j.version>1.7.30</slf4j.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j</artifactId>
                <version>2.14.0</version>
            </dependency>
        </dependencies>
    </project>

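    Note: the listener on the port must be running before the program starts, because socketTextStream connects to it as a client.

    Before starting, you need a Linux server.
    On Linux, run: nc -lk 7777
    The host and port passed to the program (--host, --port) must match that listener.

    On Windows, download netcat
    and run: nc -l -p 7777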
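
If netcat is not available, a small Java program can play the same role. The following is only a sketch under stated assumptions: LineServerSketch and serveOnce are hypothetical names, the server accepts a single client, and it forwards whatever is typed on standard input. It is not a full replacement for nc -lk, which keeps listening after a client disconnects.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a netcat listener; names are illustrative.
public class LineServerSketch {

    // Waits for one client (for example the Flink socket source), then forwards
    // every line read from `input` to that client until the input ends.
    public static void serveOnce(ServerSocket server, BufferedReader input) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = input.readLine()) != null) {
                out.print(line + "\n");   // newline-delimited records
                out.flush();              // send each line immediately
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Port defaults to 7777, the value used in the notes above.
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 7777;
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            System.out.println("Listening on port " + port + "; type lines to send");
            serveOnce(server, stdin);
        }
    }
}
```

Start this program first, then launch the Flink job with --host localhost --port 7777 and type words into the server's console.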
  • Original article: https://blog.csdn.net/HDXxiazai/article/details/126171571