•
일시 중단 함수는 비동기적으로 단일 값을 반환한다.
•
하지만 여러 개의 비동기 계산 값을 반환하려면 어떻게 해야 할까?
비동기 Flow
여러 값 표현하기
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
// 출력:
// 1
// 2
// 3
Kotlin
복사
•
Kotlin에서는 여러 값을 컬렉션으로 표현할 수 있다.
•
예를 들어, 위 코드는 세 개의 숫자를 반환하고 이를 forEach로 모두 출력하는 간단함 함수이다.
시퀀스
fun simple(): Sequence<Int> = sequence { // 시퀀스 빌더
for (i in 1..3) {
Thread.sleep(100) // 계산하는 척하기
yield(i) // 다음 값을 반환
}
}
fun main() {
simple().forEach { value -> println(value) }
}
Kotlin
복사
•
만약 CPU를 많이 사용하는 블로킹 코드로 숫자를 계산하고 각각의 계산이 100ms가 걸린다고 가정하면 이 숫자들을 시퀀스로 표현할 수 있다.
•
위 코드는 동일한 숫자를 출력하지만 각 숫자를 출력하기 전에 100ms를 기다린다.
일시 중단 함수
suspend fun simple(): List<Int> {
delay(1000) // 비동기 작업을 하는 척하기
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
Kotlin
복사
•
그러나 이 계산은 코드를 실행하는 메인 스레드를 블로킹한다.
•
만약 이러한 값들이 비동기 코드로 계산된다면 simple 함수를 suspend로 표시하여 작업을 블로킹하지 않고 결과를 리스트로 반환할 수 있다.
•
위 코드는 1초를 기다린 후 숫자들을 출력한다.
플로우
fun simple(): Flow<Int> = flow { // 플로우 빌더
for (i in 1..3) {
delay(100) // 유용한 작업을 하는 척하기
emit(i) // 다음 값을 방출
}
}
fun main() = runBlocking<Unit> {
// 메인 스레드가 블로킹되지 않는지 확인하기 위한 동시 코루틴 실행
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 플로우 수집
simple().collect { value -> println(value) }
}
// 출력:
// I'm not blocked 1
// 1
// I'm not blocked 2
// 2
// I'm not blocked 3
// 3
Kotlin
복사
•
List<Int> 타입을 사용하면 모든 값을 한 번에 반환할 수 있다.
•
비동기적으로 계산되는 값의 스트림을 표현하기 위해서는 동기적으로 계산되는 값에 대해 Sequence<Int> 타입을 사용하는 것과 같이 Flow<Int> 타입을 사용할 수 있다.
•
위 코드는 메인 스레드를 블로킹하지 않고 각 숫자를 출력하기 전에 100ms를 기다린다.
•
이는 메인 스레드에서 메세지를 100ms마다 출력하는 별도의 코루틴이 실행되므로 확인할 수 있다.
•
simple의 flow { … } 본문에서 delay 대신 Thread.sleep을 사용하면 메인 스레드가 블로킹된다.
Flow를 사용한 코드의 차이점
•
Flow 타입의 빌더 함수는 flow라고 불린다.
•
flow { … } 블록 내부의 코드는 일시 중단될 수 있다.
•
simple 함수는 더 이상 suspend로 표시되지 않는다.
•
값은 emit 함수를 사용해 Flow에서 방출된다.
•
값은 collect 함수를 사용해 Flow로부터 수집된다.
Flow는 차갑다.
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
// 출력:
// Calling simple function...
// Calling collect...
// Flow started
// 1
// 2
// 3
// Calling collect again...
// Flow started
// 1
// 2
// 3
Kotlin
복사
•
플로우는 시퀀스와 비슷한 콜드 스트림이다.
•
즉, 플로우 빌더 내부의 코드는 플로우가 수집될 때까지 실행되지 않는다.
•
이것은 플로우를 반환하는 simple 함수가 suspend로 표시되지 않는 주요 이유이다.
•
simple() 호출 자체는 빠르게 반환되고 아무것도 기다리지 않는다.
•
플로우는 매번 수집될 때 새로 시작되며 그래서 collect를 다시 호출할 때마다 메세지가 출력된다.
Flow 취소 기본사항
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 250ms 후에 타임아웃 발생
simple().collect { value -> println(value) }
}
println("Done")
}
// 출력:
// Emitting 1
// 1
// Emitting 2
// 2
// Done
Kotlin
복사
•
Flow는 일반적인 코루틴의 협력적 취소를 따른다.
•
일반적으로 delay와 같은 취소가 가능한 일시 중단 함수에서 Flow 수집이 일시 중단된 경우 Flow 수집이 취소될 수 있다.
•
위 코드는 withTimeoutOrNull 블록에서 Flow가 시간 초과로 인해 취소되고 더 이상의 코드 실행을 멈추는 방법을 보여준다.
•
simple 함수는 두 개의 숫자만 출력한다.
Flow 빌더
// 정수 범위를 Flow로 변환
(1..3).asFlow().collect { value -> println(value) }
Kotlin
복사
•
flow { … } 빌더는 가장 기본적이 빌더이다.
•
flowOf 빌더는 고정된 값들을 방출하는 Flow를 정의한다.
•
다양한 컬렉션과 시퀀스는 .asFlow() 확장 함수를 사용해 Flow로 변환할 수 있다.
Flow의 중간 연산자
suspend fun performRequest(request: Int): String {
delay(1000) // 장시간 비동기 작업을 모방
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 요청의 Flow
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
// 출력:
// response 1
// response 2
// response 3
Kotlin
복사
•
컬렉션과 시퀀스를 변환하는 것처럼 Flow도 연산자를 사용하여 변환할 수 있다.
•
중간 연산자는 상위 스트림 Flow에 적용되어 하위 스트림 Flow를 반환한다.
•
이러한 연산자는 Flow와 마찬가지로 Cold이다.
•
이러한 연산자 호출 자체는 일시 중단 함수가 아니며 빠르게 작동하여 새로운 변환된 Flow의 정의를 반환한다.
•
기본 연산자들은 map과 filter처럼 익숙한 이름을 가지고 있다.
•
이러한 연산자들이 시퀀스와 다른 중요한 차이점은 이들 연산자 내의 코드 블록이 일시 중단 함수를 호출할 수 있다는 점이다.
•
예를 들어, 들어오는 요청의 Flow를 map 연산자를 통해 결과로 반환할 수 있다.
•
이때 요청을 처리하는 작업은 일시 중단 함수로 구현된 장시간 실행되는 작업일 수 있다.
transform 연산자
(1..3).asFlow() // 요청의 Flow
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
// 출력:
// Making request 1
// response 1
// Making request 2
// response 2
// Making request 3
// response 3
Kotlin
복사
•
Flow 변환 연산자 중 가장 일반적인 연산자는 transform이다.
•
이를 사용하면 map과 filter 같은 간단한 변환을 모방하거나 더 복잡한 변환을 구현할 수 있다.
•
transform 연산자를 사용하면 임의의 값을 임의의 횟수만큼 방출할 수 있다.
•
예를 들어, transform을 사용하여 장시간 실행되는 비동기 요청을 수행하기 전에 문자열을 방출하고 그 뒤에 응답을 방출할 수 있다.
take 연산자
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // 처음 두 개만 가져오기
.collect { value -> println(value) }
}
// 출력:
// 1
// 2
// Finally in numbers
Kotlin
복사
•
take와 같은 크기 제한 중간 연산자는 해당 한계에 도달하면 Flow의 실행을 취소한다.
•
코루틴의 취소는 항상 예외를 발생시켜 수행되므로 모든 자원 관리 함수(try-catch-finally 등)는 취소 시에도 정상적으로 작동한다.
Flow의 일시 중단 연산자
val sum = (1..5).asFlow()
.map { it * it } // 1부터 5까지의 숫자의 제곱
.reduce { a, b -> a + b } // 제곱 값을 더함 (terminal 연산자)
println(sum)
// 출력:
// 55
Kotlin
복사
•
terminal 연산자는 flow의 수집을 시작하는 일시 중단 함수들이다.
•
collect 연산자가 가장 기본적인 것이지만 더 쉽게 사용할 수 있는 다른 terminal 연산자도 있다.
◦
toList, toSet과 같이 다양한 컬렉션으로 변환하는 연산자
◦
첫 번째 값을 가져오거나 flow가 단일 값을 내보내도록 보장하는 연산자
◦
reduce 및 fold를 사용하여 flow를 하나의 값으로 축약하는 연산자
Flow는 순차적이다.
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
// 출력:
// Filter 1
// Filter 2
// Map 2
// Collect string 2
// Filter 3
// Filter 4
// Map 4
// Collect string 4
// Filter 5
Kotlin
복사
•
Flow는 순차적으로 동작한다.
•
여러 Flow에서 동작하는 특별한 연산자를 사용하지 않는 한 Flow의 각 개별 수집은 순차적으로 수행된다.
•
수집은 terminal 연산자를 호출한 코루틴에서 직접 이루어진다.
•
기본적으로 새로운 코루틴이 생성되지 않으며 각 방출된 값은 상류에서 하류로 모든 중간 연산자에 의해 처리된 후 terminal 연산자로 전달된다.
Flow의 컨텍스트
withContext(context) {
simple().collect { value ->
println(value) // 지정된 컨텍스트에서 실행
}
}
Kotlin
복사
•
Flow의 수집은 항상 호출된 코루틴의 컨텍스트에서 이루어진다.
•
예를 들어, 단순한 flow가 있을 때 위의 코드는 간단한 flow의 구현 세부 사항과 관계없이 해당 코드 작성자가 지정한 컨텍스트에서 실행된다.
•
이러한 Flow의 속성을 컨텍스트 보존이라고 한다.
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
// 출력:
// [main @coroutine#1] Started simple flow
// [main @coroutine#1] Collected 1
// [main @coroutine#1] Collected 2
// [main @coroutine#1] Collected 3
Kotlin
복사
•
기본적으로 flow { … } 빌더의 코드는 해당 flow의 수집자가 제공한 컨텍스트에서 실행된다.
•
예를 들어, 위 코드는 호출된 스레드를 출력하고 세 개의 숫자를 방출하는 간단한 함수의 구현이다.
•
simple().collect가 메인 스레드에서 호출되었으므로 simple의 flow 본문도 메인 스레드에서 호출된다.
•
이는 빠르게 실행되는 코드나 비동기 코드에 적합하며 실행 컨텍스트에 신경 쓰지 않으면서 호출자를 차단하지 않는다.
withContext를 사용할 때 흔히 발생하는 실수
fun simple(): Flow<Int> = flow {
// CPU 작업을 처리하는 잘못된 컨텍스트 변경 방법
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // CPU 작업을 흉내냄
emit(i) // 다음 값을 방출
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
// 출력:
// Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
// Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
// but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
// Please refer to 'flow' documentation or use 'flowOn' instead
// at ...
Kotlin
복사
•
그러나 CPU를 많이 사용하는 긴 작업은 Dispatchers.Default 컨텍스트에서 실행되어야 하고 UI 업데이트 코드는 Dispatchers.Main 에서 실행되어야 할 수 있다.
•
보통 Kotlin 코루틴을 사용할 때 withContext를 사용하여 컨텍스트를 변경하지만 flow { … } 빌더의 코드는 컨텍스트 보존 속성을 준수해야 하며 다른 컨텍스트에서 값을 방출할 수 없다.
flowOn 연산자
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // CPU 작업을 흉내냄
log("Emitting $i")
emit(i) // 다음 값을 방출
}
}.flowOn(Dispatchers.Default) // CPU 작업을 위한 컨텍스트 변경의 올바른 방법
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
// 출력:
// [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
// [main @coroutine#1] Collected 1
// [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
// [main @coroutine#1] Collected 2
// [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
// [main @coroutine#1] Collected 3
Kotlin
복사
•
위 예외는 flow 방출 시 컨텍스트를 변경하지 위해 flowOn 함수를 사용해야 함을 나타낸다.
•
Flow의 컨텍스트를 올바르게 변경하는 방법은 위와 같다.
•
이 코드를 실행하면 flow { … } 가 백그라운드 스레드에서 실행되고 수집은 메인 스레드에서 실행됨을 알 수 있다.
•
여기서 관찰할 점은 flowOn 연산자가 Flow의 기본 순차적 특성을 변경했다는 것이다.
•
이제 수집은 하나의 코루틴에서 방출은 다른 코루틴에서 서로 다른 스레드에서 동시에 이루어진다.
•
flowOn 연산자는 CoroutineDispatcher 를 변경할 때 상류 flow를 위한 새로운 코루틴을 생성한다.
버퍼링
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 비동기적으로 100ms를 기다린다고 가정
emit(i) // 다음 값을 방출
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // 처리하는 데 300ms가 걸린다고 가정
println(value)
}
}
println("Collected in $time ms")
}
// 출력:
// 1
// 2
// 3
// Collected in 1220 ms
Kotlin
복사
•
Flow의 서로 다른 부분을 서로 다른 코루틴에서 실행하는 것은 전체 Flow 수집 시간 측면에서 유용할 수 있으며 특히 긴 비동기 작업이 관련된 경우에 그렇다.
•
예를 들어, 위 코드는 간단한 flow의 방출이 느려서 요소를 생성하는데 100ms가 걸리고 수집자도 느려서 요소를 처리하는데 300ms가 걸리는 경우 전체 수집 시간이 약 1200ms정도 걸리는 것을 보여준다.
val time = measureTimeMillis {
simple()
.buffer() // 방출을 버퍼링하고 기다리지 않음
.collect { value ->
delay(300) // 처리하는 데 300ms가 걸린다고 가정
println(value)
}
}
println("Collected in $time ms")
// 출력:
// 1
// 2
// 3
// Collected in 1071 ms
Kotlin
복사
•
여기서 buffer 연산자를 사용하여 간단한 flow의 방출 코드와 수집 코드를 동시에 실행할 수 있다.
•
즉, 순차적으로 실행하지 않고 병렬로 처리한다.
•
같은 숫자가 출력되지만 훨씬 빠르게 실행되며 첫 번째 숫자를 기다리는데 100ms만 걸리고 이후 각 숫자를 처리하는데 300ms만 소비되어 약 1000ms 정도 걸린다.
•
flowOn 연산자도 CoroutineDispatcher를 변경할 때 같은 버퍼링 메커니즘을 사용하지만 여기서는 실행 컨텍스트를 변경하지 않고 명시적으로 버퍼링을 요청한다.
conflate 연산자
val time = measureTimeMillis {
simple()
.conflate() // 방출된 값을 결합하고, 중간 값을 처리하지 않음
.collect { value ->
delay(300) // 처리하는 데 300ms가 걸린다고 가정
println(value)
}
}
println("Collected in $time ms")
// 출력:
// 1
// 3
// Collected in 758 ms
Kotlin
복사
•
Flow가 작업의 중간 결과나 상태 업데이트를 나타내는 경우 각 값을 모두 처리할 필요가 없고 최신 값만 처리하면 된다.
•
이 경우 수집자가 너무 느려서 모든 값을 처리할 수 없을 때 중간 값을 건너뛰기 위해 conflate 연산자를 사용할 수 있다.
•
첫 번째 숫자를 처리하는 동안 두 번째와 세 번째 값이 이미 방출되었고 두 번째 값은 결합되었으며 가장 최근 값인 세 번째 값만 수집자에게 전달되었다.
최신 값 처리
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 최신 값에서 취소하고 다시 시작
println("Collecting $value")
delay(300) // 처리하는 데 300ms가 걸린다고 가정
println("Done $value")
}
}
println("Collected in $time ms")
// 출력:
// Collecting 1
// Collecting 2
// Collecting 3
// Done 3
// Collected in 741 ms
Kotlin
복사
•
conflate는 방출자와 수집자가 모두 느릴 때 처리 속도를 높이는 한 가지 방법이다.
•
이는 방출된 값을 건너뛰는 방식으로 이루어진다.
•
또 다른 방법은 느린 수집자를 취소하고 새 값이 방출될 때마다 다시 시작하는 것이다.
•
xxxLatest 연산자군은 새로운 값이 방출될 때 해당 블록의 코드를 취소하고 다시 시작하는 동일한 로직을 수행한다.
•
collectLatest의 본문은 300ms가 걸리지만 새로운 값은 매 100ms마다 방출되므로 블록이 매번 실행되지만 마지막 값에 대해서만 다시 시작하지 않는다.
여러 Flow 구성하기
zip
val nums = (1..3).asFlow() // 숫자 1..3
val strs = flowOf("one", "two", "three") // 문자열
nums.zip(strs) { a, b -> "$a -> $b" } // 하나의 문자열로 결합
.collect { println(it) } // 수집하고 출력
// 출력:
// 1 -> one
// 2 -> two
// 3 -> three
Kotlin
복사
•
Kotlin 표준 라이브러리의 Sequence.zip 확장 함수처럼 Flow에도 두 개의 Flow의 값을 결합하는 zip 연산자가 있다.
combine
val nums = (1..3).asFlow().onEach { delay(300) } // 숫자 1..3이 300ms마다 방출
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 문자열이 400ms마다 방출
val startTime = System.currentTimeMillis() // 시작 시간 기록
nums.zip(strs) { a, b -> "$a -> $b" } // "zip"으로 하나의 문자열로 결합
.collect { value -> // 수집하고 출력
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 출력:
// 1 -> one at 400 ms from start
// 2 -> two at 800 ms from start
// 3 -> three at 1200 ms from start
Kotlin
복사
•
Flow가 변수나 작업의 최신 값을 나타내는 경우(conflate 함수처럼) 해당 Flow들이 방출하는 최신 값에 의존하는 계산을 수행할 필요가 있을 수 있으며 상위 Flow들 중 하나가 값을 방출할 때마다 다시 계산을 해야 할 수도 있다.
•
이를 위한 연산자 그룹이 combine이다.
•
예를 들어, 위 코드에서 숫자는 300ms마다 업데이트되고 문자열은 400ms마다 업데이트된다고 가정하면 zip 연산자를 사용하여 결합하면 결과가 여전히 400ms마다 출력된다.
val nums = (1..3).asFlow().onEach { delay(300) } // 숫자 1..3이 300ms마다 방출
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 문자열이 400ms마다 방출
val startTime = System.currentTimeMillis() // 시작 시간 기록
nums.combine(strs) { a, b -> "$a -> $b" } // "combine"으로 하나의 문자열로 결합
.collect { value -> // 수집하고 출력
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 출력:
// 1 -> one at 452 ms from start
// 2 -> one at 651 ms from start
// 2 -> two at 854 ms from start
// 3 -> two at 952 ms from start
// 3 -> three at 1256 ms from start
Kotlin
복사
•
하지만 zip 대신 combine 연산자를 사용하면 결과는 다음과 같이 nums 또는 strs 의 Flow 중 어느 하나에서 값이 방출될 때마다 출력되는 것을 볼 수 있다.
Flow 평탄화
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 500ms 대기
emit("$i: Second")
}
(1..3).asFlow().map { requestFlow(it) }
Kotlin
복사
•
Flow는 비동기적으로 값을 수신하는 시퀀스를 나타내기 때문에 각 값이 또 다른 시퀀스를 요청하는 상황이 쉽게 발생할 수 있다.
•
requestFlow 함수를 호출하는 코드는 Flow<Flow<String>>의 형태로 중첩된 Flow가 생성되며 이를 처리하려면 단일 Flow로 평탄화해야 한다.
•
컬렉션과 시퀀스에는 이러한 작업을 수행하는 flatten 및 flatMap 연산자가 있다.
•
그러나 Flow는 비동기적 특성 때문에 평탄화 방법이 달라져야 하며 이에 따라 Flow에서 사용할 수 있는 여러 평탄화 연산자가 존재한다.
flatMapConcat
val startTime = System.currentTimeMillis() // 시작 시간 기록
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출
.flatMapConcat { requestFlow(it) }
.collect { value -> // 수집하고 출력
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 출력:
// 1: First at 121 ms from start
// 1: Second at 622 ms from start
// 2: First at 727 ms from start
// 2: Second at 1227 ms from start
// 3: First at 1328 ms from start
// 3: Second at 1829 ms from start
Kotlin
복사
•
Flow의 Flow를 순차적으로 이어서(평탄화해서) 처리하는 flatMapConcat 및 flattenConcat 연산자가 있다.
•
이는 시퀀스 연산자와 가장 직접적으로 대응되는 연산자로 내부 Flow가 완료될 때까지 기다렸다가 다음 Flow를 수집하는 방식이다.
flatMapMerge
val startTime = System.currentTimeMillis() // 시작 시간 기록
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출
.flatMapMerge { requestFlow(it) }
.collect { value -> // 수집하고 출력
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 출력:
// 1: First at 136 ms from start
// 2: First at 231 ms from start
// 3: First at 333 ms from start
// 1: Second at 639 ms from start
// 2: Second at 732 ms from start
// 3: Second at 833 ms from start
Kotlin
복사
•
flatMapMerge 및 flattenMerge 연산자는 모든 들어오는 Flow를 동시에 수집하여 가능한 한 빨리 값을 방출한다.
•
이들은 동시에 수집할 수 있는 Flow의 수를 제한하는 선택적 concurrency 매개변수를 받을 수 있다. (기본값은 DEFAULT_CONCURRENCY)
•
flatMapMerge는 requestFlow(it)을 순차적으로 호출하지만 결과 Flow는 동시에 수집되므로 map { requestFlow(it) } 을 먼저 수행한 후 flattenMerge를 호출한 것과 동일하다.
•
이 결과는 flatMapMerge의 병렬적인 특성을 보여준다.
flatMapLatest
val startTime = System.currentTimeMillis() // 시작 시간 기록
(1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출
.flatMapLatest { requestFlow(it) }
.collect { value -> // 수집하고 출력
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 출력:
// 1: First at 142 ms from start
// 2: First at 322 ms from start
// 3: First at 425 ms from start
// 3: Second at 931 ms from start
Kotlin
복사
•
collectLatest 연산자와 유사하게 flatMapLatest는 새 Flow가 방출되면 이전 Flow의 수집을 취소하는 방식의 Latest 평탄화 모드를 제공한다.
•
flatMapLatest는 새 값이 수신될 때 requestFlow(it) 에서 실행 중이던 모든 코드를 취소한다.
•
이 예제에서는 requestFlow 호출 자체가 빠르고 취소할 수 없는 비동기 함수이기 때문에 차이가 없지만 delay 같은 일시 중단 함수를 사용하면 출력이 달라질 수 있다.
Flow 예외 처리
•
Flow 수집은 방출자가 예외를 발생시키거나 연산자 내부에서 코드가 예외를 발생시키면 예외와 함께 종료될 수 있다.
수집기에서 try-catch 사용
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 다음 값 방출
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
// 출력:
// Emitting 1
// 1
// Emitting 2
// 2
// Caught java.lang.IllegalStateException: Collected 2
Kotlin
복사
•
수집기는 try-catch 블록을 사용하여 예외를 처리할 수 있다.
•
위 코드는 collect 종단 연산자에서 발생한 예외를 성공적으로 잡으며 에외가 발생한 후에는 더 이상 값이 방출되지 않는다.
모든 예외 처리
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // 다음 값 방출
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
// 출력:
// Emitting 1
// string 1
// Emitting 2
// Caught java.lang.IllegalStateException: Crashed on 2
Kotlin
복사
•
이전 예제는 방출자나 중간 또는 종단 연산자에서 발생하는 모든 예외를 처리한다.
•
예를 들어, 방출된 값을 문자열로 변환하지만 해당 코드에서 예외를 발생시키도록 코드를 변경하면 예외가 여전히 처리되며 수집이 중단된다.
예외 투명성
simple()
.catch { e -> emit("Caught $e") } // 예외 시 방출
.collect { value -> println(value) }
Kotlin
복사
•
Flows는 예외에 대해 투명해야 하며 flow { … } 빌더 내에서 try-catch 블록을 사용하여 값을 방출하는 것은 예외 투명성을 위반하는 것이다.
•
이는 수집기가 예외를 던질 때 항상 try-catch를 사용하여 예외를 잡을 수 있도록 보장한다.
•
방출자는 이러한 예외 투명성을 유지하면서 예외 처리를 캡슐화할 수 있는 catch 연산자를 사용할 수 있다.
•
catch 연산자의 본문은 예외를 분석하고 잡힌 예외에 따라 다르게 반응할 수 있다.
◦
throw를 사용해 다시 던질 수 있다.
◦
catch 블록에서 emit을 사용해 값으로 변환될 수 있다.
◦
무시되거나 로그로 기록되거나 다른 코드로 처리될 수 있다.
투명한 catch
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // 하위 스트림 예외는 잡지 않음
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
// 출력:
// Emitting 1
// 1
// Emitting 2
// Exception in thread "main" java.lang.IllegalStateException: Collected 2
Kotlin
복사
•
catch 중간 연산자는 예외 투명성을 유지하며 상위 스트림의 예외(catch 위의 모든 연산자에서 발생한 예외)만 잡는다.
•
만약 collect { … } 블록(catch 아래에 위치)이 예외를 던진다면 예외는 잡히지 않고 그대로 전달된다.
선언적으로 예외 처리
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
// 출력:
// Emitting 1
// 1
// Emitting 2
// Caught java.lang.IllegalStateException: Collected 2
Kotlin
복사
•
catch 연산자의 선언적 특성과 모든 예외를 처리하고자 하는 요구를 결합하려면 collect 연산자의 본문을 onEach로 이동시키고 catch 연산자 앞에 배치할 수 있다.
•
이 Flow의 수집은 파라미터 없이 collect() 호출을 통해 트리거되어야 한다.
Flow 완료
•
Flow 수집이 정상적으로 또는 예외적으로 완료될 때 어떤 작업을 실행해야 할 필요가 있을 수 있다.
•
이를 수행하는 방법은 명령형 또는 선언형 두 가지가 있다.
명령형 finally 블록
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
// 출력:
// 1
// 2
// 3
// Done
Kotlin
복사
•
try-catch 외에도 수집기는 finally 블록을 사용하여 수집 완료 시 작업을 실행할 수 있다.
선언형 처리
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
// 출력:
// 1
// Flow completed exceptionally
// Caught exception
Kotlin
복사
•
선언형 접근 방식으로 flow에는 수집이 완료될 때 호출되는 onCompletion 중간 연산자가 있다.
•
onCompletion의 주요 장점은 람다의 nullable Throwable 매개변수로 이를 통해 flow 수집이 정상적으로 완료되었는지 아니면 예외적으로 완료되었는지를 확인할 수 있다.
•
onCompletion 연산자는 catch와는 달리 예외를 처리하지 않는다.
•
예외는 여전히 하류로 흐르며 추가적인 onCompletion 연산자에게 전달되고 catch 연산자로 처리될 수 있다.
성공적인 완료
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
// 출력:
// 1
// Flow completed with java.lang.IllegalStateException: Collected 2
// Exception in thread "main" java.lang.IllegalStateException: Collected 2
Kotlin
복사
•
catch 연산자와의 또 다른 차이점은 onCompletion이 모든 예외를 보고하고 상위 flow가 정상적으로 완료되었을 때만 null 예외를 취소나 실패 없이 수신한다는 점이다.
•
하류 예외로 인해 flow가 중단되었기 때문에 완료 원인이 null이 아님을 알 수 없다.
명령형 vs 선언형
•
이제 flow를 수집하고 명령형 방식과 선언형 방식 모두에서 그 완료와 예외를 처리하는 방법을 알게 되었다.
•
어떤 접근 방식을 선호해야 하며 그 이유는 무엇일까?
•
라이브러리로서 특정 접근 방식을 권장하지 않으며 두 가지 옵션 모두 유효하다.
•
어떤 방식을 선택할지는 개인의 선호도와 코드 스타일에 따라 결정되어야 한다.
Flow 시작
// 이벤트 흐름을 모방
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Flow를 수집하는 동안 기다림
println("Done")
}
// 출력:
// Event: 1
// Event: 2
// Event: 3
// Done
Kotlin
복사
•
Flow를 사용하여 어떤 소스에서 비동기 이벤트를 표현하는 것은 매우 쉽다.
•
이 경우, 들어오는 이벤트에 반응하는 코드를 등록하고 이후 작업을 계속 진행하는 addEventListener 함수와 유사한 기능이 필요하다.
•
이 역할을 onEach 연산자가 수행할 수 있지만 onEach는 중간 연산자이기 때문에 Flow를 수집하기 위한 최종 연산자도 필요하다.
•
onEach 이후에 collect 최종 연산자를 사용하면 그 이후의 코드는 Flow가 수집될 때까지 기다린다.
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Flow를 별도의 코루틴에서 실행
println("Done")
}
// 출력:
// Done
// Event: 1
// Event: 2
// Event: 3
Kotlin
복사
•
여기서 launchIn 이라는 최종 연산자가 유용하게 사용될 수 있다.
•
collect를 launchIn으로 대체하면 Flow 수집을 별도의 코루틴에서 실행할 수 있어 이후의 코드 실행이 즉시 계속된다.
•
launchIn에 필요한 매개변수는 Flow를 수집할 코루틴이 실행되는 CoroutineScope를 지정해야 한다.
•
위 예제에서는 Scope를 runBlocking 코루틴 빌더에서 가져오며 Flow가 실행되는 동안 이 runBlocking Scope는 자식 코루틴이 완료되기를 기다리고 종료되지 않는다.
•
실제 어플리케이션에서는 Scope가 수명 제한이 있는 엔티티에서 제공된다.
•
이 엔티티의 수명이 종료되면 해당 Scope도 취소되어 관련 Flow 수집이 취소된다.
•
이 방법으로 onEach { … }.launchIn(scope)의 조합이 addEventListener처럼 작동하지만 취소와 구조적 동시성이 이 역할을 대신하기 때문에 removeEventListener 함수는 필요하지 않다.
•
또한 launchIn은 Job을 반환하므로 전체 Scope를 취소하지 않고도 해당 Flow 수집 코루틴만 취소하거나 이를 join 할 수 있다.
Flow 취소 확인
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
// 출력:
// Emitting 1
// 1
// Emitting 2
// 2
// Emitting 3
// 3
// Emitting 4
// Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
Kotlin
복사
•
편의상 Flow 빌더는 각 값이 방출될 때마다 취소에 대한 추가적인 ensureActive 체크를 수행한다.
•
즉, Flow에서 방출되는 바쁜 루프는 취소 가능하다.
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
// 출력:
// 1
// 2
// 3
// 4
// 5
// Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
Kotlin
복사
•
그러나 대부분의 다른 Flow 연산자는 성능상의 이유로 추가적인 취소 체크를 하지 않는다.
•
예를 들어, IntRange.asFlow 확장을 사용하여 동일한 바쁜 루프를 작성하고 어디에서도 일시 중단하지 않으면 취소 체크가 수행되지 않는다.
바쁜 Flow를 취소 가능하게 만들기
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
// 출력:
// 1
// 2
// 3
// Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
Kotlin
복사
•
코루틴에서 바쁜 루프가 있을 때 명시적으로 취소를 체크해야 한다.
•
.onEach { currentCoroutineContext().ensureActive() } 를 추가할 수 있지만 이를 대신할 준비된 cancellable 연산자가 제공된다.
Flow와 Reactive Stream
•
Reactive Stream, RxJava, Project Reactor와 같은 리액티브 프레임워크에 익숙한 사람들에게는 Flow의 설계가 매우 익숙하게 느껴질 수 있다.
•
실제로 Flow의 설계는 Reactive Stream과 그 다양한 구현에서 영감을 받았다.
•
하지만 Flow의 주요 목표는 최대한 단순한 설계를 유지하고 Kotlin과 suspension에 친화적이며 구조적 동시성을 존중하는 것이다.
•
Flow는 개념적으로 다르지만 Reactive Stream과 유사한 측면이 있으며 이를 Reactive Stream인 Publisher로 변환하거나 그 반대로 변환하는 것이 가능하다.
•
이러한 변환기는 kotlinx.coroutines에 기본적으로 포함되어 있으며 해당 리액티브 모듈에서 찾을 수 있다.
•
통합 모듈에는 Flow로의 변환과 Flow에서의 변환, Reactor의 Context와 통합 그리고 다양한 리액티브 엔티티를 서스펜션 친화적인 방식으로 다루는 방법이 포함되어 있다.