From eventuous
This skill should be used when configuring KurrentDB or EventStoreDB (ESDB) integration with Eventuous. Covers KurrentDBEventStore, AllStreamSubscription, StreamSubscription, StreamPersistentSubscription, AllPersistentSubscription, KurrentDBProducer, and checkpoint stores. Common triggers: 'set up EventStoreDB', 'KurrentDB subscription', 'persistent subscription', 'ESDB event store', 'EsdbEventStore migration'.
How this skill is triggered — by the user, by Claude, or both
Slash command
/eventuous:eventuous-dotnet-kurrentdbThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Infrastructure-specific guidance for using Eventuous with KurrentDB (EventStoreDB) as the event store, subscription source, and producer target.
Infrastructure-specific guidance for using Eventuous with KurrentDB (EventStoreDB) as the event store, subscription source, and producer target.
<PackageReference Include="Eventuous.KurrentDB" />
<PackageReference Include="Eventuous.Extensions.DependencyInjection" />
The Eventuous.KurrentDB package provides: KurrentDBEventStore, AllStreamSubscription, StreamSubscription, StreamPersistentSubscription, AllPersistentSubscription, and KurrentDBProducer. It depends on KurrentDB.Client.
using Eventuous.KurrentDB; // KurrentDBEventStore
using Eventuous.KurrentDB.Subscriptions; // AllStreamSubscription, StreamSubscription, StreamPersistentSubscription, AllPersistentSubscription, options
using Eventuous.KurrentDB.Producers; // KurrentDBProducer, KurrentDBProduceOptions
Register the KurrentDB gRPC client using AddKurrentDBClient (provided by the KurrentDB.Client package):
services.AddKurrentDBClient("kurrentdb://localhost:2113?tls=false");
Or from configuration:
services.AddKurrentDBClient(configuration["KurrentDB:ConnectionString"]!);
This registers KurrentDBClient as a singleton in the DI container. All Eventuous KurrentDB types (KurrentDBEventStore, subscriptions, producer) resolve this client automatically.
Register KurrentDBEventStore as IEventStore, IEventReader, and IEventWriter:
services.AddKurrentDBClient("kurrentdb://localhost:2113?tls=false");
services.AddEventStore<KurrentDBEventStore>();
KurrentDBEventStore implements IEventStore (which combines IEventReader and IEventWriter). AddEventStore<T>() registers all three interfaces, with tracing wrappers when diagnostics are enabled.
The legacy class EsdbEventStore is obsolete -- use KurrentDBEventStore instead.
Subscribes to the $all global stream. Requires a checkpoint store. Use this for cross-aggregate projections and integrations.
services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"BookingsProjections",
builder => builder
.UseCheckpointStore<MongoCheckpointStore>()
.AddEventHandler<BookingStateProjection>()
.AddEventHandler<MyBookingsProjection>()
.WithPartitioningByStream(2)
);
AllStreamSubscriptionOptions properties:
| Property | Type | Default | Description |
|---|---|---|---|
EventFilter | IEventFilter? | null | Server-side event filter (defaults to excluding system events) |
CheckpointInterval | uint | 10 | How often to persist checkpoint when filtering skips events |
ConcurrencyLimit | int | 1 | Number of concurrent message consumers |
ResolveLinkTos | bool | false | Resolve link events to their targets |
Credentials | UserCredentials? | null | Optional user credentials |
Inherited from SubscriptionWithCheckpointOptions:
| Property | Type | Default | Description |
|---|---|---|---|
CheckpointCommitBatchSize | int | 100 | Commit checkpoint after this many events |
CheckpointCommitDelayMs | int | 5000 | Commit checkpoint after this delay (ms) |
StartFrom | InitialPosition | Earliest | Where to start if no checkpoint exists (Earliest or Latest) |
Subscribes to a single named stream. Requires a checkpoint store.
services.AddSubscription<StreamSubscription, StreamSubscriptionOptions>(
"MyStreamSub",
builder => builder
.Configure(o => o.StreamName = "MyStream-123")
.UseCheckpointStore<MongoCheckpointStore>()
.AddEventHandler<MyStreamHandler>()
);
StreamSubscriptionOptions adds:
| Property | Type | Default | Description |
|---|---|---|---|
StreamName | StreamName | (required) | The stream to subscribe to |
IgnoreSystemEvents | bool | true | Skip events whose type starts with $ |
Plus all properties from AllStreamSubscriptionOptions above (except EventFilter and CheckpointInterval).
Server-managed persistent subscription for a specific stream. KurrentDB manages the checkpoint -- no checkpoint store needed. The subscription is auto-created if it does not exist.
services.AddSubscription<StreamPersistentSubscription, StreamPersistentSubscriptionOptions>(
"PaymentIntegration",
builder => builder
.Configure(x => x.StreamName = "PaymentEvents")
.AddEventHandler<PaymentsIntegrationHandler>()
);
StreamPersistentSubscriptionOptions properties:
| Property | Type | Default | Description |
|---|---|---|---|
StreamName | StreamName | (required) | The stream to subscribe to |
SubscriptionSettings | PersistentSubscriptionSettings? | null | Native KurrentDB persistent subscription settings |
BufferSize | int | 10 | Subscription buffer size |
Deadline | TimeSpan? | null | gRPC call deadline |
FailureHandler | HandleEventProcessingFailure? | null | Custom failure handling (default: retry then NACK) |
ResolveLinkTos | bool | false | Resolve link events |
Credentials | UserCredentials? | null | Optional user credentials |
Server-managed persistent subscription for the $all stream.
services.AddSubscription<AllPersistentSubscription, AllPersistentSubscriptionOptions>(
"AllPersistent",
builder => builder
.Configure(x => x.EventFilter = EventTypeFilter.ExcludeSystemEvents())
.AddEventHandler<MyHandler>()
);
AllPersistentSubscriptionOptions adds:
| Property | Type | Default | Description |
|---|---|---|---|
EventFilter | IEventFilter? | null | Server-side event filter (set at subscription creation time, not updated afterward) |
Plus all properties from PersistentSubscriptionOptions above.
KurrentDBProducer appends events to KurrentDB streams. Register it with AddProducer:
services.AddProducer<KurrentDBProducer>();
Use the producer to publish events:
await producer.Produce("target-stream", message, cancellationToken: ct);
KurrentDBProduceOptions controls produce behavior:
| Property | Type | Default | Description |
|---|---|---|---|
ExpectedState | StreamState | Any | Expected stream state for optimistic concurrency |
MaxAppendEventsCount | int | 500 | Max events per batch append |
Deadline | TimeSpan? | null | Timeout for the produce operation |
Credentials | UserCredentials? | null | Optional user credentials |
KurrentDB itself does not provide a checkpoint store. For catch-up subscriptions (AllStreamSubscription, StreamSubscription), you need an external checkpoint store. Common choices:
MongoCheckpointStore from Eventuous.Projections.MongoDBPostgresCheckpointStore from Eventuous.PostgresqlSqlServerCheckpointStore from Eventuous.SqlServerRedisCheckpointStore from Eventuous.RedisRegister globally or per-subscription:
// Global default
services.AddCheckpointStore<MongoCheckpointStore>();
// Per-subscription override
services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"MySub",
builder => builder
.UseCheckpointStore<MongoCheckpointStore>()
.AddEventHandler<MyHandler>()
);
Persistent subscriptions (StreamPersistentSubscription, AllPersistentSubscription) do not need a checkpoint store -- KurrentDB manages the position server-side.
Use AddGateway to route events from a subscription through a producer to another stream or system:
services.AddCheckpointStore<MongoCheckpointStore>();
services.AddProducer<KurrentDBProducer>();
services.AddGateway<AllStreamSubscription, AllStreamSubscriptionOptions, KurrentDBProducer, KurrentDBProduceOptions>(
"IntegrationSubscription",
PaymentsGateway.Transform
);
using Eventuous.KurrentDB;
using Eventuous.KurrentDB.Subscriptions;
using Eventuous.KurrentDB.Producers;
using Eventuous.Projections.MongoDB;
public static class EventuousRegistrations {
public static void AddEventuous(this IServiceCollection services, IConfiguration configuration) {
// 1. KurrentDB client
services.AddKurrentDBClient(configuration["KurrentDB:ConnectionString"]!);
// 2. Event store (IEventStore, IEventReader, IEventWriter)
services.AddEventStore<KurrentDBEventStore>();
// 3. Command service
services.AddCommandService<BookingsCommandService, BookingState>();
// 4. All-stream catch-up subscription with MongoDB checkpoint store
services.AddSubscription<AllStreamSubscription, AllStreamSubscriptionOptions>(
"BookingsProjections",
builder => builder
.UseCheckpointStore<MongoCheckpointStore>()
.AddEventHandler<BookingStateProjection>()
.AddEventHandler<MyBookingsProjection>()
.WithPartitioningByStream(2)
);
// 5. Persistent subscription (no checkpoint store needed)
services.AddSubscription<StreamPersistentSubscription, StreamPersistentSubscriptionOptions>(
"PaymentIntegration",
builder => builder
.Configure(x => x.StreamName = PaymentsIntegrationHandler.Stream)
.AddEventHandler<PaymentsIntegrationHandler>()
);
// 6. Producer for publishing to KurrentDB streams
services.AddProducer<KurrentDBProducer>();
}
}
src/KurrentDB/src/Eventuous.KurrentDB/KurrentDBEventStore.cssrc/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/src/KurrentDB/src/Eventuous.KurrentDB/Subscriptions/Options/src/KurrentDB/src/Eventuous.KurrentDB/Producers/KurrentDBProducer.cssrc/KurrentDB/src/Eventuous.KurrentDB/Producers/KurrentDBProduceOptions.cssrc/Extensions/src/Eventuous.Extensions.DependencyInjection/Registrations/Stores.cssamples/kurrentdb/Bookings/Registrations.csnpx claudepluginhub eventuous/eventuous-plugin --plugin eventuousDesigns event store schemas and selects technologies for event-sourced systems. Covers PostgreSQL, EventStoreDB, Kafka, and DynamoDB patterns.
Designs and implements event stores for event-sourced systems, covering architecture, requirements, tech comparisons (EventStoreDB, Postgres, Kafka), and scaling strategies.
Designs event stores for event-sourced systems, covering architecture, technology comparison (EventStoreDB, PostgreSQL, Kafka, DynamoDB, Marten), and event persistence patterns.