package req_merge import ( "context" "fmt" "sync" "sync/atomic" "testing" "time" ) var count int32 func TestGroup_Do(t *testing.T) { time.AfterFunc(1*time.Second, func() { atomic.AddInt32(&count, -count) }) var ( wg sync.WaitGroup now = time.Now() n = 1000 //sg = &Group{} ) for i := 0; i < n; i++ { wg.Add(1) go func() { // res, _ := reqMergeGetArticle(sg, 1) res, _ := getArticle(1) if res != "article: 1" { panic("err") } wg.Done() }() } wg.Wait() fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now)) } func TestGroup_Do2(t *testing.T) { time.AfterFunc(1*time.Second, func() { atomic.AddInt32(&count, -count) }) var ( wg sync.WaitGroup now = time.Now() n = 1000 sg = &Group{} ) for i := 0; i < n; i++ { wg.Add(1) go func() { res, _ := reqMergeGetArticle(sg, 1) //res, _ := getArticle(1) if res != "article: 1" { panic("err") } wg.Done() }() } wg.Wait() fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now)) } func getArticle(id int) (article string, err error) { // 假设这里会对数据库进行调用, 模拟不同并发下耗时不同 atomic.AddInt32(&count, 1) time.Sleep(time.Duration(count) * time.Millisecond) return fmt.Sprintf("article: %d", id), nil } func reqMergeGetArticle(sg *Group, id int) (string, error) { v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) { return getArticle(id) }) return v.(string), err } func reqMergeGetArticleHang(sg *Group, id int) (string, error) { v, err, _ := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) { // 模拟出现问题,hang 住 select {} return getArticle(id) }) return v.(string), err } //这个极端情况下会导致整个程序 hang 住,如果我们的代码出点问题,有一个调用 hang 住了,那么会导致所有的请求都 hang 住 // 执行就会发现死锁了 func TestGroup_DoHang(t *testing.T) { time.AfterFunc(1*time.Second, func() { atomic.AddInt32(&count, -count) }) var ( wg sync.WaitGroup now = time.Now() n = 1000 sg = &Group{} ) for i := 0; i < n; i++ { wg.Add(1) go func() { res, _ := reqMergeGetArticleHang(sg, 1) if res != "article: 1" { panic("err") } wg.Done() }() } wg.Wait() fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now)) } func reqMergeGetArticleNoHang(ctx context.Context, sg *Group, id int) (string, error) { result := sg.DoChan(fmt.Sprintf("%d", id), func() (interface{}, error) { // 模拟出现问题,hang 住 select {} return getArticle(id) }) select { case r := <-result: return r.Val.(string), r.Err case <-ctx.Done(): return "超时", ctx.Err() } } // DoChan 结合 select 做超时控制 func TestGroup_DoNoHang2(t *testing.T) { time.AfterFunc(1*time.Second, func() { atomic.AddInt32(&count, -count) }) var ( wg sync.WaitGroup now = time.Now() n = 1000 sg = &Group{} ) ctx,cancel := context.WithCancel(context.Background()) go func() { select { case <-time.After(time.Duration(2) * time.Second): cancel() } }() for i := 0; i < n; i++ { wg.Add(1) go func() { res, _ := reqMergeGetArticleNoHang(ctx,sg, 1) if res != "article: 1" { panic(res) } wg.Done() }() } wg.Wait() fmt.Printf("同时发起 %d 次请求,耗时: %s", n, time.Since(now)) } // 一个出错,全部出错 //实际使用的时候 如果我们一次调用要 1s,我们的数据库请求或者是 //下游服务可以支撑 10rps 的请求的时候这会导致我们的错误阈提高, //因为实际上我们可以一秒内尝试 10 次,但是用了 这个 之后只能尝试一次, //只要出错这段时间内的所有请求都会受影响 func TestName(t *testing.T) { //这种情况我们可以启动一个 Goroutine 定时 Release一下,相当于将 rps 从 1rps 提高到了 10rps go func() { time.Sleep(100 * time.Millisecond) //g.Release(key) }() }