1.idea里面新建module

2.新建verticareader模块

3.复制mysqlreader下的pom.xml到本项目的pom.xml中,将artifactId和name改为verticareader,并把mysql驱动依赖替换为vertica-jdbc,修改后的内容如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Maven build for the verticareader DataX plugin module.
It inherits from the datax-all parent POM, packages the plugin as a jar, and
runs the assembly plugin during the package phase to lay out the plugin directory.
-->
<modelVersion>4.0.0</modelVersion>
<!-- Parent aggregator POM; NOTE(review): the ${...} properties used below are presumably defined there - confirm. -->
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>verticareader</artifactId>
<name>verticareader</name>
<packaging>jar</packaging>
<dependencies>
<!-- DataX common classes; the slf4j-log4j12 binding is excluded here and logback-classic is declared below. -->
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- No version given for the two logging artifacts; presumably managed by the parent POM - confirm. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- Shared relational-database reader/writer helpers used by the plugin class. -->
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<!-- Vertica JDBC driver. -->
<dependency>
<groupId>com.vertica</groupId>
<artifactId>vertica-jdbc</artifactId>
<version>9.3.1-0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<!-- Runs the "single" goal in the package phase using src/main/assembly/package.xml; the output is named "datax". -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.在src/main目录下新建assembly目录

5.在assembly目录下新建package.xml,将以下内容粘贴进去
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<!--
Assembly descriptor for the verticareader plugin.
Produces a plain directory (no archive, no base directory) containing
plugin/reader/verticareader with the plugin descriptors, the plugin jar,
and a libs/ folder holding the runtime dependencies.
-->
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<!-- Plugin descriptor and job template, copied from src/main/resources. -->
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/verticareader</outputDirectory>
</fileSet>
<!-- The plugin jar itself. The file name is hard-coded, so it must match the jar that the build writes to target/. -->
<fileSet>
<directory>target/</directory>
<includes>
<include>verticareader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/verticareader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<!-- Runtime-scope dependencies go to libs/; the project artifact is excluded because the fileSet above already copies it. -->
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/verticareader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
在src/main/resources目录下新建plugin.json(其中class需填写VerticaReader的完整类名)
{
"name": "verticareader",
"class": "com.xxxx.VerticaReader",
"description": "vertica reader",
"developer": "cx"
}
在src/main/resources目录下新建plugin_job_template.json
{
"name": "verticareader",
"parameter": {
"username": "",
"password": "",
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"where": ""
}
}
6.在src/main/java目录下新建java文件,包名可以随便取,但完整类名必须与plugin.json中的class保持一致
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
/**
 * DataX reader plugin that reads rows from Vertica over JDBC.
 *
 * <p>The job side hands the unmodified job configuration to exactly one task.
 * The task opens a JDBC connection, runs the first {@code querySql} entry of the
 * first {@code connection} element, and forwards every row to the writer with
 * each column converted to a {@link StringColumn}.
 */
public class VerticaReader extends Reader {
    private static final DataBaseType DATABASE_TYPE = DataBaseType.Vertica;

    /** Job-level half of the plugin: validates configuration and produces the task slices. */
    public static class Job extends Reader.Job {
        private static final Logger LOG = LoggerFactory
                .getLogger(Job.class);
        private Configuration originalConfig = null;
        private CommonRdbmsReader.Job commonRdbmsReaderJob;

