Skip to content

业务场景实战

1. 概述

在实际业务开发中,并发编程是提高系统性能和响应速度的关键技术。Go语言的并发特性使其成为处理高并发场景的理想选择。本章节将通过实际业务场景,展示如何在真实项目中应用并发设计模式,解决实际问题。

业务场景实战的核心目标是:

  • 理解并发编程在实际业务中的应用价值
  • 掌握常见业务场景的并发解决方案
  • 学习如何根据具体业务需求选择合适的并发模式
  • 了解并发编程中的性能优化和最佳实践

通过本章节的学习,读者将能够在实际项目中灵活运用并发编程技术,提高系统的处理能力和可靠性。

2. 基本概念

2.1 语法

在业务场景中,并发编程的基本语法包括:

  1. Goroutine:使用 go 关键字启动并发执行的函数
  2. Channel:用于 goroutine 之间的通信
  3. Sync 包:提供互斥锁、读写锁、条件变量等同步原语
  4. Context:用于控制 goroutine 的生命周期和传递取消信号

基本语法示例:

go
// 启动 goroutine
go func() {
    // 并发执行的代码
}()

// 创建通道
ch := make(chan int)

// 发送数据到通道
ch <- 42

// 从通道接收数据
value := <-ch

// 使用互斥锁
var mu sync.Mutex
mu.Lock()
// 临界区代码
mu.Unlock()

// 使用上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

2.2 语义

业务场景中并发编程的语义包括:

  • 并发执行:多个任务同时执行,提高系统吞吐量
  • 资源共享:多个 goroutine 共享资源时需要同步机制
  • 通信:goroutine 之间通过通道进行安全的通信
  • 生命周期管理:通过 context 管理 goroutine 的生命周期
  • 错误处理:在并发环境中正确处理和传播错误

2.3 规范

在业务场景中使用并发编程时,应遵循以下规范:

  1. 明确的并发边界:清晰定义哪些操作需要并发执行
  2. 合理的资源管理:确保所有 goroutine 都能正常退出,避免资源泄漏
  3. 适当的同步机制:根据场景选择合适的同步原语
  4. 错误处理机制:设计合理的错误传播和处理机制
  5. 性能考虑:避免过度并发导致的性能下降
  6. 可测试性:设计易于测试的并发代码

3. 原理深度解析

3.1 并发模型在业务中的应用

Go语言的并发模型基于 CSP(Communicating Sequential Processes)理论,通过 goroutine 和 channel 实现。在业务场景中,这种模型的优势在于:

  1. 简单性:使用 channel 进行通信,避免了显式的锁操作
  2. 安全性:通过 channel 传递数据,减少了共享内存带来的竞态条件
  3. 灵活性:可以根据业务需求组合不同的并发模式
  4. 可扩展性:易于构建高并发的系统

3.2 业务场景中的并发模式选择

在实际业务中,选择合适的并发模式至关重要。以下是常见的并发模式及其适用场景:

并发模式适用场景优势
生产者-消费者任务队列、消息处理解耦生产者和消费者,平衡处理能力
工作池并行处理大量任务控制并发度,避免资源耗尽
扇出-扇入并行处理数据,汇总结果提高处理速度,适用于IO密集型任务
管道多步骤数据处理模块化设计,易于维护
竞态检测共享资源访问确保数据一致性

3.3 性能优化原理

在业务场景中,并发编程的性能优化需要考虑以下因素:

  1. 并发度控制:根据系统资源和任务特性,设置合理的并发度
  2. 内存管理:减少内存分配和垃圾回收开销
  3. IO 操作优化:使用非阻塞 IO 或异步 IO
  4. 负载均衡:合理分配任务,避免某些 goroutine 过载
  5. 避免死锁和活锁:设计合理的资源获取顺序
  6. 监控和调优:通过监控发现性能瓶颈并进行优化

4. 常见错误与踩坑点

4.1 goroutine 泄漏

错误表现:系统中 goroutine 数量持续增长,内存使用量不断增加

产生原因

  • goroutine 中的通道操作永久阻塞
  • 没有正确处理 context 取消信号
  • 无限循环中没有退出条件

解决方案

  • 使用 context 控制 goroutine 的生命周期
  • 确保所有通道操作都有超时或退出机制
  • 定期监控 goroutine 数量

4.2 竞态条件

错误表现:程序行为不确定,数据不一致,偶发性错误

产生原因:多个 goroutine 同时访问和修改共享资源,没有使用适当的同步机制

解决方案

  • 使用互斥锁保护共享资源
  • 使用通道传递数据,避免共享内存
  • 使用原子操作处理简单的计数器等场景
  • 使用竞态检测器(race detector)检测潜在的竞态条件

4.3 死锁

错误表现:程序卡住,无法继续执行

产生原因

  • 多个 goroutine 互相等待对方释放资源
  • 通道操作顺序不当
  • 锁的获取顺序不一致

解决方案

  • 避免嵌套锁
  • 确保通道的发送和接收操作配对
  • 使用带缓冲的通道或 select 语句
  • 设计合理的资源获取顺序

4.4 错误处理不当

错误表现:错误被忽略,或者错误处理逻辑导致程序崩溃

产生原因

  • 没有正确处理 goroutine 中的错误
  • 错误传播机制设计不合理
  • panic 没有被 recover

解决方案

  • 使用错误通道传递错误
  • 设计统一的错误处理机制
  • 在 goroutine 中使用 defer recover() 捕获 panic
  • 记录错误日志,便于排查问题

4.5 过度并发

错误表现:系统性能下降,甚至崩溃

产生原因:创建了过多的 goroutine,导致系统资源耗尽

解决方案

  • 使用工作池控制并发度
  • 根据系统资源设置合理的 goroutine 数量
  • 监控系统资源使用情况
  • 优化任务处理逻辑,减少 goroutine 的创建

5. 常见应用场景

5.1 Web 服务器并发处理

场景描述:Web 服务器需要同时处理多个客户端请求,每个请求可能涉及 IO 操作(如数据库查询、文件读写等)

使用方法

  • 为每个请求启动一个 goroutine 处理
  • 使用工作池控制并发度
  • 使用 context 管理请求的生命周期
  • 实现超时机制,避免请求长时间占用资源

示例代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

// 工作池
 type WorkerPool struct {
    tasks chan func()
    wg    sync.WaitGroup
}

// 创建工作池
func NewWorkerPool(size int) *WorkerPool {
    pool := &WorkerPool{
        tasks: make(chan func(), 100),
    }
    
    // 启动工作者
    for i := 0; i < size; i++ {
        pool.wg.Add(1)
        go func() {
            defer pool.wg.Done()
            for task := range pool.tasks {
                task()
            }
        }()
    }
    
    return pool
}

// 提交任务
func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

// 关闭工作池
func (p *WorkerPool) Close() {
    close(p.tasks)
    p.wg.Wait()
}

// 处理 HTTP 请求
func handleRequest(w http.ResponseWriter, r *http.Request) {
    // 设置请求超时
    ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
    defer cancel()
    
    // 模拟处理时间
    select {
    case <-time.After(1 * time.Second):
        fmt.Fprintf(w, "Hello, World!")
    case <-ctx.Done():
        http.Error(w, "Request timeout", http.StatusRequestTimeout)
    }
}

