Iceberg表支持以下数据类型:
PrimitiveType类很简单,提供了eq、ne、is_primitive_type和as_primitive_type成员函数。
class PrimitiveType(Type):
    """Base class for the primitive types.

    Two primitive types compare equal exactly when they are instances of the
    same concrete class.
    """

    def __eq__(self, other):
        """Return True when *other* has exactly the same class as this type."""
        return type(self) == type(other)

    def __ne__(self, other):
        """Return the negation of ``__eq__``."""
        return not self.__eq__(other)

    def __hash__(self):
        """Return a hash consistent with ``__eq__``.

        Defining ``__eq__`` alone makes Python set ``__hash__`` to None, which
        would leave instances unusable as dict keys or set members. Equality
        depends only on the class, so the hash does too.
        """
        return hash(type(self))

    def is_primitive_type(self):
        """Return True: this is a primitive type."""
        return True

    def as_primitive_type(self):
        """Return this object itself."""
        return self
PrimitiveType类的子类包含:BooleanType、IntegerType、LongType、FloatType、DoubleType、DateType、TimeType、TimestampType、StringType、UUIDType、FixedType、BinaryType、DecimalType。以布尔类型为例,BooleanType定义在python_legacy/iceberg/api/types/types.py中,BooleanType继承自PrimitiveType类。
class BooleanType(PrimitiveType):
    """Primitive type for booleans, kept as a single shared instance."""

    # The one shared instance; set by __init__ the first time it runs.
    __instance = None

    @staticmethod
    def get():
        """Return the shared instance, constructing it on first use."""
        if BooleanType.__instance is not None:
            return BooleanType.__instance
        # The constructor registers the new object as the shared instance.
        BooleanType()
        return BooleanType.__instance

    def __init__(self):
        """Register this object as the shared instance.

        Raises:
            Exception: if an instance has already been registered.
        """
        already_created = BooleanType.__instance is not None
        if already_created:
            raise Exception("Multiple Boolean Types created")
        BooleanType.__instance = self

    @property
    def type_id(self):
        """Return ``TypeID.BOOLEAN``."""
        return TypeID.BOOLEAN

    def __repr__(self):
        return "boolean"

    def __str__(self):
        return "boolean"
NestedType类(定义在python_legacy/iceberg/api/types/type.py中)提供了is_nested_type、as_nested_type、fields、field_type和field成员函数。NestedField类(定义在python_legacy/iceberg/api/types/types.py文件中)包含了is_optional、id、name、type、doc等成员。NestedType类包含StructType、ListType、MapType子类。
class NestedType(Type):
    """Base class for nested types.

    The field accessors defined here are placeholders that return None.
    """

    def __init__(self):
        super().__init__()

    def is_nested_type(self):
        """Return True: this is a nested type."""
        return True

    def as_nested_type(self):
        """Return this object itself."""
        return self

    def fields(self):
        """Placeholder; returns None here."""
        return None

    def field_type(self, name):
        """Placeholder; returns None here."""
        return None

    def field(self, id):
        """Placeholder; returns None here."""
        return None
class NestedField():
    """A field of a nested type.

    Holds the field's id, name, type, optional documentation string and
    whether the field is optional.
    """

    length: int

    @staticmethod
    def optional(id, name, type_var, doc=None):
        """Build a field with ``is_optional`` set to True."""
        return NestedField(True, id, name, type_var, doc=doc)

    @staticmethod
    def required(id, name, type, doc=None):
        """Build a field with ``is_optional`` set to False."""
        return NestedField(False, id, name, type, doc=doc)

    def __init__(self, is_optional, id, name, type, doc=None):
        self.is_optional = is_optional
        self.id = id
        self.name = name
        self.type = type
        self.doc = doc

    @property
    def is_required(self):
        """Return True when the field is not optional."""
        return not self.is_optional

    @property
    def field_id(self):
        """Return the field id (same value as ``id``)."""
        return self.id

    def __repr__(self):
        requiredness = "optional" if self.is_optional else "required"
        return "%s: %s: %s %s(%s)" % (
            self.id, self.name, requiredness, self.type, self.doc)

    def __str__(self):
        return self.__repr__()

    def __eq__(self, other):
        """Compare optionality, id, name, type and doc with another field."""
        if self is other:
            return True
        # isinstance already rejects None, so no separate None check is needed.
        if not isinstance(other, NestedField):
            return False
        return (self.is_optional == other.is_optional
                and self.id == other.id
                and self.name == other.name
                and self.type == other.type
                and self.doc == other.doc)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.__key())

    def __key(self):
        """Return the tuple that ``__hash__`` is computed from.

        The type contributes through the name of its ``type_id`` rather than
        through the type object itself.
        """
        type_name = self.type.type_id.name
        # Use the field class itself as the tag; ``NestedField.__class__``
        # would be the metaclass ``type``.
        return NestedField, self.is_optional, self.id, self.name, self.doc, type_name
