본문 바로가기

Rx

RxKotlin- Part 5. Hot Observable, Cold Observable

728x90

Cold Observable

Observable은 Hot, Cold로 볼 수 있습니다.

콜드 Observable은 Observable을 선언하고 just(), from...() 함수를 호출해도 Observer(옵저버)가 subscribe() 함수를 호출해 구독하지 않으면 데이터를 발행(emit)하지 않습니다.

콜드Observable은 보통 웹 요청, 데이터베이스 쿼리, 파일 읽기 등에 적합하고, URL이나 데이터를 지정하면 해당 서버 또는 데이터베이스로 부터 요청을 보내고 결과를 받아옵니다.

/** 콜드 옵저버블 */
fun main(){
    val observable : Observable<String> = listOf("item 1","item 2","item 3").toObservable()
    // toObservable() : 일반 리스트를 Observable로 변경할 수 있는확장 함수 기능
    observable.subscribe(
        { println("$it") },
        { println("Error : ${it.message}") },
        { println("Complete") }
    )  //구독
}

실행결과:

item 1
item 2
item 3
Complete

위 코드의 결과를 보면 구독하여 발행된 데이터가 출력된 것을 알 수 있습니다. 콜드 Observable은 구독 시 데이터 발행을 시작하여 subscribe() 함수 호출로 각 데이터를 동일한 순서로 푸시하는 것 입니다.

Hot Observable

핫 Observable은 구독자의 존재는 상관하지 않고 데이터를 발행하는 Observable입니다.

따라서 여러 구독자를 고려할 수 있지만, 모든 데이터를 처음부터 수신할 수 있는 것은 보장하지 않습니다.

즉, 핫 Observable은 구독한 시점부터 Observable에서 발행한 값을 받게됩니다.

핫 Observable은 마우스, 키보드, 시스템 이벤트와 센서 데이터 등이 있습니다. 만약 온도 센서의 데이터를 처리한다면, 처음부터 온도를 알 필요가 없이 최근 온도만 있으면 되니, 핫 Observable이 적합합니다.

Cold -> Hot

콜드Observable을 핫Observable 객체로 변환하려면 Subject 객체를 만들거나, ConnectableObservable 클래스를 활용하는 방법이 있습니다.

ConnectableObservable의 가장 큰 목적은 한 Observable에 여러 구독자를 연결하여 하나의 푸시에 한꺼번에 대응하는 것 입니다. 콜드와 반대되는 성향을 보여줍니다.

/** 핫 옵저버블 */
fun main(){
    val connectableObservable = listOf("item 1","item 2","item 3").toObservable().publish()
    // publish(): Hot Observable로 변환

    // operator로 Observable의 이벤트를 변환, 제어
    connectableObservable.map(String::reversed).subscribe(
        { println("1. Received $it") },
        { println("Error : ${it.message}") },
        { println("Complete") }
    )  //구독 1

    connectableObservable.subscribe(
        { println("2. Received $it") },
        { println("Error : ${it.message}") },
        { println("Complete") }
    )  //구독 2

    connectableObservable.connect()

    connectableObservable.subscribe(
        { println("3. Received $it") },
        { println("Error : ${it.message}") },
        { println("Complete") }
    )  //구독 3
}

실행결과:

1. Received 1 meti
2. Received item 1
1. Received 2 meti
2. Received item 2
1. Received 3 meti
2. Received item 3
Complete
Complete

위 실행결과를 보면 콜드 Observable의 예제와는 조금 다릅니다. ConnectableObservable은 connect() 함수 이전에 호출된 모든 Observable를 연결하여 모든 Observable에 단일 푸시를 전달하여 대응할 수 있게 합니다.

이처럼 Observable에서 단 한번의 발행으로 모든 구독자에게 발행을 할 수 있는 방식을 Muticasting이라고 합니다.

fun main() {
    val connectableObservable = Observable.interval(200, TimeUnit.MILLISECONDS)
        .publish()

    connectableObservable.subscribe(
        { println("1. Received $it") },
        { println("Error : ${it.message}") },
        { println("Complete") }
    )  //구독 1

    connectableObservable.subscribe(
        { println("2. Received $it") },
        { println("Error : ${it.message}") },
        { println("Complete") }
    )  //구독 2

    connectableObservable.connect()

    runBlocking { delay(500) }

    connectableObservable.subscribe(
        { println("3. Received $it") },
        { println("Error : ${it.message}") },
        { println("Complete") }
    )  //구독 3

    runBlocking { delay(300) }
}

실행결과:

1. Received 0
2. Received 0
1. Received 1
2. Received 1
1. Received 2
2. Received 2
3. Received 2
1. Received 3
2. Received 3
3. Received 3

interval() : 특정 시간별로 연속된 정수형을 배출하는 Observable을 생성

실행결과를 보면 500ms 후 발행되는 Observable이 이전에 발행된 0, 1을 발행하지 못한 것을 확인할 수 있습니다.


배압이란?

배압은 Part 1.의 onCreate() 함수를 공부할 때 처음 접했는데, 배압은 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생합니다.

핫 Observable은 배압을 고려해야 하는 주의점이 있습니다.

RxJava 2 이전에는 Observable 클래스에 별도의 배압 연산자들을 제공해왔지만, RxJava 2부터는 Flowable 클래스에서 배압을 처리합니다.

2021/01/31 - [Rx] - RxJava - Part 1. Observable 생성 - just(), onCreate()


구독자가 여러명이다?

RxJava에서 구독자가 여러명이다라는 부분에 대해 알아보면, 만약 도서 API를 이용하여 서버에 요청하고 반환된 JSON 문서를 parsing하여 각각의 속성을 추출하게 되었을 때, 필요한 데이터, 즉 저작자 정보, 제목 정보, 가격 정보를 반환한다면 이 3개의 정보가 구독자가 된다는 것이라고 보면 될 것 같습니다.