golang 并发设计模式(三)--管道模式管道和显式取消


摘自点击打开链接

Go Concurrency Patterns: Pipelines and cancellation

一、引言

Go并发原语使得构建流式数据管道,高效利用I/O和多核变得简单。这篇文章介绍了几个管道例子,重点指出在操作失败时的细微差别,并介绍了优雅处理失败的技术。

二、什么是管道?

Go没有正式的管道定义。管道只是众多并发程序的一类。一般的,一个管道就是一些列的由channel连接起来的阶段。每个阶段都有执行相同逻辑的goroutine。在每个阶段中,goroutine

·从channel读取上游数据

·在数据上执行一些操作,通常会产生新的数据

·通过channel将数据发往下游

每个阶段都可以有任意个输入channel和输出channel,除了第一个和最有一个channel(只有输入channel或只有输出channel)。第一个步骤通常叫数据源或者生产者,最后一个叫做存储池或者消费者。

我们先从一个简单的管道例子来解释这些概念和技术,稍后我们会介绍一个更为复杂的例子。

数字的平方

假设管道有三个阶段。

第一步,gen函数,是一个将数字列表转换到一个channel中的函数。Gen函数启动了一个goroutine,将数字发送到channel,并在所有数字都发送完后关闭channel。

1

2

3

4

5

6

7

8

9

10

funcgen(nums...int)<-chanint{

out:=make(chanint)

gofunc(){

for_,n:=rangenums{

out<-n

}

close(out)

}()

returnout

}

第二个阶段,sq,从上面的channel接收数字,并返回一个包含所有收到数字的平方的channel。在上游channel关闭后,这个阶段已经往下游发送完所有的结果,然后关闭输出channel:

funcsq(in<-chanint)<-chanint{

out:=make(chanint)

gofunc(){

forn:=rangein{

out<-n*n

}

close(out)

}()

returnout

}

main函数建立这个管道,并执行第一个阶段,从第二个阶段接收结果并逐个打印,直到channel被关闭。

1

2

3

4

5

6

7

8

9

funcmain(){

//Setupthepipeline.

c:=gen(2,3)

out:=sq(c)

//Consumetheoutput.

fmt.Println(<-out)//4

fmt.Println(<-out)//9

}

因为sq对输入channel和输出channel拥有相同的类型,我们可以任意次的组合他们。我们也可以像其他阶段一样,将main函数重写成一个循环遍历。

1

2

3

4

5

6

funcmain(){

//Setupthepipelineandconsumetheoutput.

forn:=rangesq(sq(gen(2,3))){

fmt.Println(n)//16then81

}

}

扇出扇入(Fan-out,fan-in)

多个函数可以从同一个channel读取数据,直到这个channel关闭,这叫扇出。这是一种多个工作实例分布式地协作以并行利用CPU和I/O的方式。

一个函数可以从多个输入读取并处理数据,直到所有的输入channel都被关闭。这个函数会将所有输入channel导入一个单一的channel。这个单一的channel在所有输入channel都关闭后才会关闭。这叫做扇入。

我们可以设置我们的管道执行两个sq实例,每一个实例都从相同的输入channel读取数据。我们引入了一个新的函数,merge,来扇入结果:

1

2

3

4

5

6

7

8

9

10

11

12

funcmain(){

in:=gen(2,3)

//Distributethesqworkacrosstwogoroutinesthatbothreadfromin.

c1:=sq(in)

c2:=sq(in)

//Consumethemergedoutputfromc1andc2.

forn:=rangemerge(c1,c2){

fmt.Println(n)//4then9,or9then4

}

}

merge函数为每一个输入channel启动一个goroutine,goroutine将数据拷贝到同一个输出channel。这样就将多个channel转换成一个channel。一旦所有的outputgoroutine启动起来,merge就启动另一个goroutine,在所有输入拷贝完毕后关闭输出channel。
向一个关闭了的channel发送数据会触发异常,所以在调用close之前确认所有的发送动作都执行完毕很重要。sync.WaitGroup类型为这种同步提供了一种简便的方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

funcmerge(cs...<-chanint)<-chanint{

varwgsync.WaitGroup

out:=make(chanint)

//Startanoutputgoroutineforeachinputchannelincs.output

//copiesvaluesfromctooutuntilcisclosed,thencallswg.Done.

output:=func(c<-chanint){

forn:=rangec{

out<-n

}

wg.Done()

}

wg.Add(len(cs))

for_,c:=rangecs{

gooutput(c)

}

//Startagoroutinetocloseoutoncealltheoutputgoroutinesare

//done.Thismuststartafterthewg.Addcall.

gofunc(){

wg.Wait()

close(out)

}()

returnout

}

三、停止的艺术

我们所有的管道函数都遵循一种模式:

·发送者在发送完毕时关闭其输出channel

接收者持续从输入管道接收数据直到输入管道关闭。

这种模式使得每一个接收函数都能写成一个range循环,保证所有的goroutine在数据成功发送到下游后就关闭。

