- package com.atguigu.edu.realtime.app.func;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import com.alibaba.druid.pool.DruidPooledConnection;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.bean.DimTableProcess;
- import com.atguigu.edu.realtime.common.EduConfig;
- import com.atguigu.edu.realtime.util.DruidDSUtil;
- import com.atguigu.edu.realtime.util.PhoenixUtil;
- import org.apache.flink.api.common.state.BroadcastState;
- import org.apache.flink.api.common.state.MapStateDescriptor;
- import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
- import org.apache.flink.util.Collector;
-
- import java.sql.*;
- import java.util.*;
-
- public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
- private MapStateDescriptor<String, DimTableProcess> tableProcessState;
-
- // Initial load of the configuration table data
- private HashMap<String, DimTableProcess> configMap = new HashMap<>();
-
- public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
- this.tableProcessState = tableProcessState;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
- "user=root&password=123456&useUnicode=true&" +
- "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
- );
-
- PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
- ResultSet resultSet = preparedStatement.executeQuery();
- ResultSetMetaData metaData = resultSet.getMetaData();
- while (resultSet.next()) {
- JSONObject jsonObject = new JSONObject();
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- String columnName = metaData.getColumnName(i);
- String columnValue = resultSet.getString(i);
- jsonObject.put(columnName, columnValue);
- }
- DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
- configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
- }
- resultSet.close();
- preparedStatement.close();
- connection.close();
- }
-
- /**
- * @param value the JSON string emitted directly by FlinkCDC
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
- //TODO 1 Get the configuration table data and parse its format
- JSONObject jsonObject = JSON.parseObject(value);
- String type = jsonObject.getString("op");
- BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
- if ("d".equals(type)) {
- // Remove the corresponding table from the broadcast state
- DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
- tableConfigState.remove(before.getSourceTable());
- // Remove the corresponding table from configMap
- configMap.remove(before.getSourceTable());
- } else {
- DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
- //TODO 3 Write the data into the state so it gets broadcast
- tableConfigState.put(after.getSourceTable(), after);
- //TODO 2 Check whether the table exists in Phoenix; create it if not
- String sinkTable = after.getSinkTable();
- String sinkColumns = after.getSinkColumns();
- String sinkPk = after.getSinkPk();
- String sinkExtend = after.getSinkExtend();
- checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
- }
- }
-
- /**
- * @param value the JSON record generated by Maxwell and read from Kafka
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
- //TODO 1 Get the broadcast configuration data
-
- //TODO 2 Filter out the dimension fields that are needed
-
- //TODO 3 Add the extra output fields
- }
- }
8.3.2 Dynamically split the stream based on the MySQL configuration table
6) Create the connection pool utility class
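DruidDSUtil is imported and used throughout these classes but never shown in the notes. As a reference, here is a minimal sketch of such a Druid connection pool helper for Phoenix; the EduConfig constant names (PHOENIX_DRIVER, PHOENIX_SERVER) and the pool sizes are assumptions, not the course's exact code.
- package com.atguigu.edu.realtime.util;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import com.atguigu.edu.realtime.common.EduConfig;
-
- public class DruidDSUtil {
- private static DruidDataSource druidDataSource;
-
- public static DruidDataSource getDruidDataSource() {
- if (druidDataSource == null) {
- druidDataSource = new DruidDataSource();
- // Phoenix JDBC driver and zookeeper address; the constant names are assumed
- druidDataSource.setDriverClassName(EduConfig.PHOENIX_DRIVER);
- druidDataSource.setUrl(EduConfig.PHOENIX_SERVER);
- // Illustrative pool sizing
- druidDataSource.setInitialSize(5);
- druidDataSource.setMaxActive(20);
- druidDataSource.setMinIdle(5);
- druidDataSource.setMaxWait(-1);
- }
- return druidDataSource;
- }
- }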
- package com.atguigu.edu.realtime.app.func;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import com.alibaba.druid.pool.DruidPooledConnection;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.bean.DimTableProcess;
- import com.atguigu.edu.realtime.common.EduConfig;
- import com.atguigu.edu.realtime.util.DruidDSUtil;
- import com.atguigu.edu.realtime.util.PhoenixUtil;
- import org.apache.flink.api.common.state.BroadcastState;
- import org.apache.flink.api.common.state.MapStateDescriptor;
- import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
- import org.apache.flink.util.Collector;
-
- import java.sql.*;
- import java.util.*;
-
- public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
- private MapStateDescriptor<String, DimTableProcess> tableProcessState;
-
- // Initial load of the configuration table data
- private HashMap<String, DimTableProcess> configMap = new HashMap<>();
-
- public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
- this.tableProcessState = tableProcessState;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
- "user=root&password=123456&useUnicode=true&" +
- "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
- );
-
- PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
- ResultSet resultSet = preparedStatement.executeQuery();
- ResultSetMetaData metaData = resultSet.getMetaData();
- while (resultSet.next()) {
- JSONObject jsonObject = new JSONObject();
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- String columnName = metaData.getColumnName(i);
- String columnValue = resultSet.getString(i);
- jsonObject.put(columnName, columnValue);
- }
- DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
- configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
- }
- resultSet.close();
- preparedStatement.close();
- connection.close();
- }
-
- /**
- * @param value the JSON string emitted directly by FlinkCDC
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
- //TODO 1 Get the configuration table data and parse its format
- JSONObject jsonObject = JSON.parseObject(value);
- String type = jsonObject.getString("op");
- BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
- if ("d".equals(type)) {
- // Remove the corresponding table from the broadcast state
- DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
- tableConfigState.remove(before.getSourceTable());
- // Remove the corresponding table from configMap
- configMap.remove(before.getSourceTable());
- } else {
- DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
- //TODO 3 Write the data into the state so it gets broadcast
- tableConfigState.put(after.getSourceTable(), after);
- //TODO 2 Check whether the table exists in Phoenix; create it if not
- String sinkTable = after.getSinkTable();
- String sinkColumns = after.getSinkColumns();
- String sinkPk = after.getSinkPk();
- String sinkExtend = after.getSinkExtend();
- checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
- }
- }
-
- private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
- // create table if not exists table (id string pk, name string...)
- // Build the CREATE TABLE statement
- StringBuilder sql = new StringBuilder();
- sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
-
- // Handle the primary key
- // If no primary key is configured, default to id
- if (sinkPk == null) {
- sinkPk = "id";
- }
- if (sinkExtend == null) {
- sinkExtend = "";
- }
-
- // Loop over the columns and append them to the statement
- String[] split = sinkColumns.split(",");
- for (int i = 0; i < split.length; i++) {
- sql.append(split[i] + " varchar");
- if (split[i].equals(sinkPk)) {
- sql.append(" primary key");
- }
- if (i < split.length - 1) {
- sql.append(",\n");
- }
- }
- sql.append(") ");
- sql.append(sinkExtend);
-
- PhoenixUtil.executeDDL(sql.toString());
- }
-
- /**
- * @param value the JSON record generated by Maxwell and read from Kafka
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
- //TODO 1 Get the broadcast configuration data
-
- //TODO 2 Filter out the dimension fields that are needed
-
- //TODO 3 Add the extra output fields
- }
- }
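To make checkTable concrete: for a hypothetical config row with sink_table = dim_base_province, sink_columns = id,name,region_id,area_code, sink_pk = id and an empty sink_extend, the method assembles the DDL shown in the comment below. This is a standalone sketch that mirrors the string-building above, not output captured from the job.
- public class CheckTableDemo {
- public static void main(String[] args) {
- // Hypothetical configuration row (EDU_REALTIME stands in for EduConfig.HBASE_SCHEMA)
- String sinkTable = "dim_base_province";
- String sinkColumns = "id,name,region_id,area_code";
- String sinkPk = "id";
- String sinkExtend = "";
-
- StringBuilder sql = new StringBuilder();
- sql.append("create table if not exists EDU_REALTIME." + sinkTable + "(\n");
- String[] split = sinkColumns.split(",");
- for (int i = 0; i < split.length; i++) {
- sql.append(split[i] + " varchar");
- if (split[i].equals(sinkPk)) {
- sql.append(" primary key");
- }
- if (i < split.length - 1) {
- sql.append(",\n");
- }
- }
- sql.append(") ");
- sql.append(sinkExtend);
-
- // Prints:
- // create table if not exists EDU_REALTIME.dim_base_province(
- // id varchar primary key,
- // name varchar,
- // region_id varchar,
- // area_code varchar)
- System.out.println(sql);
- }
- }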
Start hadoop, zookeeper, kafka, and hbase (p41).
- [atguigu@node001 ~]$ myhadoop.sh start
- ================ 启动 hadoop集群 ================
- ---------------- 启动 hdfs ----------------
- Starting namenodes on [node001]
- Starting datanodes
- Starting secondary namenodes [node003]
- --------------- 启动 yarn ---------------
- Starting resourcemanager
- Starting nodemanagers
- --------------- 启动 historyserver ---------------
- [atguigu@node001 ~]$ zookeeper.sh start
- ---------- zookeeper node001 启动 ----------
- ZooKeeper JMX enabled by default
- Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
- Starting zookeeper ... STARTED
- ---------- zookeeper node002 启动 ----------
- ZooKeeper JMX enabled by default
- Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
- Starting zookeeper ... STARTED
- ---------- zookeeper node003 启动 ----------
- ZooKeeper JMX enabled by default
- Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
- Starting zookeeper ... STARTED
- [atguigu@node001 ~]$
- [atguigu@node001 ~]$
- [atguigu@node001 ~]$ kafka.sh start
- --------------- node001 Kafka 启动 ---------------
- --------------- node002 Kafka 启动 ---------------
- --------------- node003 Kafka 启动 ---------------
- [atguigu@node001 ~]$
- [atguigu@node001 ~]$
- [atguigu@node001 ~]$ start-hbase.sh
- SLF4J: Class path contains multiple SLF4J bindings.
- SLF4J: Found binding in [jar:file:/opt/module/hbase/hbase-2.0.5/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
- SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
- running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node001.out
- node002: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node002.out
- node003: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node003.out
- node001: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node001.out
- node002: running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node002.out
- [atguigu@node001 ~]$ jpsall
- ================ node001 ================
- 4880 HMaster
- 4615 Kafka
- 4183 QuorumPeerMain
- 3017 DataNode
- 2858 NameNode
- 5083 HRegionServer
- 3676 JobHistoryServer
- 3454 NodeManager
- 5406 Jps
- ================ node002 ================
- 2080 ResourceManager
- 4050 Jps
- 3397 Kafka
- 3574 HRegionServer
- 2966 QuorumPeerMain
- 3719 HMaster
- 1833 DataNode
- 2233 NodeManager
- ================ node003 ================
- 3265 Kafka
- 2833 QuorumPeerMain
- 3634 Jps
- 2067 SecondaryNameNode
- 2245 NodeManager
- 1941 DataNode
- 3430 HRegionServer
- [atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
- [atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ cd bin
- [atguigu@node001 bin]$ pwd
- /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin
- [atguigu@node001 bin]$ ./sqlline.py node001:2181
- Setting property: [incremental, false]
- Setting property: [isolation, TRANSACTION_READ_COMMITTED]
- issuing: !connect jdbc:phoenix:node001:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
- Connecting to jdbc:phoenix:node001:2181
- SLF4J: Class path contains multiple SLF4J bindings.
- SLF4J: Found binding in [jar:file:/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
- 23/10/26 20:07:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- Connected to: Phoenix (version 5.0)
- Driver: PhoenixEmbeddedDriver (version 5.0)
- Autocommit status: true
- Transaction isolation: TRANSACTION_READ_COMMITTED
- Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
- 133/133 (100%) Done
- Done
- sqlline version 1.2.0
- 0: jdbc:phoenix:node001:2181> create schema EDU_REALTIME;
- No rows affected (3.039 seconds)
- 0: jdbc:phoenix:node001:2181> !tables
- +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
- | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_RO |
- +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
- | | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false |
- | | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false |
- | | SYSTEM | LOG | SYSTEM TABLE | | | | | | true |
- | | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false |
- | | SYSTEM | STATS | SYSTEM TABLE | | | | | | false |
- +------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
- 0: jdbc:phoenix:node001:2181> !tables
- +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
- | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE |
- +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
- | | SYSTEM | CATALOG | SYSTEM TABLE | | | | | |
- | | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | |
- | | SYSTEM | LOG | SYSTEM TABLE | | | | | |
- | | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | |
- | | SYSTEM | STATS | SYSTEM TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_CATEGORY_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_PROVINCE | TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_SOURCE | TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_SUBJECT_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_CHAPTER_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_COURSE_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_KNOWLEDGE_POINT | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_PAPER | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_PAPER_QUESTION | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_POINT_QUESTION | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_QUESTION_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_QUESTION_OPTION | TABLE | | | | | |
- | | EDU_REALTIME | DIM_USER_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_VIDEO_INFO | TABLE | | | | | |
- +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
- 0: jdbc:phoenix:node001:2181>
- package com.atguigu.edu.realtime.app.func;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import com.alibaba.druid.pool.DruidPooledConnection;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.bean.DimTableProcess;
- import com.atguigu.edu.realtime.common.EduConfig;
- import com.atguigu.edu.realtime.util.DruidDSUtil;
- import com.atguigu.edu.realtime.util.PhoenixUtil;
- import org.apache.flink.api.common.state.BroadcastState;
- import org.apache.flink.api.common.state.MapStateDescriptor;
- import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
- import org.apache.flink.util.Collector;
-
- import java.sql.*;
- import java.util.*;
-
- public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
- private MapStateDescriptor<String, DimTableProcess> tableProcessState;
-
- // Initial load of the configuration table data
- private HashMap<String, DimTableProcess> configMap = new HashMap<>();
-
- public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
- this.tableProcessState = tableProcessState;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
- "user=root&password=123456&useUnicode=true&" +
- "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
- );
-
- PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
- ResultSet resultSet = preparedStatement.executeQuery();
- ResultSetMetaData metaData = resultSet.getMetaData();
- while (resultSet.next()) {
- JSONObject jsonObject = new JSONObject();
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- String columnName = metaData.getColumnName(i);
- String columnValue = resultSet.getString(i);
- jsonObject.put(columnName, columnValue);
- }
- DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
- configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
- }
- resultSet.close();
- preparedStatement.close();
- connection.close();
- }
-
- /**
- * @param value the JSON string emitted directly by FlinkCDC
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
- //TODO 1 Get the configuration table data and parse its format
- JSONObject jsonObject = JSON.parseObject(value);
- String type = jsonObject.getString("op");
- BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
- if ("d".equals(type)) {
- // Remove the corresponding table from the broadcast state
- DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
- tableConfigState.remove(before.getSourceTable());
- // Remove the corresponding table from configMap
- configMap.remove(before.getSourceTable());
- } else {
- DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
- //TODO 3 Write the data into the state so it gets broadcast
- tableConfigState.put(after.getSourceTable(), after);
- //TODO 2 Check whether the table exists in Phoenix; create it if not
- String sinkTable = after.getSinkTable();
- String sinkColumns = after.getSinkColumns();
- String sinkPk = after.getSinkPk();
- String sinkExtend = after.getSinkExtend();
- checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
- }
- }
-
- private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
- // create table if not exists table (id string pk, name string...)
- // Build the CREATE TABLE statement
- StringBuilder sql = new StringBuilder();
- sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");
-
- // Handle the primary key
- // If no primary key is configured, default to id
- if (sinkPk == null) {
- sinkPk = "id";
- }
- if (sinkExtend == null) {
- sinkExtend = "";
- }
-
- // Loop over the columns and append them to the statement
- String[] split = sinkColumns.split(",");
- for (int i = 0; i < split.length; i++) {
- sql.append(split[i] + " varchar");
- if (split[i].equals(sinkPk)) {
- sql.append(" primary key");
- }
- if (i < split.length - 1) {
- sql.append(",\n");
- }
- }
- sql.append(") ");
- sql.append(sinkExtend);
-
- PhoenixUtil.executeDDL(sql.toString());
- }
-
- /**
- * @param value the JSON record generated by Maxwell and read from Kafka
- * @param ctx
- * @param out
- * @throws Exception
- */
- @Override
- public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
- //TODO 1 Get the broadcast configuration data
- ReadOnlyBroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
- DimTableProcess tableProcess = tableConfigState.get(value.getString("table"));
- // Fallback in case the Kafka data arrives before the broadcast state is filled, to avoid losing records
- if (tableProcess == null) {
- tableProcess = configMap.get(value.getString("table"));
- }
- if (tableProcess != null) {
- String type = value.getString("type");
- if (type == null) {
- System.out.println("maxwell采集的数据不完整...");
- } else {
- JSONObject data = value.getJSONObject("data");
- //TODO 2 Filter out the dimension fields that are needed
- String sinkColumns = tableProcess.getSinkColumns();
- filterColumns(data, sinkColumns);
- //TODO 3 Add the extra output fields
- data.put("sink_table", tableProcess.getSinkTable());
- // Add the operation type of the record
- data.put("type", type);
- out.collect(data);
- }
- }
- }
-
- private void filterColumns(JSONObject data, String sinkColumns) {
- Set<Map.Entry<String, Object>> entries = data.entrySet();
- List<String> stringList = Arrays.asList(sinkColumns.split(","));
- entries.removeIf(entry -> !stringList.contains(entry.getKey()));
- }
- }
- package com.atguigu.edu.realtime.util;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import com.alibaba.druid.pool.DruidPooledConnection;
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.common.EduConfig;
- import org.apache.commons.beanutils.BeanUtils;
- import org.apache.commons.lang3.StringUtils;
-
- import java.lang.reflect.InvocationTargetException;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.ResultSetMetaData;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
-
- public class PhoenixUtil {
- private static DruidDataSource druidDataSource = DruidDSUtil.getDruidDataSource();
-
- public static void executeDDL(String sqlString) {
- DruidPooledConnection connection = null;
- PreparedStatement preparedStatement = null;
- try {
- connection = druidDataSource.getConnection();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- System.out.println("连接池获取连接异常...");
- }
-
- try {
- preparedStatement = connection.prepareStatement(sqlString);
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- System.out.println("编译sql异常...");
- }
-
- try {
- preparedStatement.execute();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- System.out.println("建表语句错误...");
- }
-
- // Release resources
- try {
- preparedStatement.close();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- }
-
- try {
- connection.close();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- }
- }
-
- public static void executeDML(String sinkTable, JSONObject jsonObject) {
- // TODO 2 Build the upsert SQL
- StringBuilder sql = new StringBuilder();
- Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
- ArrayList<String> columns = new ArrayList<>();
- ArrayList<Object> values = new ArrayList<>();
- StringBuilder symbols = new StringBuilder();
- for (Map.Entry<String, Object> entry : entries) {
- columns.add(entry.getKey());
- values.add(entry.getValue());
- symbols.append("?,");
- }
-
- sql.append("upsert into " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(");
-
- // Join the column names
- String columnsStrings = StringUtils.join(columns, ",");
- String symbolStr = symbols.substring(0, symbols.length() - 1).toString();
- sql.append(columnsStrings)
- .append(")values(")
- .append(symbolStr)
- .append(")");
-
- DruidPooledConnection connection = null;
- try {
- connection = druidDataSource.getConnection();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- System.out.println("连接池获取连接异常...");
- }
-
- PreparedStatement preparedStatement = null;
- try {
- preparedStatement = connection.prepareStatement(sql.toString());
- // Bind the parameters
- for (int i = 0; i < values.size(); i++) {
- preparedStatement.setObject(i + 1, values.get(i) + "");
- }
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- System.out.println("编译sql异常...");
- }
-
- try {
- preparedStatement.executeUpdate();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- System.out.println("写入phoenix错误...");
- }
-
- try {
- preparedStatement.close();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- }
-
- try {
- connection.close();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- }
- }
-
- public static <T> List<T> queryList(String sql, Class<T> clazz) {
- ArrayList<T> resultList = new ArrayList<>();
- DruidPooledConnection connection = null;
- PreparedStatement preparedStatement = null;
- try {
- connection = druidDataSource.getConnection();
- preparedStatement = connection.prepareStatement(sql);
- ResultSet resultSet = preparedStatement.executeQuery();
- ResultSetMetaData metaData = resultSet.getMetaData();
- while (resultSet.next()) {
- T obj = clazz.newInstance();
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- String columnName = metaData.getColumnName(i);
- Object columnValue = resultSet.getObject(i);
- BeanUtils.setProperty(obj, columnName, columnValue);
- }
- resultList.add(obj);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if (preparedStatement != null) {
- try {
- preparedStatement.close();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- }
- }
-
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException throwables) {
- throwables.printStackTrace();
- }
- }
- return resultList;
- }
- }
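For reference, a minimal sketch of how queryList can be called for a dimension lookup (the table and id here are hypothetical; in the course the lookup is wrapped by a DimUtil class with a Redis cache, which is not shown in these notes):
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.util.PhoenixUtil;
-
- import java.util.List;
-
- public class PhoenixQueryDemo {
- public static void main(String[] args) {
- // Hypothetical lookup: fetch one dimension row; column names come back in upper case
- String sql = "select * from EDU_REALTIME.DIM_USER_INFO where ID = '1'";
- List<JSONObject> rows = PhoenixUtil.queryList(sql, JSONObject.class);
- rows.forEach(System.out::println);
- }
- }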
- package com.atguigu.edu.realtime.app.func;
-
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.util.DimUtil;
- import com.atguigu.edu.realtime.util.PhoenixUtil;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
- public class DimPhoenixSinkFunc implements SinkFunction<JSONObject> {
- @Override
- public void invoke(JSONObject jsonObject, Context context) throws Exception {
- // TODO 1 Get the name of the target table
- String sinkTable = jsonObject.getString("sink_table");
-
- // String type = jsonObject.getString("type");
- // String id = jsonObject.getString("id");
- jsonObject.remove("sink_table");
- // jsonObject.remove("type");
-
- // TODO 2 Write the data out using the utility class
- PhoenixUtil.executeDML(sinkTable, jsonObject);
-
- // TODO 3 If the type is update, delete the corresponding Redis cache entry
- // if ("update".equals(type)) {
- // DimUtil.deleteCached(sinkTable, id);
- // }
- }
- }
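To tie the three classes together, here is a minimal sketch of how they are typically wired in the DIM application; the class, method, and stream names are assumptions, and the Kafka (Maxwell) and MySQL (FlinkCDC) sources are omitted.
- import com.alibaba.fastjson.JSONObject;
- import com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction;
- import com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc;
- import com.atguigu.edu.realtime.bean.DimTableProcess;
- import org.apache.flink.api.common.state.MapStateDescriptor;
- import org.apache.flink.streaming.api.datastream.BroadcastStream;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
- public class DimWiringSketch {
- public static void wire(DataStream<JSONObject> maxwellStream, DataStream<String> configCdcStream) {
- // Descriptor shared by the broadcast stream and the process function
- MapStateDescriptor<String, DimTableProcess> tableProcessState =
- new MapStateDescriptor<>("table-process-state", String.class, DimTableProcess.class);
-
- // Broadcast the FlinkCDC configuration stream
- BroadcastStream<String> broadcastStream = configCdcStream.broadcast(tableProcessState);
-
- // Connect the two streams, split dynamically, and sink the dimension records to Phoenix
- SingleOutputStreamOperator<JSONObject> dimStream = maxwellStream
- .connect(broadcastStream)
- .process(new DimBroadcastProcessFunction(tableProcessState));
-
- dimStream.addSink(new DimPhoenixSinkFunc());
- }
- }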
Start hadoop, zookeeper, kafka, hbase, maxwell, and Phoenix (apache-phoenix-5.0.0-HBase-2.0).
- org.apache.phoenix.schema.ColumnNotFoundException: ERROR 504 (42703): Undefined column. columnName=EDU_REALTIME.DIM_BASE_PROVINCE.TYPE
- at org.apache.phoenix.schema.PTableImpl.getColumnForColumnName(PTableImpl.java:828)
- at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.resolveColumn(FromCompiler.java:477)
- at org.apache.phoenix.compile.UpsertCompiler.compile(UpsertCompiler.java:452)
- at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:784)
- at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:770)
- at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:401)
- at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:391)
- at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
- at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:390)
- at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:378)
- at org.apache.phoenix.jdbc.PhoenixPreparedStatement.executeUpdate(PhoenixPreparedStatement.java:206)
- at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeUpdate(DruidPooledPreparedStatement.java:255)
- at com.atguigu.edu.realtime.util.PhoenixUtil.executeDML(PhoenixUtil.java:105)
- at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:19)
- at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:7)
- at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
- at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
- at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
- at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
- at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
- at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:148)
- at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:21)
- at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement1(CoBroadcastWithNonKeyedOperator.java:110)
- at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
- at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
- at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
- at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
- at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
- at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
- at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
- at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
- at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
- at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
- at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
- at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
- at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
- at java.lang.Thread.run(Thread.java:748)
- 写入phoenix错误...
I just ran the final project code directly here instead — too lazy to fix the bug...
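For the record, the stack trace points at the TYPE column of DIM_BASE_PROVINCE: processElement() puts the Maxwell operation type into the output JSON (data.put("type", type)), but the Phoenix dimension tables have no TYPE column, so the upsert fails. My reading (an assumption, not the course's official fix) is that invoke() in DimPhoenixSinkFunc should read the type and then strip it before writing, along the lines of the revised method below.
- @Override
- public void invoke(JSONObject jsonObject, Context context) throws Exception {
- String sinkTable = jsonObject.getString("sink_table");
- String type = jsonObject.getString("type");
- String id = jsonObject.getString("id");
-
- // Strip the helper fields so the upsert only carries real Phoenix columns;
- // leaving "type" in place is what triggers the "Undefined column ... TYPE" error above
- jsonObject.remove("sink_table");
- jsonObject.remove("type");
-
- PhoenixUtil.executeDML(sinkTable, jsonObject);
-
- // Invalidate the Redis cache when an existing row changes (see the commented-out code above)
- if ("update".equals(type)) {
- DimUtil.deleteCached(sinkTable, id);
- }
- }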
- [atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin/
- [atguigu@node001 bin]$ ./sqlline.py node001:2181
- Setting property: [incremental, false]
- Setting property: [isolation, TRANSACTION_READ_COMMITTED]
- issuing: !connect jdbc:phoenix:node001:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
- Connecting to jdbc:phoenix:node001:2181
- SLF4J: Class path contains multiple SLF4J bindings.
- SLF4J: Found binding in [jar:file:/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
- SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
- 23/10/31 11:04:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- Connected to: Phoenix (version 5.0)
- Driver: PhoenixEmbeddedDriver (version 5.0)
- Autocommit status: true
- Transaction isolation: TRANSACTION_READ_COMMITTED
- Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
- 246/246 (100%) Done
- Done
- sqlline version 1.2.0
- 0: jdbc:phoenix:node001:2181> !tables
- +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
- | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE |
- +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
- | | SYSTEM | CATALOG | SYSTEM TABLE | | | | | |
- | | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | |
- | | SYSTEM | LOG | SYSTEM TABLE | | | | | |
- | | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | |
- | | SYSTEM | STATS | SYSTEM TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_CATEGORY_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_PROVINCE | TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_SOURCE | TABLE | | | | | |
- | | EDU_REALTIME | DIM_BASE_SUBJECT_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_CHAPTER_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_COURSE_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_KNOWLEDGE_POINT | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_PAPER | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_PAPER_QUESTION | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_POINT_QUESTION | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_QUESTION_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_TEST_QUESTION_OPTION | TABLE | | | | | |
- | | EDU_REALTIME | DIM_USER_INFO | TABLE | | | | | |
- | | EDU_REALTIME | DIM_VIDEO_INFO | TABLE | | | | | |
- +------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
- 0: jdbc:phoenix:node001:2181> select * from EDU_REALTIME.DIM_BASE_CATEGORY_INFO;
- +-----+----------------+----------------------+--------------+----------+
- | ID | CATEGORY_NAME | CREATE_TIME | UPDATE_TIME | DELETED |
- +-----+----------------+----------------------+--------------+----------+
- | 1 | 编程技术 | 2021-09-24 22:19:37 | null | 0 |
- +-----+----------------+----------------------+--------------+----------+
- 1 row selected (0.333 seconds)
- 0: jdbc:phoenix:node001:2181>
😘👌💕