文章关键词
1.golang启动异步任务
2.生产者消费者模型
文章正文
假如我们有如图所示的业务,后端通过http请求获取用户提供的参数name,然后在接口1中进行业务处理,处理完成后返回响应。很显然这是最简单的业务处理逻辑。
package gin
import (
"asynchronous/handlers"
"github.com/gin-gonic/gin"
)
func InitGe() {
//路由引擎
r := gin.Default()
//获取信息
r.GET("/get_user_infos", handlers.GetRecordsHandler)
//启动HTTP服务
r.Run("127.0.0.1:8899")
}我们用gin框架构建一个简单的服务,其含有一个接口名为/get_user_infos的路由,handler很简单,就是取得用户的http请求的name值,然后在service进行业务处理,返回结果。
package handlers
import (
"fmt"
"github.com/gin-gonic/gin"
"net/http"
)
func GetRecordsHandler(c *gin.Context) {
name := c.DefaultQuery("name", "")
fmt.Println("get the name:>>>", name)
//service业务处理
service := service{
Name: name,
}
ret, err := service.handleRecordsService()
if err != nil {
c.JSONP(http.StatusBadRequest, nil)
return
}
c.JSON(http.StatusOK, ret)
}service的业务处理也很简单,因为我们只是个演示,简单写个处理过程。其就是封装成一个service对象,返回结果。
package handlers
type service struct {
Name string
}
func (s *service) handleRecordsService() (map[string]interface{}, error) {
ret := make(map[string]interface{})
//需要做的业务
ret["data"] = map[string]interface{}{
"result": "hello:" + s.Name,
}
return ret, nil
}运行结果
我们用curl工具访问本地8899端口的get_user_infos的路由,携带一个name为warming的参数,正确返回结果。
假如我们现在有一个额外的任务,它也需要获取到用户的参数name,然后进行额外的业务处理。很简单的一个逻辑就是我们先处理完业务接口1的任务,然后再去执行这个额外任务逻辑。
但是这样会增加延长响应时长,影响用户体验。具体如下。
假如接口1通过调用http协议传递参数到额外的index接口的另一个业务,该业务需要2秒处理时长。
func (s *service) handleRecordsService() (map[string]interface{}, error) {
ret := make(map[string]interface{})
//1、自己本身的业务
ret["data"] = map[string]interface{}{
"result": "hello:" + s.Name,
}
//2、额外的请求127.0.0.1:9000/index接口
errHandle := HandlerRequestIndex(s.Name)
if errHandle != nil {
fmt.Println(errHandle)
return nil, errHandle
}
return ret, nil
}
该接口由python的Flask框架构建,等待go中handleRecordService处理完自己本身的业务,再调用该接口执行额外的任务。
import time
from flask import Flask,request
app = Flask(__name__)
@app.route("/index")
def index():
if request.method == "GET":
args = request.args
ret = {
"code":6666,
"data":{
"name": args.get("name"),
}
}
# TODO 模拟网络延迟
time.sleep(2)
return ret
if __name__ == '__main__':
app.run("127.0.0.1", 9000)很显然,这样用户需要2s时间等待额外任务处理完成。
我们需要做一个改进,很自然的想法就是把额外的任务接口2做成一个异步任务,然后直接返回响应,而无需等待异步任务完成。
func (s *service) handleRecordsService() (map[string]interface{}, error) {
ret := make(map[string]interface{})
//1、自己本身的业务
ret["data"] = map[string]interface{}{
"result": "hello:" + s.Name,
}
//2、额外的请求127.0.0.1:9000/index接口
//errHandle := HandlerRequestIndex(s.Name)
//if errHandle != nil {
// fmt.Println(errHandle)
// return nil, errHandle
//}
//2、TODO 改进版-额外的请求127.0.0.1:9000/index接口
go func(name string) {
errHandle := HandlerRequestIndex(s.Name)
if errHandle != nil {
fmt.Println(errHandle)
return
}
}(s.Name)
return ret, nil
}把额外的任务包装在一个goroutine里面,结果如我们所愿。
但是这样也有个问题,如果并发量非常大,那么我们就要创建大量的goroutine。每个goroutine都需要建立http请求,那么在高并发的瞬间,就有大量的goroutine和http请求建立。这是非常耗资源的,很容易资源耗尽而崩溃。我们还需要优化。
能不能创建一个生产者-消费者模型?一个请求进来,执行完自身的业务,然后往往缓冲区里面生产一个消息,待消费者空闲时,处理缓冲区里面的消息(也就是进行额外任务的调用)。这样的话,我可以保证消费者占用的资源量。
在我们这个例子中,开辟20个缓冲区(按时机机器的资源开辟),当有个请求进来的时候,处理完自身的业务后就往缓冲区里面写入数据,等待消费者进行处理。而消费者在服务启动的时候就轮询监听缓冲区里的内容,一旦缓冲区里面有生产者生产的物品,就立即取值进行消费处理。
package async_task
import (
"asynchronous/utils"
"fmt"
)
type TaskStruct struct {
Name string `json:"name"`
}
var TaskChan = make(chan TaskStruct, 20)
// AsyncHandleHttpTask
// 项目启动时调用, 监听并获取channel中的数据发送HTTP请求
func AsyncHandleHttpTask() {
defer func() {
if errRecover := recover(); errRecover != nil {
// TODO 记录日志等等,这里就省略了
fmt.Println("errRecover: ", errRecover)
}
}()
//轮询,监听channel中的数据,如果存在数据则构建请求
for taskData := range TaskChan {
fmt.Println("从channel中获取到了数据")
curName := taskData.Name
res := utils.HandlerRequestIndex(curName)
fmt.Println(res)
}
}
func TransmitDataToTaskChannel(task TaskStruct) {
defer func() {
if errRecover := recover(); errRecover != nil {
// TODO 记录日志等等,这里就省略了
fmt.Println("errRecover: ", errRecover)
}
}()
select {
// 只往channel中发送数据
case TaskChan <- task:
// 缓冲区满了记录一下
default:
fmt.Println("缓冲区满了....")
// TODO 记录log等
}
}业务处理的handler函数如下。
func (s *service) handleRecordsService() (map[string]interface{}, error) {
ret := make(map[string]interface{})
//1、自己本身的业务
ret["data"] = map[string]interface{}{
"result": "hello:" + s.Name,
}
//2、额外的请求127.0.0.1:9000/index接口
//errHandle := HandlerRequestIndex(s.Name)
//if errHandle != nil {
// fmt.Println(errHandle)
// return nil, errHandle
//}
//2、TODO 改进版-额外的请求127.0.0.1:9000/index接口
//go func(name string) {
// errHandle := utils.HandlerRequestIndex(s.Name)
// if errHandle != nil {
// fmt.Println(errHandle)
// return
// }
//}(s.Name)
// TODO 第三次迭代,生产者消费者模型
curTask := async_task.TaskStruct{
Name: s.Name,
}
async_task.TransmitDataToTaskChannel(curTask)
return ret, nil
}