asynq 概述asynq 是由 Go 语言编写的简单、可靠、高效的分布式任务队列。

Go中简单、可靠、高效的分布式任务队列https://github.com/hibiken/asynqasynq 工作原理概述:

客户端将任务放入队列

服务器从队列中拉取任务并为每个任务启动一个工作协程

任务由多个 worker 同时处理任务队列用作跨多态机器分配工作的机制。一个系统 key 由多个工作服务器和代理组成,让位于高可用性和水平扩展。

特征

保证至少执行一次任务

任务调度

失败任务的重试

worker 崩溃时自动恢复任务

加权优先级队列

严格的优先队列

添加任务的低延迟,因为 Redis 中的写入速度很快

使用唯一选项对任务进行重复数据删除

允许每个任务超时和截止日期

允许聚合任务组以批处理多个连续操作

支持中间件的灵活处理程序接口

能够暂停队列以停止处理队列中的任务

定期任务

支持Redis Cluster 实现自动分片和高可用

支持Redis Sentinels 以实现高可用性

与 Prometheus 集成以收集和可视化队列指标

用于检查和远程控制队列和任务的 Web UI

CLI 检查和远程控制队列和任务

asynq 的基本使用下面我将基于 asynq v0.24.0 版本,实现 asynq 的简单使用。

客户端:注册立即执行的任务 和 定时任务

服务端:处理注册的任务,完成业务逻辑等

代码目录结构如下:

$ tree.├── client                // 客户端服务│   ├── client.go         // main函数,调用asynq完成注册│   └── tasks│       └── tasks.go      // 客户端创建任务└── server                // 服务端服务    ├── server.go         // main函数,调用asynq监听任务并处理 └── tasks        └── tasks.go      // 服务端处理任务

客户端创建一个立即发送邮件的任务 和 定时生成报表的任务,分别使用asynq 的 NewClient 和 NewScheduler。

// test/asynq/client/tasks/tasks.go

package tasksimport ( 'encoding/json' 'fmt' 'github.com/hibiken/asynq')

const ( TypeEmailDelivery = 'asynq:email:delivery' // 立即发送邮件任务 TypeGenerateDataReport = 'asynq:generate:data:report' // 定时生成报表任务)

// EmailDeliveryPayload 立即发送邮件负载type EmailDeliveryPayload struct { UserId string TemplateId string}

// NewEmailDeliveryTask 创建一个立即发送邮件任务,为指定用户发送func NewEmailDeliveryTask(data EmailDeliveryPayload) (*asynq.Task, error) { payload, err := json.Marshal(data) if err != nil { return nil, err }

// asynq.NewTask // 传入任务名称、序列化的负载信息以及opts配置项 return asynq.NewTask(TypeEmailDelivery, payload), nil}

// NewGenerateDataReportTask 创建一个定时生成报表任务,处理所有用户func NewGenerateDataReportTask() (*asynq.Task, error) { return asynq.NewTask(TypeGenerateDataReport, nil), nil}

// RegisterEmailDeliveryTask 立即发送邮件任务func RegisterEmailDeliveryTask(client *asynq.Client, data EmailDeliveryPayload) error { task, err := NewEmailDeliveryTask(data) if err != nil { return err }

// Enqueue task to be processed immediately. taskInfo, err := client.Enqueue(task, asynq.MaxRetry(1)) if err != nil { return err } fmt.Printf('enqueued task id: %s,queue: %s\n', taskInfo.ID, taskInfo.Queue)

return nil}

// RegisterGenerateDataReportTask 注册生成报表任务func RegisterGenerateDataReportTask(scheduler *asynq.Scheduler, cronspec string) error { task, err := NewGenerateDataReportTask() if err != nil { return err }

entryId, err := scheduler.Register(cronspec, task, asynq.MaxRetry(3)) if err != nil { return err } fmt.Printf('register entryId: %s\n', entryId)

return nil}

// test/asynq/client/client.go

package main

import ( 'fmt' 'github.com/hibiken/asynq' 'test/asynq/client/tasks')

func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'} // redis 集群方式 //redisOpt := asynq.RedisClusterClientOpt{ // Addrs: []string{'127.0.0.1:7001', '127.0.0.1:7002', '127.0.0.1:7003'}, //}

// 1. 注册立即执行的任务 client := asynq.NewClient(redisOpt) err := tasks.RegisterEmailDeliveryTask(client, tasks.EmailDeliveryPayload{ UserId: 'user_id', TemplateId: 'template_id', }) if err != nil { return }  defer client.Close()

// 2. 注册定时任务 scheduler := asynq.NewScheduler(redisOpt, &asynq.SchedulerOpts{ LogLevel: asynq.InfoLevel, PostEnqueueFunc: handlePostEnqueue, })

