Appearance
并发安全基础
1. 概述
并发安全是并发编程中的核心概念,它确保多个 goroutine 能够安全地访问共享资源,避免竞态条件和数据不一致的问题。理解并发安全的基本原理和实践方法,对于编写正确、可靠的并发程序至关重要。本章节将深入探讨并发安全的基础知识,包括其核心概念、实现机制和最佳实践,帮助开发者更好地理解和应用并发安全技术。
2. 基本概念
2.1 语法
Go 语言中与并发安全相关的核心语法:
go
// 互斥锁
import "sync"
var mu sync.Mutex
// 读写锁
var rwmu sync.RWMutex
// 原子操作
import "sync/atomic"
var counter int64
// WaitGroup
var wg sync.WaitGroup
// Once
var once sync.Once
// 通道
ch := make(chan Type)2.2 语义
- 并发安全:多个 goroutine 同时访问共享资源时,不会产生竞态条件或数据不一致的问题
- 竞态条件:多个 goroutine 同时访问和修改共享资源,导致结果不确定的情况
- 互斥锁:确保同一时间只有一个 goroutine 可以访问共享资源
- 读写锁:允许多个 goroutine 同时读取,但同一时间只有一个 goroutine 可以写入
- 原子操作:硬件级别的同步操作,确保操作的原子性
- 通道:通过通信来共享内存,避免直接共享内存导致的并发问题
2.3 规范
- 最小化共享状态:尽量减少共享状态,使用不可变数据结构
- 正确使用同步原语:根据场景选择合适的同步原语
- 避免死锁:注意锁的获取顺序,避免循环等待
- 避免活锁:合理设计重试机制,避免过度竞争
- 性能考虑:在保证并发安全的同时,考虑性能影响
3. 原理深度解析
3.1 并发安全的实现原理
并发安全的实现基于以下核心原理:
- 互斥访问:通过锁机制确保同一时间只有一个 goroutine 可以访问共享资源
- 原子操作:通过硬件指令确保操作的原子性
- 内存可见性:确保一个 goroutine 对内存的修改对其他 goroutine 可见
- 有序性:确保操作的执行顺序符合预期
3.2 同步原语的工作原理
互斥锁 (Mutex):
- 使用互斥量(mutex)来实现互斥访问
- 当一个 goroutine 获取锁后,其他 goroutine 必须等待
- 解锁操作会释放锁,允许其他 goroutine 获取
读写锁 (RWMutex):
- 允许多个 goroutine 同时读取
- 同一时间只有一个 goroutine 可以写入
- 写入时会阻塞所有读取和写入操作
原子操作:
- 使用硬件指令(如 CAS - Compare-And-Swap)实现
- 不需要上下文切换,性能比锁更高
- 适用于简单的操作,如计数器
通道:
- 基于 CSP (Communicating Sequential Processes) 模型
- 通过发送和接收操作实现同步
- 天然支持并发安全
3.3 内存模型与并发安全
Go 语言的内存模型定义了以下规则:
- Happens-Before 关系:如果操作 A happens before 操作 B,那么操作 A 的结果对操作 B 可见
- 同步操作:如通道操作、互斥锁等,用于建立 happens-before 关系
- 内存屏障:确保内存操作的顺序和可见性
3.4 竞态条件的检测
Go 语言提供了 race detector 工具来检测竞态条件:
- 使用
-race标志运行程序 - race detector 会检测并报告数据竞争问题
- 帮助开发者及时发现和解决并发安全问题
4. 常见错误与踩坑点
4.1 数据竞争
错误表现:程序行为不确定,结果不一致 产生原因:多个 goroutine 同时访问和修改共享资源,没有使用同步原语保护 解决方案:使用互斥锁、读写锁、原子操作或通道来保护共享资源
4.2 死锁
错误表现:多个 goroutine 相互等待对方释放资源,导致程序卡住 产生原因:资源获取顺序不当,或者循环等待 解决方案:避免循环等待,使用带缓冲的通道或超时机制
4.3 活锁
错误表现:goroutine 一直在执行,但无法完成任务 产生原因:过度的重试机制,或者资源竞争导致的饥饿 解决方案:添加随机退避机制,合理设计重试策略
4.4 锁的粒度过粗
错误表现:并发性能下降,锁竞争严重 产生原因:锁的范围过大,包含了不需要同步的代码 解决方案:减小锁的粒度,只保护需要同步的代码
4.5 错误使用原子操作
错误表现:原子操作与非原子操作混合使用,导致数据竞争 产生原因:不了解原子操作的使用规则 解决方案:对同一个变量的所有操作都使用原子操作,或者使用互斥锁
4.6 内存泄漏
错误表现:程序内存占用持续增长,最终导致内存耗尽 产生原因:goroutine 泄漏,或者资源没有正确释放 解决方案:合理控制 goroutine 的数量,确保资源正确释放
5. 常见应用场景
5.1 共享计数器
场景描述:多个 goroutine 需要递增同一个计数器 使用方法:使用互斥锁或原子操作保护计数器 示例代码:
go
// 使用互斥锁
var (
mu sync.Mutex
counter int
)
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
// 使用原子操作
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}5.2 共享缓存
场景描述:多个 goroutine 需要访问和修改同一个缓存 使用方法:使用互斥锁或读写锁保护缓存 示例代码:
go
type Cache struct {
data map[string]interface{}
mu sync.RWMutex
}
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, ok := c.data[key]
return value, ok
}
func (c *Cache) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}5.3 并发安全的队列
场景描述:多个 goroutine 需要操作同一个队列 使用方法:使用互斥锁保护队列,或使用通道实现队列 示例代码:
go
type Queue struct {
data []interface{}
mu sync.Mutex
}
func (q *Queue) Enqueue(item interface{}) {
q.mu.Lock()
defer q.mu.Unlock()
q.data = append(q.data, item)
}
func (q *Queue) Dequeue() (interface{}, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.data) == 0 {
return nil, false
}
item := q.data[0]
q.data = q.data[1:]
return item, true
}5.4 单例模式
场景描述:确保某个对象只被创建一次 使用方法:使用 sync.Once 实现单例模式 示例代码:
go
var (
once sync.Once
instance *Type
)
func GetInstance() *Type {
once.Do(func() {
instance = &Type{}
// 初始化
})
return instance
}5.5 并发安全的映射
场景描述:多个 goroutine 需要访问和修改同一个映射 使用方法:使用互斥锁或读写锁保护映射 示例代码:
go
type SafeMap struct {
data map[string]string
mu sync.RWMutex
}
func (sm *SafeMap) Get(key string) (string, bool) {
sm.mu.RLock()
defer sm.mu.RUnlock()
value, ok := sm.data[key]
return value, ok
}
func (sm *SafeMap) Set(key, value string) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.data[key] = value
}6. 企业级进阶应用场景
6.1 高并发 API 服务
场景描述:构建高并发的 API 服务,处理大量并发请求 使用方法:使用 goroutine 处理每个请求,结合并发安全的共享资源 示例代码:
go
type APIServer struct {
cache *Cache
requests int64
}
func (s *APIServer) handleRequest(w http.ResponseWriter, r *http.Request) {
// 原子递增请求计数
atomic.AddInt64(&s.requests, 1)
// 处理请求
key := r.URL.Query().Get("key")
value, ok := s.cache.Get(key)
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{"value": value})
}
func main() {
server := &APIServer{
cache: NewCache(),
}
http.HandleFunc("/api", server.handleRequest)
http.ListenAndServe(":8080", nil)
}6.2 分布式锁
场景描述:在分布式系统中实现互斥访问 使用方法:使用 Redis、ZooKeeper 等实现分布式锁 示例代码:
go
type DistributedLock struct {
client *redis.Client
key string
value string
}
func NewDistributedLock(client *redis.Client, key string) *DistributedLock {
return &DistributedLock{
client: client,
key: key,
value: uuid.New().String(),
}
}
func (l *DistributedLock) Lock() (bool, error) {
// 使用 Redis 的 SETNX 命令实现分布式锁
result, err := l.client.SetNX(context.Background(), l.key, l.value, time.Second*10).Result()
return result, err
}
func (l *DistributedLock) Unlock() error {
// 使用 Lua 脚本确保原子释放锁
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
_, err := l.client.Eval(context.Background(), script, []string{l.key}, l.value).Result()
return err
}6.3 并发安全的配置管理
场景描述:多个 goroutine 需要访问和更新配置 使用方法:使用互斥锁保护配置,或使用原子指针 示例代码:
go
type Config struct {
ServerPort int
DatabaseURL string
}
var (
config *Config
configMu sync.RWMutex
configOnce sync.Once
)
func GetConfig() *Config {
configOnce.Do(func() {
// 初始化配置
config = &Config{
ServerPort: 8080,
DatabaseURL: "postgres://localhost:5432/mydb",
}
})
configMu.RLock()
defer configMu.RUnlock()
return config
}
func UpdateConfig(newConfig *Config) {
configMu.Lock()
defer configMu.Unlock()
config = newConfig
}6.4 并发安全的连接池
场景描述:管理数据库连接等资源的并发访问 使用方法:使用互斥锁保护连接池,或使用通道实现 示例代码:
go
type ConnectionPool struct {
connections chan *sql.DB
mu sync.Mutex
maxSize int
}
func NewConnectionPool(maxSize int) *ConnectionPool {
return &ConnectionPool{
connections: make(chan *sql.DB, maxSize),
maxSize: maxSize,
}
}
func (p *ConnectionPool) Get() (*sql.DB, error) {
select {
case conn := <-p.connections:
return conn, nil
default:
p.mu.Lock()
defer p.mu.Unlock()
// 检查是否达到最大连接数
if len(p.connections) < p.maxSize {
conn, err := sql.Open("postgres", "postgres://localhost:5432/mydb")
if err != nil {
return nil, err
}
return conn, nil
}
// 等待可用连接
conn := <-p.connections
return conn, nil
}
}
func (p *ConnectionPool) Put(conn *sql.DB) {
p.connections <- conn
}6.5 并发安全的事件总线
场景描述:实现一个并发安全的事件发布订阅系统 使用方法:使用互斥锁保护订阅者列表,结合 goroutine 处理事件 示例代码:
go
type Event struct {
Type string
Payload interface{}
}
type EventBus struct {
subscribers map[string][]chan Event
mu sync.RWMutex
}
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[string][]chan Event),
}
}
func (eb *EventBus) Subscribe(eventType string, ch chan Event) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.subscribers[eventType] = append(eb.subscribers[eventType], ch)
}
func (eb *EventBus) Publish(eventType string, payload interface{}) {
event := Event{
Type: eventType,
Payload: payload,
}
eb.mu.RLock()
subscribers := eb.subscribers[eventType]
eb.mu.RUnlock()
for _, ch := range subscribers {
go func(ch chan Event) {
ch <- event
}(ch)
}
}7. 行业最佳实践
7.1 优先使用通道进行通信
实践内容:使用通道进行 goroutine 间通信,而不是共享内存 推荐理由:通道提供了内置的同步机制,避免了数据竞争和内存可见性问题
7.2 最小化共享状态
实践内容:尽量减少共享状态,使用不可变数据结构 推荐理由:减少共享状态可以降低并发编程的复杂度,避免数据竞争
7.3 选择合适的同步原语
实践内容:根据场景选择合适的同步原语 推荐理由:不同的同步原语有不同的适用场景,选择合适的可以提高性能和可靠性
7.4 减小锁的粒度
实践内容:只在必要的代码段使用锁,减小锁的范围 推荐理由:减小锁的粒度可以减少锁竞争,提高并发性能
7.5 使用读写锁提高并发性能
实践内容:在读多写少的场景中使用读写锁 推荐理由:读写锁允许多个 goroutine 同时读取,提高并发性能
7.6 定期检查数据竞争
实践内容:使用 -race 标志运行程序,检查数据竞争 推荐理由:及时发现和解决数据竞争问题,提高程序的可靠性
7.7 使用原子操作处理简单计数器
实践内容:对于简单的计数器,使用原子操作而不是互斥锁 推荐理由:原子操作比互斥锁更高效,特别是在高并发场景下
7.8 合理设计并发结构
实践内容:根据任务特性,设计合理的并发结构 推荐理由:合理的并发结构可以提高程序的性能和可维护性
8. 常见问题答疑(FAQ)
8.1 什么是并发安全?
问题描述:并发安全的定义和重要性 回答内容:并发安全是指多个 goroutine 同时访问共享资源时,不会产生竞态条件或数据不一致的问题。并发安全对于编写可靠的并发程序至关重要。 示例代码:
go
// 并发安全的计数器
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}
func getCount() int64 {
return atomic.LoadInt64(&counter)
}8.2 如何避免数据竞争?
问题描述:如何在并发程序中避免数据竞争 回答内容:可以使用互斥锁、读写锁、原子操作或通道来避免数据竞争,确保同一时间只有一个 goroutine 可以修改共享资源。 示例代码:
go
// 使用互斥锁避免数据竞争
var (
mu sync.Mutex
counter int
)
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}
// 使用通道避免数据竞争
ch := make(chan int)
func worker() {
for i := range ch {
// 处理数据,不需要锁
}
}
func main() {
go worker()
ch <- 1 // 发送数据
}8.3 互斥锁和读写锁有什么区别?
问题描述:互斥锁和读写锁的区别和适用场景 回答内容:互斥锁确保同一时间只有一个 goroutine 可以访问共享资源;读写锁允许多个 goroutine 同时读取,但同一时间只有一个 goroutine 可以写入。读写锁适用于读多写少的场景。 示例代码:
go
// 使用互斥锁
var mu sync.Mutex
// 使用读写锁
var rwmu sync.RWMutex
func read() {
rwmu.RLock()
defer rwmu.RUnlock()
// 读取操作
}
func write() {
rwmu.Lock()
defer rwmu.Unlock()
// 写入操作
}8.4 原子操作和互斥锁有什么区别?
问题描述:原子操作和互斥锁的区别和适用场景 回答内容:原子操作是硬件级别的操作,比互斥锁更高效,但只适用于简单的操作,如计数器;互斥锁适用于复杂的临界区,保护多个操作的原子性。 示例代码:
go
// 原子操作:适用于简单的计数器
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}
// 互斥锁:适用于复杂的临界区
var (
mu sync.Mutex
data map[string]string
)
func updateData(key, value string) {
mu.Lock()
defer mu.Unlock()
data[key] = value
// 可能还有其他操作
}8.5 如何避免死锁?
问题描述:如何避免并发程序中的死锁 回答内容:避免循环等待,使用带缓冲的通道,设置超时机制,以及使用 select 语句处理多个通道。 示例代码:
go
// 使用 select 避免死锁
select {
case ch1 <- value:
// 发送成功
case <-time.After(time.Second):
// 超时处理
}
// 避免循环等待:统一锁的获取顺序
func acquireLocks(mu1, mu2 *sync.Mutex) {
if mu1 < mu2 { // 假设可以比较地址
mu1.Lock()
mu2.Lock()
} else {
mu2.Lock()
mu1.Lock()
}
}8.6 如何使用 race detector 检测数据竞争?
问题描述:如何使用 Go 的 race detector 检测数据竞争 回答内容:使用 go run -race 或 go build -race 命令运行程序,race detector 会检测并报告数据竞争问题。 示例代码:
bash
# 运行程序并检查数据竞争
go run -race main.go
# 构建程序并检查数据竞争
go build -race -o main main.go
./main8.7 什么是活锁?如何避免?
问题描述:活锁的概念和避免方法 回答内容:活锁是指 goroutine 一直在执行,但无法完成任务的情况。避免活锁的方法包括添加随机退避机制,合理设计重试策略,以及使用超时机制。 示例代码:
go
// 使用随机退避避免活锁
func tryAcquireLock() bool {
for i := 0; i < maxRetries; i++ {
if tryLock() {
return true
}
// 随机退避
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
return false
}8.8 如何设计并发安全的 API?
问题描述:如何设计并发安全的 API 回答内容:设计并发安全的 API 应该考虑以下几点:最小化共享状态,使用适当的同步原语,提供清晰的文档说明线程安全性,以及避免在 API 内部使用全局状态。 示例代码:
go
// 并发安全的 API 设计
type SafeCounter struct {
counter int64
}
func NewSafeCounter() *SafeCounter {
return &SafeCounter{}
}
func (c *SafeCounter) Increment() {
atomic.AddInt64(&c.counter, 1)
}
func (c *SafeCounter) Value() int64 {
return atomic.LoadInt64(&c.counter)
}9. 实战练习
9.1 基础练习
题目:实现一个并发安全的计数器 解题思路:
- 使用互斥锁或原子操作实现计数器
- 测试并发场景下的正确性
- 比较不同实现的性能
常见误区:
- 未使用同步原语,导致数据竞争
- 错误使用原子操作,与非原子操作混合使用
- 过度使用锁,导致性能下降
分步提示:
- 使用互斥锁实现计数器
- 使用原子操作实现计数器
- 编写并发测试代码,启动多个 goroutine 同时递增计数器
- 验证计数器的最终值是否正确
- 比较两种实现的性能
参考代码:
go
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 使用互斥锁的计数器
type MutexCounter struct {
mu sync.Mutex
value int
}
func (c *MutexCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *MutexCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// 使用原子操作的计数器
type AtomicCounter struct {
value int64
}
func (c *AtomicCounter) Increment() {
atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
func main() {
const numGoroutines = 100
const numOperations = 10000
// 测试互斥锁计数器
mutexCounter := &MutexCounter{}
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < numOperations; j++ {
mutexCounter.Increment()
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("MutexCounter: %d operations took %v\n", numGoroutines*numOperations, elapsed)
fmt.Printf("Final value: %d\n", mutexCounter.Value())
// 测试原子操作计数器
atomicCounter := &AtomicCounter{}
start = time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < numOperations; j++ {
atomicCounter.Increment()
}
}()
}
wg.Wait()
elapsed = time.Since(start)
fmt.Printf("AtomicCounter: %d operations took %v\n", numGoroutines*numOperations, elapsed)
fmt.Printf("Final value: %d\n", atomicCounter.Value())
}9.2 进阶练习
题目:实现一个并发安全的缓存 解题思路:
- 使用互斥锁或读写锁保护缓存数据
- 实现基本的 Get、Set、Delete 操作
- 测试并发场景下的正确性和性能
常见误区:
- 未正确使用锁,导致数据竞争
- 锁的粒度太粗,导致性能下降
- 内存泄漏,缓存数据没有过期机制
分步提示:
- 定义缓存结构,使用 map 存储数据
- 使用读写锁保护 map 的访问
- 实现 Get、Set、Delete 方法
- 编写并发测试代码,模拟多个 goroutine 同时访问缓存
- 验证缓存操作的正确性
- 分析性能瓶颈
参考代码:
go
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
data map[string]interface{}
mu sync.RWMutex
}
func NewCache() *Cache {
return &Cache{
data: make(map[string]interface{}),
}
}
func (c *Cache) Get(key string) (interface{}, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
value, ok := c.data[key]
return value, ok
}
func (c *Cache) Set(key string, value interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}
func (c *Cache) Delete(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data, key)
}
func (c *Cache) Len() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.data)
}
func main() {
cache := NewCache()
var wg sync.WaitGroup
const numGoroutines = 100
const numOperations = 1000
start := time.Now()
// 并发写入
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
key := fmt.Sprintf("key-%d-%d", i, j)
value := fmt.Sprintf("value-%d-%d", i, j)
cache.Set(key, value)
}
}(i)
}
// 并发读取
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
key := fmt.Sprintf("key-%d-%d", i, j)
cache.Get(key)
}
}(i)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Cache operations took %v\n", elapsed)
fmt.Printf("Cache size: %d\n", cache.Len())
}9.3 挑战练习
题目:实现一个并发安全的工作队列,支持任务的添加、执行和取消 解题思路:
- 使用通道实现工作队列
- 支持添加任务到队列
- 支持并发执行任务
- 支持取消任务
- 测试并发场景下的正确性和性能
常见误区:
- 通道操作不当,导致死锁或阻塞
- 任务取消机制实现复杂
- 并发控制不当,导致资源竞争
分步提示:
- 定义任务结构和工作队列结构
- 实现任务添加方法
- 实现任务执行方法,使用 goroutine 并发执行
- 实现任务取消机制
- 编写并发测试代码,模拟多个 goroutine 同时添加和取消任务
- 验证工作队列的正确性
- 分析性能瓶颈
参考代码:
go
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Task struct {
ID string
Func func() error
Cancel context.CancelFunc
Context context.Context
}
type WorkQueue struct {
tasks chan *Task
cancel context.CancelFunc
ctx context.Context
wg sync.WaitGroup
numWorkers int
}
func NewWorkQueue(numWorkers int) *WorkQueue {
ctx, cancel := context.WithCancel(context.Background())
return &WorkQueue{
tasks: make(chan *Task, 100),
cancel: cancel,
ctx: ctx,
numWorkers: numWorkers,
}
}
func (wq *WorkQueue) Start() {
for i := 0; i < wq.numWorkers; i++ {
wq.wg.Add(1)
go wq.worker()
}
}
func (wq *WorkQueue) worker() {
defer wq.wg.Done()
for {
select {
case <-wq.ctx.Done():
return
case task, ok := <-wq.tasks:
if !ok {
return
}
select {
case <-task.Context.Done():
fmt.Printf("Task %s cancelled\n", task.ID)
default:
if err := task.Func(); err != nil {
fmt.Printf("Task %s failed: %v\n", task.ID, err)
} else {
fmt.Printf("Task %s completed\n", task.ID)
}
}
}
}
}
func (wq *WorkQueue) AddTask(id string, f func() error) *Task {
ctx, cancel := context.WithCancel(wq.ctx)
task := &Task{
ID: id,
Func: f,
Cancel: cancel,
Context: ctx,
}
select {
case <-wq.ctx.Done():
return nil
case wq.tasks <- task:
return task
}
}
func (wq *WorkQueue) Stop() {
wq.cancel()
close(wq.tasks)
wq.wg.Wait()
}
func main() {
wq := NewWorkQueue(3)
wq.Start()
// 添加任务
tasks := make([]*Task, 10)
for i := 0; i < 10; i++ {
id := fmt.Sprintf("task-%d", i)
tasks[i] = wq.AddTask(id, func() error {
time.Sleep(100 * time.Millisecond)
return nil
})
}
// 取消部分任务
for i := 0; i < 5; i++ {
if tasks[i] != nil {
tasks[i].Cancel()
fmt.Printf("Cancelled task %s\n", tasks[i].ID)
}
}
// 等待一段时间
time.Sleep(1 * time.Second)
// 停止工作队列
wq.Stop()
fmt.Println("Work queue stopped")
}10. 知识点总结
10.1 核心要点
- 并发安全:确保多个 goroutine 能够安全地访问共享资源
- 竞态条件:多个 goroutine 同时访问和修改共享资源导致的问题
- 同步原语:包括互斥锁、读写锁、原子操作和通道
- 内存模型:定义了多 goroutine 之间的内存可见性规则
- Happens-Before 关系:确保操作的顺序和可见性
- race detector:用于检测数据竞争问题
10.2 易错点回顾
- 数据竞争:未使用同步原语保护共享资源
- 死锁:多个 goroutine 相互等待对方释放资源
- 活锁:过度的重试机制导致无法完成任务
- 锁的粒度过粗:锁的范围过大,导致性能下降
- 错误使用原子操作:与非原子操作混合使用
- 内存泄漏:goroutine 泄漏或资源未正确释放
11. 拓展参考资料
11.1 官方文档链接
11.2 进阶学习路径建议
- 并发模式:学习常见的并发设计模式
- 无锁编程:学习无锁数据结构的设计和实现
- 性能优化:学习如何优化并发程序的性能
- 分布式系统:学习分布式系统中的并发控制
- 测试:学习如何测试并发程序
11.3 推荐资源
- 《Concurrency in Go》by Katherine Cox-Buday
- 《Go 语言实战》中的并发编程章节
- 《The Go Programming Language》中的并发章节
- Go 官方博客关于并发的文章
- 开源项目中的并发编程实践,如 Kubernetes、Docker 等
