はじめに
先日UniRx
に関する非常に分かりやすい記事を見つけました。
いままでUniRx
の中身をブラックボックスと捉えて表面上の使い方だけを学んでいたのですが、しっかりと使うためにはやはり内部構造を知るのは欠かせません。
そこでこちらの記事ではコードを分かりやすく省略などはしてくれているものの、一から作ることでどのように動作しているのかを体系的に学べます。
またこの付録?として記事の筆者がクイズを出していたので、そちらを今回は解いていきたいと思います。
- はじめに
- Cold Observable
- Combinator
- HotObservable
- Operator
- クイズ1(Select)
- クイズ2(Where)
- クイズ3(SelectMany)
- クイズ4(SelectMany)
- クイズ5(Buffer)
- クイズ6(Concat)
- クイズ7(Delay)
- クイズ8(Distinct)
- クイズ9(DistinctUntilChanged)
- クイズ10(First)
- クイズ11(Last)
- クイズ12(Take)
- クイズ13(Skip)
- クイズ14(TakeWhile)
- クイズ15(SkipWhile)
- クイズ16(TakeUntil)
- クイズ17(SkipUntil)
- クイズ18(StartWith)
- クイズ19(ThrottleFirst)
- クイズ20(Throttle)
- クイズ21(Finally)
- クイズ22(Timeout)
- クイズ23(Catch)
- クイズ24(CatchIgnore)
- クイズ25(Retry)
- クイズ26(OnErrorRetry)
- さいごに
Cold Observable
クイズ1(Return)
Cold Observable
を使った問題。Subscribe
した瞬間にOnNext
が実行される。
var observer = new TestObserver<int>(); // Q. 1を出力する Cold Observable をつくれ var observable = Observable.Return<int>(1); observable.Subscribe(observer);
クイズ2(ReturnUnit)
Unit
はUniRx
では頻出の構造体。イベントのタイミングだけ知らせたいや通知するデータが何もないときに使う。
var observer = new TestObserver<Unit>(); // Q. Unitを出力する Cold Observableをつくれ var observable = Observable.ReturnUnit(); observable.Subscribe(observer);
クイズ3(Throw)
Subscribe
したらOnError
が実行されるCold Observable
。
var observer = new TestObserver<Unit>(); // Q. errorというmessageのExceptionを出力する Cold Observableをつくれ var observable = Observable.Throw<Unit>(new System.Exception("error")); observable.Subscribe(observer);
クイズ4(Empty)
Subscribe
したらOnCompleted
が実行される。Observable.Throw
と比較してObservable.Empty
と名前がやや分かりにくい。
var observer = new TestObserver<Unit>(); // Q. Completeを出力する Cold Observableをつくれ var observable = Observable.Empty<Unit>(); observable.Subscribe(observer);
クイズ5(Never)
Subscribe
してもなにも呼ばれない。OnNext
,OnError
,OnCompleted
もnever
発火されない。
var observer = new TestObserver<Unit>(); // Q. 何も出力しない Cold Observableをつくれ var observable = Observable.Never<Unit>(); observable.Subscribe(observer);
クイズ6(Create)
Observable.Create
を使った王道Subscribe -> OnNext x3 -> OnCompleted
が一通り行われる。
またDisposable.Empty
を返すことで、購読解除することができない。(おそらくあってるよね?)
var observer = new TestObserver<int>(); // Q. 1,2,3を出力して、Completeする Cold Observableを作れ var observable = Observable.Create<int>(_observer => { _observer.OnNext(1); _observer.OnNext(2); _observer.OnNext(3); _observer.OnCompleted(); return Disposable.Empty; }); observable.Subscribe(observer);
クイズ7(Range)
Subscribe -> OnNext(1~10) -> OnCompleted
を行うCold Observable
。
var observer = new TestObserver<int>(); // Q. 1から10まで出力する Cold Observableをつくれ var observable = Observable.Range(1, 10); observable.Subscribe(observer);
Combinator
クイズ1(Interval)
Observable.Interval
により10msごとにOnNext
が呼ばれる。Scheduler.ThreadPool
とScheduler.MainThread
があるが、その違いはこちら。
- MainThread : デフォルト。メインスレッド上で動き(実態はコルーチン)、精度はやや悪い。
- ThreadPool: Thread.Sleepを用いて時間の計測。精度は良い。メインスレッドだと他の処理を止めてしまうので、別スレッドで動作している。
【UniRx】Schedulerとは何なのか - Qiita
var observer = new TestObserver<long>(); // Q. 10msごとに値を排出するobservableをつくれ (Scheduler.ThreadPoolを利用すること) // XXX: 実行タイミングによっては失敗するかもしれない var observable = Observable.Interval(TimeSpan.FromMilliseconds(10), Scheduler.ThreadPool); var disposable = observable.Subscribe(observer);
また使い終わったら必ずdisposable.Dispose();
をして購読を解除(UnSubscribe)をしよう。
クイズ2(CombineLatest)
それぞれの最新のメッセージを保持し、いずれかのストリームにメッセージが流れてきたタイミングで流れる。
こちらの記事参照。
UniRxのZip, ZipLatest, CombineLatestの挙動の比較 - プログラミングで世界を変える
var observer = new TestObserver<string>(); var subject1 = new Subject<string>(); var subject2 = new Subject<int>(); // Q. subject1,subject2の内容をstringでつなげるobservableをつくれ var observable = Observable.CombineLatest(subject1, subject2, (name, id) => $"{name}{id}"); observable.Subscribe(observer);
クイズ3(Concat)
subject1.OnNextはOK,subject2.OnNextはブロック -> subject1.OnCompleted -> subject2.OnNextを発火可能
。
var observer = new TestObserver<int>(); var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); // Q. subject1がcompleteしたあとにsubject2を流すobservableをつくれ var observable = Observable.Concat(subject1, subject2); observable.Subscribe(observer);
クイズ4(Merge)
subject1.OnNext
でもsubject2.OnNext
でもobserver.OnNext
を発火する。
var observer = new TestObserver<int>(); var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); // Q. subject1, subject2を組み合わせ、きた順に値を流すobservableをつくれ var observable = Observable.Merge(subject1, subject2); observable.Subscribe(observer);
クイズ5(Zip)
1つずつ揃ったタイミングで流れる。
UniRxのZip, ZipLatest, CombineLatestの挙動の比較 - プログラミングで世界を変える
var observer = new TestObserver<IList<int>>(); var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); // Q. subject1, subject2の値をそれぞれ順番道理にペアで組み合わせて出力するobservableをつくれ var observable = Observable.Zip(subject1, subject2); observable.Subscribe(observer);
クイズ6(Repeat)
Subscribe
されると値を繰り返し発行(OnNext)する。
var observer = new TestObserver<int>(); // A. var observable = Observable.Repeat(1, 3); observable.Subscribe(observer);
クイズ7(WhenAll)
subject1.OnNext
,subject2OnNext
,subject3.OnNext
の最後の値を保持しておいて、すべてOnCompleted
されたらそれらを発行する。
var observer = new TestObserver<int[]>(); var subject1 = new Subject<int>(); var subject2 = new Subject<int>(); var subject3 = new Subject<int>(); // Q. subject1, subject2, subject3 がすべてcompleteしたら、それぞれの最後の値を流すobservableは? var observable = Observable.WhenAll(subject1, subject2, subject3); observable.Subscribe(observer);
クイズ8(Timer)
Subscribe -> 1秒待つ -> OnNext -> OnCompleted
。
var observer = new TestObserver<long>(); // Q. 1ms後に1度だけ値を出力するobservableをつくれ (Scheduler.ThreadPoolを利用すること) // XXX: 実行タイミングによっては失敗するかもしれない var observable = Observable.Timer(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool); observable.Subscribe(observer);
HotObservable
クイズ1(Subject)
Subject
知らずしてUniRx
を語れず。これだけは最初にマスターしよう。
// Q. IObservableであり、IObserverでもあるHotObservableを宣言しろ var observableAndObserver = new Subject<int>(); observableAndObserver.Subscribe(testObserver); observableAndObserver.OnNext(1); observableAndObserver.OnCompleted();
クイズ2(BehaviorSubject)
最後に発行された値をキャシュし、後からSubscribeされたときにその値を発行してくれる。初期値を設定することもできる
// Q. 最後に1つ値を保持するようなIObservableであり、IObserverでもあるHotObservableを宣言しろ var observableAndObserver = new BehaviorSubject<int>(1); var disposable1 = observableAndObserver.Subscribe(testObserver1); // testObserver1.OnNext(1) observableAndObserver.OnNext(2); disposable1.Dispose(); var disposable2 = observableAndObserver.Subscribe(testObserver2); // testObserver2.OnNext(2) observableAndObserver.OnNext(3); disposable2.Dispose();
クイズ3(ReplaySubject)
var testObserver1 = new TestObserver<int>(); // Q. 来た値をすべて記録しておいて、subscribe時に出力するようなObservableを定義しろ var observableAndObserver = new ReplaySubject<int>(); observableAndObserver.OnNext(1); observableAndObserver.OnNext(2); observableAndObserver.OnNext(3); observableAndObserver.OnCompleted(); observableAndObserver.Subscribe(testObserver1).Dispose(); // OnNext(1),OnNext(2),OnNext(3)
クイズ4(AsyncSubject)
var testObserver1 = new TestObserver<int>(); var testObserver2 = new TestObserver<int>(); // Q. Completeしたときに最後のNextの値を送信する IObservableでもありIObserverでもあるHotObservableを宣言しろ var observableAndObserver = new AsyncSubject<int>(); // SUBSCRIBE observableAndObserver.OnNext(1); observableAndObserver.Subscribe(testObserver1); observableAndObserver.OnNext(2); observableAndObserver.OnCompleted(); // 2を発行 observableAndObserver.Subscribe(testObserver2);
Operator
クイズ1(Select)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. 来た値を2乗して返すOperatorは? subject .Select(x => x* x) .Subscribe(observer);
クイズ2(Where)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. 偶数のみ通すObservableは? subject .Where(x => x % 2 == 0) .Subscribe(observer);
クイズ3(SelectMany)
UniRx
ではSelectMany
はすごい大切。覚えておきたいオペレーター上位入賞。
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. 同じ値を2回繰り返すOperatorは? subject .SelectMany(x => new[] { x, x}) .Subscribe(observer);
クイズ4(SelectMany)
「1つの値を受取り、新しいIObservableを返す」というSelectMany
の使い方。これは色々な応用ができる。
一個前の方は「1つ値を受け取って、N個の要素を持つenumerableを返却すると、それらの値をばらして流す」という使い方。
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. 来た値をそのまま流してからエラーを起こすoperatorは? subject .SelectMany(x => Observable.Create<int>(_observer => { _observer.OnNext(x); _observer.OnError(new Exception("error")); return Disposable.Empty; })) .Subscribe(observer);
クイズ5(Buffer)
var observer = new TestObserver<IList<int>>(); var subject = new Subject<int>(); // Q. 2個値をまとめてから流すoperatorは? subject .Buffer(2) .Subscribe(observer);
クイズ6(Concat)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); var observable = Observable.Return(10); // Q. observableの値をsubject終了時の末尾につなげるoperatorは? subject .Concat(observable) .Subscribe(observer);
クイズ7(Delay)
Scheduler.ThreadPool
を使うことで別スレッドに移動するので、observer
のOnNext
は別スレッドにて呼ばれるのでUnityAPIを用いるときは要注意。
var observer = new TestObserver<int>(); // Q. 別Threadにて1ms待機してから値を流すoperatorは? Observable.Return(1) .Delay(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool) .Subscribe(observer);
クイズ8(Distinct)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. 一度流した値を2度流さないoperatorは? subject .Distinct() .Subscribe(observer);
クイズ9(DistinctUntilChanged)
1 -> 2
なら値を発行するが、1 -> 1
は流さない。
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. 直前の値が同じであれば流さないOperatorは? subject .DistinctUntilChanged() .Subscribe(observer);
クイズ10(First)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. はじめの値1つだけ流して、Completeするoperatorは? subject .First() .Subscribe(observer);
クイズ11(Last)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. Completeした時の最後の値を流すOperatorは? subject .Last() .Subscribe(observer);
クイズ12(Take)
指定回数OnNext
が呼ばれたら自動でOnComplete
が呼ばれる。First
を複数回にしたイメージ。
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. はじめの値2つのみ流す Operatorは? subject .Take(2) .Subscribe(observer);
クイズ13(Skip)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. はじめの1つを無視するoperatorは? subject .Skip(1) .Subscribe(observer);
クイズ14(TakeWhile)
0 -> 2 -> 3
といった場合に0,2
はOnNext
された後にOnCompleted
される。
var subject = new Subject<int>(); // Q. 偶数が続く間流すoperatorは? var observable = subject .TakeWhile(it => it % 2 == 0)
クイズ15(SkipWhile)
0 -> 2 -> 3
といったときに3
以降からずっとOnNext
するようにする。
var subject = new Subject<int>(); // Q. 偶数がつづくまで値を流さない operatorは? var observable = subject .SkipWhile(it => it % 2 == 0);
クイズ16(TakeUntil)
とある条件になった時に値を流すストリーム(ここではtrigger
)を作成し,ストリームに値が流れるとOnCompleted
を呼ぶ。
var subject = new Subject<int>(); var trigger = new Subject<bool>(); // Q. triggerで値がが流れるまで、subjectの値を流すoperatorは? var observable = subject .TakeUntil(trigger);
クイズ17(SkipUntil)
TakeUntil
とは逆でSkipUntil
の引数で指定したストリームに値が流れたらOnNext
を呼べるように。
var subject = new Subject<int>(); var trigger = new Subject<bool>(); // A. var observable = subject .SkipUntil(trigger);
クイズ18(StartWith)
Subscribe
時にStartWith
の引数の値をOnNext
。
var subject = new Subject<int>(); var observer = new TestObserver<int>(); // Q. はじめに1つ0を流すoperatorは? subject .StartWith(0) .Subscribe(observer);
クイズ19(ThrottleFirst)
これはボタンの処理なんかではよく使いそうなオペレーター。
var subject = new Subject<int>(); var observer = new TestObserver<int>(); // Q. 値が流れたら1msの間来た値を無視するoperatorは? subject .ThrottleFirst(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool) .Subscribe(observer);
クイズ20(Throttle)
1 -> 2 -> 3 -> 1ms以上待つ -> 4 -> 5
の場合、3,5
をOnNext
。
// Q. 最後の値が1msの間で流れなくなるまでまってから流れるoperatorは? subject .Throttle(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool) .Subscribe(observer);
クイズ21(Finally)
問題文が1を加えるとなっているが、一つの要素を加えるがおそらく正しい。
var subject = new Subject<int>(); var observer = new TestObserver<int>(); var list = new List<Unit>(); // Q. Completeが流れたときに、listに1を加えるようなoperatorを定義 subject .Finally(() => list.Add(Unit.Default)) .Subscribe(observer);
クイズ22(Timeout)
Subscribe
してから1ms以上たったらOnCompleted
。
var subject = new Subject<int>(); var observer = new TestObserver<int>(); // Q. 1ms後の値は流さないような operatorは? subject .Timeout(TimeSpan.FromMilliseconds(1), Scheduler.ThreadPool) .Subscribe(observer);
クイズ23(Catch)
OnError
が呼ばれたときに、OnNext
を呼ぶ。
もしCatch
が含まれていない場合は、observer
のOnError
が呼ばれるが、Catch
があると呼ばれないことに注意。
var subject = new Subject<string>(); var observer = new TestObserver<string>(); // Q. errorを受け取ってそのmessageを流すoperatorを定義 subject .Catch<string, Exception>(error => Observable.Return(error.Message)) .Subscribe(observer);
クイズ24(CatchIgnore)
var observer = new TestObserver<int>(); var subject = new Subject<int>(); // Q. エラーを無視するoperatorは? subject .CatchIgnore() .Subscribe(observer);
クイズ25(Retry)
var observer = new TestObserver<int>(); var count = 0; // Q. エラーを2度無視して、3回目を通すoperator Observable.Create<int>(_observer => { count++; _observer.OnError(new Exception("error" + count)); return Disposable.Empty; }) .Retry(3) .Subscribe(observer);
クイズ26(OnErrorRetry)
var observer = new TestObserver<int>(); var list = new List<string>(); // Q. Errorが来たら無視して、listにExceptionのMessageを追加するoperator Observable.Create<int>(_observer => { _observer.OnError(new Exception("error1")); return Disposable.Empty; }) .OnErrorRetry<int, Exception>(error => list.Add(error.Message)) .Subscribe(observer);
さいごに
想像以上に長くなってしまって記事を書いたことをやや後悔しています。
ただこれを一通り簡単にこなれせるようになったらUniRx
の基礎はバッチリかもしれませんね。
ではまた。