Skip to content

读写锁 RWMutex

1. 概述

读写锁(RWMutex)是 Go 语言中一种特殊的同步原语,它允许多个 Goroutine 同时读取共享资源,但同一时刻只允许一个 Goroutine 写入共享资源。RWMutex 是 sync 包中的一个结构体,是互斥锁(Mutex)的扩展,特别适用于读多写少的场景。

在整个 Go 语言课程体系中,RWMutex 是并发编程的重要组件之一,与 Mutex、通道一起构成了 Go 语言并发模型的核心。掌握 RWMutex 的使用和原理,对于构建高性能的并发系统至关重要。

2. 基本概念

2.1 语法

2.1.1 基本用法

go
import "sync"

// 创建读写锁
var rwmu sync.RWMutex

// 读锁定
rwmu.RLock()
// 读取共享资源
// ...
// 读解锁
rwmu.RUnlock()

// 写锁定
rwmu.Lock()
// 修改共享资源
// ...
// 写解锁
rwmu.Unlock()

// 或者使用 defer 解锁
// 读操作
rwmu.RLock()
defer rwmu.RUnlock()
// 读取共享资源
// ...

// 写操作
rwmu.Lock()
defer rwmu.Unlock()
// 修改共享资源
// ...

2.1.2 示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var rwmu sync.RWMutex
    data := "initial"
    var wg sync.WaitGroup
    
    // 启动 5 个读 Goroutine
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            rwmu.RLock()
            defer rwmu.RUnlock()
            fmt.Printf("Reader %d: data = %s\n", id, data)
            time.Sleep(time.Millisecond * 100) // 模拟读操作
        }(i)
    }
    
    // 启动 1 个写 Goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Millisecond * 50) // 确保读操作先开始
        rwmu.Lock()
        defer rwmu.Unlock()
        data = "updated"
        fmt.Println("Writer: data updated")
        time.Sleep(time.Millisecond * 100) // 模拟写操作
    }()
    
    wg.Wait()
    fmt.Printf("Final data: %s\n", data)
}

2.2 语义

  • RLock():获取读锁,如果有写锁被持有,则阻塞直到写锁释放。
  • RUnlock():释放读锁,如果读锁计数器变为 0,则唤醒等待的写锁。
  • Lock():获取写锁,如果有读锁或写锁被持有,则阻塞直到所有锁释放。
  • Unlock():释放写锁,唤醒等待的读锁或写锁。
  • 零值可用:RWMutex 的零值是可用的,不需要初始化。
  • 不可复制:RWMutex 是结构体,不是引用类型,不要复制使用中的 RWMutex。
  • 不可重入:同一个 Goroutine 不能多次获取同一个 RWMutex,否则会导致死锁。

2.3 规范

  • 命名规范:RWMutex 变量通常命名为 rwmu
  • 使用顺序
    • 读操作:先调用 RLock(),后读取共享资源,最后调用 RUnlock()
    • 写操作:先调用 Lock(),后修改共享资源,最后调用 Unlock()
  • defer 解锁:使用 defer rwmu.RUnlock()defer rwmu.Unlock() 确保即使发生 panic 也能正确解锁。
  • 锁粒度:尽量减少持有锁的时间,只在必要时加锁。
  • 避免嵌套锁:避免在持有一个锁的同时获取另一个锁,减少死锁的风险。
  • 不可复制:通过指针传递 RWMutex,避免复制它。
  • 读多写少:RWMutex 适用于读多写少的场景,对于读写比例相近的场景,使用 Mutex 可能性能更好。

3. 原理深度解析

3.1 RWMutex 结构体

RWMutex 的底层实现是一个结构体,在 Go 1.18+ 版本中,其结构如下:

go
type RWMutex struct {
    w           Mutex  // 用于写锁的互斥锁
    writerSem   uint32 // 写锁的信号量
    readerSem   uint32 // 读锁的信号量
    readerCount int32  // 读锁计数器
    readerWait  int32  // 等待写锁的读锁数量
}

其中:

  • w:一个 Mutex,用于保护写锁的获取。
  • writerSem:写锁的信号量,用于唤醒等待的写 Goroutine。
  • readerSem:读锁的信号量,用于唤醒等待的读 Goroutine。
  • readerCount:读锁计数器,记录当前持有读锁的 Goroutine 数量。
  • readerWait:等待写锁的读锁数量,记录在写锁请求到达时已经持有读锁的 Goroutine 数量。

3.2 读锁的获取和释放

3.2.1 RLock 方法实现

RLock 方法的主要步骤:

  1. 使用原子操作增加 readerCount
  2. 如果 readerCount 变为负数,表示有写锁正在持有或等待,此时需要阻塞等待。
  3. 否则,获取读锁成功。

3.2.2 RUnlock 方法实现

RUnlock 方法的主要步骤:

  1. 使用原子操作减少 readerCount
  2. 如果 readerCount 变为负数,表示有写锁正在等待,此时需要减少 readerWait
  3. 如果 readerWait 变为 0,表示所有读锁都已释放,此时唤醒等待的写 Goroutine。

3.3 写锁的获取和释放

3.3.1 Lock 方法实现

Lock 方法的主要步骤:

  1. 获取内部的 Mutex w,确保只有一个 Goroutine 可以请求写锁。
  2. readerCount 取反,变为负数,表示有写锁请求。
  3. 记录当前的读锁数量到 readerWait
  4. 释放内部的 Mutex w
  5. 如果有读锁正在持有,则阻塞等待,直到所有读锁释放。

3.3.2 Unlock 方法实现

Unlock 方法的主要步骤:

  1. 获取内部的 Mutex w
  2. readerCount 恢复为正数,表示写锁释放。
  3. 唤醒等待的读 Goroutine。
  4. 释放内部的 Mutex w

3.4 并发安全

RWMutex 的所有方法都是并发安全的,使用原子操作来修改状态,确保在多 Goroutine 环境中安全使用。

3.5 内存模型

RWMutex 遵循 Go 语言的内存模型,确保以下顺序:

  • 在调用 Unlock() 之前的所有操作,发生在 Lock() 返回之后。
  • 在调用 RUnlock() 之前的所有操作,发生在 RLock() 返回之后。
  • 多个 Goroutine 同时调用 RLock() 时,都可以成功获取读锁,不会互相阻塞。
  • 当有 Goroutine 持有写锁时,其他 Goroutine 无法获取读锁或写锁。

