使用go实现一个可以显示进度条的线程池

使用go实现一个可以显示进度条的线程池

一、概述

​ 有些情况下,我们的程序可能同时有好几个goroutine在跑,并且你想要动态地显示每个goroutine的进度条(比如下载一些比较多大的文件的时候),如果你没有这个需求,只想利用多个线程去去执行一些任务,并不想显示进度条,想去设置参数,不让线程池显示
不得不承认,当前线程池还有一写细节地方可以改进,后续将会在gitee仓库(https://gitee.com/he_fu_ren/thread-pool-with-progress-bar)进行更新,如果有bug欢迎指出,将会即使修正

二、设计思路

线程池角色介绍:

任务池: 本质上就是存放所有任务的channel

goroutine的进度: 其实对应的是图中的goroutine_1goroutine_2goroutine_3

任务: 对每个线程所做的事的抽象,是goroutine的进度的一个属性

进度: 进度是每个任务的属性

总进度: 本质上是一个数组,保存着吗每个goroutine的进度,其实本质上是监控的每个任务进度

监听线程: 位于图中最右侧,可以监听每个goroutine的执行情况并做相应处理(如:打印进度条)

请添加图片描述

三、源代码

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"
)

// FixedThreadPool 线程池
type FixedThreadPool struct {
    
    
	*TotalProgress         // 总体进程
	Tasks          []*Task // 所有任务
	GoroutineNum   int     // 线程数
	IsVisible      bool    // 是否显示进度条
}

// NewFixedThreadPool 创建线程池
func NewFixedThreadPool(tasks []*Task, goroutineNum int, isVisible bool) *FixedThreadPool {
    
    
	return &FixedThreadPool{
    
    
		Tasks:         tasks,
		TotalProgress: NewTotalProgress(len(tasks), goroutineNum),
		GoroutineNum:  goroutineNum,
		IsVisible:     isVisible,
	}
}

// TotalProgress 总进度
type TotalProgress struct {
    
    
	FinishNum   int                  // 完成数
	TaskNum     int                  // 总任务数
	AllProgress []*GoroutineProgress // 所有goroutine的进度
}

// NewTotalProgress 初始化总进度
func NewTotalProgress(taskNum, goroutineNum int) *TotalProgress {
    
    
	return &TotalProgress{
    
    
		FinishNum:   0,
		TaskNum:     taskNum,
		AllProgress: make([]*GoroutineProgress, goroutineNum),
	}
}

// GoroutineProgress 每个goroutine的进度
type GoroutineProgress struct {
    
    
	GoroutineId int  // goroutine的ID
	*Task            // 正在执行的任务
	isIdle      bool // 当前线程是否空闲
}

// Exec 执行任务
func (goProgress *GoroutineProgress) Exec() {
    
    
	goProgress.isIdle = false
	goProgress.Run(goProgress.TaskProgress) // 执行任务
	goProgress.isIdle = true
}

// Task 每个goroutine所执行的任务
type Task struct {
    
    
	TaskProgress *TaskProgress		 // 任务的进度取值范围[1-100]
	Run          func(*TaskProgress) // 执行
}

func (t Task) GetProgress() int {
    
    
	return t.TaskProgress.CurrentProgress
}

func NewTask(f func(*TaskProgress)) *Task {
    
    
	return &Task{
    
    
		TaskProgress: &TaskProgress{
    
    CurrentProgress: 0},
		Run:          f,
	}
}

// TaskProgress 任务的进度
type TaskProgress struct {
    
    
	CurrentProgress int // 任务进度单位%
}

// Exec 线程池开始执行任务
func (pool *FixedThreadPool) Exec() {
    
    
	// 将所有任务放入到channel中
	taskChannel := make(chan *Task, pool.TaskNum)
	for _, task := range pool.Tasks {
    
    
		taskChannel <- task
	}
	// 同步主线程与创建的goroutine
	wg := &sync.WaitGroup{
    
    }
	wg.Add(pool.TaskNum)
	// 通知线程关闭的channel,+1是为了关闭打印进度的线程
	done := make(chan struct{
    
    }, pool.GoroutineNum+1)

	// 开启goroutine线程
	for i := 0; i < pool.GoroutineNum; i++ {
    
    
		// 监听每个线程的进度
		progress := &GoroutineProgress{
    
    
			GoroutineId: i,
		}
		pool.TotalProgress.AllProgress[i] = progress

		// 开启线程
		go func(done chan struct{
    
    }, wg *sync.WaitGroup, progress *GoroutineProgress, totalProgress *TotalProgress) {
    
    
		Label:
			for {
    
    
				select {
    
    
				case progress.Task = <-taskChannel:
					progress.Exec()
					wg.Done()
					totalProgress.FinishNum++
				case <-done:
					// 结束
					break Label
				}
			}
		}(done, wg, progress, pool.TotalProgress)
	}

	if pool.IsVisible {
    
    
		// 单独开启一个监听正在执行的任务的进度的goroutine
		go func(done chan struct{
    
    }, totalProgress *TotalProgress) {
    
    
		Label:
			for {
    
    
				select {
    
    
				case <-done:
					break Label
				default:
					// 输出任务的进度
					fmt.Printf("当前进度: %d/%d\n", totalProgress.FinishNum, totalProgress.TaskNum)
					for _, progress := range totalProgress.AllProgress {
    
    
						if !progress.isIdle && progress.Task != nil {
    
    
							fmt.Printf("任务进度: [%s%s] %d%%\n", strings.Repeat("=", progress.GetProgress()), strings.Repeat(" ", 100-progress.GetProgress()), progress.GetProgress())
						}
					}
					// 间隔1s
					time.Sleep(time.Second)
					// 清屏
					fmt.Println("\033c")
				}
			}
		}(done, pool.TotalProgress)
	}

	// 等待所有任务执行结束
	wg.Wait()
	for i := 0; i <= pool.GoroutineNum; i++ {
    
    
		done <- struct{
    
    }{
    
    }
	}
	log.Println("所有任务执行完毕")
}

四、使用示例

显示进度条

代码

任务的本质其实就是一个func,参数是任务进度的指针,由程序猿自己根据业务决定进度的值
请添加图片描述

执行效果

1、
请添加图片描述

2、
请添加图片描述

3、
请添加图片描述

4、
请添加图片描述

不显示进度条

其实和和上面的唯一差别是在创建线程池的时候,其余的都一样

fixedThreadPool := NewFixedThreadPool(tasks, 2, false)	// 最后一个参数使用false,表示不打印进度

猜你喜欢

转载自blog.csdn.net/weixin_44829930/article/details/123810107
今日推荐