• 聊一聊UDF/UDTF/UDAF是什么,开发要点及如何使用?


    背景介绍

    UDF来源于Hive,Hive可以允许用户编写自己定义的函数UDF,然后在查询中进行使用。星环Inceptor中的UDF开发规范与Hive相同,目前有3种UDF:

    A. UDF--以单个数据行为参数,输出单个数据行;

    UDF(User Defined Function),即用户自定义函数,能结合SQL语句一起使用,更好地表达复杂的业务逻辑,一般以单个数据行为参数,输出单个数据行;比如数学函数、字符串函数、时间函数、拼接函数

    B. UDTF: 以一个数据行为参数,输出多个数据行为一个表作为输出;

    UDTF(User Defined Table Function),即用户自定义表函数,它与UDF类似。区别在于UDF只能实现一对一,而它用来实现多(行/列)对多(行/列)数据的处理逻辑。一般以一个数据行为参数,输出多个数据行为一个表作为输出,如lateral、view、explore;

    C. UDAF: 以多个数据行为参数,输出一个数据行;

    UDAF(User Defined Aggregate Function)用户自定义聚合函数,是由用户自主定义的,用法同如MAX、MIN和SUM已定义的聚合函数一样的处理函数。而且,不同于只能处理标量数据的系统定义的聚合函数,UDAF的可以接受并处理更广泛的数据类型,如用对象类型、隐式类型或者LOB存储的多媒体数据。由于UDAF也属于聚合函数中的一种,同样也需要与GROUPBY结合使用。

    一般UDAF以多个数据行为参数,接收多个数据行,并输出一个数据行,比如COUNT、MAX;

    UDF、UDTF、UDAF的开发要点及使用DEMO

    星环Quark计算引擎中内置了很多函数,同时支持用户自行扩展,按规则添加后即可在sql执行过程中使用,目前支持UDF、UDTF、UDAF三种类型,一般UDF应用场景较多,后面将着重介绍UDF的开发与使用。UDAF及UDTF将主要介绍开发要点以及Demo示例。

    Quark的UDF接口兼容开源Hive的UDF接口,用户可以参考开源Hive的UDF手册,或者直接把开源Hive的UDF迁移到Quark上。

    UDF

    Quark数据类型

    Quark类型

    Java原始类型

    Java包装类

    hadoop.hive.ioWritable

    tinyintbyteByteByteWritable
    smallintshortShortShortWritable
    intintIntegerIntWritable
    bigintlongLongLongWritable
    string-StringText
    charcharCharacterHiveCharWritable
    booleanbooleanBooleanBooleanWritable
    floatfloatFloatFloatWritable
    double doubleDoubleDoubleWritable
    decimal-BigDecimalHiveDecimalWritable
    date-DateDateWritable
    array-ListArrayListWritable
    Map-MapHashMapWritable

    UDF函数

    Quark 提供了两个实现 UDF 的方式:

    第一种:继承 UDF 类
    • 优点:实现简单;支持Quark的基本类型、数组和Map;支持函数重载。
    • 缺点:逻辑较为简单,只适合用于实现简单的函数
    第二种:继承 GenericUDF 类
    • 优点:支持任意长度、任意类型的参数;可以根据参数个数和类型实现不同的逻辑;资源消耗更低;可以实现初始化和关闭资源的逻辑(initialize、close)。
    • 缺点:实现比继承UDF要复杂一些

    一般在以下几种场景下考虑使用GenericUDF:

    • 传参情况复杂,比如某UDF要传参数有多种数量或多种类型的情况,在UDF中支持这种场景我们需要实现N个不同的evaluate()方法分别对应N种场景的传参,在GenericUDF我们只需在一个方法内加上判断逻辑,对不同的输入路由到不同的处理逻辑上即可。还有比如某UDF参数既要支持String list参数,也要支持Integer list参数。你可能认为我们只要继续多重载方法就好了,但是Java不支持同一个方法重载参数只有泛型类型不一样,所以该场景只能用GenericUDF。
    • 需要传非Writable的或复杂数据类型作为参数。比如嵌套数据结构,传入Map的key-value中的value为list数据类型,或者比如数据域数量不确定的Struct结构,都更适合使用GenericUDF在运行时捕获数据的内部构造。
    • 该UDF被大量、高频地使用,所以从收益上考虑,会尽可能地优化一切可以优化的地方,则GenericUDF相比UDF在operator中避免了多次反射转化的资源消耗(后面会细讲),更适合被考虑。
    • 该UDF函数功能未来预期的重构、扩展场景较多,需要做得足够可扩展,则GenericUDF在这方面更优秀。

    pom文件的依赖导入

    1. UDF开发依赖
    2. org.apache.hive
    3. inceptor-exec
    4. xxx

    继承示例

    1.继承 UDF 类

    该方式实现简单,只需新建一个类继承org.apache.hadoop.hive.ql.exec.UDF;

    继承UDF类必须实现evaluate方法且返回值类型不能为 void,支持定义多个evaluate方法不同参数列表用于处理不同类型数据;

    可通过完善@Description展示UDF用法 UDF样例。

    1. import org.apache.hadoop.hive.ql.exec.UDF;
    2. import org.apache.hadoop.hive.ql.exec.Description;
    3. @Description(
    4. name="my_plus",
    5. value="my_plus() - if string, do concat; if integer, do plus",
    6. extended = "Example : \n >select my_plus('a', 'b');\n >ab\n >select my_plus(3, 5);\n >8"
    7. )
    8. /**
    9. * 实现UDF函数,若字符串执行拼接,int类型执行加法运算。
    10. */
    11. public class AddUDF extends UDF {
    12. /**
    13. * 编写一个函数,要求如下:
    14. * 1. 函数名必须为 evaluate
    15. * 2. 参数和返回值类型可以为:Java基本类型、Java包装类、org.apache.hadoop.io.Writable等类型、List、Map
    16. * 3. 函数一定要有返回值,不能为 void
    17. */
    18. public String evaluate(String... parameters) {
    19. if (parameters == null || parameters.length == 0) {
    20. return null;
    21. }
    22. StringBuilder sb = new StringBuilder();
    23. for (String param : parameters) {
    24. sb.append(param);
    25. }
    26. return sb.toString();
    27. }
    28. /**
    29. * 支持函数重载
    30. */
    31. public int evaluate(IntWritable... parameters) {
    32. if (parameters == null || parameters.length == 0) {
    33. return 0;
    34. }
    35. long sum = 0;
    36. for (IntWritable currentNum : parameters) {
    37. sum = Math.addExact(sum, currentNum.get());
    38. }
    39. return (int) sum;
    40. }
    41. }
    2.继承 GenericUDF 类

    GenericUDF相比与UDF功能更丰富,支持所有参数类型,实现起来也更加复杂。org.apache.hadoop.hive.ql.udf.generic.GenericUDF API提供了一个通用的接口将任何数据类型的对象当作泛型Object去调用和输出,参数类型由ObjectInspector封装;参数Writable类由DeferredObject封装,使用时简单类型可直接从Writable获取,复杂类型可由ObjectInspector解析。

    Java的ObjectInspector类,用于帮助Quark了解复杂对象的内部架构,通过创建特定的ObjectInspector对象替代创建具体类对象,在内存中储存某类对象的信息。在UDF中,ObjectInspector用于帮助Hive引擎将HQL转成MR Job时确定输入和输出的数据类型。Hive语句会生成MapReduce Job执行,所以使用的是Hadoop数据格式,不是编写UDF的Java的数据类型,比如Java的int在Hadoop为IntWritable,String在Hadoop为Text格式,所以我们需要将UDF内的Java数据类型转成正确的Hadoop数据类型以支持Hive将HQL生成MapReduce Job。

    继承 GenericUDF 后,必须实现以下三个方法:

    1. public class MyCountUDF extends GenericUDF {
    2. private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
    3. private transient ObjectInspectorConverters.Converter intConverter;
    4. private transient ObjectInspectorConverters.Converter longConverter;
    5. // 初始化
    6. @Override
    7. public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    8. }
    9. // DeferredObject封装实际参数的对应Writable类
    10. @Override
    11. public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
    12. }
    13. // 函数信息
    14. @Override
    15. public String getDisplayString(String[] strings) {
    16. }
    17. }

    initialize()方法只在 GenericUDF 初始化时被调用一次,执行一些初始化操作,包括:参数个数检查;参数类型检查与转换;确定返回值类型。

    a. 参数个数检查;

    可通过 arguments 数组的长度来判断函数参数的个数:

    1. // 检查该记录是否传过来正确的参数数量,arguments的长度不为2时,则抛出异常
    2. if (arguments.length != 2) {
    3. throw new UDFArgumentLengthException("arrayContainsExample only takes 2 arguments: List, T");
    4. }
    b. 参数类型检查与转换;

    针对该UDF的每个参数,initialize()方法都会收到一个对应的ObjectInspector参数,通过遍历ObjectInspector数组检查每个参数类型,根据参数类型构造ObjectInspectorConverters.Converter,用于将Hive传递的参数类型转换为对应的Writable封装对象ObjectInspector,供后续统一处理。

    ObjectInspector内部有一个枚举类 Category,代表了当前 ObjectInspector 的类型。

    1. public interface ObjectInspector extends Cloneable {
    2. public static enum Category {
    3. PRIMITIVE, // Hive原始类型
    4. LIST, // Hive数组
    5. MAP, // Hive Map
    6. STRUCT, // 结构体
    7. UNION // 联合体
    8. };
    9. }

    Quark原始类型又细分了多种子类型,PrimitiveObjectInspector 实现了 ObjectInspector,可以更加具体的表示对应的Hive原始类型。

    1. public interface PrimitiveObjectInspector extends ObjectInspector {
    2. /**
    3. * The primitive types supported by Quark.
    4. */
    5. public static enum PrimitiveCategory {
    6. VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
    7. DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
    8. UNKNOWN
    9. };
    10. }

    参数类型检查与转换示例:

    1. for (int i = 0; i < length; i++) { // 遍历每个参数
    2. ObjectInspector currentOI = arguments[i];
    3. ObjectInspector.Category type = currentOI.getCategory(); // 获取参数类型
    4. if (type != ObjectInspector.Category.PRIMITIVE) { // 检查参数类型
    5. throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
    6. }
    7. PrimitiveObjectInspector.PrimitiveCategory primitiveType =
    8. ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
    9. inputType[i] = primitiveType;
    10. switch (primitiveType) { // 参数类型转换
    11. case INT:
    12. if (intConverter == null) {
    13. ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
    14. intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
    15. }
    16. break;
    17. case LONG:
    18. if (longConverter == null) {
    19. ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
    20. longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
    21. }
    22. break;
    23. default:
    24. throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
    25. }
    26. }
    c. 确定函数返回值类型

    initialize() 需要 return 一个 ObjectInspector 实例,用于表示自定义UDF返回值类型。initialize() 的返回值决定了 evaluate() 的返回值类型。创建ObjectInspector时,不要用new的方式创建,应该用工厂模式去创建以保证相同类型的ObjectInspector只有一个实例,且同一个ObjectInspector可以在代码中多处被使用。

    1. // 自定义UDF返回值类型为Long
    2. return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    完整的 initialize() 函数
    1. public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    2. int length = arguments.length;
    3. inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
    4. for (int i = 0; i < length; i++) {
    5. ObjectInspector currentOI = arguments[i];
    6. ObjectInspector.Category type = currentOI.getCategory();
    7. if (type != ObjectInspector.Category.PRIMITIVE) {
    8. throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
    9. }
    10. PrimitiveObjectInspector.PrimitiveCategory primitiveType =
    11. ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
    12. inputType[i] = primitiveType;
    13. switch (primitiveType) {
    14. case INT:
    15. if (intConverter == null) {
    16. ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
    17. intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
    18. }
    19. break;
    20. case LONG:
    21. if (longConverter == null) {
    22. ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
    23. longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
    24. }
    25. break;
    26. default:
    27. throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
    28. }
    29. }
    30. return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    31. }

    evaluate()方法是GenericUDF的核心方法,自定义UDF的实现逻辑。代码实现步骤可以分为三部分:参数接收;自定义UDF核心逻辑;返回处理结果。

    第一步:参数接收

    evaluate() 的参数就是 自定义UDF 的参数。

    1. /**
    2. * Evaluate the GenericUDF with the arguments.
    3. *
    4. * @param arguments
    5. * The arguments as DeferedObject, use DeferedObject.get() to get the
    6. * actual argument Object. The Objects can be inspected by the
    7. * ObjectInspectors passed in the initialize call.
    8. * @return The
    9. */
    10. public abstract Object evaluate(DeferredObject[] arguments)
    11. throws HiveException;

    通过源码注释可知,DeferedObject.get() 可以获取参数的值。

    1. /**
    2. * A Defered Object allows us to do lazy-evaluation and short-circuiting.
    3. * GenericUDF use DeferedObject to pass arguments.
    4. */
    5. public static interface DeferredObject {
    6. void prepare(int version) throws HiveException;
    7. Object get() throws HiveException;
    8. };

    再看看 DeferredObject 的源码,DeferedObject.get() 返回的是 Object,传入的参数不同,会是不同的Java类型。

    第二步:自定义UDF核心逻辑

    这一部分根据实际项目需求自行编写。

    第三步:返回处理结果

    这一步和 initialize() 的返回值一一对应,基本类型返回值有两种:Writable类型 和 Java包装类型:

    • 在 initialize 指定的返回值类型为 Writable类型 时,在 evaluate() 中 return 的就应该是对应的 Writable实例。
    • 在 initialize 指定的返回值类型为 Java包装类型 时,在 evaluate() 中 return 的就应该是对应的 Java包装类实例。

    evalute()示例

    1. @Override
    2. public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
    3. LongWritable out = new LongWritable();
    4. for (int i = 0; i < deferredObjects.length; i++) {
    5. PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
    6. Object param = deferredObjects[i].get();
    7. switch (type) {
    8. case INT:
    9. Object intObject = intConverter.convert(param);
    10. out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
    11. break;
    12. case LONG:
    13. Object longObject = longConverter.convert(param);
    14. out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
    15. break;
    16. default:
    17. throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
    18. }
    19. }
    20. return out;
    21. }

    getDisplayString() 返回的是 explain 时展示的信息。这里不能return null,否则可能在运行时抛出空指针异常。

    1. @Override
    2. public String getDisplayString(String[] strings) {
    3. return "my_count(" + Joiner.on(", ").join(strings) + ")";
    4. }
    自定义GenericUDF完整示例
    1. @Description(
    2. name="my_count",
    3. value="my_count(...) - count int or long type numbers",
    4. extended = "Example :\n >select my_count(3, 5);\n >8\n >select my_count(3, 5, 25);\n >33"
    5. )
    6. public class MyCountUDF extends GenericUDF {
    7. private PrimitiveObjectInspector.PrimitiveCategory[] inputType;
    8. private transient ObjectInspectorConverters.Converter intConverter;
    9. private transient ObjectInspectorConverters.Converter longConverter;
    10. @Override
    11. public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
    12. int length = objectInspectors.length;
    13. inputType = new PrimitiveObjectInspector.PrimitiveCategory[length];
    14. for (int i = 0; i < length; i++) {
    15. ObjectInspector currentOI = objectInspectors[i];
    16. ObjectInspector.Category type = currentOI.getCategory();
    17. if (type != ObjectInspector.Category.PRIMITIVE) {
    18. throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type);
    19. }
    20. PrimitiveObjectInspector.PrimitiveCategory primitiveType =
    21. ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory();
    22. inputType[i] = primitiveType;
    23. switch (primitiveType) {
    24. case INT:
    25. if (intConverter == null) {
    26. ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
    27. intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI);
    28. }
    29. break;
    30. case LONG:
    31. if (longConverter == null) {
    32. ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType);
    33. longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI);
    34. }
    35. break;
    36. default:
    37. throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType);
    38. }
    39. }
    40. return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    41. }
    42. @Override
    43. public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
    44. LongWritable out = new LongWritable();
    45. for (int i = 0; i < deferredObjects.length; i++) {
    46. PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i];
    47. Object param = deferredObjects[i].get();
    48. switch (type) {
    49. case INT:
    50. Object intObject = intConverter.convert(param);
    51. out.set(Math.addExact(out.get(), ((IntWritable) intObject).get()));
    52. break;
    53. case LONG:
    54. Object longObject = longConverter.convert(param);
    55. out.set(Math.addExact(out.get(), ((LongWritable) longObject).get()));
    56. break;
    57. default:
    58. throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type);
    59. }
    60. }
    61. return out;
    62. }
    63. @Override
    64. public String getDisplayString(String[] strings) {
    65. return "my_count(" + Joiner.on(", ").join(strings) + ")";
    66. }
    67. }

    UDTF

    UDTF函数作用都是输入一行数据,将该行数据拆分、并返回多行数据。不同的UDTF函数只是拆分的原理不同、作用的数据格式不同而已。

    适用场景

    1. 流应用中对数据处理,如:字符串解析,hyperbase数据删除,时间段去重,时间段统计
    2. 数仓数集应用中需要将单行转换为多行,inceptor内置多种UDTF,如:explode,inline,json_tuple等

    注意:返回UDTF结果的同时查询其他对象,须引用关键字 LATERAL VIEW

    UDTF开发要点

    1. 实现UDTF函数需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

    2. 然后重写/实现initialize, process, close三个方法

    A. initialize初始化验证,返回字段名和字段类型

    initialize初始化:UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型,名称)。initialize针对任务调一次, 作用是定义输出字段的列名、和输出字段的数据类型。

    initialize方法示例
    1. @Override
    2. /**
    3. * 返回数据类型:StructObjectInspector
    4. * 定义输出数据的列名、和数据类型。
    5. */
    6. public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
    7. List fieldNames = new ArrayList(); //fieldNames为输出的字段名
    8. fieldNames.add("world");
    9. List fieldOIs = new ArrayList(); //类型,列输出类型
    10. fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    11. return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    12. }
    B. 初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回

    process:初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果写出。process传入一行数据写出去多次,传入一行数据输出多行数据,如:mapreduce单词计数。process针对每行数据调用一次该方法。在initialize初始化的时候,定义输出字段的数据类型是集合,调用forward()将数据写入到一个缓冲区,写入缓冲区的数据也要是集合。

    process方法示例
    1. //数据的集合
    2. private List dataList = new ArrayList();
    3. /**
    4. * process(Object[] objects) 参数是一个数组,但是hive中的explode函数接受的是一个,一进多出
    5. * @param args
    6. * @throws HiveException
    7. */
    8. public void process(Object[] args) throws HiveException {
    9. //我们现在的需求是传入一个数据,在传入一个分割符
    10. //1.获取数据
    11. String data = args[0].toString();
    12. //2.获取分割符
    13. String splitKey = args[1].toString();
    14. //3.切分数据,得到一个数组
    15. String[] words = data.split(splitKey);
    16. //4.想把words里面的数据全部写出去。类似在map方法中,通过context.write方法
    17. // 定义是集合、写出去是一个string,类型不匹配,写出也要写出一个集合
    18. for (String word : words) {
    19. //5.将数据放置集合,EG:传入"hello,world,hdfs"---->写出需要写n次,hello\world
    20. dataList.clear();//清空数据集合
    21. dataList.add(word);
    22. //5.写出数据的操作
    23. forward(dataList);
    24. }
    25. }
    C. 最后调用close()方法进行清理工作

    最后close()方法调用,对需要清理的方法进行清理,close()方法针对整个任务调一次

    UDTF DEMO

    下面UDTF 实现的是字符串的分拆,多行输出

    1. package io.transwarp.udtf;
    2. import java.util.ArrayList;
    3. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    4. import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    5. import org.apache.hadoop.hive.ql.metadata.HiveException;
    6. import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    7. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    8. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    9. import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    10. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    11. public class SplitUDF extends GenericUDTF{
    12. @Override
    13. public void close() throws HiveException {
    14. // TODO Auto-generated method stub
    15. }
    16. @Override
    17. public StructObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
    18. // TODO Auto-generated method stub
    19. if(arg0.length != 1){
    20. throw new UDFArgumentLengthException("SplitString only takes one argument");
    21. }
    22. if(arg0[0].getCategory() != ObjectInspector.Category.PRIMITIVE){
    23. throw new UDFArgumentException("SplitString only takes string as a parameter");
    24. }
    25. ArrayList fieldNames = new ArrayList<>();
    26. ArrayList fieldOIs = new ArrayList<>();
    27. fieldNames.add("col1");
    28. fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    29. fieldNames.add("col2");
    30. fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    31. return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    32. }
    33. @Override
    34. public void process(Object[] arg0) throws HiveException {
    35. // TODO Auto-generated method stub
    36. String input = arg0[0].toString();
    37. String[] inputSplits = input.split("#");
    38. for (int i = 0; i < inputSplits.length; i++) {
    39. try {
    40. String[] result = inputSplits[i].split(":");
    41. forward(result);
    42. } catch (Exception e) {
    43. continue;
    44. }
    45. }
    46. }
    47. }

    执行效果如下:

    如何使用UDTF

    将UDTF打包后,放在inceptor server 所在节点之上(建议不要放在/user/lib/hive/lib/下),之后在连接inceptor执行以下命令,生成临时函数(server有效,重启inceptor失效)

    1. add jar /tmp/timestampUDF.jar
    2. drop temporary function timestamp_ms;
    3. create temporary function timestamp_ms as 'io.transwarp.udf.ToTimestamp';
    4. select date, timestamp_ms(date) from table1;

     UDAF

    正如前面所说,UDAF是由用户自主定义的,虽然UDAF的使用可以方便对数据的运算处理,但是使用它的数量建议不要过多,因为UDAF的数量增长和性能下降成线性关系。另外,如果存在大量的嵌套UDAF,系统的性能也会降低,建议用户在可能的情况下写一个没有嵌套或者嵌套较少的UDAF实现相同功能来提高性能。

    UDAF开发要点

    1. 用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;

    2. 用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。

    3. 一个计算函数必须实现的5个方法的具体含义如下:

    • - init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。
    • - iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。
    • - terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。
    • - merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。
    • - terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

    UDAF DEMO

    下面的UDAF DEMO目标是实现找到最大值功能,以表中某一字段为参数,返回最大值。

    1. package udaf.transwarp.io;
    2. import org.apache.hadoop.hive.ql.exec.UDAF;
    3. import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    4. import org.apache.hadoop.io.IntWritable;
    5. //UDAF是输入多个数据行,产生一个数据行
    6. //用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类
    7. public class MaxiNumber extends UDAF{
    8. public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
    9. //最终结果
    10. private IntWritable result;
    11. //负责初始化计算函数并设置它的内部状态,result是存放最终结果的
    12. @Override
    13. public void init() {
    14. result=null;
    15. }
    16. //每次对一个新值进行聚集计算都会调用iterate方法
    17. public boolean iterate(IntWritable value)
    18. {
    19. if(value==null)
    20. return false;
    21. if(result==null)
    22. result=new IntWritable(value.get());
    23. else
    24. result.set(Math.max(result.get(), value.get()));
    25. return true;
    26. }
    27. //Hive需要部分聚集结果的时候会调用该方法
    28. //会返回一个封装了聚集计算当前状态的对象
    29. public IntWritable terminatePartial()
    30. {
    31. return result;
    32. }
    33. //合并两个部分聚集值会调用这个方法
    34. public boolean merge(IntWritable other)
    35. {
    36. return iterate(other);
    37. }
    38. //Hive需要最终聚集结果时候会调用该方法
    39. public IntWritable terminate()
    40. {
    41. return result;
    42. }
    43. }
    44. }

    UDF 的打包与使用

    操作前提

    将开发好自定义UDF函数的项目打包成jar包,注意:jar 包中的自定义UDF 类名,不能和现有UDF 类,在包名+类名上,完全相同

    部署方式

    常见的UDF部署方式有以下三种:

    • 把UDF固化到image里,重新打image(推荐);
    • 其次是通过创建临时UDF(add jar + temporary function)的方式;
    • 创建永久UDF(hdfs jar+permanent function)的方式(可行,但不是很推荐);

    方式一 固化UDF

    此方式的核心逻辑是把UDF jar包放到image的/usr/lib/inceptor/下面,重新制作image。具体步骤如下:

    以更换inceptor中的inceptor_2.10-1.1.0-transwarp-6.1.0.jar为例:

    1. 进入inceptor image

    docker run -it  bash

    2. 打开另一个terminal

    3. 替换container中的jar包

    docker cp  :/usr/lib/inceptor/ 

    image.png

    4. commit修改记录

    docker commit  REPOSITORY:TAG

    5. 打开manager管理页面重新启动inceptor服务

    6.重启完成后即可查看quark server的pod下/usr/lib/inceptor/是否有新增的jar包

    方式二 创建临时UDF

    1. 查看已存在jar包

    LIST JAR;

    2. 添加jar包

    1. ADD JAR[S] <local_path>;
    2. // Local_path是jar包所在Inceptor server节点的路径。

    3. 创建临时UDF

    CREATE TEMPORARY FUNCTION [<db_name>.]<function_name> AS <class_name>;

    临时UDF在Inceptor重启后失效。如果需要更新临时UDF,需要重启Inceptor重新创建该临时UDF。

    示例:

    4. 验证临时UDF

    SELECT [<db_name>.]<function_name>() FROM SYSTEM.DUAL;

    5. 删除临时UDF

    DROP TEMPORARY FUNCTION <if exists> <function_name>;

    方式三 创建永久UDF

    建议优先选取前两种方式,此方式虽然可行但不推荐,故仅介绍基础命令,暂无视频提供。

    1. 查看已存在jar包

    LIST JAR;

    2. 添加jar包

    1. ADD JAR[S] <local_or_hdfs_path>;
    2. //Local_path是Inceptor server节点的路径。保证hive用户对jar所在的目录有读权限。

    3. 创建永久UDF

    CREATE PERMANENT FUNCTION [<db_name>.]<function_name> AS <class_name>;

    如果Inceptor不在local mode,那么资源的地址也必须是非本地URI,比如HDFS地址。

    4. 验证永久UDF

    SELECT [<db_name>.]<function_name>() FROM SYSTEM.DUAL;

    5. 删除永久UDF

    DROP PERMANENT FUNCTION <if exists> <function_name>;

    image.png

  • 相关阅读:
    python 文件加密
    【Java 基础语法】Java 的文件操作
    Python Day15 json和文件操作【初级】
    7、索引优化分析
    【Java面试八股文宝典之基础篇】备战2023 查缺补漏 你越早准备 越早成功!!!——Day10
    【SQL】MySQL中的SQL优化、explain执行计划
    Java并发编程解析 | 解析AQS基础同步器的设计与实现
    Jetpack Compose中的Constraint Layout
    Linux上实现CPU亲缘
    拓扑排序(一部分)
  • 原文地址:https://blog.csdn.net/tiancaidddddd/article/details/139796004