• Setting up a session-mode Flink cluster on Kubernetes


    1. Flink cluster setup

    No long preamble, straight to the code. Everything here follows the official documentation (Kubernetes | Apache Flink); this post is simply a record of the steps.

    flink-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        parallelism.default: 2
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender

        # Uncomment this if you want to _only_ change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO

        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.pekko.name = org.apache.pekko
        logger.pekko.level = INFO
        logger.kafka.name = org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO

        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size = 100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10

        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF

    jobmanager-service.yaml — optional Service, only necessary in non-HA mode.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink
        component: jobmanager

    Session cluster resource definitions

    jobmanager-session-deployment-non-ha.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: apache/flink:latest
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

    taskmanager-session-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: apache/flink:latest
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

    Apply the manifests with kubectl apply -f xxx.yaml, or with kubectl apply -f ./flink, where flink is a directory holding the .yaml files above; a quick verification sketch follows.
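
    A minimal sketch of applying the manifests and checking that the cluster comes up; the ./flink directory name is just this article's convention, and the label selectors come from the manifests above.

    # apply every manifest in the directory
    kubectl apply -f ./flink

    # the JobManager pod and the two TaskManager pods should reach Running
    kubectl get pods -l app=flink

    # the internal Service for the JobManager
    kubectl get svc flink-jobmanager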

    To reach the Flink web UI from outside the cluster, add a NodePort Service for it, as sketched below.
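
    A minimal sketch of such a Service, reusing the jobmanager labels from the Deployment above. The Service name flink-jobmanager-web-ui and the nodePort value 30081 are arbitrary choices for illustration; the nodePort must fall inside the cluster's NodePort range (30000-32767 by default), or the field can be omitted to let Kubernetes pick one.

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-web-ui   # assumed name, not part of the official manifests
    spec:
      type: NodePort
      ports:
      - name: webui
        port: 8081
        targetPort: 8081
        nodePort: 30081               # assumed value within the default NodePort range
      selector:
        app: flink
        component: jobmanager

    After applying it, the UI is reachable at http://<any-node-ip>:30081.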

    2. Demo code test

    Create a Maven project and declare the dependencies in pom.xml:

    1. "1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>test-platformartifactId>
    7. <groupId>com.testgroupId>
    8. <version>2.0.0-SNAPSHOTversion>
    9. parent>
    10. <modelVersion>4.0.0modelVersion>
    11. <artifactId>flink-demoartifactId>
    12. <properties>
    13. <maven.compiler.source>11maven.compiler.source>
    14. <maven.compiler.target>11maven.compiler.target>
    15. <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
    16. <flink.version>1.17.0flink.version>
    17. <log4j.version>2.20.0log4j.version>
    18. properties>
    19. <dependencies>
    20. <dependency>
    21. <groupId>org.apache.flinkgroupId>
    22. <artifactId>flink-streaming-javaartifactId>
    23. <version>${flink.version}version>
    24. dependency>
    25. <dependency>
    26. <groupId>org.apache.flinkgroupId>
    27. <artifactId>flink-clientsartifactId>
    28. <version>${flink.version}version>
    29. dependency>
    30. <dependency>
    31. <groupId>org.apache.logging.log4jgroupId>
    32. <artifactId>log4j-slf4j-implartifactId>
    33. <scope>compilescope>
    34. <version>${log4j.version}version>
    35. dependency>
    36. <dependency>
    37. <groupId>org.apache.logging.log4jgroupId>
    38. <artifactId>log4j-apiartifactId>
    39. <scope>compilescope>
    40. <version>${log4j.version}version>
    41. dependency>
    42. <dependency>
    43. <groupId>org.apache.logging.log4jgroupId>
    44. <artifactId>log4j-coreartifactId>
    45. <scope>compilescope>
    46. <version>${log4j.version}version>
    47. dependency>
    48. dependencies>
    49. project>

    log4j2.xml:

    1. "1.0" encoding="UTF-8"?>
    2. <configuration monitorInterval="5">
    3. <Properties>
    4. <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
    5. <property name="LOG_LEVEL" value="INFO" />
    6. Properties>
    7. <appenders>
    8. <console name="Console" target="SYSTEM_OUT">
    9. <PatternLayout pattern="${LOG_PATTERN}"/>
    10. <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
    11. console>
    12. appenders>
    13. <loggers>
    14. <root level="${LOG_LEVEL}">
    15. <appender-ref ref="Console"/>
    16. root>
    17. loggers>
    18. configuration>

    Word-count code:

    package com.test.flink;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class WordCountUnboundStreamDemo {

        public static void main(String[] args) throws Exception {
            // TODO 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            //         3,                              // number of restart attempts
            //         Time.of(10, TimeUnit.SECONDS)   // delay between attempts
            // ));

            // TODO 2. Read data
            DataStreamSource<String> lineDS = env.socketTextStream("192.168.0.28", 7777);

            // TODO 3. Process data: split, map, group, aggregate
            // TODO 3.1 Split and map
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS // <input type, output type>
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                            // split on spaces
                            String[] words = value.split(" ");
                            for (String word : words) {
                                // map each word to a tuple (word, 1)
                                Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                                // emit downstream via the collector
                                out.collect(wordsAndOne);
                            }
                        }
                    });

            // TODO 3.2 Group by word
            KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(
                    new KeySelector<Tuple2<String, Integer>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Integer> value) throws Exception {
                            return value.f0;
                        }
                    }
            );

            // TODO 3.3 Aggregate
            SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

            // TODO 4. Output the result
            sumDS.print("received data=======").setParallelism(1);

            // TODO 5. Execute, similar to ssc.start() in Spark Streaming
            env.execute(sumDS.getClass().getSimpleName());
        }
    }

    Build the project into a jar and upload it through the Flink dashboard; a packaging sketch follows.
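
    A minimal packaging sketch, assuming the module builds standalone with Maven; if the job pulls in external dependencies beyond what the session cluster ships, a shade/assembly plugin would be needed, which this pom does not configure.

    # build the job jar; the artifact ends up under target/
    mvn clean package -DskipTests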

    On another machine (the host that socketTextStream points at, 192.168.0.28:7777 above), run nc -lk -p 7777. If the job reports a refused connection, check whether that port is open in the firewall; a sketch follows.
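
    A sketch of opening the port, assuming the machine runs firewalld (adjust for iptables or ufw); run as root or with sudo.

    # allow inbound TCP traffic on 7777 and reload the rules
    firewall-cmd --zone=public --add-port=7777/tcp --permanent
    firewall-cmd --reload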

    Check the received data in k8s: the output of print() goes to the TaskManager logs, as shown below.
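
    A sketch of tailing the TaskManager logs to see the printed counts; pod names are generated by the Deployment, so list them first or tail the Deployment directly.

    # find the taskmanager pods
    kubectl get pods -l app=flink,component=taskmanager

    # follow the logs of the taskmanager Deployment (or of one pod from the list above)
    kubectl logs -f deployment/flink-taskmanager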

  • Original article: https://blog.csdn.net/Json_Marz/article/details/132688519