• DataX二次开发——(9)新增s3reader和s3writer模块


    1 背景

    DataX 3.0 原生支持阿里云 OSS 的读写,但不支持标准 S3 协议服务的读写。虽然 OSS 本身也是基于 S3 协议做的二次开发,但两者在部分参数上存在差异,因此这里参考阿里 OSSReader 和 OSSWriter 的设计,开发了 S3Reader 和 S3Writer 两个插件。

    2 代码开发

    2.1 s3reader

    2.1.1 项目结构

     2.1.2 代码如下

    package.xml

<!-- Maven assembly descriptor for the s3reader plugin: lays out the plugin
     descriptor files, the plugin jar and its runtime dependencies in the
     directory structure DataX expects (plugin/reader/s3reader). -->
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <!-- Emit an exploded directory (not an archive): DataX loads plugins from a directory tree. -->
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- Plugin metadata read by the DataX framework at load time. -->
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/s3reader</outputDirectory>
        </fileSet>
        <!-- The jar built by this module. -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>s3reader-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/reader/s3reader</outputDirectory>
        </fileSet>
    </fileSets>
    <!-- Runtime third-party dependencies (e.g. the AWS SDK) go to libs/. -->
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/s3reader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

    Constant.java

    1. package com.alibaba.datax.plugin.reader.s3reader;
    2. public class Constant {
    3. public static final String OBJECT = "object";
    4. public static final int CONNECT_TIMEOUT = 50000;
    5. public static final String SIGNER_OVERRIDE = "S3SignerType";
    6. }

     Key.java

    1. public class Key {
    2. public static final String ENDPOINT = "endpoint";
    3. public static final String ACCESSKEY = "accessKey";
    4. public static final String SECRETKEY = "secretKey";
    5. public static final String BUCKET = "bucket";
    6. public static final String OBJECT = "object";
    7. }

    S3Reader.java

    1. package com.alibaba.datax.plugin.reader.s3reader;
    2. import com.alibaba.datax.common.exception.DataXException;
    3. import com.alibaba.datax.common.plugin.RecordSender;
    4. import com.alibaba.datax.common.spi.Reader;
    5. import com.alibaba.datax.common.util.Configuration;
    6. import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
    7. import com.amazonaws.services.s3.AmazonS3;
    8. import com.amazonaws.services.s3.model.ListObjectsRequest;
    9. import com.amazonaws.services.s3.model.ObjectListing;
    10. import com.amazonaws.services.s3.model.S3Object;
    11. import com.amazonaws.services.s3.model.S3ObjectSummary;
    12. import com.google.common.collect.Sets;
    13. import org.apache.commons.io.Charsets;
    14. import org.apache.commons.io.IOUtils;
    15. import org.apache.commons.lang3.StringUtils;
    16. import org.slf4j.Logger;
    17. import org.slf4j.LoggerFactory;
    18. import java.io.InputStream;
    19. import java.nio.charset.UnsupportedCharsetException;
    20. import java.util.ArrayList;
    21. import java.util.List;
    22. import java.util.Set;
    23. import java.util.regex.Pattern;
    24. public class S3Reader extends Reader {
    25. public static class Job extends Reader.Job {
    26. private static final Logger LOG = LoggerFactory.getLogger(Job.class);
    27. private Configuration readerOriginConfig = null;
    28. @Override
    29. public void init() {
    30. LOG.debug("init() begin...");
    31. this.readerOriginConfig = this.getPluginJobConf();
    32. this.validate();
    33. LOG.debug("init() ok and end...");
    34. }
    35. private void validate() {
    36. String endpoint = this.readerOriginConfig.getString(Key.ENDPOINT);
    37. if (StringUtils.isBlank(endpoint)) {
    38. throw DataXException.asDataXException(
    39. S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
    40. "您需要指定 endpoint");
    41. }
    42. String secretKey = this.readerOriginConfig.getString(Key.SECRETKEY);
    43. if (StringUtils.isBlank(secretKey)) {
    44. throw DataXException.asDataXException(
    45. S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
    46. "您需要指定 secretKey");
    47. }
    48. String accessKey = this.readerOriginConfig.getString(Key.ACCESSKEY);
    49. if (StringUtils.isBlank(accessKey)) {
    50. throw DataXException.asDataXException(
    51. S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
    52. "您需要指定 accessKey");
    53. }
    54. String bucket = this.readerOriginConfig.getString(Key.BUCKET);
    55. if (StringUtils.isBlank(bucket)) {
    56. throw DataXException.asDataXException(
    57. S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
    58. "您需要指定 bucket");
    59. }
    60. String object = this.readerOriginConfig.getString(Key.OBJECT);
    61. if (StringUtils.isBlank(object)) {
    62. throw DataXException.asDataXException(
    63. S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
    64. "您需要指定 object");
    65. }
    66. String fieldDelimiter = this.readerOriginConfig
    67. .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.FIELD_DELIMITER);
    68. // warn: need length 1
    69. if (null == fieldDelimiter || fieldDelimiter.length() == 0) {
    70. throw DataXException.asDataXException(
    71. S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
    72. "您需要指定 fieldDelimiter");
    73. }
    74. String encoding = this.readerOriginConfig
    75. .getString(
    76. com.alibaba.datax.plugin.unstructuredstorage.reader.Key.ENCODING,
    77. com.alibaba.datax.plugin.unstructuredstorage.reader.Constant.DEFAULT_ENCODING);
    78. try {
    79. Charsets.toCharset(encoding);
    80. } catch (UnsupportedCharsetException uce) {
    81. throw DataXException.asDataXException(
    82. S3ReaderErrorCode.ILLEGAL_VALUE,
    83. String.format("不支持的编码格式 : [%s]", encoding), uce);
    84. } catch (Exception e) {
    85. throw DataXException.asDataXException(
    86. S3ReaderErrorCode.ILLEGAL_VALUE,
    87. String.format("运行配置异常 : %s", e.getMessage()), e);
    88. }
    89. // 检测是column 是否为 ["*"] 若是则填为空
    90. List<Configuration> column = this.readerOriginConfig
    91. .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
    92. if (null != column
    93. && 1 == column.size()
    94. && ("\"*\"".equals(column.get(0).toString()) || "'*'"
    95. .equals(column.get(0).toString()))) {
    96. readerOriginConfig
    97. .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN,
    98. new ArrayList<String>());
    99. } else {
    100. // column: 1. index type 2.value type 3.when type is Data, may
    101. // have
    102. // format
    103. List<Configuration> columns = this.readerOriginConfig
    104. .getListConfiguration(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
    105. if (null == columns || columns.size() == 0) {
    106. throw DataXException.asDataXException(
    107. S3ReaderErrorCode.CONFIG_INVALID_EXCEPTION,
    108. "您需要指定 columns");
    109. }
    110. if (null != columns && columns.size() != 0) {
    111. for (Configuration eachColumnConf : columns) {
    112. eachColumnConf
    113. .getNecessaryValue(
    114. com.alibaba.datax.plugin.unstructuredstorage.reader.Key.TYPE,
    115. S3ReaderErrorCode.REQUIRED_VALUE);
    116. Integer columnIndex = eachColumnConf
    117. .getInt(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.INDEX);
    118. String columnValue = eachColumnConf
    119. .getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.VALUE);
    120. if (null == columnIndex && null == columnValue) {
    121. throw DataXException.asDataXException(
    122. S3ReaderErrorCode.NO_INDEX_VALUE,
    123. "由于您配置了type, 则至少需要配置 index 或 value");
    124. }
    125. if (null != columnIndex && null != columnValue) {
    126. throw DataXException.asDataXException(
    127. S3ReaderErrorCode.MIXED_INDEX_VALUE,
    128. "您混合配置了index, value, 每一列同时仅能选择其中一种");
    129. }
    130. }
    131. }
    132. }
    133. // only support compress: gzip,bzip2,zip
    134. String compress = this.readerOriginConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS);
    135. if (StringUtils.isBlank(compress)) {
    136. this.readerOriginConfig.set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS, null);
    137. } else {
    138. Set<String> supportedCompress = Sets.newHashSet("gzip", "bzip2", "zip");
    139. compress = compress.toLowerCase().trim();
    140. if (!supportedCompress.contains(compress)) {
    141. throw DataXException.asDataXException(
    142. S3ReaderErrorCode.ILLEGAL_VALUE,
    143. String.format(
    144. "仅支持 gzip, bzip2, zip 文件压缩格式 , 不支持您配置的文件压缩格式: [%s]",
    145. compress));
    146. }
    147. this.readerOriginConfig
    148. .set(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COMPRESS,
    149. compress);
    150. }
    151. }
    152. @Override
    153. public void prepare() {
    154. LOG.debug("prepare()");
    155. }
    156. @Override
    157. public void post() {
    158. LOG.debug("post()");
    159. }
    160. @Override
    161. public void destroy() {
    162. LOG.debug("destroy()");
    163. }
    164. @Override
    165. public List<Configuration> split(int adviceNumber) {
    166. LOG.debug("split() begin...");
    167. List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
    168. // 将每个单独的 object 作为一个 slice
    169. List<String> objects = parseOriginObjects(readerOriginConfig
    170. .getList(Constant.OBJECT, String.class));
    171. if (0 == objects.size()) {
    172. throw DataXException.asDataXException(
    173. S3ReaderErrorCode.EMPTY_BUCKET_EXCEPTION,
    174. String.format(
    175. "未能找到待读取的Object,请确认您的配置项bucket: %s object: %s",
    176. this.readerOriginConfig.get(Key.BUCKET),
    177. this.readerOriginConfig.get(Key.OBJECT)));
    178. }
    179. for (String object : objects) {
    180. Configuration splitedConfig = this.readerOriginConfig.clone();
    181. splitedConfig.set(Constant.OBJECT, object);
    182. readerSplitConfigs.add(splitedConfig);
    183. LOG.info(String.format("S3 object to be read:%s", object));
    184. }
    185. LOG.debug("split() ok and end...");
    186. return readerSplitConfigs;
    187. }
    188. private List<String> parseOriginObjects(List<String> originObjects) {
    189. List<String> parsedObjects = new ArrayList<String>();
    190. for (String object : originObjects) {
    191. int firstMetaChar = (object.indexOf('*') > object.indexOf('?')) ? object
    192. .indexOf('*') : object.indexOf('?');
    193. if (firstMetaChar != -1) {
    194. int lastDirSeparator = object.lastIndexOf(
    195. IOUtils.DIR_SEPARATOR, firstMetaChar);
    196. String parentDir = object
    197. .substring(0, lastDirSeparator + 1);
    198. List<String> remoteObjects = getRemoteObjects(parentDir);
    199. Pattern pattern = Pattern.compile(object.replace("*", ".*")
    200. .replace("?", ".?"));
    201. for (String remoteObject : remoteObjects) {
    202. if (pattern.matcher(remoteObject).matches()) {
    203. parsedObjects.add(remoteObject);
    204. }
    205. }
    206. } else {
    207. parsedObjects.add(object);
    208. }
    209. }
    210. return parsedObjects;
    211. }
    212. private List<String> getRemoteObjects(String parentDir) {
    213. LOG.debug(String.format("父文件夹 : %s", parentDir));
    214. List<String> remoteObjects = new ArrayList<String>();
    215. AmazonS3 client = S3Util.initS3Client(readerOriginConfig);
    216. try {
    217. ListObjectsRequest listObjectsRequest = new ListObjectsRequest(
    218. readerOriginConfig.getString(Key.BUCKET), parentDir, null, null, null
    219. );
    220. ObjectListing objectList;
    221. do {
    222. objectList = client.listObjects(listObjectsRequest);
    223. for (S3ObjectSummary objectSummary : objectList
    224. .getObjectSummaries()) {
    225. LOG.debug(String.format("找到文件 : %s",
    226. objectSummary.getKey()));
    227. remoteObjects.add(objectSummary.getKey());
    228. }
    229. listObjectsRequest.setMarker(objectList.getNextMarker());
    230. LOG.debug(listObjectsRequest.getMarker());
    231. LOG.debug(String.valueOf(objectList.isTruncated()));
    232. } while (objectList.isTruncated());
    233. } catch (IllegalArgumentException e) {
    234. throw DataXException.asDataXException(
    235. S3ReaderErrorCode.S3_EXCEPTION, e.getMessage());
    236. }
    237. return remoteObjects;
    238. }
    239. }
    240. public static class Task extends Reader.Task {
    241. private static Logger LOG = LoggerFactory.getLogger(Reader.Task.class);
    242. private Configuration readerSliceConfig;
    243. @Override
    244. public void startRead(RecordSender recordSender) {
    245. LOG.debug("read start");
    246. String bucket = readerSliceConfig.getString(Key.BUCKET);
    247. String object = readerSliceConfig.getString(Key.OBJECT);
    248. AmazonS3 client = S3Util.initS3Client(readerSliceConfig);
    249. S3Object s3Object = client.getObject(bucket, object);
    250. InputStream objectStream = s3Object.getObjectContent();
    251. UnstructuredStorageReaderUtil.readFromStream(objectStream, object,
    252. this.readerSliceConfig, recordSender,
    253. this.getTaskPluginCollector());
    254. recordSender.flush();
    255. }
    256. @Override
    257. public void init() {
    258. this.readerSliceConfig = this.getPluginJobConf();
    259. }
    260. @Override
    261. public void destroy() {
    262. }
    263. }
    264. }

    S3ReaderErrorCode.java

    1. package com.alibaba.datax.plugin.reader.s3reader;
    2. import com.alibaba.datax.common.spi.ErrorCode;
    3. public enum S3ReaderErrorCode implements ErrorCode {
    4. // TODO: 修改错误码类型
    5. RUNTIME_EXCEPTION("S3Reader-00", "运行时异常"),
    6. S3_EXCEPTION("OssFileReader-01", "OSS配置异常"),
    7. CONFIG_INVALID_EXCEPTION("OssFileReader-02", "参数配置错误"),
    8. NOT_SUPPORT_TYPE("S3Reader-03", "不支持的类型"),
    9. CAST_VALUE_TYPE_ERROR("OssFileReader-04", "无法完成指定类型的转换"),
    10. SECURITY_EXCEPTION("S3Reader-05", "缺少权限"),
    11. ILLEGAL_VALUE("S3Reader-06", "值错误"),
    12. REQUIRED_VALUE("S3Reader-07", "必选项"),
    13. NO_INDEX_VALUE("S3Reader-08", "没有 Index"),
    14. MIXED_INDEX_VALUE("S3Reader-09", "index 和 value 混合"),
    15. EMPTY_BUCKET_EXCEPTION("S3Reader-10", "您尝试读取的Bucket为空");
    16. private final String code;
    17. private final String description;
    18. private S3ReaderErrorCode(String code, String description) {
    19. this.code = code;
    20. this.description = description;
    21. }
    22. @Override
    23. public String getCode() {
    24. return this.code;
    25. }
    26. @Override
    27. public String getDescription() {
    28. return this.description;
    29. }
    30. @Override
    31. public String toString() {
    32. return String.format("Code:[%s], Description:[%s].", this.code,
    33. this.description);
    34. }
    35. }

     S3Util.java

    1. package com.alibaba.datax.plugin.reader.s3reader;
    2. import com.alibaba.datax.common.util.Configuration;
    3. import com.amazonaws.ClientConfiguration;
    4. import com.amazonaws.auth.AWSCredentials;
    5. import com.amazonaws.auth.AWSCredentialsProvider;
    6. import com.amazonaws.auth.AWSStaticCredentialsProvider;
    7. import com.amazonaws.auth.BasicAWSCredentials;
    8. import com.amazonaws.client.builder.AwsClientBuilder;
    9. import com.amazonaws.services.s3.AmazonS3;
    10. import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    11. import java.util.HashMap;
    12. import java.util.Map;
    13. /**
    14. * @Description: obs 工具类
    15. * @Author: chenweifeng
    16. * @Date: 2022年11月18日 下午1:56
    17. **/
    18. public class S3Util {
    19. public static void main(String[] args) {
    20. Map<String, Object> map = new HashMap<>();
    21. map.put(Key.ENDPOINT, "http://x.x.x.x:9383");
    22. map.put(Key.ACCESSKEY, "admin");
    23. map.put(Key.SECRETKEY, "123456");
    24. Configuration conf = Configuration.from(map);
    25. initS3Client(conf);
    26. }
    27. /**
    28. * 初始化S3Client
    29. *
    30. * @param conf
    31. * @return
    32. */
    33. public static AmazonS3 initS3Client(Configuration conf) {
    34. String endpoint = conf.getString(Key.ENDPOINT);
    35. String accessKey = conf.getString(Key.ACCESSKEY);
    36. String secretKey = conf.getString(Key.SECRETKEY);
    37. AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3ClientBuilder.standard();
    38. ClientConfiguration clientConfiguration = new ClientConfiguration();
    39. clientConfiguration.setProtocol(com.amazonaws.Protocol.HTTPS);
    40. clientConfiguration.setConnectionTimeout(Constant.CONNECT_TIMEOUT);
    41. clientConfiguration.setSignerOverride(Constant.SIGNER_OVERRIDE);
    42. amazonS3ClientBuilder.setClientConfiguration(clientConfiguration);
    43. AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    44. AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
    45. amazonS3ClientBuilder.setCredentials(awsCredentialsProvider);
    46. AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, null);
    47. amazonS3ClientBuilder.setEndpointConfiguration(endpointConfiguration);
    48. AmazonS3 amazonS3 = amazonS3ClientBuilder.build();
    49. return amazonS3;
    50. }
    51. }

    plugin.json

    1. {
    2. "name": "s3reader",
    3. "class": "com.alibaba.datax.plugin.reader.s3reader.S3Reader",
    4. "description": "",
    5. "developer": "alibaba"
    6. }

    2.2 s3writer

    2.2.1 项目结构

     2.2.2 代码开发

    package.xml

