• 【深入浅出 Yarn 架构与实现】2-4 Yarn 基础库 - 状态机库


    当一个服务拥有太多处理逻辑时,会导致代码结构异常的混乱,很难分辨一段逻辑是在哪个阶段发挥作用的。
    这时就可以引入状态机模型,帮助代码结构变得清晰。

    一、状态机库概述#

    一)简介#

    状态机由一组状态组成:
    【初始状态 -> 中间状态 -> 最终状态】。
    在一个状态机中,每个状态会接收一组特定的事件,根据事件类型进行处理,并转换到下一个状态。当转换到最终状态时则退出。

    二)状态转换方式#

    状态间转换会有下面这三种类型:
    image.png

    三)Yarn 状态机类#

    在 Yarn 中提供了一个工厂类 StateMachineFactory 来帮助定义状态机。如何使用,我们直接写个 demo。
    image.png

    二、案例 demo#

    在上一篇文章《Yarn 服务库和事件库》案例基础上进行扩展,增加状态机库的内容。如果还不了解服务库和事件库的同学,建议先学习下上一篇文章。
    案例已上传至 github,有帮助可以点个 ⭐️
    https://github.com/Simon-Ace/hadoop-yarn-study-demo/tree/master/state-demo

    一)状态机实现#

    状态机实现,可以直接嵌入到上篇文章中的 AsyncDispatcher使用。
    这里仅给出状态机JobStateMachine以及各种事件处理的代码。完整的代码项目执行,请到 github demo 中查看。

    import com.shuofxz.event.JobEvent;
    import com.shuofxz.event.JobEventType;
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.hadoop.yarn.state.*;
    
    import java.util.EnumSet;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /*
    * 可参考 Yarn 中实现的状态机对象:
    * ResourceManager 中的 RMAppImpl、RMApp- AttemptImpl、RMContainerImpl 和 RMNodeImpl,
    * NodeManager 中 的 ApplicationImpl、 ContainerImpl 和 LocalizedResource,
    * MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等
    * */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public class JobStateMachine implements EventHandler {
        private final String jobID;
        private EventHandler eventHandler;
        private final Lock writeLock;
        private final Lock readLock;
    
        // 定义状态机
        protected static final StateMachineFactory
                stateMachineFactory = new StateMachineFactory(JobStateInternal.NEW)
                .addTransition(JobStateInternal.NEW, JobStateInternal.INITED, JobEventType.JOB_INIT, new InitTransition())
                .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP, JobEventType.JOB_START, new StartTransition())
                .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING, JobEventType.JOB_SETUP_COMPLETED, new SetupCompletedTransition())
                .addTransition(JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED), JobEventType.JOB_COMPLETED, new JobTasksCompletedTransition())
                .installTopology();
    
        private final StateMachine stateMachine;
    
        public JobStateMachine(String jobID, EventHandler eventHandler) {
            this.jobID = jobID;
    
            // 多线程异步处理,state 有可能被同时读写,使用读写锁来避免竞争
            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            this.readLock = readWriteLock.readLock();
            this.writeLock = readWriteLock.writeLock();
    
            this.eventHandler = eventHandler;
            stateMachine = stateMachineFactory.make(this);
        }
    
        protected StateMachine getStateMachine() {
            return stateMachine;
        }
    
        public static class InitTransition implements SingleArcTransition {
            @Override
            public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
                System.out.println("Receiving event " + jobEvent);
                // do something...
                // 完成后发送新的 Event —— JOB_START
                jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_START));
            }
        }
    
        public static class StartTransition implements SingleArcTransition {
            @Override
            public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
                System.out.println("Receiving event " + jobEvent);
                jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_SETUP_COMPLETED));
            }
        }
    
        public static class SetupCompletedTransition implements SingleArcTransition {
            @Override
            public void transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
                System.out.println("Receiving event " + jobEvent);
                jobStateMachine.eventHandler.handle(new JobEvent(jobStateMachine.jobID, JobEventType.JOB_COMPLETED));
            }
        }
    
        public static class JobTasksCompletedTransition implements MultipleArcTransition {
            @Override
            public JobStateInternal transition(JobStateMachine jobStateMachine, JobEvent jobEvent) {
                System.out.println("Receiving event " + jobEvent);
    
                // 这是多结果状态部分,因此需要人为制定后续状态
                // 这里整个流程结束,设置一下对应的状态
                boolean flag = true;
                if (flag) {
                    return JobStateInternal.SUCCEEDED;
                } else {
                    return JobStateInternal.KILLED;
                }
            }
        }
    
        @Override
        public void handle(JobEvent jobEvent) {
            try {
                // 注意这里为了避免静态条件,使用了读写锁
                writeLock.lock();
                JobStateInternal oldState = getInternalState();
                try {
                    getStateMachine().doTransition(jobEvent.getType(), jobEvent);
                } catch (InvalidStateTransitionException e) {
                    System.out.println("Can't handle this event at current state!");
                }
                if (oldState != getInternalState()) {
                    System.out.println("Job Transitioned from " + oldState + " to " + getInternalState());
                }
    
            } finally {
                writeLock.unlock();
            }
        }
    
        public JobStateInternal getInternalState() {
            readLock.lock();
            try {
                return getStateMachine().getCurrentState();
            } finally {
                readLock.unlock();
            }
        }
    
        public enum JobStateInternal {
            NEW,
            SETUP,
            INITED,
            RUNNING,
            SUCCEEDED,
            KILLED
        }
    }
    

    二)状态机可视化#

    hadoop 中提供了状态机可视化的工具类 VisualizeStateMachine.java,可以拷贝到我们的工程中使用。
    根据提示,运行需要三个参数:

    Usage: %s   %n
    

    image.png

    运行后会在项目根目录生成图文件 jsm.gv
    需要使用 graphviz工具将 gv 文件转换成 png 文件:

    # linux 安装
    yum install graphviz
    
    # mac 安装
    brew install graphviz
    

    转换:

    dot -Tpng jsm.gv > jsm.png
    

    可视化状态机展示:
    image.png

    再使用这个工具对 Yarn 中的 Application 状态进行展示:
    image.png

    三)如果不用状态机库#

    【思考】
    如果不用状态机,代码结构会是什么样呢?
    下面这样的代码,如果要增加或修改逻辑可能就是很痛苦的一件事情了。

    // 一堆的函数调用
    
    // 一堆的 if 嵌套
    
    // 或者 switch case
    

    三、总结#

    本节对 Yarn 状态机库进行了介绍。实际使用时会结合事件库、服务库一同使用。
    状态机库的使用帮助代码结构更加的清晰,新增状态处理逻辑只需要增加一个状态类别,或者增加一个方法处理对应类型的事件即可。将整个处理逻辑进行了拆分,便于编写和维护。


    参考文章:
    源码|Yarn的事件驱动模型与状态机

  • 相关阅读:
    Java 同步工具与组合类的线程安全性分析
    kubernetes,service详解下
    2019java面试(六)
    如何知道kafka是否正常生产消费?看看客户端指标采集
    如何利用基于充血模型的DDD开发一个虚拟钱包系统?
    函数式编程——Stream流
    Java8Stream快速使用
    KF32A学习笔记(一):工程导入、编译烧录方法(KF32 IDE+ KF32 PRO)
    面向对象编程原则(06)——依赖倒转原则
    2021年6月大学英语六级翻译
  • 原文地址:https://www.cnblogs.com/shuofxz/p/16881989.html