• Flink学习:Flink支持的数据类型


    Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo等。

    一、原生数据类型

    1、BasicTypeInfo

    //创建Int类型的数据集
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val IntStream:DataStream[Int] = env.fromElements(4,51,2,7)
    //创建String类型的数据集
    val StringStream:DataStream[String] = env.fromElements("hello","flink")
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、BasicArrayTypeInfo

    //通过从数组中创建数据集
    val ArrayStream:DataStream[Int] = env.fromCollection(Array(8,5,2,31))
    //通过List集合创建数据集
    val ListStream:DataStream[Int] = env.fromCollection(List(6,23,63,9))
    
    • 1
    • 2
    • 3
    • 4

    二、Java Tuples类型

    //通过new Tuple2创建元组数据集
    val TupleStream:DataStream[Tuple2[String,Int]] = env.fromElements(new Tuple2("唐太宗",1),new Tuple2("汉武帝",2))
    
    • 1
    • 2

    三、Scala样例类

    Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持通过字段名称和位置索引获取指标,不支持存储空值

    //scala样例类
    case class bing(id:Int,name:String)
    object TableTeat {
      def main(args: Array[String]): Unit = {
        //val senv = EnvironmentSettings.newInstance().inStreamingMode().build()
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
        //val tenv = TableEnvironment.create(senv)
        val input = senv.fromElements(bing(1,"成吉思汗"),bing(2,"松赞干布"))
        input.print()
        senv.execute()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    四、POJOs类型

    POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括Java类和Scala类

    • POJOs类必须是Public修饰且必须独立定义,不能是内部类;
    • POJOs类中必须含有默认空构造器;
    • POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和setter方法;
    • POJOs类中的字段类型必须是Flink支持的;
    public class Person{
    	//字段具有public修饰符
    	public String name;
    	public int age;
    	//具有默认空构造器
    	public Person(){
    	}
    	public Person(String name,int age){
    		this.name = name;
    		this.age = age;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    class Person(var name:String,var age:Int){
    	def this(){
    		this(null,-1)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    定义好后,就可以在Flink环境中使用

    val personStream = env.fromElements(new Person("刘病己",14),new Person("刘秀",25))
    personStream.keyBy("name")
    
    • 1
    • 2

    五、特殊数据类型

    val mapStream = env.fromElements(Map("name" -> "朱元璋","age" -> "18"),Map("name"-> "朱棣","age" -> "24"))
    
    • 1
  • 相关阅读:
    HDU1228 A + B
    《设计模式巩固学习》
    java项目线上cpu过高如何排查
    WordPress实时搜索插件Ajax Search Lite,轻松替代默认搜索功能
    安卓大作业 图书管理APP
    接口自动化框架搭建(九):接入钉钉消息通知
    软件工程:波斯特尔定律,输入输出的平衡之道
    联邦学习fate框架入门
    微信小程序毕业设计_论文校园活动报名管理系统+后台管理_项目源代码
    山东大学人工智能导论实验二 前向传播和反向传播
  • 原文地址:https://blog.csdn.net/nzbing/article/details/127835197