
RabbitMQ and Spark Streaming Integration

Overview

Apache Spark is a powerful distributed data processing framework, and Spark Streaming is its stream processing component for handling large-scale data streams in real time. Integrating RabbitMQ with Spark Streaming enables scenarios such as real-time data processing, machine learning pipelines, and log analytics.

This tutorial walks through the integration options for RabbitMQ and Spark Streaming and helps developers build real-time data processing pipelines.

Integration Architecture

Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│          Spark Streaming + RabbitMQ Integration Architecture         │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                   Data Source Layer                         │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │  App Logs   │    │  Business   │    │  IoT Data   │     │    │
│  │  │             │    │  Data       │    │             │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                      RabbitMQ                                │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │    │
│  │  │  Input      │    │  Output     │    │  Dead       │     │    │
│  │  │  Queue      │    │  Queue      │    │  Letter     │     │    │
│  │  └─────────────┘    └─────────────┘    └─────────────┘     │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                  Spark Streaming                             │    │
│  │  ┌─────────────────────────────────────────────────────┐    │    │
│  │  │              Spark DStream Processing               │    │    │
│  │  │  • Input DStream (RabbitMQ)                         │    │    │
│  │  │  • Transformation (map, filter, reduce)             │    │    │
│  │  │  • Output DStream (RabbitMQ, HDFS, DB)              │    │    │
│  │  └─────────────────────────────────────────────────────┘    │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                       │
│                              ▼                                       │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Storage Layer                             │    │
│  │  ┌─────────────┐    ┌─────────────┐    ┌───────────────┐   │    │
│  │  │  HDFS / S3  │    │  Database   │    │ Elasticsearch │   │    │
│  │  └─────────────┘    └─────────────┘    └───────────────┘   │    │
│  └─────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────┘

Integration Modes

Mode                         Description
Direct Integration           Spark consumes RabbitMQ queues directly
Receiver-based Integration   A receiver pulls messages into Spark (see the sketch below)
Structured Streaming         The newer-generation streaming API built on Spark SQL
Continuous Processing        Experimental low-latency execution mode of Structured
                             Streaming (the default execution mode is micro-batch)
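
To see what the receiver-based mode does beneath a connector, here is a minimal sketch of a custom Receiver built directly on the RabbitMQ Java client (com.rabbitmq:amqp-client). The host/queue wiring and thread handling are simplified, illustrative assumptions, not any connector's actual implementation:

scala
import com.rabbitmq.client.{AMQP, ConnectionFactory, DefaultConsumer, Envelope}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class RabbitMQReceiver(host: String, queue: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Consume on a dedicated thread so onStart() returns quickly
    new Thread("rabbitmq-receiver") {
      override def run(): Unit = consume()
    }.start()
  }

  override def onStop(): Unit = () // the connection dies with the thread

  private def consume(): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost(host)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.basicConsume(queue, false, new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String,
                                  envelope: Envelope,
                                  properties: AMQP.BasicProperties,
                                  body: Array[Byte]): Unit = {
        store(new String(body, "UTF-8"))              // hand the message to Spark
        channel.basicAck(envelope.getDeliveryTag, false) // ack only after store
      }
    })
  }
}

It plugs in via ssc.receiverStream(new RabbitMQReceiver("localhost", "spark.input")), yielding a DStream[String] just like the connector-based examples below.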

Spark Streaming Setup

Maven Dependencies (Scala/Java)

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>com.example</groupId>
    <artifactId>spark-rabbitmq-integration</artifactId>
    <version>1.0.0</version>
    
    <properties>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.5.0</spark.version>
    </properties>
    
    <dependencies>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        
        <!-- RabbitMQ connector: Spark ships no official RabbitMQ source,
             so a third-party connector is needed; these coordinates are
             one community option and may need adjusting for your build
             (Stratio's spark-rabbitmq is another commonly cited choice) -->
        <dependency>
            <groupId>com.github.ptgoetz</groupId>
            <artifactId>spark-streaming-rabbitmq_${scala.binary.version}</artifactId>
            <version>1.1.0</version>
        </dependency>
        
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        
        <!-- JSON: Jackson for general use; json4s backs the parsing in
             the Scala examples below -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
        
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_${scala.binary.version}</artifactId>
            <version>4.0.7</version>
        </dependency>
    </dependencies>
</project>
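
For cluster runs the project is usually packaged as an assembly (uber) jar so the connector and JSON libraries ship with the job. A sketch of the submit command, where the class, master URL, and jar path are illustrative assumptions:

bash
spark-submit \
  --class com.example.spark.RabbitMQConsumer \
  --master spark://spark-master:7077 \
  target/spark-rabbitmq-integration-1.0.0.jar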

Spark Configuration

scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkRabbitMQConfig {
  
  def createSparkConf(appName: String = "RabbitMQ-Spark-Integration"): SparkConf = {
    new SparkConf()
      .setAppName(appName)
      .setMaster("local[2]")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.initialRate", "10000")
      .set("spark.rabbitmq.host", "localhost")
      .set("spark.rabbitmq.port", "5672")
      .set("spark.rabbitmq.username", "guest")
      .set("spark.rabbitmq.password", "guest")
      .set("spark.rabbitmq.vhost", "/")
      .set("spark.rabbitmq.prefetch.count", "100")
  }
  
  def createStreamingContext(
    sparkConf: SparkConf,
    batchDuration: Int = 5
  ): StreamingContext = {
    new StreamingContext(sparkConf, Seconds(batchDuration))
  }
}
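
A brief usage sketch of these helpers (the app name and batch interval are illustrative):

scala
// Build the conf and a 10-second-batch StreamingContext from the helpers above
val conf = SparkRabbitMQConfig.createSparkConf("orders-pipeline")
val ssc  = SparkRabbitMQConfig.createStreamingContext(conf, batchDuration = 10)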

Scala Code Examples

Basic Consumer

scala
package com.example.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils

object RabbitMQConsumer {
  
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("RabbitMQ Consumer")
      .setMaster("local[2]")
    
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    
    val rabbitMQParams = Map(
      "host" -> "localhost",
      "port" -> "5672",
      "username" -> "guest",
      "password" -> "guest",
      "vhost" -> "/",
      "queueName" -> "spark.input",
      "routingKeys" -> "event.#",
      "prefetchCount" -> "100",
      "durable" -> "true",
      "autoDelete" -> "false",
      "automaticRecoveryEnabled" -> "true"
    )
    
    val stream = RabbitMQUtils.createStream(ssc, rabbitMQParams)
    
    stream.print()
    
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val count = rdd.count()
        println(s"Received $count messages")
      }
    }
    
    ssc.start()
    ssc.awaitTermination()
  }
}

Data Processing Pipeline

scala
package com.example.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils
import org.apache.spark.sql.SparkSession

object DataProcessingPipeline {
  
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Data Processing Pipeline")
      .setMaster("local[2]")
    
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val spark = SparkSession.builder().getOrCreate()
    
    import spark.implicits._
    
    val rabbitMQParams = Map(
      "host" -> "localhost",
      "port" -> "5672",
      "username" -> "guest",
      "password" -> "guest",
      "queueName" -> "orders.input",
      "routingKeys" -> "order.created",
      "prefetchCount" -> "100"
    )
    
    val stream = RabbitMQUtils.createStream(ssc, rabbitMQParams)
    
    val orderStream = stream
      .map(record => new String(record.getBody))
      .filter(json => json != null && json.nonEmpty)
      .map(json => parseOrder(json))
    
    orderStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val ordersDF = rdd.toDF()
        
        ordersDF.show()
        
        ordersDF.write
          .mode("append")
          .format("parquet")
          .save("/tmp/orders.parquet")
        
        val count = ordersDF.count()
        println(s"Processed $count orders")
      }
    }
    
    // json4s-based parsing (requires the json4s-native dependency)
    def parseOrder(json: String): Order = {
      import org.json4s._
      import org.json4s.native.JsonMethods._
      implicit val formats: Formats = DefaultFormats
      parse(json).extract[Order]
    }
    
    ssc.start()
    ssc.awaitTermination()
  }
}

case class Order(
  orderId: String,
  customerId: String,
  amount: Double,
  status: String,
  createdAt: String,
  items: List[OrderItem]
)

case class OrderItem(
  productId: String,
  quantity: Int,
  price: Double
)
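
For reference, a message body that parseOrder can map onto the Order case class above might look like this (values are illustrative):

json
{
  "orderId": "ORD-1001",
  "customerId": "CUST-001",
  "amount": 1500.0,
  "status": "PENDING",
  "createdAt": "2024-01-15T10:30:00Z",
  "items": [
    { "productId": "P-100", "quantity": 2, "price": 750.0 }
  ]
}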

Window Processing

scala
package com.example.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils

object WindowProcessing {
  
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Window Processing")
      .setMaster("local[2]")
    
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    
    val rabbitMQParams = Map(
      "host" -> "localhost",
      "port" -> "5672",
      "queueName" -> "events.input",
      "routingKeys" -> "event.#"
    )
    
    val events = RabbitMQUtils.createStream(ssc, rabbitMQParams)
    
    val parsedEvents = events
      .map(record => new String(record.getBody))
      .map(json => parseEvent(json))
    
    val windowed = parsedEvents
      .window(Seconds(60), Seconds(30))
    
    val countByType = windowed
      .map(event => (event.eventType, 1))
      .reduceByKey(_ + _)
    
    countByType.print()
    
    // Sum completed-order revenue over a 60-second window sliding every 30s
    val revenueByWindow = parsedEvents
      .filter(_.eventType == "order.completed")
      .map { event =>
        import org.json4s._
        implicit val formats: Formats = DefaultFormats
        (event.payload \ "amount").extract[Double]
      }
      .window(Seconds(60), Seconds(30))
      .reduce(_ + _)
    
    revenueByWindow.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
  
  def parseEvent(json: String): Event = {
    import org.json4s._
    import org.json4s.native.JsonMethods._
    implicit val formats = DefaultFormats
    parse(json).extract[Event]
  }
}

case class Event(
  eventId: String,
  eventType: String,
  timestamp: String,
  payload: org.json4s.JValue
)
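
The count above recomputes the whole 60-second window on every slide. With a checkpoint directory set, the inverse-function variant of reduceByKeyAndWindow updates the counts incrementally instead; a sketch reusing parsedEvents from above:

scala
// Incremental windowed counts: add batches entering the window and
// subtract batches leaving it (requires ssc.checkpoint(...) to be set)
val incrementalCounts = parsedEvents
  .map(event => (event.eventType, 1))
  .reduceByKeyAndWindow(
    (a: Int, b: Int) => a + b,   // batch enters the window
    (a: Int, b: Int) => a - b,   // batch leaves the window
    Seconds(60),
    Seconds(30)
  )

incrementalCounts.print()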

State Management

scala
package com.example.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, State, StateSpec}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils

object StatefulProcessing {
  
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Stateful Processing")
      .setMaster("local[2]")
    
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("/tmp/spark-checkpoint")
    
    val rabbitMQParams = Map(
      "host" -> "localhost",
      "port" -> "5672",
      "queueName" -> "orders.input",
      "routingKeys" -> "order.#"
    )
    
    val orders = RabbitMQUtils.createStream(ssc, rabbitMQParams)
    
    val orderEvents = orders
      .map(record => new String(record.getBody))
      .map(json => parseOrderEvent(json))
    
    // mapWithState receives the (customerId, amount) pairs produced below,
    // so the value type is Double; the running total lives in State
    val orderStateSpec = StateSpec.function(
      (customerId: String, amount: Option[Double], state: State[OrderSummary]) => {
        val previousTotal = state.getOption().map(_.totalAmount).getOrElse(0.0)
        val newTotal = previousTotal + amount.getOrElse(0.0)
        val newState = OrderSummary(customerId, newTotal, System.currentTimeMillis())
        state.update(newState)
        newState
      }
    )
    
    val customerTotals = orderEvents
      .map(order => (order.customerId, order.amount))
      .reduceByKey(_ + _)
      .mapWithState(orderStateSpec)
    
    customerTotals.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
  
  def parseOrderEvent(json: String): OrderEvent = {
    import org.json4s._
    import org.json4s.native.JsonMethods._
    implicit val formats: Formats = DefaultFormats
    parse(json).extract[OrderEvent]
  }
}

case class OrderSummary(
  customerId: String,
  totalAmount: Double,
  lastUpdate: Long
)

case class OrderEvent(
  orderId: String,
  customerId: String,
  amount: Double,
  status: String
)
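
Keyed state otherwise grows without bound. StateSpec supports an idle timeout; a small sketch reusing orderStateSpec from above:

scala
import org.apache.spark.streaming.Minutes

// Evict state for customers idle longer than 30 minutes; when a key
// times out, the mapping function fires once more with
// state.isTimingOut() == true before the entry is removed
val specWithTimeout = orderStateSpec.timeout(Minutes(30))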

Integrating PHP with Spark Streaming

Spark Streaming lives primarily in the Scala/Java ecosystem, but PHP can still interact with it in the following ways:

PHP Message Producer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class SparkStreamingProducer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
        $this->setupQueue();
    }
    
    private function setupQueue(): void
    {
        $this->channel->exchange_declare(
            'spark.input',
            'topic',
            false,
            true,
            false
        );
        
        $this->channel->queue_declare(
            'spark.input.orders',
            false,
            true,
            false,
            false
        );
        
        $this->channel->queue_bind(
            'spark.input.orders',
            'spark.input',
            'order.#'
        );
    }
    
    public function sendOrderEvent(array $orderData): bool
    {
        $event = [
            'eventId' => uniqid('evt-', true),
            'eventType' => 'order.created',
            'timestamp' => date('c'),
            'payload' => $orderData,
        ];
        
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $event['eventId'],
            ]
        );
        
        $this->channel->basic_publish(
            $message,
            'spark.input',
            'order.created'
        );
        
        return true;
    }
    
    public function sendBatchEvents(array $events): bool
    {
        foreach ($events as $event) {
            $this->sendOrderEvent($event);
        }
        
        return true;
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Usage example
$producer = new SparkStreamingProducer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$producer->sendOrderEvent([
    'orderId' => 'ORD-' . time(),
    'customerId' => 'CUST-001',
    'amount' => 1500.00,
    'status' => 'PENDING',
    'createdAt' => date('c'),
]);

$producer->sendBatchEvents([
    ['orderId' => 'ORD-001', 'customerId' => 'CUST-001', 'amount' => 100],
    ['orderId' => 'ORD-002', 'customerId' => 'CUST-002', 'amount' => 200],
    ['orderId' => 'ORD-003', 'customerId' => 'CUST-001', 'amount' => 150],
]);

$producer->close();

PHP Result Consumer

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class SparkResultConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(array $config)
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function consumeProcessedResults(string $queue, callable $processor): void
    {
        $this->channel->basic_qos(0, 10, false);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($processor) {
                $result = json_decode($message->getBody(), true);
                $processor($result);
                $message->ack();
            }
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

// Usage example
$consumer = new SparkResultConsumer([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
]);

$consumer->consumeProcessedResults('spark.output.results', function (array $result) {
    echo "Received Spark result:\n";
    print_r($result);
});

$consumer->close();
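
The queue name spark.output.results above presumes a Spark job that publishes its results back to RabbitMQ. A sketch of that Spark side using the plain Java client inside foreachPartition, where resultStream (a DStream[String] of JSON) and all connection settings are assumptions:

scala
import com.rabbitmq.client.ConnectionFactory

resultStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One connection per partition rather than per record
    val factory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare("spark.output.results", true, false, false, null)
    records.foreach { json =>
      channel.basicPublish("", "spark.output.results", null, json.getBytes("UTF-8"))
    }
    channel.close()
    connection.close()
  }
}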

Practical Scenarios

Scenario 1: Real-Time Order Processing

scala
package com.example.spark

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils
import org.apache.spark.sql.SparkSession

object RealTimeOrderProcessing {
  
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Real Time Order Processing")
      .getOrCreate()
    
    import spark.implicits._
    
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    
    val orderParams = Map(
      "host" -> "localhost",
      "port" -> "5672",
      "queueName" -> "orders.realtime",
      "routingKeys" -> "order.created,order.updated"
    )
    
    val ordersStream = RabbitMQUtils.createStream(ssc, orderParams)
    
    val processedOrders = ordersStream
      .map(record => new String(record.getBody))
      .map(json => parseOrder(json))
      .filter(order => order.amount > 0)
      .map(order => enrichOrder(order))
    
    processedOrders.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val ordersDF = rdd.toDF()
        
        ordersDF.createOrReplaceTempView("orders")
        
        spark.sql("""
          SELECT 
            customerId,
            COUNT(*) as orderCount,
            SUM(amount) as totalAmount,
            AVG(amount) as avgAmount
          FROM orders
          GROUP BY customerId
        """).show()
        
        ordersDF.write
          .mode("append")
          .partitionBy("date")
          .parquet("/data/orders")
      }
    }
    
    ssc.start()
    ssc.awaitTermination()
  }
  
  def parseOrder(json: String): Order = {
    import org.json4s._
    import org.json4s.native.JsonMethods._
    implicit val formats = DefaultFormats
    parse(json).extract[Order]
  }
  
  def enrichOrder(order: Order): EnrichedOrder = {
    EnrichedOrder(
      order.orderId,
      order.customerId,
      order.amount,
      order.status,
      order.createdAt,
      order.amount * 1.1,                  // amountWithTax: assumed 10% surcharge
      java.time.LocalDate.now.toString     // date: partition column
    )
  }
}

case class EnrichedOrder(
  orderId: String,
  customerId: String,
  amount: Double,
  status: String,
  createdAt: String,
  amountWithTax: Double,
  date: String // used by partitionBy("date") above
)

Scenario 2: Real-Time Analytics Dashboard

scala
package com.example.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.rabbitmq.RabbitMQUtils

object RealTimeAnalytics {
  
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("Real Time Analytics")
      .setMaster("local[2]")
    
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    
    val eventsStream = RabbitMQUtils.createStream(ssc, Map(
      "host" -> "localhost",
      "port" -> "5672",
      "queueName" -> "events.stream",
      "routingKeys" -> "event.#"
    ))
    
    val parsedEvents = eventsStream
      .map(record => new String(record.getBody))
      .map(json => parseEvent(json))
    
    parsedEvents
      .map(event => (event.eventType, 1))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,
        Seconds(300),
        Seconds(30)
      )
      .foreachRDD { (rdd, time) =>
        val count = rdd.count()
        println(s"[$time] Total events: $count")
        rdd.foreach(println)
      }
    
    // Per-customer revenue for completed orders over a sliding window
    parsedEvents
      .filter(_.eventType == "order.completed")
      .map { event =>
        import org.json4s._
        implicit val formats: Formats = DefaultFormats
        ((event.payload \ "customerId").extract[String],
         (event.payload \ "amount").extract[Double])
      }
      .reduceByKeyAndWindow(
        (a: Double, b: Double) => a + b,
        Seconds(60),
        Seconds(10)
      )
      .foreachRDD { rdd =>
        val total = rdd.values.sum
        println(s"Total revenue in window: $total")
      }
    
    ssc.start()
    ssc.awaitTermination()
  }
  
  def parseEvent(json: String): Event = {
    import org.json4s._
    import org.json4s.native.JsonMethods._
    implicit val formats: Formats = DefaultFormats
    parse(json).extract[Event]
  }
}

Common Problems and Solutions

Problem 1: Backpressure

Symptom: processing cannot keep up with the rate at which messages arrive.

Solution: enable backpressure and cap the ingest rate.

scala
val sparkConf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "10000")
  // Receiver-based sources are capped with receiver.maxRate
  // (the kafka.maxRatePerPartition setting applies only to Kafka sources)
  .set("spark.streaming.receiver.maxRate", "1000")

Problem 2: Checkpoint Management

Symptom: state is lost, or recovery after a restart fails.

Solution: configure a reliable checkpoint directory.

scala
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("hdfs://localhost:9000/checkpoint")

Problem 3: Parallelism

Symptom: low processing throughput.

Solution: increase ingest and processing parallelism.

scala
// Connector signatures vary; a portable way to raise ingest parallelism
// is to run several receivers and union their streams
val streams = (1 to 4).map(_ => RabbitMQUtils.createStream(ssc, rabbitMQParams))
val stream = ssc.union(streams)

Best Practices

1. Performance Tuning

scala
import org.apache.spark.storage.StorageLevel

val stream = RabbitMQUtils.createStream(ssc, params)
  .repartition(20)                        // spread records across cores
  .persist(StorageLevel.MEMORY_AND_DISK)  // cache when reused by several actions

2. Error Handling

scala
stream.foreachRDD { (rdd, time) =>
  // Process on the executors; rdd.collect() would pull every record
  // to the driver and defeat the point of a distributed job
  rdd.foreach { record =>
    try {
      process(record)                      // placeholder business logic
    } catch {
      case e: Exception => handleError(record, e)
    }
  }
}
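
One way to make handleError concrete is to forward failed payloads to the dead-letter queue from the architecture diagram so they can be inspected or replayed; the queue name and connection settings below are assumptions:

scala
import com.rabbitmq.client.ConnectionFactory

// Illustrative handler: park the failed payload on a dead-letter queue
def handleError(record: String, e: Exception): Unit = {
  val factory = new ConnectionFactory()
  factory.setHost("localhost")
  val connection = factory.newConnection()
  val channel = connection.createChannel()
  channel.queueDeclare("spark.dead.letter", true, false, false, null)
  channel.basicPublish("", "spark.dead.letter", null, record.getBytes("UTF-8"))
  channel.close()
  connection.close()
}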

3. Resource Management

scala
val sparkConf = new SparkConf()
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")
  .set("spark.streaming.concurrentJobs", "4")

Version Compatibility

Spark   Scala   RabbitMQ   JDK
3.5.x   2.12    3.x        11/17
3.4.x   2.12    3.x        11/17
3.3.x   2.12    3.x        11
3.2.x   2.12    3.x        11
3.1.x   2.12    3.x        8/11

Related Links