Skip to content

条件变量 Cond

1. 概述

条件变量(Cond)是 Go 语言中一种特殊的同步原语,它允许 Goroutine 在特定条件满足时被唤醒。条件变量通常与互斥锁(Mutex)配合使用,用于解决生产者-消费者问题、等待某个条件成立等场景。

在整个 Go 语言课程体系中,条件变量是并发编程的重要组件之一,与 Mutex、RWMutex、通道等一起构成了 Go 语言并发模型的核心。掌握条件变量的使用和原理,对于构建复杂的并发系统至关重要。

2. 基本概念

2.1 语法

2.1.1 基本用法

go
import "sync"

// 创建条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)

// 等待条件满足
mu.Lock()
for !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

// 唤醒一个等待的 Goroutine
cond.Signal()

// 唤醒所有等待的 Goroutine
cond.Broadcast()

2.1.2 示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var ready bool
    var wg sync.WaitGroup
    
    // 启动一个 Goroutine 等待条件满足
    wg.Add(1)
    go func() {
        defer wg.Done()
        mu.Lock()
        for !ready {
            fmt.Println("Waiting for ready...")
            cond.Wait()
        }
        fmt.Println("Condition met, proceeding...")
        mu.Unlock()
    }()
    
    // 等待一段时间后设置条件为 true 并唤醒等待的 Goroutine
    time.Sleep(time.Second)
    mu.Lock()
    ready = true
    fmt.Println("Setting ready to true and signaling...")
    cond.Signal()
    mu.Unlock()
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

2.2 语义

  • Wait():等待条件满足。调用时会自动释放锁,当被唤醒时会重新获取锁。
  • Signal():唤醒一个等待的 Goroutine。
  • Broadcast():唤醒所有等待的 Goroutine。
  • 与 Mutex 配合:条件变量必须与互斥锁配合使用,确保对条件的检查和修改是原子的。
  • 零值不可用:条件变量的零值不可用,必须使用 sync.NewCond 创建。
  • 不可复制:条件变量是结构体,不是引用类型,不要复制使用中的条件变量。

2.3 规范

  • 命名规范:条件变量通常命名为 cond
  • 使用顺序
    1. 获取互斥锁。
    2. 检查条件是否满足,如果不满足,调用 Wait()
    3. 处理逻辑。
    4. 释放互斥锁。
    5. 在适当的时候调用 Signal()Broadcast() 唤醒等待的 Goroutine。
  • 循环检查:使用 for 循环检查条件,而不是 if 语句,避免虚假唤醒。
  • 锁的管理:确保在调用 Wait() 前获取锁,在调用 Signal()Broadcast() 时也持有锁。
  • 不可复制:通过指针传递条件变量,避免复制它。

3. 原理深度解析

3.1 Cond 结构体

Cond 的底层实现是一个结构体,在 Go 语言中,其结构如下:

go
type Cond struct {
    noCopy noCopy
    L      Locker
    notify notifyList
    checker copyChecker
}

其中:

  • L:一个 Locker 接口类型,通常是 sync.Mutexsync.RWMutex
  • notifyList:一个等待队列,存储等待的 Goroutine。
  • checker:用于检测条件变量是否被复制。

3.2 Wait 方法实现

Wait 方法的主要步骤:

  1. 检查条件变量是否被复制。
  2. 将当前 Goroutine 添加到等待队列。
  3. 释放锁。
  4. 阻塞当前 Goroutine,等待被唤醒。
  5. 当被唤醒时,重新获取锁。
  6. 返回。

3.3 Signal 方法实现

Signal 方法的主要步骤:

  1. 检查条件变量是否被复制。
  2. 从等待队列中取出一个 Goroutine。
  3. 唤醒该 Goroutine。

3.4 Broadcast 方法实现

Broadcast 方法的主要步骤:

  1. 检查条件变量是否被复制。
  2. 从等待队列中取出所有 Goroutine。
  3. 唤醒所有 Goroutine。

3.5 内存模型

条件变量遵循 Go 语言的内存模型,确保以下顺序:

  • 在调用 Signal()Broadcast() 之前的所有操作,发生在被唤醒的 Goroutine 从 Wait() 返回之后。
  • 在调用 Wait() 之前的所有操作,发生在其他 Goroutine 调用 Signal()Broadcast() 之前。

3.6 虚假唤醒

虚假唤醒是指 Goroutine 从 Wait() 返回,但条件并未满足的情况。这是由于操作系统或硬件的原因导致的,因此必须使用 for 循环来检查条件,而不是 if 语句。

go
// 错误示例:使用 if 语句检查条件
mu.Lock()
if !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

// 正确示例:使用 for 循环检查条件
mu.Lock()
for !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

4. 常见错误与踩坑点

4.1 忘记获取锁

错误表现:运行时错误或竞态条件。

产生原因:在调用 Wait()Signal()Broadcast() 时没有获取锁。

解决方案:确保在调用这些方法时持有锁。

go
// 错误示例:忘记获取锁
cond.Wait() // 错误:没有获取锁

// 正确示例:获取锁后调用
mu.Lock()
for !condition {
    cond.Wait()
}
mu.Unlock()

4.2 使用 if 语句检查条件

错误表现:虚假唤醒导致条件未满足时继续执行。

产生原因:使用 if 语句检查条件,而不是 for 循环,无法处理虚假唤醒。

解决方案:使用 for 循环检查条件,确保在虚假唤醒时重新检查。

go
// 错误示例:使用 if 语句
mu.Lock()
if !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

// 正确示例:使用 for 循环
mu.Lock()
for !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

