一篇博客让你了解RxJava

更新时间:2016-12-20 21:07:16 点击次数:1953次

RxJava可以说是2016年流行的项目之一了,近也接触了一下RxJava,于是想写一篇博客,希望能通过这篇博客让大家能对其进行了解,本篇博客是基于RxJava2.0,跟RxJava1.0还是有很多不同的

基础知识

RxJava的核心就是“异步”两个字,其关键的东西就是两个:

  1. Observable(被观察者)

  2. Observer/Subscriber(观察者)

Observable可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer回调处理。

Observable可以理解为事件的发送者,就好像快递的寄出者,而这些事件就好比快递 
Observer可以理解为事件的接收者,就好像快递的接收者

那他们之间是如何进行联系的呢?答案就是通过subscribe()方法,下面的代码就是RXJAVA中Observable与Observer进行关联的典型方式:

//创建一个被观察者 Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(5);
            e.onNext(6);
            e.onNext(7);
            e.onNext(8);
            e.onComplete();
        }
    }); //创建观察者observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
        } @Override public void onNext(Integer value) {
            Log.d(TAG, value.toString());
        } @Override public void onError(Throwable e) {
            Log.d(TAG, "error");
        } @Override public void onComplete() {
            Log.d(TAG, "complete");
        }
    }; //建立关联 observable.subscribe(observer);
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

运行项目,我们可以看到,数字已经打印出来 
这里写图片描述

这里需要强调的是: 只有当观察者和被观察者建立连接之后, 被观察者才会开始发送事件. 也就是调用了subscribe()方法之后才开始发送事件.

上面我们看到观察者和被观察者的逻辑是分开写的,那能不能合在一起写呢?答案是肯定的,这也是RxJava比较突出的优点,那就是链式操作,代码如下:

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(5);
        e.onNext(6);
        e.onNext(7);
        e.onNext(8);
        e.onComplete();
    }
}).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    } @Override public void onNext(Integer value) {
        Log.d(TAG, value.toString());
    } @Override public void onError(Throwable e) {
        Log.d(TAG, "error");
    } @Override public void onComplete() {
        Log.d(TAG, "complete");
    }
}); 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

有时候,你可能觉得,我就打印几个数,还要把Observable写的那么麻烦,能不能简便一点呢?答案是肯定的,RxJava内置了很多简化创建Observable对象的函数,比如Observable.just就是用来创建只发出一个事件就结束的Observable对象,上面创建Observable对象的代码可以简化为一行

Observable<String> observable = Observable.just("hello");
		
  • 1
  • 1

同样对于Observer,这个例子中,我们其实并不关心OnComplete和OnError,我们只需要在onNext的时候做一些处理,这时候就可以使用Consumer类。

Observable<String> observable = Observable.just("hello");
   Consumer<String> consumer = new Consumer<String>() { @Override public void accept(String s) throws Exception {
           System.out.println(s);
       }
   };
    observable.subscribe(consumer);
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这里写图片描述

其实在RxJava中,我们可以为 Observer中的三种状态根据自身需要分别创建一个回调动作,通过Action 来替代onComplete():,通过Consumer来替代 onError(Throwable t)和onNext(T t)

Observable<String> observable = Observable.just("hello");
    Action onCompleteAction = new Action() { @Override public void run() throws Exception {
            Log.i(TAG, "complete");
        }
    };
    Consumer<String> onNextConsumer = new Consumer<String>() { @Override public void accept(String s) throws Exception {
            Log.i(TAG, s);
        }
    };
    Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception {
            Log.i(TAG, "error");
        }
    };
    observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);

} 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

这里写图片描述

Observable.just同样可以发送多个参数

Observable observable = Observable.just("you", "are", "beautiful");
Consumer<String> onNextConsumer = new Consumer<String>() {
    @Override public void accept(String s) throws Exception { Log.i(TAG, s);
    }
};
observable.subscribe(onNextConsumer); 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这里写图片描述

