Affected files: .obsidian/daily-notes.json .obsidian/plugins/periodic-notes/data.json .obsidian/templates.json .obsidian/workspace 01. 個人/02. 專注Study/Android/ADB 取得 APK 的 icon.md 01. 個人/02. 專注Study/Android/ADB.md 01. 個人/02. 專注Study/Android/AOSP.md 01. 個人/02. 專注Study/Android/Android programming.md 01. 個人/02. 專注Study/Android/Ktor.md 01. 個人/02. 專注Study/Android/Service.md 01. 個人/02. 專注Study/Android/Tools.md 01. 個人/02. 專注Study/Android/UI.md 01. 個人/02. 專注Study/C++/C++17.md 01. 個人/02. 專注Study/C++/Class template.md 01. 個人/02. 專注Study/C++/Structured binding declaration.md 01. 個人/02. 專注Study/C++/for_each.md 01. 個人/02. 專注Study/C++/lambda.md 01. 個人/02. 專注Study/C++/lvalue.md 01. 個人/02. 專注Study/C++/move operator.md 01. 個人/02. 專注Study/C++/rvalue.md 01. 個人/02. 專注Study/C++/智慧指標.md 01. 個人/02. 專注Study/RxKotlin/20200207 - Study RxKotlin.md 03. 資料收集/01. Programming/COM/20210726 - COM Interface.md 03. 資料收集/01. Programming/DB/MySQL.md 03. 資料收集/01. Programming/DB/sqlite.md 03. 資料收集/01. Programming/Design Pattern.md 03. 資料收集/01. Programming/FFMPEG/00. Introduction.md 03. 資料收集/01. Programming/FFMPEG/01. Setup.md 03. 資料收集/01. Programming/FFMPEG/FFMpeg.md 03. 資料收集/01. Programming/Flask.md 03. 資料收集/01. Programming/Media Foundation/20210604 - Windows media foundation.md 03. 資料收集/01. Programming/OpenCV.md 03. 資料收集/01. Programming/OpenGL.md 03. 資料收集/01. Programming/Python/argparse.ArgumentParser.md 03. 資料收集/01. Programming/Python/decorator.md 03. 資料收集/01. Programming/Python/logging.md 03. 資料收集/01. Programming/Python/opencv.md 03. 資料收集/01. Programming/Python/subprocess.md 03. 資料收集/01. Programming/Python/threading.md 03. 資料收集/01. Programming/Python/tkinter.md 03. 資料收集/01. Programming/Python/檢測工具.md 03. 資料收集/01. Programming/QT/Dropdown button.md 03. 資料收集/01. Programming/QT/QVariant.md 03. 資料收集/01. Programming/QT/Qt.md 03. 資料收集/01. Programming/UML.md 03. 資料收集/01. Programming/演算法.md 03. 資料收集/99. templates/blogHeader.md 03. 資料收集/99. templates/date.md 03. 資料收集/99. templates/front matter.md 03. 資料收集/99. templates/note.md 03. 資料收集/99. templates/table.md 03. 資料收集/99. templates/thisWeek.md 03. 資料收集/99. templates/日記.md 03. 資料收集/99. templates/讀書筆記.md 03. 資料收集/Hobby/RC.md 03. 資料收集/Hobby/RC/Traxxas Sledge.md 03. 資料收集/Hobby/RC/好盈電變調整中立點.md 03. 資料收集/Hobby/RC/差速器調教教學.md 03. 資料收集/Linux/CLI/cut.md 03. 資料收集/Linux/CLI/scp.md 03. 資料收集/Linux/CLI/timedatectl.md 03. 資料收集/Programming/Qt.md 03. 資料收集/Tool Setup/Hardware/RaspberryPi.md 03. 資料收集/Tool Setup/Software/Chrome.md 03. 資料收集/Tool Setup/Software/Obisidian.md 03. 資料收集/Tool Setup/Software/SublimeText.md 03. 資料收集/Tool Setup/Software/VirtualBox.md 03. 資料收集/Tool Setup/Software/Visual Studio Code.md 03. 資料收集/Tool Setup/Software/Windows Setup.md 03. 資料收集/Tool Setup/Software/Windows Terminal.md 03. 資料收集/Tool Setup/Software/freefilesync.md 03. 資料收集/Tool Setup/Software/vim.md 03. 資料收集/翻牆/V2Ray.md 03. 資料收集/翻牆/Wireguard.md 03. 資料收集/軟體工具/youtube-dl.md
37 KiB
剛開始
先說說Rx
Rx最早是Microsoft的某個實驗室為了解決asychronous、scalable還有一些app問題而提出來的libraray。大概在2009年的時候提出,叫做Reactive Extension for .NET(Rx). 一開始是以add-on的方式安裝在.NET 3.5上,到了.NET 4.0就變成了內建的library。也因為它open source的關係,讓其他語言得以將這套概念也移植過去,所以現在有RxJS, RxSwift, RxNET, RxScale, RxJava。這些library都致力於在它們的語言來實作出相同的「行為」,所以理論上iOS工程師可以和Web工程師用Rx來討論app的邏輯是沒有問題的。
Rx的官網:http://reactivex.io/,它的logo是一支電鰻(Electric eel):
!
Rx Community
什麼是RxJava?
RxJava is a library for composing asynchronous and event-based code using observable sequences and functional style operations, allowing for parameterized execution via schedulers.
RxJava, in its essence, simplifies developing asynchronous programs by allowing your code to react to new data and process it in a sequential, isolated manner. In other words, RxJava lets you observe sequence of asychronous events in an app and respond to each event accordingly. Examples are taps by a user on the screen and listening for results if asynchronous network calls.
再說RxJava
RxJava是一個實作Rx的framework。 RxJava與其他的Rx library提供了asynchronous與event-based的解決辦法
而Asychronous code跟Sychronous code的差異: Sychronous code按照字面上的意思執行,每一次的結果都相同。 Asychronous code則是在必要的時候才被使用,每一次執行的「狀態」不盡相同。也就是沒辦法控制其順序與時間。
Asychronous programming的詞彙
1. State
- State指的是我們程式所儲存的資料與程式自身行為互動所產生的狀態。
2. Imperative programming
- Imperative programming(指令式程式設計)是用一連串的命令或描述來改變程式的狀態。如下面的code:
這些code可能有一些相關的邏輯,但是字面上看不出來,即使互相調換可能會造成錯誤,但也可能不會。
setupUI() bindClickListeners() createAdapter() listenForChanges()
3. Side effect
- Side effect指的是「一段程式修改了它本身區域外的狀態」,譬如說,一個處理event的function它除了處理event之外,也改變的UI上所顯示的文字。
- Side effect並不總是不好的,我們的程式就是要對某些東西做出改變,完全無法改變任何東西的程式是沒有用的。
RxJava試著用接下來的2個概念來解決剛剛提到的3個概念上的問題
4. Declarative code
- 又叫Fucntional programming,Fucntional programming不產生任何side effect。
- Declarative code定義的是行為。
- RxJava試著在Declarative code和Imperative programming取一個平衡點,它定義行為,然後依順序執行。
5. Reactive systems
Reactive systems通常有幾個特性:
- Reponseive:保持UI在最新狀態
- Resilient:每個行為都是獨立定義的,而且有辦法靈活的處理錯誤。
- Elastic:程式的十座可以處理不同的工作量
- Message driven:每個元件使用Message driven(訊息驅動)的方式來互相溝通,並改進可用性與獨立性,解開(decouple)生命週期與實作的關聯。
Rx的三大組成
1. Observables
Observable<T>是Rx的基礎之一,Observable允許觀察者觀察它,並接收它發出來的資料。
Observables 的基礎:event
Observable會以3種事件(event)來發出資料:
- next:next event會伴隨著一筆資料,這也是觀察者用來接收資料的event。
- complete:complete event表示Observable已經「成功的」結束了它的生命週期,在complete event之後,觀察者不會再收到任何next event。
- error:error event表示Observable在發生錯誤的情況下結束它的生命週期。跟complete event依樣,後續不會再有任何next event。
一個Observable用next所發出來的一連串資料我們稱為"sequence"。sequence可以分為兩種:
-
Finite sequnece: 想像你要下載一個檔案,我們的code大概是這個樣子:
API.download(file = "http://www...") .subscribeBy( onNext = { // Handle downloading here }, onComplete = { // Handle download finish here }, onError = { // Handle error here }, )API.download()會產生一個Obervable,然後我們藉由subscribeBy來訂閱他,並加入我們的處理程序,我們在onNext裡面處理接收到的檔案buffer,在onComplete裡面了解到檔案已經完成下載,可以做一些後續的處理,onError則是發生了某些錯誤,需要重來或是通知使用者之類。 -
Infinite sequence: Switch button就是一個例子,我們要處理switch button的code會是這樣:
switch.checkdChanges() .subscribeBy( onNext = { isOn -> if (isOn) { // Handle on here } else { // Handle off here } } )可以看到這一段
subscribeBy()裡面並沒有onComplete跟onError,因為switch button根本就不會產生這兩種event。
2. Operators
Operators用來處理Observable所發出來的資料,可能是過濾或者做一些轉換,或其他操作。再以switch button做例子,下面的code可以把switch button的狀態做幾個改變:
- 我們只想收到on的狀態。
- 把on的狀態轉為一個字串"We've been toggled on!"。
switch.checkdChanges()
.filter { it == true }
.map { "We've been toggled on!" }
.subscribeBy(
onNext = { message ->
updateTextView(message)
}
)
3. Schedulers
Scheduler可以想像成thread,RxJava已經內建了好幾種scheduler,而且應該可以適用於大部分的情形。
例如IO scheduler可以讓你的檔案下載在背景執行,TeampolineScheduler可以讓你的程式同時執行, ComputationScheduler可以讓你將程式分配給不同的thread來處理需要大量運算的資料。
RxJava是一個很獨立的library,所以有2個library可以跟RxJava一起合作:
- RxAndroid:提供Android Looper class跟RxJava的scheduler之間的橋接管道。
- RxBinding:用來把UI的click listen之類的callback轉變為Observable的
subscribeBy。
安裝
在build.gradle裡的depedencies區域加入:
implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation "io.reactivex.rxjava3:rxkotlin:3.0.0"
implementation "io.reactivex.rxjava3:rxandroid:3.0.0"
Observable
Standard Observable has three types of event:
- next
- complete
- error
Obervable很適合用marble diagram來表示:
!
3個event解釋如下:
onNext():onNext()event會伴隨著一筆資料,這也是觀察者用來接收資料的event。onComplete():onComplete()event表示Observable已經「成功的」結束了它的生命週期,在onComplete()event之後,觀察者不會再收到任何onNext()event。onError():onError()event表示Observable在發生錯誤的情況下結束它的生命週期。跟onComplete()event依樣,後續不會再有任何onNext()event。
另外,要注意:一個Observable在沒有被訂閱的情況下,「是不會發送任何event的」。
A example of usage of standard Observable:
API.download("http://...")
.subscribeBy(
onNext = { /* do something */ },
onComplete = { /* do something */ },
onError = { /* do something */ },
)
建立Observable的方法
1. just
val observable = Observable.just(1, 2, 3)
變數observable的內容設為1個"1、2、3"三個數,型別會是Observable<Int!>!。
如果使用了onNext來發送event的話,將會依序發送1、2、3。
但如果是:
val observable = Observable.just(listOf(1, 2, 3))
變數observable的內容會是一個list,這個list的內容是"1、2、3"。型別是Observable<List<Int>!>!。
如果使用了onNext來發送event的話,將發送一個包含1、2、3的list。
2. fromIterable
用來將list的內容轉變為一個一個單獨的element給Observable。
val observable = Observable.fromIterable(listOf(2, 3, 1))
變數observable的型別會是Observable<Int!>!,而不是Observable<List<Int>!>!。
3. empty
建立一個「空的」Observable,可以用來表示一個馬上就會結束的事情,或是不包含任何東西的情況。
val observable = Observable.empty<Unit>()
observable.subscribeBy(
onNext = { println(it) },
onComplete = { println("Completed") }
)
用empty()所建立的observable只會發出onComplete() event,所以上面的onNext() event永遠不會發生。
還有,Observable所包含的element一定要有一個型別,而且不可以是null,所以上面的empty()必須明白的寫出Unit型別:empty<Unit>() 。
4. never
建立一個不會發出任何event的observable。
5. range
產生一個範圍的數列,參數型別必須是整數(Int)。
val observable = Observable.range(1, 10)
上例的onNext()會依序發送1~10的數字出來。
6. create
用來定義自己的event發送方法。 範例:
val observable = Observable.create<String> { emitter ->
emitter.onNext("A")
emitter.onNext("C")
emitter.onNext("B")
emitter.onComplete()
}
val subscription = observable.subscribeBy(
onNext = { println("Received: $it") },
onComplete = { println("Completed") },
onError = { println("Completed") }
)
create必須帶入要發送的型別,例如Int、String或是任何class,此例中是create<String>,表示會送出的element是String型別。
然後create則是發送的實作,範例是會發送"A" -> "C" -> "B",然後用onComplete來結束。
注意:要是observable沒有onComplete或是onError,然後Disposable(也就是訂閱者)也沒有呼叫dispose(),則會造成memory leak。
7. defer
defer會建立一個Observable factory,每一次呼叫這個factory都會產生一個新的Observable。defer只有一個參數,就是我們要「製造」Observable的方法:
var flip = false
val factory: Observable<Int> = Observable.defer {
flip = !flip
if (flip) {
Observable.just(1, 2, 3)
} else {
Observable.just(4, 5, 6)
}
}
defer後面的lambda就是我們要「製造」Observable的方法。當flip是true的時候,我們產生Observable.just(1, 2, 3),反之則產生Observable.just(4, 5, 6)。Observable裡面所帶的element都是整數,這也是為什麼factory的型別是Observable<Int>。
接下來訂閱這個factory:
for (i in 0..3) {
val subscription = factory.subscribe {
println("Factory out: $it")
}
disposables.add(subscription)
}
disposables.dispose()
上面的例子產生了4個Observable。依照flip的值來產生不一樣內容的Observable。
How to subscrible a Observable
訂閱Observable的方法
observable.subscrible()observable.subscribleBy()
Remember to release the resource. Call disposable() if you don't need Observable anymore. Or use CompositeDisposable() to collect all Disposable and release them.
特殊的Observable
1. Single
Single只有onSuccess跟onError兩種event。在發出onSuccess或是onError之後,Single就結束了。
譬如說讀取檔案,只會有讀取成功跟讀取失敗兩種情況,下面的範例讀取一個檔案,要是檔案不存在就發送onError(),反之就發送onSuccess()。
val subscriptions = CompositeDisposable()
fun loadText(filename: String): Single<String> {
return Single.create create@{ emitter ->
val file = File(filename)
if (!file.exists()) {
emitter.onError(FileNotFoundException("Can't find $filename"))
return@create
}
val contents = file.readText(Charsets.UTF_8)
emitter.onSuccess(contents)
}
}
// Use the single observable
val subscription = loadText("Copyright.txt")
.subscribeBy(
onSuccess = { println("Success read: $it") },
onError = { println("Error: $it") }
)
subscriptions.add(subscription)
loadText()這個function會返回Single<String>物件,要是讀取檔案成功,就把檔案內容用onSuccess()發送出來:
val contents = file.readText(Charsets.UTF_8)
emitter.onSuccess(contents)
要是檔案不存在,就發出onError():
emitter.onError(FileNotFoundException("Can't find $filename"))
2. Completable
Completable只有onCompleted跟onError兩種event。跟Single一樣,在發出onCompleted或是onError之後,Completable就結束了。
3. Maybe
Maybe是Single跟Completable的混合,他有onSuccess(value)、onCompleted跟onError三種event。Maybe只會發出這三種的其中一種event,然後就結束了。
停止訂閱或是結束一個Observable
使用Disposable.dispose()
每一次呼叫observable.subscrible()或是observable.subscribleBy()都會回傳一個Disposable物件,當我們不再需要訂閱一個Observable的時候,我們必須呼叫dispose()停止訂閱:
val alphaSequnce = Observable.just("A", "B", "C")
val subscription = alphaSequece.subscribe {
println(it)
}
subscription.dispose()
使用CompositeDisposable.dispose()
對每一個Disposable物件在停止訂閱之後都要呼叫一次dispose()是很煩人的,RxJava提供了一個CompositeDisposable class。它可以收納所有的Disposable物件,然後一次停止:
val subscriptions = CompositeDisposable()
val subscriptionNumbers = Observable.just(1, 2, 3).subscribe {
println("Numbers: $it")
}
val subscriptionAlphabets = Observable.just("A", "B", "C").subscribe {
println("Alphabets: $it")
}
subscriptions.add(subscriptionNumbers)
subscriptions.add(subscriptionAlphabets)
subscriptions.dispose() <-- subscriptionNumbers 與 subscriptionAlphabets 都會一起呼叫dispose()
忘記呼叫dispose()可能會造成memory leak。
Subjects
Observable必須在建立的時候就指定好資料,之後沒辦法再新增資料。而Subject可以在建立資料之後,再新增資料,Subject也會將新增的資料再馬上轉發給它的訂閱者。
1. PublishSubject
PublishSubject剛開始是沒有任何資料的,它也只會將最新的資料發送給它的訂閱者。另外,要是PublishSubject本身結束了(已經送出了onComplete event),那麼新的訂閱者將不會收到任何資料,但是會收到onComplete event。
val publishSubject = PublishSubject.create<Int>()
publishSubject.onNext(0)
val subscriptionOne = publishSubject.subscribe {
println(it)
}
publishSubject.onNext(1)
publishSubject.onNext(2)
val subscriptionTwo = publishSubject.subscribe {
println("2: $it")
}
publishSubject.onNext(3)
subscriptionOne.dispose()
publishSubject.onNext(4)
publishSubject.onComplete()
publishSubject.onNext(5)
subscriptionTwo.dispose()
val subscriptionThree = publishSubject.subscribeBy(
onNext = { println("3: $it") },
onComplete = { println("3: Completed") }
)
上例中的subscriptionThree只會收到onComplete event,也就是只會印出"3: Completed"。
2. BehaviorSubject
行為跟PublishSubject類似,但是BehaviorSubject會發送「最後一筆資料」給新的訂閱者。如果BehaviorSubject最後的event是onError,那麼新的訂閱者也會收到onError event。例:
val subscriptions = CompositeDisposable()
val behaviorSubject = BehaviorSubject.createDefault("Initial value")
behaviorSubject.onNext("X")
val subscriptionOne = behaviorSubject.subscribeBy(
onNext = { println("1: $it") },
onError = { println("1: ERROR, $it") }
)
behaviorSubject.onError(RuntimeException("Error!"))
behaviorSubject.onNext("Y") // <-- 不會有人收到,因為已經被onError給terminate了
subscriptions.add(behaviorSubject.subscribeBy(
onNext = { println("2: $it") },
onError = { println("2: $it") }
))
另外,可以直接取得BehaviorSubject目前的值,以上例來說,只要用behaviorSubject.value就可以,這方法可以很方便的在Rx與非Rx的程式中交換資料。
例子中是用static method BehaviorSubject.createDefault()來建立一個有初始值的BehaviorSubject,當然也可以跟PublishSubject一樣,用BehaviorSubject.create()來建立。
3. ReplaySubject:
BehaviorSubject會發送會後一筆資料,ReplaySubject就是發送最後n筆資料。我們可以用ReplaySubject.createWithSize()這個static method來建立一個ReplaySubject。例:
val replaySubject = ReplaySubject.createWithSize<String>(2)
變數replaySubject的buffer容量是2,型別是String。
4. AsyncSubject
AsyncSubject的行為比較特別,AsyncSubject只會結束的時候,同時發出最後一筆資料。也就是說,即便一直提供資料給AsyncSubject,它也不會發出任何onNext event給它的訂閱者,直到它收到onComplete的時候,它才會同時發出最後一筆onNext與onComplete給它的訂閱者。
5. RxRelay
RxRelay永遠不會發出onComplete或是onError。下面例子建立了一個PublishRelay:
val publishRelay = PublishRelay.create<Int>()
要使用RxRelay library,必須在build.gradle裡面加入:
implementation "com.jakewharton.rxrelay3:rxrelay:3.0.0"
Operators
1. Filtering Operators
ignoreElement
ignoreElement()會忽略掉由20200207 - Study RxKotlin#Subjects丟出來的next event,訂閱者只會收到onCompleted跟onError這兩種event,也就是讓Subject退化成20200207 - Study RxKotlin#2 Completable。
例:
val subscriptions = CompositeDisposable()
val strikes = PublishSubject.create<String>()
subscriptions.add(
strikes.ignoreElements()
.subscribeBy {
println("Done") <-- 只會收到onComplete跟onError
}
)
elementAt
elementAt()只會處理「第n個」next event,n之前跟n之後的都會被忽略。例如:
val subscriptions = CompositeDisposable()
val strikes = PublishSubject.create<String>()
subscriptions.add(
strikes.elementAt(2) <-- 只要收第2個
.subscribeBy {
println("Get $it")
}
)
strikes.onNext("A") <-- 第0個
strikes.onNext("B") <-- 第1個
strikes.onNext("C") <-- 第2個
strikes.onNext("D") <-- 第3個
上面例子只要收「第2個next event」,所以只會收到"C"。這也是一個onSuccess event。
elementAt()也等於是把Subject退化成20200207 - Study RxKotlin#3 Maybe。
要是Subject的「第n個」還沒收到就結束了,那就是收到onComplete event。
filter
filter()接收一個lambda函數,每一個next event所帶的element都必須經過這個函數的「驗證」,只有驗證結果為true的時候,才會pass給訂閱者。例:
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.filter {
it > 5 <-- 數值必須大於5才可以pass給訂閱者
}
.subscribe {
print("Get number: $it\n")
}
)
所以上面的結果會收到6、7、8、9、10。
skip
忽略「前n個」next event。
Observer.just(1, 2, 3, 4 ,5)
.skip(3) <-- 忽略前3個
.subscribe {
println("it")
}
// Output 4, 5
skipWhile
skipWhile也是用一個lambda來當作通過條件,跟20200207 - Study RxKotlin#filter類似。但不像filter會去檢查「每一個」進來的element,skipWhile是「當lambda檢查到第一個false的時候,後面全部通過」。
另一個跟filter不同的是,skipWhile是在檢查為true把next event忽略掉,檢查到false的時候開始放行。
例如,我們要收集字串,但我們要當字串是"start"的時候才開始收集字串,例:
val subscriptions \= CompositeDisposable()
subscriptions.add(
Observable.just("1", "2", "3", "Start", "1", "3", "2")
.skipWhile { it != "Start" } <-- 比對為false開始放行
.subscribe {
println("Get $it")
}
)
// Output
Get Start
Get 1
Get 3
Get 2
skipUntil
與前面的skip operator不同,skipUntil不是用lambda來決定skip的條件,而是依賴於「另一個subject」,skipUntil會一直忽略,直到「另一個subject」發出onNext event。例:
val subject = PublishSubject.create<String>()
val trigger = PublishSubject.create<String>()
subject
.skipUntil(trigger)
.subscribe {
println("it")
}
In this code you'll get nothing, until trigger sent an onNext() event.
Example:
subject.onNext("A") // Ignored
subject.onNext("B") // Ignored
trigger.onNext("1") // TRIGGER!
subject.onNext("C") // send out
take
take跟20200207 - Study RxKotlin#skip相反,take是接收「前n個」訊息,之後全部忽略。
takeWhile
takeWhile是用lambda當判斷條件,當判斷為true的時候放行,一旦判斷為false,之後的所有訊息都會被忽略。
exampleOf("takeWhile") {
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1))
.takeWhile {
it < 5
}
.subscribe {
println("Get $it")
}
)
}
上例中,判斷到5的時候,it < 5為false,所以之後的都會忽略,即使最後那個1也是一樣被忽略。
takeWhile跟20200207 - Study RxKotlin#skipWhile一樣,但是行為相反。
takeUntil
跟20200207 - Study RxKotlin#skipUntil相反的行為,會一直接收訊息,直到他依賴的subject發出訊息後停止。
distinctUntilChanged
distinctUntilChanged用來過濾「連續且相同」的訊息,例如連續的"Hi",那麼就只會收到第1個"Hi",之後的都不會收到。但是一旦收到的訊息改變了,再次收到以前發過的訊息,只要它沒有跟前一筆相同,那麼就還是會接收。例:
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.just(5, 5, 3, 3, 1)
.distinctUntilChanged()
.subscribe {
println(it)
}
)
// Output
5
3
1
distinctUntilChanged預設用class的equal() method來判斷,當然我們也可以給它一個lambda來當客製我們的條件,被lambda判斷為true的話,該訊息就會被忽略:
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.just("ABC", "CCD", "FAG")
.distinctUntilChanged { first, second ->
first[second.length - 1] == second[0]
}
.subscribe {
println("Get $it")
}
)
上例中,我們希望「第2個字串的開頭字母要是跟第1個字串的結尾字母一樣的話,那麼就不要顯示」。第1筆"ABC"一定會接收,第2筆"CCD"則會被忽略,第3筆"FAG"會被接收。
2. Transforming Operators
1. toList
toList可以把每一個單獨從Observalbe發出來的元素變成一個list,如:
val subscriptions = CompositeDisposable()
val items = Observable.just("A", "B", "C")
subscriptions.add(
items.toList()
.subscribeBy {
println(it)
}
)
原本會單獨發出的"A", "B", "C",現在變成只翠發出一個List,其內容是["A", "B", "C"]。
2. map
map跟Kotlin的map行為上差不多,只是Kotlin的map是作用在List上,而RxJava的map是作用在Observable上。map根據你提供的lambda函式來對每一個element做轉換,如下例,將每一個羅馬數字轉換為阿拉伯數字:
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.just("M", "C", "V", "I")
.map {
it.romanNumeralIntValue()
}
.subscribeBy {
println(it)
}
)
注意到了嗎?map用來轉換Observable所包含的item型別,上例中,Observable的item本來是一個字串(String),被map轉換為數字(Int)。
3. flatMap
flatMap用來處理「Observable發出來的Observable」,並且「記錄每一個變化」。用例子比較好說明:
class Student(val score: BehaviorSubject<Int>)
val subscriptions = CompositeDisposable()
val ryan = Student(BehaviorSubject.createDefault(80))
val charlotte = Student(BehaviorSubject.createDefault(90))
val student = PublishSubject.create<Student>()
student
.flatMap {
it.score
}
.subscribeBy {
println(it)
}
.addTo(subscriptions)
student.onNext(ryan) <-- 1
ryan.score.onNext(85) <-- 2
student.onNext(charlotte) <-- 3
ryan.score.onNext(95) <-- 4
charlotte.score.onNext(100) <-- 5
我們有一個叫做student的PublishSubject,另外有兩個Student class(分別是ryan與charlotte),而這個Student class有一個member叫做score,score的類別是BehaviorSubject<Int>。我們的flatMap lambda不做任何轉換,直接bypass分數。
- student先選擇ryan來發出第一個onNext event,ryan原本的分數是80,所以我們會收到80。
- ryan變更為85,所以我們會收到85。
- student選擇了charlotte並發出一個onNext event,charlotte原本的分數是90,所以我們會收到90。
- ryan變更為95,所以我們會收到95。
- charlotte變更為100,所以我們會收到100。 結果會收到80、85、90、95、100。
ryan跟charlotte都是獨立的Observable,但透過flatMap我們可以把它們的值(以及後續的變化)變成一連串的數值,這就是flat的意思。
!
4. switchMap
switchMap跟flatMap類似,也是處理「Observable發出來的Observable」,但是差別在於switchMap一但切換到新的Observable之後,上一個Observale的訊息就部會收到了,以flatMap的例子來說,在student.onNext(charlotte)這一行之後,ryan的改變就不會收到了。例:
val ryan = Student(BehaviorSubject.createDefault(80))
val charlotte = Student(BehaviorSubject.createDefault(90))
val student = PublishSubject.create<Student>()
student
.switchMap {
it.score
}
.subscribe {
println(it)
}
student.onNext(ryan)
ryan.score.onNext(85)
student.onNext(charlotte)
ryan.score.onNext(95)
charlotte.score.onNext(100)
結果會收到80、85、95、100。ryan.score.onNext(95)這一行的95不會收到。
switchMap適合用在會「改變興趣」的場景,例如說原本是要持續收到台北氣溫的改變,接著使用者把地點改到高雄,那我們就會變成持續收到高雄的溫度變化而不是台北的,又或者說,你會隨著使用的的輸入持續的搜尋結果,例如使用者依序輸入k、o、t、l、i、n,每輸入一個字母我們就搜尋一次,但我們只關注最後一個字串搜尋,不在意之前的搜尋結果。
5. materialize
materialize能將Observable的值包裝成一個Notification,回到20200207 - Study RxKotlin#4 switchMap的例子,如果任何一個學生發出了onError的訊息,那麼連student本身都會因為這個Exception而中斷,所以即使切到了charlotte,我們也收不到charlotte的訊息了。materialize可以將onError包裝成一個Notification,讓exception留在ryan本身而不會影響到上面的student。
val subscriptions = CompositeDisposable()
val ryan = Student(BehaviorSubject.createDefault(80))
val charlotte = Student(BehaviorSubject.createDefault(90))
val student = BehaviorSubject.createDefault<Student>(ryan)
// 1
val studentScore = student.switchMap { it.score.materialize() } <-- HERE!
// 2
subscriptions.add(
studentScore
.subscribe {
println(it)
}
)
// 3
ryan.score.onNext(85)
ryan.score.onError(RuntimeException("Error!"))
ryan.score.onNext(90)
// 4
student.onNext(charlotte)
6. dematerialize
dematerialize用來反解materialize所包裝的東西,例如上例中,會將materialize所包裝出來的Observable<Notification<Int!>!>!反解為Observable<Int>!,例:
subscriptions.add(
studentScore
.filter {
if (it.error != null) {
println("Got error: ${it.error}")
false
} else {
true
}
}
.dematerialize { it }
.subscribe {
println(it)
}
)
要注意的是如果發生Exception的話,直接println還是會產生Exception,所以需要用filter來把error給濾掉。
!
在Subject所發出的element仍然是Subject的時候,如果element發生error(Exception),會導致上層的Subject也跟著停止,materialize/dematerialize可以用來包裝element,讓element所發出的東西都變成Notification,這樣就部會影響上層的Subject了。
3. Combining Operators
startWith
用來在Observable本身所帶的item前面再加上其他item。實際的有startWithIterable()與startWithItem()。
例:
val subscriptions = CompositeDisposable()
val numbers = Observable.just(3, 4, 5)
val startWith = numbers.startWithIterable(listOf(1, 2))
startWith.subscribe {
println(it)
}.addTo(subscriptions)
concat
concat是一個static method,用來合併2個Observable。concat會先等第一個Observable結束,然後再等待第二個,之後把它們合併起來。
val subscriptions = CompositeDisposable()
val first = Observable.just(1, 2, 3)
val second = Observable.just(3, 4, 5)
Observable.concat(first, second)
.subscribe {
println(it)
}
.addTo(subscriptions)
concatWith
跟20200207 - Study RxKotlin#concat一樣,但是concatWith是一個member function,而不是一個static method。concatWith一樣會先等自己結束,然後再等第二個Obervable(當參數的那一個)結束,之後再合併。
val subscriptions = CompositeDisposable()
val first = Observable.just(1, 2, 3)
val second = Observable.just(3, 4, 5)
first.concatWith(second)
.subscribe {
println(it)
}
.addTo(subscriptions)
注意
要被合併的兩個Observable類型必須要一樣,不可以一個是
Obsrvable<String>而另一個是Observable<Int>,compiler會報錯喔。
concatMap
concatMap接受一個lambda函示,並回傳另一個Observable序列,concatMap會保證Observable的順序。
Given multiple Observable, and map each Observable to a lambda function. And make sure the sequence of given Observable list.
merge
merge是一個static function。
merge會按照接收的順序把element合併起來,例:
val subscriptions = CompositeDisposable()
val odd = PublishSubject.create<Int\>()
val even = PublishSubject.create<Int\>()
Observable.merge(odd, even)
.subscribe {
println(it)
}
.addTo(subscriptions)
odd.onNext(1)
even.onNext(2)
odd.onNext(3)
even.onNext(4)
odd.onNext(5)
even.onNext(6)
odd與even兩個交互發出elemet,merge依順序接收,而不是像20200207 - Study RxKotlin#concat是依照Observable的順序。
merge complete的時機點定義如下:
- 當來源的Observable與內部的Observable都complete的時候,merge本身也會發出complete。
- 內部Observable結束的順序跟接收的順序沒有關係。(一律按照接收點)
- 如果有任何Observable發生error,
merge會轉發這個error,然後自己發生terminate。
Q:
- What's the different between
flatMap()? -> 很大的差別,flatMap()有map的功能。
mergeWith
就像concatWith()與concat()的關係,mergeWith()跟merge也是一樣的關係。
mergeWith()是一個member function,必須由某個Observable instance來呼叫。
combineLatest
combineLatest是Observables的static funtion(注意不是Observable)。
combineLatest只會接收2個Observalbe的「最後一個」elements,然後交由你所提供的lambda來處置,例:
val subscriptions = CompositeDisposable()
val left = PublishSubject.create<String>()
val right = PublishSubject.create<String>()
Observables.combineLatest(left, right) { leftString, rightString ->
"$leftString, $rightString"
}.subscribe {
println(it)
}.addTo(subscriptions)
left.onNext("Hello")
right.onNext("World")
left.onNext("It's nice to")
right.onNext("be here")
left.onNext("Actually, it's super great to")
重點:
- 在上例中是直接結合2個字串,但是其實可以是任何用途。
- 在實務中,
combineLatest可以用來結合2個不同型別的Observable,然後再回傳另一個不同型別的Observable。combineLatest回傳的Observable型別由lambda決定。 combineLatest必須在「每一個」Observable都發出element之後才會動作。如果不確定Observable是否會發出element,可以使用20200207 - Study RxKotlin#startWith來讓Observable有一個初始值,這樣可以避免combineLatest永遠不會發生的情況。- 如果有某個Observable已經complete,
combineLatest會保留它的最後一個element,然後繼續結合更新的element。 - 直到最後一個Observable complete,
combineLatest才會complete。 !
zip
zipwait until each if the inner Ovservables emits a new value.
Triggers
withLastestFrom
withLatestFromis useful in all situations where you want the current(latest) value emittted frim an Observable, but only when a particular trigger occurs.
sample
- Just like
withLastFrom. But each time the trygger Ivsercable emits a value,sampleemits the latest value from the "other" Obervable, but only if it arrived since the last "tick". You can combinewithLastFromanddistinctUntilChangedto do the same behavior ofsample.
exampleOf("sample") {
val subscriptions = CompositeDisposable()
val button = PublishSubject.create<Unit>()
val editText = PublishSubject.create<String>()
editText.sample(button)
.subscribe {
println(it)
}.addTo(subscriptions)
editText.onNext("Par")
editText.onNext("Pari")
editText.onNext("Paris")
button.onNext(Unit)
button.onNext(Unit) <- button emits twice, but editText only emit last value
}
// Output
--- Example of: sample ---
Paris
Switchs
- ambWith
- Think of
ambas in ambiguous. ambWithconnect to two Observables. And wait any of them who emit element first. If any Observable emit element, another one will be unsubscribed.
- Think of
reduce
reduceaccumulates a summary value.
scan
- Like
reduce, but emit per input value.
4. Time-Based Operators
Buffering
replay
- This operator creates a new sequence that records the last N elements emitted by the source Observable.
replayAll
window
- Difference is that it emits an Observable of the buffered items, instead of emitting an array.
Time-Shifting
delaySubscription
- Delay the time a subscriber starts receiving elements from its subscription.
delay
- This operator subscribes immediateley to the source observable, but delays every emitted element by the specified amount of time.
Timer
Observable.interval
- Produce an infinite Observable sequence of Int values.
Observable.timer
- Specify a "due time" as the time that elapsed between the point of subscription and the first emitted value.
- If the "repeat period" is not assigned, the timer Observable will emit once, the complete.
timeout
- Emit an TimeoutException error event. If not caught, it terminates the sequence.