4.3 复制条件变量

错误表现:运行时错误或不可预期的行为。

产生原因:条件变量是结构体,不是引用类型,复制后会创建一个新的实例,与原实例状态无关。

解决方案:通过指针传递条件变量,避免复制它。

go
// 错误示例:复制条件变量
func worker(cond sync.Cond) { // 复制条件变量
    // 处理逻辑
}

// 正确示例:通过指针传递
func worker(cond *sync.Cond) { // 通过指针传递
    // 处理逻辑
}

4.4 死锁

错误表现:程序卡住,无法继续执行。

产生原因

  • 多个 Goroutine 循环等待对方释放锁。
  • 条件变量的使用不当,导致 Goroutine 永远等待。

解决方案

  • 保持一致的锁获取顺序。
  • 确保条件最终会被满足,避免无限等待。
  • 使用超时机制,避免无限等待。
go
// 错误示例:可能导致死锁
func producer() {
    mu.Lock()
    // 生产数据
    data = "produced"
    cond.Signal()
    mu.Unlock()
}

func consumer() {
    mu.Lock()
    for data == "" {
        cond.Wait()
    }
    // 消费数据
    data = ""
    mu.Unlock()
}

// 正确示例:确保条件最终会被满足
func producer() {
    mu.Lock()
    // 生产数据
    data = "produced"
    cond.Signal()
    mu.Unlock()
}

func consumer() {
    mu.Lock()
    for data == "" {
        cond.Wait()
    }
    // 消费数据
    data = ""
    mu.Unlock()
}

4.5 唤醒丢失

错误表现:Goroutine 永远等待,无法被唤醒。

产生原因:在条件满足后、唤醒之前,没有持有锁,导致唤醒信号丢失。

解决方案:确保在修改条件和唤醒 Goroutine 时持有锁。

go
// 错误示例:唤醒丢失
mu.Lock()
ready = true
mu.Unlock()
cond.Signal() // 错误:唤醒时没有持有锁

// 正确示例:唤醒时持有锁
mu.Lock()
ready = true
cond.Signal() // 正确:唤醒时持有锁
mu.Unlock()

4.6 过度使用条件变量

错误表现:代码复杂度增加,可读性下降。

产生原因:对于简单的并发场景,过度使用条件变量而不是使用通道。

解决方案:对于简单的并发场景,考虑使用通道,而不是条件变量。

go
// 错误示例:过度使用条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string

func producer() {
    mu.Lock()
    data = "produced"
    cond.Signal()
    mu.Unlock()
}

func consumer() {
    mu.Lock()
    for data == "" {
        cond.Wait()
    }
    fmt.Println(data)
    data = ""
    mu.Unlock()
}

// 正确示例:使用通道
ch := make(chan string)

func producer() {
    ch <- "produced"
}

func consumer() {
    data := <-ch
    fmt.Println(data)
}

5. 常见应用场景

5.1 生产者-消费者模式

场景描述:一个或多个生产者生成数据,一个或多个消费者消费数据,需要同步生产和消费的节奏。

使用方法:使用条件变量来通知消费者数据已生产,消费者等待数据可用。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var data string
    var wg sync.WaitGroup
    
    // 消费者
    wg.Add(2)
    for i := 0; i < 2; i++ {
        go func(id int) {
            defer wg.Done()
            for {
                mu.Lock()
                for data == "" {
                    fmt.Printf("Consumer %d: waiting for data\n", id)
                    cond.Wait()
                }
                fmt.Printf("Consumer %d: got data: %s\n", id, data)
                data = ""
                mu.Unlock()
                time.Sleep(time.Millisecond * 500)
            }
        }(i)
    }
    
    // 生产者
    for i := 0; i < 5; i++ {
        time.Sleep(time.Second)
        mu.Lock()
        data = fmt.Sprintf("data-%d", i)
        fmt.Printf("Producer: produced %s\n", data)
        cond.Broadcast()
        mu.Unlock()
    }
    
    wg.Wait()
}

5.2 等待某个条件成立

场景描述:需要等待某个条件成立后再继续执行,如等待服务启动完成、等待资源可用等。

使用方法:使用条件变量来等待条件成立,当条件成立时唤醒等待的 Goroutine。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var ready bool
    var wg sync.WaitGroup
    
    // 等待服务启动
    wg.Add(1)
    go func() {
        defer wg.Done()
        mu.Lock()
        for !ready {
            fmt.Println("Waiting for service to start...")
            cond.Wait()
        }
        fmt.Println("Service started, proceeding...")
        mu.Unlock()
    }()
    
    // 模拟服务启动
    time.Sleep(time.Second * 2)
    mu.Lock()
    ready = true
    fmt.Println("Service started, notifying waiters...")
    cond.Signal()
    mu.Unlock()
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

5.3 线程池

场景描述:实现一个线程池,当有任务时唤醒工作线程,当没有任务时工作线程等待。

使用方法:使用条件变量来通知工作线程有任务可用,工作线程等待任务。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID int
}

type Pool struct {
    tasks      []Task
    mu         sync.Mutex
    cond       *sync.Cond
    workers    int
    wg         sync.WaitGroup
    shutdown   bool
}

func NewPool(workers int) *Pool {
    p := &Pool{
        tasks:    make([]Task, 0),
        workers:  workers,
        shutdown: false,
    }
    p.cond = sync.NewCond(&p.mu)
    return p
}

func (p *Pool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go p.worker(i)
    }
}

