Files
Obsidian-Main/03. 專注Study/RxKotlin/20200207 - Study RxKotlin.md
Awin Huang 5e4265b783 vault backup: 2022-09-30 21:53:03
Affected files:
.obsidian/workspace
03. 專注Study/Android/ADB 取得 APK 的 icon.md
03. 專注Study/Android/ADB.md
03. 專注Study/Android/AOSP.md
03. 專注Study/Android/Android programming.md
03. 專注Study/Android/AudioTrack.md
03. 專注Study/Android/Ktor.md
03. 專注Study/Android/Service.md
03. 專注Study/Android/Tools.md
03. 專注Study/Android/UI.md
03. 專注Study/C++/C++17.md
03. 專注Study/C++/Class template.md
03. 專注Study/C++/GCC.md
03. 專注Study/C++/Structured binding declaration.md
03. 專注Study/C++/for_each.md
03. 專注Study/C++/lambda.md
03. 專注Study/C++/lvalue.md
03. 專注Study/C++/move operator.md
03. 專注Study/C++/rvalue.md
03. 專注Study/C++/智慧指標.md
03. 專注Study/RxKotlin/20200207 - Study RxKotlin.md
04. Programming/COM/20210726 - COM Interface.md
04. Programming/DB/MySQL.md
04. Programming/DB/sqlite.md
04. Programming/Design Pattern.md
04. Programming/FFMPEG/00. Introduction.md
04. Programming/FFMPEG/01. Setup.md
04. Programming/FFMPEG/FFMpeg.md
04. Programming/Flask.md
04. Programming/Kotlin/class.md
04. Programming/Kotlin/run, let, with, also 和 apply.md
04. Programming/Media Foundation/20210604 - Windows media foundation.md
04. Programming/OpenCV.md
04. Programming/OpenGL.md
04. Programming/Python/argparse.ArgumentParser.md
04. Programming/Python/decorator.md
04. Programming/Python/logging.md
04. Programming/Python/opencv.md
04. Programming/Python/subprocess.md
04. Programming/Python/threading.md
04. Programming/Python/tkinter.md
04. Programming/Python/檢測工具.md
04. Programming/QT/Dropdown button.md
04. Programming/QT/QVariant.md
04. Programming/QT/Qt.md
04. Programming/UML.md
04. Programming/演算法.md
05. 資料收集/99. templates/blogHeader.md
05. 資料收集/99. templates/date.md
05. 資料收集/99. templates/front matter.md
05. 資料收集/99. templates/note.md
05. 資料收集/99. templates/table.md
05. 資料收集/99. templates/thisWeek.md
05. 資料收集/99. templates/日記.md
05. 資料收集/99. templates/讀書筆記.md
05. 資料收集/Linux/CLI/cut.md
05. 資料收集/Linux/CLI/scp.md
05. 資料收集/Linux/CLI/timedatectl.md
05. 資料收集/Linux/Programming.md
05. 資料收集/Linux/Ubuntu.md
05. 資料收集/Tool Setup/Hardware/RaspberryPi.md
05. 資料收集/Tool Setup/Software/Chrome.md
05. 資料收集/Tool Setup/Software/Obisidian.md
05. 資料收集/Tool Setup/Software/SublimeText.md
05. 資料收集/Tool Setup/Software/VirtualBox.md
05. 資料收集/Tool Setup/Software/Visual Studio Code.md
05. 資料收集/Tool Setup/Software/Windows Setup.md
05. 資料收集/Tool Setup/Software/Windows Terminal.md
05. 資料收集/Tool Setup/Software/freefilesync.md
05. 資料收集/Tool Setup/Software/vim.md
05. 資料收集/名言佳句.md
05. 資料收集/架站/Gitea.md
05. 資料收集/架站/HTTP Server/Apache.md
05. 資料收集/架站/HTTP Server/Nginx/Reverse Proxy(Layer4).md
05. 資料收集/架站/Pelican blog.md
05. 資料收集/架站/Proxmox VE.md
05. 資料收集/架站/SWAG Reverse proxy.md
05. 資料收集/架站/Storj.md
05. 資料收集/架站/Trojan.md
05. 資料收集/每週外食.md
05. 資料收集/科技/802.11.md
05. 資料收集/科技/HDR Sensor.md
05. 資料收集/科技/量子電腦.md
05. 資料收集/科技/鋰電池.md
05. 資料收集/興趣嗜好/RC/Traxxas Sledge.md
05. 資料收集/興趣嗜好/RC/好盈電變調整中立點.md
05. 資料收集/興趣嗜好/RC/差速器調教教學.md
05. 資料收集/興趣嗜好/模型/舊化作例.md
05. 資料收集/興趣嗜好/軍武/虎式.md
05. 資料收集/讀書筆記/20201201 - 學習如何學習.md
05. 資料收集/讀書筆記/20201218 - Kotlin權威2.0.md
05. 資料收集/讀書筆記/20201224 - 寫作是最好的自我投資.md
05. 資料收集/讀書筆記/20210119 - 中產悲歌.md
05. 資料收集/讀書筆記/20210220 - 最高學習法.md
05. 資料收集/讀書筆記/20210320 - 最高學以致用法.md
05. 資料收集/讀書筆記/20210406 - 精準購買.md
05. 資料收集/讀書筆記/20210723 - 高手學習.md
05. 資料收集/讀書筆記/20220526 - 深入淺出設計模式.md
05. 資料收集/讀書筆記/20220619 - 精確的力量.md
05. 資料收集/軟體工具/IPFS.md
05. 資料收集/軟體工具/MkDocs.md
05. 資料收集/軟體工具/Obsidian.md
05. 資料收集/軟體工具/docker.md
05. 資料收集/軟體工具/git/apply.md
05. 資料收集/軟體工具/git/submodule.md
05. 資料收集/軟體工具/youtube-dl.md
05. 資料收集/面試準備/技术面试最后反问面试官的话.md
2022-09-30 21:53:04 +08:00

