Asynchronous Flow(비동기 흐름)
일시 중단 함수는 비동기적으로 단일 값을 반환하지만 계산된 여러 값을 반환 하려면 어떻게 해야할까요? Kotlin Flows를 사용하면 됩니다.
Representing multiple values(여러 값 표시)
collection
을 사용해 Kotlin에서 여러 값을 나타내는 방법이 있습니다. 예를 들어, mCoroutine()
함수의 반환 값인 List에서 forEach
를 사용해 모두 출력시키는 방법을 사용할 수 있습니다.
fun mCoroutine(): List<Int> = listOf(1, 2, 3)
fun main() {
mCoroutine().forEach { value -> println(value) }
}
실행결과:
1
2
3
Sequences
CPU를 사용하는 blocking code(각 계산에 100ms가 소요)로 숫자를 계산하는 경우, Sequence
를 사용하여 숫자를 나타낼 수 있습니다.
fun mCoroutine(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i)
/**
* yield() : 함수의 코드가 실행한 결과를 반환 합니다.
* 해당 위치에서 코루틴을 일시중단 합니다. 이때 코루틴의
* 취소 여부를 확인하고 해당 위치에서 코루틴 취소가 가능합니다.
*/
}
}
fun main() {
mCoroutine().forEach { value -> println(value) }
}
실행결과:
1
2
3
Suspending functions
위 코드는 계산하는 동안 main 스레드를 차단하게 됩니다. 이런 경우 비동기 코드로 계산을 진행하려면 mCoroutine()
함수에 suspend
한정자를 붙여 main 스레드를 차단하지 않고 작업을 수행할 수 있고, 결과를 List
로 반환할 수 있습니다.
suspend fun mCoroutine(): List<Int> {
delay(1000)
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
mCoroutine().forEach { value -> println(value) }
}
실행결과:
1
2
3
Flows
List
타입의 결과를 사용하면 한번에 모든 값을 반환할 수 있다는 것을 알고있습니다. 비동기적으로 계산되는 값을 stream을 나타내기 위해서 동기적으로 계산된 값에 Sequence
유형을 사용하는 것처럼 Flow를 사용할 수 있습니다.
fun mCoroutine(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(1000)
emit(i) // 다음 값 내보내기
}
}
fun main() = runBlocking<Unit> {
// 동시에 코루틴을 사용해 메인 스레드가 차단된 상태인지 확인하기위함
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(1000)
}
}
// Collect the flow
mCoroutine().collect { value -> println(value) }
}
실행결과:
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
위 코드는 Main 스레드를 차단하지 않고 각 숫자를 1초를 기다렸다가 출력합니다. Main 스레드에서 실행되는 별도의 코루틴에선 1초를 기다렸다가 I'm not blocked가 출력되도록 했습니다.
delay
를 Thread.sleep
으로 대체하여 실행하면 결과는 달라진다.
1
2
3
I'm not blocked 1
I'm not blocked 2
I'm not blocked 3
Main Thread가 차단되어 결과가 다른 것인데, 여기서 의문이 생겼습니다. 비동기적으로 실행하기 위해선 suspend
한정자가 필요한 것으로 알고있었는데... 뭐지?
더 자세히 알아볼 필요가 있을것 같다.
Flow
타입에 대한 builder 함수를 flow
라고 한다.
flow {...}
의 builder 블록은 suspend
를 사용할 수 있다.
mCoroutine()
함수는 suspend
를 표시하지 않았다.
emit
함수를 사용해 flow
에서 값이 방출된다.
collect
함수를 사용해 flow
에서 값을 수집할 수 있다.
Flows are cold
Flows
는 sequences
와 유사한 cold streams 입니다. flow builder 내부의 코드는 flow
가 수집될 때까지 실행되지 않습니다.
import kotlinx.coroutines. *
import kotlinx.coroutines.flow.*
fun mCoroutine(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling mCoroutine...")
val flow = mCoroutine()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
실행결과:
Calling mCoroutine...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
위 코드는 mCoroutine()
함수에 suspend
한정자를 표시하지 않는 중요한 이유를 보여주게 됩니다. mCoroutine()
함수는 빠르게 반환되어 아무것도 기다리지 않습니다. 이 flow
는 수집될 때 마다 시작되기 때문에 다시 collect
를 호출할 때 Flow started가 출력됩니다.
Flow cancellation
Flow
는 코루틴의 일반적인 취소 메커니즘을 따릅니다. 취소 가능한 일시 중단 기능(예: delay)에서 flow가 일시 중단되면 flow
의 collect
함수를 취소할 수 있습니다.
withTimeoutOrNull
블록에서 실행될 떄 시간 초과시 흐름이 취소되고 해당 코드 실행을 중지하는 방법을 보여줍니다.
import kotlinx.coroutines. *
import kotlinx.coroutines.flow.*
fun mCoroutine(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
val flow = mCoroutine()
withTimeoutOrNull(200){
println("Calling collect...")
flow.collect { value -> println(value) }
}
println("Done")
}
실행결과:
Calling collect...
Flow started
1
Done
위 코드의 실행결과를 보면 200ms가 지난 후 취소되어 1만 반환하는 것을 확인할 수 있습니다.
Flow builders
flow {...} builder
는 가장 기본적인 빌더입니다. 쉬운 flow 선언을 위한 다른 builder
를 알아보겠습니다.
고정된 값 집합을 방출하는 flow 를 정의하는 flowOf
빌더가 있습니다.
.asFlow()
익스텐션 함수를 사용해 다양한 collection
과 sequence
를 flow
로 변환이 가능합니다.
fun main() = runBlocking {
(1..5).asFlow().collect { value -> println(value) }
}
실행결과:
1
2
3
4
5
flow에서 1부터 5까지 출력하는 결과를 확인할 수 있습니다.
Intermediate flow operators
컬렉션과 시퀀스와 같이 연산자를 통해 Flow
를 변환할 수 있습니다. 중간 연산자는 upstream flow에 적용되어 downstream flow를 반환합니다.
기본 연산자에는 map
및 filter
와 같은 친숙한 이름이 있습니다. 시퀀스의 중요한 차이점은 이러한 연산자 내부의 코드 블록이 일시 중단 함수를 호출할 수 있다는 것입니다.
시퀀스와 중요한 차이점은 이런 연산자 내부의 코드 블록이 suspend
함수를 호출할 수 있다는 것 입니다.
예를들어, 요청을 수행하는 것이 일시 중단 기능에 의해 구현되는 장기 실행 작업인 경우에도 map
연산자를 사용해 수신 요청의 flow
를 결과에 매핑할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
suspend fun performRequest(request: Int): String {
delay(1000) // 장기간의 비동기 작업
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
//sampleEnd
실행결과 :
response 1
response 2
response 3
Transform operator
flow
의 transformation 연산자 중에서 가장 일반적인 부분이 transform
(변환)입니다. map
이나 filter
와 같은 단순한 변환부터 복잡한 변환을 구현하는데 사용됩니다. transform
연산자를 사용해 임의의 값을 임의의 횟수로 방출할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//sampleStart
suspend fun performRequest(request: String): String {
delay(1000) // 장기간의 비동기 작업
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request.toString()))
}
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
//sampleEnd
실행결과 :
response Making request 1
response response 1
response Making request 2
response response 2
response Making request 3
response response 3
크기 제한 연산자
take
와 같은 크기 제한 연산자는 해당 제한에 도달하면 flow 실행을 취소합니다.
코루틴의 취소는 항상 예외를 throw하여 수행되므로 취소시 모든 리소스 관리 함수(예 :try { ... } finally { ... } )가 정상적으로 작동합니다.
아래 예제를 보면 알 수 있듯이 두 번째 숫자를 내보낸 후 중지됩니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
//크기 제한 연산자
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("이 줄은 실행되지 않습니다.")
emit(3)
} finally {
println("마지막 숫자")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 처음 2개의 값만 가져옵니다
.collect { value -> println(value) }
}
실행결과 :
1
2
마지막 숫자
Terminal flow 연산자
flow의 터미널 연산자는 flow 수집을 시작하는 suspending 함수입니다. collect는 가장 기본적이지만, 다른 터미널 연산자를 활용해 더 쉽게 만들 수 있습니다.
toList
및toSet
과 같은 다양한 컬렉션으로 변환이 가능합니다.- 첫 번째 값을 얻고flow가 단일 값을 방출하는지 확인하는 연산자 입니다.
reduce
및fold
로 flow를 값으로 Reducing 할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val sum = (1..3).asFlow()
.map { it * it } // 1 * 1, 2 * 2, 3 * 3 새로운 Flow 생성
.reduce { a, b -> a + b } // 합치기 (terminal 연산자)
println(sum)
}
실행결과 :
14
순차적인 flow
여러 개의flow
에서 작동하는 특수한 연산자를 사용하지 않는 한 순차적으로 수행됩니다. collection은 terminal 연산자를 호출하는 코루틴에서 직접 작동합니다. 기본적으로 새로운 코루틴은launch
되지 않습니다. 각각의 방출된 값은 upstream에서 downstream까지 모든 중간 연산자에 의해 처리된 후 terminal 연산자에게 전달됩니다.
짝수 정수를 필터링하여 문자열에 매핑하는 다음 예제를 확인해보세요.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
it % 2 == 0
}
.map {
"string $it"
}.collect {
println("Collect $it")
}
}
실행결과 :
Collect string 2
Collect string 4
'Coroutine' 카테고리의 다른 글
👋Coroutine : 1. Coroutine이란 (0) | 2022.03.30 |
---|---|
Coroutines - Part. 9 Asynchronous Flow (0) | 2021.05.04 |
Coroutines - Part. 7 Debugging Coroutine and Thread (0) | 2021.01.30 |
Coroutines - Part. 6 CoroutineContext 및 Dispatchers (0) | 2021.01.30 |
Coroutines - Part. 5 안드로이드에서의 코루틴 - 동기식과 비동기식, 콜백과 코루틴 (0) | 2021.01.30 |