【Swift】Swift Concurrencyについて

Swift Concurrencyが登場してから4年が経ちますが知識が曖昧だったので調べたことをまとめてみました。

Swift Concurrencyとは

Swiftバージョン5.5で導入された非同期処理の新しい仕組みです。

① async/awaitを利用した非同期処理の記述

awaitを利用してasync関数を呼び出すことで非同期処理を同期的に書くことができます。従来のクロージャを利用したコールバックでは完了ハンドラの呼び忘れにより、呼び出し元の処理が再開されない可能性がありました。
async/awaitではこうした「考慮不足による処理の停止」や「追いにくさ」が大幅に軽減されます。

② Taskを利用した効率的な並行処理

Taskを使うと複数の非同期処理を効率よく同時進行させることができます。必要に応じて複数スレッドに分散して並列処理が可能です。

③ Actorによるデータ競合防止

マルチスレッドな環境で複数のタスクが同じ変数を同時に変更するとデータの競合が発生します。Actorを利用することでデータへのアクセスを自動的に直列化でき、安全に共有データを扱うことが可能になります。

async/awaitの仕組みについて

asyncは関数に非同期で動作することを宣言するキーワードです。async関数内ではawaitを利用して非同期処理の結果を待つことができます。

func showUserProfile() async throws {
    let user = try await fetchUserData()
    let image = try await fetchUserImage(user.id)
    updateUI(user, image)
}

上記の例ではfetchUserData関数の処理が完了してから、fetchUserImage関数が呼び出される形となります。
同期的に処理をさせているように見えるのでコードが非常に見やすくなります。これだけでもSwift Concurrencyを利用する価値はあると思います。

また、awaitで非同期処理の完了を待っている間はスレッドが別の処理に利用されます。そのため中断していた処理が再開するときは非同期処理を呼び出す前とは別のスレッドで実行される可能性もあります。

MainActorを継承する場合

    @IBAction func connectButtonTapped(_ sender: Any) {
        Task {
            print(pthread_self()) // ①
            try? await connect()
        }
    }
    
    private func connect() async throws {
        try await Task.sleep(nanoseconds: 200_000_000)
        print(pthread_self()) // ②
    }
    
    // ① 0x000000010499c200
    // ② 0x000000010499c200

呼び出し元のActorを継承しない場合

    @IBAction func connectButtonTapped(_ sender: Any) {
        Task.detached {
            print(pthread_self()) // ①
            try? await connect()
        }
    }
    
    private func connect() async throws {
        try await Task.sleep(nanoseconds: 200_000_000)
        print(pthread_self()) // ②
    }
    
    // ① 0x000000016b6af000
    // ② 0x000000010499c200

Taskについて

Taskは非同期で実行される処理のかたまりです。スレッドと違い軽量タスクで効率的に処理を切り替えています。Taskを理解する上では仕組みも理解すると良さそうでした。

Executor(エグゼキューター)

タスクを実際に実行キューに流す仕組み。OSのスレッドに直接マップされるわけではなくSwiftが内部で管理しているもので、タスクをどのスレッドでどんな順序で動かすかを決める役割を持ちます。SerialExecutorやConcurrentExecutorなどがあります。

Taskの実行結果を確認する

Taskは非同期で実行されます。そのためクロージャの処理よりも呼び出し元の処理の方が先に実行されます。いろいろなパターンで挙動を確認したので解説していきます。

Taskを同時に続けて実行する場合

    @IBAction func connectButtonTapped(_ sender: Any) {
        print("A")
        Task {
            print("B")
            try? await self.connect()
            print("C")
        }
        
        Task {
            print("D")
            try? await self.connect()
            print("E")
        }
        print("F")
    }
    
    private func connect() async throws {
        try await Task.sleep(nanoseconds: 200_000_000)
        print(pthread_self())
    }
    
    /*
    A
    F
    B
    D
    0x0000000101094200
    E
    0x0000000101094200
    C
    */

上記のコードではTaskは2つ続けて実行されていますが、非同期なのでAの後はFが先に出力されています。

Taskの再開時にはBが先に出力されたので、その次はCが出力されると思っていたのですがEの方が先に出力されていました。どうやらTaskの再開時には順番は保証されておらず、Exexutorがタスクを再開するタイミングをOSのスケジューラに委ねているみたいです(C → Eにもなる)

