文章关键词

1.golang启动异步任务

2.生产者消费者模型

文章正文


假如我们有如图所示的业务,后端通过http请求获取用户提供的参数name,然后在接口1中进行业务处理,处理完成后返回响应。很显然这是最简单的业务处理逻辑。

package gin

import (
   "asynchronous/handlers"
   "github.com/gin-gonic/gin"
)

func InitGe() {
   //路由引擎
   r := gin.Default()
   //获取信息
   r.GET("/get_user_infos", handlers.GetRecordsHandler)

   //启动HTTP服务
   r.Run("127.0.0.1:8899")
}

我们用gin框架构建一个简单的服务,其含有一个接口名为/get_user_infos的路由,handler很简单,就是取得用户的http请求的name值,然后在service进行业务处理,返回结果。

package handlers

import (
   "fmt"
   "github.com/gin-gonic/gin"
   "net/http"
)

func GetRecordsHandler(c *gin.Context) {
   name := c.DefaultQuery("name", "")
   fmt.Println("get the name:>>>", name)

   //service业务处理
   service := service{
      Name: name,
   }
   ret, err := service.handleRecordsService()
   if err != nil {
      c.JSONP(http.StatusBadRequest, nil)
      return
   }

   c.JSON(http.StatusOK, ret)
}

service的业务处理也很简单,因为我们只是个演示,简单写个处理过程。其就是封装成一个service对象,返回结果。

package handlers

type service struct {
   Name string
}

func (s *service) handleRecordsService() (map[string]interface{}, error) {
   ret := make(map[string]interface{})

   //需要做的业务
   ret["data"] = map[string]interface{}{
      "result": "hello:" + s.Name,
   }

   return ret, nil
}

运行结果

我们用curl工具访问本地8899端口的get_user_infos的路由,携带一个name为warming的参数,正确返回结果。


假如我们现在有一个额外的任务,它也需要获取到用户的参数name,然后进行额外的业务处理。很简单的一个逻辑就是我们先处理完业务接口1的任务,然后再去执行这个额外任务逻辑。


但是这样会增加延长响应时长,影响用户体验。具体如下。

假如接口1通过调用http协议传递参数到额外的index接口的另一个业务,该业务需要2秒处理时长。

func (s *service) handleRecordsService() (map[string]interface{}, error) {
   ret := make(map[string]interface{})

   //1、自己本身的业务
   ret["data"] = map[string]interface{}{
      "result": "hello:" + s.Name,
   }

   //2、额外的请求127.0.0.1:9000/index接口
   errHandle := HandlerRequestIndex(s.Name)
   if errHandle != nil {
      fmt.Println(errHandle)
      return nil, errHandle
   }

   return ret, nil
}

该接口由python的Flask框架构建,等待go中handleRecordService处理完自己本身的业务,再调用该接口执行额外的任务。

import time

from flask import Flask,request

app = Flask(__name__)


@app.route("/index")
def index():
    if request.method == "GET":
        args = request.args
        ret = {
            "code":6666,
            "data":{
                "name": args.get("name"),
            }
        }
        # TODO 模拟网络延迟
        time.sleep(2)
        return ret


if __name__ == '__main__':
    app.run("127.0.0.1", 9000)

很显然,这样用户需要2s时间等待额外任务处理完成。


我们需要做一个改进,很自然的想法就是把额外的任务接口2做成一个异步任务,然后直接返回响应,而无需等待异步任务完成。


func (s *service) handleRecordsService() (map[string]interface{}, error) {
   ret := make(map[string]interface{})

   //1、自己本身的业务
   ret["data"] = map[string]interface{}{
      "result": "hello:" + s.Name,
   }

   //2、额外的请求127.0.0.1:9000/index接口
   //errHandle := HandlerRequestIndex(s.Name)
   //if errHandle != nil {
   // fmt.Println(errHandle)
   // return nil, errHandle
   //}

   //2、TODO 改进版-额外的请求127.0.0.1:9000/index接口
   go func(name string) {
      errHandle := HandlerRequestIndex(s.Name)
      if errHandle != nil {
         fmt.Println(errHandle)
         return
      }
   }(s.Name)

   return ret, nil
}

把额外的任务包装在一个goroutine里面,结果如我们所愿。


但是这样也有个问题,如果并发量非常大,那么我们就要创建大量的goroutine。每个goroutine都需要建立http请求,那么在高并发的瞬间,就有大量的goroutine和http请求建立。这是非常耗资源的,很容易资源耗尽而崩溃。我们还需要优化。

能不能创建一个生产者-消费者模型?一个请求进来,执行完自身的业务,然后往往缓冲区里面生产一个消息,待消费者空闲时,处理缓冲区里面的消息(也就是进行额外任务的调用)。这样的话,我可以保证消费者占用的资源量。

在我们这个例子中,开辟20个缓冲区(按时机机器的资源开辟),当有个请求进来的时候,处理完自身的业务后就往缓冲区里面写入数据,等待消费者进行处理。而消费者在服务启动的时候就轮询监听缓冲区里的内容,一旦缓冲区里面有生产者生产的物品,就立即取值进行消费处理。

package async_task

import (
   "asynchronous/utils"
   "fmt"
)

type TaskStruct struct {
   Name string `json:"name"`
}

var TaskChan = make(chan TaskStruct, 20)

// AsyncHandleHttpTask
// 项目启动时调用, 监听并获取channel中的数据发送HTTP请求
func AsyncHandleHttpTask() {
   defer func() {
      if errRecover := recover(); errRecover != nil {
         // TODO 记录日志等等,这里就省略了
         fmt.Println("errRecover: ", errRecover)
      }
   }()

   //轮询,监听channel中的数据,如果存在数据则构建请求
   for taskData := range TaskChan {
      fmt.Println("从channel中获取到了数据")
      curName := taskData.Name
      res := utils.HandlerRequestIndex(curName)
      fmt.Println(res)
   }
}

func TransmitDataToTaskChannel(task TaskStruct) {
   defer func() {
      if errRecover := recover(); errRecover != nil {
         // TODO 记录日志等等,这里就省略了
         fmt.Println("errRecover: ", errRecover)
      }
   }()

   select {
   // 只往channel中发送数据
   case TaskChan <- task:
   // 缓冲区满了记录一下
   default:
      fmt.Println("缓冲区满了....")
      // TODO 记录log等
   }

}

业务处理的handler函数如下。

func (s *service) handleRecordsService() (map[string]interface{}, error) {
   ret := make(map[string]interface{})

   //1、自己本身的业务
   ret["data"] = map[string]interface{}{
      "result": "hello:" + s.Name,
   }

   //2、额外的请求127.0.0.1:9000/index接口
   //errHandle := HandlerRequestIndex(s.Name)
   //if errHandle != nil {
   // fmt.Println(errHandle)
   // return nil, errHandle
   //}

   //2、TODO 改进版-额外的请求127.0.0.1:9000/index接口
   //go func(name string) {
   // errHandle := utils.HandlerRequestIndex(s.Name)
   // if errHandle != nil {
   //    fmt.Println(errHandle)
   //    return
   // }
   //}(s.Name)

   // TODO 第三次迭代,生产者消费者模型
   curTask := async_task.TaskStruct{
      Name: s.Name,
   }
   async_task.TransmitDataToTaskChannel(curTask)

   return ret, nil
}