例子:来一个简单的例子来了解事件的产生到消费、订阅的过程:从res/mipmap中取出一张图片,显示在ImageView上。

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> e) throws Exception { // 从mipmap取出一张图片作为Drawable对象 Drawable drawable = ContextCompat.getDrawable(MainActivity.this, R.mipmap.ic_launcher); // 把Drawable对象发送出去 e.onNext(drawable);
        e.onComplete();
    }
}).subscribe(new Observer<Drawable>() { @Override public void onSubscribe(Disposable d) {

    } @Override public void onNext(Drawable value) {
        ivLogo.setImageDrawable(value);
    } @Override public void onError(Throwable e) {

    } @Override public void onComplete() {

    }
});
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

这样就完成了一个简单的图片的设置 
这里写图片描述

ObservableEmitter和Disposable

ObservableEmitter: ObservableEmitter可以理解为发射器,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。 
注意:但是事件的发送是有一定的规定的,就好比寄快递也要有一定要求,不是什么都能寄的:

1.被观察者可以发送无限个onNext, 观察者也可以接收无限个onNext. 
2.当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送, 而Observer收到onComplete事件之后将不再继续接收事件. 
3.当Observable发送了一个onError后, Observable中onError之后的事件将继续发送, 而Observer收到onError事件之后将不再继续接收事件. 
4.Observable可以不发送onComplete或onError. 
5.为关键的是onComplete和onError必须并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.当我们写多个onComplete时,不会报错

当我们又有onComplete又有onError时,发现在调用onComplete后会爆出异常

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(5);
        e.onNext(6);
        e.onNext(7);
        e.onNext(8);

        e.onError(new NullPointerException());
        e.onComplete();
    }
})
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

这是onComplete在onError前调用的情况 
这里写图片描述

当我们写两个onError时,会先接受前面的所有事件,后才报错 
这里写图片描述

介绍了ObservableEmitter, 接下来介绍Disposable, 当调用dispose()方法时, 它就会将观察者和被观察者的联系切断, 从而导致观察者收不到事件.

注意: 调用dispose()并不会导致Observable不再继续发送事件, Observable会继续发送剩余的事件. 
看一下下面这个例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emitter 1");
            emitter.onNext(1); Log.d(TAG, "emitter 2");
            emitter.onNext(2); Log.d(TAG, "emitter 3");
            emitter.onNext(3); Log.d(TAG, "emitter complete");
            emitter.onComplete(); Log.d(TAG, "emitter 4");
            emitter.onNext(4);
        }
    }).subscribe(new Observer<Integer>() { private Disposable mDisposable; private int i;

        @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe");
            mDisposable = d;
        }

        @Override public void onNext(Integer value) { Log.d(TAG, "onNext: " + value);
            i++; if (i == 2) { Log.d(TAG, "dispose");
                mDisposable.dispose(); Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
            }
        }

        @Override public void onError(Throwable e) { Log.d(TAG, "error");
        }

        @Override public void onComplete() { Log.d(TAG, "complete");
        }
    });
}
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

打印如下: 
这里写图片描述

在收到onNext 2这个事件后, 我们中断了联系, 但是Observable 
仍然发送了3, complete, 4这几个事件, 而且Observable 
并没有因为发送了onComplete而停止. 同时可以看到Observer的onSubscribe()方法是先调用的.

subscribe()有多个重载的方法:

 public final Disposable subscribe() {} public final Disposable subscribe(Consumer<? super T> onNext) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} public final void subscribe(Observer<? super T> observer) {} 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

不带任何参数的subscribe() 表示Observer不关心任何事件,Observable发送什么数据都随你 
带有一个Consumer参数的方法表示Observer只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emitter 1");
            emitter.onNext(1); Log.d(TAG, "emitter 2");
            emitter.onNext(2); Log.d(TAG, "emitter 3");
            emitter.onNext(3); Log.d(TAG, "emitter complete");
            emitter.onComplete(); Log.d(TAG, "emitter 4");
            emitter.onNext(4);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "onNext: " + integer);
        }
    });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

其他方式也是类似的方式

线程调度

