Go 入门笔记『并发编程』

在 github 上看到一个项目 Golang-developer-roadmap,感觉规划的很不错
这里就根据下面的图简单说说
前面的 Go 语言基础就不说了,后面的错误处理之后再补。这就就简单说说 Go 并发编程

主要内容包括:

协程 Goroutine

Go 提供了一种用户态线程(协程)来支持并发。协程可以说是轻量线程,它不由 os 而是由应用程序创建和管理,使用开销较低,切换不需要耗费较多的时间。

Goroutine 有如下特性

  • go 后面的函数的返回值会被忽略
  • 调度器不能保证多个 goroutine 的执行顺序
  • 所有 goroutine 都是平等的
  • 会为 main 函数创建一个 goroutine,遇到其他 go 关键字时再去创建 goroutine
  • 不能再 goroutine 里面显式地操作另一个 goroutine
  • goroutine 的执行是非阻塞的,不会等待

如何启动 goroutine

启动一个协程的语法非常简单,使用 go 关键字即可

// 匿名函数形式
go func() {
    ...
}

// 有名函数形式
go funcName() {
    ...
}

多个 goroutine 之间的同步

多个协程之间可以通过 sync.WaitGroup 同步,类似于信号量

  • wg.Add(1) 信号量加1
  • wg.Done() 信号量减1
  • wg.Wait() 信号量为正时阻塞,直到信号量为 0 时被唤醒

等讲完 channel,可以搭配写一个生产者消费者 demo

调度器

调度器的作用

对于同一个内核上的 goroutine,我们需要一个调度器来维护他们。调度器的主要有4个重要部分,分别是M、G、P、Sched,前三个定义在runtime.h中,Sched定义在proc.c中:

  • M (work thread) 代表了系统线程OS Thread,由操作系统管理

  • P (processor) 衔接M和G的调度上下文,它负责将等待执行的G与M对接。P的数量可以通过GOMAXPROCS()来设置,它其实也就代表了真正的并发度,即有多少个goroutine可以同时运行

  • G (goroutine) goroutine的实体,包括了调用栈,重要的调度信息,例如channel等

PS: 通过 untime.GOMAXPROCS(n) 设定CPU核数,实际中运行的 CPU 核数未必会和实际物理 CPU 数相吻合。

简单说说调度器的工作原理

我看了一些调度器的工作原理和实现,有一些复杂,这里就简单说说一些场景,以后有空再详细说。

假如有两个M (work thread),即两个OS Thread线程,分别对应一个P (processor),每一个P调度一个G (goroutine)队列,那么会出现以下几种场景:

  • 当我们通过循环来创建 goroutine 时,goroutine 会被分配到不同的 G (goroutine) 队列中。 而 M (work thread) 的数量又不是唯一的,当 M 随机挑选 P (processor) 时,也就等同随机挑选了 goroutine。

  • 当有一个 M (work thread) 返回时,一般情况下,它会尝试从别的 M 里取得一个 P (processor) 来运行 goroutine;如果没有拿到,它就把goroutine 放在一个 global 就绪队列里,然后自己进入线程缓存里

  • 如果某个 P (processor) 所分配的 G (goroutine)队列 很快就执行完了,导致整体的队列存在不平衡,当前 P (processor) 会从其他 P 中截取一部分 goroutine 进行调度(一般来说是取一半,确保每个 M 都能充分的使用)

  • 当一个 M (work thread) 线程被阻塞时,在此的 P (processor) 可以投奔另一个 M

贴一个启动 goroutine 的完整用法:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func getGoroutineInfo() {
    NumCPU := runtime.NumCPU()
    NumGoroutine := runtime.NumGoroutine()
    fmt.Println("当前 CPU 内核数: ", NumCPU)
    fmt.Println("当前正在运行的 goroutine 数: ", NumGoroutine)
}

// 通过有名函数启动 goroutine
func testGoroutine1() {
    sum := 0
    for i := 0; i < 1000; i++ {
        sum += i
    }
    fmt.Println(sum)
    time.Sleep(10 * time.Second)
}

func testGoroutine2() {
    // 通过匿名函数启动 goroutine
    go func() {
        sum := 0
        for i := 0; i < 2000; i++ {
            sum += i
        }
        fmt.Println(sum)
        time.Sleep(20 * time.Second)
    }()
}

func main(){

    go testGoroutine1()
    getGoroutineInfo()

    testGoroutine2()
    getGoroutineInfo()

    time.Sleep(5 * time.Second)
}

