はなちるのマイノート

Unityをメインとした技術ブログ。自分らしくまったりやっていきたいと思いますー!

【Unity】UniRxをクイズ形式で学んである程度使えることを目指す

はじめに

先日UniRxに関する非常に分かりやすい記事を見つけました。

qiita.com

いままでUniRxの中身をブラックボックスと捉えて表面上の使い方だけを学んでいたのですが、しっかりと使うためにはやはり内部構造を知るのは欠かせません。

そこでこちらの記事ではコードを分かりやすく省略などはしてくれているものの、一から作ることでどのように動作しているのかを体系的に学べます。

またこの付録?として記事の筆者がクイズを出していたので、そちらを今回は解いていきたいと思います。

github.com

Cold Observable

クイズ1(Return)

Cold Observableを使った問題。Subscribeした瞬間にOnNextが実行される。

var observer = new TestObserver<int>();

// Q. 1を出力する Cold Observable をつくれ
var observable = Observable.Return<int>(1);
observable.Subscribe(observer);

クイズ2(ReturnUnit)

UnitUniRxでは頻出の構造体。イベントのタイミングだけ知らせたいや通知するデータが何もないときに使う。

var observer = new TestObserver<Unit>();

// Q. Unitを出力する Cold Observableをつくれ
var observable = Observable.ReturnUnit();
observable.Subscribe(observer);

クイズ3(Throw)

SubscribeしたらOnErrorが実行されるCold Observable

var observer = new TestObserver<Unit>();

// Q. errorというmessageのExceptionを出力する Cold Observableをつくれ
var observable = Observable.Throw<Unit>(new System.Exception("error"));
observable.Subscribe(observer);

クイズ4(Empty)

SubscribeしたらOnCompletedが実行される。Observable.Throwと比較してObservable.Emptyと名前がやや分かりにくい。

var observer = new TestObserver<Unit>();

// Q. Completeを出力する Cold Observableをつくれ
var observable = Observable.Empty<Unit>();
observable.Subscribe(observer);

クイズ5(Never)

Subscribeしてもなにも呼ばれない。OnNext,OnError,OnCompletednever発火されない。

var observer = new TestObserver<Unit>();

// Q. 何も出力しない Cold Observableをつくれ
var observable = Observable.Never<Unit>();
observable.Subscribe(observer);

クイズ6(Create)

Observable.Createを使った王道Cold Observable。Subscribe -> OnNext x3 -> OnCompletedが一通り行われる。

またDisposable.Emptyを返すことで、購読解除することができない。(おそらくあってるよね?)

var observer = new TestObserver<int>();

// Q. 1,2,3を出力して、Completeする Cold Observableを作れ
var observable = Observable.Create<int>(_observer =>
{
    _observer.OnNext(1);
    _observer.OnNext(2);
    _observer.OnNext(3);
    _observer.OnCompleted();
    return Disposable.Empty;
});
observable.Subscribe(observer);

クイズ7(Range)

Subscribe -> OnNext(1~10) -> OnCompletedを行うCold Observable

var observer = new TestObserver<int>();

// Q. 1から10まで出力する Cold Observableをつくれ
var observable = Observable.Range(1, 10);
observable.Subscribe(observer);

Combinator

クイズ1(Interval)

Observable.Intervalにより10msごとにOnNextが呼ばれる。Scheduler.ThreadPoolScheduler.MainThreadがあるが、その違いはこちら。

  • MainThread : デフォルト。メインスレッド上で動き(実態はコルーチン)、精度はやや悪い。
  • ThreadPool: Thread.Sleepを用いて時間の計測。精度は良い。メインスレッドだと他の処理を止めてしまうので、別スレッドで動作している。

【UniRx】Schedulerとは何なのか - Qiita

var observer = new TestObserver<long>();

// Q. 10msごとに値を排出するobservableをつくれ (Scheduler.ThreadPoolを利用すること)
// XXX: 実行タイミングによっては失敗するかもしれない
var observable = Observable.Interval(TimeSpan.FromMilliseconds(10), Scheduler.ThreadPool);
var disposable = observable.Subscribe(observer);

また使い終わったら必ずdisposable.Dispose();をして購読を解除(UnSubscribe)をしよう。