StructType类继承自NestedType,其最重要的成员就是fields,用于存放数据(如下所示)。python使用list表示fields,并设置到_fields成员中。
"fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" },
{ "id" : 2, "name" : "order_id", "required" : false, "type" : "long" } ]
从index_fields函数可以看出,_fieldsByName、_fieldsByLowercaseName和_fieldsById是以field.name或field.id为键,值为field的字典。
class StructType(NestedType):
    """Nested type made of an ordered collection of fields.

    Lookup dictionaries keyed by field name, lower-cased field name and field
    id are built on first use by ``index_fields``.
    """

    FIELD_SEP = ", "

    @staticmethod
    def of(fields):
        """Build a StructType from *fields*."""
        return StructType(fields)

    def __init__(self, fields):
        """Store a copy of *fields*.

        Raises:
            RuntimeError: if *fields* is None.
        """
        if fields is None:
            raise RuntimeError("Field list cannot be None")
        # Copy so the struct does not alias the caller's container.
        self._fields = list(fields)
        # Caches below are filled lazily.
        self._fieldList = None
        self._fieldsByName = None
        self._fieldsByLowercaseName = None
        self._fieldsById = None

    @property
    def fields(self):
        """Return the fields as a tuple (cached after the first call)."""
        return self._lazy_field_list()

    def field(self, name=None, id=None):
        """Look a field up by name or, failing that, by id.

        Returns:
            The matching field; None when *name* is given but unknown.

        Raises:
            KeyError: if *id* is given but no field has that id.
            RuntimeError: if neither a name nor an id is supplied.
        """
        if name:
            return self._lazy_fields_by_name().get(name)
        # Compare against None so that an id of 0 is still looked up.
        if id is not None:
            return self._lazy_fields_by_id()[id]
        raise RuntimeError("No valid field info passed in ")

    def case_insensitive_field(self, name):
        """Return the field whose name matches *name* ignoring case, or None."""
        # The index is keyed by lower-cased names, so normalise the query too.
        key = name.lower() if isinstance(name, str) else name
        return self._lazy_fields_by_lowercase_name().get(key)

    def _lazy_field_list(self):
        if self._fieldList is None:
            self._fieldList = tuple(self._fields)
        return self._fieldList

    def _lazy_fields_by_name(self):
        if self._fieldsByName is None:
            self.index_fields()
        return self._fieldsByName

    def _lazy_fields_by_lowercase_name(self):
        # Must check and return the lower-case index, not the exact-name one.
        if self._fieldsByLowercaseName is None:
            self.index_fields()
        return self._fieldsByLowercaseName

    def _lazy_fields_by_id(self):
        if self._fieldsById is None:
            self.index_fields()
        return self._fieldsById

    def index_fields(self):
        """(Re)build the three lookup dictionaries from ``fields``.

        When several fields share a key, the last one in field order wins.
        """
        self._fieldsByName = dict()
        self._fieldsByLowercaseName = dict()
        self._fieldsById = dict()
        for field in self.fields:
            self._fieldsByName[field.name] = field
            self._fieldsByLowercaseName[field.name.lower()] = field
            self._fieldsById[field.id] = field