func (p *Pool) worker(id int) {
    defer p.wg.Done()
    for {
        p.mu.Lock()
        for len(p.tasks) == 0 && !p.shutdown {
            fmt.Printf("Worker %d: waiting for tasks\n", id)
            p.cond.Wait()
        }
        if p.shutdown {
            fmt.Printf("Worker %d: shutting down\n", id)
            p.mu.Unlock()
            return
        }
        task := p.tasks[0]
        p.tasks = p.tasks[1:]
        p.mu.Unlock()
        fmt.Printf("Worker %d: processing task %d\n", id, task.ID)
        time.Sleep(time.Millisecond * 500)
    }
}

func (p *Pool) AddTask(task Task) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.tasks = append(p.tasks, task)
    fmt.Printf("Added task %d\n", task.ID)
    p.cond.Signal()
}

func (p *Pool) Shutdown() {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.shutdown = true
    p.cond.Broadcast()
    p.wg.Wait()
    fmt.Println("Pool shutdown completed")
}

func main() {
    pool := NewPool(3)
    pool.Start()
    
    // 添加任务
    for i := 0; i < 10; i++ {
        pool.AddTask(Task{ID: i})
        time.Sleep(time.Millisecond * 100)
    }
    
    // 关闭线程池
    time.Sleep(time.Second * 2)
    pool.Shutdown()
}

5.4 读写锁的实现

场景描述:实现一个读写锁,允许多个读操作同时进行,写操作独占。

使用方法:使用条件变量来管理读锁和写锁的状态,当读锁释放时通知等待的写锁,当写锁释放时通知等待的读锁。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type RWMutex struct {
    mu        sync.Mutex
    cond      *sync.Cond
    readers   int
    writer    bool
    readWait  int
    writeWait int
}

func NewRWMutex() *RWMutex {
    rw := &RWMutex{}
    rw.cond = sync.NewCond(&rw.mu)
    return rw
}

func (rw *RWMutex) RLock() {
    rw.mu.Lock()
    defer rw.mu.Unlock()
    
    for rw.writer || rw.writeWait > 0 {
        rw.readWait++
        rw.cond.Wait()
        rw.readWait--
    }
    rw.readers++
}

func (rw *RWMutex) RUnlock() {
    rw.mu.Lock()
    defer rw.mu.Unlock()
    
    rw.readers--
    if rw.readers == 0 && rw.writeWait > 0 {
        rw.cond.Broadcast()
    }
}

func (rw *RWMutex) Lock() {
    rw.mu.Lock()
    defer rw.mu.Unlock()
    
    for rw.writer || rw.readers > 0 {
        rw.writeWait++
        rw.cond.Wait()
        rw.writeWait--
    }
    rw.writer = true
}

func (rw *RWMutex) Unlock() {
    rw.mu.Lock()
    defer rw.mu.Unlock()
    
    rw.writer = false
    rw.cond.Broadcast()
}

func main() {
    rw := NewRWMutex()
    var wg sync.WaitGroup
    
    // 启动 10 个读 Goroutine
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            rw.RLock()
            defer rw.RUnlock()
            fmt.Printf("Reader %d: reading\n", id)
            time.Sleep(time.Millisecond * 100)
        }(i)
    }
    
    // 启动 2 个写 Goroutine
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(time.Millisecond * 50)
            rw.Lock()
            defer rw.Unlock()
            fmt.Printf("Writer %d: writing\n", id)
            time.Sleep(time.Millisecond * 100)
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

5.5 条件变量的超时等待

场景描述:需要在等待条件成立时设置超时,避免无限等待。

使用方法:结合通道和 select 语句实现条件变量的超时等待。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var ready bool
    var wg sync.WaitGroup
    
    wg.Add(1)
    go func() {
        defer wg.Done()
        mu.Lock()
        defer mu.Unlock()
        
        // 实现超时等待
        timeout := time.After(time.Second * 2)
        for !ready {
            // 创建一个通道来接收信号
            ch := make(chan struct{})
            go func() {
                cond.Wait()
                close(ch)
            }()
            
            select {
            case <-ch:
                // 条件满足,继续检查
            case <-timeout:
                // 超时,退出
                fmt.Println("Timeout waiting for condition")
                return
            }
        }
        fmt.Println("Condition met, proceeding...")
    }()
    
    // 模拟条件永远不满足
    // time.Sleep(time.Second * 3)
    // mu.Lock()
    // ready = true
    // cond.Signal()
    // mu.Unlock()
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

6. 企业级进阶应用场景

6.1 分布式锁

场景描述:在分布式系统中,需要一个分布式锁来协调多个节点的操作。

使用方法:使用条件变量来实现分布式锁的本地部分,结合分布式协调服务(如 ZooKeeper、etcd)实现全局锁。

示例代码

go
package distributed

import (
    "sync"
    "time"
)

type DistributedLock struct {
    mu      sync.Mutex
    cond    *sync.Cond
    locked  bool
    owner   string
    lease   time.Duration
    renewCh chan struct{}
}

func NewDistributedLock(owner string, lease time.Duration) *DistributedLock {
    dl := &DistributedLock{
        owner:   owner,
        lease:   lease,
        renewCh: make(chan struct{}),
    }
    dl.cond = sync.NewCond(&dl.mu)
    return dl
}

func (dl *DistributedLock) Lock() error {
    dl.mu.Lock()
    defer dl.mu.Unlock()
    
    for dl.locked {
        dl.cond.Wait()
    }
    
    // 在这里向分布式协调服务申请锁
    // 例如,向 ZooKeeper 或 etcd 创建临时节点
    
    dl.locked = true
    
    // 启动租约续期 Goroutine
    go dl.renewLease()
    
    return nil
}