クイズ2(CombineLatest)

それぞれの最新のメッセージを保持し、いずれかのストリームにメッセージが流れてきたタイミングで流れる。

こちらの記事参照。
UniRxのZip, ZipLatest, CombineLatestの挙動の比較 - プログラミングで世界を変える

var observer = new TestObserver<string>();
var subject1 = new Subject<string>();
var subject2 = new Subject<int>();

// Q. subject1,subject2の内容をstringでつなげるobservableをつくれ
var observable = Observable.CombineLatest(subject1, subject2, (name, id) => $"{name}{id}");
observable.Subscribe(observer);

クイズ3(Concat)

subject1.OnNextはOK,subject2.OnNextはブロック -> subject1.OnCompleted -> subject2.OnNextを発火可能

var observer = new TestObserver<int>();
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();

// Q. subject1がcompleteしたあとにsubject2を流すobservableをつくれ
var observable = Observable.Concat(subject1, subject2);
observable.Subscribe(observer);

クイズ4(Merge)

subject1.OnNextでもsubject2.OnNextでもobserver.OnNextを発火する。

var observer = new TestObserver<int>();
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();

// Q. subject1, subject2を組み合わせ、きた順に値を流すobservableをつくれ
var observable = Observable.Merge(subject1, subject2);
observable.Subscribe(observer);

クイズ5(Zip)

1つずつ揃ったタイミングで流れる。
UniRxのZip, ZipLatest, CombineLatestの挙動の比較 - プログラミングで世界を変える

var observer = new TestObserver<IList<int>>();
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();

// Q. subject1, subject2の値をそれぞれ順番道理にペアで組み合わせて出力するobservableをつくれ
var observable = Observable.Zip(subject1, subject2);
observable.Subscribe(observer);

クイズ6(Repeat)

Subscribeされると値を繰り返し発行(OnNext)する。

var observer = new TestObserver<int>();

// A.
var observable = Observable.Repeat(1, 3);
observable.Subscribe(observer);

クイズ7(WhenAll)

subject1.OnNext,subject2OnNext,subject3.OnNextの最後の値を保持しておいて、すべてOnCompletedされたらそれらを発行する。

var observer = new TestObserver<int[]>();
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();
var subject3 = new Subject<int>();

// Q. subject1, subject2, subject3 がすべてcompleteしたら、それぞれの最後の値を流すobservableは?
var observable = Observable.WhenAll(subject1, subject2, subject3);
observable.Subscribe(observer);

クイズ8(Timer)

Subscribe -> 1秒待つ -> OnNext -> OnCompleted

var observer = new TestObserver<long>();

// Q. 1ms後に1度だけ値を出力するobservableをつくれ (Scheduler.ThreadPoolを利用すること)
// XXX: 実行タイミングによっては失敗するかもしれない
var observable = Observable.Timer(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool);
observable.Subscribe(observer);

HotObservable

クイズ1(Subject)

Subject知らずしてUniRxを語れず。これだけは最初にマスターしよう。

// Q. IObservableであり、IObserverでもあるHotObservableを宣言しろ
var observableAndObserver = new Subject<int>();

observableAndObserver.Subscribe(testObserver);
observableAndObserver.OnNext(1);
observableAndObserver.OnCompleted();

クイズ2(BehaviorSubject)

最後に発行された値をキャシュし、後からSubscribeされたときにその値を発行してくれる。初期値を設定することもできる

// Q. 最後に1つ値を保持するようなIObservableであり、IObserverでもあるHotObservableを宣言しろ
var observableAndObserver = new BehaviorSubject<int>(1);

var disposable1 = observableAndObserver.Subscribe(testObserver1);   // testObserver1.OnNext(1)
observableAndObserver.OnNext(2);
disposable1.Dispose();

var disposable2 = observableAndObserver.Subscribe(testObserver2);   // testObserver2.OnNext(2)
observableAndObserver.OnNext(3);
disposable2.Dispose();

クイズ3(ReplaySubject)

var testObserver1 = new TestObserver<int>();

// Q. 来た値をすべて記録しておいて、subscribe時に出力するようなObservableを定義しろ
var observableAndObserver = new ReplaySubject<int>();

