作者
golang中有2种方式同步程序,一种使用channel,另一种使用sync.WaitGroup。最近在使用golang写一个比较简单的功能 ---- host1主机需要先在本机起一个TCP监听,起来后给host2主机发送指令,让其主动给host1主机监听的端口进行连接。最终使用了sync.WaitGroup实现了该功能。本篇就结合一些示例来看下两者的使用。
一、channel并行同步
比如有三个需要取数据的程序同时进行,但是终需要同步并返回数据。我们可以按如下代码操作:
<br />
package main
import (
"fmt"
"time"
)
func main() {
messages := make(chan int)
go func() {
time.Sleep(time.Second * 3)
messages <- 1
}()
go func() {
time.Sleep(time.Second * 2)
messages <- 2
}()
go func() {
time.Sleep(time.Second * 1)
messages <- 3
}()
go func() {
for i := range messages {
fmt.Println(i)
}
}()
time.Sleep(time.Second * 5)
}
最终取回的结果是3 2 1 ,但是如果该代码中如果不加time.sleep 5秒的动作,程序执行时会出现主进程还未等各个进程执行完成就结束了。因为go函数可以简单理论为shell里的&操作。当然遇到这样的问题,使用sync.WaitGroup是可以解决的。但如果不用sync.WaitGroup,还是使用channel去处理能不能解决呢?
当然是可以的,我们可以再创建一个无缓存的channel,由于该channel是阻塞的,在所有的数据未取出前,主程序就不退出。具体做法如下:
<br />
package main
import (
"fmt"
"time"
)
func main() {
messages := make(chan int)
// Use this channel to follow the execution status
// of our goroutines :D
done := make(chan bool)
go func() {
time.Sleep(time.Second * 3)
messages <- 1
done <- true
}()
go func() {
time.Sleep(time.Second * 2)
messages <- 2
done <- true
}()
go func() {
time.Sleep(time.Second * 1)
messages <- 3
done <- true
}()
go func() {
for i := range messages {
fmt.Println(i)
}
}()
for i := 0; i < 3; i++ {
<-done
}
}
这里上面每个channel执行完成后,会向done这样一个channel里向true,在true的结果没有取出之前,程序就会一直阻塞,直接所有的程序都完成。可能聪明的同学会觉得不需要这么麻烦,只需要把示例1中的代码最后一个go func()去掉,而且把sleep 5秒也去掉,直接改为如下循环取出就可以:
<br />
for i := range messages {
fmt.Println(i)
}
实际执行的时候呢?看下图:
<img src="https://www.361way.com/wp-content/uploads/2019/01/channel-deadlock.png" width="417" height="149" title="channel-deadlock" alt="channel-deadlock" />
二、sync.WaitGroup并行同步处理
sync包提供了基本同步和互持锁。其可以操作的类型有Cond、Locker、Map、Mutex、Once、Pool、RWMutex、WailtGroup。这里只说WaitGroup,WaitGroup提供了三个方法:Add()用来添加计数。Done()用来在操作结束时调用,使计数减一。Wait()用来等待所有的操作结束,即计数变为0,该函数会在计数不为0时等待,在计数为0时立即返回。同样是上面的示例,使用sync.WailtGroup解决比较容易,如下:
<br />
package main
import (
"fmt"
"sync"
"time"
)
func main() {
messages := make(chan int)
var wg sync.WaitGroup
// you can also add these one at
// a time if you need to
wg.Add(3)
go func() {
defer wg.Done()
time.Sleep(time.Second * 3)
messages <- 1
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 2)
messages <- 2
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 1)
messages <- 3
}()
go func() {
for i := range messages {
fmt.Println(i)
}
}()
wg.Wait()
}
在一个wait组里我们增加了三个计数器,每完成一个减1,直到为0时,wait组结束。其同样适用于多线程采集:
<br />
<br />
package main
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
)
func main() {
urls := []string{
"http://api.douban.com/v2/book/isbn/9787218087351",
"http://ip.taobao.com/service/getIpInfo.php?ip=202.101.172.35",
"https://jsonplaceholder.typicode.com/todos/1",
}
jsonResponses := make(chan string)
var wg sync.WaitGroup
wg.Add(len(urls))
for _, url := range urls {
go func(url string) {
defer wg.Done()
res, err := http.Get(url)
if err != nil {
log.Fatal(err)
} else {
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatal(err)
} else {
jsonResponses <- string(body)
}
}
}(url)
}
go func() {
for response := range jsonResponses {
fmt.Println(response)
}
}()
wg.Wait()
}
<br />
上面是采集3个json数据的返回结果。当然也可以参看下官方的示例,官方的示例和这里略有差别,这个是一次通过len增加了n个wait任务,官方的每处理前就先增加一个。
<br />
package main
import (
"sync"
)
type httpPkg struct{}
func (httpPkg) Get(url string) {}
var http httpPkg
func main() {
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
}
<br />
参考页面:<a href="https://stackoverflow.com/questions/18207772/how-to-wait-for-all-goroutines-to-finish-without-using-time-sleep" target="_blank" rel="noopener">how-to-wait-for-all-goroutines-to-finish-without-using-time-sleep</a>
<br />