使い方
IEnumerable
を実装しているオブジェクト,いわゆるデータの集まりを指すコレクションに対してToObservable
メソッドを実行することで、Subjscribe
をした際に値を順番に発行することができます。
[Test] public void ToObservableTest() { var values = new[] {1, 2, 3, 4, 5, 6}; var result = 0; // Subscribeしたとき、「OnNext(1) -> OnNext(2) -> ... -> OnNext(6) -> OnCompleted」が呼ばれる values.ToObservable() .Subscribe(x => result += x); Assert.That(result, Is.EqualTo(21)); }
仕組みについて
内部のコードを見てみましょう。
public static IObservable<T> ToObservable<T>(this IEnumerable<T> source) { return ToObservable(source, Scheduler.DefaultSchedulers.Iteration); } public static IObservable<T> ToObservable<T>(this IEnumerable<T> source, IScheduler scheduler) { return new ToObservableObservable<T>(source, scheduler); }
UniRx/Observable.Conversions.cs at master · neuecc/UniRx · GitHub
まずはToObservable
を実行することでToObservableObservable
のインスタンスを生成します。いわゆるファクトリメソッドですね。
名前がToObservableObservable
というなかなかすごい名前をしていますが、仕組みを理解するためにはこちらもコードも見ていきましょう。
internal class ToObservableObservable<T> : OperatorObservableBase<T> { readonly IEnumerable<T> source; readonly IScheduler scheduler; public ToObservableObservable(IEnumerable<T> source, IScheduler scheduler) : base(scheduler == Scheduler.CurrentThread) { this.source = source; this.scheduler = scheduler; } protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel) { return new ToObservable(this, observer, cancel).Run(); } }
UniRx/ToObservable.cs at master · neuecc/UniRx · GitHub
基底クラスであるOperatorObservalbeBase
を見ないことには処理の流れは分かりませんが、SubscribeCore
が親クラスにて呼ばれることは想像できます。
またSubscribeCore
が呼ばれるとToObservalbe
のインスタンスを生成しRun
を実行することを覚えておいてください。
というわけでOperatorObservalbeBase
をみていきましょう。
public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T> { readonly bool isRequiredSubscribeOnCurrentThread; public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread) { this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread; } public bool IsRequiredSubscribeOnCurrentThread() { return isRequiredSubscribeOnCurrentThread; } public IDisposable Subscribe(IObserver<T> observer) { var subscription = new SingleAssignmentDisposable(); // note: // does not make the safe observer, it breaks exception durability. // var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription); if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired) { Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription)); } else { subscription.Disposable = SubscribeCore(observer, subscription); } return subscription; } protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel); }
isRequiredSubscribeOnCurrentThread
はスレッド関連なので今回は注目しません。(というか私もよくわかっていません)
一番注目してほしいのはSubscribe
されたらSubscribeCore
が呼ばれることですね。
今まで分かったことをまとめると、ToObservable()
実行 -> ToObservableObservable
生成 -> Subscribe
実行 -> SubscribeCore
が呼ばれる -> ToObservable()
生成&Run()
になります。
そして最後の大詰め、ToObservable
をみます。
class ToObservable : OperatorObserverBase<T, T> { readonly ToObservableObservable<T> parent; public ToObservable(ToObservableObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel) { this.parent = parent; } public IDisposable Run() { var e = default(IEnumerator<T>); try { e = parent.source.GetEnumerator(); } catch (Exception exception) { OnError(exception); return Disposable.Empty; } ... var flag = new SingleAssignmentDisposable(); flag.Disposable = parent.scheduler.Schedule(self => { if (flag.IsDisposed) { e.Dispose(); return; } bool hasNext; var current = default(T); try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; } catch (Exception ex) { e.Dispose(); try { observer.OnError(ex); } finally { Dispose(); } return; } if (hasNext) { observer.OnNext(current); self(); } else { e.Dispose(); try { observer.OnCompleted(); } finally { Dispose(); } } }); return flag; } public override void OnNext(T value) { // do nothing } public override void OnError(Exception error) { try { observer.OnError(error); } finally { Dispose(); } } public override void OnCompleted() { try { observer.OnCompleted(); } finally { Dispose(); } } }
UniRx/ToObservable.cs at master · neuecc/UniRx · GitHub
ブログに貼るには長すぎたかもしれませんね。
OnNext
・OnError
・OnCompleted
は何もせず伝搬するだけなので、無視してOKです。
IDisposable
関連の箇所は今回省きますが、Run
の中に注目していただけると挙動を理解することができます。
public IDisposable Run() { var e = default(IEnumerator<T>); try { e = parent.source.GetEnumerator(); } catch (Exception exception) { OnError(exception); return Disposable.Empty; } ... var flag = new SingleAssignmentDisposable(); flag.Disposable = parent.scheduler.Schedule(self => { if (flag.IsDisposed) { e.Dispose(); return; } bool hasNext; var current = default(T); try { hasNext = e.MoveNext(); if (hasNext) current = e.Current; } catch (Exception ex) { e.Dispose(); try { observer.OnError(ex); } finally { Dispose(); } return; } if (hasNext) { observer.OnNext(current); self(); } else { e.Dispose(); try { observer.OnCompleted(); } finally { Dispose(); } } }); return flag; }
while(true)
の中でe.MoveNext()
やe.Current
をしていますが、これはforeach
を愚直に書いていった場合にこのようになります。
エラーなどの考慮をしているのでこうなっていますが、イメージとしては以下のコードと同じです。
foreach (var current in values) { Observer.OnNext(current); } Observer.OnCompleted();
というわけで、使い方で述べたようにSubscribe
したらOnNext &OnCompleted
されるということにたどり着けました。
さいごに
Gist
経由でコードをブログに貼った方が見栄えが良いのでそうしたほうがよかったかもしれませんね。
次からは考えます。
ではまた。