<!-- Maven assembly descriptor for the s3writer plugin: lays out the plugin
     descriptor files, the plugin jar and its runtime dependencies in the
     directory structure DataX expects (plugin/writer/s3writer). -->
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <!-- Emit an exploded directory (not an archive): DataX loads plugins from a directory tree. -->
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- Plugin metadata read by the DataX framework at load time. -->
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/s3writer</outputDirectory>
        </fileSet>
        <!-- The jar built by this module. -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>s3writer-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>plugin/writer/s3writer</outputDirectory>
        </fileSet>
    </fileSets>
    <!-- Runtime third-party dependencies (e.g. the AWS SDK) go to libs/. -->
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/s3writer/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

    Constant.java

    1. package com.alibaba.datax.plugin.writer.s3writer;
    2. public class Constant {
    3. public static final String OBJECT = "object";
    4. public static final int CONNECT_TIMEOUT = 50000;
    5. public static final String SIGNER_OVERRIDE = "S3SignerType";
    6. }

    Key.java

    1. package com.alibaba.datax.plugin.writer.s3writer;
    2. public class Key {
    3. public static final String ENDPOINT = "endpoint";
    4. public static final String ACCESSKEY = "accessKey";
    5. public static final String SECRETKEY = "secretKey";
    6. public static final String BUCKET = "bucket";
    7. public static final String OBJECT = "object";
    8. }

    S3Util.java 与 s3reader 模块中的实现基本相同,仅需将 package 改为 com.alibaba.datax.plugin.writer.s3writer,并引用本模块的 Key 与 Constant。

    S3Writer.java

    1. package com.alibaba.datax.plugin.writer.s3writer;
    2. import com.alibaba.datax.common.element.Record;
    3. import com.alibaba.datax.common.exception.DataXException;
    4. import com.alibaba.datax.common.plugin.RecordReceiver;
    5. import com.alibaba.datax.common.spi.Writer;
    6. import com.alibaba.datax.common.util.Configuration;
    7. import com.alibaba.datax.common.util.RetryUtil;
    8. import com.alibaba.datax.plugin.unstructuredstorage.writer.TextCsvWriterManager;
    9. import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredStorageWriterUtil;
    10. import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredWriter;
    11. import com.amazonaws.services.s3.AmazonS3;
    12. import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    13. import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
    14. import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    15. import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
    16. import com.amazonaws.services.s3.model.ObjectListing;
    17. import com.amazonaws.services.s3.model.PartETag;
    18. import com.amazonaws.services.s3.model.S3ObjectSummary;
    19. import com.amazonaws.services.s3.model.UploadPartRequest;
    20. import com.amazonaws.services.s3.model.UploadPartResult;
    21. import org.apache.commons.io.IOUtils;
    22. import org.apache.commons.lang3.StringUtils;
    23. import org.slf4j.Logger;
    24. import org.slf4j.LoggerFactory;
    25. import java.io.ByteArrayInputStream;
    26. import java.io.IOException;
    27. import java.io.InputStream;
    28. import java.io.StringWriter;
    29. import java.text.DateFormat;
    30. import java.text.SimpleDateFormat;
    31. import java.util.ArrayList;
    32. import java.util.HashSet;
    33. import java.util.List;
    34. import java.util.Set;
    35. import java.util.UUID;
    36. import java.util.concurrent.Callable;
    37. public class S3Writer extends Writer {
    38. public static class Job extends Writer.Job {
    39. private static final Logger LOG = LoggerFactory.getLogger(Job.class);
    40. private Configuration writerSliceConfig = null;
    41. private AmazonS3 s3client = null;
    42. @Override
    43. public void init() {
    44. this.writerSliceConfig = this.getPluginJobConf();
    45. this.validateParameter();
    46. this.s3client = S3Util.initS3Client(this.writerSliceConfig);
    47. }
    48. private void validateParameter() {
    49. this.writerSliceConfig.getNecessaryValue(Key.ENDPOINT,
    50. S3WriterErrorCode.REQUIRED_VALUE);
    51. this.writerSliceConfig.getNecessaryValue(Key.SECRETKEY,
    52. S3WriterErrorCode.REQUIRED_VALUE);
    53. this.writerSliceConfig.getNecessaryValue(Key.ACCESSKEY,
    54. S3WriterErrorCode.REQUIRED_VALUE);
    55. this.writerSliceConfig.getNecessaryValue(Key.BUCKET,
    56. S3WriterErrorCode.REQUIRED_VALUE);
    57. this.writerSliceConfig.getNecessaryValue(Key.OBJECT,
    58. S3WriterErrorCode.REQUIRED_VALUE);
    59. // warn: do not support compress!!
    60. String compress = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.COMPRESS);
    61. if (StringUtils.isNotBlank(compress)) {
    62. String errorMessage = String.format("S3写暂时不支持压缩, 该压缩配置项[%s]不起效用", compress);
    63. LOG.error(errorMessage);
    64. throw DataXException.asDataXException(S3WriterErrorCode.ILLEGAL_VALUE, errorMessage);
    65. }
    66. UnstructuredStorageWriterUtil.validateParameter(this.writerSliceConfig);
    67. }
    68. @Override
    69. public void prepare() {
    70. LOG.info("begin do prepare...");
    71. String bucket = this.writerSliceConfig.getString(Key.BUCKET);
    72. String object = this.writerSliceConfig.getString(Key.OBJECT);
    73. String writeMode = this.writerSliceConfig
    74. .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.WRITE_MODE);
    75. // warn: bucket is not exists, create it
    76. try {
    77. // warn: do not create bucket for user
    78. if (!this.s3client.doesBucketExist(bucket)) {
    79. // this.s3client.createBucket(bucket);
    80. String errorMessage = String.format(
    81. "您配置的bucket [%s] 不存在, 请您确认您的配置项.", bucket);
    82. LOG.error(errorMessage);
    83. throw DataXException.asDataXException(
    84. S3WriterErrorCode.ILLEGAL_VALUE, errorMessage);
    85. }
    86. LOG.info(String.format("access control details [%s].",
    87. this.s3client.getBucketAcl(bucket).toString()));
    88. // truncate option handler
    89. if ("truncate".equals(writeMode)) {
    90. LOG.info(String
    91. .format("由于您配置了writeMode truncate, 开始清理 [%s] 下面以 [%s] 开头的Object",
    92. bucket, object));
    93. // warn: 默认情况下,如果Bucket中的Object数量大于100,则只会返回100Object
    94. while (true) {
    95. ObjectListing listing = null;
    96. LOG.info("list objects with listObject(bucket, object)");
    97. listing = this.s3client.listObjects(bucket, object);
    98. List<S3ObjectSummary> objectSummarys = listing.getObjectSummaries();
    99. for (S3ObjectSummary objectSummary : objectSummarys) {
    100. LOG.info(String.format("delete oss object [%s].", objectSummary.getKey()));
    101. this.s3client.deleteObject(bucket, objectSummary.getKey());
    102. }
    103. if (objectSummarys.isEmpty()) {
    104. break;
    105. }
    106. }
    107. } else if ("append".equals(writeMode)) {
    108. LOG.info(String
    109. .format("由于您配置了writeMode append, 写入前不做清理工作, 数据写入Bucket [%s] 下, 写入相应Object的前缀为 [%s]",
    110. bucket, object));
    111. } else if ("nonConflict".equals(writeMode)) {
    112. LOG.info(String
    113. .format("由于您配置了writeMode nonConflict, 开始检查Bucket [%s] 下面以 [%s] 命名开头的Object",
    114. bucket, object));
    115. ObjectListing listing = this.s3client.listObjects(bucket,
    116. object);
    117. if (0 < listing.getObjectSummaries().size()) {
    118. StringBuilder objectKeys = new StringBuilder();
    119. objectKeys.append("[ ");
    120. for (S3ObjectSummary ossObjectSummary : listing.getObjectSummaries()) {
    121. objectKeys.append(ossObjectSummary.getKey() + " ,");
    122. }
    123. objectKeys.append(" ]");
    124. LOG.info(String.format("object with prefix [%s] details: %s", object, objectKeys.toString()));
    125. throw DataXException.asDataXException(
    126. S3WriterErrorCode.ILLEGAL_VALUE,
    127. String.format("您配置的Bucket: [%s] 下面存在其Object有前缀 [%s].", bucket, object));
    128. }
    129. }
    130. } catch (Exception e) {
    131. throw DataXException.asDataXException(
    132. S3WriterErrorCode.S3_COMM_ERROR, e.getMessage());
    133. }
    134. }
    135. @Override
    136. public void post() {
    137. }
    138. @Override
    139. public void destroy() {
    140. }
    141. @Override
    142. public List<Configuration> split(int mandatoryNumber) {
    143. LOG.info("begin do split...");
    144. List<Configuration> writerSplitConfigs = new ArrayList<Configuration>();
    145. String object = this.writerSliceConfig.getString(Key.OBJECT);
    146. String bucket = this.writerSliceConfig.getString(Key.BUCKET);
    147. Set<String> allObjects = new HashSet<String>();
    148. try {
    149. List<S3ObjectSummary> ossObjectlisting = this.s3client.listObjects(bucket).getObjectSummaries();
    150. for (S3ObjectSummary objectSummary : ossObjectlisting) {
    151. allObjects.add(objectSummary.getKey());
    152. }
    153. } catch (Exception e) {
    154. throw DataXException.asDataXException(
    155. S3WriterErrorCode.S3_COMM_ERROR, e.getMessage());
    156. }
    157. String objectSuffix;
    158. for (int i = 0; i < mandatoryNumber; i++) {
    159. // handle same object name
    160. Configuration splitedTaskConfig = this.writerSliceConfig
    161. .clone();
    162. String fullObjectName = null;
    163. objectSuffix = StringUtils.replace(
    164. UUID.randomUUID().toString(), "-", "");
    165. fullObjectName = String.format("%s__%s", object, objectSuffix);
    166. while (allObjects.contains(fullObjectName)) {
    167. objectSuffix = StringUtils.replace(UUID.randomUUID()
    168. .toString(), "-", "");
    169. fullObjectName = String.format("%s__%s", object,
    170. objectSuffix);
    171. }
    172. allObjects.add(fullObjectName);
    173. splitedTaskConfig.set(Key.OBJECT, fullObjectName);
    174. LOG.info(String.format("splited write object name:[%s]",
    175. fullObjectName));
    176. writerSplitConfigs.add(splitedTaskConfig);
    177. }
    178. LOG.info("end do split.");
    179. return writerSplitConfigs;
    180. }
    181. }
    182. public static class Task extends Writer.Task {
    183. private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    184. private AmazonS3 s3client;
    185. private Configuration writerSliceConfig;
    186. private String bucket;
    187. private String object;
    188. private String nullFormat;
    189. private String encoding;
    190. private char fieldDelimiter;
    191. private String dateFormat;
    192. private DateFormat dateParse;
    193. private String fileFormat;
    194. private List<String> header;
    195. private Long maxFileSize;// MB
    196. private String suffix;
    197. @Override
    198. public void init() {
    199. this.writerSliceConfig = this.getPluginJobConf();
    200. this.s3client = S3Util.initS3Client(this.writerSliceConfig);
    201. this.bucket = this.writerSliceConfig.getString(Key.BUCKET);
    202. this.object = this.writerSliceConfig.getString(Key.OBJECT);
    203. this.nullFormat = this.writerSliceConfig
    204. .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.NULL_FORMAT);
    205. this.dateFormat = this.writerSliceConfig
    206. .getString(
    207. com.alibaba.datax.plugin.unstructuredstorage.writer.Key.DATE_FORMAT,
    208. null);
    209. if (StringUtils.isNotBlank(this.dateFormat)) {
    210. this.dateParse = new SimpleDateFormat(dateFormat);
    211. }
    212. this.encoding = this.writerSliceConfig
    213. .getString(
    214. com.alibaba.datax.plugin.unstructuredstorage.writer.Key.ENCODING,
    215. com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_ENCODING);
    216. this.fieldDelimiter = this.writerSliceConfig
    217. .getChar(
    218. com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FIELD_DELIMITER,
    219. com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_FIELD_DELIMITER);
    220. this.fileFormat = this.writerSliceConfig
    221. .getString(
    222. com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_FORMAT,
    223. com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.FILE_FORMAT_TEXT);
    224. this.header = this.writerSliceConfig
    225. .getList(
    226. com.alibaba.datax.plugin.unstructuredstorage.writer.Key.HEADER,
    227. null, String.class);
    228. this.maxFileSize = this.writerSliceConfig
    229. .getLong(
    230. com.alibaba.datax.plugin.unstructuredstorage.writer.Key.MAX_FILE_SIZE,
    231. com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.MAX_FILE_SIZE);
    232. this.suffix = this.writerSliceConfig
    233. .getString(
    234. com.alibaba.datax.plugin.unstructuredstorage.writer.Key.SUFFIX,
    235. com.alibaba.datax.plugin.unstructuredstorage.writer.Constant.DEFAULT_SUFFIX);
    236. this.suffix = this.suffix.trim();// warn: need trim
    237. }
    238. @Override
    239. public void startWrite(RecordReceiver lineReceiver) {
    240. // 设置每块字符串长度
    241. final long partSize = 1024 * 1024 * 10L;
    242. long numberCacul = (this.maxFileSize * 1024 * 1024L) / partSize;
    243. final long maxPartNumber = numberCacul >= 1 ? numberCacul : 1;
    244. int objectRollingNumber = 0;
    245. //warn: may be StringBuffer->StringBuilder
    246. StringWriter sw = new StringWriter();
    247. StringBuffer sb = sw.getBuffer();
    248. UnstructuredWriter unstructuredWriter = TextCsvWriterManager
    249. .produceUnstructuredWriter(this.fileFormat,
    250. this.fieldDelimiter, sw);
    251. Record record = null;
    252. LOG.info(String.format(
    253. "begin do write, each object maxFileSize: [%s]MB...",
    254. maxPartNumber * 10));
    255. String currentObject = this.object;
    256. InitiateMultipartUploadRequest currentInitiateMultipartUploadRequest = null;
    257. InitiateMultipartUploadResult currentInitiateMultipartUploadResult = null;
    258. boolean gotData = false;
    259. List<PartETag> currentPartETags = null;
    260. // to do:
    261. // 可以根据currentPartNumber做分块级别的重试,InitiateMultipartUploadRequest多次一个currentPartNumber会覆盖原有
    262. int currentPartNumber = 1;
    263. try {
    264. // warn
    265. boolean needInitMultipartTransform = true;
    266. while ((record = lineReceiver.getFromReader()) != null) {
    267. gotData = true;
    268. // init:begin new multipart upload
    269. if (needInitMultipartTransform) {
    270. if (objectRollingNumber == 0) {
    271. if (StringUtils.isBlank(this.suffix)) {
    272. currentObject = this.object;
    273. } else {
    274. currentObject = String.format("%s%s",
    275. this.object, this.suffix);
    276. }
    277. } else {
    278. // currentObject is like(no suffix)
    279. // myfile__9b886b70fbef11e59a3600163e00068c_1
    280. if (StringUtils.isBlank(this.suffix)) {
    281. currentObject = String.format("%s_%s",
    282. this.object, objectRollingNumber);
    283. } else {
    284. // or with suffix
    285. // myfile__9b886b70fbef11e59a3600163e00068c_1.csv
    286. currentObject = String.format("%s_%s%s",
    287. this.object, objectRollingNumber,
    288. this.suffix);
    289. }
    290. }
    291. objectRollingNumber++;
    292. currentInitiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
    293. this.bucket, currentObject);
    294. currentInitiateMultipartUploadResult = this.s3client
    295. .initiateMultipartUpload(currentInitiateMultipartUploadRequest);
    296. currentPartETags = new ArrayList<PartETag>();
    297. LOG.info(String
    298. .format("write to bucket: [%s] object: [%s] with oss uploadId: [%s]",
    299. this.bucket, currentObject,
    300. currentInitiateMultipartUploadResult
    301. .getUploadId()));
    302. // each object's header
    303. if (null != this.header && !this.header.isEmpty()) {
    304. unstructuredWriter.writeOneRecord(this.header);
    305. }
    306. // warn
    307. needInitMultipartTransform = false;
    308. currentPartNumber = 1;
    309. }
    310. // write: upload data to current object
    311. UnstructuredStorageWriterUtil.transportOneRecord(record,
    312. this.nullFormat, this.dateParse,
    313. this.getTaskPluginCollector(), unstructuredWriter);
    314. if (sb.length() >= partSize) {
    315. this.uploadOnePart(sw, currentPartNumber,
    316. currentInitiateMultipartUploadResult,
    317. currentPartETags, currentObject);
    318. currentPartNumber++;
    319. sb.setLength(0);
    320. }
    321. // save: end current multipart upload
    322. if (currentPartNumber > maxPartNumber) {
    323. LOG.info(String
    324. .format("current object [%s] size > %s, complete current multipart upload and begin new one",
    325. currentObject, currentPartNumber
    326. * partSize));
    327. CompleteMultipartUploadRequest currentCompleteMultipartUploadRequest = new CompleteMultipartUploadRequest(
    328. this.bucket, currentObject,
    329. currentInitiateMultipartUploadResult
    330. .getUploadId(), currentPartETags);
    331. CompleteMultipartUploadResult currentCompleteMultipartUploadResult = this.s3client
    332. .completeMultipartUpload(currentCompleteMultipartUploadRequest);
    333. LOG.info(String.format(
    334. "final object [%s] etag is:[%s]",
    335. currentObject,
    336. currentCompleteMultipartUploadResult.getETag()));
    337. // warn
    338. needInitMultipartTransform = true;
    339. }
    340. }
    341. if (!gotData) {
    342. LOG.info("Receive no data from the source.");
    343. currentInitiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
    344. this.bucket, currentObject);
    345. currentInitiateMultipartUploadResult = this.s3client
    346. .initiateMultipartUpload(currentInitiateMultipartUploadRequest);
    347. currentPartETags = new ArrayList();
    348. // each object's header
    349. if (null != this.header && !this.header.isEmpty()) {
    350. unstructuredWriter.writeOneRecord(this.header);
    351. }
    352. }
    353. // warn: may be some data stall in sb
    354. if (0 < sb.length()) {
    355. this.uploadOnePart(sw, currentPartNumber,
    356. currentInitiateMultipartUploadResult,
    357. currentPartETags, currentObject);
    358. }
    359. CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest(
    360. this.bucket, currentObject,
    361. currentInitiateMultipartUploadResult.getUploadId(),
    362. currentPartETags);
    363. CompleteMultipartUploadResult completeMultipartUploadResult = this.s3client
    364. .completeMultipartUpload(completeMultipartUploadRequest);
    365. LOG.info(String.format("final object etag is:[%s]",
    366. completeMultipartUploadResult.getETag()));
    367. } catch (IOException e) {
    368. // 脏数据UnstructuredStorageWriterUtil.transportOneRecord已经记录,header
    369. // 都是字符串不认为有脏数据
    370. throw DataXException.asDataXException(
    371. S3WriterErrorCode.Write_OBJECT_ERROR, e.getMessage());
    372. } catch (Exception e) {
    373. throw DataXException.asDataXException(
    374. S3WriterErrorCode.Write_OBJECT_ERROR, e.getMessage());
    375. }
    376. LOG.info("end do write");
    377. }
    378. /**
    379. * 对于同一个UploadID,该号码不但唯一标识这一块数据,也标识了这块数据在整个文件内的相对位置。
    380. * 如果你用同一个part号码,上传了新的数据,那么OSS上已有的这个号码的Part数据将被覆盖。
    381. *
    382. * @throws Exception
    383. */
    384. private void uploadOnePart(
    385. final StringWriter sw,
    386. final int partNumber,
    387. final InitiateMultipartUploadResult initiateMultipartUploadResult,
    388. final List<PartETag> partETags, final String currentObject)
    389. throws Exception {
    390. final String encoding = this.encoding;
    391. final String bucket = this.bucket;
    392. final AmazonS3 s3client = this.s3client;
    393. RetryUtil.executeWithRetry(new Callable<Boolean>() {
    394. @Override
    395. public Boolean call() throws Exception {
    396. byte[] byteArray = sw.toString().getBytes(encoding);
    397. InputStream inputStream = new ByteArrayInputStream(
    398. byteArray);
    399. // 创建UploadPartRequest,上传分块
    400. UploadPartRequest uploadPartRequest = new UploadPartRequest();
    401. uploadPartRequest.setBucketName(bucket);
    402. uploadPartRequest.setKey(currentObject);
    403. uploadPartRequest.setUploadId(initiateMultipartUploadResult
    404. .getUploadId());
    405. uploadPartRequest.setInputStream(inputStream);
    406. uploadPartRequest.setPartSize(byteArray.length);
    407. uploadPartRequest.setPartNumber(partNumber);
    408. UploadPartResult uploadPartResult = s3client
    409. .uploadPart(uploadPartRequest);
    410. partETags.add(uploadPartResult.getPartETag());
    411. LOG.info(String
    412. .format("upload part [%s] size [%s] Byte has been completed.",
    413. partNumber, byteArray.length));
    414. IOUtils.closeQuietly(inputStream);
    415. return true;
    416. }
    417. }, 3, 1000L, false);
    418. }
