Skip to content

同步原语 sync

1. 概述

sync 包是 Go 语言中提供同步原语的核心包,包含了一系列用于并发编程的同步工具。这些同步原语是构建线程安全程序的基础,能够帮助开发者解决并发访问共享资源时的竞态条件问题。

在整个 Go 语言课程体系中,sync 包是并发编程的重要组成部分,与 Goroutine、通道一起构成了 Go 语言并发模型的核心。掌握 sync 包中的同步原语,对于构建可靠、高效的并发系统至关重要。

2. 基本概念

2.1 语法

2.1.1 基本用法

go
import "sync"

// 创建互斥锁
var mu sync.Mutex

// 加锁
mu.Lock()
// 访问共享资源
// ...
// 解锁
mu.Unlock()

// 创建读写锁
var rwmu sync.RWMutex

// 读锁定
rwmu.RLock()
// 读取共享资源
// ...
// 读解锁
rwmu.RUnlock()

// 写锁定
rwmu.Lock()
// 修改共享资源
// ...
// 写解锁
rwmu.Unlock()

// 创建 WaitGroup
var wg sync.WaitGroup

// 增加计数
wg.Add(1)
// 启动 Goroutine
// ...
// 减少计数
wg.Done()
// 等待完成
wg.Wait()

// 创建 Once
var once sync.Once

// 执行只执行一次的操作
once.Do(func() {
    // 初始化操作
})

// 创建 Cond
var cond = sync.NewCond(&sync.Mutex{})

// 等待条件
cond.L.Lock()
for !condition {
    cond.Wait()
}
// 访问共享资源
cond.L.Unlock()

// 通知等待的 Goroutine
cond.Signal()  // 通知一个
cond.Broadcast()  // 通知所有

2.1.2 示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // 示例 1: Mutex
    var mu sync.Mutex
    counter := 0
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            defer mu.Unlock()
            counter++
        }()
    }
    
    wg.Wait()
    fmt.Printf("Mutex example: counter = %d\n", counter)
    
    // 示例 2: RWMutex
    var rwmu sync.RWMutex
    data := "initial"
    
    // 多个读操作
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            rwmu.RLock()
            defer rwmu.RUnlock()
            fmt.Printf("Reader %d: data = %s\n", id, data)
            time.Sleep(time.Millisecond * 100)
        }(i)
    }
    
    // 一个写操作
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Millisecond * 50) // 确保读操作先开始
        rwmu.Lock()
        defer rwmu.Unlock()
        data = "updated"
        fmt.Println("Writer: data updated")
    }()
    
    wg.Wait()
    fmt.Printf("RWMutex example: final data = %s\n", data)
    
    // 示例 3: Once
    var once sync.Once
    initialized := false
    
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            once.Do(func() {
                initialized = true
                fmt.Println("Once: initialized")
            })
            fmt.Printf("Goroutine %d: initialized = %v\n", id, initialized)
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All examples completed")
}

2.2 语义

  • Mutex:互斥锁,确保同一时刻只有一个 Goroutine 可以访问共享资源。
  • RWMutex:读写锁,允许多个读操作同时进行,但写操作会阻塞所有读写操作。
  • WaitGroup:等待一组 Goroutine 完成,用于协调多个 Goroutine 的执行。
  • Once:保证某个操作只执行一次,用于初始化等场景。
  • Cond:条件变量,用于等待或宣布某个条件的发生。
  • Pool:对象池,用于存储和复用临时对象,减少内存分配。
  • Map:并发安全的映射,提供了线程安全的键值对存储。

2.3 规范

  • 命名规范:同步原语变量通常使用简短的名称,如 murwmuwgonce 等。
  • 使用顺序
    • 对于 Mutex 和 RWMutex,总是先加锁,后访问共享资源,最后解锁。
    • 对于 WaitGroup,先调用 Add(),然后启动 Goroutine,在 Goroutine 中调用 Done(),最后调用 Wait()
    • 对于 Once,只需要调用 Do() 方法。
    • 对于 Cond,先获取锁,然后检查条件,等待条件满足,最后解锁。
  • 错误处理:同步原语本身不返回错误,但需要确保正确使用,避免死锁、活锁等问题。
  • 性能考虑:根据场景选择合适的同步原语,如读多写少的场景使用 RWMutex,需要等待多个 Goroutine 完成时使用 WaitGroup。

3. 原理深度解析

3.1 Mutex 原理

Mutex 是最基本的同步原语,用于保护共享资源。其底层实现使用了 CAS(Compare-And-Swap)操作和信号量:

  1. 状态标记:Mutex 内部有一个状态标记,用于表示锁是否被持有。
  2. CAS 操作:尝试通过 CAS 操作获取锁,如果成功则设置状态为锁定。
  3. 信号量:如果锁被占用,等待的 Goroutine 会被放入等待队列,通过信号量唤醒。
  4. 自旋锁:在尝试获取锁时,会先进行短暂的自旋,减少上下文切换的开销。

3.2 RWMutex 原理

