RxJava
a library for composing asynchronous and event-based programs using observable sequences for the Java VM
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
随着程序逻辑变得越来越复杂,它依然能够保持简洁
最早,我们这么写代码
private void requestFinished(JSONObject json) {
    new DealAsync().execute(json);
}
private class DealAsync extends AsyncTask<Void, Void, Void> {
    @Override
    protected List<Object> doInBackground(JSONArray... params) {
        //TOO deal Data
        dealData(params);
        return data;
    }
    @Override
    protected void onPostExecute(List<Object> entities) {
        super.onPostExecute(entities);
        //TODO deal data to view
    }
}
现在,可以这么写
private void requestFinished(JSONObject json) {
    Observable.just(dealData(json))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Object>() {
                @Override
                public void call(Object object) {
                    //TODO deal data to view
                }
            });
}
sample again!
一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内
new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
    imageCollectorView.addImage(bitmap);
}
                    });
                }
            }
        }
    }
}.start();
一组文件夹下所有的 png 图片都加载出来,并显示在 imageCollectorView 内
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
    @Override
    public Observable<File> call(File file) {
        return Observable.from(file.listFiles());
    }
})
.filter(new Func1<File, Boolean>() {
    @Override
    public Boolean call(File file) {
        return file.getName().endsWith(".png");
    }
})
.map(new Func1<File, Bitmap>() {
    @Override
    public Bitmap call(File file) {
        return getBitmapFromFile(file);
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
    @Override
    public void call(Bitmap bitmap) {
        imageCollectorView.addImage(bitmap);
    }
});
每次你打开,看到的是这样的:
Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
核心:Observables(被观察者,事件源)
Subscribers(观察者)
Observables 发出事件, Subscribers 处理事件
eg: 触摸事件,web接口调用返回的数据
常见观察者模式
RxJava
区别:Observable 没有 Subscriber 的话,则不会发出任何事件
Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);
Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }
    @Override
    public void onCompleted() { }
    @Override
    public void onError(Throwable e) { }
};
myObservable.subscribe(mySubscriber);
Observable.just("Hello, world!")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });
或
Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s));
Hello, world!中加签名:
Hello, world! -MrFu
Observable.just("Hello, world! -MrFu")
.subscribe(s -> System.out.println(s + " -MrFu"));
Observable.just("Hello, world!")
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s + " -Dan";
            }
        })
        .subscribe(s -> System.out.println(s));
Observable.just("Hello, world!")
        .map(s -> s + " -Dan")
        .subscribe(s -> System.out.println(s));
Observable.just("Hello, world!")
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return s.hashCode();
        }
    })
    .subscribe(i -> System.out.println(Integer.toString(i)));
Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .subscribe(i -> System.out.println(Integer.toString(i)));
Observable 可以是一个数据库查询,Subscriber 用来显示查询结果;
Observable 可以是屏幕上的点击事件,Subscriber 用来响应点击事件;
Observable 可以是一个网络请求,Subscriber 用来显示请求结果。
Observable 和 Subscriber 是独立于中间的变换过程的
此处应该有表情!
Observable<List<String>> query(String text); //返回 urls
按照之前做法,我们会这么做:
query("Hello, world!")
    .subscribe(urls -> {
        for (String url : urls) {
            System.out.println(url);
        }
    });
但是,还能这么做:
query("Hello, world!")
    .subscribe(urls -> {
        Observable.from(urls)
            .subscribe(url -> System.out.println(url));
    });
不够!!!
超牛逼的操作符:flatMap
query("Hello, world!")
    .flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    })
    .subscribe(url -> System.out.println(url));
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .subscribe(url -> System.out.println(url));
Observable.flatMap() 接收一个 Observable 的输出作为输入,同时输出另外一个 Observable。
flatMap 输出的新的 Observable 正是我们在 Subscriber 想要接收的。现在 Subscriber 不再收到 List(String),而是收到一系列单个的字符串,就像 Observable.from() 的输出一样。
See More!
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .subscribe(title -> System.out.println(title));
query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)//过滤不为 null
    .take(5)//只要5个结果
    .doOnNext(title -> saveTitle(title))//打印前保存标题
    .subscribe(title -> System.out.println(title));
Operators 可以让你对数据流做任何操作
响应式函数编程的魅力:一系列的 Operators 链接起来就可以完成复杂的逻辑。代码被分解成一系列可以组合的片段
Deal Error
Observable.just("Hello, world!")
    .map(s -> potentialException(s))
    .map(s -> anotherPotentialException(s))
    .subscribe(new Subscriber<String>() {
        @Override
        public void onNext(String s) { System.out.println(s); }
        @Override
        public void onCompleted() { System.out.println("Completed!"); }
        @Override
        public void onError(Throwable e) { System.out.println("Ouch!"); }
    });
Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
常用于:后台线程取数据,主线程显示
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread)
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定
Sample
getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 尝试修正 User 数据
                runOnUiThread(new Runnable() { // 切回 UI 线程
                    @Override
                    public void run() {
userView.setUser(user);
                    }
                });
            }).start();
    }
    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};
getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });
Anything else?
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });
RxView.clickEvents(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe(clickAction);
一种模式,使用 RxJava 实现 EventBus,从而不需要使用 Otto 或者 EventBus。[1]
Hey, do not be shy.