4. 常见错误与踩坑点

4.1 死锁

错误表现:程序卡住,无法继续执行。

产生原因

  • 同一 Goroutine 多次获取同一个 RWMutex。
  • 多个 Goroutine 循环等待对方释放锁。
  • 锁的顺序不一致,导致循环等待。

解决方案

  • 确保每个锁都能正确释放,使用 defer 语句。
  • 保持一致的锁获取顺序。
  • 避免在持有锁时调用可能阻塞的函数。
go
// 错误示例:同一 Goroutine 多次获取同一个 RWMutex
func main() {
    var rwmu sync.RWMutex
    rwmu.RLock()
    rwmu.RLock() // 死锁
    defer rwmu.RUnlock()
    defer rwmu.RUnlock()
}

// 错误示例:循环等待
var rwmu1, rwmu2 sync.RWMutex

func goroutine1() {
    rwmu1.RLock()
    defer rwmu1.RUnlock()
    time.Sleep(time.Millisecond)
    rwmu2.Lock()
    defer rwmu2.Unlock()
}

func goroutine2() {
    rwmu2.RLock()
    defer rwmu2.RUnlock()
    time.Sleep(time.Millisecond)
    rwmu1.Lock()
    defer rwmu1.Unlock()
}

// 正确示例:一致的锁顺序
func process() {
    // 始终先获取 rwmu1,再获取 rwmu2
    rwmu1.Lock()
    defer rwmu1.Unlock()
    rwmu2.Lock()
    defer rwmu2.Unlock()
    // 处理逻辑
}

4.2 忘记解锁

错误表现:其他 Goroutine 无法获取锁,导致死锁。

产生原因:在获取锁后,没有对应的解锁操作,或者在解锁前发生了 panic。

解决方案:始终使用 defer 语句来确保解锁,即使发生 panic 也能正确解锁。

go
// 错误示例:忘记解锁
func readData() {
    rwmu.RLock()
    // 读取数据
    // 忘记调用 rwmu.RUnlock()
}

// 正确示例:使用 defer 解锁
func readData() {
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 读取数据
}

// 错误示例:忘记解锁
func writeData() {
    rwmu.Lock()
    // 写入数据
    // 忘记调用 rwmu.Unlock()
}

// 正确示例:使用 defer 解锁
func writeData() {
    rwmu.Lock()
    defer rwmu.Unlock()
    // 写入数据
}

4.3 读锁定期间修改共享资源

错误表现:导致竞态条件,数据不一致。

产生原因:在读锁定期间修改共享资源,违反了 RWMutex 的使用规范。

解决方案:修改共享资源时必须使用写锁定,而不是读锁定。

go
// 错误示例:读锁定期间修改共享资源
func readAndModify() {
    rwmu.RLock()
    defer rwmu.RUnlock()
    data = "modified" // 错误:读锁定期间修改资源
}

// 正确示例:修改资源时使用写锁定
func readAndModify() {
    rwmu.Lock()
    defer rwmu.Unlock()
    data = "modified" // 正确:写锁定期间修改资源
}

4.4 写操作过于频繁导致读操作饥饿

错误表现:读操作长时间无法获取锁,导致性能下降。

产生原因:写操作过于频繁,导致读操作一直等待。

解决方案

  • 减少写操作的频率,合并多个写操作。
  • 对于读写比例相近的场景,使用 Mutex 而不是 RWMutex。
  • 合理设计数据结构,减少锁的竞争。
go
// 错误示例:频繁的写操作
func updateData() {
    for i := 0; i < 1000; i++ {
        rwmu.Lock()
        data = fmt.Sprintf("value-%d", i)
        rwmu.Unlock()
        time.Sleep(time.Millisecond)
    }
}

// 正确示例:合并写操作
func updateData() {
    rwmu.Lock()
    defer rwmu.Unlock()
    for i := 0; i < 1000; i++ {
        data = fmt.Sprintf("value-%d", i)
    }
}

4.5 复制 RWMutex

错误表现:复制的 RWMutex 与原 RWMutex 状态不同步,导致不可预期的行为。

产生原因:RWMutex 是结构体,不是引用类型,复制后会创建一个新的实例,与原实例状态无关。

解决方案:通过指针传递 RWMutex,而不是复制它。

go
// 错误示例:复制 RWMutex
func worker(rwmu sync.RWMutex) { // 复制 RWMutex
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 处理逻辑
}

// 正确示例:通过指针传递
func worker(rwmu *sync.RWMutex) { // 通过指针传递
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 处理逻辑
}

4.6 锁的使用不当导致性能问题

错误表现:程序运行缓慢,并发性能差。

产生原因

  • 锁的粒度太大,导致锁竞争严重。
  • 在持有锁时执行 I/O 操作或其他阻塞操作。
  • 不必要的锁使用,如对只读操作使用写锁定。

解决方案

  • 最小化锁的粒度,只在必要时加锁。
  • 避免在持有锁时执行阻塞操作。
  • 对于读操作,使用读锁定而不是写锁定。
go
// 错误示例:在持有锁时执行 I/O 操作
func process() {
    rwmu.Lock()
    defer rwmu.Unlock()
    // 执行 I/O 操作
    data, err := ioutil.ReadFile("file.txt")
    if err != nil {
        return
    }
    // 处理数据
    sharedResource = processData(data)
}

// 正确示例:先执行 I/O 操作,再获取锁
func process() {
    // 执行 I/O 操作
    data, err := ioutil.ReadFile("file.txt")
    if err != nil {
        return
    }
    // 处理数据
    processedData := processData(data)
    // 只在修改共享资源时加锁
    rwmu.Lock()
    sharedResource = processedData
    rwmu.Unlock()
}

5. 常见应用场景

5.1 读多写少的共享资源

场景描述:多个 Goroutine 需要频繁读取共享资源,偶尔修改共享资源,如配置、缓存等。

