はなちるのマイノート

Unityをメインとした技術ブログ。自分らしくまったりやっていきたいと思いますー!

【UniRx】AsyncSubject<T>を使ってOnCompletedされたときに最後に入力された値を出力する

はじめに

今回はAsyncSubject<T>について実際のコードも触れながら解剖していきたいと思います。

github.com

使い方

AsyncSubjectOnCompletedされたときに最後にOnNextされた値を出力します

[Test]
public void AsyncSubjectTest()
{
    var asyncSubject = new AsyncSubject<int>();
        
    var result = 0;
        
    // asyncObjectがOnCompletedされたとき,Subscribeの引数が実行される
    asyncSubject.Subscribe(x => result = x);
        
    asyncSubject.OnNext(1);
    asyncSubject.OnNext(2);
    asyncSubject.OnNext(3);
        
    asyncSubject.OnCompleted();
        
    Assert.That(result, Is.EqualTo(3));
}


またAsyncSubject.OnCompletedされた後にキャッシュされた値を取り出したいときはAsyncSubject.Valueを使うと取り出せます。

[Test]
public void AsyncSubjectTest2()
{
    var asyncSubject = new AsyncSubject<int>();

    asyncSubject.OnNext(1);
    try
    {
        Assert.That(asyncSubject.Value, Is.EqualTo(1));
    }
    catch (InvalidOperationException)
    {
        // AsyncSubject.OnCompletedされる前にAsyncSubject.Valueを使うとInvalidOperationExceptionを投げる
    }

    asyncSubject.OnNext(2);
    asyncSubject.OnNext(3);

    asyncSubject.OnCompleted();
    // OnCompletedされた後にキャッシュされた値はAsyncSubject.Valueで取り出せる
    Assert.That(asyncSubject.Value, Is.EqualTo(3));
}


加えてOnNextを一度も実行せずにOnCompletedを実行してもエラーにはならず、デフォルト値を返します。
(ToUniTaskObserver<T>とかはエラーを吐いたような)

[Test]
public void AsyncSubjectTest3()
{
    var asyncSubject = new AsyncSubject<int>();
        
    asyncSubject.Subscribe(onNext: x => Assert.Fail(), onCompleted: () => Debug.Log("OnCompleted"));
        
    asyncSubject.OnCompleted();
    Assert.That(asyncSubject.Value, Is.EqualTo(default(int)));
}

仕組み

重要そうな箇所を抜粋してみました。

public sealed class AsyncSubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
{
    object observerLock = new object();

    T lastValue;
    bool hasValue;
    bool isStopped;
    bool isDisposed;
    Exception lastError;
    IObserver<T> outObserver = EmptyObserver<T>.Instance;

    public T Value
    {
        get
        {
            ThrowIfDisposed();
            if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet");
            if (lastError != null) lastError.Throw();
            return lastValue;
        }
    }

    ...

    public void OnCompleted()
    {
        IObserver<T> old;
        T v;
        bool hv;
        lock (observerLock)
        {
            ThrowIfDisposed();
            if (isStopped) return;

            old = outObserver;
            outObserver = EmptyObserver<T>.Instance;
            isStopped = true;
            v = lastValue;
            hv = hasValue;
        }

        if (hv)
        {
            old.OnNext(v);
            old.OnCompleted();
        }
        else
        {
            old.OnCompleted();
        }
    }

    ...

    public void OnNext(T value)
    {
        lock (observerLock)
        {
            ThrowIfDisposed();
            if (isStopped) return;

            this.hasValue = true;
            this.lastValue = value;
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");

        var ex = default(Exception);
        var v = default(T);
        var hv = false;

        lock (observerLock)
        {
            ThrowIfDisposed();
            if (!isStopped)
            {
                ...
                return new Subscription(this, observer);
            }

            ex = lastError;
            v = lastValue;
            hv = hasValue;
        }

        if (ex != null)
        {
            observer.OnError(ex);
        }
        else if (hv)
        {
            observer.OnNext(v);
            observer.OnCompleted();
        }
        else
        {
            observer.OnCompleted();
        }

        return Disposable.Empty;
    }

    ...
}

UniRx/AsyncSubject.cs at master · neuecc/UniRx · GitHub


上から解剖していきましょう。

まずフィールドとしてlastValueが存在します。これがOnCompletedのときに出力する値です。それがValueで取り出せるようになっていますね。

public T Value
{
    get
    {
        ThrowIfDisposed();
        if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet");
        if (lastError != null) lastError.Throw();
        return lastValue;
    }
}


次にOnCompletdの中でマルチスレッドのためのlockがあってみにくいですが、実質以下のようなコードが書かれています。

public void OnCompleted()
{
    if (hasValue)
    {
        old.OnNext(lastValue);
        old.OnCompleted();
    }
    else
    {
        old.OnCompleted();
    }
}

つまり一度でもOnNextされたらlastValueを出力して、一度もされてなければSubscribeしたObserverOnCompletedをしちゃいます。


続いてOnNextではhasValue = truelastValueの更新を行います。

public void OnNext(T value)
{
    lock (observerLock)
    {
        ThrowIfDisposed();
        if (isStopped) return;

        this.hasValue = true;
        this.lastValue = value;
    }
}


最後にSubscribeについてですが、まだOnCompletedされていなければいつも通りリスト(厳密にはListObserver<T>)にobserver を追加。

OnCompletedされていたら(実質)以下のコードを実行します。

if (lastError != null)
{
    // 既にエラーが確定していたら
    observer.OnError(lastError);
}
else if (hasValue)
{
    // OnNextが一度でもされていたら
    observer.OnNext(lastValue);
    observer.OnCompleted();
}
else
{
    // OnNextが一度もされていない場合
    observer.OnCompleted();
}

さいごに

中のコードを見ることで、使い方での挙動を理解することができました。

最近UniRx・UniTask関連の記事が多くて、あまり興味がない方は退屈かもしれませんがもう少しだけ続いてしまうかもしれません。

よかったらお付き合いください。

ではまた。