JobPlus知识库 IT 其它 文章
Node 异步编程

函数式编程

函数式编程是异步编程的基础,在JS中,将函数作为参数,返回值,都是可以的。这为我们使用回调函数打下了很好的基础。

var points = [40, 100, 1, 5, 25, 10]; 

points.sort(function(a, b) 

{   

    return a - b; 

}); // [ 1, 5, 10, 25, 40, 100 ]

var isType = function (type) {   

    return function (obj) {     

        return toString.call(obj) == '[object ' + type + ']';   

    }; 

};  

var isString = isType('String'); 

var isFunction = isType('Function');

难点

异常处理

由于异步,try/catch这样的代码不会起到任何作用。
所以一般对回调函数都要求第一个参数接收错误信息。
在我们自行编写的异步方法上,也要遵循这个原则,在调用传进来的回调时,第一个参数传异常。
我们的异步方法:

var async = function (callback) {   

    process.nextTick(function() {     

        var results = something;     

        if (error) {       

            return callback(error);     

        }     

        callback(null, results);   

    }); 

};

函数嵌套过深

这个在各种有回调机制的编程中都是个问题,比如iOS。

多线程编程

由于JS执行在单线程,Node实际上没有充分利用多核CPU的性能。在浏览器端存在同样的问题,因此出现了Web Worker来将于UI无关的计算任务交给其他线程。
Node借鉴了这一点,使用child_process和cluster模块来完成这些。

异步编程解决方案

主要有3种解决方案:

  • 事件发布/订阅模式
  • Promise/Deferred模式
  • 流程控制库

事件发布/订阅模式

这个是用途最广的异步方式,操作也很简单,这里我们使用Node自带的events模块。
首先你订阅一个事件,并定义在这个事件发生时你会做什么的回调函数。
然后在事件发生时你触发它。

var emitter = require('events'); 

var myEmitter = new emitter();

myEmitter.on("event1", (message1,message2) => {   

    console.log(message1);   

    console.log(message2);   

}); // 发布 

myEmitter.emit('event1', "I am message1!", "I am message2!");

这是一个很好的解耦逻辑的方式。也是一个很好的封装的方式,只将像暴露的过程和信息通过事件发布的方式告诉外面,外面想使用就只能通过订阅的方式。

继承events

Node中近半数的模块都继承自EventEmitter类,这样方便使用事件。

const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();

myEmitter.on('event', () => {

  console.log('an event occurred!');

});

myEmitter.emit('event');

或者老一点的办法:

//继承EventEmitter模块,使得自己的类也可以发布事件

var util = require('util');

