Go——简单说说goroutine和channel

Go 语言中的 channel 是实现 goroutine 间无锁通信的关键机制,他使得写多线程并发程序变得简单、灵活、触手可得,今天我们来简单说说goroutine和channel

引言:

  • Go 语言中的 channel 是实现 goroutine 间无锁通信的关键机制,他使得写多线程并发程序变得简单、灵活、触手可得

channel是消息传递的机制,用于多线程环境下lock free synchronization.

  • 它同时具备2个特性:
    1. 消息传递
    2. 同步

channel

channel 分类: 不带缓存 channel, 带缓存 channel

无缓冲的与有缓冲channel有着重大差别,那就是一个是同步的 一个是非同步的。

c1:=make(chan int)         无缓冲
c2:=make(chan int,1)      有缓冲
c1<-1
  • 无缓冲: 不仅仅是向 c1 通道放 1,而是一直要等有别的携程 <-c1 接手了这个参数,那么c1<-1才会继续下去,要不然就一直阻塞着。
  • 有缓冲: c2<-1 则不会阻塞,因为缓冲大小是1(其实是缓冲大小为0),只有当放第二个值的时候,第一个还没被人拿走,这时候才会阻塞。
不带缓存 channel
(a)创建channel
    make(chan type)
    e.g.   ch := make(chan int)
(b)通信方式(由于chan操作类似于Queue,为便于理解这里用EnQueue,DeQueue来描述通信操作)
    EnQueue:    ch <- typevar        DeQueue:   var :=  <- ch
    e.g.    ch <- 1    v := <-ch
  • 关键:
    • 调用channel EnQueue 操作之后被阻塞住(不管channel是否为空),直到写的数据被读取掉。
    • 调用channel DeQueue 操作时,如果channel中有数据则被读出,如果为空则阻塞住,直到有人往里面EnQueue数据。
带缓存 channel
(a)创建channel
    make(chan type, size)
    e.g.  ch := make(chan int, 9)
(b)通信方式
    同不带缓存channel
  • 关键:
    • 当channel中元素小于等于channel size时,调用channel EnQueue 操作后数据被放入到缓存中(非阻塞);
    • 当channel满以后,如果再调用EnQueue操作就会被阻塞住直到有元素被DeQueue出来。
    • 调用channel DeQueue 操作时,如果channel 为空则阻塞住直到有人往里面EnQueue数据,否则直接DeQueue出元素。

注意:

需要特别注意的是两者对于range操作的区别:

无缓存channel是EnQueue一个数据被range读一个;

而带缓存channel是EnQueue满之后被range整个一起拿出来用(这个机制对于用户是透明的,用户看到的还是一个一个拿出来),或者timeout时间到之后即使channel没有满也会被range拿出来。

另外,channel 关闭之后,循环读channel操作(e.g. for v:=range channel) 读完channel中剩余数据会自动跳出循环。

  • 参考网络介绍:

顾名思义,就是通道。通道的目的是用来传递数据。在一个通道上我们可以执行数据的发送(Send)和接受(Receive)操作。对于非缓冲的 channel 而言,Receive 方法执行时,会判断该通道上是否有值,如果没有就会等待(阻塞),直到有一个值为止。同样,在 channel 上有值,而尚未被一个 Receiver 接受的时候,Send 方法也会阻塞,直到 Channel 变空。这样,通过一个简单的机制就可以保证 Send 和 Receive 总是在不同的时间执行的,而且只有 Send 之后才能 Receive. 这样就避免了常规的多线程编程中数据共享的问题。正如 Go 语言的文档一句话所说:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来沟通;而是通过沟通来共享内存。

在常规的多线程编程里,我们总是定义好一些类变量,如果这些变量有可能被多个线程同时访问,那么就需要加锁。这样带来了一定的编程复杂性,如果代码写的稍有bug,则会导致读/写到错误的值。

而通过 channel 来沟通,我们得到了一个更为清晰的沟通方式。两个线程(或者 goroutine)要读写相同的数据,则创建一个通道,双方通过对这个通道执行 Send / Receive 的操作来设值或取值即可,相对而言,比较不容易出错。

goroutine

  • goroutine的并发模型定义为以下几个要点:
    • 基于Thread的轻量级协程
    • 通过channel来进行协程间的消息传递
    • 只暴露协程,屏蔽线程操作的接口

goroutine原理

Golang的runtime实现了goroutine和OS thread的M:N模型,因此实际的goroutine是基于线程的更加轻量级的实现,我们便可以在Golang中大量创建goroutine而不用担心昂贵的context swtich所带来的开销。goroutine之间,我们可以通过channel来进行交互。由于go已将将所有system call都wrap到了标准库中,在针对这些systemcall进行调用时会主动标记goroutine为阻塞状态并保存现场,交由scheduler执行。所以在golang中,在大部分情况下我们可以非常安心地在goroutine中使用阻塞操作而不用担心并发性受到影响。

在操作系统的OS Thread和编程语言的User Thread之间,实际上存在3中线程对应模型,也就是:1:1,1:N,M:N。

  • N:1是说,多个(N)用户线程始终在一个内核线程上跑,context上下文切换确实很快,但是无法真正的利用多核。
  • 1:1是说,一个用户线程就只在一个内核线程上跑,这时可以利用多核,但是上下文switch很慢,频繁切换效率很低。
  • M:N是说, 多个goroutine在多个内核线程上跑,这个看似可以集齐上面两者的优势,但是无疑增加了调度的难度。

goroutine google runtime默认的实现为M:N的模型,于是这样可以根据具体的操作类型(操作系统阻塞或非阻塞操作)调整goroutine和OS Thread的映射情况,显得更加的灵活。

