Skip to content

并发安全基础

1. 概述

并发安全是并发编程中的核心概念,它确保多个 goroutine 能够安全地访问共享资源,避免竞态条件和数据不一致的问题。理解并发安全的基本原理和实践方法,对于编写正确、可靠的并发程序至关重要。本章节将深入探讨并发安全的基础知识,包括其核心概念、实现机制和最佳实践,帮助开发者更好地理解和应用并发安全技术。

2. 基本概念

2.1 语法

Go 语言中与并发安全相关的核心语法:

go
// 互斥锁
import "sync"
var mu sync.Mutex

// 读写锁
var rwmu sync.RWMutex

// 原子操作
import "sync/atomic"
var counter int64

// WaitGroup
var wg sync.WaitGroup

// Once
var once sync.Once

// 通道
ch := make(chan Type)

2.2 语义

  • 并发安全:多个 goroutine 同时访问共享资源时,不会产生竞态条件或数据不一致的问题
  • 竞态条件:多个 goroutine 同时访问和修改共享资源,导致结果不确定的情况
  • 互斥锁:确保同一时间只有一个 goroutine 可以访问共享资源
  • 读写锁:允许多个 goroutine 同时读取,但同一时间只有一个 goroutine 可以写入
  • 原子操作:硬件级别的同步操作,确保操作的原子性
  • 通道:通过通信来共享内存,避免直接共享内存导致的并发问题

2.3 规范

  • 最小化共享状态:尽量减少共享状态,使用不可变数据结构
  • 正确使用同步原语:根据场景选择合适的同步原语
  • 避免死锁:注意锁的获取顺序,避免循环等待
  • 避免活锁:合理设计重试机制,避免过度竞争
  • 性能考虑:在保证并发安全的同时,考虑性能影响

3. 原理深度解析

3.1 并发安全的实现原理

并发安全的实现基于以下核心原理:

  1. 互斥访问:通过锁机制确保同一时间只有一个 goroutine 可以访问共享资源
  2. 原子操作:通过硬件指令确保操作的原子性
  3. 内存可见性:确保一个 goroutine 对内存的修改对其他 goroutine 可见
  4. 有序性:确保操作的执行顺序符合预期

3.2 同步原语的工作原理

  1. 互斥锁 (Mutex)

    • 使用互斥量(mutex)来实现互斥访问
    • 当一个 goroutine 获取锁后,其他 goroutine 必须等待
    • 解锁操作会释放锁,允许其他 goroutine 获取
  2. 读写锁 (RWMutex)

    • 允许多个 goroutine 同时读取
    • 同一时间只有一个 goroutine 可以写入
    • 写入时会阻塞所有读取和写入操作
  3. 原子操作

    • 使用硬件指令(如 CAS - Compare-And-Swap)实现
    • 不需要上下文切换,性能比锁更高
    • 适用于简单的操作,如计数器
  4. 通道

    • 基于 CSP (Communicating Sequential Processes) 模型
    • 通过发送和接收操作实现同步
    • 天然支持并发安全

3.3 内存模型与并发安全

Go 语言的内存模型定义了以下规则:

  • Happens-Before 关系:如果操作 A happens before 操作 B,那么操作 A 的结果对操作 B 可见
  • 同步操作:如通道操作、互斥锁等,用于建立 happens-before 关系
  • 内存屏障:确保内存操作的顺序和可见性

3.4 竞态条件的检测

Go 语言提供了 race detector 工具来检测竞态条件:

  • 使用 -race 标志运行程序
  • race detector 会检测并报告数据竞争问题
  • 帮助开发者及时发现和解决并发安全问题

4. 常见错误与踩坑点

4.1 数据竞争

错误表现:程序行为不确定,结果不一致 产生原因:多个 goroutine 同时访问和修改共享资源,没有使用同步原语保护 解决方案:使用互斥锁、读写锁、原子操作或通道来保护共享资源

4.2 死锁

错误表现:多个 goroutine 相互等待对方释放资源,导致程序卡住 产生原因:资源获取顺序不当,或者循环等待 解决方案:避免循环等待,使用带缓冲的通道或超时机制

4.3 活锁

错误表现:goroutine 一直在执行,但无法完成任务 产生原因:过度的重试机制,或者资源竞争导致的饥饿 解决方案:添加随机退避机制,合理设计重试策略

4.4 锁的粒度过粗

错误表现:并发性能下降,锁竞争严重 产生原因:锁的范围过大,包含了不需要同步的代码 解决方案:减小锁的粒度,只保护需要同步的代码

