Go并发编程—pipeline模型的实现

概述

本文讲述了流水线作业的另一种实现方式。这种方式完全是基于Go语言本身的特性来实现。

实现说明

pipeline的实现方式其实可以有多种,在C系统编程中是通过管道(pipe)来实现,但由于进程是很重量级的实体,在创建(fork)时会非常消耗系统资源,所以在linux系统编程中一般会启动固定数量的进程/线程,每个进程完成固定的工作,并通过管道进行连接,形成流水线作业(pipeline)。
这种方式需要进程是常驻的,而且每次每个处理单元只能处理一个数据单元。关于这种模型的Go语言实现,请看我写的这篇文章

为了能同时处理多个数据,可以充分利用Go语言的特性来实现一个处理单元不是固定的流水线作业模型,处理单元是随用随起。

代码实现

实现要点

流水线上的每个处理节点,都会使用一个done的channel来通知pipeline上的各个协程,一旦收到该done channel的数据,流水线上的每个处理(协程)单元将会退出。
每个处理单元都返回一个可以读取本次处理结果数据的channel,并让下一个处理单元读取该channel的数据,这样把整个处理过程组成一个流水线式的pipeline。

定义pipeline的起点函数

该函数是pipeline的起点函数,该函数用来产生数据,并把该数据放入到pipeline中的第一个处理单元。数据会穿过一个一个的处理单元,完成pipeline的整个处理流程。

其中done这个channel是主协程用来通知pipeline上的各个处理单元数据处理完成的channel,主协程退出,会通过done的channel来通知pipeline上的各个处理单元协程,各个处理单元协程得到通知后,会自动退出。

// 该函数是pipeline的产生数据的函数
    generator := func(done <-chan interface{}, integers ...int) <-chan int {
        intStream := make(chan int)

        go func() {
            defer close(intStream)
            for _, i := range integers {
                select {
                case <-done:
                    return
                case intStream <- i:
                }
            }
        }()
        return intStream
    }

该代码的integers是一个整数列表,该整数列表是传入pipeline的。这里是一个测试,也可以自己定义传入pipeline的数据,或通过某种方式和其他模块进行数据对接而传入数据。

定义处理单元:把传入数据的每个数乘以一个整数

    // pipeline中的一个处理节点:把各个数据相乘
     multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int,) <-chan int {
         multipliedStream := make(chan int)

         go func() {
             defer close(multipliedStream)

             for i := range intStream {
                 select {
                 case <-done:
                     return
                 case multipliedStream <- i*multiplier:
                 }
             }
         }()
         return multipliedStream
     }

这里的intStream是数据输入的channel,multiplier参数是每个输入channel需要乘以的整数值。这样,当数据输出时,所有的数据的值都会乘以multiplier这个数。然后把结果输出到一个新的channel中,最后返回该channel的地址。

定义处理单元:把接入的每个数都加上一个数

以下函数定义了一个pipeline的处理单元,其中的intStream是接入数据的channel,最后输出数据的channel会被返回。

    // 为intStream接入的数据都加上一个数additive
    add := func(done <-chan interface{}, intStream <-chan int, additive int, ) <-chan int {
         addedStream := make(chan int)
         go func() {
             defer close(addedStream)
             for i := range intStream {
                 select {
                 case <-done:
                     return
                 case addedStream <- i+additive:
                 }
             }
         }()
         return addedStream
    }

主流程

     // 创建一个通知管道,主协程来通知各个子协程:结束任务。避免goroutine leak
     done := make(chan interface{})
     // 主协程退出时通过channel: done发出通知
     defer close(done)

     // 定义pipeline的起点
     intStream := generator(done, 1, 2, 3, 4)
     // 定制pipeline
     pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
     // 输出pipeline的最后计算结果
     for v := range pipeline {
         fmt.Println(v)
     }

注意:这里定制的pipeline,实现的处理流程如下:
(1) value * 2 => value1
(2) value1 + 1 => value2
(3) value2 * 2
每个值都完成以上三个操作,然后输出,其pipeline如下:

value ==>[*2] ==>[+1]==>[*2] ==>output

总结

通过channel很容易实现流水线作业(pipeline),流水线作业可以把大的任务分解成小的子任务,让各个子任务专注于解决自己的问题。这样逻辑更加清楚,更容易实现。

以上,是一种实现流水线的编程模型,在实际开发过程中,可以借鉴,修改后即可纳为己用。

猜你喜欢

转载自blog.csdn.net/zg_hover/article/details/81509145
今日推荐