• 基于Thrift的分布式Hive数据源连接器


    基于Thrift的分布式Hive数据源连接器

    问题产生原因

    最近要写一个通过thrift连接hive的工具,传统的jdbc代码是不能获取日志的,因此需要定制一个可以获取hive 执行日志的客户端。但是我们系统本身属于分布式系统。那么就会造成一个问题。我的连接是在节点A获取的,然后在节点A上执行了查询。那么我下次继续请求更新的日志信息的时候,请求可能到达了节点B。

    问题的根源就是请求日志或者数据的时候请求是异步的,请求之后立马返回,下次请求具体到达什么节点是不确定的。

    解决思路

    如大家所知道的连接是不能进行序列化传输的,但是Hive的TOperationHandle是可以进行序列化的。因此我们需要记录一下Hive的操作句柄就可以拿到日志或者数据。

    实现

    借助工具 Hazelcast (可嵌入的分布式内存数据库- 点击查看该集群) 制作一个内存分布式共享集合,然后将请求的句柄存入集合内即可。

    代码一: 分布式内存集合工具类

    由于spring项目内已经注册的有Hazelcast集群,所以我们仅仅在此处检测JVM内存在的Hazelcast资源即可,检测到之后取出来就可以使用

    public class HazelcastUtil {
        private static volatile HazelcastInstance hazelcastInstance;
    
        private HazelcastUtil(){
            if (hazelcastInstance != null){
                throw new DataRockException("can not init hazelcast when it was existed");
            }
        }
    
        public static HazelcastInstance getHazelcastInstance() {
            if (hazelcastInstance == null){
                synchronized (HazelcastUtil.class){
                    if (hazelcastInstance == null){
                        Set<HazelcastInstance> allHazelcastInstances = Hazelcast.getAllHazelcastInstances();
                        Optional<HazelcastInstance> first = allHazelcastInstances.stream().findFirst();
                        if (!first.isPresent()){
                            throw new DataRockException("hazelcast has not be initialized in JVM");
                        }
                        return first.get();
                    }
                }
            }
            return hazelcastInstance;
        }
        public static <A,B> IMap<A,B> getOrCreateHazelcastMap(String name){
            HazelcastInstance instance = getHazelcastInstance();
            IMap<A,B> map = instance.getMap(name);
            return map;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    代码二: 创建一个可序列化的共享集合,该集合的主要目的就是在各个实例之间传输,用来共享句柄和一些句柄的信息。

    @Data
    @Builder
    public class MyOperationStatementRecord implements Serializable {
    
        private TOperationHandle operationHandle;
        private Integer fetchSize;
        private TFetchOrientation fetchLogOrientation;
        private TFetchOrientation fetchDataOrientation;
        private TProtocolVersion protocolVersion;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    代码三:具体请求操作,省略一些冗余代码

    
    
    @Slf4j
    public class HiveInterpreterOperator extends JdbcInterpreterOperator {
    	
    	//省略一些客户端以及协议版本等属性
    	
        public IMap<String, MyOperationStatementRecord> myHiveOperationHandleMap = HazelcastUtil.getOrCreateHazelcastMap(" myHiveOperationHandle");
    
        @Override
        public  void executeAsyncQuery(MyParam param) {
    			//此处省略异步执行sql的代码 用executeAsync代替
    			//Myparam内包含很多配置信息比如连接信息等 
    		  TOperationHandle operationHandle = executeAsync(param)
    			//protocol为使用的hive协议版本
              MyOperationStatementRecord record = MyOperationStatementRecord.builder()
                    .operationHandle(operationHandle)
                    .protocolVersion(protocol)
                    .fetchSize(100)
                    .fetchLogOrientation(TFetchOrientation.FETCH_FIRST).build();
               myHiveOperationHandleMap.put(param.getRequestId(), record);
      
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public List<String> fetchLogs(MyFetchParam param) {
            String requestId = param.getRequestId();
            MyRockOperationStatementRecord record = myHiveOperationHandleMap.get(requestId);
            TOperationHandle operationHandle = record.getOperationHandle();
            TFetchOrientation fetchOrientation = record.getFetchLogOrientation();
            Integer fetchSize = record.getFetchSize();
    
            List<String> logs = new ArrayList<String>();
            TFetchResultsResp tFetchResultsResp = null;
            try {
                TCLIService.Iface client = getClient();
                if (operationHandle != null) {
                    TFetchResultsReq tFetchResultsReq =
                            new TFetchResultsReq(operationHandle, fetchOrientation, fetchSize);
                    tFetchResultsReq.setFetchType(FetchType.LOG.toTFetchType());
    
                    tFetchResultsResp = client.FetchResults(tFetchResultsReq);
                    if (Objects.nonNull(tFetchResultsResp)){
                        record.setFetchLogOrientation(TFetchOrientation.FETCH_NEXT);
                        dataRockHiveOperationHandleMap.put(param.getRequestId(), record);
    
                    }
                }
                RowSet  rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), record.getProtocolVersion());
                for (Object[] row : rowSet) {
                    logs.add(String.valueOf(row[0]));
                }
            } catch (Exception e) {
                throw new RunTimeException("Error when getting query log: " + e, e);
            }
           return logs;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    此时尝试本地开启两个服务分别为80端口服务和81端口服务,在80端口服务进行异步请求,在81端口服务获取日志均成功

  • 相关阅读:
    django连接达梦数据库
    Plumed分子模拟后分析
    电磁场几何和衍射理论的统一
    mapboxgl加载tiff
    Diffusion Models视频生成-博客汇总
    排队时延与流量强度
    网络编程、OSI七层协议
    计算机辅助数据绘图(matlab\python\js)
    多肽介导PEG磷脂——靶向功能材料之DSPE-PEG-RGD/TAT/NGR/APRPG
    基于SSM框架的图片分享及评价网站设计与实现 毕业设计-附源码201524
  • 原文地址:https://blog.csdn.net/sinat_35045195/article/details/126353339