Skip to content
fullstackhero

Reference

Eventing building block

Event bus implementation (InMemory or RabbitMQ), outbox/inbox stores for durable delivery, and the open-generic handler registration that lets modules subscribe with one method.

views 0 Last updated

The Eventing block is the runtime side of the kit’s integration-event story. It ships two IEventBus implementations — InMemoryEventBus (synchronous, in-process) and RabbitMqEventBus (durable, cross-service) — an EF Core-backed outbox store for transactional delivery, an inbox store for receiver-side idempotency, a JsonEventSerializer, and the auto-discovery hook that registers every IIntegrationEventHandler<TEvent> in your module assemblies.

What it ships

Extensions

  • AddEventingCore(services, configuration) — registers JsonEventSerializer, a no-op IEventTenantScope default (the Multitenancy module swaps in a Finbuckle-backed one), and picks the IEventBus from EventingOptions:Provider (InMemory default, RabbitMQ). Adds the outbox dispatcher hosted service when UseHostedServiceDispatcher is true.
  • AddEventingForDbContext<TDbContext>(services) — registers EfCoreOutboxStore<TDbContext> + EfCoreInboxStore<TDbContext> (scoped) plus the OutboxDispatcher scoped service.
  • AddIntegrationEventHandlers(services, assemblies[]) — scans the supplied assemblies for IIntegrationEventHandler<TEvent> implementations and registers them scoped in DI.

Event bus implementations

  • InMemoryEventBus — in-process. For each event it sets the tenant context first (IEventTenantScope.Begin(event.TenantId)), creates a fresh DI scope, resolves the matching handlers, checks the inbox (skip if already processed by that handler), awaits each handler in order, then marks the inbox row. Great for dev/test and single-process production hosts.
  • RabbitMqEventBus — durable, cross-service; uses RabbitMQ.Client and publishes to a durable topic exchange (EventingOptions:RabbitMQ).

Outbox / inbox

  • IOutboxStore (AddAsync, GetPendingBatchAsync, MarkAsProcessedAsync, MarkAsFailedAsync) + EfCoreOutboxStore<TDbContext> — serialize integration events into an OutboxMessages table in your module’s DbContext. AddAsync rides the same DbContext as your business write, so inside an open transaction the event commits atomically with it.
  • IInboxStore + EfCoreInboxStore<TDbContext> — dedupe table keyed by (event id, handler name), so each handler processes an event at most once.
  • OutboxDispatcher — scoped service; reads pending rows in batches (OutboxBatchSize, default 100), deserializes, publishes via IEventBus, marks processed. A failing row increments RetryCount; after OutboxMaxRetries (default 5) it’s flagged IsDead and skipped thereafter.
  • OutboxDispatcherHostedService — background loop calling the dispatcher every OutboxDispatchIntervalSeconds (default 10).
  • OutboxMessageId, CreatedOnUtc, Type, Payload, TenantId, CorrelationId, ProcessedOnUtc, RetryCount, LastError, IsDead. Implements IGlobalEntity (background processors must scan across tenants).
  • InboxMessageId + HandlerName (composite key), EventType, ProcessedOnUtc, TenantId. Also IGlobalEntity.

Serializer

  • JsonEventSerializer — System.Text.Json; the outbox stores the event’s type name alongside the payload so the dispatcher can rehydrate the concrete event for publishing.

How modules consume Eventing

Register against your DbContext during module startup:

public void ConfigureServices(IHostApplicationBuilder builder)
{
builder.Services.AddHeroDbContext<TicketsDbContext>();
builder.Services.AddEventingForDbContext<TicketsDbContext>();
}

There are two publish paths, and the kit uses both:

Durable (outbox) — write the event to IOutboxStore next to your business change; the dispatcher publishes it later. This is what Identity does for UserRegisteredIntegrationEvent:

public async ValueTask<Unit> Handle(ResolveTicketCommand cmd, CancellationToken ct)
{
var ticket = await _db.Tickets.FindAsync([cmd.TicketId], ct).ConfigureAwait(false);
ticket!.Resolve(cmd.ResolutionNote);
await _db.SaveChangesAsync(ct).ConfigureAwait(false);
await _outbox.AddAsync(new TicketResolvedIntegrationEvent(/* … */), ct).ConfigureAwait(false);
return Unit.Value;
}

