线程池有效的减少线程创建和销毁所带来的开销。若 worker 线程执行的 G 任务中发生系统调用,则操作系统会将该线程置为阻塞状态,浪费线程资源,线程池消费任务队列的能力变弱了。增加线程池中线程数量可以一定程度上提高消费能力,但随着线程数量增多,过多线程会争抢 CPU,线程数过多,那么操作系统会频繁的切换线程,频繁的上下文切换就成了性能瓶颈。
以通信的⽅式来共享内存。Golang内部有三个对象:
P对象(processor) 代表上下⽂,包含运行 Go 代码的必要资源,代表了真正的并发度,即有多少个 goroutine 可以同时运行,数量由启动时环境变量决定,一般设置为 CPU 的核数,使 Go 程序能充分利用 CPU。在确定了P的最大数量n后,运行时系统会根据这个数量创建n个P。
M(work thread)代表⼯作线程,一个M阻塞了,P会创建新的M,运行时动态创建,M与P的数量没有绝对关系。
G对象(goroutine,Goroutine 是Golang实际并发执⾏的实体,它底层是使⽤协程(coroutine)实现并发,coroutine是⼀种运⾏在⽤户态的⽤户线程)。M必须拥有P才可以执行G中的代码,P含有一个包含多个G的队列,P可以调度G交由M执行。
1struct G
2{
3 uintptr stackguard; // 分段栈的可用空间下界
4 uintptr stackbase; // 分段栈的栈基址
5 Gobuf sched; //进程切换时,利用sched域来保存上下文
6 uintptr stack0;
7 FuncVal* fnstart; // goroutine运行的函数
8 void* param; // 用于传递参数,睡眠时其它goroutine设置param,唤醒时此goroutine可以获取
9 int16 status; // 状态Gidle,Grunnable,Grunning,Gsyscall,Gwaiting,Gdead
10 int64 goid; // goroutine的id号
11 G* schedlink;
12 M* m; // for debuggers, but offset not hard-coded
13 M* lockedm; // G被锁定只能在这个m上运行
14 uintptr gopc; // 创建这个goroutine的go表达式的pc
15 ...
16};
每⼀个线程(M0)维护⼀个上下⽂(P),任何时刻,⼀个上下⽂中只有⼀个Goroutine,其他Goroutine在上下文对应的runqueue中等待。
队列轮转:每个 P有个局部队列,局部队列保存待执⾏的 goroutine(流程2),当 M绑定的 P的的局部队列已经满了之后就会把 goroutine 放到全局队列(流程2-1)。P 会周期性的将G调度到M中执行,执行一段时间后,保存上下文,将G放到队列尾部,然后从队列中再取出一个G进行调度。当 M绑定的 P的局部队列为空时,M会从全局队列获取到本地队列来执⾏,全局队列中 G 的来源,主要有从系统调用中恢复的 G,防止全局队列中的 G 被“饿死”。
工作窃取:多个 P 中维护的 G 队列有可能是不均衡的。当从全局队列中没有获取到可执⾏的 G时候,M会从其他 P 的局部队列中偷取 G来执⾏(流程3.2)。确保了每个 OS 线程都能充分的使用
系统调用:一般 M 的个数会略大于 P 的个数,多出来的 M 会在 G 产生系统调用时发挥作用。当G0即将进入系统调用时,M0将释放P,进而某个空闲的M1获取P,继续执行P队列中剩下的G。当G0系统调用结束后,如果有空闲的P,则获取一个P,继续执行G0。如果没有,则将G0放入全局队列,等待被其他的P调度,然后M0将进入缓存池睡眠。
阻塞:当 G因 channel 或者 network I/O 阻塞时,不会阻塞 M,M会寻找其他 runnable 的 G;当阻塞的 G恢复后会重新进⼊ runnable 进⼊ P队列等待执⾏。
线程实现,m内核线程:n用户线程
正常模式(⾮公平锁):正常模式下,所有等待锁的 goroutine 按照 FIFO(先进先出)顺序等待。唤醒的 goroutine 不会直接拥有锁,⽽是会和新请求锁的 goroutine 竞争锁的拥有。新请求锁的 goroutine 具有优势:它正在 CPU上执⾏,⽽且可能有好⼏个,所以刚刚唤醒的 goroutine 有很⼤可能在锁竞争中失败。如果⼀个等待的 goroutine 超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。
饥饿模式(公平锁):为了解决了等待 G队列的⻓尾问题,饥饿模式下,直接由 unlock 把锁交给等待队列中排在第⼀位的 G(队头),同时,饥饿模式下,新进来的 G不会参与抢锁也不会进⼊⾃旋状态,会直接进⼊等待队列的尾部,这样很好的解决了⽼的 g ⼀直抢不到锁的场景。
对于两种模式,正常模式下的性能是最好的,goroutine 可以连续多次获取锁,免去上下文切换开销,饥饿模式解决了取锁公平的问题,但是性能会下降,其实是性能和公平的⼀个平衡模式。
sync.Mutex互斥锁,使同一时刻只能有一个协程执行某段程序,其他协程等待该协程执行完再依次执行。
xxxxxxxxxx
191var sum = 0
2var lock = sync.Mutex{}
3var msgChan = make(chan struct{})
4func main() {
5 //开启100个协程来让 sum + 1
6 for i := 1; i <= 1000; i++ {
7 go add()
8 }
9 for i := 1; i <= 1000; i++ {
10 <-msgChan
11 }
12 fmt.Println(sum)
13}
14func add() {
15 lock.Lock()
16 defer lock.Unlock()
17 sum += 1
18 msgChan <- struct{}{}
19}
RWMutex 是单写多读锁,适⽤于读多写少的场景,通过记录 readerCount 读锁的数量来进⾏控制,当有⼀个写锁的时候,会将读锁数量设置为负数1<<30。⽬的是让新进⼊的读锁等待写锁释放之后再获取读锁。同样的写锁也会等待之前的读锁都释放完毕,才会开始进⾏后续的操作。
写锁释放完之后,会将值重新加上1<<30,并通知刚才新进⼊的读锁(rw.readerSem),所有因操作锁定读锁⽽被阻塞的 goroutine 会被唤醒,并都可以成功锁定读锁。读锁被解锁后,在没有被其他读锁锁定的前提下,所有因操作锁定写锁⽽被阻塞的 goroutine,其中等待时间最⻓的⼀个 goroutine 会被唤醒。
sync.RWMutex,写所互斥,读锁不互斥
xxxxxxxxxx
291var sum = 0
2var lock = sync.RWMutex{}
3var msgChan = make(chan struct{})
4func main() {
5 //开启100个协程来让 sum + 1
6 for i := 0; i <= 64; i++ {
7 go add()
8 }
9 for i := 0; i <= 64; i++ {
10 go get()
11 }
12 for i := 0; i <= 64; i++ {
13 <-msgChan
14 }
15 fmt.Println(sum)
16}
17func add() {
18 // 获得写锁
19 lock.Lock()
20 defer lock.Unlock()
21 sum += 1
22 msgChan <- struct{}{}
23}
24func get() {
25 // 获得读锁
26 lock.RLock()
27 defer lock.RUnlock()
28 fmt.Println(sum)
29}
⼀个 WaitGroup 对象可以等待⼀组协程结束。调⽤ wg.Add(delta int)设置 worker 协程的个数,然后创建 worker 协程;worker 协程执⾏结束以后,都要调⽤ wg.Done();main 协程调⽤ wg.Wait()且被 block,直到所有 worker 协程全部执⾏结束后返回。
WaitGroup 主要维护了2 个计数器,⼀个是请求计数器 v,⼀个是等待计数器 w,⼆者组成⼀个64bit 的值,请求计数器占⾼32bit,等待计数器占低32bit。每次 Add执⾏,请求计数器 v 加1,Done⽅法执⾏,请求计数器减1,v 为0 时通过信号量唤醒 Wait()。
sync.WaitGroup等待多个任务执行完毕
xxxxxxxxxx
221var sum = 0
2var lock = sync.Mutex{}
3var wg = sync.WaitGroup{}
4func main() {
5 // 添加64个任务
6 wg.Add(64)
7 //开启100个协程来让 sum + 1
8 for i := 0; i < 64; i++ {
9 go add()
10 }
11 // 等待任务执行完毕
12 wg.Wait()
13 fmt.Println(sum)
14}
15func add() {
16 // 获得写锁
17 lock.Lock()
18 defer lock.Unlock()
19 sum += 1
20 // 剩余任务减一
21 wg.Done()
22}
1,Context
包提供上下文机制在 goroutine
之间传递 deadline、取消信号(cancellation signals)或者其他请求相关的信息。
xxxxxxxxxx
61type Context interface {
2 Deadline() (deadline time.Time, ok bool)
3 Done() <-chan struct{}
4 Err() error
5 Value(key interface{}) interface{}
6}
服务器程序通过 context.Background()
为每个接受的请求创建一个 Context
实例,称为rootContext
。
之后的 goroutine
中将rootContext
作为参数通过调用 context.WithCancel
、context.WithDeadline
、context.WithTimeout
方法,创建的子 context
。
xxxxxxxxxx
81// 返回的 cancelFunc,如果被调用,会导致 Done channel 关闭
2func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
3// 到达指定的截至时间或者cannelFunc被调用,将关闭消息通道
4func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
5// 经过指定的时间间隔或者cannelFunc被调用,将关闭消息通道
6func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
7// 用于在不同goroutine间传递数据(签名、trace_id),类似Java中的ThreadLocal。ctx.Value(k)
8func WithValue(parent Context, key, val interface{}) Context
goroutine
通过 context.Done()
方法监听取消信号。func Done() <-chan struct{}
返回一个只读channel,用于接收取消信号。(可以借助 select 语句,如果收到取消信号,就退出 goroutine
;否则,默认子句是继续执行 goroutine
);
关闭消息通道条件:当一个 Context
被取消(比如执行了 cancelFunc()
);WithDeadline
创建的 context,deadline 到期;WithTimeout
创建的 context,timeout 到期。
xxxxxxxxxx
131func Stream(ctx context.Context, out chan<- Value) error {
2 for {
3 v, err := DoSomething(ctx)
4 if err != nil {
5 return err
6 }
7 select {
8 case <-ctx.Done():
9 return ctx.Err()
10 case out <- v:
11 }
12 }
13}
sync.Once在高并发的场景下,来保证代码只执行一次, 适合用于创建单例、只加载一次资源等只需要执行一次的场景。
xxxxxxxxxx
121var once = sync.Once{}
2func doInit() {
3 fmt.Println("init done")
4}
5func main() {
6 //开启100个协程来让 sum + 1
7 for i := 0; i < 64; i++ {
8 go func() {
9 once.Do(doInit)
10 }()
11 }
12}
启动时没有任何go程被执行完毕,标志位为0,多个go程竞争一个同步锁,竞争成功的go程获得同步锁,其它线程阻塞,在他执行完毕后将执行标志位写为1并释放锁,其他go程再次开始竞争锁,拿到锁后检查标志位,发现标志位为1,直接返回并释放锁。
x1func (o *Once) Do(f func()) {
2 if atomic.LoadUint32(&o.done) == 0 {
3 o.doSlow(f)
4 }
5}
6
7// 使用lock和在执行完f后再设置标志位是为了保证f执行成功,如果f执行失败,将不更新标志位,并释放锁,其它等待锁的go程继续尝试执行f。
8// 如果使用CAS,如果第一个执行失败,当时由于标志位已被修改,其它go程无法继续尝试完成f。
9// if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
10// f()
11// }
12func (o *Once) doSlow(f func()) {
13 o.m.Lock()
14 defer o.m.Unlock()
15 if o.done == 0 {
16 defer atomic.StoreUint32(&o.done, 1)
17 f()
18 }
19}
条件变量 sync.Cond,基于互斥锁的基础上,增加了一个通知队列,协程刚开始是等待的,通知的协程会从通知队列中唤醒一个或多个被通知的协程。
xxxxxxxxxx
311func main() {
2 // 3个工人共享一个话筒,同一时刻只能有一个发言
3 // 阻塞等待通知的操作以及通知解除阻塞的操作就是基于sync.Mutex来实现
4 cond := sync.NewCond(&sync.Mutex{})
5 var wg sync.WaitGroup
6 // 3工人+1指令员
7 wg.Add(4)
8 for i := 1; i <= 3; i++ {
9 go func(num int) {
10 defer wg.Done()
11 // 获得话筒
12 cond.L.Lock()
13 fmt.Println(num, "就绪")
14 // 阻塞当前协程,并释放锁资源,直到被其他协程调用 Broadcast 或者 Signal 方法唤醒,使用的时候需要加锁
15 cond.Wait()
16 fmt.Println(num, "运行中")
17 // 释放话筒
18 cond.L.Unlock()
19 }(i)
20 }
21 //等待所有goroutine都进入wait状态
22 time.Sleep(2 * time.Second)
23 go func() {
24 defer wg.Done()
25 fmt.Println("开始运行")
26 // 广播通知,唤醒所有等待的协程
27 cond.Broadcast()
28 }()
29 //防止函数提前返回退出
30 wg.Wait()
31}
xxxxxxxxxx
111func (c *Cond) Wait() {
2 c.checker.check()
3 // 把调用它的 goroutine(也就是当前的 goroutine)加入到当前条件变量的通知队列中。
4 t := runtime_notifyListAdd(&c.notify)
5 // 解锁当前的条件变量基于的那个互斥锁。
6 c.L.Unlock()
7 // 让当前的 goroutine 处于等待状态,等到通知到来时再决定是否唤醒它。此时,这个 goroutine 就会阻塞在调用这个Wait方法的那行代码上。
8 runtime_notifyListWait(&c.notify, t)
9 // 如果通知到来并且决定唤醒这个 goroutine,那么就在唤醒它之后,尝试重新锁定当前条件变量基于的互斥锁,成功后返回。
10 c.L.Lock()
11}
select
语句使一个 Go 程可以等待多个通信操作。每个case
表达式中都必须包含通道的读或者写;select
语句会查看哪些case的读写操作能成功执行,然后开始选择能成功执行的候选分支,进行读写操作,执行对应case内容,然后结束当前select ;当多个分支都准备好时会随机选择一个执行case,而随机的引入就是为了避免饥饿问题的发生,然后结束当前select 。如果所有的候选分支都不满足选择条件,那么默认分支就会被执行,如果这时没有默认分支,那么select
语句就会立即进入阻塞状态,直到至少有一个候选分支满足选择条件为止。
xxxxxxxxxx
191//break 方式
2loop:
3 for {
4 select {
5 case _, ok := <-ch1: //ch1非空或许信道被关闭且没有值时执行此语句
6 if !ok {
7 ch1 = nil //ch1已经关闭且没有值,将他设置为nil,以屏蔽ch1
8 }
9 fmt.Println("ch1")
10 case _, ok := <-ch2: //ch2非空或许信道被关闭且没有值时执行此语句
11 if !ok {
12 break loop //跳出for循环
13 }
14 fmt.Println("ch2")
15 default: // 所有分支都阻塞时执行此分支
16 time.Sleep(50 * time.Millisecond)
17 }
18 }
19 fmt.Println("END")
xxxxxxxxxx
141// 通过有限容量的管道,限制并发数
2// 为每个进入的请求都创建了新的Go程,只是只有MaxOutstabding个操作同时进行,其它Go程被阻塞。若请求来得很快,该程序就会无限地消耗资源
3var sem = make(chan int, MaxOutstanding)
4func handle(r *Request) {
5 sem <- 1 // 往信道中写数据,标志占用一个资源
6 process(r) // 可能需要很长时间。
7 <-sem // 往信道中取数据,标志释放一个资源
8}
9func Serve(queue chan *Request) {
10 for {
11 req := <-queue
12 go handle(req) // 无需等待 handle 结束。
13 }
14}
xxxxxxxxxx
101// 通过有限容量的管道,限制并发数
2func Serve(queue chan *Request) {
3 for req := range queue {
4 sem <- 1
5 go func(req *Request) { //默认情况下闭包类变量只在被执行时才求值,可能导致变量值与创建闭包时的变量不一致。将当前req与函数绑定,立即执行求值。
6 process(req)
7 <-sem
8 }(req)
9 }
10}
xxxxxxxxxx
151// 启动固定数量的 handle Go程,一起从请求信道中读取数据。
2func handle(queue chan *Request) {
3 // 从quene中取出还没被处理的请求,quene长度减一
4 for r := range queue {
5 process(r)
6 }
7}
8func Serve(clientRequests chan *Request, quit chan bool) {
9 // 启动处理程序
10 for i := 0; i < MaxOutstanding; i++ {
11 // MaxOutstanding个handle同时从clientRequests获取任务处理请求
12 go handle(clientRequests)
13 }
14 <-quit // 等待通知退出。
15}
因为拷贝的内容有时候是非引用类型(int、string、struct等这些),这样就在函数中就无法修改原内容数据;有的是引用类型(指针、map、slice、chan等这些),这样就可以修改原内容数据。