• 【Flink实战】Flink对接Kafka Connetor使用docker部署kafka


    🚀 作者 :“大数据小禅”

    🚀 文章简介Flink对接Kafka Connetor第一步 使用docker部署kafka

    🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


    什么是Docker

    • Docker 是一个开源的容器化平台,用于将应用程序和其依赖的环境打包成一个独立的容器,以实现应用程序的快速部署、可移植性和可伸缩性。

    0 传统的应用部署方式通常需要在目标环境中手动设置各种依赖项和配置,可能面临不同操作系统或软件版本之间的兼容性问题。而 Docker 可以通过容器的方式隔离应用程序和其依赖的环境,使得应用程序能够在任意系统上以相同的方式运行,并且不受目标环境的影响。

    Docker 的主要概念包括以下几个方面:

    1. 镜像(Image):镜像是一个只读的模板,它包含了运行一个应用程序所需的所有文件系统、库和依赖项。镜像可以从一个基础镜像上构建,也可以通过 Dockerfile 中的指令一步步构建。镜像是 Docker 容器的基础。

    2. 容器(Container):容器是镜像的运行实例,它包含了应用程序以及其所需的运行环境。容器是隔离运行的,其使用的文件系统和网络资源都是独立的。容器可以快速创建、启动、停止和销毁,它提供了一种轻量级、快速和可移植的应用部署方式。

    3. Dockerfile:Dockerfile 是一个文本文件,包含了构建镜像时的一系列指令。通过 Dockerfile,可以定义容器中的文件、环境变量、运行命令等。使用 Dockerfile 可以实现自动化构建、重现性和标准化的镜像构建过程。

    4. 仓库(Repository):仓库是用于存储和共享镜像的地方。Docker Hub 是 Docker 官方提供的公共仓库,可以从中获取各种常用的镜像。也可以配置私有仓库来存储和管理自己的镜像。

    Docker常用命令

    • docker安装教程查看个人主页的docker教程
      查看已经下载的镜像:

    • docker images
      删除指定的镜像:

    • docker rmi [IMAGE ID]
      其中,[IMAGE ID]是要删除的镜像的ID或名称。可以通过docker images命令查看对应的IMAGE ID。

    删除所有未被使用的镜像:

    • docker image prune
      这个命令会删除所有没有被任何容器使用的镜像。
      注意:删除镜像操作无法撤销,因此请谨慎操作。

    要查看正在运行的容器,可以使用以下命令:

    • docker ps
      该命令将列出当前正在运行的所有容器的详细信息,包括容器ID、镜像名称、启动时间、状态等。
      如果要查看所有的容器,包括已停止的容器,可以在命令中添加-a选项:

    • docker ps -a
      这个命令将列出所有的容器,包括正在运行和已停止的容器。

    要删除一个容器,可以使用以下命令:

    • docker rm [CONTAINER ID]
      其中,[CONTAINER ID]是要删除的容器的ID。可以通过docker ps -a命令查看对应的CONTAINER ID。

    Docker安装过程

    cd /etc/yum.repos.d
    //使用阿里云yum源
    wget http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
    依次执行:
    查看docker安装包:yum list | grep docker
    安装Docker Ce 社区版本:yum install -y docker-ce.x86_64
    设置开机启动:systemctl enable docker
    更新xfsprogs:yum -y update xfsprogs
    启动docker:systemctl start docker
    查看版本:docker version
    查看详细信息:docker info
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Docker部署kafka

    • 配置阿里云加速

      • 控制台->容器镜像服务->镜像加速器

    在这里插入图片描述

    • 每个账号一个加速地址 https://cr.console.aliyun.com/cn-shenzhen/instances/mirrors

    • mkdir -p /etc/docker/
      vi /etc/docker/daemon.json
      
      {
        "registry-mirrors": ["https://pw7d3bwu.mirror.aliyuncs.com"]
      }
      
      重启:systemctl daemon-reload && systemctl restart docker
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    • 快速搭建

    • #zk
      docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
      
      #kafka
      docker run -d --name xdclass_kafka \
      -p 9092:9092 \
      -e KAFKA_BROKER_ID=0 \
      -e KAFKA_ZOOKEEPER_CONNECT=192.168.192.100:2181 \
      -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.192.100:9092 \
      -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
      
      #查看容器
      docker ps
      
      #进入容器内部,创建topic
      docker exec -it 9b5070d79dfa /bin/bash
      
      cd /opt/kafka
      bin/kafka-topics.sh --create --zookeeper 192.168.192.100:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic
      
      
      #创建生产者发送消息
      bin/kafka-console-producer.sh --broker-list localhost:9092 --topic xdclass-topic
      
      
      #运行一个消费者
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xdclass-topic --from-beginning
      
      #开两个窗口可测试生产者消费者成功链接
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29

    在这里插入图片描述

    • 测试案例
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    public class FlilnkKafkaApp01 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties props = new Properties();
            //kafka地址
            props.setProperty("bootstrap.servers", "192.168.192.100:9092");
            //组名 随意指定
            props.setProperty("group.id", "test");
            FlinkKafkaConsumer<String> kafka =new FlinkKafkaConsumer<>("xdclass-topic", new SimpleStringSchema(), props);
            DataStreamSource<String> stream = env.addSource(kafka);
            stream.print();
            env.execute("FlinkKafkaApp");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
  • 相关阅读:
    ETL数据转换工具类型与适用场景
    Linux 的性能调优的思路
    交易用户如何去使用l2行情数据api接口?
    systemverilog中@和wait的区别
    Docker 安装 mysql5.7
    NumPy(二)
    MySQL代码错误号大全
    POI-TL制作word
    springboot基于Java Web的华家医疗器械商城设计与实现毕业设计源码261620
    vue父子组件通信方式
  • 原文地址:https://blog.csdn.net/weixin_45574790/article/details/132857817