信道 Channel

Go 中 Channel 的特点:

  • channel 是 goroutine 之间互相通信的通道,goroutine 之间可以通过它发消息和接收消息。

  • channel是类型相关的,一个channel只能传递(发送或接受 | send or receive)一种类型的值,这个类型需要在声明channel时指定

  • 如果没有指定 channel 缓冲区的大小,默认是阻塞的,信道的存消息和取消息都是阻塞的 (叫做无缓冲的信道)

    • 无缓冲区的 channel: 往 channel 发送数据后,这个数据如果没有取走,channel 是阻塞的,也就是不能继续向 channel 里面发送数据
    • 带有缓冲区的channel: 在缓冲区有数据而未填满前,读取不会出现阻塞的情况。

Channel 使用

package main

import (
    "fmt"
    "time"
)

// 使用 make 建立一个 channel
// 定义只能接受的 channel
// var receive_only chan int = make(<-chan int)

// 定义只能发送的 channel
//var send_only chan int = make(chan<- int)

// 定义可同时发送接受的 channel
// var send_receive chan int = make(chan int)

// 定义可同时发送接受的有缓冲 10 个的 channel
var send_receive chan int = make(chan int, 10)

func send(channel chan<- int) {
    for i := 0; i < 10; i++ {
        fmt.Println("send ready: ", i)
        channel <- i
        fmt.Println("has send: ", i)
    }
}

func recv(channel <-chan int) {
    for i := range channel {
        fmt.Println("has received: ", i)
    }
}

func main() {
    go send(send_receive)
    go recv(send_receive)

    time.Sleep(3 * time.Second)
}

使用 Goroutine 和 Channel 实现一个简单的生产者-消费者模型

package main

import (
    "fmt"
    "time"
)

var channel chan int = make(chan int, 10)

func producer(role string, channel chan<- int) {
    for i := 0; i < 10; i++ {
        fmt.Println(role,": ", i)
        channel <- i
    }
    fmt.Println("produce")
}

func consumer(role string, channel <-chan int) {
    for i := range channel {
        fmt.Println(role, ": ", i)
    }
    fmt.Println("consume")

}

func main(){
    go producer("生产者1", channel)
    go producer("生产者2", channel)
    go consumer("消费者1", channel)
    go consumer("消费者2", channel)

    time.Sleep(5 * time.Second)
    close(channel)
    time.Sleep(5 * time.Second)
}

缓冲区

之间在说 channel 的时候提到了缓冲区,channel := make(chan int, 3) 中指定了缓冲区的容量为 3

缓冲区 channel 有一个重要的使用就是实现线程池

Q: 多线程的调度使用线程池有什么作用

A: 线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

接下来尝试实现一个简单的线程池:

  • 创建一组 goroutine 集合监听缓冲区 channel 等待任务
  • 向缓冲区 channel 添加 Job
  • 任务结束后向另一个缓冲区 channel 写入结果
  • 从存储结果的 channel 读取数据并输出
package main

import (
    "fmt"
    "sync"
    "time"
)

var jobs = make(chan int, 10)
var results = make(chan int, 10)

func task(number int) int {
    sum := 0
    for i := 0; i < number; i++ {
        sum += i
    }
    // 用 sleep 来模拟耗时的计算
    time.Sleep(10 * time.Second)
    return sum
}

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        results <- task(job)
    }
    wg.Done()
}

