• Flink学习4 - 富函数 + 数据重分区操作 + sink 操作(kafka、redis、jdbc)


    1、富函数 - 函数类接口,可以获取运行环境的上下文,实现更复杂的功能

    在这里插入图片描述
    在这里插入图片描述

    2、数据重分区操作

    在这里插入图片描述
    在这里插入图片描述

    3、sink操作

    sink - kafka

    1、引入kafka的pom依赖

    <dependency>
    	<groupId>org.apache.flink</groupId>
    <!--<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
    <!--<version>${flink.version}</version>-->
    	<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    	<version>1.10.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.启动 zookeeper

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    
    • 1

    3.启动 kafka 服务

    $ bin/kafka-server-start.sh config/server.properties
    
    • 1

    4.启动 kafka 生产者

    $ bin/kafka-console-profucer.sh --broker-list localhost:9092 --topic sensor
    
    • 1

    5.运行 Flink 程序,在 kafka 生产者输入数据,查看 kafka 消费者的输出结果
    ![在这里插入图在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    sink - redis

    1、添加 pom 依赖
    在这里插入图片描述
    2、 java代码
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    3、启动 redis

    redis-server..exe redis.windows.conf
    
    • 1

    原来的不要关闭,启动另一个窗口

    redis-cli.exe -h 127.0.0.1 -p 6379
    
    • 1

    4、运行程序,进行查询
    在这里插入图片描述

    sink-JDBC自定义sink-mysql

    1.pom依赖
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    错误

    1 启动 zookeeper,却无法启动 kafka

    原因:kafka 日志被异常清理导致进程频繁挂掉

    linux 会定时清理 /tmp 目录下的文件, kafka 日志文件目录正是放在了 /tmp/kafka-logs目录下,导致被定时给清理掉了,所以 kafka 在尝试读取或追加日志时就会出错。

    修改:配置文件中的log.dirs

    vi ./config/server.properties
    
    • 1

    重启kafka
    在这里插入图片描述
    2 另一个程序正在使用此文件,进程无法访问

    原因:该问题是因为在关闭启动命令窗口时,直接点击右上角的×号关闭,下次启动就会出现该问题。
    正确的关闭窗口方法:在启动窗口按ctrl+C

    输入字母—Y则可成功关闭。需要注意的是该选择可能需要等待一会。这样子关闭窗口下次就可以正常启动了。

    3由于 window 自带的 linux 子系统,由于是个 mini 的系统,没有其他功能,也无法下载软件 ,因此 windows 版本的 redis,在 window 中的 cmd 中测试

  • 相关阅读:
    [UEFI] Hob
    spring-cloud-gateway启动失败以及springboo和springcloud版本对应关系总结
    需求可追溯性的四个最佳实践
    C语言指针基础篇
    js/vue/tsx 中获取dom元素的方法集合
    数组的声明和使用
    使用conda install一直卡在solving environment的解决方法
    数字IC前端笔试常见面试问题整理
    LibAlias
    AI时代下的数据隐私问题:保护个人信息的重要性
  • 原文地址:https://blog.csdn.net/weixin_52126591/article/details/136422460