使用方法:使用 RWMutex 允许多个读操作同时进行,提高并发性能。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var rwmu sync.RWMutex
    config := "default"
    var wg sync.WaitGroup
    
    // 启动 10 个读 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                rwmu.RLock()
                defer rwmu.RUnlock()
                fmt.Printf("Reader %d: config = %s\n", id, config)
                time.Sleep(time.Millisecond * 50) // 模拟读操作
            }
        }(i)
    }
    
    // 启动 2 个写 Goroutine
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(time.Millisecond * 100) // 确保读操作先开始
            rwmu.Lock()
            defer rwmu.Unlock()
            newConfig := fmt.Sprintf("config-%d", id)
            config = newConfig
            fmt.Printf("Writer %d: updated config to %s\n", id, newConfig)
            time.Sleep(time.Millisecond * 100) // 模拟写操作
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("Final config: %s\n", config)
}

5.2 并发安全的缓存

场景描述:在高并发系统中,需要一个线程安全的缓存,支持并发读写。

使用方法:使用 RWMutex 保护缓存,允许多个读操作同时进行,写操作独占。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Item struct {
    Value      interface{}
    Expiration int64
}

type Cache struct {
    items map[string]Item
    rwmu  sync.RWMutex
}

func NewCache() *Cache {
    return &Cache{
        items: make(map[string]Item),
    }
}

func (c *Cache) Set(key string, value interface{}, expiration time.Duration) {
    c.rwmu.Lock()
    defer c.rwmu.Unlock()
    var exp int64
    if expiration > 0 {
        exp = time.Now().Add(expiration).UnixNano()
    }
    c.items[key] = Item{
        Value:      value,
        Expiration: exp,
    }
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.rwmu.RLock()
    defer c.rwmu.RUnlock()
    item, found := c.items[key]
    if !found {
        return nil, false
    }
    if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
        // 异步删除过期项
        go c.Delete(key)
        return nil, false
    }
    return item.Value, true
}

func (c *Cache) Delete(key string) {
    c.rwmu.Lock()
    defer c.rwmu.Unlock()
    delete(c.items, key)
}

func main() {
    cache := NewCache()
    var wg sync.WaitGroup
    
    // 启动 10 个写 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id, j)
                value := fmt.Sprintf("value-%d-%d", id, j)
                cache.Set(key, value, time.Hour)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    // 启动 100 个读 Goroutine
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id%10, j)
                _, _ = cache.Get(key)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
    fmt.Printf("Cache size: %d\n", len(cache.items))
}

5.3 并发安全的配置管理

场景描述:在大型应用中,需要管理全局配置,支持并发读写和热更新。

使用方法:使用 RWMutex 保护配置,允许多个读操作同时进行,写操作独占。

示例代码

go
package main

import (
    "fmt"
    "sync"
)

type Config struct {
    Server struct {
        Host string
        Port int
    }
    Database struct {
        DSN string
    }
}

var (
    config Config
    rwmu   sync.RWMutex
)

func GetConfig() Config {
    rwmu.RLock()
    defer rwmu.RUnlock()
    return config
}

func UpdateConfig(newConfig Config) {
    rwmu.Lock()
    defer rwmu.Unlock()
    config = newConfig
}

func main() {
    // 初始化配置
    UpdateConfig(Config{
        Server: struct {
            Host string
            Port int
        }{Host: "localhost", Port: 8080},
        Database: struct {
            DSN string
        }{DSN: "user:pass@tcp(localhost:3306)/db"},
    })
    
    var wg sync.WaitGroup
    
    // 启动 10 个读 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            c := GetConfig()
            fmt.Printf("Reader %d: Server=%s:%d, DB=%s\n", id, c.Server.Host, c.Server.Port, c.Database.DSN)
        }(i)
    }
    
    // 启动 1 个写 Goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        newConfig := GetConfig()
        newConfig.Server.Port = 9090
        UpdateConfig(newConfig)
        fmt.Println("Writer: updated server port to 9090")
    }()
    
    wg.Wait()
    fmt.Printf("Final config: Server=%s:%d, DB=%s\n", config.Server.Host, config.Server.Port, config.Database.DSN)
}

5.4 并发安全的计数器

场景描述:在分布式系统中,需要一个高并发的计数器,支持原子操作。

使用方法:使用 RWMutex 保护计数器,允许多个读操作同时进行,写操作独占。

示例代码

go
package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    value int64
    rwmu  sync.RWMutex
}

func NewCounter() *Counter {
    return &Counter{value: 0}
}

func (c *Counter) Increment() int64 {
    c.rwmu.Lock()
    defer c.rwmu.Unlock()
    c.value++
    return c.value
}

func (c *Counter) Decrement() int64 {
    c.rwmu.Lock()
    defer c.rwmu.Unlock()
    c.value--
    return c.value
}

func (c *Counter) Get() int64 {
    c.rwmu.RLock()
    defer c.rwmu.RUnlock()
    return c.value
}

func main() {
    counter := NewCounter()
    var wg sync.WaitGroup
    
    // 启动 1000 个写 Goroutine
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    // 启动 500 个读 Goroutine
    for i := 0; i < 500; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Reader %d: counter = %d\n", id, counter.Get())
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter.Get())
}

5.5 并发安全的映射

场景描述:在高并发系统中,需要一个线程安全的映射,支持并发读写。

使用方法:使用 RWMutex 保护映射,允许多个读操作同时进行,写操作独占。

示例代码

go
package main

import (
    "fmt"
    "sync"
)

type ConcurrentMap struct {
    data  map[string]interface{}
    rwmu sync.RWMutex
}

func NewConcurrentMap() *ConcurrentMap {
    return &ConcurrentMap{
        data: make(map[string]interface{}),
    }
}

func (m *ConcurrentMap) Set(key string, value interface{}) {
    m.rwmu.Lock()
    defer m.rwmu.Unlock()
    m.data[key] = value
}

func (m *ConcurrentMap) Get(key string) (interface{}, bool) {
    m.rwmu.RLock()
    defer m.rwmu.RUnlock()
    value, ok := m.data[key]
    return value, ok
}

func (m *ConcurrentMap) Delete(key string) {
    m.rwmu.Lock()
    defer m.rwmu.Unlock()
    delete(m.data, key)
}

func (m *ConcurrentMap) Len() int {
    m.rwmu.RLock()
    defer m.rwmu.RUnlock()
    return len(m.data)
}

