본문 바로가기

Coroutine

Coroutines - Part. 9 Asynchronous Flow

728x90

withContext

Suspend Function이며, 내부 동작하는 스레드를 강제 지정할 수 있습니다.

withContext의 잘못된 사용법에 대해 알아보겠습니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    withContext(Dispatchers.IO){
        for (i in 1..3) {
            emit(i)
        }
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") }
}

실행결과 :

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [BlockingCoroutine{Active}@205eac5, BlockingEventLoop@6b6b4017],
        but emission happened in [DispatchedCoroutine{Active}@673c025, Dispatchers.IO].
        Please refer to 'flow' documentation or use 'flowOn' instead....

flow로 만들어진 collection을 호출한 caller의 coroutine context에서 수행됩니다. 즉, context preservation에 의해 emit()을 하는 context 부분만 다르게 변경할 수 없게 됩니다.

즉, caller의 coroutine context를 withContext()으로 변경할 수만 있습니다.

만약 emit()을 하는 context를 변경하고 싶다면, 위 코드의 실행결과의 마지막 줄에서 언습하는 flowOn 연산자를 사용하면 됩니다.

 

flowOn

flow가 실행되는 context를 지정된 context로 변경합니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    log("Started foo flow")

    for (i in 1..3) {
        emit(i)
    }

}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit> {
    foo().collect { value -> log("Collected $value") }
}

실행결과 : 

[DefaultDispatcher-worker-1] Started foo flow
[main] Collected 1
[main] Collected 2
[main] Collected 3

 

Conflation

flow는 연속적으로 값을 처리해 emit(방출) 하는데, flow에서 방출된 값이 연산의 중간값 또는 상태값의 업데이트인 경우, 마지막 값만 의미한다고 보면 됩니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.lang.Exception

fun foo() : Flow<Int> = flow {
    for (i in 0 until 5){
        delay(100)
        emit(i)
    }
}
suspend fun main() = coroutineScope {
    foo()
        .conflate()
        .collect {
        try {
            delay(300)    // 200인 경우 0 1 3 4만 출력됩니다.
            println(it)
        }catch (e: Exception){
            println(e)
        }
    }
}

실행결과 : 

0
2
4

처음 conflate()가 마지막 값만 출력한다고 이해했었는데, 예제를 확인하면서 아니라는 것을 알게되었습니다.

collector의 동작이 느린 경우 conflate 연산자를 사용해 중간값을 skip합니다. 즉, collector가 값을 처리하는 도중 emit되어 쌓여있는 중간 값을 버리고 마지막 값만 취한다는 것을 알 수 있습니다.

물론 다르겠지만, Conflate 연산자가 배압을 조절해주는 RxJava의 Flowable와 비슷한 출력을 하는 것 같다고 느꼈습니다.

 

CollectLatest

Conflation은 방출하는 작업과 수집 작업이 느린 경우, 방출되는 값을 skip하여 처리 속도를 높이는 하나의 방법이지만, CollectLastest는 느린 수집 작업이 느린 경우, 작업을 취소하고 새로운 값이 나올때 다시 작업을 시작하는 방법입니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.lang.Exception

fun foo() : Flow<Int> = flow {
    for (i in 0 until 5){
        delay(100)
        emit(i)
    }
}
suspend fun main() = coroutineScope {
    foo()
        .collectLatest {
        try {
            delay(300)
            println(it)
        }catch (e: Exception){
            println(e)
        }
    }
}

실행결과 : 

kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
4

Exception을 통해 알 수 있듯이 작업이 취소되고 다시 작업을 수행합니다.

Conflation과 차이가 있다고 느낀 것이 Conflation은 0을 처리하는 중에 1, 2가 방출되면 2를 처리하는 방식이었다면 CollectLatest는 0을 처리할 때 1이 방출되면 작업을 취소하고 1을 작업하고, 2가 방출되면 1을 취소하고 2를 방출하는 방식입니다.


 

multiple flows 구성

Zip

두 flow의 값을 결합하기 위해 사용할 수 있습니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() = coroutineScope {
    val numInt = (1..3).asFlow()
    val numString = flowOf("one", "two", "three")

    numInt.zip(numString) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

실행결과 :

1 -> one
2 -> two
3 -> three

 

Combine

flow가 연산의 가장 최근 값을 타내는 경우 flow의 가장 최신 값에 따라 달라지는 계산을 수행하고 upstream flow가 값을 방출할 때 마다 이를 다시 계산해야할 때 사용할 수 있습니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main() = coroutineScope {
    val numInt = (1..3).asFlow().onEach { delay(300) }
    val numString = flowOf("one", "two", "three").onEach { delay(400) }

    val startTime = System.currentTimeMillis()
    numInt.combine(numString) { a, b -> "$a -> $b\n" }
        .collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

실행결과 :

1 -> one
 at 461 ms from start
2 -> one
 at 665 ms from start
2 -> two
 at 871 ms from start
3 -> two
 at 965 ms from start
3 -> three
 at 1276 ms from start

Combine 예제의 실행결과를 보면 두 flow에서 새로운 값이 방출될 때 마다 계산한 값이 출력됩니다.

Zip 예제에서 두 flow의 방출에 각각 100ms, 1000ms 주고, 두 flow의 크기가 달라도 출력은 동일합니다.