ListType表示列表类型,其element_field是一个NestedField,描述列表元素的id、类型以及是否可选(字段名固定为"element")。
class ListType(NestedType):
    """Nested type whose single field, named "element", describes its items."""

    @staticmethod
    def of_optional(element_id, element_type):
        """Build a list type with an optional element field.

        Raises:
            RuntimeError: if *element_type* is None.
        """
        if element_type is None:
            raise RuntimeError("Element type cannot be null")
        element = NestedField.optional(element_id, "element", element_type)
        return ListType(element)

    @staticmethod
    def of_required(element_id, element_type):
        """Build a list type with a required element field.

        Raises:
            RuntimeError: if *element_type* is None.
        """
        if element_type is None:
            raise RuntimeError("Element type cannot be null")
        element = NestedField.required(element_id, "element", element_type)
        return ListType(element)

    def __init__(self, element_field):
        self.element_field = element_field
        # One-item field list, created on first request.
        self._fields = None

    @property
    def element_type(self):
        """Return the type of the element field."""
        return self.element_field.type

    def field_type(self, name):
        """Return the element type when *name* is "element", else None."""
        if "element" == name:
            return self.element_type
        return None

    def field(self, id):
        """Return the element field when its id equals *id*, else None."""
        if self.element_field.id == id:
            return self.element_field
        return None

    def fields(self):
        """Return a list holding just the element field."""
        return self._lazyFieldsList()

    def _lazyFieldsList(self):
        cached = self._fields
        if cached is None:
            cached = [self.element_field]
            self._fields = cached
        return cached
MapType表示字典(映射)类型,其key_field和value_field都是NestedField,分别描述键和值的id与类型;键始终是required,值可以是optional或required。
class MapType(NestedType):
    """Nested type with a required "key" field and a "value" field."""

    @staticmethod
    def of_optional(key_id, value_id, key_type, value_type):
        """Build a map type whose value field is optional.

        Raises:
            RuntimeError: if *value_type* is None.
        """
        if value_type is None:
            raise RuntimeError("Value type cannot be null")
        key = NestedField.required(key_id, 'key', key_type)
        value = NestedField.optional(value_id, 'value', value_type)
        return MapType(key, value)

    @staticmethod
    def of_required(key_id, value_id, key_type, value_type):
        """Build a map type whose value field is required.

        Raises:
            RuntimeError: if *value_type* is None.
        """
        if value_type is None:
            raise RuntimeError("Value type cannot be null")
        key = NestedField.required(key_id, 'key', key_type)
        value = NestedField.required(value_id, 'value', value_type)
        return MapType(key, value)

    def __init__(self, key_field, value_field):
        self.key_field = key_field
        self.value_field = value_field
        self._fields = None

    def field(self, id):
        """Return the key or value field whose id equals *id*, else None.

        The key field is checked first.
        """
        for candidate in (self.key_field, self.value_field):
            if candidate.id == id:
                return candidate
        return None

    def fields(self):
        """Return ``(key_field, value_field)``."""
        return self._lazy_field_list()

    def _lazy_field_list(self):
        # A fresh tuple is built on every call; nothing is cached here.
        return (self.key_field, self.value_field)

    def key_id(self):
        """Return the id of the key field."""
        return self.key_field.field_id

    def value_id(self):
        """Return the id of the value field."""
        return self.value_field.field_id
枚举(enumeration)在许多编程语言中都是一种基础的数据结构,用于把一系列密切相关的成员组织到同一个分组之下。各种离散的属性一般都可以用枚举来定义,比如颜色、季节、国家、时间单位等。Python标准库enum实现了枚举功能。enum规定了一个有限的取值集合,限定只能使用集合内的值,明确地声明了哪些值是合法值;如果输入不合法的值会引发错误。只要需要从一个限定集合中取值,就可以使用enum来组织这些值。如下为使用案例:
from enum import Enum
class Directions(Enum):  # definition / declaration of the enum
    """Four compass directions, each bound to an integer value."""

    NORTH = 1
    EAST = 2
    SOUTH = 3
    WEST = 4
