Updated on 2016-11-22

观察者模式

响应式编程 | 函数式编程

https://github.com/ReactiveX/RxJava

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html

Operators | Operators

Book | Book

Java Native

import java.util.Observable;
import java.util.Observer;

public class Main {
    public static void main(String[] args) {
        Subject subject = new Subject();
        Consumer consumer = new Consumer();

        subject.addObserver(consumer); // 被观察者添加观察者

        subject.setData("123");
        subject.setData("123");
        subject.setData("456");
        subject.setData("456");
    }

    private static class Subject extends Observable { // 被观察者需要继承类
        private String data = "";

        public String getData() {
            return data;
        }

        public void setData(String data) {
            if (data == null || this.data.equals(data)) return; // 过滤无效数据

            this.data = data;
            setChanged(); // 设置 flag
            notifyObservers(data); // 通知观察者
        }
    }

    private static class Consumer implements Observer { // 观察者需要实现接口
        @Override
        public void update(Observable o, Object arg) {
            System.out.println("Update: " + arg);
        }
    }
}
----
输出
Update: 123
Update: 456

Observable - 被观察者

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {     传入 OnSubscribe描述事件
    @Override
    public void call(Subscriber<? super String> subscriber) {     作为参数传入的观察者
        subscriber.onNext("A");     事件
        subscriber.onNext("B");
        subscriber.onNext("C");
        subscriber.onCompleted();
    }
});

----

Observable<String> observable = Observable.create(subscriber -> {     简化为 Lambda 表达式
    subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onNext("C");
    subscriber.onCompleted();
});

-------------------------------------------------------

Observable<String> observable = Observable.just("A", "B", "C");     快捷方式

-------------------------------------------------------

List<String> list = Arrays.asList("A", "B", "C");
Observable<String> observable = Observable.from(list);     快捷方式

-------------------------------------------------------

observable.subscribe(observer);     被观察者订阅观察者观察者见下文
----
输出
A
B
C
onCompleted

Note
0. 被观察者发出事件观察者处理事件。(可一对多
1. 一旦被观察者调用 subscribe() 方法订阅观察者被观察者中的唯一成员 OnSubscribe 将执行 call() 方法并将观察者作为参数传入
2. 调用 subscribe() 方法后会返回 Subscription 接口对象仅含 2 个方法 unsubscribe 和 isUnsubscribed),代表被观察者与观察者之间的订阅关系

Observer - 观察者

