I. The Flink column
The Flink column systematically introduces individual knowledge points, each backed by concrete examples.
1. Flink deployment series
This part covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
This part covers Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL basics series
This part covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL advanced topics and applications series
This part covers Table API and SQL topics that are closer to real production use and carry more development difficulty.
5. Flink monitoring series
This part relates to day-to-day operations and monitoring work.
II. The Flink examples column
The Flink examples column supplements the Flink column: it generally does not introduce concepts, but instead provides concrete, directly usable examples. It has no sub-directories; each article's content is clear from its link.
For the entry point to all articles in both columns, see: Flink 系列文章汇总索引 (the Flink series index).
This article briefly introduces operating databases and tables through the Java API, with concrete runnable examples for each.
It assumes a working Flink installation plus a usable Hive and Hadoop cluster.
The article has two parts: database operations and table operations.
The Java API examples were built against Flink 1.13.5 (the JdbcCatalog example below against 1.17.0); unless stated otherwise, SQL examples use Flink 1.17.
The following lists the common database operations, using JdbcCatalog as the example; the Flink version here is 1.17.0.
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
// drop database
catalog.dropDatabase("mydb", false);
// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
// get database
catalog.getDatabase("mydb");
// check if a database exists
catalog.databaseExists("mydb");
// list all databases in the catalog (listDatabases takes no arguments)
catalog.listDatabases();
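To make the placeholders above concrete, here is a minimal sketch, assuming a catalog that supports DDL (for example the HiveCatalog used later in this article; as noted below, JdbcCatalog cannot create databases). The database name, property map, and comment are illustrative assumptions, not part of the original example.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;

public class DatabaseOpsSketch {

	// "catalog" is assumed to be an already-constructed Catalog instance
	public static void run(Catalog catalog) throws Exception {
		// CatalogDatabaseImpl takes a property map and an optional comment
		Map<String, String> properties = new HashMap<>();
		CatalogDatabase db = new CatalogDatabaseImpl(properties, "demo database");

		// create the database; true = ignore if it already exists
		catalog.createDatabase("mydb", db, true);

		System.out.println("exists: " + catalog.databaseExists("mydb"));
		System.out.println("databases: " + catalog.listDatabases());

		// drop it again; true = ignore if it does not exist
		catalog.dropDatabase("mydb", true);
	}
}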
<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>jdk.tools</groupId>
		<artifactId>jdk.tools</artifactId>
		<version>1.8</version>
		<scope>system</scope>
		<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc</artifactId>
		<version>3.1.0-1.17</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.38</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner-loader</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-runtime</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
</dependencies>
import java.util.List;
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
/**
* @author alanchan
*
*/
public class TestJdbcCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
		// set up the streaming and table environments
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// public JdbcCatalog(
		//     String catalogName,
		//     String defaultDatabase,
		//     String username,
		//     String pwd,
		//     String baseUrl)
		// SQL equivalent:
		// CREATE CATALOG alan_catalog WITH(
		//     'type' = 'jdbc',
		//     'default-database' = 'test?useSSL=false',
		//     'username' = 'root',
		//     'password' = 'root',
		//     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
		// );
		Catalog catalog = new JdbcCatalog("alan_catalog", "test?useSSL=false", "root", "123456", "jdbc:mysql://192.168.10.44:3306");

		// register the catalog
		tenv.registerCatalog("alan_catalog", catalog);

		// list all tables in the "test" database through the catalog API
		List<String> tables = catalog.listTables("test");
		for (String table : tables) {
			System.out.println("Database:test tables:" + table);
		}
	}
}
Database:test tables:allowinsert
Database:test tables:author
Database:test tables:batch_job_execution
Database:test tables:batch_job_execution_context
Database:test tables:batch_job_execution_params
Database:test tables:batch_job_execution_seq
Database:test tables:batch_job_instance
Database:test tables:batch_job_seq
Database:test tables:batch_step_execution
Database:test tables:batch_step_execution_context
Database:test tables:batch_step_execution_seq
Database:test tables:book
Database:test tables:customertest
Database:test tables:datax_user
Database:test tables:dm_sales
Database:test tables:dms_attach_t
Database:test tables:dx_user
Database:test tables:dx_user_copy
Database:test tables:employee
Database:test tables:hibernate_sequence
Database:test tables:permissions
Database:test tables:person
Database:test tables:personinfo
Database:test tables:role
Database:test tables:studenttotalscore
Database:test tables:t_consume
Database:test tables:t_czmx_n
Database:test tables:t_kafka_flink_user
Database:test tables:t_merchants
Database:test tables:t_recharge
Database:test tables:t_user
Database:test tables:t_withdrawal
Database:test tables:updateonly
Database:test tables:user
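Once the catalog is registered, its tables can also be queried through the table environment with SQL. A minimal sketch, assuming the tenv session from the example above; t_user is taken from the listing, but the query itself is not part of the original example.

// switch the session to the registered catalog and the "test" database,
// then read one of the listed tables through Flink SQL
tenv.useCatalog("alan_catalog");
tenv.useDatabase("test");
tenv.sqlQuery("SELECT * FROM t_user").execute().print();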
This example requires a Hadoop and Hive environment; it is packaged as a jar file and submitted for execution.
For Flink/Hive integration, see: 42、Flink 的table api与sql之Hive Catalog.
<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.13.6</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>jdk.tools</groupId>
		<artifactId>jdk.tools</artifactId>
		<version>1.8</version>
		<scope>system</scope>
		<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner-blink_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-hive_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-metastore</artifactId>
		<version>2.1.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-exec</artifactId>
		<version>3.1.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-shaded-hadoop-2-uber</artifactId>
		<version>2.7.5-10.0</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.38</version>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-core</artifactId>
		<version>3.9.0</version>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-jdbc-client</artifactId>
		<version>3.9.0</version>
	</dependency>
	<dependency>
		<groupId>io.vertx</groupId>
		<artifactId>vertx-redis-client</artifactId>
		<version>3.9.0</version>
	</dependency>
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.44</version>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.2</version>
		<scope>provided</scope>
	</dependency>
</dependencies>

<build>
	<sourceDirectory>src/main/java</sourceDirectory>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.5.1</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-surefire-plugin</artifactId>
			<version>2.18.1</version>
			<configuration>
				<useFile>false</useFile>
				<disableXmlReport>true</disableXmlReport>
				<includes>
					<include>**/*Test.*</include>
					<include>**/*Suite.*</include>
				</includes>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.3</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
						<transformers>
							<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<mainClass>org.table_sql.TestHiveCatalogDemo</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
* @author alanchan
*
*/
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// name of the Hive database to use as the default
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// switch the session to the registered catalog
		tenv.useCatalog("alan_hive");

		// list all tables in the default database
		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive tables:" + table);
		}
	}
}
################ hive query results ##################
0: jdbc:hive2://server4:10000> use testhive;
No rows affected (0.021 seconds)
0: jdbc:hive2://server4:10000> show tables;
+-----------------------+
| tab_name |
+-----------------------+
| apachelog |
| col2row1 |
| col2row2 |
| cookie_info |
| dual |
| dw_zipper |
| emp |
| employee |
| employee_address |
| employee_connection |
| ods_zipper_update |
| row2col1 |
| row2col2 |
| singer |
| singer2 |
| student |
| student_dept |
| student_from_insert |
| student_hdfs |
| student_hdfs_p |
| student_info |
| student_local |
| student_partition |
| t_all_hero_part_msck |
| t_usa_covid19 |
| t_usa_covid19_p |
| tab1 |
| tb_dept01 |
| tb_dept_bucket |
| tb_emp |
| tb_emp01 |
| tb_emp_bucket |
| tb_json_test1 |
| tb_json_test2 |
| tb_login |
| tb_login_tmp |
| tb_money |
| tb_money_mtn |
| tb_url |
| the_nba_championship |
| tmp_1 |
| tmp_zipper |
| user_dept |
| user_dept_sex |
| users |
| users_bucket_sort |
| website_pv_info |
| website_url_info |
+-----------------------+
48 rows selected (0.027 seconds)
################ flink query results ##################
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
Database:testhive tables:student
Database:testhive tables:user_dept
Database:testhive tables:user_dept_sex
Database:testhive tables:t_all_hero_part_msck
Database:testhive tables:student_local
Database:testhive tables:student_hdfs
Database:testhive tables:student_hdfs_p
Database:testhive tables:tab1
Database:testhive tables:student_from_insert
Database:testhive tables:student_info
Database:testhive tables:student_dept
Database:testhive tables:student_partition
Database:testhive tables:emp
Database:testhive tables:t_usa_covid19
Database:testhive tables:t_usa_covid19_p
Database:testhive tables:employee
Database:testhive tables:employee_address
Database:testhive tables:employee_connection
Database:testhive tables:dual
Database:testhive tables:the_nba_championship
Database:testhive tables:tmp_1
Database:testhive tables:cookie_info
Database:testhive tables:website_pv_info
Database:testhive tables:website_url_info
Database:testhive tables:users
Database:testhive tables:users_bucket_sort
Database:testhive tables:singer
Database:testhive tables:apachelog
Database:testhive tables:singer2
Database:testhive tables:tb_url
Database:testhive tables:row2col1
Database:testhive tables:row2col2
Database:testhive tables:col2row1
Database:testhive tables:col2row2
Database:testhive tables:tb_json_test1
Database:testhive tables:tb_json_test2
Database:testhive tables:tb_login
Database:testhive tables:tb_login_tmp
Database:testhive tables:tb_money
Database:testhive tables:tb_money_mtn
Database:testhive tables:tb_emp
Database:testhive tables:dw_zipper
Database:testhive tables:ods_zipper_update
Database:testhive tables:tmp_zipper
Database:testhive tables:tb_emp01
Database:testhive tables:tb_emp_bucket
Database:testhive tables:tb_dept01
Database:testhive tables:tb_dept_bucket
This example focuses on how to create a database, in particular which CatalogDatabase constructor to use to create one.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
* @author alanchan
*
*/
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 * @throws DatabaseAlreadyExistException
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException, DatabaseAlreadyExistException {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// name of the Hive database to use as the default
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// switch the session to the registered catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive tables:" + table);
		}

		// public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment) {
		//     this.properties = checkNotNull(properties, "properties cannot be null");
		//     this.comment = comment;
		// }
		Map<String, String> properties = new HashMap<>();
		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";

		// create the new database; true = ignore if it already exists
		hiveCatalog.createDatabase(newDatabaseName, cd, true);

		// the freshly created database has no tables yet, so this loop prints nothing
		List<String> newTables = hiveCatalog.listTables(newDatabaseName);
		for (String table : newTables) {
			System.out.println("Database:alan_hivecatalog_hivedb tables:" + table);
		}
	}
}
################## hive query results ############################
##### query result before the Flink job creates the database
0: jdbc:hive2://server4:10000> show databases;
+----------------+
| database_name |
+----------------+
| default |
| test |
| testhive |
+----------------+
3 rows selected (0.03 seconds)
##### query result after the Flink job creates the database
0: jdbc:hive2://server4:10000> show databases;
+--------------------------+
| database_name |
+--------------------------+
| alan_hivecatalog_hivedb |
| default |
| test |
| testhive |
+--------------------------+
4 rows selected (0.023 seconds)
################## flink query results ############################
#### Since only the database was created, it contains no tables yet and the job prints nothing for it. For the table listing of the testhive database, see example 2 above; it is not repeated here.
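Should the demo database need to be removed again, the same catalog instance can be used. A minimal sketch, assuming the hiveCatalog object from the example above; dropDatabase with a cascade flag is the standard Catalog API call, but this cleanup step is not part of the original example.

// drop the database created above:
// first flag  = ignore if it does not exist
// second flag = cascade, i.e. also drop any tables it still contains
hiveCatalog.dropDatabase("alan_hivecatalog_hivedb", true, true);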
Table operations here effectively mean HiveCatalog operations, because JdbcCatalog cannot create or modify databases and tables (queries, of course, do work); the examples below therefore all use HiveCatalog. This part is similar to part 3 of 24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1; refer to the examples there for details, which are not repeated here. The generic calls are listed next, followed by a concrete sketch.
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
// rename table (the flag means "ignore if the table does not exist")
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table", false);
// get table (getTable takes an ObjectPath, not a plain name)
catalog.getTable(new ObjectPath("mydb", "mytable"));
// check if a table exists
catalog.tableExists(new ObjectPath("mydb", "mytable"));
// list tables in a database
catalog.listTables("mydb");
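As a concrete illustration of the calls above, here is a minimal sketch, assuming Flink 1.13.x and a catalog that supports DDL, such as the HiveCatalog registered earlier. The table name, two-column schema, and comment are illustrative assumptions, not taken from the original examples.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;

public class TableOpsSketch {

	// "catalog" is assumed to be a registered HiveCatalog, as in the examples above
	public static void run(Catalog catalog) throws Exception {
		ObjectPath path = new ObjectPath("mydb", "mytable");

		// build a two-column schema (TableSchema is the pre-1.14 schema class)
		TableSchema schema = TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("name", DataTypes.STRING())
				.build();

		// CatalogTableImpl takes the schema, a property map, and a comment
		Map<String, String> options = new HashMap<>();
		CatalogTable table = new CatalogTableImpl(schema, options, "demo table");

		// create, inspect, rename, and drop; the boolean flags are the
		// ignoreIfExists / ignoreIfNotExists switches shown above
		catalog.createTable(path, table, true);
		System.out.println("exists: " + catalog.tableExists(path));
		catalog.renameTable(path, "my_new_table", false);
		catalog.dropTable(new ObjectPath("mydb", "my_new_table"), true);
	}
}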
This article briefly introduced operating databases and tables through the Java API, providing concrete runnable examples for each.