func main() {
    m := NewConcurrentMap()
    var wg sync.WaitGroup
    
    // 启动 10 个写 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", id)
            value := fmt.Sprintf("value-%d", id)
            m.Set(key, value)
            fmt.Printf("Writer %d: set %s = %s\n", id, key, value)
        }(i)
    }
    
    // 启动 20 个读 Goroutine
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", id%10)
            value, ok := m.Get(key)
            if ok {
                fmt.Printf("Reader %d: get %s = %v\n", id, key, value)
            } else {
                fmt.Printf("Reader %d: %s not found\n", id, key)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("Map size: %d\n", m.Len())
}

6. 企业级进阶应用场景

6.1 高并发缓存系统

场景描述:在大型 Web 应用中,需要一个高性能的缓存系统,支持并发读写。

使用方法:使用 RWMutex 保护缓存,结合分片技术减少锁竞争。

示例代码

go
package cache

import (
    "sync"
    "time"
)

type Item struct {
    Value      interface{}
    Expiration int64
}

type ShardedCache struct {
    shards []*Cache
    rwmu   []sync.RWMutex
    size   int
}

type Cache struct {
    items map[string]Item
}

func NewShardedCache(shardCount int) *ShardedCache {
    shards := make([]*Cache, shardCount)
    rwmu := make([]sync.RWMutex, shardCount)
    for i := 0; i < shardCount; i++ {
        shards[i] = &Cache{
            items: make(map[string]Item),
        }
    }
    return &ShardedCache{
        shards: shards,
        rwmu:   rwmu,
        size:   shardCount,
    }
}

func (sc *ShardedCache) getShardIndex(key string) int {
    hash := 0
    for _, c := range key {
        hash = (hash << 5) - hash + int(c)
    }
    if hash < 0 {
        hash = -hash
    }
    return hash % sc.size
}

func (sc *ShardedCache) Set(key string, value interface{}, expiration time.Duration) {
    idx := sc.getShardIndex(key)
    sc.rwmu[idx].Lock()
    defer sc.rwmu[idx].Unlock()
    
    var exp int64
    if expiration > 0 {
        exp = time.Now().Add(expiration).UnixNano()
    }
    sc.shards[idx].items[key] = Item{
        Value:      value,
        Expiration: exp,
    }
}

func (sc *ShardedCache) Get(key string) (interface{}, bool) {
    idx := sc.getShardIndex(key)
    sc.rwmu[idx].RLock()
    defer sc.rwmu[idx].RUnlock()
    
    item, found := sc.shards[idx].items[key]
    if !found {
        return nil, false
    }
    if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
        // 异步删除过期项
        go sc.Delete(key)
        return nil, false
    }
    return item.Value, true
}

func (sc *ShardedCache) Delete(key string) {
    idx := sc.getShardIndex(key)
    sc.rwmu[idx].Lock()
    defer sc.rwmu[idx].Unlock()
    delete(sc.shards[idx].items, key)
}

func (sc *ShardedCache) Size() int {
    total := 0
    for i := 0; i < sc.size; i++ {
        sc.rwmu[i].RLock()
        total += len(sc.shards[i].items)
        sc.rwmu[i].RUnlock()
    }
    return total
}

6.2 分布式配置中心

场景描述:在分布式系统中,需要一个配置中心,支持配置的并发读写和实时更新。

使用方法:使用 RWMutex 保护配置,结合发布-订阅模式实现配置更新通知。

示例代码

go
package config

import (
    "sync"
)

type ConfigCenter struct {
    config     map[string]interface{}
    rwmu       sync.RWMutex
    subscribers []chan map[string]interface{}
    submu      sync.Mutex
}

func NewConfigCenter() *ConfigCenter {
    return &ConfigCenter{
        config:     make(map[string]interface{}),
        subscribers: make([]chan map[string]interface{}, 0),
    }
}

func (cc *ConfigCenter) Get(key string) (interface{}, bool) {
    cc.rwmu.RLock()
    defer cc.rwmu.RUnlock()
    value, ok := cc.config[key]
    return value, ok
}

func (cc *ConfigCenter) Set(key string, value interface{}) {
    cc.rwmu.Lock()
    cc.config[key] = value
    configCopy := make(map[string]interface{})
    for k, v := range cc.config {
        configCopy[k] = v
    }
    cc.rwmu.Unlock()
    
    // 通知订阅者
    cc.submu.Lock()
    for _, ch := range cc.subscribers {
        select {
        case ch <- configCopy:
        default:
        }
    }
    cc.submu.Unlock()
}

func (cc *ConfigCenter) Subscribe() chan map[string]interface{} {
    ch := make(chan map[string]interface{}, 1)
    cc.submu.Lock()
    cc.subscribers = append(cc.subscribers, ch)
    cc.submu.Unlock()
    return ch
}

func (cc *ConfigCenter) Unsubscribe(ch chan map[string]interface{}) {
    cc.submu.Lock()
    defer cc.submu.Unlock()
    for i, c := range cc.subscribers {
        if c == ch {
            cc.subscribers = append(cc.subscribers[:i], cc.subscribers[i+1:]...)
            close(ch)
            break
        }
    }
}

6.3 并发安全的事件总线

场景描述:在事件驱动系统中,需要一个并发安全的事件总线,支持事件的发布和订阅。

使用方法:使用 RWMutex 保护事件订阅者列表,允许多个订阅者同时订阅,发布者独占发布。

示例代码

go
package event

import (
    "sync"
)

type Event struct {
    Type    string
    Payload interface{}
}

type EventHandler func(event Event)

type EventBus struct {
    handlers map[string][]EventHandler
    rwmu     sync.RWMutex
}

func NewEventBus() *EventBus {
    return &EventBus{
        handlers: make(map[string][]EventHandler),
    }
}

func (eb *EventBus) Subscribe(eventType string, handler EventHandler) {
    eb.rwmu.Lock()
    defer eb.rwmu.Unlock()
    eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}

func (eb *EventBus) Unsubscribe(eventType string, handler EventHandler) {
    eb.rwmu.Lock()
    defer eb.rwmu.Unlock()
    if handlers, ok := eb.handlers[eventType]; ok {
        for i, h := range handlers {
            if h == handler {
                eb.handlers[eventType] = append(handlers[:i], handlers[i+1:]...)
                break
            }
        }
    }
}

