• DataX 扩展 Vertica 读插件（verticareader）


    1.在 IDEA 中打开 DataX 源码工程，新建一个 Maven module

    2.将该 module 命名为 verticareader（父工程为 datax-all）

    3.参照 mysqlreader 的 pom.xml 编写本模块的 pom.xml：将 artifactId/name 改为 verticareader，并加入 vertica-jdbc 依赖，内容如下：

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <!-- Build definition for the DataX verticareader plugin module. -->
        <modelVersion>4.0.0</modelVersion>
        <!-- Child of the datax-all aggregate project. Dependencies and plugins below that
             declare no version presumably take it from this parent (TODO confirm). -->
        <parent>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-all</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>verticareader</artifactId>
        <name>verticareader</name>
        <packaging>jar</packaging>
    
        <dependencies>
            <!-- DataX common API used by the reader (presumably the com.alibaba.datax.common.*
                 classes). slf4j-log4j12 is excluded so it does not compete with
                 logback-classic, declared below, as the slf4j binding. -->
            <dependency>
                <groupId>com.alibaba.datax</groupId>
                <artifactId>datax-common</artifactId>
                <version>${datax-project-version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-log4j12</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- Logging facade plus its logback implementation. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </dependency>
    
            <!-- Shared RDBMS plugin helpers; presumably provides the
                 com.alibaba.datax.plugin.rdbms.* classes (CommonRdbmsReader, Key, Constant,
                 DBUtilErrorCode, DataBaseType) that the reader imports. -->
            <dependency>
                <groupId>com.alibaba.datax</groupId>
                <artifactId>plugin-rdbms-util</artifactId>
                <version>${datax-project-version}</version>
            </dependency>
            <!-- Vertica JDBC driver; expected to provide com.vertica.jdbc.Driver, which the
                 reader loads by name. Pick a version compatible with the target server. -->
            <dependency>
                <groupId>com.vertica</groupId>
                <artifactId>vertica-jdbc</artifactId>
                <version>9.3.1-0</version>
            </dependency>
    
    
        </dependencies>
    
        <build>
            <plugins>
                <!-- compiler plugin: source/target level and encoding come from the
                     jdk-version and project-sourceEncoding properties (presumably defined
                     in the parent POM). -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>${jdk-version}</source>
                        <target>${jdk-version}</target>
                        <encoding>${project-sourceEncoding}</encoding>
                    </configuration>
                </plugin>
                <!-- assembly plugin: in the package phase, runs assembly:single with
                     src/main/assembly/package.xml to lay out the DataX plugin directory
                     under the output name "datax". -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptors>
                            <descriptor>src/main/assembly/package.xml</descriptor>
                        </descriptors>
                        <finalName>datax</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <id>dwzip</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

    4.在 src/main 下新建 assembly 目录（即 src/main/assembly，与 pom.xml 中 descriptor 的路径一致）

    5.在该目录下新建 package.xml，粘贴以下内容：

    <assembly
            xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
        <!--
          Lays out the DataX plugin directory for verticareader as a plain directory
          (no archive):
            plugin/reader/verticareader/       plugin.json, plugin_job_template.json, plugin jar
            plugin/reader/verticareader/libs/  runtime-scope dependencies
        -->
        <id></id>
        <formats>
            <format>dir</format>
        </formats>
        <includeBaseDirectory>false</includeBaseDirectory>
        <fileSets>
            <!-- Plugin descriptor and job template from src/main/resources. -->
            <fileSet>
                <directory>src/main/resources</directory>
                <includes>
                    <include>plugin.json</include>
                    <include>plugin_job_template.json</include>
                </includes>
                <outputDirectory>plugin/reader/verticareader</outputDirectory>
            </fileSet>
            <!-- The module's own jar. The name is derived from the project coordinates
                 instead of being hard-coded, so a version bump cannot silently leave the
                 plugin jar out of the packaged directory. -->
            <fileSet>
                <directory>target/</directory>
                <includes>
                    <include>${project.artifactId}-${project.version}.jar</include>
                </includes>
                <outputDirectory>plugin/reader/verticareader</outputDirectory>
            </fileSet>
        </fileSets>
    
        <!-- Runtime dependencies (excluding this module's own artifact) go to libs/. -->
        <dependencySets>
            <dependencySet>
                <useProjectArtifact>false</useProjectArtifact>
                <outputDirectory>plugin/reader/verticareader/libs</outputDirectory>
                <scope>runtime</scope>
            </dependencySet>
        </dependencySets>
    </assembly>
    

    在 src/main/resources 下新建 plugin.json（class 必须改为 VerticaReader 的全限定类名，com.xxxx 仅为占位）：

    {
      "name": "verticareader",
      "class": "com.xxxx.VerticaReader",
      "description": "vertica reader",
      "developer": "cx"
    }

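    在 src/main/resources 下新建 plugin_job_template.json（注意：下文的 VerticaReader 只读取 connection 中的 jdbcUrl 和 querySql，table、column、where 不会生效）：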

    {
        "name": "verticareader",
        "parameter": {
            "username": "",
            "password": "",
            "column": [],
            "connection": [
                {
                "jdbcUrl": [],
                "table": []
                }
            ],
            "where": ""
        }
    }

    6.在 src/main/java 下新建 VerticaReader.java，包名可以自定，但必须与 plugin.json 中 class 字段的包名一致

    import com.alibaba.datax.common.element.StringColumn;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordSender;
    import com.alibaba.datax.common.spi.Reader;
    import com.alibaba.datax.common.util.Configuration;
    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
    import com.alibaba.datax.plugin.rdbms.reader.Constant;
    import com.alibaba.datax.plugin.rdbms.reader.Key;
    import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
    import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * DataX reader plugin that pulls rows out of Vertica over plain JDBC.
     *
     * <p>Each task executes the first {@code querySql} of the first {@code connection}
     * entry and forwards every column of every row to the writer as a
     * {@link StringColumn}. The shared {@link CommonRdbmsReader} job/task objects are
     * only used for their {@code post}/{@code destroy} hooks.
     */
    public class VerticaReader extends Reader {

        private static final DataBaseType DATABASE_TYPE = DataBaseType.Vertica;

        /** Job side: reads the job configuration and splits it into task configurations. */
        public static class Job extends Reader.Job {

            private static final Logger LOG = LoggerFactory
                    .getLogger(Job.class);
            private Configuration originalConfig = null;
            private CommonRdbmsReader.Job commonRdbmsReaderJob;

            /** Captures the job configuration and creates the shared RDBMS job helper. */
            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();

                // The task reads through a plain Statement and never consults fetchSize,
                // so a user-supplied value has no effect. The MySQL-only Integer.MIN_VALUE
                // streaming hint is deliberately not injected: it means nothing to Vertica,
                // and JDBC Statement.setFetchSize rejects negative values.
                Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
                if (userConfigedFetchSize != null) {
                    LOG.warn("对 verticareader 不需要配置 fetchSize, verticareader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
                }

                this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
                // NOTE(review): commonRdbmsReaderJob.init(originalConfig) is intentionally not
                // called, so no shared config validation runs. Confirm it accepts
                // querySql-only configs before enabling it.
            }

            /**
             * Nothing to prepare. init() is not re-invoked here; doing so would only
             * rebuild commonRdbmsReaderJob.
             */
            @Override
            public void prepare() {
                LOG.info("vertica prepare....");
            }

            /**
             * Always produces exactly one task configuration.
             *
             * @param adviceNumber the framework's suggested task count (logged, not used)
             * @return a single-element list holding a clone of the job configuration
             */
            @Override
            public List<Configuration> split(int adviceNumber) {
                LOG.info("vertica split...." + adviceNumber);
                // Every task would run the same (first) querySql, so additional tasks
                // would only duplicate rows.
                List<Configuration> configurations = new ArrayList<>(1);
                configurations.add(this.originalConfig.clone());
                return configurations;
            }

            @Override
            public void post() {
                LOG.info("vertica post....");
                this.commonRdbmsReaderJob.post(this.originalConfig);
            }

            @Override
            public void destroy() {
                LOG.info("vertica destroy....");
                this.commonRdbmsReaderJob.destroy(this.originalConfig);
            }

        }

        /** Task side: opens one JDBC connection, runs querySql and emits the rows. */
        public static class Task extends Reader.Task {
            private static final Logger LOG = LoggerFactory
                    .getLogger(Task.class);

            static final String JDBC_DRIVER = "com.vertica.jdbc.Driver";

            private Configuration readerSliceConfig;
            private CommonRdbmsReader.Task commonRdbmsReaderTask;

            // Per-task connection settings. These must be instance fields: shared static
            // fields would let concurrently running tasks overwrite each other's
            // URL, credentials and SQL.
            private String dbUrl;
            private String username;
            private String password;
            private String querySql;

            Connection conn = null;
            Statement stmt = null;

            /**
             * Reads username and password, plus the first jdbcUrl and the first querySql
             * of the first connection entry, from the task configuration.
             *
             * @param readerSliceConfig this task's reader configuration
             * @throws DataXException if connection, jdbcUrl or querySql is missing or empty
             */
            public void doCreateParams(Configuration readerSliceConfig) {
                this.username = readerSliceConfig.getString(Key.USERNAME);
                this.password = readerSliceConfig.getString(Key.PASSWORD);

                List<Object> conns = readerSliceConfig.getList(Constant.CONN_MARK, Object.class);
                if (conns == null || conns.isEmpty()) {
                    throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
                            "verticareader: 'connection' must contain at least one entry");
                }
                if (conns.size() > 1) {
                    LOG.warn("verticareader only reads the first connection entry; "
                            + (conns.size() - 1) + " extra entries are ignored.");
                }

                Configuration connConf = Configuration.from(conns.get(0).toString());
                this.dbUrl = firstValue(connConf, Key.JDBC_URL);
                this.querySql = firstValue(connConf, Key.QUERY_SQL);

                // The password is intentionally never logged.
                LOG.info("vertica jdbc url. user=" + this.username + " :url=" + this.dbUrl);
            }

            /**
             * Returns the first element of the list stored at {@code key}.
             *
             * @throws DataXException if the list is missing, empty or starts with null
             */
            private static String firstValue(Configuration conf, String key) {
                List<Object> values = conf.getList(key);
                if (values == null || values.isEmpty() || values.get(0) == null) {
                    throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
                            "verticareader: connection[0]." + key + " must contain at least one value");
                }
                if (values.size() > 1) {
                    LOG.warn("verticareader only uses the first " + key + "; "
                            + (values.size() - 1) + " extra values are ignored.");
                }
                return values.get(0).toString();
            }

            /**
             * Parses the task configuration and opens the JDBC connection and statement.
             *
             * @throws DataXException if the driver cannot be loaded or the connection fails
             */
            @Override
            public void init() {
                this.readerSliceConfig = super.getPluginJobConf();
                this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId());
                this.doCreateParams(this.readerSliceConfig);

                try {
                    // Register the Vertica JDBC driver explicitly by class name.
                    Class.forName(JDBC_DRIVER);
                    LOG.info("Connect DB... user=" + this.username + ", url=" + this.dbUrl);
                    this.conn = DriverManager.getConnection(this.dbUrl, this.username, this.password);
                    this.stmt = this.conn.createStatement();
                } catch (ClassNotFoundException | SQLException e) {
                    // Release whatever was opened before failing the task, instead of
                    // continuing with a null statement.
                    closeQuietly();
                    throw DataXException.asDataXException(DBUtilErrorCode.CONN_DB_ERROR,
                            "verticareader: failed to connect to " + this.dbUrl, e);
                }
            }

            @Override
            public void prepare() {
                LOG.info("vertica Task prepare....");
            }

            /**
             * Runs querySql and sends one record per row, converting every column
             * (whatever the result set width) to a StringColumn via ResultSet#getString.
             *
             * @throws DataXException if the query or reading the result set fails, so the
             *         job fails instead of silently reporting partial data
             */
            @Override
            public void startRead(RecordSender recordSender) {
                LOG.info("vertica Task startRead, querySql=" + this.querySql);
                try (ResultSet rs = this.stmt.executeQuery(this.querySql)) {
                    int columnCount = rs.getMetaData().getColumnCount();
                    while (rs.next()) {
                        Record record = recordSender.createRecord();
                        for (int i = 1; i <= columnCount; i++) {
                            record.addColumn(new StringColumn(rs.getString(i)));
                        }
                        recordSender.sendToWriter(record);
                    }
                } catch (SQLException e) {
                    throw DataXException.asDataXException(DBUtilErrorCode.READ_RECORDS_FAIL,
                            "verticareader: failed to read records with querySql: " + this.querySql, e);
                }
            }

            @Override
            public void post() {
                LOG.info("vertica Task post....");
                this.commonRdbmsReaderTask.post(this.readerSliceConfig);
            }

            /** Closes the JDBC resources, then delegates to the shared task helper. */
            @Override
            public void destroy() {
                closeQuietly();
                LOG.info("vertica Task destroy....");
                this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
            }

            /**
             * Closes the statement and the connection independently. A failure closing
             * one does not leak the other. Failures are logged, not thrown, and the
             * fields are cleared.
             */
            private void closeQuietly() {
                if (this.stmt != null) {
                    try {
                        this.stmt.close();
                    } catch (SQLException e) {
                        LOG.warn("verticareader: failed to close statement", e);
                    }
                    this.stmt = null;
                }
                if (this.conn != null) {
                    try {
                        this.conn.close();
                    } catch (SQLException e) {
                        LOG.warn("verticareader: failed to close connection", e);
                    }
                    this.conn = null;
                }
            }
        }

    }
    

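    7.执行以下命令生成插件（pom.xml 已把 assembly 的 single 目标绑定到 package 阶段，因此只执行 mvn -U clean package -Dmaven.test.skip=true 也会生成插件目录）：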

    mvn -U clean package assembly:assembly -Dmaven.test.skip=true

    8.将打包生成的插件目录（一般位于模块的 target/datax/plugin/reader/verticareader）上传到 DataX 服务器的 datax/plugin/reader 目录下

    9.编写任务的 JSON 配置文件（例如 job/vertica_2_mysql.json），reader 使用 querySql 方式指定查询：

    {
        "job": {
            "content": [
                {
                    "reader": {
                        "parameter": {
                            "password": "xxxxxx",
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        "jdbc:vertica://xxxxx:5433/test1"
                                    ],
                                    "querySql": [
                                        "SELECT id,name as ds,age as server_id FROM TEST_01"
                                    ]
                                }
                            ],
                            "writeMode": "update",
                            "username": "xxxxx"
                        },
                        "name": "verticareader"
                    },
                    "writer": {
                        "parameter": {
                            "password": "xxxxx",
                            "column": [
                                "id",
                                "ds",
                                "server_id"
                            ],
                            "connection": [
                                {
                                    "jdbcUrl": "jdbc:mysql://xxxxxx:3306/xxxxx?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true",
                                    "table": [
                                        "xxxxx"
                                    ]
                                }
                            ],
                            "writeMode": "update",
                            "username": "xxxx"
                        },
                        "name": "mysqlwriter",
                        "writeMode": "update"
                    }
                }
            ],
            "setting": {
                "speed": {
                    "byte": 1048576,
                    "channel": 1
                }
            }
        }
    }

    10.执行命令python bin/datax.py job/vertica_2_mysql.json

    vertica表结构

     mysql表结构

    搞定:)

  • 相关阅读:
    Qt学习12 计算器核心解析算法 (上)
    vue简单案例----小张记事本
    linux之framebuffer(1)
    【吞噬星空】劲爆,徐欣因祸得福,罗峰处理好评,终于有点爽文男主的感觉
    systemverilog 中的 `define ---带参数的宏函数--macro function
    Google Earth Engine(GEE)——美国俄勒冈大学制作的可视化土地分类下载器
    “大数据分析师”来了,提高职业含金量,欢迎来领
    【论文阅读】自动作文评分系统:一份系统的文献综述
    prototype 原型对象
    腾讯二面:为什么不建议在 Docker 中跑 MySQL?
  • 原文地址:https://blog.csdn.net/u010479989/article/details/125616944