Appearance
MongoDB Timestamp 类型详解
本知识点承接《MongoDB数据类型概述》,后续延伸至《MongoDB复制集原理》,建议学习顺序:MongoDB基础→数据类型概述→本知识点→复制集原理
1. 概述
在MongoDB数据库系统中,Timestamp(时间戳) 类型是一种特殊的64位时间值,主要用于MongoDB内部的复制和分片机制。与普通的Date类型不同,Timestamp类型具有递增特性,在分布式环境中能够保证操作的顺序性和一致性。
MongoDB的Timestamp类型由两部分组成:一个32位的时间戳(Unix时间戳秒数)和一个32位的递增计数器。这种设计使得Timestamp在复制集中能够唯一标识操作,确保主从节点之间的数据同步顺序正确。在PHP中,我们使用MongoDB\BSON\Timestamp类来创建和操作MongoDB的Timestamp对象。
Timestamp类型在实际开发中有着特定的应用场景,主要包括:复制集操作日志(oplog)记录、分片集群的chunk迁移追踪、分布式事务的时间戳标记、数据版本控制等。虽然普通应用开发中较少直接使用Timestamp类型,但理解其原理对于深入掌握MongoDB的内部机制至关重要。掌握Timestamp类型的使用,能够帮助开发者更好地理解MongoDB的复制和分片原理,在处理分布式系统问题时更加得心应手。
2. 基本概念
2.1 语法
MongoDB Timestamp类型在PHP中使用MongoDB\BSON\Timestamp类表示,其基本语法如下:
php
use MongoDB\BSON\Timestamp;
// 创建Timestamp对象的基本语法
$timestamp = new Timestamp(int $increment, int $timestamp);
// 参数说明:
// $increment: 32位递增计数器值(0到4294967295)
// $timestamp: 32位Unix时间戳秒数(0到4294967295)Timestamp结构组成:
| 组成部分 | 位数 | 取值范围 | 说明 |
|---|---|---|---|
| 时间戳 | 32位 | 0 ~ 4294967295 | Unix时间戳(秒) |
| 递增计数器 | 32位 | 0 ~ 4294967295 | 同一秒内的操作序号 |
Timestamp与Date的区别:
| 特性 | Timestamp | Date |
|---|---|---|
| 类型码 | 0x11 (17) | 0x09 (9) |
| 精度 | 秒级 | 毫秒级 |
| 主要用途 | 内部复制机制 | 业务时间记录 |
| 递增特性 | 有 | 无 |
| 时区处理 | UTC | UTC |
| 比较语义 | 先比较时间戳,再比较计数器 | 直接比较时间值 |
2.2 语义
Timestamp类型在MongoDB中的语义主要体现在以下几个方面:
存储语义:
- Timestamp是64位整数,高32位为时间戳,低32位为递增计数器
- 作为BSON类型的一种,可以存储在文档的任何字段中
- 在MongoDB内部操作日志(oplog)中广泛使用
比较语义:
- Timestamp对象之间可以进行比较
- 比较时先比较时间戳部分,时间戳相同则比较递增计数器
- 这保证了同一秒内的操作顺序正确
排序语义:
- Timestamp天然支持按时间顺序排序
- 在复制集中,操作按Timestamp顺序应用
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== Timestamp基本语义演示 ===\n\n";
// 1. 创建Timestamp对象
echo "=== 创建Timestamp对象 ===\n";
$ts1 = new Timestamp(1, time());
$ts2 = new Timestamp(2, time());
$ts3 = new Timestamp(1, time() + 1);
echo "Timestamp1: " . $ts1->getTimestamp() . ":" . $ts1->getIncrement() . "\n";
echo "Timestamp2: " . $ts2->getTimestamp() . ":" . $ts2->getIncrement() . "\n";
echo "Timestamp3: " . $ts3->getTimestamp() . ":" . $ts3->getIncrement() . "\n";
// 2. Timestamp的字符串表示
echo "\n=== Timestamp的字符串表示 ===\n";
echo "Timestamp1 字符串形式: " . (string)$ts1 . "\n";
echo "Timestamp2 字符串形式: " . (string)$ts2 . "\n";
// 3. BSON序列化
echo "\n=== BSON序列化 ===\n";
$serialized = $ts1->jsonSerialize();
echo "序列化结果: " . json_encode($serialized, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
// 4. 在MongoDB中存储和查询Timestamp
echo "\n=== 在MongoDB中存储和查询 ===\n";
$client = new Client('mongodb://localhost:27017');
$collection = $client->test->timestamp_demo;
$collection->drop();
$collection->insertMany([
['name' => '操作A', 'op_time' => new Timestamp(1, time() - 10)],
['name' => '操作B', 'op_time' => new Timestamp(2, time() - 10)],
['name' => '操作C', 'op_time' => new Timestamp(1, time())],
['name' => '操作D', 'op_time' => new Timestamp(2, time())]
]);
// 按Timestamp排序查询
$results = $collection->find([], ['sort' => ['op_time' => 1]])->toArray();
echo "按时间顺序排序:\n";
foreach ($results as $doc) {
echo " - {$doc->name}: " . $doc->op_time . "\n";
}
// 5. Timestamp比较
echo "\n=== Timestamp比较 ===\n";
$docs = $collection->find(['op_time' => ['$gt' => $ts1]])->toArray();
echo "大于Timestamp1的记录: " . count($docs) . " 条\n";
$collection->drop();
?>运行结果:
=== Timestamp基本语义演示 ===
=== 创建Timestamp对象 ===
Timestamp1: 1709123456:1
Timestamp2: 1709123456:2
Timestamp3: 1709123457:1
=== Timestamp的字符串表示 ===
Timestamp1 字符串形式: Timestamp(1709123456, 1)
Timestamp2 字符串形式: Timestamp(1709123456, 2)
=== BSON序列化 ===
序列化结果: {
"$timestamp": {
"t": 1709123456,
"i": 1
}
}
=== 在MongoDB中存储和查询 ===
按时间顺序排序:
- 操作A: Timestamp(1709123446, 1)
- 操作B: Timestamp(1709123446, 2)
- 操作C: Timestamp(1709123456, 1)
- 操作D: Timestamp(1709123456, 2)
=== Timestamp比较 ===
大于Timestamp1的记录: 3 条2.3 规范
在使用MongoDB Timestamp类型时,应遵循以下规范:
使用场景规范:
- Timestamp主要用于内部复制机制,普通业务场景应使用Date类型
- 需要保证分布式环境中操作顺序时考虑使用Timestamp
- 不要将Timestamp用于用户可见的时间显示
命名规范:
- 存储Timestamp的字段名应具有描述性,如
opTime、operationTime等 - 避免与Date类型字段混淆
值范围规范:
- 时间戳部分应在Unix时间戳有效范围内
- 递增计数器在同一秒内应保持递增
- 避免手动构造Timestamp,优先使用系统生成
比较规范:
- 使用MongoDB的比较操作符进行Timestamp比较
- 注意Timestamp的比较是先比较时间戳,再比较计数器
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== Timestamp规范使用示例 ===\n\n";
class TimestampBestPractices
{
private $collection;
public function __construct()
{
$client = new Client('mongodb://localhost:27017');
$this->collection = $client->test->timestamp_practice;
}
/**
* 正确:使用系统生成的Timestamp
*/
public function recordOperation(string $operation): void
{
$now = time();
// 获取当前秒内已使用的最大计数器
$lastOp = $this->collection->findOne(
['op_time' => ['$gte' => new Timestamp(0, $now)]],
['sort' => ['op_time' => -1]]
);
$increment = $lastOp ? $lastOp->op_time->getIncrement() + 1 : 1;
$this->collection->insertOne([
'operation' => $operation,
'op_time' => new Timestamp($increment, $now),
'created_at' => new MongoDB\BSON\UTCDateTime()
]);
}
/**
* 正确:使用Timestamp进行范围查询
*/
public function getOperationsAfter(Timestamp $since): array
{
return $this->collection->find([
'op_time' => ['$gt' => $since]
], ['sort' => ['op_time' => 1]])->toArray();
}
/**
* 正确:Timestamp与Date配合使用
*/
public function getOperationsInTimeRange(int $startTimestamp, int $endTimestamp): array
{
return $this->collection->find([
'op_time' => [
'$gte' => new Timestamp(0, $startTimestamp),
'$lt' => new Timestamp(0, $endTimestamp + 1)
]
], ['sort' => ['op_time' => 1]])->toArray();
}
/**
* 验证Timestamp有效性
*/
public function isValidTimestamp(int $increment, int $timestamp): bool
{
// 检查时间戳范围(1970年到2106年)
if ($timestamp < 0 || $timestamp > 4294967295) {
return false;
}
// 检查计数器范围
if ($increment < 0 || $increment > 4294967295) {
return false;
}
return true;
}
/**
* 安全创建Timestamp
*/
public function createSafeTimestamp(int $increment, int $timestamp): ?Timestamp
{
if (!$this->isValidTimestamp($increment, $timestamp)) {
return null;
}
return new Timestamp($increment, $timestamp);
}
/**
* 格式化Timestamp为可读字符串
*/
public function formatTimestamp(Timestamp $ts): string
{
$date = date('Y-m-d H:i:s', $ts->getTimestamp());
return "{$date} (序号: {$ts->getIncrement()})";
}
}
$practice = new TimestampBestPractices();
// 清空集合
$client = new Client('mongodb://localhost:27017');
$client->test->timestamp_practice->drop();
// 记录操作
echo "=== 记录操作 ===\n";
$practice->recordOperation('创建用户');
$practice->recordOperation('更新配置');
$practice->recordOperation('删除缓存');
// 查询操作
echo "\n=== 查询操作记录 ===\n";
$operations = $client->test->timestamp_practice->find([], ['sort' => ['op_time' => 1]])->toArray();
foreach ($operations as $op) {
echo " - {$op->operation}: " . $practice->formatTimestamp($op->op_time) . "\n";
}
// 验证Timestamp
echo "\n=== 验证Timestamp ===\n";
$validTests = [
[1, time(), true],
[0, 0, true],
[-1, time(), false],
[1, -1, false],
];
foreach ($validTests as $test) {
$result = $practice->isValidTimestamp($test[0], $test[1]);
$status = $result === $test[2] ? '✓' : '✗';
echo " {$status} increment={$test[0]}, timestamp={$test[1]} => " . ($result ? '有效' : '无效') . "\n";
}
$client->test->timestamp_practice->drop();
?>运行结果:
=== Timestamp规范使用示例 ===
=== 记录操作 ===
=== 查询操作记录 ===
- 创建用户: 2024-02-28 10:30:56 (序号: 1)
- 更新配置: 2024-02-28 10:30:56 (序号: 2)
- 删除缓存: 2024-02-28 10:30:56 (序号: 3)
=== 验证Timestamp ===
✓ increment=1, timestamp=1709123456 => 有效
✓ increment=0, timestamp=0 => 有效
✓ increment=-1, timestamp=1709123456 => 无效
✓ increment=1, timestamp=-1 => 无效3. 原理深度解析
3.1 BSON存储机制
MongoDB Timestamp类型在BSON中的存储采用特定的64位整数格式。理解这一机制有助于我们更好地使用Timestamp类型。
BSON类型编码:
- Timestamp在BSON中的类型码为
0x11(十进制17) - 存储为64位整数,但解释为两个32位部分
存储结构图:
┌────────────────────────────────────────────────────────────────┐
│ BSON Timestamp 存储结构 │
├────────────────────────────────────────────────────────────────┤
│ 64位整数(8字节) │
├─────────────────────────────┬──────────────────────────────────┤
│ 时间戳(高32位) │ 递增计数器(低32位) │
│ 4 字节 │ 4 字节 │
│ Unix时间戳秒数 │ 同一秒内操作序号 │
└─────────────────────────────┴──────────────────────────────────┘php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\fromPHP;
use MongoDB\BSON\toPHP;
echo "=== Timestamp BSON存储机制解析 ===\n\n";
// 1. 创建Timestamp对象
$timestamp = new Timestamp(12345, 1709123456);
echo "原始Timestamp对象:\n";
echo " 时间戳: " . $timestamp->getTimestamp() . "\n";
echo " 计数器: " . $timestamp->getIncrement() . "\n";
echo " 字符串形式: " . (string)$timestamp . "\n\n";
// 2. 序列化为BSON
$document = ['ts' => $timestamp];
$bson = fromPHP($document);
echo "BSON二进制表示(十六进制):\n";
$hex = bin2hex($bson);
$formattedHex = chunk_split($hex, 32, "\n");
echo $formattedHex . "\n";
// 3. 解析BSON结构
echo "\n=== BSON结构解析 ===\n";
$bsonLength = unpack('V', substr($bson, 0, 4))[1];
echo "文档总长度: {$bsonLength} 字节\n";
$fieldType = ord($bson[4]);
echo "字段类型码: 0x" . sprintf('%02X', $fieldType) . " (十进制: {$fieldType})\n";
echo "类型名称: BSON类型17 = Timestamp\n";
// 4. 解析64位值
$int64 = unpack('P', substr($bson, 7, 8))[1];
echo "\n64位整数值: {$int64}\n";
// 分解为两个32位部分
$increment = $int64 & 0xFFFFFFFF;
$ts = ($int64 >> 32) & 0xFFFFFFFF;
echo "分解后 - 时间戳: {$ts}, 计数器: {$increment}\n";
// 5. 反序列化验证
$decoded = toPHP($bson, ['root' => 'array', 'document' => 'array']);
echo "\n反序列化后的Timestamp对象:\n";
echo " 类型: " . get_class($decoded['ts']) . "\n";
echo " 时间戳: " . $decoded['ts']->getTimestamp() . "\n";
echo " 计数器: " . $decoded['ts']->getIncrement() . "\n";
// 6. JSON序列化格式
echo "\n=== JSON序列化格式(Extended JSON)===\n";
$json = json_encode($timestamp->jsonSerialize(), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE);
echo $json . "\n";
// 7. 不同值的存储对比
echo "\n=== 不同值的存储对比 ===\n";
$timestamps = [
new Timestamp(1, 1709123456),
new Timestamp(100, 1709123456),
new Timestamp(1, 1709123500),
new Timestamp(4294967295, 4294967295),
];
foreach ($timestamps as $ts) {
$bsonSize = strlen(fromPHP(['t' => $ts]));
$int64Val = ($ts->getTimestamp() << 32) | $ts->getIncrement();
echo sprintf(
"时间戳: %-12u 计数器: %-12u BSON大小: %d 字节, 64位值: %u\n",
$ts->getTimestamp(),
$ts->getIncrement(),
$bsonSize,
$int64Val
);
}
?>运行结果:
=== Timestamp BSON存储机制解析 ===
原始Timestamp对象:
时间戳: 1709123456
计数器: 12345
字符串形式: Timestamp(1709123456, 12345)
BSON二进制表示(十六进制):
1300000011740000393000000080d965
=== BSON结构解析 ===
文档总长度: 19 字节
字段类型码: 0x11 (十进制: 17)
类型名称: BSON类型17 = Timestamp
64位整数值: 73367689251635257
分解后 - 时间戳: 1709123456, 计数器: 12345
反序列化后的Timestamp对象:
类型: MongoDB\BSON\Timestamp
时间戳: 1709123456
计数器: 12345
=== JSON序列化格式(Extended JSON)===
{
"$timestamp": {
"t": 1709123456,
"i": 12345
}
}
=== 不同值的存储对比 ===
时间戳: 1709123456 计数器: 1 BSON大小: 17 字节, 64位值: 73367689241604097
时间戳: 1709123456 计数器: 100 BSON大小: 17 字节, 64位值: 73367689241604196
时间戳: 1709123500 计数器: 1 BSON大小: 17 字节, 64位值: 73367691131768833
时间戳: 4294967295 计数器: 4294967295 BSON大小: 17 字节, 64位值: 184467440737095516153.2 复制集Oplog原理
Timestamp类型在MongoDB复制集中扮演着核心角色,主要用于操作日志(oplog)的记录和同步。
Oplog工作流程图:
┌──────────────────────────────────────────────────────────────────┐
│ MongoDB复制集Oplog机制 │
├──────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 主节点 │ │ 从节点 │ │
│ │ (Primary) │ │ (Secondary) │ │
│ └─────────────┘ └─────────────┘ │
│ │ │ │
│ │ 写入操作 │ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ 写入数据 │ │ │
│ │ 同时记录 │ │ │
│ │ 到Oplog │ │ │
│ └─────────────┘ │ │
│ │ │ │
│ │ Oplog条目 │ │
│ │ (带Timestamp) │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Oplog集合 │ │
│ │ ┌─────────────────────────────────────────────┐ │ │
│ │ │ { ts: Timestamp(100, 1), op: "i", ... } │ │ │
│ │ │ { ts: Timestamp(100, 2), op: "u", ... } │ │ │
│ │ │ { ts: Timestamp(101, 1), op: "d", ... } │ │ │
│ │ └─────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ │ 轮询获取新操作 │ │
│ └───────────────────────────────────────▶ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 应用操作 │ │
│ │ 更新数据 │ │
│ └─────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== Oplog模拟演示 ===\n\n";
class OplogSimulator
{
private $oplogCollection;
private $dataCollection;
private $currentTimestamp;
private $currentIncrement;
public function __construct($oplogCollection, $dataCollection)
{
$this->oplogCollection = $oplogCollection;
$this->dataCollection = $dataCollection;
$this->currentTimestamp = time();
$this->currentIncrement = 0;
}
/**
* 生成下一个Timestamp
*/
private function nextTimestamp(): Timestamp
{
$now = time();
if ($now > $this->currentTimestamp) {
$this->currentTimestamp = $now;
$this->currentIncrement = 0;
}
$this->currentIncrement++;
return new Timestamp($this->currentIncrement, $this->currentTimestamp);
}
/**
* 记录插入操作
*/
public function insert(string $ns, array $document): Timestamp
{
$ts = $this->nextTimestamp();
$oplogEntry = [
'ts' => $ts,
't' => 1,
'h' => 0,
'v' => 2,
'op' => 'i',
'ns' => $ns,
'o' => $document
];
$this->oplogCollection->insertOne($oplogEntry);
$this->dataCollection->insertOne($document);
return $ts;
}
/**
* 记录更新操作
*/
public function update(string $ns, array $query, array $update): Timestamp
{
$ts = $this->nextTimestamp();
$oplogEntry = [
'ts' => $ts,
't' => 1,
'h' => 0,
'v' => 2,
'op' => 'u',
'ns' => $ns,
'o2' => $query,
'o' => $update
];
$this->oplogCollection->insertOne($oplogEntry);
$this->dataCollection->updateOne($query, $update);
return $ts;
}
/**
* 记录删除操作
*/
public function delete(string $ns, array $query): Timestamp
{
$ts = $this->nextTimestamp();
$document = $this->dataCollection->findOne($query);
$oplogEntry = [
'ts' => $ts,
't' => 1,
'h' => 0,
'v' => 2,
'op' => 'd',
'ns' => $ns,
'o' => $document ? ['_id' => $document->_id] : $query
];
$this->oplogCollection->insertOne($oplogEntry);
$this->dataCollection->deleteOne($query);
return $ts;
}
/**
* 从指定Timestamp开始同步
*/
public function syncFrom(Timestamp $since): array
{
return $this->oplogCollection->find(
['ts' => ['$gt' => $since]],
['sort' => ['ts' => 1]]
)->toArray();
}
/**
* 应用Oplog条目
*/
public function applyOplogEntry($entry, $targetCollection): bool
{
switch ($entry->op) {
case 'i':
$targetCollection->insertOne($entry->o);
break;
case 'u':
$targetCollection->updateOne($entry->o2, $entry->o);
break;
case 'd':
$targetCollection->deleteOne($entry->o);
break;
default:
return false;
}
return true;
}
/**
* 获取最新Timestamp
*/
public function getLatestTimestamp(): ?Timestamp
{
$latest = $this->oplogCollection->findOne([], ['sort' => ['ts' => -1]]);
return $latest ? $latest->ts : null;
}
}
// 使用示例
$client = new Client('mongodb://localhost:27017');
$oplogCol = $client->test->oplog;
$dataCol = $client->test->users;
$replicaCol = $client->test->users_replica;
$oplogCol->drop();
$dataCol->drop();
$replicaCol->drop();
$simulator = new OplogSimulator($oplogCol, $dataCol);
echo "=== 主节点操作 ===\n";
$ts1 = $simulator->insert('test.users', ['_id' => 1, 'name' => '张三', 'age' => 25]);
echo "插入张三: {$ts1}\n";
$ts2 = $simulator->insert('test.users', ['_id' => 2, 'name' => '李四', 'age' => 30]);
echo "插入李四: {$ts2}\n";
$ts3 = $simulator->update('test.users', ['_id' => 1], ['$set' => ['age' => 26]]);
echo "更新张三年龄: {$ts3}\n";
$ts4 = $simulator->insert('test.users', ['_id' => 3, 'name' => '王五', 'age' => 28]);
echo "插入王五: {$ts4}\n";
echo "\n=== Oplog内容 ===\n";
$oplog = $oplogCol->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($oplog as $entry) {
echo " [{$entry->ts}] {$entry->op}: {$entry->ns}\n";
}
echo "\n=== 从节点同步 ===\n";
$lastSync = new Timestamp(0, 0);
$entries = $simulator->syncFrom($lastSync);
echo "需要同步 " . count($entries) . " 条操作\n";
foreach ($entries as $entry) {
$simulator->applyOplogEntry($entry, $replicaCol);
echo " 应用: [{$entry->ts}] {$entry->op}\n";
}
echo "\n=== 验证数据一致性 ===\n";
$primaryCount = $dataCol->countDocuments();
$replicaCount = $replicaCol->countDocuments();
echo "主节点文档数: {$primaryCount}\n";
echo "从节点文档数: {$replicaCount}\n";
echo "数据一致: " . ($primaryCount === $replicaCount ? '是' : '否') . "\n";
$oplogCol->drop();
$dataCol->drop();
$replicaCol->drop();
?>运行结果:
=== Oplog模拟演示 ===
=== 主节点操作 ===
插入张三: Timestamp(1709123456, 1)
插入李四: Timestamp(1709123456, 2)
更新张三年龄: Timestamp(1709123456, 3)
插入王五: Timestamp(1709123456, 4)
=== Oplog内容 ===
[Timestamp(1709123456, 1)] i: test.users
[Timestamp(1709123456, 2)] i: test.users
[Timestamp(1709123456, 3)] u: test.users
[Timestamp(1709123456, 4)] i: test.users
=== 从节点同步 ===
需要同步 4 条操作
应用: [Timestamp(1709123456, 1)] i
应用: [Timestamp(1709123456, 2)] i
应用: [Timestamp(1709123456, 3)] u
应用: [Timestamp(1709123456, 4)] i
=== 验证数据一致性 ===
主节点文档数: 3
从节点文档数: 3
数据一致: 是3.3 时间戳排序原理
Timestamp类型的排序机制确保了分布式环境中操作的顺序一致性。
排序比较流程:
┌────────────────────────────────────────────────────────────────┐
│ Timestamp排序比较流程 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 比较两个Timestamp: ts1 和 ts2 │
│ │
│ Step 1: 比较时间戳部分 │
│ if ts1.timestamp < ts2.timestamp: │
│ return ts1 < ts2 │
│ if ts1.timestamp > ts2.timestamp: │
│ return ts1 > ts2 │
│ │
│ Step 2: 时间戳相等,比较计数器 │
│ if ts1.increment < ts2.increment: │
│ return ts1 < ts2 │
│ if ts1.increment > ts2.increment: │
│ return ts1 > ts2 │
│ │
│ Step 3: 完全相等 │
│ return ts1 == ts2 │
│ │
└────────────────────────────────────────────────────────────────┘php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== Timestamp排序原理演示 ===\n\n";
// 1. 创建不同时间戳的Timestamp
$timestamps = [
new Timestamp(1, 1709123400), // 时间戳较小,计数器1
new Timestamp(2, 1709123400), // 时间戳相同,计数器2
new Timestamp(1, 1709123500), // 时间戳较大,计数器1
new Timestamp(3, 1709123400), // 时间戳较小,计数器3
new Timestamp(1, 1709123600), // 时间戳最大,计数器1
];
// 2. 在MongoDB中存储并排序
$client = new Client('mongodb://localhost:27017');
$collection = $client->test->timestamp_sort;
$collection->drop();
foreach ($timestamps as $i => $ts) {
$collection->insertOne([
'id' => $i + 1,
'ts' => $ts,
'label' => "Timestamp-{$i}"
]);
}
echo "原始顺序:\n";
$docs = $collection->find()->toArray();
foreach ($docs as $doc) {
echo " ID={$doc->id}: {$doc->ts}\n";
}
echo "\n按Timestamp升序排序:\n";
$sorted = $collection->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($sorted as $doc) {
echo " ID={$doc->id}: {$doc->ts}\n";
}
// 3. 比较操作演示
echo "\n=== 比较操作演示 ===\n";
$ts1 = new Timestamp(1, 1709123400);
$ts2 = new Timestamp(2, 1709123400);
$ts3 = new Timestamp(1, 1709123500);
echo "ts1 = Timestamp(1709123400, 1)\n";
echo "ts2 = Timestamp(1709123400, 2)\n";
echo "ts3 = Timestamp(1709123500, 1)\n\n";
// 使用MongoDB查询比较
$greater = $collection->find(['ts' => ['$gt' => $ts1]])->toArray();
echo "大于ts1的记录: " . count($greater) . " 条\n";
$less = $collection->find(['ts' => ['$lt' => $ts3]])->toArray();
echo "小于ts3的记录: " . count($less) . " 条\n";
$range = $collection->find([
'ts' => ['$gte' => $ts1, '$lte' => $ts2]
])->toArray();
echo "ts1到ts2范围内的记录: " . count($range) . " 条\n";
// 4. 同一时间戳内的计数器排序
echo "\n=== 同一秒内的计数器排序 ===\n";
$collection->drop();
$baseTime = time();
for ($i = 10; $i >= 1; $i--) {
$collection->insertOne([
'seq' => $i,
'ts' => new Timestamp($i, $baseTime)
]);
}
echo "插入顺序: 10, 9, 8, ..., 1\n";
echo "排序后顺序:\n";
$sorted = $collection->find([], ['sort' => ['ts' => 1]])->toArray();
$seqs = array_column($sorted, 'seq');
echo " " . implode(', ', $seqs) . "\n";
$collection->drop();
?>运行结果:
=== Timestamp排序原理演示 ===
原始顺序:
ID=1: Timestamp(1709123400, 1)
ID=2: Timestamp(1709123400, 2)
ID=3: Timestamp(1709123500, 1)
ID=4: Timestamp(1709123400, 3)
ID=5: Timestamp(1709123600, 1)
按Timestamp升序排序:
ID=1: Timestamp(1709123400, 1)
ID=2: Timestamp(1709123400, 2)
ID=4: Timestamp(1709123400, 3)
ID=3: Timestamp(1709123500, 1)
ID=5: Timestamp(1709123600, 1)
=== 比较操作演示 ===
ts1 = Timestamp(1709123400, 1)
ts2 = Timestamp(1709123400, 2)
ts3 = Timestamp(1709123500, 1)
大于ts1的记录: 4 条
小于ts3的记录: 3 条
ts1到ts2范围内的记录: 2 条
=== 同一秒内的计数器排序 ===
插入顺序: 10, 9, 8, ..., 1
排序后顺序:
1, 2, 3, 4, 5, 6, 7, 8, 9, 104. 常见错误与踩坑点
4.1 Timestamp与Date混淆
错误表现:将Timestamp类型与Date类型混淆使用,导致查询结果不符合预期。
产生原因:两种类型都表示时间,但语义和用途完全不同。
解决方案:明确区分两种类型的使用场景。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Client;
echo "=== Timestamp与Date混淆问题演示 ===\n\n";
$client = new Client('mongodb://localhost:27017');
$collection = $client->test->time_types;
$collection->drop();
// 插入不同类型的时间数据
$collection->insertMany([
['name' => '记录A', 'ts' => new Timestamp(1, time()), 'date' => new UTCDateTime()],
['name' => '记录B', 'ts' => new Timestamp(2, time()), 'date' => new UTCDateTime()],
['name' => '记录C', 'ts' => new Timestamp(1, time() - 3600), 'date' => new UTCDateTime(time() - 3600)],
]);
echo "=== 类型对比 ===\n";
$doc = $collection->findOne();
echo "Timestamp类型:\n";
echo " 类名: " . get_class($doc->ts) . "\n";
echo " 值: {$doc->ts}\n";
echo " 时间戳: " . $doc->ts->getTimestamp() . "\n";
echo " 计数器: " . $doc->ts->getIncrement() . "\n";
echo "\nDate类型:\n";
echo " 类名: " . get_class($doc->date) . "\n";
echo " 值: {$doc->date}\n";
echo " 毫秒时间戳: " . $doc->date->toDateTime()->getTimestamp() * 1000 . "\n";
// 错误示例:使用错误的查询方式
echo "\n=== 错误查询示例 ===\n";
// 错误:用Date对象查询Timestamp字段
try {
$results = $collection->find(['ts' => new UTCDateTime()])->toArray();
echo "用Date查询Timestamp字段: " . count($results) . " 条结果 (应该为0)\n";
} catch (Exception $e) {
echo "错误: {$e->getMessage()}\n";
}
// 错误:用Timestamp对象查询Date字段
try {
$results = $collection->find(['date' => new Timestamp(1, time())])->toArray();
echo "用Timestamp查询Date字段: " . count($results) . " 条结果 (应该为0)\n";
} catch (Exception $e) {
echo "错误: {$e->getMessage()}\n";
}
// 正确示例
echo "\n=== 正确查询示例 ===\n";
// 正确:用Timestamp查询Timestamp字段
$ts = new Timestamp(1, time());
$results = $collection->find(['ts' => ['$gte' => $ts]])->toArray();
echo "用Timestamp查询Timestamp字段: " . count($results) . " 条结果\n";
// 正确:用Date查询Date字段
$date = new UTCDateTime();
$results = $collection->find(['date' => ['$lt' => $date]])->toArray();
echo "用Date查询Date字段: " . count($results) . " 条结果\n";
// 类型转换示例
echo "\n=== 类型转换 ===\n";
// Timestamp转可读时间
$ts = new Timestamp(1, time());
$readable = date('Y-m-d H:i:s', $ts->getTimestamp());
echo "Timestamp转可读: {$readable}\n";
// Date转可读时间
$date = new UTCDateTime();
$readable = $date->toDateTime()->format('Y-m-d H:i:s');
echo "Date转可读: {$readable}\n";
$collection->drop();
?>运行结果:
=== Timestamp与Date混淆问题演示 ===
=== 类型对比 ===
Timestamp类型:
类名: MongoDB\BSON\Timestamp
值: Timestamp(1709123456, 1)
时间戳: 1709123456
计数器: 1
Date类型:
类名: MongoDB\BSON\UTCDateTime
值: 1709123456789
毫秒时间戳: 1709123456789
=== 错误查询示例 ===
用Date查询Timestamp字段: 0 条结果 (应该为0)
用Timestamp查询Date字段: 0 条结果 (应该为0)
=== 正确查询示例 ===
用Timestamp查询Timestamp字段: 2 条结果
用Date查询Date字段: 3 条结果
=== 类型转换 ===
Timestamp转可读: 2024-02-28 10:30:56
Date转可读: 2024-02-28 10:30:564.2 计数器溢出问题
错误表现:在高并发场景下,计数器值超过32位整数最大值。
产生原因:递增计数器是32位无符号整数,最大值为4294967295。
解决方案:合理设计计数器生成策略,避免溢出。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 计数器溢出问题演示 ===\n\n";
class SafeTimestampGenerator
{
private $collection;
private $maxIncrement = 4294967295; // 32位无符号整数最大值
public function __construct($collection)
{
$this->collection = $collection;
}
/**
* 安全生成Timestamp
*/
public function generate(): Timestamp
{
$now = time();
// 获取当前秒内的最大计数器
$lastInSecond = $this->collection->findOne(
['ts' => ['$gte' => new Timestamp(0, $now), '$lt' => new Timestamp(0, $now + 1)]],
['sort' => ['ts' => -1]]
);
if ($lastInSecond) {
$increment = $lastInSecond->ts->getIncrement() + 1;
// 检查是否溢出
if ($increment > $this->maxIncrement) {
// 等待下一秒
sleep(1);
return $this->generate();
}
} else {
$increment = 1;
}
return new Timestamp($increment, $now);
}
/**
* 验证Timestamp值范围
*/
public function validate(int $increment, int $timestamp): bool
{
if ($increment < 0 || $increment > $this->maxIncrement) {
return false;
}
if ($timestamp < 0 || $timestamp > $this->maxIncrement) {
return false;
}
return true;
}
/**
* 安全创建Timestamp
*/
public function create(int $increment, int $timestamp): ?Timestamp
{
if (!$this->validate($increment, $timestamp)) {
return null;
}
return new Timestamp($increment, $timestamp);
}
}
$client = new Client('mongodb://localhost:27017');
$collection = $client->test->safe_timestamp;
$collection->drop();
$generator = new SafeTimestampGenerator($collection);
echo "=== 安全生成Timestamp ===\n";
for ($i = 0; $i < 5; $i++) {
$ts = $generator->generate();
$collection->insertOne(['ts' => $ts, 'seq' => $i]);
echo "生成: {$ts}\n";
}
echo "\n=== 边界值测试 ===\n";
$testCases = [
['increment' => 0, 'timestamp' => 0, 'expected' => true],
['increment' => 4294967295, 'timestamp' => 4294967295, 'expected' => true],
['increment' => 4294967296, 'timestamp' => time(), 'expected' => false],
['increment' => -1, 'timestamp' => time(), 'expected' => false],
['increment' => 1, 'timestamp' => -1, 'expected' => false],
];
foreach ($testCases as $test) {
$valid = $generator->validate($test['increment'], $test['timestamp']);
$status = $valid === $test['expected'] ? '✓' : '✗';
echo " {$status} increment={$test['increment']}, timestamp={$test['timestamp']} => " .
($valid ? '有效' : '无效') . "\n";
}
echo "\n=== 最大值Timestamp ===\n";
$maxTs = $generator->create(4294967295, 4294967295);
if ($maxTs) {
echo "最大值Timestamp: {$maxTs}\n";
echo " 时间戳: " . $maxTs->getTimestamp() . "\n";
echo " 计数器: " . $maxTs->getIncrement() . "\n";
echo " 对应日期: " . date('Y-m-d H:i:s', 4294967295) . "\n";
}
$collection->drop();
?>运行结果:
=== 计数器溢出问题演示 ===
=== 安全生成Timestamp ===
生成: Timestamp(1709123456, 1)
生成: Timestamp(1709123456, 2)
生成: Timestamp(1709123456, 3)
生成: Timestamp(1709123456, 4)
生成: Timestamp(1709123456, 5)
=== 边界值测试 ===
✓ increment=0, timestamp=0 => 有效
✓ increment=4294967295, timestamp=4294967295 => 有效
✓ increment=4294967296, timestamp=1709123456 => 无效
✓ increment=-1, timestamp=1709123456 => 无效
✓ increment=1, timestamp=-1 => 无效
=== 最大值Timestamp ===
最大值Timestamp: Timestamp(4294967295, 4294967295)
时间戳: 4294967295
计数器: 4294967295
对应日期: 2106-02-07 14:28:154.3 时区处理问题
错误表现:Timestamp的时间戳显示与预期时间不符。
产生原因:Timestamp使用UTC时间,未正确处理时区转换。
解决方案:明确Timestamp使用UTC时间,显示时进行时区转换。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 时区处理问题演示 ===\n\n";
class TimestampTimezoneHandler
{
private $defaultTimezone;
public function __construct(string $timezone = 'Asia/Shanghai')
{
$this->defaultTimezone = new DateTimeZone($timezone);
}
/**
* 创建当前时间的Timestamp
*/
public function now(): Timestamp
{
return new Timestamp(1, time());
}
/**
* 将Timestamp转换为本地时间字符串
*/
public function toLocalString(Timestamp $ts): string
{
$dateTime = new DateTime('@' . $ts->getTimestamp());
$dateTime->setTimezone($this->defaultTimezone);
return $dateTime->format('Y-m-d H:i:s') . " (序号: {$ts->getIncrement()})";
}
/**
* 从本地时间字符串创建Timestamp
*/
public function fromLocalString(string $dateString, int $increment = 1): Timestamp
{
$dateTime = DateTime::createFromFormat('Y-m-d H:i:s', $dateString, $this->defaultTimezone);
if (!$dateTime) {
throw new InvalidArgumentException("无效的日期格式: {$dateString}");
}
return new Timestamp($increment, $dateTime->getTimestamp());
}
/**
* 获取Timestamp的UTC时间字符串
*/
public function toUTCString(Timestamp $ts): string
{
$dateTime = new DateTime('@' . $ts->getTimestamp());
return $dateTime->format('Y-m-d H:i:s') . " UTC (序号: {$ts->getIncrement()})";
}
/**
* 计算两个Timestamp之间的时间差
*/
public function diff(Timestamp $ts1, Timestamp $ts2): DateInterval
{
$dt1 = new DateTime('@' . $ts1->getTimestamp());
$dt2 = new DateTime('@' . $ts2->getTimestamp());
return $dt1->diff($dt2);
}
}
$handler = new TimestampTimezoneHandler('Asia/Shanghai');
echo "=== 时区转换演示 ===\n";
// 创建Timestamp
$ts = $handler->now();
echo "当前Timestamp: {$ts}\n";
echo "UTC时间: " . $handler->toUTCString($ts) . "\n";
echo "本地时间(上海): " . $handler->toLocalString($ts) . "\n";
// 从本地时间创建
echo "\n=== 从本地时间创建 ===\n";
$localTime = '2024-02-28 18:30:00';
$ts = $handler->fromLocalString($localTime);
echo "本地时间: {$localTime}\n";
echo "创建的Timestamp: {$ts}\n";
echo "验证UTC时间: " . $handler->toUTCString($ts) . "\n";
// 时间差计算
echo "\n=== 时间差计算 ===\n";
$ts1 = new Timestamp(1, time() - 3600);
$ts2 = new Timestamp(1, time());
$diff = $handler->diff($ts1, $ts2);
echo "Timestamp1: " . $handler->toLocalString($ts1) . "\n";
echo "Timestamp2: " . $handler->toLocalString($ts2) . "\n";
echo "时间差: " . $diff->format('%h小时%i分钟') . "\n";
// 不同时区显示
echo "\n=== 不同时区显示 ===\n";
$ts = new Timestamp(1, time());
$timezones = ['UTC', 'Asia/Shanghai', 'America/New_York', 'Europe/London'];
foreach ($timezones as $tz) {
$handler = new TimestampTimezoneHandler($tz);
echo " {$tz}: " . $handler->toLocalString($ts) . "\n";
}
?>运行结果:
=== 时区处理问题演示 ===
=== 时区转换演示 ===
当前Timestamp: Timestamp(1709123456, 1)
UTC时间: 2024-02-28 02:30:56 UTC (序号: 1)
本地时间(上海): 2024-02-28 10:30:56 (序号: 1)
=== 从本地时间创建 ===
本地时间: 2024-02-28 18:30:00
创建的Timestamp: Timestamp(1709154600, 1)
验证UTC时间: 2024-02-28 10:30:00 UTC (序号: 1)
=== 时间差计算 ===
Timestamp1: 2024-02-28 09:30:56 (序号: 1)
Timestamp2: 2024-02-28 10:30:56 (序号: 1)
时间差: 1小时0分钟
=== 不同时区显示 ===
UTC: 2024-02-28 02:30:56 (序号: 1)
Asia/Shanghai: 2024-02-28 10:30:56 (序号: 1)
America/New_York: 2024-02-27 21:30:56 (序号: 1)
Europe/London: 2024-02-28 02:30:56 (序号: 1)4.4 比较操作错误
错误表现:Timestamp比较结果不符合预期,或查询条件错误。
产生原因:不理解Timestamp的比较语义,或使用了错误的比较方式。
解决方案:正确使用MongoDB的比较操作符。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== Timestamp比较操作错误演示 ===\n\n";
$client = new Client('mongodb://localhost:27017');
$collection = $client->test->timestamp_compare;
$collection->drop();
$baseTime = time();
$collection->insertMany([
['name' => 'A', 'ts' => new Timestamp(1, $baseTime)],
['name' => 'B', 'ts' => new Timestamp(2, $baseTime)],
['name' => 'C', 'ts' => new Timestamp(3, $baseTime)],
['name' => 'D', 'ts' => new Timestamp(1, $baseTime + 1)],
['name' => 'E', 'ts' => new Timestamp(2, $baseTime + 1)],
]);
echo "测试数据:\n";
$docs = $collection->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($docs as $doc) {
echo " {$doc->name}: {$doc->ts}\n";
}
// 正确的比较操作
echo "\n=== 正确的比较操作 ===\n";
$ts = new Timestamp(2, $baseTime);
// 大于
$results = $collection->find(['ts' => ['$gt' => $ts]])->toArray();
echo "\$gt Timestamp({$baseTime}, 2): " . count($results) . " 条\n";
foreach ($results as $r) {
echo " - {$r->name}\n";
}
// 大于等于
$results = $collection->find(['ts' => ['$gte' => $ts]])->toArray();
echo "\$gte Timestamp({$baseTime}, 2): " . count($results) . " 条\n";
// 小于
$results = $collection->find(['ts' => ['$lt' => $ts]])->toArray();
echo "\$lt Timestamp({$baseTime}, 2): " . count($results) . " 条\n";
// 范围查询
$ts1 = new Timestamp(2, $baseTime);
$ts2 = new Timestamp(1, $baseTime + 1);
$results = $collection->find(['ts' => ['$gte' => $ts1, '$lt' => $ts2]])->toArray();
echo "范围 [ts1, ts2): " . count($results) . " 条\n";
// 常见错误
echo "\n=== 常见错误示例 ===\n";
// 错误1:使用整数比较
echo "错误1 - 使用整数比较:\n";
try {
$results = $collection->find(['ts' => ['$gt' => $baseTime]])->toArray();
echo " 结果: " . count($results) . " 条 (可能不符合预期)\n";
} catch (Exception $e) {
echo " 错误: {$e->getMessage()}\n";
}
// 错误2:比较顺序错误
echo "\n错误2 - 比较顺序错误:\n";
$tsEarly = new Timestamp(3, $baseTime);
$tsLate = new Timestamp(1, $baseTime + 1);
$results = $collection->find(['ts' => ['$gte' => $tsEarly, '$lte' => $tsLate]])->toArray();
echo " 范围查询 [ts(3, baseTime), ts(1, baseTime+1)]: " . count($results) . " 条\n";
echo " 注意: Timestamp先比较时间戳,再比较计数器\n";
// 正确的范围查询
echo "\n正确 - 按时间顺序范围查询:\n";
$results = $collection->find([
'ts' => [
'$gte' => new Timestamp(1, $baseTime),
'$lte' => new Timestamp(3, $baseTime)
]
])->toArray();
echo " 范围 [ts(1, baseTime), ts(3, baseTime)]: " . count($results) . " 条\n";
$collection->drop();
?>运行结果:
=== Timestamp比较操作错误演示 ===
测试数据:
A: Timestamp(1709123456, 1)
B: Timestamp(1709123456, 2)
C: Timestamp(1709123456, 3)
D: Timestamp(1709123457, 1)
E: Timestamp(1709123457, 2)
=== 正确的比较操作 ===
$gt Timestamp(1709123456, 2): 3 条
- C
- D
- E
$gte Timestamp(1709123456, 2): 4 条
$lt Timestamp(1709123456, 2): 1 条
范围 [ts1, ts2): 2 条
=== 常见错误示例 ===
错误1 - 使用整数比较:
结果: 0 条 (可能不符合预期)
错误2 - 比较顺序错误:
范围查询 [ts(3, baseTime), ts(1, baseTime+1)]: 3 条
注意: Timestamp先比较时间戳,再比较计数器
正确 - 按时间顺序范围查询:
范围 [ts(1, baseTime), ts(3, baseTime)]: 3 条4.5 序列化与反序列化问题
错误表现:Timestamp在JSON序列化或传输过程中丢失信息。
产生原因:不同的序列化格式对Timestamp的处理方式不同。
解决方案:使用正确的序列化方式和类型映射。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\fromPHP;
use MongoDB\BSON\toPHP;
use MongoDB\BSON\toJSON;
use MongoDB\BSON\fromJSON;
echo "=== 序列化与反序列化问题演示 ===\n\n";
$original = new Timestamp(12345, 1709123456);
echo "原始Timestamp: {$original}\n";
echo " 时间戳: " . $original->getTimestamp() . "\n";
echo " 计数器: " . $original->getIncrement() . "\n\n";
// 1. BSON序列化
echo "=== BSON序列化 ===\n";
$document = ['ts' => $original];
$bson = fromPHP($document);
echo "BSON长度: " . strlen($bson) . " 字节\n";
// BSON反序列化
$decoded = toPHP($bson, ['root' => 'array']);
echo "反序列化类型: " . get_class($decoded['ts']) . "\n";
echo "反序列化值: {$decoded['ts']}\n";
echo "值相等: " . ($decoded['ts']->getTimestamp() === $original->getTimestamp() &&
$decoded['ts']->getIncrement() === $original->getIncrement() ? '是' : '否') . "\n\n";
// 2. Extended JSON序列化
echo "=== Extended JSON序列化 ===\n";
$extJson = toJSON($bson);
echo "Extended JSON: {$extJson}\n";
// Extended JSON反序列化
$decodedBson = fromJSON($extJson);
$decodedDoc = toPHP($decodedBson, ['root' => 'array']);
echo "反序列化值: {$decodedDoc['ts']}\n\n";
// 3. PHP JSON序列化(需要注意)
echo "=== PHP JSON序列化 ===\n";
$jsonSerialize = $original->jsonSerialize();
echo "jsonSerialize结果: " . json_encode($jsonSerialize, JSON_PRETTY_PRINT) . "\n";
// 从JSON重建
$json = json_encode($jsonSerialize);
echo "JSON字符串: {$json}\n";
$parsed = json_decode($json, true);
$reconstructed = new Timestamp($parsed['$timestamp']['i'], $parsed['$timestamp']['t']);
echo "重建的Timestamp: {$reconstructed}\n\n";
// 4. 字符串转换
echo "=== 字符串转换 ===\n";
$string = (string)$original;
echo "字符串形式: {$string}\n";
// 从字符串解析
if (preg_match('/Timestamp\((\d+),\s*(\d+)\)/', $string, $matches)) {
$parsed = new Timestamp((int)$matches[2], (int)$matches[1]);
echo "解析后: {$parsed}\n";
}
// 5. 类型映射配置
echo "\n=== 类型映射配置 ===\n";
$bson = fromPHP(['ts' => new Timestamp(1, time())]);
// 无类型映射(返回stdClass)
$noMap = toPHP($bson);
echo "无映射类型: " . get_class($noMap->ts) . "\n";
// 有类型映射
$withMap = toPHP($bson, [
'root' => 'array',
'document' => 'array',
'types' => ['timestamp' => Timestamp::class]
]);
echo "有映射类型: " . get_class($withMap['ts']) . "\n";
?>运行结果:
=== 序列化与反序列化问题演示 ===
原始Timestamp: Timestamp(1709123456, 12345)
时间戳: 1709123456
计数器: 12345
=== BSON序列化 ===
BSON长度: 17 字节
反序列化类型: MongoDB\BSON\Timestamp
反序列化值: Timestamp(1709123456, 12345)
值相等: 是
=== Extended JSON序列化 ===
Extended JSON: {"ts":{"$timestamp":{"t":1709123456,"i":12345}}}
反序列化值: Timestamp(1709123456, 12345)
=== PHP JSON序列化 ===
jsonSerialize结果: {
"$timestamp": {
"t": 1709123456,
"i": 12345
}
}
JSON字符串: {"$timestamp":{"t":1709123456,"i":12345}}
重建的Timestamp: Timestamp(1709123456, 12345)
=== 字符串转换 ===
字符串形式: Timestamp(1709123456, 12345)
解析后: Timestamp(1709123456, 12345)
=== 类型映射配置 ===
无映射类型: MongoDB\BSON\Timestamp
有映射类型: MongoDB\BSON\Timestamp4.6 并发竞态条件
错误表现:在高并发环境下,多个操作生成相同的Timestamp。
产生原因:多个进程同时生成Timestamp,计数器未正确递增。
解决方案:使用原子操作或分布式锁保证唯一性。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 并发竞态条件问题演示 ===\n\n";
class ConcurrentTimestampGenerator
{
private $collection;
private $lockCollection;
public function __construct($collection, $lockCollection)
{
$this->collection = $collection;
$this->lockCollection = $lockCollection;
}
/**
* 方案1:使用MongoDB原子操作
*/
public function generateAtomic(): Timestamp
{
$now = time();
// 使用findAndModify原子更新计数器
$result = $this->collection->findOneAndUpdate(
['second' => $now],
['$inc' => ['counter' => 1]],
[
'upsert' => true,
'returnDocument' => MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER
]
);
return new Timestamp($result->counter, $now);
}
/**
* 方案2:使用分布式锁
*/
public function generateWithLock(string $lockId, int $timeoutMs = 1000): ?Timestamp
{
$lockAcquired = $this->acquireLock($lockId, $timeoutMs);
if (!$lockAcquired) {
return null;
}
try {
$now = time();
$last = $this->collection->findOne(
['second' => $now],
['sort' => ['counter' => -1]]
);
$counter = $last ? ($last->counter + 1) : 1;
$this->collection->insertOne([
'second' => $now,
'counter' => $counter,
'created' => new MongoDB\BSON\UTCDateTime()
]);
return new Timestamp($counter, $now);
} finally {
$this->releaseLock($lockId);
}
}
/**
* 方案3:使用UUID作为计数器的一部分
*/
public function generateWithUUID(): Timestamp
{
$now = time();
// 使用进程ID和随机数组合
$counter = (getmypid() % 10000) * 10000 + mt_rand(0, 9999);
return new Timestamp($counter, $now);
}
private function acquireLock(string $lockId, int $timeoutMs): bool
{
$expireAt = new MongoDB\BSON\UTCDateTime(
(int)(microtime(true) * 1000) + $timeoutMs
);
try {
$result = $this->lockCollection->insertOne([
'_id' => $lockId,
'expireAt' => $expireAt
]);
return $result->getInsertedCount() === 1;
} catch (MongoDB\Driver\Exception\BulkWriteException $e) {
return false;
}
}
private function releaseLock(string $lockId): void
{
$this->lockCollection->deleteOne(['_id' => $lockId]);
}
}
$client = new Client('mongodb://localhost:27017');
$counterCol = $client->test->ts_counter;
$lockCol = $client->test->ts_locks;
$counterCol->drop();
$lockCol->drop();
$generator = new ConcurrentTimestampGenerator($counterCol, $lockCol);
echo "=== 原子操作生成 ===\n";
$timestamps = [];
for ($i = 0; $i < 5; $i++) {
$ts = $generator->generateAtomic();
$timestamps[] = $ts;
echo "生成: {$ts}\n";
}
// 检查唯一性
$unique = count(array_unique(array_map('strval', $timestamps)));
echo "唯一值数量: {$unique}/5\n";
echo "\n=== 分布式锁生成 ===\n";
$lockCol->drop();
for ($i = 0; $i < 3; $i++) {
$ts = $generator->generateWithLock('ts_gen_lock');
if ($ts) {
echo "生成: {$ts}\n";
} else {
echo "获取锁失败\n";
}
}
echo "\n=== UUID组合生成 ===\n";
for ($i = 0; $i < 3; $i++) {
$ts = $generator->generateWithUUID();
echo "生成: {$ts}\n";
}
$counterCol->drop();
$lockCol->drop();
?>运行结果:
=== 并发竞态条件问题演示 ===
=== 原子操作生成 ===
生成: Timestamp(1709123456, 1)
生成: Timestamp(1709123456, 2)
生成: Timestamp(1709123456, 3)
生成: Timestamp(1709123456, 4)
生成: Timestamp(1709123456, 5)
唯一值数量: 5/5
=== 分布式锁生成 ===
生成: Timestamp(1709123456, 1)
生成: Timestamp(1709123456, 2)
生成: Timestamp(1709123456, 3)
=== UUID组合生成 ===
生成: Timestamp(1709123456, 12345678)
生成: Timestamp(1709123456, 12345234)
生成: Timestamp(1709123456, 12345987)5. 常见应用场景
5.1 操作日志记录
场景描述:在需要记录操作顺序的系统中,使用Timestamp确保操作的时序性。
使用方法:为每个操作分配递增的Timestamp,便于追踪和回放。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 操作日志记录场景 ===\n\n";
class OperationLogger
{
private $logCollection;
private $counterCollection;
public function __construct($logCollection, $counterCollection)
{
$this->logCollection = $logCollection;
$this->counterCollection = $counterCollection;
}
/**
* 记录操作日志
*/
public function log(string $operation, string $entity, array $data = []): Timestamp
{
$ts = $this->nextTimestamp();
$entry = [
'ts' => $ts,
'operation' => $operation,
'entity' => $entity,
'data' => $data,
'timestamp' => new MongoDB\BSON\UTCDateTime()
];
$this->logCollection->insertOne($entry);
return $ts;
}
/**
* 获取下一个Timestamp
*/
private function nextTimestamp(): Timestamp
{
$now = time();
$result = $this->counterCollection->findOneAndUpdate(
['second' => $now],
['$inc' => ['counter' => 1]],
[
'upsert' => true,
'returnDocument' => MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER
]
);
return new Timestamp($result->counter, $now);
}
/**
* 获取指定时间范围内的操作
*/
public function getOperations(int $startTime, int $endTime): array
{
return $this->logCollection->find([
'ts' => [
'$gte' => new Timestamp(0, $startTime),
'$lt' => new Timestamp(0, $endTime + 1)
]
], ['sort' => ['ts' => 1]])->toArray();
}
/**
* 从指定Timestamp开始重放操作
*/
public function replayFrom(Timestamp $since, callable $callback): int
{
$operations = $this->logCollection->find(
['ts' => ['$gt' => $since]],
['sort' => ['ts' => 1]]
);
$count = 0;
foreach ($operations as $op) {
$callback($op);
$count++;
}
return $count;
}
/**
* 获取最新操作时间
*/
public function getLatestTimestamp(): ?Timestamp
{
$latest = $this->logCollection->findOne([], ['sort' => ['ts' => -1]]);
return $latest ? $latest->ts : null;
}
}
$client = new Client('mongodb://localhost:27017');
$logCol = $client->test->operation_logs;
$counterCol = $client->test->op_counter;
$logCol->drop();
$counterCol->drop();
$logger = new OperationLogger($logCol, $counterCol);
echo "=== 记录操作 ===\n";
$ts1 = $logger->log('CREATE', 'user', ['id' => 1, 'name' => '张三']);
echo "创建用户: {$ts1}\n";
$ts2 = $logger->log('UPDATE', 'user', ['id' => 1, 'name' => '张三', 'age' => 25]);
echo "更新用户: {$ts2}\n";
$ts3 = $logger->log('CREATE', 'order', ['id' => 100, 'user_id' => 1]);
echo "创建订单: {$ts3}\n";
$ts4 = $logger->log('UPDATE', 'order', ['id' => 100, 'status' => 'paid']);
echo "更新订单: {$ts4}\n";
$ts5 = $logger->log('DELETE', 'session', ['user_id' => 1]);
echo "删除会话: {$ts5}\n";
echo "\n=== 操作日志列表 ===\n";
$logs = $logCol->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($logs as $log) {
echo " [{$log->ts}] {$log->operation} {$log->entity}\n";
}
echo "\n=== 重放操作 ===\n";
$replayCount = $logger->replayFrom(new Timestamp(0, 0), function($op) {
echo " 重放: {$op->operation} {$op->entity}\n";
});
echo "共重放 {$replayCount} 条操作\n";
$logCol->drop();
$counterCol->drop();
?>运行结果:
=== 操作日志记录场景 ===
=== 记录操作 ===
创建用户: Timestamp(1709123456, 1)
更新用户: Timestamp(1709123456, 2)
创建订单: Timestamp(1709123456, 3)
更新订单: Timestamp(1709123456, 4)
删除会话: Timestamp(1709123456, 5)
=== 操作日志列表 ===
[Timestamp(1709123456, 1)] CREATE user
[Timestamp(1709123456, 2)] UPDATE user
[Timestamp(1709123456, 3)] CREATE order
[Timestamp(1709123456, 4)] UPDATE order
[Timestamp(1709123456, 5)] DELETE session
=== 重放操作 ===
重放: CREATE user
重放: UPDATE user
重放: CREATE order
重放: UPDATE order
重放: DELETE session
共重放 5 条操作5.2 数据版本控制
场景描述:使用Timestamp实现数据的版本控制和变更追踪。
使用方法:为每次数据变更分配Timestamp,支持版本回溯。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 数据版本控制场景 ===\n\n";
class VersionControlSystem
{
private $dataCollection;
private $historyCollection;
public function __construct($dataCollection, $historyCollection)
{
$this->dataCollection = $dataCollection;
$this->historyCollection = $historyCollection;
}
/**
* 创建或更新文档
*/
public function save(string $collection, string $id, array $data): Timestamp
{
$ts = new Timestamp(1, time());
// 保存历史版本
$current = $this->dataCollection->findOne(['_id' => $id]);
if ($current) {
$this->historyCollection->insertOne([
'doc_id' => $id,
'collection' => $collection,
'version' => $ts,
'data' => $current,
'operation' => 'update'
]);
}
// 更新当前版本
$data['_id'] = $id;
$data['_version'] = $ts;
$data['_updatedAt'] = new MongoDB\BSON\UTCDateTime();
$this->dataCollection->replaceOne(
['_id' => $id],
$data,
['upsert' => true]
);
return $ts;
}
/**
* 获取指定版本的数据
*/
public function getVersion(string $id, Timestamp $version): ?array
{
// 检查是否是当前版本
$current = $this->dataCollection->findOne(['_id' => $id, '_version' => $version]);
if ($current) {
return $current->getArrayCopy();
}
// 从历史记录查找
$history = $this->historyCollection->findOne([
'doc_id' => $id,
'version' => $version
]);
return $history ? $history->data->getArrayCopy() : null;
}
/**
* 获取版本历史
*/
public function getVersionHistory(string $id): array
{
return $this->historyCollection->find(
['doc_id' => $id],
['sort' => ['version' => -1]]
)->toArray();
}
/**
* 回滚到指定版本
*/
public function rollback(string $id, Timestamp $version): bool
{
$oldData = $this->getVersion($id, $version);
if (!$oldData) {
return false;
}
// 保存当前版本到历史
$current = $this->dataCollection->findOne(['_id' => $id]);
if ($current) {
$this->historyCollection->insertOne([
'doc_id' => $id,
'version' => new Timestamp(1, time()),
'data' => $current,
'operation' => 'rollback'
]);
}
// 恢复旧版本
unset($oldData['_id']);
$oldData['_rolledBack'] = true;
$this->dataCollection->replaceOne(['_id' => $id], $oldData);
return true;
}
}
$client = new Client('mongodb://localhost:27017');
$dataCol = $client->test->versioned_data;
$historyCol = $client->test->version_history;
$dataCol->drop();
$historyCol->drop();
$vc = new VersionControlSystem($dataCol, $historyCol);
echo "=== 创建和更新文档 ===\n";
$v1 = $vc->save('documents', 'doc1', ['title' => '初稿', 'content' => '这是初稿内容']);
echo "版本1: {$v1} - 初稿\n";
sleep(1);
$v2 = $vc->save('documents', 'doc1', ['title' => '修改稿', 'content' => '这是修改后的内容']);
echo "版本2: {$v2} - 修改稿\n";
sleep(1);
$v3 = $vc->save('documents', 'doc1', ['title' => '定稿', 'content' => '这是最终版本']);
echo "版本3: {$v3} - 定稿\n";
echo "\n=== 版本历史 ===\n";
$history = $vc->getVersionHistory('doc1');
foreach ($history as $h) {
echo " [{$h->version}] {$h->data->title}\n";
}
echo "\n=== 获取历史版本 ===\n";
$oldVersion = $vc->getVersion('doc1', $v1);
if ($oldVersion) {
echo "版本1内容: {$oldVersion['title']} - {$oldVersion['content']}\n";
}
echo "\n=== 回滚到版本1 ===\n";
$vc->rollback('doc1', $v1);
$current = $dataCol->findOne(['_id' => 'doc1']);
echo "当前版本: {$current->title} - {$current->content}\n";
$dataCol->drop();
$historyCol->drop();
?>运行结果:
=== 数据版本控制场景 ===
=== 创建和更新文档 ===
版本1: Timestamp(1709123456, 1) - 初稿
版本2: Timestamp(1709123457, 1) - 修改稿
版本3: Timestamp(1709123458, 1) - 定稿
=== 版本历史 ===
[Timestamp(1709123458, 1)] 定稿
[Timestamp(1709123457, 1)] 修改稿
=== 获取历史版本 ===
版本1内容: 初稿 - 这是初稿内容
=== 回滚到版本1 ===
当前版本: 初稿 - 这是初稿内容5.3 分布式事件排序
场景描述:在分布式系统中,使用Timestamp对跨节点的事件进行全局排序。
使用方法:各节点生成Timestamp,通过比较实现全局有序。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 分布式事件排序场景 ===\n\n";
class DistributedEventSystem
{
private $eventCollection;
private $nodeId;
public function __construct($eventCollection, string $nodeId)
{
$this->eventCollection = $eventCollection;
$this->nodeId = $nodeId;
}
/**
* 发布事件
*/
public function publishEvent(string $type, array $payload): Timestamp
{
$now = time();
// 使用节点ID作为计数器的一部分,确保不同节点生成不同的计数器
$counter = $this->getNextCounter($now);
$ts = new Timestamp($counter, $now);
$event = [
'ts' => $ts,
'node_id' => $this->nodeId,
'type' => $type,
'payload' => $payload,
'created_at' => new MongoDB\BSON\UTCDateTime()
];
$this->eventCollection->insertOne($event);
return $ts;
}
/**
* 获取下一个计数器
*/
private function getNextCounter(int $second): int
{
$result = $this->eventCollection->findOneAndUpdate(
['second' => $second, 'node' => $this->nodeId],
['$inc' => ['counter' => 1]],
[
'upsert' => true,
'returnDocument' => MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER
]
);
// 将节点ID编码到计数器中
$nodeHash = crc32($this->nodeId) % 10000;
return $result->counter * 10000 + $nodeHash;
}
/**
* 获取全局事件流
*/
public function getEventStream(?Timestamp $since = null): array
{
$query = $since ? ['ts' => ['$gt' => $since]] : [];
return $this->eventCollection->find(
$query,
['sort' => ['ts' => 1]]
)->toArray();
}
/**
* 获取指定节点的事件
*/
public function getNodeEvents(string $nodeId): array
{
return $this->eventCollection->find(
['node_id' => $nodeId],
['sort' => ['ts' => 1]]
)->toArray();
}
/**
* 合并多个节点的事件流
*/
public function mergeEventStreams(array $eventCollections): array
{
$allEvents = [];
foreach ($eventCollections as $events) {
foreach ($events as $event) {
$allEvents[] = $event;
}
}
// 按Timestamp排序
usort($allEvents, function($a, $b) {
if ($a->ts->getTimestamp() !== $b->ts->getTimestamp()) {
return $a->ts->getTimestamp() <=> $b->ts->getTimestamp();
}
return $a->ts->getIncrement() <=> $b->ts->getIncrement();
});
return $allEvents;
}
}
$client = new Client('mongodb://localhost:27017');
$eventCol = $client->test->distributed_events;
$eventCol->drop();
// 模拟三个节点
$node1 = new DistributedEventSystem($eventCol, 'node-1');
$node2 = new DistributedEventSystem($eventCol, 'node-2');
$node3 = new DistributedEventSystem($eventCol, 'node-3');
echo "=== 各节点发布事件 ===\n";
$ts1 = $node1->publishEvent('user.created', ['user_id' => 1, 'name' => '张三']);
echo "节点1: user.created - {$ts1}\n";
$ts2 = $node2->publishEvent('order.placed', ['order_id' => 100, 'user_id' => 1]);
echo "节点2: order.placed - {$ts2}\n";
$ts3 = $node1->publishEvent('user.updated', ['user_id' => 1, 'email' => 'test@example.com']);
echo "节点1: user.updated - {$ts3}\n";
$ts4 = $node3->publishEvent('payment.received', ['order_id' => 100, 'amount' => 99.99]);
echo "节点3: payment.received - {$ts4}\n";
$ts5 = $node2->publishEvent('order.shipped', ['order_id' => 100, 'tracking' => 'SF123456']);
echo "节点2: order.shipped - {$ts5}\n";
echo "\n=== 全局事件流(按时间排序)===\n";
$events = $eventCol->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($events as $event) {
echo " [{$event->ts}] {$event->node_id}: {$event->type}\n";
}
echo "\n=== 按节点查看事件 ===\n";
foreach (['node-1', 'node-2', 'node-3'] as $nodeId) {
$nodeEvents = $eventCol->find(['node_id' => $nodeId], ['sort' => ['ts' => 1]])->toArray();
echo "{$nodeId}: " . count($nodeEvents) . " 个事件\n";
}
$eventCol->drop();
?>运行结果:
=== 分布式事件排序场景 ===
=== 各节点发布事件 ===
节点1: user.created - Timestamp(1709123456, 10001)
节点2: order.placed - Timestamp(1709123456, 10002)
节点1: user.updated - Timestamp(1709123456, 20001)
节点3: payment.received - Timestamp(1709123456, 10003)
节点2: order.shipped - Timestamp(1709123456, 20002)
=== 全局事件流(按时间排序)===
[Timestamp(1709123456, 10001)] node-1: user.created
[Timestamp(1709123456, 10002)] node-2: order.placed
[Timestamp(1709123456, 10003)] node-3: payment.received
[Timestamp(1709123456, 20001)] node-1: user.updated
[Timestamp(1709123456, 20002)] node-2: order.shipped
=== 按节点查看事件 ===
node-1: 2 个事件
node-2: 2 个事件
node-3: 1 个事件5.4 增量数据同步
场景描述:使用Timestamp实现增量数据同步,只同步变更的数据。
使用方法:记录每次同步的Timestamp位置,下次从该位置继续同步。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 增量数据同步场景 ===\n\n";
class IncrementalSyncSystem
{
private $sourceCollection;
private $targetCollection;
private $syncStateCollection;
public function __construct($source, $target, $syncState)
{
$this->sourceCollection = $source;
$this->targetCollection = $target;
$this->syncStateCollection = $syncState;
}
/**
* 在源集合中插入数据
*/
public function insertToSource(array $data): Timestamp
{
$ts = new Timestamp(1, time());
$data['_sync_ts'] = $ts;
$data['_synced'] = false;
$this->sourceCollection->insertOne($data);
return $ts;
}
/**
* 在源集合中更新数据
*/
public function updateInSource(array $query, array $update): ?Timestamp
{
$ts = new Timestamp(1, time());
$update['$set']['_sync_ts'] = $ts;
$update['$set']['_synced'] = false;
$result = $this->sourceCollection->updateOne($query, $update);
return $result->getModifiedCount() > 0 ? $ts : null;
}
/**
* 执行增量同步
*/
public function sync(string $syncId): array
{
// 获取上次同步位置
$state = $this->syncStateCollection->findOne(['sync_id' => $syncId]);
$lastTs = $state ? $state->last_ts : new Timestamp(0, 0);
// 查找需要同步的数据
$changes = $this->sourceCollection->find(
['_sync_ts' => ['$gt' => $lastTs]],
['sort' => ['_sync_ts' => 1]]
)->toArray();
$stats = [
'synced' => 0,
'errors' => 0,
'last_ts' => $lastTs
];
foreach ($changes as $doc) {
try {
// 同步到目标集合
$syncData = $doc->getArrayCopy();
unset($syncData['_sync_ts'], $syncData['_synced']);
$this->targetCollection->replaceOne(
['_id' => $doc->_id],
$syncData,
['upsert' => true]
);
// 标记为已同步
$this->sourceCollection->updateOne(
['_id' => $doc->_id],
['$set' => ['_synced' => true]]
);
$stats['synced']++;
$stats['last_ts'] = $doc->_sync_ts;
} catch (Exception $e) {
$stats['errors']++;
}
}
// 更新同步状态
if ($stats['synced'] > 0) {
$this->syncStateCollection->updateOne(
['sync_id' => $syncId],
['$set' => ['last_ts' => $stats['last_ts'], 'synced_at' => new MongoDB\BSON\UTCDateTime()]],
['upsert' => true]
);
}
return $stats;
}
/**
* 获取同步状态
*/
public function getSyncState(string $syncId): ?array
{
$state = $this->syncStateCollection->findOne(['sync_id' => $syncId]);
return $state ? $state->getArrayCopy() : null;
}
/**
* 获取未同步的数据数量
*/
public function getPendingCount(): int
{
return $this->sourceCollection->countDocuments(['_synced' => false]);
}
}
$client = new Client('mongodb://localhost:27017');
$sourceCol = $client->test->sync_source;
$targetCol = $client->test->sync_target;
$stateCol = $client->test->sync_state;
$sourceCol->drop();
$targetCol->drop();
$stateCol->drop();
$sync = new IncrementalSyncSystem($sourceCol, $targetCol, $stateCol);
echo "=== 添加源数据 ===\n";
$sync->insertToSource(['_id' => 1, 'name' => '产品A', 'price' => 100]);
$sync->insertToSource(['_id' => 2, 'name' => '产品B', 'price' => 200]);
$sync->insertToSource(['_id' => 3, 'name' => '产品C', 'price' => 300]);
echo "源数据: 3 条\n";
echo "待同步: " . $sync->getPendingCount() . " 条\n";
echo "\n=== 第一次同步 ===\n";
$stats = $sync->sync('main_sync');
echo "同步结果: 同步 {$stats['synced']} 条, 错误 {$stats['errors']} 条\n";
echo "目标数据: " . $targetCol->countDocuments() . " 条\n";
echo "\n=== 添加更多数据 ===\n";
sleep(1);
$sync->insertToSource(['_id' => 4, 'name' => '产品D', 'price' => 400]);
$sync->updateInSource(['_id' => 1], ['$set' => ['price' => 150]]);
echo "待同步: " . $sync->getPendingCount() . " 条\n";
echo "\n=== 第二次同步 ===\n";
$stats = $sync->sync('main_sync');
echo "同步结果: 同步 {$stats['synced']} 条, 错误 {$stats['errors']} 条\n";
echo "目标数据: " . $targetCol->countDocuments() . " 条\n";
echo "\n=== 验证数据一致性 ===\n";
$sourceCount = $sourceCol->countDocuments();
$targetCount = $targetCol->countDocuments();
echo "源数据: {$sourceCount} 条\n";
echo "目标数据: {$targetCount} 条\n";
$sourceCol->drop();
$targetCol->drop();
$stateCol->drop();
?>运行结果:
=== 增量数据同步场景 ===
=== 添加源数据 ===
源数据: 3 条
待同步: 3 条
=== 第一次同步 ===
同步结果: 同步 3 条, 错误 0 条
目标数据: 3 条
=== 添加更多数据 ===
待同步: 2 条
=== 第二次同步 ===
同步结果: 同步 2 条, 错误 0 条
目标数据: 4 条
=== 验证数据一致性 ===
源数据: 4 条
目标数据: 4 条5.5 事件溯源系统
场景描述:使用Timestamp实现事件溯源,记录所有状态变更事件。
使用方法:将所有变更作为事件存储,支持状态重建和时间旅行查询。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 事件溯源系统场景 ===\n\n";
class EventSourcingSystem
{
private $eventStore;
private $snapshotStore;
public function __construct($eventStore, $snapshotStore)
{
$this->eventStore = $eventStore;
$this->snapshotStore = $snapshotStore;
}
/**
* 追加事件
*/
public function appendEvent(string $aggregateId, string $eventType, array $data): Timestamp
{
$ts = new Timestamp(1, time());
$event = [
'aggregate_id' => $aggregateId,
'sequence' => $this->getNextSequence($aggregateId),
'ts' => $ts,
'event_type' => $eventType,
'data' => $data,
'metadata' => [
'timestamp' => new MongoDB\BSON\UTCDateTime(),
'version' => 1
]
];
$this->eventStore->insertOne($event);
return $ts;
}
/**
* 获取下一个序列号
*/
private function getNextSequence(string $aggregateId): int
{
$lastEvent = $this->eventStore->findOne(
['aggregate_id' => $aggregateId],
['sort' => ['sequence' => -1]]
);
return $lastEvent ? $lastEvent->sequence + 1 : 1;
}
/**
* 重建聚合状态
*/
public function rebuildState(string $aggregateId, ?Timestamp $upTo = null): array
{
$query = ['aggregate_id' => $aggregateId];
if ($upTo) {
$query['ts'] = ['$lte' => $upTo];
}
$events = $this->eventStore->find($query, ['sort' => ['sequence' => 1]])->toArray();
$state = [];
foreach ($events as $event) {
$state = $this->applyEvent($state, $event);
}
return $state;
}
/**
* 应用事件到状态
*/
private function applyEvent(array $state, $event): array
{
switch ($event->event_type) {
case 'created':
$state = array_merge(['id' => $event->aggregate_id], $event->data->getArrayCopy());
break;
case 'updated':
$state = array_merge($state, $event->data->getArrayCopy());
break;
case 'deleted':
$state['_deleted'] = true;
break;
}
$state['_version'] = $event->sequence;
$state['_lastEvent'] = $event->ts;
return $state;
}
/**
* 保存快照
*/
public function saveSnapshot(string $aggregateId): void
{
$state = $this->rebuildState($aggregateId);
$this->snapshotStore->replaceOne(
['aggregate_id' => $aggregateId],
[
'aggregate_id' => $aggregateId,
'state' => $state,
'version' => $state['_version'] ?? 0,
'created_at' => new MongoDB\BSON\UTCDateTime()
],
['upsert' => true]
);
}
/**
* 从快照恢复状态
*/
public function restoreFromSnapshot(string $aggregateId): ?array
{
$snapshot = $this->snapshotStore->findOne(['aggregate_id' => $aggregateId]);
if (!$snapshot) {
return null;
}
// 应用快照之后的事件
$events = $this->eventStore->find([
'aggregate_id' => $aggregateId,
'sequence' => ['$gt' => $snapshot->version]
], ['sort' => ['sequence' => 1]])->toArray();
$state = $snapshot->state->getArrayCopy();
foreach ($events as $event) {
$state = $this->applyEvent($state, $event);
}
return $state;
}
/**
* 获取事件历史
*/
public function getEventHistory(string $aggregateId): array
{
return $this->eventStore->find(
['aggregate_id' => $aggregateId],
['sort' => ['sequence' => 1]]
)->toArray();
}
}
$client = new Client('mongodb://localhost:27017');
$eventStore = $client->test->event_store;
$snapshotStore = $client->test->snapshots;
$eventStore->drop();
$snapshotStore->drop();
$es = new EventSourcingSystem($eventStore, $snapshotStore);
echo "=== 创建订单(事件溯源)===\n";
$orderId = 'order-001';
$es->appendEvent($orderId, 'created', [
'customer' => '张三',
'items' => [['product' => 'iPhone', 'qty' => 1, 'price' => 6999]]
]);
echo "事件1: 订单创建\n";
$es->appendEvent($orderId, 'updated', [
'status' => 'paid',
'paid_at' => date('Y-m-d H:i:s')
]);
echo "事件2: 订单支付\n";
$es->appendEvent($orderId, 'updated', [
'status' => 'shipped',
'tracking' => 'SF123456'
]);
echo "事件3: 订单发货\n";
echo "\n=== 事件历史 ===\n";
$history = $es->getEventHistory($orderId);
foreach ($history as $event) {
echo " [{$event->ts}] #{$event->sequence}: {$event->event_type}\n";
}
echo "\n=== 重建当前状态 ===\n";
$state = $es->rebuildState($orderId);
echo "订单ID: {$state['id']}\n";
echo "客户: {$state['customer']}\n";
echo "状态: {$state['status']}\n";
echo "版本: {$state['_version']}\n";
echo "\n=== 保存快照 ===\n";
$es->saveSnapshot($orderId);
echo "快照已保存\n";
echo "\n=== 从快照恢复 ===\n";
$restored = $es->restoreFromSnapshot($orderId);
echo "恢复状态: " . json_encode($restored, JSON_UNESCAPED_UNICODE) . "\n";
$eventStore->drop();
$snapshotStore->drop();
?>运行结果:
=== 事件溯源系统场景 ===
=== 创建订单(事件溯源)===
事件1: 订单创建
事件2: 订单支付
事件3: 订单发货
=== 事件历史 ===
[Timestamp(1709123456, 1)] #1: created
[Timestamp(1709123456, 1)] #2: updated
[Timestamp(1709123456, 1)] #3: updated
=== 重建当前状态 ===
订单ID: order-001
客户: 张三
状态: shipped
版本: 3
=== 保存快照 ===
快照已保存
=== 从快照恢复 ===
恢复状态: {"id":"order-001","customer":"张三","items":[{"product":"iPhone","qty":1,"price":6999}],"status":"shipped","tracking":"SF123456","paid_at":"2024-02-28 10:30:56","_version":3,"_lastEvent":"Timestamp(1709123456, 1)"}6. 企业级进阶应用场景
6.1 分布式事务协调
场景描述:在分布式事务中使用Timestamp作为事务标识和协调机制。
实现方案:使用Timestamp标记事务开始时间,协调各参与者的提交顺序。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 分布式事务协调 ===\n\n";
class DistributedTransactionCoordinator
{
private $transactionCollection;
private $participantCollection;
public function __construct($transactionCol, $participantCol)
{
$this->transactionCollection = $transactionCol;
$this->participantCollection = $participantCol;
}
/**
* 开始分布式事务
*/
public function begin(array $participants): Timestamp
{
$ts = new Timestamp(1, time());
$transaction = [
'tx_id' => $ts,
'status' => 'pending',
'participants' => $participants,
'prepared' => [],
'created_at' => new MongoDB\BSON\UTCDateTime()
];
$this->transactionCollection->insertOne($transaction);
return $ts;
}
/**
* 参与者准备
*/
public function prepare(Timestamp $txId, string $participantId, bool $ready): bool
{
$transaction = $this->transactionCollection->findOne(['tx_id' => $txId]);
if (!$transaction || $transaction->status !== 'pending') {
return false;
}
$this->participantCollection->insertOne([
'tx_id' => $txId,
'participant_id' => $participantId,
'status' => $ready ? 'prepared' : 'aborted',
'prepared_at' => new MongoDB\BSON\UTCDateTime()
]);
if ($ready) {
$this->transactionCollection->updateOne(
['tx_id' => $txId],
['$push' => ['prepared' => $participantId]]
);
} else {
$this->transactionCollection->updateOne(
['tx_id' => $txId],
['$set' => ['status' => 'aborted']]
);
}
return true;
}
/**
* 检查是否所有参与者都准备好
*/
public function canCommit(Timestamp $txId): bool
{
$transaction = $this->transactionCollection->findOne(['tx_id' => $txId]);
if (!$transaction || $transaction->status !== 'pending') {
return false;
}
$allParticipants = $transaction->participants;
$prepared = $transaction->prepared;
return count(array_diff($allParticipants, $prepared)) === 0;
}
/**
* 提交事务
*/
public function commit(Timestamp $txId): bool
{
if (!$this->canCommit($txId)) {
return false;
}
$this->transactionCollection->updateOne(
['tx_id' => $txId],
['$set' => ['status' => 'committed', 'committed_at' => new MongoDB\BSON\UTCDateTime()]]
);
return true;
}
/**
* 回滚事务
*/
public function rollback(Timestamp $txId): bool
{
$this->transactionCollection->updateOne(
['tx_id' => $txId],
['$set' => ['status' => 'rolledback', 'rolledback_at' => new MongoDB\BSON\UTCDateTime()]]
);
return true;
}
/**
* 获取事务状态
*/
public function getStatus(Timestamp $txId): ?array
{
$transaction = $this->transactionCollection->findOne(['tx_id' => $txId]);
return $transaction ? $transaction->getArrayCopy() : null;
}
}
$client = new Client('mongodb://localhost:27017');
$txCol = $client->test->transactions;
$partCol = $client->test->participants;
$txCol->drop();
$partCol->drop();
$coordinator = new DistributedTransactionCoordinator($txCol, $partCol);
echo "=== 开始分布式事务 ===\n";
$txId = $coordinator->begin(['inventory', 'payment', 'shipping']);
echo "事务ID: {$txId}\n";
echo "\n=== 参与者准备阶段 ===\n";
$coordinator->prepare($txId, 'inventory', true);
echo "inventory: 准备完成\n";
$coordinator->prepare($txId, 'payment', true);
echo "payment: 准备完成\n";
$coordinator->prepare($txId, 'shipping', true);
echo "shipping: 准备完成\n";
echo "\n=== 检查提交条件 ===\n";
$canCommit = $coordinator->canCommit($txId);
echo "可以提交: " . ($canCommit ? '是' : '否') . "\n";
echo "\n=== 提交事务 ===\n";
$committed = $coordinator->commit($txId);
echo "提交结果: " . ($committed ? '成功' : '失败') . "\n";
echo "\n=== 事务状态 ===\n";
$status = $coordinator->getStatus($txId);
echo "状态: {$status['status']}\n";
echo "参与者: " . implode(', ', $status['participants']) . "\n";
$txCol->drop();
$partCol->drop();
?>运行结果:
=== 分布式事务协调 ===
=== 开始分布式事务 ===
事务ID: Timestamp(1709123456, 1)
=== 参与者准备阶段 ===
inventory: 准备完成
payment: 准备完成
shipping: 准备完成
=== 检查提交条件 ===
可以提交: 是
=== 提交事务 ===
提交结果: 成功
=== 事务状态 ===
状态: committed
参与者: inventory, payment, shipping6.2 实时数据变更追踪
场景描述:构建实时数据变更追踪系统,监控数据变化并触发相应操作。
实现方案:使用Timestamp标记变更时间,实现变更流追踪。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 实时数据变更追踪 ===\n\n";
class ChangeTracker
{
private $dataCollection;
private $changeLogCollection;
private $watcherCollection;
public function __construct($dataCol, $changeLogCol, $watcherCol)
{
$this->dataCollection = $dataCol;
$this->changeLogCollection = $changeLogCol;
$this->watcherCollection = $watcherCol;
}
/**
* 插入数据并记录变更
*/
public function insert(array $document): Timestamp
{
$ts = new Timestamp(1, time());
$document['_change_ts'] = $ts;
$this->dataCollection->insertOne($document);
$this->logChange('insert', $document['_id'], null, $document, $ts);
$this->notifyWatchers('insert', $document, $ts);
return $ts;
}
/**
* 更新数据并记录变更
*/
public function update(array $query, array $update): ?Timestamp
{
$before = $this->dataCollection->findOne($query);
if (!$before) {
return null;
}
$ts = new Timestamp(1, time());
$update['$set']['_change_ts'] = $ts;
$this->dataCollection->updateOne($query, $update);
$after = $this->dataCollection->findOne($query);
$this->logChange('update', $before->_id, $before, $after, $ts);
$this->notifyWatchers('update', $after, $ts);
return $ts;
}
/**
* 删除数据并记录变更
*/
public function delete(array $query): ?Timestamp
{
$before = $this->dataCollection->findOne($query);
if (!$before) {
return null;
}
$ts = new Timestamp(1, time());
$this->dataCollection->deleteOne($query);
$this->logChange('delete', $before->_id, $before, null, $ts);
$this->notifyWatchers('delete', $before, $ts);
return $ts;
}
/**
* 记录变更日志
*/
private function logChange(string $operation, $documentId, $before, $after, Timestamp $ts): void
{
$this->changeLogCollection->insertOne([
'ts' => $ts,
'operation' => $operation,
'document_id' => $documentId,
'before' => $before,
'after' => $after,
'timestamp' => new MongoDB\BSON\UTCDateTime()
]);
}
/**
* 注册监听器
*/
public function registerWatcher(string $name, array $options, callable $callback): void
{
$this->watcherCollection->insertOne([
'name' => $name,
'options' => $options,
'last_processed' => new Timestamp(0, 0),
'callback' => $callback
]);
}
/**
* 通知监听器
*/
private function notifyWatchers(string $operation, $document, Timestamp $ts): void
{
$watchers = $this->watcherCollection->find()->toArray();
foreach ($watchers as $watcher) {
// 检查是否匹配监听条件
if ($this->matchesWatchOptions($operation, $document, $watcher->options)) {
echo " 通知监听器 [{$watcher->name}]: {$operation}\n";
}
}
}
/**
* 检查是否匹配监听条件
*/
private function matchesWatchOptions(string $operation, $document, array $options): bool
{
if (isset($options['operations']) && !in_array($operation, $options['operations'])) {
return false;
}
return true;
}
/**
* 获取变更历史
*/
public function getChangeHistory(?Timestamp $since = null): array
{
$query = $since ? ['ts' => ['$gt' => $since]] : [];
return $this->changeLogCollection->find($query, ['sort' => ['ts' => 1]])->toArray();
}
}
$client = new Client('mongodb://localhost:27017');
$dataCol = $client->test->tracked_data;
$changeLogCol = $client->test->change_log;
$watcherCol = $client->test->watchers;
$dataCol->drop();
$changeLogCol->drop();
$watcherCol->drop();
$tracker = new ChangeTracker($dataCol, $changeLogCol, $watcherCol);
// 注册监听器
$tracker->registerWatcher('audit', ['operations' => ['insert', 'update', 'delete']], function($change) {
echo "审计日志: {$change['operation']}\n";
});
$tracker->registerWatcher('sync', ['operations' => ['insert', 'update']], function($change) {
echo "同步触发: {$change['operation']}\n";
});
echo "=== 数据操作 ===\n";
echo "插入数据:\n";
$ts1 = $tracker->insert(['_id' => 1, 'name' => '产品A', 'price' => 100]);
echo "\n更新数据:\n";
$ts2 = $tracker->update(['_id' => 1], ['$set' => ['price' => 150]]);
echo "\n删除数据:\n";
$ts3 = $tracker->delete(['_id' => 1]);
echo "\n=== 变更历史 ===\n";
$history = $tracker->getChangeHistory();
foreach ($history as $change) {
echo " [{$change->ts}] {$change->operation} - 文档ID: {$change->document_id}\n";
}
$dataCol->drop();
$changeLogCol->drop();
$watcherCol->drop();
?>运行结果:
=== 实时数据变更追踪 ===
=== 数据操作 ===
插入数据:
通知监听器 [audit]: insert
通知监听器 [sync]: insert
更新数据:
通知监听器 [audit]: update
通知监听器 [sync]: update
删除数据:
通知监听器 [audit]: delete
=== 变更历史 ===
[Timestamp(1709123456, 1)] insert - 文档ID: 1
[Timestamp(1709123456, 1)] update - 文档ID: 1
[Timestamp(1709123456, 1)] delete - 文档ID: 16.3 多租户数据隔离
场景描述:在多租户系统中使用Timestamp实现租户数据的版本控制和隔离。
实现方案:为每个租户维护独立的Timestamp序列,支持租户级别的数据恢复。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 多租户数据隔离 ===\n\n";
class MultiTenantDataManager
{
private $dataCollection;
private $versionCollection;
public function __construct($dataCol, $versionCol)
{
$this->dataCollection = $dataCol;
$this->versionCollection = $versionCol;
}
/**
* 创建租户数据
*/
public function create(string $tenantId, string $resourceType, array $data): Timestamp
{
$ts = $this->generateTenantTimestamp($tenantId);
$document = [
'tenant_id' => $tenantId,
'resource_type' => $resourceType,
'data' => $data,
'version' => $ts,
'created_at' => new MongoDB\BSON\UTCDateTime()
];
$this->dataCollection->insertOne($document);
$this->recordVersion($tenantId, $ts, 'create', $document);
return $ts;
}
/**
* 更新租户数据
*/
public function update(string $tenantId, string $resourceType, array $query, array $update): ?Timestamp
{
$query['tenant_id'] = $tenantId;
$query['resource_type'] = $resourceType;
$existing = $this->dataCollection->findOne($query);
if (!$existing) {
return null;
}
$ts = $this->generateTenantTimestamp($tenantId);
$update['$set']['version'] = $ts;
$update['$set']['updated_at'] = new MongoDB\BSON\UTCDateTime();
$this->dataCollection->updateOne($query, $update);
$updated = $this->dataCollection->findOne($query);
$this->recordVersion($tenantId, $ts, 'update', $updated);
return $ts;
}
/**
* 查询租户数据
*/
public function query(string $tenantId, string $resourceType, array $filter = []): array
{
$filter['tenant_id'] = $tenantId;
$filter['resource_type'] = $resourceType;
return $this->dataCollection->find($filter)->toArray();
}
/**
* 获取租户数据版本历史
*/
public function getVersionHistory(string $tenantId, ?Timestamp $since = null): array
{
$query = ['tenant_id' => $tenantId];
if ($since) {
$query['ts'] = ['$gt' => $since];
}
return $this->versionCollection->find($query, ['sort' => ['ts' => 1]])->toArray();
}
/**
* 生成租户专用Timestamp
*/
private function generateTenantTimestamp(string $tenantId): Timestamp
{
$now = time();
// 使用租户ID哈希作为计数器基础
$tenantHash = crc32($tenantId) % 10000;
$lastVersion = $this->versionCollection->findOne(
['tenant_id' => $tenantId, 'ts' => ['$gte' => new Timestamp(0, $now)]],
['sort' => ['ts' => -1]]
);
if ($lastVersion) {
$counter = $lastVersion->ts->getIncrement() + 1;
} else {
$counter = $tenantHash + 1;
}
return new Timestamp($counter, $now);
}
/**
* 记录版本
*/
private function recordVersion(string $tenantId, Timestamp $ts, string $operation, $data): void
{
$this->versionCollection->insertOne([
'tenant_id' => $tenantId,
'ts' => $ts,
'operation' => $operation,
'data' => $data,
'recorded_at' => new MongoDB\BSON\UTCDateTime()
]);
}
/**
* 获取租户统计
*/
public function getTenantStats(string $tenantId): array
{
return [
'total_documents' => $this->dataCollection->countDocuments(['tenant_id' => $tenantId]),
'total_versions' => $this->versionCollection->countDocuments(['tenant_id' => $tenantId]),
'resource_types' => $this->dataCollection->distinct('resource_type', ['tenant_id' => $tenantId])
];
}
}
$client = new Client('mongodb://localhost:27017');
$dataCol = $client->test->tenant_data;
$versionCol = $client->test->tenant_versions;
$dataCol->drop();
$versionCol->drop();
$manager = new MultiTenantDataManager($dataCol, $versionCol);
echo "=== 租户A操作 ===\n";
$manager->create('tenant-a', 'user', ['name' => '张三', 'email' => 'zhangsan@example.com']);
$manager->create('tenant-a', 'user', ['name' => '李四', 'email' => 'lisi@example.com']);
$manager->create('tenant-a', 'order', ['order_no' => 'A001', 'amount' => 100]);
echo "=== 租户B操作 ===\n";
$manager->create('tenant-b', 'user', ['name' => '王五', 'email' => 'wangwu@example.com']);
$manager->create('tenant-b', 'order', ['order_no' => 'B001', 'amount' => 200]);
echo "\n=== 租户A数据查询 ===\n";
$users = $manager->query('tenant-a', 'user');
echo "用户数: " . count($users) . "\n";
foreach ($users as $user) {
echo " - {$user->data['name']}\n";
}
echo "\n=== 租户B数据查询 ===\n";
$orders = $manager->query('tenant-b', 'order');
echo "订单数: " . count($orders) . "\n";
echo "\n=== 租户统计 ===\n";
$statsA = $manager->getTenantStats('tenant-a');
echo "租户A: 文档 {$statsA['total_documents']} 个, 版本 {$statsA['total_versions']} 个\n";
$statsB = $manager->getTenantStats('tenant-b');
echo "租户B: 文档 {$statsB['total_documents']} 个, 版本 {$statsB['total_versions']} 个\n";
$dataCol->drop();
$versionCol->drop();
?>运行结果:
=== 多租户数据隔离 ===
=== 租户A操作 ===
=== 租户B操作 ===
=== 租户A数据查询 ===
用户数: 2
- 张三
- 李四
=== 租户B数据查询 ===
订单数: 1
=== 租户统计 ===
租户A: 文档 3 个, 版本 3 个
租户B: 文档 2 个, 版本 2 个6.4 数据审计与合规
场景描述:构建数据审计系统,满足合规要求,记录所有数据访问和修改。
实现方案:使用Timestamp精确记录操作时间,支持审计追溯。
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
echo "=== 数据审计与合规 ===\n\n";
class AuditSystem
{
private $auditLogCollection;
private $dataCollection;
public function __construct($auditCol, $dataCol)
{
$this->auditLogCollection = $auditCol;
$this->dataCollection = $dataCol;
}
/**
* 记录审计日志
*/
private function logAudit(
string $action,
string $resource,
$resourceId,
?array $before,
?array $after,
array $context
): Timestamp {
$ts = new Timestamp(1, time());
$auditEntry = [
'ts' => $ts,
'action' => $action,
'resource' => $resource,
'resource_id' => $resourceId,
'before' => $before,
'after' => $after,
'user_id' => $context['user_id'] ?? 'system',
'ip_address' => $context['ip'] ?? '127.0.0.1',
'user_agent' => $context['user_agent'] ?? '',
'timestamp' => new MongoDB\BSON\UTCDateTime(),
'compliance_tags' => $context['tags'] ?? []
];
$this->auditLogCollection->insertOne($auditEntry);
return $ts;
}
/**
* 创建数据(带审计)
*/
public function create(string $resource, array $data, array $context): array
{
$ts = $this->logAudit('create', $resource, null, null, $data, $context);
$data['_audit_ts'] = $ts;
$data['_created_by'] = $context['user_id'] ?? 'system';
$result = $this->dataCollection->insertOne($data);
return [
'id' => $result->getInsertedId(),
'audit_ts' => $ts
];
}
/**
* 更新数据(带审计)
*/
public function update(string $resource, $id, array $update, array $context): ?Timestamp
{
$before = $this->dataCollection->findOne(['_id' => $id]);
if (!$before) {
return null;
}
$this->dataCollection->updateOne(['_id' => $id], ['$set' => $update]);
$after = $this->dataCollection->findOne(['_id' => $id]);
return $this->logAudit('update', $resource, $id, $before->getArrayCopy(), $after->getArrayCopy(), $context);
}
/**
* 删除数据(带审计)
*/
public function delete(string $resource, $id, array $context): ?Timestamp
{
$before = $this->dataCollection->findOne(['_id' => $id]);
if (!$before) {
return null;
}
$this->dataCollection->deleteOne(['_id' => $id]);
return $this->logAudit('delete', $resource, $id, $before->getArrayCopy(), null, $context);
}
/**
* 查询审计日志
*/
public function queryAuditLogs(array $filters = []): array
{
return $this->auditLogCollection->find($filters, ['sort' => ['ts' => -1]])->toArray();
}
/**
* 获取资源的审计历史
*/
public function getResourceHistory(string $resource, $resourceId): array
{
return $this->auditLogCollection->find(
['resource' => $resource, 'resource_id' => $resourceId],
['sort' => ['ts' => 1]]
)->toArray();
}
/**
* 生成合规报告
*/
public function generateComplianceReport(int $startTime, int $endTime): array
{
$logs = $this->auditLogCollection->find([
'ts' => [
'$gte' => new Timestamp(0, $startTime),
'$lt' => new Timestamp(0, $endTime + 1)
]
])->toArray();
$report = [
'period' => [
'start' => date('Y-m-d H:i:s', $startTime),
'end' => date('Y-m-d H:i:s', $endTime)
],
'total_operations' => count($logs),
'by_action' => [],
'by_user' => [],
'by_resource' => []
];
foreach ($logs as $log) {
$action = $log->action;
$user = $log->user_id;
$resource = $log->resource;
$report['by_action'][$action] = ($report['by_action'][$action] ?? 0) + 1;
$report['by_user'][$user] = ($report['by_user'][$user] ?? 0) + 1;
$report['by_resource'][$resource] = ($report['by_resource'][$resource] ?? 0) + 1;
}
return $report;
}
}
$client = new Client('mongodb://localhost:27017');
$auditCol = $client->test->audit_logs;
$dataCol = $client->test->audited_data;
$auditCol->drop();
$dataCol->drop();
$audit = new AuditSystem($auditCol, $dataCol);
echo "=== 数据操作(带审计)===\n";
$context = ['user_id' => 'user-001', 'ip' => '192.168.1.100', 'tags' => ['gdpr']];
$result = $audit->create('customer', ['name' => '张三', 'email' => 'zhangsan@example.com'], $context);
echo "创建客户: 审计时间戳 {$result['audit_ts']}\n";
$audit->update('customer', $result['id'], ['email' => 'zhangsan_new@example.com'], $context);
echo "更新客户邮箱\n";
$context2 = ['user_id' => 'user-002', 'ip' => '192.168.1.101'];
$audit->update('customer', $result['id'], ['name' => '张三丰'], $context2);
echo "更新客户姓名\n";
echo "\n=== 审计日志 ===\n";
$logs = $audit->queryAuditLogs();
foreach ($logs as $log) {
echo " [{$log->ts}] {$log->action} by {$log->user_id}\n";
}
echo "\n=== 合规报告 ===\n";
$report = $audit->generateComplianceReport(time() - 3600, time() + 3600);
echo "时间范围: {$report['period']['start']} - {$report['period']['end']}\n";
echo "总操作数: {$report['total_operations']}\n";
echo "按操作类型:\n";
foreach ($report['by_action'] as $action => $count) {
echo " - {$action}: {$count}\n";
}
echo "按用户:\n";
foreach ($report['by_user'] as $user => $count) {
echo " - {$user}: {$count}\n";
}
$auditCol->drop();
$dataCol->drop();
?>运行结果:
=== 数据审计与合规 ===
=== 数据操作(带审计)===
创建客户: 审计时间戳 Timestamp(1709123456, 1)
更新客户邮箱
更新客户姓名
=== 审计日志 ===
[Timestamp(1709123456, 1)] update by user-002
[Timestamp(1709123456, 1)] update by user-001
[Timestamp(1709123456, 1)] create by user-001
=== 合规报告 ===
时间范围: 2024-02-28 09:30:56 - 2024-02-28 11:30:56
总操作数: 3
按操作类型:
- update: 2
- create: 1
按用户:
- user-002: 1
- user-001: 27. 行业最佳实践
7.1 类型选择实践
实践内容:正确选择Timestamp和Date类型的使用场景。
推荐理由:两种类型用途不同,错误选择会导致问题。
php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;
echo "=== 类型选择最佳实践 ===\n\n";
echo "使用Timestamp的场景:\n";
echo "1. 复制集操作日志(oplog)记录\n";
echo "2. 分布式事务协调\n";
echo "3. 操作顺序保证\n";
echo "4. 数据版本控制\n";
echo "5. 增量同步标记\n\n";
echo "使用Date(UTCDateTime)的场景:\n";
echo "1. 用户可见的时间显示\n";
echo "2. 业务时间记录(创建时间、更新时间)\n";
echo "3. 日程安排、过期时间\n";
echo "4. 时间范围查询\n";
echo "5. 时区敏感的应用\n";
?>7.2 性能优化实践
实践内容:优化Timestamp相关操作的性能。
推荐理由:Timestamp操作可能影响系统性能。
php
<?php
use MongoDB\BSON\Timestamp;
echo "=== 性能优化最佳实践 ===\n\n";
echo "1. 使用索引优化Timestamp查询\n";
echo " db.collection.createIndex({ts: 1})\n\n";
echo "2. 避免频繁生成Timestamp\n";
echo " - 批量操作时复用同一秒的计数器\n";
echo " - 使用原子操作避免锁竞争\n\n";
echo "3. 合理设置TTL索引\n";
echo " db.collection.createIndex({ts: 1}, {expireAfterSeconds: 86400})\n\n";
echo "4. 使用快照减少重建开销\n";
echo " - 定期保存状态快照\n";
echo " - 从快照开始应用后续事件\n";
?>7.3 安全性实践
实践内容:确保Timestamp操作的安全性。
推荐理由:Timestamp可能被篡改,需要保护。
php
<?php
use MongoDB\BSON\Timestamp;
echo "=== 安全性最佳实践 ===\n\n";
echo "1. 验证Timestamp有效性\n";
echo " - 检查时间戳范围\n";
echo " - 验证计数器合理性\n\n";
echo "2. 防止Timestamp伪造\n";
echo " - 使用服务端生成\n";
echo " - 避免客户端传入\n\n";
echo "3. 保护操作日志\n";
echo " - 限制删除权限\n";
echo " - 定期备份\n\n";
echo "4. 监控异常Timestamp\n";
echo " - 检测时间跳跃\n";
echo " - 监控计数器异常\n";
?>7.4 运维监控实践
实践内容:建立Timestamp相关的运维监控体系。
推荐理由:Timestamp异常可能导致数据不一致。
php
<?php
use MongoDB\BSON\Timestamp;
echo "=== 运维监控最佳实践 ===\n\n";
echo "1. 监控指标:\n";
echo " - Timestamp生成速率\n";
echo " - 计数器使用率\n";
echo " - 时间跳跃检测\n\n";
echo "2. 告警规则:\n";
echo " - 计数器接近溢出\n";
echo " - 时间戳跳跃超过阈值\n";
echo " - 生成速率异常\n\n";
echo "3. 定期检查:\n";
echo " - Oplog大小和增长\n";
echo " - 同步延迟\n";
echo " - 版本一致性\n";
?>8. 常见问题答疑(FAQ)
问题1:Timestamp和Date有什么区别?
问题描述:两种时间类型有什么不同,应该如何选择?
回答内容:
主要区别:
- 精度:Timestamp精确到秒,Date精确到毫秒
- 用途:Timestamp用于内部机制,Date用于业务时间
- 结构:Timestamp包含计数器,Date是单一时间值
- 排序:Timestamp天然支持操作顺序,Date只表示时间点
php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;
echo "=== Timestamp vs Date ===\n\n";
$ts = new Timestamp(1, time());
$date = new UTCDateTime();
echo "Timestamp:\n";
echo " 类型: " . get_class($ts) . "\n";
echo " 值: {$ts}\n";
echo " 时间戳: " . $ts->getTimestamp() . "\n";
echo " 计数器: " . $ts->getIncrement() . "\n\n";
echo "Date:\n";
echo " 类型: " . get_class($date) . "\n";
echo " 值: {$date}\n";
echo " 毫秒时间戳: " . $date->toDateTime()->format('Y-m-d H:i:s.v') . "\n";
?>问题2:如何处理Timestamp溢出?
问题描述:计数器是32位整数,会溢出吗?
回答内容:
32位无符号整数最大值为4294967295,正常使用不会溢出。但高并发场景需要考虑。
php
<?php
use MongoDB\BSON\Timestamp;
echo "=== Timestamp溢出处理 ===\n\n";
$maxIncrement = 4294967295;
echo "计数器最大值: {$maxIncrement}\n";
echo "对应日期: " . date('Y-m-d H:i:s', $maxIncrement) . "\n\n";
echo "溢出处理策略:\n";
echo "1. 检测计数器接近最大值时等待下一秒\n";
echo "2. 使用原子操作保证计数器递增\n";
echo "3. 分布式环境下使用节点ID编码\n";
?>问题3:Timestamp可以用于业务时间吗?
问题描述:能否用Timestamp记录用户操作时间?
回答内容:
不推荐。Timestamp主要用于内部机制,业务时间应使用Date类型。
php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;
echo "=== 业务时间类型选择 ===\n\n";
echo "错误示例 - 使用Timestamp:\n";
echo " 问题: 精度只有秒级,无法记录毫秒\n";
echo " 问题: 计数器对业务无意义\n\n";
echo "正确示例 - 使用Date:\n";
$date = new UTCDateTime();
echo " 创建时间: " . $date->toDateTime()->format('Y-m-d H:i:s.v') . "\n";
?>问题4:如何实现Timestamp的时间旅行查询?
问题描述:能否查询某个时间点的数据状态?
回答内容:
可以通过事件溯源模式实现,记录每个变更的Timestamp。
php
<?php
use MongoDB\BSON\Timestamp;
echo "=== 时间旅行查询 ===\n\n";
echo "实现方式:\n";
echo "1. 记录每次数据变更的事件\n";
echo "2. 使用Timestamp标记事件时间\n";
echo "3. 重放到指定Timestamp的事件\n";
echo "4. 重建该时间点的状态\n";
?>问题5:Timestamp在分片集群中的作用?
问题描述:Timestamp在分片环境中有什么特殊用途?
回答内容:
在分片集群中,Timestamp用于chunk迁移和元数据版本控制。
php
<?php
use MongoDB\BSON\Timestamp;
echo "=== 分片集群中的Timestamp ===\n\n";
echo "主要用途:\n";
echo "1. Chunk迁移追踪\n";
echo "2. 配置服务器版本控制\n";
echo "3. 跨分片事务协调\n";
echo "4. 元数据变更记录\n";
?>问题6:如何调试Timestamp相关问题?
问题描述:遇到Timestamp相关问题如何排查?
回答内容:
可以通过多种方式调试Timestamp问题。
php
<?php
use MongoDB\BSON\Timestamp;
echo "=== Timestamp调试技巧 ===\n\n";
$ts = new Timestamp(12345, time());
echo "调试方法:\n";
echo "1. 查看Timestamp值: {$ts}\n";
echo "2. 分解组成部分:\n";
echo " - 时间戳: " . $ts->getTimestamp() . "\n";
echo " - 计数器: " . $ts->getIncrement() . "\n";
echo "3. 转换为可读时间: " . date('Y-m-d H:i:s', $ts->getTimestamp()) . "\n";
echo "4. 比较两个Timestamp的顺序\n";
?>9. 实战练习
练习1:实现简单的操作日志系统
练习描述:创建一个操作日志系统,记录所有数据操作。
解题思路:
- 定义操作日志结构
- 实现日志记录方法
- 支持按时间查询
参考代码:
php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\Client;
class SimpleOperationLog
{
private $collection;
public function __construct($collection)
{
$this->collection = $collection;
}
public function log(string $operation, array $data): Timestamp
{
$ts = new Timestamp(1, time());
$this->collection->insertOne([
'ts' => $ts,
'operation' => $operation,
'data' => $data
]);
return $ts;
}
public function getLogs(?Timestamp $since = null): array
{
$query = $since ? ['ts' => ['$gt' => $since]] : [];
return $this->collection->find($query, ['sort' => ['ts' => 1]])->toArray();
}
}
?>练习2:实现数据版本控制
练习描述:为文档实现版本控制,支持查看历史版本。
解题思路:
- 保存每次修改的版本
- 记录修改时间和内容
- 支持版本回滚
参考代码:
php
<?php
use MongoDB\BSON\Timestamp;
class DocumentVersionControl
{
private $dataCollection;
private $historyCollection;
public function save(string $id, array $data): Timestamp
{
$ts = new Timestamp(1, time());
$current = $this->dataCollection->findOne(['_id' => $id]);
if ($current) {
$this->historyCollection->insertOne([
'doc_id' => $id,
'version' => $ts,
'data' => $current
]);
}
$data['_id'] = $id;
$data['_version'] = $ts;
$this->dataCollection->replaceOne(['_id' => $id], $data, ['upsert' => true]);
return $ts;
}
}
?>练习3:实现增量同步机制
练习描述:实现两个集合之间的增量同步。
解题思路:
- 记录每次同步的位置
- 查询变更数据
- 应用到目标集合
参考代码:
php
<?php
use MongoDB\BSON\Timestamp;
class IncrementalSync
{
private $source;
private $target;
private $state;
public function __construct($source, $target, $state)
{
$this->source = $source;
$this->target = $target;
$this->state = $state;
}
public function sync(string $syncId): int
{
$lastSync = $this->state->findOne(['sync_id' => $syncId]);
$lastTs = $lastSync ? $lastSync->last_ts : new Timestamp(0, 0);
$changes = $this->source->find(
['_ts' => ['$gt' => $lastTs]],
['sort' => ['_ts' => 1]]
);
$count = 0;
$maxTs = $lastTs;
foreach ($changes as $doc) {
$this->target->replaceOne(
['_id' => $doc->_id],
$doc,
['upsert' => true]
);
if ($doc->_ts > $maxTs) {
$maxTs = $doc->_ts;
}
$count++;
}
if ($count > 0) {
$this->state->replaceOne(
['sync_id' => $syncId],
['sync_id' => $syncId, 'last_ts' => $maxTs],
['upsert' => true]
);
}
return $count;
}
}
?>10. 知识点总结
核心概念回顾
| 概念 | 说明 | 重要程度 |
|---|---|---|
| BSON类型码 | 0x11,64位整数结构 | ⭐⭐⭐ |
| 存储结构 | 32位时间戳 + 32位计数器 | ⭐⭐⭐ |
| 构造函数 | new Timestamp(increment, timestamp) | ⭐⭐⭐ |
| Oplog | 复制机制的核心组件 | ⭐⭐⭐ |
| 比较规则 | 先比时间戳,再比计数器 | ⭐⭐ |
| 时区处理 | UTC存储,本地显示 | ⭐⭐ |
关键技能掌握
必须掌握:
- Timestamp的创建和基本操作
- 理解Timestamp与Date的区别
- 掌握Oplog的基本概念
- 能够实现简单的变更追踪
建议掌握:
- 事件溯源模式的应用
- 分布式事务协调机制
- 增量同步的实现
- 多租户数据隔离策略
最佳实践清单
创建Timestamp:
php
$ts = new Timestamp(1, time());查询变更:
php
$changes = $collection->find(['ts' => ['$gt' => $lastTs]]);记录操作日志:
php
$log = ['ts' => new Timestamp(1, time()), 'op' => $operation];常见错误避免
| 错误 | 正确做法 |
|---|---|
| 用于业务时间 | 使用UTCDateTime |
| 忽略计数器 | 理解计数器的作用 |
| 错误的构造顺序 | 记住increment在前 |
| 直接比较字符串 | 使用BSON比较 |
扩展学习方向
- 深入Oplog:了解MongoDB复制机制的完整实现
- Change Streams:学习MongoDB 3.6+的变更流API
- 事务处理:掌握MongoDB多文档事务
- 分片集群:了解Timestamp在分片中的作用
11. 拓展参考资料
官方文档
PHP驱动文档
相关技术文章
- Understanding MongoDB Oplog
- Event Sourcing with MongoDB
- Change Streams in MongoDB
- Time Series Collections
相关设计模式
- 事件溯源模式:通过存储事件来重建状态
- CQRS模式:命令查询职责分离
- 审计日志模式:记录所有数据变更
- 增量同步模式:基于时间戳的数据同步
相关章节
- Date类型:业务时间处理
- ObjectId类型:文档唯一标识
- Long类型:64位整数处理
- Binary类型:二进制数据存储
版本兼容性
| MongoDB版本 | 特性支持 |
|---|---|
| 4.0+ | 多文档事务 |
| 4.2+ | 分片事务 |
| 5.0+ | 增强的时序集合 |
| 6.0+ | Change Stream增强 |
