此篇為本系列的第一篇,用於建立一些基本概念,其餘operator會獨立在不同篇章介紹。
一般來說,一個完整的RxJava chain會由以下幾種operator組成:
- 建立源頭的:
create()
- 決定送出資料的thread:
subscribeOn()
- 決定operator執行時的thread:
observeOn()
- 訂閱Observable:
subscribe()
注意以上種類所含的operator可能不只一種,這邊為了簡化問題只列出最主要。
由上述的operator,我們可以給出一個簡單的例子如下:
Observable.create(new ObservableOnSubscribe<Integer>() { |
整個流程不難理解,就是用create()
建立一個資料源,然後在新的thread上送出一個數字0,然後在Android的main thread接收。
接著我們先從operator互相串接的過程開始介紹。
Connect with each other
Create()
.create(new ObservableOnSubscribe<Integer>() { |
進去create()
後如下:
// In Observable |
接收的ObservableOnSubscribe就是我們建立的,然後將其傳入ObservableCreate。
// In ObservableCreate |
傳進去後沒做特別的事情,就是把ObservableOnSubscribe包進去ObservableCreate。
SubscribeOn()
.subscribeOn(Schedulers.newThread()) |
從前面可知create()
會回傳一個ObservableCreate,因此這邊也可以表示如下:
ObservableCreate.subscribeOn(Schedulers.newThread()) |
接著看進去subscribeOn()
:
// In Observable |
包進去ObservableSubscribeOn的除了Scheduler還有this。由前述的程式碼可知道this就是ObservableCreate。
// In ObservableSubscribeOn |
綜合ObservableCreate的constructor,在這可以建立第一個觀念:
- 任何Observable的constructor,都是將上游的Observable包起來。
ObservableOn()
.observeOn(AndroidSchedulers.mainThread()) |
看進去observeOn()
:
// In Observable |
由前面介紹可知最後會得到一個ObservableObserveOn,其中包含著代表Android main thread的Scheduler和上游的Observable,也就是ObservableSubscribeOn。
到此完成operator的串接,由於Observable在串接過程中是一層包一層,所以最後得到的ObservableObserveOn可以用以下的結構表示:
ObservableObserveOn { |
Subscribe to Observable
subscribe()
.subscribe(new Consumer<String>() { |
看進去subscribe()
:
// In Observable |
在外部定義的Consumer,傳入後被包進去LambdaObserver,再傳入subscribeActual()
。Observable的subscribeActual()
需要被子類實作,推斷這裡的subscribeActual()
是ObservableObserveOn的:
// In ObservableObserveOn |
Worker部分跟thread的操作有關,將在其獨立的篇章介紹。
藉由前章最後的結構圖,可以推斷這裡的source是ObservableSubscribeOn。所以我們在這將前面得到的LambdaObserver包進去ObserveOnObserver。
綜合Observer的constructor,這有第二個觀念:
- 任何Observer的constructor,都是將下游的Observer包起來。
ObservableSubscribeOn並沒有實作subscribe()
,所以回到父類Observable的subscribe()
,再走回ObservableSubscribeOn的subscribeActual()
。
接著是第三個觀念:
- 任何Observable的
subscribe()
都會走回自己的subscribeActual()
。
接著看到ObservableSubscribeOn的subscribeActual()
:
// In ObservableSubscribeOn |
Scheduler在這邊進行thread切換的原理可查看Thread控制。
上游的ObserveOnObserver包進去SubscribeOnObserver,接著放入SubscribeTask並透過Scheduler執行。
到這邊,應可知道source就是ObservableCreate,而subscribe()
會帶到ObservableCreate的subscriberActual()
:
// In ObservableCreate |
這邊的source就是ObservableOnSubscribe,也就是我們一開始宣告給create()
的:
new ObservableOnSubscribe<Integer>() { |
和Observable一樣,Observer也是層層包裝。所以拿來呼叫onNext()
的ObservableEmitter,也就是CreateEmitter的結構如下:
CreateEmitter { |
Send data
接續上個章節最後一段程式碼,我們用ObservableEmitter,也就是CreateEmitter呼叫onNext()
:
// In CreateEmitter |
對照上個章節最後的結構,可以知道這個Observer就是SubscribeOnObserver:
// In SubscribeOnObserver |
沒做什麼事情,直接將資料傳遞到下游的Observer,也就是ObserveOnObserver:
// In ObserveOnObserver |
最後執行到a.onNext()
,這邊的a等於actual,也就是LambdaObserver:
// In LambdaObserver |
這邊的onNext()
就是我們宣告的Consumer:
new Consumer<String>() { |
最後的第四個觀念:
- 資料傳遞的方式是從上到下。
Conclusion
整合目前所建立的觀念如下:
- 任何Observable的constructor,都是將上游的Observable包起來。
- 任何Observer的constructor,都是將下游的Observer包起來。
- **任何Observable的
subscribe()
都會走回自己的subscribeActual()
**。 - 資料傳遞的方式是從上到下。
這些觀念是之後在分析operator時,可用來避免被source或actual之類會不斷重複的字眼所搗亂,如此可以專注在operator的邏輯的不同之處。