使用go实现一个可以显示进度条的线程池
一、概述
有些情况下,我们的程序可能同时有好几个goroutine在跑,并且你想要动态地显示每个goroutine的进度条(比如下载一些比较多大的文件的时候),如果你没有这个需求,只想利用多个线程去去执行一些任务,并不想显示进度条,想去设置参数,不让线程池显示
不得不承认,当前线程池还有一写细节地方可以改进,后续将会在gitee仓库(https://gitee.com/he_fu_ren/thread-pool-with-progress-bar
)进行更新,如果有bug欢迎指出,将会即使修正
二、设计思路
线程池角色介绍:
任务池
: 本质上就是存放所有任务的channel
goroutine的进度
: 其实对应的是图中的goroutine_1
、goroutine_2
、goroutine_3
任务
: 对每个线程所做的事的抽象,是goroutine的进度
的一个属性
进度
: 进度是每个任务
的属性
总进度
: 本质上是一个数组,保存着吗每个goroutine的进度
,其实本质上是监控的每个任务
的进度
监听线程
: 位于图中最右侧,可以监听每个goroutine的执行情况并做相应处理(如:打印进度条)
三、源代码
import (
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"sync"
"time"
)
// FixedThreadPool 线程池
type FixedThreadPool struct {
*TotalProgress // 总体进程
Tasks []*Task // 所有任务
GoroutineNum int // 线程数
IsVisible bool // 是否显示进度条
}
// NewFixedThreadPool 创建线程池
func NewFixedThreadPool(tasks []*Task, goroutineNum int, isVisible bool) *FixedThreadPool {
return &FixedThreadPool{
Tasks: tasks,
TotalProgress: NewTotalProgress(len(tasks), goroutineNum),
GoroutineNum: goroutineNum,
IsVisible: isVisible,
}
}
// TotalProgress 总进度
type TotalProgress struct {
FinishNum int // 完成数
TaskNum int // 总任务数
AllProgress []*GoroutineProgress // 所有goroutine的进度
}
// NewTotalProgress 初始化总进度
func NewTotalProgress(taskNum, goroutineNum int) *TotalProgress {
return &TotalProgress{
FinishNum: 0,
TaskNum: taskNum,
AllProgress: make([]*GoroutineProgress, goroutineNum),
}
}
// GoroutineProgress 每个goroutine的进度
type GoroutineProgress struct {
GoroutineId int // goroutine的ID
*Task // 正在执行的任务
isIdle bool // 当前线程是否空闲
}
// Exec 执行任务
func (goProgress *GoroutineProgress) Exec() {
goProgress.isIdle = false
goProgress.Run(goProgress.TaskProgress) // 执行任务
goProgress.isIdle = true
}
// Task 每个goroutine所执行的任务
type Task struct {
TaskProgress *TaskProgress // 任务的进度取值范围[1-100]
Run func(*TaskProgress) // 执行
}
func (t Task) GetProgress() int {
return t.TaskProgress.CurrentProgress
}
func NewTask(f func(*TaskProgress)) *Task {
return &Task{
TaskProgress: &TaskProgress{
CurrentProgress: 0},
Run: f,
}
}
// TaskProgress 任务的进度
type TaskProgress struct {
CurrentProgress int // 任务进度单位%
}
// Exec 线程池开始执行任务
func (pool *FixedThreadPool) Exec() {
// 将所有任务放入到channel中
taskChannel := make(chan *Task, pool.TaskNum)
for _, task := range pool.Tasks {
taskChannel <- task
}
// 同步主线程与创建的goroutine
wg := &sync.WaitGroup{
}
wg.Add(pool.TaskNum)
// 通知线程关闭的channel,+1是为了关闭打印进度的线程
done := make(chan struct{
}, pool.GoroutineNum+1)
// 开启goroutine线程
for i := 0; i < pool.GoroutineNum; i++ {
// 监听每个线程的进度
progress := &GoroutineProgress{
GoroutineId: i,
}
pool.TotalProgress.AllProgress[i] = progress
// 开启线程
go func(done chan struct{
}, wg *sync.WaitGroup, progress *GoroutineProgress, totalProgress *TotalProgress) {
Label:
for {
select {
case progress.Task = <-taskChannel:
progress.Exec()
wg.Done()
totalProgress.FinishNum++
case <-done:
// 结束
break Label
}
}
}(done, wg, progress, pool.TotalProgress)
}
if pool.IsVisible {
// 单独开启一个监听正在执行的任务的进度的goroutine
go func(done chan struct{
}, totalProgress *TotalProgress) {
Label:
for {
select {
case <-done:
break Label
default:
// 输出任务的进度
fmt.Printf("当前进度: %d/%d\n", totalProgress.FinishNum, totalProgress.TaskNum)
for _, progress := range totalProgress.AllProgress {
if !progress.isIdle && progress.Task != nil {
fmt.Printf("任务进度: [%s%s] %d%%\n", strings.Repeat("=", progress.GetProgress()), strings.Repeat(" ", 100-progress.GetProgress()), progress.GetProgress())
}
}
// 间隔1s
time.Sleep(time.Second)
// 清屏
fmt.Println("\033c")
}
}
}(done, pool.TotalProgress)
}
// 等待所有任务执行结束
wg.Wait()
for i := 0; i <= pool.GoroutineNum; i++ {
done <- struct{
}{
}
}
log.Println("所有任务执行完毕")
}
四、使用示例
显示进度条
代码
任务的本质其实就是一个func,参数是任务进度的指针,由程序猿自己根据业务决定进度的值
执行效果
1、
2、
3、
4、
不显示进度条
其实和和上面的唯一差别是在创建线程池的时候,其余的都一样
fixedThreadPool := NewFixedThreadPool(tasks, 2, false) // 最后一个参数使用false,表示不打印进度