This article was last updated on 2025-11-03; its content may be out of date.

1. The Core Philosophy of Go's Concurrency Model

Go's concurrency design follows the principle: **"Do not communicate by sharing memory; instead, share memory by communicating."**
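
As a minimal sketch of what this principle looks like in practice (the function below is invented purely for illustration), the goroutine that owns the data hands its result over a channel instead of updating a shared variable guarded by a lock:

// A tiny sketch of "share memory by communicating": only the worker
// goroutine touches sum, and the finished value is handed over a channel.
func sumByCommunicating(nums []int) int {
    result := make(chan int)

    go func() {
        sum := 0
        for _, n := range nums {
            sum += n // no mutex needed: this goroutine owns sum
        }
        result <- sum // ownership of the value is transferred here
    }()

    return <-result
}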

1.1 Goroutines in Depth

What is a Goroutine?
  • Lightweight thread: a coroutine managed by the Go runtime
  • Memory overhead: an initial stack of about 2KB that grows dynamically
  • Scheduling model: M:N scheduling (M goroutines multiplexed onto N OS threads)
  • Creation cost: dramatically cheaper than creating an OS-level thread such as a Java thread (see the sketch after this list)
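
These claims are easy to check empirically with the runtime package; the sketch below (the 100,000 figure is arbitrary) spawns a large number of goroutines and reports how many are alive and how many OS threads the scheduler may use:

// Sketch: goroutines are cheap enough to start by the hundred thousand.
func spawnMany() {
    var wg sync.WaitGroup
    for i := 0; i < 100000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(100 * time.Millisecond) // keep each goroutine alive briefly
        }()
    }

    fmt.Println("goroutines alive:", runtime.NumGoroutine()) // roughly 100,001
    fmt.Println("usable OS threads (GOMAXPROCS):", runtime.GOMAXPROCS(0))
    wg.Wait()
}
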
// Several ways to create goroutines
package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    // 1. Anonymous function
    go func() {
        fmt.Println("Anonymous goroutine")
    }()
    
    // 2. Named function
    go printMessage("Named goroutine")
    
    // 3. Method call
    worker := &Worker{}
    go worker.Process()
    
    // 4. Goroutine with arguments
    go func(msg string, delay time.Duration) {
        time.Sleep(delay)
        fmt.Println(msg)
    }("Delayed message", 500*time.Millisecond)
    
    // Wait for the goroutines to finish (a fixed sleep is fine for a demo; prefer sync.WaitGroup in real code)
    time.Sleep(1 * time.Second)
}

func printMessage(msg string) {
    fmt.Println(msg)
}

type Worker struct{}

func (w *Worker) Process() {
    fmt.Println("Processing in goroutine")
}
Managing the goroutine lifecycle
func goroutineLifecycle() {
    var wg sync.WaitGroup
    
    // Start several goroutines
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done() // make sure the counter is decremented
            
            fmt.Printf("Goroutine %d starting\n", id)
            time.Sleep(time.Duration(id) * 100 * time.Millisecond)
            fmt.Printf("Goroutine %d finished\n", id)
        }(i) // pass i by value to avoid the loop-variable capture pitfall
    }
    
    // Wait for all goroutines to finish
    wg.Wait()
    fmt.Println("All goroutines completed")
}

1.2 Channels in Depth

Channel types explained
// 1. Unbuffered channels (synchronous)
ch1 := make(chan int)        // every send must be met by a receive
ch2 := make(chan string, 0)  // capacity 0 is the same as unbuffered

// 2. Buffered channels (asynchronous)
ch3 := make(chan int, 10)    // buffer capacity of 10
ch4 := make(chan *Data, 100) // can hold up to 100 pointers

// 3. Directional channels (typically used as function parameters)
func producer(out chan<- string) {  // send-only
    out <- "data"
}

func consumer(in <-chan string) {   // receive-only
    data := <-in
    fmt.Println(data)
}
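
Wiring the two directional functions together is straightforward; a bidirectional channel converts implicitly to the send-only and receive-only parameter types:

// Sketch: one bidirectional channel satisfies both directional signatures.
func runProducerConsumer() {
    ch := make(chan string, 1)
    go producer(ch) // passed as chan<- string
    consumer(ch)    // passed as <-chan string
}
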
Channel operation semantics
func channelOperations() {
    ch := make(chan int, 2)
    
    // 1. Send
    ch <- 1  // blocks until there is buffer space or a receiver
    ch <- 2
    
    // 2. Receive
    val := <-ch  // blocks until a value is available
    fmt.Println(val)
    
    // 3. Receive with ok (detects whether the channel has been closed)
    if val, ok := <-ch; ok {
        fmt.Println("Received:", val)
    } else {
        fmt.Println("Channel closed")
    }
    
    // 4. Close the channel
    close(ch)
    
    // 5. Iterate with for-range (exits once the channel is closed and drained)
    for val := range ch {
        fmt.Println("Range:", val)
    }
}
Advanced uses of select
func advancedSelect() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    timeout := time.After(2 * time.Second)
    
    go func() { time.Sleep(1 * time.Second); ch1 <- "one" }()
    go func() { time.Sleep(3 * time.Second); ch2 <- "two" }()
    
    // 1. Basic select with a timeout: blocks until one case is ready
    // (a default case here would return immediately and defeat the timeout)
    select {
    case msg1 := <-ch1:
        fmt.Println("Received:", msg1)
    case msg2 := <-ch2:
        fmt.Println("Received:", msg2)
    case <-timeout:
        fmt.Println("Timeout occurred")
    }
    
    // 2. Non-blocking receive using default
    select {
    case msg := <-ch1:
        fmt.Println("Non-blocking receive:", msg)
    default:
        fmt.Println("No message available")
    }
    
    // 3. Non-blocking send using default
    select {
    case ch1 <- "hello":
        fmt.Println("Sent message")
    default:
        fmt.Println("Channel full")
    }
}

2. Advanced Concurrency Patterns

2.1 A Deep Implementation of the Worker Pool Pattern

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID    int
    Data  string
    Delay time.Duration
}

type Result struct {
    JobID int
    Error error
    Time  time.Time
}

type WorkerPool struct {
    jobs       chan Job
    results    chan Result
    workers    int
    wg         sync.WaitGroup
    shutdown   chan struct{}
    isShutdown bool
    mu         sync.RWMutex
}

func NewWorkerPool(workers, jobQueueSize int) *WorkerPool {
    return &WorkerPool{
        jobs:     make(chan Job, jobQueueSize),
        results:  make(chan Result, jobQueueSize),
        workers:  workers,
        shutdown: make(chan struct{}),
    }
}

func (wp *WorkerPool) Start() {
    wp.mu.Lock()
    if wp.isShutdown {
        wp.mu.Unlock()
        return
    }
    wp.mu.Unlock()
    
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()
    
    fmt.Printf("Worker %d started\n", id)
    
    for {
        select {
        case job, ok := <-wp.jobs:
            if !ok {
                fmt.Printf("Worker %d: Job channel closed\n", id)
                return
            }
            
            fmt.Printf("Worker %d processing job %d\n", id, job.ID)
            time.Sleep(job.Delay)
            
            wp.results <- Result{
                JobID: job.ID,
                Time:  time.Now(),
            }
            
        case <-wp.shutdown:
            fmt.Printf("Worker %d received shutdown signal\n", id)
            return
        }
    }
}

func (wp *WorkerPool) Submit(job Job) bool {
    wp.mu.RLock()
    defer wp.mu.RUnlock()
    
    if wp.isShutdown {
        return false
    }
    
    select {
    case wp.jobs <- job:
        return true
    default:
        fmt.Printf("Job queue full, rejecting job %d\n", job.ID)
        return false
    }
}

func (wp *WorkerPool) Results() <-chan Result {
    return wp.results
}

func (wp *WorkerPool) Shutdown() {
    wp.mu.Lock()
    if wp.isShutdown {
        wp.mu.Unlock()
        return
    }
    wp.isShutdown = true
    wp.mu.Unlock()
    
    close(wp.shutdown)
    close(wp.jobs)
    
    wp.wg.Wait()
    close(wp.results)
    
    fmt.Println("Worker pool shutdown complete")
}

func (wp *WorkerPool) Stats() (int, int) {
    return len(wp.jobs), len(wp.results)
}

// Usage example
func workerPoolExample() {
    pool := NewWorkerPool(3, 10)
    pool.Start()
    
    // Submit jobs
    for i := 1; i <= 15; i++ {
        job := Job{
            ID:    i,
            Data:  fmt.Sprintf("Task %d", i),
            Delay: time.Duration(100+100*i) * time.Millisecond,
        }
        
        if !pool.Submit(job) {
            fmt.Printf("Failed to submit job %d\n", i)
        }
    }
    
    // Collect results
    go func() {
        for result := range pool.Results() {
            fmt.Printf("Job %d completed at %v\n", result.JobID, result.Time.Format("15:04:05.000"))
        }
        fmt.Println("All results collected")
    }()
    
    // Let it run for a while, then shut down
    time.Sleep(3 * time.Second)
    pool.Shutdown()
}

2.2 The Pipeline Pattern

func pipelineExample() {
    // Stage 1: generate values
    gen := func(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for _, n := range nums {
                out <- n
            }
        }()
        return out
    }
    
    // Stage 2: square each value
    sq := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                out <- n * n
            }
        }()
        return out
    }
    
    // Stage 3: keep only even values
    even := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                if n%2 == 0 {
                    out <- n
                }
            }
        }()
        return out
    }
    
    // Assemble the pipeline
    numbers := gen(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squares := sq(numbers)
    evens := even(squares)
    
    // Consume the results
    for result := range evens {
        fmt.Println("Even square:", result)
    }
}

2.3 Fan-out/Fan-in, Enhanced

func advancedFanOutIn() {
    // Producer
    producer := func() <-chan int {
        ch := make(chan int, 100)
        go func() {
            defer close(ch)
            for i := 1; i <= 50; i++ {
                ch <- i
            }
        }()
        return ch
    }
    
    // Worker factory: each worker drains the shared jobs channel and writes
    // to its own result channel, closing it once the jobs run out
    createWorker := func(id int, jobs <-chan int) <-chan int {
        results := make(chan int, 10)
        go func() {
            defer close(results)
            for job := range jobs {
                // Simulate a variable workload (requires math/rand)
                time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
                results <- job * job
                fmt.Printf("Worker %d processed job %d\n", id, job)
            }
            fmt.Printf("Worker %d finished\n", id)
        }()
        return results
    }
    
    // Fan-in helper: merge several result channels into one
    mergeResults := func(cs ...<-chan int) <-chan int {
        var wg sync.WaitGroup
        out := make(chan int)
        
        output := func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }
        
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c)
        }
        
        go func() {
            wg.Wait()
            close(out)
        }()
        
        return out
    }
    
    // Fan-out: every worker reads from the same jobs channel, so the work is
    // distributed across them automatically
    jobs := producer()
    workers := 5
    resultChannels := make([]<-chan int, workers)
    
    for i := 0; i < workers; i++ {
        resultChannels[i] = createWorker(i+1, jobs)
    }
    
    // Fan-in: merge the per-worker result channels into one
    results := mergeResults(resultChannels...)
    
    // Collect and aggregate the results
    startTime := time.Now()
    count := 0
    sum := 0
    
    for result := range results {
        count++
        sum += result
        fmt.Printf("Result %d: %d\n", count, result)
    }
    
    duration := time.Since(startTime)
    fmt.Printf("Processed %d jobs in %v, sum: %d\n", count, duration, sum)
}

3. Concurrency Safety and Synchronization Primitives

3.1 Using the sync Package in Depth

// 1. sync.Mutex
type SafeCounter struct {
    mu    sync.Mutex
    value map[string]int // must be initialized (e.g. make(map[string]int)) before use
}

func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value[key]++
}

func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value[key]
}

// 2. sync.RWMutex
type SafeMap struct {
    mu    sync.RWMutex
    data  map[string]interface{} // must be initialized with make before use
}

func (m *SafeMap) Get(key string) interface{} {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return m.data[key]
}

func (m *SafeMap) Set(key string, value interface{}) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[key] = value
}

// 3. sync.Once
var once sync.Once
var instance *Singleton

type Singleton struct {
    data string
}

func GetInstance() *Singleton {
    once.Do(func() {
        instance = &Singleton{data: "initialized"}
    })
    return instance
}

// 4. sync.Pool
var bufferPool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 1024)
    },
}

func processWithPool(data []byte) {
    buf := bufferPool.Get().([]byte)
    defer bufferPool.Put(buf)
    
    // Use the buffer
    copy(buf, data)
    // process the data...
}

3.2 Using the atomic Package

import "sync/atomic"

type Counter struct {
    total uint64
}

func (c *Counter) Inc() {
    atomic.AddUint64(&c.total, 1)
}

func (c *Counter) Get() uint64 {
    return atomic.LoadUint64(&c.total)
}

func (c *Counter) CompareAndSwap(old, new uint64) bool {
    return atomic.CompareAndSwapUint64(&c.total, old, new)
}
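
A quick sketch of why this matters: with 1,000 goroutines incrementing concurrently, the atomic counter still ends at exactly 1,000, which a plain c.total++ would not guarantee:

// Sketch: concurrent increments stay exact because AddUint64 is atomic.
func atomicCounterDemo() {
    var (
        wg sync.WaitGroup
        c  Counter
    )

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.Inc()
        }()
    }

    wg.Wait()
    fmt.Println("total:", c.Get()) // always 1000
}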

4. Context in Depth

4.1 Context Types Explained

// 1. context.Background()
ctx := context.Background()

// 2. WithCancel
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// 3. WithTimeout
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 4. WithDeadline
deadline := time.Now().Add(3 * time.Second)
ctx, cancel = context.WithDeadline(context.Background(), deadline)
defer cancel()

// 5. WithValue
ctx = context.WithValue(ctx, "user_id", "123")
userID := ctx.Value("user_id").(string)
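
One caveat on WithValue: a bare string key works, but go vet flags it because unrelated packages could collide on the same string. The usual remedy is an unexported key type, sketched below:

// Sketch: an unexported key type keeps context values collision-free.
type ctxKey string

const userIDKey ctxKey = "user_id"

func withUserID(ctx context.Context, id string) context.Context {
    return context.WithValue(ctx, userIDKey, id)
}

func userIDFrom(ctx context.Context) (string, bool) {
    id, ok := ctx.Value(userIDKey).(string)
    return id, ok
}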

4.2 Context in Real Projects

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

// Minimal user model referenced by the handlers below
type User struct {
    ID   string
    Name string
}

// 1. HTTP request handling
func httpHandler(w http.ResponseWriter, r *http.Request) {
    // Derive a context with a timeout for each request
    ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
    defer cancel()
    
    // Pass it down to the business logic
    result, err := processRequest(ctx, r)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    // Write the response
    w.Write([]byte(result))
}

// 2. Passing the context through downstream operations (auth, data access, business logic)
func processRequest(ctx context.Context, r *http.Request) (string, error) {
    // Authenticate the user
    userID, err := authenticate(ctx, r)
    if err != nil {
        return "", err
    }
    
    // Load the user's data
    user, err := getUser(ctx, userID)
    if err != nil {
        return "", err
    }
    
    // Run the business logic
    result, err := businessLogic(ctx, user)
    if err != nil {
        return "", err
    }
    
    return result, nil
}

func authenticate(ctx context.Context, r *http.Request) (string, error) {
    // Simulate a call to an authentication service
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-time.After(100 * time.Millisecond):
        return "user123", nil
    }
}

func getUser(ctx context.Context, userID string) (*User, error) {
    // Simulate a database query
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-time.After(200 * time.Millisecond):
        return &User{ID: userID, Name: "Alice"}, nil
    }
}

func businessLogic(ctx context.Context, user *User) (string, error) {
    // Simulate complex business logic
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-time.After(500 * time.Millisecond):
        return fmt.Sprintf("Hello %s", user.Name), nil
    }
}

5. Error Handling and Recovery

5.1 Error-Handling Patterns in Concurrent Code

type Result struct {
    Data  interface{}
    Error error
    ID    string
}

// 1. A unified result/error channel
// (assumes a Job type with a string ID and an executeJob(Job) (interface{}, error) helper)
func workerWithError(id string, jobs <-chan Job) <-chan Result {
    results := make(chan Result, 1)
    go func() {
        defer close(results)
        
        for job := range jobs {
            result := Result{ID: job.ID}
            
            // Execute the job
            data, err := executeJob(job)
            if err != nil {
                result.Error = fmt.Errorf("worker %s: %w", id, err)
            } else {
                result.Data = data
            }
            
            results <- result
        }
    }()
    return results
}

// 2. Error aggregation
func processWithErrorAggregation() {
    var wg sync.WaitGroup
    errors := make(chan error, 10)
    
    // Start several workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            
            // Simulate work
            for j := 0; j < 5; j++ {
                if rand.Intn(10) == 0 { // 10% failure rate
                    select {
                    case errors <- fmt.Errorf("worker %d failed on job %d", workerID, j):
                    default:
                        // drop the error if the channel is full
                    }
                }
                time.Sleep(100 * time.Millisecond)
            }
        }(i)
    }
    
    // Close the error channel once every worker has finished
    go func() {
        wg.Wait()
        close(errors)
    }()
    
    // Drain and report the errors
    errorCount := 0
    for err := range errors {
        fmt.Printf("Error: %v\n", err)
        errorCount++
    }
    
    fmt.Printf("Total errors: %d\n", errorCount)
}
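
When the goal is "run several tasks and cancel the rest as soon as one fails", the golang.org/x/sync/errgroup package is a common alternative to a hand-rolled error channel. A minimal sketch (fetch is a hypothetical helper with the signature fetch(ctx, url) error):

// Sketch: errgroup cancels the shared context on the first error,
// and Wait returns that error once all goroutines have finished.
// Requires golang.org/x/sync/errgroup.
func fetchAll(ctx context.Context, urls []string) error {
    g, ctx := errgroup.WithContext(ctx)

    for _, url := range urls {
        url := url // capture the loop variable for the closure
        g.Go(func() error {
            return fetch(ctx, url) // hypothetical helper
        })
    }

    return g.Wait()
}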

5.2 Panic Recovery

func safeGoroutine() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Recovered from panic: %v\n", r)
            // log it, emit metrics, and so on
        }
    }()
    
    // code that may panic (riskyOperation is a placeholder)
    riskyOperation()
}

func workerWithRecovery(id int) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Worker %d panicked: %v\n", id, r)
            // optionally restart the worker or take other action
        }
    }()
    
    // Worker loop
    for {
        // process tasks...
        time.Sleep(100 * time.Millisecond)
    }
}
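
Because recover only works inside the goroutine that panicked, a small wrapper is a convenient way to apply this protection consistently; a minimal sketch:

// Sketch: run fn in its own goroutine and turn panics into log output.
func goSafe(fn func()) {
    go func() {
        defer func() {
            if r := recover(); r != nil {
                fmt.Printf("goroutine panicked: %v\n", r)
            }
        }()
        fn()
    }()
}

// Usage: goSafe(func() { riskyOperation() })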

6. Performance Optimization and Best Practices

6.1 Benchmarking

func BenchmarkWorkerPool(b *testing.B) {
    pool := NewWorkerPool(10, 100)
    pool.Start()
    defer pool.Shutdown()
    
    b.ResetTimer()
    
    submitted := 0
    for i := 0; i < b.N; i++ {
        job := Job{
            ID:    i,
            Delay: 10 * time.Millisecond,
        }
        if pool.Submit(job) {
            submitted++
        }
    }
    
    // Wait only for the jobs that were actually accepted; waiting for b.N
    // results would deadlock whenever the queue rejects a submission
    for i := 0; i < submitted; i++ {
        <-pool.Results()
    }
}

6.2 Memory Optimization

// 1. Use sync.Pool to reduce GC pressure
var bufferPool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 32*1024) // 32KB buffer
    },
}

// 2. Avoid goroutine leaks
func safeGoroutineWithTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    result := make(chan string, 1)
    
    go func() {
        // Simulate a long-running operation
        time.Sleep(10 * time.Second)
        result <- "done"
    }()
    
    select {
    case r := <-result:
        fmt.Println(r)
    case <-ctx.Done():
        fmt.Println("Operation timed out")
        // goroutine会随着函数返回而结束
    }
}
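
If the background work should itself stop early rather than merely be abandoned, pass the context into the goroutine and select on ctx.Done() inside it as well; a sketch of that variant:

// Sketch: the worker watches ctx.Done() too, so it exits as soon as the
// caller gives up instead of running to completion in the background.
func cancellableWork(ctx context.Context) (string, error) {
    result := make(chan string, 1)

    go func() {
        select {
        case <-time.After(10 * time.Second): // stands in for the long operation
            result <- "done"
        case <-ctx.Done():
            return
        }
    }()

    select {
    case r := <-result:
        return r, nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}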

7. A Complete Production-Grade Example: A Distributed Task Scheduler

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Task struct {
    ID          string    `json:"id"`
    Type        string    `json:"type"`
    Payload     string    `json:"payload"`
    Priority    int       `json:"priority"`
    Timeout     time.Duration `json:"timeout"`
    CreatedAt   time.Time `json:"created_at"`
}

type TaskResult struct {
    TaskID      string    `json:"task_id"`
    Success     bool      `json:"success"`
    Data        string    `json:"data"`
    Error       string    `json:"error"`
    Duration    time.Duration `json:"duration"`
    CompletedAt time.Time `json:"completed_at"`
}

type TaskScheduler struct {
    pendingTasks   chan *Task
    completedTasks chan *TaskResult
    workers        int
    wg             sync.WaitGroup
    shutdown       chan struct{}
    metrics        *Metrics
    logger         Logger
}

type Metrics struct {
    TotalTasks    int64
    Successful    int64
    Failed        int64
    AvgDuration   time.Duration
    mu            sync.RWMutex
}

type Logger interface {
    Info(msg string, fields map[string]interface{})
    Error(msg string, fields map[string]interface{})
}

func NewTaskScheduler(workers, queueSize int) *TaskScheduler {
    return &TaskScheduler{
        pendingTasks:   make(chan *Task, queueSize),
        completedTasks: make(chan *TaskResult, queueSize),
        workers:        workers,
        shutdown:       make(chan struct{}),
        metrics:        &Metrics{},
        logger:         &ConsoleLogger{},
    }
}

func (s *TaskScheduler) Start() {
    s.logger.Info("Starting task scheduler", map[string]interface{}{
        "workers": s.workers,
        "queue_size": cap(s.pendingTasks),
    })
    
    for i := 0; i < s.workers; i++ {
        s.wg.Add(1)
        go s.worker(i)
    }
    
    go s.metricsCollector()
}

func (s *TaskScheduler) worker(id int) {
    defer s.wg.Done()
    s.logger.Info("Worker started", map[string]interface{}{"worker_id": id})
    
    for {
        select {
        case task, ok := <-s.pendingTasks:
            if !ok {
                s.logger.Info("Worker shutting down", map[string]interface{}{"worker_id": id})
                return
            }
            
            s.processTask(id, task)
            
        case <-s.shutdown:
            s.logger.Info("Worker received shutdown", map[string]interface{}{"worker_id": id})
            return
        }
    }
}

func (s *TaskScheduler) processTask(workerID int, task *Task) {
    startTime := time.Now()
    ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
    defer cancel()
    
    s.logger.Info("Processing task", map[string]interface{}{
        "worker_id": workerID,
        "task_id":   task.ID,
        "task_type": task.Type,
    })
    
    var result TaskResult
    result.TaskID = task.ID
    
    defer func() {
        duration := time.Since(startTime)
        result.Duration = duration
        result.CompletedAt = time.Now() // stamp the actual completion time
        
        // Update the metrics
        s.metrics.mu.Lock()
        s.metrics.TotalTasks++
        if result.Success {
            s.metrics.Successful++
        } else {
            s.metrics.Failed++
        }
        s.metrics.AvgDuration = time.Duration(int64(s.metrics.AvgDuration)*99/100 + int64(duration)/100)
        s.metrics.mu.Unlock()
        
        // Publish the result
        s.completedTasks <- &result
        
        s.logger.Info("Task completed", map[string]interface{}{
            "task_id":   task.ID,
            "success":   result.Success,
            "duration":  duration.String(),
        })
    }()
    
    // Simulate the task work
    select {
    case <-ctx.Done():
        result.Success = false
        result.Error = ctx.Err().Error()
        
    case <-time.After(time.Duration(rand.Intn(1000)) * time.Millisecond):
        // Dispatch on the task type
        switch task.Type {
        case "compute":
            result.Success = true
            result.Data = fmt.Sprintf("Computed result for %s", task.Payload)
        case "io":
            result.Success = true
            result.Data = fmt.Sprintf("IO operation completed for %s", task.Payload)
        default:
            result.Success = false
            result.Error = "unknown task type"
        }
    }
}

func (s *TaskScheduler) Submit(task *Task) bool {
    if task.CreatedAt.IsZero() {
        task.CreatedAt = time.Now()
    }
    if task.Timeout == 0 {
        task.Timeout = 30 * time.Second
    }
    
    select {
    case s.pendingTasks <- task:
        s.logger.Info("Task submitted", map[string]interface{}{
            "task_id": task.ID,
            "task_type": task.Type,
        })
        return true
    default:
        s.logger.Error("Task queue full", map[string]interface{}{
            "task_id": task.ID,
        })
        return false
    }
}

func (s *TaskScheduler) Results() <-chan *TaskResult {
    return s.completedTasks
}

func (s *TaskScheduler) Metrics() map[string]interface{} {
    s.metrics.mu.RLock()
    defer s.metrics.mu.RUnlock()
    
    return map[string]interface{}{
        "total_tasks":    s.metrics.TotalTasks,
        "successful":     s.metrics.Successful,
        "failed":         s.metrics.Failed,
        "avg_duration":   s.metrics.AvgDuration.String(),
        "queue_length":   len(s.pendingTasks),
        "results_length": len(s.completedTasks),
    }
}

func (s *TaskScheduler) Shutdown() {
    s.logger.Info("Shutting down task scheduler", nil)
    
    close(s.shutdown)
    close(s.pendingTasks)
    
    s.wg.Wait()
    close(s.completedTasks)
    
    s.logger.Info("Task scheduler shutdown complete", nil)
}

func (s *TaskScheduler) metricsCollector() {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            metrics := s.Metrics()
            s.logger.Info("Current metrics", metrics)
            
        case <-s.shutdown:
            return
        }
    }
}

// A simple console logger
type ConsoleLogger struct{}

func (l *ConsoleLogger) Info(msg string, fields map[string]interface{}) {
    timestamp := time.Now().Format("2006-01-02 15:04:05")
    fmt.Printf("[%s] INFO: %s", timestamp, msg)
    if len(fields) > 0 {
        data, _ := json.Marshal(fields)
        fmt.Printf(" %s", data)
    }
    fmt.Println()
}

func (l *ConsoleLogger) Error(msg string, fields map[string]interface{}) {
    timestamp := time.Now().Format("2006-01-02 15:04:05")
    fmt.Printf("[%s] ERROR: %s", timestamp, msg)
    if len(fields) > 0 {
        data, _ := json.Marshal(fields)
        fmt.Printf(" %s", data)
    }
    fmt.Println()
}

// Usage example
func main() {
    scheduler := NewTaskScheduler(5, 100)
    scheduler.Start()
    defer scheduler.Shutdown()
    
    // Submit tasks
    for i := 1; i <= 20; i++ {
        taskType := "compute"
        if i%3 == 0 {
            taskType = "io"
        }
        
        task := &Task{
            ID:       fmt.Sprintf("task-%d", i),
            Type:     taskType,
            Payload:  fmt.Sprintf("data-%d", i),
            Priority: i % 5,
            Timeout:  5 * time.Second,
        }
        
        if !scheduler.Submit(task) {
            fmt.Printf("Failed to submit task %d\n", i)
        }
        
        // Throttle the submission rate
        if i%5 == 0 {
            time.Sleep(500 * time.Millisecond)
        }
    }
    
    // Collect results
    go func() {
        for result := range scheduler.Results() {
            status := "SUCCESS"
            if !result.Success {
                status = "FAILED"
            }
            fmt.Printf("Task %s: %s (%v)\n", result.TaskID, status, result.Duration)
        }
    }()
    
    // Let the scheduler run for a while
    time.Sleep(15 * time.Second)
    
    // Print the final metrics
    metrics := scheduler.Metrics()
    data, _ := json.MarshalIndent(metrics, "", "  ")
    fmt.Printf("Final metrics:\n%s\n", string(data))
}

8. Summary and Recommendations

8.1 Strengths of Go's Concurrency Model

  • High performance: goroutines are lightweight and the M:N scheduler is efficient
  • Simplicity: channels provide an elegant way for goroutines to communicate
  • Flexibility: complex concurrency patterns can be composed from simple primitives
  • Built-in support: the standard library ships with a rich set of concurrency tools

8.2 Things to Watch Out For

  • Avoid goroutine leaks: every goroutine needs a guaranteed way to exit
  • Size buffers deliberately: choose channel capacities to match the workload
  • Handle errors: make sure errors are propagated and dealt with correctly
  • Monitor at runtime: watch the goroutine count and channel backlogs (see the sketch after this list)
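
A minimal monitoring sketch along those lines, logging the goroutine count and a queue's backlog at a fixed interval (the Job channel and the interval are illustrative):

// Sketch: periodically report the goroutine count and a channel's backlog.
func monitor(jobs chan Job, interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            fmt.Printf("goroutines=%d queue=%d/%d\n",
                runtime.NumGoroutine(), len(jobs), cap(jobs))
        case <-stop:
            return
        }
    }
}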

8.3 A Suggested Learning Path

  1. Master the basics: goroutines, channels, select
  2. Learn the standard library: the sync and context packages
  3. Practice the common patterns: worker pools, pipelines, and so on
  4. Read good source code: for example, the concurrency code in etcd and Kubernetes
  5. Tune performance: profile with pprof (see the sketch after this list)
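
For the profiling step, exposing the built-in pprof endpoints from a long-running service is usually the easiest starting point; a minimal sketch (port 6060 is just a convention):

// Sketch: serve the net/http/pprof endpoints on a debug port, then inspect with
//   go tool pprof http://localhost:6060/debug/pprof/profile
import (
    "log"
    "net/http"
    _ "net/http/pprof" // importing the package registers the /debug/pprof/* handlers
)

func startPprofServer() {
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
}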

Go's concurrency model does require a fair amount of manual management, but in exchange it offers enormous flexibility and performance, which makes it especially well suited to building highly concurrent, distributed systems.