Search

Coroutines and channels

코루틴을 사용하여 기본 스레드나 콜백을 차단하지 않고 네트워크 요청을 수행하는 방법을 알아보자.

Blocking 요청

예제 코드

interface GitHubService { @GET("orgs/{org}/repos?per_page=100") fun getOrgReposCall( @Path("org") org: String ): Call<List<Repo>> @GET("repos/{owner}/{repo}/contributors?per_page=100") fun getRepoContributorsCall( @Path("owner") owner: String, @Path("repo") repo: String ): Call<List<User>> }
Kotlin
복사
이 API는 주어진 조직에 속한 저장소 목록과 각 저장소의 기여자 목록을 요청하는 기능을 제공한다.
이 API는 아래의 loadContributorsBlocking() 함수에서 사용되며 주어진 조직의 기여자 목록을 가져온다.
fun loadContributorsBlocking( service: GitHubService, req: RequestData ): List<User> { val repos = service .getOrgReposCall(req.org) // #1 .execute() // #2 .also { logRepos(req, it) } // #3 .body() ?: emptyList() // #4 return repos.flatMap { repo -> service .getRepoContributorsCall(req.org, repo.name) // #1 .execute() // #2 .also { logUsers(repo, it) } // #3 .bodyList() // #4 }.aggregate() }
Kotlin
복사
주어진 조직에 속한 저장소 목록을 가져와 repos 리스트에 저장한다.
그런 다음 각 저장소에 대해 기여자 목록을 요청하고 모든 목록을 하나로 병합하여 최종 기여자 목록을 만든다.
fun <T> Response<List<T>>.bodyList(): List<T> { return body() ?: emptyList() }
Kotlin
복사
.body() ?: emptyList() 에 대한 중복 코드를 제거하기 위해 bodyList() 라는 확장 함수를 선언한다.
when (getSelectedVariant()) { BLOCKING -> { // UI 스레드 차단 val users = loadContributorsBlocking(service, req) updateResults(users, startTime) } }
Kotlin
복사
loadContributorsBlocking() 을 호출하는 loadContributors() 함수이다.

코드 설명

1.
getOrgReposCall() 과 getRepoContributorsCall() 은 모두 *Call 클래스의 인스턴스를 반환한다.
a.
이 시점에는 요청이 전송되지 않는다.
2.
Call.execute() 를 호출하여 요청을 실행한다.
a.
execute() 는 동기식 호출로 해당 스레드를 차단한다.
3.
응답을 받으면 결과는 logRepos() 및 logUsers() 함수를 호출하여 기록된다.
a.
HTTP 응답에 오류가 있으면 해당 오류도 여기서 기록된다.
4.
응답의 본문을 가져오고 필요한 데이터를 포함한 응답의 본문이 없으면 빈 리스트를 사용한다.
a.
오류가 발생할 경우 빈 리스트를 결과로 사용하며 해당 오류를 기록한다.

실행 결과

1770 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 40 repos 2025 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 23 contributors 2229 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 45 contributors ...
Kotlin
복사
각 줄의 첫 번째 항목은 프로그램이 시작된 이후 경과한 밀리초이다.
대괄호 안의 내용은 스레드 이름을 나타내며 로딩 요청이 호출된 스레드를 확인할 수 있다.
각 줄의 마지막 항목은 실제 메세지로 몇 개의 저장소 또는 기여자가 로드되었는지 보여준다.
이 로그 출력은 모든 결과가 메인 스레드에서 기록되었음을 나타낸다.
BLOCKING 옵션으로 코드를 실행하면 창이 로드가 끝날 때까지 멈추고 입력에 반응하지 않음을 볼 수 있다.
loadContributorsBlocking() 을 호출한 스레드는 UI 스레드이므로 요청은 동일한 스레드에서 실행되고 UI가 멈춘다.

콜백

Blocking 요청은 작동하지만 스레드를 차단하고 UI를 멈추게 만든다.
이를 피하기 위한 전통적인 접근 방식은 콜백을 사용하는 것이다.
연산이 완료된 직후에 호출되어야 하는 코드를 호출하는 대신 이를 별도의 콜백(종종 람다로 작성)으로 분리하여 호출자에게 전달하고 이후에 호출하도록 할 수 있다.
UI가 응답성을 유지하도록 하려면 전체 연산을 별도의 스레드로 이동시키거나 Blocking 호출 대신 콜백 API를 사용하도록 전환할 수 있다.

백그라운드 스레드 사용하기

thread { loadContributorsBlocking(service, req) }
Kotlin
복사
thread() 함수는 새로운 스레드를 시작하므로 전체 연산을 다른 스레드로 이동시킬 수 있다.
이제 모든 로딩 작업이 별도의 스레드로 이동하였으므로 메인 스레드는 자유로워져 다른 작업을 처리할 수 있다.
fun loadContributorsBackground( service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit )
Kotlin
복사
loadContributorsBackground() 함수의 시그니처를 변경하여 모든 로딩이 완료된 후 호출할 updateResults() 콜백을 마지막 인자로 받는다.
이제 loadContributorsBackground() 가 호출될 때 updateResults() 호출은 즉시 뒤따라오지 않고 콜백 안에서 호출된다.
loadContributorsBackground(service, req) { users -> SwingUtilities.invokeLater { updateResults(users, startTime) } }
Kotlin
복사
SwingUtilities.invokeLater 를 호출함으로써 UI를 업데이트하는 updateResults() 호출이 메인 UI 스레드(AWT 이벤트 디스패치 스레드)에서 이루어지도록 보장한다.
그러나 BACKGROUND 옵션을 통해 기여자를 로드하려고 시도하면 목록이 업데이트되지만 아무런 변화가 없다.

콜백 API 사용

백그라운드 스레드는 여전히 최적의 자원 활용이 아니다.
모든 로딩 요청이 순차적으로 진행되며 로딩 결과를 기다리는 동안 스레드는 차단되지만 그 시간 동안 다른 작업을 처리할 수 있었다.
구체적으로 스레드는 이전 리포지토리의 결과를 기다리지 않고 다른 요청의 로딩을 시작할 수 있다.
각 리포지토리에 대한 데이터 처리는 로딩과 응답 처리의 두 부분으로 나누어져야 한다.
두 번째 응답 처리 부분은 콜백으로 분리되어야 한다.
그런 후에 각 리포지토리의 로딩은 이전 리포지토리의 결과가 도착하기 전에 시작될 수 있다. (해당 콜백이 호출되기 전)
fun loadContributorsCallbacks( service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit ) { service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1 logRepos(req, responseRepos) val repos = responseRepos.bodyList() val allUsers = mutableListOf<User>() for (repo in repos) { service.getRepoContributorsCall(req.org, repo.name) .onResponse { responseUsers -> // #2 logUsers(repo, responseUsers) val users = responseUsers.bodyList() allUsers += users } } } // TODO: 왜 이 코드가 작동하지 않을까요? 어떻게 수정해야 할까요? updateResults(allUsers.aggregate()) }
Kotlin
복사
콜백 API를 사용하여 이를 달성할 수 있다.
Call.enqueue() 함수는 HTTP 요청을 시작하며 콜백을 인자로 받는다.
이 콜백에서 각 요청 후에 수행해야 할 작업을 지정해야 한다.
편의를 위해 이 코드는 동일한 파일에 선언된 onResponse() 확장 함수를 사용한다.
이 함수는 람다를 인자로 받으며 객체 표현 대신 사용된다.
응답을 처리하는 로직은 콜백으로 추출되었으며 각 람다는 #1과 #2에서 시작한다.
그러나 CALLBACKS 옵션을 통해 기여자를 로드하려고 시도하면 목록이 업데이트되지만 아무런 변화가 없다.

일시 중단 함수

// suspend 함수로 직접 결과 반환 interface GitHubService { @GET("orgs/{org}/repos?per_page=100") suspend fun getOrgRepos( @Path("org") org: String ): List<Repo> } // suspend 함수로 Response 결과 반환 interface GitHubService { // getOrgReposCall & getRepoContributorsCall 선언부 @GET("orgs/{org}/repos?per_page=100") suspend fun getOrgRepos( @Path("org") org: String ): Response<List<Repo>> @GET("repos/{owner}/{repo}/contributors?per_page=100") suspend fun getRepoContributors( @Path("owner") owner: String, @Path("repo") repo: String ): Response<List<User>> }
Kotlin
복사
susepnd 함수를 사용하여 동일한 로직을 구현할 수 있다.
suspend 함수를 사용하여 요청을 수행할 때 기본적으로 스레드는 차단되지 않는다.
Call<List<Repo>> 를 반환하는 대신 API 호출을 suspend 함수로 정의하여 결과를 직접 반환한다.
만약 결과가 실패하면 예외를 발생시키므로 직접 결과를 반환하는 대신 Response로 감싸진 결과를 반환할 수 있다.
이 경우 결과 본문이 제공되며 수동으로 오류를 확인할 수 있다.

코루틴

suspend 함수가 포함된 코드는 blocking 요청과 유사한 것처럼 보인다.
주요 차이점은 스레드를 차단하는 대신 코루틴이 일시 중지된다는 점이다.
block -> suspend thread -> coroutine
Kotlin
복사
코루틴은 코드를 스레드에서 실행하는 것처럼 코루틴에서도 코드를 실행할 수 있기 때문에 종종 경량 스레드라고 불린다.
이전에 blocking되었던 작업들은 이제 코루틴을 중단시키는 방식으로 동작한다.
launch { val users = loadContributorsSuspend(req) updateResults(users, startTime) }
Kotlin
복사
launch는 람다를 인수로 받는 라이브러리 함수이고 loadContributorsSuspend() 함수는 launch 안에서 호출된다.
여기서 launch는 데이터를 로드하고 결과를 표시하는 새로운 계산을 시작한다.
이 계산은 일시 중지할 수 있으며 네트워크 요청을 수행할 때 일시 중지되고 기반 스레드를 release한다.
네트워크 요청이 결과를 반환하면 계산이 다시 재개된다.
이런 방식으로 일시 중지할 수 있는 계산을 코루틴이라고 부른다.
코루틴은 스레드 위에서 실행되며 일시 중지될 수 있다.
코루틴이 일시 중지되면 해당 계산이 중지되고 스레드에서 제거된 뒤 메모리에 저장된다.
그 동안 스레드는 다른 작업에 사용될 수 있다.
일시 중지된 코루틴 계산을 계속할 준비가 되면 해당 계산은 스레드로 돌아오지만 반드시 동일한 스레드는 아니다.
loadContributorsSuspend() 에서 각 기여자 요청은 이제 서스펜션 메커니즘을 사용하여 결과를 기다린다.
먼저 새로운 요청이 전송된 후 다음 응답을 기다리는 동안 launch 함수가 시작한 전체 기여자 로드 코루틴이 일시 중지된다.
응답이 수신된 후에 코루틴이 다시 재개되고 요청 일시 중지 응답을 기다리는 동안 스레드는 다른 작업에 사용될 수 있다.
모든 요청이 UI 스레드에서 이루어지고 있음에도 불구하고 UI는 반응성을 유지한다.
2538 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 2729 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - ts2kt: loaded 11 contributors 3029 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-koans: loaded 45 contributors ... 11252 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-coroutines-workshop: loaded 1 contributors
Kotlin
복사
SUSPEND 옵션을 사용해 프로그램을 실행하면 모든 요청이 UI 스레드에서 처리되었다는 것을 로그에서 확인할 수 있다.
이제 스레드 이름에 코루틴 이름이 함께 표시된다.
suspend 함수는 스레드를 공정하게 다루고 기다리는 것 때문에 스레드를 차단하지 않는다.
하지만 아직까지는 이 방식이 동시성을 제공하는 것은 아니다.

동시성

코루틴은 스레드보다 자원을 훨씬 덜 소모한다.
비동기적으로 새로운 작업을 시작할 때마다 새로운 코루틴을 만들 수 있다.
새로운 코루틴을 시작하려면 launch, async, runBlocking과 같은 주요 코루틴 빌더를 사용할 수 있다.
다른 라이브러리에서는 추가적인 코루틴 빌더를 정의할 수도 있다.

async

async는 새로운 코루틴을 시작하고 launch의 Job을 확장한 제네릭 타입인 Deferred<Type> 객체를 반환한다.
이 때 Type은 람다식 내부의 마지막 표현식에 따라 결정된다.
Deferred는 다른 언어에서 FuturePromise로 알려진 개념을 나타낸다.
이는 어떤 작업의 계산 결과를 가지고 있지만 최종 결과를 나중에 얻을 수 있다는 점에서 그 결과를 약속하는 역할을 한다.
코루틴 결과를 얻으려면 Deferred 인스턴스에서 await()을 호출할 수 있고 이 await()을 호출하는 코루틴은 결과를 기다리는 동안 중단된다.
Deffered 객체 목록이 있는 경우 awaitAll()을 호출하여 모든 결과를 기다릴 수 있다.

async와 launch의 주요 차이점

launch는 특정한 결과를 반환할 필요가 없는 작업을 시작할 때 사용된다는 점이다.
launch는 해당 코루틴을 나타내는 Job을 반환하며 Job.join()을 호출해 완료될 때까지 기다릴 수 있다.

runBlocking

import kotlinx.coroutines.* fun main() = runBlocking { val deferred: Deferred<Int> = async { loadData() } println("waiting...") println(deferred.await()) } suspend fun loadData(): Int { println("loading...") delay(1000L) println("loaded!") return 42 }
Kotlin
복사
runBlocking은 일반 함수와 일시 중단 함수 또는 블로킹과 논블로킹 세계 사이의 다리 역할을 한다.
최상위 코루틴을 시작하는 어댑터로 사용되며 주로 main() 함수와 테스트에서 사용된다.

동시 실행 코루틴

import kotlinx.coroutines.* fun main() = runBlocking { val deferreds: List<Deferred<Int>> = (1..3).map { async { delay(1000L * it) println("Loading $it") it } } val sum = deferreds.awaitAll().sum() println("$sum") }
Kotlin
복사
각 기여자 요청이 새로운 코루틴에서 시작되면 모든 요청은 비동기적으로 시작된다.
이전 요청의 결과를 받기 전에 새로운 요청을 보낼 수 있다.
전체 로딩 시간은 CALLBACK 버전과 거의 동일하지만 콜백이 필요 없다.
또한 async는 코드에서 어떤 부분이 동시에 실행되는지를 명시적으로 나타낸다.

구조적 동시성

코루틴 스코프는 다양한 코루틴 간의 구조부모-자식 관계를 담당한다.
일반적으로 새로운 코루틴은 스코프 내에서 시작해야 한다.
코루틴 컨텍스트코루틴 실행에 필요한 추가 정보를 저장하는데 예를 들어 코루틴의 사용자 지정 이름이나 코루틴이 스케줄될 스레드를 지정하는 디스패처와 같은 정보가 포함된다.

함수를 사용해서 CoroutineScope 생성

launch { /* this: CoroutineScope */ }
Kotlin
복사
launch, async, runBlocking 같은 함수가 새로운 코루틴을 시작할 때 자동으로 해당하는 스코프를 생성한다.
이 함수들은 모두 리시버가 있는 람다를 인수로 받으며 이때 리시버는 CoroutineScope 타입이다.
launch와 async는 CoroutineScope의 확장 함수로 선언되어 있으므로 호출할 때 암시적 또는 명시적 리시버를 항상 전달해야 한다.
runBlocking으로 시작된 코루틴은 유일한 예외로 runBlocking은 최상위 함수로 정의되어 있지만 이 함수는 현재 스레드를 차단하므로 주로 main() 함수나 테스트에서 브리지 함수로 사용된다.
runBlocking 내부에서 launch를 호출할 때는 암시적 리시버 타입인 CoroutineScope의 확장 함수로 호출된다. (명시적으로 this.launch라고 쓸 수도 있다.)
import kotlinx.coroutines.* fun main() = runBlocking { /* this: CoroutineScope */ launch { /* ... */ } // 동일한 표현: this.launch { /* ... */ } }
Kotlin
복사
이 예시에서 launch에 의해 시작된 중첩된 코루틴은 runBlocking에 의해 시작된 외부 코루틴의 자식이라고 볼 수 있다.
이 부모-자식 관계는 스코프를 통해 작동하며 자식 코루틴은 부모 코루틴에 해당하는 스코프에서 시작된다.

함수를 사용하지 않고 CoroutineScope 생성

새로운 스코프를 생성하면서 새로운 코루틴을 시작하지 않고도 CoroutineScope 함수를 사용할 수 있다.
외부 스코프에 접근할 수 없는 suspend 함수 내에서 구조적으로 새로운 코루틴을 시작하려면 새로운 코루틴 스코프를 생성할 수 있으며 이 스코프는 자동으로 호출된 외부 스코프의 자식이 된다.
GlobalScope.async 또는 GlobalScope.launch를 사용하여 글로벌 스코프에서 새로운 코루틴을 시작할 수도 있다.
이 경우 최상위 독립적인 코루틴이 생성된다.

구조적 동시성 메커니즘의 이점

스코프는 자식 코루틴의 수명을 관리하며 자식 코루틴의 수명은 스코프의 수명에 의해 결정된다.
만약 문제가 발생하거나 사용자가 작업을 취소하려 할 때 스코프는 자식 코루틴을 자동으로 취소할 수 있다.
스코프는 자동으로 모든 자식 코루틴이 완료될 때까지 기다린다. 따라서 스코프가 코루틴에 대응하는 경우 부모 코루틴은 그 안에서 시작된 모든 코루틴이 완료될 때까지 완료되지 않는다.

GlobalScope의 단점

GlobalScope.async를 사용할 때는 여러 코루틴을 작은 스코프로 묶는 구조가 없다.
글로벌 스코프에서 시작된 코루틴은 모두 독립적이며 그들의 수명은 어플리케이션 전체의 수명에 의해 제한된다.
글로벌 스코프에서 시작된 코루틴에 대한 참조를 저장하고 완료를 기다리거나 명시적으로 취소할 수 있지만 구조적 동시성처럼 자동으로 취소되지는 않는다.

CoroutineScope와 GlobalScope의 예제

CoroutineScope
suspend fun loadContributorsConcurrent( service: GitHubService, req: RequestData ): List<User> = coroutineScope { // ... async { log("starting loading for ${repo.name}") delay(3000) // 로드 } // ... }
Kotlin
복사
loadContributorsConcurrent() 함수에 3초의 딜레이를 추가한다.
이 딜레이는 요청을 보내기 전에 코루틴을 시작한 후 취소할 수 있는 충분한 시간을 준다.
2896 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 40 repos 2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans ... 2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO Contributors - starting loading for mpp-example /* 'cancel' 클릭 */ /* 요청이 전송되지 않음 */
Kotlin
복사
프로그램을 실행하고 CONCURRENT 옵션을 선택하여 기여자를 로드할 때 모든 요청이 시작된 후 취소 버튼을 클릭하면 모든 요청이 취소된다.
GlobalScope
suspend fun loadContributorsNotCancellable( service: GitHubService, req: RequestData ): List<User> { // ... GlobalScope.async { log("starting loading for ${repo.name}") // 로드 } // ... return deferreds.awaitAll().flatten().aggregate() }
Kotlin
복사
loadContributorsNotCancellable() 함수에서는 coroutineScope 생성을 제거하고 GlobalScope.async를 사용한다.
2570 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos 2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - starting loading for kotlin-koans ... 2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO Contributors - starting loading for mpp-example /* 'cancel' 클릭 */ /* 그러나 모든 요청이 여전히 전송됨: */ 6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors ... 9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO Contributors - mpp-example: loaded 8 contributors
Kotlin
복사
프로그램을 실행하고 NOT_CANCELLABLE 옵션을 선택하여 기여자를 로드할 때 취소 버튼을 클릭해도 모든 요청이 여전히 전송된다.

코루틴 취소

interface Contributors { fun loadContributors() { // ... when (getSelectedVariant()) { CONCURRENT -> { launch { val users = loadContributorsConcurrent(service, req) updateResults(users, startTime) }.setUpCancellation() // #1 } } } private fun Job.setUpCancellation() { val loadingJob = this // #2 // 'cancel' 버튼이 클릭되면 로딩 작업 취소: val listener = ActionListener { loadingJob.cancel() // #3 updateLoadingStatus(CANCELED) } // 'cancel' 버튼에 리스너 추가: addCancelListener(listener) // 로딩 작업 완료 후 상태를 업데이트하고 리스너 제거 } }
Kotlin
복사
Cancel 버튼이 클릭되면 로딩 코루틴이 명시적으로 취소되며 자식 코루틴도 자동으로 취소된다.
launch 함수는 Job 인스턴스를 반환하고 이 인스턴스는 데이터를 불러오고 업데이트하는 로딩 코루틴의 참조를 가진다.
Job 인스턴스를 리시버로 전달하여 setUpCancellation() 확장 함수를 호출할 수 있다.
val job = launch { } job.setUpCancellation()
Kotlin
복사
또한 가독성을 위해 위처럼 명시적으로 함수 내부에서 setUpCancellation() 함수의 수신자를 새로운 Job 변수로 참조할 수 있다.

외부 스코프 컨텍스트 사용

주어진 스코프 내에서 새로운 코루틴을 시작할 때 동일한 컨텍스트에서 실행되도록 보장하기가 더 쉽다.
또한 필요에 따라 컨텍스트를 쉽게 교체할 수 있다.
launch(Dispatchers.Default) { // 외부 스코프 val users = loadContributorsConcurrent(service, req) // ... }
Kotlin
복사
CoroutineScope 또는 코루틴 빌더에 의해 생성된 새로운 스코프는 항상 외부 스코프의 컨텍스트를 상속 받는다.
이 경우 외부 스코프는 loadContributorsConcurrent() 함수가 호출된 스코프이다.
suspend fun loadContributorsConcurrent( service: GitHubService, req: RequestData ): List<User> = coroutineScope { // 이 스코프는 외부 스코프의 컨텍스트를 상속받습니다. // ... async { // 상속된 컨텍스트로 시작된 중첩 코루틴 // ... } // ... }
Kotlin
복사
모든 중첩된 코루틴은 자동으로 상속된 컨텍스트로 시작된다.
디스패처는 이 컨텍스트의 일부이다.
그렇기 때문에 async로 시작된 모든 코루틴은 기본 디스패처의 컨텍스트로 시작된다.
구조적 동시성을 사용하면 최상위 코루틴을 생성할 때 주요 컨텍스트 요소(디스패처 등)를 한 번 지정할 수 있으며 중첩된 코루틴은 필요할 때만 이를 수정하여 컨텍스트를 상속받는다.
일반적으로 최상위 코루틴에 대해 CoroutineDispatchers.Main 을 기본적으로 사용하고 다른 스레드에서 코드를 실행해야 할 때 명시적으로 다른 디스패처를 지정하는 것이 일반적인 관행이다.

진행 상황 표시

suspend fun loadContributorsProgress( service: GitHubService, req: RequestData, updateResults: suspend (List<User>, completed: Boolean) -> Unit ) { // 데이터 로드 // 중간 상태에서 `updateResults()` 호출 }
Kotlin
복사
UI를 업데이트하는 로직을 콜백으로 전달해야 한다.
이 콜백은 각 중간 상태에서 호출된다.
호출 시점에 콜백을 전달하여 메인 스레드에서 PROGRESS 옵션에 따라 결과를 업데이트하도록 한다.
launch(Dispatchers.Default) { loadContributorsProgress(service, req) { users, completed -> withContext(Dispatchers.Main) { updateResults(users, startTime, completed) } } }
Kotlin
복사
loadContributorsProgress() 함수에서는 updateResults() 파라미터가 suspend로 선언되어 있다.
이 때문에 해당 람다 인자 안에서 suspend 함수인 withContext를 호출해야 한다.
updateResults() 콜백은 로딩이 완료되었고 결과가 최종적인지 여부를 지정하는 Boolean 파라미터를 추가 인자로 받는다.

연속성 vs 동시성

updateResults() 콜백은 각 요청이 완료된 후 호출된다.
이 코드에는 동시성이 포함되지 않는다.
순차적이므로 동기화가 필요하지 않다.
가장 좋은 선택은 동시에 요청을 보내고 각 저장소에 대한 응답을 받은 후 중간 결과를 업데이트하는 것이다.
동시성을 추가하려면 channels를 사용하라.

채널

공유된 가변 상태를 사용하는 코드 작성은 매우 어렵고 오류 발생 가능성이 크다. (콜백처럼)
더 간단한 방법은 공통 가변 상태를 사용하지 않고 통신을 통해 정보를 공유하는 것이다.
코루틴은 채널을 통해 서로 통신할 수 있다.
채널은 코루틴 사이에서 데이터를 전달하는 기본 통신 수단이다.
하나의 코루틴이 채널에 정보를 보내고 다른 코루틴이 그 정보를 받을 수 있다.
정보를 보내는(생산하는) 코루틴을 보통 생산자(producer)라고 부르며 정보를 받는(소비하는) 코루틴을 소비자(consumer)라고 부른다.
하나 이상의 코루틴이 동일한 채널에 정보를 보낼 수 있으며 하나 이상의 코루틴이 동일한 채널에서 데이터를 받을 수 있다.

여러 코루틴과 채널 사용

여러 코루틴이 동일한 채널에서 정보를 받을 때 각 요소는 한 번만 소비되고 처리된 후에는 채널에서 제거된다.
채널은 요소들이 한쪽 끝에 추가되고 다른 한쪽 끝에서 소비되는 큐(queue)와 비슷하다.
그러나 중요한 차이점은 채널의 send() 및 receive() 연산은 일시 중단될 수 있다는 것이다.
이는 채널이 비어 있거나 가득 차 있을 때 발생한다.
채널이 가득 차면 채널의 크기에 상한이 있는 경우 send() 연산이 일시 중지될 수 있다.

Channel 인터페이스

interface SendChannel<in E> { suspend fun send(element: E) fun close(): Boolean } interface ReceiveChannel<out E> { suspend fun receive(): E } interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
Kotlin
복사
Channel은 SendChannel, ReceiveChannel 그리고 Channel 세 가지 인터페이스로 표현된다.
Channel은 앞의 두 인터페이스를 확장한 형태이다.
보통 SendChannel 인스턴스를 생산자에게 전달하여 생산자만 채널에 정보를 보낼 수 있게 하고 ReceiveChannel 인스턴스를 소비자에게 전달하여 소비자만 채널에서 정보를 받을 수 있게 한다.
send와 receive 메서드는 모두 suspend 함수로 선언되어 있다.
생산자는 더 이상 보낼 요소가 없을 때 채널을 닫을 수 있다.

Channel 종류

내부적으로 얼마나 많은 요소를 저장할 수 있는지와 send() 호출이 일시 중단될 수 있는지 여부에 따라 다르다. 모든 채널 유형에서 receive() 호출은 비슷하게 작동하는데 채널이 비어 있지 않으면 요소를 받고 그렇지 않으면 일시 중단된다.
무제한 채널
큐와 가장 유사한 구조이다.
생산자는 이 채널에 무제한으로 요소를 보낼 수 있으며 메모리가 부족해질 때까지 채널은 계속해서 확장된다.
send() 호출은 절대 일시 중단되지 않으며 메모리가 부족해지면 OutOfMemoryException이 발생할 수 있다.
소비자가 비어 있는 채널에서 요소를 받으려 하면 일시 중단된다.
버퍼링된 채널
채널의 크기는 지정된 수치로 제한된다.
생산자는 이 채널에 지정된 크기까지 요소를 보낼 수 있으며 그 이후에는 send() 호출이 일시 중단된다.
내부적으로 모든 요소가 저장된다.
랑데부 채널
버퍼가 없는 채널이며 버퍼 크기가 0인 버퍼링된 채널과 동일하다.
하나의 함수(send() 또는 receive())는 다른 함수가 호출될 때까지 항상 일시 중단된다.
send() 함수가 호출되고 receive() 함수가 준비되지 않은 경우 send()는 일시 중단된다.
반대로 receive() 함수가 호출되고 채널이 비어 있으면 receive() 호출도 일시 중단된다.
합성 채널
새로 보낸 요소가 이전 요소를 덮어씌운다.
따라서 수신자는 항상 가장 최근에 보낸 요소만 받는다.
send() 호출은 절대 일시 중단되지 않는다.

Channel 코드 예제

val rendezvousChannel = Channel<String>() val bufferedChannel = Channel<String>(10) val conflatedChannel = Channel<String>(CONFLATED) val unlimitedChannel = Channel<String>(UNLIMITED)
Kotlin
복사
기본적으로 랑데부 채널이 생성된다.
suspend fun loadContributorsChannels( service: GitHubService, req: RequestData, updateResults: suspend (List<User>, completed: Boolean) -> Unit ) = coroutineScope { val repos = service .getOrgRepos(req.org) .also { logRepos(req, it) } .bodyList() val channel = Channel<List<User>>() for (repo in repos) { launch { val users = service.getRepoContributors(req.org, repo.name) .also { logUsers(repo, it) } .bodyList() channel.send(users) } } var allUsers = emptyList<User>() repeat(repos.size) { val users = channel.receive() allUsers = (allUsers + users).aggregate() updateResults(allUsers, it == repos.lastIndex) } }
Kotlin
복사
모든 기여자 목록의 중간 상태를 저장할 allUsers 변수를 만들고 채널에서 수신한 각 새로운 목록을 추가한다.
결과를 집계한 후 updateResults() 콜백을 사용하여 상태를 업데이트한다.
데이터를 받지 못했을 때 receive() 호출이 일시 중단되고 이 경우 기여자 로딩 코루틴 전체가 일시 중단된다.

코루틴 테스트

실행 시간을 비교하는 코루틴 테스트의 두 가지 문제점이 있다.
1.
테스트가 너무 오래걸린다.
2.
실행되는 정확한 시간을 신뢰할 수 없다.
테스트 지연이 예상대로 동작하는지 확인하기 위해 가상 시간을 사용하는 특별한 테스트 디스패처를 사용하라.
이 디스패처는 시작 시점부터 경과한 가상 시간을 추적하며 코루틴을 즉시 실행한다.
이 디스패처에서 코루틴을 실행하면 지연이 즉시 반환되고 가상 시간이 진행된다.
@Test fun testDelayInSuspend() = runTest { val realStartTime = System.currentTimeMillis() val virtualStartTime = currentTime foo() println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms println("${currentTime - virtualStartTime} ms") // 1000 ms } suspend fun foo() { delay(1000) // 자동으로 지연 없이 진행 println("foo") // foo() 호출 시 즉시 실행 }
Kotlin
복사
테스트에서 이 메커니즘을 사용하려면 runBlocking 호출을 runTest로 교체하라.
runTest는 TestScope에 대한 확장 람다를 인수로 받는다.
이 특별한 스코프 내에서 suspend 함수 안에서 delay를 호출하면 지연이 실제 시간을 지연시키는 대신 가상 시간을 증가시킨다.
현재 가상 시간을 TestScope의 currentTime 속성을 사용하여 확인할 수 있다.
@Test fun testDelayInLaunch() = runTest { val realStartTime = System.currentTimeMillis() val virtualStartTime = currentTime bar() println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms println("${currentTime - virtualStartTime} ms") // 1000 ms } suspend fun bar() = coroutineScope { launch { delay(1000) // 자동으로 지연 없이 진행 println("bar") // bar() 호출 시 즉시 실행 } }
Kotlin
복사
자식 코루틴에서 가상 지연 효과를 제대로 보려면 모든 자식 코루틴을 TestDispatcher로 시작하라.
이 디스패처는 다른 TestScope에서 자동으로 상속된다.