func (dl *DistributedLock) Unlock() error {
    dl.mu.Lock()
    defer dl.mu.Unlock()
    
    if !dl.locked {
        return nil
    }
    
    // 在这里向分布式协调服务释放锁
    // 例如,删除 ZooKeeper 或 etcd 中的临时节点
    
    dl.locked = false
    close(dl.renewCh)
    dl.cond.Signal()
    
    return nil
}

func (dl *DistributedLock) renewLease() {
    ticker := time.NewTicker(dl.lease / 3)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            // 续期租约
            // 例如,更新 ZooKeeper 或 etcd 中临时节点的过期时间
        case <-dl.renewCh:
            return
        }
    }
}

6.2 生产者-消费者模式的高级实现

场景描述:在企业级应用中,需要一个高性能的生产者-消费者模式,支持动态调整消费者数量、批量处理等功能。

使用方法:使用条件变量来管理生产者和消费者的同步,结合通道和线程池实现高级功能。

示例代码

go
package producerconsumer

import (
    "sync"
    "time"
)

type Task struct {
    ID      int
    Payload interface{}
}

type ProducerConsumer struct {
    tasks      []Task
    mu         sync.Mutex
    cond       *sync.Cond
    workers    int
    maxWorkers int
    wg         sync.WaitGroup
    shutdown   bool
    batchSize  int
}

func NewProducerConsumer(initialWorkers, maxWorkers, batchSize int) *ProducerConsumer {
    pc := &ProducerConsumer{
        tasks:      make([]Task, 0),
        workers:    initialWorkers,
        maxWorkers: maxWorkers,
        batchSize:  batchSize,
        shutdown:   false,
    }
    pc.cond = sync.NewCond(&pc.mu)
    return pc
}

func (pc *ProducerConsumer) Start() {
    for i := 0; i < pc.workers; i++ {
        pc.wg.Add(1)
        go pc.worker(i)
    }
}

func (pc *ProducerConsumer) worker(id int) {
    defer pc.wg.Done()
    for {
        pc.mu.Lock()
        for len(pc.tasks) == 0 && !pc.shutdown {
            pc.cond.Wait()
        }
        if pc.shutdown {
            pc.mu.Unlock()
            return
        }
        
        // 批量获取任务
        batchSize := pc.batchSize
        if len(pc.tasks) < batchSize {
            batchSize = len(pc.tasks)
        }
        batch := make([]Task, batchSize)
        copy(batch, pc.tasks[:batchSize])
        pc.tasks = pc.tasks[batchSize:]
        pc.mu.Unlock()
        
        // 处理任务
        for _, task := range batch {
            // 处理任务逻辑
            time.Sleep(time.Millisecond * 100)
        }
    }
}

func (pc *ProducerConsumer) AddTask(task Task) {
    pc.mu.Lock()
    defer pc.mu.Unlock()
    
    pc.tasks = append(pc.tasks, task)
    
    // 动态调整消费者数量
    if len(pc.tasks) > pc.workers*pc.batchSize && pc.workers < pc.maxWorkers {
        pc.workers++
        pc.wg.Add(1)
        go pc.worker(pc.workers - 1)
    }
    
    pc.cond.Signal()
}

func (pc *ProducerConsumer) Shutdown() {
    pc.mu.Lock()
    defer pc.mu.Unlock()
    
    pc.shutdown = true
    pc.cond.Broadcast()
    pc.wg.Wait()
}

6.3 线程安全的阻塞队列

场景描述:在企业级应用中,需要一个线程安全的阻塞队列,支持入队、出队、大小查询等操作。

使用方法:使用条件变量来实现队列的阻塞操作,当队列为空时阻塞出队操作,当队列满时阻塞入队操作。

示例代码

go
package queue

import (
    "sync"
)

type BlockingQueue struct {
    items    []interface{}
    capacity int
    mu       sync.Mutex
    cond     *sync.Cond
}

func NewBlockingQueue(capacity int) *BlockingQueue {
    bq := &BlockingQueue{
        items:    make([]interface{}, 0, capacity),
        capacity: capacity,
    }
    bq.cond = sync.NewCond(&bq.mu)
    return bq
}

func (bq *BlockingQueue) Enqueue(item interface{}) {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    for len(bq.items) >= bq.capacity {
        bq.cond.Wait()
    }
    
    bq.items = append(bq.items, item)
    bq.cond.Signal()
}

func (bq *BlockingQueue) Dequeue() interface{} {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    for len(bq.items) == 0 {
        bq.cond.Wait()
    }
    
    item := bq.items[0]
    bq.items = bq.items[1:]
    bq.cond.Signal()
    
    return item
}

func (bq *BlockingQueue) Size() int {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    return len(bq.items)
}

func (bq *BlockingQueue) IsEmpty() bool {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    return len(bq.items) == 0
}

func (bq *BlockingQueue) IsFull() bool {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    return len(bq.items) >= bq.capacity
}

6.4 条件变量的监控和管理

场景描述:在企业级应用中,需要监控和管理条件变量的使用情况,如等待时间、唤醒次数等。

使用方法:封装条件变量,添加监控和管理功能。

示例代码

go
package monitor

import (
    "sync"
    "time"
)

type MonitoredCond struct {
    cond         *sync.Cond
    waitCount    int64
    signalCount  int64
    broadcastCount int64
    maxWaitTime  time.Duration
    totalWaitTime time.Duration
    mu           sync.Mutex
}

