Flink is an excellent framework for real-time big-data computation, and it is a required skill at many companies doing big-data development. In this post I will walk through the classic introductory example, WordCount, implemented with Flink's batch API.
Create a new Maven project in IDEA (with your Maven environment already configured), then add the following dependencies and configuration to the pom.xml:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <slf4j.version>1.7.30</slf4j.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <!-- use the same Scala binary version as the other Flink artifacts -->
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.6.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>java</goal>
                    </goals>
                    <phase>test</phase>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
Next, configure log4j: create a file named log4j.properties under resources with the following content:
# Output pattern : date [thread] priority category - message
log4j.rootLogger=WARN,CONSOLE,RollingFile

#CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

#RollingFile
log4j.appender.RollingFile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingFile.File=logs/signserver.log
log4j.appender.RollingFile.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingFile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n

#Project default level
log4j.logger.com.ntko.sign=debug
Create an input directory in the project root and add a words.txt file containing the following line:
hello world hello flink hello java
Then create a BatchWordCount class with the following code:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the batch execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the input file line by line
        DataSource<String> dataSource = executionEnvironment.readTextFile("input/words.txt");
        // 3. Split each line into words and emit a (word, 1) tuple per word
        FlatMapOperator<String, Tuple2<String, Long>> returns = dataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // lambdas erase generic types, so declare the result type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> tuple2UnsortedGrouping = returns.groupBy(0);
        // 5. Sum the counts (tuple field 1) within each group and print the result
        AggregateOperator<Tuple2<String, Long>> sum = tuple2UnsortedGrouping.sum(1);
        sum.print();
    }
}
Run the program and check the output: hello appears 3 times, while world, flink, and java each appear once. The printed tuples look like this (the order may vary):

(world,1)
(flink,1)
(hello,3)
(java,1)
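To make the dataflow concrete, the flatMap / groupBy(0) / sum(1) pipeline above computes the same thing as the following plain-Java sketch, with no Flink involved. The class and method names here are illustrative only, not part of any Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // For each word, emit a count of 1 and merge it into the running total
    // per key - the same result flatMap + groupBy(0).sum(1) produce in Flink.
    static Map<String, Long> countWords(String text) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : text.split(" ")) {
            counts.merge(word, 1L, Long::sum); // like sum(1) within a group
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords("hello world hello flink hello java");
        // Prints (hello,3) (world,1) (flink,1) (java,1) in first-seen order
        counts.forEach((w, c) -> System.out.println("(" + w + "," + c + ")"));
    }
}
```

The difference, of course, is that Flink distributes this computation: the tuples are partitioned by key across parallel subtasks, so the same logic scales beyond a single machine.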