RWMutex 是读写锁,允许多个读操作同时进行,提高并发性能:

  1. 状态标记:RWMutex 内部维护了读计数器和写锁状态。
  2. 读锁定:读锁定时,检查是否有写锁,如果没有则增加读计数器。
  3. 写锁定:写锁定时,检查是否有读锁或写锁,如果有则等待。
  4. 优先级:写操作通常比读操作有更高的优先级,避免写饥饿。

3.3 WaitGroup 原理

WaitGroup 用于等待一组 Goroutine 完成:

  1. 计数器:内部维护一个计数器,初始值为 0。
  2. Add 方法:增加计数器的值。
  3. Done 方法:减少计数器的值,当计数器变为 0 时,唤醒所有等待的 Goroutine。
  4. Wait 方法:阻塞当前 Goroutine,直到计数器变为 0。

3.4 Once 原理

Once 保证某个操作只执行一次:

  1. 状态标记:内部维护一个 done 标记,表示操作是否已经执行。
  2. 双重检查锁定:使用双重检查锁定模式,先检查 done 标记,再获取锁,提高性能。
  3. 原子操作:使用原子操作检查和设置 done 标记,确保并发安全。

3.5 Cond 原理

Cond 用于等待或宣布某个条件的发生:

  1. 锁关联:Cond 关联一个 Mutex,用于保护条件变量。
  2. 等待队列:维护一个等待队列,存储等待条件的 Goroutine。
  3. Wait 方法:释放锁,将 Goroutine 放入等待队列,等待被唤醒。
  4. Signal/Broadcast 方法:唤醒等待队列中的 Goroutine。

3.6 Pool 原理

Pool 用于存储和复用临时对象:

  1. 本地缓存:每个 Goroutine 有本地缓存,减少锁竞争。
  2. 共享缓存:多个 Goroutine 共享一个全局缓存。
  3. 垃圾回收:当垃圾回收发生时,Pool 会清空缓存。

3.7 Map 原理

Map 是并发安全的映射:

  1. 分段锁:内部使用多个分段,每个分段有自己的锁,减少锁竞争。
  2. 读写分离:读操作不需要加锁,写操作需要加锁。
  3. 惰性删除:删除操作只是标记为删除,实际删除在后续操作中进行。

4. 常见错误与踩坑点

4.1 死锁

错误表现:程序卡住,无法继续执行。

产生原因

  • 同一 Goroutine 多次获取同一把锁。
  • 多个 Goroutine 循环等待对方释放锁。
  • 锁的顺序不一致,导致循环等待。

解决方案

  • 确保每个锁都能正确释放,使用 defer 语句。
  • 保持一致的锁获取顺序。
  • 避免在持有锁时调用可能阻塞的函数。
go
// 错误示例:循环等待
var mu1, mu2 sync.Mutex

func goroutine1() {
    mu1.Lock()
    defer mu1.Unlock()
    time.Sleep(time.Millisecond)
    mu2.Lock()
    defer mu2.Unlock()
    fmt.Println("Goroutine 1 completed")
}

func goroutine2() {
    mu2.Lock()
    defer mu2.Unlock()
    time.Sleep(time.Millisecond)
    mu1.Lock()
    defer mu1.Unlock()
    fmt.Println("Goroutine 2 completed")
}

// 正确示例:一致的锁顺序
func goroutine1() {
    mu1.Lock()
    defer mu1.Unlock()
    mu2.Lock()
    defer mu2.Unlock()
    fmt.Println("Goroutine 1 completed")
}

func goroutine2() {
    mu1.Lock()
    defer mu1.Unlock()
    mu2.Lock()
    defer mu2.Unlock()
    fmt.Println("Goroutine 2 completed")
}

4.2 忘记解锁

错误表现:其他 Goroutine 无法获取锁,导致死锁。

产生原因:在获取锁后,没有对应的解锁操作,或者在解锁前发生了 panic。

解决方案:始终使用 defer 语句来确保解锁,即使发生 panic 也能正确解锁。

go
// 错误示例:忘记解锁
func process() {
    mu.Lock()
    // 处理逻辑
    // 忘记调用 mu.Unlock()
}

// 正确示例:使用 defer 解锁
func process() {
    mu.Lock()
    defer mu.Unlock()
    // 处理逻辑
}

4.3 读写锁使用不当

错误表现:读操作和写操作之间的竞争,或者性能问题。

产生原因

  • 在读锁定期间修改共享资源。
  • 写操作过于频繁,导致读操作饥饿。
  • 不必要地使用读写锁,增加复杂性。

解决方案

  • 确保读锁定期间只读取共享资源,不修改。
  • 合理设计读写比例,避免写操作过于频繁。
  • 根据实际场景选择合适的锁类型。
go
// 错误示例:读锁定期间修改共享资源
func readAndModify() {
    rwmu.RLock()
    defer rwmu.RUnlock()
    data = "modified" // 错误:读锁定期间修改资源
}

