Search

Channels

지연 값(Deferred values)은 코루틴 간에 단일 값을 전송하는 편리한 방법을 제공한다.
Channel은 값의 스트림을 전송하는 방법을 제공한다.

Channel 기본

val channel = Channel<Int>() launch { // 이 부분은 CPU를 많이 소모하는 계산 또는 비동기 논리가 될 수 있습니다. // 우리는 다섯 개의 제곱을 전송할 것입니다. for (x in 1..5) channel.send(x * x) } // 여기서 다섯 개의 정수를 수신하여 출력합니다: repeat(5) { println(channel.receive()) } println("Done!") // 출력: // 1 // 4 // 9 // 16 // 25 // Done!
Kotlin
복사
Channel은 개념적으로 BlockingQueue와 매우 유사하다.
한 가지 주요 차이점은 차단되는 put 작업 대신 서스펜딩 send가 있으며 차단되는 task 작업 대신 서스펜딩 receive가 있다는 것이다.

Channel 닫기 및 반복

val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) channel.close() // 전송이 완료되었습니다. } // 여기서 `for` 루프를 사용하여 수신된 값을 출력합니다 (채널이 닫힐 때까지) for (y in channel) println(y) println("Done!") // 출력: // 1 // 4 // 9 // 16 // 25 // Done!
Kotlin
복사
Queue와 달리 Channel은 더 이상 요소가 오지 않음을 나타내기 위해 닫을 수 있다.
수신측에서는 채널에서 요소를 수신하기 위해 일반적인 for 루프를 사용하는 것이 편리하다.
개념적으로 닫기는 채널에 특별한 종료 토큰을 보내는 것과 같다.
이 종료 토큰이 수신되면 반복이 멈추므로 종료 이전에 보내진 모든 요소가 수신된다는 보장이 있다.

Channel 공급자 생성

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { for (x in 1..5) send(x * x) } fun main() = runBlocking { val squares = produceSquares() squares.consumeEach { println(it) } println("Done!") } // 출력: // 1 // 4 // 9 // 16 // 25 // Done!
Kotlin
복사
코루틴이 일련의 요소를 생성하는 패턴은 매우 흔하다.
이는 동시성 코드에서 자주 사용되는 프로듀서-컨슈머 패턴의 일부이다.
채널을 매개변수로 받는 함수로 이러한 프로듀서를 추상화할 수 있지만 함수는 결과를 반환하는 것이 일반적인 상식에 맞다.
이 문제를 해결하기 위해 프로듀서 측에서 이를 쉽게 처리할 수 있도록 도와주는 produce라는 편리한 코루틴 빌더가 있다.
또한 소비자 측에서는 for 루프 대신 사용할 수 있는 consumeEach 확장 함수가 있다.

파이프라인

fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) send(x++) // 1부터 시작하는 무한한 정수 스트림 }
Kotlin
복사
파이프라인은 한 코루틴이 무한할 수 있는 값의 스트림을 생성하는 패턴이다.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { for (x in numbers) send(x * x) }
Kotlin
복사
그리고 다른 코루틴이나 여러 코루틴이 그 스트림을 소비하면서 처리한 후에 다른 결과를 생성한다.
val numbers = produceNumbers() // 1부터 시작하는 정수 생성 val squares = square(numbers) // 정수들을 제곱 repeat(5) { println(squares.receive()) // 처음 다섯 개 출력 } println("Done!") // 작업 완료 coroutineContext.cancelChildren() // 자식 코루틴 취소
Kotlin
복사
메인 코드는 전체 파이프라인을 시작하고 연결한다.
모든 코루틴을 생성하는 함수들은 CoroutineScope의 확장으로 정의되며 이를 통해 구조적 동시성을 활용하여 전역 코루틴이 어플리케이션에 남아있지 않도록 보장할 수 있다.

파이프라인을 이용한 소수 찾기

