• pyflink 环境测试以及测试案例


    1. py 的 环境以来采用Anaconda环境包

    安装版本:https://www.anaconda.com/distribution/#download-section
    Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh
    下载地址
    https://repo.anaconda.com/archive/

    2. 安装

    bash Anaconda3-2021.05-Linux-x86_64.sh
    
    • 1

    2.1 如图

    在这里插入图片描述

    在这里插入图片描述

    3. 配置配置anaconda的环境变量:

    vim /etc/profile
    ##增加如下配置
    export ANACONDA_HOME=/root/anaconda3/bin
    export PATH=$PATH:$ANACONDA_HOME/bin
    重新加载环境变量: source /etc/profile
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4. 修改bashrc文件

    sudo vim ~/.bashrc
    添加如下内容:
    export PATH=~/anaconda3/bin:$PATH
    
    • 1
    • 2
    • 3

    说明:

    profile
    其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile.
    bashrc
    bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已.

    5. 启动anaconda并测试

    注意: 请将当前连接node1的节点窗口关闭,然后重新打开,否则无法识别
    如图:
    在这里插入图片描述

    如果没有可以重启服务器。

    如果大家发现命令行最前面出现了 (base) 信息, 可以通过以下方式, 退出Base环境

    vim ~/.bashrc
    拉到文件的最后面: 输入 i 进入插入模式
    将以下内容添加:
    conda deactivate
    
    • 1
    • 2
    • 3
    • 4

    6. Anaconda相关组件命令

    地址:https://www.continuum.io/downloads

    安装包:pip install xxx,conda install xxx
    卸载包:pip uninstall xxx,conda uninstall xxx
    升级包:pip install upgrade xxx,conda update xxx
    
    • 1
    • 2
    • 3

    6.1 功能:

    Anaconda自带,无需单独安装
    实时查看运行过程
    基本的web编辑器(本地)
    ipynb 文件分享
    可交互式
    记录历史运行结果
    修改jupyter显示的文件路径:
    通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。

    IPython:

    命令:ipython,其功能如下
    1.Anaconda自带,无需单独安装
    2.Python的交互式命令行 Shell
    3.可交互式
    4.记录历史运行结果
    5.及时验证想法

    7. Anaconda中的conda命令做详细介绍和配置。

    ** 7.1. conda命令及pip命令**

    conda install  包名    pip install 包名
    conda uninstall 包名   pip uninstall 包名
    conda install -U 包名   pip install -U 包名
    
    • 1
    • 2
    • 3

    7.2 Anaconda设置为国内下载镜像

    conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
    conda config --set show_channel_urls yes
    
    • 1
    • 2

    7.3 conda创建虚拟环境

    conda env list
    conda create py_env python=3.8.8 #创建python3.8.8环境
    
    #现在使用以下命令激活新创建的环境:
    source activate py_env   
    # 或者
    conda activate py_env   
    deactivate py_env #退出环境
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ----------------------------------------------- Pyflink 环境安装-------------------------------------

    8. pyflink 环境安装

    激活虚拟环境

    source ~/Documents/install/miniconda/bin/activate
    
    • 1

    创建 pyflink 虚拟环境

    conda create --name py310_pyflink171_venv -y -q python=3.10.8
    conda activate py310_pyflink171_venv
    pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
    pip install apache-flink==1.17.1 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517
    
    • 1
    • 2
    • 3
    • 4

    或者 flink 14.5

    # 创建 pyflink 虚拟环境
    conda create --name py314_pyflink171_venv -y -q python=3.8.8
    conda activate py314_pyflink171_venv
    pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
    pip install apache-flink==1.14.5 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517
    
    • 1
    • 2
    • 3
    • 4
    • 5

    如果不想进入虚拟环境 在执行 就直接 在 服务器最外面执行:

    pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
    pip install apache-flink==1.14.5 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517
    
    • 1
    • 2

    9. 官网测试例子

    地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/api/python/examples/table/word_count.html

    vi word_count2.py

    ################################################################################
    #  Licensed to the Apache Software Foundation (ASF) under one
    #  or more contributor license agreements.  See the NOTICE file
    #  distributed with this work for additional information
    #  regarding copyright ownership.  The ASF licenses this file
    #  to you under the Apache License, Version 2.0 (the
    #  "License"); you may not use this file except in compliance
    #  with the License.  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################
    import argparse
    import logging
    import sys
    
    from pyflink.table import TableEnvironment, EnvironmentSettings, TableDescriptor, Schema,\
        DataTypes, FormatDescriptor
    from pyflink.table.expressions import col, lit
    from pyflink.table.udf import udf
    
    words = ["flink", "window", "timer", "event_time", "processing_time", "state",
             "connector", "pyflink", "checkpoint", "watermark", "sideoutput", "sql",
             "datastream", "broadcast", "asyncio", "catalog", "batch", "streaming"]
    
    max_word_id = len(words) - 1
    
    
    def streaming_word_count(output_path):
        t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    
        # define the source
        # randomly select 5 words per second from a predefined list
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('datagen')
                           .schema(Schema.new_builder()
                                   .column('word_id', DataTypes.INT())
                                   .build())
                           .option('fields.word_id.kind', 'random')
                           .option('fields.word_id.min', '0')
                           .option('fields.word_id.max', str(max_word_id))
                           .option('rows-per-second', '5')
                           .build())
        tab = t_env.from_path('source')
    
        # define the sink
        if output_path is not None:
            t_env.create_temporary_table(
                'sink',
                TableDescriptor.for_connector('filesystem')
                               .schema(Schema.new_builder()
                                       .column('word', DataTypes.STRING())
                                       .column('count', DataTypes.BIGINT())
                                       .build())
                               .option('path', output_path)
                               .format(FormatDescriptor.for_format('canal-json')
                                       .build())
                               .build())
        else:
            print("Printing result to stdout. Use --output to specify output path.")
            t_env.create_temporary_table(
                'sink',
                TableDescriptor.for_connector('print')
                               .schema(Schema.new_builder()
                                       .column('word', DataTypes.STRING())
                                       .column('count', DataTypes.BIGINT())
                                       .build())
                               .build())
    
        @udf(result_type=DataTypes.STRING())
        def id_to_word(word_id):
            return words[word_id]
    
        # compute word count
        tab.select(id_to_word(col('word_id'))).alias('word') \
           .group_by(col('word')) \
           .select(col('word'), lit(1).count) \
           .execute_insert('sink') \
           .wait()
        # remove .wait if submitting to a remote cluster, refer to
        # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
        # for more details
    
    
    if __name__ == '__main__':
        logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--output',
            dest='output',
            required=False,
            help='Output file to write results to.')
    
        argv = sys.argv[1:]
        known_args, _ = parser.parse_known_args(argv)
    
        streaming_word_count(known_args.output)
    
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108

    10.执行命令

     python word_count2.py
    
    • 1

    11. 执行结果

    在这里插入图片描述

    12 实例2

    12.1 安装mysql docker 安装
    https://blog.csdn.net/wudonglianga/article/details/133927305
    12.2 创建表语句

    CREATE TABLE `bigdatauser` (
      `id` int(32) NOT NULL AUTO_INCREMENT,
      `name` varchar(64) DEFAULT NULL,
      `age` int(32) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    12.3 python 脚本

    from pyflink.table import EnvironmentSettings, TableEnvironment
    
    
    print('step 01')
    # 1. create a TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    print('step 02')
    
    table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-connector-jdbc_2.12-1.14.5.jar;file:/opt/flink/lib/mysql-connector-java-5.1.49.jar")
    print('step 03')
    
    # 2. create source Table=
    table_env.execute_sql("""
        CREATE TABLE products (
            id INT,
            name STRING,
            age INT,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
    	'connector' = 'jdbc',  
            'url' = 'jdbc:mysql://192.168.43.185:3306/wudl',
            'username'='root',
            'password'='123456',
            'table-name' = 'bigdatauser'
            
        )
    """)
    
    # 3. create sink Table
    table_env.execute_sql("""
        CREATE TABLE dproducts (
            id int,
            name STRING,
            age int
        ) WITH (
            'connector' = 'print'
        )
    """)
    
    print('step 04')
    table_env.execute_sql("INSERT INTO dproducts SELECT  p.id, p.name, p.age FROM products AS p").wait()
    print('step 06')
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    12.4 执行结果
    在这里插入图片描述

    13. 任务提交服务器运行

    打包运行环境

    # 找到 minconda(安装路径 envs目录下) 或者对应虚拟环境安装目录
    # 打包 py310_pyflink171_venv 虚拟环境
    cd ~/Documents/install/miniconda/env
    zip -r py310_pyflink171_venv.zip py310_pyflink171_venv
    
    • 1
    • 2
    • 3
    • 4

    ** 1.提交至 jobmanager**

    ./flink run \
    --jobmanager localhost:8081 \
    -pyarch file:///workplace/py310_pyflink171_venv.zip \
    -pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -py /workplace/src/word_count.py
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2. 带目录,指定入口模块提交

    ./flink run \
    --jobmanager localhost:8081 \
    -pyarch file:///workplace/py310_pyflink171_venv.zip \
    -pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -pyfs /workplace/src \
    -pym word_count
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3. 提交至 yarn 集群管理
    提交运行
    3.1.本地 py虚拟环境

    ./flink run -m yarn-cluster \
    -pyarch file:///workplace/py310_pyflink171_venv.zip \
    -pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -py word_count.py
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.2. hdfs py虚拟环境

    ./flink run  -m yarn-cluster \
    -pyarch hdfs://dae-ns/py_env/py310_pyflink171_venv.zip \
    -pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv \
    -py word_count.py
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.3.带目录 src

    ./bin/flink run-application -t yarn-application \
    -Dyarn.application.name=wordcount \
    -Dyarn.ship-files=/workplace/src \
    -pyarch shipfiles/py310_pyflink171_venv.zip \
    -pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
    -pyfs src \
    -pym word_count
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    注意:

    虚拟环境打包,该虚拟环境创建方式建议使用 conda,或者virtualenv --always-copy 方式创建,这样打的虚拟环境更全
    提交虚拟环境地址:py310_pyflink171_venv.zip/py310_pyflink171_venv 注意这个地址是双层

  • 相关阅读:
    uni.getLocation() 微信小程序 线上获取失败
    技术小知识:云计算服务下的IaaS,PaaS,SaaS⑥
    Go 语言 设计模式-单例模式
    学之思开源考试系统部署至Centos7
    Kubernetes(k8s)的Volume数据存储配置储存类型ConfigMap和Secret的使用
    Guava入门~EventBus~Event Publishing示例
    快速上手Linux核心命令(十一):Linux用户相关命令
    OS--学习笔记:调度与死锁
    元强化学习 论文理解 MAESN
    经典算法-----01背包问题(动态规划)
  • 原文地址:https://blog.csdn.net/wudonglianga/article/details/133929034