func (eb *EventBus) Publish(event Event) {
    eb.rwmu.RLock()
    handlers := eb.handlers[event.Type]
    eb.rwmu.RUnlock()
    
    for _, handler := range handlers {
        handler(event)
    }
}

6.4 并发安全的限流系统

场景描述:在 API 服务中,需要一个限流系统,支持并发的流量控制。

使用方法:使用 RWMutex 保护限流计数器,允许多个请求同时检查限流,更新限流计数时独占。

示例代码

go
package rate

import (
    "sync"
    "time"
)

type RateLimiter struct {
    limit      int           // 限制的请求数
    window     time.Duration // 时间窗口
    requests   []time.Time   // 请求时间列表
    rwmu       sync.RWMutex
}

func NewRateLimiter(limit int, window time.Duration) *RateLimiter {
    return &RateLimiter{
        limit:    limit,
        window:   window,
        requests: make([]time.Time, 0),
    }
}

func (rl *RateLimiter) Allow() bool {
    now := time.Now()
    
    // 清理过期的请求
    rl.rwmu.Lock()
    defer rl.rwmu.Unlock()
    
    // 移除时间窗口外的请求
    cutoff := now.Add(-rl.window)
    i := 0
    for ; i < len(rl.requests); i++ {
        if rl.requests[i].After(cutoff) {
            break
        }
    }
    rl.requests = rl.requests[i:]
    
    // 检查是否超过限制
    if len(rl.requests) >= rl.limit {
        return false
    }
    
    // 添加当前请求
    rl.requests = append(rl.requests, now)
    return true
}

func (rl *RateLimiter) Reset() {
    rl.rwmu.Lock()
    defer rl.rwmu.Unlock()
    rl.requests = make([]time.Time, 0)
}

6.5 并发安全的会话管理

场景描述:在 Web 应用中,需要一个会话管理系统,支持并发的会话创建、获取和销毁。

使用方法:使用 RWMutex 保护会话映射,允许多个请求同时获取会话,创建和销毁会话时独占。

示例代码

go
package session

import (
    "sync"
    "time"
)

type Session struct {
    ID        string
    Data      map[string]interface{}
    CreatedAt time.Time
    ExpiresAt time.Time
}

type SessionManager struct {
    sessions map[string]*Session
    rwmu     sync.RWMutex
    ttl      time.Duration
}

func NewSessionManager(ttl time.Duration) *SessionManager {
    sm := &SessionManager{
        sessions: make(map[string]*Session),
        ttl:      ttl,
    }
    // 启动清理过期会话的 Goroutine
    go sm.cleanup()
    return sm
}

func (sm *SessionManager) Create() *Session {
    id := generateSessionID()
    now := time.Now()
    session := &Session{
        ID:        id,
        Data:      make(map[string]interface{}),
        CreatedAt: now,
        ExpiresAt: now.Add(sm.ttl),
    }
    
    sm.rwmu.Lock()
    sm.sessions[id] = session
    sm.rwmu.Unlock()
    
    return session
}

func (sm *SessionManager) Get(id string) (*Session, bool) {
    sm.rwmu.RLock()
    session, ok := sm.sessions[id]
    sm.rwmu.RUnlock()
    
    if !ok {
        return nil, false
    }
    
    // 检查是否过期
    if time.Now().After(session.ExpiresAt) {
        sm.Delete(id)
        return nil, false
    }
    
    return session, true
}

func (sm *SessionManager) Delete(id string) {
    sm.rwmu.Lock()
    delete(sm.sessions, id)
    sm.rwmu.Unlock()
}

func (sm *SessionManager) cleanup() {
    for {
        time.Sleep(time.Minute)
        now := time.Now()
        
        sm.rwmu.Lock()
        for id, session := range sm.sessions {
            if now.After(session.ExpiresAt) {
                delete(sm.sessions, id)
            }
        }
        sm.rwmu.Unlock()
    }
}

func generateSessionID() string {
    // 生成唯一的会话 ID
    // 实际实现中可以使用 UUID 或其他方法
    return fmt.Sprintf("session-%d", time.Now().UnixNano())
}

7. 行业最佳实践

7.1 始终使用 defer 解锁

实践内容:使用 defer 语句确保锁的释放。

推荐理由defer 语句可以确保即使发生 panic,锁也能正确释放,避免死锁。

示例

go
func readData() {
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 读取数据
}

func writeData() {
    rwmu.Lock()
    defer rwmu.Unlock()
    // 写入数据
}

7.2 最小化锁的粒度

实践内容:只对需要保护的共享资源加锁,避免对整个函数加锁。

推荐理由:最小化锁的粒度可以提高并发性能,减少锁竞争。

示例

go
// 错误示例:对整个函数加锁
func process() {
    rwmu.Lock()
    defer rwmu.Unlock()
    // 执行不需要锁的操作
    fmt.Println("Processing...")
    // 修改共享资源
    sharedResource = newValue
}

// 正确示例:只对共享资源加锁
func process() {
    // 执行不需要锁的操作
    fmt.Println("Processing...")
    // 只对共享资源加锁
    rwmu.Lock()
    sharedResource = newValue
    rwmu.Unlock()
}

7.3 避免在持有锁时执行阻塞操作

实践内容:避免在持有锁时执行 I/O 操作、网络请求等阻塞操作。

推荐理由:在持有锁时执行阻塞操作会增加锁的持有时间,降低并发性能,增加死锁的风险。

示例

go
// 错误示例:在持有锁时执行 I/O 操作
func process() {
    rwmu.Lock()
    defer rwmu.Unlock()
    // 执行 I/O 操作
    data, err := ioutil.ReadFile("file.txt")
    if err != nil {
        return
    }
    // 处理数据
    sharedResource = processData(data)
}

// 正确示例:先执行 I/O 操作,再获取锁
func process() {
    // 执行 I/O 操作
    data, err := ioutil.ReadFile("file.txt")
    if err != nil {
        return
    }
    // 处理数据
    processedData := processData(data)
    // 只在修改共享资源时加锁
    rwmu.Lock()
    sharedResource = processedData
    rwmu.Unlock()
}

7.4 选择合适的同步原语

实践内容:根据具体场景选择合适的同步原语。