        /**
         * Loads the job configuration and creates the shared RDBMS job helper.
         * A user-supplied fetchSize is reported as ignored, because the task
         * never reads that setting.
         */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
            if (userConfigedFetchSize != null) {
                LOG.warn("对 verticareader 不需要配置 fetchSize, verticareader 将会忽略这项配置. 如果您不想再看到此警告,请去除fetchSize 配置.");
            }
            // NOTE(review): carried over from the reader this class was copied from;
            // the task below does not consume this value.
            this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);
            this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
        }

        /** Makes sure the job is initialised; does not repeat {@link #init()} if it already ran. */
        @Override
        public void prepare() {
            LOG.info("vertica prepare....");
            if (this.originalConfig == null) {
                init();
            }
        }

        /**
         * Returns a single slice regardless of {@code adviceNumber}, because the
         * task executes one query on one connection.
         *
         * @param adviceNumber suggested number of slices (logged, otherwise unused)
         * @return a list holding one copy of the job configuration
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            LOG.info("vertica split...."+adviceNumber);
            List<Configuration> configurations = new ArrayList<>();
            configurations.add(this.originalConfig.clone());
            return configurations;
        }

        @Override
        public void post() {
            LOG.info("vertica post....");
            this.commonRdbmsReaderJob.post(this.originalConfig);
        }

        @Override
        public void destroy() {
            LOG.info("vertica destroy....");
            this.commonRdbmsReaderJob.destroy(this.originalConfig);
        }
    }

    /** Task-level half of the plugin: owns one JDBC connection and streams the query result. */
    public static class Task extends Reader.Task {
        private static final Logger LOG = LoggerFactory
                .getLogger(Task.class);
        static final String JDBC_DRIVER = "com.vertica.jdbc.Driver";

        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderTask;

        // Connection settings are per-instance so that tasks never overwrite each other's values.
        private String jdbcUrl;
        private String username;
        private String password;
        private String querySql;
        private Connection conn;
        private Statement stmt;

        /**
         * Extracts user name, password, JDBC URL and query from the slice configuration.
         * Only the first connection element, and the first entry of its
         * {@code jdbcUrl} and {@code querySql} lists, are used.
         *
         * @param readerSliceConfig configuration of this task's slice
         * @throws DataXException if the connection list, JDBC URL or query is missing
         */
        public void doCreateParams(Configuration readerSliceConfig) {
            this.username = readerSliceConfig.getString(Key.USERNAME);
            this.password = readerSliceConfig.getString(Key.PASSWORD);
            List<Object> conns = readerSliceConfig.getList(Constant.CONN_MARK, Object.class);
            if (conns == null || conns.isEmpty() || conns.get(0) == null) {
                throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
                        "verticareader requires at least one element in [" + Constant.CONN_MARK + "].");
            }
            Configuration connConf = Configuration.from(conns.get(0).toString());
            this.jdbcUrl = firstRequiredValue(connConf, Key.JDBC_URL);
            this.querySql = firstRequiredValue(connConf, Key.QUERY_SQL);
            LOG.info("vertica jdbc url. user=" + this.username + " :url=" + this.jdbcUrl);
        }

        /** Returns the first entry of the list stored under {@code key}, failing with a clear message if absent. */
        private static String firstRequiredValue(Configuration connConf, String key) {
            List<Object> values = connConf.getList(key);
            if (values == null || values.isEmpty() || values.get(0) == null) {
                throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE,
                        "verticareader requires a non-empty [" + key + "] list in the connection configuration.");
            }
            return values.get(0).toString();
        }

        /**
         * Reads the slice configuration and opens the JDBC connection and statement.
         *
         * @throws DataXException if the driver cannot be loaded or the connection fails
         */
        @Override
        public void init() {
            this.readerSliceConfig = super.getPluginJobConf();
            this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE,super.getTaskGroupId(), super.getTaskId());
            this.doCreateParams(this.readerSliceConfig);
            try {
                // Register the JDBC driver.
                Class.forName(JDBC_DRIVER);
                // Open the connection; the password is deliberately not logged.
                LOG.info("Connect DB... user={}, url={}", this.username, this.jdbcUrl);
                this.conn = DriverManager.getConnection(this.jdbcUrl, this.username, this.password);
                this.stmt = this.conn.createStatement();
            } catch (ClassNotFoundException | SQLException e) {
                // Fail fast instead of continuing with a null statement.
                closeQuietly(this.stmt, "statement");
                closeQuietly(this.conn, "connection");
                this.stmt = null;
                this.conn = null;
                throw DataXException.asDataXException(DBUtilErrorCode.CONN_DB_ERROR,
                        "Failed to connect to Vertica, url=" + this.jdbcUrl + ", user=" + this.username, e);
            }
        }

        @Override
        public void prepare() {
            LOG.info("vertica Task prepare....");
        }

        /**
         * Executes the configured query and sends every row to the writer.
         * The number of columns is taken from the result-set metadata, and each
         * value is read with {@code getString} and wrapped in a {@link StringColumn}.
         *
         * @param recordSender channel used to create records and pass them to the writer
         * @throws DataXException if executing the query or reading a row fails
         */
        @Override
        public void startRead(RecordSender recordSender) {
            LOG.info("vertica Task startRead, querySql={}", this.querySql);
            // try-with-resources closes the result set on both success and failure.
            try (ResultSet rs = this.stmt.executeQuery(this.querySql)) {
                int columnCount = rs.getMetaData().getColumnCount();
                while (rs.next()) {
                    Record record = recordSender.createRecord();
                    // JDBC column indexes are 1-based.
                    for (int i = 1; i <= columnCount; i++) {
                        record.addColumn(new StringColumn(rs.getString(i)));
                    }
                    recordSender.sendToWriter(record);
                }
            } catch (SQLException e) {
                // Propagate so the job is reported as failed rather than ending with partial data.
                throw DataXException.asDataXException(DBUtilErrorCode.READ_RECORD_FAIL,
                        "Failed to read from Vertica, querySql=" + this.querySql, e);
            }
        }

        @Override
        public void post() {
            LOG.info("vertica Task post....");
            this.commonRdbmsReaderTask.post(this.readerSliceConfig);
        }

        /** Closes the statement and connection independently, then delegates to the shared task helper. */
        @Override
        public void destroy() {
            closeQuietly(this.stmt, "statement");
            closeQuietly(this.conn, "connection");
            LOG.info("vertica Task destroy....");
            this.commonRdbmsReaderTask.destroy(this.readerSliceConfig);
        }

        /** Closes {@code resource} if it is non-null; a close failure is logged and not rethrown. */
        private static void closeQuietly(AutoCloseable resource, String what) {
            if (resource == null) {
                return;
            }
            try {
                resource.close();
            } catch (Exception e) {
                LOG.warn("Failed to close vertica " + what, e);
            }
        }
    }
}
7.执行命令,生成插件
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
8.将本地datax\plugin\reader目录上传到datax服务器目录datax/plugin/reader

9.编写.json配置文件
{
"job": {
"content": [
{
"reader": {
"parameter": {
"password": "xxxxxx",
"connection": [
{
"jdbcUrl": [
"jdbc:vertica://xxxxx:5433/test1"
],
"querySql": [
"SELECT id,name as ds,age as server_id FROM TEST_01"
]
}
],
"writeMode": "update",
"username": "xxxxx"
},
"name": "verticareader"
},
"writer": {
"parameter": {
"password": "xxxxx",
"column": [
"id",
"ds",
"server_id"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://xxxxxx:3306/xxxxx?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true",
"table": [
"xxxxx"
]
}
],
"writeMode": "update",
"username": "xxxx"
},
"name": "mysqlwriter",
"writeMode": "update"
}
}
],
"setting": {
"speed": {
"byte": 1048576,
"channel": 1
}
}
}
}
10.执行命令python bin/datax.py job/vertica_2_mysql.json

vertica表结构

mysql表结构

搞定:)