Concurrent Operation in Swift

Swiftのお話です。

さてさて、Swiftで並行処理ってどうやって実装しますか。

例えばバックグラウンドで動作する複数のタスクを並行に走らせて、すべての処理が完了した場合に特定の処理をするようなことをしたい!となったとき・・・。

ライブラリ使わないで実装となると、NSOperationとNSOperationQueueを使って、キューが空になるまで待つのが妥当かなという個人的な考え。GCDのdispatch_group_asyncを使っても実装できそうですね。でも少し面倒くさいですよね・・・。

ということで今回はライブラリを使ったらどうなるか、調べてみました。

  1. いくつかのタスクがある(リスト)
  2. タスクを並行に処理し全ての処理が完了したら通知
  3. 一つでもエラーがあればエラーを通知し完了を通知しない

という感じの処理を実装してみました。

1. BrightFutures

リポジトリ:https://github.com/Thomvis/BrightFutures
使用したバージョン:3.3.0

依存ライブラリ
Result: https://github.com/antitypical/Result
使用したバージョン:1.0.2

参考:Swiftの非同期処理を簡単に書けるBrightFuturesをコード例を多用して解説する

シンプルで使いやすそうだったので使ってみましたー!

こんな感じ

struct BrightFuturesTest{
    func exec(){

        let taskList = [
            self.createTask("A", interval: 1),
            self.createTask("B", interval: 2),
            self.createTask("C", interval: 0),
            self.createTask("D", interval: 4)
        ]

        taskList.fold("", f: {$0 + $1})
            .onFailure(callback: {print($0)})
            .onComplete(callback: {print($0)})

    }

    func createTask(name: String, interval: NSTimeInterval) -> Future<String, NSError> {
        let promise = Promise<String, NSError>()

        Queue.init().async{
            print("Task \(name) started")

            if interval.isZero{
                promise.failure(NSError(domain: "hoge", code: -1, userInfo: nil))
            }else{
                NSThread.sleepForTimeInterval(interval)
                promise.success(name)
            }
        }

        return promise.future
    }

出力結果

Task A started
Task D started
Task B started
Task C started
Error Domain=hoge Code=-1 "(null)"
.Failure(Error Domain=hoge Code=-1 "(null)")

エラーで終了してますね。

タスクリストを

let taskList = [
    self.createTask("A", interval: 1),
    self.createTask("B", interval: 2),
    self.createTask("C", interval: 3),
    self.createTask("D", interval: 4)
]

こうした場合は

Task A started
Task B started
Task C started
Task D started
.Success(ABCD)

上記のような出力が得られます。タスクのスタートする順序ですが、実行するたびに変わります「ABCD」になったり、「BACD」になったり、「ADBC」になったり。

PromiseとかFutureとかなんやねんとお思いの方は「Future/Promise」でググると幸せになれるかと思います。RxのObserverがPromiseでObservableがFutureのような感じだと思われ。

BrightFuturesでは、Futureの配列をfoldするといい感じに並行処理ができるっぽいですね。全てのタスクが「Success」だった場合と「Failure」が一つでも送られるとcompleteされます。

BrightFuturesは、他にも「andThen」で処理をつなげたり、mapやflapMapでデータを加工したりできます。Rxみたいです。

上の方に貼ったQiitaの記事がわかりやすいのでおすすめです。詳細な使い方を知りたい人はREADMEヘ。

便利です!

2. RxSwift

流行りのRxSwiftさん

リポジトリ:https://github.com/ReactiveX/RxSwift
使用したバージョン:2.4

こんな感じ

struct RxTest{

    private var disposeBag = DisposeBag()

    func exec(){

        let taskList = [
            self.createTask("A", interval: 1),
            self.createTask("B", interval: 2),
            self.createTask("C", interval: 3),
            self.createTask("D", interval: 4)
        ]

        taskList.forEach{$0.connect()}

        taskList.toObservable().merge(maxConcurrent: 10)
            .reduce(""){$0 + $1}
            .subscribe(onNext: {
                print($0)
            },onError: {
                print($0)
            },onCompleted:{
                print("completed")
            }).addDisposableTo(disposeBag)

    }

    func createTask(name: String, interval: NSTimeInterval) -> ConnectableObservable<String> {

        return Observable.create{(observer: AnyObserver<String>) in
            print("Task \(name) started")

            if interval.isZero{
                observer.onError(NSError(domain: "hoge", code: -1, userInfo: nil))
            }else{
                NSThread.sleepForTimeInterval(interval)
                observer.onNext(name)
                observer.onCompleted()
            }

            return AnonymousDisposable{}
        }.subscribeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: DispatchQueueSchedulerQOS.Background)).replayAll()
    }
}

出力結果

Task D started
Task B started
Task C started
Task A started
ABCD
completed

ポイントは、

ConnectableObservable<String>」の配列をtoObservableで「Observable<ConnectableObservable<String>>」に変換してmergeする

ところです。

mergeは

ObservableType where E : ObservableConvertibleType

の場合に使えます。要素がObservableConvertibleTypeに適合していないと表示されないんですね。

要素が少ない場合は

Observable.of(
    self.createTask("A", interval: 1), 
    self.createTask("B", interval: 2))
    .merge(maxConcurrent: 10)

でもよいです。

ちなみに

observer.onCompleted()

これしないとストリーム終わらないので注意です。

まとめ

BrightFuturesもRxSwiftも素晴らしいライブラリです。Future/Promiseのようなものを書きたかった場合はBrightFuturesのほうが良さげかなと思いますが、RxSwiftでもswitchLatest()を使えばandThenのようなこともできるんですよねー。うんうんRxすごい。

まぁでもdispatch_asyncとかNSOperationQueueとか書きたくないし、ライブラリは使っていこうと思います。もうCocoaのクラス使いにくいしよくわからないんですよ!iOSアプリを開発していると、Swiftは素晴らしいけどCocoaはダメダメやんって思うことが多いのでなかなか辛いです。とりあえず、NS何ちゃらを早く無くして欲しいですね。

今回はこの辺りで、ではでは

(最近また私の中でSwiftが熱くなってきているのでSwift関連の投稿が増えるかもしれません。お楽しみに。)

swift -v
Apple Swift version 2.2 (swiftlang-703.0.18.1 clang-703.0.29)