推荐理由:不同的同步原语有不同的适用场景,选择合适的同步原语可以提高性能和代码可读性。

示例

  • 对于读多写少的场景,使用 RWMutex。
  • 对于读写比例相近的场景,使用 Mutex。
  • 对于简单的计数器,使用 atomic 包。
  • 对于需要等待条件满足的场景,使用 Cond。

7.5 监控锁的使用

实践内容:在生产环境中监控锁的使用情况,如锁的持有时间、竞争情况等。

推荐理由:监控锁的使用情况可以帮助发现潜在的性能问题和死锁风险。

示例

go
// 使用监控工具监控锁的使用
func readData() {
    start := time.Now()
    rwmu.RLock()
    defer func() {
        rwmu.RUnlock()
        duration := time.Since(start)
        metrics.RecordLockHoldTime("readData", duration)
    }()
    // 读取数据
}

func writeData() {
    start := time.Now()
    rwmu.Lock()
    defer func() {
        rwmu.Unlock()
        duration := time.Since(start)
        metrics.RecordLockHoldTime("writeData", duration)
    }()
    // 写入数据
}

7.6 合理设计数据结构

实践内容:合理设计数据结构,减少锁的竞争。

推荐理由:良好的数据结构设计可以减少锁的竞争,提高并发性能。

示例

  • 使用分片技术,将数据分成多个片段,每个片段使用独立的锁。
  • 使用无锁数据结构,如 atomic 包提供的原子操作。
  • 使用通道进行 Goroutine 间的通信,减少共享状态。

7.7 避免嵌套锁

实践内容:避免在持有一个锁的同时获取另一个锁。

推荐理由:嵌套锁容易导致死锁,尤其是当多个 Goroutine 以不同的顺序获取锁时。

示例

go
// 错误示例:嵌套锁
func process() {
    rwmu1.Lock()
    defer rwmu1.Unlock()
    // ...
    rwmu2.Lock()
    defer rwmu2.Unlock()
    // ...
}

// 正确示例:避免嵌套锁
func process() {
    // 先获取所有需要的锁
    rwmu1.Lock()
    rwmu2.Lock()
    // 处理逻辑
    rwmu2.Unlock()
    rwmu1.Unlock()
}

7.8 定期清理过期数据

实践内容:定期清理过期数据,减少锁的竞争。

推荐理由:过期数据会增加锁的竞争,定期清理可以提高并发性能。

示例

go
func (c *Cache) cleanup() {
    for {
        time.Sleep(time.Minute)
        c.rwmu.Lock()
        now := time.Now().UnixNano()
        for k, v := range c.items {
            if v.Expiration > 0 && now > v.Expiration {
                delete(c.items, k)
            }
        }
        c.rwmu.Unlock()
    }
}

8. 常见问题答疑(FAQ)

8.1 RWMutex 和 Mutex 有什么区别?

问题描述:RWMutex 和 Mutex 都是同步原语,它们有什么区别?

回答内容

  • Mutex:互斥锁,确保同一时刻只有一个 Goroutine 可以访问共享资源。
  • RWMutex:读写锁,允许多个读操作同时进行,但写操作会阻塞所有读写操作。
  • 使用场景
    • 当读写比例相近时,使用 Mutex。
    • 当读操作远多于写操作时,使用 RWMutex 可以提高并发性能。

示例代码

go
// 使用 Mutex
var mu sync.Mutex

// 使用 RWMutex
var rwmu sync.RWMutex

8.2 RWMutex 的性能优势是什么?

问题描述:RWMutex 的性能优势是什么?

回答内容

  • RWMutex 允许多个读操作同时进行,减少了锁的竞争,提高了并发性能。
  • 在读多写少的场景中,RWMutex 的性能远优于 Mutex。
  • 读操作不需要互斥,可以并行执行,充分利用多核 CPU。

示例代码

go
// 读多写少的场景,使用 RWMutex
func readHeavy() {
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 读取操作
}

func writeOccasional() {
    rwmu.Lock()
    defer rwmu.Unlock()
    // 写入操作
}

8.3 RWMutex 有什么缺点?

问题描述:RWMutex 有什么缺点?

回答内容

  • RWMutex 的实现比 Mutex 复杂,内存开销更大。
  • 在写操作频繁的场景中,RWMutex 的性能可能不如 Mutex。
  • 写操作需要等待所有读操作完成,可能导致写操作饥饿。
  • RWMutex 的读锁和写锁不能互相升级或降级。

示例代码

go
// 写操作频繁的场景,使用 Mutex 可能更好
func writeHeavy() {
    mu.Lock()
    defer mu.Unlock()
    // 写入操作
}

8.4 如何避免 RWMutex 中的写操作饥饿?

问题描述:如何避免 RWMutex 中的写操作饥饿?

回答内容

  • 减少写操作的持有时间,尽快释放写锁。
  • 合并多个写操作,减少写操作的频率。
  • 对于读写比例相近的场景,使用 Mutex 而不是 RWMutex。
  • 合理设计数据结构,减少锁的竞争。

示例代码

go
// 合并写操作,减少写操作频率
func batchUpdate(data []string) {
    rwmu.Lock()
    defer rwmu.Unlock()
    for _, item := range data {
        // 处理写操作
    }
}

8.5 RWMutex 是可重入的吗?

问题描述:RWMutex 是可重入的吗?

回答内容

  • 不是,RWMutex 不是可重入的。
  • 同一个 Goroutine 不能多次获取同一个 RWMutex 的读锁或写锁,否则会导致死锁。
  • 如果需要可重入的锁,可以使用 sync.RWMutex 的包装实现,或者使用其他同步原语。

示例代码

go
// 错误示例:RWMutex 不可重入
func main() {
    var rwmu sync.RWMutex
    rwmu.RLock()
    rwmu.RLock() // 死锁
    defer rwmu.RUnlock()
    defer rwmu.RUnlock()
}

8.6 如何处理 RWMutex 的复制问题?

问题描述:如何处理 RWMutex 的复制问题?

回答内容

  • RWMutex 是结构体,不是引用类型,复制后会创建一个新的实例,与原实例状态无关。
  • 避免复制使用中的 RWMutex,通过指针传递 RWMutex。
  • 在结构体中嵌入 RWMutex 时,注意不要复制整个结构体。

示例代码

