Golang微服务框架Kratos实现分布式任务队列Asynq

任务队列(Task Queue) 一般用于跨线程或跨计算机分配工作的一种机制。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。

任务(Task)
Celery

什么是任务队列

消息队列(Message Queue),一般来说知道的人不少。比如常见的:kafka、Rabbitmq、RocketMQ等。

任务队列(Task Queue),听说过这个概念的人不会太多,清楚它的概念的人怕是更少。

这两个概念是有关系的,他们是怎样的关系呢?任务队列(Task Queue)是消息队列(Message Queue)的超集。任务队列是构建在消息队列之上的。消息队列是任务队列的一部分。

Python
Celery异步任务(Async Task)BrokerCelery BeatTaskBrokerCelery WorkerTaskBackend
BrokerBackendCeleryRabbitMQBrokerRedisBackendProducerConsumer“消息”

综上所述,Celery 作为任务队列是基于消息队列的进一步封装,其实现依赖消息队列。

任务队列的应用场景

我们现在知道了任务队列是什么,也知道了它的工作原理。但是,我们并不知道它可以用来做什么。下面,我们就来看看,它到底用在什么样的场景下。

  • 分布式任务:可以将任务分发到多个工作者进程或机器上执行,以提高任务处理速度。
  • 定时任务:可以在指定时间执行任务。例如:每天定时备份数据、日志归档、心跳测试、运维巡检。支持 crontab 定时模式
  • 后台任务:可以在后台执行耗时任务,例如图像处理、数据分析等,不影响用户界面的响应。
  • 解耦任务:可以将任务与主程序解耦,以提高代码的可读性和可维护性,解耦应用程序最直接的好处就是可扩展性和并发性能的提高。支持并发执行任务,同时支持自动动态扩展。
  • 实时处理:可以支持实时处理任务,例如即时通讯、消息队列等。
  • Asynq概述

    Asynq是一个使用Go语言实现的分布式任务队列和异步处理库,它由Redis提供支持,它提供了轻量级的、易于使用的API,并且具有高可扩展性和高可定制化性。其作者Ken Hibino,任职于Google。

    Asynq主要由以下几个组件组成:

    • 任务(Task):需要被异步执行的操作;
    • 处理器(Processor):负责执行任务的工作进程;
    • 队列(Queue):存放待执行任务的队列;
    • 调度器(Scheduler):根据规则将任务分配给不同的处理器进行执行。

    通过使用Asynq,我们可以非常轻松的实现异步任务处理,同时还可以提供高效率、高可扩展性和高自定义性的处理方案。

    Asynq的特点

    • 保证至少执行一次任务
    • 任务写入Redis后可以持久化
    • 任务失败之后,会自动重试
    • worker崩溃自动恢复
    • 可是实现任务的优先级
    • 任务可以进行编排
    • 任务可以设定执行时间或者最长可执行的时间
    • 支持中间件
    • 可以使用 unique-option 来避免任务重复执行,实现唯一性
    • 支持 Redis Cluster 和 Redis Sentinels 以达成高可用性
    • 作者提供了Web UI & CLI Tool让大家查看任务的执行情况

    Asynq可视化监控

    Asynq提供了两种监控手段:CLI和Web UI。

    命令行工具CLI

    go install github.com/hibiken/asynq/tools/asynq@latest
    

    Web UI

    Asynqmon是一个基于Web的工具,用于监视管理Asynq的任务和队列,有关详细的信息可以参阅工具的README。

    Web UI我们可以通过Docker的方式来进行安装:

    docker pull hibiken/asynqmon:latest
    
    docker run -d 
        --name asynq 
        -p 8080:8080 
        hibiken/asynqmon:latest --redis-addr=host.docker.internal:6379
    

    安装好Web UI之后,我们就可以打开浏览器访问管理后台了:http://localhost:8080

    • 仪表盘
    • 任务视图
    • 性能

    Kratos下实现分布式任务队列

    transport.ServerKratos

    目前,go里面有两个分布式任务队列可用:

    • Asynq
    • Machinery

    我已经对这两个库进行了支持:

    • kratos-transport/Asynq
    • kratos-transport/Machinery

    创建Kratos服务端

    因为它依赖Redis,因此,我们可以使用Docker的方式安装Redis的服务器:

    docker pull bitnami/redis:latest
    
    docker run -itd 
        --name redis-test 
        -p 6379:6379 
        -e ALLOW_EMPTY_PASSWORD=yes 
        bitnami/redis:latest
    

    然后,我们需要在项目中安装Asynq的依赖库:

    go get -u github.com/tx7do/kratos-transport/transport/asynq
    
    Server
    import github.com/tx7do/kratos-transport/transport/asynq
    
    const (
    	localRedisAddr = "127.0.0.1:6379"
    )
    
    ctx := context.Background()
    
    srv := asynq.NewServer(
        asynq.WithAddress(localRedisAddr),
    )
    
    if err := srv.Start(ctx); err != nil {
        panic(err)
    }
    
    defer srv.Stop(ctx)
    

    注册任务回调

    const (
    	testTask1        = "test_task_1"
    	testDelayTask    = "test_delay_task"
    	testPeriodicTask = "test_periodic_task"
    )
    
    type DelayTask struct {
    	Message string `json:"message"`
    }
    
    func DelayTaskBinder() Any { return &DelayTask{} }
    
    func handleTask1(taskType string, taskData *DelayTask) error {
    	LogInfof("Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
    	return nil
    }
    
    func handleDelayTask(taskType string, taskData *DelayTask) error {
    	LogInfof("Delay Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
    	return nil
    }
    
    func handlePeriodicTask(taskType string, taskData *DelayTask) error {
    	LogInfof("Periodic Task Type: [%s], Payload: [%s]", taskType, taskData.Message)
    	return nil
    }
    
    var err error
    
    err = srv.RegisterSubscriber(testTask1,
        func(taskType string, payload MessagePayload) error {
            switch t := payload.(type) {
            case *DelayTask:
                return handleTask1(taskType, t)
            default:
                LogError("invalid payload struct type:", t)
                return errors.New("invalid payload struct type")
            }
        },
        DelayTaskBinder,
    )
    
    err = srv.RegisterSubscriber(testDelayTask,
        func(taskType string, payload MessagePayload) error {
            switch t := payload.(type) {
            case *DelayTask:
                return handleDelayTask(taskType, t)
            default:
                LogError("invalid payload struct type:", t)
                return errors.New("invalid payload struct type")
            }
        },
        DelayTaskBinder,
    )
    
    err = srv.RegisterSubscriber(testPeriodicTask,
        func(taskType string, payload MessagePayload) error {
            switch t := payload.(type) {
            case *DelayTask:
                return handlePeriodicTask(taskType, t)
            default:
                LogError("invalid payload struct type:", t)
                return errors.New("invalid payload struct type")
            }
        },
        DelayTaskBinder,
    )
    
    asynq.Server

    创建新任务

    NewTaskNewPeriodicTaskasynq.Clientasynq.Scheduler
    NewTaskasynq.Client

    普通任务

    普通任务通常是入列后立即执行的(如果不需要排队的),下面就是最简单的任务,一个类型(Type),一个负载数据(Payload)就构成了一个最简单的任务:

    err = srv.NewTask(testTask1, 
        &DelayTask{Message: "delay task"},
    )
    

    当然,你也可以添加一些的参数,比如重试次数、超时时间、过期时间等……

    // 最多重试3次,10秒超时,20秒后过期
    err = srv.NewTask(testTask1, 
        &DelayTask{Message: "delay task"},
        asynq.MaxRetry(10),
        asynq.Timeout(10*time.Second),
        asynq.Deadline(time.Now().Add(20*time.Second)),
    )
    

    延迟任务(Delay Task)

    ProcessAtProcessIn
    ProcessIn
    // 3秒后执行
    err = srv.NewTask(testDelayTask,
        &DelayTask{Message: "delay task"},
        asynq.ProcessIn(3*time.Second),
    )
    
    ProcessAt
    // 1小时后的时间点执行
    oneHourLater := now.Add(time.Hour)
    err = srv.NewTask(testDelayTask,
        &DelayTask{Message: "delay task"},
        asynq.ProcessAt(oneHourLater),
    )
    

    周期性任务(Periodic Task)

    asynq.Scheduler
    // 每分钟执行一次
    _, err = srv.NewPeriodicTask(
        "*/1 * * * ?",
        testPeriodicTask,
        &DelayTask{Message: "periodic task"},
    )
    
    asynq.Scheduler

    示例代码

    示例代码可以在单元测试代码中找到:github.com/tx7do/krato…

    参考资料

    • Asynq – Github
    • Celery – Github
    • Celery 简介
    • 分布式任务队列Celery的实践
    • 分布式任务队列 Celery
    • Asynq: Golang distributed task queue library
    • 异步任务处理系统,如何解决业务长耗时、高并发难题?
    • Asynq: simple, reliable & efficient distributed task queue for your next Go project
    • Asynq: Golang distributed task queue library