• 使用Spark清洗统计业务数据并保存到数据库中


    1、打开前面创建的项目“BigData-Etl-KongGuan”,创建一些数据库访问的工具类和辅助类。

    1)打开SpringBoot项目:BigData-Etl-KongGuan

    2)创建数据库访问的工具类和辅助类:

    com.qrsoft.etl.dao.IBaseDao数据库访问的通用类,包括创建连接、执行更新等通用操作
    com.qrsoft.etl.common.db.ConnectionPoolManager连接管理类
    com.qrsoft.etl.common.db.IConnectionPool连接池管理类接口
    com.qrsoft.etl.common.db.ConnectionPool连接池管理类接口实现类
    com.qrsoft.etl.common.db.DBbean这是外部可以配置的连接池属性 可以允许外部配置,拥有默认值
    com.qrsoft.etl.common.db.DBInitInfo初始化,模拟加载所有的配置文件
    com.qrsoft.etl.common.Constants全局常量类
    com.qrsoft.etl.util.ConfigUtil加载配置文件的工具类
    com.qrsoft.etl.util.ConfigManager配置文件管理的工具类
    com.qrsoft.etl.util.MapManager地图管理的工具类,例如:是否在矩形区域内
    • 创建com.qrsoft.etl.util.ConfigUtil类,该类是一个通用工具类,用于加载myconfig.properties配置文件,并提供了一个根据键来读取配置文件中属性值的方法readProperty(key)。
    1. package com.qrsoft.etl.util;
    2. import org.springframework.core.io.support.PropertiesLoaderUtils;
    3. import java.io.IOException;
    4. import java.util.Properties;
    5. public class ConfigUtil {
    6. public static String readProperty(String key){
    7. Properties properties = new Properties();
    8. try {
    9. properties = PropertiesLoaderUtils.loadAllProperties("myconfig.properties");
    10. } catch (IOException e) {
    11. e.printStackTrace();
    12. }
    13. return properties.get(key).toString();
    14. }
    15. }
    • 创建com.qrsoft.etl.util.ConfigManager类,该类是一个通用工具类,用于配置文件管理,在该类的构造函数中加载config.properties配置文件,并提供了一个根据键来读取配置文件中属性值的方法getValue(key)。
    1. package com.qrsoft.etl.util;
    2. import java.io.IOException;
    3. import java.io.InputStream;
    4. import java.util.Properties;
    5. public class ConfigManager {
    6. private static ConfigManager configManager; // 声明工具类的一个私有的对象
    7. private static Properties properties; //声明对象
    8. private ConfigManager() { //私有无参构造方法
    9. String configFile = "config.properties";
    10. properties = new Properties();
    11. InputStream in = ConfigManager.class.getClassLoader().getResourceAsStream(configFile);
    12. try {
    13. properties.load(in);
    14. in.close();
    15. } catch (IOException e) {
    16. e.printStackTrace();
    17. }
    18. }
    19. public static ConfigManager getInstance() {
    20. if (configManager == null) {
    21. configManager = new ConfigManager();
    22. }
    23. return configManager;
    24. }
    25. public String getValue(String key) {
    26. return properties.getProperty(key);
    27. }
    28. }
    • 创建com.qrsoft.etl.util.MapManager类,该类是地图管理的工具类,提供了地图操作的相关工具方法,例如:地图上某个经纬度的点是否在矩形区域内、判断是否在经纬度范围内等。
    1. package com.qrsoft.etl.util;
    2. public class MapManager {
    3. /**
    4. * 是否在矩形区域内
    5. * @Title: isInArea
    6. * @Description: TODO()
    7. * @param lat 测试点经度
    8. * @param lng 测试点纬度
    9. * @param minLat 纬度范围限制1
    10. * @param maxLat 纬度范围限制2
    11. * @param minLng 经度限制范围1
    12. * @param maxLng 经度范围限制2
    13. * @return boolean
    14. */
    15. public boolean isInRectangleArea(double lat,double lng,double minLat,
    16. double maxLat,double minLng,double maxLng){
    17. if(this.isInRange(lat, minLat, maxLat)){//如果在纬度的范围内
    18. if(minLng*maxLng>0){
    19. if(this.isInRange(lng, minLng, maxLng)){
    20. return true;
    21. }else {
    22. return false;
    23. }
    24. }else {
    25. if(Math.abs(minLng)+Math.abs(maxLng)<180){
    26. if(this.isInRange(lng, minLng, maxLng)){
    27. return true;
    28. }else {
    29. return false;
    30. }
    31. }else{
    32. double left = Math.max(minLng, maxLng);
    33. double right = Math.min(minLng, maxLng);
    34. if(this.isInRange(lng, left, 180)||this.isInRange(lng, right,-180)){
    35. return true;
    36. }else {
    37. return false;
    38. }
    39. }
    40. }
    41. }else{
    42. return false;
    43. }
    44. }
    45. /**
    46. * 判断是否在经纬度范围内
    47. */
    48. public boolean isInRange(double point, double left,double right){
    49. if(point>=Math.min(left, right)&&point<=Math.max(left, right)){
    50. return true;
    51. }else {
    52. return false;
    53. }
    54. }
    55. }
    • 创建com.qrsoft.etl.common.db.DBbean类,该类中定义了外部可以配置的连接池属性 可以允许外部配置,拥有默认值。
    1. package com.qrsoft.etl.common.db;
    2. public class DBbean {
    3. // 连接池属性
    4. private String driverName;
    5. private String url;
    6. private String userName;
    7. private String password;
    8. // 连接池名字
    9. private String poolName;
    10. private int minConnections = 10; // 空闲池,最小连接数
    11. private int maxConnections = 300; // 空闲池,最大连接数
    12. private int initConnections = 20;// 初始化连接数
    13. private long connTimeOut = 1000;// 重复获得连接的频率
    14. private int maxActiveConnections = 500;// 最大允许的连接数,和数据库对应
    15. private long connectionTimeOut = 0;// 连接超时时间,默认20分钟
    16. private boolean isCurrentConnection = true; // 是否获得当前连接,默认true
    17. private boolean isCheakPool = true; // 是否定时检查连接池
    18. private long lazyCheck = 1000 * 60 * 60;// 延迟多少时间后开始 检查
    19. private long periodCheck = 1000 * 60 * 60;// 检查频率
    20. public DBbean(String driverName, String url, String userName,String password, String poolName) {
    21. super();
    22. this.driverName = driverName;
    23. this.url = url;
    24. this.userName = userName;
    25. this.password = password;
    26. this.poolName = poolName;
    27. }
    28. public DBbean() {
    29. }
    30. public String getDriverName() {
    31. if (driverName == null) {
    32. driverName = this.getDriverName() + "_" + this.getUrl();
    33. }
    34. return driverName;
    35. }
    36. public void setDriverName(String driverName) {
    37. this.driverName = driverName;
    38. }
    39. public String getUrl() {
    40. return url;
    41. }
    42. public void setUrl(String url) {
    43. this.url = url;
    44. }
    45. public String getUserName() {
    46. return userName;
    47. }
    48. public void setUserName(String userName) {
    49. this.userName = userName;
    50. }
    51. public String getPassword() {
    52. return password;
    53. }
    54. public void setPassword(String password) {
    55. this.password = password;
    56. }
    57. public String getPoolName() {
    58. return poolName;
    59. }
    60. public void setPoolName(String poolName) {
    61. this.poolName = poolName;
    62. }
    63. public int getMinConnections() {
    64. return minConnections;
    65. }
    66. public void setMinConnections(int minConnections) {
    67. this.minConnections = minConnections;
    68. }
    69. public int getMaxConnections() {
    70. return maxConnections;
    71. }
    72. public void setMaxConnections(int maxConnections) {
    73. this.maxConnections = maxConnections;
    74. }
    75. public int getInitConnections() {
    76. return initConnections;
    77. }
    78. public void setInitConnections(int initConnections) {
    79. this.initConnections = initConnections;
    80. }
    81. public int getMaxActiveConnections() {
    82. return maxActiveConnections;
    83. }
    84. public void setMaxActiveConnections(int maxActiveConnections) {
    85. this.maxActiveConnections = maxActiveConnections;
    86. }
    87. public long getConnTimeOut() {
    88. return connTimeOut;
    89. }
    90. public void setConnTimeOut(long connTimeOut) {
    91. this.connTimeOut = connTimeOut;
    92. }
    93. public long getConnectionTimeOut() {
    94. return connectionTimeOut;
    95. }
    96. public void setConnectionTimeOut(long connectionTimeOut) {
    97. this.connectionTimeOut = connectionTimeOut;
    98. }
    99. public boolean isCurrentConnection() {
    100. return isCurrentConnection;
    101. }
    102. public void setCurrentConnection(boolean isCurrentConnection) {
    103. this.isCurrentConnection = isCurrentConnection;
    104. }
    105. public long getLazyCheck() {
    106. return lazyCheck;
    107. }
    108. public void setLazyCheck(long lazyCheck) {
    109. this.lazyCheck = lazyCheck;
    110. }
    111. public long getPeriodCheck() {
    112. return periodCheck;
    113. }
    114. public void setPeriodCheck(long periodCheck) {
    115. this.periodCheck = periodCheck;
    116. }
    117. public boolean isCheakPool() {
    118. return isCheakPool;
    119. }
    120. public void setCheakPool(boolean isCheakPool) {
    121. this.isCheakPool = isCheakPool;
    122. }
    123. }
    • 创建com.qrsoft.etl.common.db.DBInitInfo类,该类主要用于加载配置文件中所有的有关数据库的配置信息,初始化数据库连接对象。
    1. package com.qrsoft.etl.common.db;
    2. import com.qrsoft.etl.util.ConfigManager;
    3. import java.util.ArrayList;
    4. import java.util.List;
    5. public class DBInitInfo {
    6. // 设置注册属性
    7. public static String DRIVER = ConfigManager.getInstance().getValue("jdbc.driver");
    8. public static String URL = ConfigManager.getInstance().getValue("jdbc.url");
    9. public static String USERNAME = ConfigManager.getInstance().getValue("jdbc.username");
    10. public static String PASSWORD = ConfigManager.getInstance().getValue("jdbc.password");
    11. public static String MinConnections = ConfigManager.getInstance().getValue("jdbc.min");
    12. public static String MaxConnections = ConfigManager.getInstance().getValue("jdbc.max");
    13. public static List beans = null;
    14. static {
    15. beans = new ArrayList();
    16. // 这里数据 可以从xml 等配置文件进行获取,为了测试,这里直接写死了
    17. DBbean beanMysql = new DBbean();
    18. beanMysql.setDriverName(DRIVER);
    19. beanMysql.setUrl(URL);
    20. beanMysql.setUserName(USERNAME);
    21. beanMysql.setPassword(PASSWORD);
    22. beanMysql.setMinConnections(Integer.parseInt(MinConnections));
    23. beanMysql.setMaxConnections(Integer.parseInt(MaxConnections));
    24. beanMysql.setPoolName("pool");
    25. beans.add(beanMysql);
    26. }
    27. }
    • 创建com.qrsoft.etl.common.db.IConnectionPool接口,该接口中定义了连接池操作类的接口方法,如:获得连接、回收连接、销毁清空、查看连接池活动状态、定时检查连接池等接口方法。
    1. package com.qrsoft.etl.common.db;
    2. import java.sql.Connection;
    3. import java.sql.SQLException;
    4. public interface IConnectionPool {
    5. // 获得连接
    6. public Connection getConnection();
    7. // 获得当前连接
    8. public Connection getCurrentConnecton();
    9. // 回收连接
    10. public void releaseConn(Connection conn) throws SQLException;
    11. // 销毁清空
    12. public void destroy();
    13. // 连接池是活动状态
    14. public boolean isActive();
    15. // 定时器,检查连接池
    16. public void cheackPool();
    17. }
    • 创建com.qrsoft.etl.common.db.ConnectionPool类,该类实现了IConnectionPool接口,并实现了接口中定义的方法。
    1. package com.qrsoft.etl.common.db;
    2. import java.sql.Connection;
    3. import java.sql.DriverManager;
    4. import java.sql.SQLException;
    5. import java.util.List;
    6. import java.util.Timer;
    7. import java.util.TimerTask;
    8. import java.util.Vector;
    9. public class ConnectionPool implements IConnectionPool {
    10. // 连接池配置属性
    11. private DBbean dbBean;
    12. private boolean isActive = false; // 连接池活动状态
    13. private int contActive = 0;// 记录创建的总的连接数
    14. // 空闲连接
    15. private List<Connection> freeConnection = new Vector<Connection>();
    16. // 活动连接
    17. private List<Connection> activeConnection = new Vector<Connection>();
    18. // 将线程和连接绑定,保证事务能统一执行
    19. private static ThreadLocal<Connection> threadLocal = new ThreadLocal<Connection>();
    20. public ConnectionPool(DBbean dbBean) {
    21. super();
    22. this.dbBean = dbBean;
    23. init();
    24. cheackPool();
    25. }
    26. // 初始化
    27. public void init() {
    28. try {
    29. Class.forName(dbBean.getDriverName());
    30. for (int i = 0; i < dbBean.getInitConnections(); i++) {
    31. Connection conn;
    32. conn = newConnection();
    33. // 初始化最小连接数
    34. if (conn != null) {
    35. freeConnection.add(conn);
    36. contActive++;
    37. }
    38. }
    39. isActive = true;
    40. } catch (ClassNotFoundException e) {
    41. e.printStackTrace();
    42. } catch (SQLException e) {
    43. e.printStackTrace();
    44. }
    45. }
    46. // 获得当前连接
    47. public Connection getCurrentConnecton() {
    48. // 默认线程里面取
    49. Connection conn = threadLocal.get();
    50. if (!isValid(conn)) {
    51. conn = getConnection();
    52. }
    53. return conn;
    54. }
    55. // 获得连接
    56. public synchronized Connection getConnection() {
    57. Connection conn = null;
    58. try {
    59. // 判断是否超过最大连接数限制
    60. if (contActive < this.dbBean.getMaxActiveConnections()) {
    61. if (freeConnection.size() > 0) {
    62. conn = freeConnection.get(0);
    63. if (conn != null) {
    64. threadLocal.set(conn);
    65. }
    66. freeConnection.remove(0);
    67. } else {
    68. conn = newConnection();
    69. }
    70. } else {
    71. // 继续获得连接,直到从新获得连接
    72. wait(this.dbBean.getConnTimeOut());
    73. conn = getConnection();
    74. }
    75. if (isValid(conn)) {
    76. activeConnection.add(conn);
    77. contActive++;
    78. }
    79. } catch (SQLException e) {
    80. e.printStackTrace();
    81. } catch (ClassNotFoundException e) {
    82. e.printStackTrace();
    83. } catch (InterruptedException e) {
    84. e.printStackTrace();
    85. }
    86. return conn;
    87. }
    88. // 获得新连接
    89. private synchronized Connection newConnection() throws ClassNotFoundException, SQLException {
    90. Connection conn = null;
    91. if (dbBean != null) {
    92. Class.forName(dbBean.getDriverName());
    93. conn = DriverManager.getConnection(dbBean.getUrl(), dbBean.getUserName(), dbBean.getPassword());
    94. }
    95. return conn;
    96. }
    97. // 释放连接
    98. public synchronized void releaseConn(Connection conn) throws SQLException {
    99. if (isValid(conn)&& !(freeConnection.size() > dbBean.getMaxConnections())) {
    100. freeConnection.add(conn);
    101. activeConnection.remove(conn);
    102. contActive--;
    103. threadLocal.remove();
    104. // 唤醒所有正待等待的线程,去抢连接
    105. notifyAll();
    106. }
    107. }
    108. // 判断连接是否可用
    109. private boolean isValid(Connection conn) {
    110. try {
    111. if (conn == null || conn.isClosed()) {
    112. return false;
    113. }
    114. } catch (SQLException e) {
    115. e.printStackTrace();
    116. }
    117. return true;
    118. }
    119. // 销毁连接池
    120. public synchronized void destroy() {
    121. for (Connection conn : freeConnection) {
    122. try {
    123. if (isValid(conn)) {
    124. conn.close();
    125. }
    126. } catch (SQLException e) {
    127. e.printStackTrace();
    128. }
    129. }
    130. for (Connection conn : activeConnection) {
    131. try {
    132. if (isValid(conn)) {
    133. conn.close();
    134. }
    135. } catch (SQLException e) {
    136. e.printStackTrace();
    137. }
    138. }
    139. isActive = false;
    140. contActive = 0;
    141. }
    142. // 连接池状态
    143. public boolean isActive() {
    144. return isActive;
    145. }
    146. // 定时检查连接池情况
    147. public void cheackPool() {
    148. if (dbBean.isCheakPool()) {
    149. new Timer().schedule(new TimerTask() {
    150. @Override
    151. public void run() {
    152. // 1.对线程里面的连接状态
    153. // 2.连接池最小 最大连接数
    154. // 3.其他状态进行检查,因为这里还需要写几个线程管理的类,暂时就不添加了
    155. System.out.println("空线池连接数:" + freeConnection.size());
    156. System.out.println("活动连接数::" + activeConnection.size());
    157. System.out.println("总的连接数:" + contActive);
    158. }
    159. }, dbBean.getLazyCheck(), dbBean.getPeriodCheck());
    160. }
    161. }
    162. }
    • 创建com.qrsoft.etl.common.db.ConnectionPoolManager类,该类为数据库的连接管理类,是一个通用的固定写法,通过连接池管理数据库连接,提供了获取单例模式连接的实现,还提供了根据连接池名字获得连接的方法,以及关闭连接、清空连接池等方法。
    1. package com.qrsoft.etl.common.db;
    2. import java.sql.Connection;
    3. import java.sql.SQLException;
    4. import java.util.Hashtable;
    5. /**
    6. * 连接管理类
    7. */
    8. public class ConnectionPoolManager {
    9. // 连接池存放
    10. public Hashtable<String, IConnectionPool> pools = new Hashtable<String, IConnectionPool>();
    11. // 初始化
    12. private ConnectionPoolManager() {
    13. init();
    14. }
    15. // 单例实现
    16. public static ConnectionPoolManager getInstance() {
    17. return Singtonle.instance;
    18. }
    19. private static class Singtonle {
    20. private static ConnectionPoolManager instance = new ConnectionPoolManager();
    21. }
    22. // 初始化所有的连接池
    23. public void init() {
    24. for (int i = 0; i < DBInitInfo.beans.size(); i++) {
    25. DBbean bean = DBInitInfo.beans.get(i);
    26. ConnectionPool pool = new ConnectionPool(bean);
    27. if (pool != null) {
    28. pools.put(bean.getPoolName(), pool);
    29. System.out.println("Info:Init connection successed ->"
    30. + bean.getPoolName());
    31. }
    32. }
    33. }
    34. // 获得连接,根据连接池名字 获得连接
    35. public Connection getConnection(String poolName) {
    36. Connection conn = null;
    37. if (pools.size() > 0 && pools.containsKey(poolName)) {
    38. conn = getPool(poolName).getConnection();
    39. try {
    40. conn.setAutoCommit(false);
    41. } catch (SQLException e) {
    42. e.printStackTrace();
    43. }
    44. } else {
    45. System.out.println(
    46. "Error:Can't find this connecion pool ->" + poolName);
    47. }
    48. return conn;
    49. }
    50. // 关闭,回收连接
    51. public void close(String poolName, Connection conn) {
    52. IConnectionPool pool = getPool(poolName);
    53. try {
    54. if (pool != null) {
    55. pool.releaseConn(conn);
    56. }
    57. } catch (SQLException e) {
    58. System.out.println("连接池已经销毁");
    59. e.printStackTrace();
    60. }
    61. }
    62. // 清空连接池
    63. public void destroy(String poolName) {
    64. IConnectionPool pool = getPool(poolName);
    65. if (pool != null) {
    66. pool.destroy();
    67. }
    68. }
    69. // 获得连接池
    70. public IConnectionPool getPool(String poolName) {
    71. IConnectionPool pool = null;
    72. if (pools.size() > 0) {
    73. pool = pools.get(poolName);
    74. }
    75. return pool;
    76. }
    77. }
    • 创建com.qrsoft.etl.dao.IBaseDao类,该类主要用于数据库的通用操作,如创建连接、执行更新等操作。
    1. package com.qrsoft.etl.dao;
    2. import com.qrsoft.etl.common.db.ConnectionPoolManager;
    3. import com.qrsoft.etl.common.db.IConnectionPool;
    4. import org.slf4j.Logger;
    5. import org.slf4j.LoggerFactory;
    6. import java.sql.*;
    7. import java.util.ArrayList;
    8. import java.util.HashMap;
    9. import java.util.List;
    10. import java.util.Map;
    11. public class IBaseDao {
    12. private final static Logger logger = LoggerFactory.getLogger(IBaseDao.class);
    13. public ResultSet rs = null;
    14. private IConnectionPool pool= ConnectionPoolManager.getInstance().getPool("pool");
    15. private Connection conn = getConnection();
    16. // 定义sql语句的执行对象
    17. private PreparedStatement pstmt;
    18. public Connection getConnection(){
    19. Connection conn = null;
    20. if(pool != null && pool.isActive()){
    21. conn = pool.getConnection();
    22. }
    23. return conn;
    24. }
    25. public Connection getCurrentConnection(){
    26. Connection conn = null;
    27. if(pool != null && pool.isActive()){
    28. conn = pool.getCurrentConnecton();
    29. }
    30. return conn;
    31. }
    32. /**
    33. * 查询
    34. */
    35. public ResultSet execute(String sql, Object params[]) {
    36. try {
    37. conn.setAutoCommit(false);
    38. pstmt = conn.prepareStatement(sql);
    39. for (int i = 0; i < params.length; i++) {
    40. pstmt.setObject(i + 1, params[i]);
    41. }
    42. rs = pstmt.executeQuery();
    43. pool.releaseConn(conn);
    44. } catch (SQLException e) {
    45. e.printStackTrace();
    46. logger.info("查询失败!", e.getMessage());
    47. }
    48. return rs;
    49. }
    50. /**
    51. * 更新
    52. */
    53. public boolean update(String sql, Object params[]) throws SQLException {
    54. boolean flag = false;
    55. try {
    56. conn.setAutoCommit(false);
    57. pstmt = conn.prepareStatement(sql);
    58. for (int i = 0; i < params.length; i++) {
    59. pstmt.setObject(i + 1, params[i]);
    60. }
    61. int i = pstmt.executeUpdate();
    62. if (i > 0){
    63. flag = true;
    64. }else{
    65. flag = false;
    66. }
    67. pool.releaseConn(conn);
    68. conn.commit();
    69. } catch (SQLException e) {
    70. conn.rollback();
    71. e.printStackTrace();
    72. logger.info("更新失败!", e.getMessage());
    73. }
    74. return flag;
    75. }
    76. /**
    77. * 更新一个
    78. */
    79. public boolean updateOne(String sql) throws SQLException {
    80. boolean flag = false;
    81. try {
    82. pstmt = conn.prepareStatement(sql);
    83. int i = pstmt.executeUpdate();
    84. if (i > 0){
    85. flag = true;
    86. } else{
    87. flag = false;
    88. }
    89. pool.releaseConn(conn);
    90. conn.commit();
    91. } catch (SQLException e) {
    92. conn.rollback();
    93. e.printStackTrace();
    94. logger.info("更新失败!", e.getMessage());
    95. }
    96. return flag;
    97. }
    98. /**
    99. * 执行查询操作
    100. */
    101. public List<Map<String, Object>> findResult(String sql, List params) throws SQLException {
    102. List<Map<String, Object>> list = new ArrayList>();
    103. int index = 1;
    104. pstmt = conn.prepareStatement(sql);
    105. if (params != null && !params.isEmpty()) {
    106. for (int i = 0; i < params.size(); i++) {
    107. pstmt.setObject(index++, params.get(i));
    108. }
    109. }
    110. rs = pstmt.executeQuery();
    111. ResultSetMetaData metaData = rs.getMetaData();
    112. int cols_len = metaData.getColumnCount();
    113. while (rs.next()) {
    114. Map<String, Object> map = new HashMap<String, Object>();
    115. for (int i = 0; i < cols_len; i++) {
    116. String cols_name = metaData.getColumnName(i + 1);
    117. Object cols_value = rs.getObject(cols_name);
    118. if (cols_value == null) {
    119. cols_value = "";
    120. }
    121. map.put(cols_name, cols_value);
    122. }
    123. list.add(map);
    124. }
    125. return list;
    126. }
    127. public Boolean getBool(ResultSet airRs){
    128. boolean bool = false;
    129. Integer no = 0;
    130. try {
    131. while (airRs.next()) {
    132. no = airRs.getInt(1);
    133. if (no>0) {
    134. bool = true;
    135. }
    136. }
    137. } catch (SQLException e) {
    138. bool = false;
    139. e.printStackTrace();
    140. }
    141. return bool;
    142. }
    143. }
    • 创建com.qrsoft.etl.common.Constants类,该类为全局常量类,定义了相关的常量,使用的时候直接通过Constants类调用,即方便修改,又可以避免手写时写错。
    1. package com.qrsoft.etl.common;
    2. public class Constants {
    3. //间隔时间(10分钟)
    4. public final static int INTERVAL_TIME_10MIN = 10*60*1000;
    5. //间隔时间(5分钟)
    6. public final static int INTERVAL_TIME_5MIN = 5*60*1000;
    7. //间隔时间(1分钟)
    8. public final static int INTERVAL_TIME_1MIN = 60*1000;
    9. //间隔时间(30秒)
    10. public final static int INTERVAL_TIME_30SEC = 30*1000;
    11. //间隔时间(10秒)
    12. public final static int INTERVAL_TIME_10SEC = 10*1000;
    13. //间隔时间(10秒)
    14. public final static int INTERVAL_TIME_5SEC = 5*1000;
    15. //每分钟读取条数
    16. public final static int READ_COUNT = 10;
    17. //kg_airport
    18. public final static String TABLE_AIRPORT = "kg_airport";
    19. //kg_airlinecompany
    20. public final static String TABLE_AIRLINECOMPANY = "kg_airlinecompany";
    21. //kg_PlanData计划数据
    22. public final static String TASK_PlANDATA = "task_PlanData";
    23. public final static String TABLE_PlANDATA = "Kg_PlanData";
    24. public final static String FAMILY_PlANDATA = "ReportHome";
    25. public final static String COLUMN_PlANDATA = "EXECUTE_DATE";
    26. //kg_MultiRadarData综合航迹数据
    27. public final static String TASK_RADAR = "task_Radar";
    28. public final static String TABLE_RADAR = "Kg_MultiRadarData";
    29. public final static String FAMILY_RADAR = "RadarHome";
    30. public final static String COLUMN_RADAR = "SEND_RADAR_TIME";
    31. //kg_AFTN报文数据
    32. public final static String TASK_AFTN = "task_Aftn";
    33. public final static String TABLE_AFTN = "Kg_AFTN";
    34. public final static String FAMILY_AFTN = "AFTNHome";
    35. public final static String COLUMN_AFTN = "EXECUTE_DATE";
    36. //Kg_ATCDutyInfo管制值班人员数据
    37. public final static String TASK_ATCDUTY = "task_ATCDuty";
    38. public final static String TABLE_ATCDUTY = "Kg_ATCDutyInfo";
    39. public final static String FAMILY_ATCDUTY = "ATCDutyHome";
    40. public final static String COLUMN_ATCDUTY = "SEND_TIME";
    41. //Kg_WarnFlightHistory航班指令告警数据
    42. public final static String TASK_WARNFLIGHT = "task_WarnFlight";
    43. public final static String TABLE_WARNFLIGHT = "Kg_WarnFlightHistory";
    44. public final static String FAMILY_WARNFLIGHT = "WarnFlightHome";
    45. public final static String COLUMN_WARNFLIGHT = "GJ_DATE";
    46. //Kg_WarnSimilarHistory相似航班号告警数据
    47. public final static String TASK_WARNSIMILAR = "task_WarnSimilar";
    48. public final static String TABLE_WARNSIMILAR = "Kg_WarnSimilarHistory";
    49. public final static String FAMILY_WARNSIMILAR = "WarnSimilarHome";
    50. public final static String COLUMN_WARNSIMILAR = "GJ_DATE";
    51. //Kg_ATC扇区信息
    52. public final static String TASK_ATC = "task_ATC";
    53. public final static String TABLE_ATC = "Kg_ATC";
    54. public final static String FAMILY_ATC = "ATCHome";
    55. public final static String COLUMN_ATC = "EXECUTE_DATE";
    56. //Kg_CallSaturation 扇区通话饱和度信息
    57. public final static String TASK_CALLSATURATION = "task_CallSaturation";
    58. public final static String TABLE_CALLSATURATION = "Kg_CallSaturation";
    59. public final static String FAMILY_CALLSATURATION = "SaturationHome";
    60. public final static String COLUMN_CALLSATURATION = "SEND_TIME";
    61. }

    2、创建com/qrsoft/etl/spark/SparkStreamingApplication.java类,在该类中配置Kafka和Spark的执行参数,然后使用Spark进行业务数据处理,在代码中会根据不同的Topic进入不同的分支,进行数据的处理。

    • 创建SparkStreamingApplication.java类,设置Zookeeper的brokers和zkServers,设置需要监听的Topic List
    1. package com.qrsoft.etl.spark;
    2. import com.qrsoft.etl.common.Constants;
    3. import com.qrsoft.etl.util.ConfigUtil;
    4. import net.sf.json.JSONObject;
    5. import org.apache.kafka.clients.consumer.ConsumerConfig;
    6. import org.apache.kafka.clients.consumer.ConsumerRecord;
    7. import org.apache.kafka.common.serialization.StringDeserializer;
    8. import org.apache.spark.SparkConf;
    9. import org.apache.spark.api.java.JavaSparkContext;
    10. import org.apache.spark.streaming.Durations;
    11. import org.apache.spark.streaming.api.java.JavaInputDStream;
    12. import org.apache.spark.streaming.api.java.JavaStreamingContext;
    13. import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    14. import org.apache.spark.streaming.kafka010.KafkaUtils;
    15. import org.apache.spark.streaming.kafka010.LocationStrategies;
    16. import org.springframework.stereotype.Component;
    17. import java.io.Serializable;
    18. import java.util.Arrays;
    19. import java.util.Collection;
    20. import java.util.HashMap;
    21. import java.util.Map;
    22. import java.util.regex.Pattern;
    23. @Component
    24. public class SparkStreamingApplication implements Serializable {
    25. static final Pattern SPACE = Pattern.compile(" ");
    26. // 多个以逗号隔开
    27. static String brokers = ConfigUtil.readProperty("brokers");
    28. static String zkserver = ConfigUtil.readProperty("zkserver");
    29. // 消费者组名称
    30. static String groupId = "spark_etl";
    31. // topic列表
    32. static String topicsStr = Constants.TASK_RADAR;
    33. static String[] topicsStrs = {
    34. Constants.TASK_PlANDATA,
    35. Constants.TASK_WARNFLIGHT,
    36. Constants.TASK_ATCDUTY,
    37. Constants.TASK_WARNSIMILAR,
    38. Constants.TASK_AFTN,
    39. Constants.TASK_ATC,
    40. Constants.TASK_CALLSATURATION,
    41. Constants.TASK_RADAR
    42. };
    43. /**
    44. * 启动Spark读取、清洗数据
    45. */
    46. public void SparkEtlStart() {
    47. // ... 在此处添加代码 ...
    48. }
    49. }
    • 在SparkEtlStart()方法内添加代码,配置Spark和Kafka参数,创建StreamingContext

    创建StreamingContext,设置拉取流的时间,准备读取Kafka数据。本地开发时Spark配置使用local[*]方式,设置成本地运行模式,放到集群中运行时需要修改为Yarn模式。

    1. SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("qst-etl");
    2. //反压机制
    3. conf.set("spark.streaming.backpressure.enabled", "true");
    4. conf.set("allowMultipleContexts", "true");
    5. JavaSparkContext sc = new JavaSparkContext(conf);
    6. // 获取jssc 以及设置获取流的时间
    7. JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
    8. jssc.sparkContext().setLogLevel("WARN");
    9. // Kafka 参数配置
    10. Map<String, Object> kafkaParams = new HashMap<>();
    11. kafkaParams.put("zookeeper.connect", zkserver);
    12. kafkaParams.put("bootstrap.servers", brokers);
    13. kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); //指定了KafkaConsumershuyu 哪一个消费者群组
    14. kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    15. kafkaParams.put("key.deserializer", StringDeserializer.class);
    16. kafkaParams.put("value.deserializer", StringDeserializer.class);
    17. kafkaParams.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    18. kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //读取Kafka最新的一条
    19. kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    20. Collection<String> topics = Arrays.asList(topicsStrs);
    • 拉取Kafka数据流
    1. // 获取流
    2. JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
    3. LocationStrategies.PreferConsistent(),
    4. ConsumerStrategies.Subscribe(topics, kafkaParams));
    • 解析Kafka数据流:解析流,对流进行循环处理,首先把流区分Topic,解析流中的value,其次根据不同Topic进入不同的分支,进行处理
    1. stream.foreachRDD(rdd -> {
    2. rdd.foreach(t -> {
    3. String topName = t.topic();
    4. JSONObject jsonObject = new JSONObject();
    5. String taskRadar = "";
    6. if(topName.equals(Constants.TASK_RADAR)){
    7. taskRadar = t.value();
    8. }else{
    9. jsonObject = JSONObject.parseObject(t.value());
    10. }
    11. SparkUtil sparkUtil = new SparkUtil();
    12. try {
    13. switch (topName) {
    14. case Constants.TASK_RADAR:
    15. // sparkUtil.TaskRadar(taskRadar);
    16. sparkUtil.TaskRadarStr(taskRadar);
    17. break;
    18. case Constants.TASK_PlANDATA:
    19. sparkUtil.TaskPlanData(jsonObject);
    20. break;
    21. case Constants.TASK_WARNFLIGHT:
    22. sparkUtil.TaskWarnfLight(jsonObject);
    23. break;
    24. case Constants.TASK_ATCDUTY:
    25. sparkUtil.TaskAtcduty(jsonObject);
    26. break;
    27. case Constants.TASK_WARNSIMILAR:
    28. sparkUtil.TaskWarnsimilar(jsonObject);
    29. break;
    30. case Constants.TASK_AFTN:
    31. sparkUtil.TaskAftn(jsonObject);
    32. break;
    33. case Constants.TASK_ATC:
    34. sparkUtil.TaskAtc(jsonObject);
    35. break;
    36. case Constants.TASK_CALLSATURATION:
    37. sparkUtil.TaskCallsaturation(jsonObject);
    38. break;
    39. }
    40. } catch (Exception e) {
    41. System.out.println(e);
    42. }
    43. //return Arrays.asList(SPACE.split(t.value())).iterator();
    44. });
    45. });
    • 启动Spark,进行业务处理
    1. // 打印结果
    2. //warns.print();
    3. try {
    4. // 启动
    5. jssc.start();
    6. jssc.awaitTermination();
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }

    3、将清洗后的数据保存到数据库中

    • 在上一步的代码中,解析Kafka数据流时,首先把流区分Topic,解析流中的value,其次根据不同Topic进入不同的分支,进行处理。例如:
    1. case Constants.TASK_PlANDATA:
    2. sparkUtil.TaskPlanData(jsonObject);
    3. break;

    该分支是处理机场起降数据,这里会用到一个类SparkUtil.java,该类中定义了处理不同Topic数据的方法,其中sparkUtil.TaskPlanData(jsonObject)就是处理机场起降数据对应的方法。

    主要任务是:对起降信息进行统计和更新、对航线信息进行统计和更新:

    1)首先判断是否有该机场航班起降的统计信息,如果数据库中没有该机场数据,则在数据库中插入;如果有则根据条件进行更新数据;

    2)对于航线信息也是如此,如果数据库中没有相应的航线数据,则在数据库中插入;否则根据条件进行更新。

    • 编写com.qrsoft.etl.spark.SparkUtil类,代码位置src/main/java/com/qrsoft/etl/spark/SparkUtil.java,在该类中添加一个方法TaskPlanData(jsonObject),用于处理“机场起降数据”对应的Topic中的数据。
    1. package com.qrsoft.etl.spark;
    2. import com.alibaba.fastjson.JSONObject;
    3. import com.qrsoft.etl.dao.PlanDataDao;
    4. import org.slf4j.Logger;
    5. import org.slf4j.LoggerFactory;
    6. import org.springframework.stereotype.Component;
    7. import java.text.SimpleDateFormat;
    8. import java.util.Date;
    9. @Component
    10. public class SparkUtil {
    11. private final static Logger logger = LoggerFactory.getLogger(SparkUtil.class);
    12. // 初始化扇区
    13. private static Double[] sectionG = {38.716066,42.297914,114.648477,128.759203};
    14. private static Double[] sectionK = {35.519796,38.716066,114.648477,128.759203};
    15. private static Double[] sectionE = {32.519796,35.519796,114.648477,128.759203};
    16. /**
    17. * 业务处理
    18. * @param jsonObject 机场起降数据
    19. * @throws Exception
    20. */
    21. public void TaskPlanData(JSONObject jsonObject) throws Exception {
    22. //起飞机场
    23. String adep = jsonObject.getString("ADEP");
    24. //降落机场
    25. String ades = jsonObject.getString("ADES");
    26. //操作数据库,统计和更新机场航班数
    27. operationDB(adep);
    28. operationDB(ades);
    29. //航班号
    30. String acid = jsonObject.getString("ACID");
    31. //操作数据库,统计和更新航线信息
    32. operationDBBOLT(adep, ades, acid);
    33. }
    34. /**
    35. * 操作数据库(对航班起降数进行统计或更新)
    36. * @param code “起飞机场”或“降落机场”
    37. */
    38. public void operationDB(String code) {
    39. //根据机场代码获取目前数据库中已存在的航班数
    40. PlanDataDao pDao = new PlanDataDao();
    41. boolean bool;
    42. try {
    43. bool = pDao.isExistThisAir(code);
    44. if (bool) {
    45. //存在,在原来基础上+1,修改数据库中该机场的航班数
    46. pDao.updateAnAirMsg(code);
    47. } else {
    48. //不存在,在统计表中创建该机场的航班数(默认为1
    49. pDao.createAnAirMsg(code);
    50. }
    51. } catch (Exception e) {
    52. e.printStackTrace();
    53. }
    54. }
    55. /**
    56. * 操作数据库(对航线进行统计或更新)
    57. * @param adep 起飞机场
    58. * @param ades 降落机场
    59. * @param acid 航班号
    60. */
    61. public void operationDBBOLT(String adep, String ades, String acid) {
    62. boolean bool;
    63. PlanDataDao pDao = new PlanDataDao();
    64. if (pDao.isDomesticThisLine(adep) && pDao.isDomesticThisLine(ades)) {
    65. bool = pDao.isExistThisLine(acid);
    66. if (bool) {
    67. pDao.updateAnLineMsg(acid);
    68. } else {
    69. pDao.createAnLineMsg(acid, adep, ades);
    70. }
    71. }
    72. }
    73. // ... ...
    74. // ... 其他方法。因为当前要实现的是“机场起降数据”,所以其他可以只有方法体,没有方法实现及返回值。 ...
    75. // ... ...
    76. public void TaskRadarStr(String taskRadar) { }
    77. public void TaskWarnfLight(JSONObject jsonObject) { }
    78. public void TaskAtcduty(JSONObject jsonObject) { }
    79. public void TaskWarnsimilar(JSONObject jsonObject) { }
    80. public void TaskAftn(JSONObject jsonObject) { }
    81. public void TaskAtc(JSONObject jsonObject) { }
    82. public void TaskCallsaturation(JSONObject jsonObject) { }
    83. // ... ...
    84. }
    • 在上面的代码中会用到一个com.qrsoft.etl.dao.PlanDataDao类,代码位置src/main/java/com/qrsoft/etl/dao/PlanDataDao.java。该类是一个数据库操作类,会根据业务逻辑进行实际的数据库的操作,如增、删、改、查等。
    1. package com.qrsoft.etl.dao;
    2. import org.slf4j.Logger;
    3. import org.slf4j.LoggerFactory;
    4. import java.sql.ResultSet;
    5. import java.sql.SQLException;
    6. public class PlanDataDao extends IBaseDao {
    7. private final static Logger logger = LoggerFactory.getLogger(PlanDataDao.class);
    8. // ... ...
    9. // ... 添加方法实现 ...
    10. // ... ...
    11. }
    • 对于“处理机场起降数据”会涉及到以下的方法:

    查询该机场是否在国内:

    1. public boolean isDomesticThisLine(String code4){
    2. String sql = " SELECT COUNT(*) from airport_longlat where code4 ='"+code4+"';";
    3. Object[] params = {};
    4. ResultSet comRs = this.execute(sql, params);
    5. return getBool(comRs);
    6. }

    根据机场代码查询是否有该机场的统计信息:

    1. public boolean isExistThisAir(String code) {
    2. String sql = " SELECT COUNT(*) from airport_number where flightcode='"+code+"';";
    3. Object[] params = {};
    4. ResultSet airRs = this.execute(sql, params);
    5. return getBool(airRs);
    6. }

    如果根据机场代码查询,有该机场的统计信息,则在数据库中更新机场的起降航班数:

    1. public void updateAnAirMsg(String code) {
    2. String sql = "update airport_number set count=count+'1' where flightcode='"+code+"'; ";
    3. Object[] params = {};
    4. try {
    5. this.update(sql, params);
    6. } catch (SQLException e) {
    7. logger.info("修改指定机场的统计信息(统计数在原来基础上+1)失败! " + code);
    8. e.printStackTrace();
    9. }
    10. }

    如果根据机场代码查询,有该机场的统计信息,则在统计表中创建该机场的航班数(默认为1):

    1. public void createAnAirMsg(String code) {
    2. String sql = "insert into airport_number (flightcode,cname,count) values ('"+code+"',(select airport_cname from kg_airport where AIRPORT_CODE4 = '"+code+"'),'1');";
    3. Object[] params = {};
    4. try {
    5. this.update(sql, params);
    6. } catch (SQLException e) {
    7. logger.info("创建新机场的统计信息失败! " + code);
    8. e.printStackTrace();
    9. }
    10. }

    根据航班号查询是否有该航线存在:

    1. public boolean isExistThisLine(String acid){
    2. String sql = " SELECT COUNT(*) from airline_number where acid ='"+acid+"';";
    3. Object[] params = {};
    4. ResultSet comRs = this.execute(sql, params);
    5. return getBool(comRs);
    6. }

    根据航班号查询,有该航线统计信息,则在统计表中修改指定航线的统计信息(统计数在原来的基础上+1):

    1. public void updateAnLineMsg(String acid) {
    2. String sql = "update airline_number set count=count+1 where acid='"+acid+"';";
    3. Object[] params = {};
    4. try {
    5. this.update(sql, params);
    6. } catch (SQLException e) {
    7. logger.info("修改指定航线统计信息(统计数在原来基础上+1)失败! 航班号:" + acid);
    8. e.printStackTrace();
    9. }
    10. }

    根据航班号查询,没该航线统计信息,则创建新航线的统计信息:

    1. public void createAnLineMsg(String acid,String aDEP,String aDES) {
    2. String sql = "insert into airline_number (acid,adepcode,adescode,adepname,adesname,adeplong,adeplat,adeslong,adeslat,count) values ('"+acid+"','"+aDEP+"','"+aDES+"',(select airport_cname from kg_airport where airport_code4 = '"+aDEP+"'),(select airport_cname from kg_airport where airport_code4 = '"+aDES+"'),(select longitude from airport_longlat where code4 = '"+aDEP+"'),(select latitude from airport_longlat where code4 = '"+aDEP+"'),(select longitude from airport_longlat where code4 = '"+aDES+"'),(select latitude from airport_longlat where code4 = '"+aDES+"'),'1') ;";
    3. Object[] params = {};
    4. try {
    5. this.update(sql, params);
    6. } catch (SQLException e) {
    7. logger.info("创建新航线的统计信息失败! 航班号:" + acid);
    8. e.printStackTrace();
    9. }
    10. }

    该PlanDataDao.java类是一个普通的数据库操作类,非常简单,主要是根据我们的业务需求对数据进行相应的操作。这里会涉及到前面我们创建的一些数据库访问的工具类和辅助类:

    com.qrsoft.etl.dao.IBaseDao数据库访问的通用类,包括创建连接、执行更新等通用操作
    com.qrsoft.etl.common.db.ConnectionPoolManager连接管理类
    com.qrsoft.etl.common.db.IConnectionPool连接池管理类接口
    com.qrsoft.etl.common.db.ConnectionPool连接池管理类接口实现类
    com.qrsoft.etl.common.db.DBbean这是外部可以配置的连接池属性 可以允许外部配置,拥有默认值
    com.qrsoft.etl.common.db.DBInitInfo初始化,模拟加载所有的配置文件
    com.qrsoft.etl.common.Constants全局常量类
    com.qrsoft.etl.util.ConfigUtil加载配置文件的工具类
    com.qrsoft.etl.util.ConfigManager配置文件管理的工具类
    com.qrsoft.etl.util.MapManager地图管理的工具类,例如:是否在矩形区域内
    • 添加或修改如下配置文件:

    1)application.yml,服务器相关配置

    1. server:
    2. port: 8849
    3. spring:
    4. datasource:
    5. driver-class-name: com.mysql.jdbc.Driver
    6. username: root
    7. password: 123456
    8. url: jdbc:mysql://node3:3306/kongguan?serverTimezone=UTC
    9. redis:
    10. host: node3
    11. port: 6379
    12. database: 15
    13. mybatis-plus:
    14. configuration:
    15. log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    16. mapper-locations: classpath:/mapper/*.xml

    2)config.properties,MySQL数据库相关配置

    1. jdbc.driver=com.mysql.jdbc.Driver
    2. jdbc.url=jdbc:mysql://node3:3306/kongguan?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    3. jdbc.username=root
    4. jdbc.password=123456
    5. jdbc.min=20
    6. jdbc.max=500

    3)log4j.properties,日志文件相关配置

    1. #定义LOG输出级别
    2. #log4j.rootLogger=INFO,Console,File,stdout
    3. log4j.rootLogger=Console
    4. log4j.rootCategory = ERROR,Console
    5. #定义日志输出目的地为控制台
    6. log4j.appender.Console=org.apache.log4j.ConsoleAppender
    7. log4j.appender.Console.Target=System.out
    8. #可以灵活地指定日志输出格式,下面一行是指定具体的格式
    9. log4j.appender.Console.layout = org.apache.log4j.PatternLayout
    10. log4j.appender.Console.layout.ConversionPattern=[%c] - %m%n
    11. #文件大小到达指定尺寸的时候产生一个新的文件
    12. # log4j.appender.File = org.apache.log4j.RollingFileAppender
    13. log4j.appender.File.Append=true
    14. log4j.appender.File = org.apache.log4j.DailyRollingFileAppender
    15. #指定输出目录
    16. log4j.appender.File.File = /home/tmp/hbase
    17. log4j.appender.File.DatePattern = '_'yyyy-MM-dd'.log'
    18. #定义文件最大大小
    19. log4j.appender.File.MaxFileSize = 10MB
    20. # 输出所以日志,如果换成DEBUG表示输出DEBUG以上级别日志
    21. log4j.appender.File.Threshold = ERROR
    22. log4j.appender.File.layout = org.apache.log4j.PatternLayout
    23. log4j.appender.File.layout.ConversionPattern =[%p] [%d{yyyy-MM-dd HH\:mm\:ss}][%c]%m%n
    24. # mybatis日志输出
    25. log4j.logger.com.sarnath.ind.dao.IRoleDao.addPermission=TRACE
    26. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    27. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    28. log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

    4)myconfig.properties,Zookeeper和Kafka相关配置

    1. brokers: node1:9092,node2:9092,node3:9092
    2. zkserver: node1:2181,node2:2181,node3:2181

    注意,在上面的步骤中只处理了一个Topic分支的数据,其余Topic分支,参考TASK_PlANDATA方式处理,自行实现其他XXX统计(请参考源代码)。

    4、修改项目启动文件BigDataEtlKongGuanApplication.java,内容如下:

    1. package com.qrsoft;
    2. import com.qrsoft.etl.spark.SparkStreamingApplication;
    3. import org.springframework.boot.SpringApplication;
    4. import org.springframework.boot.autoconfigure.SpringBootApplication;
    5. import org.springframework.context.ConfigurableApplicationContext;
    6. import org.springframework.scheduling.annotation.EnableScheduling;
    7. @SpringBootApplication
    8. @EnableScheduling
    9. public class BigDataEtlKongGuanApplication {
    10. public static void main(String[] args) {
    11. ConfigurableApplicationContext run = SpringApplication.run(BigDataEtlKongGuanApplication.class, args);
    12. SparkStreamingApplication bean = run.getBean(SparkStreamingApplication.class);
    13. bean.SparkEtlStart();
    14. }
    15. }

    5、测试

    • 确保Hadoop、Spark、Kafka、MySQL等环境均已经启动,如果没有启动,可参考前面的安装部署任务,自行启动。
    • 启动BigData-KongGuan项目(如果没有启动)
    • 启动BigData-Etl-KongGuan项目

    • 在node3节点,进入MySQL数据库,查询 airline*相关的表是否有数据
    [root@node3 ~]# mysql -uroot -p123456
    mysql> show databases;
    mysql> use kongguan;
    mysql> select * from airline_number limit 20;

    例如:统计airline_number表,记录数是递增的。

    mysql> select count(*) from airline_number;

  • 相关阅读:
    React 18 迁移状态逻辑至 Reducer 中
    UI设计师岗位的基本职责八篇
    排队(单调队列+二分)
    学历不好,还有希望进大厂吗?
    引导滤波融合matlab
    【WPF应用30】WPF中的ListBox控件详解
    Android基础-进程间通信
    HCIA-MSTP替代技术之链路捆绑(LACP模式)
    【unity实战】Unity实现2D人物双击疾跑
    java 比较运算符
  • 原文地址:https://blog.csdn.net/2201_75642955/article/details/136752301