pipeline 可以用来在系统中形成抽象的一种工具,尤其是程序需要流式处理或者批处理数据 一系列的数据输入,执行操作并将结果数据传回的系统,称这些操作是pipeline的一个stage 构建一个pipeline的tage pipeline stage函数 multiply := func(values []int,multiplier int) []int { multipliedValues := make([]int,len(values)) for i,v := range values { multipliedValues[i] = v * multiplier } return multipliedValues } add := func(values []int,additive int) []int { addedValues := make([]int,len(values)) for i,v := range values { addedValues[i] = v + additive } return addedValues } ints := []int{1,2,3,4} for _,v := range add(multiply(ints,2),1) { fmt.Println(v) } ints := []int{1,2,3,4} for _,v := range multiply(add(multiply(ints,2),1),2) { fmt.Println(v) } //将stage转换为以流为导向 multiply := func(value,multiplier int) int { return value * multiplier } add := func(value,additive int) int { return value + additive } ints := []int{1,2,3,4} for _,v := range ints { fmt.Println(multiply(add(multiply(v,2),1),2)) } 构建pipeline的最佳实践 generator := func(done <-chan interface{},integers ...int) <-chan int { intStream := make(chan int) go func() { defer close(intStream) for _,i := range integers { select { case <-done: return case intStream <- i: } } }() return intStream } multiply := func( done <-chan interface{}, intStream <-chan int, multiplier int, ) <-chan int { multipliedStream := make(chan int) go func() { defer close(multipliedStream) for i := range intStream { select { case <-done: return case multipliedStream <- i*multiplier: } } }() return multipliedStream } add := func( done <-chan interface{}, intStream <-chan int, additive int, ) <-chan int { addedStream := make(chan int) go func() { defer close(addedStream) for i := range intStream { select { case <-done: return case addedStream <- i+additive: } } }() return addedStream } done := make(chan interface{}) defer close(done) intStream := generator(done,1,2,3,4) pipeline := multiply(done,add(done,multiply(done,intStream,2),1),2) for v := range pipeline { fmt.Println(v) } 一些便利的生成器 pipeline的生成器是将一组离散值转换为channel撒谎那个的值流的任何函数 //repeat的生成器 repeat := func( done <-chan interface{}, values ...interface{}, )<-chan interface{} { valueStream := make(chan interface{}) go func() { defer close(valueStream) for { for _,v := range values { select { case <-done: return case valueStream <- v: } } } }() return valueStream } //通用pipeline stage,重复使用时很有用 take := func( done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i:=0;i<num;i++ { select { case <-done: return case takeStream <- valueStream: } } }() return takeStream } done := make(chan interface{}) defer close(done) for num := range take(done,repeat(done,1),10) { fmt.Printf("%v",num) } //创建一个重复调用函数的生成器repeatFn //通用pipeline stage,重复使用时很有用 take := func( done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i:=0;i<num;i++ { select { case <-done: return case takeStream <- valueStream: } } }() return takeStream } //创建一个重复调用函数的生成器repeatFn repeatFn := func( done <-chan interface{}, fn func() interface{}, ) <-chan interface{} { valueStream := make(chan interface{}) go func() { defer close(valueStream) for { select { case <-done: return case valueStream <- fn(): } } }() return valueStream } //生成10个随机数字 done := make(chan interface{}) defer close(done) rand := func() interface{} { return rand.Int()} //导入库"math/rand" for num := range take(done,repeatFn(done,rand),10) { fmt.Println(num) } //toString pipeline stage //报错 toString := func( done <-chan interface{}, ) <-chan string { stringStream := make(chan string) go func() { defer close(stringStream) for v := range valueStream { select { case <-done: return case stringStream <- v.(string): } } }() return stringStream } done := make(chan interface{}) defer close(done) var message string for token := range toString(done,take(done,repeat(done,"I","am."),5)) { message += token } fmt.Pringf("message: %s...",message) //基准测试函数,一个测试通用stage,另一个测试特定stage func BenchmarkGeneric(b *testing.B) { done := make(chan interface{}) defer close(done) b.ResetTimer() for range toString(done,take(done,repeat(done,"a"),b.N)) { } } func BenchmarkTyped (b *testing.B) { repeat := func(done <-chan interface{},values ...string) <-chan string { valueStream := make(chan string) go func() { defer close(valueStream) for { for _,v := range values { select { case <-done: return case valueStream <- v: } } } }() return valueStream } take := func( done <-chan interface{}, valueStream <-chan string, num int, ) <-chan string { takeStream := make(chan string) go func() { defer close(takeStream) for i := num; i>0 || i == -1; { if i != -1 { I -- } select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream } done := make(chan interface{}) defer close(done) b.ResetTimer() for range take(done,repeat(done,"a"),b.N){ } } 扇入,扇出 扇出用于描述启动多个goroutine以处理来自pipeline的输入的过程, 扇入是描述将多个结果组合到一个channel的过程中的术语。 示例:找到素数菲常低效的函数 rand := func() interface{} { return rand.Intn(50000000) } done := make(chan interface{}) defer close(done) start := time.Now() randIntStream := toInt(done,repeatFn(done,rand)) fmt.Println("Primes:") for prime := range take(done,primeFinder(done,randIntStream),10) { fmt.Printf("\t%d\n",prime) } fmt.Printf("Search took: %v",time.Since(start)) 优化后的函数 fanIn := func( done <-chan interface{}, channels ...<-chan interface{}, ) <-chan interface{} { // var wg sync.WaitGroup // multiplexedStream := make(chan interface{}) multiplex := func(c <-chan interface{}) { // defer wg.Done() for i := range c { select { case <-done: return case multiplexedStream <- i: } } } //从所有的channel里取值 wg.Add(len(channels)) // for _,c := range channels { go multiplex(c) } //等待所有的读操作结束 go func() { // wg.Wait() close(multiplexedStream) }() return multiplexedStream } done := make(chan interface{}) defer close(done) start := time.Now() rand := func() interface{} { return rand.Intn(50000000)} randIntStream := toInt(done,repeatFn(done,rand)) numFinders := runtime.NumCPU() fmt.Printf("Spinning up %d prime finders.\n",numFinders) finders := make([]<-chan interface{},numFinders) fmt.Println("Primes:") for i := 0; i < numFinders; i++ { finders[i] = primeFinder(done,randIntStream) } for prime := range take(done,fanIn(done,finders...),10) { fmt.Printf("\t%d\n",prime) } fmt.Printf("Search took: %v",time.Since(start)) or-done-channel 示例: orDone := func(done,c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { for { select { case <-done: return case v,ok := <-c: if ok == false { return } select { case valStream <- v: case <-done: } } } }() return valStream } for val := range orDone(done,muchan) { //用val执行某些操作 } tee-channel 将tee-channel传递给一个读channel,其会返回两个单独的channel, tee := func( done <-chan interface{}, in <-chan interface{}, ) (_,_<-chan interface{}) {<-chan interface{}} { out1 := make(chan interface{}) out2 := make(chan interface{}) go func() { defer close(out1) defer close(out2) for val := range orDone(done,in) { var out1,out2 = out1,out2 for i := 0;i<2;i++ { select { case <-done: case out1<-val: out1 = nil //写入channel后,将副本设置为nil,以便进一步阻塞写入 case out2<-val out2 = nil } } } }() return out1,out2 } done := make(chan interface{}) defer close(done) out1,out2 := tee(done,take(done,repeat(done,1,2)),4) for val1 := range out1 { fmt.Printf("out1: %v,out2: %v\n",val1,<-out2) } 桥接channel模式 <-chan <-chan interface{} //处理一个充满channel的channel,将其拆解为一个简单的channel bridge := func( done <-chan interface{}, chanStream <-chan <-chan interface{}, ) <-chan interface{} { valStream := make(chan interface{}) //将返回bridge中的所有值的channel go func() { defer close(valStream) for { //从chanStream中提取channel并将其提供给嵌套循环来使用 var stream <-chan interface{} select { case maybeStream,ok := <-chanStream: if ok==false { return } stream = maybeStream case <-done: return } for val := range orDone(done,stream) { //读取已给出的channel值,并将其重复到valStream中 select { case valStream <- val: case <-done: } } } }() return valStream } //创建10个channel,每个channel写入一个元素,并将这些channel传递给桥接函数: genVals := func() <-chan <-chan interface{} { chanStream := make(chan (<-chan interface{})) go func() { defer close(chanStream) for i := 0;i<10;i++ { stream := make(chan interface{},1) stream <- i close(stream) chanStream <- stream } }() return chanStream } for v := range bridge(nil,genVals()) { fmt.Printf("%v ",v) } 队列排队 队列:在队列尚未准备好的时候开始接受请求 //缓冲写入队列和未缓冲写入队列的简单比较 func BenchmarkUnbufferedWrite(b *testing.B) { perfornWrite(b,tmpFileOrFatal()) } func BenchmarkBufferedWrite(b *testing.B) { bufferedFile := bufio.NewWriter(tmpFileOrFatal()) performWrite(b,bufio.NewWriter(bufferredFile)) } func timeFileOrFatal() *os.File { file,err := ioutil.TempFile("","tmp") iferr != nil { log.Fatal("error: %v",err) } return file } func performWrite(b *testing.B,write io.Writer) { done := make(chan interface{}) defer close(done) b.ResetTimer() for bt := range take(done,repeat(done,byte(0)),b.N) { writer.Writer([]byte{bt.(byte)}) } } go test -bench=. buffering_test.go
Go语言并发之道--笔记3
猜你喜欢
转载自blog.csdn.net/liao__ran/article/details/112801648
今日推荐
周排行