正常情况下, Observer和Observable是工作在同一个线程中的, 也就是说Observable在哪个线程发事件, Observer就在哪个线程接收事件. 
RxJava中, 当我们在主线程中去创建一个Observable来发送事件, 则这个Observable默认就在主线程发送事件. 
当我们在主线程去创建一个Observer来接收事件, 则这个Observer默认就在主线程中接收事件,但其实在现实工作中我们更多的是需要进行线程切换的,常见的例子就是在子线程中请求网络数据,在主线程中进行展示

要达到这个目的, 我们需要先改变Observable发送事件的线程, 让它去子线程中发送事件, 然后再改变Observer的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName()); Log.d(TAG, "emitter 1");
            emitter.onNext(1);
        }
    });

    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName()); Log.d(TAG, "onNext: " + integer);
        }
    };

    observable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(consumer);
}
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

这里写图片描述 
可以看到, observable发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-1的线程中发送的事件, 而consumer 仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.

这段代码只不过是增加了两行代码:

.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread())
		
  • 1
  • 2
  • 1
  • 2

简单的来说, subscribeOn() 指定的是Observable发送事件的线程, observeOn() 指定的是Observer接收事件的线程. 
多次指定Observable的线程只有次指定的有效, 也就是说多次调用subscribeOn() 只有次的有效, 其余的会被忽略. 
多次指定Observer的线程是可以的, 也就是说每调用一次observeOn() , Observer的线程就会切换一次.例如:

observable.subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.io()) .subscribe(consumer);
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

这段代码中指定了两次上游发送事件的线程, 分别是newThread和IO线程, 下游也指定了两次线程,分别是main和IO线程. 运行结果为: 
这里写图片描述

可以看到, Observable虽然指定了两次线程, 但只有次指定的有效, 依然是在RxNewThreadScheduler线程中, 而Observer则跑到了RxCachedThreadScheduler 中, 这个CacheThread其实就是IO线程池中的一个.

在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:

Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程; Schedulers.newThread(): 开启新线程操作; Schedulers.immediate(): 默认指定的线程,也就是当前线程; Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;

AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

例子:还是用之前设置图片的例子,这次我们在子线程中进行网络请求获取图片,在主线程中对图片进行设置

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> e) throws Exception { try {
            Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(), "src");
            e.onNext(drawable);
        } catch (IOException error) {
            e.onError(error);
        }
    }
})// 指定 subscribe() 所在的线程,也就是上面subscribe()方法调用的线程 .subscribeOn(Schedulers.io()) // 指定 Observer 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程 .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Drawable>() { @Override public void onSubscribe(Disposable d) {

    } @Override public void onNext(Drawable value) {
        ivLogo.setImageDrawable(value);
    } @Override public void onError(Throwable e) {

    } @Override public void onComplete() {

    }
});
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。

操作符的使用

在了解基本知识和线程调度后,我们来学习一下RxJava各种神奇的操作符

Map 
Map是RxJava中简单的一个变换操作符了, 它的作用就是对Observable发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化.

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override public String apply(Integer integer) throws Exception { return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override public void accept(String s) throws Exception { Log.d(TAG, s);
        }
    });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

在Observable我们发送的是数字类型, 而在Observer我们接收的是String类型, 中间起转换作用的就是Map操作符, 运行结果为: 
这里写图片描述

通过Map, 可以将Observable发来的事件转换为任意的类型, 可以是一个Object, 也可以是一个集合,功能非常强大

例子:还是以图片加载的例子,我们传进来一个图片的路径,然后通过Map进行转换成drawble再发送给观察者

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
    }
}).map(new Function<String, Drawable>() { @Override public Drawable apply(String url) throws Exception { try {
            Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src"); return drawable;
        } catch (IOException e) {

        } return null;
    }
})  .subscribeOn(Schedulers.io()) // 指定 Observer 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程 .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Drawable>() { @Override public void onSubscribe(Disposable d) {

            } @Override public void onNext(Drawable value) { if (value != null) {
                    ivLogo.setImageDrawable(value);
                }
            } @Override public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            } @Override public void onComplete() {

            }
        });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

