Appearance
同步原语 sync
1. 概述
sync 包是 Go 语言中提供同步原语的核心包,包含了一系列用于并发编程的同步工具。这些同步原语是构建线程安全程序的基础,能够帮助开发者解决并发访问共享资源时的竞态条件问题。
在整个 Go 语言课程体系中,sync 包是并发编程的重要组成部分,与 Goroutine、通道一起构成了 Go 语言并发模型的核心。掌握 sync 包中的同步原语,对于构建可靠、高效的并发系统至关重要。
2. 基本概念
2.1 语法
2.1.1 基本用法
go
import "sync"
// 创建互斥锁
var mu sync.Mutex
// 加锁
mu.Lock()
// 访问共享资源
// ...
// 解锁
mu.Unlock()
// 创建读写锁
var rwmu sync.RWMutex
// 读锁定
rwmu.RLock()
// 读取共享资源
// ...
// 读解锁
rwmu.RUnlock()
// 写锁定
rwmu.Lock()
// 修改共享资源
// ...
// 写解锁
rwmu.Unlock()
// 创建 WaitGroup
var wg sync.WaitGroup
// 增加计数
wg.Add(1)
// 启动 Goroutine
// ...
// 减少计数
wg.Done()
// 等待完成
wg.Wait()
// 创建 Once
var once sync.Once
// 执行只执行一次的操作
once.Do(func() {
// 初始化操作
})
// 创建 Cond
var cond = sync.NewCond(&sync.Mutex{})
// 等待条件
cond.L.Lock()
for !condition {
cond.Wait()
}
// 访问共享资源
cond.L.Unlock()
// 通知等待的 Goroutine
cond.Signal() // 通知一个
cond.Broadcast() // 通知所有2.1.2 示例代码
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
// 示例 1: Mutex
var mu sync.Mutex
counter := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
counter++
}()
}
wg.Wait()
fmt.Printf("Mutex example: counter = %d\n", counter)
// 示例 2: RWMutex
var rwmu sync.RWMutex
data := "initial"
// 多个读操作
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
rwmu.RLock()
defer rwmu.RUnlock()
fmt.Printf("Reader %d: data = %s\n", id, data)
time.Sleep(time.Millisecond * 100)
}(i)
}
// 一个写操作
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 50) // 确保读操作先开始
rwmu.Lock()
defer rwmu.Unlock()
data = "updated"
fmt.Println("Writer: data updated")
}()
wg.Wait()
fmt.Printf("RWMutex example: final data = %s\n", data)
// 示例 3: Once
var once sync.Once
initialized := false
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
once.Do(func() {
initialized = true
fmt.Println("Once: initialized")
})
fmt.Printf("Goroutine %d: initialized = %v\n", id, initialized)
}(i)
}
wg.Wait()
fmt.Println("All examples completed")
}2.2 语义
- Mutex:互斥锁,确保同一时刻只有一个 Goroutine 可以访问共享资源。
- RWMutex:读写锁,允许多个读操作同时进行,但写操作会阻塞所有读写操作。
- WaitGroup:等待一组 Goroutine 完成,用于协调多个 Goroutine 的执行。
- Once:保证某个操作只执行一次,用于初始化等场景。
- Cond:条件变量,用于等待或宣布某个条件的发生。
- Pool:对象池,用于存储和复用临时对象,减少内存分配。
- Map:并发安全的映射,提供了线程安全的键值对存储。
2.3 规范
- 命名规范:同步原语变量通常使用简短的名称,如
mu、rwmu、wg、once等。 - 使用顺序:
- 对于 Mutex 和 RWMutex,总是先加锁,后访问共享资源,最后解锁。
- 对于 WaitGroup,先调用
Add(),然后启动 Goroutine,在 Goroutine 中调用Done(),最后调用Wait()。 - 对于 Once,只需要调用
Do()方法。 - 对于 Cond,先获取锁,然后检查条件,等待条件满足,最后解锁。
- 错误处理:同步原语本身不返回错误,但需要确保正确使用,避免死锁、活锁等问题。
- 性能考虑:根据场景选择合适的同步原语,如读多写少的场景使用 RWMutex,需要等待多个 Goroutine 完成时使用 WaitGroup。
3. 原理深度解析
3.1 Mutex 原理
Mutex 是最基本的同步原语,用于保护共享资源。其底层实现使用了 CAS(Compare-And-Swap)操作和信号量:
- 状态标记:Mutex 内部有一个状态标记,用于表示锁是否被持有。
- CAS 操作:尝试通过 CAS 操作获取锁,如果成功则设置状态为锁定。
- 信号量:如果锁被占用,等待的 Goroutine 会被放入等待队列,通过信号量唤醒。
- 自旋锁:在尝试获取锁时,会先进行短暂的自旋,减少上下文切换的开销。
3.2 RWMutex 原理
RWMutex 是读写锁,允许多个读操作同时进行,提高并发性能:
- 状态标记:RWMutex 内部维护了读计数器和写锁状态。
- 读锁定:读锁定时,检查是否有写锁,如果没有则增加读计数器。
- 写锁定:写锁定时,检查是否有读锁或写锁,如果有则等待。
- 优先级:写操作通常比读操作有更高的优先级,避免写饥饿。
3.3 WaitGroup 原理
WaitGroup 用于等待一组 Goroutine 完成:
- 计数器:内部维护一个计数器,初始值为 0。
- Add 方法:增加计数器的值。
- Done 方法:减少计数器的值,当计数器变为 0 时,唤醒所有等待的 Goroutine。
- Wait 方法:阻塞当前 Goroutine,直到计数器变为 0。
3.4 Once 原理
Once 保证某个操作只执行一次:
- 状态标记:内部维护一个
done标记,表示操作是否已经执行。 - 双重检查锁定:使用双重检查锁定模式,先检查
done标记,再获取锁,提高性能。 - 原子操作:使用原子操作检查和设置
done标记,确保并发安全。
3.5 Cond 原理
Cond 用于等待或宣布某个条件的发生:
- 锁关联:Cond 关联一个 Mutex,用于保护条件变量。
- 等待队列:维护一个等待队列,存储等待条件的 Goroutine。
- Wait 方法:释放锁,将 Goroutine 放入等待队列,等待被唤醒。
- Signal/Broadcast 方法:唤醒等待队列中的 Goroutine。
3.6 Pool 原理
Pool 用于存储和复用临时对象:
- 本地缓存:每个 Goroutine 有本地缓存,减少锁竞争。
- 共享缓存:多个 Goroutine 共享一个全局缓存。
- 垃圾回收:当垃圾回收发生时,Pool 会清空缓存。
3.7 Map 原理
Map 是并发安全的映射:
- 分段锁:内部使用多个分段,每个分段有自己的锁,减少锁竞争。
- 读写分离:读操作不需要加锁,写操作需要加锁。
- 惰性删除:删除操作只是标记为删除,实际删除在后续操作中进行。
4. 常见错误与踩坑点
4.1 死锁
错误表现:程序卡住,无法继续执行。
产生原因:
- 同一 Goroutine 多次获取同一把锁。
- 多个 Goroutine 循环等待对方释放锁。
- 锁的顺序不一致,导致循环等待。
解决方案:
- 确保每个锁都能正确释放,使用
defer语句。 - 保持一致的锁获取顺序。
- 避免在持有锁时调用可能阻塞的函数。
go
// 错误示例:循环等待
var mu1, mu2 sync.Mutex
func goroutine1() {
mu1.Lock()
defer mu1.Unlock()
time.Sleep(time.Millisecond)
mu2.Lock()
defer mu2.Unlock()
fmt.Println("Goroutine 1 completed")
}
func goroutine2() {
mu2.Lock()
defer mu2.Unlock()
time.Sleep(time.Millisecond)
mu1.Lock()
defer mu1.Unlock()
fmt.Println("Goroutine 2 completed")
}
// 正确示例:一致的锁顺序
func goroutine1() {
mu1.Lock()
defer mu1.Unlock()
mu2.Lock()
defer mu2.Unlock()
fmt.Println("Goroutine 1 completed")
}
func goroutine2() {
mu1.Lock()
defer mu1.Unlock()
mu2.Lock()
defer mu2.Unlock()
fmt.Println("Goroutine 2 completed")
}4.2 忘记解锁
错误表现:其他 Goroutine 无法获取锁,导致死锁。
产生原因:在获取锁后,没有对应的解锁操作,或者在解锁前发生了 panic。
解决方案:始终使用 defer 语句来确保解锁,即使发生 panic 也能正确解锁。
go
// 错误示例:忘记解锁
func process() {
mu.Lock()
// 处理逻辑
// 忘记调用 mu.Unlock()
}
// 正确示例:使用 defer 解锁
func process() {
mu.Lock()
defer mu.Unlock()
// 处理逻辑
}4.3 读写锁使用不当
错误表现:读操作和写操作之间的竞争,或者性能问题。
产生原因:
- 在读锁定期间修改共享资源。
- 写操作过于频繁,导致读操作饥饿。
- 不必要地使用读写锁,增加复杂性。
解决方案:
- 确保读锁定期间只读取共享资源,不修改。
- 合理设计读写比例,避免写操作过于频繁。
- 根据实际场景选择合适的锁类型。
go
// 错误示例:读锁定期间修改共享资源
func readAndModify() {
rwmu.RLock()
defer rwmu.RUnlock()
data = "modified" // 错误:读锁定期间修改资源
}
// 正确示例:修改资源时使用写锁定
func readAndModify() {
rwmu.Lock()
defer rwmu.Unlock()
data = "modified" // 正确:写锁定期间修改资源
}4.4 WaitGroup 使用不当
错误表现:Wait() 永远不会返回,或者 panic。
产生原因:
Add()的调用次数与Done()不匹配。- 在
Wait()之后调用Add()。 - 复制 WaitGroup 导致状态不同步。
解决方案:
- 确保
Add()的调用次数与Done()完全匹配。 - 在
Wait()之前完成所有Add()调用。 - 通过指针传递 WaitGroup,避免复制。
go
// 错误示例:Add 和 Done 不匹配
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
fmt.Println("Goroutine 1")
}()
// 忘记启动第二个 Goroutine
wg.Wait() // 永远不会返回
}
// 正确示例:Add 和 Done 匹配
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
fmt.Println("Goroutine 1")
}()
go func() {
defer wg.Done()
fmt.Println("Goroutine 2")
}()
wg.Wait() // 正常返回
}4.5 Once 使用不当
错误表现:初始化操作没有执行,或者执行多次。
产生原因:
- 传递给
Do()的函数发生 panic,导致 Once 认为操作已执行。 - 复制 Once 导致状态不同步。
- 在
Do()中调用同一个 Once 的Do()方法,导致死锁。
解决方案:
- 在
Do()函数中处理 panic,确保初始化操作能够正确完成。 - 通过指针传递 Once,避免复制。
- 避免在
Do()中调用同一个 Once 的Do()方法。
go
// 错误示例:Do 函数发生 panic
func main() {
var once sync.Once
once.Do(func() {
panic("Initialization failed")
})
// 这里不会再次执行初始化
once.Do(func() {
fmt.Println("This won't be executed")
})
}
// 正确示例:处理 panic
func main() {
var once sync.Once
once.Do(func() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Recovered from panic: %v\n", r)
}
}()
panic("Initialization failed")
})
}4.6 Cond 使用不当
错误表现:等待的 Goroutine 永远不会被唤醒,或者出现虚假唤醒。
产生原因:
- 没有在循环中检查条件,导致虚假唤醒。
- 没有正确获取和释放锁。
- 信号丢失,即先发送信号后等待。
解决方案:
- 始终在循环中检查条件,避免虚假唤醒。
- 确保在调用
Wait()前获取锁,在调用Signal()或Broadcast()前持有锁。 - 确保先等待后发送信号。
go
// 错误示例:没有在循环中检查条件
func waitForCondition() {
cond.L.Lock()
if !condition {
cond.Wait() // 错误:可能出现虚假唤醒
}
// 访问共享资源
cond.L.Unlock()
}
// 正确示例:在循环中检查条件
func waitForCondition() {
cond.L.Lock()
for !condition {
cond.Wait() // 正确:处理虚假唤醒
}
// 访问共享资源
cond.L.Unlock()
}5. 常见应用场景
5.1 保护共享资源
场景描述:多个 Goroutine 需要访问和修改同一个共享资源,如计数器、配置等。
使用方法:使用 Mutex 或 RWMutex 保护共享资源。
示例代码:
go
package main
import (
"fmt"
"sync"
)
func main() {
var mu sync.Mutex
counter := 0
var wg sync.WaitGroup
// 1000 个 Goroutine 同时递增计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
counter++
}()
}
wg.Wait()
fmt.Printf("Final counter value: %d\n", counter) // 应该输出 1000
}5.2 读写分离
场景描述:读操作远多于写操作的场景,如配置读取、缓存访问等。
使用方法:使用 RWMutex 允许多个读操作同时进行,提高并发性能。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var rwmu sync.RWMutex
config := "default"
var wg sync.WaitGroup
// 启动 10 个读 Goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
rwmu.RLock()
defer rwmu.RUnlock()
fmt.Printf("Reader %d: config = %s\n", id, config)
time.Sleep(time.Millisecond * 50) // 模拟读操作
}(i)
}
// 启动 2 个写 Goroutine
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Millisecond * 10) // 确保读操作先开始
rwmu.Lock()
defer rwmu.Unlock()
newConfig := fmt.Sprintf("config-%d", id)
config = newConfig
fmt.Printf("Writer %d: updated config to %s\n", id, newConfig)
time.Sleep(time.Millisecond * 100) // 模拟写操作
}(i)
}
wg.Wait()
fmt.Printf("Final config: %s\n", config)
}5.3 等待多个任务完成
场景描述:需要等待多个 Goroutine 完成任务后再继续执行,如并发下载、并行处理等。
使用方法:使用 WaitGroup 等待所有 Goroutine 完成。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func downloadFile(url string, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Downloading %s\n", url)
time.Sleep(time.Second) // 模拟下载时间
fmt.Printf("Downloaded %s\n", url)
}
func main() {
var wg sync.WaitGroup
urls := []string{
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt",
}
for _, url := range urls {
wg.Add(1)
go downloadFile(url, &wg)
}
fmt.Println("Waiting for all downloads to complete...")
wg.Wait()
fmt.Println("All downloads completed")
}5.4 单例模式
场景描述:需要创建一个全局唯一的实例,确保只初始化一次。
使用方法:使用 Once 保证初始化操作只执行一次。
示例代码:
go
package main
import (
"fmt"
"sync"
)
type Singleton struct {
data string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{data: "initialized"}
fmt.Println("Singleton initialized")
})
return instance
}
func main() {
var wg sync.WaitGroup
// 多个 Goroutine 同时获取单例实例
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
s := GetInstance()
fmt.Printf("Goroutine %d: %s\n", id, s.data)
}(i)
}
wg.Wait()
fmt.Println("All goroutines completed")
}5.5 条件等待
场景描述:需要等待某个条件满足后再执行操作,如生产者-消费者模式、资源就绪等。
使用方法:使用 Cond 等待条件满足。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := false
var wg sync.WaitGroup
// 消费者 Goroutine
wg.Add(1)
go func() {
defer wg.Done()
cond.L.Lock()
for !ready {
fmt.Println("Consumer: waiting for ready")
cond.Wait()
}
fmt.Println("Consumer: ready, processing")
cond.L.Unlock()
}()
// 生产者 Goroutine
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Second) // 模拟准备时间
cond.L.Lock()
ready = true
fmt.Println("Producer: ready, notifying")
cond.Signal() // 通知消费者
cond.L.Unlock()
}()
wg.Wait()
fmt.Println("All goroutines completed")
}6. 企业级进阶应用场景
6.1 并发安全的配置管理
场景描述:在大型应用中,需要管理全局配置,支持并发读写和热更新。
使用方法:使用 RWMutex 保护配置,允许多个读操作同时进行,写操作独占。
示例代码:
go
package config
import (
"sync"
)
type Config struct {
Server struct {
Host string
Port int
}
Database struct {
DSN string
}
// 其他配置项
}
var (
config Config
rwmu sync.RWMutex
)
func GetConfig() Config {
rwmu.RLock()
defer rwmu.RUnlock()
return config
}
func UpdateConfig(newConfig Config) {
rwmu.Lock()
defer rwmu.Unlock()
config = newConfig
}
// 热更新配置
func ReloadConfig() error {
// 从文件或环境变量加载配置
newConfig, err := loadConfig()
if err != nil {
return err
}
UpdateConfig(newConfig)
return nil
}6.2 并发安全的缓存
场景描述:在高并发系统中,需要一个线程安全的缓存,支持并发读写。
使用方法:使用 sync.Map 或 RWMutex 保护的 map 实现缓存。
示例代码:
go
package cache
import (
"sync"
"time"
)
type Item struct {
Value interface{}
Expiration int64
}
type Cache struct {
items map[string]Item
mu sync.RWMutex
}
func NewCache() *Cache {
c := &Cache{
items: make(map[string]Item),
}
// 启动清理过期项的 Goroutine
go c.cleanup()
return c
}
func (c *Cache) Set(key string, value interface{}, expiration time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
var exp int64
if expiration > 0 {
exp = time.Now().Add(expiration).UnixNano()
}
c.items[key] = Item{
Value: value,
Expiration: exp,
}
}
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
item, found := c.items[key]
if !found {
c.mu.RUnlock()
return nil, false
}
if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
c.mu.RUnlock()
// 异步删除过期项
go c.Delete(key)
return nil, false
}
c.mu.RUnlock()
return item.Value, true
}
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
}
func (c *Cache) cleanup() {
for {
time.Sleep(time.Minute)
c.mu.Lock()
now := time.Now().UnixNano()
for k, v := range c.items {
if v.Expiration > 0 && now > v.Expiration {
delete(c.items, k)
}
}
c.mu.Unlock()
}
}6.3 工作池
场景描述:在高并发系统中,需要限制并发数量,避免系统资源耗尽。
使用方法:使用 WaitGroup 和通道实现工作池,控制并发数量。
示例代码:
go
package worker
import (
"sync"
)
type WorkerPool struct {
tasks chan Task
wg sync.WaitGroup
size int
}
type Task func()
func NewWorkerPool(size int) *WorkerPool {
return &WorkerPool{
tasks: make(chan Task),
size: size,
}
}
func (p *WorkerPool) Start() {
for i := 0; i < p.size; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for task := range p.tasks {
task()
}
}()
}
}
func (p *WorkerPool) Submit(task Task) {
p.tasks <- task
}
func (p *WorkerPool) Close() {
close(p.tasks)
p.wg.Wait()
}
func main() {
pool := NewWorkerPool(5) // 5 个工作协程
pool.Start()
defer pool.Close()
// 提交 20 个任务
for i := 0; i < 20; i++ {
taskID := i
pool.Submit(func() {
println("Processing task", taskID)
})
}
}6.4 并发安全的计数器
场景描述:在分布式系统中,需要一个高并发的计数器,支持原子操作。
使用方法:使用 sync/atomic 包提供的原子操作,或者使用 Mutex 保护计数器。
示例代码:
go
package counter
import (
"sync/atomic"
)
type AtomicCounter struct {
value int64
}
func NewAtomicCounter() *AtomicCounter {
return &AtomicCounter{value: 0}
}
func (c *AtomicCounter) Increment() int64 {
return atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Decrement() int64 {
return atomic.AddInt64(&c.value, -1)
}
func (c *AtomicCounter) Get() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *AtomicCounter) Set(value int64) {
atomic.StoreInt64(&c.value, value)
}6.5 并发安全的队列
场景描述:在生产者-消费者模式中,需要一个线程安全的队列,支持并发入队和出队。
使用方法:使用 Mutex 和 Cond 实现线程安全的队列。
示例代码:
go
package queue
import (
"sync"
)
type Queue struct {
items []interface{}
mu sync.Mutex
cond *sync.Cond
size int
}
func NewQueue(size int) *Queue {
q := &Queue{
items: make([]interface{}, 0, size),
size: size,
}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Enqueue(item interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) >= q.size {
q.cond.Wait() // 队列满,等待
}
q.items = append(q.items, item)
q.cond.Broadcast() // 通知等待的消费者
}
func (q *Queue) Dequeue() interface{} {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
q.cond.Wait() // 队列空,等待
}
item := q.items[0]
q.items = q.items[1:]
q.cond.Broadcast() // 通知等待的生产者
return item
}
func (q *Queue) Size() int {
q.mu.Lock()
defer q.mu.Unlock()
return len(q.items)
}7. 行业最佳实践
7.1 选择合适的同步原语
实践内容:根据具体场景选择合适的同步原语。
推荐理由:不同的同步原语有不同的适用场景,选择合适的同步原语可以提高性能和代码可读性。
示例:
- 读多写少的场景使用 RWMutex。
- 需要等待多个 Goroutine 完成时使用 WaitGroup。
- 需要保证操作只执行一次时使用 Once。
- 需要等待条件满足时使用 Cond。
7.2 始终使用 defer 解锁
实践内容:使用 defer 语句确保锁的释放。
推荐理由:defer 语句可以确保即使发生 panic,锁也能正确释放,避免死锁。
示例:
go
func process() {
mu.Lock()
defer mu.Unlock()
// 处理逻辑
}7.3 避免长时间持有锁
实践内容:尽量减少持有锁的时间,只在必要时加锁。
推荐理由:长时间持有锁会降低并发性能,增加死锁的风险。
示例:
go
// 错误示例:长时间持有锁
func process() {
mu.Lock()
defer mu.Unlock()
// 执行耗时操作
time.Sleep(time.Second)
// 修改共享资源
sharedResource = newValue
}
// 正确示例:只在必要时加锁
func process() {
// 执行耗时操作
time.Sleep(time.Second)
// 只在修改共享资源时加锁
mu.Lock()
sharedResource = newValue
mu.Unlock()
}7.4 使用原子操作代替锁
实践内容:对于简单的计数器、标志位等,使用原子操作代替锁。
推荐理由:原子操作比锁更轻量,性能更高。
示例:
go
// 使用原子操作
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}
// 而不是使用锁
var (
mu sync.Mutex
counter int
)
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}7.5 保持锁的粒度最小化
实践内容:只对需要保护的共享资源加锁,避免对整个函数加锁。
推荐理由:最小化锁的粒度可以提高并发性能,减少锁竞争。
示例:
go
// 错误示例:对整个函数加锁
func process() {
mu.Lock()
defer mu.Unlock()
// 执行不需要锁的操作
fmt.Println("Processing...")
// 修改共享资源
sharedResource = newValue
}
// 正确示例:只对共享资源加锁
func process() {
// 执行不需要锁的操作
fmt.Println("Processing...")
// 只对共享资源加锁
mu.Lock()
sharedResource = newValue
mu.Unlock()
}7.6 避免嵌套锁
实践内容:避免在持有一个锁的同时获取另一个锁,减少死锁的风险。
推荐理由:嵌套锁容易导致死锁,尤其是当多个 Goroutine 以不同的顺序获取锁时。
示例:
go
// 错误示例:嵌套锁
func process() {
mu1.Lock()
defer mu1.Unlock()
// ...
mu2.Lock()
defer mu2.Unlock()
// ...
}
// 正确示例:避免嵌套锁,或者保持一致的锁顺序
func process() {
// 先获取 mu1,再获取 mu2,保持一致的顺序
mu1.Lock()
defer mu1.Unlock()
// ...
mu2.Lock()
defer mu2.Unlock()
// ...
}7.7 使用 sync.Map 处理并发映射
实践内容:对于需要并发访问的映射,使用 sync.Map 代替 map+Mutex。
推荐理由:sync.Map 是为并发场景优化的,性能比 map+Mutex 更好。
示例:
go
// 使用 sync.Map
var m sync.Map
func store(key, value interface{}) {
m.Store(key, value)
}
func load(key interface{}) (interface{}, bool) {
return m.Load(key)
}
// 而不是使用 map+Mutex
var (
mu sync.Mutex
m = make(map[string]interface{})
)
func store(key string, value interface{}) {
mu.Lock()
defer mu.Unlock()
m[key] = value
}
func load(key string) (interface{}, bool) {
mu.RLock()
defer mu.RUnlock()
v, ok := m[key]
return v, ok
}7.8 定期检查死锁和竞态条件
实践内容:使用 Go 的竞态检测工具(go run -race 或 go test -race)定期检查代码中的竞态条件。
推荐理由:竞态检测工具可以帮助发现代码中的潜在问题,提高代码的可靠性。
示例:
bash
# 运行带竞态检测的程序
go run -race main.go
# 运行带竞态检测的测试
go test -race ./...8. 常见问题答疑(FAQ)
8.1 Mutex 和 RWMutex 有什么区别?
问题描述:Mutex 和 RWMutex 都是同步原语,它们有什么区别?
回答内容:
- Mutex:互斥锁,确保同一时刻只有一个 Goroutine 可以访问共享资源。
- RWMutex:读写锁,允许多个读操作同时进行,但写操作会阻塞所有读写操作。
- 使用场景:
- 当读写比例相近时,使用 Mutex。
- 当读操作远多于写操作时,使用 RWMutex 可以提高并发性能。
示例代码:
go
// 使用 Mutex
var mu sync.Mutex
// 使用 RWMutex
var rwmu sync.RWMutex8.2 如何选择合适的同步原语?
问题描述:在不同的场景中,如何选择合适的同步原语?
回答内容:
- 保护共享资源:使用 Mutex 或 RWMutex。
- 等待多个 Goroutine 完成:使用 WaitGroup。
- 保证操作只执行一次:使用 Once。
- 等待条件满足:使用 Cond。
- 存储和复用临时对象:使用 Pool。
- 并发安全的映射:使用 Map。
- 简单的计数器、标志位:使用 atomic 包。
示例代码:
go
// 保护共享资源
var mu sync.Mutex
// 等待多个 Goroutine 完成
var wg sync.WaitGroup
// 保证操作只执行一次
var once sync.Once
// 等待条件满足
var cond = sync.NewCond(&sync.Mutex{})
// 存储和复用临时对象
var pool = sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
// 并发安全的映射
var m sync.Map
// 原子操作
var counter int648.3 什么是死锁?如何避免死锁?
问题描述:什么是死锁?如何避免死锁?
回答内容:
- 死锁:多个 Goroutine 互相等待对方释放锁,导致程序无法继续执行。
- 避免死锁的方法:
- 始终使用
defer语句确保锁的释放。 - 保持一致的锁获取顺序。
- 避免在持有锁时调用可能阻塞的函数。
- 避免嵌套锁,或者确保以相同的顺序获取锁。
- 使用超时机制,避免无限等待。
- 始终使用
示例代码:
go
// 避免死锁:保持一致的锁顺序
func process() {
// 始终先获取 mu1,再获取 mu2
mu1.Lock()
defer mu1.Unlock()
mu2.Lock()
defer mu2.Unlock()
// 处理逻辑
}8.4 什么是竞态条件?如何避免竞态条件?
问题描述:什么是竞态条件?如何避免竞态条件?
回答内容:
- 竞态条件:多个 Goroutine 并发访问和修改共享资源,导致结果不确定。
- 避免竞态条件的方法:
- 使用同步原语(Mutex、RWMutex 等)保护共享资源。
- 使用原子操作(atomic 包)处理简单的计数器、标志位等。
- 使用通道(channel)进行 Goroutine 间的通信,避免共享状态。
- 使用 sync.Map 等并发安全的数据结构。
- 使用 Go 的竞态检测工具(
go run -race)检测潜在的竞态条件。
示例代码:
go
// 使用 Mutex 避免竞态条件
var mu sync.Mutex
var counter int
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
// 使用原子操作避免竞态条件
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}8.5 Once 和 init() 函数有什么区别?
问题描述:Once 和 init() 函数都可以用于初始化,它们有什么区别?
回答内容:
- init() 函数:在包被导入时自动执行,不能控制执行时机,会增加程序启动时间。
- Once:在首次调用 Do 方法时执行,可以控制执行时机,支持延迟初始化。
- 使用场景:
- 当需要在包导入时就初始化时,使用 init() 函数。
- 当需要延迟初始化或控制初始化时机时,使用 Once。
示例代码:
go
// 使用 init() 函数
func init() {
fmt.Println("Package initialized")
}
// 使用 Once
var once sync.Once
func Init() {
once.Do(func() {
fmt.Println("Initialized on first call")
})
}8.6 如何使用 Cond 实现生产者-消费者模式?
问题描述:如何使用 Cond 实现生产者-消费者模式?
回答内容:
- 使用 Cond 等待队列满或空的条件。
- 生产者在队列满时等待,在生产后通知消费者。
- 消费者在队列空时等待,在消费后通知生产者。
示例代码:
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
cond := sync.NewCond(&mu)
queue := make([]int, 0, 5)
var wg sync.WaitGroup
// 生产者
wg.Add(2)
for i := 0; i < 2; i++ {
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
mu.Lock()
for len(queue) >= 5 {
fmt.Printf("Producer %d: queue full, waiting\n", id)
cond.Wait()
}
item := id*10 + j
queue = append(queue, item)
fmt.Printf("Producer %d: produced %d\n", id, item)
cond.Broadcast()
mu.Unlock()
time.Sleep(time.Millisecond * 100)
}
}(i)
}
// 消费者
wg.Add(3)
for i := 0; i < 3; i++ {
go func(id int) {
defer wg.Done()
for j := 0; j < 3; j++ {
mu.Lock()
for len(queue) == 0 {
fmt.Printf("Consumer %d: queue empty, waiting\n", id)
cond.Wait()
}
item := queue[0]
queue = queue[1:]
fmt.Printf("Consumer %d: consumed %d\n", id, item)
cond.Broadcast()
mu.Unlock()
time.Sleep(time.Millisecond * 150)
}
}(i)
}
wg.Wait()
fmt.Println("All goroutines completed")
}9. 实战练习
9.1 基础练习:并发安全的计数器
题目:使用 Mutex 和 atomic 包分别实现一个并发安全的计数器,比较两者的性能。
解题思路:
- 使用 Mutex 保护计数器变量。
- 使用 atomic 包的原子操作实现计数器。
- 启动多个 Goroutine 同时递增计数器,比较两者的性能。
常见误区:
- 忘记使用
defer解锁,导致死锁。 - 对原子操作的理解不正确,导致竞态条件。
分步提示:
- 实现使用 Mutex 的计数器。
- 实现使用 atomic 包的计数器。
- 启动 1000 个 Goroutine 同时递增计数器。
- 比较两种实现的性能。
参考代码:
go
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func main() {
// 使用 Mutex
var mu sync.Mutex
var mutexCounter int
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
mu.Lock()
mutexCounter++
mu.Unlock()
}
}()
}
wg.Wait()
mutexTime := time.Since(start)
fmt.Printf("Mutex counter: %d, time: %v\n", mutexCounter, mutexTime)
// 使用 atomic
var atomicCounter int64
start = time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 1000; j++ {
atomic.AddInt64(&atomicCounter, 1)
}
}()
}
wg.Wait()
atomicTime := time.Since(start)
fmt.Printf("Atomic counter: %d, time: %v\n", atomicCounter, atomicTime)
fmt.Printf("Atomic is %.2f times faster than Mutex\n", float64(mutexTime)/float64(atomicTime))
}9.2 进阶练习:并发安全的缓存
题目:使用 RWMutex 实现一个并发安全的缓存,支持设置、获取和删除操作。
解题思路:
- 使用 RWMutex 保护缓存映射。
- 实现 Set、Get 和 Delete 方法。
- 测试多个 Goroutine 同时访问缓存的性能。
常见误区:
- 在读操作时使用写锁定,导致性能下降。
- 没有正确处理并发访问,导致竞态条件。
分步提示:
- 定义缓存结构体,包含映射和 RWMutex。
- 实现 Set 方法,使用写锁定。
- 实现 Get 方法,使用读锁定。
- 实现 Delete 方法,使用写锁定。
- 测试多个 Goroutine 同时访问缓存。
参考代码:
go
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
items map[string]interface{}
mu sync.RWMutex
}
func NewCache() *Cache {
return &Cache{
items: make(map[string]interface{}),
}
}
func (c *Cache) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.items[key] = value
}
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, ok := c.items[key]
return value, ok
}
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.items, key)
}
func main() {
cache := NewCache()
var wg sync.WaitGroup
// 启动 10 个写 Goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 100; j++ {
key := fmt.Sprintf("key-%d-%d", id, j)
value := fmt.Sprintf("value-%d-%d", id, j)
cache.Set(key, value)
time.Sleep(time.Millisecond)
}
}(i)
}
// 启动 100 个读 Goroutine
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 100; j++ {
key := fmt.Sprintf("key-%d-%d", id%10, j)
_, _ = cache.Get(key)
time.Sleep(time.Millisecond)
}
}(i)
}
wg.Wait()
fmt.Println("All goroutines completed")
fmt.Printf("Cache size: %d\n", len(cache.items))
}9.3 挑战练习:生产者-消费者模式
题目:使用 Cond 实现一个生产者-消费者模式,支持多个生产者和多个消费者。
解题思路:
- 使用 Cond 等待队列满或空的条件。
- 实现生产者,在队列满时等待,在生产后通知消费者。
- 实现消费者,在队列空时等待,在消费后通知生产者。
- 测试多个生产者和多个消费者的并发场景。
常见误区:
- 没有在循环中检查条件,导致虚假唤醒。
- 没有正确获取和释放锁,导致死锁。
- 信号丢失,即先发送信号后等待。
分步提示:
- 定义队列结构体,包含切片、Mutex 和 Cond。
- 实现 Enqueue 方法,在队列满时等待,在生产后通知消费者。
- 实现 Dequeue 方法,在队列空时等待,在消费后通知生产者。
- 启动多个生产者和多个消费者。
- 测试并发场景下的正确性。
参考代码:
go
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items []int
mu sync.Mutex
cond *sync.Cond
size int
}
func NewQueue(size int) *Queue {
q := &Queue{
items: make([]int, 0, size),
size: size,
}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Enqueue(item int) {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) >= q.size {
fmt.Println("Queue full, waiting to enqueue")
q.cond.Wait()
}
q.items = append(q.items, item)
fmt.Printf("Enqueued: %d, queue size: %d\n", item, len(q.items))
q.cond.Broadcast()
}
func (q *Queue) Dequeue() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
fmt.Println("Queue empty, waiting to dequeue")
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
fmt.Printf("Dequeued: %d, queue size: %d\n", item, len(q.items))
q.cond.Broadcast()
return item
}
func main() {
queue := NewQueue(5)
var wg sync.WaitGroup
// 启动 3 个生产者
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
item := id*10 + j
queue.Enqueue(item)
time.Sleep(time.Millisecond * 100)
}
}(i)
}
// 启动 2 个消费者
for i := 0; i < 2; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 7; j++ {
item := queue.Dequeue()
fmt.Printf("Consumer %d: processed %d\n", id, item)
time.Sleep(time.Millisecond * 150)
}
}(i)
}
wg.Wait()
fmt.Println("All goroutines completed")
}10. 知识点总结
10.1 核心要点
- sync 包:Go 语言中提供同步原语的核心包,包含 Mutex、RWMutex、WaitGroup、Once、Cond、Pool、Map 等。
- Mutex:互斥锁,确保同一时刻只有一个 Goroutine 可以访问共享资源。
- RWMutex:读写锁,允许多个读操作同时进行,但写操作会阻塞所有读写操作。
- WaitGroup:等待一组 Goroutine 完成,用于协调多个 Goroutine 的执行。
- Once:保证某个操作只执行一次,用于初始化等场景。
- Cond:条件变量,用于等待或宣布某个条件的发生。
- Pool:对象池,用于存储和复用临时对象,减少内存分配。
- Map:并发安全的映射,提供了线程安全的键值对存储。
- 原子操作:sync/atomic 包提供的原子操作,用于简单的计数器、标志位等。
10.2 易错点回顾
- 死锁:多个 Goroutine 互相等待对方释放锁,导致程序无法继续执行。
- 忘记解锁:在获取锁后,没有对应的解锁操作,导致其他 Goroutine 无法获取锁。
- 读写锁使用不当:在读锁定期间修改共享资源,或者写操作过于频繁导致读操作饥饿。
- WaitGroup 使用不当:Add() 的调用次数与 Done() 不匹配,或者在 Wait() 之后调用 Add()。
- Once 使用不当:传递给 Do() 的函数发生 panic,导致 Once 认为操作已执行。
- Cond 使用不当:没有在循环中检查条件,导致虚假唤醒,或者没有正确获取和释放锁。
- 锁粒度太大:长时间持有锁,降低并发性能,增加死锁的风险。
- 嵌套锁:在持有一个锁的同时获取另一个锁,增加死锁的风险。
11. 拓展参考资料
11.1 官方文档链接
11.2 进阶学习路径建议
- 并发模式:学习常见的并发模式,如生产者-消费者、工作池、扇入扇出等。
- 通道:深入学习通道的使用和通道模式,理解 CSP 并发模型。
- Context:学习 Context 的使用,实现更复杂的并发控制,如超时控制、取消操作等。
- 性能优化:学习并发性能优化技巧,如减少锁竞争、使用原子操作、优化缓存等。
- 分布式系统:学习分布式系统中的并发控制,如分布式锁、一致性协议等。
11.3 相关资源
- Go 语言实战:介绍 Go 语言的并发编程和同步原语。
- Go 并发编程实战:深入讲解 Go 语言的并发编程技术。
- Go 语言设计与实现:深入分析 Go 语言的内部实现,包括同步原语的底层原理。
- Concurrency in Go:英文书籍,详细介绍 Go 语言的并发编程。