func main() {
    // 创建工作池,大小为 10
    pool := NewWorkerPool(10)
    defer pool.Close()
    
    // 注册 HTTP 处理函数
    http.HandleFunc("/", handleRequest)
    
    // 启动服务器
    log.Println("Server starting on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal("Server failed:", err)
    }
}

5.2 数据批处理系统

场景描述:需要处理大量数据,如日志分析、数据转换、ETL 等

使用方法

  • 使用生产者-消费者模式分离数据读取和处理
  • 使用工作池并行处理数据
  • 使用管道模式组织数据处理流程
  • 实现错误处理和重试机制

示例代码

go
package main

import (
    "fmt"
    "log"
    "os"
    "path/filepath"
    "sync"
    "time"
)

// 数据项
type DataItem struct {
    Path    string
    Content string
}

// 生产者:读取文件
func producer(directory string) <-chan DataItem {
    out := make(chan DataItem)
    
    go func() {
        defer close(out)
        
        // 遍历目录中的文件
        err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            
            if !info.IsDir() && filepath.Ext(path) == ".txt" {
                // 读取文件内容
                content, err := os.ReadFile(path)
                if err != nil {
                    log.Printf("Error reading file %s: %v", path, err)
                    return nil
                }
                
                out <- DataItem{
                    Path:    path,
                    Content: string(content),
                }
            }
            
            return nil
        })
        
        if err != nil {
            log.Printf("Error walking directory: %v", err)
        }
    }()
    
    return out
}

// 处理函数:转换数据
func process(item DataItem) DataItem {
    // 模拟处理时间
    time.Sleep(100 * time.Millisecond)
    
    // 简单的转换:转为大写
    processedContent := item.Content
    // 实际应用中可能有更复杂的处理逻辑
    
    return DataItem{
        Path:    item.Path,
        Content: processedContent,
    }
}

// 工作池:并行处理数据
func workerPool(input <-chan DataItem, numWorkers int) <-chan DataItem {
    out := make(chan DataItem)
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range input {
                processed := process(item)
                out <- processed
            }
        }()
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// 消费者:保存处理后的数据
func consumer(input <-chan DataItem, outputDir string) {
    // 创建输出目录
    if err := os.MkdirAll(outputDir, 0755); err != nil {
        log.Fatalf("Error creating output directory: %v", err)
    }
    
    for item := range input {
        // 生成输出文件路径
        relPath, err := filepath.Rel(".", item.Path)
        if err != nil {
            log.Printf("Error getting relative path: %v", err)
            continue
        }
        
        outputPath := filepath.Join(outputDir, relPath)
        
        // 创建输出目录结构
        if err := os.MkdirAll(filepath.Dir(outputPath), 0755); err != nil {
            log.Printf("Error creating output directory structure: %v", err)
            continue
        }
        
        // 写入文件
        if err := os.WriteFile(outputPath, []byte(item.Content), 0644); err != nil {
            log.Printf("Error writing file %s: %v", outputPath, err)
            continue
        }
        
        fmt.Printf("Processed file: %s\n", item.Path)
    }
}

func main() {
    inputDir := "input"
    outputDir := "output"
    numWorkers := 4
    
    // 构建数据处理管道
    dataItems := producer(inputDir)
    processedItems := workerPool(dataItems, numWorkers)
    consumer(processedItems, outputDir)
    
    fmt.Println("Data processing completed!")
}

5.3 实时消息处理系统

场景描述:处理实时消息流,如用户行为数据、传感器数据等

使用方法

  • 使用扇出模式并行处理消息
  • 使用时间窗口进行数据聚合
  • 使用通道传递消息
  • 实现消息确认机制

示例代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
)

// 消息处理函数
func processMessage(msg kafka.Message) error {
    // 模拟处理时间
    time.Sleep(50 * time.Millisecond)
    fmt.Printf("Processed message: %s\n", string(msg.Value))
    return nil
}

// 消费者组
func consumerGroup(ctx context.Context, brokers []string, topic string, groupID string, numWorkers int) {
    // 创建消费者
    consumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     brokers,
        Topic:       topic,
        GroupID:     groupID,
        StartOffset: kafka.FirstOffset,
    })
    defer consumer.Close()
    
    // 创建工作池
    tasks := make(chan kafka.Message, 100)
    var wg sync.WaitGroup
    
    // 启动工作者
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for msg := range tasks {
                if err := processMessage(msg); err != nil {
                    log.Printf("Error processing message: %v", err)
                }
            }
        }()
    }
    
    // 消费消息
    for {
        select {
        case <-ctx.Done():
            close(tasks)
            wg.Wait()
            return
        default:
            msg, err := consumer.ReadMessage(ctx)
            if err != nil {
                log.Printf("Error reading message: %v", err)
                continue
            }
            
            tasks <- msg
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    brokers := []string{"localhost:9092"}
    topic := "messages"
    groupID := "message-processing-group"
    numWorkers := 5
    
    // 启动消费者组
    log.Println("Starting message processing...")
    consumerGroup(ctx, brokers, topic, groupID, numWorkers)
}

5.4 分布式任务协调系统

场景描述:在分布式系统中协调多个节点的任务执行,确保任务不重复执行,处理节点故障等情况

使用方法

  • 使用分布式锁确保任务只被一个节点执行
  • 使用心跳机制检测节点健康状态
  • 使用重试机制处理临时故障
  • 使用状态机管理任务生命周期

示例代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/go-redis/redis/v8"
)

// 分布式锁
 type DistributedLock struct {
    client *redis.Client
    key    string
    value  string
    ttl    time.Duration
}

// 创建分布式锁
func NewDistributedLock(client *redis.Client, key string, ttl time.Duration) *DistributedLock {
    return &DistributedLock{
        client: client,
        key:    key,
        value:  fmt.Sprintf("%d", time.Now().UnixNano()),
        ttl:    ttl,
    }
}

// 获取锁
func (l *DistributedLock) Acquire(ctx context.Context) (bool, error) {
    return l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
}

// 释放锁
func (l *DistributedLock) Release(ctx context.Context) error {
    // 使用 Lua 脚本确保原子操作
    script := `
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    `
    _, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
    return err
}

// 任务管理器
 type TaskManager struct {
    client *redis.Client
    tasks  chan string
    wg     sync.WaitGroup
}

// 创建任务管理器
func NewTaskManager(client *redis.Client, numWorkers int) *TaskManager {
    manager := &TaskManager{
        client: client,
        tasks:  make(chan string, 100),
    }
    
    // 启动工作者
    for i := 0; i < numWorkers; i++ {
        manager.wg.Add(1)
        go manager.worker()
    }
    
    return manager
}

// 工作者函数
func (m *TaskManager) worker() {
    defer m.wg.Done()
    
    for taskID := range m.tasks {
        m.processTask(taskID)
    }
}