Observer<String> observer = new Observer<String>() {     接口最终会被包装为 Subscriber
    @Override
    public void onCompleted() {     完成事件调用链结束
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {     出现异常框架自动调用调用链结束
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {     处理事件
        System.out.println(s);
    }
};

-------------------------------------------------------

Subscriber<String> subscriber = new Subscriber<String>() {     抽象类继承但未实现 Observer 接口且可选择性重写 onStart 方法
    @Override
    public void onCompleted() {     完成事件调用链结束
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {     出现异常框架自动调用调用链结束
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {     处理事件
        System.out.println(s);
    }
};

调用链onStart() --> onNext() --> onCompleted()
   |       |                    onError()
   |        此方法只能在调用 subscribe() 的线程上执行可通过操作符 doOnSubscribe(Action0) 替代且可指定运行线程
    当调用链结束后订阅关系自动解除Subscription.isUnsubscribed = true
-------------------------------------------------------

Action1<String> action1 = new Action1<String>() {     快捷方式被观察者的 subscribe() 方法支持传入 Action1 接口充当 onNext
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};

----

Action1<String> action1 = s -> System.out.println(s);     简化为 Lambda 表达式
Action1<String> action1 = System.out::println;     简化为方法引用

Scheduler - 线程调度

默认情况下调用链(事件流)在调用 subscribe() 的线程上运行,可以通过以下 2 种方法指定运行线程:

  • (一次)subscribeOn():指定其运行线程。
  • (多次)observeOn():切换其运行线程。
    • subscribeOn() 用于指定最开始调用链(事件流)的运行线程,后期可通过 observeOn() 随时切换其运行线程。
Observable
        .create((Observable.OnSubscribe<String>) subscriber -> {
            Log.w("Tag", "create__" + Thread.currentThread().toString());
            subscriber.onNext("");
            subscriber.onCompleted();
        })
        .observeOn(Schedulers.computation())     切换至计算线程 (2)
        .map(s -> {
            Log.w("Tag", "map__" + Thread.currentThread().toString());
            return s;
        })
        .observeOn(AndroidSchedulers.mainThread())     切换至主线程 (3)
        .subscribeOn(Schedulers.io())     指定最开始在IO线程中运行 (1)
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.w("Tag", "onCompleted__" + Thread.currentThread().toString());
            }

            @Override
            public void onError(Throwable e) {
                Log.w("Tag", "onError");
            }

            @Override
            public void onNext(String s) {
                Log.w("Tag", "onNext__" + Thread.currentThread().toString());
            }
        });
----
输出
23701-14945/com.example.myapp.myapplication W/Tag: create__Thread[RxIoScheduler-3,5,main]     IO 线程
23701-23824/com.example.myapp.myapplication W/Tag: map__Thread[RxComputationScheduler-2,5,main]     计算线程
23701-23701/com.example.myapp.myapplication W/Tag: onNext__Thread[main,5,main]     主线程
23701-23701/com.example.myapp.myapplication W/Tag: onCompleted__Thread[main,5,main]     主线程

-------------------------------------------------------

实例加载网络图片
----
Observable
        .just("http://blog.xhstormr.tk/uploads/children-of-the-sun1.jpg")
        .map(s -> {     下载 BitmapString  Bitmap
            Bitmap bitmap = null;
            try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new URL(s).openStream())) {
                bitmap = BitmapFactory.decodeStream(bufferedInputStream);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return bitmap;
        })
        .observeOn(AndroidSchedulers.mainThread())     切换至主线程 (2)
        .subscribeOn(Schedulers.io())     指定最开始在IO线程中运行 (1)
        .subscribe(bitmap -> mImageView.setImageBitmap(bitmap));     加载 Bitmap

Operators - 操作符

类似于 Java 8 中的 Stream 的内部迭代。

转换

public class A {
    private static final List<Author> LIST = Arrays.asList(
            new Author("Adam", 23, Arrays.asList("Java1", "Java2")),
            new Author("Bell", 19, Arrays.asList("Python1", "Python2")),
            new Author("Conan", 23, Arrays.asList("PHP1", "PHP2")),
            new Author("David", 26, Arrays.asList("Ruby1", "Ruby2")));     作家列表

    public static void main(String[] args) {
        Observable
                .from(LIST)
                .map(author -> author.mAge)     一对一
                .sorted()     排序
                .subscribe(integer -> System.out.print(integer + " "), System.out::println, () -> System.out.println("\n————————————"));
        Observable
                .from(LIST)
                .map(author -> author.mAge)     一对一
                .sorted()     排序
                .scan(0, (i1, i2) -> i1 + i2)     累加器提供初始值
                .subscribe(integer -> System.out.print(integer + " "), System.out::println, () -> System.out.println("\n————————————"));
        Observable
                .from(LIST)
                .map(author -> author.mAge)     一对一
                .reduce(0, (i1, i2) -> i1 + i2)     累加器提供初始值
                .subscribe(integer -> System.out.print(integer + " "), System.out::println, () -> System.out.println("\n————————————"));
        Observable
                .from(LIST)
                .flatMap(author -> Observable.from(author.mArticle))     一对多手动转换为 Observable)(推荐使用 concatMap解决 flatMap 事件交叉问题)(flatMap() 底层调用 merge()concatMap 底层调用 concat()下同
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .flatMapIterable(author -> author.mArticle)     一对多自动转换为 Observable)(推荐使用 concatMapIterable解决 flatMapIterable 事件交叉问题
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .groupBy(author -> author.mAge)     按年龄将事件流分组
                .concatMap(groupedObservable -> groupedObservable)     一对多
                .subscribe(author -> System.out.printf("年龄:%d 姓名:%s\n", author.mAge, author.mName), System.out::println, () -> System.out.println("————————————"));
    }

