flowable异步任务加锁流程

发布时间 2023-03-22 19:13:47作者: 豆豆323

一、异步任务执行

  1.1流程图如下:

1.2时序图如下:

加入有两个异步任务,同时触达,那么如下图

1.3代码分析如下:

1.3.1 入口代码

附上部分源代码。事物提交监听器入口如下:
public class JobAddedTransactionListener implements TransactionListener {

    ...

    @Override
    public void execute(CommandContext commandContext) {
        asyncExecutor.executeAsyncJob(job);
    }

 

接入spring后,后续主要逻辑进入ExecuteAsyncRunnable类的run方法,如下:
public void run() {

    ...
    if (job instanceof AbstractRuntimeJobEntity) {

        boolean lockingNeeded = ((AbstractRuntimeJobEntity) job).isExclusive();
        boolean executeJob = true;
        if (lockingNeeded) {
            executeJob = lockJob();
        }
        if (executeJob) {
            executeJob(lockingNeeded);
        }

    }

}

 

1.3.2 加锁代码

lockJob方法,主要代码如下:
protected boolean lockJob() {
    Job job = (Job) this.job; 
    try {
        // 加锁
        jobServiceConfiguration.getCommandExecutor().execute(new LockExclusiveJobCmd(job, jobServiceConfiguration));

    } catch (Throwable lockException) {

        // 释放job,等待下次被调用
        unacquireJob();

        return false;
    }

    return true;
}

 

真实加锁方法,主要代码在DefaultInternalJobManager类,如下:
protected void lockJobScopeInternal(Job job) {
    ExecutionEntityManager executionEntityManager = getExecutionEntityManager();
    ExecutionEntity execution = executionEntityManager.findById(job.getExecutionId());
    if (execution != null) {
        String lockOwner;
        Date lockExpirationTime;
        // 处理lockOwner与lockExpirationTime,省略
        
        executionEntityManager.updateProcessInstanceLockTime(execution.getProcessInstanceId(), lockOwner, lockExpirationTime);
    }

}

 

实际加锁,其实是数据库悲观锁,在MybatisExecutionDataManager类如下:
public void updateProcessInstanceLockTime(String processInstanceId, Date lockDate, String lockOwner, Date expirationTime) {
    HashMap<String, Object> params = new HashMap<>();
    params.put("id", processInstanceId);
    params.put("lockTime", lockDate);
    params.put("expirationTime", expirationTime);
    params.put("lockOwner", lockOwner);

    int result = getDbSqlSession().directUpdate("updateProcessInstanceLockTime", params);
    if (result == 0) {
        throw new FlowableOptimisticLockingException("Could not lock process instance");
    }
}

二、异步job执行

  2.1 流程图如下:

