Rx覚え書き (Observable以外編)
mono-reactiveを実装していた時の経験をもとに、Rxの諸クラスについて書けることをつらつら書いてみようと思う。とりあえずObservableクラスは膨大なので、それ以外について書く。例によってRxの入門的な記事はぐぐればいくつか出てくると思うので、それらを読んでもらえればと思う。
System.Reactive.Concurrency
ISchedulerの実装がこのネームスペースで公開されている。
ISchedulerのObservableクラスにおける利用には、2つのパターンがある:
- 時間に依存するはずのスケジューリングを、そのISchedulerが定義する「時間」の上で行うたとえば、HistoricalSchedulerを経由することで、時間の経過をプログラマブルに操作することができる。
- 実際のタスク呼び出し処理を、特定の条件下で行う。たとえば、SynchronizationContextSchedulerは、必ずSynchronizationContextを経由してタスクを実行するため、UI変更を行うコードをRxでもUIスレッドで実行できるようになる。
ImmediateSchedulerとCurrentThreadSchedulerは、現在のスレッド上で同期的に指定されたタスクを実行する(つまり後者)。この2つの違いは、再帰的なタスクの扱いにあるのだけど、FAQなので詳しい説明は余所に丸投げする。
EventLoopScheduler, NewThreadScheduler, TaskPoolScheduler, ThreadPoolSchedulerは、それぞれの方法で生成されたスレッド上でタスクを実行するために存在するスケジューラーだ(つまり後者)。
SynchronizationContextSchedulerは、指定されたタスクを、コンストラクタ引数のSynchronizationContext上で実行する(つまり後者)。その利点は例示で説明してきた通りだ。
HistoricalScheduler(Base)とVirtualTimeScheduler(Base)、そしてTestScheduler (Microsoft.Reactive.TestingでもMono.Reactive.Testingでも)は、時間を任意に進めることが出来るスケジューラーだ(つまり前者)。もしIObservableの実装クラスが、そのSubscribe()の内部において、適切にIScheduler.Schedule()を使用して時間依存の処理を実装していれば、これらのスケジューラーを使うことで、「時間」を好きなように制御しながらタスクを実行することができる。「1分待ってタイムアウトする」コードをテストするのに、実際に1分待つ必要はなくなる。
ISchedulerで時間を制御するために必要なこと
独自のObservable拡張メソッドを定義する場合など、IObservableを返すコードを実装する場合、時間を制御するISchedulerを有効に活用するためには、以下の点に注意する必要がある。
- IObservable上で時間に依存する処理を自ら書かずにIScheduler.Schedule()で処理することが重要だ。たとえば(初期のmono-reactiveがそうだったように)、Delay()関数の中でThread.Sleep(1000)を呼び出していたら、いかにこれらのスケジューラーを使用していても、1秒経過しない限り実際のタスクは実行されない。 https://github.com/atsushieno/mono-reactive/commit/55c94c2119b4d3aa398874ff33615ba2520f82d3
- 時間の計算を行う場合は、ISchedulerのNowプロパティを使って計算することが重要だ。たとえば「土日にはタスクAを、そうでなければタスクBを行う」といった処理を書く場合、DateTime.NowやDateTimeOffset.Nowを使用していたら、その結果はテストを実行した日時に影響を受けてしまう。注意すべきは、Scheduler.Nowも使ってはいけないということだ。これはあくまでIScheduler実装の内部で使われるためにあると考えた方が良い。
(mono-reactiveのObservable実装も、これらを考慮している。)
System.Reactive.Disposables
さまざまなIDisposableの実装がこのネームスペースで公開されている。
IDisposableはRxでは重要なインターフェースであり、様々な場面で利用されている。
- IScheduler.Schedule()を使用して何らかのコード (ActionやFunc) をスケジュールした場合、戻り値のIDisposableをDispose()することで、スケジュールした時間に至っていないタスクをキャンセルできる。
- IObservable.Subscribe()を使用して何らかのリアクション (IObserverやAction) を登録した場合、戻り値のIDisposableをDispose()することで、その登録を解除できる。イベントハンドラであれば、IObservable.Subscribe()がadd、IDisposable.Dispose()がremoveに相当する。
- IConnectableObservable.Connect()を使用してhot observableからIObserverが通知を受け取るようにした場合、戻り値のIDisposableをDispose()することで、通知を受け取らない状態に戻すことができる。
System.Reactive.Disposablesの各クラスは、Rxの機能を使うだけの人にとっては、あまり使う機会が無い、かもしれない。IObservableを実装したり、Rxの諸機能を拡張したりする人が活用するためにあると言える。
- ContextDisposable : SynchronizationContextを使用するクラスで使用する。渡された(別の)IDisposableオブジェクトをSynchronizationContext.Post()の中でDispose()する。(mono-reactive 0.1ではこれが使われていなかったが、修正した)
- ScheduledDisposable : ISchedulerを経由して、(別の)IDisposableをDispose()しなければならない場面で使用する(UI処理を呼び出すためにSynchronizationContextScheduler.Schedule()を経由しなければならない場合など)。
- CancellationDisposable : TaskPoolSchedulerの内部で活用される。(特に必須の存在でもなかったので、mono-reactive 0.1では使用していなかった。)
- CompositeDisposable : 関連する複数のIDisposableを一括してDispose()する場合に活用出来る。たとえば複数のIObservableをMerge()する場合、Merge()は戻り値にCompositeDisposableを返し、その中に引数の各IObservableにSubscribe()した結果をまとめて保持しておくことができる。
- MultipleAssignmentDisposable : DisposableプロパティにsetされたIDisposableオブジェクトを1つだけ保持し、自身がDispose()された時にはそれを直ちにDispose()する。(なかなか使いどころが無かったが、TaskPoolSchedulerで、タスクをSchedule()するまで待つ間はCancellationDisposableを、それ以降はSchedule()の戻り値を設定するように活用した。)
- SerialDisposable : Disposableプロパティに設定されたIDisposableを、自身がDispose()された時に直ちにDispose()する点はMultipleAssignmentDisposableと同様だが、このプロパティが再度設定された場合、以前に設定されたIDisposableプロパティが、直ちにDispose()される。これはConcat()のように、常に1つのIObservableがSchedule()されてIDisposableを保持・破棄する必要がある場面で活用出来る(mono-reactiveではまだ問題があってこれを実現出来ていない)
- SingleAssignmentDisposable : Disposableプロパティに設定されたIDisposableをdisposeするやり方はMultipleAssignmentDisposableと同様だが、複数回IDisposableが設定されると例外を投げる。複数回の設定を想定していない場面で使用する。
- BooleanDisposable : 自身がDispose()されたかどうかをIsDisposableプロパティで明示する。(同プロパティがSingleAssignmentDisposableなどで存在しているため、mono-reactiveでは今のところ使用していない。)
- RefCountDisposable : RefCount()関数で活用出来る(mono-reactiveではまだ使用していない)
DisposableプロパティとIsDisposedのメリット
SingleAssignmentDisposable, MultipleAssignmentDisposable, SerialDisposable, CompositeDisposableのもう一つの大きな利点は、Dispose()を遅延評価出来る点である。
たとえば、EventLoopSchedulerにdueTime付きでFuncを登録した場合、一定時間が経過したらそのfuncを呼び出してIDisposableをdisposeする処理対象にしなければならない。一方でdueTimeに至る前にSchedule()の戻り値がキャンセルされた場合は、そのfunc自体が呼び出されないことになるので、結果的にIDisposableも返されず、処理する必要がない。
Rxではこういう場面が多々存在するが、その度に if (childDisposable != null) childDisposable.Dispose() のようなコードを書くのは煩雑だ。また、Schedule()の戻り値が先にDispose()されてから、タスクがIDisposableを生成する(返す)ような状況が発生することもしばしばある。このような場面では、「Disposableプロパティに設定されたオブジェクトがあれば、それをDispose()する。既に自身がDispose()されていたら、その後Disposableプロパティに設定されたIDisposableも直ちにDispose()する」という動作が、処理を簡潔にしてくれるというわけだ。
System.Reactive.Subjects
ここにはobservableでありかつobserverにもなるISubjectの型がある。
ISubjectの実装であるAsyncSubject, BehaviorSubject, ReplaySubjectの挙動の違いとそれがどのように活かされているかを知るには、IConnectableObservableを返すObservableのメソッドを見ると良い。以下のクラスが、それぞれ対称的に使用されている。
- Publish() : BehaviorSubject(デフォルト値あり)またはSubject(デフォルト値なし)
- PublishLast() : AsyncSubject
- Replay() : ReplaySubject
ISubjectは、他のIObservableで受け取ったnext/error/completedの各イベントを、そのまま他のIObserverにSubscribe()で転送できるので、mono-reactiveではObservableの実装で多大に活用している。(実際には受け取り済みのイベントをバッファリング出来るようにReplaySubjectが多く使用されているが、将来のバージョンで不必要なバッファリングを行わないように変更したいと考えている。)
その他のネームスペース
System.Reactive.Joinsネームスペースには、Observable.And()で使用されるPatternと、さらにThen()で返されるPlanがある。PlanはObservable.When()の引数になる(それ以外に用途が無い)。これらの型はpublic APIとしてはほとんど意味が無く、実装側としても特別に重要な存在ではない。ActionやFuncの引数の数に引きずられた結果として型が多すぎるので、ネームスペースを隔離したものと考えられる。
System.Reactive.Linqネームスペースには、RxのコアとなるObservableと、IGroupedObservableがある。ObservableはRxの中心的な存在であり、そのメソッドを個別に説明されるべきものだ。IGroupedObservableはGroupBy()で使われるkeyed observableで、それほど重要な存在ではない。
System.Reactive.Threading.Tasksネームスペースでは、Task Parallel Library (System.Threading.Tasks) に基づく機能を実装している。Silverlight4 / WP7.5以前の環境でRxを使えるようにするために切り離されたネームスペースということになろう。
System.Reactiveネームスペースには、「その他」とでも言うべきいくつかのクラスが存在している。
- EventPattern、IEventPatternSource、IEventSourceは、イベント処理まわりのメソッドに関連する。
- NotificationおよびNotificationKindは、Observable.Materialize()に関連する。
- TimeIntervalは、Observable.TimeInterval()の戻り値として使われる。
- Timestampedは、Observable.Timestamp()の戻り値として使われる。
- UnitはIObservable
のTが意味を持たない場合に使われる。たとえばTask`1.ToObservable()とは異なりTask.ToObservable()の戻り値のIObservableのジェネリック型引数には意味がないので、Unitが使われる。 - ObserverはIObserverの拡張メソッドを定義している。
Systemネームスペースには、ObservableExtensionsのみ存在しており、大半がObserverのメソッドを呼び出すだけで実装出来ている。いずれもObservableクラスで拡張することは出来たはずだが、Systemネームスペースにある型のみで実現出来るメソッドは、このネームスペースで実装しておけば、System.Reactive.*をusingでインポートする必要もなくなる、という意図であろう。
続く...?
Observable以外編と書いたのだけど、Observableについて書くかどうかは今後の気分次第。