go
// 错误示例:复制 RWMutex
func worker(rwmu sync.RWMutex) { // 复制 RWMutex
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 处理逻辑
}

// 正确示例:通过指针传递
func worker(rwmu *sync.RWMutex) { // 通过指针传递
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 处理逻辑
}

9. 实战练习

9.1 基础练习:并发安全的缓存

题目:使用 RWMutex 实现一个并发安全的缓存,支持设置、获取和删除操作,以及过期时间。

解题思路

  • 使用 RWMutex 保护缓存映射。
  • 实现 Set、Get 和 Delete 方法。
  • 支持设置过期时间,定期清理过期项。
  • 测试多个 Goroutine 同时访问缓存。

常见误区

  • 在读操作时使用写锁定,导致性能下降。
  • 没有正确处理并发访问,导致竞态条件。
  • 没有清理过期项,导致缓存膨胀。

分步提示

  1. 定义缓存项结构体,包含值和过期时间。
  2. 定义缓存结构体,包含映射和 RWMutex。
  3. 实现 Set 方法,使用写锁定保护缓存的设置操作。
  4. 实现 Get 方法,使用读锁定保护缓存的获取操作,并检查过期时间。
  5. 实现 Delete 方法,使用写锁定保护缓存的删除操作。
  6. 启动一个 Goroutine 定期清理过期项。
  7. 测试多个 Goroutine 同时访问缓存。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Item struct {
    Value      interface{}
    Expiration int64
}

type Cache struct {
    items map[string]Item
    rwmu  sync.RWMutex
}

func NewCache() *Cache {
    c := &Cache{
        items: make(map[string]Item),
    }
    // 启动清理过期项的 Goroutine
    go c.cleanup()
    return c
}

func (c *Cache) Set(key string, value interface{}, expiration time.Duration) {
    c.rwmu.Lock()
    defer c.rwmu.Unlock()
    var exp int64
    if expiration > 0 {
        exp = time.Now().Add(expiration).UnixNano()
    }
    c.items[key] = Item{
        Value:      value,
        Expiration: exp,
    }
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.rwmu.RLock()
    defer c.rwmu.RUnlock()
    item, found := c.items[key]
    if !found {
        return nil, false
    }
    if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
        // 异步删除过期项
        go c.Delete(key)
        return nil, false
    }
    return item.Value, true
}

func (c *Cache) Delete(key string) {
    c.rwmu.Lock()
    defer c.rwmu.Unlock()
    delete(c.items, key)
}

func (c *Cache) cleanup() {
    for {
        time.Sleep(time.Minute)
        c.rwmu.Lock()
        now := time.Now().UnixNano()
        for k, v := range c.items {
            if v.Expiration > 0 && now > v.Expiration {
                delete(c.items, k)
            }
        }
        c.rwmu.Unlock()
    }
}

func main() {
    cache := NewCache()
    var wg sync.WaitGroup
    
    // 启动 10 个写 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id, j)
                value := fmt.Sprintf("value-%d-%d", id, j)
                cache.Set(key, value, time.Hour)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    // 启动 100 个读 Goroutine
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id%10, j)
                _, _ = cache.Get(key)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
    fmt.Printf("Cache size: %d\n", len(cache.items))
}

9.2 进阶练习:并发安全的配置管理

题目:使用 RWMutex 实现一个并发安全的配置管理系统,支持配置的读写和热更新。

解题思路

  • 使用 RWMutex 保护配置映射。
  • 实现 Get、Set 和 Reload 方法。
  • 支持从文件加载配置。
  • 测试多个 Goroutine 同时访问配置。

常见误区

  • 在读操作时使用写锁定,导致性能下降。
  • 没有正确处理配置加载错误。
  • 没有保护配置的并发访问,导致竞态条件。

分步提示

  1. 定义配置结构体。
  2. 定义配置管理器结构体,包含配置映射和 RWMutex。
  3. 实现 Get 方法,使用读锁定保护配置的读取操作。
  4. 实现 Set 方法,使用写锁定保护配置的设置操作。
  5. 实现 Reload 方法,从文件加载配置并更新。
  6. 测试多个 Goroutine 同时访问配置。

参考代码

go
package main

import (
    "encoding/json"
    "fmt"
    "os"
    "sync"
)

type Config struct {
    Server struct {
        Host string `json:"host"`
        Port int    `json:"port"`
    } `json:"server"`
    Database struct {
        DSN string `json:"dsn"`
    } `json:"database"`
}

type ConfigManager struct {
    config Config
    rwmu   sync.RWMutex
}

func NewConfigManager() *ConfigManager {
    return &ConfigManager{}
}

func (cm *ConfigManager) Get() Config {
    cm.rwmu.RLock()
    defer cm.rwmu.RUnlock()
    return cm.config
}

func (cm *ConfigManager) Set(config Config) {
    cm.rwmu.Lock()
    defer cm.rwmu.Unlock()
    cm.config = config
}

func (cm *ConfigManager) Reload(filePath string) error {
    data, err := os.ReadFile(filePath)
    if err != nil {
        return err
    }
    
    var newConfig Config
    if err := json.Unmarshal(data, &newConfig); err != nil {
        return err
    }
    
    cm.Set(newConfig)
    return nil
}

func main() {
    cm := NewConfigManager()
    
    // 初始化配置
    cm.Set(Config{
        Server: struct {
            Host string `json:"host"`
            Port int    `json:"port"`
        }{Host: "localhost", Port: 8080},
        Database: struct {
            DSN string `json:"dsn"`
        }{DSN: "user:pass@tcp(localhost:3306)/db"},
    })
    
    var wg sync.WaitGroup
    
    // 启动 10 个读 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                config := cm.Get()
                fmt.Printf("Reader %d: Server=%s:%d\n", id, config.Server.Host, config.Server.Port)
            }
        }(i)
    }
    
    // 启动 2 个写 Goroutine
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            newConfig := cm.Get()
            newConfig.Server.Port = 8080 + id
            cm.Set(newConfig)
            fmt.Printf("Writer %d: updated server port to %d\n", id, newConfig.Server.Port)
        }(i)
    }
    
    wg.Wait()
    finalConfig := cm.Get()
    fmt.Printf("Final config: Server=%s:%d, DB=%s\n", finalConfig.Server.Host, finalConfig.Server.Port, finalConfig.Database.DSN)
}