效果如下: 
这里写图片描述

经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是 
Observable –> map变换 –> Observable

FlatMap 
FlatMap将一个发送事件的Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.

Observable每发送一个事件, flatMap都将对其进行转换, 然后发送转换之后的新的事件, Observer接收到的就是转换后发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 如果需要保证顺序则需要使用concatMap.

 Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) { list.add("I am value " + integer);
            } return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override public void accept(String s) throws Exception { Log.d(TAG, s);
        }
    });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

效果如下: 
这里写图片描述

Map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

1.flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象; 
2.flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中; 
3.flatMap 变换后产生的每一个Observable对象发送的事件,后都汇入同一个Observable,进而发送给Subscriber回调; 
4.map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样; 
5.可以对一个Observable多次使用 map 和 flatMap;

鉴于 flatMap 自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess() 回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:

public interface Api { @GET Observable<LoginResponse> login(@Body LoginRequest request); @GET Observable<RegisterResponse> register(@Body RegisterRequest request);

}
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

接着创建一个Retrofit客户端:

private static Retrofit create() {
    OkHttpClient.Builder builder = new OkHttpClient().newBuilder(); builder.readTimeout(10, TimeUnit.SECONDS); builder.connectTimeout(9, TimeUnit.SECONDS); if (BuildConfig.DEBUG) {
        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(); interceptor.setLevel(HttpLoggingInterceptor.Level.BODY); builder.addInterceptor(interceptor); }

    return new Retrofit.Builder().baseUrl(ENDPOINT) .client(builder.build()) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build(); }
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

发起请求就很简单了:

Api api = retrofit.create(Api.class);
api.login(request)
        .subscribeOn(Schedulers.io()) //在IO线程进行网络请求 .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果 .subscribe(new Observer<LoginResponse>() { @Override public void onSubscribe(Disposable d) {} @Override public void onNext(LoginResponse value) {} @Override public void onError(Throwable e) {
                Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
            } @Override public void onComplete() {
                Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
            }
        });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

concatMap 
这里也简单说一下concatMap吧, 它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, 来看个代码吧:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) { list.add("I am value " + integer);
            } return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override public void accept(String s) throws Exception { Log.d(TAG, s);
        }
    });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

只是将之前的flatMap改为了concatMap, 其余原封不动, 运行结果如下: 
这里写图片描述

可以看到, 结果仍然是有序的.

ZIP 
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项少的那个Observable一样多的数据。

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emitter 1");
            emitter.onNext(1); Log.d(TAG, "emitter 2");
            emitter.onNext(2); Log.d(TAG, "emitter 3");
            emitter.onNext(3); Log.d(TAG, "emitter 4");
            emitter.onNext(4); Log.d(TAG, "emit complete1");
            emitter.onComplete();
        }
    });

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "emitter A");
            emitter.onNext("A"); Log.d(TAG, "emitter B");
            emitter.onNext("B"); Log.d(TAG, "emitter C");
            emitter.onNext("C"); Log.d(TAG, "emitter complete2");
            emitter.onComplete();
        }
    });

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override public String apply(Integer integer, String s) throws Exception { return integer + s;
        }
    }).subscribe(new Observer<String>() {
        @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe");
        }

        @Override public void onNext(String value) { Log.d(TAG, "onNext: " + value);
        }

        @Override public void onError(Throwable e) { Log.d(TAG, "onError");
        }

        @Override public void onComplete() { Log.d(TAG, "onComplete");
        }
    });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

