Appearance
业务场景实战
1. 概述
在实际业务开发中,并发编程是提高系统性能和响应速度的关键技术。Go语言的并发特性使其成为处理高并发场景的理想选择。本章节将通过实际业务场景,展示如何在真实项目中应用并发设计模式,解决实际问题。
业务场景实战的核心目标是:
- 理解并发编程在实际业务中的应用价值
- 掌握常见业务场景的并发解决方案
- 学习如何根据具体业务需求选择合适的并发模式
- 了解并发编程中的性能优化和最佳实践
通过本章节的学习,读者将能够在实际项目中灵活运用并发编程技术,提高系统的处理能力和可靠性。
2. 基本概念
2.1 语法
在业务场景中,并发编程的基本语法包括:
- Goroutine:使用
go关键字启动并发执行的函数 - Channel:用于 goroutine 之间的通信
- Sync 包:提供互斥锁、读写锁、条件变量等同步原语
- Context:用于控制 goroutine 的生命周期和传递取消信号
基本语法示例:
go
// 启动 goroutine
go func() {
// 并发执行的代码
}()
// 创建通道
ch := make(chan int)
// 发送数据到通道
ch <- 42
// 从通道接收数据
value := <-ch
// 使用互斥锁
var mu sync.Mutex
mu.Lock()
// 临界区代码
mu.Unlock()
// 使用上下文
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()2.2 语义
业务场景中并发编程的语义包括:
- 并发执行:多个任务同时执行,提高系统吞吐量
- 资源共享:多个 goroutine 共享资源时需要同步机制
- 通信:goroutine 之间通过通道进行安全的通信
- 生命周期管理:通过 context 管理 goroutine 的生命周期
- 错误处理:在并发环境中正确处理和传播错误
2.3 规范
在业务场景中使用并发编程时,应遵循以下规范:
- 明确的并发边界:清晰定义哪些操作需要并发执行
- 合理的资源管理:确保所有 goroutine 都能正常退出,避免资源泄漏
- 适当的同步机制:根据场景选择合适的同步原语
- 错误处理机制:设计合理的错误传播和处理机制
- 性能考虑:避免过度并发导致的性能下降
- 可测试性:设计易于测试的并发代码
3. 原理深度解析
3.1 并发模型在业务中的应用
Go语言的并发模型基于 CSP(Communicating Sequential Processes)理论,通过 goroutine 和 channel 实现。在业务场景中,这种模型的优势在于:
- 简单性:使用 channel 进行通信,避免了显式的锁操作
- 安全性:通过 channel 传递数据,减少了共享内存带来的竞态条件
- 灵活性:可以根据业务需求组合不同的并发模式
- 可扩展性:易于构建高并发的系统
3.2 业务场景中的并发模式选择
在实际业务中,选择合适的并发模式至关重要。以下是常见的并发模式及其适用场景:
| 并发模式 | 适用场景 | 优势 |
|---|---|---|
| 生产者-消费者 | 任务队列、消息处理 | 解耦生产者和消费者,平衡处理能力 |
| 工作池 | 并行处理大量任务 | 控制并发度,避免资源耗尽 |
| 扇出-扇入 | 并行处理数据,汇总结果 | 提高处理速度,适用于IO密集型任务 |
| 管道 | 多步骤数据处理 | 模块化设计,易于维护 |
| 竞态检测 | 共享资源访问 | 确保数据一致性 |
3.3 性能优化原理
在业务场景中,并发编程的性能优化需要考虑以下因素:
- 并发度控制:根据系统资源和任务特性,设置合理的并发度
- 内存管理:减少内存分配和垃圾回收开销
- IO 操作优化:使用非阻塞 IO 或异步 IO
- 负载均衡:合理分配任务,避免某些 goroutine 过载
- 避免死锁和活锁:设计合理的资源获取顺序
- 监控和调优:通过监控发现性能瓶颈并进行优化
4. 常见错误与踩坑点
4.1 goroutine 泄漏
错误表现:系统中 goroutine 数量持续增长,内存使用量不断增加
产生原因:
- goroutine 中的通道操作永久阻塞
- 没有正确处理 context 取消信号
- 无限循环中没有退出条件
解决方案:
- 使用 context 控制 goroutine 的生命周期
- 确保所有通道操作都有超时或退出机制
- 定期监控 goroutine 数量
4.2 竞态条件
错误表现:程序行为不确定,数据不一致,偶发性错误
产生原因:多个 goroutine 同时访问和修改共享资源,没有使用适当的同步机制
解决方案:
- 使用互斥锁保护共享资源
- 使用通道传递数据,避免共享内存
- 使用原子操作处理简单的计数器等场景
- 使用竞态检测器(race detector)检测潜在的竞态条件
4.3 死锁
错误表现:程序卡住,无法继续执行
产生原因:
- 多个 goroutine 互相等待对方释放资源
- 通道操作顺序不当
- 锁的获取顺序不一致
解决方案:
- 避免嵌套锁
- 确保通道的发送和接收操作配对
- 使用带缓冲的通道或 select 语句
- 设计合理的资源获取顺序
4.4 错误处理不当
错误表现:错误被忽略,或者错误处理逻辑导致程序崩溃
产生原因:
- 没有正确处理 goroutine 中的错误
- 错误传播机制设计不合理
- panic 没有被 recover
解决方案:
- 使用错误通道传递错误
- 设计统一的错误处理机制
- 在 goroutine 中使用 defer recover() 捕获 panic
- 记录错误日志,便于排查问题
4.5 过度并发
错误表现:系统性能下降,甚至崩溃
产生原因:创建了过多的 goroutine,导致系统资源耗尽
解决方案:
- 使用工作池控制并发度
- 根据系统资源设置合理的 goroutine 数量
- 监控系统资源使用情况
- 优化任务处理逻辑,减少 goroutine 的创建
5. 常见应用场景
5.1 Web 服务器并发处理
场景描述:Web 服务器需要同时处理多个客户端请求,每个请求可能涉及 IO 操作(如数据库查询、文件读写等)
使用方法:
- 为每个请求启动一个 goroutine 处理
- 使用工作池控制并发度
- 使用 context 管理请求的生命周期
- 实现超时机制,避免请求长时间占用资源
示例代码:
go
package main
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
)
// 工作池
type WorkerPool struct {
tasks chan func()
wg sync.WaitGroup
}
// 创建工作池
func NewWorkerPool(size int) *WorkerPool {
pool := &WorkerPool{
tasks: make(chan func(), 100),
}
// 启动工作者
for i := 0; i < size; i++ {
pool.wg.Add(1)
go func() {
defer pool.wg.Done()
for task := range pool.tasks {
task()
}
}()
}
return pool
}
// 提交任务
func (p *WorkerPool) Submit(task func()) {
p.tasks <- task
}
// 关闭工作池
func (p *WorkerPool) Close() {
close(p.tasks)
p.wg.Wait()
}
// 处理 HTTP 请求
func handleRequest(w http.ResponseWriter, r *http.Request) {
// 设置请求超时
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
// 模拟处理时间
select {
case <-time.After(1 * time.Second):
fmt.Fprintf(w, "Hello, World!")
case <-ctx.Done():
http.Error(w, "Request timeout", http.StatusRequestTimeout)
}
}
func main() {
// 创建工作池,大小为 10
pool := NewWorkerPool(10)
defer pool.Close()
// 注册 HTTP 处理函数
http.HandleFunc("/", handleRequest)
// 启动服务器
log.Println("Server starting on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal("Server failed:", err)
}
}5.2 数据批处理系统
场景描述:需要处理大量数据,如日志分析、数据转换、ETL 等
使用方法:
- 使用生产者-消费者模式分离数据读取和处理
- 使用工作池并行处理数据
- 使用管道模式组织数据处理流程
- 实现错误处理和重试机制
示例代码:
go
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"sync"
"time"
)
// 数据项
type DataItem struct {
Path string
Content string
}
// 生产者:读取文件
func producer(directory string) <-chan DataItem {
out := make(chan DataItem)
go func() {
defer close(out)
// 遍历目录中的文件
err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() && filepath.Ext(path) == ".txt" {
// 读取文件内容
content, err := os.ReadFile(path)
if err != nil {
log.Printf("Error reading file %s: %v", path, err)
return nil
}
out <- DataItem{
Path: path,
Content: string(content),
}
}
return nil
})
if err != nil {
log.Printf("Error walking directory: %v", err)
}
}()
return out
}
// 处理函数:转换数据
func process(item DataItem) DataItem {
// 模拟处理时间
time.Sleep(100 * time.Millisecond)
// 简单的转换:转为大写
processedContent := item.Content
// 实际应用中可能有更复杂的处理逻辑
return DataItem{
Path: item.Path,
Content: processedContent,
}
}
// 工作池:并行处理数据
func workerPool(input <-chan DataItem, numWorkers int) <-chan DataItem {
out := make(chan DataItem)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range input {
processed := process(item)
out <- processed
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// 消费者:保存处理后的数据
func consumer(input <-chan DataItem, outputDir string) {
// 创建输出目录
if err := os.MkdirAll(outputDir, 0755); err != nil {
log.Fatalf("Error creating output directory: %v", err)
}
for item := range input {
// 生成输出文件路径
relPath, err := filepath.Rel(".", item.Path)
if err != nil {
log.Printf("Error getting relative path: %v", err)
continue
}
outputPath := filepath.Join(outputDir, relPath)
// 创建输出目录结构
if err := os.MkdirAll(filepath.Dir(outputPath), 0755); err != nil {
log.Printf("Error creating output directory structure: %v", err)
continue
}
// 写入文件
if err := os.WriteFile(outputPath, []byte(item.Content), 0644); err != nil {
log.Printf("Error writing file %s: %v", outputPath, err)
continue
}
fmt.Printf("Processed file: %s\n", item.Path)
}
}
func main() {
inputDir := "input"
outputDir := "output"
numWorkers := 4
// 构建数据处理管道
dataItems := producer(inputDir)
processedItems := workerPool(dataItems, numWorkers)
consumer(processedItems, outputDir)
fmt.Println("Data processing completed!")
}5.3 实时消息处理系统
场景描述:处理实时消息流,如用户行为数据、传感器数据等
使用方法:
- 使用扇出模式并行处理消息
- 使用时间窗口进行数据聚合
- 使用通道传递消息
- 实现消息确认机制
示例代码:
go
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/segmentio/kafka-go"
)
// 消息处理函数
func processMessage(msg kafka.Message) error {
// 模拟处理时间
time.Sleep(50 * time.Millisecond)
fmt.Printf("Processed message: %s\n", string(msg.Value))
return nil
}
// 消费者组
func consumerGroup(ctx context.Context, brokers []string, topic string, groupID string, numWorkers int) {
// 创建消费者
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
GroupID: groupID,
StartOffset: kafka.FirstOffset,
})
defer consumer.Close()
// 创建工作池
tasks := make(chan kafka.Message, 100)
var wg sync.WaitGroup
// 启动工作者
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for msg := range tasks {
if err := processMessage(msg); err != nil {
log.Printf("Error processing message: %v", err)
}
}
}()
}
// 消费消息
for {
select {
case <-ctx.Done():
close(tasks)
wg.Wait()
return
default:
msg, err := consumer.ReadMessage(ctx)
if err != nil {
log.Printf("Error reading message: %v", err)
continue
}
tasks <- msg
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
brokers := []string{"localhost:9092"}
topic := "messages"
groupID := "message-processing-group"
numWorkers := 5
// 启动消费者组
log.Println("Starting message processing...")
consumerGroup(ctx, brokers, topic, groupID, numWorkers)
}5.4 分布式任务协调系统
场景描述:在分布式系统中协调多个节点的任务执行,确保任务不重复执行,处理节点故障等情况
使用方法:
- 使用分布式锁确保任务只被一个节点执行
- 使用心跳机制检测节点健康状态
- 使用重试机制处理临时故障
- 使用状态机管理任务生命周期
示例代码:
go
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// 分布式锁
type DistributedLock struct {
client *redis.Client
key string
value string
ttl time.Duration
}
// 创建分布式锁
func NewDistributedLock(client *redis.Client, key string, ttl time.Duration) *DistributedLock {
return &DistributedLock{
client: client,
key: key,
value: fmt.Sprintf("%d", time.Now().UnixNano()),
ttl: ttl,
}
}
// 获取锁
func (l *DistributedLock) Acquire(ctx context.Context) (bool, error) {
return l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
}
// 释放锁
func (l *DistributedLock) Release(ctx context.Context) error {
// 使用 Lua 脚本确保原子操作
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
_, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
return err
}
// 任务管理器
type TaskManager struct {
client *redis.Client
tasks chan string
wg sync.WaitGroup
}
// 创建任务管理器
func NewTaskManager(client *redis.Client, numWorkers int) *TaskManager {
manager := &TaskManager{
client: client,
tasks: make(chan string, 100),
}
// 启动工作者
for i := 0; i < numWorkers; i++ {
manager.wg.Add(1)
go manager.worker()
}
return manager
}
// 工作者函数
func (m *TaskManager) worker() {
defer m.wg.Done()
for taskID := range m.tasks {
m.processTask(taskID)
}
}
// 处理任务
func (m *TaskManager) processTask(taskID string) {
ctx := context.Background()
// 获取分布式锁
lock := NewDistributedLock(m.client, fmt.Sprintf("task:%s:lock", taskID), 10*time.Second)
acquired, err := lock.Acquire(ctx)
if err != nil {
log.Printf("Error acquiring lock for task %s: %v", taskID, err)
return
}
if !acquired {
log.Printf("Task %s is already being processed by another node", taskID)
return
}
defer lock.Release(ctx)
// 检查任务状态
status, err := m.client.Get(ctx, fmt.Sprintf("task:%s:status", taskID)).Result()
if err == redis.Nil {
// 任务不存在,创建
m.client.Set(ctx, fmt.Sprintf("task:%s:status", taskID), "processing", 0)
} else if err != nil {
log.Printf("Error getting task status: %v", err)
return
} else if status == "completed" {
log.Printf("Task %s is already completed", taskID)
return
}
// 处理任务
log.Printf("Processing task %s", taskID)
time.Sleep(2 * time.Second) // 模拟处理时间
// 更新任务状态
m.client.Set(ctx, fmt.Sprintf("task:%s:status", taskID), "completed", 0)
log.Printf("Task %s completed", taskID)
}
// 提交任务
func (m *TaskManager) Submit(taskID string) {
m.tasks <- taskID
}
// 关闭任务管理器
func (m *TaskManager) Close() {
close(m.tasks)
m.wg.Wait()
}
func main() {
// 连接 Redis
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer client.Close()
// 创建任务管理器
manager := NewTaskManager(client, 3)
defer manager.Close()
// 提交任务
for i := 1; i <= 10; i++ {
taskID := fmt.Sprintf("task-%d", i)
manager.Submit(taskID)
}
// 等待任务完成
time.Sleep(10 * time.Second)
fmt.Println("Task submission completed!")
}5.5 性能监控系统
场景描述:监控系统性能指标,如 CPU 使用率、内存使用量、网络流量等
使用方法:
- 使用 goroutine 定期收集指标
- 使用通道传递指标数据
- 使用时间窗口进行数据聚合
- 实现告警机制
示例代码:
go
package main
import (
"context"
"fmt"
"log"
"runtime"
"sync"
"time"
)
// 性能指标
type Metric struct {
Name string
Value float64
Timestamp time.Time
}
// 指标收集器
type MetricCollector struct {
metrics chan Metric
wg sync.WaitGroup
}
// 创建指标收集器
func NewMetricCollector(bufferSize int) *MetricCollector {
return &MetricCollector{
metrics: make(chan Metric, bufferSize),
}
}
// 启动 CPU 使用率收集
func (c *MetricCollector) StartCPUCollection(ctx context.Context, interval time.Duration) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 模拟 CPU 使用率收集
var m runtime.MemStats
runtime.ReadMemStats(&m)
cpuUsage := float64(m.Alloc) / float64(m.Sys) * 100
c.metrics <- Metric{
Name: "cpu_usage",
Value: cpuUsage,
Timestamp: time.Now(),
}
}
}
}()
}
// 启动内存使用量收集
func (c *MetricCollector) StartMemoryCollection(ctx context.Context, interval time.Duration) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 收集内存使用量
var m runtime.MemStats
runtime.ReadMemStats(&m)
memoryUsage := float64(m.Alloc) / 1024 / 1024 // MB
c.metrics <- Metric{
Name: "memory_usage",
Value: memoryUsage,
Timestamp: time.Now(),
}
}
}
}()
}
// 获取指标通道
func (c *MetricCollector) Metrics() <-chan Metric {
return c.metrics
}
// 停止收集
func (c *MetricCollector) Stop() {
c.wg.Wait()
close(c.metrics)
}
// 指标处理器
func processMetrics(metrics <-chan Metric) {
// 按指标名称分组
metricGroups := make(map[string][]Metric)
for metric := range metrics {
metricGroups[metric.Name] = append(metricGroups[metric.Name], metric)
// 每收到 5 个指标,计算平均值
if len(metricGroups[metric.Name]) >= 5 {
var sum float64
for _, m := range metricGroups[metric.Name] {
sum += m.Value
}
avg := sum / float64(len(metricGroups[metric.Name]))
fmt.Printf("%s average: %.2f\n", metric.Name, avg)
// 检查是否需要告警
if metric.Name == "cpu_usage" && avg > 80 {
log.Printf("ALERT: CPU usage too high: %.2f%%", avg)
}
if metric.Name == "memory_usage" && avg > 500 {
log.Printf("ALERT: Memory usage too high: %.2f MB", avg)
}
// 清空组,开始新的统计周期
metricGroups[metric.Name] = nil
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 创建指标收集器
collector := NewMetricCollector(100)
// 启动指标收集
collector.StartCPUCollection(ctx, 1*time.Second)
collector.StartMemoryCollection(ctx, 1*time.Second)
// 处理指标
go processMetrics(collector.Metrics())
// 运行一段时间
time.Sleep(30 * time.Second)
// 停止收集
cancel()
collector.Stop()
fmt.Println("Monitoring stopped!")
}6. 企业级进阶应用场景
6.1 高并发 API 网关
场景描述:API 网关需要处理大量并发请求,进行路由、认证、限流等操作
使用方法:
- 使用工作池处理请求
- 实现请求限流和熔断机制
- 使用缓存减少后端服务调用
- 实现分布式追踪
- 使用负载均衡分发请求
示例代码:
go
package main
import (
"context"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"golang.org/x/time/rate"
)
// 限流中间件
func rateLimiter(limit int) gin.HandlerFunc {
// 每个 IP 的限流器
limiters := make(map[string]*rate.Limiter)
var mu sync.Mutex
return func(c *gin.Context) {
ip := c.ClientIP()
mu.Lock()
limiter, exists := limiters[ip]
if !exists {
// 每秒允许 limit 个请求
limiter = rate.NewLimiter(rate.Limit(limit), limit)
limiters[ip] = limiter
}
mu.Unlock()
if !limiter.Allow() {
c.JSON(http.StatusTooManyRequests, gin.H{"error": "Rate limit exceeded"})
c.Abort()
return
}
c.Next()
}
}
// 工作池
type WorkerPool struct {
tasks chan func()
wg sync.WaitGroup
}
// 创建工作池
func NewWorkerPool(size int) *WorkerPool {
pool := &WorkerPool{
tasks: make(chan func(), 1000),
}
for i := 0; i < size; i++ {
pool.wg.Add(1)
go func() {
defer pool.wg.Done()
for task := range pool.tasks {
task()
}
}()
}
return pool
}
// 提交任务
func (p *WorkerPool) Submit(task func()) {
p.tasks <- task
}
// 关闭工作池
func (p *WorkerPool) Close() {
close(p.tasks)
p.wg.Wait()
}
// 处理 API 请求
func handleAPIRequest(c *gin.Context) {
// 获取请求参数
id := c.Param("id")
// 模拟 API 处理
time.Sleep(100 * time.Millisecond)
c.JSON(http.StatusOK, gin.H{
"id": id,
"data": fmt.Sprintf("Data for %s", id),
"time": time.Now().Format(time.RFC3339),
})
}
func main() {
// 创建工作池
pool := NewWorkerPool(100)
defer pool.Close()
// 设置 Gin 模式
gin.SetMode(gin.ReleaseMode)
// 创建 Gin 引擎
r := gin.New()
// 添加中间件
r.Use(gin.Logger())
r.Use(gin.Recovery())
r.Use(rateLimiter(10)) // 每秒 10 个请求的限制
// 注册路由
r.GET("/api/:id", func(c *gin.Context) {
// 提交到工作池处理
pool.Submit(func() {
handleAPIRequest(c)
})
})
// 启动服务器
log.Println("API Gateway starting on :8080")
if err := r.Run(":8080"); err != nil {
log.Fatal("Server failed:", err)
}
}6.2 分布式数据处理平台
场景描述:处理海量数据,需要分布式计算能力,如大数据分析、机器学习训练等
使用方法:
- 使用 MapReduce 模式处理数据
- 实现数据分片和并行处理
- 使用分布式存储系统
- 实现任务调度和资源管理
- 处理节点故障和数据一致性
示例代码:
go
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// 数据分片
type DataShard struct {
ID string
Data []int
}
// Map 函数
func mapFunction(shard DataShard) map[int]int {
result := make(map[int]int)
for _, value := range shard.Data {
result[value]++
}
return result
}
// Reduce 函数
func reduceFunction(results []map[int]int) map[int]int {
finalResult := make(map[int]int)
for _, result := range results {
for key, value := range result {
finalResult[key] += value
}
}
return finalResult
}
// 分布式数据处理
func processData(data []int, numWorkers int) map[int]int {
// 数据分片
shards := make([]DataShard, 0, numWorkers)
shardSize := len(data) / numWorkers
for i := 0; i < numWorkers; i++ {
start := i * shardSize
end := (i + 1) * shardSize
if i == numWorkers-1 {
end = len(data)
}
shards = append(shards, DataShard{
ID: fmt.Sprintf("shard-%d", i),
Data: data[start:end],
})
}
// 并行处理
var wg sync.WaitGroup
results := make([]map[int]int, len(shards))
var mu sync.Mutex
for i, shard := range shards {
wg.Add(1)
go func(idx int, s DataShard) {
defer wg.Done()
// 模拟处理时间
time.Sleep(1 * time.Second)
result := mapFunction(s)
mu.Lock()
results[idx] = result
mu.Unlock()
log.Printf("Processed shard %s", s.ID)
}(i, shard)
}
wg.Wait()
// 合并结果
return reduceFunction(results)
}
func main() {
// 生成测试数据
data := make([]int, 1000000)
for i := range data {
data[i] = i % 1000
}
numWorkers := 4
log.Println("Starting distributed data processing...")
start := time.Now()
result := processData(data, numWorkers)
duration := time.Since(start)
log.Printf("Processing completed in %v", duration)
log.Printf("Result count: %d", len(result))
// 打印前 10 个结果
count := 0
for key, value := range result {
fmt.Printf("%d: %d\n", key, value)
count++
if count >= 10 {
break
}
}
}6.3 实时推荐系统
场景描述:根据用户行为实时生成推荐内容,需要低延迟处理
使用方法:
- 使用流处理框架处理用户行为数据
- 实现实时特征提取
- 使用机器学习模型进行预测
- 实现缓存机制减少计算开销
- 使用消息队列解耦各个组件
示例代码:
go
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/segmentio/kafka-go"
)
// 用户行为事件
type UserEvent struct {
UserID string `json:"user_id"`
ItemID string `json:"item_id"`
EventType string `json:"event_type"` // view, click, purchase
Timestamp time.Time `json:"timestamp"`
}
// 推荐结果
type Recommendation struct {
UserID string `json:"user_id"`
ItemIDs []string `json:"item_ids"`
Score float64 `json:"score"`
}
// 推荐引擎
type RecommendationEngine struct {
model *MockModel
userCache map[string][]string
cacheMutex sync.RWMutex
}
// 模拟推荐模型
type MockModel struct{}
// 预测推荐
func (m *MockModel) Predict(userID string, recentItems []string) []string {
// 模拟模型预测
time.Sleep(50 * time.Millisecond)
// 返回模拟推荐结果
recommendations := []string{"item1", "item2", "item3", "item4", "item5"}
return recommendations
}
// 创建推荐引擎
func NewRecommendationEngine() *RecommendationEngine {
return &RecommendationEngine{
model: &MockModel{},
userCache: make(map[string][]string),
}
}
// 处理用户事件
func (e *RecommendationEngine) ProcessEvent(event UserEvent) Recommendation {
// 更新用户行为缓存
e.cacheMutex.Lock()
recentItems := e.userCache[event.UserID]
recentItems = append([]string{event.ItemID}, recentItems...)
if len(recentItems) > 10 {
recentItems = recentItems[:10]
}
e.userCache[event.UserID] = recentItems
e.cacheMutex.Unlock()
// 生成推荐
recommendedItems := e.model.Predict(event.UserID, recentItems)
return Recommendation{
UserID: event.UserID,
ItemIDs: recommendedItems,
Score: 0.95, // 模拟分数
}
}
// 消费用户事件
func consumeEvents(ctx context.Context, reader *kafka.Reader, engine *RecommendationEngine) {
for {
select {
case <-ctx.Done():
return
default:
msg, err := reader.ReadMessage(ctx)
if err != nil {
log.Printf("Error reading message: %v", err)
continue
}
// 解析事件(实际应用中使用 JSON 解析)
var event UserEvent
// json.Unmarshal(msg.Value, &event)
// 模拟事件
event = UserEvent{
UserID: "user1",
ItemID: fmt.Sprintf("item%d", time.Now().UnixNano()%100),
EventType: "click",
Timestamp: time.Now(),
}
// 处理事件并生成推荐
recommendation := engine.ProcessEvent(event)
// 输出推荐结果
fmt.Printf("Recommendation for user %s: %v\n", recommendation.UserID, recommendation.ItemIDs)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 创建 Kafka 读取器
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "user-events",
Partition: 0,
MinBytes: 10e3,
MaxBytes: 10e6,
})
defer reader.Close()
// 创建推荐引擎
engine := NewRecommendationEngine()
// 启动事件消费
log.Println("Starting recommendation engine...")
consumeEvents(ctx, reader, engine)
}7. 行业最佳实践
7.1 并发度控制
实践内容:根据系统资源和任务特性,设置合理的并发度
推荐理由:
- 过度并发会导致系统资源耗尽,性能下降
- 并发度过低会浪费系统资源,无法充分利用硬件能力
- 合理的并发度可以最大化系统吞吐量
实现方法:
- 使用工作池控制并发度
- 根据 CPU 核心数设置初始并发度
- 通过监控和压测调整并发度
- 实现动态并发度调整机制
7.2 错误处理
实践内容:设计统一的错误处理机制,确保错误能够被及时捕获和处理
推荐理由:
- 并发环境中的错误处理更加复杂
- 未处理的错误可能导致 goroutine 泄漏
- 统一的错误处理机制便于排查问题
实现方法:
- 使用错误通道传递错误
- 在 goroutine 中使用 defer recover() 捕获 panic
- 实现错误日志系统,记录错误详情
- 设计错误重试机制,处理临时故障
7.3 资源管理
实践内容:确保所有资源都能被正确释放,避免资源泄漏
推荐理由:
- 资源泄漏会导致系统性能下降,甚至崩溃
- 并发环境中的资源管理更加复杂
- 正确的资源管理可以提高系统的可靠性
实现方法:
- 使用 context 控制 goroutine 的生命周期
- 使用 defer 语句确保资源被释放
- 定期监控系统资源使用情况
- 实现资源池,减少资源创建和销毁的开销
7.4 监控和可观测性
实践内容:为并发系统添加监控和可观测性机制
推荐理由:
- 并发系统的问题更加难以排查
- 监控可以帮助发现性能瓶颈和潜在问题
- 可观测性可以提高系统的可靠性和可维护性
实现方法:
- 监控 goroutine 数量和状态
- 监控通道的使用情况
- 实现分布式追踪,跟踪请求的处理流程
- 使用指标系统收集性能数据
7.5 测试策略
实践内容:为并发代码设计合理的测试策略
推荐理由:
- 并发代码的测试更加复杂
- 并发问题往往是偶发性的,难以复现
- 合理的测试策略可以提高代码的质量和可靠性
实现方法:
- 使用单元测试测试各个组件
- 使用集成测试测试整个系统
- 使用压力测试测试系统的并发性能
- 使用竞态检测器检测潜在的竞态条件
8. 常见问题答疑(FAQ)
8.1 如何选择合适的并发模式?
问题描述:在不同的业务场景中,如何选择合适的并发模式?
回答内容: 选择合适的并发模式需要考虑以下因素:
- 任务类型:IO 密集型任务适合使用更多的并发,CPU 密集型任务则需要考虑 CPU 核心数
- 数据依赖:如果任务之间有数据依赖,可能需要使用管道模式
- 错误处理:需要考虑错误如何在并发环境中传播和处理
- 资源消耗:需要考虑并发对系统资源的影响
- 可维护性:需要考虑代码的可读性和可维护性
示例代码:
go
// 根据任务类型选择并发模式
func chooseConcurrencyPattern(taskType string) {
switch taskType {
case "io-intensive":
// 使用工作池模式,并发度可以设置得较高
pool := NewWorkerPool(100)
// ...
case "cpu-intensive":
// 使用工作池模式,并发度设置为 CPU 核心数
pool := NewWorkerPool(runtime.NumCPU())
// ...
case "data-processing":
// 使用管道模式
pipeline := createPipeline()
// ...
case "message-handling":
// 使用生产者-消费者模式
producerConsumer := NewProducerConsumer()
// ...
}
}8.2 如何处理并发中的死锁问题?
问题描述:在并发编程中,如何避免和处理死锁问题?
回答内容: 避免死锁的方法:
- 避免嵌套锁:尽量避免在持有一个锁的同时获取另一个锁
- 统一锁的获取顺序:如果需要获取多个锁,按照固定的顺序获取
- 使用带缓冲的通道:避免通道操作阻塞
- 使用 select 语句:为通道操作添加超时机制
- 使用 context:为长时间运行的操作添加取消机制
示例代码:
go
// 避免嵌套锁
func avoidNestedLocks() {
var mu1, mu2 sync.Mutex
// 错误的做法:嵌套锁
// mu1.Lock()
// mu2.Lock()
// ...
// mu2.Unlock()
// mu1.Unlock()
// 正确的做法:统一获取顺序
mu1.Lock()
// 使用 mu1 保护的资源
mu1.Unlock()
mu2.Lock()
// 使用 mu2 保护的资源
mu2.Unlock()
}
// 使用 select 语句避免通道阻塞
func avoidChannelBlock() {
ch := make(chan int)
select {
case ch <- 42:
// 发送成功
case <-time.After(1 * time.Second):
// 发送超时
fmt.Println("Send timeout")
}
}8.3 如何处理并发中的错误?
问题描述:在并发编程中,如何处理和传播错误?
回答内容: 处理并发错误的方法:
- 使用错误通道:创建一个专门的通道用于传递错误
- 使用自定义类型:定义包含数据和错误的结构体
- 使用 context:通过 context 传递错误信息
- 使用 sync.WaitGroup 和 errgroup:等待所有 goroutine 完成并收集错误
- 在 goroutine 中使用 defer recover():捕获 panic,避免程序崩溃
示例代码:
go
// 使用错误通道
func useErrorChannel() {
dataCh := make(chan int)
errCh := make(chan error)
go func() {
defer close(dataCh)
defer close(errCh)
for i := 0; i < 10; i++ {
if i == 5 {
errCh <- fmt.Errorf("error at %d", i)
return
}
dataCh <- i
}
}()
for {
select {
case data, ok := <-dataCh:
if !ok {
return
}
fmt.Println("Received:", data)
case err, ok := <-errCh:
if !ok {
return
}
fmt.Println("Error:", err)
return
}
}
}
// 使用 errgroup
func useErrGroup() {
g, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 5; i++ {
i := i
g.Go(func() error {
if i == 2 {
return fmt.Errorf("error at %d", i)
}
fmt.Println("Processing:", i)
return nil
})
}
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
}8.4 如何优化并发程序的性能?
问题描述:如何优化并发程序的性能?
回答内容: 优化并发程序性能的方法:
- 控制并发度:根据系统资源和任务特性设置合理的并发度
- 减少内存分配:重用对象,减少垃圾回收开销
- 优化通道使用:合理设置通道缓冲区大小
- 避免过度同步:减少锁的使用,使用无锁数据结构
- 使用 profiling 工具:找出性能瓶颈并进行优化
- 实现负载均衡:合理分配任务,避免某些 goroutine 过载
示例代码:
go
// 优化通道使用
func optimizeChannelUsage() {
// 为 IO 密集型任务设置较大的缓冲区
ch := make(chan int, 1000)
go func() {
defer close(ch)
for i := 0; i < 10000; i++ {
ch <- i
}
}()
for i := range ch {
// 处理数据
}
}
// 使用 worker pool 控制并发度
func useWorkerPool() {
// 根据 CPU 核心数设置工作者数量
numWorkers := runtime.NumCPU()
pool := NewWorkerPool(numWorkers)
// 提交任务
for i := 0; i < 1000; i++ {
i := i
pool.Submit(func() {
// 处理任务
fmt.Println("Processing:", i)
})
}
pool.Close()
}8.5 如何测试并发代码?
问题描述:如何测试并发代码,确保其正确性和可靠性?
回答内容: 测试并发代码的方法:
- 单元测试:测试各个组件的功能
- 集成测试:测试整个系统的功能
- 压力测试:测试系统在高并发下的性能
- 竞态检测:使用
-race标志检测竞态条件 - 模糊测试:随机生成输入,测试系统的鲁棒性
- 超时测试:测试系统在超时情况下的行为
示例代码:
go
// 单元测试示例
func TestWorkerPool(t *testing.T) {
pool := NewWorkerPool(2)
defer pool.Close()
var wg sync.WaitGroup
results := make([]int, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
i := i
pool.Submit(func() {
defer wg.Done()
results[i] = i * i
})
}
wg.Wait()
for i, result := range results {
expected := i * i
if result != expected {
t.Errorf("Expected %d, got %d", expected, result)
}
}
}
// 压力测试示例
func BenchmarkWorkerPool(b *testing.B) {
pool := NewWorkerPool(10)
defer pool.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
pool.Submit(func() {
time.Sleep(1 * time.Millisecond)
})
}
}8.6 如何在分布式系统中应用并发编程?
问题描述:如何在分布式系统中应用并发编程技术?
回答内容: 在分布式系统中应用并发编程的方法:
- 使用消息队列:如 Kafka、RabbitMQ 等,实现系统间的异步通信
- 使用分布式锁:如 Redis、Zookeeper 等,实现分布式协调
- 使用服务网格:如 Istio、Linkerd 等,管理服务间的通信
- 使用流处理框架:如 Kafka Streams、Apache Flink 等,处理实时数据流
- 实现分布式工作池:在多个节点上分配任务
- 使用一致性哈希:实现负载均衡和数据分布
示例代码:
go
// 使用 Redis 实现分布式锁
func useDistributedLock() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer client.Close()
lock := NewDistributedLock(client, "task-lock", 10*time.Second)
ctx := context.Background()
acquired, err := lock.Acquire(ctx)
if err != nil {
fmt.Println("Error acquiring lock:", err)
return
}
if acquired {
defer lock.Release(ctx)
// 执行任务
fmt.Println("Executing task with distributed lock")
} else {
fmt.Println("Could not acquire lock")
}
}
// 使用 Kafka 实现消息传递
func useKafka() {
// 生产者
producer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "tasks",
Balancer: &kafka.LeastBytes{},
})
defer producer.Close()
// 发送消息
err := producer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("task1"),
Value: []byte("Task data"),
},
)
if err != nil {
fmt.Println("Error sending message:", err)
}
// 消费者
consumer := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "tasks",
GroupID: "task-consumers",
MinBytes: 10e3,
MaxBytes: 10e6,
})
defer consumer.Close()
// 读取消息
msg, err := consumer.ReadMessage(context.Background())
if err != nil {
fmt.Println("Error reading message:", err)
} else {
fmt.Printf("Received message: %s\n", string(msg.Value))
}
}9. 实战练习
9.1 基础练习:并发文件处理
题目:实现一个并发文件处理程序,完成以下功能:
- 遍历指定目录下的所有文本文件
- 并发读取每个文件的内容
- 统计每个文件中的单词数量
- 汇总所有文件的单词总数
解题思路:
- 使用生产者-消费者模式,生产者遍历文件,消费者处理文件内容
- 使用工作池控制并发度
- 使用通道传递文件路径和处理结果
- 实现错误处理机制
常见误区:
- 并发度过高导致系统资源耗尽
- 没有正确处理文件读取错误
- 没有等待所有 goroutine 完成
分步提示:
- 实现文件遍历函数,生成文件路径
- 实现工作池,处理文件内容
- 实现单词计数函数
- 实现结果汇总函数
- 连接各个组件,构建完整的处理流程
参考代码:
go
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
)
// 文件处理结果
type FileResult struct {
Path string
WordCount int
Error error
}
// 遍历目录,生成文件路径
func walkDirectory(directory string) <-chan string {
out := make(chan string)
go func() {
defer close(out)
err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Printf("Error walking path %s: %v", path, err)
return nil
}
if !info.IsDir() && filepath.Ext(path) == ".txt" {
out <- path
}
return nil
})
if err != nil {
log.Printf("Error walking directory %s: %v", directory, err)
}
}()
return out
}
// 统计文件中的单词数量
func countWords(path string) (int, error) {
content, err := os.ReadFile(path)
if err != nil {
return 0, err
}
words := strings.Fields(string(content))
return len(words), nil
}
// 工作池处理文件
func workerPool(files <-chan string, numWorkers int) <-chan FileResult {
out := make(chan FileResult)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for path := range files {
count, err := countWords(path)
out <- FileResult{
Path: path,
WordCount: count,
Error: err,
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// 汇总结果
func summarizeResults(results <-chan FileResult) (int, []error) {
totalWords := 0
var errors []error
for result := range results {
if result.Error != nil {
errors = append(errors, fmt.Errorf("error processing %s: %v", result.Path, result.Error))
} else {
fmt.Printf("%s: %d words\n", result.Path, result.WordCount)
totalWords += result.WordCount
}
}
return totalWords, errors
}
func main() {
directory := "."
numWorkers := 4
// 构建处理流程
files := walkDirectory(directory)
results := workerPool(files, numWorkers)
totalWords, errors := summarizeResults(results)
// 输出结果
fmt.Printf("\nTotal words: %d\n", totalWords)
if len(errors) > 0 {
fmt.Printf("\nErrors encountered:\n")
for _, err := range errors {
fmt.Println(err)
}
}
}9.2 进阶练习:并发 Web 爬虫
题目:实现一个并发 Web 爬虫,完成以下功能:
- 从指定 URL 开始爬取网页
- 提取网页中的链接
- 并发爬取这些链接
- 限制爬取深度和并发度
- 避免重复爬取同一个 URL
解题思路:
- 使用生产者-消费者模式,生产者发现链接,消费者爬取网页
- 使用工作池控制并发度
- 使用集合存储已爬取的 URL,避免重复
- 使用 context 控制爬取深度和超时
- 实现错误处理机制
常见误区:
- 并发度过高导致被目标网站封禁
- 没有处理网络错误和超时
- 没有限制爬取深度,导致无限递归
- 内存使用过高,存储过多 URL
分步提示:
- 实现 URL 去重机制
- 实现网页爬取和链接提取函数
- 实现工作池,控制并发度
- 实现深度控制机制
- 连接各个组件,构建完整的爬虫系统
参考代码:
go
package main
import (
"context"
"fmt"
"log"
"net/http"
"net/url"
"regexp"
"sync"
"time"
)
// 爬取任务
type CrawlTask struct {
URL string
Depth int
}
// 爬取结果
type CrawlResult struct {
URL string
Links []string
Depth int
Error error
}
// URL 去重器
type URLDeduper struct {
urls map[string]bool
mu sync.RWMutex
}
// 创建 URL 去重器
func NewURLDeduper() *URLDeduper {
return &URLDeduper{
urls: make(map[string]bool),
}
}
// 检查 URL 是否已存在
func (d *URLDeduper) Exists(url string) bool {
d.mu.RLock()
defer d.mu.RUnlock()
return d.urls[url]
}
// 添加 URL
func (d *URLDeduper) Add(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.urls[url] {
return false
}
d.urls[url] = true
return true
}
// 提取网页中的链接
func extractLinks(body string, baseURL string) []string {
var links []string
re := regexp.MustCompile(`<a[^>]+href="([^"]+)"`)
matches := re.FindAllStringSubmatch(body, -1)
base, err := url.Parse(baseURL)
if err != nil {
return links
}
for _, match := range matches {
if len(match) < 2 {
continue
}
link := match[1]
parsedLink, err := url.Parse(link)
if err != nil {
continue
}
absoluteURL := base.ResolveReference(parsedLink).String()
links = append(links, absoluteURL)
}
return links
}
// 爬取网页
func crawlURL(url string) (string, error) {
client := &http.Client{
Timeout: 10 * time.Second,
}
resp, err := client.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("HTTP error: %d", resp.StatusCode)
}
// 读取响应体(实际应用中应该限制大小)
buf := make([]byte, 1024*1024) // 1MB 限制
n, err := resp.Body.Read(buf)
if err != nil && err.Error() != "EOF" {
return "", err
}
return string(buf[:n]), nil
}
// 工作池
func workerPool(tasks <-chan CrawlTask, results chan<- CrawlResult, maxDepth int, deduper *URLDeduper, numWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range tasks {
if task.Depth > maxDepth {
continue
}
// 爬取网页
body, err := crawlURL(task.URL)
if err != nil {
results <- CrawlResult{
URL: task.URL,
Depth: task.Depth,
Error: err,
}
continue
}
// 提取链接
links := extractLinks(body, task.URL)
results <- CrawlResult{
URL: task.URL,
Links: links,
Depth: task.Depth,
Error: nil,
}
}
}()
}
wg.Wait()
close(results)
}
func main() {
startURL := "https://example.com"
maxDepth := 2
numWorkers := 4
// 创建 URL 去重器
deduper := NewURLDeduper()
// 创建通道
tasks := make(chan CrawlTask, 100)
results := make(chan CrawlResult, 100)
// 启动工作池
go workerPool(tasks, results, maxDepth, deduper, numWorkers)
// 提交初始任务
if deduper.Add(startURL) {
tasks <- CrawlTask{
URL: startURL,
Depth: 1,
}
}
// 处理结果
processed := 0
for result := range results {
processed++
if result.Error != nil {
log.Printf("Error crawling %s: %v", result.URL, result.Error)
} else {
fmt.Printf("Crawled %s (depth: %d, links: %d)\n", result.URL, result.Depth, len(result.Links))
// 提交新任务
for _, link := range result.Links {
if deduper.Add(link) && result.Depth < maxDepth {
tasks <- CrawlTask{
URL: link,
Depth: result.Depth + 1,
}
}
}
}
}
fmt.Printf("Crawling completed. Processed %d URLs.\n", processed)
}9.3 挑战练习:分布式任务调度系统
题目:实现一个简单的分布式任务调度系统,完成以下功能:
- 支持多个工作节点注册到调度器
- 调度器将任务分配给空闲的工作节点
- 工作节点执行任务并返回结果
- 支持任务重试和失败处理
- 支持任务优先级
解题思路:
- 使用 gRPC 实现节点间通信
- 使用分布式锁确保任务只被一个节点执行
- 使用心跳机制检测节点健康状态
- 实现任务队列,支持优先级
- 实现任务状态管理和重试机制
常见误区:
- 节点故障导致任务丢失
- 任务分配不均,某些节点过载
- 网络延迟导致任务执行超时
- 没有处理并发任务冲突
分步提示:
- 定义 gRPC 服务接口
- 实现调度器,管理任务和节点
- 实现工作节点,执行任务
- 实现任务队列和分配算法
- 实现心跳机制和故障检测
- 实现任务状态管理和重试机制
参考代码:
go
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net"
"sort"
"sync"
"time"
"google.golang.org/grpc"
pb "example.com/task-scheduler/proto"
)
// 任务状态
const (
TaskStatusPending = "pending"
TaskStatusRunning = "running"
TaskStatusCompleted = "completed"
TaskStatusFailed = "failed"
)
// 任务
type Task struct {
ID string
Priority int
Status string
Retries int
Data string
}
// 工作节点
type WorkerNode struct {
ID string
Address string
LastHeartbeat time.Time
IsAlive bool
}
// 任务调度器
type TaskScheduler struct {
tasks map[string]*Task
workers map[string]*WorkerNode
taskQueue []string // 优先级队列
mu sync.RWMutex
}
// 创建任务调度器
func NewTaskScheduler() *TaskScheduler {
return &TaskScheduler{
tasks: make(map[string]*Task),
workers: make(map[string]*WorkerNode),
}
}
// 注册工作节点
func (s *TaskScheduler) RegisterWorker(worker *WorkerNode) {
s.mu.Lock()
defer s.mu.Unlock()
s.workers[worker.ID] = worker
log.Printf("Worker registered: %s", worker.ID)
}
// 更新工作节点心跳
func (s *TaskScheduler) UpdateHeartbeat(workerID string) {
s.mu.Lock()
defer s.mu.Unlock()
if worker, exists := s.workers[workerID]; exists {
worker.LastHeartbeat = time.Now()
worker.IsAlive = true
}
}
// 提交任务
func (s *TaskScheduler) SubmitTask(task *Task) {
s.mu.Lock()
defer s.mu.Unlock()
task.Status = TaskStatusPending
s.tasks[task.ID] = task
// 添加到优先级队列
s.taskQueue = append(s.taskQueue, task.ID)
// 简单的优先级排序
sort.Slice(s.taskQueue, func(i, j int) bool {
return s.tasks[s.taskQueue[i]].Priority > s.tasks[s.taskQueue[j]].Priority
})
log.Printf("Task submitted: %s (priority: %d)", task.ID, task.Priority)
}
// 分配任务给工作节点
func (s *TaskScheduler) AssignTask() (string, *Task) {
s.mu.Lock()
defer s.mu.Unlock()
// 找到空闲的工作节点
var availableWorker string
for id, worker := range s.workers {
if worker.IsAlive {
availableWorker = id
break
}
}
if availableWorker == "" {
return "", nil
}
// 从队列中取出任务
if len(s.taskQueue) == 0 {
return "", nil
}
taskID := s.taskQueue[0]
s.taskQueue = s.taskQueue[1:]
task := s.tasks[taskID]
task.Status = TaskStatusRunning
log.Printf("Task assigned: %s to worker: %s", taskID, availableWorker)
return availableWorker, task
}
// 更新任务状态
func (s *TaskScheduler) UpdateTaskStatus(taskID string, status string) {
s.mu.Lock()
defer s.mu.Unlock()
if task, exists := s.tasks[taskID]; exists {
task.Status = status
if status == TaskStatusFailed {
task.Retries++
if task.Retries < 3 { // 最多重试3次
task.Status = TaskStatusPending
s.taskQueue = append(s.taskQueue, taskID)
// 重新排序
sort.Slice(s.taskQueue, func(i, j int) bool {
return s.tasks[s.taskQueue[i]].Priority > s.tasks[s.taskQueue[j]].Priority
})
log.Printf("Task failed, retrying: %s (retry: %d)", taskID, task.Retries)
}
}
log.Printf("Task status updated: %s -> %s", taskID, status)
}
}
// 清理过期的工作节点
func (s *TaskScheduler) CleanupWorkers() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
for id, worker := range s.workers {
if now.Sub(worker.LastHeartbeat) > 30*time.Second {
worker.IsAlive = false
log.Printf("Worker marked as dead: %s", id)
}
}
}
func main() {
// 创建任务调度器
scheduler := NewTaskScheduler()
// 启动心跳清理
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
scheduler.CleanupWorkers()
}
}()
// 模拟工作节点注册
go func() {
scheduler.RegisterWorker(&WorkerNode{
ID: "worker-1",
Address: "localhost:50051",
})
// 模拟心跳
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
scheduler.UpdateHeartbeat("worker-1")
}
}()
// 提交任务
for i := 1; i <= 10; i++ {
task := &Task{
ID: fmt.Sprintf("task-%d", i),
Priority: i % 5, // 优先级 0-4
Data: fmt.Sprintf("Task data %d", i),
}
scheduler.SubmitTask(task)
}
// 模拟任务分配
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
workerID, task := scheduler.AssignTask()
if task != nil {
// 模拟任务执行
go func(wID string, t *Task) {
time.Sleep(2 * time.Second)
// 模拟随机失败
if rand.Float32() < 0.3 {
scheduler.UpdateTaskStatus(t.ID, TaskStatusFailed)
} else {
scheduler.UpdateTaskStatus(t.ID, TaskStatusCompleted)
}
}(workerID, task)
}
}
}()
// 运行一段时间
time.Sleep(30 * time.Second)
fmt.Println("Task scheduling simulation completed!")
}10. 知识点总结
10.1 核心要点
- 并发编程在业务中的应用:并发编程是提高系统性能和响应速度的关键技术,广泛应用于Web服务器、数据处理、实时系统等场景
- 并发模式选择:根据任务类型和业务需求选择合适的并发模式,如生产者-消费者、工作池、扇出-扇入、管道等
- 错误处理:在并发环境中,需要设计合理的错误传播机制,确保错误能够被及时捕获和处理
- 资源管理:确保所有goroutine都能正常退出,避免资源泄漏,使用context控制goroutine的生命周期
- 性能优化:根据系统资源和任务特性,设置合理的并发度,优化内存使用和IO操作
- 监控和可观测性:为并发系统添加监控和可观测性机制,便于排查问题和优化性能
10.2 易错点回顾
- goroutine泄漏:没有正确处理context取消信号或通道操作阻塞,导致goroutine无法退出
- 竞态条件:多个goroutine同时访问和修改共享资源,没有使用适当的同步机制
- 死锁:多个goroutine互相等待对方释放资源,或通道操作顺序不当
- 错误处理不当:错误被忽略,或错误处理逻辑导致程序崩溃
- 过度并发:创建过多的goroutine,导致系统资源耗尽
- 分布式协调:在分布式系统中,没有正确处理节点故障和网络延迟
11. 拓展参考资料
11.1 官方文档链接
11.2 进阶学习路径建议
- Go 语言基础:掌握 Go 语言的基本语法和特性
- 并发编程:学习 goroutine、channel、sync 包等并发原语
- 设计模式:学习常见的并发设计模式,如生产者-消费者、工作池、扇出-扇入、管道等
- 分布式系统:学习分布式系统的基本概念和原理
- 性能优化:学习如何优化 Go 程序的性能
- 实战项目:参与实际项目,应用并发编程技术解决实际问题
11.3 推荐书籍
- 《Go 语言实战》
- 《Go 并发编程实战》
- 《Effective Go》
- 《Concurrency in Go》
- 《Distributed Systems》
