본문 바로가기

Rx

RxKotlin - Part 1. Observable 생성 - just(), onCreate()

728x90

생산자와 소비자

생산자 : 데이터를 생산하고 발행하는 역할

소비자 : 데이터를 사용하는 주체로, 등록 방법은 크게 Observer, Consumer 방식이 존재한다.

☞ Observer 방식 : Observer 인터페이스를 구현한 객체를 subscribe()을 이용해 소비자를 추가한다.(return type - Unit)

// Observer 방식
    val observer = object : Observer<Int>{
        override fun onSubscribe(p0: Disposable) {
            print("onSubscribe ")
        }

        override fun onNext(p0: Int) {
            print("onNext:${p0} ")
        }

        override fun onError(p0: Throwable) {
            print(p0)
        }

        override fun onComplete() {
            print("onComplete")
        }
    }
    Observable.just(4, 5, 6).subscribe(observer)

 

☞ Consumer 방식 : 보통 자주 보이는 방식으로, 각각의 Consumer를 subscribe()을 이용해 소비자를 추가한다. (return type - Disposable)

 // consumer 방식
    val disposable = Observable.just(1,2,3).subscribe(
        { println("$it") },
        { println("onError") }, // onError: Consumer
        { println("onComplete") }, // onComplete: Consumer
        { println("onSubscribe") } // onSubscribe: Consumer
    )

 

Observable

Observable은 데이터를 전달하는 생산자 역할을 한다.

RxJava에선 Observable을 구독하는 observer가 존재하며, Observable이 순차적으로 발행하는 데이터에 대해서 반응하게 된다.

Observable 이벤트

☞ onNext() : Observable에서 Observer까지 한 번에 하나씩 데이터를 발행한다.

☞ onComplete() : 데이터 발행이 끝나게되면 완료 이벤트를 Observer에 전달하여 발행 작업이 없음을 알린다.

☞ onError() : 오류가 발생했음을 Observer에 전달한다.

Observable 생성

RxJava는 Operator(연산자)를 통해 Observable을 생성한다.

Operator(연산자) 종류 : just(), onCreate(), fromArray(), fromIterable(), fromFuture(), fromPublisher(), fromCallable()

just()

Observable을 간단하게 생성할 수 있는 함수로, 같은 타입최대 10개의 데이터를 차례로 발행할 수 있으며 실제 발행은 subscribe() 함수를 호출해야 된다.

fun main() {
    Observable.just(1,2,4,5,6).subscribe(System.out::println)
}

실행결과:

1
2
4
5
6

위 코드처럼 just()를 이용해 데이터 1,2,3,4,5,6를 생성하고 발행했다

subscribe()

RxJava에서 실행되기 위해 사용하는 함수는 subscribe() 함수이며, just 등으로 데이터 흐름을 정한 후 subscribe() 함수를 호출하여 실제로 데이터를 발행한다. (return type - Disposable 인터페이스 객체)

Disposable 인터페이스 - dispose(), isDisposed 두 가지만 있다.

dipose() : Observable에게 더이상 데이터를 발행하지 않도록 subscribe를 해지한다. 하지만 Observable이 OnComplete 알림을 보내면 dispose() 호출되기 때문에 별도로 dispose()를 호출할 필요가 없다.

isDisposed() : Observable이 subscribe를 해지했는지를 확인하는 함수다.

create()

just()의 경우엔 자동으로 알림 이벤트가 하였지만, create()는 onNext, onComplete, onError 알림을 개발자가 직접 호출해야 한다.

 Observable.create<String> {
        it.onNext("L" as String)
        it.onNext("S" as String)
        it.onNext("G" as String)
    }.subscribe { data-> println(data) }

실행결과:

L
S
G
Observable.create<Int> {
        it.onNext(100)
        it.onNext(200)
        it.onNext(300)
        it.onComplete()
    }.subscribe(
        { println("$it") },
        { println("onError") }, // onError: Consumer
        { println("onComplete") }, // onComplete: Consumer
        { println("onSubscribe") } // onSubscribe: Consumer
    )

실행결과:

onSubscribe
100
200
300
onComplete
    val observer = object : Observer<Int> {
        override fun onSubscribe(p0: Disposable) {
            println("onSubscribe ")
        }

        override fun onNext(p0: Int) {
            println("onNext:${p0} ")
        }

        override fun onError(p0: Throwable) {
            print(p0)
        }

        override fun onComplete() {
            println("완료")
        }
    }
    Observable.create<Int> {
        it.onNext(100)
        it.onNext(200)
        it.onNext(300)
        it.onComplete()
    }.subscribe(observer)

실행결과:

onSubscribe 
onNext:100 
onNext:200 
onNext:300 
onComplete

create() 특징

☞ Observable이 subscribe를 해지하면 등록된 모든 콜백을 해제해야 한다. (메모리 누수 방지)

☞ subscribe하는 동안에만 onNext(), onComplete()를 호출해야 한다.

☞ onError()를 통해서만 에러를 전달해야 한다.

☞ 배압을 직접 처리해야 한다.