Source: service/tranTcc.js


const Promise = require('bluebird');
const debug = require('debug')('app:service:tranTcc');

const ActionProgress = require('../help/ActionProgress');

module.exports = (services, config, models, core) => {
  const {
    Action,
    ActionLog,
    Project,
    Process,
    TransactionInstance,
  } = models;

  /**
   * @class TranTcc
   * 对外支持的 Tcc 的相关服务
   */
  class TranTcc {
    /**
     * TranTcc.createInstance
     * 创建一个事务实例
     *
     * @static
     * @param {string} projectKey 项目的key
     * @param {string} processKey 进程的key
     * @param {string} messageId  事务的唯一id
     * @param {object} payload    任务相关的参数
     *
     * @return {object} 事务实例
     */
    static async createInstance(projectKey, processKey, messageId, payload) {
      const projectInstance = await Project.findOneOrThrow({
        where: {
          key: projectKey,
        },
      });

      const processInstance = await Process.findOneOrThrow({
        where: {
          key: processKey,
        },
      });
      // 初始化数据 maxAttemptTime
      let tryActionsInfo = await Promise.map(processInstance.tryIds, async (itemId) => {
        const actionInstance = await Action.findByPkOrThrow(itemId);
        return actionInstance && new ActionProgress(actionInstance);
      });
      let confirmActionsInfo = await Promise.map(processInstance.confirmIds, async (itemId) => {
        const actionInstance = await Action.findByPkOrThrow(itemId);
        return actionInstance && new ActionProgress(actionInstance);
      });
      let cancelActionsInfo = await Promise.map(processInstance.cancelIds, async (itemId) => {
        const actionInstance = await Action.findByPkOrThrow(itemId);
        return actionInstance && new ActionProgress(actionInstance);
      });

      tryActionsInfo = tryActionsInfo.filter(item => item);
      confirmActionsInfo = confirmActionsInfo.filter(item => item);
      cancelActionsInfo = cancelActionsInfo.filter(item => item);

      const input = {
        title: `${projectInstance.name}-${processInstance.name}`,
        messageId,
        projectId: projectInstance.id,
        proccessId: processInstance.id,
        desc: '',
        status: TransactionInstance.STATUS_INIT,
        log: [],
        payload,
        tryIds: processInstance.tryIds,
        confirmIds: processInstance.confirmIds,
        cancelIds: processInstance.cancelIds,
        tryActionsInfo,
        confirmActionsInfo,
        cancelActionsInfo,
        spacingMilliSeconds: processInstance.spacingMilliSeconds,
      };

      const instance = await TransactionInstance.create(input);

      await core.queue.add('init', { id: instance.id });

      return instance;
    }

    /**
     * TranTcc.groupActionsExec
     * 执行一些 Action 的集合并获取结果
     *
     * @static
     * @param {Array.<object>} actions - 集合 [{currentAttemptTime: 当前尝试次数, id: actionId}]
     * @param {Object} payload         - 请求载体
     * @param {string} messageId       - 消息id/事务唯一标识
     * @param {number} projectId       - 项目id
     * @param {number} proccessId      - 流程程id
     *
     * @returns {Array.<object>} 处理结果集 [{id, success, currentAttemptTime }] 处理结果
     */
    static async groupActionsExec(
      actions,
      payload,
      messageId,
      projectId,
      proccessId,
    ) {
      const promiseActionLogs = actions.map((actionItem) => {
        const { currentAttemptTime, id: actionId } = actionItem;
        return TranTcc.singleActionExec(
          actionId,
          payload,
          messageId,
          projectId,
          proccessId,
          currentAttemptTime,
        );
      });

      const actionLogs = await Promise.all(promiseActionLogs);

      const resultInfo = actionLogs.map(item => ({
        id: item.actionId,
        success: item.isSuccess,
        currentAttemptTime: item.currentAttemptTime,
      }));

      return resultInfo;
    }

    /**
     * TranTcc.singleActionExec
     * 执行某个独立的action
     *
     * @static
     * @param {number} actionId - action id
     * @param {json} payload 载体
     * @param {string} messageId 消息id
     * @param {number} projectId 项目id
     * @param {number} proccessId 过程id
     * @param {number} currentAttemptTime 当前执行次数
     * @returns {Object} actionLogInstance actionLog的实例
     */
    static async singleActionExec(
      actionId,
      payload = {},
      messageId = '',
      projectId = 0,
      proccessId = 0,
      currentAttemptTime = 0,
    ) {
      try {
        let resultData;
        const actionInstance = await Action.findByPkOrThrow(actionId);

        // 发送成功
        switch (actionInstance.sendType) {
        case Action.SEND_TYPE_HTTP_REQUEST:
          resultData = await actionInstance.sendHttp(messageId, payload);
          break;
        default:
          break;
        }

        const isMatch = await actionInstance.isMatchResult(resultData);

        const actionLogInput = {
          messageId,
          projectId,
          proccessId,
          actionId,
          isSuccess: resultData && isMatch,
          payload,
          repData: resultData,
          currentAttemptTime: currentAttemptTime + 1,
        };

        const log = await ActionLog.create(actionLogInput);
        return log;
      } catch (error) {
        // 记录异常
        const actionLogInput = {
          messageId,
          projectId,
          proccessId,
          actionId,
          isSuccess: false,
          payload,
          errorMessage: error.toString(),
          currentAttemptTime: currentAttemptTime + 1,
        };

        const log = await ActionLog.create(actionLogInput);
        return log;
      }
    }

    /**
     * TranTcc.try2NextStep
     * 尝试进入下一个步骤, 当前步骤的所有状态满足特定条件
     *
     * @static
     * @param {number} tranId - 事务的实例id
     * @return {Object} 事务实例
     */
    static async try2NextStep(tranId) {
      const instance = await TransactionInstance.findByPkOrThrow(tranId);
      // 校验步骤状态
      const {
        // step,
        needExecActionsInfo,
      } = instance.getStepNeedExec();

      debug('needExecActionsInfo :%o', needExecActionsInfo);

      const {
        projectId,
        payload,
        proccessId,
        messageId,
      } = instance;

      const resultInfo = await TranTcc.groupActionsExec(
        needExecActionsInfo,
        payload,
        messageId,
        projectId,
        proccessId,
      );

      debug('resultInfo :%o', resultInfo);

      // updateData 更新数据
      const updateData = await instance.createGoNextData(resultInfo);

      debug('updateData :%o', updateData);
      await instance.update(updateData);

      return instance;
    }
  }

  return TranTcc;
};