はじめに
今回はUniRX
のObservable.Create<T>
についてソースコードを交えながら見ていきたいと思います。
使い方
var observable = Observable.Create<int>(observer => { for(var i = 0; i < 10; i++) observer.OnNext(i); observer.OnCompleted(); return Disposable.Empty; }); // 0 1 2 3 4 5 6 7 8 9 observable.Subscribe(x => Debug.Log(x));
処理の流れとしては,
observable.Subscrive
の引数よりObserver
を作成し,Observable.Create
の引数のFunc<IObserver<int>,IDisposable>
を作成したObserver
を引数に実行する
といった感じです。
簡単にいえばObservable.Subscribe
をしたらObservable.Create
の引数が実行されるような感じでしょうか。
後Observable.Create
のFunc<IObserver<T>,IDisposable>
の返り値のIDisposable
はObservable.Subscribe
の返り値となって取り出せます。
Observable.Createの中身
ファクトリメソッド達はObservable.Creation.cs
の中に入っています。
見てみるとかなりの種類のファクトリメソッドがありますが,今回はObservable.Create
だけに注目していきます。
UniRx オペレータ逆引き - Qiita
/// <summary> /// Create anonymous observable. Observer has exception durability. This is recommended for make operator and event like generator. /// </summary> public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe) { if (subscribe == null) throw new ArgumentNullException("subscribe"); return new CreateObservable<T>(subscribe); } /// <summary> /// Create anonymous observable. Observer has exception durability. This is recommended for make operator and event like generator(HotObservable). /// </summary> public static IObservable<T> Create<T>(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) { if (subscribe == null) throw new ArgumentNullException("subscribe"); return new CreateObservable<T>(subscribe, isRequiredSubscribeOnCurrentThread); }
Observable.Create
で生成されるインスタンスは,CreateObservable<T>
なるものだということが分かります。
つまりはCreateObservable<T>
を制すことができれば,もうObservable.Create
を制したと言っても過言ではないでしょう。
もう少し冒険は続きます。
CreateObservable<T>の中身
CreateObservable<T>
はScripts/Operators/Create.cs
の中に入ってます。
internal class CreateObservable<T> : OperatorObservableBase<T> { readonly Func<IObserver<T>, IDisposable> subscribe; public CreateObservable(Func<IObserver<T>, IDisposable> subscribe) : base(true) // fail safe { this.subscribe = subscribe; } public CreateObservable(Func<IObserver<T>, IDisposable> subscribe, bool isRequiredSubscribeOnCurrentThread) : base(isRequiredSubscribeOnCurrentThread) { this.subscribe = subscribe; } protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel) { observer = new Create(observer, cancel); return subscribe(observer) ?? Disposable.Empty; } }
こちらはOperatorObservableBase
の派生クラスなので基底クラスの方も確認しなければ挙動を完全に把握することはできませんが、このコードで大切なことは以下の2つだと思います。
- Observable.Createの引数で渡した
Func<IObserver<T>, IDisposable>
をキャッシュしている SubscribeCore
メソッドを呼ぶと、新しいObserver
を生成してキャッシュしていたFunc
デリゲートを実行する
この時点で親クラスにてSubscribeCore
が実行されることが察せますね。あとはこのメソッドが実行されるタイミングを知ることができればかなり全容に近づくことができます。
というわけで親クラスであるOperatorObservableBase
をみていきます。
OperatorObservableBase
OperatorObservableBase
はScripts/Operators/OperatorObservableBase.cs
に記述されています。
// implements note : all field must be readonly. public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T> { readonly bool isRequiredSubscribeOnCurrentThread; public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread) { this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread; } public bool IsRequiredSubscribeOnCurrentThread() { return isRequiredSubscribeOnCurrentThread; } public IDisposable Subscribe(IObserver<T> observer) { var subscription = new SingleAssignmentDisposable(); // note: // does not make the safe observer, it breaks exception durability. // var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription); if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired) { Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription)); } else { subscription.Disposable = SubscribeCore(observer, subscription); } return subscription; } protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel); }
SingleAssignmentDisposable
はDispose
のために作っているクラスなのでそこまで気にする必要はありません。
isRequiredSubscribeOnCurrentThread
はとりすーぷさんもよく分からないといっているので,分かりません。(スレッド関連もの)
UniRxのObservable.Createとかに登場するisRequiredSubscribeOnCurrentThreadフラグの用途がわからない。
— とりすーぷ (@toRisouP) August 22, 2019
ただSubscribe
をするとSubscribeCore
が実行されていることが分かります。
実はこれが今回一番だと感じていることで,今までの知識をまとめると以下のことが分かります。
つまりは,Subscribeを実行したときにObservable.Createの引数に書いたFuncデリゲートが呼ばれるということです。
最初の使い方で説明したことに無事辿り着くことができました。
さいごに
UniRx
にはそれ以外にもたくさんのファクトリメソッドがありますが,どんなインスタンスが生成されているかをしっかりとコードで追えば挙動がわかると思います。
すごい昔にUniRx
の色々なファクトリメソッドを使ったことがありましたが、コードも追ってみると面白いかもしれません。
www.hanachiru-blog.com
ではまた。