Messaging
The messaging packages provide a transport-agnostic abstraction for asynchronous communication between services, with first-class support for RabbitMQ.
Package Responsibilities
| Package | Role |
|---|---|
Vulthil.Messaging.Abstractions |
Consumer and publisher interfaces – reference this from domain/application projects |
Vulthil.Messaging |
Queue registration, consumer wiring, and hosted service orchestration |
Vulthil.Messaging.RabbitMq |
RabbitMQ transport implementation |
Vulthil.Messaging.TestHarness |
In-memory transport for integration tests |
Defining Consumers
One-way consumer
public sealed class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
public Task ConsumeAsync(
IMessageContext<OrderCreatedEvent> messageContext,
CancellationToken cancellationToken)
{
var order = messageContext.Message;
// Process the event
return Task.CompletedTask;
}
}
Request/reply consumer
public sealed class GetOrderConsumer : IRequestConsumer<GetOrderRequest, OrderDto>
{
public Task<OrderDto> ConsumeAsync(
IMessageContext<GetOrderRequest> messageContext,
CancellationToken cancellationToken)
{
return Task.FromResult(new OrderDto());
}
}
Registering Queues and Consumers
Registration happens in the composition root using the AddMessaging builder:
builder.AddMessaging(messaging =>
{
messaging.UseRabbitMq();
messaging.AddQueue("order-events", queue =>
{
queue.AddConsumer<OrderCreatedConsumer>();
queue.UseRetry(retry =>
{
retry.Exponential(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30));
retry.UseJitter(0.2);
});
queue.UseDeadLetterQueue();
});
messaging.AddQueue("order-requests", queue =>
{
queue.AddRequestConsumer<GetOrderConsumer>();
});
});
Publishing Messages
Inject IPublisher to send one-way messages, or IRequester for request/reply:
public sealed class PlaceOrderHandler(IPublisher publisher)
{
public async Task HandleAsync(PlaceOrderCommand command, CancellationToken ct)
{
// ... create order ...
await publisher.PublishAsync(new OrderCreatedEvent(order.Id), ct);
}
}
Routing Keys
Routing keys control which consumers receive a message on topic exchanges.
Attribute-based routing
[RoutingKey("order.created")]
public sealed class OrderCreatedConsumer : IConsumer<OrderCreatedEvent> { ... }
Dynamic routing keys
messaging.RegisterRoutingKeyFormatter<OrderCreatedEvent>(e => $"order.{e.Region}");
messaging.RegisterCorrelationIdFormatter<OrderCreatedEvent>(e => e.OrderId.ToString());
Queue Configuration
Queue settings can be tuned in code or bound from appsettings.json:
{
"Messaging": {
"Queues": {
"order-events": {
"PrefetchCount": 64,
"ChannelCount": 2,
"ConcurrencyLimit": 4
}
}
}
}
queue.ConfigureQueue(q =>
{
q.PrefetchCount = 32;
q.ExchangeType = MessagingExchangeType.Topic;
});
Testing Messaging
Vulthil.Messaging.TestHarness provides an in-memory transport that captures published messages for assertion:
var published = testHarness.Published<OrderCreatedEvent>();
Assert.Single(published);
Assert.Equal(expectedOrderId, published.First().Message.OrderId);
See Testing for more details on integration test setup.