From dotnet-skills
Builds reactive/event-driven C# with R3, covering Observable<T>, error handling, async dispatch, scheduling, and migration from System.Reactive.
Slash command: /dotnet-skills:r3-reactive-extensions
R3 is [Cysharp's](https://github.com/Cysharp/R3) ground-up reimplementation of Reactive
Extensions — "the new future of dotnet/reactive and UniRx." It keeps the LINQ-over-events
programming model but rebuilds the core types, error contract, and scheduler to fix
long-standing problems in System.Reactive (Rx.NET). Use this skill when composing event
streams, UI input, timers, or push-based pipelines in C#.
Canonical sources (link to these from code and docs):
Use this skill when:
- Working with ReactiveProperty / BindableReactiveProperty
- Bridging to Task / async and IAsyncEnumerable
- Migrating System.Reactive, UniRx, or IObservable<T> code

Not the right tool for: request/response I/O (use async/await), bounded producer/consumer
with backpressure (use System.Threading.Channels), or server-side stream processing with
batching/backpressure (use Akka.NET Streams). R3, like all Rx, is push-based with no
backpressure. See the csharp-concurrency-patterns skill for choosing between these.
- async-and-integration-patterns.md: AwaitOperation, Task integration, IAsyncEnumerable round-tripping, ReactiveProperty/MVVM, subjects, and subscription lifecycle.
- scheduling-and-concurrency.md: thread safety (Synchronize, ObserveOn), TimeProvider vs FrameProvider, when each is necessary, and deterministic testing with fake providers.

Everything in this skill was validated empirically against R3 1.3.1. Captured output appears in the reference files as evidence.
The author (neuecc) built R3 to fix concrete defects in System.Reactive:
- In Rx, an exception flows to OnError and unsubscribes forever, "a billion-dollar mistake" for
  long-lived event streams (a single bad UI event tears down the whole subscription). R3 routes
  errors to OnErrorResume and keeps the subscription alive by default.
- IScheduler is heavy and confusing. ImmediateScheduler/Merge were measured causing real server
  memory/CPU bloat. R3 deletes IScheduler and uses .NET 8's TimeProvider (wall-clock) plus a new
  FrameProvider (frame-clock).
- R3 makes Observable<T> an abstract class so all subscriptions funnel through one place,
  enabling ObservableTracker to list every live subscription with stack traces.
- Async is integrated directly (AwaitOperation, FromAsync, ToAsyncEnumerable) instead of
  pretending events are pull-based sequences.

```shell
dotnet add package R3
# Platform glue (pick what applies): R3.WPF, R3.Avalonia, R3.WinForms, R3.Unity (UPM),
# R3.Godot, ObservableCollections.R3, etc. See the repo README for the full list.
```

```csharp
using R3;
```
R3 replaces Rx's interfaces with abstract classes, and replaces Rx's two-method error contract with a single completion that carries a result.
```csharp
// Simplified shape of the core types (member bodies omitted).
public abstract class Observable<T>
{
    public IDisposable Subscribe(Observer<T> observer);               // tracked centrally
    protected abstract IDisposable SubscribeCore(Observer<T> observer);
}

public abstract class Observer<T> : IDisposable                      // the observer IS the subscription
{
    public void OnNext(T value);
    public void OnErrorResume(Exception error);                       // error WITHOUT unsubscribing
    public void OnCompleted(Result result);                           // success OR failure terminates
}
```
The grammar is (OnNext | OnErrorResume)* OnCompleted(Result)?. Note the difference from Rx's
OnNext* (OnError | OnCompleted)?: errors and termination are decoupled. An error is just a
notification; only OnCompleted ends the stream, and it carries a Result that is either
Result.Success or Result.Failure(exception).
```csharp
using R3;

var subscription = Observable
    .EveryValueChanged(model, m => m.SearchText)   // emits when the property changes
    .Debounce(TimeSpan.FromMilliseconds(300))      // Rx called this "Throttle" (see differences)
    .DistinctUntilChanged()
    .SubscribeAwait(async (text, ct) =>
    {
        var results = await _api.SearchAsync(text, ct);
        Render(results);
    }, AwaitOperation.Switch);                     // cancel the in-flight search on a new keystroke

// Dispose to unsubscribe; or route into a DisposableBag / AddTo(token).
subscription.Dispose();
```
```csharp
var subject = new Subject<int>();

subject.Select(x => 100 / x).Subscribe(
    onNext: x => Console.WriteLine($"next {x}"),
    onErrorResume: e => Console.WriteLine($"errorResume {e.GetType().Name}"),
    onCompleted: (Result r) => Console.WriteLine($"completed IsSuccess={r.IsSuccess}"));

subject.OnNext(2);      // next 50
subject.OnNext(0);      // errorResume DivideByZeroException   <-- NOT terminated
subject.OnNext(5);      // next 20                             <-- subscription is still alive!
subject.OnCompleted();  // completed IsSuccess=True
```
This is the single biggest behavioral change from Rx. To opt back into classic "an error
terminates the sequence" behavior, insert .OnErrorResumeAsFailure() — the error then flows to
OnCompleted(Result.Failure(e)) and downstream OnNexts stop. Recover with Catch. Full
captured runs and the (deliberately absent) Retry story are in
rx-net-differences.md.
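To make the opt-in concrete, here is a minimal sketch of the same divide-by-zero pipeline with .OnErrorResumeAsFailure() inserted. The `log` list is only an illustration device, and the expected lines in the comments follow from the behavior described above rather than from a captured run.

```csharp
using System;
using System.Collections.Generic;
using R3;

var log = new List<string>();
var subject = new Subject<int>();

using var subscription = subject
    .Select(x => 100 / x)         // throws DivideByZeroException for 0
    .OnErrorResumeAsFailure()     // turn the error into a terminating failure
    .Subscribe(
        onNext: x => log.Add($"next {x}"),
        onErrorResume: e => log.Add($"errorResume {e.GetType().Name}"),
        onCompleted: r => log.Add($"completed IsSuccess={r.IsSuccess}"));

subject.OnNext(2); // expected: next 50
subject.OnNext(0); // expected: completed IsSuccess=False (the error ends the stream)
subject.OnNext(5); // expected: nothing, the subscription has already ended

foreach (var line in log) Console.WriteLine(line);
```

Where the operator is placed matters: only errors raised upstream of .OnErrorResumeAsFailure() are converted into a failed completion.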
R3's async operators (SubscribeAwait, SelectAwait, WhereAwait, …) take an AwaitOperation
that decides what happens when values arrive faster than the async work completes:
| AwaitOperation | Overlap behavior | Typical use |
|---|---|---|
| Sequential (default) | Queue values, run one at a time | Ordered processing |
| Drop | Ignore new values while one is running | Debounced submit / cooldown |
| Switch | Cancel the running one, start the new | Search-as-you-type, latest-wins |
| Parallel | Run all concurrently | Independent fan-out |
| SequentialParallel | Run concurrently, emit results in order | Parallel map, ordered output |
| ThrottleFirstLast | Run first + last of a burst | Leading/trailing sampling |
These were verified to behave exactly as described (including Switch cancelling the superseded
operation's CancellationToken). See async-and-integration-patterns.md.
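As a rough illustration of one row of the table, the sketch below uses AwaitOperation.Drop to ignore a second value that arrives while the first is still being handled. The `clicks` subject, the `handled` list, and the `gate` TaskCompletionSource are hypothetical stand-ins for a real event source and a slow save/submit call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using R3;

var handled = new List<int>();
var gate = new TaskCompletionSource();   // controls when the simulated work finishes
var clicks = new Subject<int>();

using var subscription = clicks.SubscribeAwait(
    async (id, ct) =>
    {
        handled.Add(id);                 // runs synchronously up to the first await
        await gate.Task.WaitAsync(ct);   // stand-in for slow async work
    },
    AwaitOperation.Drop);

clicks.OnNext(1);   // starts running and parks on the gate
clicks.OnNext(2);   // arrives while 1 is still running, so Drop ignores it

Console.WriteLine(string.Join(", ", handled));

gate.SetResult();   // let the first operation finish normally
```

Swapping in AwaitOperation.Sequential would queue the second value instead of discarding it, which is the difference to weigh when picking a mode.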
```csharp
// Task -> Observable
await Observable.FromAsync(async ct => await LoadAsync(ct)).FirstAsync();

// Observable -> Task (terminal operators return Task<T>)
List<int> all = await source.ToListAsync();
int last = await source.LastAsync();

// IAsyncEnumerable -> Observable, and back
await asyncEnumerable.ToObservable().ForEachAsync(Handle);
await foreach (var x in source.ToAsyncEnumerable()) { /* ... */ }
```
All verified working. Details and the full terminal-operator list are in async-and-integration-patterns.md.
R3 does not serialize concurrent producers. Like Rx, it assumes the Rx grammar: OnNext
must not be called concurrently or re-entrantly from multiple threads. Operators (Where,
Select, Subject, …) are not internally locked. Pushing OnNext from many threads at once
into a stateful downstream corrupts state — in testing, 20,000 concurrent OnNext calls into
a List<T> subscriber lost ~half the items and threw inside the operator chain.
The fix is to make the boundary explicit:
```csharp
// Multiple producer threads -> one serialized consumer
subject.Synchronize()            // lock-based gate; delivery becomes single-threaded
    .Where(x => x.IsValid)
    .Subscribe(Handle);          // verified: 10000/10000 items, no corruption

// Or marshal onto a context/threadpool, which also serializes delivery:
source.ObserveOnThreadPool().Subscribe(Handle);

// For shared MVVM state written from many threads:
var counter = new SynchronizedReactiveProperty<int>(0);   // thread-safe writes
```
Practical rule: if more than one thread can publish into a stream, put Synchronize() (or an
ObserveOn*) immediately after the source, or use SynchronizedReactiveProperty. Full race
reproductions and outputs are in scheduling-and-concurrency.md.
R3 has two notions of "when," and both are abstractions you can fake in tests:
- TimeProvider (the .NET 8 BCL type) = wall-clock time. Used by Delay, Debounce,
  Interval, Timer, Timeout. This is what server/business code uses.
- FrameProvider (R3-specific) = a frame clock. Used by EveryUpdate, DelayFrame(n),
  IntervalFrame(n), etc.

When is a FrameProvider necessary? Whenever "progress" is measured in render/update ticks instead of elapsed time:
- In tests, FakeFrameProvider.Advance(n) drives frames with zero real time,
  exactly as FakeTimeProvider.Advance(timeSpan) drives the clock.

Plain server/business code virtually never needs FrameProvider; that's TimeProvider
territory. Both fakes make time-dependent pipelines fully deterministic; examples in
scheduling-and-concurrency.md.
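A short sketch of what driving both clocks by hand can look like, modeled on the testing pattern in the R3 README. It assumes the Microsoft.Extensions.TimeProvider.Testing package for FakeTimeProvider and uses ToLiveList() to capture emitted values; the variable names are illustrative only.

```csharp
using System;
using Microsoft.Extensions.Time.Testing; // NuGet: Microsoft.Extensions.TimeProvider.Testing
using R3;

// Wall-clock pipeline driven by a fake clock: no real waiting happens.
var fakeTime = new FakeTimeProvider();
var timerValues = Observable.Timer(TimeSpan.FromSeconds(5), fakeTime).ToLiveList();

fakeTime.Advance(TimeSpan.FromSeconds(4));
Console.WriteLine($"after 4s: count={timerValues.Count}, completed={timerValues.IsCompleted}");

fakeTime.Advance(TimeSpan.FromSeconds(1));
Console.WriteLine($"after 5s: count={timerValues.Count}, completed={timerValues.IsCompleted}");

// Frame-clock pipeline driven by a fake frame provider.
var fakeFrames = new FakeFrameProvider();
var frames = Observable.EveryUpdate(fakeFrames)
    .Select(_ => fakeFrames.GetFrameCount())
    .ToLiveList();

fakeFrames.Advance();    // one frame
fakeFrames.Advance(3);   // three more frames
Console.WriteLine($"frames observed: {frames.Count}");

frames.Dispose();        // stop the per-frame subscription
```

Passing the provider explicitly, rather than relying on a default, is what lets the same pipeline run against the real clock in production and a fake one in tests.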
Do:
- Treat OnErrorResume as the default: design streams that survive individual bad events.
- Use .OnErrorResumeAsFailure() when you genuinely want an error to terminate the stream.
- Choose an AwaitOperation deliberately for every async operator (Switch for latest-wins,
  Sequential for ordering, Drop for cooldowns).
- Put Synchronize() / ObserveOn* after any source that multiple threads publish into.
- Pass a TimeProvider to time operators and a FrameProvider to frame operators so tests can
  use FakeTimeProvider / FakeFrameProvider.
- Manage subscription lifetimes with DisposableBag, CompositeDisposable, or
  .AddTo(cancellationToken); turn on ObservableTracker in dev to catch leaks.
- Use ReactiveProperty for de-duplicated observable state; BindableReactiveProperty for
  XAML-bound state.
- Use the R3 operator names: Debounce (not Throttle), ThrottleLast (not Sample), Chunk
  (not Buffer). Retry, GroupBy, Finally, and plain Buffer are absent in 1.3.1; see the
  differences file for replacements.

Don't:
- Call OnNext concurrently/re-entrantly from multiple threads without Synchronize().
- Block on terminal operators (.Result/.Wait()); they return Task<T>, so await them.

Resources:
- TimeProvider (BCL): https://learn.microsoft.com/en-us/dotnet/api/system.timeprovider
- FakeTimeProvider: https://www.nuget.org/packages/Microsoft.Extensions.TimeProvider.Testing
- csharp-concurrency-patterns (choosing R3 vs async/await vs Channels vs Akka.NET)
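The lifetime and leak-tracking advice can be sketched as follows. This is a minimal illustration, assuming a DisposableBag owned by the calling scope and ObservableTracker switched on for a development build; the `log` list and the pipelines are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using R3;

ObservableTracker.EnableTracking = true;     // off by default
ObservableTracker.EnableStackTrace = true;   // record where each subscription was created

var log = new List<string>();
var bag = new DisposableBag();
var subject = new Subject<int>();

// Route every subscription into one bag so a single Dispose tears them all down.
subject.Where(x => x > 0).Subscribe(x => log.Add($"positive {x}")).AddTo(ref bag);
subject.Select(x => x * 2).Subscribe(x => log.Add($"doubled {x}")).AddTo(ref bag);

subject.OnNext(21);

// List whatever is still subscribed; in a leak hunt, look for entries that should be gone.
ObservableTracker.ForEachActiveTask(state => Console.WriteLine(state));

bag.Dispose();       // unsubscribes both pipelines
subject.OnNext(1);   // no subscribers remain, so nothing is logged

foreach (var line in log) Console.WriteLine(line);
```

Tracking adds overhead, so it is usually something to enable in development or diagnostics builds, not in production hot paths.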