본문 바로가기

Study/Kotlin

Kotlin / 코루틴 처리

반응형

1. 코루틴 동시성 알아보기

코루틴은 코틀린에서 별도 패키지로 동시성 기능을 처리할 수 있도록 다양한 클래스와 함수를 지원한다.

  • 서브루틴(subroutine) : 순수함수. 함수가 호출되면 결과를 반환한다. 순차적으로 처리할 때 사용한다.
  • 코루틴(coroutine) : 함수이지만 상태를 관리하고 일시 중단했다가 다시 시작할 수 있는 일시중단 함수로 구성한다.

코루틴을 구성하는 규칙

코루틴은 코루틴이 실행되는 별도의 영역인 코루틴 스코프를 구성해야 한다.

이 코루틴 스코프에서 코루틴 빌더 함수와 일시중단 함수를 사용한다.

 

코루틴 기본 구성

1. 코루틴 패키지의 구성 요소

  • 코루틴 스코프: GlobalScope, CoroutineScope는 코루틴 스코프를 구성해서 내부에 코루틴 빌더 함수로 코루틴 계층을 구성하고 내부에 일시중단 함수등을 사용해서 코루틴을 처리하는 영역
  • 코루틴 빌더 함수 : launch, runBlocking 등은 코루틴 스코프를 생성하는 함수이거나 코루틴을 처리하는 함수
  • 일반 중단 함수 : delay 함수는 코루틴 내부의 특정 기능을 하는 함수
  • 코루틴 결과 처리 : Job과 Deferred(Job 상속해서 구현)는 코루틴의 결과를 반환받거나 코루틴 처리의 중단 등을 처리할 수 있다.

 

2. 코루틴 구성

runBlocking 작동 방식은 스레드와 코루틴의 중간정도의 요건을 처리한다.

코루틴을 스레드에서 간단하게 처리하는 방식을 제공하나, 현재 처리되는 스레드가 종료되면 같이 종료된다.

fun main1() {
    runBlocking { // 현재 사용하는 스레드 블로킹 처리
        println(
            "World! " + Thread.currentThread().name
        )
        println("Hello, " + Thread.currentThread().name) // 코루틴 내부에서 출력
    }

    Thread.sleep(1000) // 함수 일시 중단
    println("Main proccess") // 코루틴 처리 후 실행
}

main1()

// World! main
// Hello, main
// Main proccess

메인 함수는 일반 스레드가 작동되고 그 내부에 runBlocking이 빌더한 경우만 코루틴으로 처리한다.

처리되는 순서를 보면 메인 함수가 1초 지연으로 코루틴이 순서적으로 먼저 처리된 후에 메인 함수가 실행된다.

모든 것이 스레드 환경에서 실행되는 것을 알 수 있다.

 

runBlocking 내에 코루틴 중단 

코루틴 계층구조의 상위인 부모를 종료하면 자식들도 다 같이 종료된다.

이때 코루틴의 결과인 job 객체의 cancel 메서드를 사용한다.

fun main() = runBlocking {
    var ix = 0
    val job = launch {
        repeat(1000) { _ ->
            println("job을 일시 중단 처리 : ${ix++} ...")
            delay(500L)
        }
    }
    delay(1300L) // 일시중단 -> 다른 코루틴 처리
    println("main  다른 코루틴 처리 ")
    job.cancel() // 코루틴 중단 : 전체가 중단됨
    job.join()
    println("main 함수 종료.")
}

// job을 일시 중단 처리 : 0 ...
// job을 일시 중단 처리 : 1 ...
// job을 일시 중단 처리 : 2 ...
// main  다른 코루틴 처리 
// main 함수 종료.

 

부모 코루틴을 runBlocking으로 만들고 자식은 launch로 만들었다.

반환된 job 내의 cancel 메서드로 중단을 처리하면 대기 중인 코루틴이 모두 중단된다.

이때 join을 사용하면 코루틴이 중단 작업이 끝날 때까지 기다린다.

 