在goroutine实现中,有三个最重要的数据结构,分别为G M P:
G:代表一个goroutine
M:代表 一个OS Thread
P:一个P和一个M进行绑定,代表在这个OS Thread上的调度器

goroutine - 可以大致理解为一种轻量级的线程(或微线程),它是一种“分配在同一个地址空间内的,能够并行执行的函数”。同时,它是轻量级的,不需要像分配线程那样分配独立的栈空间。所以理论上讲,我们可以很容易的分配很多个 goroutine, 让它们并发执行,而其开销则比多线程程序要小得多,从而可以让程序支持比较大的并发性。

goroutinue,本质上就是协程。但有两点不同:
  1. goroutinue可以实现并行,也就是说,多个协程可以在多个处理器同时跑。而协程同一时刻只能在一个处理器上跑(把宿主语言想象成单线程的就好了)。
  2. goroutine之间的通信是通过channel,而协程的通信是通过yield和resume()操作。
那么如果有两个A,B两个协程 在放入和取出数据时都只能用int类型的数据进行通信
  • 如 取数据 <- channel 放数据 channel <- 1

协程之间可以通过在通道中放入-取出数据的方式进行通信

3种优雅的Go channel用法

写Go的人应该都听过Rob Pike的这句话

Do not communicate by sharing memory; instead, share memory by communicating.

相信很多朋友和我一样,在实际应用中总感觉不到好处,为了用channel而用。但以我的切身体会来说,这是写代码时碰到的场景不复杂、对channel不熟悉导致的,所以希望这篇文章能给大家带来点新思路,对Golang优雅的channel有更深的认识 :)

Fan In/Out

数据的输出有时候需要做扇出/入(FanIn/Out),但是在函数中调用常常得修改接口,而且上下游对于数据的依赖程度非常高,所以一般使用通过channel进行Fan In/Out,这样就可以轻易实现类似于shell里的管道。

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
                case s := <-input1:  c <- s
                case s := <-input2:  c <- s
            }
        }
    }()
    return c
}

同步Goroutine

两个goroutine之间同步状态,例如A goroutine需要让B goroutine退出,一般做法如下:

func main() {
    g = make(chan int)
    quit = make(chan bool)
    go B()
    for i := 0; i < 3; i++ {
        g <- i
    }
    quit <- true // 没办法等待B的退出只能Sleep
    fmt.Println("Main quit")
}

func B() {
    for {
        select {
            case i := <-g:
            fmt.Println(i + 1)
            case <-quit:
            fmt.Println("B quit")
            return
        }
    }
}
Log
/*
Output:
1
2
3
Main quit
*/

可是了main函数没办法等待B合适地退出,所以B quit 没办法打印,程序直接退出了。然而,chan是Go里的第一对象,所以可以把chan传入chan中,所以上面的代码可以把quit 定义为chan chan bool,以此控制两个goroutine的同步

func main() {
    g = make(chan int)
    quit = make(chan chan bool)
    go B()
    for i := 0; i < 5; i++ {
        g <- i
    }
    wait := make(chan bool)
    quit <- wait
    <-wait //这样就可以等待B的退出了
    fmt.Println("Main Quit")
}

func B() {
    for {
        select {
            case i := <-g:
            fmt.Println(i + 1)
            case c := <-quit:
            c <- true
            fmt.Println("B Quit")
            return
        }
    }
}
Log
/* Output
1
2
3
B Quit
Main Quit
*/

分布式递归调用

在现实生活中,如果你要找美国总统聊天,你会怎么做?第一步打电话给在美国的朋友,然后他们也会发动自己的关系网,再找可能认识美国总统的人,以此类推,直到找到为止。这在Kadmelia分布式系统中也是一样的,如果需要获取目标ID信息,那么就不停地查询,被查询节点就算没有相关信息,也会返回它觉得最近节点,直到找到ID或者等待超时。 好了,这个要用Go来实现怎么做呢?

func recursiveCall(ctx context.Context, id []byte, initialNodes []*node){
    seen := map[string]*node{} //已见过的节点记录
    request := make(chan *node, 3) //设置请求节点channel

    // 输入初始节点
    go func() {
        for _, n := range initialNodes {
            request <- n
        }
    }()

    OUT:
    for {
        //循环直到找到数据
        if data != nil {
            return
        }

        // 在新的请求,超时和上层取消请求中select
        select {
            case n := <-request:
            go func() {
                // 发送新的请求
                response := s.sendQuery(ctx, n, MethodFindValue, id)
                select {
                    case <-ctx.Done():
                    case msg :=<-response:
                    seen[responseToNode(response)] = n //更新已见过的节点信息
                    // 加载新的节点
                    for _, rn := range LoadNodeInfoFromByte(msg[PayLoadStart:]) {
                        mu.Lock()
                        _, ok := seen[rn.HexID()]
                        mu.Unlock()
                        // 见过了,跳过这个节点
                        if ok {
                            continue
                        }
                        AddNode(rn)
                        // 将新的节点送入channel
                        request <- rn
                        }
                    }
                }
            }()
            case <-time.After(500 * time.Millisecond):
            break OUT // break至外层,否则仅仅是跳至loop外
            case <-ctx.Done():
            break OUT
        }
    }
    return
}

这时的buffered channel类似于一个局部queue,对需要的节点进行处理,但这段代码的精妙之处在于,这里的block操作是select的,随时可以取消,而不是要等待或者对queue的长度有认识。

坚持原创技术分享,您的支持将鼓励我继续创作!