# 剛開始 ## 先說說Rx Rx最早是Microsoft的某個實驗室為了解決asychronous、scalable還有一些app問題而提出來的libraray。大概在2009年的時候提出,叫做Reactive Extension for .NET(Rx). 一開始是以add-on的方式安裝在.NET 3.5上,到了.NET 4.0就變成了內建的library。也因為它open source的關係,讓其他語言得以將這套概念也移植過去,所以現在有RxJS, RxSwift, RxNET, RxScale, RxJava。這些library都致力於在它們的語言來實作出相同的「行為」,所以理論上iOS工程師可以和Web工程師用Rx來討論app的邏輯是沒有問題的。 Rx的官網:[http://reactivex.io/](http://reactivex.io/),它的logo是一支電鰻(Electric eel): ![[Rx_Logo_S.png]] Rx Community - http://android-united.community/ - https://kotlinlang.slack.com/ 什麼是RxJava? > RxJava is a library for composing asynchronous and event-based code using observable sequences and functional style operations, allowing for parameterized execution via schedulers. > RxJava, in its essence, simplifies developing asynchronous programs by allowing your code to react to new data and process it in a sequential, isolated manner. In other words, RxJava lets you observe sequence of asychronous events in an app and respond to each event accordingly. Examples are taps by a user on the screen and listening for results if asynchronous network calls. ## 再說RxJava RxJava是一個實作Rx的framework。 RxJava與其他的Rx library提供了asynchronous與event-based的解決辦法 而Asychronous code跟Sychronous code的差異: Sychronous code按照字面上的意思執行,每一次的結果都相同。 Asychronous code則是在必要的時候才被使用,每一次執行的「狀態」不盡相同。也就是沒辦法控制其順序與時間。 ### Asychronous programming的詞彙 #### 1. State - State指的是我們程式所儲存的資料與程式自身行為互動所產生的狀態。 - #### 2. Imperative programming - Imperative programming(指令式程式設計)是用一連串的命令或描述來改變程式的狀態。如下面的code: ``` setupUI() bindClickListeners() createAdapter() listenForChanges() ``` 這些code可能有一些相關的邏輯,但是字面上看不出來,即使互相調換可能會造成錯誤,但也可能不會。 #### 3. Side effect - Side effect指的是「一段程式修改了它本身區域外的狀態」,譬如說,一個處理event的function它除了處理event之外,也改變的UI上所顯示的文字。 - Side effect並不總是不好的,我們的程式就是要對某些東西做出改變,完全無法改變任何東西的程式是沒有用的。 RxJava試著用接下來的2個概念來解決剛剛提到的3個概念上的問題 #### 4. Declarative code - 又叫Fucntional programming,Fucntional programming不產生任何side effect。 - Declarative code定義的是行為。 - RxJava試著在Declarative code和Imperative programming取一個平衡點,它定義行為,然後依順序執行。 #### 5. Reactive systems Reactive systems通常有幾個特性: - Reponseive:保持UI在最新狀態 - Resilient:每個行為都是獨立定義的,而且有辦法靈活的處理錯誤。 - Elastic:程式的十座可以處理不同的工作量 - Message driven:每個元件使用Message driven(訊息驅動)的方式來互相溝通,並改進可用性與獨立性,解開(decouple)生命週期與實作的關聯。 ### Rx的三大組成 #### 1. Observables `Observable`是Rx的基礎之一,Observable允許觀察者觀察它,並接收它發出來的資料。 ##### Observables 的基礎:event Observable會以3種事件(event)來發出資料: 1. **next**:**next** event會伴隨著一筆資料,這也是觀察者用來接收資料的event。 2. **complete**:**complete** event表示Observable已經「成功的」結束了它的生命週期,在**complete** event之後,觀察者不會再收到任何**next** event。 3. **error**:**error** event表示Observable在發生錯誤的情況下結束它的生命週期。跟**complete** event依樣,後續不會再有任何**next** event。 一個Observable用next所發出來的一連串資料我們稱為"sequence"。sequence可以分為兩種: 1. Finite sequnece: 想像你要下載一個檔案,我們的code大概是這個樣子: ```kotlin API.download(file = "http://www...") .subscribeBy( onNext = { // Handle downloading here }, onComplete = { // Handle download finish here }, onError = { // Handle error here }, ) ``` `API.download()`會產生一個Obervable,然後我們藉由`subscribeBy`來訂閱他,並加入我們的處理程序,我們在`onNext`裡面處理接收到的檔案buffer,在`onComplete`裡面了解到檔案已經完成下載,可以做一些後續的處理,`onError`則是發生了某些錯誤,需要重來或是通知使用者之類。 2. Infinite sequence: Switch button就是一個例子,我們要處理switch button的code會是這樣: ```kotlin switch.checkdChanges() .subscribeBy( onNext = { isOn -> if (isOn) { // Handle on here } else { // Handle off here } } ) ``` 可以看到這一段`subscribeBy()`裡面並沒有`onComplete`跟`onError`,因為switch button根本就不會產生這兩種event。 #### 2. Operators Operators用來處理Observable所發出來的資料,可能是過濾或者做一些轉換,或其他操作。再以switch button做例子,下面的code可以把switch button的狀態做幾個改變: 1. 我們只想收到on的狀態。 2. 把on的狀態轉為一個字串"We've been toggled on!"。 ```kotlin switch.checkdChanges() .filter { it == true } .map { "We've been toggled on!" } .subscribeBy( onNext = { message -> updateTextView(message) } ) ``` #### 3. Schedulers Scheduler可以想像成thread,RxJava已經內建了好幾種scheduler,而且應該可以適用於大部分的情形。 例如IO scheduler可以讓你的檔案下載在背景執行,`TeampolineScheduler`可以讓你的程式同時執行, `ComputationScheduler`可以讓你將程式分配給不同的thread來處理需要大量運算的資料。 RxJava是一個很獨立的library,所以有2個library可以跟RxJava一起合作: 1. RxAndroid:提供Android Looper class跟RxJava的scheduler之間的橋接管道。 2. RxBinding:用來把UI的click listen之類的callback轉變為Observable的`subscribeBy`。 # 安裝 在`build.gradle`裡的`depedencies`區域加入: ``` implementation "io.reactivex.rxjava3:rxjava:3.0.2" implementation "io.reactivex.rxjava3:rxkotlin:3.0.0" implementation "io.reactivex.rxjava3:rxandroid:3.0.0" ``` # Observable Standard Observable has three types of event: 1. next 2. complete 3. error Obervable很適合用marble diagram來表示: ![[Pasted image 20210120150947.png]] 3個event解釋如下: 1. `onNext()`:`onNext()` event會伴隨著一筆資料,這也是觀察者用來接收資料的event。 2. `onComplete()`:`onComplete()` event表示Observable已經「成功的」結束了它的生命週期,在`onComplete()` event之後,觀察者不會再收到任何`onNext()` event。 3. `onError()`:`onError()` event表示Observable在發生錯誤的情況下結束它的生命週期。跟`onComplete()` event依樣,後續不會再有任何`onNext()` event。 另外,要注意:一個Observable在沒有被訂閱的情況下,「**是不會發送任何event的**」。 A example of usage of standard Observable: ```kotlin API.download("http://...") .subscribeBy( onNext = { /* do something */ }, onComplete = { /* do something */ }, onError = { /* do something */ }, ) ``` ## 建立`Observable`的方法 ### 1. `just` ```kotlin val observable = Observable.just(1, 2, 3) ``` 變數observable的內容設為1個"1、2、3"三個數,型別會是`Observable!`。 如果使用了onNext來發送event的話,將會依序發送1、2、3。 但如果是: ```kotlin val observable = Observable.just(listOf(1, 2, 3)) ``` 變數observable的內容會是一個list,這個list的內容是"1、2、3"。型別是`Observable!>!`。 如果使用了onNext來發送event的話,將發送一個包含1、2、3的list。 ### 2. `fromIterable` 用來將list的內容轉變為一個一個單獨的element給Observable。 ```kotlin val observable = Observable.fromIterable(listOf(2, 3, 1)) ``` 變數observable的型別會是`Observable!`,而不是`Observable!>!`。 ### 3. `empty` 建立一個「空的」Observable,可以用來表示一個馬上就會結束的事情,或是不包含任何東西的情況。 ```kotlin val observable = Observable.empty() observable.subscribeBy( onNext = { println(it) }, onComplete = { println("Completed") } ) ``` 用`empty()`所建立的observable只會發出`onComplete()` event,所以上面的`onNext()` event永遠不會發生。 還有,Observable所包含的element一定要有一個型別,而且不可以是null,所以上面的`empty()`必須明白的寫出`Unit`型別:`empty() `。 ### 4. `never` 建立一個不會發出任何event的observable。 ### 5. `range` 產生一個範圍的數列,參數型別必須是整數(`Int`)。 ```kotlin val observable = Observable.range(1, 10) ``` 上例的`onNext()`會依序發送1~10的數字出來。 ### 6. `create` 用來定義自己的event發送方法。 範例: ```kotlin val observable = Observable.create { emitter -> emitter.onNext("A") emitter.onNext("C") emitter.onNext("B") emitter.onComplete() } val subscription = observable.subscribeBy( onNext = { println("Received: $it") }, onComplete = { println("Completed") }, onError = { println("Completed") } ) ``` `create`必須帶入要發送的型別,例如`Int`、`String`或是任何class,此例中是`create`,表示會送出的element是`String`型別。 然後`create`則是發送的實作,範例是會發送"A" -> "C" -> "B",然後用`onComplete`來結束。 注意:要是observable沒有`onComplete`或是`onError`,然後`Disposable`(也就是訂閱者)也沒有呼叫`dispose()`,則會造成memory leak。 ### 7. `defer` `defer`會建立一個Observable factory,每一次呼叫這個factory都會產生一個新的Observable。`defer`只有一個參數,就是我們要「製造」Observable的方法: ```kotlin var flip = false val factory: Observable = Observable.defer { flip = !flip if (flip) { Observable.just(1, 2, 3) } else { Observable.just(4, 5, 6) } } ``` `defer`後面的lambda就是我們要「製造」Observable的方法。當`flip`是`true`的時候,我們產生`Observable.just(1, 2, 3)`,反之則產生`Observable.just(4, 5, 6)`。Observable裡面所帶的element都是整數,這也是為什麼factory的型別是`Observable`。 接下來訂閱這個factory: ```kotlin for (i in 0..3) { val subscription = factory.subscribe { println("Factory out: $it") } disposables.add(subscription) } disposables.dispose() ``` 上面的例子產生了4個Observable。依照flip的值來產生不一樣內容的Observable。 How to subscrible a Observable ## 訂閱`Observable`的方法 1. `observable.subscrible()` 2. `observable.subscribleBy()` Remember to release the resource. Call `disposable()` if you don't need Observable anymore. Or use `CompositeDisposable()` to collect all Disposable and release them. ## 特殊的Observable ### 1. `Single` `Single`只有`onSuccess`跟`onError`兩種event。在發出`onSuccess`或是`onError`之後,`Single`就結束了。 譬如說讀取檔案,只會有讀取成功跟讀取失敗兩種情況,下面的範例讀取一個檔案,要是檔案不存在就發送`onError()`,反之就發送`onSuccess()`。 ```kotlin val subscriptions = CompositeDisposable() fun loadText(filename: String): Single { return Single.create create@{ emitter -> val file = File(filename) if (!file.exists()) { emitter.onError(FileNotFoundException("Can't find $filename")) return@create } val contents = file.readText(Charsets.UTF_8) emitter.onSuccess(contents) } } // Use the single observable val subscription = loadText("Copyright.txt") .subscribeBy( onSuccess = { println("Success read: $it") }, onError = { println("Error: $it") } ) subscriptions.add(subscription) ``` `loadText()`這個function會返回`Single`物件,要是讀取檔案成功,就把檔案內容用`onSuccess()`發送出來: ```kotlin val contents = file.readText(Charsets.UTF_8) emitter.onSuccess(contents) ``` 要是檔案不存在,就發出`onError()`: ```kotlin emitter.onError(FileNotFoundException("Can't find $filename")) ``` ### 2. `Completable` `Completable`只有`onCompleted`跟`onError`兩種event。跟`Single`一樣,在發出`onCompleted`或是`onError`之後,`Completable`就結束了。 ### 3. `Maybe` `Maybe`是`Single`跟`Completable`的混合,他有`onSuccess(value)`、`onCompleted`跟`onError`三種event。`Maybe`只會發出這三種的其中一種event,然後就結束了。 ## 停止訂閱或是結束一個`Observable` ### 使用`Disposable.dispose()` 每一次呼叫`observable.subscrible()`或是`observable.subscribleBy()`都會回傳一個`Disposable`物件,當我們不再需要訂閱一個Observable的時候,我們必須呼叫`dispose()`停止訂閱: ```kotlin val alphaSequnce = Observable.just("A", "B", "C") val subscription = alphaSequece.subscribe { println(it) } subscription.dispose() ``` ### 使用`CompositeDisposable.dispose()` 對每一個`Disposable`物件在停止訂閱之後都要呼叫一次`dispose()`是很煩人的,RxJava提供了一個`CompositeDisposable` class。它可以收納所有的`Disposable`物件,然後一次停止: ```kotlin val subscriptions = CompositeDisposable() val subscriptionNumbers = Observable.just(1, 2, 3).subscribe { println("Numbers: $it") } val subscriptionAlphabets = Observable.just("A", "B", "C").subscribe { println("Alphabets: $it") } subscriptions.add(subscriptionNumbers) subscriptions.add(subscriptionAlphabets) subscriptions.dispose() <-- subscriptionNumbers 與 subscriptionAlphabets 都會一起呼叫dispose() ``` 忘記呼叫`dispose()`可能會造成memory leak。 # Subjects Observable必須在建立的時候就指定好資料,之後沒辦法再新增資料。而Subject可以在建立資料之後,再新增資料,Subject也會將新增的資料再馬上轉發給它的訂閱者。 ## 1. `PublishSubject` `PublishSubject`剛開始是沒有任何資料的,它也只會將最新的資料發送給它的訂閱者。另外,要是`PublishSubject`本身結束了(已經送出了`onComplete` event),那麼新的訂閱者將不會收到任何資料,但是會收到`onComplete` event。 ```kotlin val publishSubject = PublishSubject.create() publishSubject.onNext(0) val subscriptionOne = publishSubject.subscribe { println(it) } publishSubject.onNext(1) publishSubject.onNext(2) val subscriptionTwo = publishSubject.subscribe { println("2: $it") } publishSubject.onNext(3) subscriptionOne.dispose() publishSubject.onNext(4) publishSubject.onComplete() publishSubject.onNext(5) subscriptionTwo.dispose() val subscriptionThree = publishSubject.subscribeBy( onNext = { println("3: $it") }, onComplete = { println("3: Completed") } ) ``` 上例中的`subscriptionThree`只會收到`onComplete` event,也就是只會印出`"3: Completed"`。 ## 2. `BehaviorSubject` 行為跟`PublishSubject`類似,但是`BehaviorSubject`會發送「最後一筆資料」給新的訂閱者。如果`BehaviorSubject`最後的event是`onError`,那麼新的訂閱者也會收到`onError` event。例: ```kotlin val subscriptions = CompositeDisposable() val behaviorSubject = BehaviorSubject.createDefault("Initial value") behaviorSubject.onNext("X") val subscriptionOne = behaviorSubject.subscribeBy( onNext = { println("1: $it") }, onError = { println("1: ERROR, $it") } ) behaviorSubject.onError(RuntimeException("Error!")) behaviorSubject.onNext("Y") // <-- 不會有人收到,因為已經被onError給terminate了 subscriptions.add(behaviorSubject.subscribeBy( onNext = { println("2: $it") }, onError = { println("2: $it") } )) ``` 另外,可以直接取得`BehaviorSubject`目前的值,以上例來說,只要用`behaviorSubject.value`就可以,這方法可以很方便的在Rx與非Rx的程式中交換資料。 例子中是用static method `BehaviorSubject.createDefault()`來建立一個有初始值的`BehaviorSubject`,當然也可以跟`PublishSubject`一樣,用`BehaviorSubject.create()`來建立。 ## 3. `ReplaySubject`: `BehaviorSubject`會發送會後一筆資料,`ReplaySubject`就是發送最後n筆資料。我們可以用`ReplaySubject.createWithSize()`這個static method來建立一個`ReplaySubject`。例: ```kotlin val replaySubject = ReplaySubject.createWithSize(2) ``` 變數`replaySubject`的buffer容量是2,型別是`String`。 ## 4. `AsyncSubject` `AsyncSubject`的行為比較特別,`AsyncSubject`只會結束的時候,同時發出最後一筆資料。也就是說,即便一直提供資料給`AsyncSubject`,它也不會發出任何`onNext` event給它的訂閱者,直到它收到`onComplete`的時候,它才會同時發出最後一筆`onNext`與`onComplete`給它的訂閱者。 ## 5. `RxRelay` `RxRelay`永遠不會發出`onComplete`或是`onError`。下面例子建立了一個`PublishRelay`: ``` val publishRelay = PublishRelay.create() ``` 要使用`RxRelay` library,必須在build.gradle裡面加入: ``` implementation "com.jakewharton.rxrelay3:rxrelay:3.0.0" ``` # Operators ## 1. Filtering Operators ### `ignoreElement` `ignoreElement()`會忽略掉由[[20200207 - Study RxKotlin#Subjects]]丟出來的**next** event,訂閱者只會收到`onCompleted`跟`onError`這兩種event,也就是讓Subject退化成[[20200207 - Study RxKotlin#2 Completable]]。 例: ```kotlin val subscriptions = CompositeDisposable() val strikes = PublishSubject.create() subscriptions.add( strikes.ignoreElements() .subscribeBy { println("Done") <-- 只會收到onComplete跟onError } ) ``` ### `elementAt` `elementAt()`只會處理「第n個」**next** event,n之前跟n之後的都會被忽略。例如: ```kotlin val subscriptions = CompositeDisposable() val strikes = PublishSubject.create() subscriptions.add( strikes.elementAt(2) <-- 只要收第2個 .subscribeBy { println("Get $it") } ) strikes.onNext("A") <-- 第0個 strikes.onNext("B") <-- 第1個 strikes.onNext("C") <-- 第2個 strikes.onNext("D") <-- 第3個 ``` 上面例子只要收「第2個next event」,所以只會收到"**C**"。這也是一個`onSuccess` event。 `elementAt()`也等於是把Subject退化成[[20200207 - Study RxKotlin#3 Maybe]]。 要是Subject的「第n個」還沒收到就結束了,那就是收到`onComplete` event。 ### `filter` `filter()`接收一個lambda函數,每一個next event所帶的element都必須經過這個函數的「驗證」,只有驗證結果為`true`的時候,才會pass給訂閱者。例: ```kotlin val subscriptions = CompositeDisposable() subscriptions.add( Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) .filter { it > 5 <-- 數值必須大於5才可以pass給訂閱者 } .subscribe { print("Get number: $it\n") } ) ``` 所以上面的結果會收到6、7、8、9、10。 ### `skip` 忽略「前n個」next event。 ```kotlin Observer.just(1, 2, 3, 4 ,5) .skip(3) <-- 忽略前3個 .subscribe { println("it") } // Output 4, 5 ``` ### `skipWhile` `skipWhile`也是用一個lambda來當作通過條件,跟[[20200207 - Study RxKotlin#filter]]類似。但不像`filter`會去檢查「每一個」進來的element,`skipWhile`是「當lambda檢查到第一個`false`的時候,後面全部通過」。 另一個跟`filter`不同的是,`skipWhile`是在檢查為`true`把next event忽略掉,檢查到`false`的時候開始放行。 例如,我們要收集字串,但我們要當字串是"start"的時候才開始收集字串,例: ```kotlin val subscriptions \= CompositeDisposable() subscriptions.add( Observable.just("1", "2", "3", "Start", "1", "3", "2") .skipWhile { it != "Start" } <-- 比對為false開始放行 .subscribe { println("Get $it") } ) // Output Get Start Get 1 Get 3 Get 2 ``` ### `skipUntil` 與前面的skip operator不同,`skipUntil`不是用lambda來決定skip的條件,而是依賴於「另一個subject」,`skipUntil`會一直忽略,直到「另一個subject」發出`onNext` event。例: ```kotlin val subject = PublishSubject.create() val trigger = PublishSubject.create() subject .skipUntil(trigger) .subscribe { println("it") } ``` In this code you'll get nothing, until `trigger` sent an `onNext()` event. Example: ```kotlin subject.onNext("A") // Ignored subject.onNext("B") // Ignored trigger.onNext("1") // TRIGGER! subject.onNext("C") // send out ``` ![[Pasted image 20210202155003.png]] ### `take` `take`跟[[20200207 - Study RxKotlin#skip]]相反,`take`是接收「前n個」訊息,之後全部忽略。 ### `takeWhile` `takeWhile`是用lambda當判斷條件,當判斷為`true`的時候放行,一旦判斷為`false`,之後的所有訊息都會被忽略。 ```kotlin exampleOf("takeWhile") { val subscriptions = CompositeDisposable() subscriptions.add( Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1)) .takeWhile { it < 5 } .subscribe { println("Get $it") } ) } ``` 上例中,判斷到5的時候,`it < 5`為`false`,所以之後的都會忽略,即使最後那個1也是一樣被忽略。 `takeWhile`跟[[20200207 - Study RxKotlin#skipWhile]]一樣,但是行為相反。 ### `takeUntil` 跟[[20200207 - Study RxKotlin#skipUntil]]相反的行為,會一直接收訊息,直到他依賴的subject發出訊息後停止。 ### `distinctUntilChanged` `distinctUntilChanged`用來過濾「連續且相同」的訊息,例如連續的"Hi",那麼就只會收到第1個"Hi",之後的都不會收到。但是一旦收到的訊息改變了,再次收到以前發過的訊息,只要它沒有跟前一筆相同,那麼就還是會接收。例: ```kotlin val subscriptions = CompositeDisposable() subscriptions.add( Observable.just(5, 5, 3, 3, 1) .distinctUntilChanged() .subscribe { println(it) } ) // Output 5 3 1 ``` `distinctUntilChanged`預設用class的`equal()` method來判斷,當然我們也可以給它一個lambda來當客製我們的條件,被lambda判斷為`true`的話,該訊息就會被忽略: ```kotlin val subscriptions = CompositeDisposable() subscriptions.add( Observable.just("ABC", "CCD", "FAG") .distinctUntilChanged { first, second -> first[second.length - 1] == second[0] } .subscribe { println("Get $it") } ) ``` 上例中,我們希望「第2個字串的開頭字母要是跟第1個字串的結尾字母一樣的話,那麼就不要顯示」。第1筆"ABC"一定會接收,第2筆"CCD"則會被忽略,第3筆"FAG"會被接收。 ## 2. Transforming Operators ### 1. toList `toList`可以把每一個單獨從Observalbe發出來的元素變成一個list,如: ```kotlin val subscriptions = CompositeDisposable() val items = Observable.just("A", "B", "C") subscriptions.add( items.toList() .subscribeBy { println(it) } ) ``` 原本會單獨發出的"A", "B", "C",現在變成只翠發出一個List,其內容是`["A", "B", "C"]`。 ### 2. map `map`跟Kotlin的`map`行為上差不多,只是Kotlin的`map`是作用在List上,而RxJava的`map`是作用在Observable上。`map`根據你提供的lambda函式來對每一個element做轉換,如下例,將每一個羅馬數字轉換為阿拉伯數字: ```kotlin val subscriptions = CompositeDisposable() subscriptions.add( Observable.just("M", "C", "V", "I") .map { it.romanNumeralIntValue() } .subscribeBy { println(it) } ) ``` 注意到了嗎?`map`用來轉換Observable所包含的item型別,上例中,Observable的item本來是一個字串(`String`),被`map`轉換為數字(`Int`)。 ### 3. flatMap `flatMap`用來處理「Observable發出來的Observable」,並且「記錄每一個變化」。用例子比較好說明: ```kotlin class Student(val score: BehaviorSubject) val subscriptions = CompositeDisposable() val ryan = Student(BehaviorSubject.createDefault(80)) val charlotte = Student(BehaviorSubject.createDefault(90)) val student = PublishSubject.create() student .flatMap { it.score } .subscribeBy { println(it) } .addTo(subscriptions) student.onNext(ryan) <-- 1 ryan.score.onNext(85) <-- 2 student.onNext(charlotte) <-- 3 ryan.score.onNext(95) <-- 4 charlotte.score.onNext(100) <-- 5 ``` 我們有一個叫做student的`PublishSubject`,另外有兩個Student class(分別是ryan與charlotte),而這個Student class有一個member叫做score,score的類別是`BehaviorSubject`。我們的`flatMap` lambda不做任何轉換,直接bypass分數。 1. student先選擇ryan來發出第一個onNext event,ryan原本的分數是80,所以我們會收到80。 2. ryan變更為85,所以我們會收到85。 3. student選擇了charlotte並發出一個onNext event,charlotte原本的分數是90,所以我們會收到90。 4. ryan變更為95,所以我們會收到95。 5. charlotte變更為100,所以我們會收到100。 結果會收到80、85、90、95、100。 ryan跟charlotte都是獨立的Observable,但透過`flatMap`我們可以把它們的值(以及後續的變化)變成一連串的數值,這就是`flat`的意思。 ![[rxJava_flatMap.png]] ### 4. switchMap `switchMap`跟`flatMap`類似,也是處理「Observable發出來的Observable」,但是差別在於`switchMap`一但切換到新的Observable之後,上一個Observale的訊息就部會收到了,以`flatMap`的例子來說,在`student.onNext(charlotte)`這一行之後,ryan的改變就不會收到了。例: ```kotlin val ryan = Student(BehaviorSubject.createDefault(80)) val charlotte = Student(BehaviorSubject.createDefault(90)) val student = PublishSubject.create() student .switchMap { it.score } .subscribe { println(it) } student.onNext(ryan) ryan.score.onNext(85) student.onNext(charlotte) ryan.score.onNext(95) charlotte.score.onNext(100) ``` 結果會收到80、85、95、100。`ryan.score.onNext(95)`這一行的95不會收到。 `switchMap`適合用在會「改變興趣」的場景,例如說原本是要持續收到台北氣溫的改變,接著使用者把地點改到高雄,那我們就會變成持續收到高雄的溫度變化而不是台北的,又或者說,你會隨著使用的的輸入持續的搜尋結果,例如使用者依序輸入k、o、t、l、i、n,每輸入一個字母我們就搜尋一次,但我們只關注最後一個字串搜尋,不在意之前的搜尋結果。 ### 5. materialize `materialize`能將Observable的值包裝成一個`Notification`,回到[[20200207 - Study RxKotlin#4 switchMap]]的例子,如果任何一個學生發出了`onError`的訊息,那麼連`student`本身都會因為這個Exception而中斷,所以即使切到了charlotte,我們也收不到charlotte的訊息了。`materialize`可以將`onError`包裝成一個`Notification`,讓exception留在ryan本身而不會影響到上面的student。 ```kotlin val subscriptions = CompositeDisposable() val ryan = Student(BehaviorSubject.createDefault(80)) val charlotte = Student(BehaviorSubject.createDefault(90)) val student = BehaviorSubject.createDefault(ryan) // 1 val studentScore = student.switchMap { it.score.materialize() } <-- HERE! // 2 subscriptions.add( studentScore .subscribe { println(it) } ) // 3 ryan.score.onNext(85) ryan.score.onError(RuntimeException("Error!")) ryan.score.onNext(90) // 4 student.onNext(charlotte) ``` ![[rxkotlin_materialize.png]] ### 6. dematerialize `dematerialize`用來反解`materialize`所包裝的東西,例如上例中,會將`materialize`所包裝出來的`Observable!>!`反解為`Observable!`,例: ```kotlin subscriptions.add( studentScore .filter { if (it.error != null) { println("Got error: ${it.error}") false } else { true } } .dematerialize { it } .subscribe { println(it) } ) ``` 要注意的是如果發生Exception的話,直接`println`還是會產生Exception,所以需要用`filter`來把error給濾掉。 ![[rxkotlin_dematerialize.png]] 在Subject所發出的element仍然是Subject的時候,如果element發生error(Exception),會導致上層的Subject也跟著停止,`materialize`/`dematerialize`可以用來包裝element,讓element所發出的東西都變成`Notification`,這樣就部會影響上層的Subject了。 ## 3. Combining Operators ### startWith 用來在Observable本身所帶的item前面再加上其他item。實際的有`startWithIterable()`與`startWithItem()`。 例: ```kotlin val subscriptions = CompositeDisposable() val numbers = Observable.just(3, 4, 5) val startWith = numbers.startWithIterable(listOf(1, 2)) startWith.subscribe { println(it) }.addTo(subscriptions) ``` ### concat `concat`是一個static method,用來合併2個Observable。`concat`會先等第一個Observable結束,然後再等待第二個,之後把它們合併起來。 ```kotlin val subscriptions = CompositeDisposable() val first = Observable.just(1, 2, 3) val second = Observable.just(3, 4, 5) Observable.concat(first, second) .subscribe { println(it) } .addTo(subscriptions) ``` ### concatWith 跟[[20200207 - Study RxKotlin#concat]]一樣,但是`concatWith`是一個member function,而不是一個static method。`concatWith`一樣會先等自己結束,然後再等第二個Obervable(當參數的那一個)結束,之後再合併。 ```kotlin val subscriptions = CompositeDisposable() val first = Observable.just(1, 2, 3) val second = Observable.just(3, 4, 5) first.concatWith(second) .subscribe { println(it) } .addTo(subscriptions) ``` > ## 注意 > 要被合併的兩個Observable類型必須要一樣,不可以一個是`Obsrvable`而另一個是`Observable`,compiler會報錯喔。 ### concatMap `concatMap`接受一個lambda函示,並回傳另一個Observable序列,`concatMap`會保證Observable的順序。 Given multiple Observable, and map each Observable to a lambda function. And make sure the sequence of given Observable list. ### merge `merge`是一個static function。 `merge`會按照接收的順序把element合併起來,例: ```kotlin val subscriptions = CompositeDisposable() val odd = PublishSubject.create() val even = PublishSubject.create() Observable.merge(odd, even) .subscribe { println(it) } .addTo(subscriptions) odd.onNext(1) even.onNext(2) odd.onNext(3) even.onNext(4) odd.onNext(5) even.onNext(6) ``` odd與even兩個交互發出elemet,merge依順序接收,而不是像[[20200207 - Study RxKotlin#concat]]是依照Observable的順序。 `merge` complete的時機點定義如下: - 當來源的Observable與內部的Observable都complete的時候,merge本身也會發出complete。 - 內部Observable結束的順序跟接收的順序沒有關係。(一律按照接收點) - 如果有任何Observable發生error,`merge`會轉發這個error,然後自己發生terminate。 Q: - What's the different between `flatMap()`? -> 很大的差別,`flatMap()` 有map的功能。 ### mergeWith 就像`concatWith()`與`concat()`的關係,`mergeWith()`跟`merge`也是一樣的關係。 `mergeWith()`是一個member function,必須由某個Observable instance來呼叫。 ### combineLatest combineLatest是`Observables`的static funtion(注意不是`Observable`)。 combineLatest只會接收2個Observalbe的「最後一個」elements,然後交由你所提供的lambda來處置,例: ```kotlin val subscriptions = CompositeDisposable() val left = PublishSubject.create() val right = PublishSubject.create() Observables.combineLatest(left, right) { leftString, rightString -> "$leftString, $rightString" }.subscribe { println(it) }.addTo(subscriptions) left.onNext("Hello") right.onNext("World") left.onNext("It's nice to") right.onNext("be here") left.onNext("Actually, it's super great to") ``` 重點: 1. 在上例中是直接結合2個字串,但是其實可以是任何用途。 2. 在實務中,`combineLatest`可以用來結合2個不同型別的Observable,然後再回傳另一個不同型別的Observable。`combineLatest`回傳的Observable型別由lambda決定。 3. `combineLatest`必須在「每一個」Observable都發出element之後才會動作。如果不確定Observable是否會發出element,可以使用[[20200207 - Study RxKotlin#startWith]]來讓Observable有一個初始值,這樣可以避免`combineLatest`永遠不會發生的情況。 4. 如果有某個Observable已經complete,`combineLatest`會保留它的最後一個element,然後繼續結合更新的element。 5. 直到最後一個Observable complete,`combineLatest`才會complete。 ![[combineLatest.png]] ### zip - `zip` wait until each if the inner Ovservables emits a new value. ### Triggers #### withLastestFrom - `withLatestFrom` is useful in all situations where you want the current(latest) value emittted frim an Observable, but only when a particular trigger occurs. #### sample - Just like `withLastFrom`. But each time the trygger Ivsercable emits a value, `sample` emits the latest value from the "other" Obervable, but only if it arrived since the last "tick". You can combine `withLastFrom` and `distinctUntilChanged` to do the same behavior of `sample`. ``` exampleOf("sample") { val subscriptions = CompositeDisposable() val button = PublishSubject.create() val editText = PublishSubject.create() editText.sample(button) .subscribe { println(it) }.addTo(subscriptions) editText.onNext("Par") editText.onNext("Pari") editText.onNext("Paris") button.onNext(Unit) button.onNext(Unit) <- button emits twice, but editText only emit last value } // Output --- Example of: sample --- Paris ``` ### Switchs - ambWith - Think of `amb` as in ambiguous. - `ambWith` connect to two Observables. And wait any of them who emit element first. If any Observable emit element, another one will be unsubscribed. ### reduce - `reduce` accumulates a summary value. ### scan - Like `reduce`, but emit per input value. ## 4. Time-Based Operators ### Buffering #### replay - This operator creates a new sequence that records the last N elements emitted by the source Observable. #### replayAll #### window - Difference is that it emits an Observable of the buffered items, instead of emitting an array. ### Time-Shifting #### delaySubscription - Delay the time a subscriber starts receiving elements from its subscription. #### delay - This operator subscribes immediateley to the source observable, but delays every emitted element by the specified amount of time. ### Timer #### Observable.interval - Produce an infinite Observable sequence of Int values. #### Observable.timer - Specify a "due time" as the time that elapsed between the point of subscription and the first emitted value. - If the "repeat period" is not assigned, the timer Observable will emit once, the complete. #### timeout - Emit an TimeoutException error event. If not caught, it terminates the sequence. ## 5. Explore Operators # 參考資料: - [RxMarbles: Interactive diagrams of Rx Observables](https://rxmarbles.com/#delayWhen)