はなちるのマイノート

Unityをメインとした技術ブログ。自分らしくまったりやっていきたいと思いますー!

【UniRx】ファクトリメソッドObservable.Create<T>を解剖してみよう

はじめに

今回はUniRXObservable.Create<T>についてソースコードを交えながら見ていきたいと思います。

github.com

使い方

var observable = Observable.Create<int>(observer =>
{
    for(var i = 0; i < 10; i++)
        observer.OnNext(i);
    observer.OnCompleted();
    return Disposable.Empty;
});

// 0 1 2 3 4 5 6 7 8 9
observable.Subscribe(x => Debug.Log(x));

処理の流れとしては,

observable.Subscriveの引数よりObserverを作成し,Observable.Createの引数のFunc<IObserver<int>,IDisposable>を作成したObserverを引数に実行する

といった感じです。

簡単にいえばObservable.SubscribeをしたらObservable.Createの引数が実行されるような感じでしょうか。

Observable.CreateFunc<IObserver<T>,IDisposable>の返り値のIDisposableObservable.Subscribeの返り値となって取り出せます。

Observable.Createの中身

ファクトリメソッド達はObservable.Creation.csの中に入っています。

見てみるとかなりの種類のファクトリメソッドがありますが,今回はObservable.Createだけに注目していきます。
UniRx オペレータ逆引き - Qiita

/// <summary>
/// Create anonymous observable. Observer has exception durability. This is recommended for make operator and event like generator. 
/// </summary>
public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe)
{
    if (subscribe == null) throw new ArgumentNullException("subscribe");

    return new CreateObservable<T>(subscribe);
}

/// <summary>
/// Create anonymous observable. Observer has exception durability. This is recommended for make operator and event like generator(HotObservable). 
/// </summary>
public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
{
    if (subscribe == null) throw new ArgumentNullException("subscribe");

    return new CreateObservable<T>(subscribe, isRequiredSubscribeOnCurrentThread);
}

Observable.Createで生成されるインスタンスは,CreateObservable<T>なるものだということが分かります。

つまりはCreateObservable<T>を制すことができれば,もうObservable.Createを制したと言っても過言ではないでしょう。

もう少し冒険は続きます。

CreateObservable<T>の中身

CreateObservable<T>Scripts/Operators/Create.csの中に入ってます。

github.com

internal class CreateObservable<T> : OperatorObservableBase<T>
{
    readonly Func<IObserver<T>, IDisposable> subscribe;

    public CreateObservable(Func<IObserver<T>, IDisposable> subscribe)
        : base(true) // fail safe
    {
        this.subscribe = subscribe;
    }

    public CreateObservable(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread)
        : base(isRequiredSubscribeOnCurrentThread)
    {
        this.subscribe = subscribe;
    }

    protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
    {
        observer = new Create(observer, cancel);
        return subscribe(observer) ?? Disposable.Empty;
    }
}

こちらはOperatorObservableBaseの派生クラスなので基底クラスの方も確認しなければ挙動を完全に把握することはできませんが、このコードで大切なことは以下の2つだと思います。

  • Observable.Createの引数で渡したFunc<IObserver<T>, IDisposable>をキャッシュしている
  • SubscribeCoreメソッドを呼ぶと、新しいObserverを生成してキャッシュしていたFuncデリゲートを実行する

この時点で親クラスにてSubscribeCoreが実行されることが察せますね。あとはこのメソッドが実行されるタイミングを知ることができればかなり全容に近づくことができます。

というわけで親クラスであるOperatorObservableBaseをみていきます。

OperatorObservableBase

OperatorObservableBaseScripts/Operators/OperatorObservableBase.csに記述されています。

github.com

// implements note : all field must be readonly.
public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T>
{
    readonly bool isRequiredSubscribeOnCurrentThread;

    public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread)
    {
        this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread;
    }

    public bool IsRequiredSubscribeOnCurrentThread()
    {
        return isRequiredSubscribeOnCurrentThread;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new SingleAssignmentDisposable();

        // note:
        // does not make the safe observer, it breaks exception durability.
        // var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription);

        if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired)
        {
            Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription));
        }
        else
        {
            subscription.Disposable = SubscribeCore(observer, subscription);
        }

        return subscription;
    }

    protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel);
}

SingleAssignmentDisposableDisposeのために作っているクラスなのでそこまで気にする必要はありません。

isRequiredSubscribeOnCurrentThreadはとりすーぷさんもよく分からないといっているので,分かりません。(スレッド関連もの)

ただSubscribeをするとSubscribeCoreが実行されていることが分かります。

実はこれが今回一番だと感じていることで,今までの知識をまとめると以下のことが分かります。

f:id:hanaaaaaachiru:20210531174712p:plain
Observable.Createの挙動


つまりは,Subscribeを実行したときにObservable.Createの引数に書いたFuncデリゲートが呼ばれるということです。

最初の使い方で説明したことに無事辿り着くことができました。

さいごに

UniRxにはそれ以外にもたくさんのファクトリメソッドがありますが,どんなインスタンスが生成されているかをしっかりとコードで追えば挙動がわかると思います。

すごい昔にUniRxの色々なファクトリメソッドを使ったことがありましたが、コードも追ってみると面白いかもしれません。
www.hanachiru-blog.com


ではまた。