function Stream() {   

    emitter.EventEmitter.call(this); 

util.inherits(Stream, emitter.EventEmitter); 

var myStream = new Stream();

myStream.on("event1", (message) => {   

    console.log(message);     

}); // 发布 

myStream.emit('event1', "I am stream!");

利用事件队列解决雪崩问题

比如对于一次数据库的查询:


var select = function (callback) {   

    db.select("SQL", function (results) {    

        callback(results);   

    }); 

};

这个数据库的查询在没有缓存硬执行时,当请求数量非常大时是对性能影响非常大的,它们查询的是同一个语句,我们只要保证给他们的数据是最新的就行。

这时我们可以使用状态锁:

var status = "ready"; 

var select = function (callback) {   

    if (status === "ready") {     

        status = "pending";     

        db.select("SQL", function (results) {      

            status = "ready";       

            callback(results);     

        });   

    } 

};

但是在这种情况下,连续多次的调用select()时,第前一次完成前,后面的是不会被执行的。

这时加入事件队列就很不错,通过once添加的监听器只能执行一次,在执行后就会将它与关联的事件移除:

var proxy = new events.EventEmitter(); 

var status = "ready"; 

var select = function (callback) {   

    proxy.once("selected", callback);   

    if (status === "ready") {     

        status = "pending";     

        db.select("SQL", function (results) {       

            proxy.emit("selected", results);       

            status = "ready";     

        });   

    } 

};

每一次select()被调用时,不管上一次是否执行完,我们都把它压入到一个事件队列中。此时如果上次查询已经执行完了,那么这次查询会立即执行;如果没有执行完,那么这次查询会像上面一样不执行,但是这里不同的是,这个查询的请求没有被忽略,而是在事件队列中等待。
在一个查询执行完并返回结果的时候,这个结果会传给在队列中等待的所有查询请求的回调。这就意味着在这次查询结果产生前进来的所有的查询请求就都得到了结果。
多异步之间的协作方案
有时需要等待多个事件完成才能继续下去,这多个事件很可能没有顺序,没有关联。这时事件与监听器的关系其实是多对一的,最简单直观的办法是使用多层嵌套,一个完成了再执行另一个,可是这样无论从效率还是代码上都是不好的。我们可以使用哨兵变量:

var count = 0; 

var results = {}; 

var done = function (key, value) {   

    results[key] = value;   

    count++;   

    if (count === 3) {    

        render(results);   

    } 

};  

fs.readFile(template_path, "utf8", function (err, template) {  

    done("template", template); 

}); 

db.query(sql, function (err, data) {   

    done("data", data); 

}); 

l10n.get(function (err, resources) {   

    done("resources", resources);  

});

这样就可以同步的执行它们了。

如果你想要可定制次数的done函数,这里利用了闭包来保存count的状态:

var after = function (times, callback) {   

    var count = 0, results = {};   

    return function (key, value) {     

        results[key] = value;     

        count++;     

        if (count === times) {       

            callback(results);     

        }   

    }; 

};  

var done = after(times, render);

通过简单的拓展,多对多的形式也可以实现:

var after = function (times, callback) {   

    var count = 0, results = {};   

    return function (key, value) {     

        results[key] = value;     

        count++;     

        if (count === times) {       

            callback(results);     

        }   

    }; 

};  

var done = after(3, render); 

var emitter = new events.Emitter(); 

emitter.on("done", done); 

emitter.on("done", other);  

fs.readFile(template_path, "utf8", function (err, template) {   

    emitter.emit("done", "template", template); 

}); 

db.query(sql, function (err, data) {   

    emitter.emit("done", "data", data); 

}); 

l10n.get(function (err, resources) {   

    emitter.emit("done", "resources", resources); 

});

这样在done事件发生时可以同时触发other回调。

Promise/Deferred模式

这个模式允许你在不完全设置好回调的情况下进行异步调用。使用起来更加灵活。
最后的调用看起来是这样的,我们使用jq来举个例子:

$.when(d)   .done(function(){ alert("哈哈,成功了!"); })   .fail(function(){ alert("出错啦!"); });

这样是不是更加方便易读了,而且此时就算你不指定done和fail的回调,d这个函数也会照样完成它的工作。如果你想有多个回调,由于实现了链式调用,继续在后面点就好了。
我们在node里以event为基础来实现它:

//首先promise继承event,我们将在promise里完成事件的订阅和发布

//这里定义了3个方法,done,fail和它们的组合then

//done和fail都使用once来绑定事件,保证其只执行一次

//新定义的这三个方法完成了事件的订阅,并且都返回了自己以完成链式调用

var Promise = function () {   

    emitter.EventEmitter.call(this);

}; 

util.inherits(Promise, emitter.EventEmitter);  

Promise.prototype.then = function (fulfilledHandler, errorHandler, progressHandler) {   

    if (typeof fulfilledHandler === 'function') {       

        this.once('success', fulfilledHandler);   

    }   

    if (typeof errorHandler === 'function') {   

        this.once('error', errorHandler);   

    }   

    if (typeof progressHandler === 'function') {     

        this.on('progress', progressHandler);   

    }   

    return this; 

};

Promise.prototype.done = function (fulfilledHandler) {   

    if (typeof fulfilledHandler === 'function') {       

        this.once('success', fulfilledHandler);   

    }   

    return this; 

};

Promise.prototype.fail = function (errorHandler) {   

    if (typeof errorHandler === 'function') {   

        this.once('error', errorHandler);   

    }   

    return this; 

};

可以看到,这里这三个新的自定义方法的作用就是将各个处理函数存起来,这里整个事件有3个状态:处理中,完成和失败。我们还需要一个对象来触发事件状态的转变:

//deferred这个对象是触发事件状态转变的地方

var Deferred = function () {   

    this.state = 'unfulfilled';  

    this.promise = new Promise(); 

};  

Deferred.prototype.resolve = function (obj) {   

    this.state = 'fulfilled';   

    this.promise.emit('success', obj); 

};  

Deferred.prototype.reject = function (err) {   

    this.state = 'failed';   

    this.promise.emit('error', err); 

};  

Deferred.prototype.progress = function (data) {   

    this.promise.emit('progress', data); 

};

啊,这样我们真正的业务逻辑就可以直接调用这几个方法来触发各个状态了。

为了通用我们可以做一个简单的工具函数:


var when = function (func) {

    var defrred = new Deferred();

    func(defrred);

    return defrred.promise;

};

我们真正的业务逻辑就是func了,我们把defrred对象传给我们的业务逻辑,业务逻辑就可以根据自己的需要来触发事件了。我们这里只返回defrred.promise,不把整个defrred暴露给外面,这样外面就不能随便调用defrred来转变事件的状态了。

我们假设一个业务逻辑:

function eat(dfd) {

    console.log('give me apple or beef');

    var tasks = function(){

        if (food=='apple') {

            console.log('you give me apple');

            dfd.reject();

        } 

        if (food=='beef') {

            console.log('you give me beef');

            dfd.resolve();

        }

    };

    setTimeout(tasks,5000);

}

这里使用settimeout来模拟异步。

调用:

when(eat)

    .done(function(){ console.log('I eat beef'); })

    .fail(function(){ console.log('I throw apple'); });

这里就算不给done和fail,eat还是会正常执行,就是会触发没有的事件而已。


多异步协作


这里就又要提到一对多,多对一,多对多。

一对多我们可以直接实现:

when(eat)

    .done(function(){ console.log('I eat beef'); })

    .done(function(){ console.log('I eat beefaaa'); })

    .fail(function(){ console.log('I throw apple'); });

多对一我们就需要新的方法了:

Deferred.prototype.all = function (promises) { 

    var count = promises.length;   

    var that = this;   

    var results = [];   

    promises.forEach(function (promise, i) {     

        promise.then(function (data) {       

            count--;       

            results[i] = data;       

            if (count === 0) {         

                that.resolve(results);       

            }     

        }, function (err) {       

            that.reject(err);     

        });   

    });   

    return this.promise; 

};

在每一个异步事件执行成功检查一下是否所有的都执行完了,都执行完了就调用顶上的全部执行完的方法,有一个fail了就调用顶上的fail方法。

使用after方法封装一下:


var after = function(promises) {

    var defrred = new Deferred();

    return defrred.all(promises);

}

使用:

after([when(eat),when(drink)])

    .done(function(){ console.log('I eat all'); })

    .fail(function(){ console.log('I don\'t like one of those'); })

流程控制库

这种方式并不是规范中主流的方式,但是相应的也更加的灵活。

我们来看看最流行的流程控制模块async。

异步的串行

var async = require("async");

var fs = require("fs");

async.series([function (callback) {  

        console.log("reading 1");   

        fs.readFile('file1.txt', 'utf-8', callback); 

    }, function (callback) {     

        //fs.readFile('file2.txt', 'utf-8', callback);   

        console.log("reading 2");

        callback("2222222","33333");

    }], function (err, results) {   

        // results => [file1.txt, file2.txt] 

        console.log("results:"+results);

    });

这个series方法会依次执行数组里的函数,这里的callback是由async通过高阶函数的方式注入,每一个函数里会有callback来接收这个函数要返回的结果,无论是同步的还是异步的,无论执行的快或慢,series都会按照顺序来执行这些函数并将这些结果存在数组中,在所有函数执行完之后,这个数组就可以使用了。

异步的并行执行

async.parallel([   function (callback) {     

        fs.readFile('file1.txt', 'utf-8', callback);   

    },   

    function (callback) {     

        fs.readFile('file2.txt', 'utf-8', callback);   

    } ], 

    function (err, results) {   // results => [file1.txt, file2.txt] 

        console.log("results:"+results);

    });

异步调用的依赖处理

当前一个结果是后一个调用的输入时,问题就不能用series来解决了。

async.waterfall([   function (callback) {     

        //file1.txt里下一个文件的地址

        fs.readFile('file1.txt', 'utf-8', function (err, content) { callback(err, content); });   

    },  function (arg1, callback) {     // arg1 => file2.txt     

        fs.readFile(arg1, 'utf-8', function (err, content) { callback(err, content); }); 

    }], function (err, result) {    

        // result => file2.txt 

        console.log("results:"+result);

    });

自动依赖处理
这里就是async强大的地方了,你给他一个你所有要做的事情的对象,以及每个事情依赖谁,它会自动帮你判断该怎么执行:

var deps = {   

    readConfig: function (callback) {     

        console.log("// read config file");     

        console.log(callback);

    },   

    connectMongoDB: ['readConfig', function (callback) {     

        console.log("// connect to mongodb");     

        console.log(callback);  

    }],   

    connectRedis: ['readConfig', function (callback) {     

        console.log("// connect to redis  ");   

        //callback();

    }],  

    complieAsserts: function (callback) {     

        console.log("// complie asserts");     

        //callback();   

    },   

    uploadAsserts: ['complieAsserts', function (callback) {     

        console.log("// upload to assert");    

        //callback();   

    }],   

    startup: ['connectMongoDB', 'connectRedis', 'uploadAsserts', function (callback) {     

        console.log("// startup ");  

    }] 

};

async.auto(deps);

其他库

相应的还有其他可用的流程控制库,这里就不一一介绍了,比较有代表的是Step和Wind

异步并发控制

因为是异步的,所以当请求多的时候很可能给系统的低层服务带来很大的压力甚至直接崩溃。比如打开文件这个操作,以前同步的时候同时打开的文件不会太多,都是完成一个再去打开另一个。但是异步时可能一下打开了大量的文件,从而引发问题。

所以我们应该对异步调用的数量做一定的限制。

在async中专门有一个方法来有限制的执行异步调用:

async.parallelLimit([   function (callback) {     

        fs.readFile('file1.txt', 'utf-8', callback);   

    },   function (callback) {     

        fs.readFile('file2.txt', 'utf-8', callback);   

    } ], 1, function (err, results) {   

        console.log("resultsLimit:"+results);

    });


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
471人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序