Skip to content

Latest commit

 

History

History
576 lines (435 loc) · 18.8 KB

探索RxJS-Observable.md

File metadata and controls

576 lines (435 loc) · 18.8 KB

Table of Contents generated with DocToc

探索 RxJS - Observable

Observable是什么?

官方的解释:

The Observable object represents a push based collection.

The Observer and Observable interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The Observable object represents the object that sends notifications (the provider); the Observer object represents the class that receives them (the observer).

也就是说,一个Observable类似一个集合或者说队列,用于表示一串的事件流。而观察者Observer注册监听Observable事件流上事件的触发。正因如此,Observable中的每个元素都是push属性的,即发生变化之后主动向外更新状态。

用一个表格来表示:

单一数据 大量数据
Pull Function Iterator(Array
Push Promise Observable

Pull&Push

PullPush是 “数据提供者” 和 ”数据消费者“ 之间的沟通协议。

数据提供者 数据消费者
Pull 被动:只有被调用时才提供数据 主动:决定什么时候需要数据
Push 主动:自主决定何时提供数据 被动:根据数据的变化进行反应
  • Pull

对于Pull系统而言,由 ”数据消费者“ 来决定什么时候从 ”数据提供者“ 那里获取数据,而 ”数据提供者“ 本身并不知道什么时候把数据传递给消费者(提供者只有被动的调用之后才会传递数据)

JavaScript 中的函数Function和迭代器Iterator都是Pull类型的对象,它们只有被调用之后才会返回数据给消费者。对于Function而言,调用函数的代码就是消费者;而对于Iterator而言,调用iterator.next()的代码是消费者。

  • Push

Push系统中,”数据提供者“ 决定了何时提供数据给 ”数据消费者“,而消费者并不知道数据什么时候会传输过来。

PromisePush系统的典型代表。它(Promise,数据提供者)将异步完成之后的数据提供给回调函数(Callback,数据消费者)

RxJS中的Observable也是 JavaScript 中的一种Push系统。一个Observable,也就是一串事件流,涵盖了多种/多个事件,因此是复数数据的提供者,将数据传输给观察者Observers(数据消费者)

进一步理解Observable

  • 监听一个流类似于调用一个方法

ObservableFunction有一个类似之处:它们都是懒执行的,即只有被调用了才会执行。对于Function而言,当你调用call的时候才会执行;而对于Observable而言,只有通过subscribe进行监听之后,事件流才会开始逐步执行:

// Function
function foo() {
  console.log('Hello');
  return 42;
}
var x = foo.call();
console.log(x);
// Hello
// 42

// Observable
var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});
foo.subscribe(function (x) {
  console.log(x);
});
// Hello
// 42

  • ObservableFunction的不同在于,Observable可以依次返回多个值
  • Observable即可异步也可同步
var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200); // "return" yet another
  setTimeout(() => {
    observer.next(300); // 异步输出
  }, 1000);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');
// before
// Hello
// 42
// 100
// 200
// after
// 300

之前说了,一个Observable是一串数据流,涵盖了多种/多个事件。它类似于 JavaScript 中的 Array,拥有自己的mapfilter等方法。对一个Observable调用map等方法,相当于对这个流上的各个事件进行遍历,之后创建一个新的流:

Observable

我们再来看一看ObservableArray间相似的方法:

var array = ['1', '2', 'foo', '5', 'bar'];
// Array
var result = array.map(x => parseInt(x)).filter(x => !isNaN(x));
console.log(result); // [1, 2, 5]

// Observable
// 通过循环9次,每次间隔400ms来遍历一个数组,以此构造出一个 Observable
var source = Rx.Observable.interval(400).take(9).map(i => array[i]);
// 对 Observable 调用方法,每个方法都会返回一个 Observable 对象,最终返回一个新的 Observable
var result = source.map(x => parseInt(x)).filter(x => !isNaN(x));
// 通过 subscribe 对 Observable 建立事件监听
result.subscribe(x => console.log(x));

通过subscribe方法,创建了对Observable的监听,并触发事件流,而每个事件的最终输出会作为subscribe中的回调函数参数代入。

创建Observable

  • just(obj) 将一个或多个对象转为Observable

just

var source = Rx.Observable.just(42);

var subscription = source.subscribe(
  function (x) { console.log('Next: %s', x); },
  function (err) { console.log('Error: %s', err); },
  function () { console.log('Completed'); });
// Next: 42
// Completed

from

// Array-like object (arguments) to Observable
function f() {
  return Rx.Observable.from(arguments);
}

f(1, 2, 3).subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });

// Next: 1
// Next: 2
// Next: 3
// Completed

// String
// 如果参数是字符串,则遍历其中的每个字段
Rx.Observable.from("foo").subscribe(
  function (x) { console.log('Next: ' + x); },
  function (err) { console.log('Error: ' + err); },
  function () { console.log('Completed'); });
// Next: f
// Next: o
// Next: o
// Completed
  • fromCallback(func) 以一个函数作为参数,通过回调函数来创建流。创建好的流可接收函数所需的参数,并返回新的流

fromCallback

var fs = require('fs'),
    Rx = require('rx');

// Wrap fs.exists
var exists = Rx.Observable.fromCallback(fs.exists);

// Check if file.txt exists
var source = exists('file.txt');

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
// Next: true
// Completed

fromEvent

该方法以一个节点元素和事件名称作为参数。其中,节点元素可以是 DOM 元素,或者 NodeList,或者 jQuery 元素、Zepto 元素、Angular 元素、Ember 元素、EventEmitter

// using a jQuery element
var input = $('#input');
// 创建了 input 点击事件监听的流
var source = Rx.Observable.fromEvent(input, 'click');