// 正确示例:修改资源时使用写锁定
func readAndModify() {
    rwmu.Lock()
    defer rwmu.Unlock()
    data = "modified" // 正确:写锁定期间修改资源
}

4.4 WaitGroup 使用不当

错误表现Wait() 永远不会返回,或者 panic。

产生原因

  • Add() 的调用次数与 Done() 不匹配。
  • Wait() 之后调用 Add()
  • 复制 WaitGroup 导致状态不同步。

解决方案

  • 确保 Add() 的调用次数与 Done() 完全匹配。
  • Wait() 之前完成所有 Add() 调用。
  • 通过指针传递 WaitGroup,避免复制。
go
// 错误示例:Add 和 Done 不匹配
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 1")
    }()
    // 忘记启动第二个 Goroutine
    wg.Wait() // 永远不会返回
}

// 正确示例:Add 和 Done 匹配
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 1")
    }()
    go func() {
        defer wg.Done()
        fmt.Println("Goroutine 2")
    }()
    wg.Wait() // 正常返回
}

4.5 Once 使用不当

错误表现:初始化操作没有执行,或者执行多次。

产生原因

  • 传递给 Do() 的函数发生 panic,导致 Once 认为操作已执行。
  • 复制 Once 导致状态不同步。
  • Do() 中调用同一个 Once 的 Do() 方法,导致死锁。

解决方案

  • Do() 函数中处理 panic,确保初始化操作能够正确完成。
  • 通过指针传递 Once,避免复制。
  • 避免在 Do() 中调用同一个 Once 的 Do() 方法。
go
// 错误示例:Do 函数发生 panic
func main() {
    var once sync.Once
    once.Do(func() {
        panic("Initialization failed")
    })
    // 这里不会再次执行初始化
    once.Do(func() {
        fmt.Println("This won't be executed")
    })
}

// 正确示例:处理 panic
func main() {
    var once sync.Once
    once.Do(func() {
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("Recovered from panic: %v\n", r)
            }
        }()
        panic("Initialization failed")
    })
}

4.6 Cond 使用不当

错误表现:等待的 Goroutine 永远不会被唤醒,或者出现虚假唤醒。

产生原因

  • 没有在循环中检查条件,导致虚假唤醒。
  • 没有正确获取和释放锁。
  • 信号丢失,即先发送信号后等待。

解决方案

  • 始终在循环中检查条件,避免虚假唤醒。
  • 确保在调用 Wait() 前获取锁,在调用 Signal()Broadcast() 前持有锁。
  • 确保先等待后发送信号。
go
// 错误示例:没有在循环中检查条件
func waitForCondition() {
    cond.L.Lock()
    if !condition {
        cond.Wait() // 错误:可能出现虚假唤醒
    }
    // 访问共享资源
    cond.L.Unlock()
}

// 正确示例:在循环中检查条件
func waitForCondition() {
    cond.L.Lock()
    for !condition {
        cond.Wait() // 正确:处理虚假唤醒
    }
    // 访问共享资源
    cond.L.Unlock()
}

5. 常见应用场景

5.1 保护共享资源

场景描述:多个 Goroutine 需要访问和修改同一个共享资源,如计数器、配置等。

使用方法:使用 Mutex 或 RWMutex 保护共享资源。

示例代码

go
package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    counter := 0
    var wg sync.WaitGroup
    
    // 1000 个 Goroutine 同时递增计数器
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            defer mu.Unlock()
            counter++
        }()
    }
    
    wg.Wait()
    fmt.Printf("Final counter value: %d\n", counter) // 应该输出 1000
}

5.2 读写分离

场景描述:读操作远多于写操作的场景,如配置读取、缓存访问等。

使用方法:使用 RWMutex 允许多个读操作同时进行,提高并发性能。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var rwmu sync.RWMutex
    config := "default"
    var wg sync.WaitGroup
    
    // 启动 10 个读 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            rwmu.RLock()
            defer rwmu.RUnlock()
            fmt.Printf("Reader %d: config = %s\n", id, config)
            time.Sleep(time.Millisecond * 50) // 模拟读操作
        }(i)
    }
    
    // 启动 2 个写 Goroutine
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(time.Millisecond * 10) // 确保读操作先开始
            rwmu.Lock()
            defer rwmu.Unlock()
            newConfig := fmt.Sprintf("config-%d", id)
            config = newConfig
            fmt.Printf("Writer %d: updated config to %s\n", id, newConfig)
            time.Sleep(time.Millisecond * 100) // 模拟写操作
        }(i)
    }
    
    wg.Wait()
    fmt.Printf("Final config: %s\n", config)
}

5.3 等待多个任务完成

场景描述:需要等待多个 Goroutine 完成任务后再继续执行,如并发下载、并行处理等。

使用方法:使用 WaitGroup 等待所有 Goroutine 完成。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func downloadFile(url string, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Downloading %s\n", url)
    time.Sleep(time.Second) // 模拟下载时间
    fmt.Printf("Downloaded %s\n", url)
}

