• DataX Secondary Development (6): Developing kafkareader and kafkawriter


    Reference posts:

    Datax 二次开发插件详细过程_键盘上的艺术家w的博客-CSDN博客_datax kafkareader

    简书-DataX kafkawriter

     Background

    Based on the open-source Alibaba DataX 3.0, this post develops Kafka reader and writer plugins, which make it possible to extract data from MySQL or PostgreSQL into Kafka, consume messages from Kafka and write them to HDFS, and so on.

    1. Overall module structure

    1. kafkareader

    2. kafkawriter

    3. package.xml

    <fileSet>
        <directory>kafkareader/target/datax/</directory>
        <includes>
            <include>**/*.*</include>
        </includes>
        <outputDirectory>datax</outputDirectory>
    </fileSet>
    <fileSet>
        <directory>kafkawriter/target/datax/</directory>
        <includes>
            <include>**/*.*</include>
        </includes>
        <outputDirectory>datax</outputDirectory>
    </fileSet>

    4. pom.xml

    <module>kafkareader</module>
    <module>kafkawriter</module>

    2. kafkareader module code

    package.xml

    <assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
        <id></id>
        <formats>
            <format>dir</format>
        </formats>
        <includeBaseDirectory>false</includeBaseDirectory>
        <fileSets>
            <fileSet>
                <directory>src/main/resources</directory>
                <includes>
                    <include>plugin.json</include>
                </includes>
                <outputDirectory>plugin/reader/kafkareader</outputDirectory>
            </fileSet>
            <fileSet>
                <directory>target/</directory>
                <includes>
                    <include>kafkareader-0.0.1-SNAPSHOT.jar</include>
                </includes>
                <outputDirectory>plugin/reader/kafkareader</outputDirectory>
            </fileSet>
        </fileSets>
        <dependencySets>
            <dependencySet>
                <useProjectArtifact>false</useProjectArtifact>
                <outputDirectory>plugin/reader/kafkareader/libs</outputDirectory>
                <scope>runtime</scope>
            </dependencySet>
        </dependencySets>
    </assembly>

    DateUtil.java 

    package com.alibaba.datax.plugin.reader.kafkareader;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    /**
     * Date formatting helpers.
     *
     * @Author: chenweifeng
     * @Date: 2022-09-05 13:43
     **/
    public class DateUtil {

        /**
         * Format a date as a yyyy-MM-dd HH:mm:ss string.
         *
         * @param date
         * @return
         */
        public static String targetFormat(Date date) {
            String dateString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
            return dateString;
        }

        /**
         * Format a date with the given pattern.
         *
         * @param date
         * @param pattern
         * @return
         */
        public static String targetFormat(Date date, String pattern) {
            String dateString = new SimpleDateFormat(pattern).format(date);
            return dateString;
        }
    }

     JsonUtilJava.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.fastjson.TypeReference;

    import java.util.HashMap;

    /**
     * JSON parsing helper.
     *
     * @Author: chenweifeng
     * @Date: 2022-09-05 13:47
     **/
    public class JsonUtilJava {

        /**
         * Parse a JSON string into a HashMap.
         *
         * @param jsonData
         * @return
         */
        public static HashMap<String, Object> parseJsonStrToMap(String jsonData) {
            HashMap<String, Object> hashMap = JSONObject.parseObject(jsonData, new TypeReference<HashMap<String, Object>>() {
            });
            return hashMap;
        }
    }

    KafkaReader.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.element.StringColumn;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordSender;
    import com.alibaba.datax.common.spi.Reader;
    import com.alibaba.datax.common.util.Configuration;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.*;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class KafkaReader extends Reader {

        public static class Job extends Reader.Job {
            private static final Logger LOG = LoggerFactory.getLogger(Job.class);
            private Configuration originalConfig = null;

            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
                String topic = this.originalConfig.getString(Key.TOPIC);
                Integer partitions = this.originalConfig.getInt(Key.KAFKA_PARTITIONS);
                String bootstrapServers = this.originalConfig.getString(Key.BOOTSTRAP_SERVERS);
                String groupId = this.originalConfig.getString(Key.GROUP_ID);
                Integer columnCount = this.originalConfig.getInt(Key.COLUMNCOUNT);
                String split = this.originalConfig.getString(Key.SPLIT);
                String filterContaintsStr = this.originalConfig.getString(Key.CONTAINTS_STR);
                String filterContaintsFlag = this.originalConfig.getString(Key.CONTAINTS_STR_FLAG);
                String conditionAllOrOne = this.originalConfig.getString(Key.CONDITION_ALL_OR_ONE);
                String parsingRules = this.originalConfig.getString(Key.PARSING_RULES);
                String writerOrder = this.originalConfig.getString(Key.WRITER_ORDER);
                String kafkaReaderColumnKey = this.originalConfig.getString(Key.KAFKA_READER_COLUMN_KEY);
                LOG.info("topic:{},partitions:{},bootstrapServers:{},groupId:{},columnCount:{},split:{},parsingRules:{}",
                        topic, partitions, bootstrapServers, groupId, columnCount, split, parsingRules);

                if (null == topic) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.TOPIC_ERROR,
                            "Parameter [topic] is not set.");
                }
                if (partitions == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.PARTITION_ERROR,
                            "Parameter [kafkaPartitions] is not set.");
                } else if (partitions < 1) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.PARTITION_ERROR,
                            "[kafkaPartitions] must not be less than 1.");
                }
                if (null == bootstrapServers) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.ADDRESS_ERROR,
                            "Parameter [bootstrapServers] is not set.");
                }
                if (null == groupId) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Parameter [groupId] is not set.");
                }
                if (columnCount == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.PARTITION_ERROR,
                            "Parameter [columnCount] is not set.");
                } else if (columnCount < 1) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "[columnCount] must not be less than 1.");
                }
                if (null == split) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "[split] must not be empty.");
                }
                if (filterContaintsStr != null) {
                    if (conditionAllOrOne == null || filterContaintsFlag == null) {
                        throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                                "[filterContaints] is set, but [conditionAllOrOne] or [filterContaintsFlag] is missing.");
                    }
                }
                if (parsingRules == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Parameter [parsingRules] is not set.");
                } else if (!parsingRules.equals("regex") && !parsingRules.equals("json") && !parsingRules.equals("split")) {
                    // Fixed: the original condition forgot to negate the json/split checks, so it could never fail.
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "[parsingRules] is invalid; it must be one of regex, json, split.");
                }
                if (writerOrder == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Parameter [writerOrder] is not set.");
                }
                if (kafkaReaderColumnKey == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Parameter [kafkaReaderColumnKey] is not set.");
                }
            }

            @Override
            public void preCheck() {
                init();
            }

            @Override
            public List<Configuration> split(int adviceNumber) {
                // One task (and therefore one consumer) per Kafka partition.
                List<Configuration> configurations = new ArrayList<Configuration>();
                Integer partitions = this.originalConfig.getInt(Key.KAFKA_PARTITIONS);
                for (int i = 0; i < partitions; i++) {
                    configurations.add(this.originalConfig.clone());
                }
                return configurations;
            }

            @Override
            public void post() {
            }

            @Override
            public void destroy() {
            }
        }

        public static class Task extends Reader.Task {
            private static final Logger LOG = LoggerFactory.getLogger(Task.class);
            // Task configuration
            private Configuration readerSliceConfig;
            // Delimiter (or regex) used to split a Kafka message
            private String split;
            // Parsing rule: regex, json or split
            private String parsingRules;
            // Whether to keep polling
            private boolean flag;
            // Kafka bootstrap servers
            private String bootstrapServers;
            // Kafka consumer group id
            private String groupId;
            // Kafka topic
            private String kafkaTopic;
            // Number of fields carried in each Kafka message
            private int count;
            // Messages containing / not containing these strings are filtered out (comma separated)
            private String filterContaintsStr;
            // 1 means "contains", 0 means "does not contain"
            private int filterContaintsStrFlag;
            // 1: all strings must match; 0: any single string is enough
            private int conditionAllOrOne;
            // Column order expected by the writer
            private String writerOrder;
            // Keys of the fields inside the Kafka message
            private String kafkaReaderColumnKey;
            // Path for dumping unparsable messages
            private String exceptionPath;
            // Whether offsets are committed automatically
            private Boolean enableAutoCommit;
            // Maximum number of records returned by a single poll
            private Integer maxPollRecords;

            @Override
            public void init() {
                flag = true;
                this.readerSliceConfig = super.getPluginJobConf();
                split = this.readerSliceConfig.getString(Key.SPLIT);
                bootstrapServers = this.readerSliceConfig.getString(Key.BOOTSTRAP_SERVERS);
                groupId = this.readerSliceConfig.getString(Key.GROUP_ID);
                kafkaTopic = this.readerSliceConfig.getString(Key.TOPIC);
                count = this.readerSliceConfig.getInt(Key.COLUMNCOUNT);
                filterContaintsStr = this.readerSliceConfig.getString(Key.CONTAINTS_STR);
                filterContaintsStrFlag = this.readerSliceConfig.getInt(Key.CONTAINTS_STR_FLAG, 0);
                // Fixed: the original read CONTAINTS_STR_FLAG again here instead of CONDITION_ALL_OR_ONE.
                conditionAllOrOne = this.readerSliceConfig.getInt(Key.CONDITION_ALL_OR_ONE, 0);
                parsingRules = this.readerSliceConfig.getString(Key.PARSING_RULES);
                writerOrder = this.readerSliceConfig.getString(Key.WRITER_ORDER);
                kafkaReaderColumnKey = this.readerSliceConfig.getString(Key.KAFKA_READER_COLUMN_KEY);
                exceptionPath = this.readerSliceConfig.getString(Key.EXECPTION_PATH);
                // Fixed: the original fell back to EXECPTION_PATH here instead of ENABLE_AUTO_COMMIT.
                enableAutoCommit = this.readerSliceConfig.getBool(Key.ENABLE_AUTO_COMMIT, true);
                maxPollRecords = this.readerSliceConfig.getInt(Key.MAX_POLL_RECORDS, 200);
                LOG.info("filterContaintsStr:{}", filterContaintsStr);
            }

            @Override
            public void startRead(RecordSender recordSender) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId != null ? groupId : UUID.randomUUID().toString());
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
                props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                consumer.subscribe(Collections.singletonList(kafkaTopic));
                Record oneRecord = null;
                int commitSyncMaxNum = maxPollRecords;
                int commitSyncNum = 0;
                while (flag) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        String value = record.value();
                        // 1 means the message is dropped by the filterContaints rules.
                        int ifNotContinue = filterMessage(value);
                        if (ifNotContinue == 1) {
                            LOG.info("filtered message: " + record.value());
                            continue;
                        }
                        oneRecord = buildOneRecord(recordSender, value);
                        // A null return value means the message could not be parsed.
                        if (oneRecord != null) {
                            LOG.info("oneRecord:{}", oneRecord.toString());
                            recordSender.sendToWriter(oneRecord);
                        }
                    }
                    recordSender.flush();
                    if (!enableAutoCommit) {
                        commitSyncNum++;
                        if (commitSyncNum >= commitSyncMaxNum) {
                            consumer.commitSync();
                            commitSyncNum = 0;
                        }
                    }
                    // Stop consuming at midnight so the job can finish.
                    Date date = new Date();
                    if (DateUtil.targetFormat(date).split(" ")[1].substring(0, 2).equals("00")) {
                        destroy();
                    }
                }
                consumer.close();
            }

            private int filterMessage(String value) {
                // Only applied when filterContaints is configured.
                int ifNotContinue = 0;
                if (filterContaintsStr != null) {
                    String[] filterStrs = filterContaintsStr.split(",");
                    if (conditionAllOrOne == 1) {
                        // All configured strings are considered together.
                        if (filterContaintsStrFlag == 1) {
                            // Drop the message only if it contains every configured string.
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (!value.contains(filterStrs[i])) break;
                            }
                            if (i >= filterStrs.length) ifNotContinue = 1;
                        } else {
                            // Keep the message only if it contains every configured string.
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (!value.contains(filterStrs[i])) break;
                            }
                            if (i < filterStrs.length) ifNotContinue = 1;
                        }
                    } else {
                        if (filterContaintsStrFlag == 1) {
                            // Drop the message if it contains any one of the configured strings.
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (value.contains(filterStrs[i])) break;
                            }
                            if (i < filterStrs.length) ifNotContinue = 1;
                        } else {
                            // Keep the message if it contains any one of the configured strings.
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (value.contains(filterStrs[i])) break;
                            }
                            if (i >= filterStrs.length) ifNotContinue = 1;
                        }
                    }
                }
                return ifNotContinue;
            }

            private Record buildOneRecord(RecordSender recordSender, String value) {
                Record record = null;
                if (parsingRules.equals("regex")) {
                    record = parseRegex(value, recordSender);
                } else if (parsingRules.equals("json")) {
                    record = parseJson(value, recordSender);
                } else if (parsingRules.equals("split")) {
                    record = parseSplit(value, recordSender);
                }
                // record may be null for unparsable messages, so do not call toString() on it here.
                LOG.info("record:{}", record);
                return record;
            }

            private Record parseSplit(String value, RecordSender recordSender) {
                Record record = recordSender.createRecord();
                String[] splits = value.split(this.split);
                if (splits.length != count) {
                    writerErrorPath(value);
                    return null;
                }
                return parseOrders(Arrays.asList(splits), record);
            }

            private Record parseJson(String value, RecordSender recordSender) {
                LOG.info("parseJson value :{}", value);
                Record record = recordSender.createRecord();
                HashMap<String, Object> map = JsonUtilJava.parseJsonStrToMap(value);
                LOG.info("map :{}", map);
                // The message is expected to carry a "data" array whose elements each have a "rawData" field.
                List<Map<String, Object>> mapData = (List<Map<String, Object>>) map.get("data");
                LOG.info("mapData :{}", mapData);
                LOG.info("parseJson kafkaReaderColumnKey :{}", kafkaReaderColumnKey);
                String[] columns = kafkaReaderColumnKey.split(",");
                if (mapData.size() != columns.length) {
                    throw new RuntimeException("The number of fields in the Kafka message does not match kafkaReaderColumnKey; the data cannot be mapped.");
                }
                ArrayList<String> datas = new ArrayList<>();
                for (int i = 0; i < columns.length; i++) {
                    datas.add(String.valueOf(mapData.get(i).get("rawData")));
                }
                // for (String column : columns) {
                //     datas.add(map.get(column).toString());
                // }
                if (datas.size() != count) {
                    writerErrorPath(value);
                    return null;
                }
                LOG.info("datas:{}", datas);
                return parseOrders(datas, record);
            }

            private Record parseRegex(String value, RecordSender recordSender) {
                Record record = recordSender.createRecord();
                ArrayList<String> datas = new ArrayList<String>();
                Pattern r = Pattern.compile(split);
                Matcher m = r.matcher(value);
                if (m.find()) {
                    if (m.groupCount() != count) {
                        writerErrorPath(value);
                        return null;
                    }
                    // Fixed: the original returned the (still empty) record inside the loop,
                    // so only the first group was ever read.
                    for (int i = 1; i <= count; i++) {
                        datas.add(m.group(i));
                    }
                } else {
                    writerErrorPath(value);
                    return null;
                }
                return parseOrders(datas, record);
            }

            private void writerErrorPath(String value) {
                if (exceptionPath == null) return;
                FileOutputStream fileOutputStream = null;
                try {
                    fileOutputStream = getFileOutputStream();
                    fileOutputStream.write((value + "\n").getBytes());
                    fileOutputStream.close();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            private FileOutputStream getFileOutputStream() throws FileNotFoundException {
                return new FileOutputStream(exceptionPath + "/" + kafkaTopic + "errordata" + DateUtil.targetFormat(new Date(), "yyyyMMdd"), true);
            }

            private Record parseOrders(List<String> datas, Record record) {
                // writerOrder decides, column by column, what the writer receives:
                // "data_from", "uuid", "null" and "datax_time" are generated values,
                // while a number N takes the N-th parsed field from the message.
                String[] orders = writerOrder.split(",");
                LOG.info("writerOrder:{}", writerOrder);
                for (String order : orders) {
                    if (order.equals("data_from")) {
                        record.addColumn(new StringColumn(bootstrapServers + "|" + kafkaTopic));
                    } else if (order.equals("uuid")) {
                        record.addColumn(new StringColumn(UUID.randomUUID().toString()));
                    } else if (order.equals("null")) {
                        record.addColumn(new StringColumn("null"));
                    } else if (order.equals("datax_time")) {
                        record.addColumn(new StringColumn(DateUtil.targetFormat(new Date())));
                    } else if (isNumeric(order)) {
                        record.addColumn(new StringColumn(datas.get(Integer.parseInt(order) - 1)));
                    }
                }
                return record;
            }

            public static boolean isNumeric(String str) {
                for (int i = 0; i < str.length(); i++) {
                    if (!Character.isDigit(str.charAt(i))) {
                        return false;
                    }
                }
                return true;
            }

            @Override
            public void post() {
            }

            @Override
            public void destroy() {
                flag = false;
            }
        }
    }
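    The json parsing rule above assumes a specific message layout: a top-level "data" array whose elements each carry a "rawData" field, in the same order as kafkaReaderColumnKey. The standalone sketch below (the class name and sample values are made up for illustration) shows that mapping outside of DataX, using the same fastjson call as JsonUtilJava:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.fastjson.TypeReference;

    public class ParseJsonRuleDemo {
        public static void main(String[] args) {
            // Hypothetical message in the shape parseJson() expects:
            // {"data":[{"rawData":...},{"rawData":...},{"rawData":...}]}
            String message = "{\"data\":[{\"rawData\":\"1\"},{\"rawData\":\"alice\"},{\"rawData\":\"2022-09-05 13:43:00\"}]}";

            HashMap<String, Object> map = JSONObject.parseObject(message,
                    new TypeReference<HashMap<String, Object>>() {});
            @SuppressWarnings("unchecked")
            List<Map<String, Object>> data = (List<Map<String, Object>>) map.get("data");

            // With columnCount = 3, kafkaReaderColumnKey = "id,name,crt_time" and
            // writerOrder = "uuid,1,2,3,datax_time", the record sent to the writer becomes
            // <random uuid>, "1", "alice", "2022-09-05 13:43:00", <current time>.
            for (Map<String, Object> field : data) {
                System.out.println(field.get("rawData"));
            }
        }
    }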

     KafkaReaderErrorCode.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    import com.alibaba.datax.common.spi.ErrorCode;

    /**
     * Error codes of the kafkareader plugin.
     *
     * @Author: chenweifeng
     * @Date: 2022-09-05 11:14
     **/
    public enum KafkaReaderErrorCode implements ErrorCode {
        TOPIC_ERROR("KafkaReader-00", "The topic parameter is not set"),
        PARTITION_ERROR("KafkaReader-01", "The kafkaPartitions parameter is not set"),
        ADDRESS_ERROR("KafkaReader-02", "The bootstrapServers parameter is not set"),
        KAFKA_READER_ERROR("KafkaReader-03", "Invalid parameter"),
        ;

        private final String code;
        private final String description;

        private KafkaReaderErrorCode(String code, String description) {
            this.code = code;
            this.description = description;
        }

        @Override
        public String getCode() {
            return this.code;
        }

        @Override
        public String getDescription() {
            return this.description;
        }

        @Override
        public String toString() {
            return String.format("Code:[%s], Description:[%s].", this.code,
                    this.description);
        }
    }

    Key.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    /**
     * Configuration keys of the kafkareader plugin.
     *
     * @Author: chenweifeng
     * @Date: 2022-09-05 11:08
     **/
    public class Key {
        public final static String TOPIC = "topic";                                  // topic
        public final static String KAFKA_PARTITIONS = "kafkaPartitions";             // number of partitions
        public final static String BOOTSTRAP_SERVERS = "bootstrapServers";           // broker address list
        public final static String GROUP_ID = "groupId";                             // consumer group
        public final static String COLUMNCOUNT = "columnCount";                      // number of fields
        public final static String SPLIT = "split";                                  // field delimiter / regex
        public final static String CONTAINTS_STR = "filterContaints";                // filter strings, comma separated
        public final static String CONTAINTS_STR_FLAG = "filterContaintsFlag";       // 1 = contains, 0 = does not contain
        public final static String CONDITION_ALL_OR_ONE = "conditionAllOrOne";       // 1 = all strings must match, 0 = any single string
        public final static String PARSING_RULES = "parsingRules";                   // regex / json / split
        public final static String WRITER_ORDER = "writerOrder";                     // column order expected by the writer
        public final static String KAFKA_READER_COLUMN_KEY = "kafkaReaderColumnKey"; // keys of the fields in the message
        public final static String EXECPTION_PATH = "execptionPath";                 // path for unparsable messages
        public final static String ENABLE_AUTO_COMMIT = "enableAutoCommit";          // whether offsets are auto-committed
        public final static String MAX_POLL_RECORDS = "maxPollRecords";              // max records per poll
    }

    plugin.json 

    {
        "name": "kafkareader",
        "class": "com.alibaba.datax.plugin.reader.kafkareader.KafkaReader",
        "description": "kafka reader plugin",
        "developer": "chenweifeng"
    }

    plugin_job_template.json 

    {
        "name": "kafkareader",
        "parameter": {
            "topic": "Event",
            "bootstrapServers": "192.168.7.128:9092",
            "kafkaPartitions": "1",
            "columnCount": 11,
            "groupId": "ast",
            "filterContaints": "5^1,6^5",
            "filterContaintsFlag": 1,
            "conditionAllOrOne": 0,
            "parsingRules": "regex",
            "writerOrder": "uuid,1,3,6,4,8,9,10,11,5,7,2,null,datax_time,data_from",
            "kafkaReaderColumnKey": "a",
            "execptionPath": "/opt/module/datax/log/errorlog"
        }
    }
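    The filterContaints / filterContaintsFlag / conditionAllOrOne trio is easier to follow with concrete values. The sketch below (hypothetical class name and messages) replays the branch this template uses, filterContaintsFlag = 1 with conditionAllOrOne = 0, where a message is dropped as soon as it contains any one of the configured strings:

    public class FilterContaintsDemo {
        public static void main(String[] args) {
            // Hypothetical messages checked against filterContaints = "5^1,6^5",
            // filterContaintsFlag = 1 (drop messages that contain the strings) and
            // conditionAllOrOne = 0 (matching any single string is enough).
            String[] filterStrs = "5^1,6^5".split(",");
            String[] messages = {"a|5^1|b", "a|6^5|b", "a|b|c"};

            for (String value : messages) {
                boolean dropped = false;
                for (String filter : filterStrs) {
                    if (value.contains(filter)) {   // any single hit drops the message
                        dropped = true;
                        break;
                    }
                }
                System.out.println(value + " -> " + (dropped ? "filtered out" : "kept"));
            }
        }
    }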

    3. kafkawriter module code

    package.xml

    <assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
        <id></id>
        <formats>
            <format>dir</format>
        </formats>
        <includeBaseDirectory>false</includeBaseDirectory>
        <fileSets>
            <fileSet>
                <directory>src/main/resources</directory>
                <includes>
                    <include>plugin.json</include>
                </includes>
                <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
            </fileSet>
            <fileSet>
                <directory>target/</directory>
                <includes>
                    <include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
                </includes>
                <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
            </fileSet>
        </fileSets>
        <dependencySets>
            <dependencySet>
                <useProjectArtifact>false</useProjectArtifact>
                <outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
                <scope>runtime</scope>
            </dependencySet>
        </dependencySets>
    </assembly>

    KafkaWriter.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    import com.alibaba.datax.common.element.Column;
    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordReceiver;
    import com.alibaba.datax.common.spi.Writer;
    import com.alibaba.datax.common.util.Configuration;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    /**
     * @ClassName: KafkaWriter
     * @Author: majun
     * @CreateDate: 2019/2/20 11:06
     * @Version: 1.0
     * @Description: datax kafkawriter
     */
    public class KafkaWriter extends Writer {

        public static class Job extends Writer.Job {
            private static final Logger logger = LoggerFactory.getLogger(Job.class);
            private Configuration conf = null;

            @Override
            public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
                for (int i = 0; i < mandatoryNumber; i++) {
                    configurations.add(conf);
                }
                return configurations;
            }

            private void validateParameter() {
                this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
                this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            }

            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                logger.info("kafka writer params:{}", conf.toJSON());
                this.validateParameter();
            }

            @Override
            public void destroy() {
            }
        }

        public static class Task extends Writer.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
            private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

            private Producer<String, String> producer;
            private String fieldDelimiter;
            private Configuration conf;
            private Properties props;

            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                props = new Properties();
                props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
                // acks: "0" = fire-and-forget (the default here); "all" means the leader waits until
                // every replica has persisted the write, which is the strongest delivery guarantee.
                props.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));
                props.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "0", null));
                // batch.size controls how many bytes the producer buffers before publishing to Kafka;
                // a batch is sent once either batch.size or linger.ms is reached. Default 16384 (16 KB).
                props.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
                props.put("linger.ms", 1);
                props.put("key.serializer", conf.getUnnecessaryValue(Key.KEYSERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                props.put("value.serializer", conf.getUnnecessaryValue(Key.VALUESERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                producer = new KafkaProducer<String, String>(props);
            }

            @Override
            public void prepare() {
                if (Boolean.valueOf(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {
                    String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
                    AdminClient adminClient = AdminClient.create(props);
                    try {
                        ListTopicsResult topicsResult = adminClient.listTopics();
                        if (!topicsResult.names().get().contains(topic)) {
                            // Fixed: the original built the NewTopic but never added it to the list,
                            // so createTopics() was always called with an empty collection.
                            NewTopic newTopic = new NewTopic(
                                    topic,
                                    Integer.valueOf(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                    Short.valueOf(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                            );
                            List<NewTopic> newTopics = new ArrayList<NewTopic>();
                            newTopics.add(newTopic);
                            adminClient.createTopics(newTopics);
                        }
                    } catch (Exception e) {
                        throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.CREATE_TOPIC.getDescription());
                    } finally {
                        adminClient.close();
                    }
                }
            }

            @Override
            public void startWrite(RecordReceiver lineReceiver) {
                logger.info("start to write kafka");
                Record record = null;
                // Keep pulling while the reader side still produces records.
                while ((record = lineReceiver.getFromReader()) != null) {
                    // Join the record's columns with the delimiter and send the line to Kafka.
                    if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null).toLowerCase()
                            .equals(WriteType.TEXT.name().toLowerCase())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToString(record))
                        );
                    } else if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null).toLowerCase()
                            .equals(WriteType.JSON.name().toLowerCase())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                record.toString())
                        );
                    }
                    logger.info("complete write " + record.toString());
                    producer.flush();
                }
            }

            @Override
            public void destroy() {
                if (producer != null) {
                    producer.close();
                }
            }

            /**
             * Format a record as a delimited line.
             *
             * @param record
             * @return
             */
            private String recordToString(Record record) {
                int recordLength = record.getColumnNumber();
                if (0 == recordLength) {
                    return NEWLINE_FLAG;
                }
                Column column;
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < recordLength; i++) {
                    column = record.getColumn(i);
                    sb.append(column.asString()).append(fieldDelimiter);
                }
                sb.setLength(sb.length() - 1);
                sb.append(NEWLINE_FLAG);
                logger.info("recordToString:{}", sb.toString());
                return sb.toString();
            }
        }
    }
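    To see what actually lands in Kafka for writeType = text, the small sketch below (hypothetical column values and class name) reproduces the recordToString() formatting outside of DataX: column values joined by fieldDelimiter, the trailing delimiter dropped, and a line separator appended:

    public class RecordToStringDemo {
        public static void main(String[] args) {
            // Hypothetical column values of one record.
            String[] columns = {"1", "alice", "2022-09-05 13:43:00"};
            String fieldDelimiter = "\t";

            StringBuilder sb = new StringBuilder();
            for (String column : columns) {
                sb.append(column).append(fieldDelimiter);
            }
            sb.setLength(sb.length() - 1);          // drop the trailing delimiter
            sb.append(System.getProperty("line.separator", "\n"));

            // With writeType = text this string is sent as both key and value of the
            // ProducerRecord; with writeType = json the record's toString() is sent as the value.
            System.out.print(sb);
        }
    }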

    KafkaWriterErrorCode.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    import com.alibaba.datax.common.spi.ErrorCode;

    public enum KafkaWriterErrorCode implements ErrorCode {
        REQUIRED_VALUE("KafkaWriter-00", "A required parameter is missing."),
        CREATE_TOPIC("KafkaWriter-01", "Checking or creating the topic before writing failed.");

        private final String code;
        private final String description;

        private KafkaWriterErrorCode(String code, String description) {
            this.code = code;
            this.description = description;
        }

        @Override
        public String getCode() {
            return this.code;
        }

        @Override
        public String getDescription() {
            return this.description;
        }

        @Override
        public String toString() {
            return String.format("Code:[%s], Description:[%s].", this.code,
                    this.description);
        }
    }

    Key.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    /**
     * @ClassName: Key
     * @Author: majun
     * @CreateDate: 2019/2/20 11:17
     * @Version: 1.0
     * @Description: Configuration keys of the kafkawriter plugin.
     */
    public class Key {
        // "bootstrapServers": "",
        // "topic": "",
        // "ack": "all",
        // "batchSize": 1000,
        // "retries": 0,
        // "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
        // "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
        // "fieldDelimiter": ","
        public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
        // must have
        public static final String TOPIC = "topic";
        public static final String ACK = "ack";
        public static final String BATCH_SIZE = "batchSize";
        public static final String RETRIES = "retries";
        public static final String KEYSERIALIZER = "keySerializer";
        public static final String VALUESERIALIZER = "valueSerializer";
        // optional, no default
        public static final String FIELD_DELIMITER = "fieldDelimiter";
        public static final String NO_TOPIC_CREATE = "noTopicCreate";
        public static final String TOPIC_NUM_PARTITION = "topicNumPartition";
        public static final String TOPIC_REPLICATION_FACTOR = "topicReplicationFactor";
        public static final String WRITE_TYPE = "writeType";
    }

    WriteType.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    public enum WriteType {
        JSON("json"),
        TEXT("text");

        private String name;

        WriteType(String name) {
            this.name = name;
        }
    }

    plugin.json 

    {
        "name": "kafkawriter",
        "class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter",
        "description": "kafka writer plugin",
        "developer": "chenweifeng"
    }

    plugin_job_template.json  

    {
        "name": "kafkawriter",
        "parameter": {
            "bootstrapServers": "10.1.20.150:9092",
            "topic": "test-topic",
            "ack": "all",
            "batchSize": 1000,
            "retries": 0,
            "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
            "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
            "fieldDelimiter": ",",
            "writeType": "json",
            "topicNumPartition": 1,
            "topicReplicationFactor": 1
        }
    }

    4. Example job: Kafka to Kafka

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 12
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "kafkareader",
                        "parameter": {
                            "topic": "test_datax_kafka_read",
                            "bootstrapServers": "10.254.21.6:59292,10.254.21.1:59292,10.254.21.2:59292",
                            "kafkaPartitions": 12,
                            "columnCount": 34,
                            "groupId": "datax_kafka_kafka",
                            "filterContaints": "5^1,6^5",
                            "filterContaintsFlag": 1,
                            "conditionAllOrOne": 0,
                            "parsingRules": "json",
                            "writerOrder": "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34",
                            "kafkaReaderColumnKey": "id,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,c27,c28,c29,c30,c31,c32,crt_time",
                            "execptionPath": "/Users/chenweifeng/datax/log/errorlog",
                            "split": "\t"
                        }
                    },
                    "writer": {
                        "name": "kafkawriter",
                        "parameter": {
                            "print": true,
                            "topic": "test_datax_kafka_write",
                            "bootstrapServers": "10.254.21.6:59292,10.254.21.1:59292,10.254.21.2:59292",
                            "fieldDelimiter": "\t",
                            "batchSize": 20,
                            "writeType": "json",
                            "noTopicCreate": false,
                            "topicNumPartition": 12,
                            "topicReplicationFactor": 3
                        }
                    }
                }
            ]
        }
    }

    Note: The kafkareader and kafkawriter plugins work, but my own performance tests show throughput problems that parameter tuning did not resolve. I may try to optimize this later; if you have already optimized it, please leave your suggestions in the comments.

    Performance analysis: DataX reads data in batches and ships it through in-memory Channel queues, which does not fit Kafka's streaming model. Fixing this properly would mean breaking DataX's batch-oriented synchronization architecture, so I do not really recommend doing Kafka synchronization in DataX. In the end I dropped the DataX Kafka data source and implemented the Kafka synchronization in Flink instead.
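    For reference, this is roughly what such a replacement looks like with Flink's Kafka connector (the KafkaSource/KafkaSink API available since Flink 1.14). It is only a minimal sketch, not the production job: broker addresses, topic names and the group id are placeholders, and a real pipeline would add parsing, filtering and error handling between source and sink.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaToKafkaJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Consume the source topic as a continuous stream (placeholder addresses/topics).
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("source-topic")
                    .setGroupId("flink-kafka-sync")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // Write each message to the target topic with at-least-once delivery.
            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("target-topic")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                    .sinkTo(sink)
                    .name("kafka-sink");

            env.execute("kafka-to-kafka-sync");
        }
    }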

  • Original article: https://blog.csdn.net/Carson073/article/details/126728411