3. 주요 코루틴 빌더 함수

코루틴 스코프를 만든 다음에 자식 코루틴을 다양하게 추가할 수 있다. 이때 사용하는 함수가 코루틴 빌더 함수이다.

 

launch 확장함수

CoroutineScope의 확장함수이다. 이 함수를 사용해서 전역 스코프나 코루틴 스코프에 코루틴을 추가한다.

반환값은 Job. 이 Job으로 코루틴을 중단할 수 있다.

fun CoroutineScope.log(msg: String) {
    val name = coroutineContext[CoroutineName]?.name
    println("[$name] $msg") // 코루틴컨텍스트 이름 출력
}

fun main3() {
    GlobalScope.launch(CoroutineName("전역스코프")) {
        log("launch 코루틴 빌더 시작")
        val job = launch {
            delay(500) // 0.5초 대기
            log("코루틴 처리 1")
        }
        job.join() // 전역 코루틴의 자식 코루틴 종료까지 대기
        log("launch 코루틴 빌더 정지") // 최종 종료
    }
    Thread.sleep(1000) // 메인스레드 잠시 대기
    println("메인 스레드 처리 2") // 메인 출력
}

main3()

// [전역스코프] launch 코루틴 빌더 시작
// [전역스코프] 코루틴 처리 1
// [전역스코프] launch 코루틴 빌더 정지
// 메인 스레드 처리 2

GlobalScope 객체에 launch로 코루틴을 빌더한다.

내부에 자식 코루틴을 하나 launch로 빌더하고, 자식 코루틴은 job을 반환받아서 join메서드로 처리를 기다린다.

이를 실행하면 코루틴이 실행된 곳은 log 확장함수에 따라 코루틴 이름부터 출력하는 것을 볼 수 있다.

전체 함수를 코루틴으로 묶지 않아서 main함수가 먼저 종료되지 않도록 sleep으로 코루틴 처리가 끝날 때까지 지연한다.

 

 

async 확장함수 

비동기적으로 처리하는 async 확장함수는 현재 코루틴의 처리 결과를 반환한다.

반환값은 Deferred<T>이다. 이 클래스의 await 메서드를 사용해 반환값을 조회한다.

val syn = GlobalScope.async(CoroutineName("전역스코프")) {
    log("현재 코루틴 1 : " + Thread.currentThread().name)
    async(Dispatchers.Default) { // 비동기 코루틴 빌더
        delay(100) // 일시정지
        log("현재 코루틴 2 : " + Thread.currentThread().name)
    }
    delay(100) // 일시정지
    log("코루틴 종료 " + Thread.currentThread().name) // 부모를 종료처리
}
syn
Thread.sleep(2000)

// [전역스코프] 현재 코루틴1 : DefaultDispatcher-worker-1
// [전역스코프] 현재 코루틴 2 : DefaultDispatcher-worker-1
// [전역스코프] 코루틴 종료 DefaultDispatcher-worker-1

GlobalScope에 async로 코루틴을 이름을 인자로 전달해 빌드한다.

코루틴을 async 빌더 함수에 코루틴 컨텍스트를 지정한 후에 빌더한다.

실행하면 코루틴이 부모부터 자식까지 실행한 결과를 출력한다.

 

withContext 함수

특정 컨텍스트를 지정해 그 내부의 코루틴을 처리할 때 일시중단 함수인 withContext를 사용해서 처리한다.

fun main4() = runBlocking { // 현재 스레드 코루틴 처리
    withContext(Dispatchers.Default){ // 컨텍스트 변경
        println("위드컨텍스트 처리 : "+Thread.currentThread().name)
        delay(100)
    }
    launch{ // 기존 컨텍스트 사용
        println("런치 처리 : "+Thread.currentThread().name)
        delay(100)
    }
    delay(2000)
}
main4()

// 위드컨텍스트 처리 : DefaultDispatcher-worker-1
// 런치 처리 : main

