はなちるのマイノート

Unityをメインとした技術ブログ。自分らしくまったりやっていきたいと思いますー!

【UniRx,UniTask】IObservable.ToUniTaskメソッドをうまく使いこなそう

はじめに

今回はIObservable.ToUniTaskメソッドについて紹介をしたいと思います。

このメソッドを使う事でIObservableを実装しているオブジェクトに対して、OnNextもしくはOnCompletedがされるまで処理を待つようなUniTaskに変換することができます。

github.com

使い方

まずはメソッドを使う上でシグネチャを確認してみましょう。

public static UniTask<T> ToUniTask<T>(this IObservable<T> source, bool useFirstValue = false, CancellationToken cancellationToken = default)
名前 意味
useFirstValue trueならOnNextされるとawaitを抜け、falseならOnCompletedされると抜ける
cancellationToken キャンセル用のトークン

引数自体は簡単で,大切なのはuseFirstValuetrue / falseのどっちかですね。

これを踏まえながら以下のコードを見てみてください。

public class UniTaskTest
{
    [UnityTest]
    public IEnumerator ToUniTaskTest() => UniTask.ToCoroutine(async () =>
    {
        var subject = new Subject<int>();
        var source = new CancellationTokenSource();
        
        // 非同期で「初めてOnNextされるまで」と「OnCompleteされるまで」の2種類のIObservable.ToUniTaskを使って待つ
        var task = UniTask.Create(async () =>
        {
            // 「useFirstValue=true」の場合は、OnNextがされるまで待つ
            var result = await subject.ToUniTask(useFirstValue: true, cancellationToken: source.Token);
            Assert.That(result, Is.EqualTo(1));
            
            // 「useFirstValue=false」の場合は、OnCompletedがされるまで待ち、最後に発行された値を返す
            result = await subject.ToUniTask(useFirstValue: false, cancellationToken: source.Token);
            Assert.That(result, Is.EqualTo(3));
        });
        
        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3);

        await UniTask.Delay(TimeSpan.FromSeconds(1));
        
        subject.OnCompleted();

        // taskの中のもろもろの処理が終わるまで待つ
        await task;
        subject.Dispose();
    });
}

色々書きましたが、注目してほしいのはUniTask.Createの引数の箇所で,await subject.ToUniTaskにより条件を満たすまで処理を待ちます。

それぞれの違いは是非コードのコメントを見てみてください。

ざっと仕組みについて

仕組みを理解するにはやはりコードを覗くのが近道でしょう。

public static UniTask<T> ToUniTask<T>(this IObservable<T> source, bool useFirstValue = false, CancellationToken cancellationToken = default)
{
    var promise = new UniTaskCompletionSource<T>();
    var disposable = new SingleAssignmentDisposable();

    var observer = useFirstValue
        ? (IObserver<T>)new FirstValueToUniTaskObserver<T>(promise, disposable, cancellationToken)
        : (IObserver<T>)new ToUniTaskObserver<T>(promise, disposable, cancellationToken);

    try
    {
        disposable.Disposable = source.Subscribe(observer);
    }
    catch (Exception ex)
    {
        promise.TrySetException(ex);
    }

    return promise.Task;
}

冒頭でUniTaskCompletionSourceを作成していて,これを用いて好きなタイミングで結果を確定させられるUniTaskを生成し、返り値として返しています。
【UniTask】UniTaskCompletionSourceを使って好きなタイミングで結果を確定させるUniTaskを生成する(ついでにUniTask.Voidの紹介) - はなちるのマイノート

ここで一番大切なのは、引数でUniTaskCompletionSourceを渡しながらFirstValueToUniTaskObserverToUniTaskObserverをインスタンス化した後、source.Subscribe(observer)をしていることです。

何言ってるんだと思うかもしれませんが、わかりやすく言うとFirstValueToUniTaskObserverToUniTaskObserverの中でUniTaskCompletionSource.TrySetCanceled(), TrySetException(), TrySetResult()が実行されると,awaitを抜けるというわけです。

class FirstValueToUniTaskObserver<T> : IObserver<T>
{
    static readonly Action<object> callback = OnCanceled;

    readonly UniTaskCompletionSource<T> promise;
    readonly SingleAssignmentDisposable disposable;
    readonly CancellationToken cancellationToken;
    readonly CancellationTokenRegistration registration;

    bool hasValue;

    public FirstValueToUniTaskObserver(UniTaskCompletionSource<T> promise, SingleAssignmentDisposable disposable,
        CancellationToken cancellationToken)
    {
        this.promise = promise;
        this.disposable = disposable;
        this.cancellationToken = cancellationToken;

        if (this.cancellationToken.CanBeCanceled)
        {
            this.registration = this.cancellationToken.RegisterWithoutCaptureExecutionContext(callback, this);
        }
    }

    static void OnCanceled(object state)
    {
        var self = (FirstValueToUniTaskObserver<T>) state;
        self.disposable.Dispose();
        self.promise.TrySetCanceled();
    }

    public void OnNext(T value)
    {
        hasValue = true;
        try
        {
            promise.TrySetResult(value);
        }
        finally
        {
            registration.Dispose();
            disposable.Dispose();
        }
    }

    public void OnError(Exception error)
    {
        try
        {
            promise.TrySetException(error);
        }
        finally
        {
            registration.Dispose();
            disposable.Dispose();
        }
    }

    public void OnCompleted()
    {
        try
        {
            if (!hasValue)
            {
                promise.TrySetException(new InvalidOperationException("Sequence has no elements"));
            }
        }
        finally
        {
            registration.Dispose();
            disposable.Dispose();
        }
    }
}
class ToUniTaskObserver<T> : IObserver<T>
{
    static readonly Action<object> callback = OnCanceled;

    readonly UniTaskCompletionSource<T> promise;
    readonly SingleAssignmentDisposable disposable;
    readonly CancellationToken cancellationToken;
    readonly CancellationTokenRegistration registration;

    bool hasValue;
    T latestValue;

    public ToUniTaskObserver(UniTaskCompletionSource<T> promise, SingleAssignmentDisposable disposable,
        CancellationToken cancellationToken)
    {
        this.promise = promise;
        this.disposable = disposable;
        this.cancellationToken = cancellationToken;

        if (this.cancellationToken.CanBeCanceled)
        {
            this.registration = this.cancellationToken.RegisterWithoutCaptureExecutionContext(callback, this);
        }
    }

    static void OnCanceled(object state)
    {
        var self = (ToUniTaskObserver<T>) state;
        self.disposable.Dispose();
        self.promise.TrySetCanceled();
    }

    public void OnNext(T value)
    {
        hasValue = true;
        latestValue = value;
    }

    public void OnError(Exception error)
    {
        try
        {
            promise.TrySetException(error);
        }
        finally
        {
            registration.Dispose();
            disposable.Dispose();
        }
    }

    public void OnCompleted()
    {
        try
        {
            if (hasValue)
            {
                promise.TrySetResult(latestValue);
            }
            else
            {
                promise.TrySetException(new InvalidOperationException("Sequence has no elements"));
            }
        }
        finally
        {
            registration.Dispose();
            disposable.Dispose();
        }
    }
}


コードをじっくりとみてみると、以下のタイミングでawaitを抜けることが確認できます。

useFirstValue awaitを抜けるタイミング
true OnCanceled, OnError, OnNext, (OnCompleted)
false OnCanceled, OnError, OnCompleted

またOnCompletedの段階で一度もOnNextされていなかった場合はuseFirstValue関係なくエラーを投げるので注意してください。