37 KiB
Raw Blame History

剛開始

先說說Rx

Rx最早是Microsoft的某個實驗室為了解決asychronous、scalable還有一些app問題而提出來的libraray。大概在2009年的時候提出叫做Reactive Extension for .NET(Rx). 一開始是以add-on的方式安裝在.NET 3.5上,到了.NET 4.0就變成了內建的library。也因為它open source的關係讓其他語言得以將這套概念也移植過去所以現在有RxJS, RxSwift, RxNET, RxScale, RxJava。這些library都致力於在它們的語言來實作出相同的「行為」所以理論上iOS工程師可以和Web工程師用Rx來討論app的邏輯是沒有問題的。 Rx的官網http://reactivex.io/它的logo是一支電鰻Electric eel !Rx_Logo_S.png

Rx Community

什麼是RxJava

RxJava is a library for composing asynchronous and event-based code using observable sequences and functional style operations, allowing for parameterized execution via schedulers.

RxJava, in its essence, simplifies developing asynchronous programs by allowing your code to react to new data and process it in a sequential, isolated manner. In other words, RxJava lets you observe sequence of asychronous events in an app and respond to each event accordingly. Examples are taps by a user on the screen and listening for results if asynchronous network calls.

再說RxJava

RxJava是一個實作Rx的framework。 RxJava與其他的Rx library提供了asynchronous與event-based的解決辦法

而Asychronous code跟Sychronous code的差異 Sychronous code按照字面上的意思執行每一次的結果都相同。 Asychronous code則是在必要的時候才被使用每一次執行的「狀態」不盡相同。也就是沒辦法控制其順序與時間。

Asychronous programming的詞彙

1. State

  • State指的是我們程式所儲存的資料與程式自身行為互動所產生的狀態。

2. Imperative programming

  • Imperative programming指令式程式設計是用一連串的命令或描述來改變程式的狀態。如下面的code
    setupUI()
    bindClickListeners()
    createAdapter()
    listenForChanges()
    
    這些code可能有一些相關的邏輯但是字面上看不出來即使互相調換可能會造成錯誤但也可能不會。

3. Side effect

  • Side effect指的是「一段程式修改了它本身區域外的狀態」譬如說一個處理event的function它除了處理event之外也改變的UI上所顯示的文字。
  • Side effect並不總是不好的我們的程式就是要對某些東西做出改變完全無法改變任何東西的程式是沒有用的。

RxJava試著用接下來的2個概念來解決剛剛提到的3個概念上的問題

4. Declarative code

  • 又叫Fucntional programmingFucntional programming不產生任何side effect。
  • Declarative code定義的是行為。
  • RxJava試著在Declarative code和Imperative programming取一個平衡點它定義行為然後依順序執行。

5. Reactive systems

Reactive systems通常有幾個特性

  • Reponseive保持UI在最新狀態
  • Resilient每個行為都是獨立定義的而且有辦法靈活的處理錯誤。
  • Elastic程式的十座可以處理不同的工作量
  • Message driven每個元件使用Message driven訊息驅動的方式來互相溝通並改進可用性與獨立性解開decouple生命週期與實作的關聯。

Rx的三大組成

1. Observables

Observable<T>是Rx的基礎之一Observable允許觀察者觀察它並接收它發出來的資料。

Observables 的基礎event

Observable會以3種事件event來發出資料

  1. nextnext event會伴隨著一筆資料這也是觀察者用來接收資料的event。
  2. completecomplete event表示Observable已經「成功的」結束了它的生命週期complete event之後觀察者不會再收到任何next event。
  3. errorerror event表示Observable在發生錯誤的情況下結束它的生命週期。跟complete event依樣後續不會再有任何next event。