@Override
public void prepare() {
    // no-op: nothing to set up per task
}

@Override
public void post() {
    // no-op: no post-processing needed
}

@Override
public void destroy() {
    // no-op: no per-task resources to release
}
    428. }
    429. }

    S3WriterErrorCode.java

    1. package com.alibaba.datax.plugin.writer.s3writer;
    2. import com.alibaba.datax.common.spi.ErrorCode;
    3. public enum S3WriterErrorCode implements ErrorCode {
    4. CONFIG_INVALID_EXCEPTION("S3Writer-00", "您的参数配置错误."),
    5. REQUIRED_VALUE("S3Writer-01", "您缺失了必须填写的参数值."),
    6. ILLEGAL_VALUE("S3Writer-02", "您填写的参数值不合法."),
    7. Write_OBJECT_ERROR("S3Writer-03", "您配置的目标Object在写入时异常."),
    8. S3_COMM_ERROR("S3Writer-05", "执行相应的S3操作异常."),
    9. ;
    10. private final String code;
    11. private final String description;
    12. private S3WriterErrorCode(String code, String description) {
    13. this.code = code;
    14. this.description = description;
    15. }
    16. @Override
    17. public String getCode() {
    18. return this.code;
    19. }
    20. @Override
    21. public String getDescription() {
    22. return this.description;
    23. }
    24. @Override
    25. public String toString() {
    26. return String.format("Code:[%s], Description:[%s].", this.code,
    27. this.description);
    28. }
    29. }

    plugin.json

    1. {
    2. "name": "s3writer",
    3. "class": "com.alibaba.datax.plugin.writer.s3writer.S3Writer",
    4. "description": "",
    5. "developer": "chenweifeng"
    6. }

    3 调试运行和性能测试

    s3reader 和 s3writer 模板

    s3reader 和 s3writer 运行结果

  • 相关阅读:
    bean的生命周期
    操作系统_多线程笔记(二)
    【问题解决】新版webots纹理等资源文件加载/下载时间过长
    【LeetCode】No.55. Jump Game -- Java Version
    成功项目经理总结的20个项目管理经验
    PMP模拟题 | 每日一练,快速提分
    GPT-4o:人工智能新纪元的突破与展望
    Ubuntu安装和配置ssh
    Java 包及访问控制权限
    c语言进阶部分详解(指针进阶1)
  • 原文地址:https://blog.csdn.net/Carson073/article/details/128001868