// 处理任务
func (m *TaskManager) processTask(taskID string) {
    ctx := context.Background()
    
    // 获取分布式锁
    lock := NewDistributedLock(m.client, fmt.Sprintf("task:%s:lock", taskID), 10*time.Second)
    acquired, err := lock.Acquire(ctx)
    if err != nil {
        log.Printf("Error acquiring lock for task %s: %v", taskID, err)
        return
    }
    
    if !acquired {
        log.Printf("Task %s is already being processed by another node", taskID)
        return
    }
    
    defer lock.Release(ctx)
    
    // 检查任务状态
    status, err := m.client.Get(ctx, fmt.Sprintf("task:%s:status", taskID)).Result()
    if err == redis.Nil {
        // 任务不存在,创建
        m.client.Set(ctx, fmt.Sprintf("task:%s:status", taskID), "processing", 0)
    } else if err != nil {
        log.Printf("Error getting task status: %v", err)
        return
    } else if status == "completed" {
        log.Printf("Task %s is already completed", taskID)
        return
    }
    
    // 处理任务
    log.Printf("Processing task %s", taskID)
    time.Sleep(2 * time.Second) // 模拟处理时间
    
    // 更新任务状态
    m.client.Set(ctx, fmt.Sprintf("task:%s:status", taskID), "completed", 0)
    log.Printf("Task %s completed", taskID)
}

// 提交任务
func (m *TaskManager) Submit(taskID string) {
    m.tasks <- taskID
}

// 关闭任务管理器
func (m *TaskManager) Close() {
    close(m.tasks)
    m.wg.Wait()
}

func main() {
    // 连接 Redis
    client := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer client.Close()
    
    // 创建任务管理器
    manager := NewTaskManager(client, 3)
    defer manager.Close()
    
    // 提交任务
    for i := 1; i <= 10; i++ {
        taskID := fmt.Sprintf("task-%d", i)
        manager.Submit(taskID)
    }
    
    // 等待任务完成
    time.Sleep(10 * time.Second)
    fmt.Println("Task submission completed!")
}

5.5 性能监控系统

场景描述:监控系统性能指标,如 CPU 使用率、内存使用量、网络流量等

使用方法

  • 使用 goroutine 定期收集指标
  • 使用通道传递指标数据
  • 使用时间窗口进行数据聚合
  • 实现告警机制

示例代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "runtime"
    "sync"
    "time"
)

// 性能指标
type Metric struct {
    Name      string
    Value     float64
    Timestamp time.Time
}

// 指标收集器
 type MetricCollector struct {
    metrics chan Metric
    wg      sync.WaitGroup
}

// 创建指标收集器
func NewMetricCollector(bufferSize int) *MetricCollector {
    return &MetricCollector{
        metrics: make(chan Metric, bufferSize),
    }
}

// 启动 CPU 使用率收集
func (c *MetricCollector) StartCPUCollection(ctx context.Context, interval time.Duration) {
    c.wg.Add(1)
    go func() {
        defer c.wg.Done()
        
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                // 模拟 CPU 使用率收集
                var m runtime.MemStats
                runtime.ReadMemStats(&m)
                cpuUsage := float64(m.Alloc) / float64(m.Sys) * 100
                
                c.metrics <- Metric{
                    Name:      "cpu_usage",
                    Value:     cpuUsage,
                    Timestamp: time.Now(),
                }
            }
        }
    }()
}

// 启动内存使用量收集
func (c *MetricCollector) StartMemoryCollection(ctx context.Context, interval time.Duration) {
    c.wg.Add(1)
    go func() {
        defer c.wg.Done()
        
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                // 收集内存使用量
                var m runtime.MemStats
                runtime.ReadMemStats(&m)
                memoryUsage := float64(m.Alloc) / 1024 / 1024 // MB
                
                c.metrics <- Metric{
                    Name:      "memory_usage",
                    Value:     memoryUsage,
                    Timestamp: time.Now(),
                }
            }
        }
    }()
}

// 获取指标通道
func (c *MetricCollector) Metrics() <-chan Metric {
    return c.metrics
}

// 停止收集
func (c *MetricCollector) Stop() {
    c.wg.Wait()
    close(c.metrics)
}

// 指标处理器
func processMetrics(metrics <-chan Metric) {
    // 按指标名称分组
    metricGroups := make(map[string][]Metric)
    
    for metric := range metrics {
        metricGroups[metric.Name] = append(metricGroups[metric.Name], metric)
        
        // 每收到 5 个指标,计算平均值
        if len(metricGroups[metric.Name]) >= 5 {
            var sum float64
            for _, m := range metricGroups[metric.Name] {
                sum += m.Value
            }
            avg := sum / float64(len(metricGroups[metric.Name]))
            
            fmt.Printf("%s average: %.2f\n", metric.Name, avg)
            
            // 检查是否需要告警
            if metric.Name == "cpu_usage" && avg > 80 {
                log.Printf("ALERT: CPU usage too high: %.2f%%", avg)
            }
            if metric.Name == "memory_usage" && avg > 500 {
                log.Printf("ALERT: Memory usage too high: %.2f MB", avg)
            }
            
            // 清空组,开始新的统计周期
            metricGroups[metric.Name] = nil
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // 创建指标收集器
    collector := NewMetricCollector(100)
    
    // 启动指标收集
    collector.StartCPUCollection(ctx, 1*time.Second)
    collector.StartMemoryCollection(ctx, 1*time.Second)
    
    // 处理指标
    go processMetrics(collector.Metrics())
    
    // 运行一段时间
    time.Sleep(30 * time.Second)
    
    // 停止收集
    cancel()
    collector.Stop()
    
    fmt.Println("Monitoring stopped!")
}

6. 企业级进阶应用场景

6.1 高并发 API 网关

场景描述:API 网关需要处理大量并发请求,进行路由、认证、限流等操作

使用方法

  • 使用工作池处理请求
  • 实现请求限流和熔断机制
  • 使用缓存减少后端服务调用
  • 实现分布式追踪
  • 使用负载均衡分发请求

示例代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
    "golang.org/x/time/rate"
)

// 限流中间件
func rateLimiter(limit int) gin.HandlerFunc {
    // 每个 IP 的限流器
    limiters := make(map[string]*rate.Limiter)
    var mu sync.Mutex
    
    return func(c *gin.Context) {
        ip := c.ClientIP()
        
        mu.Lock()
        limiter, exists := limiters[ip]
        if !exists {
            // 每秒允许 limit 个请求
            limiter = rate.NewLimiter(rate.Limit(limit), limit)
            limiters[ip] = limiter
        }
        mu.Unlock()
        
        if !limiter.Allow() {
            c.JSON(http.StatusTooManyRequests, gin.H{"error": "Rate limit exceeded"})
            c.Abort()
            return
        }
        
        c.Next()
    }
}

// 工作池
 type WorkerPool struct {
    tasks chan func()
    wg    sync.WaitGroup
}

// 创建工作池
func NewWorkerPool(size int) *WorkerPool {
    pool := &WorkerPool{
        tasks: make(chan func(), 1000),
    }
    
    for i := 0; i < size; i++ {
        pool.wg.Add(1)
        go func() {
            defer pool.wg.Done()
            for task := range pool.tasks {
                task()
            }
        }()
    }
    
    return pool
}

// 提交任务
func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

// 关闭工作池
func (p *WorkerPool) Close() {
    close(p.tasks)
    p.wg.Wait()
}

// 处理 API 请求
func handleAPIRequest(c *gin.Context) {
    // 获取请求参数
    id := c.Param("id")
    
    // 模拟 API 处理
    time.Sleep(100 * time.Millisecond)
    
    c.JSON(http.StatusOK, gin.H{
        "id":   id,
        "data": fmt.Sprintf("Data for %s", id),
        "time": time.Now().Format(time.RFC3339),
    })
}

func main() {
    // 创建工作池
    pool := NewWorkerPool(100)
    defer pool.Close()
    
    // 设置 Gin 模式
    gin.SetMode(gin.ReleaseMode)
    
    // 创建 Gin 引擎
    r := gin.New()
    
    // 添加中间件
    r.Use(gin.Logger())
    r.Use(gin.Recovery())
    r.Use(rateLimiter(10)) // 每秒 10 个请求的限制
    
    // 注册路由
    r.GET("/api/:id", func(c *gin.Context) {
        // 提交到工作池处理
        pool.Submit(func() {
            handleAPIRequest(c)
        })
    })
    
    // 启动服务器
    log.Println("API Gateway starting on :8080")
    if err := r.Run(":8080"); err != nil {
        log.Fatal("Server failed:", err)
    }
}

6.2 分布式数据处理平台

场景描述:处理海量数据,需要分布式计算能力,如大数据分析、机器学习训练等

使用方法

  • 使用 MapReduce 模式处理数据
  • 实现数据分片和并行处理
  • 使用分布式存储系统
  • 实现任务调度和资源管理
  • 处理节点故障和数据一致性

示例代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"
)

