Appearance
并发与并行
1. 概述
并发与并行是计算机科学中两个重要的概念,也是 Go 语言并发编程的基础。理解并发与并行的区别和联系,对于掌握 Go 语言的并发编程模型至关重要。本章节将深入探讨并发与并行的概念、原理和实践应用,帮助开发者更好地理解和使用 Go 语言的并发特性。
2. 基本概念
2.1 语法
Go 语言中实现并发与并行的核心语法:
go
// 创建 goroutine(并发执行)
go function()
// 使用 GOMAXPROCS 设置并行度
runtime.GOMAXPROCS(n)2.2 语义
- 并发(Concurrency):指程序的不同部分可以独立执行,不需要等待其他部分完成。并发关注的是程序的结构和设计,允许多个任务在逻辑上同时进行。
- 并行(Parallelism):指程序的不同部分同时执行,需要多个 CPU 核心支持。并行关注的是程序的执行方式,允许多个任务在物理上同时进行。
- GOMAXPROCS:Go 语言中控制并行度的参数,指定可以同时执行的操作系统线程数。
2.3 规范
- 并发设计:合理设计并发结构,避免过度并发导致的性能下降
- 并行度控制:根据硬件资源和任务特性,合理设置 GOMAXPROCS
- 资源管理:在并发和并行场景中,合理管理共享资源,避免竞态条件
- 错误处理:妥善处理并发和并行场景下的错误
3. 原理深度解析
3.1 并发与并行的区别
| 特性 | 并发 | 并行 |
|---|---|---|
| 定义 | 逻辑上的同时执行 | 物理上的同时执行 |
| 依赖 | 不需要多个 CPU 核心 | 需要多个 CPU 核心 |
| 关注 | 程序结构和设计 | 程序执行方式 |
| 目的 | 提高程序的响应性和吞吐量 | 提高程序的执行速度 |
| 实现 | 多任务交替执行 | 多任务同时执行 |
3.2 Go 语言的并发模型
Go 语言采用 M:N 调度模型,将 M 个 goroutine 映射到 N 个操作系统线程上:
- G(Goroutine):轻量级线程,由 Go 运行时管理
- M(Machine):操作系统线程
- P(Processor):处理器,负责调度 goroutine 到 M 上执行
Go 调度器的工作原理:
- 每个 P 维护一个本地 goroutine 队列
- 当 P 的本地队列满时,新的 goroutine 会被放入全局队列
- P 会从本地队列或全局队列中获取 goroutine 执行
- 当 goroutine 阻塞时,P 会寻找其他可执行的 goroutine
3.3 并行度的控制
Go 语言通过 GOMAXPROCS 环境变量或 runtime.GOMAXPROCS() 函数来控制并行度:
- 默认情况下,GOMAXPROCS 等于 CPU 核心数
- 可以根据任务特性和硬件资源调整 GOMAXPROCS
- 对于 CPU 密集型任务,通常设置为 CPU 核心数
- 对于 I/O 密集型任务,可以设置为大于 CPU 核心数
4. 常见错误与踩坑点
4.1 混淆并发与并行
错误表现:认为并发就是并行,或者并行就是并发 产生原因:对两个概念的理解不够清晰 解决方案:明确区分并发与并行的概念,根据实际需求选择合适的实现方式
4.2 过度并行
错误表现:设置过高的 GOMAXPROCS,导致线程切换开销大于并行收益 产生原因:不了解任务特性和硬件资源限制 解决方案:根据任务类型和硬件资源,合理设置 GOMAXPROCS
4.3 竞态条件
错误表现:多个 goroutine 同时访问和修改共享资源,导致结果不一致 产生原因:在并发和并行场景中,没有正确保护共享资源 解决方案:使用互斥锁、读写锁或原子操作来保护共享资源
4.4 死锁
错误表现:多个 goroutine 相互等待对方释放资源,导致程序卡住 产生原因:在并发和并行场景中,资源获取顺序不当 解决方案:避免循环等待,使用带缓冲的通道或超时机制
4.5 资源耗尽
错误表现:创建过多的 goroutine 或线程,导致内存或 CPU 资源耗尽 产生原因:不控制 goroutine 的数量,或设置过高的并行度 解决方案:合理控制 goroutine 的数量,使用工作池模式,根据硬件资源设置合适的并行度
5. 常见应用场景
5.1 网络服务器
场景描述:处理并发的网络请求 使用方法:为每个请求创建一个 goroutine,利用并发提高响应速度 示例代码:
go
func handleConnection(conn net.Conn) {
defer conn.Close()
// 处理请求
}
func main() {
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
continue
}
go handleConnection(conn) // 并发处理请求
}
}5.2 CPU 密集型任务
场景描述:处理需要大量计算的任务 使用方法:使用并行计算,充分利用多核 CPU 示例代码:
go
func compute(data []int) []int {
result := make([]int, len(data))
var wg sync.WaitGroup
// 设置并行度为 CPU 核心数
numWorkers := runtime.GOMAXPROCS(0)
chunkSize := (len(data) + numWorkers - 1) / numWorkers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(start, end int) {
defer wg.Done()
for j := start; j < end && j < len(data); j++ {
// 执行密集计算
result[j] = heavyComputation(data[j])
}
}(i*chunkSize, (i+1)*chunkSize)
}
wg.Wait()
return result
}5.3 I/O 密集型任务
场景描述:处理需要大量 I/O 操作的任务 使用方法:使用并发处理,提高 I/O 利用率 示例代码:
go
func downloadFiles(urls []string) error {
var wg sync.WaitGroup
errCh := make(chan error, len(urls))
for _, url := range urls {
wg.Add(1)
go func(url string) {
defer wg.Done()
// 下载文件
if err := downloadFile(url); err != nil {
errCh <- err
}
}(url)
}
wg.Wait()
close(errCh)
var errs []error
for err := range errCh {
errs = append(errs, err)
}
if len(errs) > 0 {
return fmt.Errorf("download failed: %v", errs)
}
return nil
}5.4 数据流处理
场景描述:处理连续的数据流 使用方法:使用并发管道,提高数据处理效率 示例代码:
go
func processData(input <-chan int, output chan<- int) {
for data := range input {
// 处理数据
result := processItem(data)
output <- result
}
close(output)
}
func main() {
input := make(chan int)
output := make(chan int)
// 启动多个处理 goroutine
for i := 0; i < 4; i++ {
go processData(input, output)
}
// 发送数据
go func() {
for i := 0; i < 100; i++ {
input <- i
}
close(input)
}()
// 接收结果
for result := range output {
fmt.Println(result)
}
}5.5 定时任务
场景描述:执行定时任务 使用方法:使用 goroutine 和定时器,实现并发的定时任务 示例代码:
go
func scheduleTask() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
go func() {
// 执行定时任务
fmt.Println("Executing scheduled task")
}()
}
}
}
func main() {
go scheduleTask()
select {}
}6. 企业级进阶应用场景
6.1 微服务架构
场景描述:在微服务架构中处理并发请求 使用方法:结合 goroutine 和 channel 实现高并发的服务端 示例代码:
go
func handleRequest(w http.ResponseWriter, r *http.Request) {
// 解析请求
// 处理业务逻辑
// 返回响应
}
func main() {
http.HandleFunc("/api", handleRequest)
http.ListenAndServe(":8080", nil) // 内部使用 goroutine 处理每个请求
}6.2 大数据处理
场景描述:处理大规模数据 使用方法:使用并行计算,充分利用集群资源 示例代码:
go
func processLargeDataset(dataset []Data) []Result {
result := make([]Result, len(dataset))
var wg sync.WaitGroup
// 根据集群规模设置并行度
numWorkers := runtime.GOMAXPROCS(0)
chunkSize := (len(dataset) + numWorkers - 1) / numWorkers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(start, end int) {
defer wg.Done()
for j := start; j < end && j < len(dataset); j++ {
// 处理数据
result[j] = processDataItem(dataset[j])
}
}(i*chunkSize, (i+1)*chunkSize)
}
wg.Wait()
return result
}6.3 实时系统
场景描述:处理实时数据流,如金融交易、传感器数据等 使用方法:使用并发处理,确保数据的实时性 示例代码:
go
func processRealTimeData(dataChan <-chan Data) {
for data := range dataChan {
go func(data Data) {
// 实时处理数据
processData(data)
}(data)
}
}
func main() {
dataChan := make(chan Data)
go processRealTimeData(dataChan)
// 模拟数据输入
for {
data := generateData()
dataChan <- data
time.Sleep(10 * time.Millisecond)
}
}6.4 分布式系统
场景描述:在分布式系统中处理并发任务 使用方法:结合 goroutine 和网络通信,实现分布式并发 示例代码:
go
func handleRemoteTask(task Task) error {
// 处理远程任务
return nil
}
func main() {
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Println(err)
continue
}
go func(conn net.Conn) {
defer conn.Close()
// 读取任务
var task Task
if err := json.NewDecoder(conn).Decode(&task); err != nil {
log.Println(err)
return
}
// 处理任务
if err := handleRemoteTask(task); err != nil {
log.Println(err)
return
}
// 返回结果
json.NewEncoder(conn).Encode(map[string]string{"status": "ok"})
}(conn)
}
}6.5 高并发 API 网关
场景描述:构建高并发的 API 网关 使用方法:使用 goroutine 处理每个请求,结合连接池和缓存 示例代码:
go
func setupRoutes() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
go func() {
// 记录请求日志
log.Printf("Request: %s %s", r.Method, r.URL.Path)
}()
// 处理请求
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"message": "API Gateway"})
})
return mux
}
func main() {
server := &http.Server{
Addr: ":8080",
Handler: setupRoutes(),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
log.Fatal(server.ListenAndServe())
}7. 行业最佳实践
7.1 合理设置并行度
实践内容:根据任务类型和硬件资源,合理设置 GOMAXPROCS 推荐理由:避免过度并行导致的性能下降,充分利用硬件资源
7.2 使用工作池模式
实践内容:使用工作池模式控制并发度 推荐理由:避免创建过多的 goroutine,减少资源消耗
7.3 优先使用并发而非并行
实践内容:优先使用并发设计,而不是盲目追求并行 推荐理由:并发设计更灵活,更容易处理复杂的任务流程
7.4 合理使用同步原语
实践内容:根据场景选择合适的同步原语 推荐理由:不同的同步原语有不同的适用场景,选择合适的可以提高性能和可靠性
7.5 监控并发性能
实践内容:监控并发程序的性能指标 推荐理由:及时发现和解决性能瓶颈,确保系统的稳定性
7.6 错误处理和恢复
实践内容:在并发场景中妥善处理错误和恢复 推荐理由:并发错误处理不当可能导致程序崩溃或行为异常
8. 常见问题答疑(FAQ)
8.1 并发和并行的区别是什么?
问题描述:并发和并行的核心区别 回答内容:并发是逻辑上的同时执行,不需要多个 CPU 核心;并行是物理上的同时执行,需要多个 CPU 核心。并发关注的是程序的结构和设计,并行关注的是程序的执行方式。 示例代码:
go
// 并发执行两个函数
go func() { fmt.Println("Task 1") }()
go func() { fmt.Println("Task 2") }()
// 设置并行度
runtime.GOMAXPROCS(2) // 允许最多 2 个线程并行执行8.2 如何设置 Go 程序的并行度?
问题描述:如何控制 Go 程序的并行执行数量 回答内容:可以通过设置 GOMAXPROCS 环境变量或调用 runtime.GOMAXPROCS() 函数来控制并行度。默认情况下,GOMAXPROCS 等于 CPU 核心数。 示例代码:
go
// 设置并行度为 4
runtime.GOMAXPROCS(4)
// 获取当前并行度
fmt.Println("Current GOMAXPROCS:", runtime.GOMAXPROCS(0))8.3 并发程序中如何避免竞态条件?
问题描述:如何在并发程序中避免多个 goroutine 同时修改共享资源 回答内容:可以使用互斥锁、读写锁或原子操作来保护共享资源,确保同一时间只有一个 goroutine 可以修改共享资源。 示例代码:
go
var (
mu sync.Mutex
counter int
)
func increment() {
mu.Lock()
defer mu.Unlock()
counter++
}8.4 如何处理并发程序中的死锁?
问题描述:如何避免和处理并发程序中的死锁 回答内容:避免循环等待,使用带缓冲的通道,设置超时机制,以及使用 select 语句处理多个通道。 示例代码:
go
// 使用 select 避免死锁
select {
case ch1 <- value:
// 发送成功
case <-time.After(time.Second):
// 超时处理
}8.5 如何监控并发程序的性能?
问题描述:如何监控并发程序的性能指标 回答内容:可以使用 Go 的内置工具如 pprof 和 trace,或者使用第三方监控工具来监控并发程序的性能指标,如 goroutine 数量、CPU 使用率、内存使用等。 示例代码:
go
// 导出性能分析数据
pprof.StartCPUProfile(os.Stdout)
defer pprof.StopCPUProfile()
// 监控 goroutine 数量
func monitorGoroutines() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
fmt.Printf("Number of goroutines: %d\n", runtime.NumGoroutine())
}
}8.6 如何设计高并发的系统?
问题描述:如何设计一个高并发的系统 回答内容:合理设计并发结构,使用工作池模式控制并发度,优先使用并发而非并行,合理使用同步原语,监控并发性能,妥善处理错误和恢复。 示例代码:
go
// 工作池模式
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, j)
results <- j * 2
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动工作池
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// 发送任务
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 收集结果
for a := 1; a <= numJobs; a++ {
<-results
}
}9. 实战练习
9.1 基础练习
题目:实现一个并发的计数器,比较不同并行度下的性能 解题思路:
- 创建一个计数器变量
- 使用多个 goroutine 并发增加计数器
- 使用互斥锁保护计数器
- 分别设置不同的 GOMAXPROCS 值,比较性能
- 输出最终结果和执行时间
常见误区:
- 未使用互斥锁保护共享资源,导致竞态条件
- 未正确等待所有 goroutine 完成
- 盲目设置过高的并行度
分步提示:
- 定义计数器变量和互斥锁
- 创建多个 goroutine 并发执行增加操作
- 使用 WaitGroup 等待所有 goroutine 完成
- 分别设置不同的 GOMAXPROCS 值,运行程序并记录执行时间
- 分析不同并行度下的性能差异
参考代码:
go
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
var (
counter int
mu sync.Mutex
wg sync.WaitGroup
)
func increment() {
defer wg.Done()
for i := 0; i < 1000000; i++ {
mu.Lock()
counter++
mu.Unlock()
}
}
func main() {
const numGoroutines = 10
// 测试不同的并行度
for procs := 1; procs <= runtime.NumCPU(); procs++ {
runtime.GOMAXPROCS(procs)
counter = 0
start := time.Now()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go increment()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("GOMAXPROCS=%d, Counter=%d, Time=%v\n", procs, counter, elapsed)
}
}9.2 进阶练习
题目:实现一个并行的归并排序算法 解题思路:
- 实现归并排序算法
- 当数组长度小于阈值时,使用串行排序
- 当数组长度大于阈值时,将数组分成两半,并行排序
- 合并排序结果
常见误区:
- 对小数组也使用并行排序,导致开销大于收益
- 未正确合并排序结果
- 并行度设置过高
分步提示:
- 实现归并排序的基本逻辑
- 设置一个阈值,当数组长度小于阈值时使用串行排序
- 当数组长度大于阈值时,将数组分成两半,创建两个 goroutine 并行排序
- 等待两个 goroutine 完成,然后合并排序结果
- 测试不同阈值下的性能
参考代码:
go
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
const sortThreshold = 1000
func mergeSort(arr []int) []int {
if len(arr) <= 1 {
return arr
}
if len(arr) < sortThreshold {
// 小数组使用串行排序
return serialMergeSort(arr)
}
mid := len(arr) / 2
var wg sync.WaitGroup
var left, right []int
wg.Add(2)
// 并行排序左右两部分
go func() {
defer wg.Done()
left = mergeSort(arr[:mid])
}()
go func() {
defer wg.Done()
right = mergeSort(arr[mid:])
}()
wg.Wait()
// 合并结果
return merge(left, right)
}
func serialMergeSort(arr []int) []int {
if len(arr) <= 1 {
return arr
}
mid := len(arr) / 2
left := serialMergeSort(arr[:mid])
right := serialMergeSort(arr[mid:])
return merge(left, right)
}
func merge(left, right []int) []int {
result := make([]int, len(left)+len(right))
i, j := 0, 0
for k := 0; k < len(result); k++ {
if i >= len(left) {
result[k] = right[j]
j++
} else if j >= len(right) {
result[k] = left[i]
i++
} else if left[i] < right[j] {
result[k] = left[i]
i++
} else {
result[k] = right[j]
j++
}
}
return result
}
func main() {
// 生成随机数组
const size = 1000000
arr := make([]int, size)
for i := 0; i < size; i++ {
arr[i] = size - i
}
// 测试并行排序
start := time.Now()
sorted := mergeSort(arr)
elapsed := time.Since(start)
fmt.Printf("Parallel merge sort took %v\n", elapsed)
fmt.Printf("First 10 elements: %v\n", sorted[:10])
fmt.Printf("Last 10 elements: %v\n", sorted[len(sorted)-10:])
}9.3 挑战练习
题目:实现一个并发的网页爬虫,支持限制并发度 解题思路:
- 定义 URL 队列和已访问 URL 集合
- 使用 goroutine 并发爬取网页
- 解析网页中的链接
- 将新链接添加到队列
- 避免重复爬取
- 限制并发数量
- 比较不同并发度下的性能
常见误区:
- 未限制并发数量,导致资源耗尽
- 未处理循环链接,导致无限爬取
- 未正确处理网络错误
- 盲目设置过高的并发度
分步提示:
- 实现 URL 管理器,处理 URL 的添加和去重
- 实现工作协程,爬取网页并解析链接
- 使用通道控制并发数量
- 实现错误处理机制
- 测试不同并发度下的爬取性能
- 分析并发度对爬取速度的影响
参考代码:
go
package main
import (
"fmt"
"net/http"
"net/url"
"sync"
"time"
"golang.org/x/net/html"
)
type URLManager struct {
urls chan string
visited map[string]bool
mu sync.Mutex
maxDepth int
}
func NewURLManager(maxDepth int) *URLManager {
return &URLManager{
urls: make(chan string, 100),
visited: make(map[string]bool),
maxDepth: maxDepth,
}
}
func (um *URLManager) AddURL(url string, depth int) bool {
if depth > um.maxDepth {
return false
}
um.mu.Lock()
defer um.mu.Unlock()
if um.visited[url] {
return false
}
um.visited[url] = true
um.urls <- url
return true
}
func (um *URLManager) GetURL() string {
return <-um.urls
}
func (um *URLManager) Close() {
close(um.urls)
}
func extractLinks(resp *http.Response) ([]string, error) {
defer resp.Body.Close()
doc, err := html.Parse(resp.Body)
if err != nil {
return nil, err
}
var links []string
var traverse func(*html.Node)
traverse = func(n *html.Node) {
if n.Type == html.ElementNode && n.Data == "a" {
for _, attr := range n.Attr {
if attr.Key == "href" {
links = append(links, attr.Val)
break
}
}
}
for c := n.FirstChild; c != nil; c = c.NextSibling {
traverse(c)
}
}
traverse(doc)
return links, nil
}
func crawl(urlStr string, baseURL *url.URL, um *URLManager, depth int, wg *sync.WaitGroup) {
defer wg.Done()
resp, err := http.Get(urlStr)
if err != nil {
return
}
if resp.StatusCode != http.StatusOK {
return
}
links, err := extractLinks(resp)
if err != nil {
return
}
for _, link := range links {
resolvedURL, err := baseURL.Parse(link)
if err != nil {
continue
}
if resolvedURL.Scheme != "http" && resolvedURL.Scheme != "https" {
continue
}
if um.AddURL(resolvedURL.String(), depth+1) {
wg.Add(1)
go crawl(resolvedURL.String(), resolvedURL, um, depth+1, wg)
}
}
}
func main() {
startURL := "https://example.com"
maxDepth := 2
// 测试不同的并发度
for maxConcurrent := 1; maxConcurrent <= 10; maxConcurrent++ {
um := NewURLManager(maxDepth)
um.AddURL(startURL, 0)
var wg sync.WaitGroup
semaphore := make(chan struct{}, maxConcurrent)
start := time.Now()
for urlStr := range um.urls {
wg.Add(1)
semaphore <- struct{}{} // 获取信号量
go func(urlStr string) {
defer func() {
<-semaphore // 释放信号量
}()
baseURL, err := url.Parse(urlStr)
if err != nil {
wg.Done()
return
}
crawl(urlStr, baseURL, um, 0, &wg)
}(urlStr)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Concurrent=%d, URLs crawled=%d, Time=%v\n", maxConcurrent, len(um.visited), elapsed)
um.Close()
}
}10. 知识点总结
10.1 核心要点
- 并发:逻辑上的同时执行,关注程序结构和设计
- 并行:物理上的同时执行,关注程序执行方式
- Go 并发模型:采用 M:N 调度模型,将 goroutine 映射到操作系统线程
- 并行度控制:通过 GOMAXPROCS 控制并行度
- 同步原语:包括互斥锁、读写锁、WaitGroup 等
- 并发安全:确保多个 goroutine 安全访问共享资源
10.2 易错点回顾
- 混淆并发与并行:认为并发就是并行,或者并行就是并发
- 过度并行:设置过高的 GOMAXPROCS,导致线程切换开销大于并行收益
- 竞态条件:多个 goroutine 同时访问和修改共享资源,导致结果不一致
- 死锁:多个 goroutine 相互等待对方释放资源,导致程序卡住
- 资源耗尽:创建过多的 goroutine 或线程,导致内存或 CPU 资源耗尽
11. 拓展参考资料
11.1 官方文档链接
11.2 进阶学习路径建议
- 并发模式:学习常见的并发设计模式,如生产者-消费者、工作池等
- 性能优化:学习如何优化并发程序的性能
- 分布式系统:学习在分布式系统中使用并发编程
- 测试:学习如何测试并发程序
11.3 推荐资源
- 《Concurrency in Go》by Katherine Cox-Buday
- 《Go 语言实战》中的并发编程章节
- Go 官方博客关于并发的文章
- 开源项目中的并发编程实践,如 Kubernetes、Docker 等