但是在真实的案例中,并不是所有的输入数据都需要被接收处理。有些时候是故意这么设计的:接收者可能只需要数据的子集就够了;或者更一般的,因为输入数据有错误而导致接收函数提早退出。上面任何一种情况下,接收者都不应该继续等待后续的数据到来,并且我们希望上游函数停止生成后续步骤已经不需要的数据。

在我们的管道例子中,如果一个阶段无法消费所有的输入数据,那些发送这些数据的goroutine就会一直阻塞下去:

1

2

3

4

5

6

7

//Consumethefirstvaluefromoutput.

out:=merge(c1,c2)

fmt.Println(<-out)//4or9

return

//Sincewedidn'treceivethesecondvaluefromout,

//oneoftheoutputgoroutinesishungattemptingtosendit.

}

这是一种资源泄漏:goroutine会占用内存和运行时资源。goroutine栈持有的堆引用会阻止GC回收资源。而且goroutine不能被垃圾回收,必须主动退出。

我们必须重新设计管道中的上游函数,在下游函数无法接收所有输入数据时退出。一种方法就是让输出channel拥有一定的缓存。缓存可以存储一定数量的数据。如果缓存空间足够,发送操作就会马上返回:

1

2

3

4

c:=make(chanint,2)//buffersize2

c<-1//succeedsimmediately

c<-2//succeedsimmediately

c<-3//blocksuntilanothergoroutinedoes<-candreceives1

如果在channel创建时就知道需要发送数据的数量,带缓存的channel会简化代码。例如,我们可以重写gen函数,拷贝一系列的整数到一个带缓存的channel而不是创建一个新的goroutine:

1

2

3

4

5

6

7

8

funcgen(nums...int)<-chanint{

out:=make(chanint,len(nums))

for_,n:=rangenums{

out<-n

}

close(out)

returnout

}

反过来我们看管道中被阻塞的goroutine,我们可以考虑为merge函数返回的输出channel增加一个缓存:

funcmerge(cs...<-chanint)<-chanint{

varwgsync.WaitGroup

out:=make(chanint,1)//enoughspacefortheunreadinputs

//...therestisunchanged...

虽然这样可以避免了程序中goroutine的阻塞,但这是很烂的代码。选择缓存大小为1取决于知道merge函数接收数字的数量和下游函数消费数字的数量。这是很不稳定的:如果我们向gen多发送了一个数据,或者下游函数少消费了数据,我们就又一次阻塞了goroutine。

然而,我们需要提供一种方式,下游函数可以通知上游发送者下游要停止接收数据。

四、显式取消

当main函数决定在没有从out接收所有的数据而要退出时,它需要通知上游的goroutine取消即将发送的数据。可以通过向一个叫做done的channel发送数据来实现。因为有两个潜在阻塞的goroutine,main函数会发送两个数据:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{

varwgsync.WaitGroup

out:=make(chanint)

//Startanoutputgoroutineforeachinputchannelincs.output

//copiesvaluesfromctooutuntilcisclosedoritreceivesavalue

//fromdone,thenoutputcallswg.Done.

output:=func(c<-chanint){

forn:=rangec{

select{

caseout<-n:

case<-done:

}

}

wg.Done()

}

//...therestisunchanged...

这种方法有一个问题:每一个下游函数需要知道潜在可能阻塞的上游发送者发送报文的数量,以发送响应的信号让其提早退出。跟踪这些数量是无趣的而且很容易出错。

我们需要一种能够让未知或无界数量的goroutine都能够停止向下游发送数据的方法。在Go中,我们可以通过关闭一个channel实现。因为从一个关闭了的channel执行接收操作总能马上成功,并返回相应数据类型的零值。

这意味着main函数仅通过关闭done就能实现将所有的发送者解除阻塞。关闭操作是一个高效的对发送者的广播信号。我们扩展管道中所有的函数接受done作为一个参数,并通过defer来实现相应channel的关闭操作。因此,无论main函数在哪一行退出都会通知上游退出。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

funcmain(){

//Setupadonechannelthat'ssharedbythewholepipeline,

//andclosethatchannelwhenthispipelineexits,asasignal

//forallthegoroutineswestartedtoexit.

done:=make(chanstruct{})

deferclose(done)

in:=gen(done,2,3)

//Distributethesqworkacrosstwogoroutinesthatbothreadfromin.

c1:=sq(done,in)

c2:=sq(done,in)

//Consumethefirstvaluefromoutput.

out:=merge(done,c2)

fmt.Println(<-out)//4or9

//donewillbeclosedbythedeferredcall.

}

现在每一个管道函数在done被关闭后就可以马上返回了。merge函数中的output可以在接收管道的数据消费完之前返回,因为output函数知道上游发送者sq会在done关闭后停止产生数据。同时,output通过defer语句保证wq.Done会在所有退出路径上调用。

这里有个机制:

Ø首先保证sq退出,sq退出,源头的生产者就没有了,merge里面的forrange就会退出;

Ø如果恰巧阻塞到merge的select里面,也由于done已经关闭,对导致解除阻塞而退出;

(原因是:一般通知生产者停止生产时,消费者已经不去消费了caseout<-n:会阻塞,所以slect中的

case<-done:

ruturn

能够执行select多个chan都已经准备就绪,选择是随机的,这就说明sq肯定有机会退出)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

<-done://这里使用的是闭包

return

}

}

}

