
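RabbitMQ and CI/CD integration

Overview

In modern software development, continuous integration and continuous deployment (CI/CD) are key practices for improving development efficiency and software quality. Integrating RabbitMQ into the CI/CD flow lets pipelines publish build, test, and deployment events for other systems to consume. This enables automated testing, deployment notifications, and build event propagation without the pipeline needing to know who is listening.

This tutorial covers integrating RabbitMQ with the mainstream CI/CD tools Jenkins, GitLab CI, and GitHub Actions.

Integration architecture design

Architecture diagram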

┌────────────────────────────────────────────────────────────────────┐
│             CI/CD + RabbitMQ Integration Architecture              │
│                                                                    │
│  ┌────────────────────────────────────────────────────────────┐    │
│  │                    Code repository layer                   │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   GitHub    │    │   GitLab    │    │   Bitbucket │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └────────────────────────────────────────────────────────────┘    │
│                              │                                     │
│                              ▼                                     │
│  ┌────────────────────────────────────────────────────────────┐    │
│  │                    CI/CD tool layer                        │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   Jenkins   │    │  GitLab CI  │    │  GitHub     │     │    │
│  │  │             │    │             │    │  Actions    │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └────────────────────────────────────────────────────────────┘    │
│                              │                                     │
│                              ▼                                     │
│  ┌────────────────────────────────────────────────────────────┐    │
│  │                    RabbitMQ                                │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   Build     │    │   Deploy    │    │   Notify    │     │    │
│  │  │   Events    │    │   Events    │    │   Events    │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └────────────────────────────────────────────────────────────┘    │
│                              │                                     │
│                              ▼                                     │
│  ┌────────────────────────────────────────────────────────────┐    │
│  │                    Consumer layer                          │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │   Notify    │    │ Monitoring  │    │    Audit    │     │    │
│  │  │   service   │    │   system    │    │     log     │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └────────────────────────────────────────────────────────────┘    │
└────────────────────────────────────────────────────────────────────┘

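Integration modes

| Mode | Description |
| --- | --- |
| Build Notification | Publishes build status events |
| Deploy Trigger | Triggers the deployment flow through a message |
| Test Result | Publishes test result events |
| Audit Trail | Records every CI/CD operation |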
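All four modes can share one message shape. The following is a minimal Python sketch of such an envelope, assuming the field names used by the pipelines in this tutorial (`eventType`, `data`, `timestamp`, `source`) and assuming the event type doubles as the routing key. The helper name `make_event` is hypothetical.

```python
import json
import time


def make_event(event_type, data, source):
    """Build an event envelope and return (routing_key, body).

    The event type doubles as the routing key; body is the JSON text
    that a publisher would send to the exchange.
    """
    if not event_type or " " in event_type:
        raise ValueError("event_type must be a non-empty dotted name")
    envelope = {
        "eventType": event_type,
        "data": data,
        "timestamp": int(time.time() * 1000),  # milliseconds since the epoch
        "source": source,
    }
    return event_type, json.dumps(envelope)


routing_key, body = make_event("build.completed", {"status": "success"}, "jenkins")
print(routing_key)                          # build.completed
print(json.loads(body)["data"]["status"])   # success
```

Keeping the envelope identical across Jenkins, GitLab CI, and GitHub Actions means a consumer only needs one parser, whichever tool produced the event.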

Jenkins integration

Jenkins pipeline configuration

groovy
// Jenkinsfile
pipeline {
    agent any
    
    environment {
        RABBITMQ_HOST = credentials('rabbitmq-host')
        RABBITMQ_USER = credentials('rabbitmq-user')
        RABBITMQ_PASSWORD = credentials('rabbitmq-password')
    }
    
    stages {
        stage('Checkout') {
            steps {
                checkout scm
                script {
                    sendRabbitMQEvent('build.started', [
                        buildId: env.BUILD_ID,
                        jobName: env.JOB_NAME,
                        branch: env.GIT_BRANCH,
                        commit: env.GIT_COMMIT,
                        timestamp: System.currentTimeMillis()
                    ])
                }
            }
        }
        
        stage('Build') {
            steps {
                sh 'make build'
            }
            post {
                success {
                    script {
                        sendRabbitMQEvent('build.completed', [
                            buildId: env.BUILD_ID,
                            status: 'success',
                            duration: currentBuild.duration
                        ])
                    }
                }
                failure {
                    script {
                        sendRabbitMQEvent('build.failed', [
                            buildId: env.BUILD_ID,
                            status: 'failed',
                            duration: currentBuild.duration
                        ])
                    }
                }
            }
        }
        
        stage('Test') {
            steps {
                sh 'make test'
            }
            post {
                always {
                    junit '**/test-results/*.xml'
                    script {
                        sendRabbitMQEvent('test.completed', [
                            buildId: env.BUILD_ID,
                            testResults: 'junit'
                        ])
                    }
                }
            }
        }
        
        stage('Deploy') {
            when {
                branch 'main'
            }
            steps {
                // Announce the deployment before it runs, not after.
                script {
                    sendRabbitMQEvent('deploy.started', [
                        buildId: env.BUILD_ID,
                        environment: 'production'
                    ])
                }
                sh 'make deploy'
            }
            post {
                success {
                    script {
                        sendRabbitMQEvent('deploy.completed', [
                            buildId: env.BUILD_ID,
                            status: 'success'
                        ])
                    }
                }
            }
        }
    }
    
    post {
        always {
            script {
                sendRabbitMQEvent('pipeline.completed', [
                    buildId: env.BUILD_ID,
                    jobName: env.JOB_NAME,
                    status: currentBuild.result ?: 'SUCCESS',
                    duration: currentBuild.duration
                ])
            }
        }
    }
}

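def sendRabbitMQEvent(String eventType, Map data) {
    def payload = [
        eventType: eventType,
        data: data,
        timestamp: System.currentTimeMillis(),
        source: 'jenkins'
    ]

    // Request body for the management API publish endpoint. The event itself
    // travels as a JSON-encoded string in the "payload" field.
    def body = groovy.json.JsonOutput.toJson([
        properties: [content_type: 'application/json', delivery_mode: 2],
        routing_key: eventType,
        payload: groovy.json.JsonOutput.toJson(payload),
        payload_encoding: 'string'
    ])

    // The script is single-quoted so the shell, not Groovy, expands the variables.
    // This keeps the credentials out of the interpolated command line.
    withEnv(["RABBITMQ_BODY=${body}"]) {
        sh '''
            curl --fail --silent --show-error \
                 -u "$RABBITMQ_USER:$RABBITMQ_PASSWORD" \
                 -X POST \
                 -H "Content-Type: application/json" \
                 --data-raw "$RABBITMQ_BODY" \
                 "http://$RABBITMQ_HOST:15672/api/exchanges/%2f/ci.events/publish"
        '''
    }
}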
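The management API's publish endpoint expects the message as a string inside a JSON request body, so the event is JSON-encoded twice. A small Python sketch of that nesting follows. The helper name `build_publish_body` is hypothetical; the body fields (`properties`, `routing_key`, `payload`, `payload_encoding`) are the ones the Jenkins example sends.

```python
import json


def build_publish_body(routing_key, event):
    """Return the JSON request body for the management API publish endpoint."""
    return json.dumps({
        "properties": {"content_type": "application/json", "delivery_mode": 2},
        "routing_key": routing_key,
        # The event is serialized first, then embedded as a plain string value.
        "payload": json.dumps(event),
        "payload_encoding": "string",
    })


event = {"eventType": "build.started", "data": {"buildId": "42"}, "source": "jenkins"}
body = build_publish_body("build.started", event)

decoded = json.loads(body)
print(type(decoded["payload"]).__name__)                   # str
print(json.loads(decoded["payload"])["data"]["buildId"])   # 42
```

Letting a JSON library do both encoding steps avoids hand-escaping the inner quotes, which is the usual source of malformed publish requests.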

Jenkins shared library

groovy
// vars/rabbitMQNotify.groovy
def call(String eventType, Map data) {
    def payload = [
        eventType: eventType,
        data: data,
        timestamp: System.currentTimeMillis(),
        build: [
            id: env.BUILD_ID,
            number: env.BUILD_NUMBER,
            url: env.BUILD_URL,
            jobName: env.JOB_NAME
        ],
        git: [
            branch: env.GIT_BRANCH,
            commit: env.GIT_COMMIT,
            url: env.GIT_URL
        ]
    ]

    try {
        // Pass the event through the environment and keep the script single-quoted.
        // Interpolating JSON (with its double quotes) or the password into the
        // shell command would break the quoting and expose the secret.
        withEnv([
            "EVENT_TYPE=${eventType}",
            "EVENT_PAYLOAD=${groovy.json.JsonOutput.toJson(payload)}"
        ]) {
            sh '''
python3 - <<'PY'
import os
import pika

env = os.environ
exchange = env.get('RABBITMQ_EXCHANGE', 'ci.events')

# Connection settings come from the job environment, with local defaults.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=env.get('RABBITMQ_HOST', 'localhost'),
        port=int(env.get('RABBITMQ_PORT', '5672')),
        credentials=pika.PlainCredentials(
            env.get('RABBITMQ_USER', 'guest'),
            env.get('RABBITMQ_PASSWORD', 'guest')
        )
    )
)
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
channel.basic_publish(
    exchange=exchange,
    routing_key=env['EVENT_TYPE'],
    body=env['EVENT_PAYLOAD'],
    properties=pika.BasicProperties(
        delivery_mode=2,
        content_type='application/json'
    )
)
connection.close()
PY
            '''
        }
        echo "RabbitMQ event sent: ${eventType}"
    } catch (Exception e) {
        // A failed notification should not fail the build.
        echo "Failed to send RabbitMQ event: ${e.message}"
    }
}

GitLab CI integration

GitLab CI configuration

yaml
# .gitlab-ci.yml
stages:
  - build
  - test
  - deploy

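variables:
  RABBITMQ_HOST: "localhost"   # replace with a broker host that the job can reach
  RABBITMQ_PORT: "5672"
  RABBITMQ_EXCHANGE: "ci.events"
  # Define RABBITMQ_USER and RABBITMQ_PASSWORD as masked CI/CD variables in the
  # project settings instead of committing them here. By default the guest
  # account only accepts connections from localhost.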

.before_script: &rabbitmq_setup
  - |
    # Requires python3 and the pika package in the job image.
    send_rabbitmq_event() {
      # Hand the arguments to Python through the environment so that quotes or
      # newlines in a commit message cannot break the JSON or the Python source.
      EVENT_TYPE="$1" EVENT_DATA="$2" python3 - <<'PY'
    import json
    import os
    from datetime import datetime, timezone

    import pika

    env = os.environ
    payload = {
        'eventType': env['EVENT_TYPE'],
        'data': json.loads(env['EVENT_DATA']),
        'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        'source': 'gitlab-ci',
        'build': {
            'id': env.get('CI_PIPELINE_ID'),
            'job': env.get('CI_JOB_NAME'),
            'stage': env.get('CI_JOB_STAGE'),
            'url': env.get('CI_PIPELINE_URL'),
        },
        'git': {
            'branch': env.get('CI_COMMIT_REF_NAME'),
            'commit': env.get('CI_COMMIT_SHA'),
            'message': env.get('CI_COMMIT_MESSAGE'),
            'author': env.get('CI_COMMIT_AUTHOR'),
        },
    }
    exchange = env.get('RABBITMQ_EXCHANGE', 'ci.events')

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=env.get('RABBITMQ_HOST', 'localhost'),
            port=int(env.get('RABBITMQ_PORT', '5672')),
            credentials=pika.PlainCredentials(
                env.get('RABBITMQ_USER', 'guest'),
                env.get('RABBITMQ_PASSWORD', 'guest'),
            ),
        )
    )
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
    channel.basic_publish(
        exchange=exchange,
        routing_key=env['EVENT_TYPE'],
        body=json.dumps(payload),
        properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'),
    )
    connection.close()
    PY
    }

build:
  stage: build
  before_script:
    - *rabbitmq_setup
  script:
    - send_rabbitmq_event 'build.started' '{"status": "running"}'
    - make build
    - send_rabbitmq_event 'build.completed' '{"status": "success"}'
  after_script:
    # after_script runs in a separate shell, so the helper must be defined again.
    - *rabbitmq_setup
    - |
      if [ "$CI_JOB_STATUS" = "failed" ]; then
        send_rabbitmq_event 'build.failed' '{"status": "failed", "error": "Build failed"}'
      fi

test:
  stage: test
  before_script:
    - *rabbitmq_setup
  script:
    - send_rabbitmq_event 'test.started' '{"status": "running"}'
    - make test
    - send_rabbitmq_event 'test.completed' '{"status": "success"}'
  artifacts:
    reports:
      junit: test-results/*.xml

deploy:
  stage: deploy
  before_script:
    - *rabbitmq_setup
  script:
    - send_rabbitmq_event 'deploy.started' '{"environment": "production", "status": "running"}'
    - make deploy
    - send_rabbitmq_event 'deploy.completed' '{"environment": "production", "status": "success"}'
  environment:
    name: production
    url: https://example.com
  only:
    - main

GitHub Actions integration

GitHub Actions workflow

yaml
# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  # Hosted runners have no broker on localhost; point this at a reachable RabbitMQ host.
  RABBITMQ_HOST: localhost
  RABBITMQ_PORT: 5672
  RABBITMQ_EXCHANGE: ci.events

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install pika
      
      - name: Notify build started
        env:
          RABBITMQ_USER: ${{ secrets.RABBITMQ_USER }}
          RABBITMQ_PASSWORD: ${{ secrets.RABBITMQ_PASSWORD }}
          # Free-form text goes through the environment, not into the script source,
          # so quotes in a commit message cannot break or inject into the Python code.
          COMMIT_MESSAGE: ${{ github.event.head_commit.message }}
        run: |
          python3 << 'EOF'
          import json
          import os
          from datetime import datetime, timezone

          import pika

          payload = {
              'eventType': 'build.started',
              'data': {'status': 'running'},
              'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
              'source': 'github-actions',
              'build': {
                  'id': '${{ github.run_id }}',
                  'number': '${{ github.run_number }}',
                  'url': '${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}'
              },
              'git': {
                  'branch': os.environ.get('GITHUB_REF_NAME', ''),
                  'commit': '${{ github.sha }}',
                  'message': os.environ.get('COMMIT_MESSAGE', ''),
                  'author': '${{ github.actor }}'
              }
          }

          connection = pika.BlockingConnection(
              pika.ConnectionParameters(
                  host=os.environ.get('RABBITMQ_HOST', 'localhost'),
                  port=int(os.environ.get('RABBITMQ_PORT', '5672')),
                  credentials=pika.PlainCredentials(
                      os.environ.get('RABBITMQ_USER', 'guest'),
                      os.environ.get('RABBITMQ_PASSWORD', 'guest')
                  )
              )
          )
          channel = connection.channel()
          channel.exchange_declare(exchange='${{ env.RABBITMQ_EXCHANGE }}', exchange_type='topic', durable=True)
          channel.basic_publish(
              exchange='${{ env.RABBITMQ_EXCHANGE }}',
              routing_key='build.started',
              body=json.dumps(payload),
              properties=pika.BasicProperties(delivery_mode=2, content_type='application/json')
          )
          connection.close()
          EOF
      
      - name: Build
        run: make build
      
      - name: Notify build completed
        if: success()
        env:
          RABBITMQ_USER: ${{ secrets.RABBITMQ_USER }}
          RABBITMQ_PASSWORD: ${{ secrets.RABBITMQ_PASSWORD }}
        run: |
          python3 << 'EOF'
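          import pika
          import json
          import os
          from datetime import datetime, timezone
          
          payload = {
              'eventType': 'build.completed',
              'data': {'status': 'success'},
              # The heredoc is quoted, so the shell would not expand $(date ...) here.
              'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),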
              'source': 'github-actions',
              'build': {
                  'id': '${{ github.run_id }}',
                  'number': '${{ github.run_number }}'
              }
          }
          
          connection = pika.BlockingConnection(
              pika.ConnectionParameters(
                  host=os.environ.get('RABBITMQ_HOST', 'localhost'),
                  port=int(os.environ.get('RABBITMQ_PORT', '5672')),
                  credentials=pika.PlainCredentials(
                      os.environ.get('RABBITMQ_USER', 'guest'),
                      os.environ.get('RABBITMQ_PASSWORD', 'guest')
                  )
              )
          )
          channel = connection.channel()
          channel.exchange_declare(exchange='${{ env.RABBITMQ_EXCHANGE }}', exchange_type='topic', durable=True)
          channel.basic_publish(
              exchange='${{ env.RABBITMQ_EXCHANGE }}',
              routing_key='build.completed',
              body=json.dumps(payload),
              properties=pika.BasicProperties(delivery_mode=2, content_type='application/json')
          )
          connection.close()
          EOF

  test:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run tests
        run: make test
      - name: Publish test results
        uses: dorny/test-reporter@v1
        if: always()
        with:
          name: Test Results
          path: test-results/*.xml
          reporter: java-junit

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Deploy
        run: make deploy

GitHub Actions composite action

yaml
# .github/actions/rabbitmq-notify/action.yml
name: 'RabbitMQ Notify'
description: 'Send event notification to RabbitMQ'
inputs:
  event-type:
    description: 'Event type (routing key)'
    required: true
  data:
    description: 'Event data as JSON string'
    required: false
    default: '{}'
  rabbitmq-host:
    description: 'RabbitMQ host'
    required: false
    default: 'localhost'
  rabbitmq-port:
    description: 'RabbitMQ port'
    required: false
    default: '5672'
  rabbitmq-user:
    description: 'RabbitMQ username'
    required: false
    default: 'guest'
  rabbitmq-password:
    description: 'RabbitMQ password'
    required: false
    default: 'guest'
  rabbitmq-exchange:
    description: 'RabbitMQ exchange'
    required: false
    default: 'ci.events'

runs:
  using: 'composite'
  steps:
    - name: Send notification
      shell: bash
      # Inputs are passed through the environment instead of being pasted into
      # the script, so JSON quotes or a password containing ' cannot break it.
      # The runner must already have python3 and the pika package installed.
      env:
        EVENT_TYPE: ${{ inputs.event-type }}
        EVENT_DATA: ${{ inputs.data }}
        RABBITMQ_HOST: ${{ inputs.rabbitmq-host }}
        RABBITMQ_PORT: ${{ inputs.rabbitmq-port }}
        RABBITMQ_USER: ${{ inputs.rabbitmq-user }}
        RABBITMQ_PASSWORD: ${{ inputs.rabbitmq-password }}
        RABBITMQ_EXCHANGE: ${{ inputs.rabbitmq-exchange }}
        RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}
      run: |
        python3 << 'EOF'
        import json
        import os
        from datetime import datetime, timezone

        import pika

        env = os.environ
        payload = {
            'eventType': env['EVENT_TYPE'],
            'data': json.loads(env['EVENT_DATA']),
            'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'source': 'github-actions',
            'build': {
                'id': env.get('GITHUB_RUN_ID'),
                'number': env.get('GITHUB_RUN_NUMBER'),
                'url': env.get('RUN_URL'),
            },
            'git': {
                'branch': env.get('GITHUB_REF_NAME'),
                'commit': env.get('GITHUB_SHA'),
                'actor': env.get('GITHUB_ACTOR'),
            },
        }

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=env['RABBITMQ_HOST'],
                port=int(env['RABBITMQ_PORT']),
                credentials=pika.PlainCredentials(
                    env['RABBITMQ_USER'],
                    env['RABBITMQ_PASSWORD'],
                ),
            )
        )
        channel = connection.channel()
        channel.exchange_declare(exchange=env['RABBITMQ_EXCHANGE'], exchange_type='topic', durable=True)
        channel.basic_publish(
            exchange=env['RABBITMQ_EXCHANGE'],
            routing_key=env['EVENT_TYPE'],
            body=json.dumps(payload),
            properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'),
        )
        connection.close()
        print(f"Event sent: {env['EVENT_TYPE']}")
        EOF

PHP event consumer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class CIEventConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            'ci.events',
            'topic',
            false,
            true,
            false
        );
        
        $this->channel->queue_declare(
            'ci.notifications',
            false,
            true,
            false,
            false
        );
        
        $this->channel->queue_bind(
            'ci.notifications',
            'ci.events',
            '#'
        );
    }
    
    public function consume(): void
    {
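        $callback = function ($message) {
            $event = json_decode($message->getBody(), true);
            
            // Reject malformed messages without requeueing so they cannot loop forever.
            if (!is_array($event)) {
                $message->nack(false);
                return;
            }
            
            $this->handleEvent($event);
            
            $message->ack();
        };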
        
        $this->channel->basic_qos(0, 10, false);
        $this->channel->basic_consume(
            'ci.notifications',
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        echo "Waiting for CI/CD events...\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function handleEvent(array $event): void
    {
        $eventType = $event['eventType'] ?? 'unknown';
        $source = $event['source'] ?? 'unknown';
        
        echo "Received event: {$eventType} from {$source}\n";
        
        switch ($eventType) {
            case 'build.started':
                $this->handleBuildStarted($event);
                break;
            case 'build.completed':
                $this->handleBuildCompleted($event);
                break;
            case 'build.failed':
                $this->handleBuildFailed($event);
                break;
            case 'deploy.started':
                $this->handleDeployStarted($event);
                break;
            case 'deploy.completed':
                $this->handleDeployCompleted($event);
                break;
            case 'test.completed':
                $this->handleTestCompleted($event);
                break;
            default:
                $this->handleUnknownEvent($event);
        }
    }
    
    private function handleBuildStarted(array $event): void
    {
        $build = $event['build'] ?? [];
        $git = $event['git'] ?? [];
        
        echo "Build started:\n";
        echo "  Job: " . ($build['jobName'] ?? $build['job'] ?? 'N/A') . "\n";
        echo "  Branch: " . ($git['branch'] ?? 'N/A') . "\n";
        echo "  Commit: " . ($git['commit'] ?? 'N/A') . "\n";
        
        $this->sendNotification('build', 'started', $event);
    }
    
    private function handleBuildCompleted(array $event): void
    {
        $data = $event['data'] ?? [];
        
        echo "Build completed: " . ($data['status'] ?? 'unknown') . "\n";
        echo "  Duration: " . ($data['duration'] ?? 'N/A') . "ms\n";
        
        $this->sendNotification('build', 'completed', $event);
    }
    
    private function handleBuildFailed(array $event): void
    {
        $data = $event['data'] ?? [];
        
        echo "Build FAILED!\n";
        echo "  Error: " . ($data['error'] ?? 'Unknown error') . "\n";
        
        $this->sendNotification('build', 'failed', $event);
        $this->sendAlert('build_failed', $event);
    }
    
    private function handleDeployStarted(array $event): void
    {
        $data = $event['data'] ?? [];
        
        echo "Deploy started to: " . ($data['environment'] ?? 'unknown') . "\n";
        
        $this->sendNotification('deploy', 'started', $event);
    }
    
    private function handleDeployCompleted(array $event): void
    {
        $data = $event['data'] ?? [];
        
        echo "Deploy completed to: " . ($data['environment'] ?? 'unknown') . "\n";
        echo "  Status: " . ($data['status'] ?? 'unknown') . "\n";
        
        $this->sendNotification('deploy', 'completed', $event);
    }
    
    private function handleTestCompleted(array $event): void
    {
        echo "Tests completed\n";
        
        $this->sendNotification('test', 'completed', $event);
    }
    
    private function handleUnknownEvent(array $event): void
    {
        echo "Unknown event type: " . ($event['eventType'] ?? 'N/A') . "\n";
    }
    
    private function sendNotification(string $category, string $action, array $event): void
    {
        // Forward the event to the notification service
    }
    
    private function sendAlert(string $type, array $event): void
    {
        // Send an alert
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Usage example
$consumer = new CIEventConsumer([
    'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
    'port' => (int) (getenv('RABBITMQ_PORT') ?: 5672),  // getenv() returns a string
    'user' => getenv('RABBITMQ_USER') ?: 'guest',
    'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
]);

$consumer->consume();

$consumer->close();
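The consumer above binds its queue with `#`, which receives every event. Narrower bindings follow the AMQP topic rules: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. Here is a minimal Python sketch of that matching logic, for illustration only. The broker performs this matching itself, and `topic_matches` is a hypothetical helper, not a pika or php-amqplib function.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP topic binding pattern matches a routing key."""

    def match(p, k):
        if not p:
            return not k  # pattern exhausted: match only if the key is too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' absorbs zero words, or one word while staying active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


for pattern in ("#", "build.*", "deploy.#", "*.completed"):
    matched = [key for key in ("build.started", "build.failed", "deploy.completed")
               if topic_matches(pattern, key)]
    print(pattern, matched)
```

With routing keys such as `build.failed` and `deploy.completed`, an alerting service could bind with `*.failed` while an audit log keeps `#`.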

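Practical scenarios

Scenario 1: deployment approval flow

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
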
class DeploymentApproval
{
    private $rabbitChannel;
    
    public function __construct(AMQPStreamConnection $rabbit)
    {
        $this->rabbitChannel = $rabbit->channel();
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'deploy.approval',
            'direct',
            false,
            true,
            false
        );
        
        $this->rabbitChannel->queue_declare(
            'deploy.approval.requests',
            false,
            true,
            false,
            false
        );
        
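        $this->rabbitChannel->queue_declare(
            'deploy.approval.responses',
            false,
            true,
            false,
            false
        );
        
        // Bind both queues: a direct exchange drops messages whose routing key has no binding.
        $this->rabbitChannel->queue_bind('deploy.approval.requests', 'deploy.approval', 'approval.request');
        $this->rabbitChannel->queue_bind('deploy.approval.responses', 'deploy.approval', 'approval.response');
    }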
    
    public function requestApproval(array $deployment): string
    {
        $approvalId = uniqid('approval-', true);
        
        $request = [
            'approvalId' => $approvalId,
            'deployment' => $deployment,
            'status' => 'pending',
            'requestedAt' => date('c'),
            'requestedBy' => $deployment['requestedBy'] ?? 'system',
        ];
        
        $message = new AMQPMessage(
            json_encode($request),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $approvalId,
                'reply_to' => 'deploy.approval.responses',
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.approval',
            'approval.request'
        );
        
        return $approvalId;
    }
    
    public function waitForApproval(string $approvalId, int $timeout = 300): ?bool
    {
        $startTime = time();
        
        while (time() - $startTime < $timeout) {
            $message = $this->rabbitChannel->basic_get('deploy.approval.responses');
            
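            if ($message) {
                $response = json_decode($message->getBody(), true);
                
                if (is_array($response) && ($response['approvalId'] ?? null) === $approvalId) {
                    $message->ack();
                    return ($response['approved'] ?? false) === true;
                }
                
                // Not ours: return it to the queue so its own waiter can pick it up.
                $message->nack(true);
            }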
            
            sleep(1);
        }
        
        return null;
    }
    
    public function approve(string $approvalId, string $approvedBy, string $comment = ''): void
    {
        $response = [
            'approvalId' => $approvalId,
            'approved' => true,
            'approvedBy' => $approvedBy,
            'comment' => $comment,
            'approvedAt' => date('c'),
        ];
        
        $message = new AMQPMessage(
            json_encode($response),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.approval',
            'approval.response'
        );
    }
    
    public function reject(string $approvalId, string $rejectedBy, string $reason = ''): void
    {
        $response = [
            'approvalId' => $approvalId,
            'approved' => false,
            'rejectedBy' => $rejectedBy,
            'reason' => $reason,
            'rejectedAt' => date('c'),
        ];
        
        $message = new AMQPMessage(
            json_encode($response),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.approval',
            'approval.response'
        );
    }
}
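The polling loop in `waitForApproval` has to do three things: stop at a deadline, recognise its own response by `approvalId`, and avoid swallowing responses meant for other requests. Below is a broker-free Python sketch of that logic, assuming a caller-supplied `fetch` callable that stands in for `basic_get`. All names here are hypothetical.

```python
import time


def wait_for_approval(approval_id, fetch, timeout=300.0, poll_interval=1.0,
                      clock=time.monotonic, sleep=time.sleep):
    """Poll fetch() until a response for approval_id arrives or timeout expires.

    fetch() returns the next response dict, or None when nothing is waiting.
    Returns (decision, skipped): decision is True, False, or None on timeout;
    skipped holds responses for other approvals so the caller can requeue them.
    """
    deadline = clock() + timeout
    skipped = []
    while clock() < deadline:
        response = fetch()
        if response is None:
            sleep(poll_interval)
            continue
        if response.get("approvalId") == approval_id:
            return response.get("approved") is True, skipped
        skipped.append(response)
    return None, skipped


pending = [
    {"approvalId": "approval-other", "approved": True},
    {"approvalId": "approval-1", "approved": False},
]
decision, skipped = wait_for_approval(
    "approval-1",
    lambda: pending.pop(0) if pending else None,
    timeout=5,
    sleep=lambda seconds: None,  # no real waiting in this demo
)
print(decision, len(skipped))  # False 1
```

Returning `None` on timeout keeps "no answer" distinct from an explicit rejection, which matches the `?bool` return type of the PHP method.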

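Scenario 2: multi-environment deployment coordination

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MultiEnvironmentDeployer
{
    private $rabbitChannel;
    
    public function __construct(AMQPStreamConnection $rabbit)
    {
        $this->rabbitChannel = $rabbit->channel();
        // Publishing to an exchange that does not exist closes the channel,
        // so declare it first. The topic type is an assumption made here.
        $this->rabbitChannel->exchange_declare('deploy.queue', 'topic', false, true, false);
    }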
    
    public function deployToAllEnvironments(array $deployment): void
    {
        $environments = ['staging', 'uat', 'production'];
        
        foreach ($environments as $env) {
            $this->deployToEnvironment($deployment, $env);
        }
    }
    
    private function deployToEnvironment(array $deployment, string $environment): void
    {
        $event = [
            'deploymentId' => $deployment['id'],
            'environment' => $environment,
            'version' => $deployment['version'],
            'status' => 'queued',
            'queuedAt' => date('c'),
        ];
        
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.queue',
            "deploy.{$environment}"
        );
    }
}

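Common problems and solutions

Problem 1: message loss

Symptom: CI/CD events do not arrive

Solution: declare durable queues, publish persistent messages, and enable publisher confirms so the broker acknowledges each publish

php
$channel->queue_declare('ci.events', false, true, false, false);  // durable queue

$message = new AMQPMessage(
    json_encode($event),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

// Publisher confirms; assumes the ci.events exchange has already been declared.
$channel->confirm_select();
$channel->basic_publish($message, 'ci.events', $event['eventType']);
$channel->wait_for_pending_acks(5.0);  // wait up to 5 seconds for the broker's confirm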

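Problem 2: event ordering

Symptom: deployment events arrive out of order

Solution: carry a timestamp (or a sequence number) in every event and let the consumer sort on it. Message priority changes delivery order rather than preserving it, and on classic queues it only takes effect when the queue is declared with x-max-priority, so reserve it for urgent events.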

php
$message = new AMQPMessage(
    json_encode($event),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'priority' => $priority,
        'timestamp' => time(),
    ]
);
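One way to restore order on the consumer side is a small reordering buffer that holds back events which arrive early. This Python sketch assumes each event carries a monotonically increasing `sequence` field per pipeline. That field is hypothetical and not part of the payloads shown earlier. A timestamp can be used similarly if ties are acceptable.

```python
class SequenceReorderer:
    """Release events in sequence order, buffering any that arrive early."""

    def __init__(self, first_sequence=1):
        self._next = first_sequence
        self._buffer = {}

    def push(self, event):
        """Accept one event and return the list of events now ready, in order."""
        seq = event["sequence"]
        if seq < self._next:
            return []  # duplicate of an event that was already released
        self._buffer[seq] = event
        ready = []
        while self._next in self._buffer:
            ready.append(self._buffer.pop(self._next))
            self._next += 1
        return ready


reorderer = SequenceReorderer()
for incoming in ({"sequence": 2, "eventType": "deploy.completed"},
                 {"sequence": 1, "eventType": "deploy.started"},
                 {"sequence": 3, "eventType": "pipeline.completed"}):
    released = reorderer.push(incoming)
    print([e["eventType"] for e in released])
# []
# ['deploy.started', 'deploy.completed']
# ['pipeline.completed']
```

A production version would also need a timeout for gaps that never fill, otherwise one lost event blocks everything behind it.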

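Problem 3: retry mechanism

Symptom: sending a notification fails

Solution: implement retries and a dead-letter queue. Every declaration of ci.notifications must pass the same arguments, because redeclaring an existing queue with different arguments fails with PRECONDITION_FAILED.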

php
$channel->queue_declare('ci.notifications', false, true, false, false, false,
    new \PhpAmqpLib\Wire\AMQPTable([
        'x-dead-letter-exchange' => 'ci.dlx',
        'x-dead-letter-routing-key' => 'failed',
    ])
);
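The dead-letter arguments only say where rejected messages go. The consumer still has to decide when to stop retrying. This Python sketch shows one possible policy: exponential backoff with a cap, then dead-lettering. The attempt count is assumed to be tracked by the consumer, for example from the `count` in the `x-death` header that RabbitMQ adds to dead-lettered messages. The function name and default limits are hypothetical.

```python
def retry_decision(attempt, max_attempts=5, base_delay=1.0, max_delay=60.0):
    """Decide what to do after a failed delivery.

    attempt is the number of attempts already made (1 after the first failure).
    Returns ("retry", delay_seconds) or ("dead-letter", None).
    """
    if attempt >= max_attempts:
        return "dead-letter", None
    delay = min(base_delay * 2 ** (attempt - 1), max_delay)
    return "retry", delay


for attempt in range(1, 6):
    print(attempt, retry_decision(attempt))
# 1 ('retry', 1.0)
# 2 ('retry', 2.0)
# 3 ('retry', 4.0)
# 4 ('retry', 8.0)
# 5 ('dead-letter', None)
```

On a `"dead-letter"` decision the consumer would reject the message without requeueing, which routes it to `ci.dlx` under the queue arguments shown above.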

Best practices

1. Standardize the event format

php
interface CIEventInterface
{
    public function getEventType(): string;
    public function getTimestamp(): string;
    public function getSource(): string;
    public function getData(): array;
    public function getMetadata(): array;
}
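A shared format is only useful if it is checked at the boundary. Here is a minimal Python sketch of a validator for incoming events, assuming the envelope fields used throughout this tutorial (`eventType`, `timestamp`, `source`, `data`). The accepted types and the function name are assumptions for illustration.

```python
REQUIRED_FIELDS = {
    "eventType": str,
    "timestamp": (int, str),  # epoch milliseconds or an ISO 8601 string
    "source": str,
    "data": dict,
}


def validate_event(event):
    """Return a list of problems; an empty list means the event is well formed."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif not isinstance(event[field], expected):
            problems.append(f"wrong type for field: {field}")
    return problems


good = {"eventType": "build.started", "timestamp": 1700000000000,
        "source": "jenkins", "data": {"buildId": "42"}}
bad = {"eventType": "build.started", "data": "oops"}

print(validate_event(good))  # []
print(validate_event(bad))
# ['missing field: timestamp', 'missing field: source', 'wrong type for field: data']
```

Rejecting malformed events early keeps every downstream handler free of defensive null checks.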

2. Error handling

php
class CIEventErrorHandler
{
    public static function handle(Exception $e, array $event): void
    {
        error_log(sprintf(
            "[%s] CI Event Error: %s\nEvent: %s",
            date('c'),
            $e->getMessage(),
            json_encode($event)
        ));
    }
}

3. Monitoring metrics

php
class CIMetrics
{
    public static function recordBuildTime(string $job, float $duration): void
    {
        // Record the build duration
    }
    
    public static function recordDeployCount(string $environment): void
    {
        // Record the number of deployments
    }
}
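The two recording methods are left as stubs. As a rough illustration of what could sit behind them, this Python sketch keeps the numbers in memory and feeds them from the event envelope. A real setup would more likely forward them to a metrics backend. All names here are hypothetical.

```python
from collections import defaultdict


class CIMetricsRecorder:
    """In-memory stand-in for a metrics backend."""

    def __init__(self):
        self._build_times = defaultdict(list)
        self._deploy_counts = defaultdict(int)

    def record_build_time(self, job, duration_ms):
        self._build_times[job].append(duration_ms)

    def record_deploy_count(self, environment):
        self._deploy_counts[environment] += 1

    def average_build_time(self, job):
        samples = self._build_times.get(job)
        return sum(samples) / len(samples) if samples else None

    def deploy_count(self, environment):
        return self._deploy_counts.get(environment, 0)


def record_event(metrics, event):
    """Update metrics from one CI/CD event; unrelated events are ignored."""
    data = event.get("data", {})
    event_type = event.get("eventType")
    if event_type == "build.completed" and "duration" in data:
        job = event.get("build", {}).get("jobName", "unknown")
        metrics.record_build_time(job, data["duration"])
    elif event_type == "deploy.completed":
        metrics.record_deploy_count(data.get("environment", "unknown"))


metrics = CIMetricsRecorder()
for duration in (1000, 3000):
    record_event(metrics, {"eventType": "build.completed",
                           "build": {"jobName": "api"},
                           "data": {"duration": duration}})
record_event(metrics, {"eventType": "deploy.completed",
                       "data": {"environment": "production"}})

print(metrics.average_build_time("api"))   # 2000.0
print(metrics.deploy_count("production"))  # 1
```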

Version compatibility

| Jenkins | GitLab CI | GitHub Actions | RabbitMQ |
| --- | --- | --- | --- |
| 2.400+ | 16.x | Latest | 3.11+ |
| 2.300+ | 15.x | Latest | 3.10+ |
| 2.200+ | 14.x | Latest | 3.9+ |

Related links