DataX 3.0 支持阿里云 OSS 的读写,但不支持 S3 的读写。虽然 OSS 也是基于 S3 协议二次开发的,但两者的部分参数有所区别,因此参考阿里的 OSSReader 和 OSSWriter 的设计,开发了 S3Reader 和 S3Writer。

package.xml
- <assembly
- xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
- <formats>
- <format>dir</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>src/main/resources</directory>
- <includes>
- <include>plugin.json</include>
- <include>plugin_job_template.json</include>
- </includes>
- <outputDirectory>plugin/reader/s3reader</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>target/</directory>
- <includes>
- <include>s3reader-0.0.1-SNAPSHOT.jar</include>
- </includes>
- <outputDirectory>plugin/reader/s3reader</outputDirectory>
- </fileSet>
- </fileSets>
-
- <dependencySets>
- <dependencySet>
- <useProjectArtifact>false</useProjectArtifact>
- <outputDirectory>plugin/reader/s3reader/libs</outputDirectory>
- <scope>runtime</scope>
- </dependencySet>
- </dependencySets>
- </assembly>
Constant.java
- package com.alibaba.datax.plugin.reader.s3reader;
-
/**
 * Constants used by the S3 reader plugin.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 */
public final class Constant {
    /** Name of the configuration entry that holds the object key(s) to read. */
    public static final String OBJECT = "object";

    /**
     * Connection timeout handed to the AWS client configuration by S3Util
     * (the AWS SDK documents this setting as milliseconds).
     */
    public static final int CONNECT_TIMEOUT = 50000;

    /** Signer name handed to the AWS client configuration as its signer override. */
    public static final String SIGNER_OVERRIDE = "S3SignerType";

    private Constant() {
        // constants holder; not meant to be instantiated
    }
}
Key.java
/**
 * Names of the plugin-specific entries of the S3 reader job configuration.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 */
public final class Key {
    /** Service endpoint URL of the S3 (or S3-compatible) store. */
    public static final String ENDPOINT = "endpoint";

    /** Access key used to build the client credentials. */
    public static final String ACCESSKEY = "accessKey";

    /** Secret key used to build the client credentials. */
    public static final String SECRETKEY = "secretKey";

    /** Bucket that is read from. */
    public static final String BUCKET = "bucket";

    /** Object key(s) to read; may contain the wildcards '*' and '?'. */
    public static final String OBJECT = "object";

