goroutine
在Go中,每一个并发执行的活动称为goroutine。
当一个程序启动后,只有一个goroutine来调用main函数,称为主goroutine。新的goroutine通过go创建。
f() // 调用f()等待它返回
go f() // 新建一个调用f()的goroutine, 不用等待
下面,go计算第45个斐波那契,spinner作为指示器,指示程序依然在运行:
import (
"time"
"fmt"
)
func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n) // slow
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}
上述程序写成了两个两个自主的活动(指示器和斐波那契),但同时在进行。
并发时钟服务器
首先是顺序时钟服务器,以每秒钟一次的频率向客户端发送当前时间:
import (
"net"
"log"
"io"
"time"
)
func main() {
listener, err := net.Listen("tcp","localhost:8000")
if err!= nil{
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil{
log.Print(err)
continue
}
handleConn(conn)
}
}
func handleConn(c net.Conn){
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil{
return
}
time.Sleep(1 * time.Second)
}
}
func mustCopy(dst io.Writer,src io.Reader){
if _, err := io.Copy(dst, src);err != nil{
log.Fatal(err)
}
}
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil{
log.Fatal(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}


可以看见,第二个客户端只有当第一个结束才可以正常工作,因为服务器是顺序的,一次只能处理一个客户请求。
想支持并发:在handleConn的地方添加一个go关键字,使它在自己的goroutine内执行:
func main() {
listener, err := net.Listen("tcp","localhost:8000")
if err!= nil{
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil{
log.Print(err)
continue
}
go handleConn(conn)
}
}
func handleConn(c net.Conn){
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil{
return
}
time.Sleep(1 * time.Second)
}
}


并发回声服务器
回声服务器可以模仿真实的回声,第一次大的回声(“HELLO”),在一定的延迟之后中等音量的回声(“Hello”),最后安静的回声(“hello”)
回声服务器,每个连接使用多个goroutine来处理。
服务端如下:
func echo(c net.Conn, shout string, delay time.Duration){
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan(){
echo(c, input.Text(), 1*time.Second)
}
c.Close()
}
func main() {
l, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := l.Accept()
if err != nil {
log.Print(err) // e.g., connection aborted
continue
}
go handleConn(conn)
}
}
客户端要实现在终端上输入,同时将服务器的回复复制到输出:
func mustCopy(dst io.Writer,src io.Reader){
if _, err := io.Copy(dst, src);err != nil{
log.Fatal(err)
}
}
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil{
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
显然,当主goroutine从标准输入读取并发送到服务器的时候,第二个goroutine读取服务器的回复并输出。

其中第三次从客户端进行的呼叫知道第二次回声枯竭才进行处理。而真实的回声由三个独立的回声叠加而成。
对服务端中的HandleConn做出修改,使其在调用echo时加入go关键字:
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan(){
go echo(c, input.Text(), 1*time.Second)
}
c.Close()
}