func NewMonitoredCond(l sync.Locker) *MonitoredCond {
    return &MonitoredCond{
        cond: sync.NewCond(l),
    }
}

func (mc *MonitoredCond) Wait() {
    start := time.Now()
    mc.cond.Wait()
    duration := time.Since(start)
    
    mc.mu.Lock()
    defer mc.mu.Unlock()
    
    mc.waitCount++
    mc.totalWaitTime += duration
    if duration > mc.maxWaitTime {
        mc.maxWaitTime = duration
    }
}

func (mc *MonitoredCond) Signal() {
    mc.cond.Signal()
    
    mc.mu.Lock()
    defer mc.mu.Unlock()
    
    mc.signalCount++
}

func (mc *MonitoredCond) Broadcast() {
    mc.cond.Broadcast()
    
    mc.mu.Lock()
    defer mc.mu.Unlock()
    
    mc.broadcastCount++
}

func (mc *MonitoredCond) Stats() map[string]interface{} {
    mc.mu.Lock()
    defer mc.mu.Unlock()
    
    return map[string]interface{}{
        "waitCount":      mc.waitCount,
        "signalCount":    mc.signalCount,
        "broadcastCount": mc.broadcastCount,
        "maxWaitTime":    mc.maxWaitTime,
        "totalWaitTime":  mc.totalWaitTime,
        "avgWaitTime":    mc.totalWaitTime / time.Duration(mc.waitCount),
    }
}

6.5 条件变量在并发测试中的应用

场景描述:在企业级应用中,需要编写并发测试,验证系统的并发性能和正确性。

使用方法:使用条件变量来协调测试 Goroutine,确保测试的正确性和可重复性。

示例代码

go
package concurrency

import (
    "sync"
    "testing"
    "time"
)

func TestConcurrentAccess(t *testing.T) {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var ready bool
    var wg sync.WaitGroup
    var counter int
    
    // 启动 1000 个 Goroutine 并发访问
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            for !ready {
                cond.Wait()
            }
            counter++
            mu.Unlock()
        }()
    }
    
    // 等待所有 Goroutine 就绪
    time.Sleep(time.Millisecond * 100)
    
    // 通知所有 Goroutine 开始
    mu.Lock()
    ready = true
    cond.Broadcast()
    mu.Unlock()
    
    wg.Wait()
    
    if counter != 1000 {
        t.Errorf("Expected counter to be 1000, got %d", counter)
    }
}

7. 行业最佳实践

7.1 始终使用 for 循环检查条件

实践内容:使用 for 循环检查条件,而不是 if 语句,避免虚假唤醒。

推荐理由:虚假唤醒是操作系统或硬件的正常行为,使用 for 循环可以确保在虚假唤醒时重新检查条件。

示例

go
// 正确示例:使用 for 循环
mu.Lock()
for !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

7.2 确保在持有锁时调用 Wait、Signal 和 Broadcast

实践内容:确保在调用 Wait()Signal()Broadcast() 时持有锁。

推荐理由:这些方法需要在持有锁的情况下调用,否则会导致运行时错误或竞态条件。

示例

go
// 正确示例:持有锁时调用
mu.Lock()
cond.Signal()
mu.Unlock()

7.3 合理设计条件变量的使用场景

实践内容:只在需要等待特定条件成立的场景中使用条件变量,对于简单的并发场景,考虑使用通道。

推荐理由:条件变量的使用复杂度较高,对于简单的并发场景,通道更加简洁和安全。

示例

go
// 简单场景使用通道
ch := make(chan string)

func producer() {
    ch <- "produced"
}

func consumer() {
    data := <-ch
    fmt.Println(data)
}

// 复杂场景使用条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string

func producer() {
    mu.Lock()
    data = "produced"
    cond.Signal()
    mu.Unlock()
}

func consumer() {
    mu.Lock()
    for data == "" {
        cond.Wait()
    }
    fmt.Println(data)
    data = ""
    mu.Unlock()
}

7.4 避免在条件变量的等待中执行耗时操作

实践内容:避免在持有锁的情况下执行耗时操作,尽量减少锁的持有时间。

推荐理由:长时间持有锁会导致其他 Goroutine 无法获取锁,降低并发性能。

示例

go
// 错误示例:在持有锁时执行耗时操作
mu.Lock()
for !condition {
    cond.Wait()
}
// 执行耗时操作
processData() // 错误:在持有锁时执行耗时操作
mu.Unlock()

// 正确示例:释放锁后执行耗时操作
mu.Lock()
for !condition {
    cond.Wait()
}
// 保存数据
localData := data
mu.Unlock()
// 执行耗时操作
processData(localData) // 正确:释放锁后执行耗时操作

7.5 监控条件变量的使用情况

实践内容:在生产环境中监控条件变量的使用情况,如等待时间、唤醒次数等。

推荐理由:监控条件变量的使用情况可以帮助发现潜在的性能问题和死锁风险。

示例

go
// 使用监控包装器
mc := NewMonitoredCond(&mu)

// 定期输出统计信息
go func() {
    for {
        time.Sleep(time.Minute)
        stats := mc.Stats()
        fmt.Printf("Cond stats: %v\n", stats)
    }
}()

7.6 合理设置超时机制

实践内容:为条件变量的等待设置超时机制,避免无限等待。

推荐理由:设置超时机制可以避免因条件永远不满足而导致的 Goroutine 泄漏。

示例