9.3 挑战练习:分片缓存

题目:使用 RWMutex 实现一个分片缓存,减少锁竞争,提高并发性能。

解题思路

  • 将缓存分成多个分片,每个分片使用独立的 RWMutex。
  • 实现 Set、Get 和 Delete 方法,根据键的哈希值选择分片。
  • 测试多个 Goroutine 同时访问缓存。

常见误区

  • 哈希函数设计不合理,导致分片不均匀。
  • 没有正确处理分片的并发访问。
  • 没有清理过期项,导致缓存膨胀。

分步提示

  1. 定义分片数量和分片结构体。
  2. 实现哈希函数,根据键的哈希值选择分片。
  3. 实现 Set 方法,根据键的哈希值选择分片并设置值。
  4. 实现 Get 方法,根据键的哈希值选择分片并获取值。
  5. 实现 Delete 方法,根据键的哈希值选择分片并删除值。
  6. 启动 Goroutine 定期清理每个分片的过期项。
  7. 测试多个 Goroutine 同时访问缓存。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Item struct {
    Value      interface{}
    Expiration int64
}

type Shard struct {
    items map[string]Item
    rwmu  sync.RWMutex
}

type ShardedCache struct {
    shards []*Shard
    size   int
}

func NewShardedCache(shardCount int) *ShardedCache {
    shards := make([]*Shard, shardCount)
    for i := 0; i < shardCount; i++ {
        shards[i] = &Shard{
            items: make(map[string]Item),
        }
        // 启动清理过期项的 Goroutine
        go shards[i].cleanup()
    }
    return &ShardedCache{
        shards: shards,
        size:   shardCount,
    }
}

func (sc *ShardedCache) getShardIndex(key string) int {
    hash := 0
    for _, c := range key {
        hash = (hash << 5) - hash + int(c)
    }
    if hash < 0 {
        hash = -hash
    }
    return hash % sc.size
}

func (sc *ShardedCache) Set(key string, value interface{}, expiration time.Duration) {
    idx := sc.getShardIndex(key)
    shard := sc.shards[idx]
    shard.rwmu.Lock()
    defer shard.rwmu.Unlock()
    
    var exp int64
    if expiration > 0 {
        exp = time.Now().Add(expiration).UnixNano()
    }
    shard.items[key] = Item{
        Value:      value,
        Expiration: exp,
    }
}

func (sc *ShardedCache) Get(key string) (interface{}, bool) {
    idx := sc.getShardIndex(key)
    shard := sc.shards[idx]
    shard.rwmu.RLock()
    defer shard.rwmu.RUnlock()
    
    item, found := shard.items[key]
    if !found {
        return nil, false
    }
    if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
        // 异步删除过期项
        go sc.Delete(key)
        return nil, false
    }
    return item.Value, true
}

func (sc *ShardedCache) Delete(key string) {
    idx := sc.getShardIndex(key)
    shard := sc.shards[idx]
    shard.rwmu.Lock()
    defer shard.rwmu.Unlock()
    delete(shard.items, key)
}

func (s *Shard) cleanup() {
    for {
        time.Sleep(time.Minute)
        s.rwmu.Lock()
        now := time.Now().UnixNano()
        for k, v := range s.items {
            if v.Expiration > 0 && now > v.Expiration {
                delete(s.items, k)
            }
        }
        s.rwmu.Unlock()
    }
}

func main() {
    cache := NewShardedCache(16) // 16 个分片
    var wg sync.WaitGroup
    
    // 启动 100 个写 Goroutine
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id, j)
                value := fmt.Sprintf("value-%d-%d", id, j)
                cache.Set(key, value, time.Hour)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    // 启动 1000 个读 Goroutine
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id%100, j)
                _, _ = cache.Get(key)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

## 10. 知识点总结

### 10.1 核心要点

- **读写锁的特点**:允许多个读操作同时进行,写操作独占,适用于读多写少的场景。
- **基本操作**
  - `RLock()`:获取读锁
  - `RUnlock()`:释放读锁
  - `Lock()`:获取写锁
  - `Unlock()`:释放写锁
- **实现原理**:使用内部 Mutex、信号量和计数器来管理读写锁的状态。
- **并发安全**:所有方法都是并发安全的,使用原子操作来修改状态。
- **内存模型**:确保锁操作的顺序性,保证并发操作的正确性。
- **使用场景**:读多写少的共享资源、缓存、配置管理等。

### 10.2 易错点回顾

- **死锁**:同一 Goroutine 多次获取同一个 RWMutex,或循环等待锁。
- **忘记解锁**:未使用 `defer` 语句确保解锁,导致死锁。
- **读锁定期间修改资源**:违反 RWMutex 的使用规范,导致竞态条件。
- **写操作过于频繁**:导致读操作饥饿,性能下降。
- **复制 RWMutex**:RWMutex 是结构体,不是引用类型,复制后状态不同步。
- **锁的粒度太大**:导致锁竞争严重,并发性能差。
- **在持有锁时执行阻塞操作**:增加锁的持有时间,降低并发性能。

## 11. 拓展参考资料

### 11.1 官方文档链接

- [Go 官方文档 - sync.RWMutex](https://pkg.go.dev/sync#RWMutex)
- [Go 语言规范 - 内存模型](https://go.dev/ref/mem)

### 11.2 进阶学习路径建议

- **并发编程基础**:学习 Goroutine、Channel、WaitGroup 等基本概念。
- **同步原语**:深入学习 Mutex、RWMutex、Once、Cond 等同步原语。
- **并发模式**:学习生产者-消费者、工作池、扇入扇出等并发模式。
- **性能优化**:学习如何减少锁竞争、使用无锁数据结构、优化并发性能。
- **竞态检测**:学习如何使用 `-race` 标志检测竞态条件。
- **分布式并发**:学习分布式系统中的并发控制和一致性协议。

### 11.3 相关资源

- 《Go 并发编程实战》
- 《Go 语言程序设计》
- [Go by Example - Mutex](https://gobyexample.com/mutex)
- [Go by Example - RWMutex](https://gobyexample.com/rwmutex)
- [Go 语言并发编程](https://github.com/golang/go/wiki/LearnConcurrency)