observableAndObserver.OnNext(1);
observableAndObserver.OnNext(2);
observableAndObserver.OnNext(3);
observableAndObserver.OnCompleted();

observableAndObserver.Subscribe(testObserver1).Dispose();       // OnNext(1),OnNext(2),OnNext(3)

クイズ4(AsyncSubject)

var testObserver1 = new TestObserver<int>();
var testObserver2 = new TestObserver<int>();

// Q. Completeしたときに最後のNextの値を送信する IObservableでもありIObserverでもあるHotObservableを宣言しろ
var observableAndObserver = new AsyncSubject<int>();

// SUBSCRIBE
observableAndObserver.OnNext(1);
observableAndObserver.Subscribe(testObserver1);

observableAndObserver.OnNext(2);
observableAndObserver.OnCompleted();                // 2を発行
observableAndObserver.Subscribe(testObserver2);

Operator

クイズ1(Select)

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. 来た値を2乗して返すOperatorは?
subject
    .Select(x => x* x)
    .Subscribe(observer);

クイズ2(Where)

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. 偶数のみ通すObservableは?
subject
    .Where(x => x % 2 == 0)
    .Subscribe(observer);

クイズ3(SelectMany)

UniRxではSelectManyはすごい大切。覚えておきたいオペレーター上位入賞。

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. 同じ値を2回繰り返すOperatorは?
subject
    .SelectMany(x => new[] { x, x})
    .Subscribe(observer);

クイズ4(SelectMany)

「1つの値を受取り、新しいIObservableを返す」というSelectManyの使い方。これは色々な応用ができる。

一個前の方は「1つ値を受け取って、N個の要素を持つenumerableを返却すると、それらの値をばらして流す」という使い方。

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. 来た値をそのまま流してからエラーを起こすoperatorは?
subject
    .SelectMany(x => Observable.Create<int>(_observer =>
    {
        _observer.OnNext(x);
        _observer.OnError(new Exception("error"));
        return Disposable.Empty;
    }))
    .Subscribe(observer);

クイズ5(Buffer)

var observer = new TestObserver<IList<int>>();
var subject = new Subject<int>();

// Q. 2個値をまとめてから流すoperatorは?
subject
    .Buffer(2)
    .Subscribe(observer);

クイズ6(Concat)

var observer = new TestObserver<int>();
var subject = new Subject<int>();
var observable = Observable.Return(10);

// Q. observableの値をsubject終了時の末尾につなげるoperatorは?
subject
    .Concat(observable)
    .Subscribe(observer);

クイズ7(Delay)

Scheduler.ThreadPoolを使うことで別スレッドに移動するので、observerOnNextは別スレッドにて呼ばれるのでUnityAPIを用いるときは要注意。

var observer = new TestObserver<int>();

// Q. 別Threadにて1ms待機してから値を流すoperatorは?
Observable.Return(1)
    .Delay(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool)
    .Subscribe(observer);

クイズ8(Distinct)

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. 一度流した値を2度流さないoperatorは?
subject
    .Distinct()
    .Subscribe(observer);

クイズ9(DistinctUntilChanged)

1 -> 2なら値を発行するが、1 -> 1は流さない。

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. 直前の値が同じであれば流さないOperatorは?
subject
    .DistinctUntilChanged()
    .Subscribe(observer);

クイズ10(First)

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. はじめの値1つだけ流して、Completeするoperatorは?
subject
    .First()
    .Subscribe(observer);

クイズ11(Last)

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. Completeした時の最後の値を流すOperatorは?
subject
    .Last()
    .Subscribe(observer);

クイズ12(Take)

指定回数OnNextが呼ばれたら自動でOnCompleteが呼ばれる。Firstを複数回にしたイメージ。

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. はじめの値2つのみ流す Operatorは?
subject
    .Take(2)
    .Subscribe(observer);

クイズ13(Skip)

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. はじめの1つを無視するoperatorは?
subject
    .Skip(1)
    .Subscribe(observer);

クイズ14(TakeWhile)

0 -> 2 -> 3といった場合に0,2OnNextされた後にOnCompletedされる。

var subject = new Subject<int>();

// Q. 偶数が続く間流すoperatorは?
var observable = subject
    .TakeWhile(it => it % 2 == 0)