runBlocking으로 부모 코루틴을 만들고 두 개의 코루틴 빌더 중 하나는 withcontext함수로 처리하고 다른 하나는 launch로 처리했다.

코루틴을 실행하면 withContext는 부모와 달리 이 함수가 정한 코루틴 범위로 처리한다.

 

withTimeOut 함수

특정 시간까지 처리가 필요한 코루틴은 withTimeout 함수로 처리한다.

- 코루틴에 타임아웃을 지정해서 람다표현식으로 정의한 것을 특정 시간까지만 처리한다. 

>> 반드시 처리할 것은 finally에서 withContext 함수에 NonCancellable로 지정해서 처리한다.

 

4. 코루틴의 전역 스코프와 코루틴 스코프

Dispatchers

코루틴 컨텍스트의 정보를 디스패처로 지정할 수 있다.

  • Unconfined : 특정 스레드를 확정하지 않는다. 현재 실행되는 스레드를 사용하다가 일시 중단된 후 다시 시작할 때는 다른 스레드에서 실행된다.
  • IO : 차단 입출력을 처리할 때 사용하는 디스패처
  • Default : 코루틴 내부의 기본 처리를 사용하는 것이다.
  • Main : 안드로이드 메인 스레드에서 코루틴을 실행하는 디스패처이다. 이 디스패처는 UI와 상호작용하는 작업을 실행하기 위해서만 사용해야 한다.
 
fun main5(){
    runBlocking {
        launch{
            println("launch : ${Thread.currentThread().name}")
        }
        launch(Dispatchers.Unconfined){
            println("launch(Dispatchers.Unconfined) : ${Thread.currentThread().name}")
        }
//        launch(Dispatchers.Main){ // 안드로이드용
//            println("launch(Dispatchers.Main) :${Thread.currentThread().name}")
//        }
        launch(Dispatchers.IO) {
            println("launch(Dispatchers.IO) :${Thread.currentThread().name}")
        }
        launch(Dispatchers.Default) {
            println("launch(Dispatchers.Default) :${Thread.currentThread().name}")
        }
    }
}

main5()

// launch(Dispatchers.Unconfined) : main
// launch(Dispatchers.IO) :DefaultDispatcher-worker-1
// launch(Dispatchers.Default) :DefaultDispatcher-worker-1
// launch : main

 

 

>> coroutineContext : 코루틴 스코프 내부의 컨텍스트를 확인 할 수 있다.

 

5. 코루틴 사용자의 일시중단 함수

함수를 작성하는 것과 동일하고 suspend 예약어를 fun 예약어 앞에 붙여 일시중단 함수를 정의한다.

suspend fun doSomething(): Int { // 일시중단 함수
    delay(100L) // 일시 정지
    println("일시중단 함수 실행 1")
    return 13
}

fun main6() {
    GlobalScope.launch(Dispatchers.Default) { // 전역 스코프에 코루틴 빌더
        val time = measureTimeMillis { // 처리 시간 계산
            val something = doSomething() // 함수 호출
            println("실행결과 : $something")
        }
        delay(100) // 임시 지연
        println("총 실행시간 : $time ms.") // 함수 실행 결과 출력
    }
    println("~~ 작업 종료 ~~")
}

main6()
Thread.sleep(2000)

// ~~ 작업 종료 ~~
// 일시중단 함수 실행 1
// 실행결과 : 13
// 총 실행시간 : 107 ms.

일사중단 함수를 정의했다. 이 함수는 모든 코드를 처리하고 결과를 반환값으로 반환한다.

 

6. 코루틴 예외처리

프로그램을 실행할 때 정상적인 처리가 아니면 예외가 발생한다.

코루틴도 예외가 필요한데, 예외가 발생하면 전파되어 모든 것을 종료한다.

예외가 발생할 때도 모든 것을 종료하지 않고 필요한 로직이 처리될 수 있어야 한다.

 