// 数据分片
type DataShard struct {
    ID   string
    Data []int
}

// Map 函数
func mapFunction(shard DataShard) map[int]int {
    result := make(map[int]int)
    for _, value := range shard.Data {
        result[value]++
    }
    return result
}

// Reduce 函数
func reduceFunction(results []map[int]int) map[int]int {
    finalResult := make(map[int]int)
    for _, result := range results {
        for key, value := range result {
            finalResult[key] += value
        }
    }
    return finalResult
}

// 分布式数据处理
func processData(data []int, numWorkers int) map[int]int {
    // 数据分片
    shards := make([]DataShard, 0, numWorkers)
    shardSize := len(data) / numWorkers
    
    for i := 0; i < numWorkers; i++ {
        start := i * shardSize
        end := (i + 1) * shardSize
        if i == numWorkers-1 {
            end = len(data)
        }
        
        shards = append(shards, DataShard{
            ID:   fmt.Sprintf("shard-%d", i),
            Data: data[start:end],
        })
    }
    
    // 并行处理
    var wg sync.WaitGroup
    results := make([]map[int]int, len(shards))
    var mu sync.Mutex
    
    for i, shard := range shards {
        wg.Add(1)
        go func(idx int, s DataShard) {
            defer wg.Done()
            
            // 模拟处理时间
            time.Sleep(1 * time.Second)
            
            result := mapFunction(s)
            
            mu.Lock()
            results[idx] = result
            mu.Unlock()
            
            log.Printf("Processed shard %s", s.ID)
        }(i, shard)
    }
    
    wg.Wait()
    
    // 合并结果
    return reduceFunction(results)
}

func main() {
    // 生成测试数据
    data := make([]int, 1000000)
    for i := range data {
        data[i] = i % 1000
    }
    
    numWorkers := 4
    
    log.Println("Starting distributed data processing...")
    start := time.Now()
    
    result := processData(data, numWorkers)
    
    duration := time.Since(start)
    log.Printf("Processing completed in %v", duration)
    log.Printf("Result count: %d", len(result))
    
    // 打印前 10 个结果
    count := 0
    for key, value := range result {
        fmt.Printf("%d: %d\n", key, value)
        count++
        if count >= 10 {
            break
        }
    }
}

6.3 实时推荐系统

场景描述:根据用户行为实时生成推荐内容,需要低延迟处理

使用方法

  • 使用流处理框架处理用户行为数据
  • 实现实时特征提取
  • 使用机器学习模型进行预测
  • 实现缓存机制减少计算开销
  • 使用消息队列解耦各个组件

示例代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
)

// 用户行为事件
type UserEvent struct {
    UserID     string    `json:"user_id"`
    ItemID     string    `json:"item_id"`
    EventType  string    `json:"event_type"` // view, click, purchase
    Timestamp  time.Time `json:"timestamp"`
}

// 推荐结果
type Recommendation struct {
    UserID  string   `json:"user_id"`
    ItemIDs []string `json:"item_ids"`
    Score   float64  `json:"score"`
}

// 推荐引擎
 type RecommendationEngine struct {
    model      *MockModel
    userCache  map[string][]string
    cacheMutex sync.RWMutex
}

// 模拟推荐模型
type MockModel struct{}

// 预测推荐
func (m *MockModel) Predict(userID string, recentItems []string) []string {
    // 模拟模型预测
    time.Sleep(50 * time.Millisecond)
    
    // 返回模拟推荐结果
    recommendations := []string{"item1", "item2", "item3", "item4", "item5"}
    return recommendations
}

// 创建推荐引擎
func NewRecommendationEngine() *RecommendationEngine {
    return &RecommendationEngine{
        model:     &MockModel{},
        userCache: make(map[string][]string),
    }
}

// 处理用户事件
func (e *RecommendationEngine) ProcessEvent(event UserEvent) Recommendation {
    // 更新用户行为缓存
    e.cacheMutex.Lock()
    recentItems := e.userCache[event.UserID]
    recentItems = append([]string{event.ItemID}, recentItems...)
    if len(recentItems) > 10 {
        recentItems = recentItems[:10]
    }
    e.userCache[event.UserID] = recentItems
    e.cacheMutex.Unlock()
    
    // 生成推荐
    recommendedItems := e.model.Predict(event.UserID, recentItems)
    
    return Recommendation{
        UserID:  event.UserID,
        ItemIDs: recommendedItems,
        Score:   0.95, // 模拟分数
    }
}

// 消费用户事件
func consumeEvents(ctx context.Context, reader *kafka.Reader, engine *RecommendationEngine) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
            msg, err := reader.ReadMessage(ctx)
            if err != nil {
                log.Printf("Error reading message: %v", err)
                continue
            }
            
            // 解析事件(实际应用中使用 JSON 解析)
            var event UserEvent
            // json.Unmarshal(msg.Value, &event)
            // 模拟事件
            event = UserEvent{
                UserID:    "user1",
                ItemID:    fmt.Sprintf("item%d", time.Now().UnixNano()%100),
                EventType: "click",
                Timestamp: time.Now(),
            }
            
            // 处理事件并生成推荐
            recommendation := engine.ProcessEvent(event)
            
            // 输出推荐结果
            fmt.Printf("Recommendation for user %s: %v\n", recommendation.UserID, recommendation.ItemIDs)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // 创建 Kafka 读取器
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092"},
        Topic:     "user-events",
        Partition: 0,
        MinBytes:  10e3,
        MaxBytes:  10e6,
    })
    defer reader.Close()
    
    // 创建推荐引擎
    engine := NewRecommendationEngine()
    
    // 启动事件消费
    log.Println("Starting recommendation engine...")
    consumeEvents(ctx, reader, engine)
}

