생산자와 소비자
생산자 : 데이터를 생산하고 발행하는 역할
소비자 : 데이터를 사용하는 주체로, 등록 방법은 크게 Observer, Consumer 방식이 존재한다.
☞ Observer 방식 : Observer 인터페이스를 구현한 객체를 subscribe()을 이용해 소비자를 추가한다.(return type - Unit)
// Observer 방식
val observer = object : Observer<Int>{
override fun onSubscribe(p0: Disposable) {
print("onSubscribe ")
}
override fun onNext(p0: Int) {
print("onNext:${p0} ")
}
override fun onError(p0: Throwable) {
print(p0)
}
override fun onComplete() {
print("onComplete")
}
}
Observable.just(4, 5, 6).subscribe(observer)
☞ Consumer 방식 : 보통 자주 보이는 방식으로, 각각의 Consumer를 subscribe()을 이용해 소비자를 추가한다. (return type - Disposable)
// consumer 방식
val disposable = Observable.just(1,2,3).subscribe(
{ println("$it") },
{ println("onError") }, // onError: Consumer
{ println("onComplete") }, // onComplete: Consumer
{ println("onSubscribe") } // onSubscribe: Consumer
)
Observable
Observable은 데이터를 전달하는 생산자 역할을 한다.
RxJava에선 Observable을 구독하는 observer가 존재하며, Observable이 순차적으로 발행하는 데이터에 대해서 반응하게 된다.
Observable 이벤트
☞ onNext() : Observable에서 Observer까지 한 번에 하나씩 데이터를 발행한다.
☞ onComplete() : 데이터 발행이 끝나게되면 완료 이벤트를 Observer에 전달하여 발행 작업이 없음을 알린다.
☞ onError() : 오류가 발생했음을 Observer에 전달한다.
Observable 생성
RxJava는 Operator(연산자)를 통해 Observable을 생성한다.
Operator(연산자) 종류 : just(), onCreate(), fromArray(), fromIterable(), fromFuture(), fromPublisher(), fromCallable()
just()
Observable을 간단하게 생성할 수 있는 함수로, 같은 타입의 최대 10개의 데이터를 차례로 발행할 수 있으며 실제 발행은 subscribe() 함수를 호출해야 된다.
fun main() {
Observable.just(1,2,4,5,6).subscribe(System.out::println)
}
실행결과:
1
2
4
5
6
위 코드처럼 just()를 이용해 데이터 1,2,3,4,5,6를 생성하고 발행했다
subscribe()
RxJava에서 실행되기 위해 사용하는 함수는 subscribe() 함수이며, just 등으로 데이터 흐름을 정한 후 subscribe() 함수를 호출하여 실제로 데이터를 발행한다. (return type - Disposable 인터페이스 객체)
Disposable 인터페이스 - dispose(), isDisposed 두 가지만 있다.
dipose() : Observable에게 더이상 데이터를 발행하지 않도록 subscribe를 해지한다. 하지만 Observable이 OnComplete 알림을 보내면 dispose() 호출되기 때문에 별도로 dispose()를 호출할 필요가 없다.
isDisposed() : Observable이 subscribe를 해지했는지를 확인하는 함수다.
create()
just()의 경우엔 자동으로 알림 이벤트가 하였지만, create()는 onNext, onComplete, onError 알림을 개발자가 직접 호출해야 한다.
Observable.create<String> {
it.onNext("L" as String)
it.onNext("S" as String)
it.onNext("G" as String)
}.subscribe { data-> println(data) }
실행결과:
L
S
G
Observable.create<Int> {
it.onNext(100)
it.onNext(200)
it.onNext(300)
it.onComplete()
}.subscribe(
{ println("$it") },
{ println("onError") }, // onError: Consumer
{ println("onComplete") }, // onComplete: Consumer
{ println("onSubscribe") } // onSubscribe: Consumer
)
실행결과:
onSubscribe
100
200
300
onComplete
val observer = object : Observer<Int> {
override fun onSubscribe(p0: Disposable) {
println("onSubscribe ")
}
override fun onNext(p0: Int) {
println("onNext:${p0} ")
}
override fun onError(p0: Throwable) {
print(p0)
}
override fun onComplete() {
println("완료")
}
}
Observable.create<Int> {
it.onNext(100)
it.onNext(200)
it.onNext(300)
it.onComplete()
}.subscribe(observer)
실행결과:
onSubscribe
onNext:100
onNext:200
onNext:300
onComplete
create() 특징
☞ Observable이 subscribe를 해지하면 등록된 모든 콜백을 해제해야 한다. (메모리 누수 방지)
☞ subscribe하는 동안에만 onNext(), onComplete()를 호출해야 한다.
☞ onError()를 통해서만 에러를 전달해야 한다.
☞ 배압을 직접 처리해야 한다.
'Rx' 카테고리의 다른 글
RxKotlin- Part 5. Hot Observable, Cold Observable (0) | 2021.02.10 |
---|---|
RxKotlin- Part 4. Observable 외 다른 스트림 (Single , Maybe) (0) | 2021.02.04 |
RxKotlin- Part 3. Observable 생성 (fromFuture, fromPublisher) (0) | 2021.01.31 |
RxKotlin- Part 2. Observable 생성 (fromArray(), fromIterable()) (0) | 2021.01.31 |
RxJava, RxKotlin, RxAndroid 이해하기 (0) | 2021.01.31 |