• Flink stream processing: handling log lines one record at a time


    [root@master pyflink]# cat test.txt 
    aaaaa 111111
    bbbbb 222222
    ccccc 333333
    ddddd 444444
    eeeee 555555
    [root@master pyflink]# cat test.py 
    # -*- coding: utf-8 -*-
    from pyflink.datastream import StreamExecutionEnvironment


    # Create the StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Read the file to create a DataStream; each line becomes one record
    data_stream = env.read_text_file('/root/pyflink/test.txt')


    def my_map_func(value):
        # Take the second space-separated field, parse it as an int, and add 1
        return int(value.split(' ')[1]) + 1


    new_stream = data_stream.map(my_map_func)

    # Print to the console
    new_stream.print()

    # Run the job
    env.execute('Add "bus_seq" to each line')

    # Excerpt from the PyFlink source, for reference: DataStream.map
    def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \
            -> 'DataStream':
        """
        Applies a Map transformation on a DataStream. The transformation calls a MapFunction for
        each element of the DataStream. Each MapFunction call returns exactly one element.
            
    [root@master pyflink]# python test.py 
    111112
    222223
    333334
    444445
    555556
            

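The per-record logic in `my_map_func` can be checked without starting a Flink job. The following is only a minimal sketch in plain Python: `increment_second_field` and `run_locally` are hypothetical helper names that do not appear in the original script, and the list comprehension merely imitates the "one input element, exactly one output element" behaviour that the `map` docstring describes. Unlike the original function, this version splits on any whitespace and raises an error on malformed lines, which is an assumption about how bad input should be handled.

```python
from typing import Iterable, List


def increment_second_field(line: str) -> int:
    """Parse '<name> <number>' and return number + 1 (same idea as my_map_func)."""
    fields = line.split()  # split on any run of whitespace
    if len(fields) < 2:
        # Assumption: malformed lines should fail loudly rather than be skipped
        raise ValueError(f"expected two fields, got: {line!r}")
    return int(fields[1]) + 1


def run_locally(lines: Iterable[str]) -> List[int]:
    """Call the function once per record, producing exactly one result per input."""
    return [increment_second_field(line) for line in lines]


sample = ["aaaaa 111111", "bbbbb 222222", "ccccc 333333"]
print(run_locally(sample))  # [111112, 222223, 333334]
```

In the job itself, a function like this would be passed the same way as before, for example `data_stream.map(increment_second_field, output_type=Types.INT())`, with `Types` imported from `pyflink.common.typeinfo`. Declaring `output_type` is optional; the original script omits it.
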
  • Original article: https://blog.csdn.net/zhaoyangjian724/article/details/131142491