• PyFlink:使用 map 算子生成字典并写入 Elasticsearch(ES)


    [root@master pyflink]# cat test.py 
    # -*- coding: utf-8 -*-
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
    from abc import ABC, abstractmethod
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import  MapFunction, RuntimeContext, KeyedProcessFunction
    from pyflink.datastream.state import MapStateDescriptor
    from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
    from pyflink.common.typeinfo import Types, TypeInformation
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter, FlushBackoffType
    from pyflink.datastream.connectors import DeliveryGuarantee
    from pyflink.common.serialization import SimpleStringSchema
    import json
    import re
    from datetime import datetime
    from elasticsearch import Elasticsearch
    from pyflink.datastream.functions import RuntimeContext, FlatMapFunction

    import re
    import redis


    # Create the StreamExecutionEnvironment object.
    env = StreamExecutionEnvironment.get_execution_environment()
    # Run the job with a parallelism of 1.
    env.set_parallelism(1)

    # Read the text file and create a DataStream object from it.
    data_stream = env.read_text_file('/root/pyflink/test.txt')
    class MyMapFunction(MapFunction):
        """Turn one space-separated text line into a single-entry dict.

        The second space-separated field of the line is parsed as an integer,
        incremented by one, and stored as a string under the key ``r_value``.
        """

        def open(self, runtime_context: RuntimeContext):
            """Lifecycle hook; this function has nothing to initialise."""
            pass

        def close(self):
            """Lifecycle hook; this function has nothing to release."""
            pass

        def map(self, line):
            """Return ``{'r_value': <second field + 1, as a string>}``.

            Raises:
                IndexError: if the line has no second space-separated field.
                ValueError: if the second field is not an integer literal.
            """
            fields = line.split(' ')
            incremented = int(fields[1]) + 1
            return {'r_value': str(incremented)}

    # Register the Elasticsearch 7 connector jar with the job.
    env.add_jars("file:///root/lib/flink-sql-connector-elasticsearch7-3.0.1-1.16.jar")

    # NOTE(review): date_str is computed but not used below; the index name
    # given to the emitter is a fixed literal -- confirm whether a daily index
    # was intended.
    date_str = datetime.now().strftime("%Y-%m-%d")

    # Build the Elasticsearch 7 sink: bulk-flush max actions set to 1, a
    # static target index, and a single host.
    es7_sink = (
        Elasticsearch7SinkBuilder()
        .set_bulk_flush_max_actions(1)
        .set_emitter(ElasticsearchEmitter.static_index('flink-test2023-06-07'))
        .set_hosts(['127.0.0.1:9200'])
        .build()
    )

    # Map every input line to a dict (declared as a string-to-string map)
    # and send the mapped records to the Elasticsearch sink.
    record_type = Types.MAP(Types.STRING(), Types.STRING())
    mapped_stream = data_stream.map(MyMapFunction(), output_type=record_type)
    new_stream = mapped_stream.sink_to(es7_sink)

    # Run the job.
    env.execute('Add "bus_seq" to each line')

  • 相关阅读:
    vue3中ref和reactive的区别
    开源照片管理服务LibrePhotos
    【论文笔记】policy-space response oracles (PSRO)
    ruoyi-vue插件集成websocket
    【Docker】Python Flask + Redis 练习
    QT 网络编程 服务端 客户端 QTcpServer
    Could not load dynamic library ‘cudart64_110.dll‘的解决方法
    java基于微信小程序的英语学习激励系统+ssm+uinapp+Mysql+计算机毕业设计
    xshell开启ssh端口转发,通过公网机器访问内网机器
    递归算法实现二分查找c++
  • 原文地址:https://blog.csdn.net/zhaoyangjian724/article/details/131145930