使い方
AsyncSubject
はOnCompleted
されたときに最後にOnNext
された値を出力します。
[Test] public void AsyncSubjectTest() { var asyncSubject = new AsyncSubject<int>(); var result = 0; // asyncObjectがOnCompletedされたとき,Subscribeの引数が実行される asyncSubject.Subscribe(x => result = x); asyncSubject.OnNext(1); asyncSubject.OnNext(2); asyncSubject.OnNext(3); asyncSubject.OnCompleted(); Assert.That(result, Is.EqualTo(3)); }
またAsyncSubject.OnCompleted
された後にキャッシュされた値を取り出したいときはAsyncSubject.Value
を使うと取り出せます。
[Test] public void AsyncSubjectTest2() { var asyncSubject = new AsyncSubject<int>(); asyncSubject.OnNext(1); try { Assert.That(asyncSubject.Value, Is.EqualTo(1)); } catch (InvalidOperationException) { // AsyncSubject.OnCompletedされる前にAsyncSubject.Valueを使うとInvalidOperationExceptionを投げる } asyncSubject.OnNext(2); asyncSubject.OnNext(3); asyncSubject.OnCompleted(); // OnCompletedされた後にキャッシュされた値はAsyncSubject.Valueで取り出せる Assert.That(asyncSubject.Value, Is.EqualTo(3)); }
加えてOnNext
を一度も実行せずにOnCompleted
を実行してもエラーにはならず、デフォルト値を返します。
(ToUniTaskObserver<T>
とかはエラーを吐いたような)
[Test] public void AsyncSubjectTest3() { var asyncSubject = new AsyncSubject<int>(); asyncSubject.Subscribe(onNext: x => Assert.Fail(), onCompleted: () => Debug.Log("OnCompleted")); asyncSubject.OnCompleted(); Assert.That(asyncSubject.Value, Is.EqualTo(default(int))); }
仕組み
重要そうな箇所を抜粋してみました。
public sealed class AsyncSubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable { object observerLock = new object(); T lastValue; bool hasValue; bool isStopped; bool isDisposed; Exception lastError; IObserver<T> outObserver = EmptyObserver<T>.Instance; public T Value { get { ThrowIfDisposed(); if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet"); if (lastError != null) lastError.Throw(); return lastValue; } } ... public void OnCompleted() { IObserver<T> old; T v; bool hv; lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; old = outObserver; outObserver = EmptyObserver<T>.Instance; isStopped = true; v = lastValue; hv = hasValue; } if (hv) { old.OnNext(v); old.OnCompleted(); } else { old.OnCompleted(); } } ... public void OnNext(T value) { lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; this.hasValue = true; this.lastValue = value; } } public IDisposable Subscribe(IObserver<T> observer) { if (observer == null) throw new ArgumentNullException("observer"); var ex = default(Exception); var v = default(T); var hv = false; lock (observerLock) { ThrowIfDisposed(); if (!isStopped) { ... return new Subscription(this, observer); } ex = lastError; v = lastValue; hv = hasValue; } if (ex != null) { observer.OnError(ex); } else if (hv) { observer.OnNext(v); observer.OnCompleted(); } else { observer.OnCompleted(); } return Disposable.Empty; } ... }
UniRx/AsyncSubject.cs at master · neuecc/UniRx · GitHub
上から解剖していきましょう。
まずフィールドとしてlastValue
が存在します。これがOnCompleted
のときに出力する値です。それがValue
で取り出せるようになっていますね。
public T Value { get { ThrowIfDisposed(); if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet"); if (lastError != null) lastError.Throw(); return lastValue; } }
次にOnCompletd
の中でマルチスレッドのためのlock
があってみにくいですが、実質以下のようなコードが書かれています。
public void OnCompleted() { if (hasValue) { old.OnNext(lastValue); old.OnCompleted(); } else { old.OnCompleted(); } }
つまり一度でもOnNext
されたらlastValue
を出力して、一度もされてなければSubscribe
したObserver
をOnCompleted
をしちゃいます。
続いてOnNext
ではhasValue = true
とlastValue
の更新を行います。
public void OnNext(T value) { lock (observerLock) { ThrowIfDisposed(); if (isStopped) return; this.hasValue = true; this.lastValue = value; } }
最後にSubscribe
についてですが、まだOnCompleted
されていなければいつも通りリスト(厳密にはListObserver<T>
)にobserver
を追加。
OnCompleted
されていたら(実質)以下のコードを実行します。
if (lastError != null) { // 既にエラーが確定していたら observer.OnError(lastError); } else if (hasValue) { // OnNextが一度でもされていたら observer.OnNext(lastValue); observer.OnCompleted(); } else { // OnNextが一度もされていない場合 observer.OnCompleted(); }
さいごに
中のコードを見ることで、使い方での挙動を理解することができました。
最近UniRx・UniTask
関連の記事が多くて、あまり興味がない方は退屈かもしれませんがもう少しだけ続いてしまうかもしれません。
よかったらお付き合いください。
ではまた。