Golang:请求合并
package req_merge
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
var count int32
func TestGroup_Do(t *testing.T) {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})
var (
wg sync.WaitGroup
now = time.Now()
n = 1000
//sg = &Group{}
)
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
// res, _ := reqMergeGetArticle(sg, 1)
res, _ := getArticle(1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
func TestGroup_Do2(t *testing.T) {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})
var (
wg sync.WaitGroup
now = time.Now()
n = 1000
sg = &Group{}
)
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
res, _ := reqMergeGetArticle(sg, 1)
//res, _ := getArticle(1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
func getArticle(id int) (article string, err error) {
// 假设这里会对数据库进行调用, 模拟不同并发下耗时不同
atomic.AddInt32(&count, 1)
time.Sleep(time.Duration(count) * time.Millisecond)
return fmt.Sprintf("article: %d", id), nil
}
func reqMergeGetArticle(sg *Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
return getArticle(id)
})
return v.(string), err
}
func reqMergeGetArticleHang(sg *Group, id int) (string, error) {
v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
// 模拟出现问题,hang 住
select {}
return getArticle(id)
})
return v.(string), err
}
//这个极端情况下会导致整个程序 hang 住,如果我们的代码出点问题,有一个调用 hang 住了,那么会导致所有的请求都 hang 住
// 执行就会发现死锁了
func TestGroup_DoHang(t *testing.T) {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})
var (
wg sync.WaitGroup
now = time.Now()
n = 1000
sg = &Group{}
)
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
res, _ := reqMergeGetArticleHang(sg, 1)
if res != "article: 1" {
panic("err")
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
func reqMergeGetArticleNoHang(ctx context.Context, sg *Group, id int) (string, error) {
result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) {
// 模拟出现问题,hang 住
select {}
return getArticle(id)
})
select {
case r := <-result:
return r.Val.(string), r.Err
case <-ctx.Done():
return "超时", ctx.Err()
}
}
// DoChan 结合 select 做超时控制
func TestGroup_DoNoHang2(t *testing.T) {
time.AfterFunc(1*time.Second, func() {
atomic.AddInt32(&count, -count)
})
var (
wg sync.WaitGroup
now = time.Now()
n = 1000
sg = &Group{}
)
ctx,cancel := context.WithCancel(context.Background())
go func() {
select {
case <-time.After(time.Duration(2) * time.Second):
cancel()
}
}()
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
res, _ := reqMergeGetArticleNoHang(ctx,sg, 1)
if res != "article: 1" {
panic(res)
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now))
}
// 一个出错,全部出错
//实际使用的时候 如果我们一次调用要 1s,我们的数据库请求或者是
//下游服务可以支撑 10rps 的请求的时候这会导致我们的错误阈提高,
//因为实际上我们可以一秒内尝试 10 次,但是用了 这个 之后只能尝试一次,
//只要出错这段时间内的所有请求都会受影响
func TestName(t *testing.T) {
//这种情况我们可以启动一个 Goroutine 定时 Release一下,相当于将 rps 从 1rps 提高到了 10rps
go func() {
time.Sleep(100 * time.Millisecond)
//g.Release(key)
}()
}