Thread控制是RxJava的特點之一,透過切換thread,可以將耗時的操作透過特定的thread執行。如此不只避免干擾到畫面的反應時間,擁有專屬的thread來執行操作,也可降低整體所需時間。
在RxJava中控制thread的僅有兩個函式:subscribeOn()
和 observeOn()
。由”基本概念”的那篇文章中可以知道,subscribeOn()
負責決定資料送出的thread;observeOn()
控制執行operator的thread。
建議可先看過”基本概念”再來接續閱讀,因為以下會用到”基本概念”內的觀念。
以下將繼續引用”基本概念”內的範例程式碼:
Observable.create(new ObservableOnSubscribe<Integer>() { |
subscribeOn()
在使用subscribeOn()
時,需要傳入Scheduler,並得到ObservableSubscribeOn:
.subscribeOn(Schedulers.newThread()) |
而這Scheduler會在subscribeActual()
被呼叫時使用:
// In ObservableSubscribeOn |
這邊的SubscribeTask可直接當成單純的Runnable。
看進去scheduleDirect()
:
// Scheduler |
在此先用createWorker()
取得Worker,然後與外部傳進來的Runnable,也就是SubscribeTask,一起傳入DisposeTask:
// In DisposeTask |
在DisposeTask的constructor沒做其他事,所以DisposeTask用來將SubscribeTask再做一層包裝。
接著再回到Worker的subscribe()
,但由於Worker沒有實作schedule()
,需要往前看到createWorker()
:
// In Scheduler |
結果Scheduler的createWorker()
也是一個抽象函式,因此再往前追朔到Scheduler的實際類別,也就是我們呼叫subscribeOn()
時傳入的參數Schedulers.newThread()
:
// In Schedulers |
這裡的NEW_THREAD最後會是NewThreadScheduler,詳細過程會在最後的章節介紹。
接續之前的createWorker()
,從NewThreadScheduler可以找到實作:
// In NewThreadScheduler |
看進去NewThreadWorker:
// In NewThreadWorker |
NewThreadWorker會透過傳入的ThreadFactory建立一個ScheduledExecutorService,也是ScheduledThreadPoolExecutor,並且實作subedule()
:
// In NewThreadWorker |
// In ScheduleRunnable |
外部傳入的Runnable,也就是DisposeTask,會再被包進ScheduledRunnable,然後交由executor,也就是ScheduledExecutorService來傳入submit()
或是schedule()
執行。
到此,我們先來看一下ScheduledRunnable的結構:
ScheduledRunnable { |
接著,我們來看當ScheduledRunnable被執行時的過程:
// In ScheduledRunnable |
直接使用actual,也就是DisposeTask來呼叫run()
:
// In DisposeTask |
再用decoratedRun,也就是SubscribeTask來呼叫run()
:
// In SubscribeTask |
這邊讓我們來回想一下”基礎概念”內提到的兩個觀念:
- 任何Observable的constructor,都是將上游的Observable包起來。
- 任何Observer的constructor,都是將下游的Observer包起來。
所以,這邊的source就是上游的operator,而parent就是下游傳上來的Observer。
因為這個subscribe()
的操作是在Runnable裡面,並由NewThreadWorker自帶的ScheduledExecutorService執行。從這個點開始,任何上游的operator的subscribe()
都是在NewThread內進行。
這邊可以給出一個觀念:
- 原始碼中,Worker呼叫
schedule()
的位置,就是切換thread的起點。
可以推斷的是,最後我們宣告的ObservableOnSubscribe的subscribe()
,裡面的e.onNext()
也是在NewThread內執行。
另外,如果多次使用subscribeOn()
,則會使thread由下而上一路切換,因此這邊有另一觀念:
subscribeOn()
只有離源頭最近的有效。
observeOn()
接著來談observeOn()
如何進行thread的控制,跟subscribeOn()
一樣,使用時需要傳入Scheduler:
.observeOn(AndroidSchedulers.mainThread()) |
而這Scheduler會在subscribeActual()
被呼叫時使用:
// In ObservableObserveOn |
傳進去的Worker會在ObserveOnObserver的schedule()
內使用:
// In ObserveOnObserver |
根據前述關於Worker的新觀念,我們可推斷這邊的schedule()
就是切換thread的起點。
而worker.schedule()
又是onNext()
呼叫schedule()
而來,因此observeOn()
是在資料傳遞時切換thread。也因此如果多次使用,其只會影響下游的operator的thread,於是有以下觀念:
observeOn()
可多次使用並且有效。
Going to deeper
到這我們介紹了subscribeOn
和observeOn()
透過Scheduler控制thread的方法及內部流程,其中沒有直接針對thread的操作。
於是這裡遇到的問題是:那thread是在哪裡和哪時被建立並且啟動的?
由於thread是透過Scheduler控制,所以可以確定在Scheduler尚未指定前,一定沒有啟動任何新的thread,因此我們可以猜測合理的時機要不在一開始就啟動,不然就是要使用前啟動,也就是以下兩種狀況:
指定Scheduler時。
Scheduler的Worker呼叫
schedule()
時。
指定Scheduler時
在Android實作中,observeOn()
一般來說都是main thread,一定是已經啟動的,所以我們用subscribeOn()
來做範例:
.subscribeOn(Schedulers.newThread()) |
// In Schedulers |
這裏直接使用了一個靜態變數NEW_THREAD,再看到與NEW_THREAD相關的程式碼:
// In Schedulers |
我們可以看到有一個靜態的constructor,可知這邊就是整個流程起點:
- 將NewThreadTask傳給
initNewThreadScheduler()。
- NewThreadTask的
call()
會被執行。 - 將
NewThreadHolder.Default
,即是NewThreadScheduler取出並指定給NEW_THREAD。
接著我們再看到NewThreadScheduler的constructor:
// In NewThreadScheduler |
到此完成建立並回傳Scheduler的流程,其中沒有任何啟動thread的機制。代表在指定Scheduler的同時啟動thread的假設並不成立,接著我們在看另一個假設。
Scheduler的Worker呼叫schedule()
時
前面的章節可以知道,Worker.schedule()
最後會使用ScheduledThreadPoolExecutor的submit()
或schedule()
:
// In ScheduledThreadPoolExecutor |
不論是submit()
或是schedule()
都會走到delayedExecute()
。接著,在正常的情況下會走到ensurePrestart()
:
// In ThreadPoolExecutor |
從ensurePrestart()
走到addWorker()
,然後看到一個ThreadPoolExecutor內部類別Worker,其建構式用到ThreadFactory呼叫newThread()
。
根據前面的內容可以知道,NewThreadScheduler透過createWorker()
建立NewThreadWorker時,會傳入靜態變數THREAD_FACTORY,同時也是RxThreadFactory。
此ThreadFactory會再ScheduledThreadPoolExecutor。因此,呼叫getThreadFactory().newTherad()
,就是RxThreadFactory.newThread()
:
// In RxThreadFactory |
在這我們終於看到Thread被建立,用的是Thread的constructor:
// In Thread |
Thread的constructor內只有設定一些相關的值,並透過ThreadGroup呼叫addUnstarted()
,根據addUnstarted()
的comment,裡面只是用於通知ThreadGroup有新的Thread要加入,但還不會改變已執行中的Thread列表。
回到addWorker()
,最後用傳回的Thread呼叫start()
:
// In Thread |
在這就透過ThreadGroup的add()
先通知剛加入的Thread即將啟動,並將其加入已啟動的Thread列表。然後透過底層來執行Thread的run()
,到這邊thread算是正式被啟用:
// In Thread |
此時的run()
就是已經在新的Thread上執行。
這邊的target,是在Thread建立時傳入的一個外部Runnable,往回可以到ThreadPoolExecutor.Worker的constructor:
// In ThreadPoolExecutor.Worker |
所以target就是Worker本身,於是再看到Worker的run()
:
// In ThreadPoolExecutor.Worker |
繞了一大圈,在這邊我們看到task被執行的部分,這task就是我們在透過Scheduler呼叫scheduleDirect()
時傳入的DisposeTask。
到此我們終於確定Scheduler所控制的Thread的建立和啟動的時機,最後來總結整個啟動的過程如下:
ObservableSubscribeOn.subscribeActual |