• flowable异步任务加锁流程


    一、异步任务执行

      1.1流程图如下:

    1.2时序图如下:

    加入有两个异步任务,同时触达,那么如下图

    1.3代码分析如下:

    1.3.1 入口代码

    附上部分源代码。事物提交监听器入口如下:
    复制代码
    public class JobAddedTransactionListener implements TransactionListener {
    
        ...
    
        @Override
        public void execute(CommandContext commandContext) {
            asyncExecutor.executeAsyncJob(job);
        }
    复制代码

     

    接入spring后,后续主要逻辑进入ExecuteAsyncRunnable类的run方法,如下:
    复制代码
    public void run() {
    
        ...
        if (job instanceof AbstractRuntimeJobEntity) {
    
            boolean lockingNeeded = ((AbstractRuntimeJobEntity) job).isExclusive();
            boolean executeJob = true;
            if (lockingNeeded) {
                executeJob = lockJob();
            }
            if (executeJob) {
                executeJob(lockingNeeded);
            }
    
        }
    
    }
    复制代码

     

    1.3.2 加锁代码

    lockJob方法,主要代码如下:
    复制代码
    protected boolean lockJob() {
        Job job = (Job) this.job; 
        try {
            // 加锁
            jobServiceConfiguration.getCommandExecutor().execute(new LockExclusiveJobCmd(job, jobServiceConfiguration));
    
        } catch (Throwable lockException) {
    
            // 释放job,等待下次被调用
            unacquireJob();
    
            return false;
        }
    
        return true;
    }
    复制代码

     

    真实加锁方法,主要代码在DefaultInternalJobManager类,如下:
    复制代码
    protected void lockJobScopeInternal(Job job) {
        ExecutionEntityManager executionEntityManager = getExecutionEntityManager();
        ExecutionEntity execution = executionEntityManager.findById(job.getExecutionId());
        if (execution != null) {
            String lockOwner;
            Date lockExpirationTime;
            // 处理lockOwner与lockExpirationTime,省略
            
            executionEntityManager.updateProcessInstanceLockTime(execution.getProcessInstanceId(), lockOwner, lockExpirationTime);
        }
    
    }
    复制代码

     

    实际加锁,其实是数据库悲观锁,在MybatisExecutionDataManager类如下:
    复制代码
    public void updateProcessInstanceLockTime(String processInstanceId, Date lockDate, String lockOwner, Date expirationTime) {
        HashMap params = new HashMap<>();
        params.put("id", processInstanceId);
        params.put("lockTime", lockDate);
        params.put("expirationTime", expirationTime);
        params.put("lockOwner", lockOwner);
    
        int result = getDbSqlSession().directUpdate("updateProcessInstanceLockTime", params);
        if (result == 0) {
            throw new FlowableOptimisticLockingException("Could not lock process instance");
        }
    }
    复制代码

    二、异步job执行

      2.1 流程图如下:

    • 注意,此图中的锁冲突,主要是多服务器并发捞取数据时,容易触发。

    2.2 代码如下

    2.2.1 入口

    入口在AcquireAsyncJobsDueRunnable的run方法,主要代码如下:
    复制代码
    public synchronized void run() {
        while (!isInterrupted) {
    
            // 全局锁——增加之后,所有实例中,锁过期前同一时间只有一个可以运行
            if (configuration.isGlobalAcquireLockEnabled()) {
    
            
            } else {
                // 循环执行
                millisToWait = executeAcquireCycle(commandExecutor);
    
            }
    
            // 等待
            if (millisToWait > 0) {
                sleep(millisToWait);
            }
    
        }
    }
    复制代码

     

    2.2.2run方法注入spring

    配置在ProcessEngineAutoConfiguration类,方法如下:
    复制代码
    springProcessEngineConfiguration(){
        AsyncExecutor springAsyncExecutor = asyncExecutorProvider.getIfUnique();
        if (springAsyncExecutor != null) {
            conf.setAsyncExecutor(springAsyncExecutor);
        }
    }
    复制代码

     

    启动在SpringProcessEngineConfiguration类,此类实现了spring的Lifecycle类,具体为start方法,层层堆叠如下:
    复制代码
    public void start() {
        synchronized (lifeCycleMonitor) {
            if (!isRunning()) {
                enginesBuild.forEach(name -> {
                    ProcessEngine processEngine = ProcessEngines.getProcessEngine(name);
                    // 这里
                    processEngine.startExecutors();
                    autoDeployResources(processEngine);
                });
                running = true;
            }
        }
    }
    
    
    public void startExecutors() {
        if (asyncExecutor != null && asyncExecutor.isAutoActivate()) {
            // 此处开启
            asyncExecutor.start();
        }
    }
    
    public void start() {
        if (isActive) {
            return;
        }
    
        isActive = true;
    
        LOGGER.info("Starting up the async job executor [{}] for engine {}", getClass().getName(), getJobServiceConfiguration().getEngineName());
    
        initializeJobEntityManager();
        // 初始化
        initializeRunnables();
        // 真实开启
        startAdditionalComponents();
        executeTemporaryJobs();
    }
    
    protected void startAdditionalComponents() {
        if (!isMessageQueueMode) {
            initAsyncJobExecutionThreadPool();
            // 开启异步任务方法
            startJobAcquisitionThread();
        }
    }
    
    protected void startTimerAcquisitionThread() {
        if (configuration.isTimerJobAcquisitionEnabled()) {
            if (timerJobAcquisitionThread == null) {
                timerJobAcquisitionThread = new Thread(timerJobRunnable);
            }
            
            // 开启
            timerJobAcquisitionThread.start();
        }
    }
    复制代码

     

    2.2.3 关于加锁

    进入AcquireAsyncJobsDueRunnable类,逻辑如下:
    复制代码
    protected long acquireAndExecuteJobs(CommandExecutor commandExecutor, int remainingCapacity) {
        boolean globalAcquireLockEnabled = configuration.isGlobalAcquireLockEnabled();
        try {
            Listextends JobInfoEntity> acquiredJobs;
            // 获取并加锁
            acquiredJobs = commandExecutor.execute(new AcquireJobsCmd(asyncExecutor, remainingCapacity, jobEntityManager));
    
            // 执行
            List rejectedJobs = offerJobs(acquiredJobs);
    
            LOGGER.debug("Jobs acquired: {}, rejected: {}, for engine {}", acquiredJobs.size(), rejectedJobs.size(), getEngineName());
            
    
        } catch (FlowableOptimisticLockingException optimisticLockingException) {
    
        } catch (Throwable e) {
            LOGGER.warn("exception for engine {} during async job acquisition: {}", getEngineName(), e.getMessage(), e);
        }
    
        return asyncExecutor.getDefaultAsyncJobAcquireWaitTimeInMillis();
    }
    复制代码

     

    看一下AcquireJobsCmd类,代码如下:
    复制代码
    public Listextends JobInfoEntity> execute(CommandContext commandContext) {
        int maxResults = Math.min(remainingCapacity, asyncExecutor.getMaxAsyncJobsDuePerAcquisition());
        List enabledCategories = asyncExecutor.getJobServiceConfiguration().getEnabledJobCategories();
        // 查询数据库
        Listextends JobInfoEntity> jobs = jobEntityManager.findJobsToExecute(enabledCategories, new Page(0, maxResults));
    
        for (JobInfoEntity job : jobs) {
            // 加锁
            lockJob(job, asyncExecutor.getAsyncJobLockTimeInMillis(), asyncExecutor.getJobServiceConfiguration());
        }
    
        return jobs;
    }
    
    protected void lockJob(JobInfoEntity job, int lockTimeInMillis, JobServiceConfiguration jobServiceConfiguration) {
        GregorianCalendar gregorianCalendar = calculateLockExpirationTime(lockTimeInMillis, jobServiceConfiguration);
        job.setLockOwner(asyncExecutor.getLockOwner());
        job.setLockExpirationTime(gregorianCalendar.getTime());
    }
    复制代码

     

    到这里,实际上并没有数据库操作,但是注意,flowable的select方法,会把查出来的数据,放入缓存中。且刚刚我们结果的是外层被命令模式封装的责任链,所以,可以知道业务代码处理完,会执行责任链后置代码,具体入库为CommandContextInterceptor类execute中的commandContext.close()代码,此方法内会冲刷session(flushSessions方法)。我们直接看dbsqlSession的处理
    复制代码
    public void flush() {
        // 此方法把缓存中修改过的对象,组装为update方法
        determineUpdatedObjects(); 
        removeUnnecessaryOperations();
    
        if (LOGGER.isDebugEnabled()) {
            debugFlush();
        }
    
        flushInserts();
        // 更新数据
        flushUpdates();
        flushDeletes();
    }
    复制代码

     

    至此,我们只要再看下flushUpdates即可,代码如下:
    复制代码
    protected void flushUpdates() {
        for (Entity updatedObject : updatedObjects) {
            // 执行变更
            int updatedRecords = sqlSession.update(updateStatement, updatedObject);
            // 变更失败获取锁失败
            if (updatedRecords == 0) {
                throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently");
            }
    
        }
        updatedObjects.clear();
    }
    复制代码

    三、关于全局锁

    3.1 异步job的lockOwner设定

    关于job的lockOwner,如果是机器A从timeJob捞起来,满足条件的数据,在插入job表是会直接在本机设定,然后注册一个事务监听器。job入库事务提交后,还是机器A来执行job,这个时候,job的死循环,不回拉到这个刚刚timeJob捞起过的数据。 现在看起来,以下两种场景,lockOwner会为空: 1、独占任务并发执行时,没有抢到流程实例锁的任务,重新插入时,lockOwner为空 2、异步线程池满了,新任务插入时,报错被捕获,此时会情况lockOwner

    3.2 全局锁对性能的影响分析

    基于异步job的lockOwner设定,当任务执行时间比较密集。比如同一秒有10000个需要执行的任务时,此时假定我们有5台服务器,单服务器的异步线程池队列长度为200。那么无论对job还是timeJob,都有大量待认领的任务。 我们可以简单的把一次拉起带执行job并操作的过程,分为三步:
    1. 获取带执行任务(批量,默认500)
    2. 加锁,指定此任务由本机执行。
    3. 交给本机异步线程池执行
    不开全局锁的时候,多个服务器执行步骤1时,可能会拉取到同样的数据。此时他们会尝试以update xx where id=xx;这种格式,对500条sql进行提交。此时有可能多台机器都在和数据库交互。因为每次update语句时,事物还没有提交,所以当前逻辑暂时都没有问题。但是最终事物提交时,永远只有一个能成功提交。此时数据库开始回滚。
    当我们机器足够多,且带执行任务足够多时,上述情况大概率发生。而实际上步骤3因为只要交给异步线程池即可结束,并非耗时操作。综上理解为,全局锁应该可以解决1、2中的锁冲突,从而提升性能。
    再具体一些,如果我们的待执行任务不多,那么可以理解为同一时间,只要有一个机器的job处理器在拉取任务,放入自身的异步队列,就可以处理完所有的job,那么此时全局锁开启其实不会拖慢性能。 相对应的,如果带执行任务足够多,多机器并行时,比如AB两台机器,不开全局锁,可能节约的时间为A机器执行到步骤3时,B机器执行步骤1拉取。但是可能面对的问题是A机器执行步骤3之前,比如1或2时,B机器已经开始拉取数据,此时B机器执行步骤锁定job时会出现锁冲突。 即我们可以通过1/2/3步骤,每一步的耗时比,来评判全局锁的性能收益。但是考虑到1/2步骤时数据库操作,且当数据量为500条时步骤2为update xx where id=xx;*500次的数据库操作,而步骤3为内存操作。所以暂时任务步骤1&2的耗时>>步骤3的耗时,所以全局锁可以产生全局正面收益。
     
  • 相关阅读:
    银河麒麟桌面操作系统V10升级 华云数据携手麒麟为用户提供更佳体验
    优酷 Android 包瘦身治理思路全解
    操作系统学习笔记(Ⅲ):内存
    openbmc开发37:webui开发—增加全屏功能
    【算法练习Day30】无重叠区间&& 划分字母区间&&合并区间
    河南高考录取统计分析
    移远EC600U-CN开发板 11.22
    转发网关与NAT网关
    【开发日记】MySQL-Explain学习日记
    退火算法研究分析
  • 原文地址:https://www.cnblogs.com/fbw-gxy/p/17183677.html