    private static class Author {     作家
        private String mName;     姓名
        private int mAge;     年龄
        private List<String> mArticle;     文章列表

        private Author(String name, int age, List<String> article) {
            mName = name;
            mAge = age;
            mArticle = article;
        }

        @Override
        public String toString() {
            return mName;
        }
    }
}
----
输出
19 23 23 26
————————————
0 19 42 65 91
————————————
91
————————————
Java1
Java2
Python1
Python2
PHP1
PHP2
Ruby1
Ruby2
————————————
Java1
Java2
Python1
Python2
PHP1
PHP2
Ruby1
Ruby2
————————————
年龄:23 姓名:Adam
年龄:23 姓名:Conan
年龄:19 姓名:Bell
年龄:26 姓名:David
————————————

过滤

public class A {
    private static final List<Author> LIST = Arrays.asList(
            new Author("Adam", 23, Arrays.asList("Java1", "Java2")),
            new Author("Bell", 19, Arrays.asList("Python1", "Python2")),
            new Author("Conan", 23, Arrays.asList("PHP1", "PHP2")),
            new Author("David", 26, Arrays.asList("Ruby1", "Ruby2")));

    public static void main(String[] args) {
        Observable
                .from(LIST)
                .filter(author -> author.mAge > 25)     过滤
                .subscribe(author -> System.out.printf("年龄:%d 姓名:%s\n", author.mAge, author.mName), System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .take(2)     只取前 2
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .takeLast(2)     只取后 2
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .skip(2)     跳过前 2
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .skipLast(2)     跳过后 2
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .elementAt(0)     选取
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .first()     选取最前若无数据first() 直接调用 onError()takeFirst() 直接调用 onCompleted()下同
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(LIST)
                .last()     选取最后
                .subscribe(System.out::println, System.out::println, () -> System.out.println("————————————"));
        Observable
                .from(Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 1, 2, 3, 4, 5, 6))
                .distinct()     去重
                .subscribe(integer -> System.out.print(integer + " "), System.out::println, () -> System.out.println("\n————————————"));
        Observable
                .from(Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 1, 2, 3, 4, 5, 6))
                .distinctUntilChanged()     前后去重
                .subscribe(integer -> System.out.print(integer + " "), System.out::println, () -> System.out.println("\n————————————"));
        ----
        输出
        年龄:26 姓名:David
        ————————————
        Adam
        Bell
        ————————————
        Conan
        David
        ————————————
        Conan
        David
        ————————————
        Adam
        Bell
        ————————————
        Adam
        ————————————
        Adam
        ————————————
        David
        ————————————
        1 2 3 4 5 6
        ————————————
        1 2 3 4 5 6 1 2 3 4 5 6
        ————————————
    }

    private static void a() throws InterruptedException {     takeUntil
        Observable
                .from(Arrays.asList(1, 2, 3, 4, 5, 6))
                .takeUntil(integer -> integer == 3)     一直处理事件直到事件符合某项条件
                .subscribe(System.out::println);
        ----
        输出
        1
        2
        3

        Observable<Long> observable1 = Observable.interval(300, TimeUnit.MILLISECONDS);     每隔 300 毫秒发送数字
        Observable<Long> observable2 = Observable.interval(800, TimeUnit.MILLISECONDS);     每隔 800 毫秒发送数字
        observable1
                .takeUntil(observable2)     一直处理事件直到 observable2 发送了事件
                .subscribe(System.out::println, System.out::println, () -> System.exit(0));
        Thread.sleep(Integer.MAX_VALUE);
        ----
        输出
        0
        1
    }

    private static class Author {
        private String mName;
        private int mAge;
        private List<String> mArticle;

        private Author(String name, int age, List<String> article) {
            mName = name;
            mAge = age;
            mArticle = article;
        }

        @Override
        public String toString() {
            return mName;
        }
    }
}

组合

public class A {
    private static String[] strings = {"A", "B", "C", "D", "E"};
    private static Observable<String> observable1 = Observable     每隔 300 毫秒发送事件A B C D E
            .interval(300, TimeUnit.MILLISECONDS)
            .take(strings.length)
            .map(aLong -> strings[aLong.intValue()]);
    private static Observable<Long> observable2 = Observable     每隔 500 毫秒发送事件0 1 2 3 4 5 6 7 8 9
            .interval(500, TimeUnit.MILLISECONDS)
            .take(10);