예외 전파 차단

  • 슈퍼바이저 스코프(supervisorScope) : 코루틴 스코프를 지정해서 모든 자식에게 슈퍼바이저 잡을 적용할 때 사용
  • 슈퍼바이저 잡(supervisorJob) : 특정 코루틴에 예외를 처리할 때 자식 코루틴에 한정해서 처리하는 방식을 슈퍼바이저 잡이라고 한다. 코루틴 빌더할 때 컨텍스트 내부에 정의한다.

슈퍼바이저가 적용되 자식이 취소되지만 부모는 취소되지 않는다.

동시에 여러 코루틴 작업을 처리할 때 예외가 발생하더라도 반드시 처리가 필요한 부분은 잘 구성할 필요가 있다.

 

코루틴 중단 처리 예외 보기

코루틴의 cancel로 중단을 처리하면 내부적으로 예외가 발생한다.

fun main7() = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("코루틴 job 실행 : $i ...")
                delay(500L)
            }
        } catch (e: Exception) {
            println("중단에 따른 예외 : " + e.message)
        }
    }
    delay(1300L)
    println("메인 처리후 자식 종료")
    job.cancel()
    job.join()
    println("메인 종료 .")
}

main7()

// 코루틴 job 실행 : 0 ...
// 코루틴 job 실행 : 1 ...
// 코루틴 job 실행 : 2 ...
// 메인 처리후 자식 종료
// 중단에 따른 예외 : StandaloneCoroutine was cancelled
// 메인 종료 .

 

예외처리 핸들러 사용

자식 코루틴에 예외 핸들러(Exception Handler)를 launch 빌더 함수의 인자로 전달해서 코루틴 컨텍스트에 추가한다.

-> 예외가 발생할 때 부모로 전파되지 않고 예외핸들러에서 처리된다.

 

fun main8() {
    val handler = CoroutineExceptionHandler { // 예외처리 핸들러 작성
            _, exception ->
        println("예외처리 : $exception")
    }
    GlobalScope.launch(handler) {
        launch {
            println("코루틴 실행 1 ")
            delay(500L)
            throw Exception("첫번째 코루틴 내에서 예외 발생")
        }
        launch {
            println("코루틴 실행 2 ")
            delay(500L)
        }
    }
    Thread.sleep(1300L)
    println("메인 처리 후 자식 종료")
    println("메인 종료 .")
}

main8()

// 코루틴 실행 1 
// 코루틴 실행 2 
// 예외처리 : java.lang.Exception: 첫번째 코루틴 내에서 예외 발생
// 메인 처리 후 자식 종료
// 메인 종료 .

 

예외처리가 부모 코루틴에 있어서 다른 자식을 실행하지 않고 부모 코루틴을 종료하면서 모든 자식 코루틴이 종료되는 것을 확인할 수 있다.

 

코루틴 예외처리 : async 코루틴 빌더

예외가 발생하면 내부적으로 예외를 잡아서 반환값으로 전달한다.

이 반환값을 처리하는 await 메서드를 사용하면 예외가 전파되므로 이런 예외 전파를 방지하려면 슈퍼바이저 스코프로 지정해야 한다.

fun main9() = runBlocking {
    try {
        supervisorScope { // 슈퍼바이저 스코프로 에외처리
            val job = async { // 어싱크에서 예외가 발생하면 부모에게 전달
                println("코루틴 실행")
                delay(500L)
                throw Exception("첫번째 코루틴 내에서 예외 발생")
            }
            println("메인 처리 후 자식 종료")
            try {
                job.await()
            } catch (e: Exception) {
                println("예외를 다시 전달")
                println(e.message)
            }
            println("메인 종료 .")
        }
    } catch (e: Exception) {
        println("부모영역까지 예외전달 ")
        println(e.message)
    }
}
main9()

// 메인 처리 후 자식 종료
// 코루틴 실행
// 예외를 다시 전달
// 첫번째 코루틴 내에서 예외 발생
// 메인 종료 .

 

2. 코루틴 정보 전달 알아보기

