• Flink: exactly-once delivery from Kafka to Kafka and from Kafka to Doris


    1 Flow diagram

    2 Flink source table DDL

    -- Source: city topic
    CREATE TABLE NJ_QL_JC_SSJC_SOURCE (
        record STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'QL_JC_SSJC',
        'properties.bootstrap.servers' = '172.*.*.*:9092',
        'properties.group.id' = 'QL_JC_SSJC_NJ_QL_JC_SSJC_SOURCE',
        'scan.startup.mode' = 'group-offsets',
        'properties.isolation.level' = 'read_committed',
        'properties.auto.offset.reset' = 'earliest',
        'format' = 'raw'
    );

    -- Source: mid-platform Kafka topic
    CREATE TABLE ODS_QL_JC_SSJC_SOURCE (
        sscsdm STRING,
        extract_time TIMESTAMP,
        record STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'ODS_QL_JC_SSJC',
        'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
        'properties.group.id' = 'ODS_QL_JC_SSJC_SOURCE_ODS_QL_JC_SSJC_SOURCE',
        'scan.startup.mode' = 'group-offsets',
        'properties.auto.offset.reset' = 'earliest',
        'properties.isolation.level' = 'read_committed',
        'format' = 'json'
    );
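    Both source tables read with `'properties.isolation.level' = 'read_committed'`, so records from aborted Kafka transactions are never consumed, and `'scan.startup.mode' = 'group-offsets'` resumes from the committed consumer-group position after a restart. End-to-end exactly-once additionally requires checkpointing in EXACTLY_ONCE mode; a minimal sketch for the Flink SQL client follows (the 60 s interval is an illustrative assumption, not from the article):

    ```sql
    -- Enable exactly-once checkpointing before submitting the INSERT jobs.
    -- The interval value is illustrative; tune it to your latency budget.
    SET 'execution.checkpointing.interval' = '60s';
    SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
    ```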

    3 Flink sink table DDL

    -- Sink: mid-platform Kafka topic
    CREATE TABLE KAFKA_ODS_QL_JC_SSJC_SINK (
        sscsdm STRING,
        extract_time TIMESTAMP,
        record STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'ODS_QL_JC_SSJC',
        'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
        'format' = 'json',
        'sink.semantic' = 'exactly-once',
        'properties.transaction.timeout.ms' = '900000'
    );

    -- Sink: Doris table
    CREATE TABLE DORIS_ODS_QL_JC_SSJC_SINK (
        sscsdm STRING,
        extract_time TIMESTAMP,
        record STRING
    ) WITH (
        'connector' = 'doris',
        'fenodes' = '3.*.*.*:8030,3.*.*.*:8030,3.*.*.*:8030',
        'table.identifier' = 'doris_d.ods_ql_jc_ssjc',
        'username' = 'root',
        'password' = '********',
        'sink.properties.two_phase_commit' = 'true'
    );
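    The `transaction.timeout.ms` of 900000 ms (15 minutes) on the Kafka sink is deliberate: Flink's transactional Kafka producer defaults to a longer timeout, while brokers cap it with `transaction.max.timeout.ms` (15 minutes by default), and a producer requesting more than the broker cap is rejected at job start. On Flink 1.14+ the same exactly-once sink is expressed with the `sink.delivery-guarantee` option instead of `sink.semantic`; a sketch (option names per the newer connector, the prefix value is a hypothetical example):

    ```sql
    -- Flink 1.14+ Kafka connector equivalent of 'sink.semantic' = 'exactly-once':
    'sink.delivery-guarantee' = 'exactly-once',
    'sink.transactional-id-prefix' = 'ods_ql_jc_ssjc_tx'  -- hypothetical prefix, must be unique per job
    ```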

    4 Flink SQL: city topics to the mid-platform topic

    INSERT INTO KAFKA_ODS_QL_JC_SSJC_SINK
    SELECT
        '320100' AS sscsdm,
        CURRENT_TIMESTAMP AS extract_time,
        record
    FROM NJ_QL_JC_SSJC_SOURCE
    UNION ALL
    SELECT
        '320200' AS sscsdm,
        CURRENT_TIMESTAMP AS extract_time,
        record
    FROM WX_QL_JC_SSJC_SOURCE
    -- ... one UNION ALL branch per remaining city topic ...
    UNION ALL
    SELECT
        '320583' AS sscsdm,
        CURRENT_TIMESTAMP AS extract_time,
        record
    FROM KS_QL_JC_SSJC_SOURCE;
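    If several such INSERT jobs are meant to run as a single Flink job sharing one checkpoint lifecycle, they can be wrapped in a statement set. A sketch using Flink SQL client syntax (available since roughly Flink 1.13; table names are the ones defined above):

    ```sql
    -- Optional: submit several INSERTs as one job so they share checkpoints.
    BEGIN STATEMENT SET;
    INSERT INTO KAFKA_ODS_QL_JC_SSJC_SINK
    SELECT '320100', CURRENT_TIMESTAMP, record FROM NJ_QL_JC_SSJC_SOURCE;
    -- ... further INSERT statements ...
    END;
    ```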

    5 Flink SQL: mid-platform topic to Doris

    INSERT INTO DORIS_ODS_QL_JC_SSJC_SINK
    SELECT
        sscsdm,
        CURRENT_TIMESTAMP AS extract_time,
        record
    FROM ODS_QL_JC_SSJC_SOURCE;
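    With `two_phase_commit` enabled, each Stream Load into Doris is only pre-committed, and the final commit is tied to a successful Flink checkpoint, so a batch lands in Doris at most once even across failures. Newer flink-doris-connector releases express this with `sink.enable-2pc` and require a `sink.label-prefix` so that loads can be deduplicated after a restart; a sketch (option names per connector 1.1+, the prefix value is a hypothetical example):

    ```sql
    -- flink-doris-connector 1.1+ style exactly-once options (sketch):
    'sink.enable-2pc' = 'true',
    'sink.label-prefix' = 'ods_ql_jc_ssjc'  -- hypothetical; must be unique per job
    ```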

  • Original post: https://blog.csdn.net/weixin_42258633/article/details/133710867