golang-基础-Go语言协程并发
Go语言的协程(Goroutine)是其并发编程的核心特性之一,它通过轻量级线程和通道(Channel)的组合,实现了高效、简洁的并发模型。以下是关于Goroutine并发的全面介绍,涵盖其定义、使用方法、同步机制、适用场景、代码示例以及性能优化技巧。
1. 协程(Goroutine)的基本概念
1.1 什么是Goroutine?
Goroutine是Go语言中的轻量级线程,由Go运行时(runtime)管理,而不是操作系统直接调度。每个Goroutine的栈空间初始为2KB,动态扩展,资源消耗极低。相比传统线程(如Java的Thread),Goroutine的创建和销毁成本极低,单个Go程序可以轻松创建数万个并发任务。
1.2 启动Goroutine
使用 go
关键字启动一个Goroutine:
go function() // 启动一个协程执行function
例如:
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
fmt.Println(s)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
go say("world") // 启动一个协程
say("hello") // 主协程执行
}
输出会交替打印 hello
和 world
,因为两个函数在不同协程中并发执行。
2. 协程的同步机制
2.1 使用 sync.WaitGroup
等待协程完成
主函数默认不会等待Goroutine结束,因此需要同步机制确保程序正确退出。sync.WaitGroup
是最常用的同步工具:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 任务完成后减少计数器
fmt.Printf("Worker %d starting\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait() // 等待所有协程完成
fmt.Println("All workers done")
}
2.2 使用互斥锁(Mutex)保护共享资源
当多个协程访问共享资源时,需通过互斥锁(sync.Mutex
)避免数据竞争:
package main
import (
"fmt"
"sync"
)
var counter int
var mu sync.Mutex
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock() // 加锁
counter++
fmt.Println("Counter:", counter)
mu.Unlock() // 解锁
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
}
3. 通道(Channel)的使用
3.1 通道的基本操作
通道(Channel)是协程间通信的核心工具,支持数据传递和同步。通道分为缓冲通道和非缓冲通道:
- 非缓冲通道:发送和接收必须同时发生。
- 缓冲通道:允许在缓冲区未满时异步发送数据。
示例:
ch := make(chan int) // 非缓冲通道
ch := make(chan int, 10) // 缓冲通道,容量为10
// 发送数据
ch <- 42
// 接收数据
value := <-ch
3.2 通道的并发协作
通过通道实现协程间的协作:
package main
import (
"fmt"
"time"
)
func sum(s []int, ch chan int) {
total := 0
for _, v := range s {
total += v
}
ch <- total // 将结果发送到通道
}
func main() {
numbers := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
ch := make(chan int)
go sum(numbers[:len(numbers)/2], ch)
go sum(numbers[len(numbers)/2:], ch)
x, y := <-ch, <-ch // 接收两个结果
fmt.Println("Total:", x+y)
}
4. Context 控制协程生命周期
4.1 使用 context.Context
取消任务
通过 context.WithCancel
或 context.WithTimeout
可以优雅地取消协程任务:
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker canceled:", ctx.Err())
return
default:
fmt.Println("Working...")
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
time.Sleep(2 * time.Second)
cancel() // 取消任务
time.Sleep(1 * time.Second)
fmt.Println("Main done")
}
4.2 超时控制
使用 context.WithTimeout
自动取消超时任务:
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
5. 协程的适用场景
5.1 并发执行任务
适用于需要并行处理多个独立任务的场景,例如批量数据处理、文件下载等:
package main
import (
"fmt"
"sync"
"time"
)
func processTask(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Processing task %d\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Task %d completed\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go processTask(i, &wg)
}
wg.Wait()
}
5.2 I/O 密集型操作
适用于网络请求、文件读写等I/O密集型任务,避免阻塞主线程:
package main
import (
"fmt"
"net/http"
"time"
)
func fetchURL(url string, ch chan<- string) {
start := time.Now()
resp, err := http.Get(url)
if err != nil {
ch <- fmt.Sprintf("Failed to fetch %s: %v", url, err)
return
}
defer resp.Body.Close()
ch <- fmt.Sprintf("Fetched %s in %v", url, time.Since(start))
}
func main() {
urls := []string{
"https://example.com",
"https://httpbin.org/get",
"https://golang.org",
}
ch := make(chan string, len(urls))
for _, url := range urls {
go fetchURL(url, ch)
}
for i := 0; i < len(urls); i++ {
fmt.Println(<-ch)
}
}
5.3 实时系统
适用于实时聊天服务器、事件驱动系统等场景:
package main
import (
"fmt"
"sync"
)
func handleClient(clientID int, wg *sync.WaitGroup) {
defer wg.Done()
for {
fmt.Printf("Client %d is active\n", clientID)
// 模拟客户端交互
break
}
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go handleClient(i, &wg)
}
wg.Wait()
}
6. 性能优化与注意事项
6.1 限制最大并发数
使用带缓冲的通道作为令牌桶,控制最大并发数:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup, tokenCh <-chan struct{}) {
defer wg.Done()
<-tokenCh // 获取令牌
fmt.Printf("Worker %d started\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Worker %d done\n", id)
tokenCh <- struct{}{} // 释放令牌
}
func main() {
maxConcurrency := 3
tokenCh := make(chan struct{}, maxConcurrency)
for i := 1; i <= 5; i++ {
tokenCh <- struct{}{} // 填充令牌桶
go worker(i, &sync.WaitGroup{}, tokenCh)
}
time.Sleep(3 * time.Second)
}
6.2 错误处理
协程中的错误需要通过通道或全局变量传递给主协程:
package main
import (
"fmt"
"sync"
)
func riskyTask(id int, wg *sync.WaitGroup, errCh chan<- error) {
defer wg.Done()
if id == 3 {
errCh <- fmt.Errorf("Task %d failed", id)
return
}
fmt.Printf("Task %d succeeded\n", id)
}
func main() {
var wg sync.WaitGroup
errCh := make(chan error, 1)
for i := 1; i <= 5; i++ {
wg.Add(1)
go riskyTask(i, &wg, errCh)
}
wg.Wait()
if err, ok := <-errCh; ok {
fmt.Println("Error occurred:", err)
}
}
6.3 使用 select
多路复用
通过 select
监听多个通道事件,实现非阻塞通信:
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "Result from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Result from ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}
7. 协程的高级用法
7.1 协程池
通过协程池管理并发任务,避免资源耗尽:
package main
import (
"fmt"
"sync"
"time"
)
type Pool struct {
workers int
taskCh chan func()
}
func NewPool(workers int) *Pool {
return &Pool{
workers: workers,
taskCh: make(chan func()),
}
}
func (p *Pool) Start() {
for i := 0; i < p.workers; i++ {
go func() {
for task := range p.taskCh {
task()
}
}()
}
}
func (p *Pool) AddTask(task func()) {
p.taskCh <- task
}
func main() {
pool := NewPool(3)
pool.Start()
for i := 1; i <= 5; i++ {
pool.AddTask(func() {
fmt.Println("Executing task")
time.Sleep(500 * time.Millisecond)
})
}
time.Sleep(3 * time.Second)
}
7.2 协程与定时器
结合 time.Ticker
实现周期性任务:
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(1 * time.Second)
done := make(chan bool)
go func() {
for {
select {
case t := <-ticker.C:
fmt.Println("Tick at", t)
case <-done:
ticker.Stop()
return
}
}
}()
time.Sleep(5 * time.Second)
done <- true
fmt.Println("Ticker stopped")
}
8. 协程的适用领域
- 高并发服务器:Web服务器、API网关、实时聊天系统。
- 微服务架构:处理分布式任务、服务间通信。
- 数据处理:批量数据转换、ETL流水线。
- IoT设备管理:并发监控多个传感器或设备。
- 游戏开发:处理玩家行为、实时更新状态。
9. 总结
Go语言的协程并发模型通过以下特性实现了高效并发:
- 轻量级:协程资源消耗低,可轻松创建数万个并发任务。
- 简洁语法:通过
go
关键字和channel
实现并发编程。 - 同步机制:
sync.WaitGroup
、sync.Mutex
和context.Context
提供了强大的同步和取消能力。 - 通道通信:通过 Channel 安全传递数据,避免共享内存竞争。
在实际开发中,合理使用协程可以显著提升程序性能,但需注意资源管理、错误处理和同步控制,避免资源泄漏或竞态条件。
- 感谢你赐予我前进的力量