一尘不染

Spring Batch在集群环境中正确重启未完成的作业

spring-boot

我使用以下逻辑在单节点Spring Batch应用程序上重新启动未完成的作业:

public void restartUncompletedJobs() {

    try {
        jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

        List<String> jobs = jobExplorer.getJobNames();
        for (String job : jobs) {
            Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);

            for (JobExecution runningJob : runningJobs) {
                runningJob.setStatus(BatchStatus.FAILED);
                runningJob.setEndTime(new Date());
                jobRepository.update(runningJob);
                jobOperator.restart(runningJob.getId());
            }
        }
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}

现在,我正在尝试使其在两节点群集上运行。每个节点上的两个应用程序都将指向共享的PostgreSQL数据库。

让我们看看下面的例子:我有2个作业实例-在jobInstance1运行,现在就node1jobInstance2上运行node2。执行Node1期间由于某种原因重新启动jobInstance1。后node1重新启动Spring批处理应用程序尝试重新启动与上面给出逻辑未完成任务-
它看到有2个未完成的作业实例-
jobInstance1jobInstance2(这是正常运行的node2),并尝试重新启动它们。用这种方式代替重新启动唯一jobInstance1-它将重新启动jobInstance1jobInstance2..但jobInstance2不应重新启动,因为它现在可以正确执行node2

如何在应用程序启动期间正确地重新启动未完成的作业(在上一个应用程序终止之前),并防止类似的作业jobInstance2也重新启动的情况?

更新

这是下面答案中提供的解决方案:

Get the job instances of your job with JobOperator#getJobInstances

For each instance, check if there is a running execution using JobOperator#getExecutions.

2.1 If there is a running execution, move to next instance (in order to let the execution finish either successfully or with a failure)

2.2 If there is no currently running execution, check the status of the last execution and restart it if failed using JobOperator#restart.

我有一个关于#2.1的问题-在应用程序重新启动后,Spring Batch将自动以运行中的执行方式重新启动未完成的作业,还是我需要执行手动操作?


阅读 943

收藏
2020-05-30

共1个答案

一尘不染

您的逻辑不是重新启动未完成的作业。您的逻辑是采用当前正在运行的作业执行,将其状态设置为FAILED并重新启动它们。您的逻辑不应找到 正在运行的
执行,而应查找当前 运行的执行,尤其是失败的执行,然后重新启动它们。

如何正确重新启动失败的作业并防止像jobInstance2之类的作业也重新启动的情况?

在伪代码中,您需要执行以下操作:

  1. 通过获取工作的工作实例 JobOperator#getJobInstances
  2. 对于每个实例,请使用来检查是否有正在运行的执行JobOperator#getExecutions

2.1如果有运行中的执行,请移至下一个实例(以使执行成功或失败完成)

2.2如果当前没有正在运行的执行,请检查上一次执行的状态,如果使用失败则重新启动它JobOperator#restart

在您的情况下:

  • jobInstance1 应该在步骤2.2中重新启动
  • jobInstance2 应该在步骤2.1中进行过滤,因为在节点2上有正在运行的执行。
2020-05-30