Messaging
The messaging packages provide a transport-agnostic abstraction for asynchronous communication between services, with first-class support for RabbitMQ.
Package Responsibilities
| Package | Role |
|---|---|
Vulthil.Messaging.Abstractions |
Consumer and publisher interfaces – reference this from domain/application projects |
Vulthil.Messaging |
Queue registration, consumer wiring, hosted orchestration, and the transport-author SDK (Vulthil.Messaging.Transport) |
Vulthil.Messaging.RabbitMq |
RabbitMQ transport implementation |
Vulthil.Messaging.TestHarness |
In-memory transport for integration tests |
The Vulthil.Messaging.Transport namespace is a build-your-own-transport SDK — see
Writing a Custom Transport.
Defining Consumers
One-way consumer
public sealed class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
public Task ConsumeAsync(
IMessageContext<OrderCreatedEvent> messageContext,
CancellationToken cancellationToken = default)
{
var order = messageContext.Message;
// Process the event
return Task.CompletedTask;
}
}
Request/reply consumer
public sealed class GetOrderConsumer : IRequestConsumer<GetOrderRequest, OrderDto>
{
public Task<OrderDto> ConsumeAsync(
IMessageContext<GetOrderRequest> messageContext,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new OrderDto());
}
}
The request consumer keeps its strongly-typed Task<TResponse> contract — the requester
on the other side will receive a typed Result<TResponse>.
Registering Queues and Consumers
Registration happens in the composition root using the AddMessaging builder. Queue
definitions and message configurations are first loaded eagerly from IConfiguration,
then merged with whatever code-side calls add; code wins on conflict.
builder.AddMessaging(messaging =>
{
messaging.UseRabbitMq();
messaging.ConfigureQueue("order-events", queue =>
{
queue.AddConsumer<OrderCreatedConsumer>();
queue.UseRetry(retry =>
{
retry.Exponential(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30));
retry.UseJitter(0.2);
});
queue.UseDeadLetterQueue();
});
messaging.ConfigureQueue("order-requests", queue =>
{
queue.AddRequestConsumer<GetOrderConsumer>();
});
});
Publishing Messages
Inject IPublisher to send one-way messages, or IRequester for request/reply:
public sealed class PlaceOrderHandler(IPublisher publisher)
{
public async Task HandleAsync(PlaceOrderCommand command, CancellationToken ct)
{
// ... create order ...
await publisher.PublishAsync(new OrderCreatedEvent(order.Id), cancellationToken: ct);
}
}
Delivery guarantees. Publishing uses RabbitMQ publisher confirms: the call awaits the broker's acknowledgement and throws if the message is nacked, so a publish the broker never accepted does not report success.
Publish(pub/sub over a fanout/topic exchange) is not mandatory — zero subscribers is normal — whereasSend(point-to-point) is mandatory, so a missing destination queue surfaces as a failure rather than being silently dropped.
Publishing from inside a consumer
IMessageContext exposes PublishAsync directly, so consumers can emit follow-up
messages without injecting IPublisher. Correlation metadata
(CorrelationId, ConversationId, InitiatorId) is automatically propagated from
the incoming message to the outgoing one. The optional configure callback runs
after auto-propagation, so explicit values override the inherited ones.
public sealed class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
public async Task ConsumeAsync(
IMessageContext<OrderCreatedEvent> ctx,
CancellationToken cancellationToken = default)
{
// Inherits CorrelationId/ConversationId/InitiatorId from ctx
await ctx.PublishAsync(new InventoryReserveRequested(ctx.Message.OrderId));
// Or override specific fields explicitly
await ctx.PublishAsync(new ShippingScheduled(ctx.Message.OrderId), c =>
{
c.SetCorrelationId("new-correlation");
c.AddHeader("priority", "high");
return ValueTask.CompletedTask;
});
}
}
IMessageContext.CancellationToken exposes the delivery's cancellation token for
handlers that want to observe it alongside the explicit method parameter.
Point-to-point Send
IPublisher.PublishAsync fans a message out via its per-type exchange to any number of
interested consumers. When you need to address a single, named destination — typically
a specific queue on a specific service — use ISendEndpoint instead. Sends route
through the broker's default exchange using the destination queue name as the routing
key; topology for that queue is owned by the receiving service and is not declared by
the sender.
Inject ISendEndpointProvider and resolve an endpoint by Uri:
public sealed class OrderRouter(ISendEndpointProvider sendEndpoints)
{
public async Task RouteAsync(PlaceOrderCommand command, CancellationToken ct)
{
var endpoint = await sendEndpoints.GetSendEndpointAsync(new Uri("queue:order-commands"), ct);
await endpoint.SendAsync(command, ct);
}
}
The default address scheme is queue:<name>. Absolute amqp://, amqps://, and
rabbitmq:// URIs are also recognized — the queue name is taken from the path. Endpoints
are cached per Uri by the provider for the lifetime of the bus.
MessageConfiguration<T>.CorrelationIdFormatter still applies on the send path; the
Exchange and RoutingKeyFormatter settings are intentionally ignored because the
URI is authoritative for the destination.
Sending from inside a consumer
IMessageContext exposes SendAsync directly, with the same auto-propagation of
CorrelationId/ConversationId/InitiatorId as PublishAsync:
public sealed class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
public async Task ConsumeAsync(
IMessageContext<OrderCreatedEvent> ctx,
CancellationToken cancellationToken = default)
{
await ctx.SendAsync(
new Uri("queue:fulfillment-commands"),
new FulfillOrderCommand(ctx.Message.OrderId));
}
}
Consume Filters
Consume filters wrap the consumer invocation, allowing cross-cutting concerns (logging, validation, scoped resource management, telemetry, etc.) to be composed without modifying transport or consumer code. They mirror the ASP.NET Core middleware shape:
public sealed class LoggingConsumeFilter<TMessage> : IConsumeFilter<TMessage>
where TMessage : notnull
{
private readonly ILogger<LoggingConsumeFilter<TMessage>> _logger;
public LoggingConsumeFilter(ILogger<LoggingConsumeFilter<TMessage>> logger)
=> _logger = logger;
public async Task ConsumeAsync(IMessageContext<TMessage> context, ConsumeDelegate<TMessage> next)
{
_logger.LogInformation("Consuming {Type} (correlation={CorrelationId})",
typeof(TMessage).Name, context.CorrelationId);
try
{
await next(context);
_logger.LogInformation("Consumed {Type}", typeof(TMessage).Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "Consume of {Type} failed", typeof(TMessage).Name);
throw;
}
}
}
Registering filters
builder.AddMessaging(messaging =>
{
messaging.UseRabbitMq();
// Open-generic — applies to every message type
messaging.AddOpenConsumeFilter(typeof(LoggingConsumeFilter<>));
// Closed-generic — applies only to OrderCreatedEvent
messaging.AddConsumeFilter<OrderValidationFilter>();
messaging.ConfigureQueue("order-events", queue =>
{
queue.AddConsumer<OrderCreatedConsumer>();
});
});
Filters are resolved per delivery from the same scope as the consumer, so they
may depend on scoped services (e.g.
Short-circuiting
A filter may skip calling next to reject a message:
public sealed class TenantGate<TMessage> : IConsumeFilter<TMessage>
where TMessage : notnull
{
public Task ConsumeAsync(IMessageContext<TMessage> context, ConsumeDelegate<TMessage> next)
{
if (context.Headers.TryGetValue("Tenant", out var t) && t is "blocked")
{
// Don't invoke next — consumer is skipped, delivery is acked normally.
return Task.CompletedTask;
}
return next(context);
}
}
For request/reply consumers, short-circuiting causes the requester to receive a
Result<TResponse> failure (with an explanatory error) instead of timing out.
Built-in filters
AddMessaging registers a default open-generic LoggingConsumeFilter<TMessage> as
the outermost filter in the pipeline. It emits structured Debug logs at consume
entry/exit and a Warning log on uncaught exceptions, with timing information:
dbug: Consuming Acme.Orders.OrderCreatedEvent (messageId=..., correlationId=...)
dbug: Consumed Acme.Orders.OrderCreatedEvent (messageId=...) in 12ms
User-registered filters compose INSIDE the defaults, so the logging filter wraps every other filter and the consumer itself.
Toggle the built-in filter via MessagingOptions.ConsumeFilters:
{
"Messaging": {
"Options": {
"ConsumeFilters": { "EnableLogging": false }
}
}
}
Or in code:
m.ConfigureMessagingOptions(opts => opts.ConsumeFilters.EnableLogging = false);
The filter stays registered in DI; only its behavior is skipped, so it's still resolvable in unit tests if you want to assert against it.
Idempotent receivers (inbox)
Vulthil.Messaging.Inbox ships a consume filter that deduplicates redelivered
messages, giving exactly-once processing on top of at-least-once delivery. It is a
transaction-owning filter: the consumer runs inside an IIdempotencyStore
transaction so the processed-marker and the consumer's writes commit atomically.
Opt a message type in with AddIdempotentInbox<TMessage>(). See the
Inbox Pattern article for the full design.
Ordered Processing (per-aggregate)
Without ordering controls a queue processing messages concurrently does not preserve
order. UsePartitioner<TMessage> restores per-aggregate ordering: deliveries that
share a partition key are processed one at a time and in publish order, while deliveries
with different keys run concurrently.
builder.AddMessaging(m =>
{
// Order OrderUpdated deliveries per OrderId across 16 lanes.
m.UsePartitioner<OrderUpdated>(partitionCount: 16, ctx => ctx.Message.OrderId.ToString());
// Shorthand: omit the selector to key on CorrelationId (the natural key when it
// carries the aggregate id). Equivalent to passing ctx => ctx.CorrelationId.
m.UsePartitioner<OrderUpdated>(16);
m.ConfigureQueue("orders", q => q.AddConsumer<OrderUpdatedConsumer>());
});
Share one Partitioner across several message types to serialize messages correlated
to the same key regardless of their type (e.g. a saga). The selector is optional here too
and defaults to CorrelationId:
var orders = new Partitioner(16);
m.UsePartitioner<OrderUpdated>(orders);
m.UsePartitioner<OrderShipped>(orders);
How it works
Ordering is enforced by the RabbitMQ transport, not a consume filter. When a queue consumes a partitioned message type, its worker:
- Receives deliveries from a single channel in FIFO order (
consumerDispatchConcurrency = 1), so the partition key is read and the delivery assigned to its lane in arrival order. - Hands each delivery to the key's lane and immediately returns, so the next delivery is laned in order while lanes process concurrently (cross-key parallelism).
- Acknowledges each message when its lane finishes (deferred ack).
PrefetchCountbounds the number of in-flight deliveries and therefore the effective parallelism.
Notes:
- For a partitioned queue,
ConcurrencyLimitdoes not drive dispatch (it is forced to ordered single dispatch); tune throughput withPrefetchCountinstead. - A delivery whose selected key is
nullor empty is processed without lane serialization (it still runs off the receive loop, so it does not block ordering of other keys). - The partition count affects only fan-out (how many distinct keys progress at once), never correctness. The lane hash is in-process, so a key's lane need not be stable across processes.
- The partitioner orders deliveries within one process. Ordering across load-balanced consumer instances additionally requires a single active consumer (see Ordering across instances below), which partitioned queues enable automatically.
- Failure path: a partitioned queue retries in-memory automatically — the consumer is re-invoked in-process while the delivery (and its lane) is held — so a failing message cannot be overtaken by a later same-key message. See Retries below.
Ordering across instances (single active consumer)
The partitioner serializes same-key deliveries inside a single process. When the same queue is consumed by several load-balanced instances, the broker round-robins deliveries between them and same-key messages can again be processed concurrently. RabbitMQ's single active consumer closes that gap: the broker keeps exactly one consumer active and promotes a standby consumer only if the active one disconnects, so ordering is preserved and the queue fails over without manual intervention.
Partitioned queues turn this on automatically. Any queue can opt in explicitly:
m.ConfigureQueue("orders", q =>
{
q.UseSingleActiveConsumer();
q.AddConsumer<OrderUpdatedConsumer>();
});
Notes and trade-offs:
- No scale-out for that queue. Only one consumer works at a time, so adding instances buys failover, not throughput. To scale a partitioned workload across instances, shard into multiple queues (one per partition) and bind each instance to its own — a larger change that is out of scope here. This mirrors MassTransit, whose in-process partitioner is likewise single-instance.
- Existing queues. The single-active-consumer flag is a queue argument fixed at declaration.
Enabling it on a queue that already exists fails declaration with
406 PRECONDITION_FAILED; delete and recreate the queue to change it. - At-least-once on failover. When the active consumer dies mid-delivery, unacknowledged messages are redelivered to the promoted consumer, so a handler may observe a message more than once. Make handlers idempotent; broker-level exactly-once delivery is not provided.
Retries
q.UseRetry(...) configures the retry policy for a queue's consumers. There are two
execution modes:
- Delayed re-delivery (default): a failed message is re-published to the queue's retry exchange with a delay and re-delivered later. Good for back-off without holding a consumer, but it reorders relative to other messages.
- In-memory: the consumer is re-invoked in-process while the delivery is held, preserving
order. Opt in with
q.UseRetry(r => { r.Immediate(3); r.InMemory(); }). Partitioned queues use in-memory retry automatically, since ordering requires it.
On exhaustion the message is dead-lettered (when a dead-letter queue is configured) in both modes.
Head-of-line caveat (delayed re-delivery): the delayed retry uses one shared retry queue with a per-message TTL, and RabbitMQ expires messages only from the head of a queue. With variable or jittered back-off intervals, a long-delay message at the head holds back shorter-delayed messages queued behind it. For predictable timing, use a single fixed interval (uniform TTL) or in-memory retry (
r.InMemory()), which holds the delivery in-process and is unaffected.
Faults
When a consumed message fails terminally — every retry exhausted — a Fault<T> is published
by convention to a shared topic exchange (default Fault.Exchange, configurable via
Messaging:Options:FaultExchangeName). No per-message opt-in by the producer is required, so faults
are observable broker-side without changing any producer. The faulted message's URN is the routing
key, so an operator binds a queue to the fault exchange — # for every fault, or a specific URN to
filter by faulted message type — and reads the payload. The fault body is a Fault<T> JSON document
(the AMQP type is Fault<{original-urn}>):
public record Fault<TMessage> where TMessage : notnull
{
public required TMessage Message { get; init; } // the original message body
public required string ExceptionMessage { get; init; }
public required string? StackTrace { get; init; }
public required string ExceptionType { get; init; }
public required DateTimeOffset FaultedAt { get; init; }
public required MessageContextSnapshot OriginalContext { get; init; } // original transport metadata
}
The fault exchange is a diagnostics/observability broadcast — drain it with a monitoring service or
any AMQP consumer bound to the exchange — rather than a typed IConsumer<Fault<T>> endpoint.
A message can override the routing per-message: if it carries an explicit FaultAddress, the fault is
routed point-to-point to that address (through the broker's default exchange) instead of being
broadcast to the fault exchange — exactly one fault is emitted either way. Set it on publish:
await publisher.PublishAsync(new OrderCreatedEvent(orderId), ctx =>
{
ctx.SetFaultAddress(new Uri("queue:order-faults"));
return ValueTask.CompletedTask;
});
Fault publishing is best-effort: a failure to publish the fault is logged and never prevents the original delivery from being settled (nacked for dead-lettering).
Routing Keys
Routing keys flow through two distinct configuration sites, one on each side of the wire:
- Producer side —
MessageConfiguration<TMessage>.UseRoutingKey(selector)controls the key the publisher writes onto each outgoing message. - Consumer side —
q.Subscribe<TMessage>(routingKey)controls the binding pattern the broker uses to filter deliveries for the queue.
The two sites can use the same value (typical for direct exchanges) or different values (e.g. a topic
binding order.* matching producer keys like order.created). When the binding pattern is left null,
the broker receives an empty pattern: fanout/headers exchanges ignore it; direct/topic exchanges only
match an empty published key — supply a specific pattern explicitly when needed.
// Producer side: what the publisher writes on the wire.
messaging.ConfigureMessage<OrderCreatedEvent>(message =>
{
message.UseRoutingKey(e => $"order.{e.Region}");
message.UseCorrelationId(e => e.OrderId.ToString());
});
// Consumer side: how the queue binds. Pattern matches the producer's published key.
messaging.ConfigureQueue("order-projector", q =>
{
q.Subscribe<OrderCreatedEvent>("order.*");
q.AddConsumer<OrderProjector>();
});
Message Configuration
Each message type is associated with a MessageConfiguration that controls the
exchange name, exchange type, durability, routing/correlation formatters used
when publishing, and the stable wire URN. The Exchange defaults to the message
CLR full type name when constructed via MessageConfiguration<TMessage>; the
publisher and bus topology share that same source of truth, so they never get
out of sync.
messaging.ConfigureMessage<OrderCreatedEvent>(m =>
{
m.Exchange = "orders.events"; // override default of typeof().FullName
m.ExchangeType = MessagingExchangeType.Topic;
m.Durable = true;
m.UseRoutingKey(e => $"order.{e.Region}");
});
Wire identity (URN)
Every message type carries a stable wire URN that identifies it on the broker independent of its CLR type name. The default is derived from the CLR type:
urn:message:Acme.Orders:OrderCreatedEvent
Override it via MessageConfiguration<T>.Urn to keep the wire identity stable
across CLR renames — producers and consumers on different versions agree on the
URN even if their C# class names diverge:
messaging.ConfigureMessage<OrderCreatedEvent>(m =>
{
m.Urn = new Uri("urn:message:Acme.Orders:OrderPlaced");
});
The URN is the dispatch key on the receive side — it appears in the message
envelope's messageType field, and MessageExecutionRegistry<THandler> keys its
execution plans by URN.
Message envelope (wire format)
Vulthil-produced messages are wrapped in a JSON envelope with explicit metadata
fields rather than relying on AMQP BasicProperties headers:
{
"messageId": "01931d7e-...",
"correlationId": "a3f1...",
"conversationId": "a3f1...",
"initiatorId": "01931d7d-...",
"sourceAddress": "queue:order-service",
"messageType": "urn:message:Acme.Orders:OrderPlaced",
"message": { "orderId": "abc", "amount": 100 },
"sentTime": "2026-05-27T12:00:00Z",
"headers": { "tenant": "acme" }
}
BasicProperties.MessageId, CorrelationId, and Type (= URN) are still
mirrored into AMQP for broker tooling and trace propagation, but the envelope
is the source of truth.
External producers that emit bare JSON (no envelope) are accepted on the
receive path — the worker probes the body and falls back to using
BasicProperties.Type as the type identity.
Subscriptions and Polymorphism
Topology and dispatch are separated:
- Subscriptions = exchange bindings. A queue is subscribed to a concrete
message type via
q.Subscribe<TConcrete>(); at topology setup time, the queue's binding toTConcrete's exchange is declared. - Consumers = handlers. A consumer is registered via
q.AddConsumer<TConsumer>()whereTConsumerimplementsIConsumer<TMessage>—TMessagemay be a concrete type, an interface, or an abstract base.
When TMessage is concrete, the consumer's message type is auto-subscribed
at build time — preserves the today's ergonomic for the simple case:
m.ConfigureQueue("orders", q => q.AddConsumer<OrderCreatedConsumer>());
// → q is auto-subscribed to OrderCreatedEvent.
When TMessage is polymorphic (an interface or abstract base), the
consumer fires for any concrete delivery whose CLR type is assignable to it —
but the queue must explicitly subscribe to the concrete types it wants to bind:
// OrderProjector : IConsumer<IOrderEvent>
m.ConfigureQueue("order-projector", q =>
{
q.Subscribe<OrderPlaced>(); // bind queue to OrderPlaced exchange
q.Subscribe<OrderCancelled>(); // bind queue to OrderCancelled exchange
q.AddConsumer<OrderProjector>(); // fires on either delivery
});
SubscribeAll<TInterface>(Assembly) discovers concrete implementers in the
supplied assembly and calls Subscribe<TConcrete> for each — abstract types
and interfaces are skipped (only concrete types have exchanges):
m.ConfigureQueue("order-projector", q =>
{
q.SubscribeAll<IOrderEvent>(typeof(OrderPlaced).Assembly);
q.AddConsumer<OrderProjector>();
});
Dispatch is transitive: with a hierarchy OrderPlaced : IOrder : IOrderEvent,
a single OrderPlaced delivery fires consumers registered against the concrete
OrderPlaced, against IOrder (immediate interface), and against
IOrderEvent (transitive interface).
Validation at composition time
After ConfigureQueue returns, a build pass validates the queue's wiring and
throws with a descriptive message if anything is incoherent:
- A consumer with no matching concrete subscription (e.g.
IConsumer<IOrderEvent>but noSubscribe<TConcrete>for any implementer). - A subscription with no matching consumer.
- A request consumer registered against an abstract or interface request type (responses are typed and can't depend on the concrete request type).
These failures surface at app startup, not at the first message delivery.
Request consumers stay non-polymorphic
IRequestConsumer<TRequest, TResponse> requires a concrete TRequest — the
response type is fixed and can't be selected by the incoming concrete type.
Each (queue, message type) pair admits at most one request consumer; a
second one fails fast at composition.
MessageConfiguration instances can also come from configuration — see below.
Configuration-driven Setup
Queue and message settings can be defined entirely in appsettings.json. The
AddMessaging call loads every section under Messaging:Queues:* and
Messaging:Messages:* into the runtime before running the configurator action.
Subsequent ConfigureQueue / ConfigureMessage<T> calls mutate the loaded
instances, with code taking precedence on conflict.
{
"Messaging": {
"Options": {
"DefaultTimeout": "00:00:30",
"FaultExchangeName": "Fault.Exchange"
},
"Queues": {
"order-events": {
"PrefetchCount": 64,
"ChannelCount": 2,
"ConcurrencyLimit": 4,
"IsQuorum": true,
"DefaultRetryPolicy": {
"MaxRetryCount": 3,
"JitterFactor": 0.2,
"Intervals": [ "00:00:01", "00:00:05", "00:00:30" ]
}
}
},
"Messages": {
"Acme.Orders.OrderCreatedEvent": {
"Exchange": "orders.events",
"ExchangeType": "Topic",
"Durable": true
}
}
}
}
Config-only setup
A service can declare its topology purely in appsettings.json and skip the code
side entirely — useful for publisher-only services or environments where queue
shape is owned by ops:
builder.AddMessaging(m => m.UseRabbitMq());
Code-only override
Code values always win over configuration values:
builder.AddMessaging(m =>
{
m.UseRabbitMq();
m.ConfigureQueue("order-events", q =>
{
q.ConfigureQueue(d => d.PrefetchCount = 128); // overrides appsettings value
q.AddConsumer<OrderCreatedConsumer>();
});
});
Merged
The common case — topology from config, consumer wiring from code:
{ "Messaging": { "Queues": { "order-events": { "PrefetchCount": 64 } } } }
m.ConfigureQueue("order-events", q => q.AddConsumer<OrderCreatedConsumer>());
// PrefetchCount=64 (from config) + OrderCreatedConsumer registration (from code)
Observability
The RabbitMQ transport emits an ActivitySource named "Vulthil.Messaging.RabbitMq"
with Producer/Consumer spans for publish, request, and receive operations. Tag
conventions follow the OpenTelemetry messaging semantic conventions, with a few
Vulthil-specific tags (vulthil.messaging.message_type, .consumer_type,
.retry_count, .queue).
UseRabbitMq registers the source with the application's TracerProvider
automatically, gated on the Aspire client's DisableTracing setting — so disabling
RabbitMQ tracing in Aspire suppresses the Vulthil spans too. If you bring your own
TracerProvider configuration, you can register the source manually:
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing.AddVulthilMessagingInstrumentation());
W3C trace context (traceparent / tracestate) propagation is handled by
RabbitMQ.Client itself, so producer-side activities link to consumer-side
activities on the receiving service without any extra setup.
Health Checks
UseRabbitMq also registers a startup health check named
"vulthil_messaging_rabbitmq_bus" (tagged ready, messaging, rabbitmq). It
reports:
Unhealthy("starting")untilRabbitMqBus.StartAsynccompletes (topology declaration + consumer registration finished).Healthy("started")after a successful startup.Unhealthy(...)with the original exception if startup fails.
Registration is gated on the Aspire client's DisableHealthChecks setting; set
that to true to suppress the health check alongside Aspire's connection-level
health check.
Request/Reply
IRequester is registered automatically by UseRabbitMq and returns a typed
Result<TResponse>:
public sealed class OrderLookupService(IRequester requester)
{
public Task<Result<OrderDto>> GetAsync(Guid orderId, CancellationToken ct)
=> requester.RequestAsync<GetOrderRequest, OrderDto>(
new GetOrderRequest(orderId), cancellationToken: ct);
}
The reply queue is created lazily on the first request, so producer-only services
that never call RequestAsync do not declare any reply infrastructure.
Configuring the request
RequestAsync accepts an optional Func<IRequestContext, Task> to configure the
outgoing request. IRequestContext extends IPublishContext — so the routing key,
correlation id, and headers can be set just like a publish — and adds
SetTimeout(TimeSpan) for overriding the response timeout on a per-request basis:
var result = await requester.RequestAsync<GetOrderRequest, OrderDto>(
new GetOrderRequest(orderId),
ctx =>
{
ctx.SetTimeout(TimeSpan.FromSeconds(5));
ctx.AddHeader("priority", "high");
return Task.CompletedTask;
},
cancellationToken: ct);
When no timeout is set on the context, the request falls back to
Messaging:Options:DefaultTimeout (see
Configuration-driven Setup). A request that exceeds
its timeout completes with a Result<TResponse> failure carrying the
Messaging.Request.Timeout error code rather than throwing.
Reply wire format & correlation
Each request carries a dedicated request id (a fresh GUID per call) in the AMQP
CorrelationId property and the envelope's requestId field. The reply echoes it, and the
requester correlates the reply to the awaiting call by this id — independently of the business
CorrelationId (set via UseCorrelationId or SetCorrelationId), which is therefore free to
repeat across concurrent requests without colliding.
The reply is a normal MessageEnvelope (single-serialized, like every other message):
- Success carries the
TResponsepayload at the response type's URN. - Failure carries an RPC fault at
urn:message:Vulthil:RpcFault(the remote exception's type and message); the requester maps it to aResult<TResponse>failure with theMessaging.Request.Failureerror code.
Writing a Custom Transport
Vulthil.Messaging is also a build-your-own-transport SDK. The transport-agnostic
primitives live in the Vulthil.Messaging.Transport namespace, so a transport for a
broker other than RabbitMQ can be written in a separate package against the public surface
alone — the RabbitMQ transport uses nothing more.
A transport is the glue between the broker and these primitives:
| Concern | Primitive |
|---|---|
| Lifetime | ITransport.StartAsync — declare topology, then start consuming |
| Execution plans | MessageExecutionRegistry<THandler> + your IMessageHandlerFactory<THandler> |
| Wire format | MessageEnvelope + MessageEnvelopeFactory.Create |
| Receive context | MessageContext.CreateFromEnvelope |
| Filter pipeline | ConsumePipelineFactory.Build |
| RPC failures | RpcFault |
1. Build execution plans
Choose a THandler type for your transport's dispatch closure, then implement
IMessageHandlerFactory<THandler> to turn each registration into one. The factory is where the
message type is statically known, so it is also where you compose the filter pipeline and build
the receive context:
public delegate Task Dispatch(IServiceProvider scope, object message, MessageEnvelope envelope, CancellationToken ct);
internal sealed class MyHandlerFactory : IMessageHandlerFactory<Dispatch>
{
public HandlerEntry<Dispatch> ForConsumer(Type consumer, Type message, RetryPolicyDefinition? retry)
=> new(BuildConsumer(consumer, message), HandlerKind.Consumer);
public HandlerEntry<Dispatch> ForRequestConsumer(Type consumer, Type request, Type response, RetryPolicyDefinition? retry)
=> new(BuildRequestConsumer(consumer, request, response), HandlerKind.RequestConsumer);
// Bound generically (e.g. via reflection) so TMessage is known here:
private static Dispatch Consumer<TConsumer, TMessage>() where TConsumer : class, IConsumer<TMessage> where TMessage : notnull
=> async (scope, message, envelope, ct) =>
{
var consumer = scope.GetRequiredService<TConsumer>();
var context = MessageContext.CreateFromEnvelope(
(TMessage)message, envelope, routingKey: "", redelivered: false,
retryCount: 0, replyToFallback: null,
scope.GetRequiredService<IPublisher>(), scope.GetRequiredService<ISendEndpointProvider>(), ct);
var pipeline = ConsumePipelineFactory.Build<TMessage>(scope, c => consumer.ConsumeAsync(c, c.CancellationToken));
await pipeline(context);
};
}
Let MessageExecutionRegistry<THandler> assemble the per-message-type plans from the configured
queues — it handles URN keying, polymorphic fan-out, deduplication, request-consumer uniqueness,
and partition attachment:
var registry = new MessageExecutionRegistry<Dispatch>(provider, new MyHandlerFactory());
foreach (var queue in provider.QueueDefinitions)
{
registry.RegisterQueue(queue);
}
2. Produce
Wrap each outgoing message in a MessageEnvelope. MessageEnvelopeFactory.Create promotes the
publish context's metadata to typed envelope fields and serializes the payload:
var envelope = MessageEnvelopeFactory.Create(
message, publishContext, messageId, correlationId, urn, provider.JsonSerializerOptions);
var body = JsonSerializer.SerializeToUtf8Bytes(envelope, provider.JsonSerializerOptions);
PublishContext/RequestContext implement the IPublishContext/IRequestContext the caller's
configure callback writes to; read their resolved properties (RoutingKey, CorrelationId,
Headers, …) when building the broker message.
3. Consume
In the receive loop, parse the envelope, resolve the plan by URN, deserialize the payload, then run the plan's handlers:
var envelope = JsonSerializer.Deserialize<MessageEnvelope>(body, provider.JsonSerializerOptions)!;
var plan = registry.GetPlanByUrn(envelope.MessageType);
if (plan is null) { return; } // unknown type — drop or dead-letter
var message = envelope.Message.Deserialize(plan.MessageType.Type, provider.JsonSerializerOptions)!;
await using var scope = scopeFactory.CreateAsyncScope();
foreach (var dispatch in plan.Handlers)
{
await dispatch(scope.ServiceProvider, message, envelope, ct);
}
When plan.IsPartitioned, serialize same-key deliveries through plan.Partition so per-key order
is preserved (the RabbitMQ transport lanes deliveries through a Partitioner). The
MessageEnvelope also carries metadata for the bare-JSON fallback — resolve unknown types via
provider.GetMessageType(urn) / registry.GetPlan(typeName).
4. RPC replies
A request consumer replies with a MessageEnvelope: the TResponse payload at the response
type's URN on success, or an RpcFault at RpcFault.UrnUri on failure. Keeping the envelope and
RpcFault shapes identical across transports means Vulthil clients interoperate without a
transport-specific reply contract:
var fault = new RpcFault
{
Message = ex.Message,
ExceptionType = ex.GetType().FullName!,
StackTrace = ex.StackTrace,
FaultedAt = DateTimeOffset.UtcNow,
};
var reply = new MessageEnvelope
{
MessageType = RpcFault.UrnUri,
Message = JsonSerializer.SerializeToElement(fault, provider.JsonSerializerOptions),
RequestId = request.RequestId,
};
Testing Messaging
Vulthil.Messaging.TestHarness provides an in-memory transport that runs your consumers with no broker and
captures produced and consumed messages for assertion. Compose it with UseTestHarness() (in place of a broker
transport) or swap an existing transport in an integration test with ReplaceTransportWithTestHarness():
builder.AddMessaging(messaging =>
{
messaging.ConfigureQueue("orders", q => q.AddConsumer<OrderCreatedConsumer>());
messaging.UseTestHarness();
});
// ...after building the host and publishing:
var harness = host.Services.GetRequiredService<ITestHarness>();
harness.Published<OrderCreatedEvent>().ShouldHaveSingleItem().Message.OrderId.ShouldBe(expectedOrderId);
harness.Consumed<OrderCreatedEvent>().ShouldHaveSingleItem();
The harness is built entirely on the Vulthil.Messaging.Transport SDK above, so it is also a worked example of a
custom transport. See Testing for the full API (Published/Sent/
Consumed/Requested, the Respond/Handle response stubs, and the integration-test swap).