はじめに
UniRx
にはSubjectというクラスがあり,イベント駆動でコーディングするにはほぼ必須と言っても過言ではないのでしょうか。
今回はそんなSubject
への理解を深めるために,自作Subject
を作ってみたいと思います。
自作Subject
自作Subject
を作るには,ISubject<T>
・IDisposable
を実装したクラスを作ります。
using System; using System.Collections.Generic; using UniRx; namespace Experimental.Tests { public class CustomSubject<T> : ISubject<T>, IDisposable { // マルチスレッドで動作させたときに不具合が生じないようにスレッド間で同期をとる private readonly object _observerLock = new object(); private bool _isStopped; private bool _isDisposed; // 最後に発生したエラーのキャッシュ用 private Exception _lastError; private List<IObserver<T>> _observers; public CustomSubject() => _observers = new List<IObserver<T>>(); public void OnNext(T value) { lock (_observerLock) { if (_isDisposed) throw new ObjectDisposedException("subject has been disposed."); if (_isStopped) return; foreach (var observer in _observers) observer.OnNext(value); } } public void OnError(Exception error) { if (error == null) throw new ArgumentNullException("error can't be null."); lock (_observerLock) { if (_isDisposed) throw new ObjectDisposedException("subject has been disposed."); if (_isStopped) return; _lastError = error; try { foreach (var observer in _observers) observer.OnError(error); } finally { _isStopped = true; } } } public void OnCompleted() { lock (_observerLock) { if (_isDisposed) throw new ObjectDisposedException("subject has been disposed."); if (_isStopped) return; try { foreach (var observer in _observers) observer.OnCompleted(); } finally { _isStopped = true; } } } public IDisposable Subscribe(IObserver<T> observer) { if (observer == null) throw new ArgumentException("observer can't be null."); lock (_observerLock) { if (_isDisposed) throw new ObjectDisposedException("subject has been disposed."); if (_isStopped) { if (_lastError != null) observer.OnError(_lastError); else observer.OnCompleted(); return Disposable.Empty; } _observers.Add(observer); return new Subscription(this, observer); } } public void Dispose() { lock (_observerLock) { if (!_isDisposed) { _isDisposed = true; _observers.Clear(); _observers = null; _lastError = null; } } } /// <summary> /// Subjectの返り値のDisposeによって購読解除できるようにするために用いるクラス /// </summary> public class Subscription : IDisposable { private readonly IObserver<T> _observer; private readonly CustomSubject<T> _parent; public Subscription(CustomSubject<T> parent, IObserver<T> observer) { _parent = parent; _observer = observer; } public void Dispose() { _parent._observers.Remove(_observer); } } } }
以下の実際のコードとこの前購入したUniRx/UniTask完全理解という本を参考にコードを書いてみました。
github.com
マルチスレッドに対応する箇所と,Subscription
クラスのインスタンスを返り値として返す箇所を除けば思ったよりシンプルな構造ではないでしょうか。
Subscribe
でObserver
をリストにためるOnNext
・OnError
・OnComplete
でリストにためたObserver
の適当なメソッド実行
テスト
実際に動作するかTest Runner
を使って調べてみましょう。
using NUnit.Framework; using UniRx; namespace Experimental.Tests { public class CustomSubjectTest { [Test] public void CustomSubjectTestSimplePasses() { var nextCount = 0; var errorCount = 0; var completeCount = 0; // 独自Subjectを作成 var subject = new CustomSubject<Unit>(); // 購読をする var disposable = subject.Subscribe(x => nextCount++, _ => errorCount++, () => completeCount++); // OnNextを呼ぶ subject.OnNext(default); // nextCount = 1になる Assert.AreEqual(nextCount, 1); Assert.AreEqual(errorCount, 0); Assert.AreEqual(completeCount, 0); // 購読解除をしてみる disposable.Dispose(); // OnNextをしても,購読しているobserverはいないので実質なにもしない subject.OnNext(default); Assert.AreEqual(nextCount, 1); Assert.AreEqual(errorCount, 0); Assert.AreEqual(completeCount, 0); // 再度購読 disposable = subject.Subscribe(x => nextCount++, _ => errorCount++, () => completeCount++); // OnNext + OnComplate subject.OnNext(default); subject.OnCompleted(); subject.OnNext(default); // subject.isStop = trueなので,意味を成さない(OnCompleteの中でDisposeするよう実装した方が良かった??) Assert.AreEqual(nextCount, 2); Assert.AreEqual(errorCount, 0); Assert.AreEqual(completeCount, 1); // Disposeをしてお片付け subject.Dispose(); } } }
動作させてみたところ、無事チェックがついていたので大丈夫そうです。
最後に
実際のコードをみてみると,OnError
・OnComplete
が実行されたときはisStop
のフラグが立つだけでDispose
されてない?ような感じでした。(私の勘違いの可能性あり)
しかし購入した本ではDispose
していて、詳細はよくわかっていません。
少なくともsubject
のインスタンスにDispose
をしたほうが良いことだけは確かなので、そこだけは忘れずにしたいと思います。(SubjectがIDisposableを実装しているので当然といえば当然だが)
ではまた。