4.5 错误使用原子操作

错误表现:原子操作与非原子操作混合使用,导致数据竞争 产生原因:不了解原子操作的使用规则 解决方案:对同一个变量的所有操作都使用原子操作,或者使用互斥锁

4.6 内存泄漏

错误表现:程序内存占用持续增长,最终导致内存耗尽 产生原因:goroutine 泄漏,或者资源没有正确释放 解决方案:合理控制 goroutine 的数量,确保资源正确释放

5. 常见应用场景

5.1 共享计数器

场景描述:多个 goroutine 需要递增同一个计数器 使用方法:使用互斥锁或原子操作保护计数器 示例代码

go
// 使用互斥锁
var (
    mu      sync.Mutex
    counter int
)

func increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++
}

// 使用原子操作
var counter int64

func increment() {
    atomic.AddInt64(&counter, 1)
}

5.2 共享缓存

场景描述:多个 goroutine 需要访问和修改同一个缓存 使用方法:使用互斥锁或读写锁保护缓存 示例代码

go
type Cache struct {
    data map[string]interface{}
    mu   sync.RWMutex
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    value, ok := c.data[key]
    return value, ok
}

func (c *Cache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

5.3 并发安全的队列

场景描述:多个 goroutine 需要操作同一个队列 使用方法:使用互斥锁保护队列,或使用通道实现队列 示例代码

go
type Queue struct {
    data []interface{}
    mu   sync.Mutex
}

func (q *Queue) Enqueue(item interface{}) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.data = append(q.data, item)
}

func (q *Queue) Dequeue() (interface{}, bool) {
    q.mu.Lock()
    defer q.mu.Unlock()
    if len(q.data) == 0 {
        return nil, false
    }
    item := q.data[0]
    q.data = q.data[1:]
    return item, true
}

5.4 单例模式

场景描述:确保某个对象只被创建一次 使用方法:使用 sync.Once 实现单例模式 示例代码

go
var (
    once     sync.Once
    instance *Type
)

func GetInstance() *Type {
    once.Do(func() {
        instance = &Type{}
        // 初始化
    })
    return instance
}

5.5 并发安全的映射

场景描述:多个 goroutine 需要访问和修改同一个映射 使用方法:使用互斥锁或读写锁保护映射 示例代码

go
type SafeMap struct {
    data map[string]string
    mu   sync.RWMutex
}

func (sm *SafeMap) Get(key string) (string, bool) {
    sm.mu.RLock()
    defer sm.mu.RUnlock()
    value, ok := sm.data[key]
    return value, ok
}

func (sm *SafeMap) Set(key, value string) {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    sm.data[key] = value
}

6. 企业级进阶应用场景

6.1 高并发 API 服务

场景描述:构建高并发的 API 服务,处理大量并发请求 使用方法:使用 goroutine 处理每个请求,结合并发安全的共享资源 示例代码

go
type APIServer struct {
    cache    *Cache
    requests int64
}

func (s *APIServer) handleRequest(w http.ResponseWriter, r *http.Request) {
    // 原子递增请求计数
    atomic.AddInt64(&s.requests, 1)
    
    // 处理请求
    key := r.URL.Query().Get("key")
    value, ok := s.cache.Get(key)
    if !ok {
        w.WriteHeader(http.StatusNotFound)
        return
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]interface{}{"value": value})
}

func main() {
    server := &APIServer{
        cache: NewCache(),
    }
    
    http.HandleFunc("/api", server.handleRequest)
    http.ListenAndServe(":8080", nil)
}

6.2 分布式锁

场景描述:在分布式系统中实现互斥访问 使用方法:使用 Redis、ZooKeeper 等实现分布式锁 示例代码

go
type DistributedLock struct {
    client *redis.Client
    key    string
    value  string
}

func NewDistributedLock(client *redis.Client, key string) *DistributedLock {
    return &DistributedLock{
        client: client,
        key:    key,
        value:  uuid.New().String(),
    }
}

func (l *DistributedLock) Lock() (bool, error) {
    // 使用 Redis 的 SETNX 命令实现分布式锁
    result, err := l.client.SetNX(context.Background(), l.key, l.value, time.Second*10).Result()
    return result, err
}

