•
지연 값(Deferred values)은 코루틴 간에 단일 값을 전송하는 편리한 방법을 제공한다.
•
Channel은 값의 스트림을 전송하는 방법을 제공한다.
Channel 기본
val channel = Channel<Int>()
launch {
// 이 부분은 CPU를 많이 소모하는 계산 또는 비동기 논리가 될 수 있습니다.
// 우리는 다섯 개의 제곱을 전송할 것입니다.
for (x in 1..5) channel.send(x * x)
}
// 여기서 다섯 개의 정수를 수신하여 출력합니다:
repeat(5) { println(channel.receive()) }
println("Done!")
// 출력:
// 1
// 4
// 9
// 16
// 25
// Done!
Kotlin
복사
•
Channel은 개념적으로 BlockingQueue와 매우 유사하다.
•
한 가지 주요 차이점은 차단되는 put 작업 대신 서스펜딩 send가 있으며 차단되는 task 작업 대신 서스펜딩 receive가 있다는 것이다.
Channel 닫기 및 반복
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // 전송이 완료되었습니다.
}
// 여기서 `for` 루프를 사용하여 수신된 값을 출력합니다 (채널이 닫힐 때까지)
for (y in channel) println(y)
println("Done!")
// 출력:
// 1
// 4
// 9
// 16
// 25
// Done!
Kotlin
복사
•
Queue와 달리 Channel은 더 이상 요소가 오지 않음을 나타내기 위해 닫을 수 있다.
•
수신측에서는 채널에서 요소를 수신하기 위해 일반적인 for 루프를 사용하는 것이 편리하다.
•
개념적으로 닫기는 채널에 특별한 종료 토큰을 보내는 것과 같다.
•
이 종료 토큰이 수신되면 반복이 멈추므로 종료 이전에 보내진 모든 요소가 수신된다는 보장이 있다.
Channel 공급자 생성
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
// 출력:
// 1
// 4
// 9
// 16
// 25
// Done!
Kotlin
복사
•
코루틴이 일련의 요소를 생성하는 패턴은 매우 흔하다.
•
이는 동시성 코드에서 자주 사용되는 프로듀서-컨슈머 패턴의 일부이다.
•
채널을 매개변수로 받는 함수로 이러한 프로듀서를 추상화할 수 있지만 함수는 결과를 반환하는 것이 일반적인 상식에 맞다.
•
이 문제를 해결하기 위해 프로듀서 측에서 이를 쉽게 처리할 수 있도록 도와주는 produce라는 편리한 코루틴 빌더가 있다.
•
또한 소비자 측에서는 for 루프 대신 사용할 수 있는 consumeEach 확장 함수가 있다.
파이프라인
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 1부터 시작하는 무한한 정수 스트림
}
Kotlin
복사
•
파이프라인은 한 코루틴이 무한할 수 있는 값의 스트림을 생성하는 패턴이다.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
Kotlin
복사
•
그리고 다른 코루틴이나 여러 코루틴이 그 스트림을 소비하면서 처리한 후에 다른 결과를 생성한다.
val numbers = produceNumbers() // 1부터 시작하는 정수 생성
val squares = square(numbers) // 정수들을 제곱
repeat(5) {
println(squares.receive()) // 처음 다섯 개 출력
}
println("Done!") // 작업 완료
coroutineContext.cancelChildren() // 자식 코루틴 취소
Kotlin
복사
•
메인 코드는 전체 파이프라인을 시작하고 연결한다.
•
모든 코루틴을 생성하는 함수들은 CoroutineScope의 확장으로 정의되며 이를 통해 구조적 동시성을 활용하여 전역 코루틴이 어플리케이션에 남아있지 않도록 보장할 수 있다.
파이프라인을 이용한 소수 찾기
•
파이프라인을 활용하여 소수를 생성하는 예제를 통해 코루틴을 이용한 파이프라인을 극한으로 사용해보자.
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // 시작 숫자부터 시작하는 무한한 정수 스트림
}
Kotlin
복사
•
먼저 무한한 숫자 시퀀스를 생성한다.
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
Kotlin
복사
•
다음 파이프라인 단계는 입력된 숫자 스트림에서 주어진 소수로 나누어진 숫자를 모두 제거하는 필터링 작업을 한다.
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
Kotlin
복사
•
이제 파이프라인을 구성하여 2부터 시작하는 숫자 스트림을 만들고 현재 채널에서 소수 하나를 가져와 각 소수에 대해 새로운 파이프라인 단계를 실행한다.
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 메인 종료를 위해 모든 자식 코루틴 취소
// 출력:
// 2
// 3
// 5
// 7
// 11
// 13
// 17
// 19
// 23
// 29
Kotlin
복사
•
첫 10개의 소수를 출력한다.
•
이 코드는 메인 스레드에서 전체 파이프라인을 실행한다.
•
모든 코루틴이 runBlocking 코루틴 범위 내에서 시작되므로 시작한 모든 코루틴의 목록을 명시적으로 유지할 필요는 없다.
•
첫 10개 소수를 출력한 후 cancelChildren 확장 함수를 사용하여 모든 자식 코루틴을 취소한다.
Channel을 사용한 파이프라인의 이점
•
이 파이프라인을 표준 라이브러리의 iterator 코루틴 빌더를 사용하여 동일하게 만들 수 있다.
•
produce → iterator, send → yield, receive → next, ReceiveChannel → Iterator로 바꾸고 코루틴 Scope를 없애면 된다. (runBlocking도 필요없다.)
•
하지만 Channel을 사용하는 파이프라인의 이점은 Dispatchers.Default 컨텍스트에서 실행하면 여러 CPU 코어를 사용할 수 있다는 점이다.
실용적인 관점
•
어쨌든 이 방법은 소수를 찾는데 있어 매우 비실용적인 방법이다.
•
실제로는 파이프라인에서 원격 서비스로의 비동기 호출과 같은 다른 서스펜딩 작업이 포함되며 이러한 파이프라인은 시퀀스나 이터레이터로 구성할 수 없다.
•
왜냐하면 시퀀스/이터레이터는 임의의 서스펜딩 작업을 허용하지 않기 때문이다.
•
반면 produce는 완전한 비동기 방식으로 작동한다.
Fan-out 패턴
•
여러 코루틴이 동일한 채널에서 값을 받아 작업을 분배할 수 있다.
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // 1부터 시작
while (true) {
send(x++) // 다음 숫자 전송
delay(100) // 0.1초 대기
}
}
Kotlin
복사
•
먼저 주기적으로 정수를 생성하는 생산자 코루틴을 작성한다.
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
Kotlin
복사
•
그 다음 여러 프로세서 코루틴을 만들 수 있다.
•
이 예에서는 각 프로세서가 자신의 id와 받은 숫자를 출력한다.
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // 생산자 코루틴을 취소하여 모두 종료
// 출력:
// Processor #2 received 1
// Processor #4 received 2
// Processor #0 received 3
// Processor #1 received 4
// Processor #3 received 5
// Processor #2 received 6
// Processor #4 received 7
// Processor #0 received 8
// Processor #1 received 9
// Processor #3 received 10
Kotlin
복사
•
이제 다섯 개의 프로세서를 실행하고 거의 1초 동안 작동하도록 한다.
•
생산자 코루틴을 취소하면 해당 채널이 닫히고 결국 프로세서 코루틴이 채널에서 반복 처리하는 것이 종료된다.
•
또한 launchProcessor 코드에서 채널을 반복 처리할 때 for 루프를 사용하는 것에 주목하라.
•
consumeEach와 달리 이 for 루프 패턴은 여러 코루틴에서 안전하게 사용할 수 있다.
•
만약 프로세서 코루틴 중 하나가 실패하더라도 다른 코루틴은 여전히 채널을 처리할 수 있지만 consumeEach로 작성된 프로세서는 정상적으로 완료되거나 비정상적으로 완료될 때 항상 기본 채널을 취소한다.
Fan-in 패턴
•
여러 코루틴이 동일한 채널에 보낼 수 있다.
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
Kotlin
복사
•
문자열 채널을 만들고 지정된 문자열을 지정된 지연 시간으로 이 채널에 반복적으로 보내는 일시 중지 함수가 있다.
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // 처음 6개 수신
println(channel.receive())
}
coroutineContext.cancelChildren() // 모든 자식을 취소하여 메인이 끝나도록 함
// 출력:
// foo
// foo
// BAR!
// foo
// foo
// BAR!
Kotlin
복사
•
이제 문자열을 보내는 몇 개의 코루틴을 메인 스레드의 컨텍스트에서 메인 코루틴의 자식으로 실행한다.
•
여러 코루틴이 동일한 채널에 문자열을 보내고 있으며 수신 측에서는 채널에서 메시지를 수신하여 출력한다.
•
이로 인해 서로 다른 시간 간격으로 전송된 문자열이 혼합되어 출력된다.
버퍼링된 Channel
•
지금까지 보여준 Channel은 버퍼가 없었다.
•
버퍼가 없는 Channel은 송신자와 수신자가 만날 때 요소를 전송한다.
•
먼저 send가 호출되면 수신이 호출될 때까지 중단되고 먼저 receive가 호출되면 송신이 호출될 때까지 중단된다.
val channel = Channel<Int>(4) // 버퍼링된 채널 생성
val sender = launch { // 송신자 코루틴 시작
repeat(10) {
println("Sending $it") // 각 요소를 보내기 전에 출력
channel.send(it) // 버퍼가 가득 차면 중단됨
}
}
// 아무것도 수신하지 않고 그냥 기다림...
delay(1000)
sender.cancel() // 송신자 코루틴 취소
// 출력:
// Sending 0
// Sending 1
// Sending 2
// Sending 3
// Sending 4
Kotlin
복사
•
Channel() 팩토리 함수와 produce 빌더는 버퍼 크기를 지정하는 선택적 용량 매개변수를 받는다.
•
버퍼를 사용하면 송신자는 버퍼가 가득 차기 전에 여러 요소를 보낼 수 있으며 이는 지정된 용량을 가진 BlockingQueue와 유사하며 버퍼가 가득 차면 블로킹된다.
Channel은 공정하다.
•
Channel에 대한 송신 및 수신 작업은 여러 코루틴의 호출 순서에 대해 공정하다.
•
이는 선입선출(FIFO) 방식으로 처리되며 즉 가장 먼저 receive를 호출한 코루틴이 해당 요소를 받는다.
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // 공유 테이블
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // 공을 서비스함
delay(1000) // 1초 대기
coroutineContext.cancelChildren() // 게임 종료, 취소
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // 공을 루프에서 수신
ball.hits++
println("$name $ball")
delay(300) // 잠시 대기
table.send(ball) // 공을 다시 보냄
}
}
// 출력:
// ping Ball(hits=1)
// pong Ball(hits=2)
// ping Ball(hits=3)
// pong Ball(hits=4)
Kotlin
복사
•
다음 예제에서는 두 개의 코루틴인 ping과 pong이 공유 채널인 table에서 ball 객체를 수신하고 있다.
•
ping 코루틴이 먼저 시작되므로 첫 번째 공을 수신하게 된다.
•
ping 코루틴이 공을 다시 테이블에 보내고 즉시 수신을 시작하더라도 공은 pong 코루틴이 수신하게 된다.
•
이는 pong 코루틴이 이미 공을 기다리고 있었기 때문이다.
•
채널은 공정성을 유지하지만 사용 중인 실행자(executor)의 특성으로 인해 때때로 불공정해 보이는 실행이 발생할 수 있다.
Ticker Channel
•
Ticker Channel은 주어진 지연 시간이 지난 후마다 Unit을 생성하는 특별한 만남 채널이다.
•
독립적으로는 쓸모없어 보일 수 있지만 복잡한 시간 기반 프로듀스 파이프라인 및 윈도우 처리와 같은 시간 의존적 처리를 만드는데 유용한 빌딩 블록이다.
•
Ticker Channel은 select에서 틱 발생 시 작업을 수행하는데 사용할 수 있다.
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // 틱커 채널 생성
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("초기 요소는 즉시 사용 가능합니다: $nextElement") // 초기 지연 없음
nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // 이후 모든 요소는 200ms 지연
println("다음 요소는 100ms에 준비되지 않음: $nextElement")
nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
println("다음 요소는 200ms에 준비됨: $nextElement")
// 대규모 소비 지연 모방
println("소비자가 300ms 동안 일시 중지합니다")
delay(300)
// 다음 요소는 즉시 사용 가능
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("대규모 소비자 지연 후 다음 요소는 즉시 사용 가능합니다: $nextElement")
// `receive` 호출 사이의 일시 중지가 고려되어 다음 요소의 도착이 더 빨라짐
nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
println("소비자 300ms 일시 중지 후 100ms에 다음 요소가 준비됨: $nextElement")
tickerChannel.cancel() // 더 이상 요소가 필요하지 않음을 표시
}
// 출력:
// 초기 요소는 즉시 사용 가능합니다: kotlin.Unit
// 다음 요소는 100ms에 준비되지 않음: null
// 다음 요소는 200ms에 준비됨: kotlin.Unit
// 소비자가 300ms 동안 일시 중지합니다
// 대규모 소비자 지연 후 다음 요소는 즉시 사용 가능합니다: kotlin.Unit
// 소비자 300ms 일시 중지 후 100ms에 다음 요소가 준비됨: kotlin.Unit
Kotlin
복사
•
Ticker Channel을 만들기 위해서는 팩토리 메서드인 ticker를 사용한다.
•
더 이상 요소가 필요하지 않음을 표시하려면 ReceiveChannel.cancel 메서드를 호출하면 된다.
•
ticker는 소비자의 일시 중지 가능성을 인식하고 기본적으로 일시 중지가 발생할 경우 다음에 생성된 요소의 지연을 조정하여 생성된 요소의 고정 비율을 유지하려고 한다.
•
선택적으로 TickerMode.FIXED_DELAY와 같은 모드 매개변수를 지정하여 요소 간의 고정 지연을 유지할 수 있다.