また、上記のBを出力する前に重たい処理などがあると2つ目のTaskは実行待ちが発生することになるため注意が必要です。

    @IBAction func connectButtonTapped(_ sender: Any) {
        print("A")
        Task {
            print("B")
            let start = Date()
            var sum = 0
            // 重たいループ処理
            ループ処理
            for i in 0..<500_000_000 {
                sum += i % 10
            }
            let end = Date()
            print("Heavy loop finished: \(end.timeIntervalSince(start)) seconds")
            try? await self.connect()
            print("C")
        }
        
        Task {
            print("D")
            try? await self.connect()
            print("E")
        }
        print("F")
    }
    
    private func connect() async throws {
        try await Task.sleep(nanoseconds: 200_000_000)
        print(pthread_self())
    }
    
    /*
    A
    F
    B
    Heavy loop finished: 34.307685017585754 seconds
    D
    0x00000001010c8200
    E
    0x00000001010c8200
    C
    */

重たい処理が完了するまでは2つ目のTaskが実行されず、Dが出力されるまでに34秒かかっています。これはTaskにアクターのアノテーションがついていないため呼び出し元のスレッド(ここではメインスレッド)で実行されるため、Taskの処理が直列で実行されます。

どちらのTaskもメインスレッド上で実行されるため1つ目のTaskでawaitするまでに時間がかかり、2つ目のTaskにスレッドを明け渡すことができないという挙動になっています。

Task.detachedで同時に続けて実行する場合

    @IBAction func connectButtonTapped(_ sender: Any) { 
        print(pthread_self()) 
        print("A")
        Task.detached {
            print(pthread_self())
            print("B")
            try? await self.connect()
            print("C")
        }
        
        Task.detached {
            print(pthread_self())
            print("D")
            try? await self.connect()
            print("E")
        }
        print("F")
        count1 += 1
        countLabel1.text = "\(count1)"
    }
    
    private func connect() async throws {
        try await Task.sleep(nanoseconds: 200_000_000)
        print(pthread_self())
    }
    
    /*
    0x0000000103298200
    A
    F
    0x000000016cf6b000
    D
    0x000000016d33f000
    B
    0x0000000103298200
    0x0000000103298200
    C
    E
    */

上記を見るとA → Fの出力までは同じです。そしてMainActorであれば先にBから処理されていた箇所が、今度はDから処理されています。また呼び出し元のメインスレッド(0x0000000103298200)とは別のスレッドを用いて実行されていることが分かります。

これらはシステムのグローバルなスレッドプール(Global Executor)で自由に動きます。そのためどちらのタスクが先に動くかということが保証されていないです。

Taskの中でTaskを実行

    private func example() {
        print("A")
        Task {
            print("B")
            
            Task {
                print("C")
            }
            print("D")
        }
        print("E")
    }
    
    /*
    A
    E
    B
    D
    C
    */

こちらはAEBDCの出力が固定されます。Task.detachedにした場合も同様で非同期に実行されるためです。

Taskを変数に格納する場合

Taskを変数に格納する利点としてはキャンセルしたり、結果を待つ参照を持てることが可能となります。

let task = Task<Success, Failure>
task = Task {
    try await Task.sleep(nanoseconds: 1_000_000_000)
    return "完了"
}

Task {
    let value = try await task.value // String
    let result = try await task.result // Result<Success, Failure>
}

Taskを変数にすることで複数のTaskから同じ結果にアクセスすることが可能となります。

Taskのキャンセルについて

Taskをキャンセルするケースとしてはユーザー操作によって処理が不要になる場合や、タイムアウトなどで不要になる場合などが挙げられます。

Actorに紐づくタスクはそのActorが生存している限り有効で、ライフサイクルなどによってActorが破棄されればキャンセルされます。Task.detachedを利用する場合はライフサイクルに依存していないので必要に応じて手動キャンセルする必要があります。

    private func exampleA() {
        let task = Task {
            print("A")
            try await Task.sleep(nanoseconds: 5_000_000_000)
            print("B")
        }
        task.cancel()
    }
    // A

exampleA関数では即座にTaskをキャンセルしたのにAが出力されています。task.cancel()を呼び出した時、内部的にはキャンセルフラグが立っているのですが実際にキャンセルされるのはawaitキーワードに到達してからとなります。

    private func exampleB() {
        let task: Task<Void, Error>
        
            task = Task {
                do {
                    let start = Date()
                    var sum = 0
                    for i in 0..<500_000_000 {
                        try Task.checkCancellation()
                        sum += i % 10
                    }
                    let end = Date()
                    print("Heavy loop finished: \(end.timeIntervalSince(start)) seconds")
                    print("A")
                    try await Task.sleep(nanoseconds: 5_000_000_000)
                    print("B")
                } catch is CancellationError {
                    print("キャンセルされました")
                }
            }
        task.cancel()
        print("C")
    }
    // C

try Task.checkCancellation()を実行することでキャンセルフラグが立っていれば即座にキャンセルさせることが可能です。そのためexampleAと違いAは出力されていません。また、checkCancellationでキャンセルフラグが立っていなければ後続の処理に進みます。

そのほか、ネストされているTaskがある場合に、親Taskがキャンセルされれば子Taskもキャンセルされる仕組みとなっています。