func (l *DistributedLock) Unlock() error {
    // 使用 Lua 脚本确保原子释放锁
    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `
    _, err := l.client.Eval(context.Background(), script, []string{l.key}, l.value).Result()
    return err
}

6.3 并发安全的配置管理

场景描述:多个 goroutine 需要访问和更新配置 使用方法:使用互斥锁保护配置,或使用原子指针 示例代码

go
type Config struct {
    ServerPort int
    DatabaseURL string
}

var (
    config     *Config
    configMu   sync.RWMutex
    configOnce sync.Once
)

func GetConfig() *Config {
    configOnce.Do(func() {
        // 初始化配置
        config = &Config{
            ServerPort: 8080,
            DatabaseURL: "postgres://localhost:5432/mydb",
        }
    })
    
    configMu.RLock()
    defer configMu.RUnlock()
    return config
}

func UpdateConfig(newConfig *Config) {
    configMu.Lock()
    defer configMu.Unlock()
    config = newConfig
}

6.4 并发安全的连接池

场景描述:管理数据库连接等资源的并发访问 使用方法:使用互斥锁保护连接池,或使用通道实现 示例代码

go
type ConnectionPool struct {
    connections chan *sql.DB
    mu          sync.Mutex
    maxSize     int
}

func NewConnectionPool(maxSize int) *ConnectionPool {
    return &ConnectionPool{
        connections: make(chan *sql.DB, maxSize),
        maxSize:     maxSize,
    }
}

func (p *ConnectionPool) Get() (*sql.DB, error) {
    select {
    case conn := <-p.connections:
        return conn, nil
    default:
        p.mu.Lock()
        defer p.mu.Unlock()
        
        // 检查是否达到最大连接数
        if len(p.connections) < p.maxSize {
            conn, err := sql.Open("postgres", "postgres://localhost:5432/mydb")
            if err != nil {
                return nil, err
            }
            return conn, nil
        }
        
        // 等待可用连接
        conn := <-p.connections
        return conn, nil
    }
}

func (p *ConnectionPool) Put(conn *sql.DB) {
    p.connections <- conn
}

6.5 并发安全的事件总线

场景描述:实现一个并发安全的事件发布订阅系统 使用方法:使用互斥锁保护订阅者列表,结合 goroutine 处理事件 示例代码

go
type Event struct {
    Type    string
    Payload interface{}
}

type EventBus struct {
    subscribers map[string][]chan Event
    mu          sync.RWMutex
}

func NewEventBus() *EventBus {
    return &EventBus{
        subscribers: make(map[string][]chan Event),
    }
}

func (eb *EventBus) Subscribe(eventType string, ch chan Event) {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    eb.subscribers[eventType] = append(eb.subscribers[eventType], ch)
}

func (eb *EventBus) Publish(eventType string, payload interface{}) {
    event := Event{
        Type:    eventType,
        Payload: payload,
    }
    
    eb.mu.RLock()
    subscribers := eb.subscribers[eventType]
    eb.mu.RUnlock()
    
    for _, ch := range subscribers {
        go func(ch chan Event) {
            ch <- event
        }(ch)
    }
}

7. 行业最佳实践

7.1 优先使用通道进行通信

实践内容:使用通道进行 goroutine 间通信,而不是共享内存 推荐理由:通道提供了内置的同步机制,避免了数据竞争和内存可见性问题

7.2 最小化共享状态

实践内容:尽量减少共享状态,使用不可变数据结构 推荐理由:减少共享状态可以降低并发编程的复杂度,避免数据竞争

7.3 选择合适的同步原语

实践内容:根据场景选择合适的同步原语 推荐理由:不同的同步原语有不同的适用场景,选择合适的可以提高性能和可靠性

7.4 减小锁的粒度

实践内容:只在必要的代码段使用锁,减小锁的范围 推荐理由:减小锁的粒度可以减少锁竞争,提高并发性能

7.5 使用读写锁提高并发性能

实践内容:在读多写少的场景中使用读写锁 推荐理由:读写锁允许多个 goroutine 同时读取,提高并发性能

7.6 定期检查数据竞争

实践内容:使用 -race 标志运行程序,检查数据竞争 推荐理由:及时发现和解决数据竞争问题,提高程序的可靠性

7.7 使用原子操作处理简单计数器

实践内容:对于简单的计数器,使用原子操作而不是互斥锁 推荐理由:原子操作比互斥锁更高效,特别是在高并发场景下

7.8 合理设计并发结构

实践内容:根据任务特性,设计合理的并发结构 推荐理由:合理的并发结构可以提高程序的性能和可维护性

8. 常见问题答疑(FAQ)

8.1 什么是并发安全?

问题描述:并发安全的定义和重要性 回答内容:并发安全是指多个 goroutine 同时访问共享资源时,不会产生竞态条件或数据不一致的问题。并发安全对于编写可靠的并发程序至关重要。 示例代码

go
// 并发安全的计数器
var counter int64

func increment() {
    atomic.AddInt64(&counter, 1)
}

func getCount() int64 {
    return atomic.LoadInt64(&counter)
}

8.2 如何避免数据竞争?

问题描述:如何在并发程序中避免数据竞争 回答内容:可以使用互斥锁、读写锁、原子操作或通道来避免数据竞争,确保同一时间只有一个 goroutine 可以修改共享资源。 示例代码

go
// 使用互斥锁避免数据竞争
var (
    mu      sync.Mutex
    counter int
)

func increment() {
    mu.Lock()
    defer mu.Unlock()
    counter++
}

// 使用通道避免数据竞争
ch := make(chan int)

func worker() {
    for i := range ch {
        // 处理数据,不需要锁
    }
}

func main() {
    go worker()
    ch <- 1 // 发送数据
}

8.3 互斥锁和读写锁有什么区别?

问题描述:互斥锁和读写锁的区别和适用场景 回答内容:互斥锁确保同一时间只有一个 goroutine 可以访问共享资源;读写锁允许多个 goroutine 同时读取,但同一时间只有一个 goroutine 可以写入。读写锁适用于读多写少的场景。 示例代码

go
// 使用互斥锁
var mu sync.Mutex

// 使用读写锁
var rwmu sync.RWMutex

func read() {
    rwmu.RLock()
    defer rwmu.RUnlock()
    // 读取操作
}

func write() {
    rwmu.Lock()
    defer rwmu.Unlock()
    // 写入操作
}

8.4 原子操作和互斥锁有什么区别?

问题描述:原子操作和互斥锁的区别和适用场景 回答内容:原子操作是硬件级别的操作,比互斥锁更高效,但只适用于简单的操作,如计数器;互斥锁适用于复杂的临界区,保护多个操作的原子性。 示例代码

go
// 原子操作:适用于简单的计数器
var counter int64

func increment() {
    atomic.AddInt64(&counter, 1)
}

// 互斥锁:适用于复杂的临界区
var (
    mu      sync.Mutex
    data    map[string]string
)

func updateData(key, value string) {
    mu.Lock()
    defer mu.Unlock()
    data[key] = value
    // 可能还有其他操作
}

8.5 如何避免死锁?

问题描述:如何避免并发程序中的死锁 回答内容:避免循环等待,使用带缓冲的通道,设置超时机制,以及使用 select 语句处理多个通道。 示例代码

go
// 使用 select 避免死锁
select {
case ch1 <- value:
    // 发送成功
case <-time.After(time.Second):
    // 超时处理
}

// 避免循环等待:统一锁的获取顺序
func acquireLocks(mu1, mu2 *sync.Mutex) {
    if mu1 < mu2 { // 假设可以比较地址
        mu1.Lock()
        mu2.Lock()
    } else {
        mu2.Lock()
        mu1.Lock()
    }
}

8.6 如何使用 race detector 检测数据竞争?

问题描述:如何使用 Go 的 race detector 检测数据竞争 回答内容:使用 go run -racego build -race 命令运行程序,race detector 会检测并报告数据竞争问题。 示例代码

bash
# 运行程序并检查数据竞争
go run -race main.go

# 构建程序并检查数据竞争
go build -race -o main main.go
./main

8.7 什么是活锁?如何避免?

问题描述:活锁的概念和避免方法 回答内容:活锁是指 goroutine 一直在执行,但无法完成任务的情况。避免活锁的方法包括添加随机退避机制,合理设计重试策略,以及使用超时机制。 示例代码

go
// 使用随机退避避免活锁
func tryAcquireLock() bool {
    for i := 0; i < maxRetries; i++ {
        if tryLock() {
            return true
        }
        // 随机退避
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    }
    return false
}

8.8 如何设计并发安全的 API?

问题描述:如何设计并发安全的 API 回答内容:设计并发安全的 API 应该考虑以下几点:最小化共享状态,使用适当的同步原语,提供清晰的文档说明线程安全性,以及避免在 API 内部使用全局状态。 示例代码

go
// 并发安全的 API 设计
type SafeCounter struct {
    counter int64
}

func NewSafeCounter() *SafeCounter {
    return &SafeCounter{}
}

func (c *SafeCounter) Increment() {
    atomic.AddInt64(&c.counter, 1)
}

func (c *SafeCounter) Value() int64 {
    return atomic.LoadInt64(&c.counter)
}

9. 实战练习

9.1 基础练习

题目:实现一个并发安全的计数器 解题思路

  1. 使用互斥锁或原子操作实现计数器
  2. 测试并发场景下的正确性
  3. 比较不同实现的性能

常见误区

  • 未使用同步原语,导致数据竞争
  • 错误使用原子操作,与非原子操作混合使用
  • 过度使用锁,导致性能下降

分步提示

  1. 使用互斥锁实现计数器
  2. 使用原子操作实现计数器
  3. 编写并发测试代码,启动多个 goroutine 同时递增计数器
  4. 验证计数器的最终值是否正确
  5. 比较两种实现的性能

参考代码

go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// 使用互斥锁的计数器
type MutexCounter struct {
    mu    sync.Mutex
    value int
}

func (c *MutexCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *MutexCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

// 使用原子操作的计数器
type AtomicCounter struct {
    value int64
}

func (c *AtomicCounter) Increment() {
    atomic.AddInt64(&c.value, 1)
}

func (c *AtomicCounter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

func main() {
    const numGoroutines = 100
    const numOperations = 10000
    
    // 测试互斥锁计数器
    mutexCounter := &MutexCounter{}
    var wg sync.WaitGroup
    
    start := time.Now()
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < numOperations; j++ {
                mutexCounter.Increment()
            }
        }()
    }
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("MutexCounter: %d operations took %v\n", numGoroutines*numOperations, elapsed)
    fmt.Printf("Final value: %d\n", mutexCounter.Value())
    
    // 测试原子操作计数器
    atomicCounter := &AtomicCounter{}
    
    start = time.Now()
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < numOperations; j++ {
                atomicCounter.Increment()
            }
        }()
    }
    wg.Wait()
    elapsed = time.Since(start)
    fmt.Printf("AtomicCounter: %d operations took %v\n", numGoroutines*numOperations, elapsed)
    fmt.Printf("Final value: %d\n", atomicCounter.Value())
}

9.2 进阶练习

题目:实现一个并发安全的缓存 解题思路

  1. 使用互斥锁或读写锁保护缓存数据
  2. 实现基本的 Get、Set、Delete 操作
  3. 测试并发场景下的正确性和性能

常见误区

  • 未正确使用锁,导致数据竞争
  • 锁的粒度太粗,导致性能下降
  • 内存泄漏,缓存数据没有过期机制

分步提示

  1. 定义缓存结构,使用 map 存储数据
  2. 使用读写锁保护 map 的访问
  3. 实现 Get、Set、Delete 方法
  4. 编写并发测试代码,模拟多个 goroutine 同时访问缓存
  5. 验证缓存操作的正确性
  6. 分析性能瓶颈

参考代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Cache struct {
    data map[string]interface{}
    mu   sync.RWMutex
}

func NewCache() *Cache {
    return &Cache{
        data: make(map[string]interface{}),
    }
}

func (c *Cache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    value, ok := c.data[key]
    return value, ok
}

func (c *Cache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

func (c *Cache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.data, key)
}

func (c *Cache) Len() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return len(c.data)
}

func main() {
    cache := NewCache()
    var wg sync.WaitGroup
    const numGoroutines = 100
    const numOperations = 1000
    
    start := time.Now()
    
    // 并发写入
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key-%d-%d", i, j)
                value := fmt.Sprintf("value-%d-%d", i, j)
                cache.Set(key, value)
            }
        }(i)
    }
    
    // 并发读取
    for i := 0; i < numGoroutines; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            for j := 0; j < numOperations; j++ {
                key := fmt.Sprintf("key-%d-%d", i, j)
                cache.Get(key)
            }
        }(i)
    }
    
    wg.Wait()
    elapsed := time.Since(start)
    fmt.Printf("Cache operations took %v\n", elapsed)
    fmt.Printf("Cache size: %d\n", cache.Len())
}

9.3 挑战练习

题目:实现一个并发安全的工作队列,支持任务的添加、执行和取消 解题思路

  1. 使用通道实现工作队列
  2. 支持添加任务到队列
  3. 支持并发执行任务
  4. 支持取消任务
  5. 测试并发场景下的正确性和性能

常见误区

  • 通道操作不当,导致死锁或阻塞
  • 任务取消机制实现复杂
  • 并发控制不当,导致资源竞争

分步提示

  1. 定义任务结构和工作队列结构
  2. 实现任务添加方法
  3. 实现任务执行方法,使用 goroutine 并发执行
  4. 实现任务取消机制
  5. 编写并发测试代码,模拟多个 goroutine 同时添加和取消任务
  6. 验证工作队列的正确性
  7. 分析性能瓶颈

参考代码

go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID      string
    Func    func() error
    Cancel  context.CancelFunc
    Context context.Context
}

type WorkQueue struct {
    tasks     chan *Task
    cancel    context.CancelFunc
    ctx       context.Context
    wg        sync.WaitGroup
    numWorkers int
}

func NewWorkQueue(numWorkers int) *WorkQueue {
    ctx, cancel := context.WithCancel(context.Background())
    return &WorkQueue{
        tasks:     make(chan *Task, 100),
        cancel:    cancel,
        ctx:       ctx,
        numWorkers: numWorkers,
    }
}

func (wq *WorkQueue) Start() {
    for i := 0; i < wq.numWorkers; i++ {
        wq.wg.Add(1)
        go wq.worker()
    }
}

func (wq *WorkQueue) worker() {
    defer wq.wg.Done()
    for {
        select {
        case <-wq.ctx.Done():
            return
        case task, ok := <-wq.tasks:
            if !ok {
                return
            }
            
            select {
            case <-task.Context.Done():
                fmt.Printf("Task %s cancelled\n", task.ID)
            default:
                if err := task.Func(); err != nil {
                    fmt.Printf("Task %s failed: %v\n", task.ID, err)
                } else {
                    fmt.Printf("Task %s completed\n", task.ID)
                }
            }
        }
    }
}

func (wq *WorkQueue) AddTask(id string, f func() error) *Task {
    ctx, cancel := context.WithCancel(wq.ctx)
    task := &Task{
        ID:      id,
        Func:    f,
        Cancel:  cancel,
        Context: ctx,
    }
    
    select {
    case <-wq.ctx.Done():
        return nil
    case wq.tasks <- task:
        return task
    }
}

func (wq *WorkQueue) Stop() {
    wq.cancel()
    close(wq.tasks)
    wq.wg.Wait()
}

func main() {
    wq := NewWorkQueue(3)
    wq.Start()
    
    // 添加任务
    tasks := make([]*Task, 10)
    for i := 0; i < 10; i++ {
        id := fmt.Sprintf("task-%d", i)
        tasks[i] = wq.AddTask(id, func() error {
            time.Sleep(100 * time.Millisecond)
            return nil
        })
    }
    
    // 取消部分任务
    for i := 0; i < 5; i++ {
        if tasks[i] != nil {
            tasks[i].Cancel()
            fmt.Printf("Cancelled task %s\n", tasks[i].ID)
        }
    }
    
    // 等待一段时间
    time.Sleep(1 * time.Second)
    
    // 停止工作队列
    wq.Stop()
    fmt.Println("Work queue stopped")
}

10. 知识点总结

10.1 核心要点

  • 并发安全:确保多个 goroutine 能够安全地访问共享资源
  • 竞态条件:多个 goroutine 同时访问和修改共享资源导致的问题
  • 同步原语:包括互斥锁、读写锁、原子操作和通道
  • 内存模型:定义了多 goroutine 之间的内存可见性规则
  • Happens-Before 关系:确保操作的顺序和可见性
  • race detector:用于检测数据竞争问题

10.2 易错点回顾

  • 数据竞争:未使用同步原语保护共享资源
  • 死锁:多个 goroutine 相互等待对方释放资源
  • 活锁:过度的重试机制导致无法完成任务
  • 锁的粒度过粗:锁的范围过大,导致性能下降
  • 错误使用原子操作:与非原子操作混合使用
  • 内存泄漏:goroutine 泄漏或资源未正确释放

11. 拓展参考资料

11.1 官方文档链接

11.2 进阶学习路径建议

  • 并发模式:学习常见的并发设计模式
  • 无锁编程:学习无锁数据结构的设计和实现
  • 性能优化:学习如何优化并发程序的性能
  • 分布式系统:学习分布式系统中的并发控制
  • 测试:学习如何测试并发程序

11.3 推荐资源

  • 《Concurrency in Go》by Katherine Cox-Buday
  • 《Go 语言实战》中的并发编程章节
  • 《The Go Programming Language》中的并发章节
  • Go 官方博客关于并发的文章
  • 开源项目中的并发编程实践,如 Kubernetes、Docker 等