7. 行业最佳实践

7.1 并发度控制

实践内容:根据系统资源和任务特性,设置合理的并发度

推荐理由

  • 过度并发会导致系统资源耗尽,性能下降
  • 并发度过低会浪费系统资源,无法充分利用硬件能力
  • 合理的并发度可以最大化系统吞吐量

实现方法

  • 使用工作池控制并发度
  • 根据 CPU 核心数设置初始并发度
  • 通过监控和压测调整并发度
  • 实现动态并发度调整机制

7.2 错误处理

实践内容:设计统一的错误处理机制,确保错误能够被及时捕获和处理

推荐理由

  • 并发环境中的错误处理更加复杂
  • 未处理的错误可能导致 goroutine 泄漏
  • 统一的错误处理机制便于排查问题

实现方法

  • 使用错误通道传递错误
  • 在 goroutine 中使用 defer recover() 捕获 panic
  • 实现错误日志系统,记录错误详情
  • 设计错误重试机制,处理临时故障

7.3 资源管理

实践内容:确保所有资源都能被正确释放,避免资源泄漏

推荐理由

  • 资源泄漏会导致系统性能下降,甚至崩溃
  • 并发环境中的资源管理更加复杂
  • 正确的资源管理可以提高系统的可靠性

实现方法

  • 使用 context 控制 goroutine 的生命周期
  • 使用 defer 语句确保资源被释放
  • 定期监控系统资源使用情况
  • 实现资源池,减少资源创建和销毁的开销

7.4 监控和可观测性

实践内容:为并发系统添加监控和可观测性机制

推荐理由

  • 并发系统的问题更加难以排查
  • 监控可以帮助发现性能瓶颈和潜在问题
  • 可观测性可以提高系统的可靠性和可维护性

实现方法

  • 监控 goroutine 数量和状态
  • 监控通道的使用情况
  • 实现分布式追踪,跟踪请求的处理流程
  • 使用指标系统收集性能数据

7.5 测试策略

实践内容:为并发代码设计合理的测试策略

推荐理由

  • 并发代码的测试更加复杂
  • 并发问题往往是偶发性的,难以复现
  • 合理的测试策略可以提高代码的质量和可靠性

实现方法

  • 使用单元测试测试各个组件
  • 使用集成测试测试整个系统
  • 使用压力测试测试系统的并发性能
  • 使用竞态检测器检测潜在的竞态条件

8. 常见问题答疑(FAQ)

8.1 如何选择合适的并发模式?

问题描述:在不同的业务场景中,如何选择合适的并发模式?

回答内容: 选择合适的并发模式需要考虑以下因素:

  1. 任务类型:IO 密集型任务适合使用更多的并发,CPU 密集型任务则需要考虑 CPU 核心数
  2. 数据依赖:如果任务之间有数据依赖,可能需要使用管道模式
  3. 错误处理:需要考虑错误如何在并发环境中传播和处理
  4. 资源消耗:需要考虑并发对系统资源的影响
  5. 可维护性:需要考虑代码的可读性和可维护性

示例代码

go
// 根据任务类型选择并发模式
func chooseConcurrencyPattern(taskType string) {
    switch taskType {
    case "io-intensive":
        // 使用工作池模式,并发度可以设置得较高
        pool := NewWorkerPool(100)
        // ...
    case "cpu-intensive":
        // 使用工作池模式,并发度设置为 CPU 核心数
        pool := NewWorkerPool(runtime.NumCPU())
        // ...
    case "data-processing":
        // 使用管道模式
        pipeline := createPipeline()
        // ...
    case "message-handling":
        // 使用生产者-消费者模式
        producerConsumer := NewProducerConsumer()
        // ...
    }
}

8.2 如何处理并发中的死锁问题?

问题描述:在并发编程中,如何避免和处理死锁问题?

回答内容: 避免死锁的方法:

  1. 避免嵌套锁:尽量避免在持有一个锁的同时获取另一个锁
  2. 统一锁的获取顺序:如果需要获取多个锁,按照固定的顺序获取
  3. 使用带缓冲的通道:避免通道操作阻塞
  4. 使用 select 语句:为通道操作添加超时机制
  5. 使用 context:为长时间运行的操作添加取消机制

示例代码

go
// 避免嵌套锁
func avoidNestedLocks() {
    var mu1, mu2 sync.Mutex
    
    // 错误的做法:嵌套锁
    // mu1.Lock()
    // mu2.Lock()
    // ...
    // mu2.Unlock()
    // mu1.Unlock()
    
    // 正确的做法:统一获取顺序
    mu1.Lock()
    // 使用 mu1 保护的资源
    mu1.Unlock()
    
    mu2.Lock()
    // 使用 mu2 保护的资源
    mu2.Unlock()
}

// 使用 select 语句避免通道阻塞
func avoidChannelBlock() {
    ch := make(chan int)
    
    select {
    case ch <- 42:
        // 发送成功
    case <-time.After(1 * time.Second):
        // 发送超时
        fmt.Println("Send timeout")
    }
}

8.3 如何处理并发中的错误?

问题描述:在并发编程中,如何处理和传播错误?

回答内容: 处理并发错误的方法:

  1. 使用错误通道:创建一个专门的通道用于传递错误
  2. 使用自定义类型:定义包含数据和错误的结构体
  3. 使用 context:通过 context 传递错误信息
  4. 使用 sync.WaitGroup 和 errgroup:等待所有 goroutine 完成并收集错误
  5. 在 goroutine 中使用 defer recover():捕获 panic,避免程序崩溃

示例代码

go
// 使用错误通道
func useErrorChannel() {
    dataCh := make(chan int)
    errCh := make(chan error)
    
    go func() {
        defer close(dataCh)
        defer close(errCh)
        
        for i := 0; i < 10; i++ {
            if i == 5 {
                errCh <- fmt.Errorf("error at %d", i)
                return
            }
            dataCh <- i
        }
    }()
    
    for {
        select {
        case data, ok := <-dataCh:
            if !ok {
                return
            }
            fmt.Println("Received:", data)
        case err, ok := <-errCh:
            if !ok {
                return
            }
            fmt.Println("Error:", err)
            return
        }
    }
}

// 使用 errgroup
func useErrGroup() {
    g, ctx := errgroup.WithContext(context.Background())
    
    for i := 0; i < 5; i++ {
        i := i
        g.Go(func() error {
            if i == 2 {
                return fmt.Errorf("error at %d", i)
            }
            fmt.Println("Processing:", i)
            return nil
        })
    }
    
    if err := g.Wait(); err != nil {
        fmt.Println("Error:", err)
    }
}