一個Observable用next所發出來的一連串資料我們稱為"sequence"。sequence可以分為兩種

  1. Finite sequnece 想像你要下載一個檔案我們的code大概是這個樣子

    API.download(file = "http://www...")
        .subscribeBy(
            onNext = { 
                // Handle downloading here
            },
            onComplete = {
                // Handle download finish here
            },
            onError = {
                // Handle error here
            },
        )
    

    API.download()會產生一個Obervable然後我們藉由subscribeBy來訂閱他,並加入我們的處理程序,我們在onNext裡面處理接收到的檔案bufferonComplete裡面了解到檔案已經完成下載,可以做一些後續的處理,onError則是發生了某些錯誤,需要重來或是通知使用者之類。

  2. Infinite sequence Switch button就是一個例子我們要處理switch button的code會是這樣

    switch.checkdChanges()
        .subscribeBy(
            onNext = { isOn ->
                if (isOn) {
                    // Handle on here
                } else {
                    // Handle off here
                }
            }
        )
    

    可以看到這一段subscribeBy()裡面並沒有onCompleteonError因為switch button根本就不會產生這兩種event。

2. Operators

Operators用來處理Observable所發出來的資料可能是過濾或者做一些轉換或其他操作。再以switch button做例子下面的code可以把switch button的狀態做幾個改變

  1. 我們只想收到on的狀態。
  2. 把on的狀態轉為一個字串"We've been toggled on!"。
switch.checkdChanges()
    .filter { it == true }
    .map { "We've been toggled on!" }
    .subscribeBy(
        onNext = { message ->
            updateTextView(message)
        }
    )

3. Schedulers

Scheduler可以想像成threadRxJava已經內建了好幾種scheduler而且應該可以適用於大部分的情形。 例如IO scheduler可以讓你的檔案下載在背景執行TeampolineScheduler可以讓你的程式同時執行, ComputationScheduler可以讓你將程式分配給不同的thread來處理需要大量運算的資料。

RxJava是一個很獨立的library所以有2個library可以跟RxJava一起合作

  1. RxAndroid提供Android Looper class跟RxJava的scheduler之間的橋接管道。
  2. RxBinding用來把UI的click listen之類的callback轉變為Observable的subscribeBy

安裝

build.gradle裡的depedencies區域加入:

implementation "io.reactivex.rxjava3:rxjava:3.0.2"
implementation "io.reactivex.rxjava3:rxkotlin:3.0.0"
implementation "io.reactivex.rxjava3:rxandroid:3.0.0"

Observable

Standard Observable has three types of event:

  1. next
  2. complete
  3. error

Obervable很適合用marble diagram來表示 !Pasted image 20210120150947.png

3個event解釋如下

  1. onNext()onNext() event會伴隨著一筆資料這也是觀察者用來接收資料的event。
  2. onComplete()onComplete() event表示Observable已經「成功的」結束了它的生命週期onComplete() event之後觀察者不會再收到任何onNext() event。
  3. onError()onError() event表示Observable在發生錯誤的情況下結束它的生命週期。跟onComplete() event依樣後續不會再有任何onNext() event。

另外要注意一個Observable在沒有被訂閱的情況下是不會發送任何event的」。

A example of usage of standard Observable:

API.download("http://...")
    .subscribeBy(
        onNext = { /* do something */ }, 
        onComplete = { /* do something */ }, 
        onError = { /* do something */ }, 
    )

建立Observable的方法

1. just

val observable = Observable.just(1, 2, 3)

變數observable的內容設為1個"1、2、3"三個數,型別會是Observable<Int!>!。 如果使用了onNext來發送event的話將會依序發送1、2、3。 但如果是:

val observable = Observable.just(listOf(1, 2, 3))

變數observable的內容會是一個list這個list的內容是"1、2、3"。型別是Observable<List<Int>!>!。 如果使用了onNext來發送event的話將發送一個包含1、2、3的list。

2. fromIterable

用來將list的內容轉變為一個一個單獨的element給Observable。

val observable = Observable.fromIterable(listOf(2, 3, 1))

變數observable的型別會是Observable<Int!>!,而不是Observable<List<Int>!>!

3. empty

建立一個「空的」Observable可以用來表示一個馬上就會結束的事情或是不包含任何東西的情況。

val observable = Observable.empty<Unit>()  
observable.subscribeBy(  
    onNext = { println(it) },  
    onComplete = { println("Completed") }  
)

empty()所建立的observable只會發出onComplete() event所以上面的onNext() event永遠不會發生。 還有Observable所包含的element一定要有一個型別而且不可以是null所以上面的empty()必須明白的寫出Unit型別:empty<Unit>()

4. never

建立一個不會發出任何event的observable。

5. range

產生一個範圍的數列,參數型別必須是整數(Int)。

val observable = Observable.range(1, 10)

上例的onNext()會依序發送1~10的數字出來。

6. create

用來定義自己的event發送方法。 範例:

val observable = Observable.create<String> { emitter ->  
    emitter.onNext("A")  
    emitter.onNext("C")  
    emitter.onNext("B")  
    emitter.onComplete()  
}  
val subscription = observable.subscribeBy(  
    onNext     = { println("Received: $it") },  
    onComplete = { println("Completed") },  
    onError    = { println("Completed") }  
)

create必須帶入要發送的型別,例如IntString或是任何class此例中是create<String>表示會送出的element是String型別。 然後create則是發送的實作,範例是會發送"A" -> "C" -> "B",然後用onComplete來結束。 注意要是observable沒有onComplete或是onError,然後Disposable(也就是訂閱者)也沒有呼叫dispose()則會造成memory leak。