func main() {
    var wg sync.WaitGroup
    urls := []string{
        "https://example.com/file1.txt",
        "https://example.com/file2.txt",
        "https://example.com/file3.txt",
    }
    
    for _, url := range urls {
        wg.Add(1)
        go downloadFile(url, &wg)
    }
    
    fmt.Println("Waiting for all downloads to complete...")
    wg.Wait()
    fmt.Println("All downloads completed")
}

5.4 单例模式

场景描述:需要创建一个全局唯一的实例,确保只初始化一次。

使用方法:使用 Once 保证初始化操作只执行一次。

示例代码

go
package main

import (
    "fmt"
    "sync"
)

type Singleton struct {
    data string
}

var (
    instance *Singleton
    once     sync.Once
)

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{data: "initialized"}
        fmt.Println("Singleton initialized")
    })
    return instance
}

func main() {
    var wg sync.WaitGroup
    
    // 多个 Goroutine 同时获取单例实例
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            s := GetInstance()
            fmt.Printf("Goroutine %d: %s\n", id, s.data)
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

5.5 条件等待

场景描述:需要等待某个条件满足后再执行操作,如生产者-消费者模式、资源就绪等。

使用方法:使用 Cond 等待条件满足。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    ready := false
    var wg sync.WaitGroup
    
    // 消费者 Goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        cond.L.Lock()
        for !ready {
            fmt.Println("Consumer: waiting for ready")
            cond.Wait()
        }
        fmt.Println("Consumer: ready, processing")
        cond.L.Unlock()
    }()
    
    // 生产者 Goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(time.Second) // 模拟准备时间
        cond.L.Lock()
        ready = true
        fmt.Println("Producer: ready, notifying")
        cond.Signal() // 通知消费者
        cond.L.Unlock()
    }()
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

6. 企业级进阶应用场景

6.1 并发安全的配置管理

场景描述:在大型应用中,需要管理全局配置,支持并发读写和热更新。

使用方法:使用 RWMutex 保护配置,允许多个读操作同时进行,写操作独占。

示例代码

go
package config

import (
    "sync"
)

type Config struct {
    Server struct {
        Host string
        Port int
    }
    Database struct {
        DSN string
    }
    // 其他配置项
}

var (
    config Config
    rwmu   sync.RWMutex
)

func GetConfig() Config {
    rwmu.RLock()
    defer rwmu.RUnlock()
    return config
}

func UpdateConfig(newConfig Config) {
    rwmu.Lock()
    defer rwmu.Unlock()
    config = newConfig
}

// 热更新配置
func ReloadConfig() error {
    // 从文件或环境变量加载配置
    newConfig, err := loadConfig()
    if err != nil {
        return err
    }
    UpdateConfig(newConfig)
    return nil
}

6.2 并发安全的缓存

场景描述:在高并发系统中,需要一个线程安全的缓存,支持并发读写。

使用方法:使用 sync.Map 或 RWMutex 保护的 map 实现缓存。

示例代码

go
package cache

import (
    "sync"
    "time"
)

type Item struct {
    Value      interface{}
    Expiration int64
}

type Cache struct {
    items map[string]Item
    mu    sync.RWMutex
}

func NewCache() *Cache {
    c := &Cache{
        items: make(map[string]Item),
    }
    // 启动清理过期项的 Goroutine
    go c.cleanup()
    return c
}

func (c *Cache) Set(key string, value interface{}, expiration time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()
    var exp int64
    if expiration > 0 {
        exp = time.Now().Add(expiration).UnixNano()
    }
    c.items[key] = Item{
        Value:      value,
        Expiration: exp,
    }
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    item, found := c.items[key]
    if !found {
        c.mu.RUnlock()
        return nil, false
    }
    if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
        c.mu.RUnlock()
        // 异步删除过期项
        go c.Delete(key)
        return nil, false
    }
    c.mu.RUnlock()
    return item.Value, true
}

func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.items, key)
}

func (c *Cache) cleanup() {
    for {
        time.Sleep(time.Minute)
        c.mu.Lock()
        now := time.Now().UnixNano()
        for k, v := range c.items {
            if v.Expiration > 0 && now > v.Expiration {
                delete(c.items, k)
            }
        }
        c.mu.Unlock()
    }
}

6.3 工作池

场景描述:在高并发系统中,需要限制并发数量,避免系统资源耗尽。

使用方法:使用 WaitGroup 和通道实现工作池,控制并发数量。

示例代码

go
package worker

import (
    "sync"
)

type WorkerPool struct {
    tasks chan Task
    wg    sync.WaitGroup
    size  int
}

type Task func()

func NewWorkerPool(size int) *WorkerPool {
    return &WorkerPool{
        tasks: make(chan Task),
        size:  size,
    }
}

func (p *WorkerPool) Start() {
    for i := 0; i < p.size; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for task := range p.tasks {
                task()
            }
        }()
    }
}

func (p *WorkerPool) Submit(task Task) {
    p.tasks <- task
}

func (p *WorkerPool) Close() {
    close(p.tasks)
    p.wg.Wait()
}

