728x90
fromFuture()
fromFuture() 연산자는 Future Interface를 지원하는 모든 객체를 ObservableSource로 변환하고 Future.get() 메서드를 호출한 값을 반환한다.
이 Future Interface는 비동기적인 작업의 결과를 구하는 경우에 사용되는데, 주로 Executor Serviece를 통하여 비동기작업을 할시에 사용된다.
Emitter는 Observable 내부에서 Future.get() 메서드를 호출시키고, Future의 작업을 마칠 때까지 블로킹된다.
Executor Serviece
Executors는 ExecutorService 객체를 생성하며, 다음 메소드를 제공하여 쓰레드 풀의 개수 및 종류를 정할 수 있으며, Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future 객체를 반환한다.
Emitter
RxJava의 emitter는 ObservableEmitter 인터페이스의 매개변수이며, onNext(), onError(), onComplete() 메서드를 호출할 수 있다.
import io.reactivex.Observable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class Part3 {
public static void main(String[] args){
Future<String> future = Executors.newSingleThreadExecutor().submit(()->{
return "Future";
});
Observable<String> source = Observable.fromFuture(future);
source.subscribe(System.out::println);
}
}
import io.reactivex.Observable
import java.util.concurrent.Executors
fun main(){
val future = Executors.newSingleThreadExecutor().submit<String> {
"Future"
}
val source = Observable.fromFuture(future)
source.subscribe(System.out::println)
}
fromPubilsher() 함수
Publisher는 잠재적인 아이템 발행을 제공하는 생산자 역할을 하며 Subsciber로 부터 요청을 받아 아이템을 발행한다. fromPublisher() 연산자는 Publisher를 Observable로 반환한다.
import io.reactivex.Observable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
class Part3 {
public static void main(String[] args){
Publisher<String> publisher = new Publisher<String>(){
@Override
public void subscribe(Subscriber<? super String> s){
s.onNext("Observable.fromPublisher"); s.onComplete();
}
};
Observable<String> source = Observable.fromPublisher(publisher);
source.subscribe(System.out::println);
}
}
import io.reactivex.Observable
import org.reactivestreams.Publisher
fun main(){
val publisher = Publisher<String> {
it.onNext("Observable.fromPublisher")
it.onComplete()
}
val source = Observable.fromPublisher<String>(publisher)
source.subscribe(System.out::println)
}
'Rx' 카테고리의 다른 글
RxKotlin- Part 5. Hot Observable, Cold Observable (0) | 2021.02.10 |
---|---|
RxKotlin- Part 4. Observable 외 다른 스트림 (Single , Maybe) (0) | 2021.02.04 |
RxKotlin- Part 2. Observable 생성 (fromArray(), fromIterable()) (0) | 2021.01.31 |
RxKotlin - Part 1. Observable 생성 - just(), onCreate() (0) | 2021.01.31 |
RxJava, RxKotlin, RxAndroid 이해하기 (0) | 2021.01.31 |