书籍:The Way To Go,第四部分

Channels

var ch1 chan string
ch1 = make(chan string)
ch1 := make(chan string)
buf := 100
ch1 := make(chan string, buf)
chanOfChans := make(chan chan int)
funcChan := chan func()
func main() {
    ch := make(chan string)
    go sendData(ch)
    go getData(ch)
    time.Sleep(1e9)
}
func sendData(ch chan string) {
    ch <- “Washington”
    ch <- “Tripoli”
    ch <- “London”
}
func getData(ch chan string) {
    var input string
    for { 
        input = <-ch; 
        fmt.Printf("%s ", input) 
    }
}
  • Semaphore pattern

type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)      // semaphore 
...
for i, xi := range data {
     go func (i int, xi float64) {
           res[i] = doSomething(i,xi)
           sem <- empty
     } (i, xi)
}
for i := 0; i < N; i++ {      // wait for goroutines to finish
     <-sem 
}
  • Channel Factory pattern

func main() {
    stream := pump()
    go suck(stream)        // shortened : go suck( pump() )
    time.Sleep(1e9)
}
func pump() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ {
            ch <- i
        }
    }()
    return ch
}
func suck(ch chan int) {
    for {
        fmt.Println(<-ch)
    }
}
func suck(ch chan int) {
    go func() {
        for v := range ch {
            fmt.Println(v)
        }
    }()
}
  • Channel Directionality

// channel can only receive data and cannot be closed
var send_only chan<- int
var recv_only <-chan int        // channel can only send data
...
var c = make(chan int)          // bidirectional
go source(c)
go sink(c)
func source(ch chan<- int) {
    for { ch <- 1 }
}
func sink(ch <-chan int) {
    for { <-ch }
}
...
// closing a channel
func sendData(ch chan string) {
     ch <- "Washington"
     ch <- "Tripoli"
     ch <- "London"
     ch <- "Beijing"
     ch <- "Tokio"
     close(ch)
}
func getData(ch chan string) {
     for {
          input, open := <-ch
          if !open {
               break
          }
          fmt.Printf("%s ", input)
     }
}
  • Switching between goroutines with select

select {
case u:= <- ch1:
     ...
case v:= <- ch2:
 ...
default: // no value ready to be received
  ...
}
  1. if all are blocked, it waits until one can proceed 

  2. if multiple can proceed, it chooses one at random.

  3. when none of the channel operations can proceed and the default clause is present, then this is executed: the default is always runnable (that is: ready to execute). Using a send operation in a select statement with a default case guarantees that the send will be non-blocking!

  • channels with timeouts and tickers

// func Tick(d Duration) <-chan Time
import "time"
rate_per_sec := 10
var dur Duration = 1e8          // rate_per_sec
chRate := time.Tick(dur)        // every 1/10th of a second
for req := range requests {
    <- chRate                   // rate limit our Service.Method RPC calls
    go client.Call("Service.Method", req, ...)
}
// func After(d Duration) <-chan Time
func main() {
     tick := time.Tick(1e8)
     boom := time.After(5e8)
    for {
        select {
            case <-tick:
                fmt.Println(“tick.”)
            case <-boom:
                fmt.Println(“BOOM!”)
                return
            default:
                fmt.Println(“    .”)
                time.Sleep(5e7)
         }
     }
}
  • using recover with goroutines

func server(workChan <-chan *Work) {
    for work := range workChan {
        go safelyDo(work) 
    }
}
func safelyDo(work *Work) {
    defer func() {
        if err := recover(); err != nil {
            log.Printf(“work failed with %s in %v:”, err, work)
        }
    }()
    do(work)
}

Tasks and Worker Processes

type Pool struct {
    Mu    sync.Mutex
    Tasks []Task
}
func Worker(pool *Pool) { 
    for {
        pool.Mu.Lock()
        // begin critical section:
        task := pool.Tasks[0]            // take the first task
        pool.Tasks = pool.Tasks[1:]      // update the pool
        // end critical section
        pool.Mu.Unlock()
        process(task)
    }
}
func main() {
    pending, done := make(chan *Task), make(chan *Task)
    go sendWork(pending)         // put tasks with work
    for i := 0; i < N; i++ {     // start N goroutines to do
        go Worker(pending, done)
    }
    consumeWork(done)
}
func Worker(in, out chan *Task) {
    for {
        t := <-in
        process(t)
        out <- t
    }
}
  • rule - use locking (mutexes) when: caching information in a shared data structure; holding state information;

  • rule - use channels when: communicating asynchronous results; distributing units of work; passing ownership of data;

lazy generator

var resume chan int
func integers() chan int {
    yield := make (chan int)
    count := 0
    go func () {
        for {
            yield <- count
            count++
        }
    } ()
    return yield
}
func generateInteger() int {
    return <-resume
}
func main() {
    resume = integers()
    fmt.Println(generateInteger())     //=> 0
    fmt.Println(generateInteger())     //=> 1
}

Benchmarking goroutines

func main() {
     fmt.Println("sync", testing.Benchmark(BenchmarkChannelSync).String())
     fmt.Println("buffered",  
     testing.Benchmark(BenchmarkChannelBuffered).String())
}
func BenchmarkChannelSync(b *testing.B) {
     ch := make(chan int)
     go func() {
          for i := 0; i < b.N; i++ {
               ch <- i
          }
          close(ch)
     }()
     for _ = range ch {
     }
}
func BenchmarkChannelBuffered(b *testing.B) {
     ch := make(chan int, 128)
     go func() {
          for i := 0; i < b.N; i++ {
               ch <- i
          }
          close(ch)
     }()
     for _ = range ch {
     }
}
// Output:
// Windows:       N         Time 1 op        Operations per sec
// sync           1000000   2443 ns/op  -->  409 332 / s
// buffered       1000000   4850 ns/op  -->  810 477 / s
  • implement a mutex

/* mutexes */
func (s semaphore) Lock() {
    s.P(1)
}
func (s semaphore) Unlock() {
     s.V(1)
}
/* signal-wait */
func (s semaphore) Wait(n int) { 
     s.P(n)
}
func (s semaphore) Signal() {
    s.V(1)
}


本文来自:开源中国博客

感谢作者:月光独奏

查看原文:书籍:The Way To Go,第四部分

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。