I. The Flink column
The Flink column introduces one topic at a time, systematically and with concrete examples.
1. Flink deployment series
Covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
Covers Flink fundamentals such as terminology, architecture, the programming model, the programming guides, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL basics series
Covers basic Flink Table API and SQL usage, such as creating databases and tables with the Table API and SQL, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL advanced and applications series
Covers applied Table API and SQL material that is closer to real production work, along with topics that carry some development difficulty.
5. Flink monitoring series
Covers day-to-day operations and monitoring work.
II. The Flink examples column
The Flink examples column supplements the Flink column: rather than explaining concepts, it provides concrete, directly usable examples. It is not divided into sub-directories; each link makes its content clear.
For the entry point to all articles in both columns, see: Flink 系列文章汇总索引 (Flink series article index).
This article briefly shows how to operate views through the Java API, providing three examples: a SQL implementation and two Java API implementations.
It assumes that the Flink, Hive, and Hadoop clusters are up and usable.
The Java API examples were built against Flink 1.13.5; unless otherwise noted, the SQL examples use Flink 1.17.
In the Catalog API, views are created, altered, and dropped through the same table-oriented methods used for tables:
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);
// alter view
catalog.alterTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);
// get view
catalog.getTable(new ObjectPath("mydb", "myview"));
// check whether a view exists
catalog.tableExists(new ObjectPath("mydb", "myview"));
// list views in a database
catalog.listViews("mydb");
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.13.6</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-metastore</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.table_sql.TestHiveViewBySQLDemo</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
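The SQL-based implementation is shown below. It registers a HiveCatalog, creates a Hive table, builds two views on top of it with CREATE VIEW statements, and reads both views back through the catalog and with queries.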
package org.table_sql;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
/**
* @author alanchan
*
*/
public class TestHiveViewBySQLDemo {
public static final String tableName = "viewtest";
public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT" + ") " +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.delay'='5 s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String name = "alan_hive";
String defaultDatabase = "default";
String databaseName = "viewtest_db";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog(name, hiveCatalog);
tenv.useCatalog(name);
tenv.listDatabases();
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<String, String>(), hiveConfDir), true);
// tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);
// Create the first view, viewName_byTable
String selectSQL = "select * from " + tableName;
String viewName_byTable = "test_view_table_V";
String createViewSQL = "create view " + viewName_byTable + " as " + selectSQL;
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
tenv.executeSql(hive_create_table_sql);
// tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
tenv.executeSql(insertSQL);
tenv.executeSql(createViewSQL);
tenv.listViews();
CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath(databaseName, viewName_byTable));
List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + viewName_byTable).collect());
for (Row row : results) {
System.out.println("test_view_table_V: " + row.toString());
}
// Create the second view
String viewName_byView = "test_view_view_V";
tenv.executeSql("create view " + viewName_byView + " (v2_id,v2_name,v2_age) comment 'test_view_view_V comment' as select * from " + viewName_byTable);
catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath(databaseName, viewName_byView));
results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + viewName_byView).collect());
System.out.println("test_view_view_V comment : " + catalogView.getComment());
for (Row row : results) {
System.out.println("test_view_view_V : " + row.toString());
}
tenv.executeSql("drop database " + databaseName + " cascade");
}
}
This assumes the Flink cluster is available. Package the program into a jar with Maven and submit it:
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.2-SNAPSHOT.jar
Hive Session ID = ed6d5c9b-e00f-4881-840d-24c72aba6db7
Hive Session ID = 14445dc8-1f08-4f0f-bb45-aba8c6f52174
Job has been submitted with JobID bff7b59367bd5de6e778b442c4cc4404
Hive Session ID = 4c16f4fc-4c10-4353-b322-e6633e3ebe3d
Hive Session ID = 57949f09-bdcb-497f-a85c-ed9766fc4ce3
2023-10-13 02:42:24,891 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 0
Job has been submitted with JobID 80e48bb76e3d580412fdcdc434a8a979
test_view_table_V: +I[1, alan, 18]
Hive Session ID = a73d5b93-2129-4159-ad5e-0814df77e987
Hive Session ID = e4ae1a79-4d5e-4835-81de-ebc2041eedf9
2023-10-13 02:42:33,648 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 1
Job has been submitted with JobID c228d9ce3bdce91dc68bff75d14db1e5
test_view_view_V comment : test_view_view_V comment
test_view_view_V : +I[1, alan, 18]
Hive Session ID = e4a38393-d760-4bd3-8d8b-864cbe0daba7
Creating views through the API is comparatively cumbersome, and methods have been deprecated as versions change.
Creating a view through TableSchema and CatalogViewImpl is deprecated; the current recommendation is to create views through CatalogView and ResolvedSchema.
Also note the difference between the following two parameters:
String originalQuery: the original SQL as written.
String expandedQuery: the same query with table references qualified by database name, and possibly by catalog name as well.
For example, with default as the current database and a view defined over test1:
originalQuery = "select name, value from test1" is sufficient, while
expandedQuery = "select test1.name, test1.value from default.test1".
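To see both strings for a stored view, you can read it back from the catalog. This is a minimal sketch reusing the hiveCatalog, database, and view names from the demos in this article:
// Fetch the view's metadata from the Hive catalog and print both query forms.
CatalogView storedView = (CatalogView) hiveCatalog.getTable(new ObjectPath("viewtest_db", "test_view_table_V2"));
System.out.println("originalQuery: " + storedView.getOriginalQuery());
System.out.println("expandedQuery: " + storedView.getExpandedQuery());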
Altering and dropping views is straightforward; beyond the short sketch below, these operations are not covered further.
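A minimal sketch of those operations through the Catalog API, reusing the table-oriented catalog methods from the first snippet (the renamed view name here is purely illustrative):
// Rename a view; views go through the same table-oriented Catalog methods.
hiveCatalog.renameTable(new ObjectPath("viewtest_db", "test_view_table_V"), "test_view_table_V_renamed", false);
// Drop a view; passing true makes this a no-op if the view does not exist.
hiveCatalog.dropTable(new ObjectPath("viewtest_db", "test_view_table_V_renamed"), true);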
The dependencies are the same as in the example above, and the mainClass becomes this example's class, so the pom is not repeated.
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
/**
* @author alanchan
*
*/
public class TestHiveViewByAPIDemo {
public static final String tableName = "viewtest";
public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT" + ") " +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.delay'='5 s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
System.setProperty("HADOOP_USER_NAME", "alanchan");
String moduleName = "myhive";
String hiveVersion = "3.1.2";
tenv.loadModule(moduleName, new HiveModule(hiveVersion));
String catalogName = "alan_hive";
String defaultDatabase = "default";
String databaseName = "viewtest_db";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
tenv.registerCatalog(catalogName, hiveCatalog);
tenv.useCatalog(catalogName);
tenv.listDatabases();
hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<String, String>(), hiveConfDir), true);
// tenv.executeSql("create database "+databaseName);
tenv.useDatabase(databaseName);
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
tenv.executeSql(hive_create_table_sql);
String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
String insertSQL2 = "insert into " + tableName + " values (2,'alan2',19)";
String insertSQL3 = "insert into " + tableName + " values (3,'alan3',20)";
tenv.executeSql(insertSQL);
tenv.executeSql(insertSQL2);
tenv.executeSql(insertSQL3);
tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
String viewName1 = "test_view_table_V";
String viewName2 = "test_view_table_V2";
ObjectPath path1= new ObjectPath(databaseName, viewName1);
//ObjectPath.fromString("viewtest_db.test_view_table_V2")
ObjectPath path2= new ObjectPath(databaseName, viewName2);
String originalQuery = "SELECT id, name, age FROM "+tableName+" WHERE id >=1 ";
// String originalQuery = String.format("select * from %s",tableName+" WHERE id >=1 ");
System.out.println("originalQuery:"+originalQuery);
String expandedQuery = "SELECT id, name, age FROM "+databaseName+"."+tableName+" WHERE id >=1 ";
// String expandedQuery = String.format("select * from %s.%s", catalogName, path1.getFullName());
System.out.println("expandedQuery:"+expandedQuery);
String comment = "this is a comment";
// Create a view, approach 1 (TableSchema + CatalogViewImpl); declared deprecated
createView1(originalQuery,expandedQuery,comment,hiveCatalog,path1);
// Query the view
List<Row> results = CollectionUtil.iteratorToList( tenv.executeSql("select * from " + viewName1).collect());
for (Row row : results) {
System.out.println("test_view_table_V: " + row.toString());
}
// Create a view, approach 2 (Schema + ResolvedSchema)
createView2(originalQuery,expandedQuery,comment,hiveCatalog,path2);
List<Row> results2 = CollectionUtil.iteratorToList( tenv.executeSql("select * from viewtest_db.test_view_table_V2").collect());
for (Row row : results2) {
System.out.println("test_view_table_V2: " + row.toString());
}
tenv.executeSql("drop database " + databaseName + " cascade");
}
static void createView1(String originalQuery,String expandedQuery,String comment,HiveCatalog hiveCatalog,ObjectPath path) throws Exception {
TableSchema viewSchema = new TableSchema(new String[]{"id", "name","age"}, new TypeInformation[]{Types.INT, Types.STRING,Types.INT});
CatalogBaseTable viewTable = new CatalogViewImpl(
originalQuery,
expandedQuery,
viewSchema,
new HashMap<String, String>(),
comment);
hiveCatalog.createTable(path, viewTable, false);
}
static void createView2(String originalQuery,String expandedQuery,String comment,HiveCatalog hiveCatalog,ObjectPath path) throws Exception {
ResolvedSchema resolvedSchema = new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT()),
Column.physical("name", DataTypes.STRING()),
Column.physical("age", DataTypes.INT())),
Collections.emptyList(),
null);
CatalogView origin = CatalogView.of(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
comment,
// String.format("select * from tt"),
// String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
originalQuery,
expandedQuery,
Collections.emptyMap());
CatalogView view = new ResolvedCatalogView(origin, resolvedSchema);
// ObjectPath.fromString("viewtest_db.test_view_table_V2")
hiveCatalog.createTable(path, view, false);
}
}
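A note on createView2: CatalogView.of only carries an unresolved Schema, while ResolvedCatalogView pairs that view with its ResolvedSchema. Wrapping the view before calling createTable hands the catalog column types that are already resolved when it persists the view to the Hive metastore, which appears to be why the demo does not pass the result of CatalogView.of to createTable directly.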
[alanchan@server2 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.3-SNAPSHOT.jar
Hive Session ID = ab4d159a-b2d3-489e-988f-eebdc43d9517
Hive Session ID = 391de19c-5d5a-4a83-a88c-c43cca71fc63
Job has been submitted with JobID a880510032165523f3f2a559c5ab4ec9
Hive Session ID = cb063c31-eaf2-44e3-8fc0-9e8d2a6a3a5d
Job has been submitted with JobID cb05286c404b561306f8eb3969c3456a
Hive Session ID = 8132b36e-c9e2-41a2-8f42-3fe842e0991f
Job has been submitted with JobID 264aef7da1b17598bda159d946827dea
Hive Session ID = 7657be14-8188-4362-84a9-4c84c596021b
2023-10-16 07:21:19,073 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 3
Job has been submitted with JobID 05c2bb7265b0430cb12e00237f18444b
test_view_table_V: +I[1, alan, 18]
test_view_table_V: +I[2, alan2, 19]
test_view_table_V: +I[3, alan3, 20]
Hive Session ID = 7bb01c0d-03c9-413a-9040-c89676cec3b9
2023-10-16 07:21:27,512 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 3
Job has been submitted with JobID 79130d1fe56d88a784980d16e7f1cfb4
test_view_table_V2: +I[1, alan, 18]
test_view_table_V2: +I[2, alan2, 19]
test_view_table_V2: +I[3, alan3, 20]
Hive Session ID = 6d44ea95-f733-4c56-8da4-e2687a4bf945
This article briefly introduced operating views through the Java API, providing three examples: a SQL implementation and two Java API implementations.