    public static void main(String[] args) throws InterruptedException {
        a();
        b();
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void a() {     合并
        Observable
                .merge(observable1, observable2)     无序合并异步进行
                .subscribe(o -> System.out.print(o + " "), System.out::println, () -> System.exit(0));
        ----
        输出
        A 0 B C 1 D E 2 3 4 5 6 7 8 9

        Observable
                .concat(observable1, observable2)     有序合并同步进行
                .subscribe(o -> System.out.print(o + " "), System.out::println, () -> System.exit(0));
        ----
        输出
        A B C D E 0 1 2 3 4 5 6 7 8 9

        observable1
                .startWith(Observable.just("AA", "BB", "CC"))     有序合并同步进行),底层调用 concat 在其之前插入新事件接收 Observable 和 Iterable
                .subscribe(o -> System.out.print(o + " "), System.out::println, () -> System.exit(0));
        ----
        输出
        AA BB CC A B C D E
    }

    private static void b() {     组合
        Observable
                .zip(observable1, observable2, (s, aLong) -> s + aLong)     不可重用
                .subscribe(o -> System.out.print(o + " "), System.out::println, () -> System.exit(0));
        ----
        输出
        A0 B1 C2 D3 E4

        Observable
                .combineLatest(observable1, observable2, (s, aLong) -> s + aLong)     可重用与最近的事件组合
                .subscribe(o -> System.out.print(o + " "), System.out::println, () -> System.exit(0));
        ----
        输出
        A0 B0 C0 C1 D1 E1 E2 E3 E4 E5 E6 E7 E8 E9

        observable1     左事件源
                .join(     可重用事件具有有效期有点像排列组合
                        observable2,     右事件源
                        s -> Observable.timer(10000, TimeUnit.MILLISECONDS),     左事件有效期
                        aLong -> Observable.timer(0, TimeUnit.MILLISECONDS),     右事件有效期
                        (s, aLong) -> s + aLong)
                .subscribe(o -> System.out.print(o + " "), System.out::println, () -> System.exit(0));
        ----
        输出
        A0 A1 B1 C1 A2 B2 C2 D2 E2 A3 B3 C3 D3 E3 A4 B4 C4 D4 E4 A5 B5 C5 D5 E5 A6 B6 C6 D6 E6 A7 B7 C7 D7 E7 A8 B8 C8 D8 E8 A9 B9 C9 D9 E9
    }
}

Code

Observable
        .just("A", "B", "C")
        .doOnNext(s -> System.out.printf("%s__%s__%s\n", s, Thread.currentThread().getName(), "doOnNext"))     类似于 Java 8 中的 Stream 的 peek
        .map(s -> {
            System.out.printf("%s__%s__%s\n", s, Thread.currentThread().getName(), "map");
            return s;
        })
        .subscribe(s -> System.out.printf("%s__%s__%s\n", s, Thread.currentThread().getName(), "onNext"), System.out::println, () -> System.exit(0));
Thread.sleep(Integer.MAX_VALUE);
----
输出     类似于 Java 8 中的 Stream 的垂直执行像在流水线依次经过每个操作并通过短路求值尽可能减少操作次数
A__main__doOnNext
A__main__map
A__main__onNext
B__main__doOnNext
B__main__map
B__main__onNext
C__main__doOnNext
C__main__map
C__main__onNext

-------------------------------------------------------

Observable
        .just("A", "B", "C")
        .doOnNext(s -> System.out.printf("%s__%s__%s\n", s, Thread.currentThread().getName(), "doOnNext"))
        .map(s -> {
            System.out.printf("%s__%s__%s\n", s, Thread.currentThread().getName(), "map");
            return s;
        })
        .observeOn(Schedulers.computation())     切换至计算线程 (2)
        .subscribeOn(Schedulers.io())     指定最开始在IO线程中运行 (1)
        .subscribe(s -> System.out.printf("%s__%s__%s\n", s, Thread.currentThread().getName(), "onNext"), System.out::println, () -> System.exit(0));