go
// 实现超时等待
mu.Lock()
timeout := time.After(time.Second * 5)
for !condition {
    ch := make(chan struct{})
    go func() {
        cond.Wait()
        close(ch)
    }()
    
    select {
    case <-ch:
        // 条件满足,继续检查
    case <-timeout:
        // 超时,退出
        mu.Unlock()
        return errors.New("timeout waiting for condition")
    }
}
// 处理逻辑
mu.Unlock()

7.7 避免嵌套条件变量

实践内容:避免在一个条件变量的等待中使用另一个条件变量,减少复杂度。

推荐理由:嵌套条件变量会增加代码复杂度,容易导致死锁。

示例

go
// 错误示例:嵌套条件变量
var mu1, mu2 sync.Mutex
cond1 := sync.NewCond(&mu1)
cond2 := sync.NewCond(&mu2)

func worker() {
    mu1.Lock()
    for !condition1 {
        cond1.Wait()
        mu2.Lock()
        for !condition2 {
            cond2.Wait()
        }
        // 处理逻辑
        mu2.Unlock()
    }
    mu1.Unlock()
}

// 正确示例:避免嵌套
var mu sync.Mutex
cond := sync.NewCond(&mu)

func worker() {
    mu.Lock()
    for !condition1 || !condition2 {
        cond.Wait()
    }
    // 处理逻辑
    mu.Unlock()
}

7.8 文档和注释

实践内容:为条件变量的使用添加详细的文档和注释,说明条件的含义和使用方式。

推荐理由:条件变量的使用复杂度较高,详细的文档和注释可以帮助其他开发者理解代码。

示例

go
// Cond 用于等待服务启动完成
// 条件:ready == true
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool

// WaitForServiceStart 等待服务启动完成
func WaitForServiceStart() {
    mu.Lock()
    defer mu.Unlock()
    // 使用 for 循环检查条件,避免虚假唤醒
    for !ready {
        cond.Wait()
    }
}

// StartService 启动服务并通知等待的 Goroutine
func StartService() {
    mu.Lock()
    defer mu.Unlock()
    // 启动服务的逻辑
    ready = true
    // 唤醒所有等待的 Goroutine
    cond.Broadcast()
}

8. 常见问题答疑(FAQ)

8.1 条件变量和通道有什么区别?

问题描述:条件变量和通道都是 Go 语言中用于并发同步的工具,它们有什么区别?

回答内容

  • 条件变量
    • 与互斥锁配合使用,用于等待特定条件成立。
    • 可以唤醒一个或所有等待的 Goroutine。
    • 适用于复杂的同步场景,如生产者-消费者模式、线程池等。
    • 使用复杂度较高,需要手动管理锁。
  • 通道
    • 用于 Goroutine 间的通信,自动处理同步。
    • 可以传递数据,也可以用作信号。
    • 适用于简单的同步场景,如信号通知、任务分发等。
    • 使用简单,自动管理同步。

示例代码

go
// 使用条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string

func producer() {
    mu.Lock()
    data = "produced"
    cond.Signal()
    mu.Unlock()
}

func consumer() {
    mu.Lock()
    for data == "" {
        cond.Wait()
    }
    fmt.Println(data)
    data = ""
    mu.Unlock()
}

// 使用通道
ch := make(chan string)

func producer() {
    ch <- "produced"
}

func consumer() {
    data := <-ch
    fmt.Println(data)
}

8.2 为什么需要使用 for 循环检查条件?

问题描述:为什么使用条件变量时需要使用 for 循环检查条件,而不是 if 语句?

回答内容

  • 虚假唤醒:操作系统或硬件可能会导致 Goroutine 从 Wait() 返回,但条件并未满足,这称为虚假唤醒。
  • 并发修改:当多个 Goroutine 同时等待同一个条件时,一个 Goroutine 被唤醒并修改了条件,其他被唤醒的 Goroutine 可能会发现条件已经不满足。
  • 安全性:使用 for 循环可以确保在这些情况下重新检查条件,避免错误地继续执行。

示例代码

go
// 错误示例:使用 if 语句
mu.Lock()
if !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