파이프라인을 활용하여 소수를 생성하는 예제를 통해 코루틴을 이용한 파이프라인을 극한으로 사용해보자.
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { var x = start while (true) send(x++) // 시작 숫자부터 시작하는 무한한 정수 스트림 }
Kotlin
복사
먼저 무한한 숫자 시퀀스를 생성한다.
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { for (x in numbers) if (x % prime != 0) send(x) }
Kotlin
복사
다음 파이프라인 단계는 입력된 숫자 스트림에서 주어진 소수로 나누어진 숫자를 모두 제거하는 필터링 작업을 한다.
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
Kotlin
복사
이제 파이프라인을 구성하여 2부터 시작하는 숫자 스트림을 만들고 현재 채널에서 소수 하나를 가져와 각 소수에 대해 새로운 파이프라인 단계를 실행한다.
var cur = numbersFrom(2) repeat(10) { val prime = cur.receive() println(prime) cur = filter(cur, prime) } coroutineContext.cancelChildren() // 메인 종료를 위해 모든 자식 코루틴 취소 // 출력: // 2 // 3 // 5 // 7 // 11 // 13 // 17 // 19 // 23 // 29
Kotlin
복사
첫 10개의 소수를 출력한다.
이 코드는 메인 스레드에서 전체 파이프라인을 실행한다.
모든 코루틴이 runBlocking 코루틴 범위 내에서 시작되므로 시작한 모든 코루틴의 목록을 명시적으로 유지할 필요는 없다.
첫 10개 소수를 출력한 후 cancelChildren 확장 함수를 사용하여 모든 자식 코루틴을 취소한다.

Channel을 사용한 파이프라인의 이점

이 파이프라인을 표준 라이브러리의 iterator 코루틴 빌더를 사용하여 동일하게 만들 수 있다.
produce → iterator, send → yield, receive → next, ReceiveChannel → Iterator로 바꾸고 코루틴 Scope를 없애면 된다. (runBlocking도 필요없다.)
하지만 Channel을 사용하는 파이프라인의 이점은 Dispatchers.Default 컨텍스트에서 실행하면 여러 CPU 코어를 사용할 수 있다는 점이다.

실용적인 관점

어쨌든 이 방법은 소수를 찾는데 있어 매우 비실용적인 방법이다.
실제로는 파이프라인에서 원격 서비스로의 비동기 호출과 같은 다른 서스펜딩 작업이 포함되며 이러한 파이프라인은 시퀀스나 이터레이터로 구성할 수 없다.
왜냐하면 시퀀스/이터레이터는 임의의 서스펜딩 작업을 허용하지 않기 때문이다.
반면 produce는 완전한 비동기 방식으로 작동한다.

Fan-out 패턴

여러 코루틴이 동일한 채널에서 값을 받아 작업을 분배할 수 있다.
fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 // 1부터 시작 while (true) { send(x++) // 다음 숫자 전송 delay(100) // 0.1초 대기 } }
Kotlin
복사
먼저 주기적으로 정수를 생성하는 생산자 코루틴을 작성한다.
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { println("Processor #$id received $msg") } }
Kotlin
복사
그 다음 여러 프로세서 코루틴을 만들 수 있다.
이 예에서는 각 프로세서가 자신의 id와 받은 숫자를 출력한다.
val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() // 생산자 코루틴을 취소하여 모두 종료 // 출력: // Processor #2 received 1 // Processor #4 received 2 // Processor #0 received 3 // Processor #1 received 4 // Processor #3 received 5 // Processor #2 received 6 // Processor #4 received 7 // Processor #0 received 8 // Processor #1 received 9 // Processor #3 received 10
Kotlin
복사
이제 다섯 개의 프로세서를 실행하고 거의 1초 동안 작동하도록 한다.
생산자 코루틴을 취소하면 해당 채널이 닫히고 결국 프로세서 코루틴이 채널에서 반복 처리하는 것이 종료된다.
또한 launchProcessor 코드에서 채널을 반복 처리할 때 for 루프를 사용하는 것에 주목하라.
consumeEach와 달리 이 for 루프 패턴은 여러 코루틴에서 안전하게 사용할 수 있다.
만약 프로세서 코루틴 중 하나가 실패하더라도 다른 코루틴은 여전히 채널을 처리할 수 있지만 consumeEach로 작성된 프로세서는 정상적으로 완료되거나 비정상적으로 완료될 때 항상 기본 채널을 취소한다.

Fan-in 패턴

