• 1.34.FlinkX\工作原理\快速起步|1.35.Flink资料


    1.34.FlinkX
    1.34.1.什么是FlinkX
    1.34.2.工作原理
    1.34.3.快速起步
    1.34.3.1.运行模式
    1.34.3.2.执行环境
    1.34.3.3.打包
    1.34.3.4.启动
    1.34.3.4.1.命令行参数选项
    1.34.3.4.2.启动数据同步
    1.34.4.数据同步任务模板
    1.34.4.1.setting
    1.34.4.1.1.speed
    1.34.4.1.2.errorLimit
    1.34.4.1.3.dirty
    1.34.4.1.4.restore
    1.34.4.2.content
    1.34.4.3.数据同步任务例子
    1.35.Flink资料

    1.34.FlinkX

    1.34.1.什么是FlinkX

    FlinkX是在是袋鼠云内部广泛使用的基于flink的分布式离线数据同步框架,实现了多种异构数据源之间高效的数据迁移。

    不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
    在这里插入图片描述

    1.34.2.工作原理

    在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:
    在这里插入图片描述

    1.34.3.快速起步

    1.34.3.1.运行模式

    单击模式:对应Flink集群的单机模式
    standalone模式:对应Flink集群的分布式模式
    yarn模式:对应Flink集群的yarn模式

    1.34.3.2.执行环境

    Java: JDK8及以上
    Flink集群: 1.4及以上(单机模式不需要安装Flink集群)
    操作系统:理论上不限,但是目前只编写了shell启动脚本,用户可以可以参考shell脚本编写适合特定操作系统的启动脚本。

    1.34.3.3.打包

    进入项目根目录,使用maven打包:

    mvn clean package -Dmaven.test.skip
    
    • 1

    打包结束后,项目根目录下会产生bin目录和plugins目录,其中bin目录包含FlinkX的启动脚本,plugins目录下存放编译好的数据同步插件包。

    1.34.3.4.启动

    1.34.3.4.1.命令行参数选项

    在这里插入图片描述
    在这里插入图片描述

    1.34.3.4.2.启动数据同步

    以本地模式启动数据同步任务

    bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
    
    • 1

    以standalone模式启动数据同步任务

    bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json  -pluginRoot /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
    
    • 1

    以yarn模式启动数据同步任务

    bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json  -pluginRoot /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
    
    • 1

    1.34.4.数据同步任务模板

    从最高空俯视,一个数据同步的构成很简单,如下:

    {
        "job": {
            "setting": {...},
            "content": [...]
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    数据同步任务包括一个job元素,而这个元素包括setting和content两部分。
    setting: 用于配置限速、错误控制和脏数据管理。
    content: 用于配置具体任务信息,包括从哪里来(Reader插件信息),到哪里去(Writer插件信息)

    1.34.4.1.setting

    "setting": {
        "speed": {...},
        "errorLimit": {...},
        "dirty": {...}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    settings包括speed、errorLimit和dirty部分,分别描述限速、错误控制和脏数据管理的配置信息。

    1.34.4.1.1.speed
    "speed": {
        "channel": 3,
        "bytes": 0
    }
    
    • 1
    • 2
    • 3
    • 4

    channel: 任务并发数
    bytes: 每秒字节数,默认为 Long.MAX_VALUE

    1.34.4.1.2.errorLimit
    "errorLimit": {
         "record": 10000,
         "percentage": 100
    }
    
    • 1
    • 2
    • 3
    • 4

    record: 出错记录数超过record设置的条数时,任务标记为失败。
    percentage: 当出错记录超过percentage百分数时,任务标记为失败

    1.34.4.1.3.dirty
    "dirty": {
    	"path": "/tmp",
    	"hadoopConfig": {
    		"fs.default.name": "hdfs://ns1",
    		"dfs.nameservices": "ns1",
    		"dfs.ha.namenodes.ns1": "nn1,nn2",
    		"dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
    		"dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
    		"dfs.ha.automatic-failover.enabled": "true",
    		"dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    		"fs.hdfs.impl.disable.cache": "true"
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    path: 脏数据存放数据
    hadoopConfig: 脏数据存放路径对应hdfs的配置信息(hdfs高可用配置)

    1.34.4.1.4.restore
    "restore": {
    	"isRestore": false,
    	"restoreColumnName": "",
    	"restoreColumnIndex": 0
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    restore配置请参考”断点续传”(https://gitee.com/tuzuoquan/flinkx/blob/master/docs/restore.md)

    1.34.4.2.content

    "content": [
    	{
    	   "reader": {
    			"name": "...",
    			"parameter": {
    				...
    			}
    		},
    	   "writer": {
    			"name": "...",
    			"parameter": {
    				 ...
    			 }
    		}
    	}
    ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    reader: 用于读取数据的插件信息.
    writer: 用于写入数据的插件的信息。

    reader和writer包括name和parameter,分别表示插件名称和插件参数。

    1.34.4.3.数据同步任务例子

    具体可以查看:
    https://gitee.com/tuzuoquan/flinkx

    1.35.Flink资料

    https://www.ucloud.cn/yun/74764.html

    https://github.com/apache/rocketmq-externals

    https://github.com/zhisheng17/flink-learning

    https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink

    https://www.dazhuanlan.com/2019/11/15/5dcd9f1999f57/?cf_chl_jschl_tk=2b92cebd5bcf222f98c03e2941998e9123b63576-1600045745-0-AfjF13iSAepCCX5phJ7zu3dpwwpsc3ynAiWlvDjZgtZMJcQGQt5pXsS4p8FWKwQRA7gG0q9Zitp1CHHZCcSrUsNhQ9a20rvXKPj5VsQKfWwTdnqGNMpRBzB19PwwnPWMInWITKEn6QA_dZaBtUcrXnlEMIDjZ7fecSFQnCPcvHw87KWHrQ3qDDNjZhOWGnHZww9zwJ20lZcbFi1HucoDBtJlAKPPMrXVF8zpHxKOJwLR5DMGz3UL3PSwHSCoX1HzmvbBymDkrlY2_cwHBU2rlYIdZQsg_Cy87XH8tzudmZVwD5-11hVBs7PQlS8CHt4NGA

  • 相关阅读:
    组合拳 | 本地文件包含漏洞+TFTP=Getshell
    Python学习记录(3) 列表
    指静脉处理综合代码(orz)
    WebSocket通讯架构
    【Node.js操作SQLite指南】
    Oracle OCP 19c认证考试1Z0-082题库最新解析 第二十七题
    Spring5入门到实战------10、操作术语解释--Aspectj注解开发实例。AOP切面编程的实际应用
    Linux文件/目录高级管理二
    Redis 淘汰策略与过期策略及其应用场景
    apache开启https
  • 原文地址:https://blog.csdn.net/toto1297488504/article/details/125630572