context包 示例: func main() { var wg sync.WaitGroup done := make(chan interface{}) defer close(done) wg.Add(1) go func() { defer wg.Done() if err := printGreeting(done); err != nil { fmt.Printf("%v",err) return } }() wg.Add(1) go func() { defer wg.Done() if err := printFarewell(done); err != nil { fmt.Printf("%v",err) return } }() wg.Wait() } func printGreeting(done <-chan interface{}) error { greeting,err := genGreeting(done) if err != nil { return nil } fmt.Printf("%s world!\n",greeting) return nil } func printFarewell(done <-chan interface{}) error { farewell,err := genFarewell(done) if err != nil { return err } fmt.Printf("%s world!\n",farewell) return nil } func genGreeting(done <-chan interface{}) (string,error) { switch locale,err := locale(done); { case err != nil: return "",err case locale == "EN/US": return "hello",nil } return "",fmt.Errorf("unsuppported locale") } func genFarewell(done <-chan interface{}) (string,error) { switch locale,err := locale(done); { case err != nil: return "",err case locale == "En/US": return "googbye",nil } return "",fmt.Errorf("unsupported locale") } func locale(done <-chan interface{}) (string,error) { select { case <-done: return "",fmt.Errorf("canceled") case <-time.After(1*time.Minute): } return "EN/US",nil } 使用context包来实现 func main() { var wg sync.WaitGroup ctx,cancel := context.WithCancel(context.Background()) defer cancel() wg.Add(1) go func() { defer wg.Done() if err := printGreeting(ctx);err != nil { fmt.Printf("cannot print greeting: %v\n",err) cancel() } }() wg.Add(1) go func() { defer wg.Done() if err := printFarewell(ctx);err != nil { fmt.Printf("cannot print farewell: %v\n",err) } }() wg.Wait() } func printGreeting(ctx context.Context) error { greeting,err := genGreeting(ctx) if err != nil { return err } fmt.Printf("%s world!\n",greeting) return nil } func printFarewell(ctx context.Context) error { farewell,err := genFarewell(ctx) if err != nil { return err} fmt.Printf("%s world!\n",farewell) return nil } func genGreeting(ctx context.Context) (string,error) { ctx,cancel := context.WithTimeout(ctx,1*time.Second) defer cancel() switch locale,err := locale(ctx); { case err != nil: return "",err case locale == "EN/US": return "hello",nil } return "",fmt.Errorf("unsupported locale") } func genFarewell(ctx context.Context) (string,error) { switch locale,err := locale(ctx); { case err != nil: return "",nil case locale == "EN/US": return "googbyte",nil } return "",fmt.Errorf("unsupported locale") } func locale(ctx context.Context) (string,error) { select { case <-ctx.Done(): return "",ctx.Err() case <-time.After(1*time.Minute): } return "EN/US",nil } 使用context.Context的Deadline方法 func main() { var wg sync.WaitGroup ctx,cancel := context.WithCancel(context.Background()) defer cancel() wg.Add(1) go func() { defer wg.Done() if err := printGreeting(ctx); err != nil { fmt.Printf("cannot print greeting: %v\n",err) cancel() } }() wg.Add(1) go func() { defer wg.Done() if err := printFarewell(ctx); err != nil { fmt.Printf("cannot print farewell: %v\n",err) } }() wg.Wait() } func printGreeting(ctx context.Context) error { greeting,err := genGreeting(ctx) if err != nil { return err } fmt.Printf("%s world!\n",greeting) return nil } func printFarewell(ctx context.Context) error { farewell,err := genFarewell(ctx) if err != nil { return err } fmt.Printf("%s world!\n",farewell) return nil } func genGreeting(ctx context.Context) (string,error) { ctx,cancel := context.WithTimeout(ctx,1*time.Second) defer cancel() switch locale,err := locale(ctx); { case err != nil: return "",err case locale == "EN/US": return "hello",nil } return "",fmt.Errorf("unsupported locale") } func genFarewell(ctx context.Context) (string,error) { switch locale,err := locale(ctx); { case err != nil: return "",nil case locale == "EN/US": return "goodbyte",nil } return "",fmt.Errorf("unsupported locale") } func locale(ctx context.Context) (string,error) { if deadline,ok := ctx.Deadline();ok { //检查上下文是否提供了截止 if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 { return "",context.DeadlineExceeded } } select { case <-ctx.Done(): return "",ctx.Err() case <-time.After(1* time.Minute): } return "EN/US",nil } 在上下文中存储数据以及检索数据 func main() { ProcessRequest("jane","abc123") } func ProcessRequest(userID,anthToken string) { ctx := context.WithValue(context.Background(),"userID",userID) ctx = context.WithValue(ctx,"authToken",authToken) HandleResponse(ctx) } func HandleResponse(ctx context.Context) { fmt.Printf( "handlint Response for %v (%v)", ctx.Value("userID"), ctx.Value("authToken"), ) } 大规模并发 异常传递 超时取消 心跳 //代码演示一个会发出心跳的groutine: doWork := func( done <-chan interface{}, pulseInterval time.Duration, ) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) //创建发送心跳的channel,返回给doWork results := make(chan time.Time) go func() { defer close(heartbeat) defer close(results) pulse := time.Tick(pulseInterval) //设置心跳的间隔时间,pulseInterval,期间会读取channel workGen := time.Tick(2 * pulseInterval) //时间大于pluseInterval,方便聪goroutine中看倒心跳 sendPulse := func() { select { case heartbeat <- struct{}{}: default: //默认语句,可能没有接收心跳 } } sendResult := func(r time.Time) { for { select { case <-done: return case <-pulse: //同done channel,执行接收或发送时,需要一个发送心跳的分支 sendPulse() case results <- r: return } } } for { select { case <-done: return case <-pulse: sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } //利用函数消费发出的事件 done := make(chan interface{}) time.AfterFunc(10*time.Second, func() { close(done) }) //声明标准done channel,10秒后关闭,给goreoutine工作时间 const timeout = 2 * time.Second //设置超时时间,将心跳间隔和超时时间联系起来 heartbeat, results := doWork(done, timeout/2) //timeout/2,心跳有额外的响应时间,超时不太敏感 for { select { case _, ok := <-heartbeat: //处理心跳 if ok == false { return } fmt.Println("pulse") case r, ok := <-results: //处理results channel if ok == false { return } fmt.Printf("results %v\n", r.Second()) case <-time.After(timeout): //没有收到心跳或消息,会超时 return } } 两次迭代后停止goroutine,不关闭channel,模拟一个产生了异常的goroutine doWork := func( done <-chan interface{}, pulseInterval time.Duration, ) (<-chan interface{},<-chan time.Time) { heartbeat := make(chan interface{}) results := make(chan time.Time) go func() { pulse := time.Tick(pulseInterval) workGen := time.Tick(2*pulseInterval) sendPulse := func() { select { case heartbeat <-struct{}{}: default: } } sendResult := func(r time.Time) { for { select { case <-pulse: sendPulse() case results <- r: return } } } for i := 0;i<2;i++ { select { case <-done: return case <-pulse: sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat,results } done := make(chan interface{}) time.AfterFunc(10*time.Second,func() { close(done) }) const timeout = 2 * time.Second heartbeat,results := doWork(done,timeout/2) for { select { case _,ok := <-heartbeat: if ok == false { return } fmt.Println("pulse") case r,ok := <-results: if ok == false { return } fmt.Printf("results %v\n",r) case <-time.After(timeout): fmt.Println("worker goroutine is not healthy") return } } 测试单元有效的,每个工作单元在开始之前发送心跳 doWork := func(done <-chan interface{},) (<-chan interface{}, <-chan int) { heartbeatStream := make(chan interface{},1) //创建一个缓冲大小为1的channel,确保即使没有接发送的消息,至少也会发出一个心跳 workStream := make(chan int) go func(){ defer close(heartbeatStream) defer close(workStream) for i:=0;i<10;i++ { select { //将发送results和心跳分开,若接收者没有准备好接收结果,它仍将接收到一个心跳 case heartbeatStream <- struct{}{}: default: //防止没人接收心跳,增加的默认逻辑,监听但是没有收到第一个心跳,接收者仍然可以收到心跳 } select { case <-done: return case workStream <- rand.Intn(10): } } }() return heartbeatStream,workStream } done := make(chan interface{}) defer close(done) heartbeat,results := doWork(done) for { select { case _,ok := <-heartbeat: if ok { fmt.Println("pulse") } else { return } case r,ok := <-results: if ok { fmt.Printf("results %v\n",r) } else { return } } } doWork生成器的测试函数 func TestDoWork_GeneratestAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0,1,2,3,5} heartbeat,results := Dowork(done,intSlice...) <-heartbeat //等待goroutine开始处理迭代的信号 i := 0 for r := range results { if expected := intSlice[i]; r != expected { t.Errorf("intex %v: expected %v,but received %v,",i,expected,r) } i++ } } 使用心跳进行测试: func DoWork( done <-chan interface{}, pulseInterval time.Duration, nums ...int, ) (<-chan interface{},<-chan int) { heartbeat := make(chan interface{},1) intStream := make(chan int) go func() { defer close(heartbeat) defer close(intStream) time.Sleep(2*time.Second) pulse := time.Tick(pulseInterval) numloop: //使用标签来简化内部循环 for _,n := range nums { for { //需要两个循环,一个循环遍历数列,内部循环持续执行,直到intStream中的数字成功发送 select { case <-done: return case <-pulse: select { case heartbeat <- struct{}{}: default: } case intStream <- n: continue numloop //跳回numloop标签继续执行外部循环 } } } }() return heartbeat,intStream } func TestDoWork_GeneratestAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0,1,2,3,5} const timeout = 2*time.Second heartbeat,results := DoWork(done,timeout/2,intSlice...) <- heartbeat //等待第一次心跳到达,来确认goroutine已经进入循环 i := 0 for { select { case r,ok := <-results: if ok == false { return } else if expected := intSlice[i];r != expected { t.Errorf ( "index %v: expected %v,but received %v,", i, expected, r, ) } i++ case <-heartbeat: //接收心跳,防止超时 case <-time.After(timeout): t.Fatal("test timed out") } } } 复制请求 单个进程中制造复制请求,在10个处理程序上复制模拟请求的示例: doWork := func( done <-chan interface{}, id int, wg *sync.WaitGroup, result chan<- int, ) { started := time.Now() defer wg.Done() //模拟随机负载 simulatedLoadTime := time.Duration(1+rand.Intn(5))*time.Second select { case <-done: case <-time.After(simulatedLoadTime): } select { case <-done: case result <- id: } took := time.Since(started) //显示处理程序需要多长时间 if took < simulatedLoadTime { took = simulatedLoadTime } fmt.Printf("%v took %v\n",id,took) } done := make(chan interface{}) result := make(chan int) var wg sync.WaitGroup wg.Add(10) for i := 0;i < 10;i++ { //启动10个处理程序来处理请求 go doWork(done,i,&wg,result) } firestReturned := <-result //获得处理程序组的第一个返回值 close(done) //取消其余的处理程序,保证他们不会继续做多余的工作 wg.Wait() fmt.Printf("Received an answer from #%v\n",firestReturned) 速率限制 治愈异常的goroutine 216 //启动goroutine函数的管理员 type startGoroutineFn func( done <-chan interface{}, pulseInterval time.Duration, ) (heartbeat <-chan interface{}) newSteward := func( timeout time.Duration, startGoroutine startGoroutineFn, ) startGoroutineFn { return func( done <-chan interface{}, pulseInterval time.Duration, ) (<-chan interface{}) { heartbeat := make(chan interface{}) go func() { defer close(heartbeat) var wardDone chan interface{} var wardHeartbeat <-chan interface{} startWard := func() { wardDone = make(chan interface{}) wardHeartbeat = startGoroutine(or(wardDone,done),timeout/2) } startWard() pulse := time.Tick(pulseInterval) monitorLoop: for { timeoutSignal := time.After(timeout) for { select { case <-pulse: select { case heartbeat <- struct{}{}: default: } case <-wardHeartbeat: continue monitorLoop case <-timeoutSignal: log.Println("steward: ward unhealthy; restarting") close(wardDone) startWard() continue monitorLoop case <-done: return } } } }() return heartbeat } } //对管理员进行测试 log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) doWork := func(done <-chan interface{},_ time.Duration) <-chan interface{} { log.Println("ward: Hello,I'm irresponsible!") go func() { <-done log.Println("ward: I am halting.") }() return nil } doWorkWithSteward := newSteward(4*time.Second,doWork) done := make(chan interface{}) time.AfterFunc(9*time.Second,func()) { log.Println("main: halting steward and ward.") close(done) } for range doWorkWithSteward(done,4*time.Second) {} log.Println("Done") 管理区,根据离散值列表生成一个整数流 doWorkFn := func( done <-chan interface{}, intList ...int, ) (startGoroutineFn,<-chan interface{}) { intChanStream := make(chan (<-chan interface{})) intStream := bridge(done,intChanStream) doWork := func( done <-chan interface{}, pulseInterval time.Duration, ) <-chan interface{} { intStream := make(chan interface{}) heartbeat := make(chan interface{}) go func() { defer close(intStream) select { case intChanStream <- intStream: case <-done: return } pulse := time.Tick(pulseInterval) for { valueLoop: for _,intVal := range intList { if intVal < 0 { log.Printf("negative value: %v\n",intVal) return } for { select { case <-pulse: select { case heartbeat <- struct{}{}: default: } case intStream <- intVal: continue valueLoop case <-done: return } } } } }() return heartbeat } return doWork,intStream } log.SetFlags(log.Ltime|log.LUTC) log.SetOutput(os.Stdout) done := make(chan interface{}) defer close(done) doWork,intStream := doWorkFn(done,1,2,-1,3,2,5) doWorkWithSteward := newSteward(1*time.Millisecond,doWork) doWorkWithSteward(done,1*time.Hour) for intVal := range take(done,intStream,6) { fmt.Printf("Received: %v\n",intVal) } goroutine和go语言的运行时 工作窃取 窃取任务还是续体
Go语言并发之道--笔记4
猜你喜欢
转载自blog.csdn.net/liao__ran/article/details/112801665
今日推荐
周排行