RabbitMQ 与 Apache Flink 集成
概述
Apache Flink 是一个分布式流处理框架,具有高吞吐量、低延迟、精确一次语义等特点。将 RabbitMQ 与 Flink 集成,可以构建强大的实时数据处理管道,支持复杂事件处理、状态管理、窗口计算等功能。
本教程将详细介绍 RabbitMQ 与 Apache Flink 的集成方案,帮助开发者构建实时流处理应用。
集成架构设计
架构图
┌─────────────────────────────────────────────────────────────────────┐
│ Flink + RabbitMQ 集成架构 │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 数据源层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 应用事件 │ │ IoT 数据 │ │ 日志流 │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ RabbitMQ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Source │ │ Sink │ │ Error │ │ │
│ │ │ Queue │ │ Queue │ │ Queue │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Apache Flink Cluster │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Flink Job Manager │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Flink Task Managers │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │ Task 1 │ │ Task 2 │ │ Task 3 │ │ │ │
│ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 存储层 │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ RabbitMQ │ │ Kafka │ │ Database │ │ │
│ │ │ (Sink) │ │ (Sink) │ │ (Sink) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
集成模式
| 模式 | 说明 |
|---|---|
| RabbitMQ Source | RabbitMQ 作为 Flink 数据源 |
| RabbitMQ Sink | RabbitMQ 作为 Flink 数据目的地 |
| Stateful Processing | 有状态流处理 |
| Window Aggregation | 窗口聚合计算 |
| CEP | 复杂事件处理 |
配置示例
Maven 依赖
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-rabbitmq-integration</artifactId>
<version>1.0.0</version>
<properties>
<flink.version>1.18.0</flink.version>
<java.version>11</java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Flink Core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Streaming -->
<dependency>
<groupId>org.apache.flink</groupId>
<!-- 自 Flink 1.15 起,Java API 构件不再带 Scala 后缀 -->
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink RabbitMQ Connector:该连接器已从 Flink 主仓库外置、独立发版,版本号不再跟随 flink.version;此处以 3.0.1-1.17 为例,请按所用 Flink 版本在 Maven 中央仓库确认 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>3.0.1-1.17</version>
</dependency>
<!-- Flink Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink State Backend -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink CEP(下文 CEP 示例所需) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.11</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Flink 配置
yaml
# flink-conf.yaml
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2g
taskmanager.numberOfTaskSlots: 4
parallelism.default: 2
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink/checkpoints
state.savepoints.dir: file:///tmp/flink/savepoints
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000
execution.checkpointing.max-concurrent-checkpoints: 1
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10000
Java 代码示例
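下文各示例引用了 OrderEvent、OrderSummary、CustomerStats 等 POJO,但原文未给出其定义。下面是一个假设性的最小 OrderEvent 草图,字段名依据示例中的 getter/setter 调用推断,其余类可按同样方式补充:

```java
// 假设性的最小 OrderEvent POJO,字段依据下文示例的 getter/setter 调用推断
// Flink POJO 序列化要求:公有类、无参构造器、公有 getter/setter
public class OrderEvent {
    private String orderId;
    private String customerId;
    private double amount;
    private long timestamp;
    private String status;
    private boolean processed;

    public OrderEvent() {}

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getCustomerId() { return customerId; }
    public void setCustomerId(String customerId) { this.customerId = customerId; }
    public double getAmount() { return amount; }
    public void setAmount(double amount) { this.amount = amount; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public boolean isProcessed() { return processed; }
    public void setProcessed(boolean processed) { this.processed = processed; }
}
```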
RabbitMQ Source 示例
java
package com.example.flink;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
public class RabbitMQSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
RMQConnectionConfig rmqConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.setAutomaticRecovery(true)
.setConnectionTimeout(10000)
.setRequestedHeartbeat(60)
.build();
DataStream<String> stream = env.addSource(
new RMQSource<>(
rmqConfig,
"flink.input",
true, // usesCorrelationIds:配合检查点与消息 correlation-id 实现精确一次
new SimpleStringSchema()
)
).name("RabbitMQ Source");
stream
.map(new MapFunction<String, OrderEvent>() {
@Override
public OrderEvent map(String value) throws Exception {
return parseOrderEvent(value);
}
})
.filter(event -> event.getAmount() > 0)
.map(event -> {
event.setProcessed(true);
return event;
})
.print();
env.execute("RabbitMQ Source Example");
}
private static OrderEvent parseOrderEvent(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, OrderEvent.class);
} catch (Exception e) {
return new OrderEvent();
}
}
}
RabbitMQ Sink 示例
java
package com.example.flink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
public class RabbitMQSinkExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
RMQConnectionConfig rmqConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
DataStream<String> stream = env.fromElements(
"{\"orderId\":\"ORD-001\",\"amount\":100.0}",
"{\"orderId\":\"ORD-002\",\"amount\":200.0}",
"{\"orderId\":\"ORD-003\",\"amount\":300.0}"
);
stream.addSink(
new RMQSink<>(
rmqConfig,
"flink.output",
new SimpleStringSchema()
)
).name("RabbitMQ Sink");
env.execute("RabbitMQ Sink Example");
}
}
窗口聚合示例
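下面的窗口聚合示例用到事件时间窗口与水位线。进入完整示例之前,先用纯 Java(仅标准库)草图说明两个关键计算:TumblingEventTimeWindows 如何把时间戳对齐到窗口起点(与 Flink TimeWindow.getWindowStartWithOffset 相同的公式),以及 forBoundedOutOfOrderness 产生的水位线(当前最大时间戳减去乱序上界再减 1 毫秒)。类名为示意:

```java
// 示意:事件时间窗口起点对齐与 bounded-out-of-orderness 水位线的计算
public class EventTimeSketch {
    // 与 Flink TimeWindow.getWindowStartWithOffset 相同的对齐公式
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // forBoundedOutOfOrderness(d) 的水位线:maxTimestamp - d - 1ms
    public static long watermark(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        long ts = 7 * 60 * 1000L + 30_000L; // 相对 12:00 的 12:07:30
        // 12:07:30 的事件落入 [12:05, 12:10) 窗口
        System.out.println(windowStart(ts, 0L, fiveMin)); // 300000,即 12:05
        // 乱序上界 5s:看到 12:07:30 的事件后,水位线为 12:07:24.999
        System.out.println(watermark(ts, 5_000L)); // 444999
    }
}
```

由此也能看出:窗口在水位线越过窗口结束时间时才触发,乱序上界越大,结果延迟越高。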
java
package com.example.flink;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import java.time.Duration;
public class WindowAggregationExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
RMQConnectionConfig rmqConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
DataStream<OrderEvent> orders = env
.addSource(new RMQSource<>(
rmqConfig,
"orders.input",
true,
new SimpleStringSchema()
))
.name("RabbitMQ Source")
.map(json -> parseOrderEvent(json))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
DataStream<OrderSummary> summary = orders
.keyBy(OrderEvent::getCustomerId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new OrderAggregator());
summary.print();
env.execute("Window Aggregation Example");
}
public static class OrderAggregator implements AggregateFunction<
OrderEvent, OrderAccumulator, OrderSummary> {
@Override
public OrderAccumulator createAccumulator() {
return new OrderAccumulator();
}
@Override
public OrderAccumulator add(OrderEvent event, OrderAccumulator acc) {
acc.setCustomerId(event.getCustomerId());
acc.setCount(acc.getCount() + 1);
acc.setTotalAmount(acc.getTotalAmount() + event.getAmount());
return acc;
}
@Override
public OrderSummary getResult(OrderAccumulator acc) {
return new OrderSummary(
acc.getCustomerId(),
acc.getCount(),
acc.getTotalAmount()
);
}
@Override
public OrderAccumulator merge(OrderAccumulator a, OrderAccumulator b) {
a.setCount(a.getCount() + b.getCount());
a.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
return a;
}
}
private static OrderEvent parseOrderEvent(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, OrderEvent.class);
} catch (Exception e) {
return new OrderEvent();
}
}
}
状态处理示例
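下面的示例使用 Flink 的 keyed ValueState:每个 key(此处为 customerId)各自持有一份独立的、由检查点保障容错的状态。其核心语义可以先用一个纯 Java 草图理解(类名为示意,用 HashMap 模拟按键隔离,真实的 ValueState 还带容错与状态后端支持):

```java
import java.util.HashMap;
import java.util.Map;

// 示意:keyed state 的语义,每个 key 各自持有独立的状态副本
public class KeyedStateSketch {
    // key -> [订单数, 总金额],模拟按 customerId 隔离的 ValueState
    private final Map<String, double[]> state = new HashMap<>();

    public double[] update(String customerId, double amount) {
        double[] s = state.computeIfAbsent(customerId, k -> new double[2]);
        s[0] += 1;       // totalOrders
        s[1] += amount;  // totalAmount
        return s;
    }

    public static void main(String[] args) {
        KeyedStateSketch sketch = new KeyedStateSketch();
        sketch.update("CUST-001", 100.0);
        double[] s = sketch.update("CUST-001", 200.0);
        System.out.println(s[0] + " " + s[1]); // 2.0 300.0
        // 不同 key 互不影响
        System.out.println(sketch.update("CUST-002", 50.0)[1]); // 50.0
    }
}
```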
java
package com.example.flink;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
public class StatefulProcessingExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
RMQConnectionConfig rmqConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
DataStream<OrderEvent> orders = env
.addSource(new RMQSource<>(
rmqConfig,
"orders.input",
true,
new SimpleStringSchema()
))
.map(json -> parseOrderEvent(json));
DataStream<CustomerStats> stats = orders
.keyBy(OrderEvent::getCustomerId)
.map(new CustomerStatsFunction());
stats.print();
env.execute("Stateful Processing Example");
}
public static class CustomerStatsFunction extends RichMapFunction<OrderEvent, CustomerStats> {
private transient ValueState<CustomerStats> state;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<CustomerStats> descriptor =
new ValueStateDescriptor<>(
"customerStats",
CustomerStats.class
);
state = getRuntimeContext().getState(descriptor);
}
@Override
public CustomerStats map(OrderEvent event) throws Exception {
CustomerStats current = state.value();
if (current == null) {
current = new CustomerStats();
current.setCustomerId(event.getCustomerId());
current.setTotalOrders(0);
current.setTotalAmount(0.0);
}
current.setTotalOrders(current.getTotalOrders() + 1);
current.setTotalAmount(current.getTotalAmount() + event.getAmount());
current.setLastOrderTime(System.currentTimeMillis());
state.update(current);
return current;
}
}
private static OrderEvent parseOrderEvent(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, OrderEvent.class);
} catch (Exception e) {
return new OrderEvent();
}
}
}
CEP 复杂事件处理
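下例用 Flink CEP 匹配模式 "CREATED 之后 30 分钟内出现 CONFIRMED"。在看 CEP API 之前,这个模式的匹配语义可以用一个手写状态机草图理解(类名与返回约定均为示意;真实的 CEP 还会基于水位线自动清理超时的部分匹配):

```java
import java.util.HashMap;
import java.util.Map;

// 示意:模式 "CREATED followedBy CONFIRMED within 30min" 的手写状态机
public class OrderPatternSketch {
    private static final long WITHIN_MS = 30 * 60 * 1000L;
    // orderId -> CREATED 事件的时间戳(部分匹配)
    private final Map<String, Long> createdAt = new HashMap<>();

    // 匹配成功时返回确认耗时(毫秒),否则返回 -1
    public long onEvent(String orderId, String status, long timestamp) {
        if ("CREATED".equals(status)) {
            createdAt.put(orderId, timestamp);
            return -1;
        }
        if ("CONFIRMED".equals(status)) {
            Long start = createdAt.remove(orderId);
            if (start != null && timestamp - start <= WITHIN_MS) {
                return timestamp - start; // 在时限内完成匹配
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        OrderPatternSketch m = new OrderPatternSketch();
        m.onEvent("ORD-001", "CREATED", 0L);
        System.out.println(m.onEvent("ORD-001", "CONFIRMED", 60_000L)); // 60000
        m.onEvent("ORD-002", "CREATED", 0L);
        System.out.println(m.onEvent("ORD-002", "CONFIRMED", 31 * 60_000L)); // -1,超过 30 分钟
    }
}
```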
java
package com.example.flink;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import java.util.List;
import java.util.Map;
public class CEPExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
RMQConnectionConfig rmqConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
DataStream<OrderEvent> orders = env
.addSource(new RMQSource<>(
rmqConfig,
"orders.input",
true,
new SimpleStringSchema()
))
.map(json -> parseOrderEvent(json))
.keyBy(OrderEvent::getCustomerId);
Pattern<OrderEvent, ?> pattern = Pattern.<OrderEvent>begin("first")
.where(SimpleCondition.of(event ->
event.getStatus().equals("CREATED")))
.followedBy("second")
.where(SimpleCondition.of(event ->
event.getStatus().equals("CONFIRMED")))
.within(org.apache.flink.streaming.api.windowing.time.Time.minutes(30));
DataStream<OrderPattern> matches = CEP.pattern(orders, pattern)
.select(new PatternSelectFunction<OrderEvent, OrderPattern>() {
@Override
public OrderPattern select(Map<String, List<OrderEvent>> pattern) throws Exception {
OrderEvent created = pattern.get("first").get(0);
OrderEvent confirmed = pattern.get("second").get(0);
return new OrderPattern(
created.getCustomerId(),
created.getOrderId(),
"ORDER_CONFIRMED",
confirmed.getTimestamp() - created.getTimestamp()
);
}
});
matches.print();
env.execute("CEP Example");
}
private static OrderEvent parseOrderEvent(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, OrderEvent.class);
} catch (Exception e) {
return new OrderEvent();
}
}
}
PHP 与 Flink 集成
PHP 消息生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FlinkProducer
{
private $connection;
private $channel;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
$this->setupQueue();
}
private function setupQueue(): void
{
$this->channel->exchange_declare(
'flink.events',
'topic',
false,
true,
false
);
$this->channel->queue_declare(
'flink.input',
false,
true,
false,
false
);
$this->channel->queue_bind(
'flink.input',
'flink.events',
'event.#'
);
}
public function sendEvent(string $eventType, array $payload): bool
{
$event = [
'eventId' => uniqid('evt-', true),
'eventType' => $eventType,
'timestamp' => time() * 1000,
'payload' => $payload,
];
$message = new AMQPMessage(
json_encode($event),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $event['eventId'],
// RMQSource 以 correlation_id 去重,精确一次语义依赖此属性
'correlation_id' => $event['eventId'],
'timestamp' => time(),
]
);
$routingKey = 'event.' . strtolower($eventType);
$this->channel->basic_publish(
$message,
'flink.events',
$routingKey
);
return true;
}
public function sendOrderEvent(array $orderData): bool
{
return $this->sendEvent('OrderCreated', $orderData);
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
// 使用示例
$producer = new FlinkProducer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$producer->sendOrderEvent([
'orderId' => 'ORD-' . time(),
'customerId' => 'CUST-001',
'amount' => 1500.00,
'status' => 'CREATED',
'items' => [
['productId' => 'PROD-001', 'quantity' => 2, 'price' => 500],
['productId' => 'PROD-002', 'quantity' => 1, 'price' => 500],
],
]);
$producer->close();
PHP 结果消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class FlinkResultConsumer
{
private $connection;
private $channel;
public function __construct(array $config)
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function consume(string $queue, callable $processor): void
{
$this->channel->basic_qos(0, 10, false);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function (AMQPMessage $message) use ($processor) {
$result = json_decode($message->getBody(), true);
$processor($result);
$message->ack();
}
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
// 使用示例
$consumer = new FlinkResultConsumer([
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]);
$consumer->consume('flink.output', function (array $result) {
echo "收到 Flink 处理结果:\n";
print_r($result);
});
$consumer->close();
实际应用场景
场景一:实时订单监控
java
package com.example.flink;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
public class RealTimeOrderMonitoring {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
RMQConnectionConfig rmqConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
DataStream<OrderEvent> orders = env
.addSource(new RMQSource<>(
rmqConfig,
"orders.input",
true,
new SimpleStringSchema()
))
.name("Order Source")
.map(json -> parseOrderEvent(json));
DataStream<Alert> alerts = orders
.filter(order -> order.getAmount() > 10000)
.map(order -> new Alert(
"HIGH_VALUE_ORDER",
order.getOrderId(),
"High value order detected: " + order.getAmount(),
System.currentTimeMillis()
));
alerts
.map(alert -> alert.toJson())
.addSink(new RMQSink<>(
rmqConfig,
"alerts.output",
new SimpleStringSchema()
))
.name("Alert Sink");
env.execute("Real Time Order Monitoring");
}
private static OrderEvent parseOrderEvent(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, OrderEvent.class);
} catch (Exception e) {
return new OrderEvent();
}
}
}
场景二:实时数据分析
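下例使用滑动窗口(size 5 分钟、slide 1 分钟),与滚动窗口不同,每个事件会同时落入 size/slide = 5 个窗口。窗口分配逻辑可以用纯 Java 草图验证(类名为示意,思路与 Flink SlidingEventTimeWindows.assignWindows 一致):

```java
import java.util.ArrayList;
import java.util.List;

// 示意:滑动窗口分配,size=5min、slide=1min 时每个事件属于 5 个窗口
public class SlidingWindowSketch {
    // 返回事件所属各窗口的起点(从最新窗口往前)
    public static List<Long> assignWindows(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // 对齐到最近的 slide 边界(offset 取 0)
        long lastStart = timestamp - (timestamp + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 时间戳 130000ms(2 分 10 秒)的事件
        List<Long> w = assignWindows(130_000L, 300_000L, 60_000L);
        System.out.println(w.size());   // 5
        System.out.println(w.get(0));   // 120000,最新窗口为 [120000, 420000)
    }
}
```

这也解释了滑动窗口的成本:slide 越小,同一事件参与的窗口越多,状态与计算量随之增长。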
java
package com.example.flink;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import java.time.Duration;
public class RealTimeAnalytics {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
RMQConnectionConfig rmqConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
DataStream<Event> events = env
.addSource(new RMQSource<>(
rmqConfig,
"events.input",
true,
new SimpleStringSchema()
))
.map(json -> parseEvent(json))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
DataStream<EventStats> stats = events
.keyBy(Event::getType)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
.aggregate(new EventStatsAggregator());
stats.print();
env.execute("Real Time Analytics");
}
public static class EventStatsAggregator implements AggregateFunction<
Event, EventAccumulator, EventStats> {
@Override
public EventAccumulator createAccumulator() {
return new EventAccumulator();
}
@Override
public EventAccumulator add(Event event, EventAccumulator acc) {
acc.setType(event.getType());
acc.setCount(acc.getCount() + 1);
return acc;
}
@Override
public EventStats getResult(EventAccumulator acc) {
return new EventStats(acc.getType(), acc.getCount());
}
@Override
public EventAccumulator merge(EventAccumulator a, EventAccumulator b) {
a.setCount(a.getCount() + b.getCount());
return a;
}
}
private static Event parseEvent(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, Event.class);
} catch (Exception e) {
return new Event();
}
}
}
常见问题与解决方案
问题一:精确一次语义
症状: 消息重复处理
解决方案: 启用检查点并配置精确一次。注意:RMQSource 只有在同时满足以下条件时才提供精确一次语义:开启检查点、构造参数 usesCorrelationIds 为 true、消息携带唯一的 correlation-id,且 source 并行度为 1;否则只能做到至少一次。
java
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableUnalignedCheckpoints();
问题二:状态过大
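状态持续增长时,除了更换状态后端,Flink 还支持通过 StateTtlConfig 为状态设置存活时间,到期条目在访问或后台清理时被丢弃。其语义可以用一个纯 Java 草图表示(类名为示意,仅演示"过期即视为不存在"这一行为):

```java
import java.util.HashMap;
import java.util.Map;

// 示意:状态 TTL 的语义,超过存活时间的条目在访问时被丢弃
public class StateTtlSketch {
    private final long ttlMs;
    // key -> [value, 最近更新时间]
    private final Map<String, long[]> store = new HashMap<>();

    public StateTtlSketch(long ttlMs) { this.ttlMs = ttlMs; }

    public void put(String key, long value, long now) {
        store.put(key, new long[]{value, now});
    }

    // 过期条目视为不存在,顺手清理并返回 null
    public Long get(String key, long now) {
        long[] e = store.get(key);
        if (e == null || now - e[1] > ttlMs) {
            store.remove(key);
            return null;
        }
        return e[0];
    }

    public static void main(String[] args) {
        StateTtlSketch s = new StateTtlSketch(1000L);
        s.put("a", 42L, 0L);
        System.out.println(s.get("a", 500L));  // 42
        System.out.println(s.get("a", 2000L)); // null,已过期
    }
}
```

在 Flink 中对应的做法是构建 StateTtlConfig 并在 ValueStateDescriptor 上调用 enableTimeToLive,具体参数请以官方文档为准。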
症状: 状态存储占用过多内存
解决方案: 使用 RocksDB 状态后端
java
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
问题三:背压
症状: 处理延迟增加
解决方案: 优化并行度和缓冲区
java
env.setBufferTimeout(100);
stream.setParallelism(8);
最佳实践建议
1. 错误处理
java
DataStream<OrderEvent> orders = source
.map(json -> parseOrderEvent(json))
.returns(OrderEvent.class)
.filter(event -> event != null);
2. 监控配置
java
env.getConfig().setAutoWatermarkInterval(200);
env.getConfig().setLatencyTrackingInterval(1000);
3. 资源管理
java
env.setParallelism(4);
env.setMaxParallelism(128);
版本兼容性
| Flink | Java | RabbitMQ | Scala |
|---|---|---|---|
| 1.18.x | 11/17 | 3.x | 2.12 |
| 1.17.x | 11 | 3.x | 2.12 |
| 1.16.x | 8/11 | 3.x | 2.12 |
| 1.15.x | 8/11 | 3.x | 2.12 |
