一、并发
并发即多任务同时执行,go是并发语言,并不是并行语言。
并发性和并行性是两种概念:并发是指你能处理不同的事但是你无法同时处理他们,而并行指的是你能同时处理不同的事。如你在跑步时你鞋带松了,你需要停下来系鞋带再继续跑,这就是并发,而并行是你可以一边跑步一边听音乐。
真正的并行是需要靠多核应用来支持的,例如在多核浏览器上同时下载文件和呈现页面。
并行的程序并不一定是最快的,因为一些并行程序之间也是需要相互通信的,而他们之间通信的开销比较大,相对而言并发之间的通信开销就很小,所以并行并不一定执行速度更快。
二、进程、线程、协程
进程是一个程序在一个数据集中的一次动态执行过程,可以简单理解为"正在执行的程序",它是CPU资源分配和调度的独立单位。
进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。进程的局限是创建、撤销和切换的开销比较大。
线程是在进程之后发展出来的概念。线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。一个进程可以包含多个线程。
线程的优点是减小了程序并发执行时的开销,提高了操作系统的并发性能,缺点是线程没有自己的系统资源,只拥有在运行时必不可少的资源,但同一进程的各线程可以共享进程所拥有的系统资源,如果把进程比作一个车间,那么线程就好比是车间里面的工人。不过对于某些独占性资源存在锁机制,处理不当可能会产生"死锁"。
协程是一种用户态的轻量级线程,又称微线程,英文名Coroutine,协程的调度完全由用户控制。人们通常将协程和子程序(函数)比较着理解。
子程序调用总是一个入口,一次返回,一旦退出即完成了子程序的执行。
与传统的系统级线程和进程相比,协程的最大优势在于其"轻量级",可以轻松创建上百万个而不会导致系统资源衰竭,而线程和进程通常最多也不能超过1万的。这也是协程也叫轻量级线程的原因。
协程与多线程相比,其优势体现在∶协程的执行效率极高。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
Go语言通过协程来实现并发
Goroutine
Goroutine是与其他函数或方法同时运行的函数或方法。goroutine可以被认为是轻量级的线程,与线程相比,创建goroutine的成本很小,它就是一段代码,一个函数入口,以及在堆上为其分配的一个堆栈(出事大小为4K,会随着程序的执行自动增长删除)。因为它非常廉价,Go应用程序可以并发运行数千个goroutine
如何使用Goroutine
//使用go关键字
go function()
//在主Goroutine中打印字母,子Goroutine中打印数字
func printNumber() { //注意:作为GOroutine的函数一般都是没有返回值的,有也会被舍弃
for i := 1; i <= 100; i++ {
fmt.Printf("子Goroutine中的i:%d\n", i)
}
}
func main() {
go printNumber()//使用go运行子Goroutine
for i := 1; i <= 100; i++ {
fmt.Printf("\t主Goroutine中打印A%d\n", i)
} //若是主Goroutine运行完了子Goroutine还没有运行完,子Goroutine也会直接结束
//time.sleep(time.second)
}
主Goroutine:封装main函数的Goroutine称为主Goroutine
主Goroutine所做的是并不只是执行main函数,它首先要做的是设定每一个Goroutine所能申请的栈空间的最大尺寸,在32位计算机系统中此最大尺寸为250MB,64位计算机系统则是1GB。如果有某个Goroutine的栈空间尺寸大于这个限制,那么运行时系统就会引发一个栈溢出的运行时恐慌。随后这个go程序也就停止运行了。
此后,主Goroutine会进行一系列的初始化工作:
1.创建一个特殊的defer语句,用于在主goroutine退出时做必要的善后处理。因为主goroutine也可能非正常的结束
2.启动专用于在后台清扫内存垃圾的goroutine,并设置GC可用的标识
3.执行mian包中的init函数
4.执行main函数
执行完main函数后,它还会检查主goroutine是否引发了运行时恐慌,并进行必要的处理。最后主goroutine会结束自己以及当前进程的运行。
Go语言的并发模型
线程模型大致上可以分为三类,分别为:内核级线程模型,用户级线程模型,两级线程模型。
内核线程模型是一种一对一的模型,每一个用户线程对应着一个内核线程KSE,由单独的处理器来对用户线程进行调度,优点是各运行各的互不影响,一个线程阻塞了不会影响其他的线程。缺点是上下文切换开销较大,且没有那么多KSE
用户级线程模型是一种多对一的模型,多个用户线程对应着一个内核线程KSE,一个处理器调度处理多条线程,优点是上下文切换方便,缺点是某条线程阻塞会影响其他线程,为解决此问题很多语言都将完全阻塞的方法封装为了未完全阻塞的方法。。。
两级线程模型则是整合了以上两种模型,是一种多对多的模型,一个处理器可以调度多个用户线程,而当某个线程阻塞时,可以将该处理器上的其他线程分配到其他的处理器上。有点显而易见,缺点则是实现起来比较困难。而Go语言的并发调度就是用的两级线程模型。
Go并发调度:G-P-M模型
在操作系统提供的内核线程之上,Go搭建了一个特有的两级线程模型。goroutine机制实现了M:N的线程模型,goroutine机制是协程(coroutine)的一种实现,golang内置的调度器,可以让多核CPU中每个CPU执行一个协程。
理解goroutine机制的原理,关键是理解Go语言scheduler的实现。
Go语言中支撑整个scheduler实现的主要有4个重要结构,分别是M、G、P、Sched,前三个定义在runtime.h中,Sched定义在proc.c中。
Sched结构就是调度器,它维护有存储M和G的队列以及调度器的一些状态信息等。
M结构是Machine,系统线程,它由操作系统管理的,goroutine就是跑在M之上的;M是一个很大的结构,里面维护小对象内存cache (mcache)、当前执行的goroutine、随机数发生器等等非常多的信息。
P结构是Processor,处理器,它的主要用途就是用来执行goroutine的,它维护了一个goroutine队列,即runqueue。Processor是让我们从N:1调度到M:N调度的重要部分。
G是goroutine实现的核心结构,它包含了栈,指令指针,以及其他对调度goroutine很重要的信息,例如其阻塞的channel。
在单核处理器的场景下,所有goroutine运行在同一个M系统线程中,每一个M系统线程维护一个Processor,任何时刻,一个Processor中只有一个goroutine,其他goroutine在runqueue中等待。一个goroutine运行完自己的时间片后,让出上下文,回到runqueue中。多核处理器的场景下,为了运行goroutines,每个M系统线程会持有一个Processor。
当其中一个Processor的runqueue为空,没有goroutine可以调度。它会从另外一个上下文偷取一半的goroutine。
runtime包
包含与 Go 的运行时系统交互的操作,例如控制 goroutine 的函数。它还包括反射包使用的低级类型信息;有关运行时类型系统的可编程接口
func GOROOT()string//获取GOROOT目录
runtime.GOOS//常量,获取操作系统
func NumCPU()int//获取当前电脑逻辑CPU的数量
func GOMAXPROCS(n int)int//设置go执行时可用的的最大CPU的数量
func Gosched()//让出CPU
func Goexit()//终止当前Goroutine,但其defer语句仍会执行
临界资源安全问题
并发本身并不复杂,但是因为有了资源竞争的问题,就使得我们开发出好的并发程序变得复杂起来,因为会引起很多莫名其妙的问题。
如果多个goroutine在访问同一个数据资源的时候,其中一低线程修改了数据,那么这个数值就被修改了,对于其他的goroutine来讲,这个数值可能是不对的。
为了解决临界资源的安全问题,Go语言可以使用sync包下的锁操作。
当然,在Go的并发编程中有一句很经典的话︰不要以共享内存的方式去通信,而要以通信的方式去共享内存。
在Go语言中并不鼓励用锁保护共享状态的方式在不同的Goroutine中分享信息(以共享内存的方式去通信)。而是鼓励通过channel将共享状态或共享状态的变化在各个Goroutine之间传递〈以通信的方式去共享内存),这样同样能像用锁―样保证在同一的时间只有一个Goroutine访问共享状态。
当然,在主流的编程语言中为了保证多线程之间共享数据安全性和一致性,都会提供一套基本的同步工具集,如锁,条件变量,原子操作等等。Go语言标准库也毫不意外的提供了这些同步机制,使用方式也和其他语言也差不多。
sync包下的WaitGroup
//WaitGroup同步等待组
type WaitGroup struct {
// contains filtered or unexported fields
}
//WaitGroup 等待一组 goroutine 完成。main goroutine 调用 Add 来设置要等待的 goroutine 的数量。然后每个 goroutine 运行并在完成时调用 Done。同时,Wait 可用于阻塞,直到所有 goroutine 完成。
//若是Add设置的值大于goroutine的数量,程序会陷入死锁,报一个恐慌
func (wg *WaitGroup) Add(delta int)//设置等待组中要执行的goroutine的数量(即counter的值)
func (wg *WaitGroup) Done()//让等待组的couter减一
func (wg *WaitGroup) Wait()//让主goroutine等待直到waitgroup的counter为0
//例子:
var wg sync.WaitGroup
func fun(x int) {
defer wg.Done()
for i := 0; i < 10; i++ {
fmt.Println(x, "来了", i)
}
}
func main() {
wg.Add(2)
go fun(1)
go fun(2)
fmt.Println("main add 2 goroutine")
wg.Wait()
fmt.Println("main run end")
}
sync包下的互斥锁
type Locker interface {
Lock()
Unlock()
}
type Mutex struct {
// contains filtered or unexported fields
}//互斥锁
type RWMutex struct {
// contains filtered or unexported fields
}//读写锁
//mutex互斥锁:同时只能上一把锁
var wg sync.WaitGroup
var mutex sync.Mutex
var ticket int = 10
func fun(x int) {
defer wg.Done()
rand.Seed(time.Now().UnixNano())
for {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
mutex.Lock()
if ticket > 0 {
fmt.Println("售票亭", x, "售票,余票:", ticket-1)
ticket--
} else {
mutex.Unlock()
fmt.Println("票已售完,下次再来吧")
break
}
mutex.Unlock()
}
}
func main() {
wg.Add(4)
go fun(1)
go fun(2)
go fun(3)
go fun(4)
fmt.Println("begin sell ", ticket, "tickets")
wg.Wait()
fmt.Println("end")
}
//RWMutex读写锁:读锁和写锁不能同时存在,读锁可以任意多把同时存在,写锁只能有一把
var wg sync.WaitGroup
var rwmutex sync.RWMutex
var num int = 10
func readData(i int) {
defer wg.Done()
fmt.Printf("%d begin reading\n", i)
rwmutex.RLock()
fmt.Printf("%d is reading\n", i)
time.Sleep(1 * time.Second)
fmt.Println(i, ":", num)
rwmutex.RUnlock()
fmt.Printf("%d end reading\n", i)
}
func writeData(i int) {
defer wg.Done()
fmt.Printf("%d begin writing\n", i)
rwmutex.Lock()
fmt.Printf("%d is writing\n", i)
time.Sleep(2 * time.Second)
num = 8
rwmutex.Unlock()
fmt.Printf("%d end writing\n", i)
}
func main() {
wg.Add(4)
go readData(1)
go writeData(2)
go readData(3)
go writeData(4)
fmt.Println("main start")
wg.Wait()
fmt.Println("main end")
}
channel通道
channel通道可以被认为是Goroutines通信的管道,数据可以从一段发送到另一端,通过通道来接收数据
虽然可以通过sync包下的锁来实现同步,但go语言强烈支持的是使用channel来实现goroutine间的通信。
Go语言中,要传递某个数据给另一个goroutine(协程),可以把这个数据封装成一个对象,然后把这个对象的指针传入某个channel中,另外一个goroutine从这个channel中读出这个指针,并处理其指向的内存对象。Go从语言层面保证同一个时间只有一个goroutine能够访问channel里面的数据,为开发者提供了一种优雅简单的工具,所以Go的做法就是使用channel来通信,通过通信来传递内存数据,使得内存数据在不同的goroutine中传递,而不是使用共享内存来通信。
通道的使用
每个通道都有与其相关的类型,该类型是通道允许传输的数据类型。(通道的零值为nil,nil通道没有任何用处,因此通道必须使用类似于map和切片的方法来定义)
1.声明和创建通道
var channelName chan type//声明格式:var 通道名 chan 通道类型
channelName = make(chan type)//如果通道为nil就需要创建通道
2.发送和接收
data := <- a//从通道a中读取数据
a <- data //将数据写到通道a中
/*箭头所指的方向即是数据流动的方向,流进通道内即发送,从通道内流出则是接收*/
//另:
v,ok := <-a//从通道a中读取数据
func main() {
chan1 := make(chan bool)
var isover bool
go func() {
for i := 0; i < 10; i++ {
fmt.Println("子goroutine正在运行,i=", i)
}
chan1 <- true //发送数据后此处阻塞直到数据被读出去
fmt.Println("子goroutine运行结束")
}()
//通道的读写操作都是阻塞的,在读取到通道中的数据前会一直阻塞
isover = <-chan1
fmt.Println("isover ==", isover)
fmt.Println("main over")
}
注:1.通道是goroutine之间的连接,所以读端和写端不能在同一个goroutine中。
2.读写的阻塞只是默认的,当创建的管道是缓冲通道时,在缓冲区填满(写)或缓冲区为空(读)前不会发生阻塞
3.在程序设计时要避免出现死锁,读和写成对存在。
3.关闭通道
发送者可以通过关闭通道来通知接收方不会有更多的数据被发送到channel中
close(ch)
接受者可以在接受通道中的数据时使用额外的变量来检查通道是否已经关闭
v,ok := <-ch //返回false表示通道已经关闭,此时v的值为ch通道类型的零值,如int类型则为0
func main() {
chan1 := make(chan int)
go func() {
for i := 0; i < 5; i++ {
chan1 <- i
}
close(chan1)
}()
for {
v, ok := <-chan1
if !ok { //返回false表示通道已经关闭了
fmt.Println("channel is closed,ok ==", ok)
//此时再打印一遍v的值为0
break
}
fmt.Println("v ==", v, " ok ==", ok)
}
fmt.Println("main over")
}
可以使用range来循环获取数据直到通道关闭
func main() {
chan1 := make(chan int)
go func() {
for i := 0; i < 5; i++ {
chan1 <- i
}
close(chan1)
}()
for v := range chan1 {
fmt.Println("v ==", v)
}
fmt.Println("main over")
}
注:在这种接收方循环接收数据的情况下,发送方都要手动的关闭通道来告知没有数据传输了,否则就会造成死锁。
4.创建带缓冲区的通道
ch := make(chan type,capacity)
//带缓冲的通道:
//发送数据只有在缓冲区满了时才会阻塞
//接收数据只有在缓冲区为空时才会阻塞
func main() {
chan1 := make(chan int, 3)
go func() {
for i := 0; i < 10; i++ {
fmt.Println("send num...", strconv.Itoa(i))
chan1 <- i
time.Sleep(200 * time.Microsecond)
}
close(chan1)
}()
time.Sleep(2 * time.Second)
for v := range chan1 {
fmt.Println("v ==", v)
}
fmt.Println("main over")
}
5.定向通道/单向通道:只能读或者只能写
chan <- T//只能写
<- chan T//只能读
ch1:=make(chan <- int)//只写
ch2:=make(<- chan int)//只读
//应用场景:通常创建的仍然是双向的通道,只是在某些函数中将双向通道作为单向通道传入,从而实现该函数只能对该通道进行读/写操作
func fun1(ch chan<- int) {
ch <- 1000
}
func fun2(ch1 <-chan int, ch2 chan bool) {
data := <-ch1
fmt.Println(data)
ch2 <- true
}
func main() {
ch1 := make(chan int)
ch2 := make(chan bool)
go fun1(ch1)
go fun2(ch1, ch2)
<-ch2
fmt.Println("main over")
}
6.time包中与通道相关的函数
主要就是定时器,标准库中的Timer让用户可以自定义自己的超时逻辑,尤其是在应对select处理多个channel的超时、但channel读写的超时等情形时尤为方便。
Timer是一次性的时间触发事件,这点与Ticker不同,Ticker是按一定时间间隔持续触发时间事件。
//创建一个计时器,d时间后触发:在自己的通道上持续d时间后发送当前时间
//func NewTimer(d Duration)*Timer
//返回值是Timer类型,是有一个成员变量C的结构体,C的类型是Time类型的只读通道
/*type Timer struct{
C <-chan Time
}*/
timer := time.NewTimer(3*time.Second)
fmt.Printf("%T\n",timer)//*time.Timer
fmt.Println(time.Now)
ch2 := timer.C
fmt.Println(<-ch2)
//或直接fmt.Println(<-timer.C)
//取消定时器
//func (t Timer)Stop()bool
//返回true表示成功停止了,返回false表示停止失败了
timer := time.NewTimer(5*time.Second)
fmt.Println("timer start")
go func(){//由于最终定时器被停止了,所以正常情况下这条goroutine是不会执行的
<- timer.C
fmt.Println("timer over")
}
time.Sleep(3*time.Second)
flag:=timer.Stop()
if flag{
fmt.Println("timer is stopped")
}
//d时间后获取当前时间 <==> NewTimer(d).C
//func After(d Duration) <-chan Time
ch := time.After(3*time.Second)
fmt.Println(time.Now())
fmt.Println(<-ch)
select语句
select是go中的一个控制结构,select语句类似于switch语句,但是select会随机执行一个可运行的case。如果没有case可运行,要看是否有default可以运行,有则运行default,没有它将阻塞,直到有case可以运行。
select{
case communication clause:
statement(s);
case communication clause:
statement(s);
default :
statement(s);
}
- 每个case都必须是一个通信(channel操作)
- 所有channel表达式都会被求值
- 所有被发送的表达式都会被求值
- 如果有多个case可以运行,select会随机公平的选出一个执行,其他的不会执行
- 否则:如果有default子句,则执行该语句,没有的话就阻塞到某个case可以运行。go不会重新对channel或值进行求值
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
ch1 <- 100
}()
go func() {
ch2 <- 200
}()
select {
case data := <-ch1:
fmt.Println("ch1 :", data)
case data := <-ch2:
fmt.Println("ch2 :", data)
}
fmt.Println("main over")
}
CSP模型
CSP是Communicating Sequential Process的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。
严格来说,CSP是一门形式语言,用于描述并发系统中的互动模式,也因此成为一众面向并发的编程语言的理论源头,并衍生出Occam/Limbo/Golang…
而具体到编程语言,如Golang,其实只用到了CSP的很小一部分,即理论中的Process/Channel(对应到语言中的goroutine/channel)∶这两个并发原语之间没有从属关系,Process 可以订阅任意个Channel,Channel也并不关心是哪个Process在利用它进行通信;Process围绕Channel进行读写,形成一套有序阻塞和可预测的并发模型。
与主流语言通过共享内存来进行并发控制方式不同,Go语言采用了CSP模式。这是一种用于描述两个独立的并发实体通过共享的通讯Channel(管道)进行通信的并发模型。
Golang就是借用CSP模型的一些概念为之实现并发进行理论支持,其实从实际上出发,go语言并没有完全实现了CSP模型的所有理论,仅仅是借用了process和channel这两个概念。process是在go语言上的表现就是goroutine是实际并发执行的实体,每个实体之间是通过channel通讯来实现数据共享。
Go语言的CSP模型是由协程Goroutine与通道Channel实现;
Go协程goroutine是一种轻量线程,它不是操作系统的线程,而是将一个操作系统线程分段使用,通过调度器实现协作式调度。是一种绿色线程,微线程,它与Coroutine协程也有区别,能够在发现堵塞后启动新的微线程。
通道channel类似Unix的Pipe,用于协程之间通讯和同步。协程之间虽然解耦,但是它们和Channel有着耦合。
Goroutine是实际并发执行的实体,它底层是使用协程(coroutine)实现并发,coroutine是一种运行在用户态的用户线程,类似于greenthread,go底层选择使用coroutine的出发点是因为,它具有以下特点:
1.用户空间避免了内核态和用户态的切换导致的成本
2.可以由语言和框架层进行调度
3.更小的栈空间允许创建大量的实例