
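MassTransit and RabbitMQ Integration

Overview

MassTransit is an open-source messaging framework for .NET that provides a concise API for building distributed applications. It supports several message transports, including RabbitMQ. MassTransit offers reliable message delivery and saga support for coordinating distributed, long-running transactions, and it is one of the most popular messaging frameworks in the .NET ecosystem.

This tutorial walks through how MassTransit integrates with RabbitMQ, covering configuration, sending and receiving messages, and the saga pattern, so that PHP developers can understand how to interact with .NET services built on MassTransit.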

Integration Architecture

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                     MassTransit Application (.NET)                   │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                      MassTransit Runtime                      │    │
│  │                                                              │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   Producer  │    │   Consumer  │    │    Saga     │     │    │
│  │  │             │    │             │    │   Engine    │     │    │
│  │  │  Publish()  │    │  Consume()  │    │             │     │    │
│  │  │  Send()     │    │  Handle()   │    │  Orchestrate│     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  │                                                              │    │
│  │  ┌─────────────────────────────────────────────────────┐    │    │
│  │  │              RabbitMQ Transport                       │    │    │
│  │  │  • Connection Management                             │    │    │
│  │  │  • Message Serialization                            │    │    │
│  │  │  • Topology Management                               │    │    │
│  │  └─────────────────────────────────────────────────────┘    │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────┐
│                    RabbitMQ Broker                                  │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Exchanges                                │    │
│  │    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │    │
│  │    │(MassTransit)│    │(MassTransit)│    │   (Error)   │  │    │
│  │    │   __MT_.*   │    │   __MT_.*   │    │             │  │    │
│  │    └─────────────┘    └─────────────┘    └─────────────┘  │    │
│  └─────────────────────────────────────────────────────────────┘    │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Queues                                    │    │
│  │    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │    │
│  │    │  Consumer   │    │  Error      │    │   Audit     │  │    │
│  │    │  Queues     │    │  Queues     │    │   Queues    │  │    │
│  │    └─────────────┘    └─────────────┘    └─────────────┘  │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────────────┐
│                  PHP Application                                     │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    PHP Consumer                               │    │
│  │                                                              │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   RabbitMQ  │◄───│   Message   │◄───│   Business  │     │    │
│  │  │   Client    │    │   Parser    │    │   Logic     │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

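Core Concepts

| Concept | Description |
| --- | --- |
| Message | A message. MassTransit distinguishes between commands and events |
| Consumer | A service class that handles messages |
| Producer | A client that sends messages |
| Saga | An orchestrator that coordinates long-running workflows |
| ReceiveEndpoint | A receive endpoint, which binds consumers to a queue |
| Message Contract | The definition of a message's data structure |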
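
The difference between a command and an event shows up in how each one is routed. The sketch below follows the convention used by the PHP producer later in this tutorial: an event is published to an exchange with an empty routing key, while a command goes through the default exchange straight to a named queue. `routingTarget` is a hypothetical helper introduced only for illustration.

```php
<?php
// Hypothetical helper: decide where a message goes based on its kind.
function routingTarget(string $kind, string $name): array
{
    switch ($kind) {
        case 'event':
            // Events are published to an exchange; every bound queue receives a copy.
            return ['exchange' => $name, 'routing_key' => ''];
        case 'command':
            // Commands are sent to exactly one queue through the default ('') exchange.
            return ['exchange' => '', 'routing_key' => $name];
        default:
            throw new InvalidArgumentException("Unknown message kind: {$kind}");
    }
}

$eventTarget = routingTarget('event', 'MassTransit');
$commandTarget = routingTarget('command', 'order-processing');
echo json_encode([$eventTarget, $commandTarget]), "\n";
```

The two return values map directly onto the second and third arguments of `basic_publish`.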

MassTransit Configuration Examples

.NET Project Configuration

xml
<!-- .csproj -->
<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="MassTransit" Version="8.1.3" />
    <PackageReference Include="MassTransit.RabbitMQ" Version="8.1.3" />
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
  </ItemGroup>

</Project>

appsettings.json Configuration

json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "MassTransit": "Debug"
    }
  },
  "MassTransit": {
    "Host": "localhost",
    "Port": 5672,
    "VirtualHost": "/",
    "Username": "guest",
    "Password": "guest",
    "PrefetchCount": 16,
    "ConcurrentConsumerLimit": 8
  }
}
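
A PHP service that talks to the same broker can reuse these connection settings instead of duplicating them. The following is a minimal sketch, assuming the JSON layout shown above; `connectionConfigFromAppSettings` is a hypothetical helper, and the returned keys match the `$config` array that the PHP classes in this tutorial accept.

```php
<?php
// Hypothetical helper: map the "MassTransit" section of appsettings.json
// to the option names used by the PHP producer and consumer.
function connectionConfigFromAppSettings(string $json): array
{
    $settings = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    $mt = $settings['MassTransit'] ?? [];

    return [
        'host'     => $mt['Host'] ?? 'localhost',
        'port'     => (int) ($mt['Port'] ?? 5672),
        'user'     => $mt['Username'] ?? 'guest',
        'password' => $mt['Password'] ?? 'guest',
        'vhost'    => $mt['VirtualHost'] ?? '/',
    ];
}

$config = connectionConfigFromAppSettings(
    '{"MassTransit":{"Host":"rabbit.local","Port":5672,"VirtualHost":"/","Username":"guest","Password":"guest"}}'
);
echo $config['host'], ':', $config['port'], "\n";
```

Missing keys fall back to the same defaults the PHP classes use, so an empty document still yields a usable local configuration.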

MassTransit Startup Configuration

csharp
// Program.cs
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        
        cfg.PrefetchCount = 16;
        cfg.ConcurrentMessageLimit = 8;
        
        cfg.ConfigureEndpoints(context);
    });
});

// MassTransit 8 registers its own hosted service to start and stop the bus,
// so no additional worker needs to be registered here.

var app = builder.Build();
app.Run();

MassTransit Consumer Configuration

csharp
// OrderConsumer.cs
using MassTransit;
using Contracts;

public class OrderConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var order = context.Message;
        
        Console.WriteLine($"Received Order: {order.OrderId}");
        
        // Order-processing logic goes here
        await context.Publish(new OrderProcessed
        {
            OrderId = order.OrderId,
            ProcessedAt = DateTime.UtcNow,
            Status = "Completed"
        });
    }
}

// Registration at startup
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderConsumer>();
    
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

MassTransit Producer Configuration

csharp
// OrderService.cs
using MassTransit;
using Contracts;

public class OrderService
{
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ISendEndpointProvider _sendEndpointProvider;

    public OrderService(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider)
    {
        _publishEndpoint = publishEndpoint;
        _sendEndpointProvider = sendEndpointProvider;
    }

    public async Task CreateOrder(Order order)
    {
        // Publish an event
        await _publishEndpoint.Publish(new OrderCreated
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Amount = order.Amount,
            CreatedAt = DateTime.UtcNow
        });
    }

    public async Task SendOrderToProcessing(string orderId)
    {
        // Send a command
        var endpoint = await _sendEndpointProvider.GetSendEndpoint(
            new Uri("queue:order-processing"));
        
        await endpoint.Send(new ProcessOrder
        {
            OrderId = orderId,
            ProcessAt = DateTime.UtcNow
        });
    }
}

Saga Orchestrator Configuration

csharp
// OrderStateMachine.cs
// In MassTransit 8 the state machine types live in the MassTransit namespace
using MassTransit;
using Contracts;

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Processing { get; private set; }
    public State Completed { get; private set; }
    public State Failed { get; private set; }

    public Event<OrderSubmitted> OrderSubmitted { get; private set; }
    public Event<OrderProcessingCompleted> ProcessingCompleted { get; private set; }
    public Event<OrderProcessingFailed> ProcessingFailed { get; private set; }

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        // OrderId is a string, so correlate on the saga property; CorrelateById expects a Guid.
        // The initiating event also selects a new CorrelationId for the saga instance.
        Event(() => OrderSubmitted, x =>
        {
            x.CorrelateBy((saga, context) => saga.OrderId == context.Message.OrderId);
            x.SelectId(context => NewId.NextGuid());
        });
        Event(() => ProcessingCompleted, x => x.CorrelateBy((saga, context) => saga.OrderId == context.Message.OrderId));
        Event(() => ProcessingFailed, x => x.CorrelateBy((saga, context) => saga.OrderId == context.Message.OrderId));

        Initially(
            When(OrderSubmitted)
                .Then(context =>
                {
                    // MassTransit 8 exposes the instance as Saga and the event payload as Message
                    context.Saga.OrderId = context.Message.OrderId;
                    context.Saga.CustomerId = context.Message.CustomerId;
                    context.Saga.SubmittedAt = DateTime.UtcNow;
                })
                .TransitionTo(Submitted)
                .Publish(context => new OrderProcessingStarted
                {
                    OrderId = context.Saga.OrderId
                })
        );

        During(Submitted,
            When(ProcessingCompleted)
                .TransitionTo(Completed)
                .Publish(context => new OrderCompleted
                {
                    OrderId = context.Saga.OrderId,
                    CompletedAt = DateTime.UtcNow
                }),
            When(ProcessingFailed)
                .TransitionTo(Failed)
                .Publish(context => new OrderFailed
                {
                    OrderId = context.Saga.OrderId,
                    Reason = context.Message.Reason
                })
        );
    }
}

public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public string OrderId { get; set; }
    public string CustomerId { get; set; }
    public DateTime? SubmittedAt { get; set; }
    public DateTime? CompletedAt { get; set; }
}

PHP Integration with MassTransit

PHP Message Producer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class MassTransitProducer
{
    private $connection;
    private $channel;
    
    private const MT_EXCHANGE = 'MassTransit';
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->setupTopology();
    }
    
    private function setupTopology(): void
    {
        $this->channel->exchange_declare(
            self::MT_EXCHANGE,
            'fanout',
            false,
            true,
            false
        );
        
        $this->channel->exchange_declare(
            'mt-event',
            'topic',
            false,
            true,
            false
        );
    }
    
    public function publishEvent(string $eventType, array $data): bool
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => uuid_v4(),
                'timestamp' => time(),
                'headers' => new AMQPTable([
                    'Content-Type' => 'application/json',
                    'MT-MessageType' => $eventType,
                    'MT-ConversationId' => uuid_v4(),
                    'MT-CorrelationId' => $data['correlationId'] ?? uuid_v4(),
                    'MT-InitiatorId' => $data['initiatorId'] ?? '',
                    'MT-SentTime' => date('c'),
                ])
            ]
        );
        
        $this->channel->basic_publish(
            $message,
            self::MT_EXCHANGE,
            ''
        );
        
        return true;
    }
    
    public function sendCommand(string $queue, array $data): bool
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => uuid_v4(),
                'timestamp' => time(),
                // Parentheses are required: '.' binds tighter than '??'
                'reply_to' => 'mt-response-' . ($data['messageId'] ?? ''),
                'headers' => new AMQPTable([
                    'Content-Type' => 'application/json',
                    'MT-MessageType' => 'command',
                    'MT-ConversationId' => uuid_v4(),
                    'MT-CorrelationId' => $data['correlationId'] ?? uuid_v4(),
                    'MT-RequestId' => $data['messageId'] ?? uuid_v4(),
                    'MT-SentTime' => date('c'),
                ])
            ]
        );
        
        $this->channel->basic_publish(
            $message,
            '',
            $queue
        );
        
        return true;
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

function uuid_v4(): string
{
    // random_bytes() is cryptographically secure; mt_rand() output is predictable
    $bytes = random_bytes(16);
    $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40); // version 4
    $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80); // RFC 4122 variant
    
    return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
}

$producer = new MassTransitProducer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$producer->publishEvent('OrderCreated', [
    'orderId' => 'ORD-' . time(),
    'customerId' => 'CUST-001',
    'amount' => 1500.00,
    'createdAt' => date('c'),
    'correlationId' => uuid_v4(),
    'initiatorId' => 'user-001',
]);

echo "MassTransit event published\n";

$producer->sendCommand('order-processing', [
    'orderId' => 'ORD-' . time(),
    'messageId' => uuid_v4(),
    'correlationId' => uuid_v4(),
    'processAt' => date('c'),
]);

echo "MassTransit command sent\n";

$producer->close();

PHP Message Consumer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MassTransitConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
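    public function setupQueue(string $queue, string $exchange, string $routingKey = '#', string $exchangeType = 'fanout'): void
    {
        // The type must match the existing declaration. The producer declares 'MassTransit'
        // as fanout; redeclaring it as topic would fail with PRECONDITION_FAILED.
        $this->channel->exchange_declare($exchange, $exchangeType, false, true, false);
        
        $this->channel->queue_declare($queue, false, true, false, false);
        $this->channel->queue_bind($queue, $exchange, $routingKey);
    }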
    
    public function consume(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(0, 10, false);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($processor) {
                $this->processMassTransitMessage($message, $processor);
            }
        );
        
        echo "Waiting for MassTransit messages...\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function processMassTransitMessage(AMQPMessage $message, callable $processor): void
    {
        try {
            // JSON_THROW_ON_ERROR turns a malformed payload into an exception instead of a silent null
            $body = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);
            $headers = $this->extractMassTransitHeaders($message);
            
            echo "Received MassTransit message:\n";
            echo "  Message Type: " . ($headers['MT-MessageType'] ?? 'N/A') . "\n";
            echo "  Correlation ID: " . ($headers['MT-CorrelationId'] ?? 'N/A') . "\n";
            echo "  Conversation ID: " . ($headers['MT-ConversationId'] ?? 'N/A') . "\n";
            echo "  Sent Time: " . ($headers['MT-SentTime'] ?? 'N/A') . "\n";
            
            $result = $processor($body, $headers);
            
            if ($result) {
                $message->ack();
            } else {
                // Handler declined the message: put it back on the queue
                $message->nack(false, true);
            }
        } catch (\Throwable $e) {
            // \Throwable also covers TypeError (e.g. a non-array body); reject without requeue
            echo "Processing error: " . $e->getMessage() . "\n";
            $message->nack(false, false);
        }
    }
    
    private function extractMassTransitHeaders(AMQPMessage $message): array
    {
        $headers = [];
        
        if ($message->has('application_headers')) {
            $appHeaders = $message->get('application_headers')->getNativeData();
            
            foreach ($appHeaders as $key => $value) {
                if (strpos($key, 'MT-') === 0) {
                    $headers[$key] = $value;
                }
            }
        }
        
        return $headers;
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

class MassTransitMessageHandler
{
    public function handleOrderCreated(array $data, array $headers): bool
    {
        echo "Handling order-created event:\n";
        echo "  Order ID: " . ($data['orderId'] ?? 'N/A') . "\n";
        echo "  Customer ID: " . ($data['customerId'] ?? 'N/A') . "\n";
        echo "  Amount: " . ($data['amount'] ?? 0) . "\n";
        
        return true;
    }
    
    public function handleOrderProcessed(array $data, array $headers): bool
    {
        echo "Handling order-processed event:\n";
        echo "  Order ID: " . ($data['orderId'] ?? 'N/A') . "\n";
        echo "  Status: " . ($data['status'] ?? 'N/A') . "\n";
        
        return true;
    }
}

$consumer = new MassTransitConsumer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$consumer->setupQueue('queue.php-consumer', 'MassTransit', 'mt-event');

$handler = new MassTransitMessageHandler();

$consumer->consume('queue.php-consumer', function (array $data, array $headers) use ($handler) {
    $messageType = $headers['MT-MessageType'] ?? '';
    
    switch ($messageType) {
        case 'OrderCreated':
            return $handler->handleOrderCreated($data, $headers);
        case 'OrderProcessed':
            return $handler->handleOrderProcessed($data, $headers);
        default:
            echo "Unknown message type: {$messageType}\n";
            return true;
    }
});

$consumer->close();

Real-World Scenarios

Scenario 1: Order-Processing Saga

csharp
// MassTransit saga registration
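builder.Services.AddMassTransit(x =>
{
    // RabbitMQ is a transport, not a saga store. The in-memory repository is suitable
    // for local testing only; use a durable repository (for example Entity Framework,
    // MongoDB, or Redis) in production.
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();
    
    x.AddConfigureEndpointsCallback((context, name, cfg) =>
    {
        if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
        {
            // Bind the endpoint queue to the exchanges of the message types it consumes
            rmq.ConfigureConsumeTopology = true;
        }
    });
    
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});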

// PHP client interaction
$producer = new MassTransitProducer($config);

$producer->publishEvent('OrderSubmitted', [
    'orderId' => 'ORD-' . time(),
    'customerId' => 'CUST-001',
    'amount' => 2500.00,
    'correlationId' => uuid_v4(),
    'initiatorId' => 'api-gateway',
    'createdAt' => date('c'),
]);

echo "Order submitted to the saga\n";

Scenario 2: Request-Reply Pattern

csharp
// MassTransit request-response
public class OrderStatusRequest
{
    public string OrderId { get; set; }
}

public class OrderStatusResponse
{
    public string OrderId { get; set; }
    public string Status { get; set; }
    public DateTime UpdatedAt { get; set; }
}

public class OrderStatusConsumer : IConsumer<OrderStatusRequest>
{
    public async Task Consume(ConsumeContext<OrderStatusRequest> context)
    {
        var status = await GetOrderStatus(context.Message.OrderId);
        
        await context.RespondAsync(new OrderStatusResponse
        {
            OrderId = context.Message.OrderId,
            Status = status,
            UpdatedAt = DateTime.UtcNow
        });
    }
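
    // Placeholder lookup (hypothetical); replace with a real data-store query
    private Task<string> GetOrderStatus(string orderId) => Task.FromResult("Processing");
}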

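// PHP request client
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class MassTransitRequestClient
{
    private $channel;
    
    public function __construct($channel)
    {
        // An open php-amqplib channel, e.g. $connection->channel()
        $this->channel = $channel;
    }
    
    public function request(string $queue, array $data, float $timeout = 10.0): array
    {
        $replyQueue = 'mt-reply-' . uniqid();
        
        // Exclusive queue: the broker removes it when this connection closes
        $this->channel->queue_declare($replyQueue, false, true, true, false);
        
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'reply_to' => $replyQueue,
                'message_id' => uuid_v4(),
                'headers' => new AMQPTable([
                    'MT-MessageType' => 'request',
                    'MT-RequestId' => uuid_v4(),
                    'MT-ResponseType' => 'OrderStatusResponse',
                ])
            ]
        );
        
        $this->channel->basic_publish($message, '', $queue);
        
        return $this->waitForResponse($replyQueue, $timeout);
    }
    
    private function waitForResponse(string $queue, float $timeout): array
    {
        $response = null;
        
        // no_ack = true: the reply is consumed once and needs no acknowledgement
        $this->channel->basic_consume($queue, '', false, true, true, false,
            function (AMQPMessage $reply) use (&$response) {
                $response = json_decode($reply->getBody(), true, 512, JSON_THROW_ON_ERROR);
            }
        );
        
        // wait() throws AMQPTimeoutException if no reply arrives within $timeout seconds
        while ($response === null) {
            $this->channel->wait(null, false, $timeout);
        }
        
        return $response;
    }
}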

Scenario 3: Error Handling and Retry

csharp
// MassTransit error-handling configuration
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderConsumer>();
    
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.UseMessageRetry(r => r
            .Interval(3, TimeSpan.FromSeconds(5))
            .Handle<BusinessException>());
        
        // MassTransit 8.1 expects the registration context here
        cfg.UseInMemoryOutbox(context);
        
        cfg.ConfigureEndpoints(context);
    });
});

// Handling faults
public class ErrorConsumer : IConsumer<Fault<OrderCreated>>
{
    public async Task Consume(ConsumeContext<Fault<OrderCreated>> context)
    {
        var fault = context.Message;
        
        foreach (var exception in fault.Exceptions)
        {
            Console.WriteLine($"Error: {exception.Message}");
        }
        
        // Log the error and raise an alert
        await Task.CompletedTask;
    }
}
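
The `Interval(3, TimeSpan.FromSeconds(5))` policy retries a failed message at a fixed interval, and only for the exception type passed to `Handle`. A PHP consumer gets no such policy from php-amqplib, so the following is a minimal sketch of the same idea in plain PHP. `retryWithInterval` and its parameters are hypothetical names, not part of any library.

```php
<?php
// Hypothetical helper: run $operation, retrying up to $retries more times with a
// fixed pause, but only when the failure is an instance of $retryOn.
function retryWithInterval(callable $operation, int $retries, float $intervalSeconds, string $retryOn = \RuntimeException::class)
{
    for ($attempt = 0; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (\Throwable $e) {
            // Give up on unrelated exception types or once the retries are used up
            if (!($e instanceof $retryOn) || $attempt >= $retries) {
                throw $e;
            }
            usleep((int) ($intervalSeconds * 1000000));
        }
    }
}

// Usage: the operation fails twice with a transient error, then succeeds.
$calls = 0;
$result = retryWithInterval(function () use (&$calls) {
    $calls++;
    if ($calls < 3) {
        throw new \RuntimeException('transient failure');
    }
    return 'ok';
}, 3, 0.01);

echo "{$result} after {$calls} calls\n";
```

When the retries are exhausted the last exception propagates, which is the point at which a consumer would reject the message without requeueing it.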

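Common Problems and Solutions

Problem 1: Message Serialization Format

MassTransit serializes messages as JSON. The keys in a PHP payload have to line up with the property names of the .NET message contract, which conventionally use PascalCase (for example `OrderId` and `CustomerId`).

Solution: use the contract's property names as the payload keys.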

php
$producer->publishEvent('OrderCreated', [
    'OrderId' => 'ORD-' . time(),
    'CustomerId' => 'CUST-001',
    'Amount' => 1500.00,
    'CreatedAt' => date('c'),
]);

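Problem 2: Message Type Identification

MassTransit has to know the message type of every incoming message. The examples in this tutorial carry it in the `MT-MessageType` header. MassTransit names message types with URNs of the form `urn:message:Namespace:TypeName`, not with .NET assembly-qualified names.

Solution:

php
$headers = new AMQPTable([
    'MT-MessageType' => 'urn:message:Contracts:OrderCreated',
    'MT-ContentType' => 'application/json',
]);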
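
When the receiving endpoint uses MassTransit's default JSON envelope (content type `application/vnd.masstransit+json`) rather than raw JSON, the type information travels inside the body instead of the headers. The sketch below builds such an envelope in plain PHP. It covers only a handful of envelope fields, and `buildEnvelope` is a hypothetical helper, so treat it as a starting point to verify against a message captured from a real MassTransit service.

```php
<?php
// Hypothetical helper: wrap a payload in a minimal MassTransit-style envelope.
function buildEnvelope(string $namespace, string $type, array $payload, string $messageId, string $conversationId): array
{
    return [
        'messageId'      => $messageId,
        'conversationId' => $conversationId,
        // Message types are URNs; an envelope may list several (e.g. implemented interfaces)
        'messageType'    => ["urn:message:{$namespace}:{$type}"],
        // The actual message contract data
        'message'        => $payload,
        'sentTime'       => gmdate('Y-m-d\TH:i:s\Z'),
    ];
}

$envelope = buildEnvelope('Contracts', 'OrderCreated', ['orderId' => 'ORD-1'], 'm-1', 'c-1');
echo json_encode($envelope, JSON_UNESCAPED_SLASHES), "\n";
```

The encoded string would be used as the AMQP message body, with the message's `content_type` property set to the envelope content type named above.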

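Problem 3: Reply Queue Management

The request-reply pattern requires managing temporary reply queues.

Solution:

php
// Arguments: passive = false, durable = true, exclusive = true, auto_delete = false.
// An exclusive queue is removed by the broker when the declaring connection closes.
$replyQueue = 'response-' . $correlationId;
$this->channel->queue_declare($replyQueue, false, true, true, false);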

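Best Practices

1. Message Contract Design

csharp
// Namespace conventions (each file-scoped namespace goes in its own file)
namespace Contracts.Events;
namespace Contracts.Commands;

2. Consumer Group Configuration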

php
$headers = new AMQPTable([
    'MT-ConsumerId' => 'php-consumer-group',
]);

3. Message Tracing

php
$headers = new AMQPTable([
    'MT-ActivityId' => $activityId,
    'MT-CorrelationId' => $correlationId,
    'MT-ConversationId' => $conversationId,
]);
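
Tracing only works if these identifiers are carried forward when one message causes another. The following is a minimal sketch of that propagation, assuming the header names used throughout this tutorial: the conversation ID is copied unchanged, and the incoming correlation ID becomes the initiator of the outgoing message. `propagateTracingHeaders` is a hypothetical helper.

```php
<?php
// Hypothetical helper: derive the tracing headers of an outgoing message
// from the headers of the message currently being handled.
function propagateTracingHeaders(array $incoming, string $newCorrelationId): array
{
    return [
        // The conversation stays the same across the whole chain of messages
        'MT-ConversationId' => $incoming['MT-ConversationId'] ?? $newCorrelationId,
        // The message that caused this one is recorded as its initiator
        'MT-InitiatorId'    => $incoming['MT-CorrelationId'] ?? '',
        'MT-CorrelationId'  => $newCorrelationId,
    ];
}

$outgoing = propagateTracingHeaders(
    ['MT-ConversationId' => 'conv-1', 'MT-CorrelationId' => 'corr-1'],
    'corr-2'
);
echo json_encode($outgoing), "\n";
```

The result can be passed to `new AMQPTable(...)` when building the outgoing message.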

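Version Compatibility

| MassTransit | .NET | RabbitMQ Client | RabbitMQ Server |
| --- | --- | --- | --- |
| 8.1.x | .NET 8 | 6.x | 3.11+ |
| 8.0.x | .NET 8 | 6.x | 3.10+ |
| 7.3.x | .NET 6/7 | 6.x | 3.9+ |
| 7.2.x | .NET 6 | 6.x | 3.8+ |
| 7.1.x | .NET 5 | 6.x | 3.8+ |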

Related Links