Source: service/processJob.js


// const Promise = require('bluebird');
const debug = require('debug')('app:service:processJob');

module.exports = (services, config, models, core) => {
  const {
    TransactionInstance,
  } = models;

  /**
   * @class ProcessJob
   * 对外支持的队列的相关服务
   */
  class ProcessJob {
    /**
     * ProcessJob.init
     * 处理队列 init 分组的逻辑, 将实例状态改为 运行
     *
     * @static
     * @param {object} job 队列信息
     *
     * @return {null}
     */
    static async init(job) {
      const { data } = job;
      const tranId = data.id;

      try {
        const tranIns = await TransactionInstance.findByPk(tranId);

        if (!tranIns || tranIns.status !== TransactionInstance.STATUS_INIT) {
          throw new Error('未找到实例, 或者实例状态错误');
        }


        const updateData = {
          status: TransactionInstance.STATUS_RUNNING,
          step: TransactionInstance.STEP_TRYING,
        };

        const queueData = {
          id: tranIns.id,
        };

        await core.queue.add('running', queueData, tranIns.spacingMilliSeconds);
        await tranIns.update(updateData);
        debug(`${tranId} init success`);
        return 'ok';
      } catch (error) {
        debug(`${tranId} init error`);
        return Promise.reject(error);
      }
    }


    /**
     * ProcessJob.running
     * 处理队列 running 分组的逻辑, 激活事务的逻辑处理
     *
     * @static
     * @param {object} job 队列信息
     *
     * @return {null}
     */
    static async running(job) {
      const { data } = job;
      const tranId = data && data.id;

      try {
        // 防止重复执行
        const runningCache = `system:running-${tranId}`;
        const ttl = 10000;

        const hasExpireKey = await core.cache.has(runningCache);
        if (hasExpireKey) {
          throw new Error('还在执行中重复执行');
        }

        await core.cache.set(runningCache, 1, ttl);

        const tranIns = await TransactionInstance.findByPk(tranId);

        if (!tranIns || tranIns.status !== TransactionInstance.STATUS_RUNNING) {
          throw new Error('未找到实例, 或者实例状态错误');
        }

        const changeTranIns = await services.TranTcc.try2NextStep(tranId);

        // 再次放回执行队列中
        if (changeTranIns.status === TransactionInstance.STATUS_RUNNING) {
          // 几秒种后再次执行
          const queueData = {
            id: tranId,
          };

          await core.cache.del(runningCache);
          await core.queue.add('running', queueData, tranIns.spacingMilliSeconds);
        }
        return 'ok';
      } catch (error) {
        debug(`${tranId} running error`);
        return Promise.reject(error);
      }
    }
  }

  return ProcessJob;
};