여러 코루틴이 동일한 채널에 보낼 수 있다.
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } }
Kotlin
복사
문자열 채널을 만들고 지정된 문자열을 지정된 지연 시간으로 이 채널에 반복적으로 보내는 일시 중지 함수가 있다.
val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { // 처음 6개 수신 println(channel.receive()) } coroutineContext.cancelChildren() // 모든 자식을 취소하여 메인이 끝나도록 함 // 출력: // foo // foo // BAR! // foo // foo // BAR!
Kotlin
복사
이제 문자열을 보내는 몇 개의 코루틴을 메인 스레드의 컨텍스트에서 메인 코루틴의 자식으로 실행한다.
여러 코루틴이 동일한 채널에 문자열을 보내고 있으며 수신 측에서는 채널에서 메시지를 수신하여 출력한다.
이로 인해 서로 다른 시간 간격으로 전송된 문자열이 혼합되어 출력된다.

버퍼링된 Channel

지금까지 보여준 Channel은 버퍼가 없었다.
버퍼가 없는 Channel은 송신자와 수신자가 만날 때 요소를 전송한다.
먼저 send가 호출되면 수신이 호출될 때까지 중단되고 먼저 receive가 호출되면 송신이 호출될 때까지 중단된다.
val channel = Channel<Int>(4) // 버퍼링된 채널 생성 val sender = launch { // 송신자 코루틴 시작 repeat(10) { println("Sending $it") // 각 요소를 보내기 전에 출력 channel.send(it) // 버퍼가 가득 차면 중단됨 } } // 아무것도 수신하지 않고 그냥 기다림... delay(1000) sender.cancel() // 송신자 코루틴 취소 // 출력: // Sending 0 // Sending 1 // Sending 2 // Sending 3 // Sending 4
Kotlin
복사
Channel() 팩토리 함수와 produce 빌더는 버퍼 크기를 지정하는 선택적 용량 매개변수를 받는다.
버퍼를 사용하면 송신자는 버퍼가 가득 차기 전에 여러 요소를 보낼 수 있으며 이는 지정된 용량을 가진 BlockingQueue와 유사하며 버퍼가 가득 차면 블로킹된다.

Channel은 공정하다.

Channel에 대한 송신 및 수신 작업은 여러 코루틴의 호출 순서에 대해 공정하다.
이는 선입선출(FIFO) 방식으로 처리되며 즉 가장 먼저 receive를 호출한 코루틴이 해당 요소를 받는다.
data class Ball(var hits: Int) fun main() = runBlocking { val table = Channel<Ball>() // 공유 테이블 launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // 공을 서비스함 delay(1000) // 1초 대기 coroutineContext.cancelChildren() // 게임 종료, 취소 } suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // 공을 루프에서 수신 ball.hits++ println("$name $ball") delay(300) // 잠시 대기 table.send(ball) // 공을 다시 보냄 } } // 출력: // ping Ball(hits=1) // pong Ball(hits=2) // ping Ball(hits=3) // pong Ball(hits=4)
Kotlin
복사
다음 예제에서는 두 개의 코루틴인 ping과 pong이 공유 채널인 table에서 ball 객체를 수신하고 있다.
ping 코루틴이 먼저 시작되므로 첫 번째 공을 수신하게 된다.
ping 코루틴이 공을 다시 테이블에 보내고 즉시 수신을 시작하더라도 공은 pong 코루틴이 수신하게 된다.
이는 pong 코루틴이 이미 공을 기다리고 있었기 때문이다.
채널은 공정성을 유지하지만 사용 중인 실행자(executor)의 특성으로 인해 때때로 불공정해 보이는 실행이 발생할 수 있다.

Ticker Channel

Ticker Channel은 주어진 지연 시간이 지난 후마다 Unit을 생성하는 특별한 만남 채널이다.
독립적으로는 쓸모없어 보일 수 있지만 복잡한 시간 기반 프로듀스 파이프라인 및 윈도우 처리와 같은 시간 의존적 처리를 만드는데 유용한 빌딩 블록이다.
Ticker Channel은 select에서 틱 발생 시 작업을 수행하는데 사용할 수 있다.
fun main() = runBlocking<Unit> { val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // 틱커 채널 생성 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("초기 요소는 즉시 사용 가능합니다: $nextElement") // 초기 지연 없음 nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // 이후 모든 요소는 200ms 지연 println("다음 요소는 100ms에 준비되지 않음: $nextElement") nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } println("다음 요소는 200ms에 준비됨: $nextElement") // 대규모 소비 지연 모방 println("소비자가 300ms 동안 일시 중지합니다") delay(300) // 다음 요소는 즉시 사용 가능 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("대규모 소비자 지연 후 다음 요소는 즉시 사용 가능합니다: $nextElement") // `receive` 호출 사이의 일시 중지가 고려되어 다음 요소의 도착이 더 빨라짐 nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } println("소비자 300ms 일시 중지 후 100ms에 다음 요소가 준비됨: $nextElement") tickerChannel.cancel() // 더 이상 요소가 필요하지 않음을 표시 } // 출력: // 초기 요소는 즉시 사용 가능합니다: kotlin.Unit // 다음 요소는 100ms에 준비되지 않음: null // 다음 요소는 200ms에 준비됨: kotlin.Unit // 소비자가 300ms 동안 일시 중지합니다 // 대규모 소비자 지연 후 다음 요소는 즉시 사용 가능합니다: kotlin.Unit // 소비자 300ms 일시 중지 후 100ms에 다음 요소가 준비됨: kotlin.Unit
Kotlin
복사
Ticker Channel을 만들기 위해서는 팩토리 메서드인 ticker를 사용한다.
더 이상 요소가 필요하지 않음을 표시하려면 ReceiveChannel.cancel 메서드를 호출하면 된다.
ticker는 소비자의 일시 중지 가능성을 인식하고 기본적으로 일시 중지가 발생할 경우 다음에 생성된 요소의 지연을 조정하여 생성된 요소의 고정 비율을 유지하려고 한다.
선택적으로 TickerMode.FIXED_DELAY와 같은 모드 매개변수를 지정하여 요소 간의 고정 지연을 유지할 수 있다.