实现了即时处理。
通道
- 创建
通道是goroutine之间的连接。通道可以让一个goroutine发送特定值到另一个goroutine。每个通道是一个具体类型的导管,佳作通道的元素类型。
可以使用make函数创建一个通道,一个有int类型元素的通道写为chan int。这时创建的是无缓冲通道,还可以接收第二个可选参数,表示通道的容量。
ch := make(chan int) // 无缓冲通道
ch := make(chan int, 0) // 无缓冲通道
ch := make(chan int, 3) // 容量为3的缓冲通道
通道是一个使用make创建的数据结构的引用。当复制或作为参数传递到一个函数时,复制的是引用,调用者与被调用者都引用同一份数据结构。同类型的通道可以使用==进行比较。二者是同一通道数据的引用时,结果为true。
- 操作
通道有三种主要的操作:
- 发送(send)
- 接收(receive)
- 关闭(close)
前两个统称为通信,操作使用 <-
func main() {
ch := make(chan int)
ch <- x // 发送语句,通道在左,值在右
x = <-ch // 接收中,<-放在操作数前面,就是赋值
<-ch // 接收语句,丢弃结果,结果未被使用时合法的
}
关闭(close)中,设置一个标志位来指示当前已经发送完毕。
close(ch)
- 无缓冲通道
无缓冲通道进行的通信导致发送和接收goroutine同步化。所以无缓冲通道也称为同步通道。
因此,为了让程序等待后台的goroutine在完成后再退出,可使用一个通道同步两个goroutine:
func mustCopy(dst io.Writer,src io.Reader){
if _, err := io.Copy(dst, src);err != nil{
log.Fatal(err)
}
}
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil{
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // 忽略错误
log.Println("done")
done <- struct{}{} // 指示主goroutine
}()
// defer conn.Close()
// go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
conn.Close()
<- done // 等待后台goroutine完成
}
上述函数,当用户关闭标准输入流时,mustCopy返回,主goroutine调用conn.Close()来关闭两端网络连接。
- 关闭写半边的连接导致服务器看到EOF
- 关闭读半边的连接导致后台goroutine调用io.Copy返回”read from closed connection”错误。 返回前,后台goroutine记录一条消息,然后发送一个值到done通道。主goroutine在退出前一直等待,直到它接收到了这个值。
每一条消息有一个值,但有时候通信本身以及通信发生的时间也很重要,这个消息叫做事件。可使用struct{}元素类型的通道来强调它。
- 管道
通道可以用来连接goroutine,这样一个的输出是另一个输入。这叫做管道。
下面的例子由3个goroutine组成,它们被2个管道连接起来:
func main() {
naturals := make(chan int)
squares := make(chan int)
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
go func() {
for x := range naturals {
squares <- x*x
}
close(squares)
}()
for x := range squares{
fmt.Println(x)
}
}
很简单,最后输出各个数字的平方数。每次循环完了关闭对应通道
单向通道类型
将上述的3个函数分解,写出输入和输出通道。可使用单向通道类型,对上述的函数进行简化:
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
func counter(out chan<- int){
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan <- int, in <- chan int) {
for v := range in{
out <- v * v
}
close(out)
}
func printer(in <- chan int) {
for i := range in{
fmt.Println(i)
}
}
其中,chan <- int是一个只能发送的通道,允许发送但不允许接收。反之,<- chan int是一个只能接收int类型通道,允许接收但不允许发送。
counter(naturals)的调用隐式地将chan int类型转化为参数要求的chan <- int类型。
- 缓冲通道
创建一个可以容纳3个字符串的缓冲通道:
ch = make(chan string, 3)

在缓冲通道上,发送操作在队列的尾部插入一个元素,接收操作从队列的头部移除一个元素。要是满了还要输入或者空了要接收,都会阻塞,直到另一个goroutine对它进行操作。
当前可以无阻塞发送三个值:
ch <- "A"
ch <- "B"
ch <- "C"
此时通道:

如果接收一个值
fmt.Println(<-ch)
缓冲通道如下所示:
可使用查询语句,查询通道缓冲区的容量和当前通道内元素的个数:
fmt.Println(cap(ch)) // 3
fmt.Println(len(ch)) // 2
下面的例子展示goroutine泄露,1个缓冲通道,并发的向三个镜像地址发请求,将它们的响应通过一个缓冲通道进行发送,只接收第一个返回的响应。
func mirroredQuery() string{
response := make(chan string, 3)
go func() {response <- request("www.aaa.com")}()
go func() {response <- request("www.bbb.com")}()
go func() {response <- request("www.ccc.com")}()
return <- response // 返回最快的request
}
func request(hostname string)(reponse string) {/*...*/}
如果是无缓冲通道,两个比较慢的goroutine将被卡住,因为没有goroutine来接收。这种goroutine泄露是一种bug,泄露的goroutine不会自动回收。
并行循环
考虑生成一批全尺寸图像的缩略图
ImageFile从infile中读取一幅图像并把它的缩略图写入同一个目录中。它返回生成的文件名,比如"foo.thumb.jpg"
func ImageFile(infile string) (string, error) {
ext := filepath.Ext(infile) // e.g., ".jpg", ".JPEG"
outfile := strings.TrimSuffix(infile, ext) + ".thumb" + ext
return outfile, ImageFile2(outfile, infile)
}
下面的程序在一个图像文件名字列表上进行循环,然后给每一个图像产生一副缩略图:
func makeThumbnails(filename []string){
for _, f := range filenames{
if _, err := thumbnail.ImageFile(f); err != nil {
log.Println(err)
}
}
}
显然,处理文件的顺序没有关系,因为每一个缩略操作和其它的操作独立。像这样由一些完全独立的子问题组成的问题称为高度并行。并行的话,可以对前一个函数进行优化,添加go关键字:
func makeThumbnails2(filename []string){
for _, f := range filenames{
go thumbnail.ImageFile(f) // 忽略了错误
}
}
要注意的是,这个makeThumbnails2函数在没有完成想要完成的事情之前就返回了。它启动了所有的goroutine,每个文件一个,但是没有等它们执行完毕。
可以修改内层goroutine,通过一个共享的通道发送事件来向外层goroutine报告它的完成。
func makeThumbnails3(filename []string){
ch := make(chan struct{})
for _, f := range filenames{
go func(f string) {
thumbnail.ImageFile(f)
ch <- struct{}{}
}(f)
}
for range filenames{
<- ch
}
}
如果调用thumbnail.ImageFile无法创建一个文件,它返回一个错误。则需要返回一个它从扩展的操作中接收到的错误:
func makeThumbnails4(filename []string) error{
errors := make(chan error)
for _, f := range filenames{
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames{
if err := <- errors; err != nil{
return err
}
}
return nil
}
注意到,当遇到第一个非nil的错误时,它将错误返回给调用者,这样没有goroutine继续从errors返回通道上进行接收,直至读完。每一个现存的工作goroutine在试图发送值到此通道的时候永久阻塞,永不终止。这种情况下goroutine泄露可能导致整个程序卡住或者系统内存耗尽。
解决的办法是使用一个缓冲通道来返回生成的图像文件的名称以及任何错误信息:
func makeThumbnails4(filename []string) (thumbfiles []string, err error){
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames))
for _, f := range filenames{
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames{
it : <- ch
if it.err != nil{
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
使用select多路复用
下面的例子对火箭发射进行倒计时。time.Tick函数返回一个通道。定期发送事件,每个事件的值是一个时间戳:
func main() {
fmt.Println("Commencing countdown")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<- tick
}
launch()
}
func launch() {
fmt.Println("Lift off!")
}
输出如下:
Commencing countdown
10
9
8
7
6
5
4
3
2
1
Lift off!
然后可以尝试用select语句,进行多路复用,在倒计时进行时按下回车键来取消发射过程的能力。select语句有一系列情况和可选的默认分支。每一个情况指定一次通信和关联一段代码块。在select中一直等待,直到一次通信来告知有一些情况可以执行,然后再进行通信。
如果多个select同时满足,则随机选择一个,保证每一个通道有相同的机会被选中。
func main() {
// ...创建终止通道...
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()
fmt.Println("Commencing countdown. Press return to abort.")
select {
case <- time.After(10 * time.Second):
wait()
case <- abort:
fmt.Println("abort")
return
}
launch()
}
func launch() {
fmt.Println("Lift off!")
}
func wait() {
fmt.Println("")
}
同时select可以有一个默认情况,用来指定在没有其他的通信发生时可以立即执行的动作:
select {
case <- time.After(10 * time.Second):
wait()
case <- abort:
fmt.Println("abort")
return
default:
// ...
}
取消
先创建一个取消通道,在它上面不发送任何值,但是它的关闭表明程序需要停止它正在做的事情,也定义了一个工具函数cancelled,在它被调用的时候检测取消状态:
var done = make(chan struct{})
func cancelled () bool {
select {
case <- done:
return true
default:
return false
}
}
然后,创建一个读取标准输入的goroutine,这个goroutine通过关闭done通道来广播取消事件:
go func(){
os.Stdin.Read(make([]byte, 1))
close(done)
}()
让goroutine来响应取消操作。在主goroutine在,添加第三个情况到select语句中,它尝试从done通道接收。如果选择这个情况,函数将返回,但是在返回之前它必须耗尽fileSizes通道,丢弃它所有的值,直到通道关闭。做这些是为了保证所有的walkDir调用可以执行完,不会卡在向fileSizes通道发送消息上:
for{
select {
case <- done:
// 耗尽fileSizes以允许已有的goroutine结束
for range fileSizes {
// 不执行任何操作
case size, ok := <-fileSizes:
// ......
}
}
}
walkDir goroutine在开始的时候检测取消状态,如果设置状态,什么都不做立即返回。它让在取消后创建的goroutine什么都不做:
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan <- int64) {
defer n.Done()
if cancelled() {
return
}
for _,entry := range dirents(dir){
// .......
}
}
下面的select让取消操作的延迟更短:
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}:
return nil
}
defer func() { <- sema }()
// ....
}
综上,最终的代码如下:
import (
"os"
"fmt"
"sync"
"time"
"path/filepath"
)
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
func main() {
roots := os.Args[1:]
if len(roots) == 0 {
roots = []string{"."}
}
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done)
}()
// 并行遍历文件树的每个根。
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
// 定期打印结果。
tick := time.Tick(500 * time.Millisecond)
var nfiles, nbytes int64
loop:
for {
select {
case <-done:
// 排出fileSizes以允许现有的goroutine完成。
for range fileSizes {
// Do nothing.
}
return
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes被关闭
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
var sema = make(chan struct{}, 20)
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}:
case <-done:
return nil
}
defer func() { <-sema }()
f, err := os.Open(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
return nil
}
defer f.Close()
entries, err := f.Readdir(0) // 0 => no limit; read all entries
if err != nil {
fmt.Fprintf(os.Stderr, "du: %v\n", err)
// Don't return: Readdir may return partial results.
}
return entries
}