7. defer

defer會建立一個Observable factory每一次呼叫這個factory都會產生一個新的Observable。defer只有一個參數就是我們要「製造」Observable的方法

var flip = false  

val factory: Observable<Int> = Observable.defer {  
 flip = !flip  
 if (flip) {  
        Observable.just(1, 2, 3)  
    } else {  
        Observable.just(4, 5, 6)  
    }  
}

defer後面的lambda就是我們要「製造」Observable的方法。當fliptrue的時候,我們產生Observable.just(1, 2, 3),反之則產生Observable.just(4, 5, 6)。Observable裡面所帶的element都是整數這也是為什麼factory的型別是Observable<Int>。 接下來訂閱這個factory

for (i in 0..3) {  
    val subscription = factory.subscribe {  
        println("Factory out: $it")  
    }  
    disposables.add(subscription)  
}
disposables.dispose()

上面的例子產生了4個Observable。依照flip的值來產生不一樣內容的Observable。

How to subscrible a Observable

訂閱Observable的方法

  1. observable.subscrible()
  2. observable.subscribleBy()

Remember to release the resource. Call disposable() if you don't need Observable anymore. Or use CompositeDisposable() to collect all Disposable and release them.

特殊的Observable

1. Single

Single只有onSuccessonError兩種event。在發出onSuccess或是onError之後,Single就結束了。 譬如說讀取檔案,只會有讀取成功跟讀取失敗兩種情況,下面的範例讀取一個檔案,要是檔案不存在就發送onError(),反之就發送onSuccess()

val subscriptions = CompositeDisposable()  

fun loadText(filename: String): Single<String> {  
    return Single.create create@{ emitter ->  
        val file = File(filename)  

        if (!file.exists()) {  
            emitter.onError(FileNotFoundException("Can't find $filename"))  
            return@create  
        }  

        val contents = file.readText(Charsets.UTF_8)  
        emitter.onSuccess(contents)  
    }  
}  

// Use the single observable  
val subscription = loadText("Copyright.txt")  
    .subscribeBy(  
        onSuccess = { println("Success read: $it") },  
        onError = { println("Error: $it") }  
 )  
subscriptions.add(subscription)

loadText()這個function會返回Single<String>物件,要是讀取檔案成功,就把檔案內容用onSuccess()發送出來:

val contents = file.readText(Charsets.UTF_8)  
emitter.onSuccess(contents)  

要是檔案不存在,就發出onError()

emitter.onError(FileNotFoundException("Can't find $filename")) 

2. Completable

Completable只有onCompletedonError兩種event。跟Single一樣,在發出onCompleted或是onError之後,Completable就結束了。

3. Maybe

MaybeSingleCompletable的混合,他有onSuccess(value)onCompletedonError三種event。Maybe只會發出這三種的其中一種event然後就結束了。

停止訂閱或是結束一個Observable

使用Disposable.dispose()

每一次呼叫observable.subscrible()或是observable.subscribleBy()都會回傳一個Disposable物件當我們不再需要訂閱一個Observable的時候我們必須呼叫dispose()停止訂閱:

val alphaSequnce = Observable.just("A", "B", "C")
val subscription = alphaSequece.subscribe {
    println(it)
}

subscription.dispose()

使用CompositeDisposable.dispose()

對每一個Disposable物件在停止訂閱之後都要呼叫一次dispose()是很煩人的RxJava提供了一個CompositeDisposable class。它可以收納所有的Disposable物件,然後一次停止:

val subscriptions = CompositeDisposable()  
val subscriptionNumbers = Observable.just(1, 2, 3).subscribe {  
    println("Numbers: $it")  
}  
val subscriptionAlphabets = Observable.just("A", "B", "C").subscribe {  
    println("Alphabets: $it")  
}  
  
subscriptions.add(subscriptionNumbers)  
subscriptions.add(subscriptionAlphabets)  
subscriptions.dispose() <-- subscriptionNumbers  subscriptionAlphabets 都會一起呼叫dispose()

忘記呼叫dispose()可能會造成memory leak。

Subjects

Observable必須在建立的時候就指定好資料之後沒辦法再新增資料。而Subject可以在建立資料之後再新增資料Subject也會將新增的資料再馬上轉發給它的訂閱者。

1. PublishSubject