// 正确示例:使用 for 循环
mu.Lock()
for !condition {
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

8.3 条件变量的 Wait 方法是如何工作的?

问题描述:条件变量的 Wait() 方法是如何工作的?

回答内容

  • Wait() 方法会自动释放锁,允许其他 Goroutine 获取锁。
  • 然后,Wait() 方法会阻塞当前 Goroutine,等待被唤醒。
  • 当被唤醒时,Wait() 方法会重新获取锁,然后返回。
  • 因此,在调用 Wait() 之前必须持有锁,返回后仍然持有锁。

示例代码

go
mu.Lock()
for !condition {
    // 调用 Wait() 时会释放锁,被唤醒时会重新获取锁
    cond.Wait()
}
// 处理逻辑
mu.Unlock()

8.4 Signal 和 Broadcast 有什么区别?

问题描述:条件变量的 Signal()Broadcast() 方法有什么区别?

回答内容

  • Signal():唤醒一个等待的 Goroutine,通常用于只需要一个 Goroutine 处理的场景。
  • Broadcast():唤醒所有等待的 Goroutine,通常用于需要多个 Goroutine 处理的场景,或当条件的变化影响所有等待的 Goroutine 时。

示例代码

go
// 唤醒一个 Goroutine
cond.Signal()

// 唤醒所有 Goroutine
cond.Broadcast()

8.5 条件变量是否可以与 RWMutex 配合使用?

问题描述:条件变量是否可以与 RWMutex 配合使用?

回答内容

  • 是的,条件变量可以与 RWMutex 配合使用。
  • 当使用 RWMutex 时,需要确保在调用 Wait()Signal()Broadcast() 时持有写锁,因为这些方法需要修改条件变量的内部状态。

示例代码

go
var rwmu sync.RWMutex
cond := sync.NewCond(&rwmu)
var ready bool

func WaitForReady() {
    rwmu.Lock() // 使用写锁
    for !ready {
        cond.Wait()
    }
    rwmu.Unlock()
}

func SetReady() {
    rwmu.Lock() // 使用写锁
    ready = true
    cond.Broadcast()
    rwmu.Unlock()
}

8.6 如何实现条件变量的超时等待?

问题描述:如何实现条件变量的超时等待,避免无限等待?

回答内容

  • 可以结合通道和 select 语句实现条件变量的超时等待。
  • 创建一个通道来接收条件满足的信号,同时使用 time.After() 创建一个超时通道。
  • 使用 select 语句等待这两个通道中的任何一个。

示例代码

go
func WaitWithTimeout(cond *sync.Cond, timeout time.Duration) bool {
    ch := make(chan struct{})
    go func() {
        cond.Wait()
        close(ch)
    }()
    
    select {
    case <-ch:
        return true // 条件满足
    case <-time.After(timeout):
        return false // 超时
    }
}

// 使用示例
mu.Lock()
timedOut := false
for !condition {
    mu.Unlock()
    timedOut = !WaitWithTimeout(cond, time.Second*5)
    mu.Lock()
    if timedOut {
        break
    }
}
if timedOut {
    // 处理超时
} else {
    // 处理条件满足
}
mu.Unlock()

9. 实战练习

9.1 基础练习:生产者-消费者模式

题目:使用条件变量实现一个简单的生产者-消费者模式,生产者生成数据,消费者消费数据。

解题思路

  • 使用条件变量来通知消费者数据已生产,消费者等待数据可用。
  • 使用互斥锁来保护共享数据。
  • 使用 for 循环检查条件,避免虚假唤醒。

常见误区

  • 忘记使用 for 循环检查条件,导致虚假唤醒。
  • 忘记在持有锁时调用 Signal()Broadcast(),导致唤醒信号丢失。
  • 没有正确管理锁的获取和释放,导致死锁。

分步提示

  1. 定义共享数据和互斥锁。
  2. 创建条件变量。
  3. 实现生产者函数,生成数据并通知消费者。
  4. 实现消费者函数,等待数据可用并消费数据。
  5. 启动生产者和消费者 Goroutine。
  6. 等待所有 Goroutine 完成。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    cond := sync.NewCond(&mu)
    var data string
    var wg sync.WaitGroup
    
    // 消费者
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            mu.Lock()
            for data == "" {
                fmt.Println("Consumer: waiting for data")
                cond.Wait()
            }
            fmt.Printf("Consumer: got data: %s\n", data)
            data = ""
            mu.Unlock()
            time.Sleep(time.Millisecond * 500)
        }
    }()
    
    // 生产者
    for i := 0; i < 5; i++ {
        time.Sleep(time.Second)
        mu.Lock()
        data = fmt.Sprintf("data-%d", i)
        fmt.Printf("Producer: produced %s\n", data)
        cond.Signal()
        mu.Unlock()
    }
    
    wg.Wait()
}

9.2 进阶练习:线程池

题目:使用条件变量实现一个线程池,支持动态添加任务和关闭线程池。

解题思路

  • 使用条件变量来通知工作线程有任务可用,工作线程等待任务。
  • 使用互斥锁来保护任务队列和线程池状态。
  • 实现任务添加、线程池启动和关闭功能。

常见误区

  • 没有正确处理线程池的关闭逻辑,导致工作线程无法退出。
  • 没有使用 for 循环检查条件,导致虚假唤醒。
  • 任务队列管理不当,导致任务丢失或重复处理。

分步提示

  1. 定义任务结构体和线程池结构体。
  2. 实现线程池的启动方法,创建工作线程。
  3. 实现工作线程函数,等待任务并处理。
  4. 实现任务添加方法,将任务加入队列并通知工作线程。
  5. 实现线程池的关闭方法,通知所有工作线程退出。
  6. 测试线程池的功能。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID int
}

type Pool struct {
    tasks      []Task
    mu         sync.Mutex
    cond       *sync.Cond
    workers    int
    wg         sync.WaitGroup
    shutdown   bool
}

func NewPool(workers int) *Pool {
    p := &Pool{
        tasks:    make([]Task, 0),
        workers:  workers,
        shutdown: false,
    }
    p.cond = sync.NewCond(&p.mu)
    return p
}

func (p *Pool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go p.worker(i)
    }
}

func (p *Pool) worker(id int) {
    defer p.wg.Done()
    for {
        p.mu.Lock()
        for len(p.tasks) == 0 && !p.shutdown {
            fmt.Printf("Worker %d: waiting for tasks\n", id)
            p.cond.Wait()
        }
        if p.shutdown {
            fmt.Printf("Worker %d: shutting down\n", id)
            p.mu.Unlock()
            return
        }
        task := p.tasks[0]
        p.tasks = p.tasks[1:]
        p.mu.Unlock()
        fmt.Printf("Worker %d: processing task %d\n", id, task.ID)
        time.Sleep(time.Millisecond * 500)
    }
}

func (p *Pool) AddTask(task Task) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.tasks = append(p.tasks, task)
    fmt.Printf("Added task %d\n", task.ID)
    p.cond.Signal()
}

func (p *Pool) Shutdown() {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.shutdown = true
    p.cond.Broadcast()
    p.wg.Wait()
    fmt.Println("Pool shutdown completed")
}