func main() {
    pool := NewWorkerPool(5) // 5 个工作协程
    pool.Start()
    defer pool.Close()
    
    // 提交 20 个任务
    for i := 0; i < 20; i++ {
        taskID := i
        pool.Submit(func() {
            println("Processing task", taskID)
        })
    }
}

6.4 并发安全的计数器

场景描述:在分布式系统中,需要一个高并发的计数器,支持原子操作。

使用方法:使用 sync/atomic 包提供的原子操作,或者使用 Mutex 保护计数器。

示例代码

go
package counter

import (
    "sync/atomic"
)

type AtomicCounter struct {
    value int64
}

func NewAtomicCounter() *AtomicCounter {
    return &AtomicCounter{value: 0}
}

func (c *AtomicCounter) Increment() int64 {
    return atomic.AddInt64(&c.value, 1)
}

func (c *AtomicCounter) Decrement() int64 {
    return atomic.AddInt64(&c.value, -1)
}

func (c *AtomicCounter) Get() int64 {
    return atomic.LoadInt64(&c.value)
}

func (c *AtomicCounter) Set(value int64) {
    atomic.StoreInt64(&c.value, value)
}

6.5 并发安全的队列

场景描述:在生产者-消费者模式中,需要一个线程安全的队列,支持并发入队和出队。

使用方法:使用 Mutex 和 Cond 实现线程安全的队列。

示例代码

go
package queue

import (
    "sync"
)

type Queue struct {
    items []interface{}
    mu    sync.Mutex
    cond  *sync.Cond
    size  int
}

func NewQueue(size int) *Queue {
    q := &Queue{
        items: make([]interface{}, 0, size),
        size:  size,
    }
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Enqueue(item interface{}) {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) >= q.size {
        q.cond.Wait() // 队列满,等待
    }
    
    q.items = append(q.items, item)
    q.cond.Broadcast() // 通知等待的消费者
}

func (q *Queue) Dequeue() interface{} {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) == 0 {
        q.cond.Wait() // 队列空,等待
    }
    
    item := q.items[0]
    q.items = q.items[1:]
    q.cond.Broadcast() // 通知等待的生产者
    
    return item
}

func (q *Queue) Size() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    return len(q.items)
}

7. 行业最佳实践

7.1 选择合适的同步原语

实践内容:根据具体场景选择合适的同步原语。

推荐理由:不同的同步原语有不同的适用场景,选择合适的同步原语可以提高性能和代码可读性。

示例

  • 读多写少的场景使用 RWMutex。
  • 需要等待多个 Goroutine 完成时使用 WaitGroup。
  • 需要保证操作只执行一次时使用 Once。
  • 需要等待条件满足时使用 Cond。

7.2 始终使用 defer 解锁

实践内容:使用 defer 语句确保锁的释放。

推荐理由defer 语句可以确保即使发生 panic,锁也能正确释放,避免死锁。

示例

go
func process() {
    mu.Lock()
    defer mu.Unlock()
    // 处理逻辑
}

7.3 避免长时间持有锁

实践内容:尽量减少持有锁的时间,只在必要时加锁。

推荐理由:长时间持有锁会降低并发性能,增加死锁的风险。

示例

go
// 错误示例:长时间持有锁
func process() {
    mu.Lock()
    defer mu.Unlock()
    // 执行耗时操作
    time.Sleep(time.Second)
    // 修改共享资源
    sharedResource = newValue
}

// 正确示例:只在必要时加锁
func process() {
    // 执行耗时操作
    time.Sleep(time.Second)
    // 只在修改共享资源时加锁
    mu.Lock()
    sharedResource = newValue
    mu.Unlock()
}

7.4 使用原子操作代替锁

实践内容:对于简单的计数器、标志位等,使用原子操作代替锁。

推荐理由:原子操作比锁更轻量,性能更高。

示例

go
// 使用原子操作
var counter int64

func increment() {
    atomic.AddInt64(&counter, 1)
}

// 而不是使用锁
var (
    mu      sync.Mutex
    counter int
)

func increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++
}

7.5 保持锁的粒度最小化

实践内容:只对需要保护的共享资源加锁,避免对整个函数加锁。

推荐理由:最小化锁的粒度可以提高并发性能,减少锁竞争。

示例

go
// 错误示例:对整个函数加锁
func process() {
    mu.Lock()
    defer mu.Unlock()
    // 执行不需要锁的操作
    fmt.Println("Processing...")
    // 修改共享资源
    sharedResource = newValue
}

// 正确示例:只对共享资源加锁
func process() {
    // 执行不需要锁的操作
    fmt.Println("Processing...")
    // 只对共享资源加锁
    mu.Lock()
    sharedResource = newValue
    mu.Unlock()
}

7.6 避免嵌套锁

实践内容:避免在持有一个锁的同时获取另一个锁,减少死锁的风险。

推荐理由:嵌套锁容易导致死锁,尤其是当多个 Goroutine 以不同的顺序获取锁时。

示例