>>> Directions.EAST # 使用和类型检查
<Directions.EAST: 2>
>>> Directions.SOUTH
<Directions.SOUTH: 3>
>>> Directions.EAST.name
'EAST'
>>> Directions.EAST.value
2
>>> print("South的类型:", type(Directions.SOUTH))
South的类型: <enum 'Directions'>
>>> print(isinstance(Directions.EAST, Directions))
True
>>>
fetched_value = 2 # 获取值
if Directions(fetched_value) is Directions.NORTH:
...
elif Directions(fetched_value) is Directions.EAST:
...
else:
...
>>> Directions(5) # 输入未定义的值
ValueError: 5 is not a valid Directions
>>> for name, value in Directions.__members__.items(): # 遍历成员
... print(name, value)
...
NORTH Directions.NORTH
EAST Directions.EAST
SOUTH Directions.SOUTH
WEST Directions.WEST
TypeID枚举类型定义在python_legacy/iceberg/api/types/type.py中,用于标识Iceberg支持的数据类型;各数据类型类的type_id成员返回对应的TypeID成员作为类型标签。
class TypeID(Enum):
    """Tags for the supported data types.

    Each member's value is a dict with three keys: "java_class" (a class name
    string), "python_class" (a Python class, or None for the nested types)
    and "id" (an integer from 1 to 16).
    """

    # Primitive types.
    BOOLEAN = {"java_class": "Boolean.class", "python_class": bool, "id": 1}
    INTEGER = {"java_class": "Integer.class", "python_class": int, "id": 2}
    LONG = {"java_class": "Long.class", "python_class": int, "id": 3}
    FLOAT = {"java_class": "Float.class", "python_class": float, "id": 4}
    DOUBLE = {"java_class": "Double.class", "python_class": float, "id": 5}
    DATE = {"java_class": "Integer.class", "python_class": int, "id": 6}
    TIME = {"java_class": "Long.class", "python_class": int, "id": 7}
    TIMESTAMP = {"java_class": "Long.class", "python_class": int, "id": 8}
    STRING = {"java_class": "CharSequence.class", "python_class": str, "id": 9}
    UUID = {
        "java_class": "java.util.UUID.class",
        "python_class": uuid.UUID,
        "id": 10,
    }
    FIXED = {"java_class": "ByteBuffer.class", "python_class": bytes, "id": 11}
    BINARY = {
        "java_class": "ByteBuffer.class",
        "python_class": bytearray,
        "id": 12,
    }
    DECIMAL = {
        "java_class": "BigDecimal.class",
        "python_class": Decimal,
        "id": 13,
    }

    # Nested types: no Python class is associated with them.
    STRUCT = {"java_class": "Void.class", "python_class": None, "id": 14}
    LIST = {"java_class": "Void.class", "python_class": None, "id": 15}
    MAP = {"java_class": "Void.class", "python_class": None, "id": 16}
如python_legacy/iceberg/hive/hive_types.py中的hive_types字典就定义了iceberg数据类型和hive数据类型的对应关系。
# Hive type name for each TypeID member; the value is None where this table
# gives no Hive type name.
hive_types = {
    TypeID.BOOLEAN: "boolean",
    TypeID.INTEGER: "int",
    TypeID.LONG: "bigint",
    TypeID.FLOAT: "float",
    TypeID.DOUBLE: "double",
    TypeID.DATE: "date",
    TypeID.TIME: "string",
    TypeID.TIMESTAMP: "timestamp",
    TypeID.STRING: "string",
    TypeID.UUID: "string",
    TypeID.FIXED: "binary",
    TypeID.BINARY: "binary",
    TypeID.DECIMAL: None,
    TypeID.STRUCT: None,
    TypeID.LIST: None,
    TypeID.MAP: None,
}
Type类是PrimitiveType和NestedType类的基类,其提供了判定是否是primitive_type、struct_type、list_type、map_type、nested_type的成员函数。
class Type(object):
    """Root of the type hierarchy.

    Every ``is_*`` check returns False and every ``as_*`` conversion raises
    ValueError here; subclasses override the ones that apply to them.
    """

    length: int
    scale: int
    precision: int

    def __init__(self):
        pass

    def type_id(self):
        """Placeholder; returns None here."""
        pass

    def is_primitive_type(self):
        return False

    def as_primitive_type(self):
        """Raise ValueError: this base type is not a primitive type."""
        # str() is required: concatenating the object itself to a str would
        # raise TypeError and hide the intended ValueError.
        raise ValueError("Not a primitive type: " + str(self))

    def as_struct_type(self):
        """Raise ValueError: this base type is not a struct type."""
        raise ValueError("Not a struct type: " + str(self))

    def as_list_type(self):
        """Raise ValueError: this base type is not a list type."""
        raise ValueError("Not a list type: " + str(self))

    def as_map_type(self):
        """Raise ValueError: this base type is not a map type."""
        raise ValueError("Not a map type: " + str(self))

    def is_nested_type(self):
        return False

    def is_struct_type(self):
        return False

    def is_list_type(self):
        return False

    def is_map_type(self):
        return False

    def as_nested_type(self):
        """Raise ValueError: this base type is not a nested type."""
        raise ValueError("Not a nested type: " + str(self))