1、多个独立任务
package main import ( "fmt" "runtime" ) /** 并发多个独立的任务 */ func main() { //处理任务的goroutine 数量(处理任务数量,和等待所有的goroutine处理完毕微循环数量一致) workers := runtime.NumCPU() //使用最大的cup数 runtime.GOMAXPROCS(workers) //任务通道,所有的任务发往此通道 jobs := make(chan int, workers) //完成chan,当任务处理完毕时,往此chan发送一个数据。用于监听所有的goroutine都执行完毕,数据类型没有要求 done := make(chan bool, workers) //创建一个gotoutine,用于生成任务 go generateJob(jobs) //处理任务 processJob(done, jobs, workers) //等待所有goroutine 执行完毕 waitUntil(done, workers) } /** 生成任务 jobs chan 为只接收chan */ func generateJob(jobs chan<- int) { for i := 1; i <= 10; i++ { jobs <- i } //所有数据发送完毕后关闭通道。此方法只是告诉使用此通道的地方,此通道中已经不再发送数据了,并没有真正的关闭。 close(jobs) } /** 处理任务 只向done chan 中发送数据 只从jobs chan 中接收数据 workers 创建goroutine 数量 */ func processJob(done chan<- bool, jobs <-chan int, workers int) { for i := 0; i < workers; i++ { go func() { //当前jobs chan 中没有数据时阻塞,直到调用close(jobs)方法,或者有数据 for job := range jobs { //处理任务 fmt.Println(job) } //此goroutine执行完毕,done 中存放标识 done <- true }() } } //等待所有的goroutine 执行完毕 func waitUntil(done <-chan bool, workers int) { for i := 0; i < workers; i++ { <-done } }说明:
1)、处理任务的goroutine 数量和等待所有goroutine执行完毕循环(waitUntil方法)次数一致,为:workers数量
2)、定义的jobs通道是有缓存的,如果想要按顺序执行,可以不使用缓存,这样使用只能一个一个处理。
3)、generateJob 方法中有关闭jobs通道方法,只是告诉使用此通道的地方,已经没有数据往里面发送了,没有真正的关闭。代码中并没有关闭done 通道的代码,因为没有在需要检查这个通道是否被关闭的地方使用此通道。waitUntil 方法中,通过done的阻塞,可以确保所有的处理工作在主的goroutine退出之前完成。
2、互斥量(锁)
package safe import "sync" /** 安全map */ type SafeMap struct { //map CountMap map[string]int //互斥量 mutex *sync.RWMutex } /** 创建 */ func NewSafeMap() *SafeMap { return *SafeMap{make(map[string]int), new(sync.RWMutex)} } /* 添加计数 */ func (sm *SafeMap) Increment(str string) { //获取锁 sm.mutex.Lock() //释放锁 defer sm.mutex.Unlock() //计数 sm.CountMap[str]++ } /* 读取值 */ func (sm *SafeMap) Read(str string) int { //获取读锁锁 sm.mutex.RLock() //释放锁 defer sm.mutex.RUnlock() //计数 v, e := sm.CountMap[str] if !e { return v } else { return 0 } } package safe import "sync" /** 安全map */ type SafeMap struct { //map CountMap map[string]int //互斥量 mutex *sync.RWMutex } /** 创建 */ func NewSafeMap() *SafeMap { return *SafeMap{make(map[string]int), new(sync.RWMutex)} } /* 添加计数 */ func (sm *SafeMap) Increment(str string) { //获取锁 sm.mutex.Lock() //释放锁 defer sm.mutex.Unlock() //计数 sm.CountMap[str]++ } /* 读取值 */ func (sm *SafeMap) Read(str string) int { //获取读锁锁 sm.mutex.RLock() //释放锁 defer sm.mutex.RUnlock() //计数 v, e := sm.CountMap[str] if !e { return v } else { return 0 } }说明:
1)、SafeMap 是一个多线程安全的map,通过new(sync.RWMutex) 创建一个读写锁。在操作map的时候首先要锁定互斥量,通过defer 来保证释放互斥量。(这是只写了一个方法)
2)、在Read 方法中,获取的读锁,这样可以更加高效一点。
3)、可以通过把对map的操作发送到一个没有缓存的通道,再对map进行操作也可以实现安全的map。
3、多个任务结果的合并
示例:计算50的阶乘
package main import ( "fmt" "runtime" ) func main() { workers := runtime.NumCPU() //使用最大的cup数 runtime.GOMAXPROCS(workers) //任务chann jobs := make(chan section, workers) //创建一个gotoutine,用于生成任务 go generateJob(jobs) //存放每个处理任务的goroutine 运算结果 result := make(chan uint64, workers) //处理任务 processJob(result, jobs, workers) //计算所有结果的乘积 factorialResult := caclResult(result, workers) fmt.Println(factorialResult) } /** 生成任务 jobs chan 为只接收chan */ func generateJob(jobs chan<- section) { jobs <- section{1, 10} jobs <- section{11, 20} jobs <- section{21, 30} jobs <- section{31, 40} jobs <- section{41, 50} //所有数据发送完毕后关闭通道。此方法只是告诉使用此通道的地方,此通道中已经不再发送数据了,并没有真正的关闭。 close(jobs) } func processJob(result chan<- uint64, jobs <-chan section, workers int) { for i := 0; i < workers; i++ { go func() { //阶乘 var factorial uint64 = 1 for job := range jobs { for i := job.min; i <= job.max; i++ { factorial *= uint64(i) } } //计算完毕后,把结果放到result chan中 result <- factorial }() } } /** 计算所有结果 */ func caclResult(result <-chan uint64, workers int) uint64 { var factorial uint64 = 1 for i := 0; i < workers; i++ { count := <-result factorial *= count } return factorial } /** 保存阶乘区间的最大值和最小值 */ type section struct { min int max int }
说明:
1)、就是把50的阶乘分计算,示例代码是计算1..10,11..20 ... 和乘积,然后再计算各个结果的乘积
2)、generateJob中,是把要计算区间的struct 放入jobs通道
3)、在processJob中,每个goroutine都创建一个计算结果,在计算完毕后发送到result 通道中。result 保存所有的计算结果。
4)、监听主goroutine的代码被替换成计算总的结果。
4、不定goroutine 数量
package main import ( "fmt" "runtime" "sync" "time" ) //最大goroutine 数量 const maxGoroutines = 10 //根据任务量的多少,决定使用多少goroutine func main() { //定义一个要计算阶乘的数组。假设数值越大处理的越慢(通过time.Sleep(time.Duration(num) * time.Second / 10) 代码实现) factorialArray := []int{5, 8, 10, 50, 20, 11, 16, 19, 38, 20, 49, 36, 22, 33, 45, 29} //存放计算结果的通道 factorialResultChan := make(chan map[int]uint64, maxGoroutines*2) //计算阶乘 go doFactorial(factorialResultChan, factorialArray) //展示结果 showResult(factorialResultChan) } /** 计算阶乘 */ func doFactorial(factorialResultChan chan map[int]uint64, factorialArray []int) { waiter := &sync.WaitGroup{} for _, v := range factorialArray { calc(factorialResultChan, v, waiter) } waiter.Wait() close(factorialResultChan) } func calc(factorialResultChan chan map[int]uint64, num int, waiter *sync.WaitGroup) { if num < 10 || runtime.NumGoroutine() > maxGoroutines { factorialResultChan <- map[int]uint64{num: doCalc(num)} } else { waiter.Add(1) go func() { factorialResultChan <- map[int]uint64{num: doCalc(num)} waiter.Done() }() } } /** 阶乘计算 */ func doCalc(num int) uint64 { var r uint64 = 1 //数值大,处理的慢 time.Sleep(time.Duration(num) * time.Second / 10) for i := 1; i <= num; i++ { r *= uint64(i) } return r } /** 结果展示 */ func showResult(result chan map[int]uint64) { for m := range result { for k, v := range m { fmt.Printf("%d:%d", k, v) fmt.Println("") } } }说明:
1)、doFactorial 函数是在一个goroutine 中执行的,遍历 factorialArray 切片,计算阶乘。
遍历中调用了calc 函数:如果计算阶乘的数值小于10,或者已经在运行的goroutine 数量(runtime.NumGoroutine()) 大于最大设置的goroutine 数量,就在本goroutine 中运行,
否则就启动一个goroutine计算阶乘。
2)、当factorialArray 切片不确定时,就不知道calc 方法创建了多少个goroutine。为此创建了一个sync.WaitGroup ,每次创建一个goroutine 时,就调用一次sync.WaitGroup.Add()函数,
当执行完毕时调用sync.WaitGroup.Done() 函数。在所有goroutine 都设置运行后,调用sync.WaitGroup.Wait()函数,等待所有的工作goroutine 完成。
sync.WaitGroup.Wait() 阻塞到完成done 数量和添加数量相等为止。
3)、当所有工作goroutine 执行完毕后,就关闭factorialResultChan 通道。当然 showResult 函数仍然可以读这个通道,直到所有的数据都读取出来。