여러 코루틴을 처리할 때 다른 코루틴의 정보를 활용해 처리할 필요가 있다.

이럴 경우 현재 코루틴에서 다른 코루틴으로 채널(channel)을 사용해서 정보를 전달할 수 있다.

코루틴에 전달하는 정보도 연속적인 데이터일 경우가 있으므로 이럴 경우 스트림 처리처럼 연속된 정보 플로우(flow)로 처리한다.

 

1. 코루틴 채널 처리

채널을 정의할 때 capacity 속성을 지정하면 다양한 채널을 구성할 수 있다.

  • 랑데부 (Channel.RANDEVOUS: Unbuffered)
  • 메시지융합 (Channel.CONFATED)
  • 버퍼 지정 (숫자로 입력)
  • 버퍼 지정 (Channel.UNLIMITED)

랑데부 채널

채널을 버퍼가 없이 만들면 랑데부 채널이 만들어진다.

버퍼가 없으므로 송신되어야 수신 코루틴이 실행된다.

 

버퍼 채널

채널을 만들 때 송신과 수신의 지연을 방지하기 위해 버퍼를 처리한다.

이러면 버퍼 사이즈 만큼 송신이 들어가므로 수신에서 지연 없이 처리할 수 있다.

 

무한 버퍼 채널

채널의 버퍼가 현재 실행되는 환경의 메모리를 충분히 사용한다.

채널을 만들 때 버퍼 크기를 무제한(UNLIMITED)으로 지정한다.

 

중첩 버퍼 채널

채널로 송수신할 때 최신 메시지가 중요하고 기존 메시지를 갱신해도 큰 문제가 없을 경우에 메시지 전달 방식으로 사용한다.

 

프로듀스와 컨슈머 처리

앞에서는 채널과 코루틴을 별도로 만들어서 정보를 주고받았다.

이번에는 정보 송수신을 프로듀스와 컨슈머 기능을 하나로 처리하는 방식을 사용한다.

  • 생산자(produce) : 코루틴을 만들 때 별도의 확장함수를 지정해 생산자 코루틴을 만들어서 사용한다.
  • 소비자(comsumeEach) : 생산자의 메서드를 사용해서 전달된 메시지를 처리한다.

 

2. 코루틴 플로우(flow) 처리

코루틴에서 여러 개의 정보를 연속해서 비동기 처리 하는 것을 플로우(flow)라고 한다.

항상 코루틴 내부에서만 작동한다.

 

플로우 생성과 처리 비교

플로우를 처리할 때 플로우를 생성하고 생성된 것을 요청해서 처리하는 것이 중요하다.

  • 플로우 원소생성 : emit, emitAll
  • 플로우 데이터 요청 : collect, collectIndexed, collectLatest

일반함수에 플로우로 코루틴 처리를 정의한다.

플로우 실행은 코루틴 내부에서 처리하고 변수에 할당한다.

fun foo(): Flow<Int> = flow {
    println("플로우 시작")
    for (i in 1..3) {
        delay(100)
        emit(i) // 데이터 송신
    }
}

fun main10() = runBlocking<Unit> {
    println("플로우 스코프 만듬")
    val flow = foo() // 플로우 함수 실행
    println(flow.javaClass) // 플로우 객체로 반환

    flow.collect() { value -> println(value) } // 플로우 처리
    println("집합 반환 : " + flow.toSet()) // 집합으로 변환

    launch {
        println("코루틴 내부에서 호출")
        flow.collect { value -> println(value) } // 플로우 중단함수
    }
}

main10()

// 플로우 스코프 만듬
// class kotlinx.coroutines.flow.SafeFlow
// 플로우 시작
// 1
// 2
// 3
// 플로우 시작
// 집합 반환 : [1, 2, 3]
// 코루틴 내부에서 호출
// 플로우 시작
// 1
// 2
// 3

 

플로우를 launch 빌더 내에 작성해서 실행하는 것을 확인한다.