8.4 如何优化并发程序的性能?

问题描述:如何优化并发程序的性能?

回答内容: 优化并发程序性能的方法:

  1. 控制并发度:根据系统资源和任务特性设置合理的并发度
  2. 减少内存分配:重用对象,减少垃圾回收开销
  3. 优化通道使用:合理设置通道缓冲区大小
  4. 避免过度同步:减少锁的使用,使用无锁数据结构
  5. 使用 profiling 工具:找出性能瓶颈并进行优化
  6. 实现负载均衡:合理分配任务,避免某些 goroutine 过载

示例代码

go
// 优化通道使用
func optimizeChannelUsage() {
    // 为 IO 密集型任务设置较大的缓冲区
    ch := make(chan int, 1000)
    
    go func() {
        defer close(ch)
        for i := 0; i < 10000; i++ {
            ch <- i
        }
    }()
    
    for i := range ch {
        // 处理数据
    }
}

// 使用 worker pool 控制并发度
func useWorkerPool() {
    // 根据 CPU 核心数设置工作者数量
    numWorkers := runtime.NumCPU()
    pool := NewWorkerPool(numWorkers)
    
    // 提交任务
    for i := 0; i < 1000; i++ {
        i := i
        pool.Submit(func() {
            // 处理任务
            fmt.Println("Processing:", i)
        })
    }
    
    pool.Close()
}

8.5 如何测试并发代码?

问题描述:如何测试并发代码,确保其正确性和可靠性?

回答内容: 测试并发代码的方法:

  1. 单元测试:测试各个组件的功能
  2. 集成测试:测试整个系统的功能
  3. 压力测试:测试系统在高并发下的性能
  4. 竞态检测:使用 -race 标志检测竞态条件
  5. 模糊测试:随机生成输入,测试系统的鲁棒性
  6. 超时测试:测试系统在超时情况下的行为

示例代码

go
// 单元测试示例
func TestWorkerPool(t *testing.T) {
    pool := NewWorkerPool(2)
    defer pool.Close()
    
    var wg sync.WaitGroup
    results := make([]int, 10)
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        i := i
        pool.Submit(func() {
            defer wg.Done()
            results[i] = i * i
        })
    }
    
    wg.Wait()
    
    for i, result := range results {
        expected := i * i
        if result != expected {
            t.Errorf("Expected %d, got %d", expected, result)
        }
    }
}

// 压力测试示例
func BenchmarkWorkerPool(b *testing.B) {
    pool := NewWorkerPool(10)
    defer pool.Close()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        pool.Submit(func() {
            time.Sleep(1 * time.Millisecond)
        })
    }
}

8.6 如何在分布式系统中应用并发编程?

问题描述:如何在分布式系统中应用并发编程技术?

回答内容: 在分布式系统中应用并发编程的方法:

  1. 使用消息队列:如 Kafka、RabbitMQ 等,实现系统间的异步通信
  2. 使用分布式锁:如 Redis、Zookeeper 等,实现分布式协调
  3. 使用服务网格:如 Istio、Linkerd 等,管理服务间的通信
  4. 使用流处理框架:如 Kafka Streams、Apache Flink 等,处理实时数据流
  5. 实现分布式工作池:在多个节点上分配任务
  6. 使用一致性哈希:实现负载均衡和数据分布

示例代码

go
// 使用 Redis 实现分布式锁
func useDistributedLock() {
    client := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer client.Close()
    
    lock := NewDistributedLock(client, "task-lock", 10*time.Second)
    ctx := context.Background()
    
    acquired, err := lock.Acquire(ctx)
    if err != nil {
        fmt.Println("Error acquiring lock:", err)
        return
    }
    
    if acquired {
        defer lock.Release(ctx)
        // 执行任务
        fmt.Println("Executing task with distributed lock")
    } else {
        fmt.Println("Could not acquire lock")
    }
}

// 使用 Kafka 实现消息传递
func useKafka() {
    // 生产者
    producer := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{"localhost:9092"},
        Topic:    "tasks",
        Balancer: &kafka.LeastBytes{},
    })
    defer producer.Close()
    
    // 发送消息
    err := producer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("task1"),
            Value: []byte("Task data"),
        },
    )
    if err != nil {
        fmt.Println("Error sending message:", err)
    }
    
    // 消费者
    consumer := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092"},
        Topic:     "tasks",
        GroupID:   "task-consumers",
        MinBytes:  10e3,
        MaxBytes:  10e6,
    })
    defer consumer.Close()
    
    // 读取消息
    msg, err := consumer.ReadMessage(context.Background())
    if err != nil {
        fmt.Println("Error reading message:", err)
    } else {
        fmt.Printf("Received message: %s\n", string(msg.Value))
    }
}

9. 实战练习

9.1 基础练习:并发文件处理

题目:实现一个并发文件处理程序,完成以下功能:

  1. 遍历指定目录下的所有文本文件
  2. 并发读取每个文件的内容
  3. 统计每个文件中的单词数量
  4. 汇总所有文件的单词总数

解题思路

  • 使用生产者-消费者模式,生产者遍历文件,消费者处理文件内容
  • 使用工作池控制并发度
  • 使用通道传递文件路径和处理结果
  • 实现错误处理机制

常见误区

  • 并发度过高导致系统资源耗尽
  • 没有正确处理文件读取错误
  • 没有等待所有 goroutine 完成

分步提示

  1. 实现文件遍历函数,生成文件路径
  2. 实现工作池,处理文件内容
  3. 实现单词计数函数
  4. 实现结果汇总函数
  5. 连接各个组件,构建完整的处理流程

参考代码

go
package main

import (
    "fmt"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// 文件处理结果
type FileResult struct {
    Path        string
    WordCount   int
    Error       error
}

// 遍历目录,生成文件路径
func walkDirectory(directory string) <-chan string {
    out := make(chan string)
    
    go func() {
        defer close(out)
        
        err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                log.Printf("Error walking path %s: %v", path, err)
                return nil
            }
            
            if !info.IsDir() && filepath.Ext(path) == ".txt" {
                out <- path
            }
            
            return nil
        })
        
        if err != nil {
            log.Printf("Error walking directory %s: %v", directory, err)
        }
    }()
    
    return out
}

// 统计文件中的单词数量
func countWords(path string) (int, error) {
    content, err := os.ReadFile(path)
    if err != nil {
        return 0, err
    }
    
    words := strings.Fields(string(content))
    return len(words), nil
}

// 工作池处理文件
func workerPool(files <-chan string, numWorkers int) <-chan FileResult {
    out := make(chan FileResult)
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            for path := range files {
                count, err := countWords(path)
                out <- FileResult{
                    Path:      path,
                    WordCount: count,
                    Error:     err,
                }
            }
        }()
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// 汇总结果
func summarizeResults(results <-chan FileResult) (int, []error) {
    totalWords := 0
    var errors []error
    
    for result := range results {
        if result.Error != nil {
            errors = append(errors, fmt.Errorf("error processing %s: %v", result.Path, result.Error))
        } else {
            fmt.Printf("%s: %d words\n", result.Path, result.WordCount)
            totalWords += result.WordCount
        }
    }
    
    return totalWords, errors
}

