前言
CelerymachineryCelerymachinery抛砖引玉
machinery特性
machinery- 任务重试机制
- 延迟任务支持
- 任务回调机制
- 任务结果记录
- 支持Workflow模式:Chain,Group,Chord
- 多Brokers支持:Redis, AMQP, AWS SQS
- 多Backends支持:Redis, Memcache, AMQP, MongoDB
架构
任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,我们来看下machinery的简单设计结构图例:
- Sender:业务推送模块,生成具体任务,可根据业务逻辑中,按交互进行拆分;
- Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS;
- Worker:工作进程,负责消费者功能,处理具体的任务;
- Backend:后端存储,用于存储任务执行状态的数据;
e.g
学习一门新东西,我都习惯先写一个demo,先学会了走,再学会跑。所以先来看一个例子,功能很简单,异步计算1到10的和。
先看一下配置文件代码:
broker: redis://localhost:6379
default_queue: "asong"
result_backend: redis://localhost:6379
redis:
max_idle: 3
max_active: 3
max_idle_timeout: 240
wait: true
read_timeout: 15
write_timeout: 15
connect_timeout: 15
normal_tasks_poll_period: 1000
delayed_tasks_poll_period: 500
delayed_tasks_key: "asong"
brokerresult_backend主代码,完整版github获取:
func main() {
cnf,err := config.NewFromYaml("./config.yml",false)
if err != nil{
log.Println("config failed",err)
return
}
server,err := machinery.NewServer(cnf)
if err != nil{
log.Println("start server failed",err)
return
}
// 注册任务
err = server.RegisterTask("sum",Sum)
if err != nil{
log.Println("reg task failed",err)
return
}
worker := server.NewWorker("asong", 1)
go func() {
err = worker.Launch()
if err != nil {
log.Println("start worker error",err)
return
}
}()
//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
}
asyncResult, err := server.SendTask(signature)
if err != nil {
log.Fatal(err)
}
res, err := asyncResult.Get(1)
if err != nil {
log.Fatal(err)
}
log.Printf("get res is %v\n", tasks.HumanReadableResults(res))
}
运行结果:
INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55
好啦,现在我们开始讲一讲上面的代码流程,
brokerresult_backendredisMachineryServerServerMachineryworkdersServerserver.NewWorkerSignatureServerHumanReadableResults多功能
1. 延时任务
machinerymachineysignatureeta := time.Now().UTC().Add(time.Second * 20)
signature.ETA = &eta
2. 重试任务
tsak signatureretryTimeoutRetryCount//task signature
signature := &tasks.Signature{
Name: "sum",
Args: []tasks.Arg{
{
Type: "[]int64",
Value: []int64{1,2,3,4,5,6,7,8,9,10},
},
},
RetryTimeout: 100,
RetryCount: 3,
}
return.tasks.ErrRetryTaskLaterfunc Sum(args []int64) (int64, error) {
sum := int64(0)
for _, arg := range args {
sum += arg
}
return sum, tasks.NewErrRetryTaskLater("我说他错了", 4 * time.Second)
}
3. 工作流
machinery3.1 Groups
Group一起来看一个简单的例子:
// group
group,err :=tasks.NewGroup(signature1,signature2,signature3)
if err != nil{
log.Println("add group failed",err)
}
asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
if err != nil {
log.Println(err)
}
for _, asyncResult := range asyncResults{
results,err := asyncResult.Get(1)
if err != nil{
log.Println(err)
continue
}
log.Printf(
"%v %v %v\n",
asyncResult.Signature.Args[0].Value,
tasks.HumanReadableResults(results),
)
}
group3.2 chords
machineyChordgroups来看一段代码:
callback := &tasks.Signature{
Name: "call",
}
group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chord, err := tasks.NewChord(group, callback)
if err != nil {
log.Printf("Error creating chord: %s", err)
return
}
chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
if err != nil {
log.Printf("Could not send chord: %s", err.Error())
return
}
results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chord result failed with error: %s", err.Error())
return
}
log.Printf("%v\n", tasks.HumanReadableResults(results))
上面的例子并行执行task1、task2、task3,聚合它们的结果并将它们传递给callback任务。
3.3 chains
chainchain看这样一段代码:
//chain
chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
if err != nil {
log.Printf("Error creating group: %s", err.Error())
return
}
chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
if err != nil {
log.Printf("Could not send chain: %s", err.Error())
return
}
results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.Printf("Getting chain result failed with error: %s", err.Error())
}
log.Printf(" %v\n", tasks.HumanReadableResults(results))
chaincallback文中代码地址:https://github.com/asong2020/Golang_Dream/tree/master/machinery/example
总结
machinerymachinerymachiney