코루틴 스코프에서 처리하는 것과 자식 코루틴 내에서 처리하는 결과가 동일한 것을 알 수 있다.

 

플로우 타임아웃 처리

플로우를 특정 시간 내에 처리하도록 지정할 수 있다.

withTimeoutOrNul에 특정 시간을 지정하고 플로우를 처리했다.

플로우도 코루틴이므로 특정 시간이 지나면 타임아웃이 되어 모든 코루틴을 종료한다.

fun foo2(): Flow<Int> = flow { // 플로우 빌더 처리
    for (i in 1..3) {
        delay(100) // 일시 중단
        println("Emitting $i")
        emit(i) // 플로우 값 처리
    }
}

fun main11() = runBlocking<Unit> {
    withTimeoutOrNull(250) {// 250ms 후에 시간 초과
        foo2().collect() { value -> println(value) } // 플로우 중단함수
    }
    println("Done")
}
main11()

// Emitting 1
// 1
// Emitting 2
// 2
// Done

 

시퀀스나 배열을 플로우로 변환

코틀린 컬렉션 내의 클래스를 플로우로 변환해서 처리할 수 있다. -> asFlow

val foo = (1..3).asFlow()

 

두 개의 플로우를 하나로 합치기 : Zip 처리

두 개의 플로우를 하나로 묶어서 맵으로 변환한다.

fun main12() {
    val mapF = mutableMapOf<Int, String>()
    runBlocking<Unit> {
        val nums = (1..3).asFlow()
        val l = listOf("one", "two", "three")
        val strs = flowOf(*l.toTypedArray())
        println("리스트로 변환 : " + strs.toList())

        nums.zip(strs) { a, b -> a to b } // 집을 통해 튜플로 변환
            .collect { mapF.put(it.first, it.second) }
        println("맵으로 변환 : $mapF")
    }
}

// 리스트로 변환 : [one, two, three]
// 맵으로 변환 : {1=one, 2=two, 3=three}

 

플로우 취소

플로우는 별도의 취소를 제공하지 않는다.

 

3. 액터 처리

액터(Actor)는 코루틴에서 특정 상태를 관리하며 수신과 송신을 함께 처리할 수 있어서 동기화된 정보를 관리할 수 있다.

수신과 송신을 별도로 구성한다. -> 액터는  스레드간 동기화를 지원하기 위한 도구로 사용된다.

 

액터 함수로 액터 스코프를 만들어 사용하기

액터로 코루틴을 만들면 액터는 기본으로 데이터를 받을 수 있는 ReceivedChannel을 가진다.

fun main13() = runBlocking { // 런블로킹 스코프 생성
    val actor1 = actor<String>(capacity = 10) { // 액터 빌더
        for (data in channel) { // 액터 내부의 수신된 데이터 출력
            println(data + "Thread : " + Thread.currentThread().name)
        }
    }
    (1..5).forEach {
        actor1.send(it.toString()) // 액터에 데이터 전송
    }
    actor1.close() // 액터 종료
    delay(500L) // 전체 지연
    println(" closed")
}

// 1Thread : main
// 2Thread : main
// 3Thread : main
// 4Thread : main
// 5Thread : main
//  closed

 

액터로 코루틴 스코프를 정의하고, 버퍼 크기를 10으로 지정했다.

액터에 전달된 데이터는 채널로 수신해서 출력된다.

1~5까지 범위를 맏늘고 이숫자를 문자열로 바꿔서 send 메서드로 전송했다.

액터를 모두 사용하면 close로 닫는다.

'Study > Kotlin' 카테고리의 다른 글

Kotlin / 콜라츠 추측  (0) 2024.01.18
Kotlin/ 알고리즘 스터디  (0) 2024.01.14
Kotlin / 파일 입출력과 스레드 처리  (0) 2023.11.05
Kotlin / 제네릭 알아보기  (0) 2023.10.22
Kotlin / 위임(delegation) 알아보기  (0) 2023.09.24