cronspec := '* * * * *' //'1 2 * * *' // 每天 02:01:00 执行定时任务,最大重试次数3次. err = tasks.RegisterGenerateDataReportTask(scheduler, cronspec) if err != nil { return }

err = scheduler.Run() // 这里需要调用run方法,别忘记了 if err != nil { fmt.Printf('scheduler run error: %v', err) return } defer scheduler.Shutdown()}

func handlePostEnqueue(taskInfo *asynq.TaskInfo, err error) { fmt.Printf('task id: %s, queue: %s, err: %+v\n', taskInfo.ID, taskInfo.Queue, err)}服务端基于客户端注册的任务,服务端启动 handler 来处理这些任务。

// test/asynq/server/tasks/tasks.gopackage tasks

import ( 'context' 'encoding/json' 'fmt' 'github.com/hibiken/asynq' 'time')

const ( TypeEmailDelivery = 'asynq:email:delivery' // 立即发送邮件任务 TypeGenerateDataReport = 'asynq:generate:data:report' // 定时生成报表任务)

// EmailDeliveryPayload 立即发送邮件负载type EmailDeliveryPayload struct { UserId string TemplateId string}

// HandleEmailDeliveryTask 处理发送邮件任务func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { var data EmailDeliveryPayload err := json.Unmarshal(t.Payload(), &data) if err != nil { return err }

// do something fmt.Printf('email delivery data: %+v\n', data) return nil}

// HandleGenerateDataReportTask 处理生成报表任务func HandleGenerateDataReportTask(ctx context.Context, t *asynq.Task) error { // do something fmt.Printf('generate data report, time: %s\n', time.Now().Format(time.RFC3339))  time.Sleep(time.Second*30) return nil}

// test/asynq/server/server.gopackage main

import ( 'context' 'fmt' 'github.com/hibiken/asynq' 'test/asynq/server/tasks')

func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'} // redis 集群方式 //redisOpt := asynq.RedisClusterClientOpt{ // Addrs: []string{'127.0.0.1:7001', '127.0.0.1:7002', '127.0.0.1:7003'}, //} config := asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, // Optionally specify multiple queues with different priority. Queues: map[string]int{ 'critical': 6, 'default': 3, 'low': 1, }, ErrorHandler: asynq.ErrorHandlerFunc(errHandler), }

// mux maps a type to a handler mux := asynq.NewServeMux() RegisterTaskHandlers(mux)

server := asynq.NewServer(redisOpt, config) err := server.Run(mux) if err != nil { fmt.Printf('async run error: %v', err) return }}

func errHandler(ctx context.Context, task *asynq.Task, err error) { fmt.Printf('task type: %s, payload: %s, err: %+v\n', task.Type(), string(task.Payload()), err)}

// RegisterTaskHandlers 注册任务func RegisterTaskHandlers(mux *asynq.ServeMux) { mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask) mux.HandleFunc(tasks.TypeGenerateDataReport, tasks.HandleGenerateDataReportTask)}分别启动客户端和服务端,运行结果如下:

命令行工具asynq 提供了命令行工具,具体安装如下:

go install github.com/hibiken/asynq/tools/asynq@latest命令如下:

$ asynq dash --helpDisplay interactive dashboard.

USAGE asynq dash [flags]

FLAGS --help Help for dash --refresh Interval between data refresh (default: 8s, min allowed: 1s)

INHERITED FLAGS --cluster Connect to redis cluster --cluster_addrs List of comma-separated redis server addresses --config Config file to set flag defaut values (default is $HOME/.asynq.yaml) --db Redis database number (default is 0) --password Password to use when connecting to redis server --tls_server Server name for TLS validation --uri Redis server URI

EXAMPLES $ asynq dash $ asynq dash --refresh=3s

LEARN MORE Use 'asynq--help' for more information about a command.

Web UIasynq 也提供了Web UI,支持Docker部署和继承到代码中,具体教程可见:

Web UI for monitoring & admininstering Asynq task queuehttps://github.com/hibiken/asynqmon下面使用客户端嵌入 asynqmon 示例:

// test/asynq/client/client.gopackage main

import ( 'github.com/hibiken/asynq' 'github.com/hibiken/asynqmon' 'net/http')

func main() { redisOpt := asynq.RedisClientOpt{Addr: '127.0.0.1:6379'}  startDashboard(redisOpt)}

func startDashboard(r asynq.RedisConnOpt) { go func() { h := asynqmon.New(asynqmon.Options{ RootPath: '/dashboard', // RootPath specifies the root for asynqmon app RedisConnOpt: r, })

// Note: We need the tailing slash when using net/http.ServeMux. http.Handle(h.RootPath()+'/', h) http.ListenAndServe(':8090', nil) }()}

总结

asynq 作为一个分布式任务队列,如果有定时任务的需求,可以考虑使用它,用起来还是很简便的。