Immediate — call IEventBus.PublishAsync directly and handlers run right away (in-process with the InMemory bus). Chat’s mention events and the tenant-lifecycle events go this way; you trade durability for latency.

The OutboxDispatcher picks up pending rows on the next interval (or you can call DispatchAsync from a Hangfire job if you don’t want a hosted-service loop). The receiving side declares a handler:

public sealed class TicketResolvedNotifyHandler(/* … */)
: IIntegrationEventHandler<TicketResolvedIntegrationEvent>
{
public async Task HandleAsync(TicketResolvedIntegrationEvent evt, CancellationToken ct = default)
{
// write a notification, send an email, etc.
}
}

Host registers handlers in bulk via AddIntegrationEventHandlers against all module marker assemblies:

builder.Services.AddIntegrationEventHandlers(moduleAssemblies);

Configuration

{
"EventingOptions": {
"Provider": "InMemory", // or "RabbitMQ"
"OutboxBatchSize": 100,
"OutboxMaxRetries": 5,
"EnableInbox": true,
"OutboxDispatchIntervalSeconds": 10,
"UseHostedServiceDispatcher": true,
"RabbitMQ": {
"Host": "rabbitmq",
"Port": 5672,
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/",
"ExchangeName": "fsh.events",
"QueuePrefix": "fsh",
"UseSsl": false,
"PublishRetryCount": 3,
"PublishRetryDelayMs": 1000
}
}
}

Set UseHostedServiceDispatcher = false when you’d rather drive the dispatcher from Hangfire on a fixed schedule (more deterministic for some ops setups).

How to extend

Add another transport

Implement IEventBus; register your implementation in place of InMemoryEventBus / RabbitMqEventBus. Bus consumers don’t care which one is wired.

Add a side-channel like Outbox-to-Kafka

OutboxDispatcher is small and replaceable. Subclass or wrap it to publish to Kafka in addition to RabbitMQ, or to write to multiple destinations.

Skip the outbox for cheap fire-and-forget

The outbox is opt-in: IEventBus.PublishAsync already goes straight to the bus. If an event doesn’t need transactional durability (a cache invalidation hint, a metrics ping), just publish it directly and skip IOutboxStore — that’s exactly what Chat and the tenant-lifecycle events do.

Gotchas

  • Domain events vs integration events are different things in the kit. Domain events fire inside the SaveChanges interceptor (synchronous, in-module, via Mediator). Integration events go through the outbox and IEventBus (asynchronous, cross-module / cross-service). Use the right one — domain events for invariants and bookkeeping inside the module, integration events for everything else.
  • Outbox dispatcher is scoped per cycle. Each poll gets a fresh DbContext scope; a failing publish increments the row’s RetryCount (with LastError recorded) and is retried on the next interval. At OutboxMaxRetries the row is flagged IsDead and skipped from then on — monitor/clean dead rows, they don’t retry themselves.
  • Inbox dedupes per handler, by (EventId, HandlerName). A redelivered event is skipped only for handlers that already completed it; if you mint two events with the same Id, the second is silently dropped for every handler. Always generate a fresh Guid per event.
  • Background publishers need the tenant context set. InMemoryEventBus does this for you via IEventTenantScope (reading event.TenantId) before resolving handlers — a MultiTenantDbContext captures its tenant at construction, so setting it later is too late. If you build your own bus or dispatch path, preserve this ordering or tenant-filtered handlers NRE.
  • RabbitMQ publishing retries in-process (PublishRetryCount / PublishRetryDelayMs), but consumers own their durability via inbox + retry semantics.

Critical files

  • src/BuildingBlocks/Eventing/ServiceCollectionExtensions.cs
  • src/BuildingBlocks/Eventing/InMemory/InMemoryEventBus.cs
  • src/BuildingBlocks/Eventing/RabbitMq/RabbitMqEventBus.cs
  • src/BuildingBlocks/Eventing/Outbox/OutboxDispatcher.cs
  • src/BuildingBlocks/Eventing/Inbox/EfCoreInboxStore.cs