Cold Observable
Observable은 Hot, Cold로 볼 수 있습니다.
콜드 Observable은 Observable을 선언하고 just(), from...() 함수를 호출해도 Observer(옵저버)가 subscribe() 함수를 호출해 구독하지 않으면 데이터를 발행(emit)하지 않습니다.
콜드Observable은 보통 웹 요청, 데이터베이스 쿼리, 파일 읽기 등에 적합하고, URL이나 데이터를 지정하면 해당 서버 또는 데이터베이스로 부터 요청을 보내고 결과를 받아옵니다.
/** 콜드 옵저버블 */
fun main(){
val observable : Observable<String> = listOf("item 1","item 2","item 3").toObservable()
// toObservable() : 일반 리스트를 Observable로 변경할 수 있는확장 함수 기능
observable.subscribe(
{ println("$it") },
{ println("Error : ${it.message}") },
{ println("Complete") }
) //구독
}
실행결과:
item 1
item 2
item 3
Complete
위 코드의 결과를 보면 구독하여 발행된 데이터가 출력된 것을 알 수 있습니다. 콜드 Observable은 구독 시 데이터 발행을 시작하여 subscribe() 함수 호출로 각 데이터를 동일한 순서로 푸시하는 것 입니다.
Hot Observable
핫 Observable은 구독자의 존재는 상관하지 않고 데이터를 발행하는 Observable입니다.
따라서 여러 구독자를 고려할 수 있지만, 모든 데이터를 처음부터 수신할 수 있는 것은 보장하지 않습니다.
즉, 핫 Observable은 구독한 시점부터 Observable에서 발행한 값을 받게됩니다.
핫 Observable은 마우스, 키보드, 시스템 이벤트와 센서 데이터 등이 있습니다. 만약 온도 센서의 데이터를 처리한다면, 처음부터 온도를 알 필요가 없이 최근 온도만 있으면 되니, 핫 Observable이 적합합니다.
Cold -> Hot
콜드Observable을 핫Observable 객체로 변환하려면 Subject
객체를 만들거나, ConnectableObservable
클래스를 활용하는 방법이 있습니다.
ConnectableObservable
의 가장 큰 목적은 한 Observable에 여러 구독자를 연결하여 하나의 푸시에 한꺼번에 대응하는 것 입니다. 콜드와 반대되는 성향을 보여줍니다.
/** 핫 옵저버블 */
fun main(){
val connectableObservable = listOf("item 1","item 2","item 3").toObservable().publish()
// publish(): Hot Observable로 변환
// operator로 Observable의 이벤트를 변환, 제어
connectableObservable.map(String::reversed).subscribe(
{ println("1. Received $it") },
{ println("Error : ${it.message}") },
{ println("Complete") }
) //구독 1
connectableObservable.subscribe(
{ println("2. Received $it") },
{ println("Error : ${it.message}") },
{ println("Complete") }
) //구독 2
connectableObservable.connect()
connectableObservable.subscribe(
{ println("3. Received $it") },
{ println("Error : ${it.message}") },
{ println("Complete") }
) //구독 3
}
실행결과:
1. Received 1 meti
2. Received item 1
1. Received 2 meti
2. Received item 2
1. Received 3 meti
2. Received item 3
Complete
Complete
위 실행결과를 보면 콜드 Observable의 예제와는 조금 다릅니다. ConnectableObservable은 connect()
함수 이전에 호출된 모든 Observable를 연결하여 모든 Observable에 단일 푸시를 전달하여 대응할 수 있게 합니다.
이처럼 Observable에서 단 한번의 발행으로 모든 구독자에게 발행을 할 수 있는 방식을 Muticasting
이라고 합니다.
fun main() {
val connectableObservable = Observable.interval(200, TimeUnit.MILLISECONDS)
.publish()
connectableObservable.subscribe(
{ println("1. Received $it") },
{ println("Error : ${it.message}") },
{ println("Complete") }
) //구독 1
connectableObservable.subscribe(
{ println("2. Received $it") },
{ println("Error : ${it.message}") },
{ println("Complete") }
) //구독 2
connectableObservable.connect()
runBlocking { delay(500) }
connectableObservable.subscribe(
{ println("3. Received $it") },
{ println("Error : ${it.message}") },
{ println("Complete") }
) //구독 3
runBlocking { delay(300) }
}
실행결과:
1. Received 0
2. Received 0
1. Received 1
2. Received 1
1. Received 2
2. Received 2
3. Received 2
1. Received 3
2. Received 3
3. Received 3
interval() : 특정 시간별로 연속된 정수형을 배출하는 Observable을 생성
실행결과를 보면 500ms 후 발행되는 Observable이 이전에 발행된 0, 1을 발행하지 못한 것을 확인할 수 있습니다.
배압이란?
배압은 Part 1.의 onCreate() 함수를 공부할 때 처음 접했는데, 배압은 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생합니다.
핫 Observable은 배압을 고려해야 하는 주의점이 있습니다.
RxJava 2 이전에는 Observable 클래스에 별도의 배압 연산자들을 제공해왔지만, RxJava 2부터는 Flowable
클래스에서 배압을 처리합니다.
2021/01/31 - [Rx] - RxJava - Part 1. Observable 생성 - just(), onCreate()
구독자가 여러명이다?
RxJava에서 구독자가 여러명이다라는 부분에 대해 알아보면, 만약 도서 API를 이용하여 서버에 요청하고 반환된 JSON 문서를 parsing하여 각각의 속성을 추출하게 되었을 때, 필요한 데이터, 즉 저작자 정보, 제목 정보, 가격 정보를 반환한다면 이 3개의 정보가 구독자가 된다는 것이라고 보면 될 것 같습니다.
'Rx' 카테고리의 다른 글
RxKotlin- Part 7. ConnectableObservable Class (0) | 2021.03.09 |
---|---|
RxKotlin- Part 6. Subject Class (0) | 2021.02.20 |
RxKotlin- Part 4. Observable 외 다른 스트림 (Single , Maybe) (0) | 2021.02.04 |
RxKotlin- Part 3. Observable 생성 (fromFuture, fromPublisher) (0) | 2021.01.31 |
RxKotlin- Part 2. Observable 생성 (fromArray(), fromIterable()) (0) | 2021.01.31 |