概述
本文讲述了流水线作业的另一种实现方式。这种方式完全是基于Go语言本身的特性来实现。
实现说明
pipeline的实现方式其实可以有多种,在C系统编程中是通过管道(pipe)来实现,但由于进程是很重量级的实体,在创建(fork)时会非常消耗系统资源,所以在linux系统编程中一般会启动固定数量的进程/线程,每个进程完成固定的工作,并通过管道进行连接,形成流水线作业(pipeline)。
这种方式需要进程是常驻的,而且每次每个处理单元只能处理一个数据单元。关于这种模型的Go语言实现,请看我写的这篇文章。
为了能同时处理多个数据,可以充分利用Go语言的特性来实现一个处理单元不是固定的流水线作业模型,处理单元是随用随起。
代码实现
实现要点
流水线上的每个处理节点,都会使用一个done的channel来通知pipeline上的各个协程,一旦收到该done channel的数据,流水线上的每个处理(协程)单元将会退出。
每个处理单元都返回一个可以读取本次处理结果数据的channel,并让下一个处理单元读取该channel的数据,这样把整个处理过程组成一个流水线式的pipeline。
定义pipeline的起点函数
该函数是pipeline的起点函数,该函数用来产生数据,并把该数据放入到pipeline中的第一个处理单元。数据会穿过一个一个的处理单元,完成pipeline的整个处理流程。
其中done这个channel是主协程用来通知pipeline上的各个处理单元数据处理完成的channel,主协程退出,会通过done的channel来通知pipeline上的各个处理单元协程,各个处理单元协程得到通知后,会自动退出。
// 该函数是pipeline的产生数据的函数
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
该代码的integers是一个整数列表,该整数列表是传入pipeline的。这里是一个测试,也可以自己定义传入pipeline的数据,或通过某种方式和其他模块进行数据对接而传入数据。
定义处理单元:把传入数据的每个数乘以一个整数
// pipeline中的一个处理节点:把各个数据相乘
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int,) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i*multiplier:
}
}
}()
return multipliedStream
}
这里的intStream是数据输入的channel,multiplier参数是每个输入channel需要乘以的整数值。这样,当数据输出时,所有的数据的值都会乘以multiplier这个数。然后把结果输出到一个新的channel中,最后返回该channel的地址。
定义处理单元:把接入的每个数都加上一个数
以下函数定义了一个pipeline的处理单元,其中的intStream是接入数据的channel,最后输出数据的channel会被返回。
// 为intStream接入的数据都加上一个数additive
add := func(done <-chan interface{}, intStream <-chan int, additive int, ) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i+additive:
}
}
}()
return addedStream
}
主流程
// 创建一个通知管道,主协程来通知各个子协程:结束任务。避免goroutine leak
done := make(chan interface{})
// 主协程退出时通过channel: done发出通知
defer close(done)
// 定义pipeline的起点
intStream := generator(done, 1, 2, 3, 4)
// 定制pipeline
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
// 输出pipeline的最后计算结果
for v := range pipeline {
fmt.Println(v)
}
注意:这里定制的pipeline,实现的处理流程如下:
(1) value * 2 => value1
(2) value1 + 1 => value2
(3) value2 * 2
每个值都完成以上三个操作,然后输出,其pipeline如下:
value ==>[*2] ==>[+1]==>[*2] ==>output
总结
通过channel很容易实现流水线作业(pipeline),流水线作业可以把大的任务分解成小的子任务,让各个子任务专注于解决自己的问题。这样逻辑更加清楚,更容易实现。
以上,是一种实现流水线的编程模型,在实际开发过程中,可以借鉴,修改后即可纳为己用。