本文档内容基于 flink-1.13.x
,其他版本的整理,请查看本人博客的 flink 专栏其他文章。
Apache Flink 以一种独特的方式处理数据类型和序列化,flink 包含自己的类型描述符、泛型类型提取和类型序列化框架。本文档描述这些概念及其背后的基本原理。
flink 对 DataSet 或 DataStream 中的元素类型有一些限制,该限制的原因是系统需要分析类型以决定有效的执行策略。
有七种不同的数据类型类别:
Java
元组是复合类型,他包含固定数量不同类型的属性,java API 提供了从 Tuple1
到 Tuple25
一共 25 个类。元组的每个属性都可以是任意的 flink 类型,包括元组,最后组成嵌套元组。元组的属性可以直接使用属性的名称访问,比如 tuple.f4
,或使用普通的 getter 方法 tuple.getField(int position)
。属性索引从 0 开始。注意这与 scala 的元组不同,但与 Java 的索引更加一致。
DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
new Tuple2<String, Integer>("hello", 1),
new Tuple2<String, Integer>("world", 2));
wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
});
wordCounts.keyBy(value -> value.f0);
Scala
scala 的 case class 以及元组(case class 的一种特殊类型)都是复合类型,包含固定数量不同类型的属性。元组的属性地址为 1-offset ,比如 _1
表示第一个属性。case class 的属性通过他们的名称访问。
case class WordCount(word: String, count: Int)
val input = env.fromElements(
WordCount("hello", 1),
WordCount("world", 2)) // Case Class 数据集
input.keyBy(_.word)
val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 数据集
input2.keyBy(value => (value._1, value._2))
如果满足下面的要求,则 Java 和 Scala 的类将会被 flink 作为特殊的 POJO 数据类型对待:
foo
,getter 方法必须为 getFoo()
,setter 方法必须为 setFoo()
。POJO 通常通过 PojoTypeInfo
表示,并且被 PojoSerializer
序列化,使用 Kryo 作为可配的回退方案。例外情况有,POJO 是真正的 Avro 类型(明确的 Avro 记录)或由“Avro 反射类型”生成。在这种情况下,POJO 会通过 AvroTypeInfo
表示,并且使用 AvroSerializer
序列化。如果需要,你也可以注册自己的自定义序列化,查看 Serialization 来获取更多信息。
flink 会分析 POJO 类型的结构,比如他会学习 POJO 的属性,因此 POJO 类型比常规类型更易用。除此之外,flink 处理 POJO 比常规类更高效。
下面的案例展示了一个包含包含两个 public 属性的简单 POJO 类。
Java
public class WordWithCount {
public String word;
public int count;
public WordWithCount() {}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
}
DataStream<WordWithCount> wordCounts = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2));
wordCounts.keyBy(value -> value.word);
Scala
class WordWithCount(var word: String, var count: Int) {
def this() {
this(null, -1)
}
}
val input = env.fromElements(
new WordWithCount("hello", 1),
new WordWithCount("world", 2)) // Case Class Data Set
input.keyBy(_.word)
flink 支持 Java 和 Scala 中所有的原始类型,比如:Integer
、 String
和 Double
。
flink 支持大多数 Java 和 Scala 类(API 和自定义),但限制使用包含不能序列化属性的类,比如文件指针、I/O 流、或其他本地资源。遵循 Java Bean 约定的类通常都有很好的支持。
所有没有被认定为 POJO 类型(查看上面的 POJO 要求)的类都会被 flink 当做常规类处理,flink 会认定这些类型为黑箱并且无法访问他们的内容,比如进行高效的排序。常规类使用 Kryo 序列化框架进行反序列化/序列化。
Value 类型手动描述他们的序列化和反序列化,不同于常规的序列化框架,他们通过实现 org.apache.flink.types.Value
接口中的 read
和 write
方法,以自定义代码方式实现这些操作。当常规序列化框架非常低效时,使用 Value 类型是很个合理选择。比如将元素的稀疏向量使用数组类型实现。如果提前知道一个数组内的大部分元素都为零,则可以对非零元素使用特殊编码以减少空间使用,但通用的序列化框架会写入所有数组元素。
org.apache.flink.types.CopyableValue
接口支持使用简单的方式手动进行内部的克隆。
flink 预定了与基础类型对应的 Value 类型,包括:ByteValue
, ShortValue
, IntValue
, LongValue
, FloatValue
, DoubleValue
, StringValue
, CharValue
, BooleanValue
。这些 Value 类型是基础类型的可变变体:他们的值可以被改变,允许程序重新使用对象,还减轻垃圾收集器的压力。
你可以使用实现了 org.apache.hadoop.Writable
接口的类型。通过 write()
和 readFields()
方法定义的序列化逻辑将被用于序列化操作。
你可以使用特殊类型,包括 Scala 的 Either
, Option
和 Try
。Java API 有对 Either
的自定义实现,类似于 Scala 的 Either
,它提供了两个可能类型 Left 或 Right 对应的值。Either
对处理异常或需要输出两个不同类型记录的操作是非常有用的。
注意:该章节值和 Java 有关。
Java 编译器在编译之后会丢失很多泛型类型信息,这就是 Java 的类型消除。这意味着在运行时,一个对象的实例将不再知道他的泛型类型。比如 DataStream
和 DataStream
的实例在 JVM 看来是一样的。
flink 在准备运行程序时就需要类型信息,也就是调用程序 main 方法的时候。flink 的 Java API 会尝试重新构建通过各种方式丢失的类型信息,并且显式的将它们存储到数据集和算子中。可以通过 DataStream.getType()
方法检索类型,该方法会返回 TypeInformation
的实例,该实例是 flink 内部表达类型的方式。
在有些情况下,类型接口也有它的限制,并且需要和应用程序“协作”,比如从 collections 创建数据集和的方法,比如 ExecutionEnvironment.fromCollection()
,你就可以传递一个参数来描述类型,但是类似于 MapFunction
的常规方法,就需要额外的类型信息。
ResultTypeQueryable 接口可以通过输入格式和函数来显式地告诉 API 有关它的返回类型。函数的输入类型可以通过前面算子的结果类型推断出来。
flink 会尝试推断出分布式计算中很多交换和存储的数据类新信息,可以把他想象成为推断表的 schema 的数据库,在大部分情况下,flink 会自己无缝推断出所有必要的信息。使用这些类型信息,flink 就可以做很多事情:
通常来说,在预处理阶段就需要数据类型信息,也就是说,当程序调用 DataStream
和 DataSet
时,以及在调用 execute()
, print()
, count()
或 collect()
之前。
用户与 Flink 的数据类型处理交互时最常见的问题是:
StreamExecutionEnvironment
或 ExecutionEnvironment
上调用 .registerType(clazz)
。StreamExecutionEnvironment
或 ExecutionEnvironment
上调用 .getConfig().addDefaultKryoSerializer(clazz, serializer)
方法。额外的 Kryo 序列化器在很多库中都可用。查看 自定义序列化器 来了解更多自定义序列化器的细节。TypeInformation
:这对于一些 API 调用可能是必要的,因为在这些 API 调用中,由于 Java 的泛型类型消除,Flink 无法推断其数据类型。查看 创建 TypeInformation 或类型序列化器 来获取更多细节。TypeInformation 类是所有类型描述的基础类,它暴露了类型的一些基础配置,并且可以生成类型的序列化器和比较器。注意:在 flink 中,比较器相比于定义顺序,可以做更多事情——他们基本上是处理 key 的工具。
数据类型在 flink 内部有如下区别:
void
, String
, Date
, BigDecimal
和 BigInteger
。POJO 特别有趣,因为它们支持创建复合类型,并且可以在定义 key 时使用字段名:dataSet.join(another).where("name").equalTo("personName")
。他们在运行时透明,并且可以被 flink 高效处理。
如果完全遵循了以下规则,则 flink 会将数据类型识别为 POJO 类型(允许通过名称定位属性):
注意当用户自定义类型无法被识别为 POJO 类型时,它必须被当做泛型类型处理,并且使用 Kryo 序列化。
对某个类型创建 TypeInformation 对象,需要使用对应语言指定的方式:
Java
因为 Java 通常会消除泛型的类型信息,所以你需要将类型处理为 TypeInformation 结构:
对于非泛型类型,你需要使用如下方式:
TypeInformation<String> info = TypeInformation.of(String.class);
对于泛型类型,你需要通过 TypeHint
来“捕获”泛型类型信息:
TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
内部实现中,上述代码会创建一个 TypeHint 的匿名子类来捕获泛型信息并且保持他直到运行时。
Scala
在 Scala 中,Flink 会在编译时使用宏命令来捕获所有可用的泛型类型信息。
// 重要事项:为了访问 createTypeInformation 宏命令,需要导入以下包
import org.apache.flink.streaming.api.scala._
val stringInfo: TypeInformation[String] = createTypeInformation[String]
val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
作为备用方案,你仍然可以使用 Java 中相同的方法。
为了创建 TypeSerializer
,可以 TypeInformation
对象上直接调用 typeInfo.createSerializer(config)
。
config
参数是 ExecutionConfig
类型,并且保存了程序注册的自定义序列化信息,只要有可能,就会尝试转化为程序真正的 ExecutionConfig。通常来说,可以使用 DataStream
或 DataSet
,通过调用 getExecutionConfig()
获取它。内部函数,比如 MapFunction
,你可以使用对应的 Rich Function ,然后调用 getRuntimeContext().getExecutionConfig()
来获取它。
通过类型清单和类标签,Scala 对运行时的类型信息有非常清晰的概念。通常来说,类型和方法可以访问他们的泛型参数来获取类型,因此,Scala 程序不会像 Java 程序一样遭受类型消除的影响。
另外,Scala 允许 Scala 编译器通过 Scala 宏命令来运行自定义代码,这意味着你只要编译了包含 Flink Scala API 的 Scala 程序,就会执行一些 Flink 代码。
我们在编译期间会使用宏命令来查找所有自定义函数的参数类型和返回值类型,在这个时候,所有的类型信息都是可用的。在宏命令内,我们会对函数的返回值类型或参数类型创建一个 TypeInformation ,并使它成为算子的一部分。
当无法创建 TypeInformation 时,程序会编译失败,并提示该错误:“could not find implicit value for evidence parameter of type TypeInformation”。
出现该错误的常见原因是代码没有导入生成 TypeInformation 对应的包,请确认导入了整个 flink.api.scala 包。
import org.apache.flink.api.scala._
另一个常见错误是泛型方法,具体解决方法详见下面的章节。
参考下面的例子:
def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
input.map { v => v._1 }
}
val data : DataSet[(String, Long) = ...
val result = selectFirst(data)
对于这样的泛型方法,函数参数和返回值的数据类型在方法每次被调用时都可能不是一样的,在定义方法时并不能确定它们。上面的代码将会导致错误,因为没有足够的隐式值可用。
对于这个情况,必须在调用时生成类型信息并传递给方法,Scala 参考隐式参数来解决。
下面的代码告诉 Scala 将类型参数 T 传递给函数。类型参数将会在方法被调用时生成,而不是在方法被定义时生成。
def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
input.map { v => v._1 }
}
正常情况下,Java 会消除泛型类型信息。Flink 会尝试使用少量 Java 保留的二进制信息,通过反射来尽可能的重构类型信息,主要是函数签名和子类信息。当函数的返回类型依赖于输入类型时,该逻辑也包含了一些简单的类型推断:
public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {
public Tuple2<T, Long> map(T value) {
return new Tuple2<T, Long>(value, 1L);
}
}
有些情况下 Flink 会无法重构所有的泛型类型信息,这时,用户就需要借助于类型提示的帮助了:
在 Flink 无法重构被擦除的泛型类型信息时,可以通过 Java API 来调用类型提示。类型提示会告诉系统通过函数产生的 data stream 或 data set 的类型:
DataSet<SomeType> result = dataSet
.map(new MyGenericNonInferrableFunction<Long, SomeType>())
.returns(SomeType.class);
returns
语句通过一个 class 类来指定类型,提示通过以下方式支持类型定义:
returns(new TypeHint>(){})
内使用的 TypeHint。TypeHint
类可以捕获泛型类型信息并在运行时通过匿名子类保留它。Java 8 lambda 表达式的类型提取和非 lambda 表达式不一样,因为 lambda 表达式没有关联某个扩展了 function 接口的实现类。
目前,Flink 会尝试找出实现了 lambda 的方法,并使用 Java 的泛型签名来决定参数类型和返回类型。然而,并不是所有的编译器都会为 lambda 表达式生成这些签名,在撰写本文档时,只有Eclipse JDT 编译器从 4.5 才开始支持。
PojoTypeInfo
会创建 POJP 内所有属性的序列化器,标准类型,比如 int、long、String 等,是由 flink 附带的序列化器处理的。对于其他类型,会将 Kryo 作为备选方案。
如果 Kryo 无法处理类型,我们可以让 PojoTypeInfo
使用 Avro 来序列化 POJO。为了实现他,你必须调用如下代码:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceAvro();
注意,Flink 会自动使用 Avro 序列化器来序列化通过 Avro 生成的 POJO。
如果你想让你的整个 POJO 类型被 Kryo 序列化器处理,则需要以下代码:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableForceKryo();
如果 Kryo 无法序列化你的 POJO,可以对 Kryo 增加一个自定义序列化器,使用如下代码:
env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
这些方法有不同的变体。
有些情况下,程序可能希望显式的避免使用 Kryo 作为泛型类型的回退方案。最常见的一种情况是,希望通过 Flink 自己的序列化器或用户自定义的序列化器来确保所有类型都被有效序列化。
下面的设置在遇到使用 Kryo 处理的数据类型时会发生异常:
env.getConfig().disableGenericTypes();
类型信息工厂允许在 flink 类型系统中使用可插拔的自定义类型信息。你可以实现 org.apache.flink.api.common.typeinfo.TypeInfoFactory
来返回自定义的类型信息。如果相对应的类型上添加了 @org.apache.flink.api.common.typeinfo.TypeInfo
注释,在类型提取阶段,该工厂就会被调用。
类型信息工厂可以在 Java 和 Scala API 中使用。
在类型层次结构中,向上遍历时,会选择最近的工厂,但是,内置的工厂优先级最高。工厂有比 flink 内置类型更高的优先级,因此你需要知道你在使用工厂时在做什么事。
下面的案例展示如何注解自定义类型 MyTuple
,并且在 Java 中使用工厂来支持自定义类型信息。
注解自定义类型:
@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0, T1> {
public T0 myfield0;
public T1 myfield1;
}
工厂提供自定可以类型信息:
public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple> {
@Override
public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
return new MyTupleTypeInfo(genericParameters.get("T0"), genericParameters.get("T1"));
}
}
createTypeInfo(Type, Map
方法会创建工厂要创建的目标类型的类型信息,参数提供了类型自身的信息,如果可用,也会提供泛型的类型参数。
如果你的类型包含可能需要从 flink 函数的输入类型派生泛型参数,请确保也实现了 org.apache.flink.api.common.typeinfo.TypeInformation#getGenericParameters
方法,以双向匹配泛型参数和类型信息。
如果在 Flink 程序中使用 Flink 类型序列化器无法序列化用户的自定义类型,Flink 会回退到通用的 Kryo 序列化器。这时可以使用 Kryo 注册自己的序列化器或序列化系统,比如 Google Protobuf 或 Apache Thrift。使用方法是通过 Flink 程序中的 ExecutionConfig
注册类类型以及序列化器。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 为类型注册序列化器类
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
// 为类型注册序列化器实例
MySerializer mySerializer = new MySerializer();
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
需要确保你的自定义序列化器继承了 Kryo 的序列化器类。对于 Google Protobuf 或 Apache Thrift,已经为你实现好了:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 使用 Kryo 注册 Google Protobuf 序列化器
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
// 注册 Apache Thrift 序列化器为标准序列化器
// TBaseSerializer 需要初始化为默认的 kryo 序列化器
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
为了使上面的代码正常工作,需要在 Maven 项目文件中(pom.xml)包含必要的依赖。为 Apache Thrift 添加以下依赖:
<dependency>
<groupId>com.twittergroupId>
<artifactId>chill-thriftartifactId>
<version>0.7.6version>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryogroupId>
<artifactId>kryoartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>org.apache.thriftgroupId>
<artifactId>libthriftartifactId>
<version>0.11.0version>
<exclusions>
<exclusion>
<groupId>javax.servletgroupId>
<artifactId>servlet-apiartifactId>
exclusion>
<exclusion>
<groupId>org.apache.httpcomponentsgroupId>
<artifactId>httpclientartifactId>
exclusion>
exclusions>
dependency>
对于 Google Protobuf 需要添加以下 Maven 依赖:
<dependency>
<groupId>com.twittergroupId>
<artifactId>chill-protobufartifactId>
<version>0.7.6version>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryogroupId>
<artifactId>kryoartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>com.google.protobufgroupId>
<artifactId>protobuf-javaartifactId>
<version>3.7.0version>
dependency>
请根据需要调整两个依赖的版本。
JavaSerializer
的问题如果你为自定义类型注册 Kryo 的 JavaSerializer
,即使你提交的 jar 中包含了自定义类型的类,也可能遇到 ClassNotFoundException
异常。这是 Kryo JavaSerializer
的一个已知问题,它可能使用了错误的类加载器。
在这种情况下,你应该使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer
来解决这个问题。这个类是在 Flink 中对 JavaSerializer 的重新实现,可以确保使用用户代码的类加载器。
更多细节可以参考 FLINK-6025。