PublishSubject剛開始是沒有任何資料的,它也只會將最新的資料發送給它的訂閱者。另外,要是PublishSubject本身結束了(已經送出了onComplete event那麼新的訂閱者將不會收到任何資料但是會收到onComplete event。

val publishSubject = PublishSubject.create<Int>()  

publishSubject.onNext(0)  

val subscriptionOne = publishSubject.subscribe {  
    println(it)  
}  

publishSubject.onNext(1)  
publishSubject.onNext(2)  

val subscriptionTwo = publishSubject.subscribe {  
    println("2: $it")  
}  

publishSubject.onNext(3)  
subscriptionOne.dispose()  
publishSubject.onNext(4)  
publishSubject.onComplete()  
publishSubject.onNext(5)  
subscriptionTwo.dispose()  

val subscriptionThree = publishSubject.subscribeBy(  
    onNext = { println("3: $it") },  
    onComplete = { println("3: Completed") }  
)

上例中的subscriptionThree只會收到onComplete event也就是只會印出"3: Completed"

2. BehaviorSubject

行為跟PublishSubject類似,但是BehaviorSubject會發送「最後一筆資料」給新的訂閱者。如果BehaviorSubject最後的event是onError,那麼新的訂閱者也會收到onError event。例

val subscriptions = CompositeDisposable()  
val behaviorSubject = BehaviorSubject.createDefault("Initial value")  

behaviorSubject.onNext("X")  

val subscriptionOne = behaviorSubject.subscribeBy(  
    onNext = { println("1: $it") },  
    onError = { println("1: ERROR, $it") }  
)  

behaviorSubject.onError(RuntimeException("Error!"))  
behaviorSubject.onNext("Y") // <-- 不會有人收到因為已經被onError給terminate了  

subscriptions.add(behaviorSubject.subscribeBy(  
    onNext = { println("2: $it") },  
    onError = { println("2: $it") }  
))

另外,可以直接取得BehaviorSubject目前的值,以上例來說,只要用behaviorSubject.value就可以這方法可以很方便的在Rx與非Rx的程式中交換資料。 例子中是用static method BehaviorSubject.createDefault()來建立一個有初始值的BehaviorSubject,當然也可以跟PublishSubject一樣,用BehaviorSubject.create()來建立。

3. ReplaySubject:

BehaviorSubject會發送會後一筆資料,ReplaySubject就是發送最後n筆資料。我們可以用ReplaySubject.createWithSize()這個static method來建立一個ReplaySubject。例:

val replaySubject = ReplaySubject.createWithSize<String>(2)

變數replaySubject的buffer容量是2型別是String

4. AsyncSubject

AsyncSubject的行為比較特別,AsyncSubject只會結束的時候,同時發出最後一筆資料。也就是說,即便一直提供資料給AsyncSubject,它也不會發出任何onNext event給它的訂閱者直到它收到onComplete的時候,它才會同時發出最後一筆onNextonComplete給它的訂閱者。

5. RxRelay

RxRelay永遠不會發出onComplete或是onError。下面例子建立了一個PublishRelay

val publishRelay = PublishRelay.create<Int>()

要使用RxRelay library必須在build.gradle裡面加入

implementation "com.jakewharton.rxrelay3:rxrelay:3.0.0"

Operators

1. Filtering Operators

ignoreElement

ignoreElement()會忽略掉由20200207 - Study RxKotlin#Subjects丟出來的next event訂閱者只會收到onCompletedonError這兩種event也就是讓Subject退化成20200207 - Study RxKotlin#2 Completable。 例:

val subscriptions = CompositeDisposable()  
val strikes = PublishSubject.create<String>()  
  
subscriptions.add(  
    strikes.ignoreElements()
        .subscribeBy {
            println("Done")  <-- 只會收到onComplete跟onError
        }  
)

elementAt

elementAt()只會處理「第n個」next eventn之前跟n之後的都會被忽略。例如

val subscriptions = CompositeDisposable()  
val strikes = PublishSubject.create<String>()  
  
subscriptions.add(  
    strikes.elementAt(2)  <-- 只要收第2個
        .subscribeBy {  
            println("Get $it")
        }  
)  
  
strikes.onNext("A")  <-- 第0個
strikes.onNext("B")  <-- 第1個
strikes.onNext("C")  <-- 第2個
strikes.onNext("D")  <-- 第3個

上面例子只要收「第2個next event」所以只會收到"C"。這也是一個onSuccess event。 elementAt()也等於是把Subject退化成20200207 - Study RxKotlin#3 Maybe。 要是Subject的「第n個」還沒收到就結束了那就是收到onComplete event。

filter

filter()接收一個lambda函數每一個next event所帶的element都必須經過這個函數的「驗證」只有驗證結果為true的時候才會pass給訂閱者。例

val subscriptions = CompositeDisposable()  
  
subscriptions.add(  
    Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))  
        .filter {  
            it > 5                        <-- 數值必須大於5才可以pass給訂閱者
        }  
        .subscribe {  
            print("Get number: $it\n")  
        }  
)

所以上面的結果會收到6、7、8、9、10。

skip

忽略「前n個」next event。

Observer.just(1, 2, 3, 4 ,5)
    .skip(3)          <-- 忽略前3個
    .subscribe {
        println("it")
    }
// Output 4, 5

skipWhile

skipWhile也是用一個lambda來當作通過條件20200207 - Study RxKotlin#filter類似。但不像filter會去檢查「每一個」進來的elementskipWhile是「當lambda檢查到第一個false的時候,後面全部通過」。 另一個跟filter不同的是,skipWhile是在檢查為true把next event忽略掉檢查到false的時候開始放行。 例如,我們要收集字串,但我們要當字串是"start"的時候才開始收集字串,例:

