在使用RxJava實作的過程中,會經常遇到以下兩個情形:
- 相同的operator組合重複出現
- 相同的資料操作邏輯重複出現
剛好RxJava本身也有提供自訂operator的方式,對應上面情形如下:
- compose()
- lift()
以下將舉例介紹別分析其原理。
compose()
在官方介紹中,其被定義為Transformation Operator,用來將多個operator封裝。以下引用Don’t break the chain: use RxJava’s compose() operator中使用的範例。
試想在使用RxJava時,最終都得使用observeOn()
和subscribeOn()
。如果將這兩個operator封裝,並大量複用的話會變成如下:
<T> Observable<T> applySchedulers(Observable<T> observable) { |
可以看得出來這完全打破RxJava可以串連operator的好處。
透過compose()
我們可以將applySchedulers()
修改如下:
private <T> ObservableTransformer<T, T> applySchedulers() { |
如此就可以不打破鏈結,達到使用observeOn()
和subscribeOn()
的目的:
Observable.from(someSource) |
如果是用JDK 7或更舊的版本編譯,需要改成
this.<Type>applySchedulers()
。
且applySchedulers()
本身並不依賴任何外部的參數,可以被重複使用在各個情境內。
How
接著來看compose()
是如何實作來達到接續串聯的效果,首先看到原始碼內容:
// In Observable |
意外的沒做很多事情,只是透過傳入的composer轉換成ObservableTransformer後執行apply()
,接著再看到ObservableTransformer:
// In ObservableTransformer |
由註解就很明顯地說出apply()
的用途:承先啟後,將上游的Observable處理後,再傳出一個Observable,才能將鏈結延續下去。
綜合前面的分析,如果要使用compose()
,必須要有以下幾個條件:
- 繼承ObservableTransformer,並實作
apply()
- 不論
apply()
內容為何,必須要回傳Observable。 - 必須要使用傳入的Observable作為起點向下串連。
What’s more
compose()
畢竟只是固定的operator組合,通常不需要重複建立新的ObservableTransformer,所以範例中ObservableTransformer的部分可以抽出如下:
final ObservableTransformer schedules = new ObservableTransformer() { |
如此我們就有一個可以一直複用的ObservableTransformer,最後再把applySchedulers()
修改:
private <T> ObservableTransformer<T, T> applySchedulers() { |
此函式看起來有些多餘,但如此可避免直接使用
schedules
時,還要指定型態,如果型態名稱複雜則會降低整體可讀性。
lift()
在官方介紹中,其被定義為Sequence Operators,用在資料流的操作。以下我們用Switch-case and If-else blocks in Rx-Java streams當作範例來說明。
試想我們有一個資料流,然後要針對不同類型的資料做對應的操作,則勢必會在operator內使用switch-case或是if-else的方式作分流:
Observable.fromIterable(list) |
類似的操作可能會在不同的operator鏈結中出現,於是可以用lift()
將switch-case包裝成一個通用型的observer:
caseBlocks.put(1, (integer) -> "First Odd Number : " + integer); |
如此就可以將定義操作和oprator鏈結拆開,保持了整串鏈結的可讀性。同時,由於判斷條件和對應操作已從switch-case分離,使得構成switch-case的元件都可以個別被定義,然後彈性的在不同的情境中複用。
How
接著來看lift()
是如何實作來包裝操作邏輯,首先看到原始碼內容:
// In Observable |
用了ObservableLift來包裝傳入的lifter:
// In ObservableLift |
一般來說,subscribeActual()
的內容只有一行,用來將上下游的Observer串連,如ObservableMap:
// In ObservableMap |
如尚未熟悉Operator之間串連的原理,可以參閱基本概念章節。
而ObservableLift內多出來的部分,則是用傳入的operator來呼叫apply()
,並傳入下游的Observer:
// In ObservableOperator |
由註解可以知道,就是將下游的Observer再做一層打包,產生新的Observer。
實作apply()
的方式就是:
建立新的Observer
並實作其中
onNext()
等含式
如此就能在資料流在往下傳之前,進行一些額外的操作,簡單示意如下:
.lift(new ObservableOperator<Object, String>() { |
綜合前面的分析,如果要使用lift()
,必須要有以下幾個條件:
- 繼承ObservableOperator,並實作
apply()
- 不論
apply()
內容為何,必須要回傳Observer。 - 如在
apply()
內新增自訂的Observer,必須要在各函示最後使用傳入的Observer呼叫對應的函示,如onNext()
。