DataWhale & Golang(十三、并发编程)
学习大纲:
目录
补充:
Go 并发
Go 语言支持并发,我们只需要通过 go 关键字来开启 goroutine 即可。
goroutine 是轻量级线程,goroutine 的调度是由 Golang 运行时进行管理的。
goroutine 语法格式:
go 函数名( 参数列表 )例如:
go f(x, y, z)开启一个新的 goroutine:
f(x, y, z)Go 允许使用 go 语句开启一个新的运行期线程, 即 goroutine,以一个不同的、新创建的 goroutine 来执行一个函数。 同一个程序中的所有 goroutine 共享同一个地址空间。
实例
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
//
world
hello
hello
world
world
hello
hello
world
world
hello
go为什么能做到高并发
流程图:
goroutine是Go并行设计的核心。goroutine说到底其实就是协程,但是它比线程更小,几十个goroutine可能体现在底层就是五六个线程,Go语言内部帮你实现了这些goroutine之间的内存共享。执行goroutine只需极少的栈内存(大概是4~5KB),当然会根据相应的数据伸缩。也正因为如此,可同时运行成千上万个并发任务。goroutine比thread更易用、更高效、更轻便。
一些高并发的处理方案基本都是使用协程,openresty也是利用lua语言的协程做到了高并发的处理能力,PHP的高性能框架Swoole目前也在使用PHP的协程。
协程更轻量,占用内存更小,这是它能做到高并发的前提。这个就是GO高并发最关键的点。每一个请求都是一个单独的goroutine去执行。
学习go的HTTP代码。先创建一个简单的web服务。
package main import ( "fmt" "log" "net/http" ) func response(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello world!") //这个写入到w的是输出到客户端的 } func main() { http.HandleFunc("/", response) err := http.ListenAndServe(":9000", nil) if err != nil { log.Fatal("ListenAndServe: ", err) } }
然后编译
go build -o test_web.gobin ./test_web.gobin
然后访问
curl 127.0.0.1:9000 Hello world!
通道(channel)
通道(channel)是用来传递数据的一个数据结构。
通道可用于两个 goroutine 之间通过传递一个指定类型的值来同步运行和通讯。操作符
<-
用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。ch <- v // 把 v 发送到通道 ch v := <-ch // 从 ch 接收数据 // 并把值赋给 v
声明一个通道很简单,我们使用chan关键字即可,通道在使用前必须先创建:
ch := make(chan int)
注意:
- 默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须有接收端相应的接收数据。
- 以下实例通过两个 goroutine 来计算数字之和,在 goroutine 完成计算后,它会计算两个结果的和:
实例
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // 把 sum 发送到通道 c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // 从通道 c 中接收
fmt.Println(x, y, x+y)
}
输出结果为:
-5 17 12
通道缓冲区
通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小:
ch := make(chan int, 100)带缓冲区的通道允许发送端的数据发送和接收端的数据获取处于异步状态,就是说发送端发送的数据可以放在缓冲区里面,可以等待接收端去获取数据,而不是立刻需要接收端去获取数据。
不过由于缓冲区的大小是有限的,所以还是必须有接收端来接收数据的,否则缓冲区一满,数据发送端就无法再发送数据了。
注意:
- 如果通道不带缓冲,发送方会阻塞直到接收方从通道中接收了值。如果通道带缓冲,发送方则会阻塞直到发送的值被拷贝到缓冲区内;
- 如果缓冲区已满,则意味着需要等待直到某个接收方获取到一个值。接收方在有值可以接收之前会一直阻塞。
实例
package main
import "fmt"
func main() {
// 这里我们定义了一个可以存储整数类型的带缓冲通道
// 缓冲区大小为2
ch := make(chan int, 2)
// 因为 ch 是带缓冲的通道,我们可以同时发送两个数据
// 而不用立刻需要去同步读取数据
ch <- 1
ch <- 2
// 获取这两个数据
fmt.Println(<-ch)
fmt.Println(<-ch)
}
执行输出结果为:
1
2
Go 遍历通道与关闭通道
Go 通过 range 关键字来实现遍历读取到的数据,类似于与数组或切片。格式如下:
v, ok := <-ch如果通道接收不到数据后 ok 就为 false,这时通道就可以使用 close() 函数来关闭。
实例
package main
import (
"fmt"
)
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
// range 函数遍历每个从通道接收到的数据,因为 c 在发送完 10 个
// 数据之后就关闭了通道,所以这里我们 range 函数在接收到 10 个数据
// 之后就结束了。如果上面的 c 通道不关闭,那么 range 函数就不
// 会结束,从而在接收第 11 个数据的时候就阻塞了。
for i := range c {
fmt.Println(i)
}
}
执行输出结果为:
0
1
1
2
3
5
8
13
21
34
13.并发编程
13.1 并发与并行
Erlang 之父 Joe Armstrong曾经以下图解释并发与并行。
- 并发在图中的解释是两队人排队接咖啡,两队切换。
- 并行是两个咖啡机,两队人同时接咖啡。
- “Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.” — Rob Pike
- 并发使并行变得容易,并发提供了一种构造解决方案的方法,并行一般伴随这多核。并发一般伴随这CPU切换轮训。
13.2 为什么需要并发?
原因有很多,其中比较重要的原因如下:
- 1) 不阻塞等待其他任务的执行,从而浪费时间,影响系统性能。
- 2) 并行可以使系统变得简单些,将复杂的大任务切换成许多小任务执行,单独测试。
在开发中,经常会遇到为什么某些进程通常会相互等待呢?为什么有些运行慢,有些快呢?
通常受限来源于进程I/O或CPU。
进程I/O限制
- 如:等待网络或磁盘访问
CPU限制
- 如:大量计算
13.3 Go并发原语
13.3.1 协程Goroutines
每个go程序至少都有一个Goroutine:主Goroutine(在运行进程时自动创建)。以及程序中其他Goroutine 例如:下面程序创建了main的Goroutine及匿名的Goroutine。
func main() { go func() { fmt.Println("you forgot me !") }() }
在go中有个package是sync,里面包含了:
WaitGroup、Mutex、Cond、Once、Pool。
1.WaitGroup
Add(n)把计数器设置为n,Done()会将计数器每次减1,Wait()函数会阻塞代码运行,直到计数器减0。
等待多个goroutine完成,可以使用一个等待组。
// 这是我们将在每个goroutine中运行的函数。 // 注意,等待组必须通过指针传递给函数。 func worker(id int, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf("Worker %d starting\n", id) time.Sleep(time.Second) fmt.Printf("Worker %d done\n", id) } func main() { var wg sync.WaitGroup for i := 1; i <= 5; i++ { wg.Add(1) go worker(i, &wg) } wg.Wait() }
这里首先把wg 计数设置为1, 每个for循环运行完毕都把计数器减一,主函数中使用Wait() 一直阻塞,直到wg为1——也就是所有的5个for循环都运行完毕。
使用注意点:
- 计数器不能为负值
- WaitGroup对象不是引用类型
2.Once
sync.Once可以控制函数只能被调用一次,不能多次重复调用。
var doOnce sync.Once func main() { DoSomething() DoSomething() } func DoSomething() { doOnce.Do(func() { fmt.Println("Run once - first time, loading...") }) fmt.Println("Run this every time") }
输出:
Run once - first time, loading... Run this every time Run this every time
3.互斥锁Mutex
互斥锁是并发程序对共享资源进行访问控制的主要手段,在go中的sync中提供了Mutex的支持。
例如:使用互斥锁解决多个Goroutine访问同一变量。// SafeCounter 的并发使用是安全的。 type SafeCounter struct { v map[string]int mux sync.Mutex } // Inc 增加给定 key 的计数器的值。 func (c *SafeCounter) Inc(key string) { c.mux.Lock() defer c.mux.Unlock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v c.v[key]++ } // Value 返回给定 key 的计数器的当前值。 func (c *SafeCounter) Value(key string) int { c.mux.Lock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v defer c.mux.Unlock() return c.v[key] } func main() { c := SafeCounter{v: make(map[string]int)} for i := 0; i < 1000; i++ { go c.Inc("somekey") } time.Sleep(time.Second) fmt.Println(c.Value("somekey")) }
在这个例子中,我们使用了sync.Mutex的Lock与Unlock方法。
在前面例子中我们使用了sync.Mutex,读操作与写操作都会被阻塞。其实读操作的时候我们是不需要进行阻塞的,因此sync中还有另一个锁:读写锁RWMutex,这是一个单写多读模型。
sync.RWMutex分为:读、写锁。在读锁占用下,会阻止写,但不会阻止读,多个goroutine可以同时获取读锁,调用RLock()函数即可,RUnlock()函数释放。写锁会阻止任何goroutine进来,整个锁被当前goroutine,此时等价于Mutex,写锁调用Lock启用,通过UnLock()释放。
例如: 我们对上述例子进行改写,读的时候用读锁,写的时候用写锁。
// SafeCounter 的并发使用是安全的。 type SafeCounter struct { v map[string]int rwmux sync.RWMutex } // Inc 增加给定 key 的计数器的值。 func (c *SafeCounter) Inc(key string) { // 写操作使用写锁 c.rwmux.Lock() defer c.rwmux.Unlock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v c.v[key]++ } // Value 返回给定 key 的计数器的当前值。 func (c *SafeCounter) Value(key string) int { // 读的时候加读锁 c.rwmux.RLock() // Lock 之后同一时刻只有一个 goroutine 能访问 c.v defer c.rwmux.RUnlock() return c.v[key] } func main() { c := SafeCounter{v: make(map[string]int)} for i := 0; i < 1000; i++ { go c.Inc("somekey") } time.Sleep(time.Second) for i := 0; i < 10; i++ { fmt.Println(c.Value("somekey")) } }
4.条件变量Cond
sync.Cond是条件变量,它可以让一系列的 Goroutine 都在满足特定条件时被唤醒。条件变量通常与互斥锁一起使用,条件变量可以在共享资源的状态变化时通知相关协程。 经常使用的函数如下:
- NewCond
创建一个Cond的条件变量。func NewCond(l Locker) *Cond
- Broadcast
广播通知,调用时可以加锁,也可以不加。func (c *Cond) Broadcast()
- Signal
单播通知,只唤醒一个等待c的goroutine。func (c *Cond) Signal()
- Wait
等待通知, Wait()会自动释放c.L,并挂起调用者的goroutine。之后恢复执行,Wait()会在返回时对c.L加锁。除非被Signal或者Broadcast唤醒,否则Wait()不会返回。func (c *Cond) Wait()
例如:使用WaitGroup等待两个Goroutine完成, Goroutine1与Goroutine2进入Wait状态,main函数在2s后改变共享数据状态,调用Broadcast函数,此时c.Wait从中恢复并判断条件变量是否已经满足,满足后消费条件,解锁,wg.Done()。
5.原子操作
原子操作即是进行过程中不能被中断的操作。针对某个值的原子操作在被进行的过程中,CPU绝不会再去进行其他的针对该值的操作。 为了实现这样的严谨性,原子操作仅会由一个独立的CPU指令代表和完成。在sync/atomic 中,提供了一些原子操作,包括加法(Add)、比较并交换(Compare And Swap,简称 CAS)、加载(Load)、存储(Store)和交换(Swap)。
1.加法操作 提供了32/64位有符号与无符号加减操作
var i int64 atomic.AddInt64(&i, 1) fmt.Println("i = i + 1 =", i) atomic.AddInt64(&i, -1) fmt.Println("i = i - 1 =", i)
2.比较并交换
CAS: Compare And Swap
如果addr和old相同,就用new代替addr。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) 如: var a int32 = 1 var b int32 = 2 var c int32 = 3 ok := atomic.CompareAndSwapInt32(&a, a, b) fmt.Printf("ok = %v, a = %v, b = %v\n", ok, a, b) ok = atomic.CompareAndSwapInt32(&a, c, b) fmt.Printf("ok = %v, a = %v, b = %v, c=%v\n", ok, a, b, c)
输出:
ok = true, a = 2, b = 2 ok = false, a = 2, b = 2, c = 3
3.交换
不管旧值与新值是否相等,都会通过新值替换旧值,返回的值是旧值。
func SwapInt32(addr *int32, new int32) (old int32)
例如:
var x int32 = 1 var y int32 = 2 old := atomic.SwapInt32(&x, y) fmt.Println(x, old)
输出:2 1
4.加载
当读取该指针指向的值时,CPU 不会执行任何其它针对此值的读写操作
func LoadInt32(addr *int32) (val int32)
例如:
var x1 int32 = 1 y1 := atomic.LoadInt32(&x) fmt.Println("x1, y1:", x1, y1)
5.存储
加载逆向操作
var xx int32 = 1 var yy int32 = 2 atomic.StoreInt32(&yy, atomic.LoadInt32(&xx)) fmt.Println(xx, yy)
6.原子类型
sync/atomic中添加了一个新的类型Value
v := atomic.Value{} v.Store(1) fmt.Println(v.Load())
7.临时对象池Pool
sync.Pool 可以作为临时对象的保存和复用的集合。P是Goroutine中的重要组成之一,例如:P实际上在操作时会为它的每一个goroutine相关的P生成一个本地P。 本地池没有,则会从其它的 P 本地池获取,或者全局P取。
sync.Pool对于需要重复分配、回收内存的地方,sync.Pool 是一个很好的选择。减少GC负担,如果Pool中有对象,下次直接取,不断服用对象内存,减轻 GC 的压力,提升系统的性能。
var pool *sync.Pool type Foo struct { Name string } func Init() { pool = &sync.Pool{ New: func() interface{} { return new(Foo) }, } } func main() { fmt.Println("Init p") Init() p := pool.Get().(*Foo) fmt.Println("第一次取:", p) p.Name = "bob" pool.Put(p) fmt.Println("池子有对象了,调用获取", pool.Get().(*Foo)) fmt.Println("池子空了", pool.Get().(*Foo)) }var pool *sync.Pool type Foo struct { Name string } func Init() { pool = &sync.Pool{ New: func() interface{} { return new(Foo) }, } } func main() { fmt.Println("Init p") Init() p := pool.Get().(*Foo) fmt.Println("第一次取:", p) p.Name = "bob" pool.Put(p) fmt.Println("池子有对象了,调用获取", pool.Get().(*Foo)) fmt.Println("池子空了", pool.Get().(*Foo)) }
输出:
Init p 第一次取: &{} 池子有对象了,调用获取 &{bob} 池子空了 &{}
13.3.2 通道Channel
1.channel
这里引入一下CSP模型,CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,由 Tony Hoare 于 1977 年提出。
简单来说是实体之间通过发送消息进行通信,这里发送消息时使用的就是通道,或者叫 Channel。Goroutine对应并发实体。
1)使用
Channel的使用需要通过make创建。unBufferChan := make(chan int) bufferChan := make(chan int, x)
上述创建了无缓冲的Channel与有缓冲的Channel,创建完成之后,需要进行读写操作,如下:
ch := make(chan int, 1) // 读操作 x <- ch // 写操作 ch <- x
最终要正确关闭,只需要调用close即可。
// 关闭 close(ch)
当channe关闭后会引发下面相关问题:
- 重复关闭Channel 会 panic
- 向关闭的Channel发数据 会 Panic,读关闭的Channel不会Panic,但读取的是默认值
读操作默认值如何区分:
val, ok := <-ch if ok == false { // channel closed }
2)channel分类
- 无缓冲的Channel
发送与接受同时进行。如果没有Goroutine读取Channel(<-Channel),发送者(Channel<-x)会一直阻塞。- 有缓冲的Channel
发送与接受并非同时进行。当队列为空,接受者阻塞;队列满,发送者阻塞。2.Select
- 每个case 都必须是一个通信
- 所有channel表达式都会被求值
- 如果没有default语句,select将阻塞,直到某个通信可以运行
- 如果多个case都可以运行,select会随机选择一个执行
1) 随机选择:
select特性之一: 随机选择,下面会随机打印不同的case结果ch := make(chan int, 1) ch <- 1 select { case <-ch: fmt.Println("ch 1") case <-ch: fmt.Println("ch 2") default: fmt.Println("ch default") }
假设chan中没有值,有可能引发死锁。
例如:下面执行后可能会引发死锁。ch := make(chan int, 1) select { case <-ch: fmt.Println("ch 1") case <-ch: fmt.Println("ch 2") }
此时可以加上default即可解决
default: fmt.Println("ch default")
另外,可以添加超时。
timeout := make(chan bool, 1) go func() { time.Sleep(2 * time.Second) timeout <- true }() ch := make(chan int, 1) select { case <-ch: fmt.Println("ch 1") case <-timeout: fmt.Println("timeout 1") case <-time.After(time.Second * 1): fmt.Println("timeout 2") }
2) 检查chan
select+defaul方式来确保channel是否满,如果要调整channel大小,可以在make的时候改变size,这样就可以在case中往channel继续写数据。ch := make(chan int, 1) ch <- 1 select { case ch <- 1: fmt.Println("channel value is ", <-ch) default: fmt.Println("channel blocking") }
3)选择循环
当多个channel需要读取数据的时候,就必须使用 for+select例如:下面例子需要从两个channel中读取数据,当从channel1中数据读取完毕后,会像signal channel中输入stop,此时终止for+select。
func f1(c chan int, s chan string) { for i := 0; i < 10; i++ { time.Sleep(time.Second) c <- i } s <- "stop" } func f2(c chan int, s chan string) { for i := 20; i >= 0; i-- { time.Sleep(time.Second) c <- i } s <- "stop" } func main() { c1 := make(chan int) c2 := make(chan int) signal := make(chan string, 10) go f1(c1, signal) go f2(c2, signal) LOOP: for { select { case data := <-c1: fmt.Println("c1 data is ", data) case data := <-c2: fmt.Println("c2 data is ", data) case data := <-signal: fmt.Println("signal is ", data) break LOOP } } }