目录
创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
- #批处理环境
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- #流处理环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
设置并行度:如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认为1
- //设置并行度为8
- env.setParallelism(8);
返回本地执行环境,需要在调用时指定默认的并行度
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("IP",端口号,jar包路径)
-
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import java.util.Arrays;
-
- /**
- * @author : Ashiamd email: ashiamd@foxmail.com
- * @date : 2021/1/31 5:13 PM
- * 测试Flink从集合中获取数据
- */
- public class SourceTest1_Collection {
- public static void main(String[] args) throws Exception {
- // 创建执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 设置env并行度1,使得整个任务抢占同一个线程执行
- env.setParallelism(1);
-
- // Source: 从集合Collection中获取数据
- DataStream
dataStream = env.fromCollection( - Arrays.asList(
- new SensorReading("sensor_1", 1547718199L, 35.8),
- new SensorReading("sensor_6", 1547718201L, 15.4),
- new SensorReading("sensor_7", 1547718202L, 6.7),
- new SensorReading("sensor_10", 1547718205L, 38.1)
- )
- );
-
- DataStream
intStream = env.fromElements(1,2,3,4,5,6,7,8,9); -
- // 打印输出
- dataStream.print("SENSOR");
- intStream.print("INT");
-
- // 执行
- env.execute("JobName");
-
- }
-
- }
文件由自己创建一个txt文件
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
- /**
- * @author : Ashiamd email: ashiamd@foxmail.com
- * @date : 2021/1/31 5:26 PM
- * Flink从文件中获取数据
- */
- public class SourceTest2_File {
- public static void main(String[] args) throws Exception {
- // 创建执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 使得任务抢占同一个线程
- env.setParallelism(1);
-
- // 从文件中获取数据输出
- DataStream
dataStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt"); -
- dataStream.print();
-
- env.execute();
- }
- }
-
-
junit -
junit -
4.11 -
test -
-
-
org.apache.flink -
flink-java -
1.10.1 -
-
-
org.apache.flink -
flink-streaming-java_2.12 -
1.10.1 -
-
-
org.apache.flink -
flink-clients_2.12 -
1.10.1 -
-
-
-
org.apache.flink -
flink-connector-kafka_2.11 -
1.12.1 -
-
-
启动Zookeeper
./bin/zookeeper-server-start.sh [config/zookeeper.properties]
启动KafKa服务
./bin/kafka-server-start.sh -daemon ./config/server.properties
启动KafKa生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sensor
- import org.apache.flink.api.common.serialization.SimpleStringSchema;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
- import java.util.Properties;
-
- /**
- * @author : Ashiamd email: ashiamd@foxmail.com
- * @date : 2021/1/31 5:44 PM
- */
- public class SourceTest3_Kafka {
-
- public static void main(String[] args) throws Exception {
- // 创建执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 设置并行度1
- env.setParallelism(1);
-
- Properties properties = new Properties();
- //监听的kafka端口
- properties.setProperty("bootstrap.servers", "localhost:9092");
- // 下面这些次要参数
- properties.setProperty("group.id", "consumer-group");
- properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.setProperty("auto.offset.reset", "latest");
-
- // flink添加外部数据源
- DataStream
dataStream = env.addSource(new FlinkKafkaConsumer("sensor", new SimpleStringSchema(),properties)); -
- // 打印输出
- dataStream.print();
-
- env.execute();
- }
- }