学习了Observables之后,我发现它们与Node.js流非常相似。两者都有一种机制,可在新数据到达,发生错误或没有更多数据(EOF)时通知使用者。
我很想了解两者之间的概念/功能差异。谢谢!
无论 观测量 和node.js中的 流 让你解决同样的根本问题:异步处理值的序列。我认为,两者之间的主要区别与激发其外观的环境有关。该上下文反映在术语和API中。
在 Observables 方面,您对EcmaScript进行了扩展,引入了反应式编程模型。它试图填补值生成和异步之间的间隙用的极简和可组合的概念Observer和Observable。
Observer
Observable
在node.js和 Streams 方面,您想要创建一个接口,用于网络流和本地文件的异步处理和高性能处理。从初始上下文的术语派生,你会得到pipe,chunk,encoding,flush,Duplex,Buffer,等由于具有务实的做法,提供了特殊的用例,你失去了一些能力,撰写的东西,因为它不是为统一明确的支持。例如,您push在Readable流write上使用,Writable尽管从概念上讲,您在做相同的事情:发布值。
pipe
chunk
encoding
flush
Duplex
Buffer
push
Readable
write
Writable
因此,在实践中,如果你看的概念,如果你使用的选项{ objectMode: true },可以匹配Observable与Readable流和Observer与Writable流。您甚至可以在两个模型之间创建一些简单的适配器。
{ objectMode: true }
var Readable = require('stream').Readable; var Writable = require('stream').Writable; var util = require('util'); var Observable = function(subscriber) { this.subscribe = subscriber; } var Subscription = function(unsubscribe) { this.unsubscribe = unsubscribe; } Observable.fromReadable = function(readable) { return new Observable(function(observer) { function nop() {}; var nextFn = observer.next ? observer.next.bind(observer) : nop; var returnFn = observer.return ? observer.return.bind(observer) : nop; var throwFn = observer.throw ? observer.throw.bind(observer) : nop; readable.on('data', nextFn); readable.on('end', returnFn); readable.on('error', throwFn); return new Subscription(function() { readable.removeListener('data', nextFn); readable.removeListener('end', returnFn); readable.removeListener('error', throwFn); }); }); } var Observer = function(handlers) { function nop() {}; this.next = handlers.next || nop; this.return = handlers.return || nop; this.throw = handlers.throw || nop; } Observer.fromWritable = function(writable, shouldEnd, throwFn) { return new Observer({ next: writable.write.bind(writable), return: shouldEnd ? writable.end.bind(writable) : function() {}, throw: throwFn }); }
您可能已经注意到,我改变了一些名字和使用的简单的概念Observer和Subscription,介绍到这里,以避免做reponsibilities超负荷 观测量 在Generator。基本上,Subscription您可以取消订阅Observable。无论如何,使用上述代码,您可以拥有一个pipe。
Subscription
Generator
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
与相比process.stdin.pipe(process.stdout),您拥有的是一种组合,过滤和转换流的方法,该方法也适用于任何其他数据序列。您可以使用Readable,Transform和Writable流实现此功能,但API倾向于使用子类而不是Readables和应用函数。Observable例如,在模型上,转换值对应于将转换器函数应用于流。不需要的新子类型Transform。
process.stdin.pipe(process.stdout)
Transform
Observable.just = function(/*... arguments*/) { var values = arguments; return new Observable(function(observer) { [].forEach.call(values, function(value) { observer.next(value); }); observer.return(); return new Subscription(function() {}); }); }; Observable.prototype.transform = function(transformer) { var source = this; return new Observable(function(observer) { return source.subscribe({ next: function(v) { observer.next(transformer(v)); }, return: observer.return.bind(observer), throw: observer.throw.bind(observer) }); }); }; Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify) .subscribe(Observer.fromWritable(process.stdout))
结论?在Observable任何地方都可以轻松引入反应模型和概念。围绕该概念实现整个库比较困难。所有这些小功能都需要一致地协同工作。毕竟,ReactiveX项目仍在进行中。但是,如果您确实需要将文件内容发送到客户端,进行编码并压缩,则可以在NodeJS中使用它的支持,并且效果很好。