본문 바로가기

Coroutine

Coroutines - Part. 8 Asynchronous Flow

728x90

Asynchronous Flow(비동기 흐름)

일시 중단 함수는 비동기적으로 단일 값을 반환하지만 계산된 여러 값을 반환 하려면 어떻게 해야할까요? Kotlin Flows를 사용하면 됩니다.

Representing multiple values(여러 값 표시)

collection을 사용해 Kotlin에서 여러 값을 나타내는 방법이 있습니다. 예를 들어, mCoroutine() 함수의 반환 값인 List에서 forEach를 사용해 모두 출력시키는 방법을 사용할 수 있습니다.

fun mCoroutine(): List<Int> = listOf(1, 2, 3)

fun main() {
    mCoroutine().forEach { value -> println(value) } 
}

실행결과:

1
2
3

Sequences

CPU를 사용하는 blocking code(각 계산에 100ms가 소요)로 숫자를 계산하는 경우, Sequence를 사용하여 숫자를 나타낼 수 있습니다.

fun mCoroutine(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i)
        /**
         *     yield() : 함수의 코드가 실행한 결과를 반환 합니다.
         *     해당 위치에서 코루틴을 일시중단 합니다. 이때 코루틴의
         *     취소 여부를 확인하고 해당 위치에서 코루틴 취소가 가능합니다.
         */
    }
}

fun main() {
    mCoroutine().forEach { value -> println(value) }
}

실행결과:

1
2
3

Suspending functions

위 코드는 계산하는 동안 main 스레드를 차단하게 됩니다. 이런 경우 비동기 코드로 계산을 진행하려면 mCoroutine() 함수에 suspend 한정자를 붙여 main 스레드를 차단하지 않고 작업을 수행할 수 있고, 결과를 List로 반환할 수 있습니다.

suspend fun mCoroutine(): List<Int> {
    delay(1000)
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    mCoroutine().forEach { value -> println(value) }
}

실행결과:

1
2
3

Flows

List 타입의 결과를 사용하면 한번에 모든 값을 반환할 수 있다는 것을 알고있습니다. 비동기적으로 계산되는 값을 stream을 나타내기 위해서 동기적으로 계산된 값에 Sequence 유형을 사용하는 것처럼 Flow를 사용할 수 있습니다.

fun mCoroutine(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(1000)
        emit(i) // 다음 값 내보내기
    }
}

fun main() = runBlocking<Unit> {
    // 동시에 코루틴을 사용해 메인 스레드가 차단된 상태인지 확인하기위함
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(1000)
        }
    }
    // Collect the flow
    mCoroutine().collect { value -> println(value) }
}

실행결과:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

위 코드는 Main 스레드를 차단하지 않고 각 숫자를 1초를 기다렸다가 출력합니다. Main 스레드에서 실행되는 별도의 코루틴에선 1초를 기다렸다가 I'm not blocked가 출력되도록 했습니다.

delayThread.sleep으로 대체하여 실행하면 결과는 달라진다.

1
2
3
I'm not blocked 1
I'm not blocked 2
I'm not blocked 3

Main Thread가 차단되어 결과가 다른 것인데, 여기서 의문이 생겼습니다. 비동기적으로 실행하기 위해선 suspend 한정자가 필요한 것으로 알고있었는데... 뭐지?

더 자세히 알아볼 필요가 있을것 같다.

Flow 타입에 대한 builder 함수를 flow라고 한다.

flow {...}의 builder 블록은 suspend를 사용할 수 있다.

mCoroutine() 함수는 suspend를 표시하지 않았다.

emit 함수를 사용해 flow에서 값이 방출된다.

collect 함수를 사용해 flow에서 값을 수집할 수 있다.

Flows are cold

Flowssequences와 유사한 cold streams 입니다. flow builder 내부의 코드는 flow가 수집될 때까지 실행되지 않습니다.

import  kotlinx.coroutines. *
import kotlinx.coroutines.flow.*

fun mCoroutine(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling mCoroutine...")
    val flow = mCoroutine()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}

실행결과:

Calling mCoroutine...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

위 코드는 mCoroutine() 함수에 suspend 한정자를 표시하지 않는 중요한 이유를 보여주게 됩니다. mCoroutine() 함수는 빠르게 반환되어 아무것도 기다리지 않습니다. 이 flow는 수집될 때 마다 시작되기 때문에 다시 collect를 호출할 때 Flow started가 출력됩니다.

Flow cancellation

Flow는 코루틴의 일반적인 취소 메커니즘을 따릅니다. 취소 가능한 일시 중단 기능(예: delay)에서 flow가 일시 중단되면 flowcollect 함수를 취소할 수 있습니다.

withTimeoutOrNull 블록에서 실행될 떄 시간 초과시 흐름이 취소되고 해당 코드 실행을 중지하는 방법을 보여줍니다.

