はなちるのマイノート

Unityをメインとした技術ブログ。自分らしくまったりやっていきたいと思いますー!

【Unity】UniRxで使える自作Subjectを作ってみる

f:id:hanaaaaaachiru:20210303200339p:plain

はじめに

UniRxにはSubjectというクラスがあり,イベント駆動でコーディングするにはほぼ必須と言っても過言ではないのでしょうか。

今回はそんなSubjectへの理解を深めるために,自作Subjectを作ってみたいと思います。

自作Subject

自作Subjectを作るには,ISubject<T>IDisposableを実装したクラスを作ります。

using System;
using System.Collections.Generic;
using UniRx;

namespace Experimental.Tests
{
    public class CustomSubject<T> : ISubject<T>, IDisposable
    {
        // マルチスレッドで動作させたときに不具合が生じないようにスレッド間で同期をとる
        private readonly object _observerLock = new object();

        private bool _isStopped;
        private bool _isDisposed;
        
        // 最後に発生したエラーのキャッシュ用
        private Exception _lastError;
        
        private List<IObserver<T>> _observers;

        public CustomSubject()
            => _observers = new List<IObserver<T>>();

        public void OnNext(T value)
        {
            lock (_observerLock)
            {
                if (_isDisposed) throw new ObjectDisposedException("subject has been disposed.");
                if (_isStopped) return;

                foreach (var observer in _observers)
                    observer.OnNext(value);
            }
        }
        
        public void OnError(Exception error)
        {
            if (error == null) throw new ArgumentNullException("error can't be null.");
            lock (_observerLock)
            {
                if (_isDisposed) throw new ObjectDisposedException("subject has been disposed.");
                if (_isStopped) return;

                _lastError = error;

                try
                {
                    foreach (var observer in _observers)
                        observer.OnError(error);
                }
                finally
                {
                    _isStopped = true;
                }
            }
        }
        
        public void OnCompleted()
        {
            lock (_observerLock)
            {
                if (_isDisposed) throw new ObjectDisposedException("subject has been disposed.");
                if (_isStopped) return;

                try
                {
                    foreach (var observer in _observers)
                        observer.OnCompleted();
                }
                finally
                {
                    _isStopped = true;
                }
            }
        }
        
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentException("observer can't be null.");
            
            lock (_observerLock)
            {
                if (_isDisposed) throw new ObjectDisposedException("subject has been disposed.");

                if (_isStopped)
                {
                    if (_lastError != null) observer.OnError(_lastError);
                    else observer.OnCompleted();
                    return Disposable.Empty;
                }
                
                _observers.Add(observer);
                return new Subscription(this, observer);
            }
        }

        public void Dispose()
        {
            lock (_observerLock)
            {
                if (!_isDisposed)
                {
                    _isDisposed = true;
                    _observers.Clear();
                    _observers = null;
                    _lastError = null;
                }
            }
        }
        
        /// <summary>
        /// Subjectの返り値のDisposeによって購読解除できるようにするために用いるクラス
        /// </summary>
        public class Subscription : IDisposable
        {
            private readonly IObserver<T> _observer;
            private readonly CustomSubject<T> _parent;

            public Subscription(CustomSubject<T> parent, IObserver<T> observer)
            {
                _parent = parent;
                _observer = observer;
            }
            
            public void Dispose()
            {
                _parent._observers.Remove(_observer);
            }
        }
    }
}

以下の実際のコードとこの前購入したUniRx/UniTask完全理解という本を参考にコードを書いてみました。
github.com


マルチスレッドに対応する箇所と,Subscriptionクラスのインスタンスを返り値として返す箇所を除けば思ったよりシンプルな構造ではないでしょうか。

  • SubscribeObserverをリストにためる
  • OnNextOnErrorOnCompleteでリストにためたObserverの適当なメソッド実行

テスト

実際に動作するかTest Runnerを使って調べてみましょう。

using NUnit.Framework;
using UniRx;

namespace Experimental.Tests
{
    public class CustomSubjectTest
    {
        [Test]
        public void CustomSubjectTestSimplePasses()
        {
            var nextCount = 0;
            var errorCount = 0;
            var completeCount = 0;
            
            // 独自Subjectを作成
            var subject = new CustomSubject<Unit>();

            // 購読をする
            var disposable = subject.Subscribe(x => nextCount++, _ => errorCount++, () => completeCount++);
            
            // OnNextを呼ぶ
            subject.OnNext(default);

            // nextCount = 1になる
            Assert.AreEqual(nextCount, 1);
            Assert.AreEqual(errorCount, 0);
            Assert.AreEqual(completeCount, 0);
            
            // 購読解除をしてみる
            disposable.Dispose();
            
            // OnNextをしても,購読しているobserverはいないので実質なにもしない
            subject.OnNext(default);
            Assert.AreEqual(nextCount, 1);
            Assert.AreEqual(errorCount, 0);
            Assert.AreEqual(completeCount, 0);
            
            // 再度購読
            disposable = subject.Subscribe(x => nextCount++, _ => errorCount++, () => completeCount++);
            
            // OnNext + OnComplate
            subject.OnNext(default);
            subject.OnCompleted();
            subject.OnNext(default);    // subject.isStop = trueなので,意味を成さない(OnCompleteの中でDisposeするよう実装した方が良かった??)
            Assert.AreEqual(nextCount, 2);
            Assert.AreEqual(errorCount, 0);
            Assert.AreEqual(completeCount, 1);

            // Disposeをしてお片付け
            subject.Dispose();
        }
    }
}

動作させてみたところ、無事チェックがついていたので大丈夫そうです。

最後に

実際のコードをみてみると,OnErrorOnCompleteが実行されたときはisStopのフラグが立つだけでDisposeされてない?ような感じでした。(私の勘違いの可能性あり)

しかし購入した本ではDisposeしていて、詳細はよくわかっていません。

少なくともsubjectのインスタンスにDisposeをしたほうが良いことだけは確かなので、そこだけは忘れずにしたいと思います。(SubjectがIDisposableを実装しているので当然といえば当然だが)

ではまた。