go
// 错误示例:嵌套锁
func process() {
    mu1.Lock()
    defer mu1.Unlock()
    // ...
    mu2.Lock()
    defer mu2.Unlock()
    // ...
}

// 正确示例:避免嵌套锁,或者保持一致的锁顺序
func process() {
    // 先获取 mu1,再获取 mu2,保持一致的顺序
    mu1.Lock()
    defer mu1.Unlock()
    // ...
    mu2.Lock()
    defer mu2.Unlock()
    // ...
}

7.7 使用 sync.Map 处理并发映射

实践内容:对于需要并发访问的映射,使用 sync.Map 代替 map+Mutex。

推荐理由:sync.Map 是为并发场景优化的,性能比 map+Mutex 更好。

示例

go
// 使用 sync.Map
var m sync.Map

func store(key, value interface{}) {
    m.Store(key, value)
}

func load(key interface{}) (interface{}, bool) {
    return m.Load(key)
}

// 而不是使用 map+Mutex
var (
    mu sync.Mutex
    m  = make(map[string]interface{})
)

func store(key string, value interface{}) {
    mu.Lock()
    defer mu.Unlock()
    m[key] = value
}

func load(key string) (interface{}, bool) {
    mu.RLock()
    defer mu.RUnlock()
    v, ok := m[key]
    return v, ok
}

7.8 定期检查死锁和竞态条件

实践内容:使用 Go 的竞态检测工具(go run -racego test -race)定期检查代码中的竞态条件。

推荐理由:竞态检测工具可以帮助发现代码中的潜在问题,提高代码的可靠性。

示例

bash
# 运行带竞态检测的程序
go run -race main.go

# 运行带竞态检测的测试
go test -race ./...

8. 常见问题答疑(FAQ)

8.1 Mutex 和 RWMutex 有什么区别?

问题描述:Mutex 和 RWMutex 都是同步原语,它们有什么区别?

回答内容

  • Mutex:互斥锁,确保同一时刻只有一个 Goroutine 可以访问共享资源。
  • RWMutex:读写锁,允许多个读操作同时进行,但写操作会阻塞所有读写操作。
  • 使用场景
    • 当读写比例相近时,使用 Mutex。
    • 当读操作远多于写操作时,使用 RWMutex 可以提高并发性能。

示例代码

go
// 使用 Mutex
var mu sync.Mutex

// 使用 RWMutex
var rwmu sync.RWMutex

8.2 如何选择合适的同步原语?

问题描述:在不同的场景中,如何选择合适的同步原语?

回答内容

  • 保护共享资源:使用 Mutex 或 RWMutex。
  • 等待多个 Goroutine 完成:使用 WaitGroup。
  • 保证操作只执行一次:使用 Once。
  • 等待条件满足:使用 Cond。
  • 存储和复用临时对象:使用 Pool。
  • 并发安全的映射:使用 Map。
  • 简单的计数器、标志位:使用 atomic 包。

示例代码

go
// 保护共享资源
var mu sync.Mutex

// 等待多个 Goroutine 完成
var wg sync.WaitGroup

// 保证操作只执行一次
var once sync.Once

// 等待条件满足
var cond = sync.NewCond(&sync.Mutex{})

// 存储和复用临时对象
var pool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 1024)
    },
}

// 并发安全的映射
var m sync.Map

// 原子操作
var counter int64

8.3 什么是死锁?如何避免死锁?

问题描述:什么是死锁?如何避免死锁?

回答内容

  • 死锁:多个 Goroutine 互相等待对方释放锁,导致程序无法继续执行。
  • 避免死锁的方法
    • 始终使用 defer 语句确保锁的释放。
    • 保持一致的锁获取顺序。
    • 避免在持有锁时调用可能阻塞的函数。
    • 避免嵌套锁,或者确保以相同的顺序获取锁。
    • 使用超时机制,避免无限等待。

示例代码

go
// 避免死锁:保持一致的锁顺序
func process() {
    // 始终先获取 mu1,再获取 mu2
    mu1.Lock()
    defer mu1.Unlock()
    mu2.Lock()
    defer mu2.Unlock()
    // 处理逻辑
}

8.4 什么是竞态条件?如何避免竞态条件?

问题描述:什么是竞态条件?如何避免竞态条件?

回答内容

  • 竞态条件:多个 Goroutine 并发访问和修改共享资源,导致结果不确定。
  • 避免竞态条件的方法
    • 使用同步原语(Mutex、RWMutex 等)保护共享资源。
    • 使用原子操作(atomic 包)处理简单的计数器、标志位等。
    • 使用通道(channel)进行 Goroutine 间的通信,避免共享状态。
    • 使用 sync.Map 等并发安全的数据结构。
    • 使用 Go 的竞态检测工具(go run -race)检测潜在的竞态条件。

示例代码

go
// 使用 Mutex 避免竞态条件
var mu sync.Mutex
var counter int

func increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++
}

// 使用原子操作避免竞态条件
var counter int64

func increment() {
    atomic.AddInt64(&counter, 1)
}

