以通信来共享内存
将资源读进内存-->共享内存,一个个进程/线程进行处理,这是常见模式。 go channel 是一种直接在进程/线程之间传递资源的方式,即以通信来共享内存。 这便是go的精髓。
扩展-一些名词了解
Linux、IPC(进程间通信)、进程(几个状态)、线程、同步、异步、信号、管道、socket、消息队列、字节流、结构化消息、通信、信号量、共享内存、内核空间、用户空间、PID、PPID、fork、COW 将进程细化为多个状态后,系统CPU就可以更加灵活地调度,进程下还有更细的线程,有N多状态, GO调度器实现一套机制,统一调度与分配这些CPU、进程、线程等资源,分为M(内核)、P(go上下文)、G(goroutine)三层 以上概念不学linux原理的话,听个名词就好,GO中重点学习两个点 1. 第三层G就是我们经常使用的goroutine协程,掌握goroutine的使用 2. GO支持的IPC方法有 信号、管道、socket,掌握这三种的实现方式
channel 定义
chan T 双向 chan<-T 只发送,即只往通道中发送 <- chan T 只接收 通道类型,也是引用类型,零值为nil package main import ( "fmt" ) func main(){ var c3 chan int res := c3 ==nil //chan default:<nil>,eq nil:true fmt.Printf("chan default:%v,eq nil:%v\n",c3,res) }
for 与 channel
可以从未初始化的通道(nil)中取值,但会被阻塞 当通道关闭时,会将通道中的元素全部取出后,语句再结束 必须是一个双向通道或接收通道,不是只是发送通道。 var c3 chan int for elem := range c3 { fmt.Println(elem) }
go channel特性
同一时刻,仅有一个goroutine可以向该通道发送元素值,同时也仅有一个goroutine可以从该通道接收元素值,即通道是串行的。 通道中的元素值,严格按发送到该通道的先后顺序排列,最先发送到的元素值,一定最先被接收。等效于先进先出的消息队列。 通道中的元素值,具有原子性,不可被分割;每个元素值只能被一个goroutine接收,被接收后,立刻从通道中清除。 goroutine的执行与主程序是并行的,主程序结束时,还没有执行完毕的goroutine会被强制中止。 通道的传值是复制,不是引用。
通道初始化
通道未初始化时其值为nil,可以从nil中尝试接收元素,但会被永远阻塞 make(chan int) 初始化一个可以接收、发送int值类型的通道,无缓冲 make(chan int, 10) 初始化一个可以接收、发送int值类型的通道,可缓冲10个int值。 第11个int值向通道中发送时会被阻塞。 被缓冲的元素值,会严格按发送的顺序接收。
从通道中接收元素
c := make(chan int, 10) n := <- c 如果通道 c 被关闭,那么n的值为该元素类型的零值, 本例int类型的零值为0; 如果是在接收的过程中被关闭了,n的值同样为0。 ``` package main import "fmt" func main() { c:= make(chan int,10) close(c) n:=<-c fmt.Println(n) } ``` 输出: 0 这表示从一个关闭的通道中取值,取到的是这个通道类型的0值 n,ok := <- c 这种写法与上面的唯一区别在于,当通道关闭时,ok的值为false ``` package main import "fmt" func main() { c:= make(chan int,10) close(c) n,ok:=<-c fmt.Println(n,ok) // 0 false if n,ok:=<-c; ok { fmt.Println(n) }else{ fmt.Println("通道已关闭") } } ``` 输出: 0 false 通道已关闭 如果是for读取已关闭的通道,则会直接跳过 ``` package main import "fmt" func main() { c:= make(chan int,10) close(c) n,ok:=<-c fmt.Println(n,ok) // 0 false if n,ok:=<-c; ok { fmt.Println(n) }else{ fmt.Println("通道已关闭") } fmt.Println("----------------for start ----------") for n:= range c { fmt.Println(n) } fmt.Println("----------------for over ----------") } ``` 输出: 0 false 通道已关闭 ----------------for start ---------- ----------------for over ----------
关闭通道
通常在发送端关闭通道 不可重复关闭通道,关闭一个已经关闭的或未初始化的通道会引发异常 通道关闭后,其中未接收的数据仍可被接收 接收端应先判断通道是否关闭再从中取值,否则若通道关闭可能取出的值是通道类型的零值 func test11(){ dataChan := make(chan int,3) startChan := make(chan string,1) overChan := make(chan string,2) go func() { <- startChan fmt.Println("start receive data ") for{ if elem,ok:= <- dataChan; ok{ fmt.Printf("%v\n",elem) }else { break } } fmt.Println("reciver over") overChan <- "rec over" }() go func() { for i:=0;i<3;i++ { dataChan <- i fmt.Sprintf("send data:%v\n",i) } fmt.Println("send data over") //在接收之前关闭通道 close(dataChan) fmt.Println("dataChan closed") //开始接收 startChan <- "begin" overChan <- "send data over" }() <- overChan <- overChan fmt.Println("main over") } 执行这段代码输出: send data over dataChan closed start receive data 0 1 2 reciver over main over
长度与容量
长度是通道中元素的个数,非固定值 容量是通道中可能缓存的元素个数最大个数 package main import "fmt" func main() { c1 := make(chan int,5) fmt.Printf("c1 长度:%v, 容量:%v\n",len(c1),cap(c1)) c2 := make(chan int) fmt.Printf("c2 长度:%v, 容量:%v\n",len(c2),cap(c2)) } c1 长度:0, 容量:5 c2 长度:0, 容量:0
单向通道
chan T 双向 用于channel定义 chan<-T 只发送 用于接口、函数参数定义 <- chan T 只接收 用于接口、函数参数定义 双向通道可以转换为发送/接收通道,反之不可以。
channel简单示例
package main import ( "fmt" "time" ) //channel的创建,发送,接收 func channe1(){ //创建,channel是有类型的 c1 := make(chan int) //接收,在这段程序中接收方必须是一个goroutine,因为只在主程序中发送而不接收,程序会报deadlock //通常使用匿名函数开一个与主程序同时执行的内部方法,即并行执行 go func(){ fmt.Println("接收数据准备") //这里接收channel使用了io输出功能,io是可以被抢占控制权的,即IO的特性 fmt.Println(<- c1) fmt.Println("接收数据完成") //关闭,不显式关闭时,channel会随主程序(即main)的运行结束而结束 //如果“接收”处理数据的时间较长,就会出现主程序已经结束,但接收方还没处理完的情况 //此时可以让主程序sleep一段时间,等待接收方把数据处理完毕再关闭 close(c1) fmt.Println("接收结束") }() //发送数据,“接收”程序要在发送之前准备, //意思就是发送数据之前,要先为channel准备好接收; //否则,执行<- 1将1发送到channel时,go发现没有人接收,会报deadlock c1 <- 1 //接收方与主程序同时执行 //主程序在此停止1毫秒,就相当于主程序等了接收方一毫秒 time.Sleep(time.Millisecond) } func main(){ channe1() fmt.Println("主程序结束") }
channel同步
如果一个动作会触发另外一个动作,那么这个行为通常被称为事件(event);如果这个事件不附带信息,那么此类事件又通常被用于同步。 channel有发送、接收、关闭三个操作; 发送触发接收,如果一个channel不发送,那么接收将处于阻塞。这种同步,可用于消息通知。 package main import ( "fmt" "time" ) func test(){ c := make(chan struct{}) go func(){ fmt.Println("我要花两分钟去看看园子里的花还活着吗") time.Sleep(7*time.Second) c <- struct{}{} }() //程序会在这里等待7秒 <- c //然后打印出下面这句话 fmt.Println("这花从屋里移值出去后,活得比以前更好了") } func main(){ test() }
让后台协程有序执行
package main import ( "gitee.com/tanpf/tools" "sync" "time" ) func startTask() { cmdList := make([]string,6) cmdList[0] = "mkdir -p /tmp/test/dir1" cmdList[1] = "mkdir /tmp/test/dir1/dir2" cmdList[2] = "mkdir /tmp/test/dir1/dir2/dir3" cmdList[3] = "mkdir /tmp/test/dir1/dir2/dir3/dir4" cmdList[4] = "mkdir /tmp/test/dir1/dir2/dir3/dir4/dir5" cmdList[5] = "mkdir /tmp/test/dir1/dir2/dir3/dir4/dir5/dir6" var cmdLength = len(cmdList) var cmdChan = make(chan string, cmdLength) var onOff = make(chan int, cmdLength) var wg sync.WaitGroup wg.Add(cmdLength) for i,cmd := range cmdList { cmdChan <- cmd go task(&cmdChan, &onOff, &wg, i, cmdLength) } //启动第一个任务,第一个任务结束时会启动第二个任务,依次类推 onOff <- 0 wg.Wait() } func task(cmdChan *chan string, onOff *chan int, wg *sync.WaitGroup, seq, maxLength int) { // for { // if i,ok := <- *onOff; ok { // if i == seq { // //跳出循环等待,开始当前顺序命令执行 // break // }else { // //将取出的命令执行序号放回通道,然后暂停一段时间该后台协程 // *onOff <- i // //让不该执行的后台协程等待,就会让该执行的后台线程更容易获取执行机会,因为它不用等 // time.Sleep(10*time.Microsecond) // } // } // } // 下面的代码与上面的基本等价,初学还是都体会一下 // 当通道无数据时,for会等待数据的到来,换句话讲,channel会阻塞for循环 // onOff可缓冲6个数据,但同一时间只放一个, // 也就是说,同一时刻最多有一个后台协程获取执行权 for i := range *onOff { if i == seq { //跳出循环等待,开始当前顺序命令执行 break }else { //将取出的命令执行序号放回通道,然后暂停一段时间该后台协程 *onOff <- i //让不该执行的后台协程等待,就会让该执行的后台线程更容易获取执行机会,因为它不用等 time.Sleep(10*time.Microsecond) } } //从缓冲中取出的元素顺序,严格遵从放入时的顺序 //所以取出的顺序一定会按命令列表的下标从0排列到列表结束 if cmd,ok := <- *cmdChan; ok{ tools.ExecShell(cmd) } wg.Done() seq++ if seq < maxLength{ *onOff <- seq } } func main(){ startTask() } 代码能否运行起来不重要,先来体会以下几个内容: 1. 思考一下这段代码是如何体现前面说的五个特性的, 2. 后台协和与main并行执行,main并不会等, 这段代码是使用了waitGroup,直译就是“等待组”,意思也挺贴切 3. 代码中将要执行的命令放到了channel中,这就是用通信来共享内存 4. 最核心的是开关onOff通道,是一个典型的 在各个后台协程中 以通道共享数据 来进行并发控制 的例子 5. 不该自己执行时暂停10毫秒的设计,如果没有这个,可能会出现某几个后台协程之间疯狂在通道中传值的情况 linux mkdir有个特性,其父目录若不存在,则无法创建子目录; 若命令不按列表顺序执行,那么最后的目录肯定不会创建成功, 若成功创建所有目录,则表明命令是按顺序执行的。 若去掉onOff顺序控制,本例就是一个后台并发执行的例子。
channel数组
package main import ( "fmt" "sync" ) type worker struct { in chan int done func() } //“接收”方程序 func (wk *worker) doWork(id int){ //接收的确是按数组顺序顺序打印出来的,但这只是程序第一次运行的情况 //接收是在发送之前就以并行的方式运行起来了,之后数据中每个channel都一直处于阻塞等待状态 //也就是说数组中的每个channel谁先打印出数据,就表示该谁先发送数据(忽略channel传送数据时长差异) for n := range wk.in { fmt.Printf("第 %d 次接收的信息为 %c\n",id,n) //通知主程序工作处理完毕 wk.done() } } func createWorker(id int, wtg *sync.WaitGroup) worker{ //channel作为struct的一个属性 wk := worker{ //chan<-表示channel只用于发送数据,即输入 in : make(chan int), done: func(){ wtg.Done() }, } //channel创建之后,就开始以并行的方式建立“接收”方 go wk.doWork(id) return wk } func channel2(){ //WaitGroup的wait(在主程序中调用)与done(在与主程序并行执行的“接收”方中调用)的交互, //可以达到等待所有channel运行完毕,再让主程序运行的效果 //而不是程序员猜想channel“接收”需要多少时间运行, //然后去主程序中设置time.Sleep让主程序等待 var wtg sync.WaitGroup //channel数组 var workers [8]worker for i := 0; i < 8; i++{ //使用引用的方式传送参数,所有的channel公用一个WaitGroup workers[i] = createWorker(i,&wtg) } //要一次性添加完要等待执行的channel个数 wtg.Add(16) for i,worker := range workers { worker.in <- 'a' + i //wtg.Add(1) //这种方式会报错 } for i,worker := range workers { worker.in <- 'A' + i } //等待所有channel执行完毕,否则一直阻塞 wtg.Wait() fmt.Println("所有channel执行完毕") } func main(){ channel2() fmt.Println("主程序结束") } 第 0 次接收的信息为 a 第 1 次接收的信息为 b 第 2 次接收的信息为 c 第 2 次接收的信息为 C 第 3 次接收的信息为 d 第 3 次接收的信息为 D 第 4 次接收的信息为 e 第 4 次接收的信息为 E 第 0 次接收的信息为 A 第 1 次接收的信息为 B 第 6 次接收的信息为 g 第 5 次接收的信息为 f 第 5 次接收的信息为 F 第 6 次接收的信息为 G 第 7 次接收的信息为 h 第 7 次接收的信息为 H 所有channel执行完毕 主程序结束