深入理解Node.js异步流程控制async.auto

为了解决回调嵌套问题,在node.js中有很多方案,Async便是其中之一。

Async中,最常用的方法,应该就是async.auto了,它能够处理多个函数有依赖关系,有的并行执行,有的顺序执行。与其考虑使用async.series或者async.parallel等其他方法,不如都直接简单粗暴地使用async.auto

如果你有自己制作过JIRA里的工作流,或者使用过其他流程类工具,那就更加容易理解async.auto的工作原理了。

工作原理

FgsjT_iFzVYx-4wOYt8HcE6z9Su3

假如我们希望每个函数按如上的流程执行,用async实现如下:

const async = require('async');
async.auto({
    'task1': (callback) => {
        console.log('task1');
        callback();
    },
    'task2': (callback) => {
        console.log('task2');
        callback();
    },
    'task3': (results, callback) => {
        console.log('task3');
        callback();
    },
    'task4': ['task2', 'task3', (results, callback) => {
        console.log('task4');
        callback();
    }],
    'task5': ['task1', 'task4', (results, callback) => {
        console.log('task5');
        callback();
    }]
}, (err, results) => {
    console.log('end');
});

通过追溯async.auto的源码,可以发现其工作原理主要分为3步。

1.流程初始化

通过检测任务的依赖关系,初始化一些变量:

  • 初始任务。 即不依赖其他任务的任务,也就是task1, task2,task3
  • 依赖计数。 即每个任务所依赖的任务数。task5依赖2个,task4依赖2个。
  • 监听器。 即每个任务完成后,会触发的下个依赖它的任务。task4监听task2,task3。task5监听task1,task4
  • 任务队列。 即当轮到指定任务执行时,就将其放入队列中
  • 未执行任务。 即有依赖任务,但是非初始执行的任务

2.检测流程死循环

检测任务是否有循环依赖。确保整个流程正常结束,而不是进入死循环
Fpu83eQXnVXNHFmmFTtl1bCjNoYN

3.执行流程

Fjij1vVFXEfkrvo9bytSU2eu5nVT

总结

整个逻辑还是相对比较简单的,明白了其中的原理,就可以自己动手实现一个小小的工作流了:)

下面送上async.auto的精简版(整理自async.auto源码)

function auto(tasks, callback) {
    var taskQueue = [];
    var taskCheckStack = [];
    var results = {};
    var listeners = {};
    var hasError = false;
    var completeTask = 0;
    var uncheckedDependencies = {};
    var keys = Object.keys(tasks);
    var taskCount = keys.length;
    keys.forEach(function (key) {
        var task = tasks[key];
        if (!Array.isArray(task)) {
            enqueue(key, [task]);
            taskCheckStack.push(key);
            return;
        }
        var dependencies = task.slice(0, task.length - 1);
        var remainingDependencies = dependencies.length;
        if (remainingDependencies === 0) {
            enqueue(key, task);
            taskCheckStack.push(key);
            return;
        }
        uncheckedDependencies[key] = remainingDependencies;
        dependencies.forEach(function (dependent) {
            addListener(dependent, function () {
                if (--remainingDependencies === 0) {
                    enqueue(key, task);
                }
            });
        });
    });
    checkDeadLock();
    processQueue();
    //入队
    function enqueue(key, task) {
        taskQueue.push(function () {
            runTask(key, task);
        });
    }
    //处理队列
    function processQueue() {
        if (completeTask === taskCount) {
            return callback(null, results);
        }
        while(taskQueue.length) {
            var run = taskQueue.shift();
            run();
        }
    }
    //运行任务
    function runTask(key, task) {
        if (hasError) {
            return;
        }
        var taskFn = task[task.length - 1];
        var taskCallback = function (err, result) {
            results[key] = result;
            if (err) {
                hasError = true;
                return callback(err, results);
            }
            var listener = listeners[key];
            if (listener) {
                listener.forEach(function (fn) {
                    fn.apply(null, []);
                });
            }
            completeTask++;
            processQueue();
        };
        if (task.length > 1) {
            taskFn.apply(null, [results, taskCallback]);
        } else {
            taskFn.apply(null, [taskCallback]);
        }
    }
    //添加监听
    function addListener(key, fn) {
        var task = tasks[key];
        if (!task) {
            throw new Error('non-existent dependency:' + key);
        }
        var listener = listeners[key];
        if (!listener) {
            listener = listeners[key] = [];
        }
        listener.push(fn)
    }
    //检查死锁
    function checkDeadLock() {
        var currentTask;
        var counter = 0;
        while (taskCheckStack.length) {
          currentTask = taskCheckStack.pop();
          counter++;
          var dependents = getDependents(currentTask);
          dependents.forEach(function (dependent) {
            if (--uncheckedDependencies[dependent] === 0) {
                taskCheckStack.push(dependent);
            }
          });
        }
        if (counter !== taskCount) {
            throw new Error('auto cannot execute tasks due to a recursive dependency')
        }
    }
    //获取依赖该任务的任务
    function getDependents(taskName) {
      var ret = [];
      Object.keys(tasks).forEach(function (key) {
        var task = tasks[key];
        if (Array.isArray(task) && task.indexOf(taskName) > -1) {
          ret.push(key);
        }
      });
      return ret;
    }
}