8.5 Once 和 init() 函数有什么区别?

问题描述:Once 和 init() 函数都可以用于初始化,它们有什么区别?

回答内容

  • init() 函数:在包被导入时自动执行,不能控制执行时机,会增加程序启动时间。
  • Once:在首次调用 Do 方法时执行,可以控制执行时机,支持延迟初始化。
  • 使用场景
    • 当需要在包导入时就初始化时,使用 init() 函数。
    • 当需要延迟初始化或控制初始化时机时,使用 Once。

示例代码

go
// 使用 init() 函数
func init() {
    fmt.Println("Package initialized")
}

// 使用 Once
var once sync.Once

func Init() {
    once.Do(func() {
        fmt.Println("Initialized on first call")
    })
}

8.6 如何使用 Cond 实现生产者-消费者模式?

问题描述:如何使用 Cond 实现生产者-消费者模式?

回答内容

  • 使用 Cond 等待队列满或空的条件。
  • 生产者在队列满时等待,在生产后通知消费者。
  • 消费者在队列空时等待,在消费后通知生产者。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    queue := make([]int, 0, 5)
    var wg sync.WaitGroup
    
    // 生产者
    wg.Add(2)
    for i := 0; i < 2; i++ {
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                mu.Lock()
                for len(queue) >= 5 {
                    fmt.Printf("Producer %d: queue full, waiting\n", id)
                    cond.Wait()
                }
                item := id*10 + j
                queue = append(queue, item)
                fmt.Printf("Producer %d: produced %d\n", id, item)
                cond.Broadcast()
                mu.Unlock()
                time.Sleep(time.Millisecond * 100)
            }
        }(i)
    }
    
    // 消费者
    wg.Add(3)
    for i := 0; i < 3; i++ {
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 3; j++ {
                mu.Lock()
                for len(queue) == 0 {
                    fmt.Printf("Consumer %d: queue empty, waiting\n", id)
                    cond.Wait()
                }
                item := queue[0]
                queue = queue[1:]
                fmt.Printf("Consumer %d: consumed %d\n", id, item)
                cond.Broadcast()
                mu.Unlock()
                time.Sleep(time.Millisecond * 150)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

9. 实战练习

9.1 基础练习:并发安全的计数器

题目:使用 Mutex 和 atomic 包分别实现一个并发安全的计数器,比较两者的性能。

解题思路

  • 使用 Mutex 保护计数器变量。
  • 使用 atomic 包的原子操作实现计数器。
  • 启动多个 Goroutine 同时递增计数器,比较两者的性能。

常见误区

  • 忘记使用 defer 解锁,导致死锁。
  • 对原子操作的理解不正确,导致竞态条件。

分步提示

  1. 实现使用 Mutex 的计数器。
  2. 实现使用 atomic 包的计数器。
  3. 启动 1000 个 Goroutine 同时递增计数器。
  4. 比较两种实现的性能。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    // 使用 Mutex
    var mu sync.Mutex
    var mutexCounter int
    var wg sync.WaitGroup
    
    start := time.Now()
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                mu.Lock()
                mutexCounter++
                mu.Unlock()
            }
        }()
    }
    wg.Wait()
    mutexTime := time.Since(start)
    fmt.Printf("Mutex counter: %d, time: %v\n", mutexCounter, mutexTime)
    
    // 使用 atomic
    var atomicCounter int64
    start = time.Now()
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                atomic.AddInt64(&atomicCounter, 1)
            }
        }()
    }
    wg.Wait()
    atomicTime := time.Since(start)
    fmt.Printf("Atomic counter: %d, time: %v\n", atomicCounter, atomicTime)
    
    fmt.Printf("Atomic is %.2f times faster than Mutex\n", float64(mutexTime)/float64(atomicTime))
}

9.2 进阶练习:并发安全的缓存

题目:使用 RWMutex 实现一个并发安全的缓存,支持设置、获取和删除操作。

解题思路

  • 使用 RWMutex 保护缓存映射。
  • 实现 Set、Get 和 Delete 方法。
  • 测试多个 Goroutine 同时访问缓存的性能。

常见误区

  • 在读操作时使用写锁定,导致性能下降。
  • 没有正确处理并发访问,导致竞态条件。

分步提示

  1. 定义缓存结构体,包含映射和 RWMutex。
  2. 实现 Set 方法,使用写锁定。
  3. 实现 Get 方法,使用读锁定。
  4. 实现 Delete 方法,使用写锁定。
  5. 测试多个 Goroutine 同时访问缓存。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Cache struct {
    items map[string]interface{}
    mu    sync.RWMutex
}

func NewCache() *Cache {
    return &Cache{
        items: make(map[string]interface{}),
    }
}

func (c *Cache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.items[key] = value
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    value, ok := c.items[key]
    return value, ok
}

func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.items, key)
}

