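RabbitMQ and CI/CD Integration
Overview
In modern software development, continuous integration and continuous deployment (CI/CD) are key practices for improving development efficiency and software quality. Integrating RabbitMQ into the CI/CD process enables automated testing, deployment notifications, and build-event propagation, which makes the development and operations workflow more automated and efficient.
This tutorial covers integrating RabbitMQ with the mainstream CI/CD tools Jenkins, GitLab CI, and GitHub Actions.
Integration Architecture Design
Architecture Diagram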
┌─────────────────────────────────────────────────────────────────────┐
│              CI/CD + RabbitMQ Integration Architecture              │
│                                                                     │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                    Code Repository Layer                    │   │
│   │    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │   │
│   │    │   GitHub    │    │   GitLab    │    │  Bitbucket  │    │   │
│   │    └─────────────┘    └─────────────┘    └─────────────┘    │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                      CI/CD Tool Layer                       │   │
│   │    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │   │
│   │    │   Jenkins   │    │  GitLab CI  │    │   GitHub    │    │   │
│   │    │             │    │             │    │   Actions   │    │   │
│   │    └─────────────┘    └─────────────┘    └─────────────┘    │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                          RabbitMQ                           │   │
│   │    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │   │
│   │    │    Build    │    │   Deploy    │    │   Notify    │    │   │
│   │    │   Events    │    │   Events    │    │   Events    │    │   │
│   │    └─────────────┘    └─────────────┘    └─────────────┘    │   │
│   └─────────────────────────────────────────────────────────────┘   │
│                                  │                                  │
│                                  ▼                                  │
│   ┌─────────────────────────────────────────────────────────────┐   │
│   │                       Consumer Layer                        │   │
│   │    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │   │
│   │    │  Notifier   │    │ Monitoring  │    │  Audit Log  │    │   │
│   │    └─────────────┘    └─────────────┘    └─────────────┘    │   │
│   └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
Integration Patterns
| Pattern | Description |
|---|---|
| Build Notification | Publishes build status events |
| Deploy Trigger | Triggers the deployment process through messages |
| Test Result | Publishes test result events |
| Audit Trail | Records all CI/CD operations |
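Every publisher in this tutorial sends to a topic exchange and uses the event type (for example `build.failed`) as the routing key, so each pattern in the table can be served by its own binding key. The sketch below re-implements AMQP topic matching in plain Python to show which patterns would receive a given event. The `BINDINGS` map and its binding keys are assumptions made for illustration; in a real deployment the broker performs this matching.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding key matches a routing key.

    '*' matches exactly one dot-separated word, '#' matches zero or more.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero words (advance the pattern) or one more word.
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


# Hypothetical binding keys, one per integration pattern in the table.
BINDINGS = {
    "Build Notification": "build.*",
    "Deploy Trigger": "deploy.*",
    "Test Result": "test.*",
    "Audit Trail": "#",
}


def patterns_for(routing_key: str) -> list[str]:
    """List the patterns whose binding key would receive this routing key."""
    return [name for name, key in BINDINGS.items() if topic_matches(key, routing_key)]


if __name__ == "__main__":
    print(patterns_for("build.failed"))
```

Binding the audit queue with `#` is what lets it record every CI/CD operation without knowing the event types in advance.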
Jenkins Integration
Jenkins Plugin Configuration
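The pipeline in this section publishes through the RabbitMQ management HTTP API (`POST /api/exchanges/{vhost}/{exchange}/publish`). That endpoint expects the message itself as a string inside the JSON request body, so the event has to be serialized twice. The sketch below shows only the body construction; the function name `build_publish_body` is an assumption made for illustration.

```python
import json


def build_publish_body(routing_key: str, event: dict) -> str:
    """Build the request body for the management API publish endpoint."""
    return json.dumps({
        "properties": {},
        "routing_key": routing_key,
        # The event is serialized first and embedded as a plain string;
        # the outer json.dumps call escapes the quotes it contains.
        "payload": json.dumps(event),
        "payload_encoding": "string",
    })


body = build_publish_body(
    "build.started",
    {"eventType": "build.started", "source": "jenkins", "data": {"buildId": "42"}},
)
decoded = json.loads(body)          # what the broker parses
inner = json.loads(decoded["payload"])  # what a consumer parses

if __name__ == "__main__":
    print(body)
```

Pasting the event JSON directly between the quotes of `"payload"` without this second encoding step produces an invalid request body as soon as the event contains a double quote.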
groovy
// Jenkinsfile
pipeline {
    agent any
    environment {
        RABBITMQ_HOST = credentials('rabbitmq-host')
        RABBITMQ_USER = credentials('rabbitmq-user')
        RABBITMQ_PASSWORD = credentials('rabbitmq-password')
    }
    stages {
        stage('Checkout') {
            steps {
                checkout scm
                script {
                    sendRabbitMQEvent('build.started', [
                        buildId: env.BUILD_ID,
                        jobName: env.JOB_NAME,
                        branch: env.GIT_BRANCH,
                        commit: env.GIT_COMMIT,
                        timestamp: System.currentTimeMillis()
                    ])
                }
            }
        }
        stage('Build') {
            steps {
                sh 'make build'
            }
            post {
                success {
                    script {
                        sendRabbitMQEvent('build.completed', [
                            buildId: env.BUILD_ID,
                            status: 'success',
                            duration: currentBuild.duration
                        ])
                    }
                }
                failure {
                    script {
                        sendRabbitMQEvent('build.failed', [
                            buildId: env.BUILD_ID,
                            status: 'failed',
                            duration: currentBuild.duration
                        ])
                    }
                }
            }
        }
        stage('Test') {
            steps {
                sh 'make test'
            }
            post {
                always {
                    junit '**/test-results/*.xml'
                    script {
                        sendRabbitMQEvent('test.completed', [
                            buildId: env.BUILD_ID,
                            testResults: 'junit'
                        ])
                    }
                }
            }
        }
        stage('Deploy') {
            when {
                branch 'main'
            }
            steps {
                // Announce the deployment before it runs, not after.
                script {
                    sendRabbitMQEvent('deploy.started', [
                        buildId: env.BUILD_ID,
                        environment: 'production'
                    ])
                }
                sh 'make deploy'
            }
            post {
                success {
                    script {
                        sendRabbitMQEvent('deploy.completed', [
                            buildId: env.BUILD_ID,
                            status: 'success'
                        ])
                    }
                }
            }
        }
    }
    post {
        always {
            script {
                sendRabbitMQEvent('pipeline.completed', [
                    buildId: env.BUILD_ID,
                    jobName: env.JOB_NAME,
                    status: currentBuild.result ?: 'SUCCESS',
                    duration: currentBuild.duration
                ])
            }
        }
    }
}
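def sendRabbitMQEvent(String eventType, Map data) {
    def payload = [
        eventType: eventType,
        data: data,
        timestamp: System.currentTimeMillis(),
        source: 'jenkins'
    ]
    // The management API expects the message as a JSON string in "payload",
    // so serialize the event first and let JsonOutput escape it in the outer body.
    def body = groovy.json.JsonOutput.toJson([
        properties: [:],
        routing_key: eventType,
        payload: groovy.json.JsonOutput.toJson(payload),
        payload_encoding: 'string'
    ])
    writeFile file: 'rabbitmq-event.json', text: body
    // Single-quoted Groovy string: the shell, not Groovy, expands the credentials,
    // and the request body is sent exactly once, from the file.
    sh '''
        curl --fail --silent --show-error \
            -u "$RABBITMQ_USER:$RABBITMQ_PASSWORD" \
            -X POST \
            -H "Content-Type: application/json" \
            --data-binary @rabbitmq-event.json \
            "http://$RABBITMQ_HOST:15672/api/exchanges/%2f/ci.events/publish"
    '''
}
Jenkins Shared Library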
groovy
// vars/rabbitMQNotify.groovy
def call(String eventType, Map data) {
    def config = [
        host: env.RABBITMQ_HOST ?: 'localhost',
        port: env.RABBITMQ_PORT ?: '5672',
        user: env.RABBITMQ_USER ?: 'guest',
        password: env.RABBITMQ_PASSWORD ?: 'guest',
        exchange: env.RABBITMQ_EXCHANGE ?: 'ci.events'
    ]
    def payload = [
        eventType: eventType,
        data: data,
        timestamp: System.currentTimeMillis(),
        build: [
            id: env.BUILD_ID,
            number: env.BUILD_NUMBER,
            url: env.BUILD_URL,
            jobName: env.JOB_NAME
        ],
        git: [
            branch: env.GIT_BRANCH,
            commit: env.GIT_COMMIT,
            url: env.GIT_URL
        ]
    ]
    try {
        // Hand every value to Python through the environment so that neither
        // Groovy nor the shell interpolates data into the Python source.
        withEnv([
            "RABBITMQ_HOST=${config.host}",
            "RABBITMQ_PORT=${config.port}",
            "RABBITMQ_USER=${config.user}",
            "RABBITMQ_PASSWORD=${config.password}",
            "RABBITMQ_EXCHANGE=${config.exchange}",
            "EVENT_TYPE=${eventType}",
            "EVENT_PAYLOAD=${groovy.json.JsonOutput.toJson(payload)}"
        ]) {
            // The heredoc body starts at column 0 because Python is indentation-sensitive.
            sh '''
python3 - <<'PY'
import os
import pika

env = os.environ
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=env['RABBITMQ_HOST'],
        port=int(env['RABBITMQ_PORT']),
        credentials=pika.PlainCredentials(env['RABBITMQ_USER'], env['RABBITMQ_PASSWORD']),
    )
)
channel = connection.channel()
channel.exchange_declare(exchange=env['RABBITMQ_EXCHANGE'], exchange_type='topic', durable=True)
channel.basic_publish(
    exchange=env['RABBITMQ_EXCHANGE'],
    routing_key=env['EVENT_TYPE'],
    body=env['EVENT_PAYLOAD'],  # already serialized to JSON by Groovy
    properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'),
)
connection.close()
PY
'''
        }
        echo "RabbitMQ event sent: ${eventType}"
    } catch (Exception e) {
        // A failed notification should not fail the build.
        echo "Failed to send RabbitMQ event: ${e.message}"
    }
}
GitLab CI Integration
GitLab CI Configuration
yaml
# .gitlab-ci.yml
stages:
  - build
  - test
  - deploy

variables:
  RABBITMQ_HOST: "localhost"
  RABBITMQ_PORT: "5672"
  # Demo defaults only; define real credentials as masked CI/CD variables.
  RABBITMQ_USER: "guest"
  RABBITMQ_PASSWORD: "guest"
  RABBITMQ_EXCHANGE: "ci.events"

.before_script: &rabbitmq_setup
  # Assumes the job image ships Python 3 and pip.
  - pip install pika
  - |
    send_rabbitmq_event() {
      # $1 = event type (routing key), $2 = event data as a JSON string.
      # The payload is built in Python from environment variables so that
      # quotes or newlines in the commit message cannot produce invalid JSON.
      EVENT_TYPE="$1" EVENT_DATA="$2" python3 - <<'PY'
    import json
    import os
    from datetime import datetime, timezone

    import pika

    env = os.environ
    payload = {
        'eventType': env['EVENT_TYPE'],
        'data': json.loads(env['EVENT_DATA']),
        'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        'source': 'gitlab-ci',
        'build': {
            'id': env.get('CI_PIPELINE_ID'),
            'job': env.get('CI_JOB_NAME'),
            'stage': env.get('CI_JOB_STAGE'),
            'url': env.get('CI_PIPELINE_URL'),
        },
        'git': {
            'branch': env.get('CI_COMMIT_REF_NAME'),
            'commit': env.get('CI_COMMIT_SHA'),
            'message': env.get('CI_COMMIT_MESSAGE'),
            'author': env.get('CI_COMMIT_AUTHOR'),
        },
    }
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=env.get('RABBITMQ_HOST', 'localhost'),
            port=int(env.get('RABBITMQ_PORT', '5672')),
            credentials=pika.PlainCredentials(
                env.get('RABBITMQ_USER', 'guest'),
                env.get('RABBITMQ_PASSWORD', 'guest'),
            ),
        )
    )
    channel = connection.channel()
    exchange = env.get('RABBITMQ_EXCHANGE', 'ci.events')
    channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
    channel.basic_publish(
        exchange=exchange,
        routing_key=env['EVENT_TYPE'],
        body=json.dumps(payload),
        properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'),
    )
    connection.close()
    PY
    }

build:
  stage: build
  before_script:
    - *rabbitmq_setup
  script:
    - send_rabbitmq_event 'build.started' '{"status": "running"}'
    - make build
    - send_rabbitmq_event 'build.completed' '{"status": "success"}'
  after_script:
    # after_script runs in a separate shell, so the function must be defined again.
    - *rabbitmq_setup
    - |
      if [ "$CI_JOB_STATUS" = "failed" ]; then
        send_rabbitmq_event 'build.failed' '{"status": "failed", "error": "Build failed"}'
      fi

test:
  stage: test
  before_script:
    - *rabbitmq_setup
  script:
    - send_rabbitmq_event 'test.started' '{"status": "running"}'
    - make test
    - send_rabbitmq_event 'test.completed' '{"status": "success"}'
  artifacts:
    reports:
      junit: test-results/*.xml

deploy:
  stage: deploy
  before_script:
    - *rabbitmq_setup
  script:
    - send_rabbitmq_event 'deploy.started' '{"environment": "production", "status": "running"}'
    - make deploy
    - send_rabbitmq_event 'deploy.completed' '{"environment": "production", "status": "success"}'
  environment:
    name: production
    url: https://example.com
  only:
    - main
GitHub Actions Integration
GitHub Actions Workflow
yaml
# .github/workflows/ci.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  RABBITMQ_HOST: localhost
  RABBITMQ_PORT: 5672
  RABBITMQ_EXCHANGE: ci.events

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install pika
      - name: Notify build started
        env:
          RABBITMQ_USER: ${{ secrets.RABBITMQ_USER }}
          RABBITMQ_PASSWORD: ${{ secrets.RABBITMQ_PASSWORD }}
          # Passed through the environment instead of being interpolated into
          # the script, so quotes in a commit message cannot break the Python code.
          COMMIT_MESSAGE: ${{ github.event.head_commit.message }}
        run: |
          python3 << 'EOF'
          import json
          import os
          from datetime import datetime, timezone

          import pika

          env = os.environ
          payload = {
              'eventType': 'build.started',
              'data': {'status': 'running'},
              'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
              'source': 'github-actions',
              'build': {
                  'id': env['GITHUB_RUN_ID'],
                  'number': env['GITHUB_RUN_NUMBER'],
                  'url': f"{env['GITHUB_SERVER_URL']}/{env['GITHUB_REPOSITORY']}/actions/runs/{env['GITHUB_RUN_ID']}",
              },
              'git': {
                  'branch': env['GITHUB_REF_NAME'],
                  'commit': env['GITHUB_SHA'],
                  'message': env.get('COMMIT_MESSAGE', ''),
                  'author': env['GITHUB_ACTOR'],
              },
          }
          connection = pika.BlockingConnection(
              pika.ConnectionParameters(
                  host=env.get('RABBITMQ_HOST', 'localhost'),
                  port=int(env.get('RABBITMQ_PORT', '5672')),
                  credentials=pika.PlainCredentials(
                      env.get('RABBITMQ_USER', 'guest'),
                      env.get('RABBITMQ_PASSWORD', 'guest'),
                  ),
              )
          )
          channel = connection.channel()
          exchange = env.get('RABBITMQ_EXCHANGE', 'ci.events')
          channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
          channel.basic_publish(
              exchange=exchange,
              routing_key='build.started',
              body=json.dumps(payload),
              properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'),
          )
          connection.close()
          EOF
      - name: Build
        run: make build
      - name: Notify build completed
        if: success()
        env:
          RABBITMQ_USER: ${{ secrets.RABBITMQ_USER }}
          RABBITMQ_PASSWORD: ${{ secrets.RABBITMQ_PASSWORD }}
        run: |
          python3 << 'EOF'
          import json
          import os
          from datetime import datetime, timezone

          import pika

          env = os.environ
          payload = {
              'eventType': 'build.completed',
              'data': {'status': 'success'},
              # Computed in Python: a quoted heredoc does not expand $(date ...).
              'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
              'source': 'github-actions',
              'build': {
                  'id': env['GITHUB_RUN_ID'],
                  'number': env['GITHUB_RUN_NUMBER'],
              },
          }
          connection = pika.BlockingConnection(
              pika.ConnectionParameters(
                  host=env.get('RABBITMQ_HOST', 'localhost'),
                  port=int(env.get('RABBITMQ_PORT', '5672')),
                  credentials=pika.PlainCredentials(
                      env.get('RABBITMQ_USER', 'guest'),
                      env.get('RABBITMQ_PASSWORD', 'guest'),
                  ),
              )
          )
          channel = connection.channel()
          exchange = env.get('RABBITMQ_EXCHANGE', 'ci.events')
          channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
          channel.basic_publish(
              exchange=exchange,
              routing_key='build.completed',
              body=json.dumps(payload),
              properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'),
          )
          connection.close()
          EOF

  test:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run tests
        run: make test
      - name: Publish test results
        uses: dorny/test-reporter@v1
        if: always()
        with:
          name: Test Results
          path: test-results/*.xml
          reporter: java-junit

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Deploy
        run: make deploy
GitHub Actions Composite Action
yaml
# .github/actions/rabbitmq-notify/action.yml
name: 'RabbitMQ Notify'
description: 'Send event notification to RabbitMQ'
inputs:
  event-type:
    description: 'Event type (routing key)'
    required: true
  data:
    description: 'Event data as JSON string'
    required: false
    default: '{}'
  rabbitmq-host:
    description: 'RabbitMQ host'
    required: false
    default: 'localhost'
  rabbitmq-port:
    description: 'RabbitMQ port'
    required: false
    default: '5672'
  rabbitmq-user:
    description: 'RabbitMQ username'
    required: false
    default: 'guest'
  rabbitmq-password:
    description: 'RabbitMQ password'
    required: false
    default: 'guest'
  rabbitmq-exchange:
    description: 'RabbitMQ exchange'
    required: false
    default: 'ci.events'
runs:
  using: 'composite'
  steps:
    - name: Send notification
      shell: bash
      # Inputs reach Python through the environment rather than being pasted
      # into the source, so a quote in `data` cannot break or inject code.
      env:
        EVENT_TYPE: ${{ inputs.event-type }}
        EVENT_DATA: ${{ inputs.data }}
        RABBITMQ_HOST: ${{ inputs.rabbitmq-host }}
        RABBITMQ_PORT: ${{ inputs.rabbitmq-port }}
        RABBITMQ_USER: ${{ inputs.rabbitmq-user }}
        RABBITMQ_PASSWORD: ${{ inputs.rabbitmq-password }}
        RABBITMQ_EXCHANGE: ${{ inputs.rabbitmq-exchange }}
      run: |
        # Assumes Python 3 and pip are available on the runner.
        pip install pika
        python3 << 'EOF'
        import json
        import os
        from datetime import datetime, timezone

        import pika

        env = os.environ
        payload = {
            'eventType': env['EVENT_TYPE'],
            'data': json.loads(env['EVENT_DATA']),
            'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'source': 'github-actions',
            'build': {
                'id': env['GITHUB_RUN_ID'],
                'number': env['GITHUB_RUN_NUMBER'],
                'url': f"{env['GITHUB_SERVER_URL']}/{env['GITHUB_REPOSITORY']}/actions/runs/{env['GITHUB_RUN_ID']}",
            },
            'git': {
                'branch': env['GITHUB_REF_NAME'],
                'commit': env['GITHUB_SHA'],
                'actor': env['GITHUB_ACTOR'],
            },
        }
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=env['RABBITMQ_HOST'],
                port=int(env['RABBITMQ_PORT']),
                credentials=pika.PlainCredentials(env['RABBITMQ_USER'], env['RABBITMQ_PASSWORD']),
            )
        )
        channel = connection.channel()
        channel.exchange_declare(exchange=env['RABBITMQ_EXCHANGE'], exchange_type='topic', durable=True)
        channel.basic_publish(
            exchange=env['RABBITMQ_EXCHANGE'],
            routing_key=env['EVENT_TYPE'],
            body=json.dumps(payload),
            properties=pika.BasicProperties(delivery_mode=2, content_type='application/json'),
        )
        connection.close()
        print(f"Event sent: {env['EVENT_TYPE']}")
        EOF
PHP Event Consumer
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class CIEventConsumer
{
    private $connection;
    private $channel;

    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        // Arguments: name, type, passive, durable, auto_delete
        $this->channel->exchange_declare(
            'ci.events',
            'topic',
            false,
            true,
            false
        );
        // Arguments: name, passive, durable, exclusive, auto_delete
        $this->channel->queue_declare(
            'ci.notifications',
            false,
            true,
            false,
            false
        );
        // '#' receives every event published to the exchange.
        $this->channel->queue_bind(
            'ci.notifications',
            'ci.events',
            '#'
        );
    }

    public function consume(): void
    {
        $callback = function (AMQPMessage $message) {
            $event = json_decode($message->getBody(), true);
            if (!is_array($event)) {
                // Malformed JSON: reject without requeue so it cannot loop forever.
                $message->reject(false);
                return;
            }
            $this->handleEvent($event);
            $message->ack();
        };

        // At most 10 unacknowledged messages per consumer.
        $this->channel->basic_qos(0, 10, false);
        $this->channel->basic_consume(
            'ci.notifications',
            '',
            false,
            false, // no_ack = false: messages are acknowledged manually
            false,
            false,
            $callback
        );

        echo "Waiting for CI/CD events...\n";
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    private function handleEvent(array $event): void
    {
        $eventType = $event['eventType'] ?? 'unknown';
        $source = $event['source'] ?? 'unknown';
        echo "Received event: {$eventType} from {$source}\n";

        switch ($eventType) {
            case 'build.started':
                $this->handleBuildStarted($event);
                break;
            case 'build.completed':
                $this->handleBuildCompleted($event);
                break;
            case 'build.failed':
                $this->handleBuildFailed($event);
                break;
            case 'deploy.started':
                $this->handleDeployStarted($event);
                break;
            case 'deploy.completed':
                $this->handleDeployCompleted($event);
                break;
            case 'test.completed':
                $this->handleTestCompleted($event);
                break;
            default:
                $this->handleUnknownEvent($event);
        }
    }

    private function handleBuildStarted(array $event): void
    {
        $build = $event['build'] ?? [];
        $git = $event['git'] ?? [];
        echo "Build started:\n";
        // Jenkins sends 'jobName', GitLab CI sends 'job'.
        echo "  Job: " . ($build['jobName'] ?? $build['job'] ?? 'N/A') . "\n";
        echo "  Branch: " . ($git['branch'] ?? 'N/A') . "\n";
        echo "  Commit: " . ($git['commit'] ?? 'N/A') . "\n";
        $this->sendNotification('build', 'started', $event);
    }

    private function handleBuildCompleted(array $event): void
    {
        $data = $event['data'] ?? [];
        echo "Build completed: " . ($data['status'] ?? 'unknown') . "\n";
        echo "  Duration: " . ($data['duration'] ?? 'N/A') . "ms\n";
        $this->sendNotification('build', 'completed', $event);
    }

    private function handleBuildFailed(array $event): void
    {
        $data = $event['data'] ?? [];
        echo "Build FAILED!\n";
        echo "  Error: " . ($data['error'] ?? 'Unknown error') . "\n";
        $this->sendNotification('build', 'failed', $event);
        $this->sendAlert('build_failed', $event);
    }

    private function handleDeployStarted(array $event): void
    {
        $data = $event['data'] ?? [];
        echo "Deploy started to: " . ($data['environment'] ?? 'unknown') . "\n";
        $this->sendNotification('deploy', 'started', $event);
    }

    private function handleDeployCompleted(array $event): void
    {
        $data = $event['data'] ?? [];
        echo "Deploy completed to: " . ($data['environment'] ?? 'unknown') . "\n";
        echo "  Status: " . ($data['status'] ?? 'unknown') . "\n";
        $this->sendNotification('deploy', 'completed', $event);
    }

    private function handleTestCompleted(array $event): void
    {
        echo "Tests completed\n";
        $this->sendNotification('test', 'completed', $event);
    }

    private function handleUnknownEvent(array $event): void
    {
        echo "Unknown event type: " . ($event['eventType'] ?? 'N/A') . "\n";
    }

    private function sendNotification(string $category, string $action, array $event): void
    {
        // Forward to the notification service
    }

    private function sendAlert(string $type, array $event): void
    {
        // Send an alert
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Usage example
$consumer = new CIEventConsumer([
    'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
    'port' => (int) (getenv('RABBITMQ_PORT') ?: 5672),
    'user' => getenv('RABBITMQ_USER') ?: 'guest',
    'password' => getenv('RABBITMQ_PASSWORD') ?: 'guest',
]);
$consumer->consume();
$consumer->close();
Practical Application Scenarios
Scenario 1: Deployment Approval Workflow
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class DeploymentApproval
{
    private $rabbitChannel;

    public function __construct(AMQPStreamConnection $rabbit)
    {
        $this->rabbitChannel = $rabbit->channel();
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->rabbitChannel->exchange_declare(
            'deploy.approval',
            'direct',
            false,
            true,
            false
        );
        $this->rabbitChannel->queue_declare(
            'deploy.approval.requests',
            false,
            true,
            false,
            false
        );
        $this->rabbitChannel->queue_declare(
            'deploy.approval.responses',
            false,
            true,
            false,
            false
        );
        // Without these bindings the direct exchange would drop every message.
        $this->rabbitChannel->queue_bind('deploy.approval.requests', 'deploy.approval', 'approval.request');
        $this->rabbitChannel->queue_bind('deploy.approval.responses', 'deploy.approval', 'approval.response');
    }

    public function requestApproval(array $deployment): string
    {
        $approvalId = uniqid('approval-', true);
        $request = [
            'approvalId' => $approvalId,
            'deployment' => $deployment,
            'status' => 'pending',
            'requestedAt' => date('c'),
            'requestedBy' => $deployment['requestedBy'] ?? 'system',
        ];
        $message = new AMQPMessage(
            json_encode($request),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $approvalId,
                'reply_to' => 'deploy.approval.responses',
            ]
        );
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.approval',
            'approval.request'
        );
        return $approvalId;
    }

    /**
     * Returns true if approved, false if rejected, null on timeout.
     */
    public function waitForApproval(string $approvalId, int $timeout = 300): ?bool
    {
        $startTime = time();
        while (time() - $startTime < $timeout) {
            $message = $this->rabbitChannel->basic_get('deploy.approval.responses');
            if ($message) {
                $response = json_decode($message->getBody(), true);
                if (($response['approvalId'] ?? null) === $approvalId) {
                    $message->ack();
                    return $response['approved'] === true;
                }
                // Not ours: requeue it so the requester waiting on it can still see it.
                $message->nack(true);
            }
            sleep(1);
        }
        return null;
    }

    public function approve(string $approvalId, string $approvedBy, string $comment = ''): void
    {
        $response = [
            'approvalId' => $approvalId,
            'approved' => true,
            'approvedBy' => $approvedBy,
            'comment' => $comment,
            'approvedAt' => date('c'),
        ];
        $message = new AMQPMessage(
            json_encode($response),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.approval',
            'approval.response'
        );
    }

    public function reject(string $approvalId, string $rejectedBy, string $reason = ''): void
    {
        $response = [
            'approvalId' => $approvalId,
            'approved' => false,
            'rejectedBy' => $rejectedBy,
            'reason' => $reason,
            'rejectedAt' => date('c'),
        ];
        $message = new AMQPMessage(
            json_encode($response),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.approval',
            'approval.response'
        );
    }
}
Scenario 2: Multi-Environment Deployment Coordination
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MultiEnvironmentDeployer
{
    private $rabbitChannel;

    public function __construct(AMQPStreamConnection $rabbit)
    {
        $this->rabbitChannel = $rabbit->channel();
        // Publishing to an undeclared exchange closes the channel, so declare it first.
        $this->rabbitChannel->exchange_declare('deploy.queue', 'topic', false, true, false);
    }

    public function deployToAllEnvironments(array $deployment): void
    {
        $environments = ['staging', 'uat', 'production'];
        foreach ($environments as $env) {
            $this->deployToEnvironment($deployment, $env);
        }
    }

    private function deployToEnvironment(array $deployment, string $environment): void
    {
        $event = [
            'deploymentId' => $deployment['id'],
            'environment' => $environment,
            'version' => $deployment['version'],
            'status' => 'queued',
            'queuedAt' => date('c'),
        ];
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        // Routing key is deploy.staging, deploy.uat, or deploy.production.
        $this->rabbitChannel->basic_publish(
            $message,
            'deploy.queue',
            "deploy.{$environment}"
        );
    }
}
Common Problems and Solutions
Problem 1: Message Loss
Symptom: CI/CD events are not delivered.
Solution: use durable queues, persistent messages, and publisher confirms.
php
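// A durable queue plus persistent messages survive a broker restart.
$channel->queue_declare('ci.notifications', false, true, false, false);
$channel->confirm_select(); // enable publisher confirms on this channel
$message = new AMQPMessage(
    json_encode($event),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($message, 'ci.events', $event['eventType']);
$channel->wait_for_pending_acks(5.0); // block until the broker confirms
Problem 2: Event Ordering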
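Symptom: deployment events arrive out of order.
Solution: use message priority or sort by timestamp. Priority only takes effect on queues declared with the x-max-priority argument, and it ranks messages by importance rather than by time, so consumers that need chronological order should sort on the timestamp.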
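Sorting by timestamp happens on the consumer side. One way to sketch it is a small reorder buffer that holds a few events and always releases the oldest one first. The class name `ReorderBuffer`, its `capacity` parameter, and the integer timestamps are assumptions made for illustration; the buffer can only correct disorder that fits within its capacity.

```python
import heapq
from itertools import count


class ReorderBuffer:
    """Hold up to `capacity` events and release them oldest-timestamp first."""

    def __init__(self, capacity: int = 3):
        self.capacity = capacity
        self._heap = []
        self._tiebreak = count()  # preserves arrival order for equal timestamps

    def push(self, event: dict) -> list[dict]:
        """Add an event; return any events that overflow the buffer, oldest first."""
        heapq.heappush(self._heap, (event["timestamp"], next(self._tiebreak), event))
        released = []
        while len(self._heap) > self.capacity:
            released.append(heapq.heappop(self._heap)[2])
        return released

    def flush(self) -> list[dict]:
        """Release everything still buffered, oldest first."""
        released = []
        while self._heap:
            released.append(heapq.heappop(self._heap)[2])
        return released


# Events as they might arrive from the queue, newest first.
arrived = [
    {"eventType": "deploy.completed", "timestamp": 30},
    {"eventType": "deploy.started", "timestamp": 20},
    {"eventType": "build.completed", "timestamp": 10},
]
buffer = ReorderBuffer(capacity=3)
ordered = []
for event in arrived:
    ordered.extend(buffer.push(event))
ordered.extend(buffer.flush())
order = [e["eventType"] for e in ordered]

if __name__ == "__main__":
    print(order)
```

A larger capacity tolerates more disorder at the cost of added latency, since an event is only released once newer events have pushed it out or the buffer is flushed.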
php
$message = new AMQPMessage(
    json_encode($event),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'priority' => $priority, // higher values are delivered first
        'timestamp' => time(),
    ]
);
Problem 3: Retry Mechanism
Symptom: sending a notification fails.
Solution: implement retries and a dead-letter queue.
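The retry half of this solution can be sketched as a bounded exponential backoff that hands the event to a dead-letter handler once every attempt has failed. The function `deliver_with_retry` and its parameters are assumptions made for illustration; `send` and `dead_letter` stand in for the real notification call and for publishing to a dead-letter exchange.

```python
import time
from typing import Callable


def deliver_with_retry(
    send: Callable[[dict], None],
    dead_letter: Callable[[dict, Exception], None],
    event: dict,
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Try `send` up to max_attempts times; dead-letter the event if all fail."""
    for attempt in range(1, max_attempts + 1):
        try:
            send(event)
            return True
        except Exception as exc:  # broad on purpose: any failure triggers a retry
            if attempt == max_attempts:
                dead_letter(event, exc)
                return False
            # Delay doubles after each failed attempt: 0.5s, 1s, 2s, ...
            sleep(base_delay * 2 ** (attempt - 1))
    return False


# Demonstration with a sender that always fails and a recorded sleep.
delays, dead = [], []


def always_fail(event: dict) -> None:
    raise ConnectionError("notification service down")


ok = deliver_with_retry(
    always_fail,
    lambda event, exc: dead.append(event),
    {"eventType": "build.failed"},
    sleep=delays.append,
)

if __name__ == "__main__":
    print(ok, delays, dead)
```

Injecting `sleep` keeps the sketch testable without real waiting. On the broker side, the dead-letter half is configured on the queue itself.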
php
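// Rejected or expired messages are re-published to the 'ci.dlx' exchange
// with routing key 'failed'; that exchange must be declared separately.
$channel->queue_declare('ci.notifications', false, true, false, false, false,
    new \PhpAmqpLib\Wire\AMQPTable([
        'x-dead-letter-exchange' => 'ci.dlx',
        'x-dead-letter-routing-key' => 'failed',
    ])
);
Best Practices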
1. Standardize the Event Format
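The publishers in this tutorial all send the same envelope fields (`eventType`, `data`, `timestamp`, `source`, `build`, `git`), but the Jenkins examples use epoch milliseconds for the timestamp while the GitLab CI and GitHub Actions examples use ISO 8601 strings. A shared envelope type removes that kind of drift. The sketch below is one possible shape, not a fixed schema; the class name `CIEvent` and the `<category>.<action>` check are assumptions made for illustration.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


def _utc_now() -> str:
    """Current UTC time as an ISO 8601 string with a trailing 'Z'."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@dataclass
class CIEvent:
    eventType: str  # also used as the routing key, e.g. 'build.started'
    source: str     # 'jenkins', 'gitlab-ci', or 'github-actions'
    data: dict = field(default_factory=dict)
    build: dict = field(default_factory=dict)
    git: dict = field(default_factory=dict)
    timestamp: str = field(default_factory=_utc_now)

    def __post_init__(self) -> None:
        # Reject event types that cannot be routed by category.
        if "." not in self.eventType:
            raise ValueError("eventType must look like '<category>.<action>'")

    def to_json(self) -> str:
        """Serialize the envelope for use as the message body."""
        return json.dumps(asdict(self))


event = CIEvent("build.started", "jenkins", data={"status": "running"})
decoded = json.loads(event.to_json())

if __name__ == "__main__":
    print(event.to_json())
```

Validating at construction time means a malformed event fails in the publisher, where the cause is obvious, rather than in a consumer.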
php
interface CIEventInterface
{
    public function getEventType(): string;
    public function getTimestamp(): string;
    public function getSource(): string;
    public function getData(): array;
    public function getMetadata(): array;
}
2. Error Handling
php
class CIEventErrorHandler
{
    public static function handle(Exception $e, array $event): void
    {
        error_log(sprintf(
            "[%s] CI Event Error: %s\nEvent: %s",
            date('c'),
            $e->getMessage(),
            json_encode($event)
        ));
    }
}
3. Monitoring Metrics
php
class CIMetrics
{
    public static function recordBuildTime(string $job, float $duration): void
    {
        // Record the build duration
    }

    public static function recordDeployCount(string $environment): void
    {
        // Record the number of deployments
    }
}
Version Compatibility
| Jenkins | GitLab CI | GitHub Actions | RabbitMQ |
|---|---|---|---|
| 2.400+ | 16.x | Latest | 3.11+ |
| 2.300+ | 15.x | Latest | 3.10+ |
| 2.200+ | 14.x | Latest | 3.9+ |
