Appearance
条件变量 Cond
1. 概述
条件变量(Cond)是 Go 语言中一种特殊的同步原语,它允许 Goroutine 在特定条件满足时被唤醒。条件变量通常与互斥锁(Mutex)配合使用,用于解决生产者-消费者问题、等待某个条件成立等场景。
在整个 Go 语言课程体系中,条件变量是并发编程的重要组件之一,与 Mutex、RWMutex、通道等一起构成了 Go 语言并发模型的核心。掌握条件变量的使用和原理,对于构建复杂的并发系统至关重要。
2. 基本概念
2.1 语法
2.1.1 基本用法
go
import "sync"
// 创建条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)
// 等待条件满足
mu.Lock()
for !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()
// 唤醒一个等待的 Goroutine
cond.Signal()
// 唤醒所有等待的 Goroutine
cond.Broadcast()2.1.2 示例代码
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
var wg sync.WaitGroup
// 启动一个 Goroutine 等待条件满足
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
for !ready {
fmt.Println("Waiting for ready...")
cond.Wait()
}
fmt.Println("Condition met, proceeding...")
mu.Unlock()
}()
// 等待一段时间后设置条件为 true 并唤醒等待的 Goroutine
time.Sleep(time.Second)
mu.Lock()
ready = true
fmt.Println("Setting ready to true and signaling...")
cond.Signal()
mu.Unlock()
wg.Wait()
fmt.Println("All goroutines completed")
}2.2 语义
- Wait():等待条件满足。调用时会自动释放锁,当被唤醒时会重新获取锁。
- Signal():唤醒一个等待的 Goroutine。
- Broadcast():唤醒所有等待的 Goroutine。
- 与 Mutex 配合:条件变量必须与互斥锁配合使用,确保对条件的检查和修改是原子的。
- 零值不可用:条件变量的零值不可用,必须使用
sync.NewCond创建。 - 不可复制:条件变量是结构体,不是引用类型,不要复制使用中的条件变量。
2.3 规范
- 命名规范:条件变量通常命名为
cond。 - 使用顺序:
- 获取互斥锁。
- 检查条件是否满足,如果不满足,调用
Wait()。 - 处理逻辑。
- 释放互斥锁。
- 在适当的时候调用
Signal()或Broadcast()唤醒等待的 Goroutine。
- 循环检查:使用
for循环检查条件,而不是if语句,避免虚假唤醒。 - 锁的管理:确保在调用
Wait()前获取锁,在调用Signal()或Broadcast()时也持有锁。 - 不可复制:通过指针传递条件变量,避免复制它。
3. 原理深度解析
3.1 Cond 结构体
Cond 的底层实现是一个结构体,在 Go 语言中,其结构如下:
go
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}其中:
L:一个 Locker 接口类型,通常是sync.Mutex或sync.RWMutex。notifyList:一个等待队列,存储等待的 Goroutine。checker:用于检测条件变量是否被复制。
3.2 Wait 方法实现
Wait 方法的主要步骤:
- 检查条件变量是否被复制。
- 将当前 Goroutine 添加到等待队列。
- 释放锁。
- 阻塞当前 Goroutine,等待被唤醒。
- 当被唤醒时,重新获取锁。
- 返回。
3.3 Signal 方法实现
Signal 方法的主要步骤:
- 检查条件变量是否被复制。
- 从等待队列中取出一个 Goroutine。
- 唤醒该 Goroutine。
3.4 Broadcast 方法实现
Broadcast 方法的主要步骤:
- 检查条件变量是否被复制。
- 从等待队列中取出所有 Goroutine。
- 唤醒所有 Goroutine。
3.5 内存模型
条件变量遵循 Go 语言的内存模型,确保以下顺序:
- 在调用
Signal()或Broadcast()之前的所有操作,发生在被唤醒的 Goroutine 从Wait()返回之后。 - 在调用
Wait()之前的所有操作,发生在其他 Goroutine 调用Signal()或Broadcast()之前。
3.6 虚假唤醒
虚假唤醒是指 Goroutine 从 Wait() 返回,但条件并未满足的情况。这是由于操作系统或硬件的原因导致的,因此必须使用 for 循环来检查条件,而不是 if 语句。
go
// 错误示例:使用 if 语句检查条件
mu.Lock()
if !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()
// 正确示例:使用 for 循环检查条件
mu.Lock()
for !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()4. 常见错误与踩坑点
4.1 忘记获取锁
错误表现:运行时错误或竞态条件。
产生原因:在调用 Wait()、Signal() 或 Broadcast() 时没有获取锁。
解决方案:确保在调用这些方法时持有锁。
go
// 错误示例:忘记获取锁
cond.Wait() // 错误:没有获取锁
// 正确示例:获取锁后调用
mu.Lock()
for !condition {
cond.Wait()
}
mu.Unlock()4.2 使用 if 语句检查条件
错误表现:虚假唤醒导致条件未满足时继续执行。
产生原因:使用 if 语句检查条件,而不是 for 循环,无法处理虚假唤醒。
解决方案:使用 for 循环检查条件,确保在虚假唤醒时重新检查。
go
// 错误示例:使用 if 语句
mu.Lock()
if !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()
// 正确示例:使用 for 循环
mu.Lock()
for !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()4.3 复制条件变量
错误表现:运行时错误或不可预期的行为。
产生原因:条件变量是结构体,不是引用类型,复制后会创建一个新的实例,与原实例状态无关。
解决方案:通过指针传递条件变量,避免复制它。
go
// 错误示例:复制条件变量
func worker(cond sync.Cond) { // 复制条件变量
// 处理逻辑
}
// 正确示例:通过指针传递
func worker(cond *sync.Cond) { // 通过指针传递
// 处理逻辑
}4.4 死锁
错误表现:程序卡住,无法继续执行。
产生原因:
- 多个 Goroutine 循环等待对方释放锁。
- 条件变量的使用不当,导致 Goroutine 永远等待。
解决方案:
- 保持一致的锁获取顺序。
- 确保条件最终会被满足,避免无限等待。
- 使用超时机制,避免无限等待。
go
// 错误示例:可能导致死锁
func producer() {
mu.Lock()
// 生产数据
data = "produced"
cond.Signal()
mu.Unlock()
}
func consumer() {
mu.Lock()
for data == "" {
cond.Wait()
}
// 消费数据
data = ""
mu.Unlock()
}
// 正确示例:确保条件最终会被满足
func producer() {
mu.Lock()
// 生产数据
data = "produced"
cond.Signal()
mu.Unlock()
}
func consumer() {
mu.Lock()
for data == "" {
cond.Wait()
}
// 消费数据
data = ""
mu.Unlock()
}4.5 唤醒丢失
错误表现:Goroutine 永远等待,无法被唤醒。
产生原因:在条件满足后、唤醒之前,没有持有锁,导致唤醒信号丢失。
解决方案:确保在修改条件和唤醒 Goroutine 时持有锁。
go
// 错误示例:唤醒丢失
mu.Lock()
ready = true
mu.Unlock()
cond.Signal() // 错误:唤醒时没有持有锁
// 正确示例:唤醒时持有锁
mu.Lock()
ready = true
cond.Signal() // 正确:唤醒时持有锁
mu.Unlock()4.6 过度使用条件变量
错误表现:代码复杂度增加,可读性下降。
产生原因:对于简单的并发场景,过度使用条件变量而不是使用通道。
解决方案:对于简单的并发场景,考虑使用通道,而不是条件变量。
go
// 错误示例:过度使用条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string
func producer() {
mu.Lock()
data = "produced"
cond.Signal()
mu.Unlock()
}
func consumer() {
mu.Lock()
for data == "" {
cond.Wait()
}
fmt.Println(data)
data = ""
mu.Unlock()
}
// 正确示例:使用通道
ch := make(chan string)
func producer() {
ch <- "produced"
}
func consumer() {
data := <-ch
fmt.Println(data)
}5. 常见应用场景
5.1 生产者-消费者模式
场景描述:一个或多个生产者生成数据,一个或多个消费者消费数据,需要同步生产和消费的节奏。
使用方法:使用条件变量来通知消费者数据已生产,消费者等待数据可用。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string
var wg sync.WaitGroup
// 消费者
wg.Add(2)
for i := 0; i < 2; i++ {
go func(id int) {
defer wg.Done()
for {
mu.Lock()
for data == "" {
fmt.Printf("Consumer %d: waiting for data\n", id)
cond.Wait()
}
fmt.Printf("Consumer %d: got data: %s\n", id, data)
data = ""
mu.Unlock()
time.Sleep(time.Millisecond * 500)
}
}(i)
}
// 生产者
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
mu.Lock()
data = fmt.Sprintf("data-%d", i)
fmt.Printf("Producer: produced %s\n", data)
cond.Broadcast()
mu.Unlock()
}
wg.Wait()
}5.2 等待某个条件成立
场景描述:需要等待某个条件成立后再继续执行,如等待服务启动完成、等待资源可用等。
使用方法:使用条件变量来等待条件成立,当条件成立时唤醒等待的 Goroutine。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
var wg sync.WaitGroup
// 等待服务启动
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
for !ready {
fmt.Println("Waiting for service to start...")
cond.Wait()
}
fmt.Println("Service started, proceeding...")
mu.Unlock()
}()
// 模拟服务启动
time.Sleep(time.Second * 2)
mu.Lock()
ready = true
fmt.Println("Service started, notifying waiters...")
cond.Signal()
mu.Unlock()
wg.Wait()
fmt.Println("All goroutines completed")
}5.3 线程池
场景描述:实现一个线程池,当有任务时唤醒工作线程,当没有任务时工作线程等待。
使用方法:使用条件变量来通知工作线程有任务可用,工作线程等待任务。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ID int
}
type Pool struct {
tasks []Task
mu sync.Mutex
cond *sync.Cond
workers int
wg sync.WaitGroup
shutdown bool
}
func NewPool(workers int) *Pool {
p := &Pool{
tasks: make([]Task, 0),
workers: workers,
shutdown: false,
}
p.cond = sync.NewCond(&p.mu)
return p
}
func (p *Pool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
func (p *Pool) worker(id int) {
defer p.wg.Done()
for {
p.mu.Lock()
for len(p.tasks) == 0 && !p.shutdown {
fmt.Printf("Worker %d: waiting for tasks\n", id)
p.cond.Wait()
}
if p.shutdown {
fmt.Printf("Worker %d: shutting down\n", id)
p.mu.Unlock()
return
}
task := p.tasks[0]
p.tasks = p.tasks[1:]
p.mu.Unlock()
fmt.Printf("Worker %d: processing task %d\n", id, task.ID)
time.Sleep(time.Millisecond * 500)
}
}
func (p *Pool) AddTask(task Task) {
p.mu.Lock()
defer p.mu.Unlock()
p.tasks = append(p.tasks, task)
fmt.Printf("Added task %d\n", task.ID)
p.cond.Signal()
}
func (p *Pool) Shutdown() {
p.mu.Lock()
defer p.mu.Unlock()
p.shutdown = true
p.cond.Broadcast()
p.wg.Wait()
fmt.Println("Pool shutdown completed")
}
func main() {
pool := NewPool(3)
pool.Start()
// 添加任务
for i := 0; i < 10; i++ {
pool.AddTask(Task{ID: i})
time.Sleep(time.Millisecond * 100)
}
// 关闭线程池
time.Sleep(time.Second * 2)
pool.Shutdown()
}5.4 读写锁的实现
场景描述:实现一个读写锁,允许多个读操作同时进行,写操作独占。
使用方法:使用条件变量来管理读锁和写锁的状态,当读锁释放时通知等待的写锁,当写锁释放时通知等待的读锁。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
type RWMutex struct {
mu sync.Mutex
cond *sync.Cond
readers int
writer bool
readWait int
writeWait int
}
func NewRWMutex() *RWMutex {
rw := &RWMutex{}
rw.cond = sync.NewCond(&rw.mu)
return rw
}
func (rw *RWMutex) RLock() {
rw.mu.Lock()
defer rw.mu.Unlock()
for rw.writer || rw.writeWait > 0 {
rw.readWait++
rw.cond.Wait()
rw.readWait--
}
rw.readers++
}
func (rw *RWMutex) RUnlock() {
rw.mu.Lock()
defer rw.mu.Unlock()
rw.readers--
if rw.readers == 0 && rw.writeWait > 0 {
rw.cond.Broadcast()
}
}
func (rw *RWMutex) Lock() {
rw.mu.Lock()
defer rw.mu.Unlock()
for rw.writer || rw.readers > 0 {
rw.writeWait++
rw.cond.Wait()
rw.writeWait--
}
rw.writer = true
}
func (rw *RWMutex) Unlock() {
rw.mu.Lock()
defer rw.mu.Unlock()
rw.writer = false
rw.cond.Broadcast()
}
func main() {
rw := NewRWMutex()
var wg sync.WaitGroup
// 启动 10 个读 Goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
rw.RLock()
defer rw.RUnlock()
fmt.Printf("Reader %d: reading\n", id)
time.Sleep(time.Millisecond * 100)
}(i)
}
// 启动 2 个写 Goroutine
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Millisecond * 50)
rw.Lock()
defer rw.Unlock()
fmt.Printf("Writer %d: writing\n", id)
time.Sleep(time.Millisecond * 100)
}(i)
}
wg.Wait()
fmt.Println("All goroutines completed")
}5.5 条件变量的超时等待
场景描述:需要在等待条件成立时设置超时,避免无限等待。
使用方法:结合通道和 select 语句实现条件变量的超时等待。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
// 实现超时等待
timeout := time.After(time.Second * 2)
for !ready {
// 创建一个通道来接收信号
ch := make(chan struct{})
go func() {
cond.Wait()
close(ch)
}()
select {
case <-ch:
// 条件满足,继续检查
case <-timeout:
// 超时,退出
fmt.Println("Timeout waiting for condition")
return
}
}
fmt.Println("Condition met, proceeding...")
}()
// 模拟条件永远不满足
// time.Sleep(time.Second * 3)
// mu.Lock()
// ready = true
// cond.Signal()
// mu.Unlock()
wg.Wait()
fmt.Println("All goroutines completed")
}6. 企业级进阶应用场景
6.1 分布式锁
场景描述:在分布式系统中,需要一个分布式锁来协调多个节点的操作。
使用方法:使用条件变量来实现分布式锁的本地部分,结合分布式协调服务(如 ZooKeeper、etcd)实现全局锁。
示例代码:
go
package distributed
import (
"sync"
"time"
)
type DistributedLock struct {
mu sync.Mutex
cond *sync.Cond
locked bool
owner string
lease time.Duration
renewCh chan struct{}
}
func NewDistributedLock(owner string, lease time.Duration) *DistributedLock {
dl := &DistributedLock{
owner: owner,
lease: lease,
renewCh: make(chan struct{}),
}
dl.cond = sync.NewCond(&dl.mu)
return dl
}
func (dl *DistributedLock) Lock() error {
dl.mu.Lock()
defer dl.mu.Unlock()
for dl.locked {
dl.cond.Wait()
}
// 在这里向分布式协调服务申请锁
// 例如,向 ZooKeeper 或 etcd 创建临时节点
dl.locked = true
// 启动租约续期 Goroutine
go dl.renewLease()
return nil
}
func (dl *DistributedLock) Unlock() error {
dl.mu.Lock()
defer dl.mu.Unlock()
if !dl.locked {
return nil
}
// 在这里向分布式协调服务释放锁
// 例如,删除 ZooKeeper 或 etcd 中的临时节点
dl.locked = false
close(dl.renewCh)
dl.cond.Signal()
return nil
}
func (dl *DistributedLock) renewLease() {
ticker := time.NewTicker(dl.lease / 3)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 续期租约
// 例如,更新 ZooKeeper 或 etcd 中临时节点的过期时间
case <-dl.renewCh:
return
}
}
}6.2 生产者-消费者模式的高级实现
场景描述:在企业级应用中,需要一个高性能的生产者-消费者模式,支持动态调整消费者数量、批量处理等功能。
使用方法:使用条件变量来管理生产者和消费者的同步,结合通道和线程池实现高级功能。
示例代码:
go
package producerconsumer
import (
"sync"
"time"
)
type Task struct {
ID int
Payload interface{}
}
type ProducerConsumer struct {
tasks []Task
mu sync.Mutex
cond *sync.Cond
workers int
maxWorkers int
wg sync.WaitGroup
shutdown bool
batchSize int
}
func NewProducerConsumer(initialWorkers, maxWorkers, batchSize int) *ProducerConsumer {
pc := &ProducerConsumer{
tasks: make([]Task, 0),
workers: initialWorkers,
maxWorkers: maxWorkers,
batchSize: batchSize,
shutdown: false,
}
pc.cond = sync.NewCond(&pc.mu)
return pc
}
func (pc *ProducerConsumer) Start() {
for i := 0; i < pc.workers; i++ {
pc.wg.Add(1)
go pc.worker(i)
}
}
func (pc *ProducerConsumer) worker(id int) {
defer pc.wg.Done()
for {
pc.mu.Lock()
for len(pc.tasks) == 0 && !pc.shutdown {
pc.cond.Wait()
}
if pc.shutdown {
pc.mu.Unlock()
return
}
// 批量获取任务
batchSize := pc.batchSize
if len(pc.tasks) < batchSize {
batchSize = len(pc.tasks)
}
batch := make([]Task, batchSize)
copy(batch, pc.tasks[:batchSize])
pc.tasks = pc.tasks[batchSize:]
pc.mu.Unlock()
// 处理任务
for _, task := range batch {
// 处理任务逻辑
time.Sleep(time.Millisecond * 100)
}
}
}
func (pc *ProducerConsumer) AddTask(task Task) {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.tasks = append(pc.tasks, task)
// 动态调整消费者数量
if len(pc.tasks) > pc.workers*pc.batchSize && pc.workers < pc.maxWorkers {
pc.workers++
pc.wg.Add(1)
go pc.worker(pc.workers - 1)
}
pc.cond.Signal()
}
func (pc *ProducerConsumer) Shutdown() {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.shutdown = true
pc.cond.Broadcast()
pc.wg.Wait()
}6.3 线程安全的阻塞队列
场景描述:在企业级应用中,需要一个线程安全的阻塞队列,支持入队、出队、大小查询等操作。
使用方法:使用条件变量来实现队列的阻塞操作,当队列为空时阻塞出队操作,当队列满时阻塞入队操作。
示例代码:
go
package queue
import (
"sync"
)
type BlockingQueue struct {
items []interface{}
capacity int
mu sync.Mutex
cond *sync.Cond
}
func NewBlockingQueue(capacity int) *BlockingQueue {
bq := &BlockingQueue{
items: make([]interface{}, 0, capacity),
capacity: capacity,
}
bq.cond = sync.NewCond(&bq.mu)
return bq
}
func (bq *BlockingQueue) Enqueue(item interface{}) {
bq.mu.Lock()
defer bq.mu.Unlock()
for len(bq.items) >= bq.capacity {
bq.cond.Wait()
}
bq.items = append(bq.items, item)
bq.cond.Signal()
}
func (bq *BlockingQueue) Dequeue() interface{} {
bq.mu.Lock()
defer bq.mu.Unlock()
for len(bq.items) == 0 {
bq.cond.Wait()
}
item := bq.items[0]
bq.items = bq.items[1:]
bq.cond.Signal()
return item
}
func (bq *BlockingQueue) Size() int {
bq.mu.Lock()
defer bq.mu.Unlock()
return len(bq.items)
}
func (bq *BlockingQueue) IsEmpty() bool {
bq.mu.Lock()
defer bq.mu.Unlock()
return len(bq.items) == 0
}
func (bq *BlockingQueue) IsFull() bool {
bq.mu.Lock()
defer bq.mu.Unlock()
return len(bq.items) >= bq.capacity
}6.4 条件变量的监控和管理
场景描述:在企业级应用中,需要监控和管理条件变量的使用情况,如等待时间、唤醒次数等。
使用方法:封装条件变量,添加监控和管理功能。
示例代码:
go
package monitor
import (
"sync"
"time"
)
type MonitoredCond struct {
cond *sync.Cond
waitCount int64
signalCount int64
broadcastCount int64
maxWaitTime time.Duration
totalWaitTime time.Duration
mu sync.Mutex
}
func NewMonitoredCond(l sync.Locker) *MonitoredCond {
return &MonitoredCond{
cond: sync.NewCond(l),
}
}
func (mc *MonitoredCond) Wait() {
start := time.Now()
mc.cond.Wait()
duration := time.Since(start)
mc.mu.Lock()
defer mc.mu.Unlock()
mc.waitCount++
mc.totalWaitTime += duration
if duration > mc.maxWaitTime {
mc.maxWaitTime = duration
}
}
func (mc *MonitoredCond) Signal() {
mc.cond.Signal()
mc.mu.Lock()
defer mc.mu.Unlock()
mc.signalCount++
}
func (mc *MonitoredCond) Broadcast() {
mc.cond.Broadcast()
mc.mu.Lock()
defer mc.mu.Unlock()
mc.broadcastCount++
}
func (mc *MonitoredCond) Stats() map[string]interface{} {
mc.mu.Lock()
defer mc.mu.Unlock()
return map[string]interface{}{
"waitCount": mc.waitCount,
"signalCount": mc.signalCount,
"broadcastCount": mc.broadcastCount,
"maxWaitTime": mc.maxWaitTime,
"totalWaitTime": mc.totalWaitTime,
"avgWaitTime": mc.totalWaitTime / time.Duration(mc.waitCount),
}
}6.5 条件变量在并发测试中的应用
场景描述:在企业级应用中,需要编写并发测试,验证系统的并发性能和正确性。
使用方法:使用条件变量来协调测试 Goroutine,确保测试的正确性和可重复性。
示例代码:
go
package concurrency
import (
"sync"
"testing"
"time"
)
func TestConcurrentAccess(t *testing.T) {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
var wg sync.WaitGroup
var counter int
// 启动 1000 个 Goroutine 并发访问
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
for !ready {
cond.Wait()
}
counter++
mu.Unlock()
}()
}
// 等待所有 Goroutine 就绪
time.Sleep(time.Millisecond * 100)
// 通知所有 Goroutine 开始
mu.Lock()
ready = true
cond.Broadcast()
mu.Unlock()
wg.Wait()
if counter != 1000 {
t.Errorf("Expected counter to be 1000, got %d", counter)
}
}7. 行业最佳实践
7.1 始终使用 for 循环检查条件
实践内容:使用 for 循环检查条件,而不是 if 语句,避免虚假唤醒。
推荐理由:虚假唤醒是操作系统或硬件的正常行为,使用 for 循环可以确保在虚假唤醒时重新检查条件。
示例:
go
// 正确示例:使用 for 循环
mu.Lock()
for !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()7.2 确保在持有锁时调用 Wait、Signal 和 Broadcast
实践内容:确保在调用 Wait()、Signal() 和 Broadcast() 时持有锁。
推荐理由:这些方法需要在持有锁的情况下调用,否则会导致运行时错误或竞态条件。
示例:
go
// 正确示例:持有锁时调用
mu.Lock()
cond.Signal()
mu.Unlock()7.3 合理设计条件变量的使用场景
实践内容:只在需要等待特定条件成立的场景中使用条件变量,对于简单的并发场景,考虑使用通道。
推荐理由:条件变量的使用复杂度较高,对于简单的并发场景,通道更加简洁和安全。
示例:
go
// 简单场景使用通道
ch := make(chan string)
func producer() {
ch <- "produced"
}
func consumer() {
data := <-ch
fmt.Println(data)
}
// 复杂场景使用条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string
func producer() {
mu.Lock()
data = "produced"
cond.Signal()
mu.Unlock()
}
func consumer() {
mu.Lock()
for data == "" {
cond.Wait()
}
fmt.Println(data)
data = ""
mu.Unlock()
}7.4 避免在条件变量的等待中执行耗时操作
实践内容:避免在持有锁的情况下执行耗时操作,尽量减少锁的持有时间。
推荐理由:长时间持有锁会导致其他 Goroutine 无法获取锁,降低并发性能。
示例:
go
// 错误示例:在持有锁时执行耗时操作
mu.Lock()
for !condition {
cond.Wait()
}
// 执行耗时操作
processData() // 错误:在持有锁时执行耗时操作
mu.Unlock()
// 正确示例:释放锁后执行耗时操作
mu.Lock()
for !condition {
cond.Wait()
}
// 保存数据
localData := data
mu.Unlock()
// 执行耗时操作
processData(localData) // 正确:释放锁后执行耗时操作7.5 监控条件变量的使用情况
实践内容:在生产环境中监控条件变量的使用情况,如等待时间、唤醒次数等。
推荐理由:监控条件变量的使用情况可以帮助发现潜在的性能问题和死锁风险。
示例:
go
// 使用监控包装器
mc := NewMonitoredCond(&mu)
// 定期输出统计信息
go func() {
for {
time.Sleep(time.Minute)
stats := mc.Stats()
fmt.Printf("Cond stats: %v\n", stats)
}
}()7.6 合理设置超时机制
实践内容:为条件变量的等待设置超时机制,避免无限等待。
推荐理由:设置超时机制可以避免因条件永远不满足而导致的 Goroutine 泄漏。
示例:
go
// 实现超时等待
mu.Lock()
timeout := time.After(time.Second * 5)
for !condition {
ch := make(chan struct{})
go func() {
cond.Wait()
close(ch)
}()
select {
case <-ch:
// 条件满足,继续检查
case <-timeout:
// 超时,退出
mu.Unlock()
return errors.New("timeout waiting for condition")
}
}
// 处理逻辑
mu.Unlock()7.7 避免嵌套条件变量
实践内容:避免在一个条件变量的等待中使用另一个条件变量,减少复杂度。
推荐理由:嵌套条件变量会增加代码复杂度,容易导致死锁。
示例:
go
// 错误示例:嵌套条件变量
var mu1, mu2 sync.Mutex
cond1 := sync.NewCond(&mu1)
cond2 := sync.NewCond(&mu2)
func worker() {
mu1.Lock()
for !condition1 {
cond1.Wait()
mu2.Lock()
for !condition2 {
cond2.Wait()
}
// 处理逻辑
mu2.Unlock()
}
mu1.Unlock()
}
// 正确示例:避免嵌套
var mu sync.Mutex
cond := sync.NewCond(&mu)
func worker() {
mu.Lock()
for !condition1 || !condition2 {
cond.Wait()
}
// 处理逻辑
mu.Unlock()
}7.8 文档和注释
实践内容:为条件变量的使用添加详细的文档和注释,说明条件的含义和使用方式。
推荐理由:条件变量的使用复杂度较高,详细的文档和注释可以帮助其他开发者理解代码。
示例:
go
// Cond 用于等待服务启动完成
// 条件:ready == true
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
// WaitForServiceStart 等待服务启动完成
func WaitForServiceStart() {
mu.Lock()
defer mu.Unlock()
// 使用 for 循环检查条件,避免虚假唤醒
for !ready {
cond.Wait()
}
}
// StartService 启动服务并通知等待的 Goroutine
func StartService() {
mu.Lock()
defer mu.Unlock()
// 启动服务的逻辑
ready = true
// 唤醒所有等待的 Goroutine
cond.Broadcast()
}8. 常见问题答疑(FAQ)
8.1 条件变量和通道有什么区别?
问题描述:条件变量和通道都是 Go 语言中用于并发同步的工具,它们有什么区别?
回答内容:
- 条件变量:
- 与互斥锁配合使用,用于等待特定条件成立。
- 可以唤醒一个或所有等待的 Goroutine。
- 适用于复杂的同步场景,如生产者-消费者模式、线程池等。
- 使用复杂度较高,需要手动管理锁。
- 通道:
- 用于 Goroutine 间的通信,自动处理同步。
- 可以传递数据,也可以用作信号。
- 适用于简单的同步场景,如信号通知、任务分发等。
- 使用简单,自动管理同步。
示例代码:
go
// 使用条件变量
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string
func producer() {
mu.Lock()
data = "produced"
cond.Signal()
mu.Unlock()
}
func consumer() {
mu.Lock()
for data == "" {
cond.Wait()
}
fmt.Println(data)
data = ""
mu.Unlock()
}
// 使用通道
ch := make(chan string)
func producer() {
ch <- "produced"
}
func consumer() {
data := <-ch
fmt.Println(data)
}8.2 为什么需要使用 for 循环检查条件?
问题描述:为什么使用条件变量时需要使用 for 循环检查条件,而不是 if 语句?
回答内容:
- 虚假唤醒:操作系统或硬件可能会导致 Goroutine 从
Wait()返回,但条件并未满足,这称为虚假唤醒。 - 并发修改:当多个 Goroutine 同时等待同一个条件时,一个 Goroutine 被唤醒并修改了条件,其他被唤醒的 Goroutine 可能会发现条件已经不满足。
- 安全性:使用
for循环可以确保在这些情况下重新检查条件,避免错误地继续执行。
示例代码:
go
// 错误示例:使用 if 语句
mu.Lock()
if !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()
// 正确示例:使用 for 循环
mu.Lock()
for !condition {
cond.Wait()
}
// 处理逻辑
mu.Unlock()8.3 条件变量的 Wait 方法是如何工作的?
问题描述:条件变量的 Wait() 方法是如何工作的?
回答内容:
Wait()方法会自动释放锁,允许其他 Goroutine 获取锁。- 然后,
Wait()方法会阻塞当前 Goroutine,等待被唤醒。 - 当被唤醒时,
Wait()方法会重新获取锁,然后返回。 - 因此,在调用
Wait()之前必须持有锁,返回后仍然持有锁。
示例代码:
go
mu.Lock()
for !condition {
// 调用 Wait() 时会释放锁,被唤醒时会重新获取锁
cond.Wait()
}
// 处理逻辑
mu.Unlock()8.4 Signal 和 Broadcast 有什么区别?
问题描述:条件变量的 Signal() 和 Broadcast() 方法有什么区别?
回答内容:
- Signal():唤醒一个等待的 Goroutine,通常用于只需要一个 Goroutine 处理的场景。
- Broadcast():唤醒所有等待的 Goroutine,通常用于需要多个 Goroutine 处理的场景,或当条件的变化影响所有等待的 Goroutine 时。
示例代码:
go
// 唤醒一个 Goroutine
cond.Signal()
// 唤醒所有 Goroutine
cond.Broadcast()8.5 条件变量是否可以与 RWMutex 配合使用?
问题描述:条件变量是否可以与 RWMutex 配合使用?
回答内容:
- 是的,条件变量可以与
RWMutex配合使用。 - 当使用
RWMutex时,需要确保在调用Wait()、Signal()和Broadcast()时持有写锁,因为这些方法需要修改条件变量的内部状态。
示例代码:
go
var rwmu sync.RWMutex
cond := sync.NewCond(&rwmu)
var ready bool
func WaitForReady() {
rwmu.Lock() // 使用写锁
for !ready {
cond.Wait()
}
rwmu.Unlock()
}
func SetReady() {
rwmu.Lock() // 使用写锁
ready = true
cond.Broadcast()
rwmu.Unlock()
}8.6 如何实现条件变量的超时等待?
问题描述:如何实现条件变量的超时等待,避免无限等待?
回答内容:
- 可以结合通道和
select语句实现条件变量的超时等待。 - 创建一个通道来接收条件满足的信号,同时使用
time.After()创建一个超时通道。 - 使用
select语句等待这两个通道中的任何一个。
示例代码:
go
func WaitWithTimeout(cond *sync.Cond, timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
cond.Wait()
close(ch)
}()
select {
case <-ch:
return true // 条件满足
case <-time.After(timeout):
return false // 超时
}
}
// 使用示例
mu.Lock()
timedOut := false
for !condition {
mu.Unlock()
timedOut = !WaitWithTimeout(cond, time.Second*5)
mu.Lock()
if timedOut {
break
}
}
if timedOut {
// 处理超时
} else {
// 处理条件满足
}
mu.Unlock()9. 实战练习
9.1 基础练习:生产者-消费者模式
题目:使用条件变量实现一个简单的生产者-消费者模式,生产者生成数据,消费者消费数据。
解题思路:
- 使用条件变量来通知消费者数据已生产,消费者等待数据可用。
- 使用互斥锁来保护共享数据。
- 使用
for循环检查条件,避免虚假唤醒。
常见误区:
- 忘记使用
for循环检查条件,导致虚假唤醒。 - 忘记在持有锁时调用
Signal()或Broadcast(),导致唤醒信号丢失。 - 没有正确管理锁的获取和释放,导致死锁。
分步提示:
- 定义共享数据和互斥锁。
- 创建条件变量。
- 实现生产者函数,生成数据并通知消费者。
- 实现消费者函数,等待数据可用并消费数据。
- 启动生产者和消费者 Goroutine。
- 等待所有 Goroutine 完成。
参考代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
var data string
var wg sync.WaitGroup
// 消费者
wg.Add(1)
go func() {
defer wg.Done()
for {
mu.Lock()
for data == "" {
fmt.Println("Consumer: waiting for data")
cond.Wait()
}
fmt.Printf("Consumer: got data: %s\n", data)
data = ""
mu.Unlock()
time.Sleep(time.Millisecond * 500)
}
}()
// 生产者
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
mu.Lock()
data = fmt.Sprintf("data-%d", i)
fmt.Printf("Producer: produced %s\n", data)
cond.Signal()
mu.Unlock()
}
wg.Wait()
}9.2 进阶练习:线程池
题目:使用条件变量实现一个线程池,支持动态添加任务和关闭线程池。
解题思路:
- 使用条件变量来通知工作线程有任务可用,工作线程等待任务。
- 使用互斥锁来保护任务队列和线程池状态。
- 实现任务添加、线程池启动和关闭功能。
常见误区:
- 没有正确处理线程池的关闭逻辑,导致工作线程无法退出。
- 没有使用
for循环检查条件,导致虚假唤醒。 - 任务队列管理不当,导致任务丢失或重复处理。
分步提示:
- 定义任务结构体和线程池结构体。
- 实现线程池的启动方法,创建工作线程。
- 实现工作线程函数,等待任务并处理。
- 实现任务添加方法,将任务加入队列并通知工作线程。
- 实现线程池的关闭方法,通知所有工作线程退出。
- 测试线程池的功能。
参考代码:
go
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ID int
}
type Pool struct {
tasks []Task
mu sync.Mutex
cond *sync.Cond
workers int
wg sync.WaitGroup
shutdown bool
}
func NewPool(workers int) *Pool {
p := &Pool{
tasks: make([]Task, 0),
workers: workers,
shutdown: false,
}
p.cond = sync.NewCond(&p.mu)
return p
}
func (p *Pool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
func (p *Pool) worker(id int) {
defer p.wg.Done()
for {
p.mu.Lock()
for len(p.tasks) == 0 && !p.shutdown {
fmt.Printf("Worker %d: waiting for tasks\n", id)
p.cond.Wait()
}
if p.shutdown {
fmt.Printf("Worker %d: shutting down\n", id)
p.mu.Unlock()
return
}
task := p.tasks[0]
p.tasks = p.tasks[1:]
p.mu.Unlock()
fmt.Printf("Worker %d: processing task %d\n", id, task.ID)
time.Sleep(time.Millisecond * 500)
}
}
func (p *Pool) AddTask(task Task) {
p.mu.Lock()
defer p.mu.Unlock()
p.tasks = append(p.tasks, task)
fmt.Printf("Added task %d\n", task.ID)
p.cond.Signal()
}
func (p *Pool) Shutdown() {
p.mu.Lock()
defer p.mu.Unlock()
p.shutdown = true
p.cond.Broadcast()
p.wg.Wait()
fmt.Println("Pool shutdown completed")
}
func main() {
pool := NewPool(3)
pool.Start()
// 添加任务
for i := 0; i < 10; i++ {
pool.AddTask(Task{ID: i})
time.Sleep(time.Millisecond * 100)
}
// 关闭线程池
time.Sleep(time.Second * 2)
pool.Shutdown()
}9.3 挑战练习:阻塞队列
题目:使用条件变量实现一个阻塞队列,支持入队、出队、大小查询等操作,当队列为空时阻塞出队操作,当队列满时阻塞入队操作。
解题思路:
- 使用条件变量来实现队列的阻塞操作。
- 使用互斥锁来保护队列的状态。
- 实现入队、出队、大小查询等方法。
常见误区:
- 没有正确处理队列满和队列空的情况,导致阻塞操作无法正常工作。
- 没有使用
for循环检查条件,导致虚假唤醒。 - 队列管理不当,导致数据丢失或重复处理。
分步提示:
- 定义阻塞队列结构体,包含队列、容量、互斥锁和条件变量。
- 实现入队方法,当队列满时阻塞。
- 实现出队方法,当队列空时阻塞。
- 实现大小查询、判空、判满等方法。
- 测试阻塞队列的功能。
参考代码:
go
package main
import (
"fmt"
"sync"
"time"
)
type BlockingQueue struct {
items []interface{}
capacity int
mu sync.Mutex
cond *sync.Cond
}
func NewBlockingQueue(capacity int) *BlockingQueue {
bq := &BlockingQueue{
items: make([]interface{}, 0, capacity),
capacity: capacity,
}
bq.cond = sync.NewCond(&bq.mu)
return bq
}
func (bq *BlockingQueue) Enqueue(item interface{}) {
bq.mu.Lock()
defer bq.mu.Unlock()
for len(bq.items) >= bq.capacity {
fmt.Println("Queue full, waiting to enqueue")
bq.cond.Wait()
}
bq.items = append(bq.items, item)
fmt.Printf("Enqueued: %v, queue size: %d\n", item, len(bq.items))
bq.cond.Signal()
}
func (bq *BlockingQueue) Dequeue() interface{} {
bq.mu.Lock()
defer bq.mu.Unlock()
for len(bq.items) == 0 {
fmt.Println("Queue empty, waiting to dequeue")
bq.cond.Wait()
}
item := bq.items[0]
bq.items = bq.items[1:]
fmt.Printf("Dequeued: %v, queue size: %d\n", item, len(bq.items))
bq.cond.Signal()
return item
}
func (bq *BlockingQueue) Size() int {
bq.mu.Lock()
defer bq.mu.Unlock()
return len(bq.items)
}
func (bq *BlockingQueue) IsEmpty() bool {
bq.mu.Lock()
defer bq.mu.Unlock()
return len(bq.items) == 0
}
func (bq *BlockingQueue) IsFull() bool {
bq.mu.Lock()
defer bq.mu.Unlock()
return len(bq.items) >= bq.capacity
}
func main() {
bq := NewBlockingQueue(5)
var wg sync.WaitGroup
// 启动 2 个生产者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
item := fmt.Sprintf("item-%d-%d", id, j)
bq.Enqueue(item)
time.Sleep(time.Millisecond * 100)
}
}(i)
}
// 启动 2 个消费者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 10; j++ {
item := bq.Dequeue()
fmt.Printf("Consumer %d got: %v\n", id, item)
time.Sleep(time.Millisecond * 200)
}
}(i)
}
wg.Wait()
fmt.Println("All goroutines completed")
}10. 知识点总结
10.1 核心要点
- 条件变量的特点:与互斥锁配合使用,用于等待特定条件成立,支持唤醒一个或所有等待的 Goroutine。
- 基本操作:
Wait():等待条件满足,自动释放和重新获取锁。Signal():唤醒一个等待的 Goroutine。Broadcast():唤醒所有等待的 Goroutine。
- 实现原理:使用等待队列存储等待的 Goroutine,通过信号机制唤醒 Goroutine。
- 内存模型:确保操作的顺序性,保证并发操作的正确性。
- 使用场景:生产者-消费者模式、线程池、阻塞队列、分布式锁等。
- 虚假唤醒:需要使用
for循环检查条件,避免虚假唤醒。
10.2 易错点回顾
- 忘记获取锁:在调用
Wait()、Signal()或Broadcast()时没有获取锁。 - 使用 if 语句检查条件:使用
if语句检查条件,而不是for循环,无法处理虚假唤醒。 - 复制条件变量:条件变量是结构体,不是引用类型,复制后会创建一个新的实例。
- 死锁:多个 Goroutine 循环等待对方释放锁,或条件变量的使用不当导致 Goroutine 永远等待。
- 唤醒丢失:在条件满足后、唤醒之前,没有持有锁,导致唤醒信号丢失。
- 过度使用条件变量:对于简单的并发场景,过度使用条件变量而不是使用通道。
11. 拓展参考资料
11.1 官方文档链接
11.2 进阶学习路径建议
- 并发编程基础:学习 Goroutine、Channel、WaitGroup 等基本概念。
- 同步原语:深入学习 Mutex、RWMutex、Once、Cond 等同步原语。
- 并发模式:学习生产者-消费者、工作池、扇入扇出等并发模式。
- 性能优化:学习如何减少锁竞争、使用无锁数据结构、优化并发性能。
- 分布式并发:学习分布式系统中的并发控制和一致性协议。
- 并发测试:学习如何编写并发测试,验证系统的并发性能和正确性。
11.3 相关资源
- 《Go 并发编程实战》
- 《Go 语言程序设计》
- Go by Example - Cond
- Go 语言并发编程
- The Go Memory Model
