异步任务
组成
- producter:负责任务产生;
- broker:负责任务传递;(暂时保存产生的任务以便于消费)
- consumer:负责任务消费;
- result backend:非必需,用来保存返回结果。
流程
productor
↓↓
broker
↓↑可选:消费后向broker确认已经消费,然后broker删除此任务,否则将超时重发任务
consumer
↓↓
result backend
异步框架
machinery
intsall
go get -u github.com/RichardKnop/machinery/v1
message
// Signature represents a single task invocation
type Signature struct {
UUID string
Name string
RoutingKey string
ETA *time.Time
GroupUUID string
GroupTaskCount int
Args []Arg
Headers Headers
Immutable bool
RetryCount int
RetryTimeout int
OnSuccess []*Signature
OnError []*Signature
ChordCallback *Signature
}
任务生产
// 任务定义
signature := tasks.NewSignature
// 发布
machineryServer.SendTask
异步任务
// 函数的最后一个参数必需是 error。然后这样注册任务。
func Add(args ...int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, nil
}
// register
server.RegisterTasks(map[string]interface{
}{
"add": Add,
})
worker := machineryServer.NewWorker("send_sms", 10)
worker.Launch() // 监听broker
Main
func main() {
// parse cmd args
flag.Parse()
// init config
initConfig()
// init machinery worker
initMachinery()
// register tasks
machineryServer.RegisterTask("sendSMS", sendSMS)
if *worker {
startWorker()
} else {
startWebServer()
}
}
Ytask
特性
- 支持几乎所有类型,包括基本类型( int, floalt, string…),数组切片,结构体以及复杂的结构体嵌套。
- 注册任务,调用任务一行代码完成,不需要对参数进行而外处理。
- 优雅的启动与结束方式,能 1 秒结束任务(如果你用过其他的框架(比如 gocelery,machinery )会发现就算没有任务,他们也没法立即结束任务,而是需要等几秒)
install
go get github.com/gojuukaze/YTask/v2
Task Register
type User struct{
...
}
// 任务函数
func DemoFunc(a int, b float64, c []string, user User) (int, []User, string) {
....
return ....
}
...
ser.Add("group1", "demo_func", DemoFunc)
Task Scheduling
taskId, _ = client.Send("group1", "demo_func", 11, 22.2, []string{
"bb", "cc"}, User{
"hh",24})
Result Getting
result, _ = client.GetResult(taskId, 2*time.Second, 300*time.Millisecond)
var a int
var b []User
var c string
a, _ = result.GetInt64(0)
// or
result.Get(1,&b)
// or
result.Gets(&a, &b, &c)
NSQ —— GO的消息队列
“nsq 是消息队列,递的是消息,任务队列框架负责任务队列,传递的是任务。
比较典型的应用场景有:
- 消息队列——可以用来消费日志,把多台服务器的日志合并。但任务队列却不适合这种场景。
- 任务队列——可以用来处理异步的计算任务,比如用户购买了一件物品,需要计算用户的积分等,更新数据库,这时用任务队列比较合适。
消息队列侧重消息的吞吐,处理。任务队列,侧重任务执行,重试,结果返回。
消息队列可以代替任务队列,但还需要进行而外开发,在任务的执行上,提供的功能也不如任务队列。
任务队列更像远程函数调用,不过它和 thrift, grpc 也有不同,它不需要定义描述文件,也不是直接请求,而是借助消息队列传递任务信息”