Appearance
MassTransit 与 RabbitMQ 集成
概述
MassTransit 是一个开源的 .NET 消息传递框架,提供了简洁的 API 用于构建分布式应用程序。它支持多种消息传输协议,包括 RabbitMQ。MassTransit 提供了可靠的消息传递、分布式事务支持 sagas 模式等功能,是 .NET 生态中非常流行的消息框架。
本教程将详细介绍 MassTransit 与 RabbitMQ 的集成方式,包括配置、消息发送/接收、Saga 模式等,帮助 PHP 开发者理解如何与基于 MassTransit 的 .NET 服务进行交互。
集成架构设计
架构图
┌─────────────────────────────────────────────────────────────────────┐
│ MassTransit Application (.NET) │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ MassTransit Runtime │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Producer │ │ Consumer │ │ Saga │ │ │
│ │ │ │ │ │ │ Engine │ │ │
│ │ │ Publish() │ │ Consume() │ │ │ │ │
│ │ │ Send() │ │ Handle() │ │ Orchestrate│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ RabbitMQ Transport │ │ │
│ │ │ • Connection Management │ │ │
│ │ │ • Message Serialization │ │ │
│ │ │ • Topology Management │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ RabbitMQ Broker │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Exchanges │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ (MassTransit) │ │ (MassTransit) │ │ (Error) │ │ │
│ │ │ __MT_.* │ │ __MT_.* │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Queues │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Consumer │ │ Error │ │ Audit │ │ │
│ │ │ Queues │ │ Queues │ │ Queues │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
▲
│
┌─────────────────────────────────────────────────────────────────────┐
│ PHP Application │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ PHP Consumer │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ RabbitMQ │◄───│ Message │◄───│ Business │ │ │
│ │ │ Client │ │ Parser │ │ Logic │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘核心概念
| 概念 | 说明 |
|---|---|
| Message | 消息,MassTransit 中消息分为 Command(命令)和 Event(事件) |
| Consumer | 消费者,处理消息的服务类 |
| Producer | 生产者,发送消息的客户端 |
| Saga | 编排器,协调长时间运行的工作流 |
| ReceiveEndpoint | 接收端点,绑定消费者到队列 |
| Message Contract | 消息契约,消息的数据结构定义 |
MassTransit 配置示例
.NET 项目配置
csharp
// .csproj
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="8.1.3" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.1.3" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>
</Project>appsettings.json 配置
json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"MassTransit": "Debug"
}
},
"MassTransit": {
"Host": "localhost",
"Port": 5672,
"VirtualHost": "/",
"Username": "guest",
"Password": "guest",
"PrefetchCount": 16,
"ConcurrentConsumerLimit": 8
}
}MassTransit 启动配置
csharp
// Program.cs
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.PrefetchCount = 16;
cfg.ConcurrentMessageLimit = 8;
cfg.ConfigureEndpoints(context);
});
});
builder.Services.AddHostedService<Worker>();
var app = builder.Build();
app.Run();MassTransit 消费者配置
csharp
// OrderConsumer.cs
using MassTransit;
using Contracts;
public class OrderConsumer : IConsumer<OrderCreated>
{
public async Task Consume(ConsumeContext<OrderCreated> context)
{
var order = context.Message;
Console.WriteLine($"Received Order: {order.OrderId}");
// 处理订单逻辑
await context.Publish(new OrderProcessed
{
OrderId = order.OrderId,
ProcessedAt = DateTime.UtcNow,
Status = "Completed"
});
}
}
// 启动配置
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});MassTransit 生产者配置
csharp
// OrderService.cs
using MassTransit;
using Contracts;
public class OrderService
{
private readonly IPublishEndpoint _publishEndpoint;
private readonly ISendEndpointProvider _sendEndpointProvider;
public OrderService(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider)
{
_publishEndpoint = publishEndpoint;
_sendEndpointProvider = sendEndpointProvider;
}
public async Task CreateOrder(Order order)
{
// 发布事件
await _publishEndpoint.Publish(new OrderCreated
{
OrderId = order.Id,
CustomerId = order.CustomerId,
Amount = order.Amount,
CreatedAt = DateTime.UtcNow
});
}
public async Task SendOrderToProcessing(string orderId)
{
// 发送命令
var endpoint = await _sendEndpointProvider.GetSendEndpoint(
new Uri("queue:order-processing"));
await endpoint.Send(new ProcessOrder
{
OrderId = orderId,
ProcessAt = DateTime.UtcNow
});
}
}Saga 编排器配置
csharp
// OrderStateMachine.cs
using Automatonymous;
using Contracts;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public State Processing { get; private set; }
public State Completed { get; private set; }
public State Failed { get; private set; }
public Event<OrderSubmitted> OrderSubmitted { get; private set; }
public Event<OrderProcessingCompleted> ProcessingCompleted { get; private set; }
public Event<OrderProcessingFailed> ProcessingFailed { get; private set; }
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => OrderSubmitted, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => ProcessingCompleted, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => ProcessingFailed, x => x.CorrelateById(context => context.Message.OrderId));
Initially(
When(OrderSubmitted)
.Then(context =>
{
context.Instance.OrderId = context.Data.OrderId;
context.Instance.CustomerId = context.Data.CustomerId;
context.Instance.SubmittedAt = DateTime.UtcNow;
})
.TransitionTo(Submitted)
.Publish(context => new OrderProcessingStarted
{
OrderId = context.Instance.OrderId
})
);
During(Submitted,
When(ProcessingCompleted)
.TransitionTo(Completed)
.Publish(context => new OrderCompleted
{
OrderId = context.Instance.OrderId,
CompletedAt = DateTime.UtcNow
}),
When(ProcessingFailed)
.TransitionTo(Failed)
.Publish(context => new OrderFailed
{
OrderId = context.Instance.OrderId,
Reason = context.Data.Reason
})
);
}
}
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public string OrderId { get; set; }
public string CustomerId { get; set; }
public DateTime? SubmittedAt { get; set; }
public DateTime? CompletedAt { get; set; }
}PHP 与 MassTransit 集成
PHP 消息生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class MassTransitProducer
{
private $connection;
private $channel;
private const MT_EXCHANGE = 'MassTransit';
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
$this->setupTopology();
}
private function setupTopology(): void
{
$this->channel->exchange_declare(
self::MT_EXCHANGE,
'fanout',
false,
true,
false
);
$this->channel->exchange_declare(
'mt-event',
'topic',
false,
true,
false
);
}
public function publishEvent(string $eventType, array $data): bool
{
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => uuid_v4(),
'timestamp' => time(),
'headers' => new AMQPTable([
'Content-Type' => 'application/json',
'MT-MessageType' => $eventType,
'MT-ConversationId' => uuid_v4(),
'MT-CorrelationId' => $data['correlationId'] ?? uuid_v4(),
'MT-InitiatorId' => $data['initiatorId'] ?? '',
'MT-SentTime' => date('c'),
])
]
);
$this->channel->basic_publish(
$message,
self::MT_EXCHANGE,
''
);
return true;
}
public function sendCommand(string $queue, array $data): bool
{
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => uuid_v4(),
'timestamp' => time(),
'reply_to' => 'mt-response-' . $data['messageId'] ?? '',
'headers' => new AMQPTable([
'Content-Type' => 'application/json',
'MT-MessageType' => 'command',
'MT-ConversationId' => uuid_v4(),
'MT-CorrelationId' => $data['correlationId'] ?? uuid_v4(),
'MT-RequestId' => $data['messageId'] ?? uuid_v4(),
'MT-SentTime' => date('c'),
])
]
);
$this->channel->basic_publish(
$message,
'',
$queue
);
return true;
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
function uuid_v4(): string
{
return sprintf(
'%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0x0fff) | 0x4000,
mt_rand(0, 0x3fff) | 0x8000,
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff)
);
}
$producer = new MassTransitProducer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$producer->publishEvent('OrderCreated', [
'orderId' => 'ORD-' . time(),
'customerId' => 'CUST-001',
'amount' => 1500.00,
'createdAt' => date('c'),
'correlationId' => uuid_v4(),
'initiatorId' => 'user-001',
]);
echo "MassTransit 事件已发布\n";
$producer->sendCommand('order-processing', [
'orderId' => 'ORD-' . time(),
'messageId' => uuid_v4(),
'correlationId' => uuid_v4(),
'processAt' => date('c'),
]);
echo "MassTransit 命令已发送\n";
$producer->close();PHP 消息消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MassTransitConsumer
{
private $connection;
private $channel;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function setupQueue(string $queue, string $exchange, string $routingKey = '#'): void
{
$this->channel->exchange_declare($exchange, 'topic', false, true, false);
$this->channel->queue_declare($queue, false, true, false, false);
$this->channel->queue_bind($queue, $exchange, $routingKey);
}
public function consume(string $queue, callable $processor): void
{
$this->channel->basic_qos(0, 10, false);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function (AMQPMessage $message) use ($processor) {
$this->processMassTransitMessage($message, $processor);
}
);
echo "等待 MassTransit 消息...\n";
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function processMassTransitMessage(AMQPMessage $message, callable $processor): void
{
try {
$body = json_decode($message->getBody(), true);
$headers = $this->extractMassTransitHeaders($message);
echo "收到 MassTransit 消息:\n";
echo " Message Type: " . ($headers['MT-MessageType'] ?? 'N/A') . "\n";
echo " Correlation ID: " . ($headers['MT-CorrelationId'] ?? 'N/A') . "\n";
echo " Conversation ID: " . ($headers['MT-ConversationId'] ?? 'N/A') . "\n";
echo " Sent Time: " . ($headers['MT-SentTime'] ?? 'N/A') . "\n";
$result = $processor($body, $headers);
if ($result) {
$message->ack();
} else {
$message->nack(false, true);
}
} catch (Exception $e) {
echo "处理异常: " . $e->getMessage() . "\n";
$message->nack(false, false);
}
}
private function extractMassTransitHeaders(AMQPMessage $message): array
{
$headers = [];
if ($message->has('application_headers')) {
$appHeaders = $message->get('application_headers')->getNativeData();
foreach ($appHeaders as $key => $value) {
if (strpos($key, 'MT-') === 0) {
$headers[$key] = $value;
}
}
}
return $headers;
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
class MassTransitMessageHandler
{
public function handleOrderCreated(array $data, array $headers): bool
{
echo "处理订单创建事件:\n";
echo " 订单ID: " . ($data['orderId'] ?? 'N/A') . "\n";
echo " 客户ID: " . ($data['customerId'] ?? 'N/A') . "\n";
echo " 金额: " . ($data['amount'] ?? 0) . "\n";
return true;
}
public function handleOrderProcessed(array $data, array $headers): bool
{
echo "处理订单处理完成事件:\n";
echo " 订单ID: " . ($data['orderId'] ?? 'N/A') . "\n";
echo " 状态: " . ($data['status'] ?? 'N/A') . "\n";
return true;
}
}
$consumer = new MassTransitConsumer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$consumer->setupQueue('queue.php-consumer', 'MassTransit', 'mt-event');
$handler = new MassTransitMessageHandler();
$consumer->consume('queue.php-consumer', function (array $data, array $headers) use ($handler) {
$messageType = $headers['MT-MessageType'] ?? '';
switch ($messageType) {
case 'OrderCreated':
return $handler->handleOrderCreated($data, $headers);
case 'OrderProcessed':
return $handler->handleOrderProcessed($data, $headers);
default:
echo "未知消息类型: {$messageType}\n";
return true;
}
});
$consumer->close();实际应用场景
场景一:订单处理Saga
csharp
// MassTransit Saga 启动配置
builder.Services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.RabbitMqRepository();
x.AddConfigureEndpointsCallback((context, name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.BindMessageExchanges = true;
}
});
});
// PHP 客户端交互
$producer = new MassTransitProducer($config);
$producer->publishEvent('OrderSubmitted', [
'orderId' => 'ORD-' . time(),
'customerId' => 'CUST-001',
'amount' => 2500.00,
'correlationId' => uuid_v4(),
'initiatorId' => 'api-gateway',
'createdAt' => date('c'),
]);
echo "订单已提交到 Saga\n";场景二:请求-回复模式
csharp
// MassTransit 请求-响应
public class OrderStatusRequest
{
public string OrderId { get; set; }
}
public class OrderStatusResponse
{
public string OrderId { get; set; }
public string Status { get; set; }
public DateTime UpdatedAt { get; set; }
}
public class OrderStatusConsumer : IConsumer<OrderStatusRequest>
{
public async Task Consume(ConsumeContext<OrderStatusRequest> context)
{
var status = await GetOrderStatus(context.Message.OrderId);
await context.RespondAsync(new OrderStatusResponse
{
OrderId = context.Message.OrderId,
Status = status,
UpdatedAt = DateTime.UtcNow
});
}
}
// PHP 发送请求
class MassTransitRequestClient
{
private $connection;
private $channel;
public function request(string $queue, array $data): array
{
$replyQueue = 'mt-reply-' . uniqid();
$this->channel->queue_declare($replyQueue, false, true, false, false);
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'reply_to' => $replyQueue,
'message_id' => uuid_v4(),
'headers' => new AMQPTable([
'MT-MessageType' => 'request',
'MT-RequestId' => uuid_v4(),
'MT-ResponseType' => 'OrderStatusResponse',
])
]
);
$this->channel->basic_publish($message, '', $queue);
return $this->waitForResponse($replyQueue);
}
private function waitForResponse(string $queue): array
{
// 实现响应等待逻辑
}
}场景三:异常处理与重试
csharp
// MassTransit 异常处理配置
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.UseMessageRetry(r => r
.Interval(3, TimeSpan.FromSeconds(5))
.Handle<BusinessException>());
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
// 错误队列处理
public class ErrorConsumer : IConsumer<Fault<OrderCreated>>
{
public async Task Consume(ConsumeContext<Fault<OrderCreated>> context)
{
var fault = context.Message;
foreach (var exception in fault.Exceptions)
{
Console.WriteLine($"Error: {exception.Message}");
}
// 记录错误并告警
await Task.CompletedTask;
}
}常见问题与解决方案
问题一:消息序列化格式
MassTransit 使用 JSON 序列化消息,需要注意 PascalCase 命名。
解决方案:
php
$producer->publishEvent('OrderCreated', [
'OrderId' => 'ORD-' . time(),
'CustomerId' => 'CUST-001',
'Amount' => 1500.00,
'CreatedAt' => date('c'),
]);问题二:消息类型识别
MassTransit 通过消息头识别消息类型。
解决方案:
php
$headers = new AMQPTable([
'MT-MessageType' => 'OrderCreated, Contracts',
'MT-ContentType' => 'application/json',
]);问题三:响应队列管理
请求-回复模式需要管理临时响应队列。
解决方案:
php
$replyQueue = 'response-' . $correlationId;
$this->channel->queue_declare($replyQueue, false, true, true, false);最佳实践建议
1. 消息契约设计
csharp
// 命名空间约定
namespace Contracts.Events;
namespace Contracts.Commands;2. 消费者组配置
php
$headers = new AMQPTable([
'MT-ConsumerId' => 'php-consumer-group',
]);3. 消息追踪
php
$headers = new AMQPTable([
'MT-ActivityId' => $activityId,
'MT-CorrelationId' => $correlationId,
'MT-ConversationId' => $conversationId,
]);版本兼容性
| MassTransit | .NET | RabbitMQ Client | RabbitMQ Server |
|---|---|---|---|
| 8.1.x | .NET 8 | 6.x | 3.11+ |
| 8.0.x | .NET 8 | 6.x | 3.10+ |
| 7.3.x | .NET 6/7 | 6.x | 3.9+ |
| 7.2.x | .NET 6 | 6.x | 3.8+ |
| 7.1.x | .NET 5 | 6.x | 3.8+ |
