Files
Obsidian-Main/20.03. RxKotlin/20200207 - Study RxKotlin.md

912 lines
37 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 剛開始
## 先說說Rx
Rx最早是Microsoft的某個實驗室為了解決asychronous、scalable還有一些app問題而提出來的libraray。大概在2009年的時候提出叫做Reactive Extension for .NET(Rx). 一開始是以add-on的方式安裝在.NET 3.5上,到了.NET 4.0就變成了內建的library。也因為它open source的關係讓其他語言得以將這套概念也移植過去所以現在有RxJS, RxSwift, RxNET, RxScale, RxJava。這些library都致力於在它們的語言來實作出相同的「行為」所以理論上iOS工程師可以和Web工程師用Rx來討論app的邏輯是沒有問題的。
Rx的官網[http://reactivex.io/](http://reactivex.io/)它的logo是一支電鰻Electric eel
![[Rx_Logo_S.png]]
Rx Community
- http://android-united.community/
- https://kotlinlang.slack.com/
什麼是RxJava
> RxJava is a library for composing asynchronous and event-based code using observable sequences and functional style operations, allowing for parameterized execution via schedulers.
> RxJava, in its essence, simplifies developing asynchronous programs by allowing your code to react to new data and process it in a sequential, isolated manner. In other words, RxJava lets you observe sequence of asychronous events in an app and respond to each event accordingly. Examples are taps by a user on the screen and listening for results if asynchronous network calls.
## 再說RxJava
RxJava是一個實作Rx的framework。
RxJava與其他的Rx library提供了asynchronous與event-based的解決辦法
而Asychronous code跟Sychronous code的差異
Sychronous code按照字面上的意思執行每一次的結果都相同。
Asychronous code則是在必要的時候才被使用每一次執行的「狀態」不盡相同。也就是沒辦法控制其順序與時間。
### Asychronous programming的詞彙
#### 1. State
- State指的是我們程式所儲存的資料與程式自身行為互動所產生的狀態。
-
#### 2. Imperative programming
- Imperative programming指令式程式設計是用一連串的命令或描述來改變程式的狀態。如下面的code
```
setupUI()
bindClickListeners()
createAdapter()
listenForChanges()
```
這些code可能有一些相關的邏輯但是字面上看不出來即使互相調換可能會造成錯誤但也可能不會。
#### 3. Side effect
- Side effect指的是「一段程式修改了它本身區域外的狀態」譬如說一個處理event的function它除了處理event之外也改變的UI上所顯示的文字。
- Side effect並不總是不好的我們的程式就是要對某些東西做出改變完全無法改變任何東西的程式是沒有用的。
RxJava試著用接下來的2個概念來解決剛剛提到的3個概念上的問題
#### 4. Declarative code
- 又叫Fucntional programmingFucntional programming不產生任何side effect。
- Declarative code定義的是行為。
- RxJava試著在Declarative code和Imperative programming取一個平衡點它定義行為然後依順序執行。
#### 5. Reactive systems
Reactive systems通常有幾個特性
- Reponseive保持UI在最新狀態
- Resilient每個行為都是獨立定義的而且有辦法靈活的處理錯誤。
- Elastic程式的十座可以處理不同的工作量
- Message driven每個元件使用Message driven訊息驅動的方式來互相溝通並改進可用性與獨立性解開decouple生命週期與實作的關聯。
### Rx的三大組成
#### 1. Observables
`Observable<T>`是Rx的基礎之一Observable允許觀察者觀察它並接收它發出來的資料。
##### Observables 的基礎event
Observable會以3種事件event來發出資料
1. **next****next** event會伴隨著一筆資料這也是觀察者用來接收資料的event。
2. **complete****complete** event表示Observable已經「成功的」結束了它的生命週期在**complete** event之後觀察者不會再收到任何**next** event。
3. **error****error** event表示Observable在發生錯誤的情況下結束它的生命週期。跟**complete** event依樣後續不會再有任何**next** event。
一個Observable用next所發出來的一連串資料我們稱為"sequence"。sequence可以分為兩種
1. Finite sequnece
想像你要下載一個檔案我們的code大概是這個樣子
```kotlin
API.download(file = "http://www...")
.subscribeBy(
onNext = {
// Handle downloading here
},
onComplete = {
// Handle download finish here
},
onError = {
// Handle error here
},
)
```
`API.download()`會產生一個Obervable然後我們藉由`subscribeBy`來訂閱他,並加入我們的處理程序,我們在`onNext`裡面處理接收到的檔案buffer在`onComplete`裡面了解到檔案已經完成下載,可以做一些後續的處理,`onError`則是發生了某些錯誤,需要重來或是通知使用者之類。
2. Infinite sequence
Switch button就是一個例子我們要處理switch button的code會是這樣
```kotlin
switch.checkdChanges()
.subscribeBy(
onNext = { isOn ->
if (isOn) {
// Handle on here
} else {
// Handle off here
}
}
)
```
可以看到這一段`subscribeBy()`裡面並沒有`onComplete`跟`onError`因為switch button根本就不會產生這兩種event。
#### 2. Operators
Operators用來處理Observable所發出來的資料可能是過濾或者做一些轉換或其他操作。再以switch button做例子下面的code可以把switch button的狀態做幾個改變
1. 我們只想收到on的狀態。
2. 把on的狀態轉為一個字串"We've been toggled on!"。
```kotlin
switch.checkdChanges()
.filter { it == true }
.map { "We've been toggled on!" }
.subscribeBy(
onNext = { message ->
updateTextView(message)
}
)
```
#### 3. Schedulers
Scheduler可以想像成threadRxJava已經內建了好幾種scheduler而且應該可以適用於大部分的情形。
例如IO scheduler可以讓你的檔案下載在背景執行`TeampolineScheduler`可以讓你的程式同時執行, `ComputationScheduler`可以讓你將程式分配給不同的thread來處理需要大量運算的資料。
RxJava是一個很獨立的library所以有2個library可以跟RxJava一起合作
1. RxAndroid提供Android Looper class跟RxJava的scheduler之間的橋接管道。
2. RxBinding用來把UI的click listen之類的callback轉變為Observable的`subscribeBy`。
# 安裝
在`build.gradle`裡的`depedencies`區域加入:
```
implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation "io.reactivex.rxjava3:rxkotlin:3.0.0"
implementation "io.reactivex.rxjava3:rxandroid:3.0.0"
```
# Observable
Standard Observable has three types of event:
1. next
2. complete
3. error
Obervable很適合用marble diagram來表示
![[Pasted image 20210120150947.png]]
3個event解釋如下
1. `onNext()``onNext()` event會伴隨著一筆資料這也是觀察者用來接收資料的event。
2. `onComplete()``onComplete()` event表示Observable已經「成功的」結束了它的生命週期在`onComplete()` event之後觀察者不會再收到任何`onNext()` event。
3. `onError()``onError()` event表示Observable在發生錯誤的情況下結束它的生命週期。跟`onComplete()` event依樣後續不會再有任何`onNext()` event。
另外要注意一個Observable在沒有被訂閱的情況下「**是不會發送任何event的**」。
A example of usage of standard Observable:
```kotlin
API.download("http://...")
.subscribeBy(
onNext = { /* do something */ },
onComplete = { /* do something */ },
onError = { /* do something */ },
)
```
## 建立`Observable`的方法
### 1. `just`
```kotlin
val observable = Observable.just(1, 2, 3)
```
變數observable的內容設為1個"1、2、3"三個數,型別會是`Observable<Int!>!`。
如果使用了onNext來發送event的話將會依序發送1、2、3。
但如果是:
```kotlin
val observable = Observable.just(listOf(1, 2, 3))
```
變數observable的內容會是一個list這個list的內容是"1、2、3"。型別是`Observable<List<Int>!>!`。
如果使用了onNext來發送event的話將發送一個包含1、2、3的list。
### 2. `fromIterable`
用來將list的內容轉變為一個一個單獨的element給Observable。
```kotlin
val observable = Observable.fromIterable(listOf(2, 3, 1))
```
變數observable的型別會是`Observable<Int!>!`,而不是`Observable<List<Int>!>!`。
### 3. `empty`
建立一個「空的」Observable可以用來表示一個馬上就會結束的事情或是不包含任何東西的情況。
```kotlin
val observable = Observable.empty<Unit>()
observable.subscribeBy(
onNext = { println(it) },
onComplete = { println("Completed") }
)
```
用`empty()`所建立的observable只會發出`onComplete()` event所以上面的`onNext()` event永遠不會發生。
還有Observable所包含的element一定要有一個型別而且不可以是null所以上面的`empty()`必須明白的寫出`Unit`型別:`empty<Unit>() `。
### 4. `never`
建立一個不會發出任何event的observable。
### 5. `range`
產生一個範圍的數列,參數型別必須是整數(`Int`)。
```kotlin
val observable = Observable.range(1, 10)
```
上例的`onNext()`會依序發送1~10的數字出來。
### 6. `create`
用來定義自己的event發送方法。
範例:
```kotlin
val observable = Observable.create<String> { emitter ->
emitter.onNext("A")
emitter.onNext("C")
emitter.onNext("B")
emitter.onComplete()
}
val subscription = observable.subscribeBy(
onNext = { println("Received: $it") },
onComplete = { println("Completed") },
onError = { println("Completed") }
)
```
`create`必須帶入要發送的型別,例如`Int`、`String`或是任何class此例中是`create<String>`表示會送出的element是`String`型別。
然後`create`則是發送的實作,範例是會發送"A" -> "C" -> "B",然後用`onComplete`來結束。
注意要是observable沒有`onComplete`或是`onError`,然後`Disposable`(也就是訂閱者)也沒有呼叫`dispose()`則會造成memory leak。
### 7. `defer`
`defer`會建立一個Observable factory每一次呼叫這個factory都會產生一個新的Observable。`defer`只有一個參數就是我們要「製造」Observable的方法
```kotlin
var flip = false
val factory: Observable<Int> = Observable.defer {
flip = !flip
if (flip) {
Observable.just(1, 2, 3)
} else {
Observable.just(4, 5, 6)
}
}
```
`defer`後面的lambda就是我們要「製造」Observable的方法。當`flip`是`true`的時候,我們產生`Observable.just(1, 2, 3)`,反之則產生`Observable.just(4, 5, 6)`。Observable裡面所帶的element都是整數這也是為什麼factory的型別是`Observable<Int>`。
接下來訂閱這個factory
```kotlin
for (i in 0..3) {
val subscription = factory.subscribe {
println("Factory out: $it")
}
disposables.add(subscription)
}
disposables.dispose()
```
上面的例子產生了4個Observable。依照flip的值來產生不一樣內容的Observable。
How to subscrible a Observable
## 訂閱`Observable`的方法
1. `observable.subscrible()`
2. `observable.subscribleBy()`
Remember to release the resource. Call `disposable()` if you don't need Observable anymore. Or use `CompositeDisposable()` to collect all Disposable and release them.
## 特殊的Observable
### 1. `Single`
`Single`只有`onSuccess`跟`onError`兩種event。在發出`onSuccess`或是`onError`之後,`Single`就結束了。
譬如說讀取檔案,只會有讀取成功跟讀取失敗兩種情況,下面的範例讀取一個檔案,要是檔案不存在就發送`onError()`,反之就發送`onSuccess()`。
```kotlin
val subscriptions = CompositeDisposable()
fun loadText(filename: String): Single<String> {
return Single.create create@{ emitter ->
val file = File(filename)
if (!file.exists()) {
emitter.onError(FileNotFoundException("Can't find $filename"))
return@create
}
val contents = file.readText(Charsets.UTF_8)
emitter.onSuccess(contents)
}
}
// Use the single observable
val subscription = loadText("Copyright.txt")
.subscribeBy(
onSuccess = { println("Success read: $it") },
onError = { println("Error: $it") }
)
subscriptions.add(subscription)
```
`loadText()`這個function會返回`Single<String>`物件,要是讀取檔案成功,就把檔案內容用`onSuccess()`發送出來:
```kotlin
val contents = file.readText(Charsets.UTF_8)
emitter.onSuccess(contents)
```
要是檔案不存在,就發出`onError()`
```kotlin
emitter.onError(FileNotFoundException("Can't find $filename"))
```
### 2. `Completable`
`Completable`只有`onCompleted`跟`onError`兩種event。跟`Single`一樣,在發出`onCompleted`或是`onError`之後,`Completable`就結束了。
### 3. `Maybe`
`Maybe`是`Single`跟`Completable`的混合,他有`onSuccess(value)`、`onCompleted`跟`onError`三種event。`Maybe`只會發出這三種的其中一種event然後就結束了。
## 停止訂閱或是結束一個`Observable`
### 使用`Disposable.dispose()`
每一次呼叫`observable.subscrible()`或是`observable.subscribleBy()`都會回傳一個`Disposable`物件當我們不再需要訂閱一個Observable的時候我們必須呼叫`dispose()`停止訂閱:
```kotlin
val alphaSequnce = Observable.just("A", "B", "C")
val subscription = alphaSequece.subscribe {
println(it)
}
subscription.dispose()
```
### 使用`CompositeDisposable.dispose()`
對每一個`Disposable`物件在停止訂閱之後都要呼叫一次`dispose()`是很煩人的RxJava提供了一個`CompositeDisposable` class。它可以收納所有的`Disposable`物件,然後一次停止:
```kotlin
val subscriptions = CompositeDisposable()
val subscriptionNumbers = Observable.just(1, 2, 3).subscribe {
println("Numbers: $it")
}
val subscriptionAlphabets = Observable.just("A", "B", "C").subscribe {
println("Alphabets: $it")
}
subscriptions.add(subscriptionNumbers)
subscriptions.add(subscriptionAlphabets)
subscriptions.dispose() <-- subscriptionNumbers 與 subscriptionAlphabets 都會一起呼叫dispose()
```
忘記呼叫`dispose()`可能會造成memory leak。
# Subjects
Observable必須在建立的時候就指定好資料之後沒辦法再新增資料。而Subject可以在建立資料之後再新增資料Subject也會將新增的資料再馬上轉發給它的訂閱者。
## 1. `PublishSubject`
`PublishSubject`剛開始是沒有任何資料的,它也只會將最新的資料發送給它的訂閱者。另外,要是`PublishSubject`本身結束了(已經送出了`onComplete` event那麼新的訂閱者將不會收到任何資料但是會收到`onComplete` event。
```kotlin
val publishSubject = PublishSubject.create<Int>()
publishSubject.onNext(0)
val subscriptionOne = publishSubject.subscribe {
println(it)
}
publishSubject.onNext(1)
publishSubject.onNext(2)
val subscriptionTwo = publishSubject.subscribe {
println("2: $it")
}
publishSubject.onNext(3)
subscriptionOne.dispose()
publishSubject.onNext(4)
publishSubject.onComplete()
publishSubject.onNext(5)
subscriptionTwo.dispose()
val subscriptionThree = publishSubject.subscribeBy(
onNext = { println("3: $it") },
onComplete = { println("3: Completed") }
)
```
上例中的`subscriptionThree`只會收到`onComplete` event也就是只會印出`"3: Completed"`。
## 2. `BehaviorSubject`
行為跟`PublishSubject`類似,但是`BehaviorSubject`會發送「最後一筆資料」給新的訂閱者。如果`BehaviorSubject`最後的event是`onError`,那麼新的訂閱者也會收到`onError` event。例
```kotlin
val subscriptions = CompositeDisposable()
val behaviorSubject = BehaviorSubject.createDefault("Initial value")
behaviorSubject.onNext("X")
val subscriptionOne = behaviorSubject.subscribeBy(
onNext = { println("1: $it") },
onError = { println("1: ERROR, $it") }
)
behaviorSubject.onError(RuntimeException("Error!"))
behaviorSubject.onNext("Y") // <-- 不會有人收到因為已經被onError給terminate了
subscriptions.add(behaviorSubject.subscribeBy(
onNext = { println("2: $it") },
onError = { println("2: $it") }
))
```
另外,可以直接取得`BehaviorSubject`目前的值,以上例來說,只要用`behaviorSubject.value`就可以這方法可以很方便的在Rx與非Rx的程式中交換資料。
例子中是用static method `BehaviorSubject.createDefault()`來建立一個有初始值的`BehaviorSubject`,當然也可以跟`PublishSubject`一樣,用`BehaviorSubject.create()`來建立。
## 3. `ReplaySubject`:
`BehaviorSubject`會發送會後一筆資料,`ReplaySubject`就是發送最後n筆資料。我們可以用`ReplaySubject.createWithSize()`這個static method來建立一個`ReplaySubject`。例:
```kotlin
val replaySubject = ReplaySubject.createWithSize<String>(2)
```
變數`replaySubject`的buffer容量是2型別是`String`。
## 4. `AsyncSubject`
`AsyncSubject`的行為比較特別,`AsyncSubject`只會結束的時候,同時發出最後一筆資料。也就是說,即便一直提供資料給`AsyncSubject`,它也不會發出任何`onNext` event給它的訂閱者直到它收到`onComplete`的時候,它才會同時發出最後一筆`onNext`與`onComplete`給它的訂閱者。
## 5. `RxRelay`
`RxRelay`永遠不會發出`onComplete`或是`onError`。下面例子建立了一個`PublishRelay`
```
val publishRelay = PublishRelay.create<Int>()
```
要使用`RxRelay` library必須在build.gradle裡面加入
```
implementation "com.jakewharton.rxrelay3:rxrelay:3.0.0"
```
# Operators
## 1. Filtering Operators
### `ignoreElement`
`ignoreElement()`會忽略掉由[[20200207 - Study RxKotlin#Subjects]]丟出來的**next** event訂閱者只會收到`onCompleted`跟`onError`這兩種event也就是讓Subject退化成[[20200207 - Study RxKotlin#2 Completable]]。
例:
```kotlin
val subscriptions = CompositeDisposable()
val strikes = PublishSubject.create<String>()
subscriptions.add(
strikes.ignoreElements()
.subscribeBy {
println("Done") <-- 只會收到onComplete跟onError
}
)
```
### `elementAt`
`elementAt()`只會處理「第n個」**next** eventn之前跟n之後的都會被忽略。例如
```kotlin
val subscriptions = CompositeDisposable()
val strikes = PublishSubject.create<String>()
subscriptions.add(
strikes.elementAt(2) <-- 只要收第2個
.subscribeBy {
println("Get $it")
}
)
strikes.onNext("A") <-- 第0個
strikes.onNext("B") <-- 第1個
strikes.onNext("C") <-- 第2個
strikes.onNext("D") <-- 第3個
```
上面例子只要收「第2個next event」所以只會收到"**C**"。這也是一個`onSuccess` event。
`elementAt()`也等於是把Subject退化成[[20200207 - Study RxKotlin#3 Maybe]]。
要是Subject的「第n個」還沒收到就結束了那就是收到`onComplete` event。
### `filter`
`filter()`接收一個lambda函數每一個next event所帶的element都必須經過這個函數的「驗證」只有驗證結果為`true`的時候才會pass給訂閱者。例
```kotlin
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.filter {
it > 5 <-- 數值必須大於5才可以pass給訂閱者
}
.subscribe {
print("Get number: $it\n")
}
)
```
所以上面的結果會收到6、7、8、9、10。
### `skip`
忽略「前n個」next event。
```kotlin
Observer.just(1, 2, 3, 4 ,5)
.skip(3) <-- 忽略前3個
.subscribe {
println("it")
}
// Output 4, 5
```
### `skipWhile`
`skipWhile`也是用一個lambda來當作通過條件跟[[20200207 - Study RxKotlin#filter]]類似。但不像`filter`會去檢查「每一個」進來的element`skipWhile`是「當lambda檢查到第一個`false`的時候,後面全部通過」。
另一個跟`filter`不同的是,`skipWhile`是在檢查為`true`把next event忽略掉檢查到`false`的時候開始放行。
例如,我們要收集字串,但我們要當字串是"start"的時候才開始收集字串,例:
```kotlin
val subscriptions \= CompositeDisposable()
subscriptions.add(
Observable.just("1", "2", "3", "Start", "1", "3", "2")
.skipWhile { it != "Start" } <-- 比對為false開始放行
.subscribe {
println("Get $it")
}
)
// Output
Get Start
Get 1
Get 3
Get 2
```
### `skipUntil`
與前面的skip operator不同`skipUntil`不是用lambda來決定skip的條件而是依賴於「另一個subject」`skipUntil`會一直忽略直到「另一個subject」發出`onNext` event。例
```kotlin
val subject = PublishSubject.create<String>()
val trigger = PublishSubject.create<String>()
subject
.skipUntil(trigger)
.subscribe {
println("it")
}
```
In this code you'll get nothing, until `trigger` sent an `onNext()` event.
Example:
```kotlin
subject.onNext("A") // Ignored
subject.onNext("B") // Ignored
trigger.onNext("1") // TRIGGER!
subject.onNext("C") // send out
```
![[Pasted image 20210202155003.png]]
### `take`
`take`跟[[20200207 - Study RxKotlin#skip]]相反,`take`是接收「前n個」訊息之後全部忽略。
### `takeWhile`
`takeWhile`是用lambda當判斷條件當判斷為`true`的時候放行,一旦判斷為`false`,之後的所有訊息都會被忽略。
```kotlin
exampleOf("takeWhile") {
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1))
.takeWhile {
it < 5
}
.subscribe {
println("Get $it")
}
)
}
```
上例中判斷到5的時候`it < 5`為`false`所以之後的都會忽略即使最後那個1也是一樣被忽略。
`takeWhile`跟[[20200207 - Study RxKotlin#skipWhile]]一樣,但是行為相反。
### `takeUntil`
跟[[20200207 - Study RxKotlin#skipUntil]]相反的行為會一直接收訊息直到他依賴的subject發出訊息後停止。
### `distinctUntilChanged`
`distinctUntilChanged`用來過濾「連續且相同」的訊息,例如連續的"Hi"那麼就只會收到第1個"Hi",之後的都不會收到。但是一旦收到的訊息改變了,再次收到以前發過的訊息,只要它沒有跟前一筆相同,那麼就還是會接收。例:
```kotlin
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.just(5, 5, 3, 3, 1)
.distinctUntilChanged()
.subscribe {
println(it)
}
)
// Output
5
3
1
```
`distinctUntilChanged`預設用class的`equal()` method來判斷當然我們也可以給它一個lambda來當客製我們的條件被lambda判斷為`true`的話,該訊息就會被忽略:
```kotlin
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.just("ABC", "CCD", "FAG")
.distinctUntilChanged { first, second ->
first[second.length - 1] == second[0]
}
.subscribe {
println("Get $it")
}
)
```
上例中我們希望「第2個字串的開頭字母要是跟第1個字串的結尾字母一樣的話那麼就不要顯示」。第1筆"ABC"一定會接收第2筆"CCD"則會被忽略第3筆"FAG"會被接收。
## 2. Transforming Operators
### 1. toList
`toList`可以把每一個單獨從Observalbe發出來的元素變成一個list
```kotlin
val subscriptions = CompositeDisposable()
val items = Observable.just("A", "B", "C")
subscriptions.add(
items.toList()
.subscribeBy {
println(it)
}
)
```
原本會單獨發出的"A", "B", "C"現在變成只翠發出一個List其內容是`["A", "B", "C"]`。
### 2. map
`map`跟Kotlin的`map`行為上差不多只是Kotlin的`map`是作用在List上而RxJava的`map`是作用在Observable上。`map`根據你提供的lambda函式來對每一個element做轉換如下例將每一個羅馬數字轉換為阿拉伯數字
```kotlin
val subscriptions = CompositeDisposable()
subscriptions.add(
Observable.just("M", "C", "V", "I")
.map {
it.romanNumeralIntValue()
}
.subscribeBy {
println(it)
}
)
```
注意到了嗎?`map`用來轉換Observable所包含的item型別上例中Observable的item本來是一個字串`String`),被`map`轉換為數字(`Int`)。
### 3. flatMap
`flatMap`用來處理「Observable發出來的Observable」並且「記錄每一個變化」。用例子比較好說明
```kotlin
class Student(val score: BehaviorSubject<Int>)
val subscriptions = CompositeDisposable()
val ryan = Student(BehaviorSubject.createDefault(80))
val charlotte = Student(BehaviorSubject.createDefault(90))
val student = PublishSubject.create<Student>()
student
.flatMap {
it.score
}
.subscribeBy {
println(it)
}
.addTo(subscriptions)
student.onNext(ryan) <-- 1
ryan.score.onNext(85) <-- 2
student.onNext(charlotte) <-- 3
ryan.score.onNext(95) <-- 4
charlotte.score.onNext(100) <-- 5
```
我們有一個叫做student的`PublishSubject`另外有兩個Student class分別是ryan與charlotte而這個Student class有一個member叫做scorescore的類別是`BehaviorSubject<Int>`。我們的`flatMap` lambda不做任何轉換直接bypass分數。
1. student先選擇ryan來發出第一個onNext eventryan原本的分數是80所以我們會收到80。
2. ryan變更為85所以我們會收到85。
3. student選擇了charlotte並發出一個onNext eventcharlotte原本的分數是90所以我們會收到90。
4. ryan變更為95所以我們會收到95。
5. charlotte變更為100所以我們會收到100。
結果會收到80、85、90、95、100。
ryan跟charlotte都是獨立的Observable但透過`flatMap`我們可以把它們的值(以及後續的變化)變成一連串的數值,這就是`flat`的意思。
![[rxJava_flatMap.png]]
### 4. switchMap
`switchMap`跟`flatMap`類似也是處理「Observable發出來的Observable」但是差別在於`switchMap`一但切換到新的Observable之後上一個Observale的訊息就部會收到了以`flatMap`的例子來說,在`student.onNext(charlotte)`這一行之後ryan的改變就不會收到了。例
```kotlin
val ryan = Student(BehaviorSubject.createDefault(80))
val charlotte = Student(BehaviorSubject.createDefault(90))
val student = PublishSubject.create<Student>()
student
.switchMap {
it.score
}
.subscribe {
println(it)
}
student.onNext(ryan)
ryan.score.onNext(85)
student.onNext(charlotte)
ryan.score.onNext(95)
charlotte.score.onNext(100)
```
結果會收到80、85、95、100。`ryan.score.onNext(95)`這一行的95不會收到。
`switchMap`適合用在會「改變興趣」的場景例如說原本是要持續收到台北氣溫的改變接著使用者把地點改到高雄那我們就會變成持續收到高雄的溫度變化而不是台北的又或者說你會隨著使用的的輸入持續的搜尋結果例如使用者依序輸入k、o、t、l、i、n每輸入一個字母我們就搜尋一次但我們只關注最後一個字串搜尋不在意之前的搜尋結果。
### 5. materialize
`materialize`能將Observable的值包裝成一個`Notification`,回到[[20200207 - Study RxKotlin#4 switchMap]]的例子,如果任何一個學生發出了`onError`的訊息,那麼連`student`本身都會因為這個Exception而中斷所以即使切到了charlotte我們也收不到charlotte的訊息了。`materialize`可以將`onError`包裝成一個`Notification`讓exception留在ryan本身而不會影響到上面的student。
```kotlin
val subscriptions = CompositeDisposable()
val ryan = Student(BehaviorSubject.createDefault(80))
val charlotte = Student(BehaviorSubject.createDefault(90))
val student = BehaviorSubject.createDefault<Student>(ryan)
// 1
val studentScore = student.switchMap { it.score.materialize() } <-- HERE!
// 2
subscriptions.add(
studentScore
.subscribe {
println(it)
}
)
// 3
ryan.score.onNext(85)
ryan.score.onError(RuntimeException("Error!"))
ryan.score.onNext(90)
// 4
student.onNext(charlotte)
```
![[rxkotlin_materialize.png]]
### 6. dematerialize
`dematerialize`用來反解`materialize`所包裝的東西,例如上例中,會將`materialize`所包裝出來的`Observable<Notification<Int!>!>!`反解為`Observable<Int>!`,例:
```kotlin
subscriptions.add(
studentScore
.filter {
if (it.error != null) {
println("Got error: ${it.error}")
false
} else {
true
}
}
.dematerialize { it }
.subscribe {
println(it)
}
)
```
要注意的是如果發生Exception的話直接`println`還是會產生Exception所以需要用`filter`來把error給濾掉。
![[rxkotlin_dematerialize.png]]
在Subject所發出的element仍然是Subject的時候如果element發生errorException會導致上層的Subject也跟著停止`materialize`/`dematerialize`可以用來包裝element讓element所發出的東西都變成`Notification`這樣就部會影響上層的Subject了。
## 3. Combining Operators
### startWith
用來在Observable本身所帶的item前面再加上其他item。實際的有`startWithIterable()`與`startWithItem()`。
例:
```kotlin
val subscriptions = CompositeDisposable()
val numbers = Observable.just(3, 4, 5)
val startWith = numbers.startWithIterable(listOf(1, 2))
startWith.subscribe {
println(it)
}.addTo(subscriptions)
```
### concat
`concat`是一個static method用來合併2個Observable。`concat`會先等第一個Observable結束然後再等待第二個之後把它們合併起來。
```kotlin
val subscriptions = CompositeDisposable()
val first = Observable.just(1, 2, 3)
val second = Observable.just(3, 4, 5)
Observable.concat(first, second)
.subscribe {
println(it)
}
.addTo(subscriptions)
```
### concatWith
跟[[20200207 - Study RxKotlin#concat]]一樣,但是`concatWith`是一個member function而不是一個static method。`concatWith`一樣會先等自己結束然後再等第二個Obervable當參數的那一個結束之後再合併。
```kotlin
val subscriptions = CompositeDisposable()
val first = Observable.just(1, 2, 3)
val second = Observable.just(3, 4, 5)
first.concatWith(second)
.subscribe {
println(it)
}
.addTo(subscriptions)
```
> ## 注意
> 要被合併的兩個Observable類型必須要一樣不可以一個是`Obsrvable<String>`而另一個是`Observable<Int>`compiler會報錯喔。
### concatMap
`concatMap`接受一個lambda函示並回傳另一個Observable序列`concatMap`會保證Observable的順序。
Given multiple Observable, and map each Observable to a lambda function. And make sure the sequence of given Observable list.
### merge
`merge`是一個static function。
`merge`會按照接收的順序把element合併起來
```kotlin
val subscriptions = CompositeDisposable()
val odd = PublishSubject.create<Int\>()
val even = PublishSubject.create<Int\>()
Observable.merge(odd, even)
.subscribe {
println(it)
}
.addTo(subscriptions)
odd.onNext(1)
even.onNext(2)
odd.onNext(3)
even.onNext(4)
odd.onNext(5)
even.onNext(6)
```
odd與even兩個交互發出elemetmerge依順序接收而不是像[[20200207 - Study RxKotlin#concat]]是依照Observable的順序。
`merge` complete的時機點定義如下
- 當來源的Observable與內部的Observable都complete的時候merge本身也會發出complete。
- 內部Observable結束的順序跟接收的順序沒有關係。一律按照接收點
- 如果有任何Observable發生error`merge`會轉發這個error然後自己發生terminate。
Q:
- What's the different between `flatMap()`? -> 很大的差別,`flatMap()` 有map的功能。
### mergeWith
就像`concatWith()`與`concat()`的關係,`mergeWith()`跟`merge`也是一樣的關係。
`mergeWith()`是一個member function必須由某個Observable instance來呼叫。
### combineLatest
combineLatest是`Observables`的static funtion注意不是`Observable`)。
combineLatest只會接收2個Observalbe的「最後一個」elements然後交由你所提供的lambda來處置
```kotlin
val subscriptions = CompositeDisposable()
val left = PublishSubject.create<String>()
val right = PublishSubject.create<String>()
Observables.combineLatest(left, right) { leftString, rightString ->
"$leftString, $rightString"
}.subscribe {
println(it)
}.addTo(subscriptions)
left.onNext("Hello")
right.onNext("World")
left.onNext("It's nice to")
right.onNext("be here")
left.onNext("Actually, it's super great to")
```
重點:
1. 在上例中是直接結合2個字串但是其實可以是任何用途。
2. 在實務中,`combineLatest`可以用來結合2個不同型別的Observable然後再回傳另一個不同型別的Observable。`combineLatest`回傳的Observable型別由lambda決定。
3. `combineLatest`必須在「每一個」Observable都發出element之後才會動作。如果不確定Observable是否會發出element可以使用[[20200207 - Study RxKotlin#startWith]]來讓Observable有一個初始值這樣可以避免`combineLatest`永遠不會發生的情況。
4. 如果有某個Observable已經complete`combineLatest`會保留它的最後一個element然後繼續結合更新的element。
5. 直到最後一個Observable complete`combineLatest`才會complete。
![[combineLatest.png]]
### zip
- `zip` wait until each if the inner Ovservables emits a new value.
### Triggers
#### withLastestFrom
- `withLatestFrom` is useful in all situations where you want the current(latest) value emittted frim an Observable, but only when a particular trigger occurs.
#### sample
- Just like `withLastFrom`. But each time the trygger Ivsercable emits a value, `sample` emits the latest value from the "other" Obervable, but only if it arrived since the last "tick". You can combine `withLastFrom` and `distinctUntilChanged` to do the same behavior of `sample`.
```
exampleOf("sample") {
val subscriptions = CompositeDisposable()
val button = PublishSubject.create<Unit>()
val editText = PublishSubject.create<String>()
editText.sample(button)
.subscribe {
println(it)
}.addTo(subscriptions)
editText.onNext("Par")
editText.onNext("Pari")
editText.onNext("Paris")
button.onNext(Unit)
button.onNext(Unit) <- button emits twice, but editText only emit last value
}
// Output
--- Example of: sample ---
Paris
```
### Switchs
- ambWith
- Think of `amb` as in ambiguous.
- `ambWith` connect to two Observables. And wait any of them who emit element first. If any Observable emit element, another one will be unsubscribed.
### reduce
- `reduce` accumulates a summary value.
### scan
- Like `reduce`, but emit per input value.
## 4. Time-Based Operators
### Buffering
#### replay
- This operator creates a new sequence that records the last N elements emitted by the source Observable.
#### replayAll
#### window
- Difference is that it emits an Observable of the buffered items, instead of emitting an array.
### Time-Shifting
#### delaySubscription
- Delay the time a subscriber starts receiving elements from its subscription.
#### delay
- This operator subscribes immediateley to the source observable, but delays every emitted element by the specified amount of time.
### Timer
#### Observable.interval
- Produce an infinite Observable sequence of Int values.
#### Observable.timer
- Specify a "due time" as the time that elapsed between the point of subscription and the first emitted value.
- If the "repeat period" is not assigned, the timer Observable will emit once, the complete.
#### timeout
- Emit an TimeoutException error event. If not caught, it terminates the sequence.
## 5. Explore Operators
# 參考資料:
- [RxMarbles: Interactive diagrams of Rx Observables](https://rxmarbles.com/#delayWhen)