Go语言并行之美 -- 超越 "Hello World"
偶尔学习一种新的编程语言是件好事,但不能仅止步于 “Hello World"。
时常学习一种新的编程语言对你有好处,即使这种语言不会流行起来或者已经过时。使用新的语言处理旧的问题会促使你重新思考你当前处理问题的视角、方法和习惯。
我喜欢尝试新鲜的事物,特别是编程语言。但是,当你用新的语言实现了“你好,世界!”或者斐波那契序列之后,通常你会感到基本上再没什么可做的,没有任何新奇的地方。你可以试着实现埃拉托斯特尼筛法,借此探索一点数据结构和算法性能。但是我想要一些实际的东西,可能以后还会被复用。因此,不久前我自己创造了一个问题,这个问题可以帮助我仅用几百行代码去熟悉一种语言。
问题涉及了一种语言几个非常重要的方面:字符串,文件和网络输入输出,当然还有并行。称这个问题为 TCP/IP 代理(或者你可以称它 网络调试器)。问题的想法:你有一个TCP/IP侦听器(单线程或多线程)在给定的端口接受连接,当它接受到连接(调用者)时,它必须连接另外一台主机(远程主机),并且在调用者和远程主机间全双工传输数据。另外,代理还可以使用各种格式的日志记录通信内容,用来帮助分析数据。
我不再计算需要使用这种工具的场合。任何时候,只要涉及到网络编程,这种工具必不可少。我已经用不同的语言实现了这种工具很多次:C,C++,Perl,PHP。最近的两个实现用的是Python和Erlang。这种工具代表了我正在寻找的那种实际问题。
我可以指定更具体的需求。应用必须能够同时服务多个连接。对于每个连接,它需要以三种方式记录数据:一个以十六进制格式导出的表示双向顺序数据的导出日志文件;两个二进制日志文件分别记录输入和输出数据流。
这篇文章我将用Go语言来实现这个程序。Go语言的作者声称Go语言血脉中就支持并行和多线程。我打算让它们名符其实。
如果借助boost库使用C++开发这个程序的话,我很可能选择主侦听器线程加上用于服务每个连接的线程。由此,一个单独的连接将完全占用一个线程。
下面是用Go语言实现的程序中用来服务每个连接的线程:
1. 一个双向十六进制导出器线程
2. 两个线程以二进制格式记录输入和输出数据流
3. 两个线程用来在本地和远端主机间双向传输数据
总共5个线程。
再强调一遍,五个线程用来服务每一个单独的连接。我实现所有这些线程不是因为多线程本身的缘故,而是因为Go语言鼓励多线程,C++恰恰相反。Go语言本身就支持多线程,使用起来非常简单。我用Go语言实现TCP/IP代理时没有使用互斥信号量和条件变量。使用Go语言的管道(channels)可以优雅地实现线程同步。
好吧,下面是源代码,带有解释。如果你不熟悉Go编程语言,注释应该会有帮助。我的目的不只关注程序功能,同时还关注Go语言本身。
让我们出发!
2-11行声明了程序将要用到的包。需要注意,如果一个包被包含了但是没有用到,Go将认为这是一种错误,并强制删除没有用到的声明(还记得上次你想放弃并不耐烦地清理C++项目中STL包含进来的列表吗?)
1 package main 2 import ( 3 "encoding/hex" 4 "flag" 5 "fmt" 6 "net" 7 "os" 8 "runtime" 9 "strings" 10 "time" 11 )
12 var ( 13 host *string = flag.String("host", "", "target host or address") 14 port *string = flag.String("port", "0", "target port") 15 listen_port *string = flag.String("listen_port", "0", "listen port") 16 )
17-20行我们看到了Go语言可变参数函数参数的语法。
17 func die(format string, v ...interface{}) { 18 os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...)) 19 os.Exit(1) 20 }
21-28行有两个函数用来启动十六进制导出和二进制日志器。区别仅在于日志名不同。
21 func connection_logger(data chan []byte, conn_n int, local_info, remote_info string) { 22 log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log", format_time(time.Now()), conn_n, local_info, remote_info) 23 logger_loop(data, log_name) 24 } 25 func binary_logger(data chan []byte, conn_n int, peer string) { 26 log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log", format_time(time.Now()), conn_n, peer) 27 logger_loop(data, log_name) 28 }
29-43行是Go乐趣的开始,logger_loop函数创建一个日志文件,然后开始进入无限循环(35-42行)。36行代码等待来之管道data的消息。一个有意思的技巧在34行,defer操作符允许我们定义一个代码块,在函数域的末尾这个代码块一定会执行(类似于Java的 finally)。如果接收到的数据为空,函数退出。
29 func logger_loop(data chan []byte, log_name string) { 30 f, err := os.Create(log_name) 31 if err != nil { 32 die("Unable to create file %s, %v\n", log_name, err) 33 } 34 defer f.Close() 35 for { 36 b := <-data 37 if len(b) == 0 { 38 break 39 } 40 f.Write(b) 41 f.Sync() 42 } 43 } 44 func format_time(t time.Time) string { 45 return t.Format("2006.01.02-15.04.05") 46 } 47 func printable_addr(a net.Addr) string { 48 return strings.Replace(a.String(), ":", "-", -1) 49 } 50 type Channel struct { 51 from, to net.Conn 52 logger, binary_logger chan []byte 53 ack chan bool 54 }
55-58有一个函数从源套接字from读数据,写到日志文件,再将数据发送到目标套接字to。对于每一个连接有两个pass_through函数的实例在本地和远程套接字间向相反的方向拷贝数据。当I/O错误出现时,视为连接断开。最后,79行的函数向主线程发送确认,通知pass_through函数终止。
55 func pass_through(c *Channel) { 56 from_peer := printable_addr(c.from.LocalAddr()) 57 to_peer := printable_addr(c.to.LocalAddr()) 58 b := make([]byte, 10240) 59 offset := 0 60 packet_n := 0 61 for { 62 n, err := c.from.Read(b) 63 if err != nil { 64 c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n", from_peer)) 65 break 66 } 67 if n > 0 { 68 c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X) %d bytes from %s\n", packet_n, offset, n, from_peer)) 69 c.logger <- []byte(hex.Dump(b[:n])) 70 c.binary_logger <- b[:n] 71 c.to.Write(b[:n]) 72 c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n", packet_n, to_peer)) 73 offset += n 74 packet_n += 1 75 } 76 } 77 c.from.Close() 78 c.to.Close() 79 c.ack <- true 80 }
81-107行的函数处理整所有连接。它连接远程套接字(82行),测量连接持续时间(行88,101-103),启动日志器(行93-95),最后启动两个数据传输线程(行97-98)。函数pass_through运行直至通信两端都启动。行99-100等待来之数据传输线程的确认。行104-106终止日志器。
81 func process_connection(local net.Conn, conn_n int, target string) { 82 remote, err := net.Dial("tcp", target) 83 if err != nil { 84 fmt.Printf("Unable to connect to %s, %v\n", target, err) 85 } 86 local_info := printable_addr(remote.LocalAddr()) 87 remote_info := printable_addr(remote.RemoteAddr()) 88 started := time.Now() 89 logger := make(chan []byte) 90 from_logger := make(chan []byte) 91 to_logger := make(chan []byte) 92 ack := make(chan bool) 93 go connection_logger(logger, conn_n, local_info, remote_info) 94 go binary_logger(from_logger, conn_n, local_info) 95 go binary_logger(to_logger, conn_n, remote_info) 96 logger <- []byte(fmt.Sprintf("Connected to %s at %s\n", target, format_time(started))) 97 go pass_through(&Channel{remote, local, logger, to_logger, ack}) 98 go pass_through(&Channel{local, remote, logger, from_logger, ack}) 99 <-ack // Make sure that the both copiers gracefully finish. 100 <-ack // 101 finished := time.Now() 102 duration := finished.Sub(started) 103 logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n", format_time(started), duration.String())) 104 logger <- []byte{} // Stop logger 105 from_logger <- []byte{} // Stop "from" binary logger 106 to_logger <- []byte{} // Stop "to" binary logger 107 }
行108-132是运行TCP/IP侦听器的主函数。行109使Go运行环境使用所有物理可用的CPU。
108 func main() { 109 runtime.GOMAXPROCS(runtime.NumCPU()) 110 flag.Parse() 111 if flag.NFlag() != 3 { 112 fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_post=local_port\n") 113 flag.PrintDefaults() 114 os.Exit(1) 115 } 116 target := net.JoinHostPort(*host, *port) 117 fmt.Printf("Start listening on port %s and forwarding data to %s\n", *listen_port, target) 118 ln, err := net.Listen("tcp", ":"+*listen_port) 119 if err != nil { 120 fmt.Printf("Unable to start listener, %v\n", err) 121 os.Exit(1) 122 } 123 conn_n := 1 124 for { 125 if conn, err := ln.Accept(); err == nil { 126 go process_connection(conn, conn_n, target) 127 conn_n += 1 128 } else { 129 fmt.Printf("Accept failed, %v\n", err) 130 } 131 } 132 }
就这些,只有132行。请注意:程序仅使用了Go语言自身的标准库。
现在准备运行:
go run gotcpspy.go -host pop.yandex.ru -port 110 -local_port 8080
输出应为:
Start listening on port 8080 and forwarding data to pop.yandex.ru:110
然后在另一个窗口运行:
telnet localhost 8080
双向十六进制导出日志文件 log-2012.04.20-19.55.17-0001-192.168.1.41 -49544-213.180.204.37-110.log:
Connected to pop.yandex.ru:110 at 2012.04.20-19.55.17 Received (#0, 00000000) 38 bytes from 192.168.1.41-49544 00000000 2b 4f 4b 20 50 4f 50 20 59 61 21 20 76 31 2e 30 |+OK POP Ya! v1.0| 00000010 2e 30 6e 61 40 32 36 20 48 74 6a 4a 69 74 63 50 |.0na@26 HtjJitcP| 00000020 52 75 51 31 0d 0a |RuQ1..| Sent (#0) to [--1]-8080 Received (#0, 00000000) 11 bytes from [--1]-8080 00000000 55 53 45 52 20 74 65 73 74 0d 0a |USER test..| Sent (#0) to 192.168.1.41-49544 Received (#1, 00000026) 23 bytes from 192.168.1.41-49544 00000000 2b 4f 4b 20 70 61 73 73 77 6f 72 64 2c 20 70 6c |+OK password, pl| 00000010 65 61 73 65 2e 0d 0a |ease...| Sent (#1) to [--1]-8080 Received (#1, 0000000B) 11 bytes from [--1]-8080 00000000 50 41 53 53 20 6e 6f 6e 65 0d 0a |PASS none..| Sent (#1) to 192.168.1.41-49544 Received (#2, 0000003D) 72 bytes from 192.168.1.41-49544 00000000 2d 45 52 52 20 5b 41 55 54 48 5d 20 6c 6f 67 69 |-ERR [AUTH] logi| 00000010 6e 20 66 61 69 6c 75 72 65 20 6f 72 20 50 4f 50 |n failure or POP| 00000020 33 20 64 69 73 61 62 6c 65 64 2c 20 74 72 79 20 |3 disabled, try | 00000030 6c 61 74 65 72 2e 20 73 63 3d 48 74 6a 4a 69 74 |later. sc=HtjJit| 00000040 63 50 52 75 51 31 0d 0a |cPRuQ1..| Sent (#2) to [--1]-8080 Disconnected from 192.168.1.41-49544 Disconnected from [--1]-8080 Finished at 2012.04.20-19.55.17, duration 5.253979s
二进制输出数据日志文件: log-binary-2012.04.20-19.55.17-0001 -192.168.1.41-49544.log:
USER test PASS none
二进制输入数据日志文件: log-binary-2012.04.20-19.55.17 -0001-213.180.204.37-110.log:
+OK POP Ya! v1.0.0na@26 HtjJitcPRuQ1 +OK password, please. -ERR [AUTH] login failure or POP3 disabled, try later. sc=HtjJitcPRuQ
貌似可以工作,让我们通过直接下载一个较大的二进制文件和通过代理下载测试一下程序性能。
直接下载(文件大小约72MB):
time wget http://www.erlang.org/download/otp_src_R15B01.tar.gz ... Saving to: `otp_src_R15B01.tar.gz' ... real 1m2.819s
现在启动代理,然后通过代理下载文件:
go run gotcpspy.go -host=www.erlang.org -port=80 -listen_port=8080
下载:
time wget http://localhost:8080/download/otp_src_R15B01.tar.gz ... Saving to: `otp_src_R15B01.tar.gz.1' ... real 0m56.209s
比较结果:
diff otp_src_R15B01.tar.gz otp_src_R15B01.tar.gz.1
两者匹配,说明程序正确工作。
来看性能。我在我的Mac Air笔记本上重复这个实验许多次。令人惊讶的是,通过代理下载文件比直接下载还快一点。在上面的例子:1m2819s(直接)vs 0m.56209s (通过代理)。我能想到的唯一的解释是wget是单线程的,它在一个线程内多路复用输入输出数据流。相比,代理处理数据流在各自的线程中,这可能导致一点提速。但是这种差别很小,几乎可以忽略不计,在其他计算机或网络这种差异可能完全消失。主要的结论是:
通过代理下载没有降低程序运行速度,尽管有创建大量日志文件的额外开销。
总体来看,我希望你从简单性和清晰度的角度来看这个程序。上面已经指出,但我再次强调:在这个程序中,我是渐进地使用了多线程。问题的实质促使我在处理一个连接时识别出并行活动,然后Go语言并行机制的易用性和安全性使并行得以实现。最后,使用并行我并没有考虑效率与复杂性(比较难调试)间的权衡。
我同意,有时一个问题只需改变位和字节,这时代码的线性效率是你唯一需要关心的。但你遇到越来越多的问题,并行能力、多线程处理成为关键因素,对于这种应用,Go语言非常耀眼。
程序整体代码 gotcpspy.go :
package main import ( "encoding/hex" "flag" "fmt" "net" "os" "runtime" "strings" "time" ) var ( host *string = flag.String("host", "", "target host or address") port *string = flag.String("port", "0", "target port") listen_port *string = flag.String("listen_port", "0", "listen port") ) func die(format string, v ...interface{}) { os.Stderr.WriteString(fmt.Sprintf(format+"\n", v...)) os.Exit(1) } func connection_logger(data chan []byte, conn_n int, local_info, remote_info string) { log_name := fmt.Sprintf("log-%s-%04d-%s-%s.log", format_time(time.Now()), conn_n, local_info, remote_info) logger_loop(data, log_name) } func binary_logger(data chan []byte, conn_n int, peer string) { log_name := fmt.Sprintf("log-binary-%s-%04d-%s.log", format_time(time.Now()), conn_n, peer) logger_loop(data, log_name) } func logger_loop(data chan []byte, log_name string) { f, err := os.Create(log_name) if err != nil { die("Unable to create file %s, %v\n", log_name, err) } defer f.Close() for { b := <-data if len(b) == 0 { break } f.Write(b) f.Sync() } } func format_time(t time.Time) string { return t.Format("2006.01.02-15.04.05") } func printable_addr(a net.Addr) string { return strings.Replace(a.String(), ":", "-", -1) } type Channel struct { from, to net.Conn logger, binary_logger chan []byte ack chan bool } func pass_through(c *Channel) { from_peer := printable_addr(c.from.LocalAddr()) to_peer := printable_addr(c.to.LocalAddr()) b := make([]byte, 10240) offset := 0 packet_n := 0 for { n, err := c.from.Read(b) if err != nil { c.logger <- []byte(fmt.Sprintf("Disconnected from %s\n", from_peer)) break } if n > 0 { c.logger <- []byte(fmt.Sprintf("Received (#%d, %08X) %d bytes from %s\n", packet_n, offset, n, from_peer)) c.logger <- []byte(hex.Dump(b[:n])) c.binary_logger <- b[:n] c.to.Write(b[:n]) c.logger <- []byte(fmt.Sprintf("Sent (#%d) to %s\n", packet_n, to_peer)) offset += n packet_n += 1 } } c.from.Close() c.to.Close() c.ack <- true } func process_connection(local net.Conn, conn_n int, target string) { remote, err := net.Dial("tcp", target) if err != nil { fmt.Printf("Unable to connect to %s, %v\n", target, err) } local_info := printable_addr(remote.LocalAddr()) remote_info := printable_addr(remote.RemoteAddr()) started := time.Now() logger := make(chan []byte) from_logger := make(chan []byte) to_logger := make(chan []byte) ack := make(chan bool) go connection_logger(logger, conn_n, local_info, remote_info) go binary_logger(from_logger, conn_n, local_info) go binary_logger(to_logger, conn_n, remote_info) logger <- []byte(fmt.Sprintf("Connected to %s at %s\n", target, format_time(started))) go pass_through(&Channel{remote, local, logger, to_logger, ack}) go pass_through(&Channel{local, remote, logger, from_logger, ack}) <-ack // Make sure that the both copiers gracefully finish. <-ack // finished := time.Now() duration := finished.Sub(started) logger <- []byte(fmt.Sprintf("Finished at %s, duration %s\n", format_time(started), duration.String())) logger <- []byte{} // Stop logger from_logger <- []byte{} // Stop "from" binary logger to_logger <- []byte{} // Stop "to" binary logger } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) flag.Parse() if flag.NFlag() != 3 { fmt.Printf("usage: gotcpspy -host target_host -port target_port -listen_port=local_port\n") flag.PrintDefaults() os.Exit(1) } target := net.JoinHostPort(*host, *port) fmt.Printf("Start listening on port %s and forwarding data to %s\n", *listen_port, target) ln, err := net.Listen("tcp", ":"+*listen_port) if err != nil { fmt.Printf("Unable to start listener, %v\n", err) os.Exit(1) } conn_n := 1 for { if conn, err := ln.Accept(); err == nil { go process_connection(conn, conn_n, target) conn_n += 1 } else { fmt.Printf("Accept failed, %v\n", err) } } }
原文地址:http://pragprog.com/magazines/2012-06/the-beauty-of-concurrency-in-go
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。