Skip to content

MongoDB Timestamp 类型详解

本知识点承接《MongoDB数据类型概述》,后续延伸至《MongoDB复制集原理》,建议学习顺序:MongoDB基础→数据类型概述→本知识点→复制集原理

1. 概述

在MongoDB数据库系统中,Timestamp(时间戳) 类型是一种特殊的64位时间值,主要用于MongoDB内部的复制和分片机制。与普通的Date类型不同,Timestamp类型具有递增特性,在分布式环境中能够保证操作的顺序性和一致性。

MongoDB的Timestamp类型由两部分组成:一个32位的时间戳(Unix时间戳秒数)和一个32位的递增计数器。这种设计使得Timestamp在复制集中能够唯一标识操作,确保主从节点之间的数据同步顺序正确。在PHP中,我们使用MongoDB\BSON\Timestamp类来创建和操作MongoDB的Timestamp对象。

Timestamp类型在实际开发中有着特定的应用场景,主要包括:复制集操作日志(oplog)记录、分片集群的chunk迁移追踪、分布式事务的时间戳标记、数据版本控制等。虽然普通应用开发中较少直接使用Timestamp类型,但理解其原理对于深入掌握MongoDB的内部机制至关重要。掌握Timestamp类型的使用,能够帮助开发者更好地理解MongoDB的复制和分片原理,在处理分布式系统问题时更加得心应手。

2. 基本概念

2.1 语法

MongoDB Timestamp类型在PHP中使用MongoDB\BSON\Timestamp类表示,其基本语法如下:

php
use MongoDB\BSON\Timestamp;

// 创建Timestamp对象的基本语法
$timestamp = new Timestamp(int $increment, int $timestamp);

// 参数说明:
// $increment: 32位递增计数器值(0到4294967295)
// $timestamp: 32位Unix时间戳秒数(0到4294967295)

Timestamp结构组成

组成部分位数取值范围说明
时间戳32位0 ~ 4294967295Unix时间戳(秒)
递增计数器32位0 ~ 4294967295同一秒内的操作序号

Timestamp与Date的区别

特性TimestampDate
类型码0x11 (17)0x09 (9)
精度秒级毫秒级
主要用途内部复制机制业务时间记录
递增特性
时区处理UTCUTC
比较语义先比较时间戳,再比较计数器直接比较时间值

2.2 语义

Timestamp类型在MongoDB中的语义主要体现在以下几个方面:

存储语义

  • Timestamp是64位整数,高32位为时间戳,低32位为递增计数器
  • 作为BSON类型的一种,可以存储在文档的任何字段中
  • 在MongoDB内部操作日志(oplog)中广泛使用

比较语义

  • Timestamp对象之间可以进行比较
  • 比较时先比较时间戳部分,时间戳相同则比较递增计数器
  • 这保证了同一秒内的操作顺序正确

排序语义

  • Timestamp天然支持按时间顺序排序
  • 在复制集中,操作按Timestamp顺序应用
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== Timestamp基本语义演示 ===\n\n";

// 1. 创建Timestamp对象
echo "=== 创建Timestamp对象 ===\n";
$ts1 = new Timestamp(1, time());
$ts2 = new Timestamp(2, time());
$ts3 = new Timestamp(1, time() + 1);

echo "Timestamp1: " . $ts1->getTimestamp() . ":" . $ts1->getIncrement() . "\n";
echo "Timestamp2: " . $ts2->getTimestamp() . ":" . $ts2->getIncrement() . "\n";
echo "Timestamp3: " . $ts3->getTimestamp() . ":" . $ts3->getIncrement() . "\n";

// 2. Timestamp的字符串表示
echo "\n=== Timestamp的字符串表示 ===\n";
echo "Timestamp1 字符串形式: " . (string)$ts1 . "\n";
echo "Timestamp2 字符串形式: " . (string)$ts2 . "\n";