func main() {
    directory := "."
    numWorkers := 4
    
    // 构建处理流程
    files := walkDirectory(directory)
    results := workerPool(files, numWorkers)
    totalWords, errors := summarizeResults(results)
    
    // 输出结果
    fmt.Printf("\nTotal words: %d\n", totalWords)
    
    if len(errors) > 0 {
        fmt.Printf("\nErrors encountered:\n")
        for _, err := range errors {
            fmt.Println(err)
        }
    }
}

9.2 进阶练习:并发 Web 爬虫

题目:实现一个并发 Web 爬虫,完成以下功能:

  1. 从指定 URL 开始爬取网页
  2. 提取网页中的链接
  3. 并发爬取这些链接
  4. 限制爬取深度和并发度
  5. 避免重复爬取同一个 URL

解题思路

  • 使用生产者-消费者模式,生产者发现链接,消费者爬取网页
  • 使用工作池控制并发度
  • 使用集合存储已爬取的 URL,避免重复
  • 使用 context 控制爬取深度和超时
  • 实现错误处理机制

常见误区

  • 并发度过高导致被目标网站封禁
  • 没有处理网络错误和超时
  • 没有限制爬取深度,导致无限递归
  • 内存使用过高,存储过多 URL

分步提示

  1. 实现 URL 去重机制
  2. 实现网页爬取和链接提取函数
  3. 实现工作池,控制并发度
  4. 实现深度控制机制
  5. 连接各个组件,构建完整的爬虫系统

参考代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "net/url"
    "regexp"
    "sync"
    "time"
)

// 爬取任务
type CrawlTask struct {
    URL   string
    Depth int
}

// 爬取结果
type CrawlResult struct {
    URL     string
    Links   []string
    Depth   int
    Error   error
}

// URL 去重器
type URLDeduper struct {
    urls map[string]bool
    mu   sync.RWMutex
}

// 创建 URL 去重器
func NewURLDeduper() *URLDeduper {
    return &URLDeduper{
        urls: make(map[string]bool),
    }
}

// 检查 URL 是否已存在
func (d *URLDeduper) Exists(url string) bool {
    d.mu.RLock()
    defer d.mu.RUnlock()
    return d.urls[url]
}

// 添加 URL
func (d *URLDeduper) Add(url string) bool {
    d.mu.Lock()
    defer d.mu.Unlock()
    
    if d.urls[url] {
        return false
    }
    
    d.urls[url] = true
    return true
}

// 提取网页中的链接
func extractLinks(body string, baseURL string) []string {
    var links []string
    re := regexp.MustCompile(`<a[^>]+href="([^"]+)"`)
    matches := re.FindAllStringSubmatch(body, -1)
    
    base, err := url.Parse(baseURL)
    if err != nil {
        return links
    }
    
    for _, match := range matches {
        if len(match) < 2 {
            continue
        }
        
        link := match[1]
        parsedLink, err := url.Parse(link)
        if err != nil {
            continue
        }
        
        absoluteURL := base.ResolveReference(parsedLink).String()
        links = append(links, absoluteURL)
    }
    
    return links
}

// 爬取网页
func crawlURL(url string) (string, error) {
    client := &http.Client{
        Timeout: 10 * time.Second,
    }
    
    resp, err := client.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        return "", fmt.Errorf("HTTP error: %d", resp.StatusCode)
    }
    
    // 读取响应体(实际应用中应该限制大小)
    buf := make([]byte, 1024*1024) // 1MB 限制
    n, err := resp.Body.Read(buf)
    if err != nil && err.Error() != "EOF" {
        return "", err
    }
    
    return string(buf[:n]), nil
}

// 工作池
func workerPool(tasks <-chan CrawlTask, results chan<- CrawlResult, maxDepth int, deduper *URLDeduper, numWorkers int) {
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            for task := range tasks {
                if task.Depth > maxDepth {
                    continue
                }
                
                // 爬取网页
                body, err := crawlURL(task.URL)
                if err != nil {
                    results <- CrawlResult{
                        URL:   task.URL,
                        Depth: task.Depth,
                        Error: err,
                    }
                    continue
                }
                
                // 提取链接
                links := extractLinks(body, task.URL)
                
                results <- CrawlResult{
                    URL:   task.URL,
                    Links: links,
                    Depth: task.Depth,
                    Error: nil,
                }
            }
        }()
    }
    
    wg.Wait()
    close(results)
}

func main() {
    startURL := "https://example.com"
    maxDepth := 2
    numWorkers := 4
    
    // 创建 URL 去重器
    deduper := NewURLDeduper()
    
    // 创建通道
    tasks := make(chan CrawlTask, 100)
    results := make(chan CrawlResult, 100)
    
    // 启动工作池
    go workerPool(tasks, results, maxDepth, deduper, numWorkers)
    
    // 提交初始任务
    if deduper.Add(startURL) {
        tasks <- CrawlTask{
            URL:   startURL,
            Depth: 1,
        }
    }
    
    // 处理结果
    processed := 0
    for result := range results {
        processed++
        
        if result.Error != nil {
            log.Printf("Error crawling %s: %v", result.URL, result.Error)
        } else {
            fmt.Printf("Crawled %s (depth: %d, links: %d)\n", result.URL, result.Depth, len(result.Links))
            
            // 提交新任务
            for _, link := range result.Links {
                if deduper.Add(link) && result.Depth < maxDepth {
                    tasks <- CrawlTask{
                        URL:   link,
                        Depth: result.Depth + 1,
                    }
                }
            }
        }
    }
    
    fmt.Printf("Crawling completed. Processed %d URLs.\n", processed)
}

9.3 挑战练习:分布式任务调度系统

题目:实现一个简单的分布式任务调度系统,完成以下功能:

  1. 支持多个工作节点注册到调度器
  2. 调度器将任务分配给空闲的工作节点
  3. 工作节点执行任务并返回结果
  4. 支持任务重试和失败处理
  5. 支持任务优先级

解题思路

  • 使用 gRPC 实现节点间通信
  • 使用分布式锁确保任务只被一个节点执行
  • 使用心跳机制检测节点健康状态
  • 实现任务队列,支持优先级
  • 实现任务状态管理和重试机制

常见误区

  • 节点故障导致任务丢失
  • 任务分配不均,某些节点过载
  • 网络延迟导致任务执行超时
  • 没有处理并发任务冲突

分步提示

  1. 定义 gRPC 服务接口
  2. 实现调度器,管理任务和节点
  3. 实现工作节点,执行任务
  4. 实现任务队列和分配算法
  5. 实现心跳机制和故障检测
  6. 实现任务状态管理和重试机制

参考代码

go
package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net"
    "sort"
    "sync"
    "time"

    "google.golang.org/grpc"
    pb "example.com/task-scheduler/proto"
)

// 任务状态
const (
    TaskStatusPending   = "pending"
    TaskStatusRunning   = "running"
    TaskStatusCompleted = "completed"
    TaskStatusFailed    = "failed"
)