Actorとは

Actorが登場する前のクラスや構造体では、複数のスレッドから同じプロパティを同時に読み書きするとデータ競合が発生する可能性がありました。Actorを活用すれば内部プロパティへのアクセスを1つの直列キューで処理するのでデータ競合を防ぐことが可能となります。

また、別のTaskからActor内のプロパティやメソッドを呼び出す時はawaitをつける必要があります。

    actor Counter {
        var value = 0

        func increment() {
            value += 1
        }
    }
    
    private func example() {
        let counter = Counter()
        
        Task {
           await counter.increment()
        }
    }

@MainActor

メインスレッド上で実行されることを保証するアクターです。クラス、メソッド、クロージャ、プロトコルにつけることができます。

@escapingがついているクロージャはどこから呼ばれるかは非同期次第になるため、@MainActorをつけておくことで、安心してUIの変更処理を渡すことができます。

    private func example(completion: @MainActor @escaping () -> Void)  {
        // UI更新
    }
    // @MainActorは@escapingの前後のどちらに記述してもOK

isolatedとnonisolated

isolatedを利用するとActor内のプロパティやメソッドを呼び出す時にawaitをつけずに直列キュー上で実行されることが保証されます。

    actor Counter {
        var value = 0

        func increment() {
            value += 1
        }
    }
    
    private func example(counter: isolated Counter) {
        counter.increment()
    }

複数の重い処理を並列に走らせる必要がなく、手軽に値を取得して利用したい場合に使えそうです。

nonisolatedはisolatedとは逆で、Actor内のプロパティを取得する際に直列キュー上で実行しなくてもよくなります。こちらもawaitが不要になります。

    actor Counter {
        var value = 0

        func increment() {
            value += 1
        }
        
        nonisolated func sendLog() {
            print("ログ送信")
        }
    }
    
    private func example() {
        let counter = Counter()
        counter.sendLog()
    }

スレッドセーフであることを保証できるメソッドや、計算型プロパティ、イミュータブルな定数などで利用するべきです。また、アクターのプロパティにアクセスしようとするとコンパイルエラーとなります。

Swift6からはnonisolatedなプロパティやメソッドをisolatedなクラスで保持させていたりすると矛盾が生じるためコンパイルエラーになるみたいです。

Sendableとは

Sendalbeプロトコルに準拠した型はデータの競合が起きないことを宣言している型となります。

Int型やString型はSendableに準拠しており、値型であるstructやenumもすべてのプロパティがSendableであれば暗黙的にSendableに準拠します。参照型のclassはデフォルトではSendableでないです。

構造化並行処理(Structured Concurrency)

Taskはスコープを縛られずに生成されるため、呼び出し元の関数のスコープが抜けたとしてもTask自体は生き続けます。構造化並行処理ではTaskの利用でキャンセル忘れなどに対応するためにライフサイクル管理ができるものとなっています。

async letの使い方

async letでは呼び出し元の関数のスコープに縛られることになるため、抜けるときに待機かキャンセルされるのでリークすることがありません。キャンセルする場合についてはキャンセル処理を呼び出すようにしておく必要があります。またTaskの待機場所を任意の箇所で行えるのも利点かなと思います。

    private func example() async {
        async let message1 = fetchMessage()
        async let message2 = fetchMessage()

        // let messages = try await (message1, message2)
        return 
    } // awaitせずにここを抜けるとfetchMessageにキャンセルが飛ぶ
    
    private func fetchMessage() async throws -> String {
        for i in 0..<10_000 {
           try Task.checkCancellation() // ここでキャンセルされていればthrowされる
           print(i)
        }
        
        for i in 0..<10_000 {
          if Task.isCancelled { // 明示的にキャンセル処理も可能
          キャンセル処理も可能
            // キャンセル処理
            return ""
           }
        }
    }

withTaskGroupの使い方

TaskGroupを利用すると複数の子タスクを同時に走らせつつ一括で管理することができます。withTaskGroupを呼び出しているTaskが親タスクとなり、addTaskされたタスクが子タスクとなります。withTaskGroupのスコープを抜けると、未完了の子タスクは自動キャンセルされるため放置されて裏で動き続けることがありません。

await withTaskGroup(of: Int.self) { group in
    for i in 1...5 {
        group.addTask {
            try await someAsyncFunction(i) // 非同期処理
        }
    }
    
    var collected: [String] = []
    for await value in group {
        try Task.checkCancellation()
        collected.append(value)
    }
    return collected
}

for awaitで子タスクをループで取り出しています。完了したタスクから順に結果が流れてくるので、順序に依存せず終わった順に処理したい場合に使えそうです。

おわりに

async awaitやTaskについての仕組みが前よりも理解できるようになりました。他にも色々な機能がありそうなので勉強してから載せたいと思います。