var subscription = source.subscribe(
    function (x) { console.log('Next: Clicked!'); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

input.trigger('click');
// Next: Clicked!
var promise = new RSVP.Promise(function (resolve, reject) {
   resolve(42);
});

var source = Rx.Observable.fromPromise(promise);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });
// Next: 42:
// Completed
  • of(a, b, c...)from不同的是,of方法需要将参数一个一个的传入

of

var source = Rx.Observable.of(1,2,3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
// Next: 1
// Next: 2
// Next: 3
// Completed
var arr = [1,2,3];
var source = Rx.Observable.ofArrayChanges(arr);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });

arr.push(4)
// Next: {type: "splice", object: Array[4], index: 3, removed: Array[0], addedCount: 1}
var obj = {x: 1};
var source = Rx.Observable.ofObjectChanges(obj);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });

obj.x = 42;
// Next: {type: "update", object: Object, name: "x", oldValue: 1}
  • pairs(obj) 以一个对象作为参数,创建一个可以遍历对象中键值对的流
var obj = {
  foo: 42,
  bar: 56,
  baz: 78
};

var source = Rx.Observable.pairs(obj);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (e) { console.log('Error: ' + e); },
    function ( ) { console.log('Completed'); });
// Next: ['foo', 42]
// Next: ['bar', 56]
// Next: ['baz', 78]
// Completed
  • defer(func) 直到创建了监听,才创建一个Observable

defer

/* Using an observable sequence */
// 不会立即创建流,但接受的方法里应该返回一个流
var source = Rx.Observable.defer(function () {
    return Rx.Observable.return(42);
});

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); } );
// Next: 42
// Completed
  • range(start, end) 实质上相当于from()方法接收了一个由数组组成的 Array

range

var source = Rx.Observable.range(0, 3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
// Next: 0
// Next: 1
// Next: 2
// Completed
  • interval(ms) 创建 Observable 的时候指定每次执行事件时的时间间隔

interval

通常后面会跟随take()方法,指定调用次数

var source = Rx.Observable
    .interval(500 /* ms */)
    .timeInterval()
    .take(3);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });
// Next: {value: 0, interval: 500}
// Next: {value: 1, interval: 500}
// Next: {value: 2, interval: 500}
// Completed

timer

const start = 5;
Rx.Observable
  .timer(100, 100) // 开始流之前延迟100ms,开始之后每个事件间隔100ms
  .map(i => start - i)
  .take(start + 1)
  .subscribe(i => console.log(i));
// 5
// 4
// 3
// 2
// 1
// 0

repeat

var source = Rx.Observable.repeat(42, 3);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
// Next: 42
// Next: 42
// Next: 42
// Completed
  • doWhile(func) 接收一个方法作为参数,并不断调用该方法。如果该方法返回true,则执行事件;否则不执行事件

doWhile

var i = 0;

var source = Rx.Observable.return(42).doWhile(
    function () { return ++i < 2; });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
// Next: 42
// Next: 42
// Completed

while

var i = 0;

// Repeat until condition no longer holds
var source = Rx.Observable.while(
    function () { return i++ < 3 },
    Rx.Observable.return(42)
);

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });
// Next: 42
// Next: 42
// Next: 42
// Completed

start

var source = Rx.Observable.start(
    function () {
        return '123';
    }
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    }
// Next: 123
  • startAsync() 类似于start(),但是以一个异步方法作为参数

  • startWith(a, b, c...) 以一个或多个对象作为参数,流将会以它们作为开头

var source = Rx.Observable.return(4)
    .startWith(1, 2, 3)

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

// => Next: 1
// => Next: 2
// => Next: 3
// => Next: 4
// => Completed

Observable的操作

详见:Reactivex - operators

汉化版可见:

利用do/tap来做一些无关事件流的处理

你可以在do或者tab方法里做一些无关事件流的事件,比如输出 log 或者操作 DOM:

var result = xs
  .filter(function (x) { return x.failed; })
  .tap(function (x) { log(x); });

监听Observable

要监听一个流,必须创建一个观察者,然后调用subscribe方法。在subscribe方法中,有下列三种回调:

  • onNext 当流中的某个事件被执行时所触发的回调,参数是事件触发之后的返回值
  • onError 事件执行报错时所触发的回调,参数是一个 Error
  • onCompleted 当一个 onNext 执行完毕且没有报错后的回调

当一个流开始执行时,会触发onNext零次或多次,之后会调用onError或者流结束时触发onCompleted方法,但不会两个都调用。你可以在onError或者onCompleted进行清理工作。

除了直接代入三个方法以外,还可以代入一个 Object,但要注意的是,目前仅 RxJS 5 版本支持:

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
Observable.subscribe(observer);

observer内的nexterrorcomplete分别对应着onNextonErroronCompleted状态

取消监听

对流进行监听之后,返回一个subscription,通过调用其dispose方法可以取消监听,并且停止所有事件:

console.clear();
var array = ['1', '2', 'foo', '5', 'bar'];
var source = Rx.Observable.interval(1000).take(9).map(i => array[i]);

var result = source.map(x => parseInt(x)).filter(x => !isNaN(x));

var subscription = result.subscribe(x => console.log(x), err => console.log(err), () => console.log('completed'));

setTimeout(() => {
  console.log('unsubscribe');
  subscription.dispose();
}, 1000);

当调用dispose()方法时,流上还没有触发的事件不会再触发,而已经在进行中的事件则不会被打断,而是继续进行直到完成或报错。但是,那些还在进行的方法即便完成了,也不会触发观察者的回调。

扩展阅读