一、lock
Mutex互斥锁,也是全局锁;Lock()加锁,Unlock()解锁.
RWMutex读写锁,该锁可以加多个读锁或者一个写锁,其经常用于读次数远远多于写次数的场景.写锁权限高于读锁,有写锁时优先进行写锁定。
基本遵循两大原则:
1、可以随便读,多个goroutine同时读
2、写的时候,啥也不能干。不能读也不能写
RWMutex提供了四个方法:
func (*RWMutex) Lock // 写锁定
func (*RWMutex) Unlock // 写解锁
func (*RWMutex) RLock // 读锁定
func (*RWMutex) RUnlock // 读解锁
二、WaitGroup
WaitGroup等待一组线程集合完成,才会继续向下执行。
三、sync.Once可以控制函数只能被调用一次。不能多次重复调用
once实现单利模式
四、pool 用来保存和复用临时对象,以减少内存分配,降低CG压力。
Get返回Pool中的任意一个对象。如果Pool为空,则调用New返回一个新创建的对象。如果没有设置New,则返回nil。
五、条件变量 Cond
等待通知: wait
阻塞当前线程,直到收到该条件变量发来的通知
单发通知: signal
让该条件变量向至少一个正在等待它的通知的线程发送通知,表示共享数据的状态已经改变。
广播通知: broadcast
让条件变量给正在等待它的通知的所有线程都发送通知。
Mutex互斥锁,也是全局锁;Lock()加锁,Unlock()解锁.
func main() { var l *sync.Mutex l = new(sync.Mutex) l.Lock() defer l.Unlock() fmt.Println("1") } 线程安全map type SafeStringMap struct { sync.RWMutex M map[string]string } var StringMap = &SafeStringMap{M:make(map[string]string)} func (this *SafeStringMap) Get(key string) (string, bool) { this.RLock() defer this.RUnlock() v, exists := this.M[key] return v, exists } func (this *SafeStringMap) Set(k string, v string) { this.Lock() defer this.Unlock() this.M[k] = v } func (this *SafeStringMap) Get(key string) map[string]string { this.RLock() defer this.RUnlock() return this.M }
RWMutex读写锁,该锁可以加多个读锁或者一个写锁,其经常用于读次数远远多于写次数的场景.写锁权限高于读锁,有写锁时优先进行写锁定。
基本遵循两大原则:
1、可以随便读,多个goroutine同时读
2、写的时候,啥也不能干。不能读也不能写
RWMutex提供了四个方法:
func (*RWMutex) Lock // 写锁定
func (*RWMutex) Unlock // 写解锁
func (*RWMutex) RLock // 读锁定
func (*RWMutex) RUnlock // 读解锁
二、WaitGroup
WaitGroup等待一组线程集合完成,才会继续向下执行。
func main() { var wg sync.WaitGroup var urls = []string { "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } for _, url := range urls { wg.Add(1) go func(url string) { defer wg.Done() fmt.Println(url) }(url) } wg.Wait() fmt.Println("Game Over") }
三、sync.Once可以控制函数只能被调用一次。不能多次重复调用
once实现单利模式
import ( "sync" "fmt" ) type Singleton map[string]string var ( once sync.Once instance Singleton ) func New() Singleton { once.Do(func() { instance = make(Singleton) }) return instance } func main() { s := New() s["test1"] ="aa" fmt.Println(s) s1 := New() //没有重新初始化 s1["test2"] = "bb" fmt.Println(s1) } 打印结果: map[test1:aa] map[test1:aa test2:bb] package main import ( "fmt" "sync" "time" ) func main() { o := &sync.Once{} go do(o) go do(o) time.Sleep(time.Second * 2) } func do(o *sync.Once) { fmt.Println("Start do") o.Do(func() { fmt.Println("Doing something...") }) fmt.Println("Do end") } 输出结果: Start do Doing something... Do end Start do Do end 这里 Doing something 只被调用了一次。 查看go once的源码实现,也是非常的简单: type Once struct { m Mutex done uint32 } func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 1 { return } o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }核心思想是使用原子计数记录被执行的次数。使用Mutex Lock Unlock锁定被执行函数,防止被重复执行。
四、pool 用来保存和复用临时对象,以减少内存分配,降低CG压力。
type Pool func (p *Pool) Get() interface{} func (p *Pool) Put(x interface{}) New func() interface{}
Get返回Pool中的任意一个对象。如果Pool为空,则调用New返回一个新创建的对象。如果没有设置New,则返回nil。
func TestPool() { var pool sync.Pool pool.New = func() interface{} { return "Hello" } for i := 0; i < 10; i++ { s := pool.Get() fmt.Println(s) pool.Put("World" + strconv.FormatInt(int64(i), 10)) } }
五、条件变量 Cond
等待通知: wait
阻塞当前线程,直到收到该条件变量发来的通知
单发通知: signal
让该条件变量向至少一个正在等待它的通知的线程发送通知,表示共享数据的状态已经改变。
广播通知: broadcast
让条件变量给正在等待它的通知的所有线程都发送通知。
func NewCond(l Locker) *Cond func TestCond() { cond := sync.NewCond(&sync.Mutex{}) for i := 0; i < 10; i++ { go func(t int) { time.Sleep(time.Second) cond.L.Lock() defer cond.L.Unlock() cond.Wait() fmt.Println(t) }(i) } } time.Sleep(2 * time.Second) //cond.Signal() cond.Broadcast() }
Cond实现生产者、消费者
package main import ( "fmt" "math/rand" "sync" "time" ) var cond sync.Cond //生产者 func produce(out chan<- int, nu int) { for { cond.L.Lock() //产品区满 等待消费者消费 for len(out) == 3 { cond.Wait() } num := rand.Intn(1000) out <- num fmt.Printf("%dth ***producer produce***,num = %d,len(chan) = %d\n", nu, num, len(out)) cond.L.Unlock() //生产了产品唤醒 消费者线程 cond.Signal() //生产完了歇一会,给其他协程机会 time.Sleep(time.Second) } } //消费者 func consume(in <-chan int, nu int) { for { cond.L.Lock() //产品区空 等待生产者生产 for len(in) == 0 { cond.Wait() } num := <-in fmt.Printf("%dth ###consumer consume###,num = %d,len(chan) = %d\n", nu, num, len(in)) cond.L.Unlock() cond.Signal() //消费完了歇一会,给其他协程机会 time.Sleep(time.Millisecond * 500) } } func main() { //设置随机数种子 rand.Seed(time.Now().UnixNano()) quit := make(chan bool) //产品区 使用channel模拟 product := make(chan int, 3) //创建互斥锁和条件变量 cond.L = new(sync.Mutex) //5个消费者 for i := 0; i < 5; i++ { go produce(product, i) } //3个生产者 for i := 0; i < 3; i++ { go consume(product, i) } //主协程阻塞 不结束 <-quit }