val subscriptions \= CompositeDisposable()  
  
subscriptions.add(  
    Observable.just("1", "2", "3", "Start", "1", "3", "2")  
        .skipWhile { it != "Start" }  <-- 比對為false開始放行
        .subscribe {  
            println("Get $it")  
        }  
)

// Output
Get Start
Get 1
Get 3
Get 2

skipUntil

與前面的skip operator不同skipUntil不是用lambda來決定skip的條件而是依賴於「另一個subject」skipUntil會一直忽略直到「另一個subject」發出onNext event。例

val subject = PublishSubject.create<String>()
val trigger = PublishSubject.create<String>()

subject
    .skipUntil(trigger)
    .subscribe {
        println("it")
    }

In this code you'll get nothing, until trigger sent an onNext() event. Example:

subject.onNext("A") // Ignored
subject.onNext("B") // Ignored
trigger.onNext("1") // TRIGGER!
subject.onNext("C") // send out

!Pasted image 20210202155003.png

take

take20200207 - Study RxKotlin#skip相反,take是接收「前n個」訊息之後全部忽略。

takeWhile

takeWhile是用lambda當判斷條件當判斷為true的時候放行,一旦判斷為false,之後的所有訊息都會被忽略。

exampleOf("takeWhile") {  
 val subscriptions = CompositeDisposable()  
  
    subscriptions.add(  
        Observable.fromIterable(listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1))  
            .takeWhile {  
                it < 5  
            }
            .subscribe {  
                println("Get $it")  
            }  
    )  
}

上例中判斷到5的時候it < 5false所以之後的都會忽略即使最後那個1也是一樣被忽略。 takeWhile20200207 - Study RxKotlin#skipWhile一樣,但是行為相反。

takeUntil

20200207 - Study RxKotlin#skipUntil相反的行為會一直接收訊息直到他依賴的subject發出訊息後停止。

distinctUntilChanged

distinctUntilChanged用來過濾「連續且相同」的訊息,例如連續的"Hi"那麼就只會收到第1個"Hi",之後的都不會收到。但是一旦收到的訊息改變了,再次收到以前發過的訊息,只要它沒有跟前一筆相同,那麼就還是會接收。例:

val subscriptions = CompositeDisposable()  
  
subscriptions.add(  
    Observable.just(5, 5, 3, 3, 1)  
        .distinctUntilChanged()  
        .subscribe {
            println(it)
        }  
)

// Output
5
3
1

distinctUntilChanged預設用class的equal() method來判斷當然我們也可以給它一個lambda來當客製我們的條件被lambda判斷為true的話,該訊息就會被忽略:

val subscriptions = CompositeDisposable()  
  
subscriptions.add(  
    Observable.just("ABC", "CCD", "FAG")  
        .distinctUntilChanged { first, second ->  
            first[second.length - 1] == second[0]  
        }
        .subscribe {  
            println("Get $it")  
        }  
)

上例中我們希望「第2個字串的開頭字母要是跟第1個字串的結尾字母一樣的話那麼就不要顯示」。第1筆"ABC"一定會接收第2筆"CCD"則會被忽略第3筆"FAG"會被接收。

2. Transforming Operators

1. toList

toList可以把每一個單獨從Observalbe發出來的元素變成一個list

val subscriptions = CompositeDisposable()  
val items = Observable.just("A", "B", "C")  
  
subscriptions.add(  
    items.toList()  
        .subscribeBy {  
            println(it)  
        }  
)

原本會單獨發出的"A", "B", "C"現在變成只翠發出一個List其內容是["A", "B", "C"]

2. map

map跟Kotlin的map行為上差不多只是Kotlin的map是作用在List上而RxJava的map是作用在Observable上。map根據你提供的lambda函式來對每一個element做轉換如下例將每一個羅馬數字轉換為阿拉伯數字

val subscriptions = CompositeDisposable()  
  
subscriptions.add(  
    Observable.just("M", "C", "V", "I")  
        .map {  
            it.romanNumeralIntValue()  
        }  
        .subscribeBy {  
            println(it)  
        }  
)

注意到了嗎?map用來轉換Observable所包含的item型別上例中Observable的item本來是一個字串String),被map轉換為數字(Int)。

3. flatMap

flatMap用來處理「Observable發出來的Observable」並且「記錄每一個變化」。用例子比較好說明

class Student(val score: BehaviorSubject<Int>)

val subscriptions = CompositeDisposable()  
val ryan = Student(BehaviorSubject.createDefault(80))  
val charlotte = Student(BehaviorSubject.createDefault(90))  
val student = PublishSubject.create<Student>()  
  
student  
    .flatMap {  
        it.score
    }  
    .subscribeBy {  
        println(it)  
    }  
    .addTo(subscriptions)  
  
student.onNext(ryan)        <-- 1
ryan.score.onNext(85)       <-- 2
  
student.onNext(charlotte)   <-- 3
ryan.score.onNext(95)       <-- 4
charlotte.score.onNext(100) <-- 5

