• Atguigu (尚硅谷) Big Data Project "Online Education Real-Time Data Warehouse" Notes 004


    Video: 尚硅谷大数据项目《在线教育之实时数仓》 (Bilibili)

    Contents

    Chapter 8 DIM Layer Development

    P024

    P025

    P026

    P027

    P028

    P029

    P030


    Chapter 8 DIM Layer Development

    P024

    1. package com.atguigu.edu.realtime.app.func;
    2. import com.alibaba.druid.pool.DruidDataSource;
    3. import com.alibaba.druid.pool.DruidPooledConnection;
    4. import com.alibaba.fastjson.JSON;
    5. import com.alibaba.fastjson.JSONObject;
    6. import com.atguigu.edu.realtime.bean.DimTableProcess;
    7. import com.atguigu.edu.realtime.common.EduConfig;
    8. import com.atguigu.edu.realtime.util.DruidDSUtil;
    9. import com.atguigu.edu.realtime.util.PhoenixUtil;
    10. import org.apache.flink.api.common.state.BroadcastState;
    11. import org.apache.flink.api.common.state.MapStateDescriptor;
    12. import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    13. import org.apache.flink.configuration.Configuration;
    14. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    15. import org.apache.flink.util.Collector;
    16. import java.sql.*;
    17. import java.util.*;
    18. public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    19. private MapStateDescriptor<String, DimTableProcess> tableProcessState;
    20. // cache of the configuration table, preloaded once in open()
    21. private HashMap<String, DimTableProcess> configMap = new HashMap<>();
    22. public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
    23. this.tableProcessState = tableProcessState;
    24. }
    25. @Override
    26. public void open(Configuration parameters) throws Exception {
    27. super.open(parameters);
    28. Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
    29. "user=root&password=123456&useUnicode=true&" +
    30. "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
    31. );
    32. PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
    33. ResultSet resultSet = preparedStatement.executeQuery();
    34. ResultSetMetaData metaData = resultSet.getMetaData();
    35. while (resultSet.next()) {
    36. JSONObject jsonObject = new JSONObject();
    37. for (int i = 1; i <= metaData.getColumnCount(); i++) {
    38. String columnName = metaData.getColumnName(i);
    39. String columnValue = resultSet.getString(i);
    40. jsonObject.put(columnName, columnValue);
    41. }
    42. DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
    43. configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
    44. }
    45. resultSet.close();
    46. preparedStatement.close();
    47. connection.close();
    48. }
    49. /**
    50. * @param value configuration-table change record (JSON string) coming directly from Flink CDC
    51. * @param ctx
    52. * @param out
    53. * @throws Exception
    54. */
    55. @Override
    56. public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
    57. //TODO 1 parse the configuration-table record
    58. JSONObject jsonObject = JSON.parseObject(value);
    59. String type = jsonObject.getString("op");
    60. BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
    61. if ("d".equals(type)) {
    62. // remove the corresponding table from the broadcast state
    63. DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
    64. tableConfigState.remove(before.getSourceTable());
    65. // remove it from configMap as well
    66. configMap.remove(before.getSourceTable());
    67. } else {
    68. DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
    69. //TODO 3 write the config into the broadcast state
    70. tableConfigState.put(after.getSourceTable(), after);
    71. //TODO 2 check whether the table exists in Phoenix and create it if not
    72. String sinkTable = after.getSinkTable();
    73. String sinkColumns = after.getSinkColumns();
    74. String sinkPk = after.getSinkPk();
    75. String sinkExtend = after.getSinkExtend();
    76. checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
    77. }
    78. }
    79. /**
    80. * @param value business-table change record (JSON) produced by Maxwell and read from Kafka
    81. * @param ctx
    82. * @param out
    83. * @throws Exception
    84. */
    85. @Override
    86. public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
    87. //TODO 1 read the broadcast configuration
    88. //TODO 2 keep only the required dimension fields
    89. //TODO 3 add the extra output fields
    90. }
    91. }
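
    The DimTableProcess bean used above (getSourceTable, getSinkTable, getSinkColumns, getSinkPk, getSinkExtend) is not reproduced in these notes. A minimal sketch of what it presumably looks like, assuming Lombok is on the classpath and the fields mirror the columns of edu_config.table_process:

    package com.atguigu.edu.realtime.bean;

    import lombok.Data;

    @Data
    public class DimTableProcess {
        // source table name in the business database
        String sourceTable;
        // target dimension table name in Phoenix
        String sinkTable;
        // comma-separated list of columns to keep
        String sinkColumns;
        // primary key column of the Phoenix table
        String sinkPk;
        // extra clause appended to the create-table DDL (e.g. SALT_BUCKETS)
        String sinkExtend;
    }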

    P025

    8.3.2 Dynamically split the stream according to the MySQL configuration table

    6) Create the connection pool utility class
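
    The DruidDSUtil pool utility imported by the listings in this chapter is not shown in these notes (the listing below is again DimBroadcastProcessFunction). A minimal sketch of such a utility, assuming a Phoenix connection for this cluster; the URL, driver class, and pool sizes here are assumptions, not the course's exact values:

    package com.atguigu.edu.realtime.util;

    import com.alibaba.druid.pool.DruidDataSource;

    public class DruidDSUtil {
        private static DruidDataSource druidDataSource;

        public static DruidDataSource getDruidDataSource() {
            if (druidDataSource == null) {
                druidDataSource = new DruidDataSource();
                // Phoenix thick-client JDBC driver and ZooKeeper quorum (hypothetical values)
                druidDataSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
                druidDataSource.setUrl("jdbc:phoenix:node001,node002,node003:2181");
                // basic pool sizing
                druidDataSource.setInitialSize(5);
                druidDataSource.setMinIdle(5);
                druidDataSource.setMaxActive(20);
            }
            return druidDataSource;
        }
    }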

    1. package com.atguigu.edu.realtime.app.func;
    2. import com.alibaba.druid.pool.DruidDataSource;
    3. import com.alibaba.druid.pool.DruidPooledConnection;
    4. import com.alibaba.fastjson.JSON;
    5. import com.alibaba.fastjson.JSONObject;
    6. import com.atguigu.edu.realtime.bean.DimTableProcess;
    7. import com.atguigu.edu.realtime.common.EduConfig;
    8. import com.atguigu.edu.realtime.util.DruidDSUtil;
    9. import com.atguigu.edu.realtime.util.PhoenixUtil;
    10. import org.apache.flink.api.common.state.BroadcastState;
    11. import org.apache.flink.api.common.state.MapStateDescriptor;
    12. import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    13. import org.apache.flink.configuration.Configuration;
    14. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    15. import org.apache.flink.util.Collector;
    16. import java.sql.*;
    17. import java.util.*;
    18. public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    19. private MapStateDescriptor<String, DimTableProcess> tableProcessState;
    20. // cache of the configuration table, preloaded once in open()
    21. private HashMap<String, DimTableProcess> configMap = new HashMap<>();
    22. public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
    23. this.tableProcessState = tableProcessState;
    24. }
    25. @Override
    26. public void open(Configuration parameters) throws Exception {
    27. super.open(parameters);
    28. Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
    29. "user=root&password=123456&useUnicode=true&" +
    30. "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
    31. );
    32. PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
    33. ResultSet resultSet = preparedStatement.executeQuery();
    34. ResultSetMetaData metaData = resultSet.getMetaData();
    35. while (resultSet.next()) {
    36. JSONObject jsonObject = new JSONObject();
    37. for (int i = 1; i <= metaData.getColumnCount(); i++) {
    38. String columnName = metaData.getColumnName(i);
    39. String columnValue = resultSet.getString(i);
    40. jsonObject.put(columnName, columnValue);
    41. }
    42. DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
    43. configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
    44. }
    45. resultSet.close();
    46. preparedStatement.close();
    47. connection.close();
    48. }
    49. /**
    50. * @param value configuration-table change record (JSON string) coming directly from Flink CDC
    51. * @param ctx
    52. * @param out
    53. * @throws Exception
    54. */
    55. @Override
    56. public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
    57. //TODO 1 parse the configuration-table record
    58. JSONObject jsonObject = JSON.parseObject(value);
    59. String type = jsonObject.getString("op");
    60. BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
    61. if ("d".equals(type)) {
    62. // remove the corresponding table from the broadcast state
    63. DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
    64. tableConfigState.remove(before.getSourceTable());
    65. // remove it from configMap as well
    66. configMap.remove(before.getSourceTable());
    67. } else {
    68. DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
    69. //TODO 3 write the config into the broadcast state
    70. tableConfigState.put(after.getSourceTable(), after);
    71. //TODO 2 check whether the table exists in Phoenix and create it if not
    72. String sinkTable = after.getSinkTable();
    73. String sinkColumns = after.getSinkColumns();
    74. String sinkPk = after.getSinkPk();
    75. String sinkExtend = after.getSinkExtend();
    76. checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
    77. }
    78. }
    79. private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
    80. // create table if not exists table (id string pk, name string...)
    81. // build the Phoenix create-table DDL
    82. StringBuilder sql = new StringBuilder();
    83. sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
    84. // handle the primary key
    85. // if no primary key is configured, default to id
    86. if (sinkPk == null) {
    87. sinkPk = "id";
    88. }
    89. if (sinkExtend == null) {
    90. sinkExtend = "";
    91. }
    92. // loop over the columns and append them to the DDL
    93. String[] split = sinkColumns.split(",");
    94. for (int i = 0; i < split.length; i++) {
    95. sql.append(split[i] + " varchar");
    96. if (split[i].equals(sinkPk)) {
    97. sql.append(" primary key");
    98. }
    99. if (i < split.length - 1) {
    100. sql.append(",\n");
    101. }
    102. }
    103. sql.append(") ");
    104. sql.append(sinkExtend);
    105. PhoenixUtil.executeDDL(sql.toString());
    106. }
    107. /**
    108. * @param value business-table change record (JSON) produced by Maxwell and read from Kafka
    109. * @param ctx
    110. * @param out
    111. * @throws Exception
    112. */
    113. @Override
    114. public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
    115. //TODO 1 read the broadcast configuration
    116. //TODO 2 keep only the required dimension fields
    117. //TODO 3 add the extra output fields
    118. }
    119. }
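
    checkTable prefixes every sink table with EduConfig.HBASE_SCHEMA. That constants class is not listed in these notes either; since the sqlline session below creates the EDU_REALTIME schema, it presumably contains something along these lines (any constant other than HBASE_SCHEMA is a guess):

    package com.atguigu.edu.realtime.common;

    public class EduConfig {
        // Phoenix schema holding all DIM tables (created manually: create schema EDU_REALTIME)
        public static final String HBASE_SCHEMA = "EDU_REALTIME";
        // Phoenix connection string (hypothetical; adjust to the actual ZooKeeper quorum)
        public static final String PHOENIX_SERVER = "jdbc:phoenix:node001,node002,node003:2181";
    }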

    P026

    P027

    Start Hadoop, ZooKeeper, Kafka, and HBase. (p41)

    1. [atguigu@node001 ~]$ myhadoop.sh start
    2. ================ 启动 hadoop集群 ================
    3. ---------------- 启动 hdfs ----------------
    4. Starting namenodes on [node001]
    5. Starting datanodes
    6. Starting secondary namenodes [node003]
    7. --------------- 启动 yarn ---------------
    8. Starting resourcemanager
    9. Starting nodemanagers
    10. --------------- 启动 historyserver ---------------
    11. [atguigu@node001 ~]$ zookeeper.sh start
    12. ---------- zookeeper node001 启动 ----------
    13. ZooKeeper JMX enabled by default
    14. Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
    15. Starting zookeeper ... STARTED
    16. ---------- zookeeper node002 启动 ----------
    17. ZooKeeper JMX enabled by default
    18. Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
    19. Starting zookeeper ... STARTED
    20. ---------- zookeeper node003 启动 ----------
    21. ZooKeeper JMX enabled by default
    22. Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
    23. Starting zookeeper ... STARTED
    24. [atguigu@node001 ~]$
    25. [atguigu@node001 ~]$
    26. [atguigu@node001 ~]$ kafka.sh start
    27. --------------- node001 Kafka 启动 ---------------
    28. --------------- node002 Kafka 启动 ---------------
    29. --------------- node003 Kafka 启动 ---------------
    30. [atguigu@node001 ~]$
    31. [atguigu@node001 ~]$
    32. [atguigu@node001 ~]$ start-hbase.sh
    33. SLF4J: Class path contains multiple SLF4J bindings.
    34. SLF4J: Found binding in [jar:file:/opt/module/hbase/hbase-2.0.5/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    35. SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    36. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    37. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    38. running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node001.out
    39. node002: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node002.out
    40. node003: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node003.out
    41. node001: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node001.out
    42. node002: running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node002.out
    43. [atguigu@node001 ~]$ jpsall
    44. ================ node001 ================
    45. 4880 HMaster
    46. 4615 Kafka
    47. 4183 QuorumPeerMain
    48. 3017 DataNode
    49. 2858 NameNode
    50. 5083 HRegionServer
    51. 3676 JobHistoryServer
    52. 3454 NodeManager
    53. 5406 Jps
    54. ================ node002 ================
    55. 2080 ResourceManager
    56. 4050 Jps
    57. 3397 Kafka
    58. 3574 HRegionServer
    59. 2966 QuorumPeerMain
    60. 3719 HMaster
    61. 1833 DataNode
    62. 2233 NodeManager
    63. ================ node003 ================
    64. 3265 Kafka
    65. 2833 QuorumPeerMain
    66. 3634 Jps
    67. 2067 SecondaryNameNode
    68. 2245 NodeManager
    69. 1941 DataNode
    70. 3430 HRegionServer
    71. [atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
    72. [atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ cd bin
    73. [atguigu@node001 bin]$ pwd
    74. /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin
    75. [atguigu@node001 bin]$ ./sqlline.py node001:2181
    76. Setting property: [incremental, false]
    77. Setting property: [isolation, TRANSACTION_READ_COMMITTED]
    78. issuing: !connect jdbc:phoenix:node001:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
    79. Connecting to jdbc:phoenix:node001:2181
    80. SLF4J: Class path contains multiple SLF4J bindings.
    81. SLF4J: Found binding in [jar:file:/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    82. SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    83. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    84. 23/10/26 20:07:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    85. Connected to: Phoenix (version 5.0)
    86. Driver: PhoenixEmbeddedDriver (version 5.0)
    87. Autocommit status: true
    88. Transaction isolation: TRANSACTION_READ_COMMITTED
    89. Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
    90. 133/133 (100%) Done
    91. Done
    92. sqlline version 1.2.0
    93. 0: jdbc:phoenix:node001:2181> create schema EDU_REALTIME;
    94. No rows affected (3.039 seconds)
    95. 0: jdbc:phoenix:node001:2181> !tables
    96. +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
    97. | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_RO |
    98. +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
    99. | | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false |
    100. | | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false |
    101. | | SYSTEM | LOG | SYSTEM TABLE | | | | | | true |
    102. | | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false |
    103. | | SYSTEM | STATS | SYSTEM TABLE | | | | | | false |
    104. +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
    105. 0: jdbc:phoenix:node001:2181> !tables
    106. +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
    107. | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE |
    108. +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
    109. | | SYSTEM | CATALOG | SYSTEM TABLE | | | | | |
    110. | | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | |
    111. | | SYSTEM | LOG | SYSTEM TABLE | | | | | |
    112. | | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | |
    113. | | SYSTEM | STATS | SYSTEM TABLE | | | | | |
    114. | | EDU_REALTIME | DIM_BASE_CATEGORY_INFO | TABLE | | | | | |
    115. | | EDU_REALTIME | DIM_BASE_PROVINCE | TABLE | | | | | |
    116. | | EDU_REALTIME | DIM_BASE_SOURCE | TABLE | | | | | |
    117. | | EDU_REALTIME | DIM_BASE_SUBJECT_INFO | TABLE | | | | | |
    118. | | EDU_REALTIME | DIM_CHAPTER_INFO | TABLE | | | | | |
    119. | | EDU_REALTIME | DIM_COURSE_INFO | TABLE | | | | | |
    120. | | EDU_REALTIME | DIM_KNOWLEDGE_POINT | TABLE | | | | | |
    121. | | EDU_REALTIME | DIM_TEST_PAPER | TABLE | | | | | |
    122. | | EDU_REALTIME | DIM_TEST_PAPER_QUESTION | TABLE | | | | | |
    123. | | EDU_REALTIME | DIM_TEST_POINT_QUESTION | TABLE | | | | | |
    124. | | EDU_REALTIME | DIM_TEST_QUESTION_INFO | TABLE | | | | | |
    125. | | EDU_REALTIME | DIM_TEST_QUESTION_OPTION | TABLE | | | | | |
    126. | | EDU_REALTIME | DIM_USER_INFO | TABLE | | | | | |
    127. | | EDU_REALTIME | DIM_VIDEO_INFO | TABLE | | | | | |
    128. +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
    129. 0: jdbc:phoenix:node001:2181>

    P028

    1. package com.atguigu.edu.realtime.app.func;
    2. import com.alibaba.druid.pool.DruidDataSource;
    3. import com.alibaba.druid.pool.DruidPooledConnection;
    4. import com.alibaba.fastjson.JSON;
    5. import com.alibaba.fastjson.JSONObject;
    6. import com.atguigu.edu.realtime.bean.DimTableProcess;
    7. import com.atguigu.edu.realtime.common.EduConfig;
    8. import com.atguigu.edu.realtime.util.DruidDSUtil;
    9. import com.atguigu.edu.realtime.util.PhoenixUtil;
    10. import org.apache.flink.api.common.state.BroadcastState;
    11. import org.apache.flink.api.common.state.MapStateDescriptor;
    12. import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    13. import org.apache.flink.configuration.Configuration;
    14. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    15. import org.apache.flink.util.Collector;
    16. import java.sql.*;
    17. import java.util.*;
    18. public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    19. private MapStateDescriptor<String, DimTableProcess> tableProcessState;
    20. // cache of the configuration table, preloaded once in open()
    21. private HashMap<String, DimTableProcess> configMap = new HashMap<>();
    22. public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
    23. this.tableProcessState = tableProcessState;
    24. }
    25. @Override
    26. public void open(Configuration parameters) throws Exception {
    27. super.open(parameters);
    28. Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
    29. "user=root&password=123456&useUnicode=true&" +
    30. "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
    31. );
    32. PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
    33. ResultSet resultSet = preparedStatement.executeQuery();
    34. ResultSetMetaData metaData = resultSet.getMetaData();
    35. while (resultSet.next()) {
    36. JSONObject jsonObject = new JSONObject();
    37. for (int i = 1; i <= metaData.getColumnCount(); i++) {
    38. String columnName = metaData.getColumnName(i);
    39. String columnValue = resultSet.getString(i);
    40. jsonObject.put(columnName, columnValue);
    41. }
    42. DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
    43. configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
    44. }
    45. resultSet.close();
    46. preparedStatement.close();
    47. connection.close();
    48. }
    49. /**
    50. * @param value configuration-table change record (JSON string) coming directly from Flink CDC
    51. * @param ctx
    52. * @param out
    53. * @throws Exception
    54. */
    55. @Override
    56. public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
    57. //TODO 1 parse the configuration-table record
    58. JSONObject jsonObject = JSON.parseObject(value);
    59. String type = jsonObject.getString("op");
    60. BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
    61. if ("d".equals(type)) {
    62. // remove the corresponding table from the broadcast state
    63. DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
    64. tableConfigState.remove(before.getSourceTable());
    65. // remove it from configMap as well
    66. configMap.remove(before.getSourceTable());
    67. } else {
    68. DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
    69. //TODO 3 write the config into the broadcast state
    70. tableConfigState.put(after.getSourceTable(), after);
    71. //TODO 2 check whether the table exists in Phoenix and create it if not
    72. String sinkTable = after.getSinkTable();
    73. String sinkColumns = after.getSinkColumns();
    74. String sinkPk = after.getSinkPk();
    75. String sinkExtend = after.getSinkExtend();
    76. checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
    77. }
    78. }
    79. private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
    80. // create table if not exists table (id string pk, name string...)
    81. // build the Phoenix create-table DDL
    82. StringBuilder sql = new StringBuilder();
    83. sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
    84. // handle the primary key
    85. // if no primary key is configured, default to id
    86. if (sinkPk == null) {
    87. sinkPk = "id";
    88. }
    89. if (sinkExtend == null) {
    90. sinkExtend = "";
    91. }
    92. // loop over the columns and append them to the DDL
    93. String[] split = sinkColumns.split(",");
    94. for (int i = 0; i < split.length; i++) {
    95. sql.append(split[i] + " varchar");
    96. if (split[i].equals(sinkPk)) {
    97. sql.append(" primary key");
    98. }
    99. if (i < split.length - 1) {
    100. sql.append(",\n");
    101. }
    102. }
    103. sql.append(") ");
    104. sql.append(sinkExtend);
    105. PhoenixUtil.executeDDL(sql.toString());
    106. }
    107. /**
    108. * @param value business-table change record (JSON) produced by Maxwell and read from Kafka
    109. * @param ctx
    110. * @param out
    111. * @throws Exception
    112. */
    113. @Override
    114. public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
    115. //TODO 1 read the broadcast configuration
    116. ReadOnlyBroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
    117. DimTableProcess tableProcess = tableConfigState.get(value.getString("table"));
    118. // fall back to configMap in case business data arrives before the broadcast state is populated
    119. if (tableProcess == null) {
    120. tableProcess = configMap.get(value.getString("table"));
    121. }
    122. if (tableProcess != null) {
    123. String type = value.getString("type");
    124. if (type == null) {
    125. System.out.println("maxwell采集的数据不完整...");
    126. } else {
    127. JSONObject data = value.getJSONObject("data");
    128. //TODO 2 keep only the required dimension fields
    129. String sinkColumns = tableProcess.getSinkColumns();
    130. filterColumns(data, sinkColumns);
    131. //TODO 3 add the extra output fields
    132. data.put("sink_table", tableProcess.getSinkTable());
    133. // carry the operation type along with the data
    134. data.put("type", type);
    135. out.collect(data);
    136. }
    137. }
    138. }
    139. private void filterColumns(JSONObject data, String sinkColumns) {
    140. Set<Map.Entry<String, Object>> entries = data.entrySet();
    141. List<String> stringList = Arrays.asList(sinkColumns.split(","));
    142. entries.removeIf(entry -> !stringList.contains(entry.getKey()));
    143. }
    144. }
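
    For context, a broadcast function like this is wired into the DIM application roughly as follows. This is a sketch under assumptions: the stream names, the helper class, and the state-descriptor name are illustrative, not the course's exact code.

    package com.atguigu.edu.realtime.app.dim;

    import com.alibaba.fastjson.JSONObject;
    import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction;
    import com.atguigu.edu.realtime.bean.DimTableProcess;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

    public class DimWiringSketch {
        // configStream: Flink CDC change records of edu_config.table_process (JSON strings)
        // businessStream: Maxwell change records from Kafka, already parsed to JSONObject
        public static SingleOutputStreamOperator<JSONObject> wire(
                DataStream<String> configStream, DataStream<JSONObject> businessStream) {
            // descriptor for the broadcast state that carries the config table
            MapStateDescriptor<String, DimTableProcess> tableProcessState =
                    new MapStateDescriptor<>("table-process-state", String.class, DimTableProcess.class);
            BroadcastStream<String> broadcastStream = configStream.broadcast(tableProcessState);
            // connect business data with the broadcast config and apply the function above
            return businessStream
                    .connect(broadcastStream)
                    .process(new DimBroadcastProcessFunction(tableProcessState));
        }
    }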

    P029

    1. package com.atguigu.edu.realtime.util;
    2. import com.alibaba.druid.pool.DruidDataSource;
    3. import com.alibaba.druid.pool.DruidPooledConnection;
    4. import com.alibaba.fastjson.JSONObject;
    5. import com.atguigu.edu.realtime.common.EduConfig;
    6. import org.apache.commons.beanutils.BeanUtils;
    7. import org.apache.commons.lang3.StringUtils;
    8. import java.lang.reflect.InvocationTargetException;
    9. import java.sql.PreparedStatement;
    10. import java.sql.ResultSet;
    11. import java.sql.ResultSetMetaData;
    12. import java.sql.SQLException;
    13. import java.util.ArrayList;
    14. import java.util.List;
    15. import java.util.Map;
    16. import java.util.Set;
    17. public class PhoenixUtil {
    18. private static DruidDataSource druidDataSource = DruidDSUtil.getDruidDataSource();
    19. public static void executeDDL(String sqlString) {
    20. DruidPooledConnection connection = null;
    21. PreparedStatement preparedStatement = null;
    22. try {
    23. connection = druidDataSource.getConnection();
    24. } catch (SQLException throwables) {
    25. throwables.printStackTrace();
    26. System.out.println("连接池获取连接异常...");
    27. }
    28. try {
    29. preparedStatement = connection.prepareStatement(sqlString);
    30. } catch (SQLException throwables) {
    31. throwables.printStackTrace();
    32. System.out.println("编译sql异常...");
    33. }
    34. try {
    35. preparedStatement.execute();
    36. } catch (SQLException throwables) {
    37. throwables.printStackTrace();
    38. System.out.println("建表语句错误...");
    39. }
    40. // release resources
    41. try {
    42. preparedStatement.close();
    43. } catch (SQLException throwables) {
    44. throwables.printStackTrace();
    45. }
    46. try {
    47. connection.close();
    48. } catch (SQLException throwables) {
    49. throwables.printStackTrace();
    50. }
    51. }
    52. public static void executeDML(String sinkTable, JSONObject jsonObject) {
    53. // TODO 2 build the upsert SQL
    54. StringBuilder sql = new StringBuilder();
    55. Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
    56. ArrayList<String> columns = new ArrayList<>();
    57. ArrayList<Object> values = new ArrayList<>();
    58. StringBuilder symbols = new StringBuilder();
    59. for (Map.Entry<String, Object> entry : entries) {
    60. columns.add(entry.getKey());
    61. values.add(entry.getValue());
    62. symbols.append("?,");
    63. }
    64. sql.append("upsert into " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(");
    65. // join the column names
    66. String columnsStrings = StringUtils.join(columns, ",");
    67. String symbolStr = symbols.substring(0, symbols.length() - 1);
    68. sql.append(columnsStrings)
    69. .append(")values(")
    70. .append(symbolStr)
    71. .append(")");
    72. DruidPooledConnection connection = null;
    73. try {
    74. connection = druidDataSource.getConnection();
    75. } catch (SQLException throwables) {
    76. throwables.printStackTrace();
    77. System.out.println("连接池获取连接异常...");
    78. }
    79. PreparedStatement preparedStatement = null;
    80. try {
    81. preparedStatement = connection.prepareStatement(sql.toString());
    82. // bind the parameter values
    83. for (int i = 0; i < values.size(); i++) {
    84. preparedStatement.setObject(i + 1, values.get(i) + "");
    85. }
    86. } catch (SQLException throwables) {
    87. throwables.printStackTrace();
    88. System.out.println("编译sql异常...");
    89. }
    90. try {
    91. preparedStatement.executeUpdate();
    92. } catch (SQLException throwables) {
    93. throwables.printStackTrace();
    94. System.out.println("写入phoenix错误...");
    95. }
    96. try {
    97. preparedStatement.close();
    98. } catch (SQLException throwables) {
    99. throwables.printStackTrace();
    100. }
    101. try {
    102. connection.close();
    103. } catch (SQLException throwables) {
    104. throwables.printStackTrace();
    105. }
    106. }
    107. public static <T> List<T> queryList(String sql, Class<T> clazz) {
    108. ArrayList<T> resultList = new ArrayList<>();
    109. DruidPooledConnection connection = null;
    110. PreparedStatement preparedStatement = null;
    111. try {
    112. connection = druidDataSource.getConnection();
    113. preparedStatement = connection.prepareStatement(sql);
    114. ResultSet resultSet = preparedStatement.executeQuery();
    115. ResultSetMetaData metaData = resultSet.getMetaData();
    116. while (resultSet.next()) {
    117. T obj = clazz.newInstance();
    118. for (int i = 1; i <= metaData.getColumnCount(); i++) {
    119. String columnName = metaData.getColumnName(i);
    120. Object columnValue = resultSet.getObject(i);
    121. BeanUtils.setProperty(obj, columnName, columnValue);
    122. }
    123. resultList.add(obj);
    124. }
    125. } catch (Exception e) {
    126. e.printStackTrace();
    127. }
    128. if (preparedStatement != null) {
    129. try {
    130. preparedStatement.close();
    131. } catch (SQLException throwables) {
    132. throwables.printStackTrace();
    133. }
    134. }
    135. if (connection != null) {
    136. try {
    137. connection.close();
    138. } catch (SQLException throwables) {
    139. throwables.printStackTrace();
    140. }
    141. }
    142. return resultList;
    143. }
    144. }
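
    To make the string concatenation in executeDML concrete, here is a hypothetical caller (not part of the course code) and the statement it builds; the table, fields, and key order are illustrative, since fastjson's JSONObject does not guarantee key order by default:

    import com.alibaba.fastjson.JSONObject;
    import com.atguigu.edu.realtime.util.PhoenixUtil;

    public class ExecuteDmlDemo {
        public static void main(String[] args) {
            JSONObject row = new JSONObject();
            row.put("id", "1");
            row.put("name", "北京");
            // builds roughly: upsert into EDU_REALTIME.DIM_BASE_PROVINCE(id,name)values(?,?)
            // and binds every value as a string via setObject(i + 1, value + "")
            PhoenixUtil.executeDML("DIM_BASE_PROVINCE", row);
        }
    }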
      1. package com.atguigu.edu.realtime.app.func;
      2. import com.alibaba.fastjson.JSONObject;
      3. import com.atguigu.edu.realtime.util.DimUtil;
      4. import com.atguigu.edu.realtime.util.PhoenixUtil;
      5. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
      6. public class DimPhoenixSinkFunc implements SinkFunction<JSONObject> {
      7. @Override
      8. public void invoke(JSONObject jsonObject, Context context) throws Exception {
      9. // TODO 1 get the target table name
      10. String sinkTable = jsonObject.getString("sink_table");
      11. // String type = jsonObject.getString("type");
      12. // String id = jsonObject.getString("id");
      13. jsonObject.remove("sink_table");
      14. // jsonObject.remove("type");
      15. // TODO 2 write the data out through PhoenixUtil
      16. PhoenixUtil.executeDML(sinkTable, jsonObject);
      17. // TODO 3 if the operation type is update, evict the corresponding Redis cache entry
      18. // if ("update".equals(type)) {
      19. // DimUtil.deleteCached(sinkTable, id);
      20. // }
      21. }
      22. }
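
      In the DIM application the sink is then attached to the dimension stream produced by the broadcast function, roughly like this (the stream variable name is illustrative):

      // dimStream: SingleOutputStreamOperator<JSONObject> emitted by DimBroadcastProcessFunction
      dimStream.addSink(new DimPhoenixSinkFunc()).name("dim-phoenix-sink");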

      P030

      Start Hadoop, ZooKeeper, Kafka, HBase, Maxwell, and Phoenix (apache-phoenix-5.0.0-HBase-2.0).

      1. org.apache.phoenix.schema.ColumnNotFoundException: ERROR 504 (42703): Undefined column. columnName=EDU_REALTIME.DIM_BASE_PROVINCE.TYPE
      2. at org.apache.phoenix.schema.PTableImpl.getColumnForColumnName(PTableImpl.java:828)
      3. at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.resolveColumn(FromCompiler.java:477)
      4. at org.apache.phoenix.compile.UpsertCompiler.compile(UpsertCompiler.java:452)
      5. at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:784)
      6. at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:770)
      7. at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:401)
      8. at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:391)
      9. at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
      10. at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:390)
      11. at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:378)
      12. at org.apache.phoenix.jdbc.PhoenixPreparedStatement.executeUpdate(PhoenixPreparedStatement.java:206)
      13. at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeUpdate(DruidPooledPreparedStatement.java:255)
      14. at com.atguigu.edu.realtime.util.PhoenixUtil.executeDML(PhoenixUtil.java:105)
      15. at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:19)
      16. at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:7)
      17. at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
      18. at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
      19. at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
      20. at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      21. at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
      22. at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
      23. at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
      24. at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:148)
      25. at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:21)
      26. at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement1(CoBroadcastWithNonKeyedOperator.java:110)
      27. at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
      28. at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
      29. at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
      30. at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
      31. at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
      32. at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
      33. at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
      34. at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
      35. at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
      36. at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
      37. at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
      38. at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
      39. at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
      40. at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
      41. at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
      42. at java.lang.Thread.run(Thread.java:748)
      43. 写入phoenix错误...

      The error is most likely caused by the extra "type" field that processElement puts into the data JSON: the Phoenix DIM tables are created only with the columns listed in sink_columns, so the upsert hits an undefined TYPE column. I didn't bother debugging this intermediate version and just ran the final project code instead... (a possible fix is sketched below)
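
      One plausible fix, hinted at by the commented-out lines in DimPhoenixSinkFunc above, is to strip the type field before the upsert so that only real table columns reach Phoenix; this is a sketch, not necessarily how the final project code solves it:

      @Override
      public void invoke(JSONObject jsonObject, Context context) throws Exception {
          String sinkTable = jsonObject.getString("sink_table");
          String type = jsonObject.getString("type");
          // remove routing/metadata fields so the upsert only carries real table columns
          jsonObject.remove("sink_table");
          jsonObject.remove("type");
          PhoenixUtil.executeDML(sinkTable, jsonObject);
          // "type" can still be used here, e.g. to evict a Redis cache entry when it equals "update"
      }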

      1. [atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin/
      2. [atguigu@node001 bin]$ ./sqlline.py node001:2181
      3. Setting property: [incremental, false]
      4. Setting property: [isolation, TRANSACTION_READ_COMMITTED]
      5. issuing: !connect jdbc:phoenix:node001:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
      6. Connecting to jdbc:phoenix:node001:2181
      7. SLF4J: Class path contains multiple SLF4J bindings.
      8. SLF4J: Found binding in [jar:file:/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      9. SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
      10. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
      11. 23/10/31 11:04:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      12. Connected to: Phoenix (version 5.0)
      13. Driver: PhoenixEmbeddedDriver (version 5.0)
      14. Autocommit status: true
      15. Transaction isolation: TRANSACTION_READ_COMMITTED
      16. Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
      17. 246/246 (100%) Done
      18. Done
      19. sqlline version 1.2.0
      20. 0: jdbc:phoenix:node001:2181> !tables
      21. +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
      22. | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE |
      23. +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
      24. | | SYSTEM | CATALOG | SYSTEM TABLE | | | | | |
      25. | | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | |
      26. | | SYSTEM | LOG | SYSTEM TABLE | | | | | |
      27. | | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | |
      28. | | SYSTEM | STATS | SYSTEM TABLE | | | | | |
      29. | | EDU_REALTIME | DIM_BASE_CATEGORY_INFO | TABLE | | | | | |
      30. | | EDU_REALTIME | DIM_BASE_PROVINCE | TABLE | | | | | |
      31. | | EDU_REALTIME | DIM_BASE_SOURCE | TABLE | | | | | |
      32. | | EDU_REALTIME | DIM_BASE_SUBJECT_INFO | TABLE | | | | | |
      33. | | EDU_REALTIME | DIM_CHAPTER_INFO | TABLE | | | | | |
      34. | | EDU_REALTIME | DIM_COURSE_INFO | TABLE | | | | | |
      35. | | EDU_REALTIME | DIM_KNOWLEDGE_POINT | TABLE | | | | | |
      36. | | EDU_REALTIME | DIM_TEST_PAPER | TABLE | | | | | |
      37. | | EDU_REALTIME | DIM_TEST_PAPER_QUESTION | TABLE | | | | | |
      38. | | EDU_REALTIME | DIM_TEST_POINT_QUESTION | TABLE | | | | | |
      39. | | EDU_REALTIME | DIM_TEST_QUESTION_INFO | TABLE | | | | | |
      40. | | EDU_REALTIME | DIM_TEST_QUESTION_OPTION | TABLE | | | | | |
      41. | | EDU_REALTIME | DIM_USER_INFO | TABLE | | | | | |
      42. | | EDU_REALTIME | DIM_VIDEO_INFO | TABLE | | | | | |
      43. +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
      44. 0: jdbc:phoenix:node001:2181> select * from EDU_REALTIME.DIM_BASE_CATEGORY_INFO;
      45. +-----+----------------+----------------------+--------------+----------+
      46. | ID | CATEGORY_NAME | CREATE_TIME | UPDATE_TIME | DELETED |
      47. +-----+----------------+----------------------+--------------+----------+
      48. | 1 | 编程技术 | 2021-09-24 22:19:37 | null | 0 |
      49. +-----+----------------+----------------------+--------------+----------+
      50. 1 row selected (0.333 seconds)
      51. 0: jdbc:phoenix:node001:2181>

      😘👌💕

    Original article: https://blog.csdn.net/weixin_44949135/article/details/134060188