Search

Asynchronous Flow

일시 중단 함수는 비동기적으로 단일 값을 반환한다.
하지만 여러 개의 비동기 계산 값을 반환하려면 어떻게 해야 할까?

비동기 Flow

여러 값 표현하기

fun simple(): List<Int> = listOf(1, 2, 3) fun main() { simple().forEach { value -> println(value) } } // 출력: // 1 // 2 // 3
Kotlin
복사
Kotlin에서는 여러 값을 컬렉션으로 표현할 수 있다.
예를 들어, 위 코드는 세 개의 숫자를 반환하고 이를 forEach로 모두 출력하는 간단함 함수이다.

시퀀스

fun simple(): Sequence<Int> = sequence { // 시퀀스 빌더 for (i in 1..3) { Thread.sleep(100) // 계산하는 척하기 yield(i) // 다음 값을 반환 } } fun main() { simple().forEach { value -> println(value) } }
Kotlin
복사
만약 CPU를 많이 사용하는 블로킹 코드로 숫자를 계산하고 각각의 계산이 100ms가 걸린다고 가정하면 이 숫자들을 시퀀스로 표현할 수 있다.
위 코드는 동일한 숫자를 출력하지만 각 숫자를 출력하기 전에 100ms를 기다린다.

일시 중단 함수

suspend fun simple(): List<Int> { delay(1000) // 비동기 작업을 하는 척하기 return listOf(1, 2, 3) } fun main() = runBlocking<Unit> { simple().forEach { value -> println(value) } }
Kotlin
복사
그러나 이 계산은 코드를 실행하는 메인 스레드를 블로킹한다.
만약 이러한 값들이 비동기 코드로 계산된다면 simple 함수를 suspend로 표시하여 작업을 블로킹하지 않고 결과를 리스트로 반환할 수 있다.
위 코드는 1초를 기다린 후 숫자들을 출력한다.

플로우

fun simple(): Flow<Int> = flow { // 플로우 빌더 for (i in 1..3) { delay(100) // 유용한 작업을 하는 척하기 emit(i) // 다음 값을 방출 } } fun main() = runBlocking<Unit> { // 메인 스레드가 블로킹되지 않는지 확인하기 위한 동시 코루틴 실행 launch { for (k in 1..3) { println("I'm not blocked $k") delay(100) } } // 플로우 수집 simple().collect { value -> println(value) } } // 출력: // I'm not blocked 1 // 1 // I'm not blocked 2 // 2 // I'm not blocked 3 // 3
Kotlin
복사
List<Int> 타입을 사용하면 모든 값을 한 번에 반환할 수 있다.
비동기적으로 계산되는 값의 스트림을 표현하기 위해서는 동기적으로 계산되는 값에 대해 Sequence<Int> 타입을 사용하는 것과 같이 Flow<Int> 타입을 사용할 수 있다.
위 코드는 메인 스레드를 블로킹하지 않고 각 숫자를 출력하기 전에 100ms를 기다린다.
이는 메인 스레드에서 메세지를 100ms마다 출력하는 별도의 코루틴이 실행되므로 확인할 수 있다.
simple의 flow { … } 본문에서 delay 대신 Thread.sleep을 사용하면 메인 스레드가 블로킹된다.

Flow를 사용한 코드의 차이점

Flow 타입의 빌더 함수는 flow라고 불린다.
flow { … } 블록 내부의 코드는 일시 중단될 수 있다.
simple 함수는 더 이상 suspend로 표시되지 않는다.
값은 emit 함수를 사용해 Flow에서 방출된다.
값은 collect 함수를 사용해 Flow로부터 수집된다.

Flow는 차갑다.

fun simple(): Flow<Int> = flow { println("Flow started") for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { println("Calling simple function...") val flow = simple() println("Calling collect...") flow.collect { value -> println(value) } println("Calling collect again...") flow.collect { value -> println(value) } } // 출력: // Calling simple function... // Calling collect... // Flow started // 1 // 2 // 3 // Calling collect again... // Flow started // 1 // 2 // 3
Kotlin
복사
플로우는 시퀀스와 비슷한 콜드 스트림이다.
즉, 플로우 빌더 내부의 코드는 플로우가 수집될 때까지 실행되지 않는다.
이것은 플로우를 반환하는 simple 함수가 suspend로 표시되지 않는 주요 이유이다.
simple() 호출 자체는 빠르게 반환되고 아무것도 기다리지 않는다.
플로우는 매번 수집될 때 새로 시작되며 그래서 collect를 다시 호출할 때마다 메세지가 출력된다.

Flow 취소 기본사항

fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(100) println("Emitting $i") emit(i) } } fun main() = runBlocking<Unit> { withTimeoutOrNull(250) { // 250ms 후에 타임아웃 발생 simple().collect { value -> println(value) } } println("Done") } // 출력: // Emitting 1 // 1 // Emitting 2 // 2 // Done
Kotlin
복사
Flow는 일반적인 코루틴의 협력적 취소를 따른다.
일반적으로 delay와 같은 취소가 가능한 일시 중단 함수에서 Flow 수집이 일시 중단된 경우 Flow 수집이 취소될 수 있다.
위 코드는 withTimeoutOrNull 블록에서 Flow가 시간 초과로 인해 취소되고 더 이상의 코드 실행을 멈추는 방법을 보여준다.
simple 함수는 두 개의 숫자만 출력한다.

Flow 빌더

// 정수 범위를 Flow로 변환 (1..3).asFlow().collect { value -> println(value) }
Kotlin
복사
flow { … } 빌더는 가장 기본적이 빌더이다.
flowOf 빌더는 고정된 값들을 방출하는 Flow를 정의한다.
다양한 컬렉션과 시퀀스는 .asFlow() 확장 함수를 사용해 Flow로 변환할 수 있다.

Flow의 중간 연산자

suspend fun performRequest(request: Int): String { delay(1000) // 장시간 비동기 작업을 모방 return "response $request" } fun main() = runBlocking<Unit> { (1..3).asFlow() // 요청의 Flow .map { request -> performRequest(request) } .collect { response -> println(response) } } // 출력: // response 1 // response 2 // response 3
Kotlin
복사
컬렉션과 시퀀스를 변환하는 것처럼 Flow도 연산자를 사용하여 변환할 수 있다.
중간 연산자는 상위 스트림 Flow에 적용되어 하위 스트림 Flow를 반환한다.
이러한 연산자는 Flow와 마찬가지로 Cold이다.
이러한 연산자 호출 자체는 일시 중단 함수가 아니며 빠르게 작동하여 새로운 변환된 Flow의 정의를 반환한다.
기본 연산자들은 map과 filter처럼 익숙한 이름을 가지고 있다.
이러한 연산자들이 시퀀스와 다른 중요한 차이점은 이들 연산자 내의 코드 블록이 일시 중단 함수를 호출할 수 있다는 점이다.
예를 들어, 들어오는 요청의 Flow를 map 연산자를 통해 결과로 반환할 수 있다.
이때 요청을 처리하는 작업은 일시 중단 함수로 구현된 장시간 실행되는 작업일 수 있다.

transform 연산자

(1..3).asFlow() // 요청의 Flow .transform { request -> emit("Making request $request") emit(performRequest(request)) } .collect { response -> println(response) } // 출력: // Making request 1 // response 1 // Making request 2 // response 2 // Making request 3 // response 3
Kotlin
복사
Flow 변환 연산자 중 가장 일반적인 연산자는 transform이다.
이를 사용하면 map과 filter 같은 간단한 변환을 모방하거나 더 복잡한 변환을 구현할 수 있다.
transform 연산자를 사용하면 임의의 값을 임의의 횟수만큼 방출할 수 있다.
예를 들어, transform을 사용하여 장시간 실행되는 비동기 요청을 수행하기 전에 문자열을 방출하고 그 뒤에 응답을 방출할 수 있다.

take 연산자

fun numbers(): Flow<Int> = flow { try { emit(1) emit(2) println("This line will not execute") emit(3) } finally { println("Finally in numbers") } } fun main() = runBlocking<Unit> { numbers() .take(2) // 처음 두 개만 가져오기 .collect { value -> println(value) } } // 출력: // 1 // 2 // Finally in numbers
Kotlin
복사
take와 같은 크기 제한 중간 연산자는 해당 한계에 도달하면 Flow의 실행을 취소한다.
코루틴의 취소는 항상 예외를 발생시켜 수행되므로 모든 자원 관리 함수(try-catch-finally 등)는 취소 시에도 정상적으로 작동한다.

Flow의 일시 중단 연산자

val sum = (1..5).asFlow() .map { it * it } // 1부터 5까지의 숫자의 제곱 .reduce { a, b -> a + b } // 제곱 값을 더함 (terminal 연산자) println(sum) // 출력: // 55
Kotlin
복사
terminal 연산자는 flow의 수집을 시작하는 일시 중단 함수들이다.
collect 연산자가 가장 기본적인 것이지만 더 쉽게 사용할 수 있는 다른 terminal 연산자도 있다.
toList, toSet과 같이 다양한 컬렉션으로 변환하는 연산자
첫 번째 값을 가져오거나 flow가 단일 값을 내보내도록 보장하는 연산자
reduce 및 fold를 사용하여 flow를 하나의 값으로 축약하는 연산자

Flow는 순차적이다.

(1..5).asFlow() .filter { println("Filter $it") it % 2 == 0 } .map { println("Map $it") "string $it" }.collect { println("Collect $it") } // 출력: // Filter 1 // Filter 2 // Map 2 // Collect string 2 // Filter 3 // Filter 4 // Map 4 // Collect string 4 // Filter 5
Kotlin
복사
Flow는 순차적으로 동작한다.
여러 Flow에서 동작하는 특별한 연산자를 사용하지 않는 한 Flow의 각 개별 수집은 순차적으로 수행된다.
수집은 terminal 연산자를 호출한 코루틴에서 직접 이루어진다.
기본적으로 새로운 코루틴이 생성되지 않으며 각 방출된 값은 상류에서 하류로 모든 중간 연산자에 의해 처리된 후 terminal 연산자로 전달된다.

Flow의 컨텍스트

withContext(context) { simple().collect { value -> println(value) // 지정된 컨텍스트에서 실행 } }
Kotlin
복사
Flow의 수집은 항상 호출된 코루틴의 컨텍스트에서 이루어진다.
예를 들어, 단순한 flow가 있을 때 위의 코드는 간단한 flow의 구현 세부 사항과 관계없이 해당 코드 작성자가 지정한 컨텍스트에서 실행된다.
이러한 Flow의 속성을 컨텍스트 보존이라고 한다.
fun simple(): Flow<Int> = flow { log("Started simple flow") for (i in 1..3) { emit(i) } } fun main() = runBlocking<Unit> { simple().collect { value -> log("Collected $value") } } // 출력: // [main @coroutine#1] Started simple flow // [main @coroutine#1] Collected 1 // [main @coroutine#1] Collected 2 // [main @coroutine#1] Collected 3
Kotlin
복사
기본적으로 flow { … } 빌더의 코드는 해당 flow의 수집자가 제공한 컨텍스트에서 실행된다.
예를 들어, 위 코드는 호출된 스레드를 출력하고 세 개의 숫자를 방출하는 간단한 함수의 구현이다.
simple().collect가 메인 스레드에서 호출되었으므로 simple의 flow 본문도 메인 스레드에서 호출된다.
이는 빠르게 실행되는 코드나 비동기 코드에 적합하며 실행 컨텍스트에 신경 쓰지 않으면서 호출자를 차단하지 않는다.

withContext를 사용할 때 흔히 발생하는 실수

fun simple(): Flow<Int> = flow { // CPU 작업을 처리하는 잘못된 컨텍스트 변경 방법 kotlinx.coroutines.withContext(Dispatchers.Default) { for (i in 1..3) { Thread.sleep(100) // CPU 작업을 흉내냄 emit(i) // 다음 값을 방출 } } } fun main() = runBlocking<Unit> { simple().collect { value -> println(value) } } // 출력: // Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: // Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323], // but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default]. // Please refer to 'flow' documentation or use 'flowOn' instead // at ...
Kotlin
복사
그러나 CPU를 많이 사용하는 긴 작업은 Dispatchers.Default 컨텍스트에서 실행되어야 하고 UI 업데이트 코드는 Dispatchers.Main 에서 실행되어야 할 수 있다.
보통 Kotlin 코루틴을 사용할 때 withContext를 사용하여 컨텍스트를 변경하지만 flow { … } 빌더의 코드는 컨텍스트 보존 속성을 준수해야 하며 다른 컨텍스트에서 값을 방출할 수 없다.

flowOn 연산자

fun simple(): Flow<Int> = flow { for (i in 1..3) { Thread.sleep(100) // CPU 작업을 흉내냄 log("Emitting $i") emit(i) // 다음 값을 방출 } }.flowOn(Dispatchers.Default) // CPU 작업을 위한 컨텍스트 변경의 올바른 방법 fun main() = runBlocking<Unit> { simple().collect { value -> log("Collected $value") } } // 출력: // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1 // [main @coroutine#1] Collected 1 // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2 // [main @coroutine#1] Collected 2 // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3 // [main @coroutine#1] Collected 3
Kotlin
복사
위 예외는 flow 방출 시 컨텍스트를 변경하지 위해 flowOn 함수를 사용해야 함을 나타낸다.
Flow의 컨텍스트를 올바르게 변경하는 방법은 위와 같다.
이 코드를 실행하면 flow { … } 가 백그라운드 스레드에서 실행되고 수집은 메인 스레드에서 실행됨을 알 수 있다.
여기서 관찰할 점은 flowOn 연산자가 Flow의 기본 순차적 특성을 변경했다는 것이다.
이제 수집은 하나의 코루틴에서 방출은 다른 코루틴에서 서로 다른 스레드에서 동시에 이루어진다.
flowOn 연산자는 CoroutineDispatcher 를 변경할 때 상류 flow를 위한 새로운 코루틴을 생성한다.

버퍼링

fun simple(): Flow<Int> = flow { for (i in 1..3) { delay(100) // 비동기적으로 100ms를 기다린다고 가정 emit(i) // 다음 값을 방출 } } fun main() = runBlocking<Unit> { val time = measureTimeMillis { simple().collect { value -> delay(300) // 처리하는 데 300ms가 걸린다고 가정 println(value) } } println("Collected in $time ms") } // 출력: // 1 // 2 // 3 // Collected in 1220 ms
Kotlin
복사
Flow의 서로 다른 부분을 서로 다른 코루틴에서 실행하는 것은 전체 Flow 수집 시간 측면에서 유용할 수 있으며 특히 긴 비동기 작업이 관련된 경우에 그렇다.
예를 들어, 위 코드는 간단한 flow의 방출이 느려서 요소를 생성하는데 100ms가 걸리고 수집자도 느려서 요소를 처리하는데 300ms가 걸리는 경우 전체 수집 시간이 약 1200ms정도 걸리는 것을 보여준다.
val time = measureTimeMillis { simple() .buffer() // 방출을 버퍼링하고 기다리지 않음 .collect { value -> delay(300) // 처리하는 데 300ms가 걸린다고 가정 println(value) } } println("Collected in $time ms") // 출력: // 1 // 2 // 3 // Collected in 1071 ms
Kotlin
복사
여기서 buffer 연산자를 사용하여 간단한 flow의 방출 코드와 수집 코드를 동시에 실행할 수 있다.
즉, 순차적으로 실행하지 않고 병렬로 처리한다.
같은 숫자가 출력되지만 훨씬 빠르게 실행되며 첫 번째 숫자를 기다리는데 100ms만 걸리고 이후 각 숫자를 처리하는데 300ms만 소비되어 약 1000ms 정도 걸린다.
flowOn 연산자도 CoroutineDispatcher를 변경할 때 같은 버퍼링 메커니즘을 사용하지만 여기서는 실행 컨텍스트를 변경하지 않고 명시적으로 버퍼링을 요청한다.

conflate 연산자

val time = measureTimeMillis { simple() .conflate() // 방출된 값을 결합하고, 중간 값을 처리하지 않음 .collect { value -> delay(300) // 처리하는 데 300ms가 걸린다고 가정 println(value) } } println("Collected in $time ms") // 출력: // 1 // 3 // Collected in 758 ms
Kotlin
복사
Flow가 작업의 중간 결과나 상태 업데이트를 나타내는 경우 각 값을 모두 처리할 필요가 없고 최신 값만 처리하면 된다.
이 경우 수집자가 너무 느려서 모든 값을 처리할 수 없을 때 중간 값을 건너뛰기 위해 conflate 연산자를 사용할 수 있다.
첫 번째 숫자를 처리하는 동안 두 번째와 세 번째 값이 이미 방출되었고 두 번째 값은 결합되었으며 가장 최근 값인 세 번째 값만 수집자에게 전달되었다.

최신 값 처리

val time = measureTimeMillis { simple() .collectLatest { value -> // 최신 값에서 취소하고 다시 시작 println("Collecting $value") delay(300) // 처리하는 데 300ms가 걸린다고 가정 println("Done $value") } } println("Collected in $time ms") // 출력: // Collecting 1 // Collecting 2 // Collecting 3 // Done 3 // Collected in 741 ms
Kotlin
복사
conflate는 방출자와 수집자가 모두 느릴 때 처리 속도를 높이는 한 가지 방법이다.
이는 방출된 값을 건너뛰는 방식으로 이루어진다.
또 다른 방법은 느린 수집자를 취소하고 새 값이 방출될 때마다 다시 시작하는 것이다.
xxxLatest 연산자군은 새로운 값이 방출될 때 해당 블록의 코드를 취소하고 다시 시작하는 동일한 로직을 수행한다.
collectLatest의 본문은 300ms가 걸리지만 새로운 값은 매 100ms마다 방출되므로 블록이 매번 실행되지만 마지막 값에 대해서만 다시 시작하지 않는다.

여러 Flow 구성하기

zip

val nums = (1..3).asFlow() // 숫자 1..3 val strs = flowOf("one", "two", "three") // 문자열 nums.zip(strs) { a, b -> "$a -> $b" } // 하나의 문자열로 결합 .collect { println(it) } // 수집하고 출력 // 출력: // 1 -> one // 2 -> two // 3 -> three
Kotlin
복사
Kotlin 표준 라이브러리의 Sequence.zip 확장 함수처럼 Flow에도 두 개의 Flow의 값을 결합하는 zip 연산자가 있다.

combine

val nums = (1..3).asFlow().onEach { delay(300) } // 숫자 1..3이 300ms마다 방출 val strs = flowOf("one", "two", "three").onEach { delay(400) } // 문자열이 400ms마다 방출 val startTime = System.currentTimeMillis() // 시작 시간 기록 nums.zip(strs) { a, b -> "$a -> $b" } // "zip"으로 하나의 문자열로 결합 .collect { value -> // 수집하고 출력 println("$value at ${System.currentTimeMillis() - startTime} ms from start") } // 출력: // 1 -> one at 400 ms from start // 2 -> two at 800 ms from start // 3 -> three at 1200 ms from start
Kotlin
복사
Flow가 변수나 작업의 최신 값을 나타내는 경우(conflate 함수처럼) 해당 Flow들이 방출하는 최신 값에 의존하는 계산을 수행할 필요가 있을 수 있으며 상위 Flow들 중 하나가 값을 방출할 때마다 다시 계산을 해야 할 수도 있다.
이를 위한 연산자 그룹이 combine이다.
예를 들어, 위 코드에서 숫자는 300ms마다 업데이트되고 문자열은 400ms마다 업데이트된다고 가정하면 zip 연산자를 사용하여 결합하면 결과가 여전히 400ms마다 출력된다.
val nums = (1..3).asFlow().onEach { delay(300) } // 숫자 1..3이 300ms마다 방출 val strs = flowOf("one", "two", "three").onEach { delay(400) } // 문자열이 400ms마다 방출 val startTime = System.currentTimeMillis() // 시작 시간 기록 nums.combine(strs) { a, b -> "$a -> $b" } // "combine"으로 하나의 문자열로 결합 .collect { value -> // 수집하고 출력 println("$value at ${System.currentTimeMillis() - startTime} ms from start") } // 출력: // 1 -> one at 452 ms from start // 2 -> one at 651 ms from start // 2 -> two at 854 ms from start // 3 -> two at 952 ms from start // 3 -> three at 1256 ms from start
Kotlin
복사
하지만 zip 대신 combine 연산자를 사용하면 결과는 다음과 같이 nums 또는 strs 의 Flow 중 어느 하나에서 값이 방출될 때마다 출력되는 것을 볼 수 있다.

Flow 평탄화

fun requestFlow(i: Int): Flow<String> = flow { emit("$i: First") delay(500) // 500ms 대기 emit("$i: Second") } (1..3).asFlow().map { requestFlow(it) }
Kotlin
복사
Flow는 비동기적으로 값을 수신하는 시퀀스를 나타내기 때문에 각 값이 또 다른 시퀀스를 요청하는 상황이 쉽게 발생할 수 있다.
requestFlow 함수를 호출하는 코드는 Flow<Flow<String>>의 형태로 중첩된 Flow가 생성되며 이를 처리하려면 단일 Flow로 평탄화해야 한다.
컬렉션과 시퀀스에는 이러한 작업을 수행하는 flatten 및 flatMap 연산자가 있다.
그러나 Flow는 비동기적 특성 때문에 평탄화 방법이 달라져야 하며 이에 따라 Flow에서 사용할 수 있는 여러 평탄화 연산자가 존재한다.

flatMapConcat

val startTime = System.currentTimeMillis() // 시작 시간 기록 (1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출 .flatMapConcat { requestFlow(it) } .collect { value -> // 수집하고 출력 println("$value at ${System.currentTimeMillis() - startTime} ms from start") } // 출력: // 1: First at 121 ms from start // 1: Second at 622 ms from start // 2: First at 727 ms from start // 2: Second at 1227 ms from start // 3: First at 1328 ms from start // 3: Second at 1829 ms from start
Kotlin
복사
Flow의 Flow를 순차적으로 이어서(평탄화해서) 처리하는 flatMapConcat 및 flattenConcat 연산자가 있다.
이는 시퀀스 연산자와 가장 직접적으로 대응되는 연산자로 내부 Flow가 완료될 때까지 기다렸다가 다음 Flow를 수집하는 방식이다.

flatMapMerge

val startTime = System.currentTimeMillis() // 시작 시간 기록 (1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출 .flatMapMerge { requestFlow(it) } .collect { value -> // 수집하고 출력 println("$value at ${System.currentTimeMillis() - startTime} ms from start") } // 출력: // 1: First at 136 ms from start // 2: First at 231 ms from start // 3: First at 333 ms from start // 1: Second at 639 ms from start // 2: Second at 732 ms from start // 3: Second at 833 ms from start
Kotlin
복사
flatMapMerge 및 flattenMerge 연산자는 모든 들어오는 Flow를 동시에 수집하여 가능한 한 빨리 값을 방출한다.
이들은 동시에 수집할 수 있는 Flow의 수를 제한하는 선택적 concurrency 매개변수를 받을 수 있다. (기본값은 DEFAULT_CONCURRENCY)
flatMapMerge는 requestFlow(it)을 순차적으로 호출하지만 결과 Flow는 동시에 수집되므로 map { requestFlow(it) } 을 먼저 수행한 후 flattenMerge를 호출한 것과 동일하다.
이 결과는 flatMapMerge의 병렬적인 특성을 보여준다.

flatMapLatest

val startTime = System.currentTimeMillis() // 시작 시간 기록 (1..3).asFlow().onEach { delay(100) } // 100ms마다 숫자 방출 .flatMapLatest { requestFlow(it) } .collect { value -> // 수집하고 출력 println("$value at ${System.currentTimeMillis() - startTime} ms from start") } // 출력: // 1: First at 142 ms from start // 2: First at 322 ms from start // 3: First at 425 ms from start // 3: Second at 931 ms from start
Kotlin
복사
collectLatest 연산자와 유사하게 flatMapLatest는 새 Flow가 방출되면 이전 Flow의 수집을 취소하는 방식의 Latest 평탄화 모드를 제공한다.
flatMapLatest는 새 값이 수신될 때 requestFlow(it) 에서 실행 중이던 모든 코드를 취소한다.
이 예제에서는 requestFlow 호출 자체가 빠르고 취소할 수 없는 비동기 함수이기 때문에 차이가 없지만 delay 같은 일시 중단 함수를 사용하면 출력이 달라질 수 있다.

Flow 예외 처리

Flow 수집은 방출자가 예외를 발생시키거나 연산자 내부에서 코드가 예외를 발생시키면 예외와 함께 종료될 수 있다.

수집기에서 try-catch 사용

fun simple(): Flow<Int> = flow { for (i in 1..3) { println("Emitting $i") emit(i) // 다음 값 방출 } } fun main() = runBlocking<Unit> { try { simple().collect { value -> println(value) check(value <= 1) { "Collected $value" } } } catch (e: Throwable) { println("Caught $e") } } // 출력: // Emitting 1 // 1 // Emitting 2 // 2 // Caught java.lang.IllegalStateException: Collected 2
Kotlin
복사
수집기는 try-catch 블록을 사용하여 예외를 처리할 수 있다.
위 코드는 collect 종단 연산자에서 발생한 예외를 성공적으로 잡으며 에외가 발생한 후에는 더 이상 값이 방출되지 않는다.

모든 예외 처리

fun simple(): Flow<String> = flow { for (i in 1..3) { println("Emitting $i") emit(i) // 다음 값 방출 } } .map { value -> check(value <= 1) { "Crashed on $value" } "string $value" } fun main() = runBlocking<Unit> { try { simple().collect { value -> println(value) } } catch (e: Throwable) { println("Caught $e") } } // 출력: // Emitting 1 // string 1 // Emitting 2 // Caught java.lang.IllegalStateException: Crashed on 2
Kotlin
복사
이전 예제는 방출자나 중간 또는 종단 연산자에서 발생하는 모든 예외를 처리한다.
예를 들어, 방출된 값을 문자열로 변환하지만 해당 코드에서 예외를 발생시키도록 코드를 변경하면 예외가 여전히 처리되며 수집이 중단된다.

예외 투명성

simple() .catch { e -> emit("Caught $e") } // 예외 시 방출 .collect { value -> println(value) }
Kotlin
복사
Flows는 예외에 대해 투명해야 하며 flow { … } 빌더 내에서 try-catch 블록을 사용하여 값을 방출하는 것은 예외 투명성을 위반하는 것이다.
이는 수집기가 예외를 던질 때 항상 try-catch를 사용하여 예외를 잡을 수 있도록 보장한다.
방출자는 이러한 예외 투명성을 유지하면서 예외 처리를 캡슐화할 수 있는 catch 연산자를 사용할 수 있다.
catch 연산자의 본문은 예외를 분석하고 잡힌 예외에 따라 다르게 반응할 수 있다.
throw를 사용해 다시 던질 수 있다.
catch 블록에서 emit을 사용해 값으로 변환될 수 있다.
무시되거나 로그로 기록되거나 다른 코드로 처리될 수 있다.

투명한 catch

fun simple(): Flow<Int> = flow { for (i in 1..3) { println("Emitting $i") emit(i) } } fun main() = runBlocking<Unit> { simple() .catch { e -> println("Caught $e") } // 하위 스트림 예외는 잡지 않음 .collect { value -> check(value <= 1) { "Collected $value" } println(value) } } // 출력: // Emitting 1 // 1 // Emitting 2 // Exception in thread "main" java.lang.IllegalStateException: Collected 2
Kotlin
복사
catch 중간 연산자는 예외 투명성을 유지하며 상위 스트림의 예외(catch 위의 모든 연산자에서 발생한 예외)만 잡는다.
만약 collect { … } 블록(catch 아래에 위치)이 예외를 던진다면 예외는 잡히지 않고 그대로 전달된다.

선언적으로 예외 처리

simple() .onEach { value -> check(value <= 1) { "Collected $value" } println(value) } .catch { e -> println("Caught $e") } .collect() // 출력: // Emitting 1 // 1 // Emitting 2 // Caught java.lang.IllegalStateException: Collected 2
Kotlin
복사
catch 연산자의 선언적 특성과 모든 예외를 처리하고자 하는 요구를 결합하려면 collect 연산자의 본문을 onEach로 이동시키고 catch 연산자 앞에 배치할 수 있다.
이 Flow의 수집은 파라미터 없이 collect() 호출을 통해 트리거되어야 한다.

Flow 완료

Flow 수집이 정상적으로 또는 예외적으로 완료될 때 어떤 작업을 실행해야 할 필요가 있을 수 있다.
이를 수행하는 방법은 명령형 또는 선언형 두 가지가 있다.

명령형 finally 블록

fun simple(): Flow<Int> = (1..3).asFlow() fun main() = runBlocking<Unit> { try { simple().collect { value -> println(value) } } finally { println("Done") } } // 출력: // 1 // 2 // 3 // Done
Kotlin
복사
try-catch 외에도 수집기는 finally 블록을 사용하여 수집 완료 시 작업을 실행할 수 있다.

선언형 처리

fun simple(): Flow<Int> = flow { emit(1) throw RuntimeException() } fun main() = runBlocking<Unit> { simple() .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") } .catch { cause -> println("Caught exception") } .collect { value -> println(value) } } // 출력: // 1 // Flow completed exceptionally // Caught exception
Kotlin
복사
선언형 접근 방식으로 flow에는 수집이 완료될 때 호출되는 onCompletion 중간 연산자가 있다.
onCompletion의 주요 장점은 람다의 nullable Throwable 매개변수로 이를 통해 flow 수집이 정상적으로 완료되었는지 아니면 예외적으로 완료되었는지를 확인할 수 있다.
onCompletion 연산자는 catch와는 달리 예외를 처리하지 않는다.
예외는 여전히 하류로 흐르며 추가적인 onCompletion 연산자에게 전달되고 catch 연산자로 처리될 수 있다.

성공적인 완료

fun simple(): Flow<Int> = (1..3).asFlow() fun main() = runBlocking<Unit> { simple() .onCompletion { cause -> println("Flow completed with $cause") } .collect { value -> check(value <= 1) { "Collected $value" } println(value) } } // 출력: // 1 // Flow completed with java.lang.IllegalStateException: Collected 2 // Exception in thread "main" java.lang.IllegalStateException: Collected 2
Kotlin
복사
catch 연산자와의 또 다른 차이점은 onCompletion이 모든 예외를 보고하고 상위 flow가 정상적으로 완료되었을 때만 null 예외를 취소나 실패 없이 수신한다는 점이다.
하류 예외로 인해 flow가 중단되었기 때문에 완료 원인이 null이 아님을 알 수 없다.

명령형 vs 선언형

이제 flow를 수집하고 명령형 방식과 선언형 방식 모두에서 그 완료와 예외를 처리하는 방법을 알게 되었다.
어떤 접근 방식을 선호해야 하며 그 이유는 무엇일까?
라이브러리로서 특정 접근 방식을 권장하지 않으며 두 가지 옵션 모두 유효하다.
어떤 방식을 선택할지는 개인의 선호도와 코드 스타일에 따라 결정되어야 한다.

Flow 시작

// 이벤트 흐름을 모방 fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) } fun main() = runBlocking<Unit> { events() .onEach { event -> println("Event: $event") } .collect() // <--- Flow를 수집하는 동안 기다림 println("Done") } // 출력: // Event: 1 // Event: 2 // Event: 3 // Done
Kotlin
복사
Flow를 사용하여 어떤 소스에서 비동기 이벤트를 표현하는 것은 매우 쉽다.
이 경우, 들어오는 이벤트에 반응하는 코드를 등록하고 이후 작업을 계속 진행하는 addEventListener 함수와 유사한 기능이 필요하다.
이 역할을 onEach 연산자가 수행할 수 있지만 onEach는 중간 연산자이기 때문에 Flow를 수집하기 위한 최종 연산자도 필요하다.
onEach 이후에 collect 최종 연산자를 사용하면 그 이후의 코드는 Flow가 수집될 때까지 기다린다.
fun main() = runBlocking<Unit> { events() .onEach { event -> println("Event: $event") } .launchIn(this) // <--- Flow를 별도의 코루틴에서 실행 println("Done") } // 출력: // Done // Event: 1 // Event: 2 // Event: 3
Kotlin
복사
여기서 launchIn 이라는 최종 연산자가 유용하게 사용될 수 있다.
collect를 launchIn으로 대체하면 Flow 수집을 별도의 코루틴에서 실행할 수 있어 이후의 코드 실행이 즉시 계속된다.
launchIn에 필요한 매개변수는 Flow를 수집할 코루틴이 실행되는 CoroutineScope를 지정해야 한다.
위 예제에서는 Scope를 runBlocking 코루틴 빌더에서 가져오며 Flow가 실행되는 동안 이 runBlocking Scope는 자식 코루틴이 완료되기를 기다리고 종료되지 않는다.
실제 어플리케이션에서는 Scope가 수명 제한이 있는 엔티티에서 제공된다.
이 엔티티의 수명이 종료되면 해당 Scope도 취소되어 관련 Flow 수집이 취소된다.
이 방법으로 onEach { … }.launchIn(scope)의 조합이 addEventListener처럼 작동하지만 취소와 구조적 동시성이 이 역할을 대신하기 때문에 removeEventListener 함수는 필요하지 않다.
또한 launchIn은 Job을 반환하므로 전체 Scope를 취소하지 않고도 해당 Flow 수집 코루틴만 취소하거나 이를 join 할 수 있다.

Flow 취소 확인

fun foo(): Flow<Int> = flow { for (i in 1..5) { println("Emitting $i") emit(i) } } fun main() = runBlocking<Unit> { foo().collect { value -> if (value == 3) cancel() println(value) } } // 출력: // Emitting 1 // 1 // Emitting 2 // 2 // Emitting 3 // 3 // Emitting 4 // Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
Kotlin
복사
편의상 Flow 빌더는 각 값이 방출될 때마다 취소에 대한 추가적인 ensureActive 체크를 수행한다.
즉, Flow에서 방출되는 바쁜 루프는 취소 가능하다.
fun main() = runBlocking<Unit> { (1..5).asFlow().collect { value -> if (value == 3) cancel() println(value) } } // 출력: // 1 // 2 // 3 // 4 // 5 // Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
Kotlin
복사
그러나 대부분의 다른 Flow 연산자는 성능상의 이유로 추가적인 취소 체크를 하지 않는다.
예를 들어, IntRange.asFlow 확장을 사용하여 동일한 바쁜 루프를 작성하고 어디에서도 일시 중단하지 않으면 취소 체크가 수행되지 않는다.

바쁜 Flow를 취소 가능하게 만들기

fun main() = runBlocking<Unit> { (1..5).asFlow().cancellable().collect { value -> if (value == 3) cancel() println(value) } } // 출력: // 1 // 2 // 3 // Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
Kotlin
복사
코루틴에서 바쁜 루프가 있을 때 명시적으로 취소를 체크해야 한다.
.onEach { currentCoroutineContext().ensureActive() } 를 추가할 수 있지만 이를 대신할 준비된 cancellable 연산자가 제공된다.

Flow와 Reactive Stream

Reactive Stream, RxJava, Project Reactor와 같은 리액티브 프레임워크에 익숙한 사람들에게는 Flow의 설계가 매우 익숙하게 느껴질 수 있다.
실제로 Flow의 설계는 Reactive Stream과 그 다양한 구현에서 영감을 받았다.
하지만 Flow의 주요 목표는 최대한 단순한 설계를 유지하고 Kotlin과 suspension에 친화적이며 구조적 동시성을 존중하는 것이다.
Flow는 개념적으로 다르지만 Reactive Stream과 유사한 측면이 있으며 이를 Reactive Stream인 Publisher로 변환하거나 그 반대로 변환하는 것이 가능하다.
이러한 변환기는 kotlinx.coroutines에 기본적으로 포함되어 있으며 해당 리액티브 모듈에서 찾을 수 있다.
통합 모듈에는 Flow로의 변환과 Flow에서의 변환, Reactor의 Context와 통합 그리고 다양한 리액티브 엔티티를 서스펜션 친화적인 방식으로 다루는 방법이 포함되어 있다.