import  kotlinx.coroutines. *
import kotlinx.coroutines.flow.*

fun mCoroutine(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    val flow = mCoroutine()
    withTimeoutOrNull(200){
        println("Calling collect...")
        flow.collect { value -> println(value) }
    }
    println("Done")
}

실행결과:

Calling collect...
Flow started
1
Done

위 코드의 실행결과를 보면 200ms가 지난 후 취소되어 1만 반환하는 것을 확인할 수 있습니다.

Flow builders

flow {...} builder는 가장 기본적인 빌더입니다. 쉬운 flow 선언을 위한 다른 builder를 알아보겠습니다.

고정된 값 집합을 방출하는 flow 를 정의하는 flowOf 빌더가 있습니다.

.asFlow() 익스텐션 함수를 사용해 다양한 collectionsequenceflow로 변환이 가능합니다.

fun main() = runBlocking {
    (1..5).asFlow().collect { value -> println(value) }
}

실행결과:

1
2
3
4
5

flow에서 1부터 5까지 출력하는 결과를 확인할 수 있습니다.

Intermediate flow operators

컬렉션시퀀스와 같이 연산자를 통해 Flow를 변환할 수 있습니다. 중간 연산자는 upstream flow에 적용되어 downstream flow를 반환합니다.

기본 연산자에는 mapfilter와 같은 친숙한 이름이 있습니다. 시퀀스의 중요한 차이점은 이러한 연산자 내부의 코드 블록이 일시 중단 함수를 호출할 수 있다는 것입니다.

시퀀스와 중요한 차이점은 이런 연산자 내부의 코드 블록이 suspend 함수를 호출할 수 있다는 것 입니다.

예를들어, 요청을 수행하는 것이 일시 중단 기능에 의해 구현되는 장기 실행 작업인 경우에도 map 연산자를 사용해 수신 요청의 flow를 결과에 매핑할 수 있습니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: Int): String {
    delay(1000) // 장기간의 비동기 작업
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()     // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd

실행결과 : 

response 1
response 2
response 3

Transform operator

flow의 transformation 연산자 중에서 가장 일반적인 부분이 transform(변환)입니다. map이나 filter와 같은 단순한 변환부터 복잡한 변환을 구현하는데 사용됩니다. transform 연산자를 사용해 임의의 값을 임의의 횟수로 방출할 수 있습니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
suspend fun performRequest(request: String): String {
    delay(1000) // 장기간의 비동기 작업
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()     // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request.toString()))
        }
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//sampleEnd

실행결과 : 

response Making request 1
response response 1
response Making request 2
response response 2
response Making request 3
response response 3

크기 제한 연산자

take와 같은 크기 제한 연산자는 해당 제한에 도달하면 flow 실행을 취소합니다.

코루틴의 취소는 항상 예외를 throw하여 수행되므로 취소시 모든 리소스 관리 함수(예 :try { ... } finally { ... } )가 정상적으로 작동합니다.

아래 예제를 보면 알 수 있듯이 두 번째 숫자를 내보낸 후 중지됩니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//크기 제한 연산자
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("이 줄은 실행되지 않습니다.")
        emit(3)
    } finally {
        println("마지막 숫자")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // 처음 2개의 값만 가져옵니다
        .collect { value -> println(value) }
}

실행결과 : 

1
2
마지막 숫자

Terminal flow 연산자

flow의 터미널 연산자는 flow 수집을 시작하는 suspending 함수입니다. collect는 가장 기본적이지만, 다른 터미널 연산자를 활용해 더 쉽게 만들 수 있습니다.

  • toListtoSet과 같은 다양한 컬렉션으로 변환이 가능합니다.
  • 첫 번째 값을 얻고flow가 단일 값을 방출하는지 확인하는 연산자 입니다.
  • reducefold로 flow를 값으로 Reducing 할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val sum = (1..3).asFlow()
        .map { it * it } // 1 * 1, 2 * 2, 3 * 3 새로운 Flow 생성
        .reduce { a, b -> a + b } // 합치기 (terminal 연산자)
    println(sum)
}

실행결과 : 

14

순차적인 flow

여러 개의flow에서 작동하는 특수한 연산자를 사용하지 않는 한 순차적으로 수행됩니다. collection은 terminal 연산자를 호출하는 코루틴에서 직접 작동합니다. 기본적으로 새로운 코루틴은launch 되지 않습니다. 각각의 방출된 값은 upstream에서 downstream까지 모든 중간 연산자에 의해 처리된 후 terminal 연산자에게 전달됩니다.

짝수 정수를 필터링하여 문자열에 매핑하는 다음 예제를 확인해보세요.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter {
            it % 2 == 0
        }
        .map {
            "string $it"
        }.collect {
            println("Collect $it")
        }
}

실행결과 : 

Collect string 2
Collect string 4