はじめに
今回はIObservable.ToUniTask
メソッドについて紹介をしたいと思います。
このメソッドを使う事でIObservable
を実装しているオブジェクトに対して、OnNext
もしくはOnCompleted
がされるまで処理を待つようなUniTask
に変換することができます。
使い方
まずはメソッドを使う上でシグネチャを確認してみましょう。
public static UniTask<T> ToUniTask<T>(this IObservable<T> source, bool useFirstValue = false, CancellationToken cancellationToken = default)
名前 | 意味 |
---|---|
useFirstValue | trueならOnNextされるとawaitを抜け、falseならOnCompletedされると抜ける |
cancellationToken | キャンセル用のトークン |
引数自体は簡単で,大切なのはuseFirstValue
がtrue / false
のどっちかですね。
これを踏まえながら以下のコードを見てみてください。
public class UniTaskTest { [UnityTest] public IEnumerator ToUniTaskTest() => UniTask.ToCoroutine(async () => { var subject = new Subject<int>(); var source = new CancellationTokenSource(); // 非同期で「初めてOnNextされるまで」と「OnCompleteされるまで」の2種類のIObservable.ToUniTaskを使って待つ var task = UniTask.Create(async () => { // 「useFirstValue=true」の場合は、OnNextがされるまで待つ var result = await subject.ToUniTask(useFirstValue: true, cancellationToken: source.Token); Assert.That(result, Is.EqualTo(1)); // 「useFirstValue=false」の場合は、OnCompletedがされるまで待ち、最後に発行された値を返す result = await subject.ToUniTask(useFirstValue: false, cancellationToken: source.Token); Assert.That(result, Is.EqualTo(3)); }); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); await UniTask.Delay(TimeSpan.FromSeconds(1)); subject.OnCompleted(); // taskの中のもろもろの処理が終わるまで待つ await task; subject.Dispose(); }); }
色々書きましたが、注目してほしいのはUniTask.Create
の引数の箇所で,await subject.ToUniTask
により条件を満たすまで処理を待ちます。
それぞれの違いは是非コードのコメントを見てみてください。
ざっと仕組みについて
仕組みを理解するにはやはりコードを覗くのが近道でしょう。
public static UniTask<T> ToUniTask<T>(this IObservable<T> source, bool useFirstValue = false, CancellationToken cancellationToken = default) { var promise = new UniTaskCompletionSource<T>(); var disposable = new SingleAssignmentDisposable(); var observer = useFirstValue ? (IObserver<T>)new FirstValueToUniTaskObserver<T>(promise, disposable, cancellationToken) : (IObserver<T>)new ToUniTaskObserver<T>(promise, disposable, cancellationToken); try { disposable.Disposable = source.Subscribe(observer); } catch (Exception ex) { promise.TrySetException(ex); } return promise.Task; }
冒頭でUniTaskCompletionSource
を作成していて,これを用いて好きなタイミングで結果を確定させられるUniTask
を生成し、返り値として返しています。
【UniTask】UniTaskCompletionSourceを使って好きなタイミングで結果を確定させるUniTaskを生成する(ついでにUniTask.Voidの紹介) - はなちるのマイノート
ここで一番大切なのは、引数でUniTaskCompletionSource
を渡しながらFirstValueToUniTaskObserver
・ToUniTaskObserver
をインスタンス化した後、source.Subscribe(observer)
をしていることです。
何言ってるんだと思うかもしれませんが、わかりやすく言うとFirstValueToUniTaskObserver
・ToUniTaskObserver
の中でUniTaskCompletionSource.TrySetCanceled(), TrySetException(), TrySetResult()
が実行されると,await
を抜けるというわけです。
class FirstValueToUniTaskObserver<T> : IObserver<T> { static readonly Action<object> callback = OnCanceled; readonly UniTaskCompletionSource<T> promise; readonly SingleAssignmentDisposable disposable; readonly CancellationToken cancellationToken; readonly CancellationTokenRegistration registration; bool hasValue; public FirstValueToUniTaskObserver(UniTaskCompletionSource<T> promise, SingleAssignmentDisposable disposable, CancellationToken cancellationToken) { this.promise = promise; this.disposable = disposable; this.cancellationToken = cancellationToken; if (this.cancellationToken.CanBeCanceled) { this.registration = this.cancellationToken.RegisterWithoutCaptureExecutionContext(callback, this); } } static void OnCanceled(object state) { var self = (FirstValueToUniTaskObserver<T>) state; self.disposable.Dispose(); self.promise.TrySetCanceled(); } public void OnNext(T value) { hasValue = true; try { promise.TrySetResult(value); } finally { registration.Dispose(); disposable.Dispose(); } } public void OnError(Exception error) { try { promise.TrySetException(error); } finally { registration.Dispose(); disposable.Dispose(); } } public void OnCompleted() { try { if (!hasValue) { promise.TrySetException(new InvalidOperationException("Sequence has no elements")); } } finally { registration.Dispose(); disposable.Dispose(); } } }
class ToUniTaskObserver<T> : IObserver<T> { static readonly Action<object> callback = OnCanceled; readonly UniTaskCompletionSource<T> promise; readonly SingleAssignmentDisposable disposable; readonly CancellationToken cancellationToken; readonly CancellationTokenRegistration registration; bool hasValue; T latestValue; public ToUniTaskObserver(UniTaskCompletionSource<T> promise, SingleAssignmentDisposable disposable, CancellationToken cancellationToken) { this.promise = promise; this.disposable = disposable; this.cancellationToken = cancellationToken; if (this.cancellationToken.CanBeCanceled) { this.registration = this.cancellationToken.RegisterWithoutCaptureExecutionContext(callback, this); } } static void OnCanceled(object state) { var self = (ToUniTaskObserver<T>) state; self.disposable.Dispose(); self.promise.TrySetCanceled(); } public void OnNext(T value) { hasValue = true; latestValue = value; } public void OnError(Exception error) { try { promise.TrySetException(error); } finally { registration.Dispose(); disposable.Dispose(); } } public void OnCompleted() { try { if (hasValue) { promise.TrySetResult(latestValue); } else { promise.TrySetException(new InvalidOperationException("Sequence has no elements")); } } finally { registration.Dispose(); disposable.Dispose(); } } }
コードをじっくりとみてみると、以下のタイミングでawait
を抜けることが確認できます。
useFirstValue | await を抜けるタイミング |
---|---|
true | OnCanceled, OnError, OnNext, (OnCompleted) |
false | OnCanceled, OnError, OnCompleted |
またOnCompleted
の段階で一度もOnNext
されていなかった場合はuseFirstValue
関係なくエラーを投げるので注意してください。