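Hive custom functions: GenericUDF and GenericUDTF (a custom function that computes the length of a given string, and one that splits a string on an arbitrary delimiter into individual words)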


    Preface

      When Hive's built-in functions cannot meet your business-processing needs, consider writing a user-defined function.


    1. Custom functions

    Official documentation

    1.1 Categories

      UDF (User-Defined Function): one row in, one row out
      UDAF (User-Defined Aggregation Function): aggregate function, many rows in, one row out
      UDTF (User-Defined Table-Generating Function): one row in, many rows out
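      As a rough analogy in plain Java (not Hive code; the class and method names below are made up for illustration), the three kinds differ only in how many input rows map to how many output rows:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy for the three kinds of Hive functions; not Hive API.
public class FunctionShapes {
    // UDF: one input row -> one output value (in the spirit of length())
    static int udfLength(String s) {
        return s.length();
    }

    // UDAF: many input rows -> one output value (in the spirit of max());
    // assumes a non-empty column
    static int udafMax(List<Integer> column) {
        int max = Integer.MIN_VALUE;
        for (int v : column) {
            max = Math.max(max, v);
        }
        return max;
    }

    // UDTF: one input row -> many output rows (in the spirit of explode())
    static List<String> udtfExplode(String csv) {
        return Arrays.asList(csv.split(","));
    }

    public static void main(String[] args) {
        System.out.println(udfLength("hive"));               // 4
        System.out.println(udafMax(Arrays.asList(3, 9, 4))); // 9
        System.out.println(udtfExplode("a,b,c"));            // [a, b, c]
    }
}
```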

    1.2 Programming steps

      (1) Extend the base class provided by Hive

    org.apache.hadoop.hive.ql.udf.generic.GenericUDF
    org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
    

      (2) Implement the abstract methods of the class
      (3) Create the function in the Hive CLI
    Add the jar

    add jar <path to the jar>;
    

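    Create the function (a temporary function exists only in the current session and is not database-qualified; a permanent function can be qualified with a database name and can load its jar with USING JAR)

    create temporary function function_name as class_name;
    create function [dbname.]function_name as class_name [using jar 'jar_uri'];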
    

      (4) Drop the function in the Hive CLI

    drop [temporary] function [if exists] [dbname.]function_name;
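      The methods from step (2) are called by Hive in a fixed order: for a GenericUDF, initialize() checks the argument types and declares the return type before any row is processed, and evaluate() is then called once per row. The plain-Java simulation below sketches that calling pattern; SimpleUdf, StrLen and run() are hypothetical stand-ins, not Hive classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the GenericUDF calling pattern; SimpleUdf is NOT a Hive type.
public class UdfLifecycleSketch {
    // Stand-in for GenericUDF: initialize() validates argument types once,
    // evaluate() is then called once per row.
    interface SimpleUdf {
        String initialize(List<String> argTypes); // returns the declared output type
        Object evaluate(Object[] row);
    }

    // Example implementation in the spirit of a string-length UDF
    static class StrLen implements SimpleUdf {
        public String initialize(List<String> argTypes) {
            if (argTypes.size() != 1) {
                throw new IllegalArgumentException("expected exactly 1 argument");
            }
            return "int";
        }

        public Object evaluate(Object[] row) {
            return row[0] == null ? null : row[0].toString().length();
        }
    }

    // Plays the engine's role: one initialize() call, then one evaluate() per row
    static List<Object> run(SimpleUdf udf, List<String> argTypes, List<Object[]> rows) {
        udf.initialize(argTypes);
        List<Object> results = new ArrayList<>();
        for (Object[] row : rows) {
            results.add(udf.evaluate(row));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(new Object[] {"hive"}, new Object[] {null});
        System.out.println(run(new StrLen(), Arrays.asList("string"), rows)); // [4, null]
    }
}
```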
    

    2. Custom UDF

    2.1 Reference: the built-in function concat

    /**
     * GenericUDFConcat.
     */
    @Description(name = "concat",
    value = "_FUNC_(str1, str2, ... strN) - returns the concatenation of str1, str2, ... strN or "+
            "_FUNC_(bin1, bin2, ... binN) - returns the concatenation of bytes in binary data " +
            " bin1, bin2, ... binN",
    extended = "Returns NULL if any argument is NULL.\n"
    + "Example:\n"
    + "  > SELECT _FUNC_('abc', 'def') FROM src LIMIT 1;\n"
    + "  'abcdef'")
    @VectorizedExpressions({
        StringGroupConcatColCol.class,
        StringGroupColConcatStringScalar.class,
        StringScalarConcatStringGroupCol.class})
    public class GenericUDFConcat extends GenericUDF {
      private transient ObjectInspector[] argumentOIs;
      private transient StringConverter[] stringConverters;
      private transient PrimitiveCategory returnType = PrimitiveCategory.STRING;
      private transient BytesWritable[] bw;
      private transient GenericUDFUtils.StringHelper returnHelper;
    
      @Override
      public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    
        // Loop through all the inputs to determine the appropriate return type/length.
        // Return type:
        //  All CHAR inputs: return CHAR
        //  All VARCHAR inputs: return VARCHAR
        //  All CHAR/VARCHAR inputs: return VARCHAR
        //  All BINARY inputs: return BINARY
        //  Otherwise return STRING
        argumentOIs = arguments;
    
        PrimitiveCategory currentCategory;
        PrimitiveObjectInspector poi;
        boolean fixedLengthReturnValue = true;
        int returnLength = 0;  // Only for char/varchar return types
        for (int idx = 0; idx < arguments.length; ++idx) {
          if (arguments[idx].getCategory() != Category.PRIMITIVE) {
            throw new UDFArgumentException("CONCAT only takes primitive arguments");
          }
          poi = (PrimitiveObjectInspector)arguments[idx];
          currentCategory = poi.getPrimitiveCategory();
          if (idx == 0) {
            returnType = currentCategory;
          }
          switch (currentCategory) {
            case BINARY:
              fixedLengthReturnValue = false;
              if (returnType != currentCategory) {
                // mix of binary/non-binary args
                returnType = PrimitiveCategory.STRING;
              }
              break;
            case CHAR:
            case VARCHAR:
              if (!fixedLengthReturnValue) {
                returnType = PrimitiveCategory.STRING;
              }
              if (fixedLengthReturnValue && currentCategory == PrimitiveCategory.VARCHAR) {
                returnType = PrimitiveCategory.VARCHAR;
              }
              break;
            default:
              returnType = PrimitiveCategory.STRING;
              fixedLengthReturnValue = false;
              break;
          }
    
          // If all arguments are of known length then we can keep track of the max
          // length of the return type. However if the return length exceeds the
          // max length for the char/varchar, then the return type reverts to string.
          if (fixedLengthReturnValue) {
            returnLength += GenericUDFUtils.StringHelper.getFixedStringSizeForType(poi);
            if ((returnType == PrimitiveCategory.VARCHAR
                    && returnLength > HiveVarchar.MAX_VARCHAR_LENGTH)
                || (returnType == PrimitiveCategory.CHAR
                    && returnLength > HiveChar.MAX_CHAR_LENGTH)) {
              returnType = PrimitiveCategory.STRING;
              fixedLengthReturnValue = false;
            }
          }
        }
    
        if (returnType == PrimitiveCategory.BINARY) {
          bw = new BytesWritable[arguments.length];
          return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
        } else {
          // treat all inputs as string, the return value will be converted to the appropriate type.
          createStringConverters();
          returnHelper = new GenericUDFUtils.StringHelper(returnType);
          BaseCharTypeInfo typeInfo;
          switch (returnType) {
            case STRING:
              return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
            case CHAR:
              typeInfo = TypeInfoFactory.getCharTypeInfo(returnLength);
              return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo);
            case VARCHAR:
              typeInfo = TypeInfoFactory.getVarcharTypeInfo(returnLength);
              return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(typeInfo);
            default:
              throw new UDFArgumentException("Unexpected CONCAT return type of " + returnType);
          }
        }
      }
    
      private void createStringConverters() {
        stringConverters = new StringConverter[argumentOIs.length];
        for (int idx = 0; idx < argumentOIs.length; ++idx) {
          stringConverters[idx] = new StringConverter((PrimitiveObjectInspector) argumentOIs[idx]);
        }
      }
    
      @Override
      public Object evaluate(DeferredObject[] arguments) throws HiveException {
        if (returnType == PrimitiveCategory.BINARY) {
          return binaryEvaluate(arguments);
        } else {
          return returnHelper.setReturnValue(stringEvaluate(arguments));
        }
      }
    
      public Object binaryEvaluate(DeferredObject[] arguments) throws HiveException {
        int len = 0;
        for (int idx = 0; idx < arguments.length; ++idx) {
          bw[idx] = ((BinaryObjectInspector)argumentOIs[idx])
              .getPrimitiveWritableObject(arguments[idx].get());
          if (bw[idx] == null){
            return null;
          }
          len += bw[idx].getLength();
        }
    
        byte[] out = new byte[len];
        int curLen = 0;
        // Need to iterate twice since BytesWritable doesn't support append.
        for (BytesWritable bytes : bw){
          System.arraycopy(bytes.getBytes(), 0, out, curLen, bytes.getLength());
          curLen += bytes.getLength();
        }
        return new BytesWritable(out);
      }
    
      public String stringEvaluate(DeferredObject[] arguments) throws HiveException {
        StringBuilder sb = new StringBuilder();
        for (int idx = 0; idx < arguments.length; ++idx) {
          String val = null;
          if (arguments[idx] != null) {
            val = (String) stringConverters[idx].convert(arguments[idx].get());
          }
          if (val == null) {
            return null;
          }
          sb.append(val);
        }
        return sb.toString();
      }
    
      @Override
      public String getDisplayString(String[] children) {
        return getStandardDisplayString("concat", children);
      }
    
    }
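      The return-type logic in initialize() above is the subtle part of concat. The sketch below restates it in plain Java so the rules can be tried without a Hive cluster; Kind and Arg are simplified stand-ins for Hive's PrimitiveCategory and ObjectInspector, and the limits 255 (CHAR) and 65535 (VARCHAR) are assumed to match HiveChar.MAX_CHAR_LENGTH and HiveVarchar.MAX_VARCHAR_LENGTH.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java restatement of GenericUDFConcat.initialize()'s return-type rules.
// Kind and Arg are simplified stand-ins, not Hive classes.
public class ConcatTypeSketch {
    enum Kind { CHAR, VARCHAR, BINARY, STRING, OTHER } // OTHER = any other primitive, e.g. INT

    // Assumed to match HiveChar.MAX_CHAR_LENGTH and HiveVarchar.MAX_VARCHAR_LENGTH
    static final int MAX_CHAR = 255;
    static final int MAX_VARCHAR = 65535;

    static final class Arg {
        final Kind kind;
        final int length; // declared length; only used for CHAR/VARCHAR

        Arg(Kind kind, int length) {
            this.kind = kind;
            this.length = length;
        }
    }

    static Kind resolve(List<Arg> args) {
        Kind ret = args.get(0).kind; // start from the first argument's type
        boolean fixed = true;        // true while every argument has a fixed length
        int total = 0;               // summed CHAR/VARCHAR length
        for (Arg a : args) {
            switch (a.kind) {
                case BINARY:
                    fixed = false;
                    if (ret != Kind.BINARY) {
                        ret = Kind.STRING; // binary mixed with non-binary
                    }
                    break;
                case CHAR:
                case VARCHAR:
                    if (!fixed) {
                        ret = Kind.STRING;
                    } else if (a.kind == Kind.VARCHAR) {
                        ret = Kind.VARCHAR; // any VARCHAR turns a CHAR result into VARCHAR
                    }
                    break;
                default:
                    ret = Kind.STRING; // STRING or any other primitive
                    fixed = false;
                    break;
            }
            // Fall back to STRING once the combined length exceeds the type's maximum
            if (fixed) {
                total += a.length;
                if ((ret == Kind.VARCHAR && total > MAX_VARCHAR)
                        || (ret == Kind.CHAR && total > MAX_CHAR)) {
                    ret = Kind.STRING;
                    fixed = false;
                }
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Arrays.asList(new Arg(Kind.CHAR, 10), new Arg(Kind.VARCHAR, 20)))); // VARCHAR
        System.out.println(resolve(Arrays.asList(new Arg(Kind.CHAR, 200), new Arg(Kind.CHAR, 100)))); // STRING
        System.out.println(resolve(Arrays.asList(new Arg(Kind.BINARY, 0), new Arg(Kind.BINARY, 0)))); // BINARY
    }
}
```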
    
    

    2.2 A custom UDF that computes the length of a given string

      Add the dependency

    <dependencies>
    		<dependency>
    			<groupId>org.apache.hive</groupId>
    			<artifactId>hive-exec</artifactId>
    			<version>3.1.2</version>
    		</dependency>
    </dependencies>
    

      Extend the abstract class GenericUDF

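    package com.cz;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    // Returns the length of the string form of its single primitive argument
    public class MyStringLength extends GenericUDF {
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            // Exactly one argument is required
            if (arguments.length != 1) {
                throw new UDFArgumentLengthException("Expected exactly 1 argument");
            }
            // The argument must be a primitive type (not an array, map or struct)
            if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(0, "Argument must be of a primitive type");
            }
            // The result is a plain Java int
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }

        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
            Object value = arguments[0].get();
            // A NULL input yields 0 in this implementation
            if (value == null) {
                return 0;
            }
            return value.toString().length();
        }

        @Override
        public String getDisplayString(String[] children) {
            return "htdata MyStringLength";
        }
    }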
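      Two behaviors of MyStringLength.evaluate() are worth knowing: a NULL input returns 0 rather than NULL, and String.length() counts UTF-16 code units, so a character outside the Basic Multilingual Plane (such as an emoji) counts as 2. The plain-Java sketch below (a hypothetical LengthSemantics class, not Hive code) reproduces the evaluate() logic and, as an alternative, counts Unicode code points; which one is right depends on what the business logic means by "length".

```java
// Plain-Java sketch of MyStringLength.evaluate() plus a code-point alternative.
public class LengthSemantics {
    // Same logic as evaluate(): NULL -> 0, otherwise the UTF-16 length
    static int utf16Length(Object value) {
        if (value == null) {
            return 0;
        }
        return value.toString().length();
    }

    // Alternative: count Unicode code points, so a surrogate pair counts once
    static int codePointLength(Object value) {
        if (value == null) {
            return 0;
        }
        String s = value.toString();
        return s.codePointCount(0, s.length());
    }

    public static void main(String[] args) {
        String s = "a\uD83D\uDE00"; // "a" followed by U+1F600, stored as a surrogate pair
        System.out.println(utf16Length(s));     // 3
        System.out.println(codePointLength(s)); // 2
        System.out.println(utf16Length(null));  // 0
    }
}
```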
    

      Package the project as a jar, upload it to the server, and add the jar to Hive's classpath

    add jar /home/hdfs/hiveFunction-1.0-SNAPSHOT.jar;
    

      Create a temporary function bound to the Java class

    create temporary function htdata_length as "com.cz.MyStringLength";
    



    3. Custom UDTF

    3.1 Reference: the built-in function explode

    public class GenericUDTFExplode extends GenericUDTF {
    
      private transient ObjectInspector inputOI = null;
      @Override
      public void close() throws HiveException {
      }
    
      @Override
      public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
          throw new UDFArgumentException("explode() takes only one argument");
        }
    
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    
        switch (args[0].getCategory()) {
        case LIST:
          inputOI = args[0];
          fieldNames.add("col");
          fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
          break;
        case MAP:
          inputOI = args[0];
          fieldNames.add("key");
          fieldNames.add("value");
          fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
          fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
          break;
        default:
          throw new UDFArgumentException("explode() takes an array or a map as a parameter");
        }
    
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
            fieldOIs);
      }
    
      private transient final Object[] forwardListObj = new Object[1];
      private transient final Object[] forwardMapObj = new Object[2];
    
      @Override
      public void process(Object[] o) throws HiveException {
        switch (inputOI.getCategory()) {
        case LIST:
          ListObjectInspector listOI = (ListObjectInspector)inputOI;
          List<?> list = listOI.getList(o[0]);
          if (list == null) {
            return;
          }
          for (Object r : list) {
            forwardListObj[0] = r;
            forward(forwardListObj);
          }
          break;
        case MAP:
          MapObjectInspector mapOI = (MapObjectInspector)inputOI;
          Map<?,?> map = mapOI.getMap(o[0]);
          if (map == null) {
            return;
          }
          for (Entry<?,?> r : map.entrySet()) {
            forwardMapObj[0] = r.getKey();
            forwardMapObj[1] = r.getValue();
            forward(forwardMapObj);
          }
          break;
        default:
          throw new TaskExecutionException("explode() can only operate on an array or a map");
        }
      }
    
      @Override
      public String toString() {
        return "explode";
      }
    }
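      The essential behavior of explode() above: an array produces one single-column row per element, a map produces one (key, value) row per entry, and a NULL input produces no rows. The plain-Java sketch below (a hypothetical ExplodeSketch class, not Hive code) collects those rows into a list instead of calling forward(). Hive's version can reuse a single forwardListObj/forwardMapObj array because forward() hands the row to the next operator before returning; a collecting sketch like this one has to allocate a fresh array per row, otherwise every stored row would point to the same array.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of explode()'s row semantics; not Hive code.
public class ExplodeSketch {
    // array -> one single-column row per element; NULL array -> no rows
    static List<Object[]> explodeList(List<?> list) {
        List<Object[]> rows = new ArrayList<>();
        if (list == null) {
            return rows;
        }
        for (Object element : list) {
            rows.add(new Object[] {element}); // fresh array per stored row
        }
        return rows;
    }

    // map -> one (key, value) row per entry; NULL map -> no rows
    static List<Object[]> explodeMap(Map<?, ?> map) {
        List<Object[]> rows = new ArrayList<>();
        if (map == null) {
            return rows;
        }
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            rows.add(new Object[] {entry.getKey(), entry.getValue()});
        }
        return rows;
    }

    public static void main(String[] args) {
        for (Object[] row : explodeList(Arrays.asList("a", "b"))) {
            System.out.println(Arrays.toString(row)); // [a] then [b]
        }
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("k1", 1);
        m.put("k2", 2);
        for (Object[] row : explodeMap(m)) {
            System.out.println(Arrays.toString(row)); // [k1, 1] then [k2, 2]
        }
        System.out.println(explodeList(null).size()); // 0
    }
}
```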
    
    

    3.2 A custom UDTF that splits a string on an arbitrary delimiter into individual words

      Extend the abstract class GenericUDTF

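    package com.cz;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class MyUDTF extends GenericUDTF {

        // Reused output row
        private final ArrayList<String> outList = new ArrayList<>();

        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
            // Expect two arguments: the string to split and the delimiter
            if (argOIs.getAllStructFieldRefs().size() != 2) {
                throw new UDFArgumentLengthException("Expected exactly 2 arguments: the string and the delimiter");
            }
            // 1. Define the output column names and types
            List<String> fieldNames = new ArrayList<>();
            List<ObjectInspector> fieldOIs = new ArrayList<>();
            // 2. Add a single output column "lineToWord" of type string
            fieldNames.add("lineToWord");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }

        @Override
        public void process(Object[] args) throws HiveException {
            // A NULL string or delimiter produces no output rows
            if (args[0] == null || args[1] == null) {
                return;
            }
            // 1. Get the input string
            String arg = args[0].toString();
            // 2. Get the second argument, which is the delimiter
            String splitKey = args[1].toString();
            // 3. Split on the delimiter taken literally: String.split() expects a regex,
            //    so Pattern.quote() keeps "." or "|" from being read as regex syntax
            String[] fields = arg.split(Pattern.quote(splitKey));
            // 4. Emit one row per word
            for (String field : fields) {
                // The list is reused, so clear it first
                outList.clear();
                // Add the current word to the list
                outList.add(field);
                // Write the row out
                forward(outList);
            }
        }

        @Override
        public void close() throws HiveException {
            // Nothing to clean up
        }
    }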
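      The split step is the easiest place to go wrong: String.split() interprets its argument as a regular expression, so an unquoted delimiter such as "." or "|" is not matched literally. The plain-Java sketch below (a hypothetical SplitSketch helper, not part of Hive) mirrors process() with the delimiter quoted via Pattern.quote and shows the difference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java sketch of the UDTF's split logic; each list element stands for one output row.
public class SplitSketch {
    static List<String> splitLiteral(String line, String delimiter) {
        List<String> rows = new ArrayList<>();
        if (line == null || delimiter == null) {
            return rows; // NULL input -> no rows
        }
        // Pattern.quote makes the delimiter match literally instead of as a regex
        for (String field : line.split(Pattern.quote(delimiter))) {
            rows.add(field);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println("a.b.c".split(".").length);             // 0: "." matches every character
        System.out.println(splitLiteral("a.b.c", "."));            // [a, b, c]
        System.out.println(splitLiteral("hello,world,hive", ",")); // [hello, world, hive]
    }
}
```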
    

      Package the project as a jar, upload it to the server, add the jar to Hive's classpath, and create a temporary function bound to the Java class

    create temporary function htdata_split as "com.cz.MyUDTF";
    



  Original article: https://blog.csdn.net/javahelpyou/article/details/125625995