// 创建 numberOfWorker 个 goroutine 的线程池
// 创建 goroutine 之前调用 wg.Add(1) 来增加计数器,并将 wg 的地址传给 worker
// 使用 wg.Wait() 等待所有的 goroutine 执行完毕,关闭 results channel,保证不再有 goroutine 能写入数据
func createWorkerPool(numberOfWorker int) {
    var wg sync.WaitGroup
    for i := 0; i < numberOfWorker; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

// 向线程池分配任务
func allocate(numberOfJobs int) {
    for i := 0; i < numberOfJobs; i++ {
        jobs <- i
    }
    close(jobs)
}

func result(done chan bool) {
    for re := range results {
        fmt.Printf("Result is %d\n", re)
    }
    done <- true
}

func main() {
    startTime := time.Now()
    done := make(chan bool)

    go allocate(10)
    go result(done)
    createWorkerPool(10)

    endTime := time.Now()
    runtime := endTime.Sub(startTime)

    fmt.Println("total time: ", runtime)
}

Select

Go 语言借用多路复用的概念,提出了 select 关键字,用于多路监听多个通道:

  • 当监听的通道没有状态是可读或可写的,select 是阻塞的
  • 只要监听的通道中有一个状态是可读或可写的,则 select 不会阻塞,而是进入处理就绪通道的分支流程
  • 如果监听的通道有多个可读或可写的状态,则 select 随机选取一个处理

简单使用:

package main

import "fmt"

func main() {
    channel := make(chan int, 1)
    go func(chan int){
        for {
            select {
            case channel <- 0:
            case channel <- 1:
            }
        }
    }(channel)

    for i := 0; i < 5; i++ {
        fmt.Println(<-channel)
    }
}

Fan in/out

扇入(Fan in): 指将多路通道聚合到一条通道中处理
扇出(Fan out): 指将一条通道发散到多条通道中处理

在 Go 语言里面实现扇入是通过 select 聚合多条通道服务,实现扇出是通过启动多个 goroutine 并发处理

一种应用场景是实现退出通知机制:读取已经关闭的通道不会引起阻塞,也不会导致 Panic,而是立即返回该通道存储类型的零值。关闭 select 监听的某个通道能使 select 立即感知这种通知,然后进行响应的处理。

简单实现:(下游的消费者不需要随机数时,显式地通知生产者停止生产)

package main

import (
    "fmt"
    "math/rand"
    "runtime"
)

func generateRandInt(done chan struct{}) chan int{
    ch := make(chan int)
    go func(){
    Label:
        for {
            select{
            case ch <- rand.Int():
            // 增加对退出通知信号 done 的监听
            case <-done:
                break Label
            }
        }
        close(ch)
    }()
    return ch
}

func main(){
    done := make(chan struct{})
    ch := generateRandInt(done)

    fmt.Println(<-ch)
    fmt.Println(<-ch)

    // 发送通知,告诉生产者停止生产
    close(done)

    fmt.Println(<-ch)
    fmt.Println(<-ch)

    // 生产者已经退出
    fmt.Println("当前的 goroutine 数目: ", runtime.NumGoroutine())

}

Output:

5577006791947779410
8674665223082153551
6129484611666145821
0
当前的 goroutine 数目:  1

互斥锁 Mutex

互斥锁位于 sync 包,提供了 LockUnlock 方法

互斥锁提供了一种锁机制来保证同一时刻只有一个 goroutine 访问临界区,这样就可以解决线程竞争问题。

如果某个 goroutine 已经获得了锁,其他的 goroutine 尝试获取锁时将被阻塞,直到锁被释放。

看一个存在竞争的简单场景:通过循环生成 1000 个 goroutine,每个 goroutine 都是并发执行 increment 函数并且并发获取 x 的值

package main  
import (  
    "fmt"
    "sync"
    )
var x  = 0  
func increment(wg *sync.WaitGroup) {  
    x = x + 1
    wg.Done()
}
func main() {  
    var w sync.WaitGroup
    for i := 0; i < 1000; i++ {
        w.Add(1)        
        go increment(&w)
    }
    w.Wait()
    fmt.Println("final value of x", x)
}

发现最后得到的 x 值并不是 1000,会始终比 1000 小,这是因为这里存在线程竞争的缘故

用互斥锁在解决这个问题:把 x=x+1 这句代码放在 .Lock () 和 .Unlock() 之间,这样就只有一个 goroutine 能在同一时刻执行这句代码。

package main  
import (  
    "fmt"
    "sync"
    )
var x  = 0  
func increment(wg *sync.WaitGroup, m *sync.Mutex) {  
    m.Lock()
    x = x + 1
    m.Unlock()
    wg.Done()   
}
func main() {  
    var w sync.WaitGroup
    var m sync.Mutex
    for i := 0; i < 1000; i++ {
        w.Add(1)        
        go increment(&w, &m)
    }
    w.Wait()
    fmt.Println("final value of x", x)
}

Go 并发编程有很多有趣的地方,有空再说说 goroutine 调度器和线程池啥的,上面都说的太随意了…

以上。有错欢迎提出。

参考:
https://github.com/ffhelicopter/Go42/blob/master/content/42_21_goroutine.md http://www.hi-roy.com/2018/06/04/GO%E8%AF%AD%E8%A8%80%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0-%E7%BC%93%E5%86%B2%E5%8C%BAChannels%E5%92%8C%E7%BA%BF%E7%A8%8B%E6%B1%A0/ https://golangbot.com/mutex/