func main() {
    cache := NewCache()
    var wg sync.WaitGroup
    
    // 启动 10 个写 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id, j)
                value := fmt.Sprintf("value-%d-%d", id, j)
                cache.Set(key, value)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    // 启动 100 个读 Goroutine
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                key := fmt.Sprintf("key-%d-%d", id%10, j)
                _, _ = cache.Get(key)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
    fmt.Printf("Cache size: %d\n", len(cache.items))
}

9.3 挑战练习:生产者-消费者模式

题目:使用 Cond 实现一个生产者-消费者模式,支持多个生产者和多个消费者。

解题思路

  • 使用 Cond 等待队列满或空的条件。
  • 实现生产者,在队列满时等待,在生产后通知消费者。
  • 实现消费者,在队列空时等待,在消费后通知生产者。
  • 测试多个生产者和多个消费者的并发场景。

常见误区

  • 没有在循环中检查条件,导致虚假唤醒。
  • 没有正确获取和释放锁,导致死锁。
  • 信号丢失,即先发送信号后等待。

分步提示

  1. 定义队列结构体,包含切片、Mutex 和 Cond。
  2. 实现 Enqueue 方法,在队列满时等待,在生产后通知消费者。
  3. 实现 Dequeue 方法,在队列空时等待,在消费后通知生产者。
  4. 启动多个生产者和多个消费者。
  5. 测试并发场景下的正确性。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Queue struct {
    items []int
    mu    sync.Mutex
    cond  *sync.Cond
    size  int
}

func NewQueue(size int) *Queue {
    q := &Queue{
        items: make([]int, 0, size),
        size:  size,
    }
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Enqueue(item int) {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) >= q.size {
        fmt.Println("Queue full, waiting to enqueue")
        q.cond.Wait()
    }
    
    q.items = append(q.items, item)
    fmt.Printf("Enqueued: %d, queue size: %d\n", item, len(q.items))
    q.cond.Broadcast()
}

func (q *Queue) Dequeue() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) == 0 {
        fmt.Println("Queue empty, waiting to dequeue")
        q.cond.Wait()
    }
    
    item := q.items[0]
    q.items = q.items[1:]
    fmt.Printf("Dequeued: %d, queue size: %d\n", item, len(q.items))
    q.cond.Broadcast()
    
    return item
}

func main() {
    queue := NewQueue(5)
    var wg sync.WaitGroup
    
    // 启动 3 个生产者
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 5; j++ {
                item := id*10 + j
                queue.Enqueue(item)
                time.Sleep(time.Millisecond * 100)
            }
        }(i)
    }
    
    // 启动 2 个消费者
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 7; j++ {
                item := queue.Dequeue()
                fmt.Printf("Consumer %d: processed %d\n", id, item)
                time.Sleep(time.Millisecond * 150)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

10. 知识点总结

10.1 核心要点

  • sync 包:Go 语言中提供同步原语的核心包,包含 Mutex、RWMutex、WaitGroup、Once、Cond、Pool、Map 等。
  • Mutex:互斥锁,确保同一时刻只有一个 Goroutine 可以访问共享资源。
  • RWMutex:读写锁,允许多个读操作同时进行,但写操作会阻塞所有读写操作。
  • WaitGroup:等待一组 Goroutine 完成,用于协调多个 Goroutine 的执行。
  • Once:保证某个操作只执行一次,用于初始化等场景。
  • Cond:条件变量,用于等待或宣布某个条件的发生。
  • Pool:对象池,用于存储和复用临时对象,减少内存分配。
  • Map:并发安全的映射,提供了线程安全的键值对存储。
  • 原子操作:sync/atomic 包提供的原子操作,用于简单的计数器、标志位等。

10.2 易错点回顾

  • 死锁:多个 Goroutine 互相等待对方释放锁,导致程序无法继续执行。
  • 忘记解锁:在获取锁后,没有对应的解锁操作,导致其他 Goroutine 无法获取锁。
  • 读写锁使用不当:在读锁定期间修改共享资源,或者写操作过于频繁导致读操作饥饿。
  • WaitGroup 使用不当:Add() 的调用次数与 Done() 不匹配,或者在 Wait() 之后调用 Add()。
  • Once 使用不当:传递给 Do() 的函数发生 panic,导致 Once 认为操作已执行。
  • Cond 使用不当:没有在循环中检查条件,导致虚假唤醒,或者没有正确获取和释放锁。
  • 锁粒度太大:长时间持有锁,降低并发性能,增加死锁的风险。
  • 嵌套锁:在持有一个锁的同时获取另一个锁,增加死锁的风险。

11. 拓展参考资料

11.1 官方文档链接

11.2 进阶学习路径建议

  • 并发模式:学习常见的并发模式,如生产者-消费者、工作池、扇入扇出等。
  • 通道:深入学习通道的使用和通道模式,理解 CSP 并发模型。
  • Context:学习 Context 的使用,实现更复杂的并发控制,如超时控制、取消操作等。
  • 性能优化:学习并发性能优化技巧,如减少锁竞争、使用原子操作、优化缓存等。
  • 分布式系统:学习分布式系统中的并发控制,如分布式锁、一致性协议等。

11.3 相关资源