// 任务
type Task struct {
    ID       string
    Priority int
    Status   string
    Retries  int
    Data     string
}

// 工作节点
type WorkerNode struct {
    ID        string
    Address   string
    LastHeartbeat time.Time
    IsAlive   bool
}

// 任务调度器
type TaskScheduler struct {
    tasks     map[string]*Task
    workers   map[string]*WorkerNode
    taskQueue []string // 优先级队列
    mu        sync.RWMutex
}

// 创建任务调度器
func NewTaskScheduler() *TaskScheduler {
    return &TaskScheduler{
        tasks:   make(map[string]*Task),
        workers: make(map[string]*WorkerNode),
    }
}

// 注册工作节点
func (s *TaskScheduler) RegisterWorker(worker *WorkerNode) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    s.workers[worker.ID] = worker
    log.Printf("Worker registered: %s", worker.ID)
}

// 更新工作节点心跳
func (s *TaskScheduler) UpdateHeartbeat(workerID string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if worker, exists := s.workers[workerID]; exists {
        worker.LastHeartbeat = time.Now()
        worker.IsAlive = true
    }
}

// 提交任务
func (s *TaskScheduler) SubmitTask(task *Task) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    task.Status = TaskStatusPending
    s.tasks[task.ID] = task
    
    // 添加到优先级队列
    s.taskQueue = append(s.taskQueue, task.ID)
    // 简单的优先级排序
    sort.Slice(s.taskQueue, func(i, j int) bool {
        return s.tasks[s.taskQueue[i]].Priority > s.tasks[s.taskQueue[j]].Priority
    })
    
    log.Printf("Task submitted: %s (priority: %d)", task.ID, task.Priority)
}

// 分配任务给工作节点
func (s *TaskScheduler) AssignTask() (string, *Task) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    // 找到空闲的工作节点
    var availableWorker string
    for id, worker := range s.workers {
        if worker.IsAlive {
            availableWorker = id
            break
        }
    }
    
    if availableWorker == "" {
        return "", nil
    }
    
    // 从队列中取出任务
    if len(s.taskQueue) == 0 {
        return "", nil
    }
    
    taskID := s.taskQueue[0]
    s.taskQueue = s.taskQueue[1:]
    
    task := s.tasks[taskID]
    task.Status = TaskStatusRunning
    
    log.Printf("Task assigned: %s to worker: %s", taskID, availableWorker)
    return availableWorker, task
}

// 更新任务状态
func (s *TaskScheduler) UpdateTaskStatus(taskID string, status string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if task, exists := s.tasks[taskID]; exists {
        task.Status = status
        
        if status == TaskStatusFailed {
            task.Retries++
            if task.Retries < 3 { // 最多重试3次
                task.Status = TaskStatusPending
                s.taskQueue = append(s.taskQueue, taskID)
                // 重新排序
                sort.Slice(s.taskQueue, func(i, j int) bool {
                    return s.tasks[s.taskQueue[i]].Priority > s.tasks[s.taskQueue[j]].Priority
                })
                log.Printf("Task failed, retrying: %s (retry: %d)", taskID, task.Retries)
            }
        }
        
        log.Printf("Task status updated: %s -> %s", taskID, status)
    }
}

// 清理过期的工作节点
func (s *TaskScheduler) CleanupWorkers() {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    now := time.Now()
    for id, worker := range s.workers {
        if now.Sub(worker.LastHeartbeat) > 30*time.Second {
            worker.IsAlive = false
            log.Printf("Worker marked as dead: %s", id)
        }
    }
}

func main() {
    // 创建任务调度器
    scheduler := NewTaskScheduler()
    
    // 启动心跳清理
    go func() {
        ticker := time.NewTicker(10 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            scheduler.CleanupWorkers()
        }
    }()
    
    // 模拟工作节点注册
    go func() {
        scheduler.RegisterWorker(&WorkerNode{
            ID:      "worker-1",
            Address: "localhost:50051",
        })
        
        // 模拟心跳
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            scheduler.UpdateHeartbeat("worker-1")
        }
    }()
    
    // 提交任务
    for i := 1; i <= 10; i++ {
        task := &Task{
            ID:       fmt.Sprintf("task-%d", i),
            Priority: i % 5, // 优先级 0-4
            Data:     fmt.Sprintf("Task data %d", i),
        }
        scheduler.SubmitTask(task)
    }
    
    // 模拟任务分配
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            workerID, task := scheduler.AssignTask()
            if task != nil {
                // 模拟任务执行
                go func(wID string, t *Task) {
                    time.Sleep(2 * time.Second)
                    
                    // 模拟随机失败
                    if rand.Float32() < 0.3 {
                        scheduler.UpdateTaskStatus(t.ID, TaskStatusFailed)
                    } else {
                        scheduler.UpdateTaskStatus(t.ID, TaskStatusCompleted)
                    }
                }(workerID, task)
            }
        }
    }()
    
    // 运行一段时间
    time.Sleep(30 * time.Second)
    fmt.Println("Task scheduling simulation completed!")
}

10. 知识点总结

10.1 核心要点

  • 并发编程在业务中的应用:并发编程是提高系统性能和响应速度的关键技术,广泛应用于Web服务器、数据处理、实时系统等场景
  • 并发模式选择:根据任务类型和业务需求选择合适的并发模式,如生产者-消费者、工作池、扇出-扇入、管道等
  • 错误处理:在并发环境中,需要设计合理的错误传播机制,确保错误能够被及时捕获和处理
  • 资源管理:确保所有goroutine都能正常退出,避免资源泄漏,使用context控制goroutine的生命周期
  • 性能优化:根据系统资源和任务特性,设置合理的并发度,优化内存使用和IO操作
  • 监控和可观测性:为并发系统添加监控和可观测性机制,便于排查问题和优化性能

10.2 易错点回顾

  • goroutine泄漏:没有正确处理context取消信号或通道操作阻塞,导致goroutine无法退出
  • 竞态条件:多个goroutine同时访问和修改共享资源,没有使用适当的同步机制
  • 死锁:多个goroutine互相等待对方释放资源,或通道操作顺序不当
  • 错误处理不当:错误被忽略,或错误处理逻辑导致程序崩溃
  • 过度并发:创建过多的goroutine,导致系统资源耗尽
  • 分布式协调:在分布式系统中,没有正确处理节点故障和网络延迟

11. 拓展参考资料

11.1 官方文档链接

11.2 进阶学习路径建议

  1. Go 语言基础:掌握 Go 语言的基本语法和特性
  2. 并发编程:学习 goroutine、channel、sync 包等并发原语
  3. 设计模式:学习常见的并发设计模式,如生产者-消费者、工作池、扇出-扇入、管道等
  4. 分布式系统:学习分布式系统的基本概念和原理
  5. 性能优化:学习如何优化 Go 程序的性能
  6. 实战项目:参与实际项目,应用并发编程技术解决实际问题

11.3 推荐书籍

  • 《Go 语言实战》
  • 《Go 并发编程实战》
  • 《Effective Go》
  • 《Concurrency in Go》
  • 《Distributed Systems》

11.4 在线资源