クイズ15(SkipWhile)

0 -> 2 -> 3といったときに3以降からずっとOnNextするようにする。

var subject = new Subject<int>();

// Q. 偶数がつづくまで値を流さない operatorは?
var observable = subject
    .SkipWhile(it => it % 2 == 0);

クイズ16(TakeUntil)

とある条件になった時に値を流すストリーム(ここではtrigger)を作成し,ストリームに値が流れるとOnCompletedを呼ぶ。

var subject = new Subject<int>();
var trigger = new Subject<bool>();

// Q. triggerで値がが流れるまで、subjectの値を流すoperatorは?
var observable = subject
    .TakeUntil(trigger);

クイズ17(SkipUntil)

TakeUntilとは逆でSkipUntilの引数で指定したストリームに値が流れたらOnNextを呼べるように。

var subject = new Subject<int>();
var trigger = new Subject<bool>();

// A.
var observable = subject
    .SkipUntil(trigger);

クイズ18(StartWith)

Subscribe時にStartWithの引数の値をOnNext

var subject = new Subject<int>();
var observer = new TestObserver<int>();

// Q. はじめに1つ0を流すoperatorは?
subject
    .StartWith(0)
    .Subscribe(observer);

クイズ19(ThrottleFirst)

これはボタンの処理なんかではよく使いそうなオペレーター。

var subject = new Subject<int>();
var observer = new TestObserver<int>();

// Q. 値が流れたら1msの間来た値を無視するoperatorは?
subject
    .ThrottleFirst(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool)
    .Subscribe(observer);

クイズ20(Throttle)

1 -> 2 -> 3 -> 1ms以上待つ -> 4 -> 5の場合、3,5OnNext

// Q. 最後の値が1msの間で流れなくなるまでまってから流れるoperatorは?
subject
      .Throttle(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool)
     .Subscribe(observer);

クイズ21(Finally)

問題文が1を加えるとなっているが、一つの要素を加えるがおそらく正しい。

var subject = new Subject<int>();
var observer = new TestObserver<int>();
var list = new List<Unit>();

// Q. Completeが流れたときに、listに1を加えるようなoperatorを定義
subject
    .Finally(() => list.Add(Unit.Default))
    .Subscribe(observer);

クイズ22(Timeout)

Subscribeしてから1ms以上たったらOnCompleted

var subject = new Subject<int>();
var observer = new TestObserver<int>();

// Q. 1ms後の値は流さないような operatorは?
subject
    .Timeout(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool)
    .Subscribe(observer);

クイズ23(Catch)

OnErrorが呼ばれたときに、OnNextを呼ぶ。

もしCatchが含まれていない場合は、observerOnErrorが呼ばれるが、Catchがあると呼ばれないことに注意。

var subject = new Subject<string>();
var observer = new TestObserver<string>();

// Q. errorを受け取ってそのmessageを流すoperatorを定義
subject
    .Catch<string, Exception>(error => Observable.Return(error.Message))
    .Subscribe(observer);

クイズ24(CatchIgnore)

var observer = new TestObserver<int>();
var subject = new Subject<int>();

// Q. エラーを無視するoperatorは?
subject
    .CatchIgnore()
    .Subscribe(observer);

クイズ25(Retry)

var observer = new TestObserver<int>();
var count = 0;

// Q. エラーを2度無視して、3回目を通すoperator
Observable.Create<int>(_observer =>
    {
        count++;
        _observer.OnError(new Exception("error" + count));
        return Disposable.Empty;
    })
    .Retry(3)
    .Subscribe(observer);

クイズ26(OnErrorRetry)

var observer = new TestObserver<int>();
var list = new List<string>();

// Q. Errorが来たら無視して、listにExceptionのMessageを追加するoperator
Observable.Create<int>(_observer =>
    {
        _observer.OnError(new Exception("error1"));
        return Disposable.Empty;
    })
    .OnErrorRetry<int, Exception>(error => list.Add(error.Message))
    .Subscribe(observer);

さいごに

想像以上に長くなってしまって記事を書いたことをやや後悔しています。

ただこれを一通り簡単にこなれせるようになったらUniRxの基礎はバッチリかもしれませんね。

ではまた。