我們有一個叫做student的PublishSubject另外有兩個Student class分別是ryan與charlotte而這個Student class有一個member叫做scorescore的類別是BehaviorSubject<Int>。我們的flatMap lambda不做任何轉換直接bypass分數。

  1. student先選擇ryan來發出第一個onNext eventryan原本的分數是80所以我們會收到80。
  2. ryan變更為85所以我們會收到85。
  3. student選擇了charlotte並發出一個onNext eventcharlotte原本的分數是90所以我們會收到90。
  4. ryan變更為95所以我們會收到95。
  5. charlotte變更為100所以我們會收到100。 結果會收到80、85、90、95、100。

ryan跟charlotte都是獨立的Observable但透過flatMap我們可以把它們的值(以及後續的變化)變成一連串的數值,這就是flat的意思。 !rxJava_flatMap.png

4. switchMap

switchMapflatMap類似也是處理「Observable發出來的Observable」但是差別在於switchMap一但切換到新的Observable之後上一個Observale的訊息就部會收到了flatMap的例子來說,在student.onNext(charlotte)這一行之後ryan的改變就不會收到了。例

val ryan = Student(BehaviorSubject.createDefault(80))  
val charlotte = Student(BehaviorSubject.createDefault(90))  
val student = PublishSubject.create<Student>()  
  
student  
    .switchMap {  
        it.score  
    }  
    .subscribe {  
        println(it)  
    }  
  
student.onNext(ryan)  
ryan.score.onNext(85)  
  
student.onNext(charlotte)  
ryan.score.onNext(95)  
charlotte.score.onNext(100)

結果會收到80、85、95、100。ryan.score.onNext(95)這一行的95不會收到。

switchMap適合用在會「改變興趣」的場景例如說原本是要持續收到台北氣溫的改變接著使用者把地點改到高雄那我們就會變成持續收到高雄的溫度變化而不是台北的又或者說你會隨著使用的的輸入持續的搜尋結果例如使用者依序輸入k、o、t、l、i、n每輸入一個字母我們就搜尋一次但我們只關注最後一個字串搜尋不在意之前的搜尋結果。

5. materialize

materialize能將Observable的值包裝成一個Notification,回到20200207 - Study RxKotlin#4 switchMap的例子,如果任何一個學生發出了onError的訊息,那麼連student本身都會因為這個Exception而中斷所以即使切到了charlotte我們也收不到charlotte的訊息了。materialize可以將onError包裝成一個Notification讓exception留在ryan本身而不會影響到上面的student。

val subscriptions = CompositeDisposable()  
val ryan = Student(BehaviorSubject.createDefault(80))  
val charlotte = Student(BehaviorSubject.createDefault(90))  
val student = BehaviorSubject.createDefault<Student>(ryan)  
  
// 1  
val studentScore = student.switchMap { it.score.materialize() }  <-- HERE!
// 2  
subscriptions.add(  
    studentScore  
        .subscribe {  
            println(it)  
        }  
)  
// 3  
ryan.score.onNext(85)  
ryan.score.onError(RuntimeException("Error!"))  
ryan.score.onNext(90)  
// 4  
student.onNext(charlotte)

!rxkotlin_materialize.png

6. dematerialize

dematerialize用來反解materialize所包裝的東西,例如上例中,會將materialize所包裝出來的Observable<Notification<Int!>!>!反解為Observable<Int>!,例:

subscriptions.add(  
    studentScore  
    .filter {  
        if (it.error != null) {  
            println("Got error: ${it.error}")  
            false  
        } else {  
            true  
        }  
    }  
    .dematerialize { it }  
    .subscribe {  
        println(it)  
    }  
)

要注意的是如果發生Exception的話直接println還是會產生Exception所以需要用filter來把error給濾掉。 !rxkotlin_dematerialize.png

在Subject所發出的element仍然是Subject的時候如果element發生errorException會導致上層的Subject也跟著停止materialize/dematerialize可以用來包裝element讓element所發出的東西都變成Notification這樣就部會影響上層的Subject了。

3. Combining Operators

startWith

用來在Observable本身所帶的item前面再加上其他item。實際的有startWithIterable()startWithItem()。 例:

val subscriptions = CompositeDisposable()  
val numbers = Observable.just(3, 4, 5)  
val startWith = numbers.startWithIterable(listOf(1, 2))  
  
startWith.subscribe {  
    println(it)  
}.addTo(subscriptions)

concat

concat是一個static method用來合併2個Observable。concat會先等第一個Observable結束然後再等待第二個之後把它們合併起來。

val subscriptions = CompositeDisposable()  
val first = Observable.just(1, 2,  3)  
val second = Observable.just(3, 4, 5)  
  
Observable.concat(first, second)  
    .subscribe {  
        println(it)  
    }  
    .addTo(subscriptions)

concatWith

20200207 - Study RxKotlin#concat一樣,但是concatWith是一個member function而不是一個static method。concatWith一樣會先等自己結束然後再等第二個Obervable當參數的那一個結束之後再合併。

val subscriptions = CompositeDisposable()  
val first = Observable.just(1, 2,  3)  
val second = Observable.just(3, 4, 5)  
  
