• Flink exactly-once delivery: Kafka to Kafka and Kafka to Doris


    1 Flow Diagram

    City Kafka topics (e.g. QL_JC_SSJC in Nanjing) → Flink SQL job 1 → mid-platform Kafka topic ODS_QL_JC_SSJC → Flink SQL job 2 → Doris table doris_d.ods_ql_jc_ssjc, with exactly-once semantics at each hop.

    2 Flink Source Table Definitions

    --Source: city topic
    CREATE TABLE NJ_QL_JC_SSJC_SOURCE (
      record string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'QL_JC_SSJC',
      'properties.bootstrap.servers' = '172.*.*.*:9092',
      'properties.group.id' = 'QL_JC_SSJC_NJ_QL_JC_SSJC_SOURCE',
      'scan.startup.mode' = 'group-offsets',
      'properties.isolation.level' = 'read_committed',
      'properties.auto.offset.reset' = 'earliest',
      'format' = 'raw'
    );
    --Source: mid-platform Kafka topic
    CREATE TABLE ODS_QL_JC_SSJC_SOURCE (
      sscsdm string,
      extract_time TIMESTAMP,
      record string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'ODS_QL_JC_SSJC',
      'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
      'properties.group.id' = 'ODS_QL_JC_SSJC_SOURCE_ODS_QL_JC_SSJC_SOURCE',
      'scan.startup.mode' = 'group-offsets',
      'properties.auto.offset.reset' = 'earliest',
      'properties.isolation.level' = 'read_committed',
      'format' = 'json'
    );
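    Both jobs' exactly-once guarantee additionally relies on checkpointing being enabled, since the transactional Kafka producer only commits data when a checkpoint completes. A minimal sketch of the job-level settings for the SQL client (the interval and timeout values here are illustrative assumptions, not taken from the original):

    ```sql
    -- Enable exactly-once checkpointing; Kafka transactions commit on checkpoint completion.
    SET 'execution.checkpointing.interval' = '60s';
    SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
    -- Keep the checkpoint interval well below properties.transaction.timeout.ms
    -- (900000 ms in the Kafka sink table), or transactions may be aborted mid-flight.
    SET 'execution.checkpointing.timeout' = '10min';
    ```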

    3 Flink Sink Table Definitions

    --Sink: mid-platform Kafka topic
    CREATE TABLE KAFKA_ODS_QL_JC_SSJC_SINK (
      sscsdm string,
      extract_time TIMESTAMP,
      record string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'ODS_QL_JC_SSJC',
      'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
      'format' = 'json',
      -- exactly-once requires a transactional producer; belongs on the sink table
      'sink.semantic' = 'exactly-once',
      'properties.transaction.timeout.ms' = '900000'
    );
    --Sink: Doris table
    CREATE TABLE DORIS_ODS_QL_JC_SSJC_SINK (
      sscsdm STRING,
      extract_time TIMESTAMP,
      record STRING
    ) WITH (
      'connector' = 'doris',
      'fenodes' = '3.*.*.*:8030,3.*.*.*:8030,3.*.*.*:8030',
      'table.identifier' = 'doris_d.ods_ql_jc_ssjc',
      'username' = 'root',
      'password' = '********',
      'sink.properties.two_phase_commit' = 'true'
    );
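    When 'sink.properties.two_phase_commit' is enabled, the Doris Flink connector also expects a job-unique stream-load label prefix so that pre-committed transactions left over from a failed run can be committed or aborted on restart. A hedged sketch of the extra line for the Doris sink's WITH clause (the prefix value is an assumption):

    ```sql
    -- Added alongside two_phase_commit in the Doris sink DDL;
    -- the label prefix must be unique per running job.
    'sink.label-prefix' = 'ods_ql_jc_ssjc_2pc'
    ```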

    4 FlinkSQL: City Topics to the Mid-platform Topic

    insert into KAFKA_ODS_QL_JC_SSJC_SINK
    SELECT
      '320100' as sscsdm,
      CURRENT_TIMESTAMP as extract_time,
      record
    FROM NJ_QL_JC_SSJC_SOURCE
    UNION ALL
    SELECT
      '320200' as sscsdm,
      CURRENT_TIMESTAMP as extract_time,
      record
    FROM WX_QL_JC_SSJC_SOURCE
    -- ... (one UNION ALL branch per remaining city topic) ...
    UNION ALL
    SELECT
      '320583' as sscsdm,
      CURRENT_TIMESTAMP as extract_time,
      record
    FROM KS_QL_JC_SSJC_SOURCE

    5 FlinkSQL: Mid-platform Topic to Doris

    insert into DORIS_ODS_QL_JC_SSJC_SINK
    SELECT
      sscsdm,
      CURRENT_TIMESTAMP as extract_time,
      record
    FROM ODS_QL_JC_SSJC_SOURCE
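    A simple way to spot-check the end-to-end guarantee is to look for duplicates on the Doris side after forcing a job restart: with exactly-once in effect, replayed records should not land twice. An illustrative query (it assumes record values are unique per source event, which the original does not state):

    ```sql
    -- Duplicate records after a restart would indicate at-least-once
    -- rather than exactly-once delivery.
    SELECT record, COUNT(*) AS cnt
    FROM doris_d.ods_ql_jc_ssjc
    GROUP BY record
    HAVING COUNT(*) > 1;
    ```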

  • Original article: https://blog.csdn.net/weixin_42258633/article/details/133710867