本文已参与「新人创作礼」活动,一起开启掘金创作之路。
引子
因为goroutine,go的并发非常方便,实现方式也很简单粗暴go func()
就搞定了。
在上篇文章# 手把手教你写一个golang协程池中已经类比了java的Runnable和goroutine,一样无参数,无返回,无异常。
这样的三无产品当我们进行一个耗时的异步操作时,就会带来巨大的困惑,运行完了没啊?有没有异常,有没有超时。当然你可以用chan来同步状态,但是有没有更简单的方法呢?
context.Context
google于1.7版本将context加入标准库,按照官方文档的说法,它是一个请求的全局上下文,携带了截止时间、手动取消等信号,并包含一个并发安全的map用于携带数据。这个是不是有java Callable的味道了? 首先来看一下Context的定义
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
复制代码
- Deadline返回绑定当前
context
的任务被取消的截止时间;如果没有设定期限,将返回ok == false
。 - Done 当绑定当前
context
的任务被取消时,将返回一个关闭的channel
;如果当前context
不会被取消,将返回nil
。 - Err 如果
Done
返回的channel
没有关闭,将返回nil
;如果Done
返回的channel
已经关闭,将返回非空的值表示任务结束的原因。如果是context
被取消,Err
将返回Canceled
;如果是context
超时,Err
将返回DeadlineExceeded
。 - Value 返回
context
存储的键值对中当前key
对应的值,如果没有对应的key
,则返回nil
(这个是万精油,那都能用)。
咋用呢?
上一章的协程池我们就拿来复用一下吧:
type GorunTask struct {
Id int64
Name string
Status string
Ctx context.Context //在这里增加Context
Run func() error
Callback func(task *GorunTask)
Err error
}
复制代码
我们在任务中增加上下文,在执行方法里面判断执行状态:
func (pool *GorunPool) call(task *GorunTask) {
pool.Ticket++
go func() {
go func() {
task.Status = "running"
task.Err = task.Run()
task.Status = "exected!" //正常执行退出
pool.ResChan <- task
}()
if task.Ctx != nil {
for {
select {
case <-task.Ctx.Done(): //ctx调用cancel退出,被外部强制退出
task.Status = "exit!"
task.Err = errors.New("timeout!")
pool.ResChan <- task
return
default:
time.Sleep(10 * time.Millisecond)
}
}
}
}()
}
复制代码
测试验证
TestCase代码
func TestContext(t *testing.T) {
pool := NewGorunPool(5)
job := func() error {
time.Sleep(10 * time.Second) //执行方法等10秒,确保超时
t.Error("do thread!")
return nil
}
callback := func(task *GorunTask) { //回调方法
t.Error(task.Name, "do callback!", task.Err, task.Status)
}
ctx1, cancel1 := context.WithCancel(context.TODO())
task1 := NewGorunTaskWithCtx(ctx1, job, callback) //创建第一个任务
task1.Name = "T1" //任务别名
pool.Execute(task1)
defer cancel1()
ctx2, cancel2 := context.WithDeadline(context.TODO(), time.Now().Add(3*time.Second))
task2 := NewGorunTaskWithCtx(ctx2, job, callback) //创建第二个任务
task2.Name = "T2"
pool.Execute(task2)
defer cancel2()
<-ctx2.Done() //在这里等待第二个任务超时(3秒超时的任务)
t.Error("ticker:", "task2 exit", ctx2.Err())
timer := time.NewTicker(5 * time.Second)
c := 0
t.Error("start ticker:", c)
for {
select {
case <-ctx1.Done():
c++
t.Error("ticker:", c, "task1 exit")
break
case <-timer.C:
t.Error("ticker:", c)
cancel1() //关闭第一个任务
}
if c > 0 {
break
}
}
time.Sleep(1 * time.Second)
}
复制代码
执行结果
很好,首先是任务2由于dealine超时退出,调用任务2回调函数,接着是定时器关闭任务1,任务1的回调方法执行
context.WithDeadline是如何自动关闭任务的呢? 让我们来看看WithDeadline的执行逻辑
WithDeadline
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 建立新建context与可取消context祖先节点的取消关联关系
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
// 设置取消原因
c.err = err
设置一个关闭的channel或者将done channel关闭,用以发送关闭信号
if c.done == nil {
c.done = closedchan
} else {
close(c.done) //这里关闭了chan信号,协程退出
}
// 将子节点context依次取消
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
// 将当前context节点从父节点上移除
removeChild(c.Context, c)
}
}
复制代码
其实也就是写了个定时器调用cancel方法,是不是有get了新技能