  • 注意,此图中的锁冲突,主要是多服务器并发捞取数据时,容易触发。

2.2 代码如下

2.2.1 入口

入口在AcquireAsyncJobsDueRunnable的run方法,主要代码如下:
public synchronized void run() {
    while (!isInterrupted) {

        // 全局锁——增加之后,所有实例中,锁过期前同一时间只有一个可以运行
        if (configuration.isGlobalAcquireLockEnabled()) {

        
        } else {
            // 循环执行
            millisToWait = executeAcquireCycle(commandExecutor);

        }

        // 等待
        if (millisToWait > 0) {
            sleep(millisToWait);
        }

    }
}

 

2.2.2run方法注入spring

配置在ProcessEngineAutoConfiguration类,方法如下:
springProcessEngineConfiguration(){
    AsyncExecutor springAsyncExecutor = asyncExecutorProvider.getIfUnique();
    if (springAsyncExecutor != null) {
        conf.setAsyncExecutor(springAsyncExecutor);
    }
}

 

启动在SpringProcessEngineConfiguration类,此类实现了spring的Lifecycle类,具体为start方法,层层堆叠如下:
public void start() {
    synchronized (lifeCycleMonitor) {
        if (!isRunning()) {
            enginesBuild.forEach(name -> {
                ProcessEngine processEngine = ProcessEngines.getProcessEngine(name);
                // 这里
                processEngine.startExecutors();
                autoDeployResources(processEngine);
            });
            running = true;
        }
    }
}


public void startExecutors() {
    if (asyncExecutor != null && asyncExecutor.isAutoActivate()) {
        // 此处开启
        asyncExecutor.start();
    }
}

public void start() {
    if (isActive) {
        return;
    }

    isActive = true;

    LOGGER.info("Starting up the async job executor [{}] for engine {}", getClass().getName(), getJobServiceConfiguration().getEngineName());

    initializeJobEntityManager();
    // 初始化
    initializeRunnables();
    // 真实开启
    startAdditionalComponents();
    executeTemporaryJobs();
}

protected void startAdditionalComponents() {
    if (!isMessageQueueMode) {
        initAsyncJobExecutionThreadPool();
        // 开启异步任务方法
        startJobAcquisitionThread();
    }
}

protected void startTimerAcquisitionThread() {
    if (configuration.isTimerJobAcquisitionEnabled()) {
        if (timerJobAcquisitionThread == null) {
            timerJobAcquisitionThread = new Thread(timerJobRunnable);
        }
        
        // 开启
        timerJobAcquisitionThread.start();
    }
}

 

2.2.3 关于加锁

进入AcquireAsyncJobsDueRunnable类,逻辑如下:
protected long acquireAndExecuteJobs(CommandExecutor commandExecutor, int remainingCapacity) {
    boolean globalAcquireLockEnabled = configuration.isGlobalAcquireLockEnabled();
    try {
        List<? extends JobInfoEntity> acquiredJobs;
        // 获取并加锁
        acquiredJobs = commandExecutor.execute(new AcquireJobsCmd(asyncExecutor, remainingCapacity, jobEntityManager));

        // 执行
        List<JobInfoEntity> rejectedJobs = offerJobs(acquiredJobs);

        LOGGER.debug("Jobs acquired: {}, rejected: {}, for engine {}", acquiredJobs.size(), rejectedJobs.size(), getEngineName());
        

    } catch (FlowableOptimisticLockingException optimisticLockingException) {

    } catch (Throwable e) {
        LOGGER.warn("exception for engine {} during async job acquisition: {}", getEngineName(), e.getMessage(), e);
    }

    return asyncExecutor.getDefaultAsyncJobAcquireWaitTimeInMillis();
}

 

看一下AcquireJobsCmd类,代码如下:
public List<? extends JobInfoEntity> execute(CommandContext commandContext) {
    int maxResults = Math.min(remainingCapacity, asyncExecutor.getMaxAsyncJobsDuePerAcquisition());
    List<String> enabledCategories = asyncExecutor.getJobServiceConfiguration().getEnabledJobCategories();
    // 查询数据库
    List<? extends JobInfoEntity> jobs = jobEntityManager.findJobsToExecute(enabledCategories, new Page(0, maxResults));

    for (JobInfoEntity job : jobs) {
        // 加锁
        lockJob(job, asyncExecutor.getAsyncJobLockTimeInMillis(), asyncExecutor.getJobServiceConfiguration());
    }

    return jobs;
}

protected void lockJob(JobInfoEntity job, int lockTimeInMillis, JobServiceConfiguration jobServiceConfiguration) {
    GregorianCalendar gregorianCalendar = calculateLockExpirationTime(lockTimeInMillis, jobServiceConfiguration);
    job.setLockOwner(asyncExecutor.getLockOwner());
    job.setLockExpirationTime(gregorianCalendar.getTime());
}

 

到这里,实际上并没有数据库操作,但是注意,flowable的select方法,会把查出来的数据,放入缓存中。且刚刚我们结果的是外层被命令模式封装的责任链,所以,可以知道业务代码处理完,会执行责任链后置代码,具体入库为CommandContextInterceptor类execute中的commandContext.close()代码,此方法内会冲刷session(flushSessions方法)。我们直接看dbsqlSession的处理
public void flush() {
    // 此方法把缓存中修改过的对象,组装为update方法
    determineUpdatedObjects(); 
    removeUnnecessaryOperations();

    if (LOGGER.isDebugEnabled()) {
        debugFlush();
    }

    flushInserts();
    // 更新数据
    flushUpdates();
    flushDeletes();
}

 

至此,我们只要再看下flushUpdates即可,代码如下:
protected void flushUpdates() {
    for (Entity updatedObject : updatedObjects) {
        // 执行变更
        int updatedRecords = sqlSession.update(updateStatement, updatedObject);
        // 变更失败获取锁失败
        if (updatedRecords == 0) {
            throw new FlowableOptimisticLockingException(updatedObject + " was updated by another transaction concurrently");
        }

    }
    updatedObjects.clear();
}

三、关于全局锁

3.1 异步job的lockOwner设定

关于job的lockOwner,如果是机器A从timeJob捞起来,满足条件的数据,在插入job表是会直接在本机设定,然后注册一个事务监听器。job入库事务提交后,还是机器A来执行job,这个时候,job的死循环,不回拉到这个刚刚timeJob捞起过的数据。 现在看起来,以下两种场景,lockOwner会为空: 1、独占任务并发执行时,没有抢到流程实例锁的任务,重新插入时,lockOwner为空 2、异步线程池满了,新任务插入时,报错被捕获,此时会情况lockOwner

3.2 全局锁对性能的影响分析

基于异步job的lockOwner设定,当任务执行时间比较密集。比如同一秒有10000个需要执行的任务时,此时假定我们有5台服务器,单服务器的异步线程池队列长度为200。那么无论对job还是timeJob,都有大量待认领的任务。 我们可以简单的把一次拉起带执行job并操作的过程,分为三步:
  1. 获取带执行任务(批量,默认500)
  2. 加锁,指定此任务由本机执行。
  3. 交给本机异步线程池执行
不开全局锁的时候,多个服务器执行步骤1时,可能会拉取到同样的数据。此时他们会尝试以update xx where id=xx;这种格式,对500条sql进行提交。此时有可能多台机器都在和数据库交互。因为每次update语句时,事物还没有提交,所以当前逻辑暂时都没有问题。但是最终事物提交时,永远只有一个能成功提交。此时数据库开始回滚。
当我们机器足够多,且带执行任务足够多时,上述情况大概率发生。而实际上步骤3因为只要交给异步线程池即可结束,并非耗时操作。综上理解为,全局锁应该可以解决1、2中的锁冲突,从而提升性能。
再具体一些,如果我们的待执行任务不多,那么可以理解为同一时间,只要有一个机器的job处理器在拉取任务,放入自身的异步队列,就可以处理完所有的job,那么此时全局锁开启其实不会拖慢性能。 相对应的,如果带执行任务足够多,多机器并行时,比如AB两台机器,不开全局锁,可能节约的时间为A机器执行到步骤3时,B机器执行步骤1拉取。但是可能面对的问题是A机器执行步骤3之前,比如1或2时,B机器已经开始拉取数据,此时B机器执行步骤锁定job时会出现锁冲突。 即我们可以通过1/2/3步骤,每一步的耗时比,来评判全局锁的性能收益。但是考虑到1/2步骤时数据库操作,且当数据量为500条时步骤2为update xx where id=xx;*500次的数据库操作,而步骤3为内存操作。所以暂时任务步骤1&2的耗时>>步骤3的耗时,所以全局锁可以产生全局正面收益。