    private Key() {
        // constants holder; not meant to be instantiated
    }
}
S3Reader.java
- package com.alibaba.datax.plugin.reader.s3reader;
-
- import com.alibaba.datax.common.exception.DataXException;
- import com.alibaba.datax.common.plugin.RecordSender;
- import com.alibaba.datax.common.spi.Reader;
- import com.alibaba.datax.common.util.Configuration;
- import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
- import com.amazonaws.services.s3.AmazonS3;
- import com.amazonaws.services.s3.model.ListObjectsRequest;
- import com.amazonaws.services.s3.model.ObjectListing;
- import com.amazonaws.services.s3.model.S3Object;
- import com.amazonaws.services.s3.model.S3ObjectSummary;
- import com.google.common.collect.Sets;
- import org.apache.commons.io.Charsets;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.io.InputStream;
- import java.nio.charset.UnsupportedCharsetException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Set;
- import java.util.regex.Pattern;
-
-
- public class S3Reader extends Reader {
- public static class Job extends Reader.Job {
- private static final Logger LOG = LoggerFactory.getLogger(Job.class);
-
- private Configuration readerOriginConfig = null;
-
- @Override
- public void init() {
- LOG.debug("init() begin...");
- this.readerOriginConfig = this.getPluginJobConf();
- this.validate();
- LOG.debug("init() ok and end...");
- }
-
- private void validate() {
- String endpoint = this.readerOriginConfig.getString(Key.ENDPOINT);
- if (StringUtils.isBlank(endpoint)) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
- "您需要指定 endpoint");
- }
-
- String secretKey = this.readerOriginConfig.getString(Key.SECRETKEY);
- if (StringUtils.isBlank(secretKey)) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
- "您需要指定 secretKey");
- }
-
- String accessKey = this.readerOriginConfig.getString(Key.ACCESSKEY);
- if (StringUtils.isBlank(accessKey)) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
- "您需要指定 accessKey");
- }
-
- String bucket = this.readerOriginConfig.getString(Key.BUCKET);
- if (StringUtils.isBlank(bucket)) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
- "您需要指定 bucket");
- }
-
- String object = this.readerOriginConfig.getString(Key.OBJECT);
- if (StringUtils.isBlank(object)) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
- "您需要指定 object");
- }
-
- String fieldDelimiter = this.readerOriginConfig
- .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.FIELD_DELIMITER);
- // warn: need length 1
- if (null == fieldDelimiter || fieldDelimiter.length() == 0) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
- "您需要指定 fieldDelimiter");
- }
-
- String encoding = this.readerOriginConfig
- .getString(
- com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
- com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING);
- try {
- Charsets.toCharset(encoding);
- } catch (UnsupportedCharsetException uce) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.ILLEGAL_VALUE,
- String.format("不支持的编码格式 : [%s]", encoding), uce);
- } catch (Exception e) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.ILLEGAL_VALUE,
- String.format("运行配置异常 : %s", e.getMessage()), e);
- }
-
- // 检测是column 是否为 ["*"] 若是则填为空
- List<Configuration> column = this.readerOriginConfig
- .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
- if (null != column
- && 1 == column.size()
- && ("\"*\"".equals(column.get(0).toString()) || "'*'"
- .equals(column.get(0).toString()))) {
- readerOriginConfig
- .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN,
- new ArrayList<String>());
- } else {
- // column: 1. index type 2.value type 3.when type is Data, may
- // have
- // format
- List<Configuration> columns = this.readerOriginConfig
- .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
-
- if (null == columns || columns.size() == 0) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
- "您需要指定 columns");
- }
-
- if (null != columns && columns.size() != 0) {
- for (Configuration eachColumnConf : columns) {
- eachColumnConf
- .getNecessaryValue(
- com.alibaba.datax.plugin.unstructuredstorage.reader.Key.TYPE,
- S3ReaderErrorCode.REQUIRED_VALUE);
- Integer columnIndex = eachColumnConf
- .getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.INDEX);
- String columnValue = eachColumnConf
- .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.VALUE);
-
- if (null == columnIndex && null == columnValue) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.NO_INDEX_VALUE,
- "由于您配置了type, 则至少需要配置 index 或 value");
- }
-
- if (null != columnIndex && null != columnValue) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.MIXED_INDEX_VALUE,
- "您混合配置了index, value, 每一列同时仅能选择其中一种");
- }
-
- }
- }
- }
-
- // only support compress: gzip,bzip2,zip
- String compress = this.readerOriginConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS);
- if (StringUtils.isBlank(compress)) {
- this.readerOriginConfig.set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS, null);
- } else {
- Set<String> supportedCompress = Sets.newHashSet("gzip", "bzip2", "zip");
- compress = compress.toLowerCase().trim();
- if (!supportedCompress.contains(compress)) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.ILLEGAL_VALUE,
- String.format(
- "仅支持 gzip, bzip2, zip 文件压缩格式 , 不支持您配置的文件压缩格式: [%s]",
- compress));
- }
- this.readerOriginConfig
- .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS,
- compress);
- }
- }
-
- @Override
- public void prepare() {
- LOG.debug("prepare()");
- }
-
- @Override
- public void post() {
- LOG.debug("post()");
- }
-
- @Override
- public void destroy() {
- LOG.debug("destroy()");
- }
-
- @Override
- public List<Configuration> split(int adviceNumber) {
- LOG.debug("split() begin...");
- List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
-
- // 将每个单独的 object 作为一个 slice
- List<String> objects = parseOriginObjects(readerOriginConfig
- .getList(Constant.OBJECT, String.class));
- if (0 == objects.size()) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.EMPTY_BUCKET_EXCEPTION,
- String.format(
- "未能找到待读取的Object,请确认您的配置项bucket: %s object: %s",
- this.readerOriginConfig.get(Key.BUCKET),
- this.readerOriginConfig.get(Key.OBJECT)));
- }
-
- for (String object : objects) {
- Configuration splitedConfig = this.readerOriginConfig.clone();
- splitedConfig.set(Constant.OBJECT, object);
- readerSplitConfigs.add(splitedConfig);
- LOG.info(String.format("S3 object to be read:%s", object));
- }
- LOG.debug("split() ok and end...");
- return readerSplitConfigs;
- }
-
- private List<String> parseOriginObjects(List<String> originObjects) {
- List<String> parsedObjects = new ArrayList<String>();
-
- for (String object : originObjects) {
- int firstMetaChar = (object.indexOf('*') > object.indexOf('?')) ? object
- .indexOf('*') : object.indexOf('?');
-
- if (firstMetaChar != -1) {
- int lastDirSeparator = object.lastIndexOf(
- IOUtils.DIR_SEPARATOR, firstMetaChar);
- String parentDir = object
- .substring(0, lastDirSeparator + 1);
- List<String> remoteObjects = getRemoteObjects(parentDir);
- Pattern pattern = Pattern.compile(object.replace("*", ".*")
- .replace("?", ".?"));
-
- for (String remoteObject : remoteObjects) {
- if (pattern.matcher(remoteObject).matches()) {
- parsedObjects.add(remoteObject);
- }
- }
- } else {
- parsedObjects.add(object);
- }
- }
- return parsedObjects;
- }
-
- private List<String> getRemoteObjects(String parentDir) {
-
- LOG.debug(String.format("父文件夹 : %s", parentDir));
- List<String> remoteObjects = new ArrayList<String>();
- AmazonS3 client = S3Util.initS3Client(readerOriginConfig);
- try {
- ListObjectsRequest listObjectsRequest = new ListObjectsRequest(
- readerOriginConfig.getString(Key.BUCKET), parentDir, null, null, null
- );
- ObjectListing objectList;
- do {
- objectList = client.listObjects(listObjectsRequest);
- for (S3ObjectSummary objectSummary : objectList
- .getObjectSummaries()) {
- LOG.debug(String.format("找到文件 : %s",
- objectSummary.getKey()));
- remoteObjects.add(objectSummary.getKey());
- }
- listObjectsRequest.setMarker(objectList.getNextMarker());
- LOG.debug(listObjectsRequest.getMarker());
- LOG.debug(String.valueOf(objectList.isTruncated()));
-
- } while (objectList.isTruncated());
- } catch (IllegalArgumentException e) {
- throw DataXException.asDataXException(
- S3ReaderErrorCode.S3_EXCEPTION, e.getMessage());
- }
-
- return remoteObjects;
- }
- }
-
- public static class Task extends Reader.Task {
- private static Logger LOG = LoggerFactory.getLogger(Reader.Task.class);
-
- private Configuration readerSliceConfig;
-
- @Override
- public void startRead(RecordSender recordSender) {
- LOG.debug("read start");
- String bucket = readerSliceConfig.getString(Key.BUCKET);
- String object = readerSliceConfig.getString(Key.OBJECT);
- AmazonS3 client = S3Util.initS3Client(readerSliceConfig);
-
- S3Object s3Object = client.getObject(bucket, object);
- InputStream objectStream = s3Object.getObjectContent();
- UnstructuredStorageReaderUtil.readFromStream(objectStream, object,
- this.readerSliceConfig, recordSender,
- this.getTaskPluginCollector());
- recordSender.flush();
- }
-
- @Override
- public void init() {
- this.readerSliceConfig = this.getPluginJobConf();
- }
-
- @Override
- public void destroy() {
-
- }
- }
- }
S3ReaderErrorCode.java
- package com.alibaba.datax.plugin.reader.s3reader;
-
- import com.alibaba.datax.common.spi.ErrorCode;
-
- public enum S3ReaderErrorCode implements ErrorCode {
- // TODO: 修改错误码类型
- RUNTIME_EXCEPTION("S3Reader-00", "运行时异常"),
- S3_EXCEPTION("OssFileReader-01", "OSS配置异常"),
- CONFIG_INVALID_EXCEPTION("OssFileReader-02", "参数配置错误"),
- NOT_SUPPORT_TYPE("S3Reader-03", "不支持的类型"),
- CAST_VALUE_TYPE_ERROR("OssFileReader-04", "无法完成指定类型的转换"),
- SECURITY_EXCEPTION("S3Reader-05", "缺少权限"),
- ILLEGAL_VALUE("S3Reader-06", "值错误"),
- REQUIRED_VALUE("S3Reader-07", "必选项"),
- NO_INDEX_VALUE("S3Reader-08", "没有 Index"),
- MIXED_INDEX_VALUE("S3Reader-09", "index 和 value 混合"),
- EMPTY_BUCKET_EXCEPTION("S3Reader-10", "您尝试读取的Bucket为空");
-
- private final String code;
- private final String description;
-
- private S3ReaderErrorCode(String code, String description) {
- this.code = code;
- this.description = description;
- }
-
- @Override
- public String getCode() {
- return this.code;
- }
-
- @Override
- public String getDescription() {
- return this.description;
- }
-
- @Override
- public String toString() {
- return String.format("Code:[%s], Description:[%s].", this.code,
- this.description);
- }
- }
S3Util.java
- package com.alibaba.datax.plugin.reader.s3reader;
-
- import com.alibaba.datax.common.util.Configuration;
- import com.amazonaws.ClientConfiguration;
- import com.amazonaws.auth.AWSCredentials;
- import com.amazonaws.auth.AWSCredentialsProvider;
- import com.amazonaws.auth.AWSStaticCredentialsProvider;
- import com.amazonaws.auth.BasicAWSCredentials;
- import com.amazonaws.client.builder.AwsClientBuilder;
- import com.amazonaws.services.s3.AmazonS3;
- import com.amazonaws.services.s3.AmazonS3ClientBuilder;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @Description: obs 工具类
- * @Author: chenweifeng
- * @Date: 2022年11月18日 下午1:56
- **/
- public class S3Util {
- public static void main(String[] args) {
- Map<String, Object> map = new HashMap<>();
- map.put(Key.ENDPOINT, "http://x.x.x.x:9383");
- map.put(Key.ACCESSKEY, "admin");
- map.put(Key.SECRETKEY, "123456");
- Configuration conf = Configuration.from(map);
- initS3Client(conf);
- }
-
- /**
- * 初始化S3Client
- *
- * @param conf
- * @return
- */
- public static AmazonS3 initS3Client(Configuration conf) {
- String endpoint = conf.getString(Key.ENDPOINT);
- String accessKey = conf.getString(Key.ACCESSKEY);
- String secretKey = conf.getString(Key.SECRETKEY);
-
- AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard();
-
- ClientConfiguration clientConfiguration = new ClientConfiguration();
- clientConfiguration.setProtocol(com.amazonaws.Protocol.HTTPS);
- clientConfiguration.setConnectionTimeout(Constant.CONNECT_TIMEOUT);
- clientConfiguration.setSignerOverride(Constant.SIGNER_OVERRIDE);
- amazonS3ClientBuilder.setClientConfiguration(clientConfiguration);
-
- AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
- AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
- amazonS3ClientBuilder.setCredentials(awsCredentialsProvider);
-
- AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, null);
- amazonS3ClientBuilder.setEndpointConfiguration(endpointConfiguration);
-
- AmazonS3 amazonS3 = amazonS3ClientBuilder.build();
- return amazonS3;
- }
- }
plugin.json
- {
- "name": "s3reader",
- "class": "com.alibaba.datax.plugin.reader.s3reader.S3Reader",
- "description": "",
- "developer": "alibaba"
- }

