• apache-atlas-hive-hook source code analysis


    Atlas Hook class diagram

    Hive metadata changes can be captured in two ways:
    1) Hook based, implemented by HiveHook
    2) MetaStoreEventListener based, implemented by HiveMetastoreHookImpl
    Accordingly there are two configuration options: register the hook function, or register the metastore listener; we currently use the second approach. The Atlas hook class diagram is shown below.
    [Figure: Atlas Hook class diagram]

    Steps to develop a Hive hook (a minimal sketch follows this list):
    1) Create a class that implements ExecuteWithHookContext
    2) Package it and place the jar under HIVE_HOME/lib
    3) Configure it, either temporarily in the Hive client or permanently in hive-site.xml
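
    Below is a minimal sketch of such a hook. The class name DemoAuditHook and its log output are illustrative only and are not part of Atlas; it only uses getOperationName() and getUgi() from HookContext, the same calls that appear in HiveHook further down.

    import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
    import org.apache.hadoop.hive.ql.hooks.HookContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // hypothetical post-execution hook that only logs the operation and user
    public class DemoAuditHook implements ExecuteWithHookContext {
        private static final Logger LOG = LoggerFactory.getLogger(DemoAuditHook.class);

        @Override
        public void run(HookContext hookContext) throws Exception {
            // called by Hive after each statement when registered as a post-execution hook
            String user = hookContext.getUgi() == null ? "unknown" : hookContext.getUgi().getShortUserName();

            LOG.info("operation={}, user={}", hookContext.getOperationName(), user);
        }
    }

    Once the jar is under HIVE_HOME/lib, the hook can be enabled temporarily in the Hive client with set hive.exec.post.hooks=<fully qualified class name>;, or permanently via the same property in hive-site.xml. The Atlas hook itself is registered the same way: hive.exec.post.hooks=org.apache.atlas.hive.hook.HiveHook for the hook approach, or hive.metastore.event.listeners=org.apache.atlas.hive.hook.HiveMetastoreHookImpl for the listener approach.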

    Hive Hook source code analysis

    HiveHook overrides AtlasHook's run method: based on the Hive operation type it builds an event and sends the event's notification messages via AtlasHook#notifyEntities.

    HiveHook#run

    public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
        @Override
        public void run(HookContext hookContext) throws Exception {
            //...
            try {
                // resolve the HiveOperation from the hookContext
                HiveOperation        oper    = OPERATION_MAP.get(hookContext.getOperationName());
                AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects());
                BaseHiveEvent        event   = null;

                // create the event matching the HiveOperation
                switch (oper) {
                    case CREATEDATABASE:
                        event = new CreateDatabase(context);
                        break;
                    case DROPDATABASE:
                        event = new DropDatabase(context);
                        break;
                    //...
                    case ALTERTABLE_RENAME:
                    case ALTERVIEW_RENAME:
                        event = new AlterTableRename(context);
                        break;
                    case ALTERTABLE_RENAMECOL:
                        event = new AlterTableRenameCol(context);
                        break;
                    default:
                        //...
                        break;
                }

                if (event != null) {
                    // user/group information of the caller
                    final UserGroupInformation ugi = hookContext.getUgi() == null ? Utils.getUGI() : hookContext.getUgi();

                    // delegate to AtlasHook#notifyEntities
                    super.notifyEntities(event.getNotificationMessages(), ugi);
                }
            } catch (Throwable t) {
                LOG.error("HiveHook.run(): failed to process operation {}", hookContext.getOperationName(), t);
            }
            //...
        }
    }
    

    Hive event class diagram
    [Figure: Hive event class diagram]
    The constants defined in BaseHiveEvent fall mainly into the following groups (see the sketch after this list):
    1) Hive type names, prefixed HIVE_TYPE_
    2) HBase type names, prefixed HBASE_TYPE_
    3) Hive attribute names, prefixed ATTRIBUTE_
    4) HBase attribute names, prefixed HBASE_
    5) Hive relationship attribute names, prefixed RELATIONSHIP_
    6) Others
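
    For orientation, a few representative constants look roughly like the following. The values come from the Atlas Hive model ("hive_db", "hive_table", "qualifiedName", ...); the exact set of constant names may differ between Atlas versions, so treat this as an abridged illustration rather than the actual source.

    // abridged illustration of BaseHiveEvent constants, not the full list
    public static final String HIVE_TYPE_DB              = "hive_db";
    public static final String HIVE_TYPE_TABLE           = "hive_table";
    public static final String HIVE_TYPE_COLUMN          = "hive_column";
    public static final String HBASE_TYPE_TABLE          = "hbase_table";
    public static final String ATTRIBUTE_QUALIFIED_NAME  = "qualifiedName";
    public static final String ATTRIBUTE_NAME            = "name";
    public static final String RELATIONSHIP_HIVE_TABLE_DB = "hive_table_db";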

    AtlasHook#notifyEntities

    notifyEntities supports two delivery strategies, synchronous and asynchronous, and in both cases delegates to notifyEntitiesInternal.

    public static void notifyEntities(List<HookNotification> messages, UserGroupInformation ugi, int maxRetries) {
        if (executor == null) { // send synchronously
            notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
        } else {
            // send asynchronously on the executor
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    notifyEntitiesInternal(messages, maxRetries, ugi, notificationInterface, logFailedMessages, failedMessagesLogger);
                }
            });
        }
    }
    

    AtlasHook#notifyEntitiesInternal

    notifyEntitiesInternal implements the retry policy and calls notificationInterface.send to deliver the messages.

    @VisibleForTesting
    static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, UserGroupInformation ugi,
                                       NotificationInterface notificationInterface,
                                       boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
        // ...
        final int maxAttempts         = maxRetries < 1 ? 1 : maxRetries;
        Exception notificationFailure = null;

        // try up to maxAttempts times
        for (int numAttempt = 1; numAttempt <= maxAttempts; numAttempt++) {
            if (numAttempt > 1) { // retry attempt
                // sleep before retrying
                try {
                    LOG.debug("Sleeping for {} ms before retry", notificationRetryInterval);
                    Thread.sleep(notificationRetryInterval);
                } catch (InterruptedException ie) {
                    LOG.error("Notification hook thread sleep interrupted");
                    break;
                }
            }

            try {
                // send the notification; the messages are consumed by the Atlas server
                if (ugi == null) {
                    notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                } else {
                    PrivilegedExceptionAction<Object> privilegedNotify = new PrivilegedExceptionAction<Object>() {
                        @Override
                        public Object run() throws Exception {
                            notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                            return messages;
                        }
                    };

                    ugi.doAs(privilegedNotify);
                }

                notificationFailure = null; // notification sent successfully, reset error
                break;
            } catch (Exception e) {
                notificationFailure = e;
                LOG.error("Failed to send notification - attempt #{}; error={}", numAttempt, e.getMessage());
            }
        }

        // handling of failed notifications
        // ...
    }
    

    The notification messages are ultimately sent to Kafka by KafkaNotification, which is created by NotificationProvider in AtlasHook's static initializer.
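
    By default the hook messages land on the ATLAS_HOOK Kafka topic, which the Atlas server consumes. As an illustration only, the messages can be inspected with a plain Kafka consumer; the bootstrap address localhost:9092 and the consumer group below are assumptions for this sketch, not Atlas settings.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AtlasHookTopicPeek {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
            props.put("group.id", "atlas-hook-peek");          // hypothetical consumer group
            props.put("key.deserializer",   "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // ATLAS_HOOK is the default hook topic name; it is configurable on the Atlas side
                consumer.subscribe(Collections.singletonList("ATLAS_HOOK"));

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                    for (ConsumerRecord<String, String> record : records) {
                        // each value is a JSON-serialized HookNotification message
                        System.out.println(record.value());
                    }
                }
            }
        }
    }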

    Initialization of notificationInterface

    notificationInterface is initialized in AtlasHook's static block; its implementation class is KafkaNotification, so the messages are sent to Kafka, where the Atlas server consumes them and persists the metadata.

    public abstract class AtlasHook {
        //...
        protected static Configuration         atlasProperties;
        protected static NotificationInterface notificationInterface;
        //...
        private static ExecutorService executor = null;

        static {
            try {
                atlasProperties = ApplicationProperties.get();
            } catch (Exception e) {
                LOG.info("Failed to load application properties", e);
            }
            //...
            metadataNamespace         = getMetadataNamespace(atlasProperties);
            placeCode                 = getPlaceCode(atlasProperties);
            notificationMaxRetries    = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
            notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);

            // the notification interface implementation, obtained from NotificationProvider
            notificationInterface     = NotificationProvider.get();
            //...
        }
        //...
    }
    

    HiveMetastoreHookImpl source code analysis

    HiveMetastoreHookImpl

    HiveMetastoreHookImpl extends the MetaStoreEventListener class and overrides methods such as onCreateDatabase and onDropDatabase, delegating the events to HiveMetastoreHook#handleEvent.

    public class HiveMetastoreHookImpl extends MetaStoreEventListener {
        private static final Logger            LOG = LoggerFactory.getLogger(HiveMetastoreHookImpl.class);
        private        final HiveHook          hiveHook;
        private        final HiveMetastoreHook hook;

        public HiveMetastoreHookImpl(Configuration config) {
            super(config);

            this.hiveHook = new HiveHook();
            this.hook     = new HiveMetastoreHook();
        }
        //...
        @Override
        public void onCreateDatabase(CreateDatabaseEvent dbEvent) {
            HiveOperationContext context = new HiveOperationContext(CREATEDATABASE, dbEvent);

            // delegate to HiveMetastoreHook
            hook.handleEvent(context);
        }

        @Override
        public void onDropDatabase(DropDatabaseEvent dbEvent) {
            HiveOperationContext context = new HiveOperationContext(DROPDATABASE, dbEvent);

            hook.handleEvent(context);
        }
        //...
    }
    

    HiveMetastoreHook#handleEvent

    handleEvent sends the change notifications through AtlasHook#notifyEntities; the messages go to Kafka, where the Atlas server consumes them and persists the metadata.

    public void handleEvent(HiveOperationContext operContext) {
        ListenerEvent listenerEvent = operContext.getEvent();
        //...
        try {
            HiveOperation        oper    = operContext.getOperation();
            AtlasHiveHookContext context = new AtlasHiveHookContext(hiveHook, oper, hiveHook.getKnownObjects(), this, listenerEvent);
            BaseHiveEvent        event   = null;

            switch (oper) {
                case CREATEDATABASE:
                    event = new CreateDatabase(context);
                    break;
                //...
                case ALTERTABLE_RENAMECOL:
                    FieldSchema columnOld = operContext.getColumnOld();
                    FieldSchema columnNew = operContext.getColumnNew();

                    event = new AlterTableRenameCol(columnOld, columnNew, context);
                    break;
                default:
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("HiveMetastoreHook.handleEvent({}): operation ignored.", listenerEvent);
                    }
                    break;
            }

            if (event != null) {
                final UserGroupInformation ugi = SecurityUtils.getUGI() == null ? Utils.getUGI() : SecurityUtils.getUGI();

                // deliver the notification via AtlasHook#notifyEntities
                super.notifyEntities(event.getNotificationMessages(), ugi);
            }
        } catch (Throwable t) {
            LOG.error("HiveMetastoreHook.handleEvent({}): failed to process operation {}", listenerEvent, t);
        }
    }
    
  • Original article: https://blog.csdn.net/windydreams/article/details/127893478