はじめに
今回はUniRXのObservable.Create<T>についてソースコードを交えながら見ていきたいと思います。
使い方
var observable = Observable.Create<int>(observer => { for(var i = 0; i < 10; i++) observer.OnNext(i); observer.OnCompleted(); return Disposable.Empty; }); // 0 1 2 3 4 5 6 7 8 9 observable.Subscribe(x => Debug.Log(x));
処理の流れとしては,
observable.Subscriveの引数よりObserverを作成し,Observable.Createの引数のFunc<IObserver<int>,IDisposable>を作成したObserverを引数に実行する
といった感じです。
簡単にいえばObservable.SubscribeをしたらObservable.Createの引数が実行されるような感じでしょうか。
後Observable.CreateのFunc<IObserver<T>,IDisposable>の返り値のIDisposableはObservable.Subscribeの返り値となって取り出せます。
Observable.Createの中身
ファクトリメソッド達はObservable.Creation.csの中に入っています。
見てみるとかなりの種類のファクトリメソッドがありますが,今回はObservable.Createだけに注目していきます。
UniRx オペレータ逆引き - Qiita
/// <summary> /// Create anonymous observable. Observer has exception durability. This is recommended for make operator and event like generator. /// </summary> public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe) { if (subscribe == null) throw new ArgumentNullException("subscribe"); return new CreateObservable<T>(subscribe); } /// <summary> /// Create anonymous observable. Observer has exception durability. This is recommended for make operator and event like generator(HotObservable). /// </summary> public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) { if (subscribe == null) throw new ArgumentNullException("subscribe"); return new CreateObservable<T>(subscribe, isRequiredSubscribeOnCurrentThread); }
Observable.Createで生成されるインスタンスは,CreateObservable<T>なるものだということが分かります。
つまりはCreateObservable<T>を制すことができれば,もうObservable.Createを制したと言っても過言ではないでしょう。
もう少し冒険は続きます。
CreateObservable<T>の中身
CreateObservable<T>はScripts/Operators/Create.csの中に入ってます。
internal class CreateObservable<T> : OperatorObservableBase<T> { readonly Func<IObserver<T>, IDisposable> subscribe; public CreateObservable(Func<IObserver<T>, IDisposable> subscribe) : base(true) // fail safe { this.subscribe = subscribe; } public CreateObservable(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) : base(isRequiredSubscribeOnCurrentThread) { this.subscribe = subscribe; } protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel) { observer = new Create(observer, cancel); return subscribe(observer) ?? Disposable.Empty; } }
こちらはOperatorObservableBaseの派生クラスなので基底クラスの方も確認しなければ挙動を完全に把握することはできませんが、このコードで大切なことは以下の2つだと思います。
- Observable.Createの引数で渡した
Func<IObserver<T>, IDisposable>をキャッシュしている SubscribeCoreメソッドを呼ぶと、新しいObserverを生成してキャッシュしていたFuncデリゲートを実行する
この時点で親クラスにてSubscribeCoreが実行されることが察せますね。あとはこのメソッドが実行されるタイミングを知ることができればかなり全容に近づくことができます。
というわけで親クラスであるOperatorObservableBaseをみていきます。
OperatorObservableBase
OperatorObservableBaseはScripts/Operators/OperatorObservableBase.csに記述されています。
// implements note : all field must be readonly. public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T> { readonly bool isRequiredSubscribeOnCurrentThread; public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread) { this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread; } public bool IsRequiredSubscribeOnCurrentThread() { return isRequiredSubscribeOnCurrentThread; } public IDisposable Subscribe(IObserver<T> observer) { var subscription = new SingleAssignmentDisposable(); // note: // does not make the safe observer, it breaks exception durability. // var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription); if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired) { Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription)); } else { subscription.Disposable = SubscribeCore(observer, subscription); } return subscription; } protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel); }
SingleAssignmentDisposableはDisposeのために作っているクラスなのでそこまで気にする必要はありません。
isRequiredSubscribeOnCurrentThreadはとりすーぷさんもよく分からないといっているので,分かりません。(スレッド関連もの)
UniRxのObservable.Createとかに登場するisRequiredSubscribeOnCurrentThreadフラグの用途がわからない。
— とりすーぷ (@toRisouP) August 22, 2019
ただSubscribeをするとSubscribeCoreが実行されていることが分かります。
実はこれが今回一番だと感じていることで,今までの知識をまとめると以下のことが分かります。

つまりは,Subscribeを実行したときにObservable.Createの引数に書いたFuncデリゲートが呼ばれるということです。
最初の使い方で説明したことに無事辿り着くことができました。
さいごに
UniRxにはそれ以外にもたくさんのファクトリメソッドがありますが,どんなインスタンスが生成されているかをしっかりとコードで追えば挙動がわかると思います。
すごい昔にUniRxの色々なファクトリメソッドを使ったことがありましたが、コードも追ってみると面白いかもしれません。
www.hanachiru-blog.com
ではまた。