package.xml
- <assembly
- xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
- <formats>
- <format>dir</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
- <fileSets>
- <fileSet>
- <directory>src/main/resources</directory>
- <includes>
- <include>plugin.json</include>
- <include>plugin_job_template.json</include>
- </includes>
- <outputDirectory>plugin/writer/s3writer</outputDirectory>
- </fileSet>
- <fileSet>
- <directory>target/</directory>
- <includes>
- <include>s3writer-0.0.1-SNAPSHOT.jar</include>
- </includes>
- <outputDirectory>plugin/writer/s3writer</outputDirectory>
- </fileSet>
- </fileSets>
-
- <dependencySets>
- <dependencySet>
- <useProjectArtifact>false</useProjectArtifact>
- <outputDirectory>plugin/writer/s3writer/libs</outputDirectory>
- <scope>runtime</scope>
- </dependencySet>
- </dependencySets>
- </assembly>
Constant.java
- package com.alibaba.datax.plugin.writer.s3writer;
-
-
/**
 * Constants used by the S3 writer plugin.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 */
public final class Constant {
    /** Name of the configuration entry that holds the target object key (prefix). */
    public static final String OBJECT = "object";

    /** Connection timeout for the AWS client (the AWS SDK documents this setting as milliseconds). */
    public static final int CONNECT_TIMEOUT = 50000;