Thread.sleep(Integer.MAX_VALUE);
----
输出
A__RxIoScheduler-2__doOnNext
A__RxIoScheduler-2__map
B__RxIoScheduler-2__doOnNext
B__RxIoScheduler-2__map
C__RxIoScheduler-2__doOnNext
C__RxIoScheduler-2__map
A__RxComputationScheduler-1__onNext
B__RxComputationScheduler-1__onNext
C__RxComputationScheduler-1__onNext
通过 compose() 操作符重用操作链
-------------------------------------------------------
public class A {
    public static void main(String[] args) {
        final Observable.Transformer<String, String> transformer = new Observable.Transformer<String, String>() {
            @Override
            public Observable<String> call(Observable<String> stringObservable) {     对传入的原始被观察者进行加工
                return stringObservable.doOnUnsubscribe(() -> System.out.println("Unsubscribed")).map(s -> s + "1");
            }
        };     Transformer 对象

        Observable
                .just("A", "B", "C")
                .compose(transformer)     重用 Transformer
                .subscribe(System.out::println);
        Observable
                .just("1", "2", "3")
                .compose(transformer)     重用 Transformer
                .subscribe(System.out::println);
    }
}
----
输出
A1
B1
C1
Unsubscribed
11
21
31
Unsubscribed

Note
1. compose() 接收一个 Transformer 接口此接口继承自 Func1接收原始 Observable返回新 Observable
2. compose() 作用于整个被观察者flatMap() 作用于每个事件

-------------------------------------------------------

实例改进之前线程调度加载网络图片的例子
----
public class A {
    private static final Observable.Transformer<Object, Object> mTransformer = observable ->     单例模式
            observable
                    .observeOn(AndroidSchedulers.mainThread())     切换至主线程 (2)
                    .subscribeOn(Schedulers.io());     指定最开始在IO线程中运行 (1)

    @SuppressWarnings("unchecked")     压制警告强制类型转换
    public static <T> Observable.Transformer<T, T> applySchedulers() {
       return ((Observable.Transformer<T, T>) mTransformer);   返回 Transformer 对象为了不丢失类型信息而强制转换
    }
}

----

Observable
        .just("http://blog.xhstormr.tk/uploads/children-of-the-sun1.jpg")
        .map(s -> {     下载 BitmapString  Bitmap
            Bitmap bitmap = null;
            try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new URL(s).openStream())) {
                bitmap = BitmapFactory.decodeStream(bufferedInputStream);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return bitmap;
        })
        .compose(A.applySchedulers())     只需传入 A.applySchedulers() 返回的对象即可后台处理前台显示
        .subscribe(bitmap -> mImageView.setImageBitmap(bitmap));     加载 Bitmap
通过 Observable.defer(Func0) 实现延迟订阅
-------------------------------------------------------
public class A {
    private static String s;

    public static void main(String[] args) {
        Observable<String> observable = Observable.just(s);     直接执行
        s = "ABC";
        observable.subscribe(System.out::println);
        ----
        输出
        null

        Observable<String> observable = Observable.defer(() -> Observable.just(s));     Func0 接口中的代码将延迟执行
        s = "ABC";
        observable.subscribe(System.out::println);
        ----
        输出
        ABC
    }
}

Note
0. just() 和 from() 在创建 Observable 时就存储了对象的值create() 创建的 Observable 在 subscribe() 时才访问对象
1. defer() 接收一个 Func0 接口并显式声明此接口返回一个 Observable 对象
2. defer() 中的代码直到订阅才会执行
通过 Schedulers 将耗时操作放到后台线程中执行
-------------------------------------------------------
public class A {
    public static void main(String[] args) throws InterruptedException {
        Schedulers.io().createWorker().schedule(() -> {     Action0 接口中的代码将在 IO 线程中执行
            System.out.println(Thread.currentThread());
            System.exit(0);
        });
        Thread.sleep(Integer.MAX_VALUE);
    }
}
----
输出
Thread[RxIoScheduler-2,5,main]
Single
-------------------------------------------------------