func main() {
    pool := NewPool(3)
    pool.Start()
    
    // 添加任务
    for i := 0; i < 10; i++ {
        pool.AddTask(Task{ID: i})
        time.Sleep(time.Millisecond * 100)
    }
    
    // 关闭线程池
    time.Sleep(time.Second * 2)
    pool.Shutdown()
}

9.3 挑战练习:阻塞队列

题目:使用条件变量实现一个阻塞队列,支持入队、出队、大小查询等操作,当队列为空时阻塞出队操作,当队列满时阻塞入队操作。

解题思路

  • 使用条件变量来实现队列的阻塞操作。
  • 使用互斥锁来保护队列的状态。
  • 实现入队、出队、大小查询等方法。

常见误区

  • 没有正确处理队列满和队列空的情况,导致阻塞操作无法正常工作。
  • 没有使用 for 循环检查条件,导致虚假唤醒。
  • 队列管理不当,导致数据丢失或重复处理。

分步提示

  1. 定义阻塞队列结构体,包含队列、容量、互斥锁和条件变量。
  2. 实现入队方法,当队列满时阻塞。
  3. 实现出队方法,当队列空时阻塞。
  4. 实现大小查询、判空、判满等方法。
  5. 测试阻塞队列的功能。

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type BlockingQueue struct {
    items    []interface{}
    capacity int
    mu       sync.Mutex
    cond     *sync.Cond
}

func NewBlockingQueue(capacity int) *BlockingQueue {
    bq := &BlockingQueue{
        items:    make([]interface{}, 0, capacity),
        capacity: capacity,
    }
    bq.cond = sync.NewCond(&bq.mu)
    return bq
}

func (bq *BlockingQueue) Enqueue(item interface{}) {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    for len(bq.items) >= bq.capacity {
        fmt.Println("Queue full, waiting to enqueue")
        bq.cond.Wait()
    }
    
    bq.items = append(bq.items, item)
    fmt.Printf("Enqueued: %v, queue size: %d\n", item, len(bq.items))
    bq.cond.Signal()
}

func (bq *BlockingQueue) Dequeue() interface{} {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    for len(bq.items) == 0 {
        fmt.Println("Queue empty, waiting to dequeue")
        bq.cond.Wait()
    }
    
    item := bq.items[0]
    bq.items = bq.items[1:]
    fmt.Printf("Dequeued: %v, queue size: %d\n", item, len(bq.items))
    bq.cond.Signal()
    
    return item
}

func (bq *BlockingQueue) Size() int {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    return len(bq.items)
}

func (bq *BlockingQueue) IsEmpty() bool {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    return len(bq.items) == 0
}

func (bq *BlockingQueue) IsFull() bool {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    
    return len(bq.items) >= bq.capacity
}

func main() {
    bq := NewBlockingQueue(5)
    var wg sync.WaitGroup
    
    // 启动 2 个生产者
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                item := fmt.Sprintf("item-%d-%d", id, j)
                bq.Enqueue(item)
                time.Sleep(time.Millisecond * 100)
            }
        }(i)
    }
    
    // 启动 2 个消费者
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for j := 0; j < 10; j++ {
                item := bq.Dequeue()
                fmt.Printf("Consumer %d got: %v\n", id, item)
                time.Sleep(time.Millisecond * 200)
            }
        }(i)
    }
    
    wg.Wait()
    fmt.Println("All goroutines completed")
}

10. 知识点总结

10.1 核心要点

  • 条件变量的特点:与互斥锁配合使用,用于等待特定条件成立,支持唤醒一个或所有等待的 Goroutine。
  • 基本操作
    • Wait():等待条件满足,自动释放和重新获取锁。
    • Signal():唤醒一个等待的 Goroutine。
    • Broadcast():唤醒所有等待的 Goroutine。
  • 实现原理:使用等待队列存储等待的 Goroutine,通过信号机制唤醒 Goroutine。
  • 内存模型:确保操作的顺序性,保证并发操作的正确性。
  • 使用场景:生产者-消费者模式、线程池、阻塞队列、分布式锁等。
  • 虚假唤醒:需要使用 for 循环检查条件,避免虚假唤醒。

10.2 易错点回顾

  • 忘记获取锁:在调用 Wait()Signal()Broadcast() 时没有获取锁。
  • 使用 if 语句检查条件:使用 if 语句检查条件,而不是 for 循环,无法处理虚假唤醒。
  • 复制条件变量:条件变量是结构体,不是引用类型,复制后会创建一个新的实例。
  • 死锁:多个 Goroutine 循环等待对方释放锁,或条件变量的使用不当导致 Goroutine 永远等待。
  • 唤醒丢失:在条件满足后、唤醒之前,没有持有锁,导致唤醒信号丢失。
  • 过度使用条件变量:对于简单的并发场景,过度使用条件变量而不是使用通道。

11. 拓展参考资料

11.1 官方文档链接

11.2 进阶学习路径建议

  • 并发编程基础:学习 Goroutine、Channel、WaitGroup 等基本概念。
  • 同步原语:深入学习 Mutex、RWMutex、Once、Cond 等同步原语。
  • 并发模式:学习生产者-消费者、工作池、扇入扇出等并发模式。
  • 性能优化:学习如何减少锁竞争、使用无锁数据结构、优化并发性能。
  • 分布式并发:学习分布式系统中的并发控制和一致性协议。
  • 并发测试:学习如何编写并发测试,验证系统的并发性能和正确性。

11.3 相关资源