はなちるのマイノート

Unityをメインとした技術ブログ。自分らしくまったりやっていきたいと思いますー!

【UniRx】IEnumerable<T>.ToObservable<T>を使って値を順番に発行するObservableに変換する

はじめに

今回はIEnumerable<T>.ToObservable<T>について取り上げたいと思います。

github.com

使い方

IEnumerableを実装しているオブジェクト,いわゆるデータの集まりを指すコレクションに対してToObservableメソッドを実行することで、Subjscribeをした際に値を順番に発行することができます。

[Test]
public void ToObservableTest()
{
    var values = new[] {1, 2, 3, 4, 5, 6};

    var result = 0;

    // Subscribeしたとき、「OnNext(1) -> OnNext(2) -> ... -> OnNext(6) -> OnCompleted」が呼ばれる
    values.ToObservable()
        .Subscribe(x => result += x);
        
    Assert.That(result, Is.EqualTo(21));
}

仕組みについて

内部のコードを見てみましょう。

public static IObservable<T> ToObservable<T>(this IEnumerable<T> source)
{
    return ToObservable(source, Scheduler.DefaultSchedulers.Iteration);
}

public static IObservable<T> ToObservable<T>(this IEnumerable<T> source, IScheduler scheduler)
{
    return new ToObservableObservable<T>(source, scheduler);
}

UniRx/Observable.Conversions.cs at master · neuecc/UniRx · GitHub


まずはToObservableを実行することでToObservableObservableのインスタンスを生成します。いわゆるファクトリメソッドですね。

名前がToObservableObservableというなかなかすごい名前をしていますが、仕組みを理解するためにはこちらもコードも見ていきましょう。

internal class ToObservableObservable<T> : OperatorObservableBase<T>
{
    readonly IEnumerable<T> source;
    readonly IScheduler scheduler;

    public ToObservableObservable(IEnumerable<T> source, IScheduler scheduler)
        : base(scheduler == Scheduler.CurrentThread)
    {
        this.source = source;
        this.scheduler = scheduler;
    }

    protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
    {
        return new ToObservable(this, observer, cancel).Run();
    }
}

UniRx/ToObservable.cs at master · neuecc/UniRx · GitHub

基底クラスであるOperatorObservalbeBaseを見ないことには処理の流れは分かりませんが、SubscribeCoreが親クラスにて呼ばれることは想像できます。

またSubscribeCoreが呼ばれるとToObservalbeのインスタンスを生成しRunを実行することを覚えておいてください。

というわけでOperatorObservalbeBaseをみていきましょう。

public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T>
{
    readonly bool isRequiredSubscribeOnCurrentThread;

    public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread)
    {
        this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread;
    }

    public bool IsRequiredSubscribeOnCurrentThread()
    {
        return isRequiredSubscribeOnCurrentThread;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new SingleAssignmentDisposable();

        // note:
        // does not make the safe observer, it breaks exception durability.
        // var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription);

        if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired)
        {
            Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription));
        }
        else
        {
            subscription.Disposable = SubscribeCore(observer, subscription);
        }

        return subscription;
    }

    protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel);
}

isRequiredSubscribeOnCurrentThreadはスレッド関連なので今回は注目しません。(というか私もよくわかっていません)

一番注目してほしいのはSubscribeされたらSubscribeCoreが呼ばれることですね。

今まで分かったことをまとめると、ToObservable()実行 -> ToObservableObservable生成 -> Subscribe実行 -> SubscribeCoreが呼ばれる -> ToObservable()生成&Run()になります。

そして最後の大詰め、ToObservableをみます。

class ToObservable : OperatorObserverBase<T, T>
{
    readonly ToObservableObservable<T> parent;

    public ToObservable(ToObservableObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer,
        cancel)
    {
        this.parent = parent;
    }

    public IDisposable Run()
    {
        var e = default(IEnumerator<T>);
        try
        {
            e = parent.source.GetEnumerator();
        }
        catch (Exception exception)
        {
            OnError(exception);
            return Disposable.Empty;
        }

        ...

        var flag = new SingleAssignmentDisposable();
        flag.Disposable = parent.scheduler.Schedule(self =>
        {
            if (flag.IsDisposed)
            {
                e.Dispose();
                return;
            }

            bool hasNext;
            var current = default(T);
            try
            {
                hasNext = e.MoveNext();
                if (hasNext) current = e.Current;
            }
            catch (Exception ex)
            {
                e.Dispose();
                try
                {
                    observer.OnError(ex);
                }
                finally
                {
                    Dispose();
                }

                return;
            }

            if (hasNext)
            {
                observer.OnNext(current);
                self();
            }
            else
            {
                e.Dispose();
                try
                {
                    observer.OnCompleted();
                }
                finally
                {
                    Dispose();
                }
            }
        });

        return flag;
    }

    public override void OnNext(T value)
    {
        // do nothing
    }

    public override void OnError(Exception error)
    {
        try
        {
            observer.OnError(error);
        }
        finally
        {
            Dispose();
        }
    }

    public override void OnCompleted()
    {
        try
        {
            observer.OnCompleted();
        }
        finally
        {
            Dispose();
        }
    }
}

UniRx/ToObservable.cs at master · neuecc/UniRx · GitHub


ブログに貼るには長すぎたかもしれませんね。

OnNextOnErrorOnCompletedは何もせず伝搬するだけなので、無視してOKです。

IDisposable関連の箇所は今回省きますが、Runの中に注目していただけると挙動を理解することができます。

public IDisposable Run()
{
    var e = default(IEnumerator<T>);
    try
    {
        e = parent.source.GetEnumerator();
    }
    catch (Exception exception)
    {
        OnError(exception);
        return Disposable.Empty;
    }

    ...

    var flag = new SingleAssignmentDisposable();
    flag.Disposable = parent.scheduler.Schedule(self =>
    {
        if (flag.IsDisposed)
        {
            e.Dispose();
            return;
        }

        bool hasNext;
        var current = default(T);
        try
        {
            hasNext = e.MoveNext();
            if (hasNext) current = e.Current;
        }
        catch (Exception ex)
        {
            e.Dispose();
            try
            {
                observer.OnError(ex);
            }
            finally
            {
                Dispose();
            }

            return;
        }

        if (hasNext)
        {
            observer.OnNext(current);
            self();
        }
        else
        {
            e.Dispose();
            try
            {
                observer.OnCompleted();
            }
            finally
            {
                Dispose();
            }
        }
    });

    return flag;
}

while(true)の中でe.MoveNext()e.Currentをしていますが、これはforeachを愚直に書いていった場合にこのようになります。

エラーなどの考慮をしているのでこうなっていますが、イメージとしては以下のコードと同じです。

foreach (var current in values)
{
    Observer.OnNext(current);
}
Observer.OnCompleted();

というわけで、使い方で述べたようにSubscribeしたらOnNext &OnCompletedされるということにたどり着けました。

さいごに

Gist経由でコードをブログに貼った方が見栄えが良いのでそうしたほうがよかったかもしれませんね。

次からは考えます。

ではまた。