被观察者
----
Single<String> observable = Single.create(new Single.OnSubscribe<String>() {
    @Override
    public void call(SingleSubscriber<? super String> singleSubscriber) {
        singleSubscriber.onSuccess("A");
    }
});

Single<String> observable = Single.just("A");

观察者
----
SingleSubscriber<String> subscriber = new SingleSubscriber<String>() {     SingleSubscriber
    @Override
    public void onSuccess(String value) {     成功
        System.out.println(value);
    }

    @Override
    public void onError(Throwable error) {     异常
        System.out.println(error.toString());
    }
};

实例
----
Single.just("A").subscribe(System.out::println);
----
输出
A
延时执行
-------------------------------------------------------
public class A {
    public static void main(String[] args) throws InterruptedException {
        Observable<Integer> range = Observable.range(0, 3);
        range
                .delay(2, TimeUnit.SECONDS)     2 秒
                .subscribe(System.out::println, System.out::println, () -> System.exit(0));
        Thread.sleep(Integer.MAX_VALUE);
        ----
        输出:(延时 2 秒后执行
        0
        1
        2

        Observable<Long> timer = Observable.timer(2, TimeUnit.SECONDS);    2 秒timer 操作返回计时器 Observable
        timer
                .delay(2, TimeUnit.SECONDS)     2 秒
                .subscribe(System.out::println, System.out::println, () -> System.exit(0));
        Thread.sleep(Integer.MAX_VALUE);
        ----
        输出:(延时 4 秒后执行
        0
    }
}
缓解 Backpressure背部压力
-------------------------------------------------------

缓冲事件
----
public class A {
    public static void main(String[] args) throws InterruptedException {
        Observable
                .range(0, 10)
                .buffer(2)     将多个事件包装为 List<T>缓冲区大小为 2
                .subscribe(System.out::println);
        ----
        输出
        [0, 1]
        [2, 3]
        [4, 5]
        [6, 7]
        [8, 9]

        Observable
                .range(0, 10)
                .buffer(2, 3)     将多个事件包装为 List<T>缓冲区大小为 2每次都跳过第 3 个事件
                .subscribe(System.out::println);
        ----
        输出
        [0, 1]
        [3, 4]
        [6, 7]
        [9]
    }
}

过滤事件
----
public class A {
    public static void main(String[] args) throws InterruptedException {
        Observable<Long> timer = Observable.timer(5, TimeUnit.SECONDS);     5 秒后发出事件计时器

        Observable
                .interval(500, TimeUnit.MILLISECONDS)     每隔 500 毫秒发出事件
                .takeUntil(timer)     一直处理事件直到 timer 发送了事件
                .subscribe(System.out::println, System.out::println, () -> System.exit(0));
        Thread.sleep(Integer.MAX_VALUE);
        ----
        输出:(CPU 时间不准无过滤
        0
        1
        2
        3
        4
        5
        6
        7
        8

        Observable
                .interval(500, TimeUnit.MILLISECONDS)
                .takeUntil(timer)
                .throttleFirst(1, TimeUnit.SECONDS)     每隔 1 秒发出时间段中的第一个事件
                .subscribe(System.out::println, System.out::println, () -> System.exit(0));
        Thread.sleep(Integer.MAX_VALUE);
        ----
        输出:(CPU 时间不准
        0
        3
        5
        7

        Observable
                .interval(500, TimeUnit.MILLISECONDS)
                .takeUntil(timer)
                .throttleLast(1, TimeUnit.SECONDS)     每隔 1 秒发出时间段中的最后一个事件sample() 行为一致
                .subscribe(System.out::println, System.out::println, () -> System.exit(0));
        Thread.sleep(Integer.MAX_VALUE);
        ----
        输出:(CPU 时间不准
        0
        2
        4
        6
        8

        Observable     debounce去抖
                .interval(500, TimeUnit.MILLISECONDS)
                .takeUntil(timer)
                .debounce(1, TimeUnit.SECONDS)     1 秒无新事件后再发送其接收到的最后一个事件
                .subscribe(System.out::println, System.out::println, () -> System.exit(0));
        Thread.sleep(Integer.MAX_VALUE);
        ----
        输出:(CPU 时间不准
        8
    }
}