withContext
Suspend Function이며, 내부 동작하는 스레드를 강제 지정할 수 있습니다.
withContext의 잘못된 사용법에 대해 알아보겠습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun foo(): Flow<Int> = flow {
log("Started foo flow")
withContext(Dispatchers.IO){
for (i in 1..3) {
emit(i)
}
}
}
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
실행결과 :
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@205eac5, BlockingEventLoop@6b6b4017],
but emission happened in [DispatchedCoroutine{Active}@673c025, Dispatchers.IO].
Please refer to 'flow' documentation or use 'flowOn' instead....
flow로 만들어진 collection을 호출한 caller
의 coroutine context에서 수행됩니다. 즉, context preservation
에 의해 emit()을 하는 context 부분만 다르게 변경할 수 없게 됩니다.
즉, caller의 coroutine context를 withContext()
으로 변경할 수만 있습니다.
만약 emit()을 하는 context를 변경하고 싶다면, 위 코드의 실행결과의 마지막 줄에서 언습하는 flowOn
연산자를 사용하면 됩니다.
flowOn
flow가 실행되는 context를 지정된 context로 변경합니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun foo(): Flow<Int> = flow {
log("Started foo flow")
for (i in 1..3) {
emit(i)
}
}.flowOn(Dispatchers.Default)
fun main() = runBlocking<Unit> {
foo().collect { value -> log("Collected $value") }
}
실행결과 :
[DefaultDispatcher-worker-1] Started foo flow
[main] Collected 1
[main] Collected 2
[main] Collected 3
Conflation
flow
는 연속적으로 값을 처리해 emit(방출) 하는데, flow
에서 방출된 값이 연산의 중간값 또는 상태값의 업데이트인 경우, 마지막 값만 의미한다고 보면 됩니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.lang.Exception
fun foo() : Flow<Int> = flow {
for (i in 0 until 5){
delay(100)
emit(i)
}
}
suspend fun main() = coroutineScope {
foo()
.conflate()
.collect {
try {
delay(300) // 200인 경우 0 1 3 4만 출력됩니다.
println(it)
}catch (e: Exception){
println(e)
}
}
}
실행결과 :
0
2
4
처음 conflate()
가 마지막 값만 출력한다고 이해했었는데, 예제를 확인하면서 아니라는 것을 알게되었습니다.
collector
의 동작이 느린 경우 conflate
연산자를 사용해 중간값을 skip합니다. 즉, collector
가 값을 처리하는 도중 emit
되어 쌓여있는 중간 값을 버리고 마지막 값만 취한다는 것을 알 수 있습니다.
물론 다르겠지만, Conflate
연산자가 배압을 조절해주는 RxJava의 Flowable
와 비슷한 출력을 하는 것 같다고 느꼈습니다.
CollectLatest
Conflation
은 방출하는 작업과 수집 작업이 느린 경우, 방출되는 값을 skip하여 처리 속도를 높이는 하나의 방법이지만, CollectLastest
는 느린 수집 작업이 느린 경우, 작업을 취소하고 새로운 값이 나올때 다시 작업을 시작하는 방법입니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.lang.Exception
fun foo() : Flow<Int> = flow {
for (i in 0 until 5){
delay(100)
emit(i)
}
}
suspend fun main() = coroutineScope {
foo()
.collectLatest {
try {
delay(300)
println(it)
}catch (e: Exception){
println(e)
}
}
}
실행결과 :
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
4
Exception을 통해 알 수 있듯이 작업이 취소되고 다시 작업을 수행합니다.
Conflation
과 차이가 있다고 느낀 것이 Conflation
은 0을 처리하는 중에 1, 2가 방출되면 2를 처리하는 방식이었다면 CollectLatest
는 0을 처리할 때 1이 방출되면 작업을 취소하고 1을 작업하고, 2가 방출되면 1을 취소하고 2를 방출하는 방식입니다.
multiple flows 구성
Zip
두 flow의 값을 결합하기 위해 사용할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() = coroutineScope {
val numInt = (1..3).asFlow()
val numString = flowOf("one", "two", "three")
numInt.zip(numString) { a, b -> "$a -> $b" }
.collect { println(it) }
}
실행결과 :
1 -> one
2 -> two
3 -> three
Combine
flow
가 연산의 가장 최근 값을 타내는 경우 flow의 가장 최신 값에 따라 달라지는 계산을 수행하고 upstream flow
가 값을 방출할 때 마다 이를 다시 계산해야할 때 사용할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() = coroutineScope {
val numInt = (1..3).asFlow().onEach { delay(300) }
val numString = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
numInt.combine(numString) { a, b -> "$a -> $b\n" }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
실행결과 :
1 -> one
at 461 ms from start
2 -> one
at 665 ms from start
2 -> two
at 871 ms from start
3 -> two
at 965 ms from start
3 -> three
at 1276 ms from start
Combine
예제의 실행결과를 보면 두 flow
에서 새로운 값이 방출될 때 마다 계산한 값이 출력됩니다.
Zip
예제에서 두 flow
의 방출에 각각 100ms, 1000ms 주고, 두 flow
의 크기가 달라도 출력은 동일합니다.
'Coroutine' 카테고리의 다른 글
📌Coroutine(코루틴) 실습 : viewModelScope, launch, suspend, join (0) | 2022.04.08 |
---|---|
👋Coroutine : 1. Coroutine이란 (0) | 2022.03.30 |
Coroutines - Part. 8 Asynchronous Flow (0) | 2021.02.05 |
Coroutines - Part. 7 Debugging Coroutine and Thread (0) | 2021.01.30 |
Coroutines - Part. 6 CoroutineContext 및 Dispatchers (0) | 2021.01.30 |