我们分别创建了observable, 一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧: 
这里写图片描述 
观察发现observable1发送事件后,observable2才发送 
这是因为我们两个observable都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序呀.

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1");
            emitter.onNext(1); Thread.sleep(1000); Log.d(TAG, "emit 2");
            emitter.onNext(2); Thread.sleep(1000); Log.d(TAG, "emit 3");
            emitter.onNext(3); Thread.sleep(1000); Log.d(TAG, "emit 4");
            emitter.onNext(4); Thread.sleep(1000); Log.d(TAG, "emit complete1");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "emit A");
            emitter.onNext("A"); Thread.sleep(1000); Log.d(TAG, "emit B");
            emitter.onNext("B"); Thread.sleep(1000); Log.d(TAG, "emit C");
            emitter.onNext("C"); Thread.sleep(1000); Log.d(TAG, "emit complete2");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io());

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override public String apply(Integer integer, String s) throws Exception { return integer + s;
        }
    }).subscribe(new Observer<String>() {
        @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe");
        }

        @Override public void onNext(String value) { Log.d(TAG, "onNext: " + value);
        }

        @Override public void onError(Throwable e) { Log.d(TAG, "onError");
        }

        @Override public void onComplete() { Log.d(TAG, "onComplete");
        }
    });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

好了, 这次我们让事件都在IO线程里发送事件, 再来看看运行结果: 
这里写图片描述

个observable明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢? 
这是因为我们之前说了, zip发送的事件数量跟observable中发送事件少的那一个的事件数量是有关的, 在这个例子里我们observable2只发送了三个事件然后就发送了Complete, 这个时候尽管observable1还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢?

from

在RxJava的from操作符到2.0已经被拆分成了3个,fromArray, fromIterable, fromFuture接收一个集合作为输入,然后每次输出一个元素给subscriber。

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
}); 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这里写图片描述

注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…);

filter 
条件过滤,去除不符合某些条件的事件。举个栗子:

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
       .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { // 偶数返回true,则表示剔除奇数,留下偶数 return integer % 2 == 0;

           }
       }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
});
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

这里写图片描述

take 
多保留的事件数。

 Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) {

            } @Override public void onNext(String value) {
                Log.d(TAG,value);
            } @Override public void onError(Throwable e) {

            } @Override public void onComplete() {

            }
        });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

这里写图片描述 
可以发现我们发送了6个String,后只打印了前两个,这就是take过滤掉的结果

doOnNext 
如果你想在处理下一个事件之前做某些事,就可以调用该方法

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { // 偶数返回true,则表示剔除奇数 return integer % 2 == 0;
    }
})// 多保留三个,也就是后剩三个偶数 .take(3).doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 在输出偶数之前输出它的hashCode Log.i(TAG, "hahcode = " + integer.hashCode() + "");
    }
}).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) {

    } @Override public void onNext(Integer value) {
        Log.i(TAG, "number = " + value);
    } @Override public void onError(Throwable e) {

    } @Override public void onComplete() {

    }
}); 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

这里写图片描述

debounce 
debounce也是用于事件的过滤,可以指定过滤事件的时间间隔

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { int i = 0; int[] times = new int[]{100, 1000}; while (true) {
            i++; if (i >= 100) break;
            e.onNext(i); try { // 注意!!!! // 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉 // 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉 Thread.sleep(times[i % 2]);
            } catch (InterruptedException error) {
                error.printStackTrace();
            }
        }
        e.onComplete();
    }
})// 间隔400ms以内的事件将被丢弃 .debounce(400, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() { @Override public void onError(Throwable e) {
                Log.e(TAG, e.toString());
            } @Override public void onComplete() {
                Log.i(TAG, "complete");
            } @Override public void onSubscribe(Disposable d) {

            } @Override public void onNext(Integer integer) {
                Log.i(TAG, "integer = " + integer);
            }
        });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

这里写图片描述

compose 
与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。 
1.compose 是一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。 
2.compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。 
3.flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。 
4.建议使用 compose 代替 flatMap。

First 
只发送符合条件的个事件。可以与contact操作符,做网络缓存。 
例子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。

// 从缓存获取 Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() { @Override public void call(Subscriber<? super BookList> subscriber) {
                BookList list = getFromDisk(); if (list != null) {
                    subscriber.onNext(list);
                } else {
                    subscriber.onCompleted();
                }
            }
        }); // 从网络获取 Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();

        Observable.concat(fromDisk, fromNetWork) // 如果缓存不为null,则不再进行网络请求。反之 .first()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<BookList>() { @Override public void onCompleted() {

                    } @Override public void onError(Throwable e) {

                    } @Override public void onNext(BookList discussionList) {

                    }
                }); 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