    /** Signer name intended as the AWS client's signer override. */
    public static final String SIGNER_OVERRIDE = "S3SignerType";

    private Constant() {
        // constants holder; not meant to be instantiated
    }
}
Key.java
- package com.alibaba.datax.plugin.writer.s3writer;
-
/**
 * Names of the plugin-specific entries of the S3 writer job configuration.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 */
public final class Key {
    /** Service endpoint URL of the S3 (or S3-compatible) store. */
    public static final String ENDPOINT = "endpoint";

    /** Access key used to build the client credentials. */
    public static final String ACCESSKEY = "accessKey";

    /** Secret key used to build the client credentials. */
    public static final String SECRETKEY = "secretKey";

    /** Bucket that is written to. */
    public static final String BUCKET = "bucket";

    /** Key prefix of the objects that are written. */
    public static final String OBJECT = "object";

    private Key() {
        // constants holder; not meant to be instantiated
    }
}
S3Util.java 如上
S3Writer.java
- package com.alibaba.datax.plugin.writer.s3writer;
-
- import com.alibaba.datax.common.element.Record;
- import com.alibaba.datax.common.exception.DataXException;
- import com.alibaba.datax.common.plugin.RecordReceiver;
- import com.alibaba.datax.common.spi.Writer;
- import com.alibaba.datax.common.util.Configuration;
- import com.alibaba.datax.common.util.RetryUtil;
- import com.alibaba.datax.plugin.unstructuredstorage.writer.TextCsvWriterManager;
- import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredStorageWriterUtil;
- import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredWriter;
- import com.amazonaws.services.s3.AmazonS3;
- import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
- import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
- import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
- import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
- import com.amazonaws.services.s3.model.ObjectListing;
- import com.amazonaws.services.s3.model.PartETag;
- import com.amazonaws.services.s3.model.S3ObjectSummary;
- import com.amazonaws.services.s3.model.UploadPartRequest;
- import com.amazonaws.services.s3.model.UploadPartResult;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.io.ByteArrayInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.StringWriter;
- import java.text.DateFormat;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Set;
- import java.util.UUID;
- import java.util.concurrent.Callable;
-
-
- public class S3Writer extends Writer {
- public static class Job extends Writer.Job {
- private static final Logger LOG = LoggerFactory.getLogger(Job.class);
-
- private Configuration writerSliceConfig = null;
- private AmazonS3 s3client = null;
-
- @Override
- public void init() {
- this.writerSliceConfig = this.getPluginJobConf();
- this.validateParameter();
- this.s3client = S3Util.initS3Client(this.writerSliceConfig);
- }
-
- private void validateParameter() {
- this.writerSliceConfig.getNecessaryValue(Key.ENDPOINT,
- S3WriterErrorCode.REQUIRED_VALUE);
- this.writerSliceConfig.getNecessaryValue(Key.SECRETKEY,
- S3WriterErrorCode.REQUIRED_VALUE);
- this.writerSliceConfig.getNecessaryValue(Key.ACCESSKEY,
- S3WriterErrorCode.REQUIRED_VALUE);
- this.writerSliceConfig.getNecessaryValue(Key.BUCKET,
- S3WriterErrorCode.REQUIRED_VALUE);
- this.writerSliceConfig.getNecessaryValue(Key.OBJECT,
- S3WriterErrorCode.REQUIRED_VALUE);
-
- // warn: do not support compress!!
- String compress = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.COMPRESS);
- if (StringUtils.isNotBlank(compress)) {
- String errorMessage = String.format("S3写暂时不支持压缩, 该压缩配置项[%s]不起效用", compress);
- LOG.error(errorMessage);
- throw DataXException.asDataXException(S3WriterErrorCode.ILLEGAL_VALUE, errorMessage);
- }
-
- UnstructuredStorageWriterUtil.validateParameter(this.writerSliceConfig);
- }
-
- @Override
- public void prepare() {
- LOG.info("begin do prepare...");
- String bucket = this.writerSliceConfig.getString(Key.BUCKET);
- String object = this.writerSliceConfig.getString(Key.OBJECT);
- String writeMode = this.writerSliceConfig
- .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.WRITE_MODE);
- // warn: bucket is not exists, create it
- try {
- // warn: do not create bucket for user
- if (!this.s3client.doesBucketExist(bucket)) {
- // this.s3client.createBucket(bucket);
- String errorMessage = String.format(
- "您配置的bucket [%s] 不存在, 请您确认您的配置项.", bucket);
- LOG.error(errorMessage);
- throw DataXException.asDataXException(
- S3WriterErrorCode.ILLEGAL_VALUE, errorMessage);
- }
- LOG.info(String.format("access control details [%s].",
- this.s3client.getBucketAcl(bucket).toString()));
-
- // truncate option handler
- if ("truncate".equals(writeMode)) {
- LOG.info(String
- .format("由于您配置了writeMode truncate, 开始清理 [%s] 下面以 [%s] 开头的Object",
- bucket, object));
- // warn: 默认情况下,如果Bucket中的Object数量大于100,则只会返回100个Object
- while (true) {
- ObjectListing listing = null;
- LOG.info("list objects with listObject(bucket, object)");
- listing = this.s3client.listObjects(bucket, object);
- List<S3ObjectSummary> objectSummarys = listing.getObjectSummaries();
- for (S3ObjectSummary objectSummary : objectSummarys) {
- LOG.info(String.format("delete oss object [%s].", objectSummary.getKey()));
- this.s3client.deleteObject(bucket, objectSummary.getKey());
- }
- if (objectSummarys.isEmpty()) {
- break;
- }
- }
- } else if ("append".equals(writeMode)) {
- LOG.info(String
- .format("由于您配置了writeMode append, 写入前不做清理工作, 数据写入Bucket [%s] 下, 写入相应Object的前缀为 [%s]",
- bucket, object));
- } else if ("nonConflict".equals(writeMode)) {
- LOG.info(String
- .format("由于您配置了writeMode nonConflict, 开始检查Bucket [%s] 下面以 [%s] 命名开头的Object",
- bucket, object));
- ObjectListing listing = this.s3client.listObjects(bucket,
- object);
- if (0 < listing.getObjectSummaries().size()) {
- StringBuilder objectKeys = new StringBuilder();
- objectKeys.append("[ ");
- for (S3ObjectSummary ossObjectSummary : listing.getObjectSummaries()) {
- objectKeys.append(ossObjectSummary.getKey() + " ,");
- }
- objectKeys.append(" ]");
- LOG.info(String.format("object with prefix [%s] details: %s", object, objectKeys.toString()));
- throw DataXException.asDataXException(
- S3WriterErrorCode.ILLEGAL_VALUE,
- String.format("您配置的Bucket: [%s] 下面存在其Object有前缀 [%s].", bucket, object));
- }
- }
- } catch (Exception e) {
- throw DataXException.asDataXException(
- S3WriterErrorCode.S3_COMM_ERROR, e.getMessage());
- }
- }
-
- @Override
- public void post() {
-
- }
-
- @Override
- public void destroy() {
-
- }
-
- @Override
- public List<Configuration> split(int mandatoryNumber) {
- LOG.info("begin do split...");
- List<Configuration> writerSplitConfigs = new ArrayList<Configuration>();
- String object = this.writerSliceConfig.getString(Key.OBJECT);
- String bucket = this.writerSliceConfig.getString(Key.BUCKET);
-
- Set<String> allObjects = new HashSet<String>();
- try {
- List<S3ObjectSummary> ossObjectlisting = this.s3client.listObjects(bucket).getObjectSummaries();
- for (S3ObjectSummary objectSummary : ossObjectlisting) {
- allObjects.add(objectSummary.getKey());
- }
- } catch (Exception e) {
- throw DataXException.asDataXException(
- S3WriterErrorCode.S3_COMM_ERROR, e.getMessage());
- }
-
- String objectSuffix;
- for (int i = 0; i < mandatoryNumber; i++) {
- // handle same object name
- Configuration splitedTaskConfig = this.writerSliceConfig
- .clone();
-
- String fullObjectName = null;
- objectSuffix = StringUtils.replace(
- UUID.randomUUID().toString(), "-", "");
- fullObjectName = String.format("%s__%s", object, objectSuffix);
- while (allObjects.contains(fullObjectName)) {
- objectSuffix = StringUtils.replace(UUID.randomUUID()
- .toString(), "-", "");
- fullObjectName = String.format("%s__%s", object,
- objectSuffix);
- }
- allObjects.add(fullObjectName);
-
- splitedTaskConfig.set(Key.OBJECT, fullObjectName);
-
- LOG.info(String.format("splited write object name:[%s]",
- fullObjectName));
-
- writerSplitConfigs.add(splitedTaskConfig);
- }
- LOG.info("end do split.");
- return writerSplitConfigs;
- }
- }
-
- public static class Task extends Writer.Task {
- private static final Logger LOG = LoggerFactory.getLogger(Task.class);
-
- private AmazonS3 s3client;
- private Configuration writerSliceConfig;
- private String bucket;
- private String object;
- private String nullFormat;
- private String encoding;
- private char fieldDelimiter;
- private String dateFormat;
- private DateFormat dateParse;
- private String fileFormat;
- private List<String> header;
- private Long maxFileSize;// MB
- private String suffix;
-
- @Override
- public void init() {
- this.writerSliceConfig = this.getPluginJobConf();
- this.s3client = S3Util.initS3Client(this.writerSliceConfig);
- this.bucket = this.writerSliceConfig.getString(Key.BUCKET);
- this.object = this.writerSliceConfig.getString(Key.OBJECT);
- this.nullFormat = this.writerSliceConfig
- .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.NULL_FORMAT);
- this.dateFormat = this.writerSliceConfig
- .getString(
- com.alibaba.datax.plugin.unstructuredstorage.writer.Key.DATE_FORMAT,
- null);
- if (StringUtils.isNotBlank(this.dateFormat)) {
- this.dateParse = new SimpleDateFormat(dateFormat);
- }
- this.encoding = this.writerSliceConfig
- .getString(
- com.alibaba.datax.plugin.unstructuredstorage.writer.Key.ENCODING,
- com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_ENCODING);
- this.fieldDelimiter = this.writerSliceConfig
- .getChar(
- com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FIELD_DELIMITER,
- com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_FIELD_DELIMITER);
- this.fileFormat = this.writerSliceConfig
- .getString(
- com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_FORMAT,
- com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.FILE_FORMAT_TEXT);
- this.header = this.writerSliceConfig
- .getList(
- com.alibaba.datax.plugin.unstructuredstorage.writer.Key.HEADER,
- null, String.class);
- this.maxFileSize = this.writerSliceConfig
- .getLong(
- com.alibaba.datax.plugin.unstructuredstorage.writer.Key.MAX_FILE_SIZE,
- com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.MAX_FILE_SIZE);
- this.suffix = this.writerSliceConfig
- .getString(
- com.alibaba.datax.plugin.unstructuredstorage.writer.Key.SUFFIX,
- com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_SUFFIX);
- this.suffix = this.suffix.trim();// warn: need trim
- }
-
- @Override
- public void startWrite(RecordReceiver lineReceiver) {
- // 设置每块字符串长度
- final long partSize = 1024 * 1024 * 10L;
- long numberCacul = (this.maxFileSize * 1024 * 1024L) / partSize;
- final long maxPartNumber = numberCacul >= 1 ? numberCacul : 1;
- int objectRollingNumber = 0;
- //warn: may be StringBuffer->StringBuilder
- StringWriter sw = new StringWriter();
- StringBuffer sb = sw.getBuffer();
- UnstructuredWriter unstructuredWriter = TextCsvWriterManager
- .produceUnstructuredWriter(this.fileFormat,
- this.fieldDelimiter, sw);
- Record record = null;
-
- LOG.info(String.format(
- "begin do write, each object maxFileSize: [%s]MB...",
- maxPartNumber * 10));
- String currentObject = this.object;
- InitiateMultipartUploadRequest currentInitiateMultipartUploadRequest = null;
- InitiateMultipartUploadResult currentInitiateMultipartUploadResult = null;
- boolean gotData = false;
- List<PartETag> currentPartETags = null;
- // to do:
- // 可以根据currentPartNumber做分块级别的重试,InitiateMultipartUploadRequest多次一个currentPartNumber会覆盖原有
- int currentPartNumber = 1;
- try {
- // warn
- boolean needInitMultipartTransform = true;
- while ((record = lineReceiver.getFromReader()) != null) {
- gotData = true;
- // init:begin new multipart upload
- if (needInitMultipartTransform) {
- if (objectRollingNumber == 0) {
- if (StringUtils.isBlank(this.suffix)) {
- currentObject = this.object;
- } else {
- currentObject = String.format("%s%s",
- this.object, this.suffix);
- }
- } else {
- // currentObject is like(no suffix)
- // myfile__9b886b70fbef11e59a3600163e00068c_1
- if (StringUtils.isBlank(this.suffix)) {
- currentObject = String.format("%s_%s",
- this.object, objectRollingNumber);
- } else {
- // or with suffix
- // myfile__9b886b70fbef11e59a3600163e00068c_1.csv
- currentObject = String.format("%s_%s%s",
- this.object, objectRollingNumber,
- this.suffix);
- }
- }
- objectRollingNumber++;
- currentInitiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
- this.bucket, currentObject);
- currentInitiateMultipartUploadResult = this.s3client
- .initiateMultipartUpload(currentInitiateMultipartUploadRequest);
- currentPartETags = new ArrayList<PartETag>();
- LOG.info(String
- .format("write to bucket: [%s] object: [%s] with oss uploadId: [%s]",
- this.bucket, currentObject,
- currentInitiateMultipartUploadResult
- .getUploadId()));
-
- // each object's header
- if (null != this.header && !this.header.isEmpty()) {
- unstructuredWriter.writeOneRecord(this.header);
- }
- // warn
- needInitMultipartTransform = false;
- currentPartNumber = 1;
- }
- // write: upload data to current object
- UnstructuredStorageWriterUtil.transportOneRecord(record,
- this.nullFormat, this.dateParse,
- this.getTaskPluginCollector(), unstructuredWriter);
- if (sb.length() >= partSize) {
- this.uploadOnePart(sw, currentPartNumber,
- currentInitiateMultipartUploadResult,
- currentPartETags, currentObject);
- currentPartNumber++;
- sb.setLength(0);
- }
- // save: end current multipart upload
- if (currentPartNumber > maxPartNumber) {
- LOG.info(String
- .format("current object [%s] size > %s, complete current multipart upload and begin new one",
- currentObject, currentPartNumber
- * partSize));
- CompleteMultipartUploadRequest currentCompleteMultipartUploadRequest = new CompleteMultipartUploadRequest(
- this.bucket, currentObject,
- currentInitiateMultipartUploadResult
- .getUploadId(), currentPartETags);
- CompleteMultipartUploadResult currentCompleteMultipartUploadResult = this.s3client
- .completeMultipartUpload(currentCompleteMultipartUploadRequest);
- LOG.info(String.format(
- "final object [%s] etag is:[%s]",
- currentObject,
- currentCompleteMultipartUploadResult.getETag()));
- // warn
- needInitMultipartTransform = true;
- }
- }
- if (!gotData) {
- LOG.info("Receive no data from the source.");
- currentInitiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
- this.bucket, currentObject);
- currentInitiateMultipartUploadResult = this.s3client
- .initiateMultipartUpload(currentInitiateMultipartUploadRequest);
- currentPartETags = new ArrayList
(); - // each object's header
- if (null != this.header && !this.header.isEmpty()) {
- unstructuredWriter.writeOneRecord(this.header);
- }
- }
- // warn: may be some data stall in sb
- if (0 < sb.length()) {
- this.uploadOnePart(sw, currentPartNumber,
- currentInitiateMultipartUploadResult,
- currentPartETags, currentObject);
- }
- CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
- this.bucket, currentObject,
- currentInitiateMultipartUploadResult.getUploadId(),
- currentPartETags);
- CompleteMultipartUploadResult completeMultipartUploadResult = this.s3client
- .completeMultipartUpload(completeMultipartUploadRequest);
- LOG.info(String.format("final object etag is:[%s]",
- completeMultipartUploadResult.getETag()));
- } catch (IOException e) {
- // 脏数据UnstructuredStorageWriterUtil.transportOneRecord已经记录,header
- // 都是字符串不认为有脏数据
- throw DataXException.asDataXException(
- S3WriterErrorCode.Write_OBJECT_ERROR, e.getMessage());
- } catch (Exception e) {
- throw DataXException.asDataXException(
- S3WriterErrorCode.Write_OBJECT_ERROR, e.getMessage());
- }
- LOG.info("end do write");
- }
-
- /**
- * 对于同一个UploadID,该号码不但唯一标识这一块数据,也标识了这块数据在整个文件内的相对位置。
- * 如果你用同一个part号码,上传了新的数据,那么OSS上已有的这个号码的Part数据将被覆盖。
- *
- * @throws Exception
- */
- private void uploadOnePart(
- final StringWriter sw,
- final int partNumber,
- final InitiateMultipartUploadResult initiateMultipartUploadResult,
- final List<PartETag> partETags, final String currentObject)
- throws Exception {
- final String encoding = this.encoding;
- final String bucket = this.bucket;
- final AmazonS3 s3client = this.s3client;
- RetryUtil.executeWithRetry(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- byte[] byteArray = sw.toString().getBytes(encoding);
- InputStream inputStream = new ByteArrayInputStream(
- byteArray);
- // 创建UploadPartRequest,上传分块
- UploadPartRequest uploadPartRequest = new UploadPartRequest();
- uploadPartRequest.setBucketName(bucket);
- uploadPartRequest.setKey(currentObject);
- uploadPartRequest.setUploadId(initiateMultipartUploadResult
- .getUploadId());
- uploadPartRequest.setInputStream(inputStream);
- uploadPartRequest.setPartSize(byteArray.length);
- uploadPartRequest.setPartNumber(partNumber);
- UploadPartResult uploadPartResult = s3client
- .uploadPart(uploadPartRequest);
- partETags.add(uploadPartResult.getPartETag());
- LOG.info(String
- .format("upload part [%s] size [%s] Byte has been completed.",
- partNumber, byteArray.length));
- IOUtils.closeQuietly(inputStream);
- return true;
- }
- }, 3, 1000L, false);
- }
-
- @Override
- public void prepare() {
-
- }
-
- @Override
- public void post() {
-
- }
-
- @Override
- public void destroy() {
-
- }
- }
- }
S3WriterErrorCode.java
- package com.alibaba.datax.plugin.writer.s3writer;
-
- import com.alibaba.datax.common.spi.ErrorCode;
-
- public enum S3WriterErrorCode implements ErrorCode {
-
- CONFIG_INVALID_EXCEPTION("S3Writer-00", "您的参数配置错误."),
- REQUIRED_VALUE("S3Writer-01", "您缺失了必须填写的参数值."),
- ILLEGAL_VALUE("S3Writer-02", "您填写的参数值不合法."),
- Write_OBJECT_ERROR("S3Writer-03", "您配置的目标Object在写入时异常."),
- S3_COMM_ERROR("S3Writer-05", "执行相应的S3操作异常."),
- ;
-
- private final String code;
- private final String description;
-
- private S3WriterErrorCode(String code, String description) {
- this.code = code;
- this.description = description;
- }
-
- @Override
- public String getCode() {
- return this.code;
- }
-
- @Override
- public String getDescription() {
- return this.description;
- }
-
- @Override
- public String toString() {
- return String.format("Code:[%s], Description:[%s].", this.code,
- this.description);
- }
-
- }
plugin.json
- {
- "name": "s3writer",
- "class": "com.alibaba.datax.plugin.writer.s3writer.S3Writer",
- "description": "",
- "developer": "chenweifeng"
- }
-
略
