• pyflink map 字典写入ES


    [root@master pyflink]# cat test.py 
    # -*- coding: utf-8 -*-
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
    from abc import ABC, abstractmethod
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
    from pyflink.datastream.state import MapStateDescriptor
    from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
    from pyflink.common.typeinfo import Types, TypeInformation
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
    from pyflink.datastream.connectors import DeliveryGuarantee
    from pyflink.common.serialization import SimpleStringSchema
    import json
    import re
    from datetime import datetime
    from elasticsearch import Elasticsearch
    from pyflink.datastream.functions import RuntimeContext, FlatMapFunction

    import re
    import redis


    # Create the StreamExecutionEnvironment and pin the job to parallelism 1.
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Read the input text file to create the source DataStream.
    input_path = '/root/pyflink/test.txt'
    data_stream = env.read_text_file(input_path)
    class MyMapFunction(MapFunction):
        """Map a space-separated text line to a one-entry dict.

        The second field of the line is parsed as an integer, incremented by
        one, and returned as a string under the key ``'r_value'``.
        """

        def open(self, runtime_context: RuntimeContext):
            """No setup is needed for this function."""
            pass

        def close(self):
            """No teardown is needed for this function."""
            pass

        def map(self, line):
            """Return ``{'r_value': str(int(second_field) + 1)}`` for ``line``.

            Fields are separated by a single space character. A line with
            fewer than two fields raises ``IndexError``; a non-integer second
            field raises ``ValueError``.
            """
            fields = line.split(' ')
            incremented = int(fields[1]) + 1
            return {'r_value': str(incremented)}

    # Make the Elasticsearch 7 connector jar available to the job.
    env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")

    # Date-suffixed index name, e.g. 'flink-test2023-06-07'. The date is taken
    # once, when this script builds the job, so every record of a run goes to
    # the same index.
    date_str = datetime.now().strftime("%Y-%m-%d")
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_bulk_flush_max_actions(1) \
        .set_emitter(ElasticsearchEmitter.static_index('flink-test' + date_str)) \
        .set_hosts(['127.0.0.1:9200']) \
        .build()

    # Convert each input line into a string-to-string map and write the
    # resulting stream to Elasticsearch.
    new_stream = data_stream.map(
        MyMapFunction(),
        output_type=Types.MAP(Types.STRING(), Types.STRING()),
    )
    new_stream.sink_to(es7_sink)

    # Submit the job.
    # NOTE(review): the job name mentions "bus_seq", which nothing in this
    # script produces -- confirm whether the name should be updated.
    env.execute('Add "bus_seq" to each line')

  • 相关阅读:
    Debian11面板怎么添加显示桌面的快捷按钮?
    ES索引数据清理脚本示例
    ES6中的箭头函数详细梳理
    shell脚本之grep命令
    mkfifo函数 及 解决Linux下 “mkfifo: no such file or directory”
    ZPLPrinter Emulator SDK v4.0.22.722 Crack
    外包干了6天,技术明显退步。。。
    什么是反向代理,它是如何工作的?
    java ssm基于身份识别的考生考试签到管理系统
    【无标题】
  • 原文地址:https://blog.csdn.net/zhaoyangjian724/article/details/131145930