Single 
Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。

Single.create(new SingleOnSubscribe<Object>() { @Override public void subscribe(SingleEmitter<Object> e) throws Exception {
        e.onSuccess("hello world");
    }
}).subscribe(new SingleObserver<Object>() { @Override public void onSubscribe(Disposable d) {

    } @Override public void onSuccess(Object value) {
        Log.i(TAG, value.toString());
    } @Override public void onError(Throwable e) {

    }
});
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

这里写图片描述

可以配合debounce,避免SearchEditText频繁请求。

final Subject subject = PublishSubject.create();

subject.debounce(400, TimeUnit.MILLISECONDS)
        .subscribe(new Observer() { @Override public void onError(Throwable e) {

            } @Override public void onComplete() {

            } @Override public void onSubscribe(Disposable d) {

            } @Override public void onNext(Object o) { // request }
        });

edittext.addTextChangedListener(new TextWatcher() { @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) { } @Override public void onTextChanged(CharSequence s, int start, int before, int count) {
        subject.onNext(s.toString());
    } @Override public void afterTextChanged(Editable s) { }
});
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

RxJava的一些使用场景

场景1: 
取数据,首先检查内存是否有缓存 
然后检查文件缓存中是否有 
后才从网络中取 
前面任何一个条件满足,就不会执行后面的

final Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { if (memoryCache != null) {
            emitter.onNext(memoryCache);
        } else {
            emitter.onComplete();
        }
    }
}); final Observable<String> disk  = Observable.create(new ObservableOnSubscribe<String>() {
    String cachePref = getSharedPreferences("rxdeni",MODE_PRIVATE).getString("cache",null); @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { if (cachePref != null) {
            emitter.onNext(cachePref);
        } else {
            emitter.onComplete();
        }
    }
});

Observable<String> network = Observable.just("network"); //主要就是靠concat operator来实现 Observable.concat(memory, disk, network).firstElement()

        .subscribeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception {
                System.out.println("--------------subscribe: " + s);
            }
        });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

场景2:界面需要等到多个接口并发取完数据,再更新

Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("haha");
        }
    }).subscribeOn(Schedulers.newThread());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("hehe");
        }
    }).subscribeOn(Schedulers.newThread());


    Observable.merge(observable1, observable2)
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) {

                } @Override public void onNext(String value) {
                    Log.d(TAG,value);
                } @Override public void onError(Throwable e) {

                } @Override public void onComplete() {

                }
            });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

场景3:界面按钮需要防止连续点击的情况

RxView.clicks(button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Object>() { @Override public void onCompleted() {

            } @Override public void onError(Throwable e) {

            } @Override public void onNext(Object o) {
                Log.i(TAG, "do clicked!");
            }
        });
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

场景4:响应式的界面 
比如勾选了某个checkbox,自动更新对应的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this); RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences); Preference<Boolean> checked = rxPreferences.getBoolean("checked", true); CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test); RxCompoundButton.checkedChanges(checkBox) .subscribe(checked.asAction());
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

场景5:复杂的数据变换

Observable.just("1", "2", "6", "3", "4", "5") .map(new Function<String, Integer>() {
            @Override public Integer apply(String s) throws Exception { return Integer.parseInt(s);
            }
        }).filter(new Predicate<Integer>() {
    @Override public boolean test(Integer integer) throws Exception { return integer.intValue()%2 == 0;
    }
}).distinct().take(2).reduce(new BiFunction<Integer, Integer, Integer>() {
    @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer.intValue() + integer2.intValue();
    }
}).subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception { Log.d(TAG,integer.toString());
    }
}); 
		
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责,本站只提供参考并不构成任何投资及应用建议。本站是一个个人学习交流的平台,网站上部分文章为转载,并不用于任何商业目的,我们已经尽可能的对作者和来源进行了通告,但是能力有限或疏忽,造成漏登,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

回到顶部
嘿,我来帮您!