wg.Add(len(cs))

for_,c:=rangecs{

gooutput(c)

}

//Startagoroutinetocloseoutoncealltheoutputgoroutinesare

//done.Thismuststartafterthewg.Addcall.

gofunc(){

wg.Wait()

close(out)

}()

returnout

}

类似的,sq也可以在done关闭后马上返回。sq通过defer语句使得任何退出路径都能关闭其输出channelout。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

funcsq(done<-chanstruct{},in<-chanint)<-chanint{

out:=make(chanint)

gofunc(){

deferclose(out)

forn:=rangein{

select{

caseout<-n*n:

<-done:

}

}

}()

returnout

}

·管道构建的指导思想如下:

每一个阶段在所有发送操作完成后关闭输出channel。

每一个阶段持续从输入channel接收数据直到输入channel被关闭或者生产者被解除阻塞(译者:生产者退出)

·

·管道解除生产者阻塞有两种方法:

要么保证有足够的缓存空间存储将要被生产的数据;

要么显式的通知生产者消费者要取消接收数据;

经验

l一般情况下有发送者主动关闭chan,特殊情况下才由消费者发信号告知生产者关闭chan,避免生产者的goroutine一直阻塞;

这里使用了一个技巧就是:读取被关闭的channel会立马返回,是不会阻塞的,返回值是chantype的零值;select与之结合天衣无缝的扑捉到关闭信号然后,生产者停止生产退出关闭channel,释放资源。

树形摘要

让我们来看一个更为实际的管道。

MD5是一个信息摘要算法,对于文件校验非常有用。命令行工具md5sum很有用,可以打印一系列文件的摘要值。

我们的例子程序和md5sum类似,但是接受一个单一的文件夹作为参数,打印该文件夹下每一个普通文件的摘要值,并按路径名称排序。

我们程序的main函数调用一个工具函数MD5ALL,该函数返回一个从路径名称到摘要值的哈希表,然后排序并输出结果:

13
)
r range {
r {
. err
}
. sum
}
// Check whether the Walk failed.
errc {
err
}
nil

结论

这篇文章介绍了如果用Go构建流式数据管道的技术。在这样的管道中处理错误有点取巧,因为管道中每一个阶段可能被阻塞不能往下游发送数据,下游阶段可能已经不关心输入数据。我们展示了关闭channel如何向所有管道启动的goroutine广播一个done信号,并且定义了正确构建管道的指导思想。

深入阅读: • Go并发模式(视频)展示了Go并发原语的基本概念和几个实现的方法 • 高级Go并发模式(视频)包含几个更为复杂的Go并发原语的使用,尤其是select • Douglas McIlroy的Squinting at Power Series论文展示了类似Go的并发模式如何为复杂的计算提供优雅的支持。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


类型转换 1、int转string 2、string转int 3、string转float 4、用户结构类型转换
package main import s &quot;strings&quot; import &quot;fmt&quot; var p = fmt.Println func main() { p(&quot;Contains: &quot;, s.Contains(&quot;test&quo
类使用:实现一个people中有一个sayhi的方法调用功能,代码如下: 接口使用:实现上面功能,代码如下:
html代码: beego代码:
1、读取文件信息: 2、读取文件夹下的所有文件: 3、写入文件信息 4、删除文件,成功返回true,失败返回false
配置环境:Windows7+推荐IDE:LiteIDEGO下载地址:http://www.golangtc.com/downloadBeego开发文档地址:http://beego.me/docs/intro/ 安装步骤: 一、GO环境安装 二、配置系统变量 三、Beego安装 一、GO环境安装 根
golang获取程序运行路径:
Golang的文档和社区资源:为什么它可以帮助开发人员快速上手?
Golang:AI 开发者的实用工具
Golang的标准库:为什么它可以大幅度提高开发效率?
Golang的部署和运维:如何将应用程序部署到生产环境中?
高性能AI开发:Golang的优势所在
本篇文章和大家了解一下go语言开发优雅得关闭协程的方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。1.简介本文将介绍首先为什么需要主...
这篇文章主要介绍了Go关闭goroutine协程的方法,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。1.简介本文将介绍首先为什么需要主动关闭gor...
本篇文章和大家了解一下go关闭GracefulShutdown服务的几种方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。目录Shutdown方法Regi...
这篇文章主要介绍了Go语言如何实现LRU算法的核心思想和实现过程,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。GO实现Redis的LRU例子常
今天小编给大家分享的是Go简单实现多租户数据库隔离的方法,相信很多人都不太了解,为了让大家更加了解,所以给大家总结了以下内容,一起往下看吧。一定会...
这篇“Linux系统中怎么安装NSQ的Go语言客户端”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希
本文小编为大家详细介绍“怎么在Go语言中实现锁机制”,内容详细,步骤清晰,细节处理妥当,希望这篇“怎么在Go语言中实现锁机制”文章能帮助大家解决疑惑,下面...
今天小编给大家分享一下Go语言中interface类型怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考