first.concatWith(second)  
    .subscribe {
        println(it)
    }
    .addTo(subscriptions)

注意

要被合併的兩個Observable類型必須要一樣不可以一個是Obsrvable<String>而另一個是Observable<Int>compiler會報錯喔。

concatMap

concatMap接受一個lambda函示並回傳另一個Observable序列concatMap會保證Observable的順序。 Given multiple Observable, and map each Observable to a lambda function. And make sure the sequence of given Observable list.

merge

merge是一個static function。 merge會按照接收的順序把element合併起來

val subscriptions = CompositeDisposable()  
val odd = PublishSubject.create<Int\>()  
val even = PublishSubject.create<Int\>()  
  
Observable.merge(odd, even)  
    .subscribe {  
        println(it)  
    }  
    .addTo(subscriptions)  
  
odd.onNext(1)  
even.onNext(2)  
odd.onNext(3)  
even.onNext(4)  
odd.onNext(5)  
even.onNext(6)

odd與even兩個交互發出elemetmerge依順序接收而不是像20200207 - Study RxKotlin#concat是依照Observable的順序。 merge complete的時機點定義如下

  • 當來源的Observable與內部的Observable都complete的時候merge本身也會發出complete。
  • 內部Observable結束的順序跟接收的順序沒有關係。一律按照接收點
  • 如果有任何Observable發生errormerge會轉發這個error然後自己發生terminate。

Q:

  • What's the different between flatMap()? -> 很大的差別,flatMap() 有map的功能。

mergeWith

就像concatWith()concat()的關係,mergeWith()merge也是一樣的關係。 mergeWith()是一個member function必須由某個Observable instance來呼叫。

combineLatest

combineLatest是Observables的static funtion注意不是Observable)。 combineLatest只會接收2個Observalbe的「最後一個」elements然後交由你所提供的lambda來處置

val subscriptions = CompositeDisposable()
val left = PublishSubject.create<String>()
val right = PublishSubject.create<String>()

Observables.combineLatest(left, right) { leftString, rightString ->
    "$leftString, $rightString"
}.subscribe {
    println(it)
}.addTo(subscriptions)

left.onNext("Hello")
right.onNext("World")
left.onNext("It's nice to")
right.onNext("be here")
left.onNext("Actually, it's super great to")

重點:

  1. 在上例中是直接結合2個字串但是其實可以是任何用途。
  2. 在實務中,combineLatest可以用來結合2個不同型別的Observable然後再回傳另一個不同型別的Observable。combineLatest回傳的Observable型別由lambda決定。
  3. combineLatest必須在「每一個」Observable都發出element之後才會動作。如果不確定Observable是否會發出element可以使用20200207 - Study RxKotlin#startWith來讓Observable有一個初始值這樣可以避免combineLatest永遠不會發生的情況。
  4. 如果有某個Observable已經completecombineLatest會保留它的最後一個element然後繼續結合更新的element。
  5. 直到最後一個Observable completecombineLatest才會complete。 !combineLatest.png

zip

  • zip wait until each if the inner Ovservables emits a new value.

Triggers

withLastestFrom

  • withLatestFrom is useful in all situations where you want the current(latest) value emittted frim an Observable, but only when a particular trigger occurs.

sample

  • Just like withLastFrom. But each time the trygger Ivsercable emits a value, sample emits the latest value from the "other" Obervable, but only if it arrived since the last "tick". You can combine withLastFrom and distinctUntilChanged to do the same behavior of sample.
exampleOf("sample") {
    val subscriptions = CompositeDisposable()
    val button = PublishSubject.create<Unit>()
    val editText = PublishSubject.create<String>()

    editText.sample(button)
        .subscribe {
            println(it)
        }.addTo(subscriptions)

    editText.onNext("Par")
    editText.onNext("Pari")
    editText.onNext("Paris")
    button.onNext(Unit) 
    button.onNext(Unit) <- button emits twice, but editText only emit last value
}

// Output
--- Example of: sample ---
Paris

Switchs

  • ambWith
    • Think of amb as in ambiguous.
    • ambWith connect to two Observables. And wait any of them who emit element first. If any Observable emit element, another one will be unsubscribed.

reduce

  • reduce accumulates a summary value.

scan

  • Like reduce, but emit per input value.

4. Time-Based Operators

Buffering

replay

  • This operator creates a new sequence that records the last N elements emitted by the source Observable.

replayAll

window

  • Difference is that it emits an Observable of the buffered items, instead of emitting an array.

Time-Shifting

delaySubscription

  • Delay the time a subscriber starts receiving elements from its subscription.

delay

  • This operator subscribes immediateley to the source observable, but delays every emitted element by the specified amount of time.

Timer

Observable.interval

  • Produce an infinite Observable sequence of Int values.

Observable.timer

  • Specify a "due time" as the time that elapsed between the point of subscription and the first emitted value.
  • If the "repeat period" is not assigned, the timer Observable will emit once, the complete.

timeout

  • Emit an TimeoutException error event. If not caught, it terminates the sequence.

5. Explore Operators

參考資料: