前言

CelerymachineryCelerymachinerymachinery

抛砖引玉

machinery

个性

machinery
  • 工作重试机制
  • 提早工作反对
  • 工作回调机制
  • 工作后果记录
  • 反对Workflow模式:Chain,Group,Chord
  • 多Brokers反对:Redis, AMQP, AWS SQS
  • 多Backends反对:Redis, Memcache, AMQP, MongoDB

架构

工作队列,简而言之就是一个放大的生产者消费者模型,用户申请会生成工作,工作生产者一直的向队列中插入工作,同时,队列的处理器程序充当消费者一直的生产工作。基于这种框架设计思维,咱们来看下machinery的简略设计结构图例:

  • Sender:业务推送模块,生成具体任务,可依据业务逻辑中,按交互进行拆分;
  • Broker:存储具体序列化后的工作,machinery中目前反对到Redis, AMQP,和SQS;
  • Worker:工作过程,负责消费者性能,解决具体的工作;
  • Backend:后端存储,用于存储工作执行状态的数据;

e.g

学习一门新货色,我都习惯先写一个demo,先学会了走,再学会跑。所以先来看一个例子,性能很简略,异步计算1到10的和。

先看一下配置文件代码:

broker: redis://localhost:6379

default_queue: "asong"

result_backend: redis://localhost:6379

redis:
  max_idle: 3
  max_active: 3
  max_idle_timeout: 240
  wait: true
  read_timeout: 15
  write_timeout: 15
  connect_timeout: 15
  normal_tasks_poll_period: 1000
  delayed_tasks_poll_period: 500
  delayed_tasks_key: "asong"
brokerresult_backend

主代码,完整版github获取:


func main()  {

    cnf,err := config.NewFromYaml("./config.yml",false)
    if err != nil{
        log.Println("config failed",err)
        return
    }

    server,err := machinery.NewServer(cnf)
    if err != nil{
        log.Println("start server failed",err)
        return
    }

    // 注册工作
    err = server.RegisterTask("sum",Sum)
    if err != nil{
        log.Println("reg task failed",err)
        return
    }

    worker := server.NewWorker("asong", 1)
    go func() {
        err = worker.Launch()
        if err != nil {
            log.Println("start worker error",err)
            return
        }
    }()

    //task signature
    signature := &tasks.Signature{
        Name: "sum",
        Args: []tasks.Arg{
            {
                Type:  "[]int64",
                Value: []int64{1,2,3,4,5,6,7,8,9,10},
            },
        },
    }

    asyncResult, err := server.SendTask(signature)
    if err != nil {
        log.Fatal(err)
    }
    res, err := asyncResult.Get(1)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("get res is %v\n", tasks.HumanReadableResults(res))

}

运行后果:

INFO: 2020/10/31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020/10/31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020/10/31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020/10/31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020/10/31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020/10/31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020/10/31 11:32:16 redis.go:342 Received new message: {"UUID":"task_9f01be1f-3237-49f1-8464-eecca2e50597","Name":"sum","RoutingKey":"asong","ETA":null,"GroupUUID":"","GroupTaskCount":0,"Args":[{"Name":"","Type":"[]int64","Value":[1,2,3,4,5,6,7,8,9,10]}],"Headers":{},"Priority":0,"Immutable":false,"RetryCount":0,"RetryTimeout":0,"OnSuccess":null,"OnError":null,"ChordCallback":null,"BrokerMessageGroupId":"","SQSReceiptHandle":"","StopTaskDeletionOnError":false,"IgnoreWhenTaskNotRegistered":false}
DEBUG: 2020/10/31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020/10/31 11:32:16 get res is 55

好啦,当初咱们开始讲一讲下面的代码流程,

brokerresult_backendredisMachineryServerServerMachineryworkdersServerserver.NewWorkerSignatureServerHumanReadableResults

多功能

1. 延时工作

machinerymachineysignature
eta := time.Now().UTC().Add(time.Second * 20)
    signature.ETA = &eta

2. 重试工作

 tsak signatureretryTimeoutRetryCount
//task signature
    signature := &tasks.Signature{
        Name: "sum",
        Args: []tasks.Arg{
            {
                Type:  "[]int64",
                Value: []int64{1,2,3,4,5,6,7,8,9,10},
            },
        },
        RetryTimeout: 100,
        RetryCount: 3,
    }
return.tasks.ErrRetryTaskLater
func Sum(args []int64) (int64, error) {
    sum := int64(0)
    for _, arg := range args {
        sum += arg
    }

    return sum, tasks.NewErrRetryTaskLater("我说他错了", 4 * time.Second)

}

3. 工作流

machinery

3.1 Groups

Group

一起来看一个简略的例子:

    // group
    group,err :=tasks.NewGroup(signature1,signature2,signature3)
    if err != nil{
        log.Println("add group failed",err)
    }

    asyncResults, err :=server.SendGroupWithContext(context.Background(),group,10)
    if err != nil {
        log.Println(err)
    }
    for _, asyncResult := range asyncResults{
        results,err := asyncResult.Get(1)
        if err != nil{
            log.Println(err)
            continue
        }
        log.Printf(
            "%v  %v  %v\n",
            asyncResult.Signature.Args[0].Value,
            tasks.HumanReadableResults(results),
        )
    }
group

3.2 chrods

machineyChordgroups

来看一段代码:

callback := &tasks.Signature{
        Name: "call",
    }



    group, err := tasks.NewGroup(signature1, signature2, signature3)
    if err != nil {

        log.Printf("Error creating group: %s", err.Error())
        return
    }

    chord, err := tasks.NewChord(group, callback)
    if err != nil {
        log.Printf("Error creating chord: %s", err)
        return
    }

    chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
    if err != nil {
        log.Printf("Could not send chord: %s", err.Error())
        return
    }

    results, err := chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
    if err != nil {
        log.Printf("Getting chord result failed with error: %s", err.Error())
        return
    }
    log.Printf("%v\n", tasks.HumanReadableResults(results))

下面的例子并行执行task1、task2、task3,聚合它们的后果并将它们传递给callback工作。

3.3 chains

chainchain

看这样一段代码:

//chain
    chain,err := tasks.NewChain(signature1,signature2,signature3,callback)
    if err != nil {

        log.Printf("Error creating group: %s", err.Error())
        return
    }
    chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
    if err != nil {
        log.Printf("Could not send chain: %s", err.Error())
        return
    }

    results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
    if err != nil {
        log.Printf("Getting chain result failed with error: %s", err.Error())
    }
    log.Printf(" %v\n", tasks.HumanReadableResults(results))
chaincallback

文中代码地址:https://github.com/asong2020/…

总结

machinerymachinerymachiney

获取步骤:关注公众号【Golang梦工厂】,后盾回复:machiney即可获取无水印版~~~

好啦,这一篇文章到这就完结了,咱们下期见~~。心愿对你们有用,又不对的中央欢送指出,可增加我的golang交换群,咱们一起学习交换。

结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,本人也收集了一本PDF,有须要的小伙能够到自行下载。获取形式:关注公众号:[Golang梦工厂],后盾回复:[微服务],即可获取。

我翻译了一份GIN中文文档,会定期进行保护,有须要的小伙伴后盾回复[gin]即可下载。

golangvx

举荐往期文章:

  • 手把手教姐姐写音讯队列
  • 常见面试题之缓存雪崩、缓存穿透、缓存击穿
  • 详解Context包,看这一篇就够了!!!
  • go-ElasticSearch入门看这一篇就够了(一)
  • 面试官:go中for-range应用过吗?这几个问题你能解释一下起因吗
  • 学会wire依赖注入、cron定时工作其实就这么简略!
  • 据说你还不会jwt和swagger-饭我都不吃了带着实际我的项目我就来了
  • 把握这些Go语言个性,你的程度将进步N个品位(二)
  • go实现多人聊天室,在这里你想聊什么都能够的啦!!!
  • grpc实际-学会grpc就是这么简略
  • go规范库rpc实际
  • 2020最新Gin框架中文文档 asong又捡起来了英语,用心翻译
  • 基于gin的几种热加载形式