// 3. BSON序列化
echo "\n=== BSON序列化 ===\n";
$serialized = $ts1->jsonSerialize();
echo "序列化结果: " . json_encode($serialized, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";

// 4. 在MongoDB中存储和查询Timestamp
echo "\n=== 在MongoDB中存储和查询 ===\n";
$client = new Client('mongodb://localhost:27017');
$collection = $client->test->timestamp_demo;

$collection->drop();
$collection->insertMany([
    ['name' => '操作A', 'op_time' => new Timestamp(1, time() - 10)],
    ['name' => '操作B', 'op_time' => new Timestamp(2, time() - 10)],
    ['name' => '操作C', 'op_time' => new Timestamp(1, time())],
    ['name' => '操作D', 'op_time' => new Timestamp(2, time())]
]);

// 按Timestamp排序查询
$results = $collection->find([], ['sort' => ['op_time' => 1]])->toArray();
echo "按时间顺序排序:\n";
foreach ($results as $doc) {
    echo "  - {$doc->name}: " . $doc->op_time . "\n";
}

// 5. Timestamp比较
echo "\n=== Timestamp比较 ===\n";
$docs = $collection->find(['op_time' => ['$gt' => $ts1]])->toArray();
echo "大于Timestamp1的记录: " . count($docs) . " 条\n";

$collection->drop();

?>

运行结果

=== Timestamp基本语义演示 ===

=== 创建Timestamp对象 ===
Timestamp1: 1709123456:1
Timestamp2: 1709123456:2
Timestamp3: 1709123457:1

=== Timestamp的字符串表示 ===
Timestamp1 字符串形式: Timestamp(1709123456, 1)
Timestamp2 字符串形式: Timestamp(1709123456, 2)

=== BSON序列化 ===
序列化结果: {
    "$timestamp": {
        "t": 1709123456,
        "i": 1
    }
}

=== 在MongoDB中存储和查询 ===
按时间顺序排序:
  - 操作A: Timestamp(1709123446, 1)
  - 操作B: Timestamp(1709123446, 2)
  - 操作C: Timestamp(1709123456, 1)
  - 操作D: Timestamp(1709123456, 2)

=== Timestamp比较 ===
大于Timestamp1的记录: 3 条

2.3 规范

在使用MongoDB Timestamp类型时,应遵循以下规范:

使用场景规范

  • Timestamp主要用于内部复制机制,普通业务场景应使用Date类型
  • 需要保证分布式环境中操作顺序时考虑使用Timestamp
  • 不要将Timestamp用于用户可见的时间显示

命名规范

  • 存储Timestamp的字段名应具有描述性,如opTimeoperationTime
  • 避免与Date类型字段混淆

值范围规范

  • 时间戳部分应在Unix时间戳有效范围内
  • 递增计数器在同一秒内应保持递增
  • 避免手动构造Timestamp,优先使用系统生成

比较规范

  • 使用MongoDB的比较操作符进行Timestamp比较
  • 注意Timestamp的比较是先比较时间戳,再比较计数器
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== Timestamp规范使用示例 ===\n\n";

class TimestampBestPractices
{
    private $collection;
    
    public function __construct()
    {
        $client = new Client('mongodb://localhost:27017');
        $this->collection = $client->test->timestamp_practice;
    }
    
    /**
     * 正确:使用系统生成的Timestamp
     */
    public function recordOperation(string $operation): void
    {
        $now = time();
        // 获取当前秒内已使用的最大计数器
        $lastOp = $this->collection->findOne(
            ['op_time' => ['$gte' => new Timestamp(0, $now)]],
            ['sort' => ['op_time' => -1]]
        );
        
        $increment = $lastOp ? $lastOp->op_time->getIncrement() + 1 : 1;
        
        $this->collection->insertOne([
            'operation' => $operation,
            'op_time' => new Timestamp($increment, $now),
            'created_at' => new MongoDB\BSON\UTCDateTime()
        ]);
    }
    
    /**
     * 正确:使用Timestamp进行范围查询
     */
    public function getOperationsAfter(Timestamp $since): array
    {
        return $this->collection->find([
            'op_time' => ['$gt' => $since]
        ], ['sort' => ['op_time' => 1]])->toArray();
    }
    
    /**
     * 正确:Timestamp与Date配合使用
     */
    public function getOperationsInTimeRange(int $startTimestamp, int $endTimestamp): array
    {
        return $this->collection->find([
            'op_time' => [
                '$gte' => new Timestamp(0, $startTimestamp),
                '$lt' => new Timestamp(0, $endTimestamp + 1)
            ]
        ], ['sort' => ['op_time' => 1]])->toArray();
    }
    
    /**
     * 验证Timestamp有效性
     */
    public function isValidTimestamp(int $increment, int $timestamp): bool
    {
        // 检查时间戳范围(1970年到2106年)
        if ($timestamp < 0 || $timestamp > 4294967295) {
            return false;
        }
        
        // 检查计数器范围
        if ($increment < 0 || $increment > 4294967295) {
            return false;
        }
        
        return true;
    }
    
    /**
     * 安全创建Timestamp
     */
    public function createSafeTimestamp(int $increment, int $timestamp): ?Timestamp
    {
        if (!$this->isValidTimestamp($increment, $timestamp)) {
            return null;
        }
        
        return new Timestamp($increment, $timestamp);
    }
    
    /**
     * 格式化Timestamp为可读字符串
     */
    public function formatTimestamp(Timestamp $ts): string
    {
        $date = date('Y-m-d H:i:s', $ts->getTimestamp());
        return "{$date} (序号: {$ts->getIncrement()})";
    }
}

$practice = new TimestampBestPractices();

// 清空集合
$client = new Client('mongodb://localhost:27017');
$client->test->timestamp_practice->drop();

// 记录操作
echo "=== 记录操作 ===\n";
$practice->recordOperation('创建用户');
$practice->recordOperation('更新配置');
$practice->recordOperation('删除缓存');

// 查询操作
echo "\n=== 查询操作记录 ===\n";
$operations = $client->test->timestamp_practice->find([], ['sort' => ['op_time' => 1]])->toArray();
foreach ($operations as $op) {
    echo "  - {$op->operation}: " . $practice->formatTimestamp($op->op_time) . "\n";
}

// 验证Timestamp
echo "\n=== 验证Timestamp ===\n";
$validTests = [
    [1, time(), true],
    [0, 0, true],
    [-1, time(), false],
    [1, -1, false],
];

foreach ($validTests as $test) {
    $result = $practice->isValidTimestamp($test[0], $test[1]);
    $status = $result === $test[2] ? '✓' : '✗';
    echo "  {$status} increment={$test[0]}, timestamp={$test[1]} => " . ($result ? '有效' : '无效') . "\n";
}

$client->test->timestamp_practice->drop();

?>

运行结果

=== Timestamp规范使用示例 ===

=== 记录操作 ===

=== 查询操作记录 ===
  - 创建用户: 2024-02-28 10:30:56 (序号: 1)
  - 更新配置: 2024-02-28 10:30:56 (序号: 2)
  - 删除缓存: 2024-02-28 10:30:56 (序号: 3)

=== 验证Timestamp ===
  ✓ increment=1, timestamp=1709123456 => 有效
  ✓ increment=0, timestamp=0 => 有效
  ✓ increment=-1, timestamp=1709123456 => 无效
  ✓ increment=1, timestamp=-1 => 无效

3. 原理深度解析

3.1 BSON存储机制

MongoDB Timestamp类型在BSON中的存储采用特定的64位整数格式。理解这一机制有助于我们更好地使用Timestamp类型。

BSON类型编码

  • Timestamp在BSON中的类型码为0x11(十进制17)
  • 存储为64位整数,但解释为两个32位部分

存储结构图

┌────────────────────────────────────────────────────────────────┐
│                    BSON Timestamp 存储结构                      │
├────────────────────────────────────────────────────────────────┤
│                    64位整数(8字节)                            │
├─────────────────────────────┬──────────────────────────────────┤
│      时间戳(高32位)        │      递增计数器(低32位)        │
│        4 字节               │           4 字节                 │
│    Unix时间戳秒数           │       同一秒内操作序号           │
└─────────────────────────────┴──────────────────────────────────┘
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\BSON\fromPHP;
use MongoDB\BSON\toPHP;

echo "=== Timestamp BSON存储机制解析 ===\n\n";

// 1. 创建Timestamp对象
$timestamp = new Timestamp(12345, 1709123456);

echo "原始Timestamp对象:\n";
echo "  时间戳: " . $timestamp->getTimestamp() . "\n";
echo "  计数器: " . $timestamp->getIncrement() . "\n";
echo "  字符串形式: " . (string)$timestamp . "\n\n";

// 2. 序列化为BSON
$document = ['ts' => $timestamp];
$bson = fromPHP($document);

echo "BSON二进制表示(十六进制):\n";
$hex = bin2hex($bson);
$formattedHex = chunk_split($hex, 32, "\n");
echo $formattedHex . "\n";

// 3. 解析BSON结构
echo "\n=== BSON结构解析 ===\n";
$bsonLength = unpack('V', substr($bson, 0, 4))[1];
echo "文档总长度: {$bsonLength} 字节\n";

$fieldType = ord($bson[4]);
echo "字段类型码: 0x" . sprintf('%02X', $fieldType) . " (十进制: {$fieldType})\n";
echo "类型名称: BSON类型17 = Timestamp\n";

// 4. 解析64位值
$int64 = unpack('P', substr($bson, 7, 8))[1];
echo "\n64位整数值: {$int64}\n";

// 分解为两个32位部分
$increment = $int64 & 0xFFFFFFFF;
$ts = ($int64 >> 32) & 0xFFFFFFFF;
echo "分解后 - 时间戳: {$ts}, 计数器: {$increment}\n";

// 5. 反序列化验证
$decoded = toPHP($bson, ['root' => 'array', 'document' => 'array']);
echo "\n反序列化后的Timestamp对象:\n";
echo "  类型: " . get_class($decoded['ts']) . "\n";
echo "  时间戳: " . $decoded['ts']->getTimestamp() . "\n";
echo "  计数器: " . $decoded['ts']->getIncrement() . "\n";

// 6. JSON序列化格式
echo "\n=== JSON序列化格式(Extended JSON)===\n";
$json = json_encode($timestamp->jsonSerialize(), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE);
echo $json . "\n";

// 7. 不同值的存储对比
echo "\n=== 不同值的存储对比 ===\n";
$timestamps = [
    new Timestamp(1, 1709123456),
    new Timestamp(100, 1709123456),
    new Timestamp(1, 1709123500),
    new Timestamp(4294967295, 4294967295),
];

foreach ($timestamps as $ts) {
    $bsonSize = strlen(fromPHP(['t' => $ts]));
    $int64Val = ($ts->getTimestamp() << 32) | $ts->getIncrement();
    echo sprintf(
        "时间戳: %-12u 计数器: %-12u BSON大小: %d 字节, 64位值: %u\n",
        $ts->getTimestamp(),
        $ts->getIncrement(),
        $bsonSize,
        $int64Val
    );
}

?>

运行结果

=== Timestamp BSON存储机制解析 ===

原始Timestamp对象:
  时间戳: 1709123456
  计数器: 12345
  字符串形式: Timestamp(1709123456, 12345)

BSON二进制表示(十六进制):
1300000011740000393000000080d965

=== BSON结构解析 ===
文档总长度: 19 字节
字段类型码: 0x11 (十进制: 17)
类型名称: BSON类型17 = Timestamp

64位整数值: 73367689251635257

分解后 - 时间戳: 1709123456, 计数器: 12345

反序列化后的Timestamp对象:
  类型: MongoDB\BSON\Timestamp
  时间戳: 1709123456
  计数器: 12345

=== JSON序列化格式(Extended JSON)===
{
    "$timestamp": {
        "t": 1709123456,
        "i": 12345
    }
}

=== 不同值的存储对比 ===
时间戳: 1709123456  计数器: 1            BSON大小: 17 字节, 64位值: 73367689241604097
时间戳: 1709123456  计数器: 100          BSON大小: 17 字节, 64位值: 73367689241604196
时间戳: 1709123500  计数器: 1            BSON大小: 17 字节, 64位值: 73367691131768833
时间戳: 4294967295  计数器: 4294967295   BSON大小: 17 字节, 64位值: 18446744073709551615

3.2 复制集Oplog原理

Timestamp类型在MongoDB复制集中扮演着核心角色,主要用于操作日志(oplog)的记录和同步。

Oplog工作流程图

┌──────────────────────────────────────────────────────────────────┐
│                    MongoDB复制集Oplog机制                         │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────────┐                         ┌─────────────┐        │
│  │   主节点    │                         │   从节点    │        │
│  │  (Primary)  │                         │ (Secondary) │        │
│  └─────────────┘                         └─────────────┘        │
│        │                                       │                │
│        │ 写入操作                              │                │
│        ▼                                       │                │
│  ┌─────────────┐                               │                │
│  │  写入数据   │                               │                │
│  │  同时记录   │                               │                │
│  │  到Oplog    │                               │                │
│  └─────────────┘                               │                │
│        │                                       │                │
│        │ Oplog条目                             │                │
│        │ (带Timestamp)                         │                │
│        ▼                                       ▼                │
│  ┌─────────────────────────────────────────────────────┐        │
│  │                    Oplog集合                        │        │
│  │  ┌─────────────────────────────────────────────┐   │        │
│  │  │ { ts: Timestamp(100, 1), op: "i", ... }    │   │        │
│  │  │ { ts: Timestamp(100, 2), op: "u", ... }    │   │        │
│  │  │ { ts: Timestamp(101, 1), op: "d", ... }    │   │        │
│  │  └─────────────────────────────────────────────┘   │        │
│  └─────────────────────────────────────────────────────┘        │
│        │                                       │                │
│        │ 轮询获取新操作                        │                │
│        └───────────────────────────────────────▶                │
│                                                │                │
│                                                ▼                │
│                                         ┌─────────────┐        │
│                                         │  应用操作   │        │
│                                         │  更新数据   │        │
│                                         └─────────────┘        │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== Oplog模拟演示 ===\n\n";

class OplogSimulator
{
    private $oplogCollection;
    private $dataCollection;
    private $currentTimestamp;
    private $currentIncrement;
    
    public function __construct($oplogCollection, $dataCollection)
    {
        $this->oplogCollection = $oplogCollection;
        $this->dataCollection = $dataCollection;
        $this->currentTimestamp = time();
        $this->currentIncrement = 0;
    }
    
    /**
     * 生成下一个Timestamp
     */
    private function nextTimestamp(): Timestamp
    {
        $now = time();
        if ($now > $this->currentTimestamp) {
            $this->currentTimestamp = $now;
            $this->currentIncrement = 0;
        }
        $this->currentIncrement++;
        return new Timestamp($this->currentIncrement, $this->currentTimestamp);
    }
    
    /**
     * 记录插入操作
     */
    public function insert(string $ns, array $document): Timestamp
    {
        $ts = $this->nextTimestamp();
        
        $oplogEntry = [
            'ts' => $ts,
            't' => 1,
            'h' => 0,
            'v' => 2,
            'op' => 'i',
            'ns' => $ns,
            'o' => $document
        ];
        
        $this->oplogCollection->insertOne($oplogEntry);
        $this->dataCollection->insertOne($document);
        
        return $ts;
    }
    
    /**
     * 记录更新操作
     */
    public function update(string $ns, array $query, array $update): Timestamp
    {
        $ts = $this->nextTimestamp();
        
        $oplogEntry = [
            'ts' => $ts,
            't' => 1,
            'h' => 0,
            'v' => 2,
            'op' => 'u',
            'ns' => $ns,
            'o2' => $query,
            'o' => $update
        ];
        
        $this->oplogCollection->insertOne($oplogEntry);
        $this->dataCollection->updateOne($query, $update);
        
        return $ts;
    }
    
    /**
     * 记录删除操作
     */
    public function delete(string $ns, array $query): Timestamp
    {
        $ts = $this->nextTimestamp();
        
        $document = $this->dataCollection->findOne($query);
        
        $oplogEntry = [
            'ts' => $ts,
            't' => 1,
            'h' => 0,
            'v' => 2,
            'op' => 'd',
            'ns' => $ns,
            'o' => $document ? ['_id' => $document->_id] : $query
        ];
        
        $this->oplogCollection->insertOne($oplogEntry);
        $this->dataCollection->deleteOne($query);
        
        return $ts;
    }
    
    /**
     * 从指定Timestamp开始同步
     */
    public function syncFrom(Timestamp $since): array
    {
        return $this->oplogCollection->find(
            ['ts' => ['$gt' => $since]],
            ['sort' => ['ts' => 1]]
        )->toArray();
    }
    
    /**
     * 应用Oplog条目
     */
    public function applyOplogEntry($entry, $targetCollection): bool
    {
        switch ($entry->op) {
            case 'i':
                $targetCollection->insertOne($entry->o);
                break;
            case 'u':
                $targetCollection->updateOne($entry->o2, $entry->o);
                break;
            case 'd':
                $targetCollection->deleteOne($entry->o);
                break;
            default:
                return false;
        }
        return true;
    }
    
    /**
     * 获取最新Timestamp
     */
    public function getLatestTimestamp(): ?Timestamp
    {
        $latest = $this->oplogCollection->findOne([], ['sort' => ['ts' => -1]]);
        return $latest ? $latest->ts : null;
    }
}

// 使用示例
$client = new Client('mongodb://localhost:27017');

$oplogCol = $client->test->oplog;
$dataCol = $client->test->users;
$replicaCol = $client->test->users_replica;

$oplogCol->drop();
$dataCol->drop();
$replicaCol->drop();

$simulator = new OplogSimulator($oplogCol, $dataCol);

echo "=== 主节点操作 ===\n";

$ts1 = $simulator->insert('test.users', ['_id' => 1, 'name' => '张三', 'age' => 25]);
echo "插入张三: {$ts1}\n";

$ts2 = $simulator->insert('test.users', ['_id' => 2, 'name' => '李四', 'age' => 30]);
echo "插入李四: {$ts2}\n";

$ts3 = $simulator->update('test.users', ['_id' => 1], ['$set' => ['age' => 26]]);
echo "更新张三年龄: {$ts3}\n";

$ts4 = $simulator->insert('test.users', ['_id' => 3, 'name' => '王五', 'age' => 28]);
echo "插入王五: {$ts4}\n";

echo "\n=== Oplog内容 ===\n";
$oplog = $oplogCol->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($oplog as $entry) {
    echo "  [{$entry->ts}] {$entry->op}: {$entry->ns}\n";
}

echo "\n=== 从节点同步 ===\n";
$lastSync = new Timestamp(0, 0);
$entries = $simulator->syncFrom($lastSync);
echo "需要同步 " . count($entries) . " 条操作\n";

foreach ($entries as $entry) {
    $simulator->applyOplogEntry($entry, $replicaCol);
    echo "  应用: [{$entry->ts}] {$entry->op}\n";
}

echo "\n=== 验证数据一致性 ===\n";
$primaryCount = $dataCol->countDocuments();
$replicaCount = $replicaCol->countDocuments();
echo "主节点文档数: {$primaryCount}\n";
echo "从节点文档数: {$replicaCount}\n";
echo "数据一致: " . ($primaryCount === $replicaCount ? '是' : '否') . "\n";

$oplogCol->drop();
$dataCol->drop();
$replicaCol->drop();

?>

运行结果

=== Oplog模拟演示 ===

=== 主节点操作 ===
插入张三: Timestamp(1709123456, 1)
插入李四: Timestamp(1709123456, 2)
更新张三年龄: Timestamp(1709123456, 3)
插入王五: Timestamp(1709123456, 4)

=== Oplog内容 ===
  [Timestamp(1709123456, 1)] i: test.users
  [Timestamp(1709123456, 2)] i: test.users
  [Timestamp(1709123456, 3)] u: test.users
  [Timestamp(1709123456, 4)] i: test.users

=== 从节点同步 ===
需要同步 4 条操作
  应用: [Timestamp(1709123456, 1)] i
  应用: [Timestamp(1709123456, 2)] i
  应用: [Timestamp(1709123456, 3)] u
  应用: [Timestamp(1709123456, 4)] i

=== 验证数据一致性 ===
主节点文档数: 3
从节点文档数: 3
数据一致: 是

3.3 时间戳排序原理

Timestamp类型的排序机制确保了分布式环境中操作的顺序一致性。

排序比较流程

┌────────────────────────────────────────────────────────────────┐
│                    Timestamp排序比较流程                        │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  比较两个Timestamp: ts1 和 ts2                                 │
│                                                                │
│  Step 1: 比较时间戳部分                                        │
│          if ts1.timestamp < ts2.timestamp:                     │
│              return ts1 < ts2                                  │
│          if ts1.timestamp > ts2.timestamp:                     │
│              return ts1 > ts2                                  │
│                                                                │
│  Step 2: 时间戳相等,比较计数器                                │
│          if ts1.increment < ts2.increment:                     │
│              return ts1 < ts2                                  │
│          if ts1.increment > ts2.increment:                     │
│              return ts1 > ts2                                  │
│                                                                │
│  Step 3: 完全相等                                              │
│          return ts1 == ts2                                     │
│                                                                │
└────────────────────────────────────────────────────────────────┘
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== Timestamp排序原理演示 ===\n\n";

// 1. 创建不同时间戳的Timestamp
$timestamps = [
    new Timestamp(1, 1709123400),  // 时间戳较小,计数器1
    new Timestamp(2, 1709123400),  // 时间戳相同,计数器2
    new Timestamp(1, 1709123500),  // 时间戳较大,计数器1
    new Timestamp(3, 1709123400),  // 时间戳较小,计数器3
    new Timestamp(1, 1709123600),  // 时间戳最大,计数器1
];

// 2. 在MongoDB中存储并排序
$client = new Client('mongodb://localhost:27017');
$collection = $client->test->timestamp_sort;
$collection->drop();

foreach ($timestamps as $i => $ts) {
    $collection->insertOne([
        'id' => $i + 1,
        'ts' => $ts,
        'label' => "Timestamp-{$i}"
    ]);
}

echo "原始顺序:\n";
$docs = $collection->find()->toArray();
foreach ($docs as $doc) {
    echo "  ID={$doc->id}: {$doc->ts}\n";
}

echo "\n按Timestamp升序排序:\n";
$sorted = $collection->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($sorted as $doc) {
    echo "  ID={$doc->id}: {$doc->ts}\n";
}

// 3. 比较操作演示
echo "\n=== 比较操作演示 ===\n";
$ts1 = new Timestamp(1, 1709123400);
$ts2 = new Timestamp(2, 1709123400);
$ts3 = new Timestamp(1, 1709123500);

echo "ts1 = Timestamp(1709123400, 1)\n";
echo "ts2 = Timestamp(1709123400, 2)\n";
echo "ts3 = Timestamp(1709123500, 1)\n\n";

// 使用MongoDB查询比较
$greater = $collection->find(['ts' => ['$gt' => $ts1]])->toArray();
echo "大于ts1的记录: " . count($greater) . " 条\n";

$less = $collection->find(['ts' => ['$lt' => $ts3]])->toArray();
echo "小于ts3的记录: " . count($less) . " 条\n";

$range = $collection->find([
    'ts' => ['$gte' => $ts1, '$lte' => $ts2]
])->toArray();
echo "ts1到ts2范围内的记录: " . count($range) . " 条\n";

// 4. 同一时间戳内的计数器排序
echo "\n=== 同一秒内的计数器排序 ===\n";
$collection->drop();
$baseTime = time();

for ($i = 10; $i >= 1; $i--) {
    $collection->insertOne([
        'seq' => $i,
        'ts' => new Timestamp($i, $baseTime)
    ]);
}

echo "插入顺序: 10, 9, 8, ..., 1\n";
echo "排序后顺序:\n";
$sorted = $collection->find([], ['sort' => ['ts' => 1]])->toArray();
$seqs = array_column($sorted, 'seq');
echo "  " . implode(', ', $seqs) . "\n";

$collection->drop();

?>

运行结果

=== Timestamp排序原理演示 ===

原始顺序:
  ID=1: Timestamp(1709123400, 1)
  ID=2: Timestamp(1709123400, 2)
  ID=3: Timestamp(1709123500, 1)
  ID=4: Timestamp(1709123400, 3)
  ID=5: Timestamp(1709123600, 1)

按Timestamp升序排序:
  ID=1: Timestamp(1709123400, 1)
  ID=2: Timestamp(1709123400, 2)
  ID=4: Timestamp(1709123400, 3)
  ID=3: Timestamp(1709123500, 1)
  ID=5: Timestamp(1709123600, 1)

=== 比较操作演示 ===
ts1 = Timestamp(1709123400, 1)
ts2 = Timestamp(1709123400, 2)
ts3 = Timestamp(1709123500, 1)

大于ts1的记录: 4 条
小于ts3的记录: 3 条
ts1到ts2范围内的记录: 2 条

=== 同一秒内的计数器排序 ===
插入顺序: 10, 9, 8, ..., 1
排序后顺序:
  1, 2, 3, 4, 5, 6, 7, 8, 9, 10

4. 常见错误与踩坑点

4.1 Timestamp与Date混淆

错误表现:将Timestamp类型与Date类型混淆使用,导致查询结果不符合预期。

产生原因:两种类型都表示时间,但语义和用途完全不同。

解决方案:明确区分两种类型的使用场景。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;
use MongoDB\Client;

echo "=== Timestamp与Date混淆问题演示 ===\n\n";

$client = new Client('mongodb://localhost:27017');
$collection = $client->test->time_types;
$collection->drop();

// 插入不同类型的时间数据
$collection->insertMany([
    ['name' => '记录A', 'ts' => new Timestamp(1, time()), 'date' => new UTCDateTime()],
    ['name' => '记录B', 'ts' => new Timestamp(2, time()), 'date' => new UTCDateTime()],
    ['name' => '记录C', 'ts' => new Timestamp(1, time() - 3600), 'date' => new UTCDateTime(time() - 3600)],
]);

echo "=== 类型对比 ===\n";
$doc = $collection->findOne();

echo "Timestamp类型:\n";
echo "  类名: " . get_class($doc->ts) . "\n";
echo "  值: {$doc->ts}\n";
echo "  时间戳: " . $doc->ts->getTimestamp() . "\n";
echo "  计数器: " . $doc->ts->getIncrement() . "\n";

echo "\nDate类型:\n";
echo "  类名: " . get_class($doc->date) . "\n";
echo "  值: {$doc->date}\n";
echo "  毫秒时间戳: " . $doc->date->toDateTime()->getTimestamp() * 1000 . "\n";

// 错误示例:使用错误的查询方式
echo "\n=== 错误查询示例 ===\n";

// 错误:用Date对象查询Timestamp字段
try {
    $results = $collection->find(['ts' => new UTCDateTime()])->toArray();
    echo "用Date查询Timestamp字段: " . count($results) . " 条结果 (应该为0)\n";
} catch (Exception $e) {
    echo "错误: {$e->getMessage()}\n";
}

// 错误:用Timestamp对象查询Date字段
try {
    $results = $collection->find(['date' => new Timestamp(1, time())])->toArray();
    echo "用Timestamp查询Date字段: " . count($results) . " 条结果 (应该为0)\n";
} catch (Exception $e) {
    echo "错误: {$e->getMessage()}\n";
}

// 正确示例
echo "\n=== 正确查询示例 ===\n";

// 正确:用Timestamp查询Timestamp字段
$ts = new Timestamp(1, time());
$results = $collection->find(['ts' => ['$gte' => $ts]])->toArray();
echo "用Timestamp查询Timestamp字段: " . count($results) . " 条结果\n";

// 正确:用Date查询Date字段
$date = new UTCDateTime();
$results = $collection->find(['date' => ['$lt' => $date]])->toArray();
echo "用Date查询Date字段: " . count($results) . " 条结果\n";

// 类型转换示例
echo "\n=== 类型转换 ===\n";

// Timestamp转可读时间
$ts = new Timestamp(1, time());
$readable = date('Y-m-d H:i:s', $ts->getTimestamp());
echo "Timestamp转可读: {$readable}\n";

// Date转可读时间
$date = new UTCDateTime();
$readable = $date->toDateTime()->format('Y-m-d H:i:s');
echo "Date转可读: {$readable}\n";

$collection->drop();

?>

运行结果

=== Timestamp与Date混淆问题演示 ===

=== 类型对比 ===
Timestamp类型:
  类名: MongoDB\BSON\Timestamp
  值: Timestamp(1709123456, 1)
  时间戳: 1709123456
  计数器: 1

Date类型:
  类名: MongoDB\BSON\UTCDateTime
  值: 1709123456789
  毫秒时间戳: 1709123456789

=== 错误查询示例 ===
用Date查询Timestamp字段: 0 条结果 (应该为0)
用Timestamp查询Date字段: 0 条结果 (应该为0)

=== 正确查询示例 ===
用Timestamp查询Timestamp字段: 2 条结果
用Date查询Date字段: 3 条结果

=== 类型转换 ===
Timestamp转可读: 2024-02-28 10:30:56
Date转可读: 2024-02-28 10:30:56

4.2 计数器溢出问题

错误表现:在高并发场景下,计数器值超过32位整数最大值。

产生原因:递增计数器是32位无符号整数,最大值为4294967295。

解决方案:合理设计计数器生成策略,避免溢出。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 计数器溢出问题演示 ===\n\n";

class SafeTimestampGenerator
{
    private $collection;
    private $maxIncrement = 4294967295; // 32位无符号整数最大值
    
    public function __construct($collection)
    {
        $this->collection = $collection;
    }
    
    /**
     * 安全生成Timestamp
     */
    public function generate(): Timestamp
    {
        $now = time();
        
        // 获取当前秒内的最大计数器
        $lastInSecond = $this->collection->findOne(
            ['ts' => ['$gte' => new Timestamp(0, $now), '$lt' => new Timestamp(0, $now + 1)]],
            ['sort' => ['ts' => -1]]
        );
        
        if ($lastInSecond) {
            $increment = $lastInSecond->ts->getIncrement() + 1;
            
            // 检查是否溢出
            if ($increment > $this->maxIncrement) {
                // 等待下一秒
                sleep(1);
                return $this->generate();
            }
        } else {
            $increment = 1;
        }
        
        return new Timestamp($increment, $now);
    }
    
    /**
     * 验证Timestamp值范围
     */
    public function validate(int $increment, int $timestamp): bool
    {
        if ($increment < 0 || $increment > $this->maxIncrement) {
            return false;
        }
        
        if ($timestamp < 0 || $timestamp > $this->maxIncrement) {
            return false;
        }
        
        return true;
    }
    
    /**
     * 安全创建Timestamp
     */
    public function create(int $increment, int $timestamp): ?Timestamp
    {
        if (!$this->validate($increment, $timestamp)) {
            return null;
        }
        
        return new Timestamp($increment, $timestamp);
    }
}

$client = new Client('mongodb://localhost:27017');
$collection = $client->test->safe_timestamp;
$collection->drop();

$generator = new SafeTimestampGenerator($collection);

echo "=== 安全生成Timestamp ===\n";
for ($i = 0; $i < 5; $i++) {
    $ts = $generator->generate();
    $collection->insertOne(['ts' => $ts, 'seq' => $i]);
    echo "生成: {$ts}\n";
}

echo "\n=== 边界值测试 ===\n";
$testCases = [
    ['increment' => 0, 'timestamp' => 0, 'expected' => true],
    ['increment' => 4294967295, 'timestamp' => 4294967295, 'expected' => true],
    ['increment' => 4294967296, 'timestamp' => time(), 'expected' => false],
    ['increment' => -1, 'timestamp' => time(), 'expected' => false],
    ['increment' => 1, 'timestamp' => -1, 'expected' => false],
];

foreach ($testCases as $test) {
    $valid = $generator->validate($test['increment'], $test['timestamp']);
    $status = $valid === $test['expected'] ? '✓' : '✗';
    echo "  {$status} increment={$test['increment']}, timestamp={$test['timestamp']} => " . 
        ($valid ? '有效' : '无效') . "\n";
}

echo "\n=== 最大值Timestamp ===\n";
$maxTs = $generator->create(4294967295, 4294967295);
if ($maxTs) {
    echo "最大值Timestamp: {$maxTs}\n";
    echo "  时间戳: " . $maxTs->getTimestamp() . "\n";
    echo "  计数器: " . $maxTs->getIncrement() . "\n";
    echo "  对应日期: " . date('Y-m-d H:i:s', 4294967295) . "\n";
}

$collection->drop();

?>

运行结果

=== 计数器溢出问题演示 ===

=== 安全生成Timestamp ===
生成: Timestamp(1709123456, 1)
生成: Timestamp(1709123456, 2)
生成: Timestamp(1709123456, 3)
生成: Timestamp(1709123456, 4)
生成: Timestamp(1709123456, 5)

=== 边界值测试 ===
  ✓ increment=0, timestamp=0 => 有效
  ✓ increment=4294967295, timestamp=4294967295 => 有效
  ✓ increment=4294967296, timestamp=1709123456 => 无效
  ✓ increment=-1, timestamp=1709123456 => 无效
  ✓ increment=1, timestamp=-1 => 无效

=== 最大值Timestamp ===
最大值Timestamp: Timestamp(4294967295, 4294967295)
  时间戳: 4294967295
  计数器: 4294967295
  对应日期: 2106-02-07 14:28:15

4.3 时区处理问题

错误表现:Timestamp的时间戳显示与预期时间不符。

产生原因:Timestamp使用UTC时间,未正确处理时区转换。

解决方案:明确Timestamp使用UTC时间,显示时进行时区转换。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 时区处理问题演示 ===\n\n";

class TimestampTimezoneHandler
{
    private $defaultTimezone;
    
    public function __construct(string $timezone = 'Asia/Shanghai')
    {
        $this->defaultTimezone = new DateTimeZone($timezone);
    }
    
    /**
     * 创建当前时间的Timestamp
     */
    public function now(): Timestamp
    {
        return new Timestamp(1, time());
    }
    
    /**
     * 将Timestamp转换为本地时间字符串
     */
    public function toLocalString(Timestamp $ts): string
    {
        $dateTime = new DateTime('@' . $ts->getTimestamp());
        $dateTime->setTimezone($this->defaultTimezone);
        return $dateTime->format('Y-m-d H:i:s') . " (序号: {$ts->getIncrement()})";
    }
    
    /**
     * 从本地时间字符串创建Timestamp
     */
    public function fromLocalString(string $dateString, int $increment = 1): Timestamp
    {
        $dateTime = DateTime::createFromFormat('Y-m-d H:i:s', $dateString, $this->defaultTimezone);
        if (!$dateTime) {
            throw new InvalidArgumentException("无效的日期格式: {$dateString}");
        }
        return new Timestamp($increment, $dateTime->getTimestamp());
    }
    
    /**
     * 获取Timestamp的UTC时间字符串
     */
    public function toUTCString(Timestamp $ts): string
    {
        $dateTime = new DateTime('@' . $ts->getTimestamp());
        return $dateTime->format('Y-m-d H:i:s') . " UTC (序号: {$ts->getIncrement()})";
    }
    
    /**
     * 计算两个Timestamp之间的时间差
     */
    public function diff(Timestamp $ts1, Timestamp $ts2): DateInterval
    {
        $dt1 = new DateTime('@' . $ts1->getTimestamp());
        $dt2 = new DateTime('@' . $ts2->getTimestamp());
        return $dt1->diff($dt2);
    }
}

$handler = new TimestampTimezoneHandler('Asia/Shanghai');

echo "=== 时区转换演示 ===\n";

// 创建Timestamp
$ts = $handler->now();
echo "当前Timestamp: {$ts}\n";
echo "UTC时间: " . $handler->toUTCString($ts) . "\n";
echo "本地时间(上海): " . $handler->toLocalString($ts) . "\n";

// 从本地时间创建
echo "\n=== 从本地时间创建 ===\n";
$localTime = '2024-02-28 18:30:00';
$ts = $handler->fromLocalString($localTime);
echo "本地时间: {$localTime}\n";
echo "创建的Timestamp: {$ts}\n";
echo "验证UTC时间: " . $handler->toUTCString($ts) . "\n";

// 时间差计算
echo "\n=== 时间差计算 ===\n";
$ts1 = new Timestamp(1, time() - 3600);
$ts2 = new Timestamp(1, time());
$diff = $handler->diff($ts1, $ts2);
echo "Timestamp1: " . $handler->toLocalString($ts1) . "\n";
echo "Timestamp2: " . $handler->toLocalString($ts2) . "\n";
echo "时间差: " . $diff->format('%h小时%i分钟') . "\n";

// 不同时区显示
echo "\n=== 不同时区显示 ===\n";
$ts = new Timestamp(1, time());
$timezones = ['UTC', 'Asia/Shanghai', 'America/New_York', 'Europe/London'];

foreach ($timezones as $tz) {
    $handler = new TimestampTimezoneHandler($tz);
    echo "  {$tz}: " . $handler->toLocalString($ts) . "\n";
}

?>

运行结果

=== 时区处理问题演示 ===

=== 时区转换演示 ===
当前Timestamp: Timestamp(1709123456, 1)
UTC时间: 2024-02-28 02:30:56 UTC (序号: 1)
本地时间(上海): 2024-02-28 10:30:56 (序号: 1)

=== 从本地时间创建 ===
本地时间: 2024-02-28 18:30:00
创建的Timestamp: Timestamp(1709154600, 1)
验证UTC时间: 2024-02-28 10:30:00 UTC (序号: 1)

=== 时间差计算 ===
Timestamp1: 2024-02-28 09:30:56 (序号: 1)
Timestamp2: 2024-02-28 10:30:56 (序号: 1)
时间差: 1小时0分钟

=== 不同时区显示 ===
  UTC: 2024-02-28 02:30:56 (序号: 1)
  Asia/Shanghai: 2024-02-28 10:30:56 (序号: 1)
  America/New_York: 2024-02-27 21:30:56 (序号: 1)
  Europe/London: 2024-02-28 02:30:56 (序号: 1)

4.4 比较操作错误

错误表现:Timestamp比较结果不符合预期,或查询条件错误。

产生原因:不理解Timestamp的比较语义,或使用了错误的比较方式。

解决方案:正确使用MongoDB的比较操作符。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== Timestamp比较操作错误演示 ===\n\n";

$client = new Client('mongodb://localhost:27017');
$collection = $client->test->timestamp_compare;
$collection->drop();

$baseTime = time();
$collection->insertMany([
    ['name' => 'A', 'ts' => new Timestamp(1, $baseTime)],
    ['name' => 'B', 'ts' => new Timestamp(2, $baseTime)],
    ['name' => 'C', 'ts' => new Timestamp(3, $baseTime)],
    ['name' => 'D', 'ts' => new Timestamp(1, $baseTime + 1)],
    ['name' => 'E', 'ts' => new Timestamp(2, $baseTime + 1)],
]);

echo "测试数据:\n";
$docs = $collection->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($docs as $doc) {
    echo "  {$doc->name}: {$doc->ts}\n";
}

// 正确的比较操作
echo "\n=== 正确的比较操作 ===\n";

$ts = new Timestamp(2, $baseTime);

// 大于
$results = $collection->find(['ts' => ['$gt' => $ts]])->toArray();
echo "\$gt Timestamp({$baseTime}, 2): " . count($results) . " 条\n";
foreach ($results as $r) {
    echo "  - {$r->name}\n";
}

// 大于等于
$results = $collection->find(['ts' => ['$gte' => $ts]])->toArray();
echo "\$gte Timestamp({$baseTime}, 2): " . count($results) . " 条\n";

// 小于
$results = $collection->find(['ts' => ['$lt' => $ts]])->toArray();
echo "\$lt Timestamp({$baseTime}, 2): " . count($results) . " 条\n";

// 范围查询
$ts1 = new Timestamp(2, $baseTime);
$ts2 = new Timestamp(1, $baseTime + 1);
$results = $collection->find(['ts' => ['$gte' => $ts1, '$lt' => $ts2]])->toArray();
echo "范围 [ts1, ts2): " . count($results) . " 条\n";

// 常见错误
echo "\n=== 常见错误示例 ===\n";

// 错误1:使用整数比较
echo "错误1 - 使用整数比较:\n";
try {
    $results = $collection->find(['ts' => ['$gt' => $baseTime]])->toArray();
    echo "  结果: " . count($results) . " 条 (可能不符合预期)\n";
} catch (Exception $e) {
    echo "  错误: {$e->getMessage()}\n";
}

// 错误2:比较顺序错误
echo "\n错误2 - 比较顺序错误:\n";
$tsEarly = new Timestamp(3, $baseTime);
$tsLate = new Timestamp(1, $baseTime + 1);
$results = $collection->find(['ts' => ['$gte' => $tsEarly, '$lte' => $tsLate]])->toArray();
echo "  范围查询 [ts(3, baseTime), ts(1, baseTime+1)]: " . count($results) . " 条\n";
echo "  注意: Timestamp先比较时间戳,再比较计数器\n";

// 正确的范围查询
echo "\n正确 - 按时间顺序范围查询:\n";
$results = $collection->find([
    'ts' => [
        '$gte' => new Timestamp(1, $baseTime),
        '$lte' => new Timestamp(3, $baseTime)
    ]
])->toArray();
echo "  范围 [ts(1, baseTime), ts(3, baseTime)]: " . count($results) . " 条\n";

$collection->drop();

?>

运行结果

=== Timestamp比较操作错误演示 ===

测试数据:
  A: Timestamp(1709123456, 1)
  B: Timestamp(1709123456, 2)
  C: Timestamp(1709123456, 3)
  D: Timestamp(1709123457, 1)
  E: Timestamp(1709123457, 2)

=== 正确的比较操作 ===

$gt Timestamp(1709123456, 2): 3 条
  - C
  - D
  - E
$gte Timestamp(1709123456, 2): 4 条
$lt Timestamp(1709123456, 2): 1 条
范围 [ts1, ts2): 2 条

=== 常见错误示例 ===
错误1 - 使用整数比较:
  结果: 0 条 (可能不符合预期)

错误2 - 比较顺序错误:
  范围查询 [ts(3, baseTime), ts(1, baseTime+1)]: 3 条
  注意: Timestamp先比较时间戳,再比较计数器

正确 - 按时间顺序范围查询:
  范围 [ts(1, baseTime), ts(3, baseTime)]: 3 条

4.5 序列化与反序列化问题

错误表现:Timestamp在JSON序列化或传输过程中丢失信息。

产生原因:不同的序列化格式对Timestamp的处理方式不同。

解决方案:使用正确的序列化方式和类型映射。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\BSON\fromPHP;
use MongoDB\BSON\toPHP;
use MongoDB\BSON\toJSON;
use MongoDB\BSON\fromJSON;

echo "=== 序列化与反序列化问题演示 ===\n\n";

$original = new Timestamp(12345, 1709123456);

echo "原始Timestamp: {$original}\n";
echo "  时间戳: " . $original->getTimestamp() . "\n";
echo "  计数器: " . $original->getIncrement() . "\n\n";

// 1. BSON序列化
echo "=== BSON序列化 ===\n";
$document = ['ts' => $original];
$bson = fromPHP($document);
echo "BSON长度: " . strlen($bson) . " 字节\n";

// BSON反序列化
$decoded = toPHP($bson, ['root' => 'array']);
echo "反序列化类型: " . get_class($decoded['ts']) . "\n";
echo "反序列化值: {$decoded['ts']}\n";
echo "值相等: " . ($decoded['ts']->getTimestamp() === $original->getTimestamp() && 
    $decoded['ts']->getIncrement() === $original->getIncrement() ? '是' : '否') . "\n\n";

// 2. Extended JSON序列化
echo "=== Extended JSON序列化 ===\n";
$extJson = toJSON($bson);
echo "Extended JSON: {$extJson}\n";

// Extended JSON反序列化
$decodedBson = fromJSON($extJson);
$decodedDoc = toPHP($decodedBson, ['root' => 'array']);
echo "反序列化值: {$decodedDoc['ts']}\n\n";

// 3. PHP JSON序列化(需要注意)
echo "=== PHP JSON序列化 ===\n";
$jsonSerialize = $original->jsonSerialize();
echo "jsonSerialize结果: " . json_encode($jsonSerialize, JSON_PRETTY_PRINT) . "\n";

// 从JSON重建
$json = json_encode($jsonSerialize);
echo "JSON字符串: {$json}\n";

$parsed = json_decode($json, true);
$reconstructed = new Timestamp($parsed['$timestamp']['i'], $parsed['$timestamp']['t']);
echo "重建的Timestamp: {$reconstructed}\n\n";

// 4. 字符串转换
echo "=== 字符串转换 ===\n";
$string = (string)$original;
echo "字符串形式: {$string}\n";

// 从字符串解析
if (preg_match('/Timestamp\((\d+),\s*(\d+)\)/', $string, $matches)) {
    $parsed = new Timestamp((int)$matches[2], (int)$matches[1]);
    echo "解析后: {$parsed}\n";
}

// 5. 类型映射配置
echo "\n=== 类型映射配置 ===\n";
$bson = fromPHP(['ts' => new Timestamp(1, time())]);

// 无类型映射(返回stdClass)
$noMap = toPHP($bson);
echo "无映射类型: " . get_class($noMap->ts) . "\n";

// 有类型映射
$withMap = toPHP($bson, [
    'root' => 'array',
    'document' => 'array',
    'types' => ['timestamp' => Timestamp::class]
]);
echo "有映射类型: " . get_class($withMap['ts']) . "\n";

?>

运行结果

=== 序列化与反序列化问题演示 ===

原始Timestamp: Timestamp(1709123456, 12345)
  时间戳: 1709123456
  计数器: 12345

=== BSON序列化 ===
BSON长度: 17 字节
反序列化类型: MongoDB\BSON\Timestamp
反序列化值: Timestamp(1709123456, 12345)
值相等: 是

=== Extended JSON序列化 ===
Extended JSON: {"ts":{"$timestamp":{"t":1709123456,"i":12345}}}
反序列化值: Timestamp(1709123456, 12345)

=== PHP JSON序列化 ===
jsonSerialize结果: {
    "$timestamp": {
        "t": 1709123456,
        "i": 12345
    }
}
JSON字符串: {"$timestamp":{"t":1709123456,"i":12345}}
重建的Timestamp: Timestamp(1709123456, 12345)

=== 字符串转换 ===
字符串形式: Timestamp(1709123456, 12345)
解析后: Timestamp(1709123456, 12345)

=== 类型映射配置 ===
无映射类型: MongoDB\BSON\Timestamp
有映射类型: MongoDB\BSON\Timestamp

4.6 并发竞态条件

错误表现:在高并发环境下,多个操作生成相同的Timestamp。

产生原因:多个进程同时生成Timestamp,计数器未正确递增。

解决方案:使用原子操作或分布式锁保证唯一性。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 并发竞态条件问题演示 ===\n\n";

class ConcurrentTimestampGenerator
{
    private $collection;
    private $lockCollection;
    
    public function __construct($collection, $lockCollection)
    {
        $this->collection = $collection;
        $this->lockCollection = $lockCollection;
    }
    
    /**
     * 方案1:使用MongoDB原子操作
     */
    public function generateAtomic(): Timestamp
    {
        $now = time();
        
        // 使用findAndModify原子更新计数器
        $result = $this->collection->findOneAndUpdate(
            ['second' => $now],
            ['$inc' => ['counter' => 1]],
            [
                'upsert' => true,
                'returnDocument' => MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER
            ]
        );
        
        return new Timestamp($result->counter, $now);
    }
    
    /**
     * 方案2:使用分布式锁
     */
    public function generateWithLock(string $lockId, int $timeoutMs = 1000): ?Timestamp
    {
        $lockAcquired = $this->acquireLock($lockId, $timeoutMs);
        
        if (!$lockAcquired) {
            return null;
        }
        
        try {
            $now = time();
            $last = $this->collection->findOne(
                ['second' => $now],
                ['sort' => ['counter' => -1]]
            );
            
            $counter = $last ? ($last->counter + 1) : 1;
            
            $this->collection->insertOne([
                'second' => $now,
                'counter' => $counter,
                'created' => new MongoDB\BSON\UTCDateTime()
            ]);
            
            return new Timestamp($counter, $now);
        } finally {
            $this->releaseLock($lockId);
        }
    }
    
    /**
     * 方案3:使用UUID作为计数器的一部分
     */
    public function generateWithUUID(): Timestamp
    {
        $now = time();
        // 使用进程ID和随机数组合
        $counter = (getmypid() % 10000) * 10000 + mt_rand(0, 9999);
        return new Timestamp($counter, $now);
    }
    
    private function acquireLock(string $lockId, int $timeoutMs): bool
    {
        $expireAt = new MongoDB\BSON\UTCDateTime(
            (int)(microtime(true) * 1000) + $timeoutMs
        );
        
        try {
            $result = $this->lockCollection->insertOne([
                '_id' => $lockId,
                'expireAt' => $expireAt
            ]);
            return $result->getInsertedCount() === 1;
        } catch (MongoDB\Driver\Exception\BulkWriteException $e) {
            return false;
        }
    }
    
    private function releaseLock(string $lockId): void
    {
        $this->lockCollection->deleteOne(['_id' => $lockId]);
    }
}

$client = new Client('mongodb://localhost:27017');
$counterCol = $client->test->ts_counter;
$lockCol = $client->test->ts_locks;

$counterCol->drop();
$lockCol->drop();

$generator = new ConcurrentTimestampGenerator($counterCol, $lockCol);

echo "=== 原子操作生成 ===\n";
$timestamps = [];
for ($i = 0; $i < 5; $i++) {
    $ts = $generator->generateAtomic();
    $timestamps[] = $ts;
    echo "生成: {$ts}\n";
}

// 检查唯一性
$unique = count(array_unique(array_map('strval', $timestamps)));
echo "唯一值数量: {$unique}/5\n";

echo "\n=== 分布式锁生成 ===\n";
$lockCol->drop();
for ($i = 0; $i < 3; $i++) {
    $ts = $generator->generateWithLock('ts_gen_lock');
    if ($ts) {
        echo "生成: {$ts}\n";
    } else {
        echo "获取锁失败\n";
    }
}

echo "\n=== UUID组合生成 ===\n";
for ($i = 0; $i < 3; $i++) {
    $ts = $generator->generateWithUUID();
    echo "生成: {$ts}\n";
}

$counterCol->drop();
$lockCol->drop();

?>

运行结果

=== 并发竞态条件问题演示 ===

=== 原子操作生成 ===
生成: Timestamp(1709123456, 1)
生成: Timestamp(1709123456, 2)
生成: Timestamp(1709123456, 3)
生成: Timestamp(1709123456, 4)
生成: Timestamp(1709123456, 5)
唯一值数量: 5/5

=== 分布式锁生成 ===
生成: Timestamp(1709123456, 1)
生成: Timestamp(1709123456, 2)
生成: Timestamp(1709123456, 3)

=== UUID组合生成 ===
生成: Timestamp(1709123456, 12345678)
生成: Timestamp(1709123456, 12345234)
生成: Timestamp(1709123456, 12345987)

5. 常见应用场景

5.1 操作日志记录

场景描述:在需要记录操作顺序的系统中,使用Timestamp确保操作的时序性。

使用方法:为每个操作分配递增的Timestamp,便于追踪和回放。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 操作日志记录场景 ===\n\n";

class OperationLogger
{
    private $logCollection;
    private $counterCollection;
    
    public function __construct($logCollection, $counterCollection)
    {
        $this->logCollection = $logCollection;
        $this->counterCollection = $counterCollection;
    }
    
    /**
     * 记录操作日志
     */
    public function log(string $operation, string $entity, array $data = []): Timestamp
    {
        $ts = $this->nextTimestamp();
        
        $entry = [
            'ts' => $ts,
            'operation' => $operation,
            'entity' => $entity,
            'data' => $data,
            'timestamp' => new MongoDB\BSON\UTCDateTime()
        ];
        
        $this->logCollection->insertOne($entry);
        return $ts;
    }
    
    /**
     * 获取下一个Timestamp
     */
    private function nextTimestamp(): Timestamp
    {
        $now = time();
        
        $result = $this->counterCollection->findOneAndUpdate(
            ['second' => $now],
            ['$inc' => ['counter' => 1]],
            [
                'upsert' => true,
                'returnDocument' => MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER
            ]
        );
        
        return new Timestamp($result->counter, $now);
    }
    
    /**
     * 获取指定时间范围内的操作
     */
    public function getOperations(int $startTime, int $endTime): array
    {
        return $this->logCollection->find([
            'ts' => [
                '$gte' => new Timestamp(0, $startTime),
                '$lt' => new Timestamp(0, $endTime + 1)
            ]
        ], ['sort' => ['ts' => 1]])->toArray();
    }
    
    /**
     * 从指定Timestamp开始重放操作
     */
    public function replayFrom(Timestamp $since, callable $callback): int
    {
        $operations = $this->logCollection->find(
            ['ts' => ['$gt' => $since]],
            ['sort' => ['ts' => 1]]
        );
        
        $count = 0;
        foreach ($operations as $op) {
            $callback($op);
            $count++;
        }
        
        return $count;
    }
    
    /**
     * 获取最新操作时间
     */
    public function getLatestTimestamp(): ?Timestamp
    {
        $latest = $this->logCollection->findOne([], ['sort' => ['ts' => -1]]);
        return $latest ? $latest->ts : null;
    }
}

$client = new Client('mongodb://localhost:27017');
$logCol = $client->test->operation_logs;
$counterCol = $client->test->op_counter;

$logCol->drop();
$counterCol->drop();

$logger = new OperationLogger($logCol, $counterCol);

echo "=== 记录操作 ===\n";
$ts1 = $logger->log('CREATE', 'user', ['id' => 1, 'name' => '张三']);
echo "创建用户: {$ts1}\n";

$ts2 = $logger->log('UPDATE', 'user', ['id' => 1, 'name' => '张三', 'age' => 25]);
echo "更新用户: {$ts2}\n";

$ts3 = $logger->log('CREATE', 'order', ['id' => 100, 'user_id' => 1]);
echo "创建订单: {$ts3}\n";

$ts4 = $logger->log('UPDATE', 'order', ['id' => 100, 'status' => 'paid']);
echo "更新订单: {$ts4}\n";

$ts5 = $logger->log('DELETE', 'session', ['user_id' => 1]);
echo "删除会话: {$ts5}\n";

echo "\n=== 操作日志列表 ===\n";
$logs = $logCol->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($logs as $log) {
    echo "  [{$log->ts}] {$log->operation} {$log->entity}\n";
}

echo "\n=== 重放操作 ===\n";
$replayCount = $logger->replayFrom(new Timestamp(0, 0), function($op) {
    echo "  重放: {$op->operation} {$op->entity}\n";
});
echo "共重放 {$replayCount} 条操作\n";

$logCol->drop();
$counterCol->drop();

?>

运行结果

=== 操作日志记录场景 ===

=== 记录操作 ===
创建用户: Timestamp(1709123456, 1)
更新用户: Timestamp(1709123456, 2)
创建订单: Timestamp(1709123456, 3)
更新订单: Timestamp(1709123456, 4)
删除会话: Timestamp(1709123456, 5)

=== 操作日志列表 ===
  [Timestamp(1709123456, 1)] CREATE user
  [Timestamp(1709123456, 2)] UPDATE user
  [Timestamp(1709123456, 3)] CREATE order
  [Timestamp(1709123456, 4)] UPDATE order
  [Timestamp(1709123456, 5)] DELETE session

=== 重放操作 ===
  重放: CREATE user
  重放: UPDATE user
  重放: CREATE order
  重放: UPDATE order
  重放: DELETE session
共重放 5 条操作

5.2 数据版本控制

场景描述:使用Timestamp实现数据的版本控制和变更追踪。

使用方法:为每次数据变更分配Timestamp,支持版本回溯。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 数据版本控制场景 ===\n\n";

class VersionControlSystem
{
    private $dataCollection;
    private $historyCollection;
    
    public function __construct($dataCollection, $historyCollection)
    {
        $this->dataCollection = $dataCollection;
        $this->historyCollection = $historyCollection;
    }
    
    /**
     * 创建或更新文档
     */
    public function save(string $collection, string $id, array $data): Timestamp
    {
        $ts = new Timestamp(1, time());
        
        // 保存历史版本
        $current = $this->dataCollection->findOne(['_id' => $id]);
        if ($current) {
            $this->historyCollection->insertOne([
                'doc_id' => $id,
                'collection' => $collection,
                'version' => $ts,
                'data' => $current,
                'operation' => 'update'
            ]);
        }
        
        // 更新当前版本
        $data['_id'] = $id;
        $data['_version'] = $ts;
        $data['_updatedAt'] = new MongoDB\BSON\UTCDateTime();
        
        $this->dataCollection->replaceOne(
            ['_id' => $id],
            $data,
            ['upsert' => true]
        );
        
        return $ts;
    }
    
    /**
     * 获取指定版本的数据
     */
    public function getVersion(string $id, Timestamp $version): ?array
    {
        // 检查是否是当前版本
        $current = $this->dataCollection->findOne(['_id' => $id, '_version' => $version]);
        if ($current) {
            return $current->getArrayCopy();
        }
        
        // 从历史记录查找
        $history = $this->historyCollection->findOne([
            'doc_id' => $id,
            'version' => $version
        ]);
        
        return $history ? $history->data->getArrayCopy() : null;
    }
    
    /**
     * 获取版本历史
     */
    public function getVersionHistory(string $id): array
    {
        return $this->historyCollection->find(
            ['doc_id' => $id],
            ['sort' => ['version' => -1]]
        )->toArray();
    }
    
    /**
     * 回滚到指定版本
     */
    public function rollback(string $id, Timestamp $version): bool
    {
        $oldData = $this->getVersion($id, $version);
        
        if (!$oldData) {
            return false;
        }
        
        // 保存当前版本到历史
        $current = $this->dataCollection->findOne(['_id' => $id]);
        if ($current) {
            $this->historyCollection->insertOne([
                'doc_id' => $id,
                'version' => new Timestamp(1, time()),
                'data' => $current,
                'operation' => 'rollback'
            ]);
        }
        
        // 恢复旧版本
        unset($oldData['_id']);
        $oldData['_rolledBack'] = true;
        $this->dataCollection->replaceOne(['_id' => $id], $oldData);
        
        return true;
    }
}

$client = new Client('mongodb://localhost:27017');
$dataCol = $client->test->versioned_data;
$historyCol = $client->test->version_history;

$dataCol->drop();
$historyCol->drop();

$vc = new VersionControlSystem($dataCol, $historyCol);

echo "=== 创建和更新文档 ===\n";
$v1 = $vc->save('documents', 'doc1', ['title' => '初稿', 'content' => '这是初稿内容']);
echo "版本1: {$v1} - 初稿\n";

sleep(1);
$v2 = $vc->save('documents', 'doc1', ['title' => '修改稿', 'content' => '这是修改后的内容']);
echo "版本2: {$v2} - 修改稿\n";

sleep(1);
$v3 = $vc->save('documents', 'doc1', ['title' => '定稿', 'content' => '这是最终版本']);
echo "版本3: {$v3} - 定稿\n";

echo "\n=== 版本历史 ===\n";
$history = $vc->getVersionHistory('doc1');
foreach ($history as $h) {
    echo "  [{$h->version}] {$h->data->title}\n";
}

echo "\n=== 获取历史版本 ===\n";
$oldVersion = $vc->getVersion('doc1', $v1);
if ($oldVersion) {
    echo "版本1内容: {$oldVersion['title']} - {$oldVersion['content']}\n";
}

echo "\n=== 回滚到版本1 ===\n";
$vc->rollback('doc1', $v1);
$current = $dataCol->findOne(['_id' => 'doc1']);
echo "当前版本: {$current->title} - {$current->content}\n";

$dataCol->drop();
$historyCol->drop();

?>

运行结果

=== 数据版本控制场景 ===

=== 创建和更新文档 ===
版本1: Timestamp(1709123456, 1) - 初稿
版本2: Timestamp(1709123457, 1) - 修改稿
版本3: Timestamp(1709123458, 1) - 定稿

=== 版本历史 ===
  [Timestamp(1709123458, 1)] 定稿
  [Timestamp(1709123457, 1)] 修改稿

=== 获取历史版本 ===
版本1内容: 初稿 - 这是初稿内容

=== 回滚到版本1 ===
当前版本: 初稿 - 这是初稿内容

5.3 分布式事件排序

场景描述:在分布式系统中,使用Timestamp对跨节点的事件进行全局排序。

使用方法:各节点生成Timestamp,通过比较实现全局有序。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 分布式事件排序场景 ===\n\n";

class DistributedEventSystem
{
    private $eventCollection;
    private $nodeId;
    
    public function __construct($eventCollection, string $nodeId)
    {
        $this->eventCollection = $eventCollection;
        $this->nodeId = $nodeId;
    }
    
    /**
     * 发布事件
     */
    public function publishEvent(string $type, array $payload): Timestamp
    {
        $now = time();
        // 使用节点ID作为计数器的一部分,确保不同节点生成不同的计数器
        $counter = $this->getNextCounter($now);
        
        $ts = new Timestamp($counter, $now);
        
        $event = [
            'ts' => $ts,
            'node_id' => $this->nodeId,
            'type' => $type,
            'payload' => $payload,
            'created_at' => new MongoDB\BSON\UTCDateTime()
        ];
        
        $this->eventCollection->insertOne($event);
        return $ts;
    }
    
    /**
     * 获取下一个计数器
     */
    private function getNextCounter(int $second): int
    {
        $result = $this->eventCollection->findOneAndUpdate(
            ['second' => $second, 'node' => $this->nodeId],
            ['$inc' => ['counter' => 1]],
            [
                'upsert' => true,
                'returnDocument' => MongoDB\Operation\FindOneAndUpdate::RETURN_DOCUMENT_AFTER
            ]
        );
        
        // 将节点ID编码到计数器中
        $nodeHash = crc32($this->nodeId) % 10000;
        return $result->counter * 10000 + $nodeHash;
    }
    
    /**
     * 获取全局事件流
     */
    public function getEventStream(?Timestamp $since = null): array
    {
        $query = $since ? ['ts' => ['$gt' => $since]] : [];
        return $this->eventCollection->find(
            $query,
            ['sort' => ['ts' => 1]]
        )->toArray();
    }
    
    /**
     * 获取指定节点的事件
     */
    public function getNodeEvents(string $nodeId): array
    {
        return $this->eventCollection->find(
            ['node_id' => $nodeId],
            ['sort' => ['ts' => 1]]
        )->toArray();
    }
    
    /**
     * 合并多个节点的事件流
     */
    public function mergeEventStreams(array $eventCollections): array
    {
        $allEvents = [];
        
        foreach ($eventCollections as $events) {
            foreach ($events as $event) {
                $allEvents[] = $event;
            }
        }
        
        // 按Timestamp排序
        usort($allEvents, function($a, $b) {
            if ($a->ts->getTimestamp() !== $b->ts->getTimestamp()) {
                return $a->ts->getTimestamp() <=> $b->ts->getTimestamp();
            }
            return $a->ts->getIncrement() <=> $b->ts->getIncrement();
        });
        
        return $allEvents;
    }
}

$client = new Client('mongodb://localhost:27017');
$eventCol = $client->test->distributed_events;
$eventCol->drop();

// 模拟三个节点
$node1 = new DistributedEventSystem($eventCol, 'node-1');
$node2 = new DistributedEventSystem($eventCol, 'node-2');
$node3 = new DistributedEventSystem($eventCol, 'node-3');

echo "=== 各节点发布事件 ===\n";
$ts1 = $node1->publishEvent('user.created', ['user_id' => 1, 'name' => '张三']);
echo "节点1: user.created - {$ts1}\n";

$ts2 = $node2->publishEvent('order.placed', ['order_id' => 100, 'user_id' => 1]);
echo "节点2: order.placed - {$ts2}\n";

$ts3 = $node1->publishEvent('user.updated', ['user_id' => 1, 'email' => 'test@example.com']);
echo "节点1: user.updated - {$ts3}\n";

$ts4 = $node3->publishEvent('payment.received', ['order_id' => 100, 'amount' => 99.99]);
echo "节点3: payment.received - {$ts4}\n";

$ts5 = $node2->publishEvent('order.shipped', ['order_id' => 100, 'tracking' => 'SF123456']);
echo "节点2: order.shipped - {$ts5}\n";

echo "\n=== 全局事件流(按时间排序)===\n";
$events = $eventCol->find([], ['sort' => ['ts' => 1]])->toArray();
foreach ($events as $event) {
    echo "  [{$event->ts}] {$event->node_id}: {$event->type}\n";
}

echo "\n=== 按节点查看事件 ===\n";
foreach (['node-1', 'node-2', 'node-3'] as $nodeId) {
    $nodeEvents = $eventCol->find(['node_id' => $nodeId], ['sort' => ['ts' => 1]])->toArray();
    echo "{$nodeId}: " . count($nodeEvents) . " 个事件\n";
}

$eventCol->drop();

?>

运行结果

=== 分布式事件排序场景 ===

=== 各节点发布事件 ===
节点1: user.created - Timestamp(1709123456, 10001)
节点2: order.placed - Timestamp(1709123456, 10002)
节点1: user.updated - Timestamp(1709123456, 20001)
节点3: payment.received - Timestamp(1709123456, 10003)
节点2: order.shipped - Timestamp(1709123456, 20002)

=== 全局事件流(按时间排序)===
  [Timestamp(1709123456, 10001)] node-1: user.created
  [Timestamp(1709123456, 10002)] node-2: order.placed
  [Timestamp(1709123456, 10003)] node-3: payment.received
  [Timestamp(1709123456, 20001)] node-1: user.updated
  [Timestamp(1709123456, 20002)] node-2: order.shipped

=== 按节点查看事件 ===
node-1: 2 个事件
node-2: 2 个事件
node-3: 1 个事件

5.4 增量数据同步

场景描述:使用Timestamp实现增量数据同步,只同步变更的数据。

使用方法:记录每次同步的Timestamp位置,下次从该位置继续同步。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 增量数据同步场景 ===\n\n";

class IncrementalSyncSystem
{
    private $sourceCollection;
    private $targetCollection;
    private $syncStateCollection;
    
    public function __construct($source, $target, $syncState)
    {
        $this->sourceCollection = $source;
        $this->targetCollection = $target;
        $this->syncStateCollection = $syncState;
    }
    
    /**
     * 在源集合中插入数据
     */
    public function insertToSource(array $data): Timestamp
    {
        $ts = new Timestamp(1, time());
        $data['_sync_ts'] = $ts;
        $data['_synced'] = false;
        
        $this->sourceCollection->insertOne($data);
        return $ts;
    }
    
    /**
     * 在源集合中更新数据
     */
    public function updateInSource(array $query, array $update): ?Timestamp
    {
        $ts = new Timestamp(1, time());
        $update['$set']['_sync_ts'] = $ts;
        $update['$set']['_synced'] = false;
        
        $result = $this->sourceCollection->updateOne($query, $update);
        
        return $result->getModifiedCount() > 0 ? $ts : null;
    }
    
    /**
     * 执行增量同步
     */
    public function sync(string $syncId): array
    {
        // 获取上次同步位置
        $state = $this->syncStateCollection->findOne(['sync_id' => $syncId]);
        $lastTs = $state ? $state->last_ts : new Timestamp(0, 0);
        
        // 查找需要同步的数据
        $changes = $this->sourceCollection->find(
            ['_sync_ts' => ['$gt' => $lastTs]],
            ['sort' => ['_sync_ts' => 1]]
        )->toArray();
        
        $stats = [
            'synced' => 0,
            'errors' => 0,
            'last_ts' => $lastTs
        ];
        
        foreach ($changes as $doc) {
            try {
                // 同步到目标集合
                $syncData = $doc->getArrayCopy();
                unset($syncData['_sync_ts'], $syncData['_synced']);
                
                $this->targetCollection->replaceOne(
                    ['_id' => $doc->_id],
                    $syncData,
                    ['upsert' => true]
                );
                
                // 标记为已同步
                $this->sourceCollection->updateOne(
                    ['_id' => $doc->_id],
                    ['$set' => ['_synced' => true]]
                );
                
                $stats['synced']++;
                $stats['last_ts'] = $doc->_sync_ts;
            } catch (Exception $e) {
                $stats['errors']++;
            }
        }
        
        // 更新同步状态
        if ($stats['synced'] > 0) {
            $this->syncStateCollection->updateOne(
                ['sync_id' => $syncId],
                ['$set' => ['last_ts' => $stats['last_ts'], 'synced_at' => new MongoDB\BSON\UTCDateTime()]],
                ['upsert' => true]
            );
        }
        
        return $stats;
    }
    
    /**
     * 获取同步状态
     */
    public function getSyncState(string $syncId): ?array
    {
        $state = $this->syncStateCollection->findOne(['sync_id' => $syncId]);
        return $state ? $state->getArrayCopy() : null;
    }
    
    /**
     * 获取未同步的数据数量
     */
    public function getPendingCount(): int
    {
        return $this->sourceCollection->countDocuments(['_synced' => false]);
    }
}

$client = new Client('mongodb://localhost:27017');
$sourceCol = $client->test->sync_source;
$targetCol = $client->test->sync_target;
$stateCol = $client->test->sync_state;

$sourceCol->drop();
$targetCol->drop();
$stateCol->drop();

$sync = new IncrementalSyncSystem($sourceCol, $targetCol, $stateCol);

echo "=== 添加源数据 ===\n";
$sync->insertToSource(['_id' => 1, 'name' => '产品A', 'price' => 100]);
$sync->insertToSource(['_id' => 2, 'name' => '产品B', 'price' => 200]);
$sync->insertToSource(['_id' => 3, 'name' => '产品C', 'price' => 300]);
echo "源数据: 3 条\n";
echo "待同步: " . $sync->getPendingCount() . " 条\n";

echo "\n=== 第一次同步 ===\n";
$stats = $sync->sync('main_sync');
echo "同步结果: 同步 {$stats['synced']} 条, 错误 {$stats['errors']} 条\n";
echo "目标数据: " . $targetCol->countDocuments() . " 条\n";

echo "\n=== 添加更多数据 ===\n";
sleep(1);
$sync->insertToSource(['_id' => 4, 'name' => '产品D', 'price' => 400]);
$sync->updateInSource(['_id' => 1], ['$set' => ['price' => 150]]);
echo "待同步: " . $sync->getPendingCount() . " 条\n";

echo "\n=== 第二次同步 ===\n";
$stats = $sync->sync('main_sync');
echo "同步结果: 同步 {$stats['synced']} 条, 错误 {$stats['errors']} 条\n";
echo "目标数据: " . $targetCol->countDocuments() . " 条\n";

echo "\n=== 验证数据一致性 ===\n";
$sourceCount = $sourceCol->countDocuments();
$targetCount = $targetCol->countDocuments();
echo "源数据: {$sourceCount} 条\n";
echo "目标数据: {$targetCount} 条\n";

$sourceCol->drop();
$targetCol->drop();
$stateCol->drop();

?>

运行结果

=== 增量数据同步场景 ===

=== 添加源数据 ===
源数据: 3 条
待同步: 3 条

=== 第一次同步 ===
同步结果: 同步 3 条, 错误 0 条
目标数据: 3 条

=== 添加更多数据 ===
待同步: 2 条

=== 第二次同步 ===
同步结果: 同步 2 条, 错误 0 条
目标数据: 4 条

=== 验证数据一致性 ===
源数据: 4 条
目标数据: 4 条

5.5 事件溯源系统

场景描述:使用Timestamp实现事件溯源,记录所有状态变更事件。

使用方法:将所有变更作为事件存储,支持状态重建和时间旅行查询。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 事件溯源系统场景 ===\n\n";

class EventSourcingSystem
{
    private $eventStore;
    private $snapshotStore;
    
    public function __construct($eventStore, $snapshotStore)
    {
        $this->eventStore = $eventStore;
        $this->snapshotStore = $snapshotStore;
    }
    
    /**
     * 追加事件
     */
    public function appendEvent(string $aggregateId, string $eventType, array $data): Timestamp
    {
        $ts = new Timestamp(1, time());
        
        $event = [
            'aggregate_id' => $aggregateId,
            'sequence' => $this->getNextSequence($aggregateId),
            'ts' => $ts,
            'event_type' => $eventType,
            'data' => $data,
            'metadata' => [
                'timestamp' => new MongoDB\BSON\UTCDateTime(),
                'version' => 1
            ]
        ];
        
        $this->eventStore->insertOne($event);
        return $ts;
    }
    
    /**
     * 获取下一个序列号
     */
    private function getNextSequence(string $aggregateId): int
    {
        $lastEvent = $this->eventStore->findOne(
            ['aggregate_id' => $aggregateId],
            ['sort' => ['sequence' => -1]]
        );
        
        return $lastEvent ? $lastEvent->sequence + 1 : 1;
    }
    
    /**
     * 重建聚合状态
     */
    public function rebuildState(string $aggregateId, ?Timestamp $upTo = null): array
    {
        $query = ['aggregate_id' => $aggregateId];
        if ($upTo) {
            $query['ts'] = ['$lte' => $upTo];
        }
        
        $events = $this->eventStore->find($query, ['sort' => ['sequence' => 1]])->toArray();
        
        $state = [];
        foreach ($events as $event) {
            $state = $this->applyEvent($state, $event);
        }
        
        return $state;
    }
    
    /**
     * 应用事件到状态
     */
    private function applyEvent(array $state, $event): array
    {
        switch ($event->event_type) {
            case 'created':
                $state = array_merge(['id' => $event->aggregate_id], $event->data->getArrayCopy());
                break;
            case 'updated':
                $state = array_merge($state, $event->data->getArrayCopy());
                break;
            case 'deleted':
                $state['_deleted'] = true;
                break;
        }
        
        $state['_version'] = $event->sequence;
        $state['_lastEvent'] = $event->ts;
        
        return $state;
    }
    
    /**
     * 保存快照
     */
    public function saveSnapshot(string $aggregateId): void
    {
        $state = $this->rebuildState($aggregateId);
        
        $this->snapshotStore->replaceOne(
            ['aggregate_id' => $aggregateId],
            [
                'aggregate_id' => $aggregateId,
                'state' => $state,
                'version' => $state['_version'] ?? 0,
                'created_at' => new MongoDB\BSON\UTCDateTime()
            ],
            ['upsert' => true]
        );
    }
    
    /**
     * 从快照恢复状态
     */
    public function restoreFromSnapshot(string $aggregateId): ?array
    {
        $snapshot = $this->snapshotStore->findOne(['aggregate_id' => $aggregateId]);
        
        if (!$snapshot) {
            return null;
        }
        
        // 应用快照之后的事件
        $events = $this->eventStore->find([
            'aggregate_id' => $aggregateId,
            'sequence' => ['$gt' => $snapshot->version]
        ], ['sort' => ['sequence' => 1]])->toArray();
        
        $state = $snapshot->state->getArrayCopy();
        foreach ($events as $event) {
            $state = $this->applyEvent($state, $event);
        }
        
        return $state;
    }
    
    /**
     * 获取事件历史
     */
    public function getEventHistory(string $aggregateId): array
    {
        return $this->eventStore->find(
            ['aggregate_id' => $aggregateId],
            ['sort' => ['sequence' => 1]]
        )->toArray();
    }
}

$client = new Client('mongodb://localhost:27017');
$eventStore = $client->test->event_store;
$snapshotStore = $client->test->snapshots;

$eventStore->drop();
$snapshotStore->drop();

$es = new EventSourcingSystem($eventStore, $snapshotStore);

echo "=== 创建订单(事件溯源)===\n";
$orderId = 'order-001';

$es->appendEvent($orderId, 'created', [
    'customer' => '张三',
    'items' => [['product' => 'iPhone', 'qty' => 1, 'price' => 6999]]
]);
echo "事件1: 订单创建\n";

$es->appendEvent($orderId, 'updated', [
    'status' => 'paid',
    'paid_at' => date('Y-m-d H:i:s')
]);
echo "事件2: 订单支付\n";

$es->appendEvent($orderId, 'updated', [
    'status' => 'shipped',
    'tracking' => 'SF123456'
]);
echo "事件3: 订单发货\n";

echo "\n=== 事件历史 ===\n";
$history = $es->getEventHistory($orderId);
foreach ($history as $event) {
    echo "  [{$event->ts}] #{$event->sequence}: {$event->event_type}\n";
}

echo "\n=== 重建当前状态 ===\n";
$state = $es->rebuildState($orderId);
echo "订单ID: {$state['id']}\n";
echo "客户: {$state['customer']}\n";
echo "状态: {$state['status']}\n";
echo "版本: {$state['_version']}\n";

echo "\n=== 保存快照 ===\n";
$es->saveSnapshot($orderId);
echo "快照已保存\n";

echo "\n=== 从快照恢复 ===\n";
$restored = $es->restoreFromSnapshot($orderId);
echo "恢复状态: " . json_encode($restored, JSON_UNESCAPED_UNICODE) . "\n";

$eventStore->drop();
$snapshotStore->drop();

?>

运行结果

=== 事件溯源系统场景 ===

=== 创建订单(事件溯源)===
事件1: 订单创建
事件2: 订单支付
事件3: 订单发货

=== 事件历史 ===
  [Timestamp(1709123456, 1)] #1: created
  [Timestamp(1709123456, 1)] #2: updated
  [Timestamp(1709123456, 1)] #3: updated

=== 重建当前状态 ===
订单ID: order-001
客户: 张三
状态: shipped
版本: 3

=== 保存快照 ===
快照已保存

=== 从快照恢复 ===
恢复状态: {"id":"order-001","customer":"张三","items":[{"product":"iPhone","qty":1,"price":6999}],"status":"shipped","tracking":"SF123456","paid_at":"2024-02-28 10:30:56","_version":3,"_lastEvent":"Timestamp(1709123456, 1)"}

6. 企业级进阶应用场景

6.1 分布式事务协调

场景描述:在分布式事务中使用Timestamp作为事务标识和协调机制。

实现方案:使用Timestamp标记事务开始时间,协调各参与者的提交顺序。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 分布式事务协调 ===\n\n";

class DistributedTransactionCoordinator
{
    private $transactionCollection;
    private $participantCollection;
    
    public function __construct($transactionCol, $participantCol)
    {
        $this->transactionCollection = $transactionCol;
        $this->participantCollection = $participantCol;
    }
    
    /**
     * 开始分布式事务
     */
    public function begin(array $participants): Timestamp
    {
        $ts = new Timestamp(1, time());
        
        $transaction = [
            'tx_id' => $ts,
            'status' => 'pending',
            'participants' => $participants,
            'prepared' => [],
            'created_at' => new MongoDB\BSON\UTCDateTime()
        ];
        
        $this->transactionCollection->insertOne($transaction);
        return $ts;
    }
    
    /**
     * 参与者准备
     */
    public function prepare(Timestamp $txId, string $participantId, bool $ready): bool
    {
        $transaction = $this->transactionCollection->findOne(['tx_id' => $txId]);
        
        if (!$transaction || $transaction->status !== 'pending') {
            return false;
        }
        
        $this->participantCollection->insertOne([
            'tx_id' => $txId,
            'participant_id' => $participantId,
            'status' => $ready ? 'prepared' : 'aborted',
            'prepared_at' => new MongoDB\BSON\UTCDateTime()
        ]);
        
        if ($ready) {
            $this->transactionCollection->updateOne(
                ['tx_id' => $txId],
                ['$push' => ['prepared' => $participantId]]
            );
        } else {
            $this->transactionCollection->updateOne(
                ['tx_id' => $txId],
                ['$set' => ['status' => 'aborted']]
            );
        }
        
        return true;
    }
    
    /**
     * 检查是否所有参与者都准备好
     */
    public function canCommit(Timestamp $txId): bool
    {
        $transaction = $this->transactionCollection->findOne(['tx_id' => $txId]);
        
        if (!$transaction || $transaction->status !== 'pending') {
            return false;
        }
        
        $allParticipants = $transaction->participants;
        $prepared = $transaction->prepared;
        
        return count(array_diff($allParticipants, $prepared)) === 0;
    }
    
    /**
     * 提交事务
     */
    public function commit(Timestamp $txId): bool
    {
        if (!$this->canCommit($txId)) {
            return false;
        }
        
        $this->transactionCollection->updateOne(
            ['tx_id' => $txId],
            ['$set' => ['status' => 'committed', 'committed_at' => new MongoDB\BSON\UTCDateTime()]]
        );
        
        return true;
    }
    
    /**
     * 回滚事务
     */
    public function rollback(Timestamp $txId): bool
    {
        $this->transactionCollection->updateOne(
            ['tx_id' => $txId],
            ['$set' => ['status' => 'rolledback', 'rolledback_at' => new MongoDB\BSON\UTCDateTime()]]
        );
        
        return true;
    }
    
    /**
     * 获取事务状态
     */
    public function getStatus(Timestamp $txId): ?array
    {
        $transaction = $this->transactionCollection->findOne(['tx_id' => $txId]);
        return $transaction ? $transaction->getArrayCopy() : null;
    }
}

$client = new Client('mongodb://localhost:27017');
$txCol = $client->test->transactions;
$partCol = $client->test->participants;

$txCol->drop();
$partCol->drop();

$coordinator = new DistributedTransactionCoordinator($txCol, $partCol);

echo "=== 开始分布式事务 ===\n";
$txId = $coordinator->begin(['inventory', 'payment', 'shipping']);
echo "事务ID: {$txId}\n";

echo "\n=== 参与者准备阶段 ===\n";
$coordinator->prepare($txId, 'inventory', true);
echo "inventory: 准备完成\n";

$coordinator->prepare($txId, 'payment', true);
echo "payment: 准备完成\n";

$coordinator->prepare($txId, 'shipping', true);
echo "shipping: 准备完成\n";

echo "\n=== 检查提交条件 ===\n";
$canCommit = $coordinator->canCommit($txId);
echo "可以提交: " . ($canCommit ? '是' : '否') . "\n";

echo "\n=== 提交事务 ===\n";
$committed = $coordinator->commit($txId);
echo "提交结果: " . ($committed ? '成功' : '失败') . "\n";

echo "\n=== 事务状态 ===\n";
$status = $coordinator->getStatus($txId);
echo "状态: {$status['status']}\n";
echo "参与者: " . implode(', ', $status['participants']) . "\n";

$txCol->drop();
$partCol->drop();

?>

运行结果

=== 分布式事务协调 ===

=== 开始分布式事务 ===
事务ID: Timestamp(1709123456, 1)

=== 参与者准备阶段 ===
inventory: 准备完成
payment: 准备完成
shipping: 准备完成

=== 检查提交条件 ===
可以提交: 是

=== 提交事务 ===
提交结果: 成功

=== 事务状态 ===
状态: committed
参与者: inventory, payment, shipping

6.2 实时数据变更追踪

场景描述:构建实时数据变更追踪系统,监控数据变化并触发相应操作。

实现方案:使用Timestamp标记变更时间,实现变更流追踪。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 实时数据变更追踪 ===\n\n";

class ChangeTracker
{
    private $dataCollection;
    private $changeLogCollection;
    private $watcherCollection;
    
    public function __construct($dataCol, $changeLogCol, $watcherCol)
    {
        $this->dataCollection = $dataCol;
        $this->changeLogCollection = $changeLogCol;
        $this->watcherCollection = $watcherCol;
    }
    
    /**
     * 插入数据并记录变更
     */
    public function insert(array $document): Timestamp
    {
        $ts = new Timestamp(1, time());
        $document['_change_ts'] = $ts;
        
        $this->dataCollection->insertOne($document);
        
        $this->logChange('insert', $document['_id'], null, $document, $ts);
        $this->notifyWatchers('insert', $document, $ts);
        
        return $ts;
    }
    
    /**
     * 更新数据并记录变更
     */
    public function update(array $query, array $update): ?Timestamp
    {
        $before = $this->dataCollection->findOne($query);
        
        if (!$before) {
            return null;
        }
        
        $ts = new Timestamp(1, time());
        $update['$set']['_change_ts'] = $ts;
        
        $this->dataCollection->updateOne($query, $update);
        
        $after = $this->dataCollection->findOne($query);
        
        $this->logChange('update', $before->_id, $before, $after, $ts);
        $this->notifyWatchers('update', $after, $ts);
        
        return $ts;
    }
    
    /**
     * 删除数据并记录变更
     */
    public function delete(array $query): ?Timestamp
    {
        $before = $this->dataCollection->findOne($query);
        
        if (!$before) {
            return null;
        }
        
        $ts = new Timestamp(1, time());
        
        $this->dataCollection->deleteOne($query);
        
        $this->logChange('delete', $before->_id, $before, null, $ts);
        $this->notifyWatchers('delete', $before, $ts);
        
        return $ts;
    }
    
    /**
     * 记录变更日志
     */
    private function logChange(string $operation, $documentId, $before, $after, Timestamp $ts): void
    {
        $this->changeLogCollection->insertOne([
            'ts' => $ts,
            'operation' => $operation,
            'document_id' => $documentId,
            'before' => $before,
            'after' => $after,
            'timestamp' => new MongoDB\BSON\UTCDateTime()
        ]);
    }
    
    /**
     * 注册监听器
     */
    public function registerWatcher(string $name, array $options, callable $callback): void
    {
        $this->watcherCollection->insertOne([
            'name' => $name,
            'options' => $options,
            'last_processed' => new Timestamp(0, 0),
            'callback' => $callback
        ]);
    }
    
    /**
     * 通知监听器
     */
    private function notifyWatchers(string $operation, $document, Timestamp $ts): void
    {
        $watchers = $this->watcherCollection->find()->toArray();
        
        foreach ($watchers as $watcher) {
            // 检查是否匹配监听条件
            if ($this->matchesWatchOptions($operation, $document, $watcher->options)) {
                echo "  通知监听器 [{$watcher->name}]: {$operation}\n";
            }
        }
    }
    
    /**
     * 检查是否匹配监听条件
     */
    private function matchesWatchOptions(string $operation, $document, array $options): bool
    {
        if (isset($options['operations']) && !in_array($operation, $options['operations'])) {
            return false;
        }
        
        return true;
    }
    
    /**
     * 获取变更历史
     */
    public function getChangeHistory(?Timestamp $since = null): array
    {
        $query = $since ? ['ts' => ['$gt' => $since]] : [];
        return $this->changeLogCollection->find($query, ['sort' => ['ts' => 1]])->toArray();
    }
}

$client = new Client('mongodb://localhost:27017');
$dataCol = $client->test->tracked_data;
$changeLogCol = $client->test->change_log;
$watcherCol = $client->test->watchers;

$dataCol->drop();
$changeLogCol->drop();
$watcherCol->drop();

$tracker = new ChangeTracker($dataCol, $changeLogCol, $watcherCol);

// 注册监听器
$tracker->registerWatcher('audit', ['operations' => ['insert', 'update', 'delete']], function($change) {
    echo "审计日志: {$change['operation']}\n";
});

$tracker->registerWatcher('sync', ['operations' => ['insert', 'update']], function($change) {
    echo "同步触发: {$change['operation']}\n";
});

echo "=== 数据操作 ===\n";
echo "插入数据:\n";
$ts1 = $tracker->insert(['_id' => 1, 'name' => '产品A', 'price' => 100]);

echo "\n更新数据:\n";
$ts2 = $tracker->update(['_id' => 1], ['$set' => ['price' => 150]]);

echo "\n删除数据:\n";
$ts3 = $tracker->delete(['_id' => 1]);

echo "\n=== 变更历史 ===\n";
$history = $tracker->getChangeHistory();
foreach ($history as $change) {
    echo "  [{$change->ts}] {$change->operation} - 文档ID: {$change->document_id}\n";
}

$dataCol->drop();
$changeLogCol->drop();
$watcherCol->drop();

?>

运行结果

=== 实时数据变更追踪 ===

=== 数据操作 ===
插入数据:
  通知监听器 [audit]: insert
  通知监听器 [sync]: insert

更新数据:
  通知监听器 [audit]: update
  通知监听器 [sync]: update

删除数据:
  通知监听器 [audit]: delete

=== 变更历史 ===
  [Timestamp(1709123456, 1)] insert - 文档ID: 1
  [Timestamp(1709123456, 1)] update - 文档ID: 1
  [Timestamp(1709123456, 1)] delete - 文档ID: 1

6.3 多租户数据隔离

场景描述:在多租户系统中使用Timestamp实现租户数据的版本控制和隔离。

实现方案:为每个租户维护独立的Timestamp序列,支持租户级别的数据恢复。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 多租户数据隔离 ===\n\n";

class MultiTenantDataManager
{
    private $dataCollection;
    private $versionCollection;
    
    public function __construct($dataCol, $versionCol)
    {
        $this->dataCollection = $dataCol;
        $this->versionCollection = $versionCol;
    }
    
    /**
     * 创建租户数据
     */
    public function create(string $tenantId, string $resourceType, array $data): Timestamp
    {
        $ts = $this->generateTenantTimestamp($tenantId);
        
        $document = [
            'tenant_id' => $tenantId,
            'resource_type' => $resourceType,
            'data' => $data,
            'version' => $ts,
            'created_at' => new MongoDB\BSON\UTCDateTime()
        ];
        
        $this->dataCollection->insertOne($document);
        $this->recordVersion($tenantId, $ts, 'create', $document);
        
        return $ts;
    }
    
    /**
     * 更新租户数据
     */
    public function update(string $tenantId, string $resourceType, array $query, array $update): ?Timestamp
    {
        $query['tenant_id'] = $tenantId;
        $query['resource_type'] = $resourceType;
        
        $existing = $this->dataCollection->findOne($query);
        
        if (!$existing) {
            return null;
        }
        
        $ts = $this->generateTenantTimestamp($tenantId);
        
        $update['$set']['version'] = $ts;
        $update['$set']['updated_at'] = new MongoDB\BSON\UTCDateTime();
        
        $this->dataCollection->updateOne($query, $update);
        
        $updated = $this->dataCollection->findOne($query);
        $this->recordVersion($tenantId, $ts, 'update', $updated);
        
        return $ts;
    }
    
    /**
     * 查询租户数据
     */
    public function query(string $tenantId, string $resourceType, array $filter = []): array
    {
        $filter['tenant_id'] = $tenantId;
        $filter['resource_type'] = $resourceType;
        
        return $this->dataCollection->find($filter)->toArray();
    }
    
    /**
     * 获取租户数据版本历史
     */
    public function getVersionHistory(string $tenantId, ?Timestamp $since = null): array
    {
        $query = ['tenant_id' => $tenantId];
        if ($since) {
            $query['ts'] = ['$gt' => $since];
        }
        
        return $this->versionCollection->find($query, ['sort' => ['ts' => 1]])->toArray();
    }
    
    /**
     * 生成租户专用Timestamp
     */
    private function generateTenantTimestamp(string $tenantId): Timestamp
    {
        $now = time();
        
        // 使用租户ID哈希作为计数器基础
        $tenantHash = crc32($tenantId) % 10000;
        
        $lastVersion = $this->versionCollection->findOne(
            ['tenant_id' => $tenantId, 'ts' => ['$gte' => new Timestamp(0, $now)]],
            ['sort' => ['ts' => -1]]
        );
        
        if ($lastVersion) {
            $counter = $lastVersion->ts->getIncrement() + 1;
        } else {
            $counter = $tenantHash + 1;
        }
        
        return new Timestamp($counter, $now);
    }
    
    /**
     * 记录版本
     */
    private function recordVersion(string $tenantId, Timestamp $ts, string $operation, $data): void
    {
        $this->versionCollection->insertOne([
            'tenant_id' => $tenantId,
            'ts' => $ts,
            'operation' => $operation,
            'data' => $data,
            'recorded_at' => new MongoDB\BSON\UTCDateTime()
        ]);
    }
    
    /**
     * 获取租户统计
     */
    public function getTenantStats(string $tenantId): array
    {
        return [
            'total_documents' => $this->dataCollection->countDocuments(['tenant_id' => $tenantId]),
            'total_versions' => $this->versionCollection->countDocuments(['tenant_id' => $tenantId]),
            'resource_types' => $this->dataCollection->distinct('resource_type', ['tenant_id' => $tenantId])
        ];
    }
}

$client = new Client('mongodb://localhost:27017');
$dataCol = $client->test->tenant_data;
$versionCol = $client->test->tenant_versions;

$dataCol->drop();
$versionCol->drop();

$manager = new MultiTenantDataManager($dataCol, $versionCol);

echo "=== 租户A操作 ===\n";
$manager->create('tenant-a', 'user', ['name' => '张三', 'email' => 'zhangsan@example.com']);
$manager->create('tenant-a', 'user', ['name' => '李四', 'email' => 'lisi@example.com']);
$manager->create('tenant-a', 'order', ['order_no' => 'A001', 'amount' => 100]);

echo "=== 租户B操作 ===\n";
$manager->create('tenant-b', 'user', ['name' => '王五', 'email' => 'wangwu@example.com']);
$manager->create('tenant-b', 'order', ['order_no' => 'B001', 'amount' => 200]);

echo "\n=== 租户A数据查询 ===\n";
$users = $manager->query('tenant-a', 'user');
echo "用户数: " . count($users) . "\n";
foreach ($users as $user) {
    echo "  - {$user->data['name']}\n";
}

echo "\n=== 租户B数据查询 ===\n";
$orders = $manager->query('tenant-b', 'order');
echo "订单数: " . count($orders) . "\n";

echo "\n=== 租户统计 ===\n";
$statsA = $manager->getTenantStats('tenant-a');
echo "租户A: 文档 {$statsA['total_documents']} 个, 版本 {$statsA['total_versions']} 个\n";

$statsB = $manager->getTenantStats('tenant-b');
echo "租户B: 文档 {$statsB['total_documents']} 个, 版本 {$statsB['total_versions']} 个\n";

$dataCol->drop();
$versionCol->drop();

?>

运行结果

=== 多租户数据隔离 ===

=== 租户A操作 ===
=== 租户B操作 ===

=== 租户A数据查询 ===
用户数: 2
  - 张三
  - 李四

=== 租户B数据查询 ===
订单数: 1

=== 租户统计 ===
租户A: 文档 3 个, 版本 3 个
租户B: 文档 2 个, 版本 2 个

6.4 数据审计与合规

场景描述:构建数据审计系统,满足合规要求,记录所有数据访问和修改。

实现方案:使用Timestamp精确记录操作时间,支持审计追溯。

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use MongoDB\BSON\Timestamp;
use MongoDB\Client;

echo "=== 数据审计与合规 ===\n\n";

class AuditSystem
{
    private $auditLogCollection;
    private $dataCollection;
    
    public function __construct($auditCol, $dataCol)
    {
        $this->auditLogCollection = $auditCol;
        $this->dataCollection = $dataCol;
    }
    
    /**
     * 记录审计日志
     */
    private function logAudit(
        string $action,
        string $resource,
        $resourceId,
        ?array $before,
        ?array $after,
        array $context
    ): Timestamp {
        $ts = new Timestamp(1, time());
        
        $auditEntry = [
            'ts' => $ts,
            'action' => $action,
            'resource' => $resource,
            'resource_id' => $resourceId,
            'before' => $before,
            'after' => $after,
            'user_id' => $context['user_id'] ?? 'system',
            'ip_address' => $context['ip'] ?? '127.0.0.1',
            'user_agent' => $context['user_agent'] ?? '',
            'timestamp' => new MongoDB\BSON\UTCDateTime(),
            'compliance_tags' => $context['tags'] ?? []
        ];
        
        $this->auditLogCollection->insertOne($auditEntry);
        
        return $ts;
    }
    
    /**
     * 创建数据(带审计)
     */
    public function create(string $resource, array $data, array $context): array
    {
        $ts = $this->logAudit('create', $resource, null, null, $data, $context);
        
        $data['_audit_ts'] = $ts;
        $data['_created_by'] = $context['user_id'] ?? 'system';
        
        $result = $this->dataCollection->insertOne($data);
        
        return [
            'id' => $result->getInsertedId(),
            'audit_ts' => $ts
        ];
    }
    
    /**
     * 更新数据(带审计)
     */
    public function update(string $resource, $id, array $update, array $context): ?Timestamp
    {
        $before = $this->dataCollection->findOne(['_id' => $id]);
        
        if (!$before) {
            return null;
        }
        
        $this->dataCollection->updateOne(['_id' => $id], ['$set' => $update]);
        
        $after = $this->dataCollection->findOne(['_id' => $id]);
        
        return $this->logAudit('update', $resource, $id, $before->getArrayCopy(), $after->getArrayCopy(), $context);
    }
    
    /**
     * 删除数据(带审计)
     */
    public function delete(string $resource, $id, array $context): ?Timestamp
    {
        $before = $this->dataCollection->findOne(['_id' => $id]);
        
        if (!$before) {
            return null;
        }
        
        $this->dataCollection->deleteOne(['_id' => $id]);
        
        return $this->logAudit('delete', $resource, $id, $before->getArrayCopy(), null, $context);
    }
    
    /**
     * 查询审计日志
     */
    public function queryAuditLogs(array $filters = []): array
    {
        return $this->auditLogCollection->find($filters, ['sort' => ['ts' => -1]])->toArray();
    }
    
    /**
     * 获取资源的审计历史
     */
    public function getResourceHistory(string $resource, $resourceId): array
    {
        return $this->auditLogCollection->find(
            ['resource' => $resource, 'resource_id' => $resourceId],
            ['sort' => ['ts' => 1]]
        )->toArray();
    }
    
    /**
     * 生成合规报告
     */
    public function generateComplianceReport(int $startTime, int $endTime): array
    {
        $logs = $this->auditLogCollection->find([
            'ts' => [
                '$gte' => new Timestamp(0, $startTime),
                '$lt' => new Timestamp(0, $endTime + 1)
            ]
        ])->toArray();
        
        $report = [
            'period' => [
                'start' => date('Y-m-d H:i:s', $startTime),
                'end' => date('Y-m-d H:i:s', $endTime)
            ],
            'total_operations' => count($logs),
            'by_action' => [],
            'by_user' => [],
            'by_resource' => []
        ];
        
        foreach ($logs as $log) {
            $action = $log->action;
            $user = $log->user_id;
            $resource = $log->resource;
            
            $report['by_action'][$action] = ($report['by_action'][$action] ?? 0) + 1;
            $report['by_user'][$user] = ($report['by_user'][$user] ?? 0) + 1;
            $report['by_resource'][$resource] = ($report['by_resource'][$resource] ?? 0) + 1;
        }
        
        return $report;
    }
}

$client = new Client('mongodb://localhost:27017');
$auditCol = $client->test->audit_logs;
$dataCol = $client->test->audited_data;

$auditCol->drop();
$dataCol->drop();

$audit = new AuditSystem($auditCol, $dataCol);

echo "=== 数据操作(带审计)===\n";

$context = ['user_id' => 'user-001', 'ip' => '192.168.1.100', 'tags' => ['gdpr']];

$result = $audit->create('customer', ['name' => '张三', 'email' => 'zhangsan@example.com'], $context);
echo "创建客户: 审计时间戳 {$result['audit_ts']}\n";

$audit->update('customer', $result['id'], ['email' => 'zhangsan_new@example.com'], $context);
echo "更新客户邮箱\n";

$context2 = ['user_id' => 'user-002', 'ip' => '192.168.1.101'];
$audit->update('customer', $result['id'], ['name' => '张三丰'], $context2);
echo "更新客户姓名\n";

echo "\n=== 审计日志 ===\n";
$logs = $audit->queryAuditLogs();
foreach ($logs as $log) {
    echo "  [{$log->ts}] {$log->action} by {$log->user_id}\n";
}

echo "\n=== 合规报告 ===\n";
$report = $audit->generateComplianceReport(time() - 3600, time() + 3600);
echo "时间范围: {$report['period']['start']} - {$report['period']['end']}\n";
echo "总操作数: {$report['total_operations']}\n";
echo "按操作类型:\n";
foreach ($report['by_action'] as $action => $count) {
    echo "  - {$action}: {$count}\n";
}
echo "按用户:\n";
foreach ($report['by_user'] as $user => $count) {
    echo "  - {$user}: {$count}\n";
}

$auditCol->drop();
$dataCol->drop();

?>

运行结果

=== 数据审计与合规 ===

=== 数据操作(带审计)===
创建客户: 审计时间戳 Timestamp(1709123456, 1)
更新客户邮箱
更新客户姓名

=== 审计日志 ===
  [Timestamp(1709123456, 1)] update by user-002
  [Timestamp(1709123456, 1)] update by user-001
  [Timestamp(1709123456, 1)] create by user-001

=== 合规报告 ===
时间范围: 2024-02-28 09:30:56 - 2024-02-28 11:30:56
总操作数: 3
按操作类型:
  - update: 2
  - create: 1
按用户:
  - user-002: 1
  - user-001: 2

7. 行业最佳实践

7.1 类型选择实践

实践内容:正确选择Timestamp和Date类型的使用场景。

推荐理由:两种类型用途不同,错误选择会导致问题。

php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;

echo "=== 类型选择最佳实践 ===\n\n";

echo "使用Timestamp的场景:\n";
echo "1. 复制集操作日志(oplog)记录\n";
echo "2. 分布式事务协调\n";
echo "3. 操作顺序保证\n";
echo "4. 数据版本控制\n";
echo "5. 增量同步标记\n\n";

echo "使用Date(UTCDateTime)的场景:\n";
echo "1. 用户可见的时间显示\n";
echo "2. 业务时间记录(创建时间、更新时间)\n";
echo "3. 日程安排、过期时间\n";
echo "4. 时间范围查询\n";
echo "5. 时区敏感的应用\n";
?>

7.2 性能优化实践

实践内容:优化Timestamp相关操作的性能。

推荐理由:Timestamp操作可能影响系统性能。

php
<?php
use MongoDB\BSON\Timestamp;

echo "=== 性能优化最佳实践 ===\n\n";

echo "1. 使用索引优化Timestamp查询\n";
echo "   db.collection.createIndex({ts: 1})\n\n";

echo "2. 避免频繁生成Timestamp\n";
echo "   - 批量操作时复用同一秒的计数器\n";
echo "   - 使用原子操作避免锁竞争\n\n";

echo "3. 合理设置TTL索引\n";
echo "   db.collection.createIndex({ts: 1}, {expireAfterSeconds: 86400})\n\n";

echo "4. 使用快照减少重建开销\n";
echo "   - 定期保存状态快照\n";
echo "   - 从快照开始应用后续事件\n";
?>

7.3 安全性实践

实践内容:确保Timestamp操作的安全性。

推荐理由:Timestamp可能被篡改,需要保护。

php
<?php
use MongoDB\BSON\Timestamp;

echo "=== 安全性最佳实践 ===\n\n";

echo "1. 验证Timestamp有效性\n";
echo "   - 检查时间戳范围\n";
echo "   - 验证计数器合理性\n\n";

echo "2. 防止Timestamp伪造\n";
echo "   - 使用服务端生成\n";
echo "   - 避免客户端传入\n\n";

echo "3. 保护操作日志\n";
echo "   - 限制删除权限\n";
echo "   - 定期备份\n\n";

echo "4. 监控异常Timestamp\n";
echo "   - 检测时间跳跃\n";
echo "   - 监控计数器异常\n";
?>

7.4 运维监控实践

实践内容:建立Timestamp相关的运维监控体系。

推荐理由:Timestamp异常可能导致数据不一致。

php
<?php
use MongoDB\BSON\Timestamp;

echo "=== 运维监控最佳实践 ===\n\n";

echo "1. 监控指标:\n";
echo "   - Timestamp生成速率\n";
echo "   - 计数器使用率\n";
echo "   - 时间跳跃检测\n\n";

echo "2. 告警规则:\n";
echo "   - 计数器接近溢出\n";
echo "   - 时间戳跳跃超过阈值\n";
echo "   - 生成速率异常\n\n";

echo "3. 定期检查:\n";
echo "   - Oplog大小和增长\n";
echo "   - 同步延迟\n";
echo "   - 版本一致性\n";
?>

8. 常见问题答疑(FAQ)

问题1:Timestamp和Date有什么区别?

问题描述:两种时间类型有什么不同,应该如何选择?

回答内容

主要区别:

  1. 精度:Timestamp精确到秒,Date精确到毫秒
  2. 用途:Timestamp用于内部机制,Date用于业务时间
  3. 结构:Timestamp包含计数器,Date是单一时间值
  4. 排序:Timestamp天然支持操作顺序,Date只表示时间点
php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;

echo "=== Timestamp vs Date ===\n\n";

$ts = new Timestamp(1, time());
$date = new UTCDateTime();

echo "Timestamp:\n";
echo "  类型: " . get_class($ts) . "\n";
echo "  值: {$ts}\n";
echo "  时间戳: " . $ts->getTimestamp() . "\n";
echo "  计数器: " . $ts->getIncrement() . "\n\n";

echo "Date:\n";
echo "  类型: " . get_class($date) . "\n";
echo "  值: {$date}\n";
echo "  毫秒时间戳: " . $date->toDateTime()->format('Y-m-d H:i:s.v') . "\n";
?>

问题2:如何处理Timestamp溢出?

问题描述:计数器是32位整数,会溢出吗?

回答内容

32位无符号整数最大值为4294967295,正常使用不会溢出。但高并发场景需要考虑。

php
<?php
use MongoDB\BSON\Timestamp;

echo "=== Timestamp溢出处理 ===\n\n";

$maxIncrement = 4294967295;
echo "计数器最大值: {$maxIncrement}\n";
echo "对应日期: " . date('Y-m-d H:i:s', $maxIncrement) . "\n\n";

echo "溢出处理策略:\n";
echo "1. 检测计数器接近最大值时等待下一秒\n";
echo "2. 使用原子操作保证计数器递增\n";
echo "3. 分布式环境下使用节点ID编码\n";
?>

问题3:Timestamp可以用于业务时间吗?

问题描述:能否用Timestamp记录用户操作时间?

回答内容

不推荐。Timestamp主要用于内部机制,业务时间应使用Date类型。

php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\BSON\UTCDateTime;

echo "=== 业务时间类型选择 ===\n\n";

echo "错误示例 - 使用Timestamp:\n";
echo "  问题: 精度只有秒级,无法记录毫秒\n";
echo "  问题: 计数器对业务无意义\n\n";

echo "正确示例 - 使用Date:\n";
$date = new UTCDateTime();
echo "  创建时间: " . $date->toDateTime()->format('Y-m-d H:i:s.v') . "\n";
?>

问题4:如何实现Timestamp的时间旅行查询?

问题描述:能否查询某个时间点的数据状态?

回答内容

可以通过事件溯源模式实现,记录每个变更的Timestamp。

php
<?php
use MongoDB\BSON\Timestamp;

echo "=== 时间旅行查询 ===\n\n";

echo "实现方式:\n";
echo "1. 记录每次数据变更的事件\n";
echo "2. 使用Timestamp标记事件时间\n";
echo "3. 重放到指定Timestamp的事件\n";
echo "4. 重建该时间点的状态\n";
?>

问题5:Timestamp在分片集群中的作用?

问题描述:Timestamp在分片环境中有什么特殊用途?

回答内容

在分片集群中,Timestamp用于chunk迁移和元数据版本控制。

php
<?php
use MongoDB\BSON\Timestamp;

echo "=== 分片集群中的Timestamp ===\n\n";

echo "主要用途:\n";
echo "1. Chunk迁移追踪\n";
echo "2. 配置服务器版本控制\n";
echo "3. 跨分片事务协调\n";
echo "4. 元数据变更记录\n";
?>

问题6:如何调试Timestamp相关问题?

问题描述:遇到Timestamp相关问题如何排查?

回答内容

可以通过多种方式调试Timestamp问题。

php
<?php
use MongoDB\BSON\Timestamp;

echo "=== Timestamp调试技巧 ===\n\n";

$ts = new Timestamp(12345, time());

echo "调试方法:\n";
echo "1. 查看Timestamp值: {$ts}\n";
echo "2. 分解组成部分:\n";
echo "   - 时间戳: " . $ts->getTimestamp() . "\n";
echo "   - 计数器: " . $ts->getIncrement() . "\n";
echo "3. 转换为可读时间: " . date('Y-m-d H:i:s', $ts->getTimestamp()) . "\n";
echo "4. 比较两个Timestamp的顺序\n";
?>

9. 实战练习

练习1:实现简单的操作日志系统

练习描述:创建一个操作日志系统,记录所有数据操作。

解题思路

  1. 定义操作日志结构
  2. 实现日志记录方法
  3. 支持按时间查询

参考代码

php
<?php
use MongoDB\BSON\Timestamp;
use MongoDB\Client;

class SimpleOperationLog
{
    private $collection;
    
    public function __construct($collection)
    {
        $this->collection = $collection;
    }
    
    public function log(string $operation, array $data): Timestamp
    {
        $ts = new Timestamp(1, time());
        $this->collection->insertOne([
            'ts' => $ts,
            'operation' => $operation,
            'data' => $data
        ]);
        return $ts;
    }
    
    public function getLogs(?Timestamp $since = null): array
    {
        $query = $since ? ['ts' => ['$gt' => $since]] : [];
        return $this->collection->find($query, ['sort' => ['ts' => 1]])->toArray();
    }
}
?>

练习2:实现数据版本控制

练习描述:为文档实现版本控制,支持查看历史版本。

解题思路

  1. 保存每次修改的版本
  2. 记录修改时间和内容
  3. 支持版本回滚

参考代码

php
<?php
use MongoDB\BSON\Timestamp;

class DocumentVersionControl
{
    private $dataCollection;
    private $historyCollection;
    
    public function save(string $id, array $data): Timestamp
    {
        $ts = new Timestamp(1, time());
        
        $current = $this->dataCollection->findOne(['_id' => $id]);
        if ($current) {
            $this->historyCollection->insertOne([
                'doc_id' => $id,
                'version' => $ts,
                'data' => $current
            ]);
        }
        
        $data['_id'] = $id;
        $data['_version'] = $ts;
        $this->dataCollection->replaceOne(['_id' => $id], $data, ['upsert' => true]);
        
        return $ts;
    }
}
?>

练习3:实现增量同步机制

练习描述:实现两个集合之间的增量同步。

解题思路

  1. 记录每次同步的位置
  2. 查询变更数据
  3. 应用到目标集合

参考代码

php
<?php
use MongoDB\BSON\Timestamp;

class IncrementalSync
{
    private $source;
    private $target;
    private $state;
    
    public function __construct($source, $target, $state)
    {
        $this->source = $source;
        $this->target = $target;
        $this->state = $state;
    }
    
    public function sync(string $syncId): int
    {
        $lastSync = $this->state->findOne(['sync_id' => $syncId]);
        $lastTs = $lastSync ? $lastSync->last_ts : new Timestamp(0, 0);
        
        $changes = $this->source->find(
            ['_ts' => ['$gt' => $lastTs]],
            ['sort' => ['_ts' => 1]]
        );
        
        $count = 0;
        $maxTs = $lastTs;
        
        foreach ($changes as $doc) {
            $this->target->replaceOne(
                ['_id' => $doc->_id],
                $doc,
                ['upsert' => true]
            );
            
            if ($doc->_ts > $maxTs) {
                $maxTs = $doc->_ts;
            }
            $count++;
        }
        
        if ($count > 0) {
            $this->state->replaceOne(
                ['sync_id' => $syncId],
                ['sync_id' => $syncId, 'last_ts' => $maxTs],
                ['upsert' => true]
            );
        }
        
        return $count;
    }
}
?>

10. 知识点总结

核心概念回顾

概念说明重要程度
BSON类型码0x11,64位整数结构⭐⭐⭐
存储结构32位时间戳 + 32位计数器⭐⭐⭐
构造函数new Timestamp(increment, timestamp)⭐⭐⭐
Oplog复制机制的核心组件⭐⭐⭐
比较规则先比时间戳,再比计数器⭐⭐
时区处理UTC存储,本地显示⭐⭐

关键技能掌握

必须掌握

  1. Timestamp的创建和基本操作
  2. 理解Timestamp与Date的区别
  3. 掌握Oplog的基本概念
  4. 能够实现简单的变更追踪

建议掌握

  1. 事件溯源模式的应用
  2. 分布式事务协调机制
  3. 增量同步的实现
  4. 多租户数据隔离策略

最佳实践清单

创建Timestamp

php
$ts = new Timestamp(1, time());

查询变更

php
$changes = $collection->find(['ts' => ['$gt' => $lastTs]]);

记录操作日志

php
$log = ['ts' => new Timestamp(1, time()), 'op' => $operation];

常见错误避免

错误正确做法
用于业务时间使用UTCDateTime
忽略计数器理解计数器的作用
错误的构造顺序记住increment在前
直接比较字符串使用BSON比较

扩展学习方向

  1. 深入Oplog:了解MongoDB复制机制的完整实现
  2. Change Streams:学习MongoDB 3.6+的变更流API
  3. 事务处理:掌握MongoDB多文档事务
  4. 分片集群:了解Timestamp在分片中的作用

11. 拓展参考资料

官方文档

PHP驱动文档

相关技术文章

相关设计模式

  • 事件溯源模式:通过存储事件来重建状态
  • CQRS模式:命令查询职责分离
  • 审计日志模式:记录所有数据变更
  • 增量同步模式:基于时间戳的数据同步

相关章节

版本兼容性

MongoDB版本特性支持
4.0+多文档事务
4.2+分片事务
5.0+增强的时序集合
6.0+Change Stream增强

社区资源