Go语言:channel


channel是Go语言中类型安全的内置类型;两个goruntine可以通过channle同步的进行消息通信;想必你一定听过Go 的创始人之一 Rob Pike的那句名言:

Do not communicate by sharing memory; instead, share memory by communicating; 不要通过共享内存的方式通信,而是要通过通信的方式共享数据

channel的设计和使用充分体现了这样Go语言哲学思想;下面我们来了解一下channel的基本类型和常见用法。

无缓冲 Channels

ch := make(chan struct{})

无缓冲chan的特点是发送端和接受端必须同时准备就绪才可以发送数据;因此: 1、如果发送端goroutine发送数据时如果接收端为没有goroutine准备就绪时发送端会阻塞 2、同样的当接收端goroutine从channel中读取数据时,如果发送端没有发送数据,接收端将阻塞。 84e1e817-96f7-4d8f-8ce0-21860780e078.png 看一下下面这段代码会发生什么情况?

ch := make(chan struct{})
ch <- struct{}{}
go func() {
    <-ch
    fmt.Println("receive message")
}()

显然在第二行 写入chan时 接收端的goroutine并没有就绪,因此此处会一直等待,导致接下来接收端无法正常启动。从而造成死锁的情况;这种情况下可以怎么解决呢?一直方式是新启动一个goroutine进行写入操作;例外一种就是将ch的容量改为1;和也就是接下来我们要介绍的有缓冲chan。

有缓冲Channels

ch := make(chan struct{},10)

有缓冲Channel具有容量,因此其行为与无缓冲channel不同。当 goroutine 向缓冲chan发送数据时,如果chan已经写满则该goroutine将阻塞直到chan可写入。如果chan中有空间,可以立即发送goroutine 可以继续执行。当goroutine 从缓冲chan读取数据时,如果缓冲chan为空,则该goroutine阻塞直到有数据写入。 f6240a8b-d9ee-443c-b25e-ee3edca228e1.png

ch := make(chan struct{},1)
ch <- struct{}{}
go func() {
    <-ch
    fmt.Println("receive message")
}()

单向Channel

上面的示例中我们用到的都是双向channel,既可以写入也可以读取;接下来我们介绍的时单向channel,也就是只能读取或者只能写入的通道; 1、只能读取 <-chan 2、只能写入 chan<-

func read(ch <-chan string) {
	fmt.Println("read from channel: " + <-ch)
}
func write(ch chan<- string) {
	ch <- "message"
	fmt.Println("write to channel ")
}

func main() {
	ch1 := make(chan string, 1)
	go write(ch1)
	go read(ch1)

	time.Sleep(5 * time.Second)
}

需要注意的时双向chan可以转成单项chan,而单向chan不能转成双向chan。

常见用法:扇入模式

这里的扇入是指有多个Channel 输入、一个 Channel 输出。 此处引用“Go Concurrency Patterns”中的示例;将多个chan合并到 out chan;通过WaitGroup等待全部chan的数据写入out后关闭out chan;

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			for n := range c {
				out <- n
			}
			wg.Done()
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

常见用法:扇出模式

扇出模式是和扇入模式相反的。扇出模式只有一个输入 Channel,有多个目标 Channel即一对多的关系。

func fanOut(ch <-chan int, out []chan int) {
	var wg sync.WaitGroup
	for v := range ch { // 从输入chan中读取数据
		for i := 0; i < len(out); i++ {
			wg.Add(1)
			go func(v int, idx int) {
				defer wg.Done()
				out[idx] <- v
			}(v, i)
		}
	}
	wg.Wait()
	for i := 0; i < len(out); i++ {
		close(out[i])
	}

}

反射操作Channel

一般处理单个channel会用select监听;或者for range结构读取;如果channl数量比较多或者数量不确定,可以使用反射操作channel

func createCases(chs ...chan int) []reflect.SelectCase {
	var cases []reflect.SelectCase

	// 创建recv case
	for _, ch := range chs {
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}

	// 创建send case
	for i, ch := range chs {
		v := reflect.ValueOf(i)
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectSend,
			Chan: reflect.ValueOf(ch),
			Send: v,
		})
	}

	return cases
}

func main() {
	var ch1 = make(chan int, 10)
	var ch2 = make(chan int, 10)

	// 创建SelectCase
	var cases = createCases(ch1, ch2)
	go func() {
		for {
			ch1 <- 1
			time.Sleep(2 * time.Second)
		}
	}()
	go func() {
		for {
			ch1 <- 2
			time.Sleep(2 * time.Second)
		}
	}()
	for {
		chosen, recv, ok := reflect.Select(cases)
		if recv.IsValid() { // recv case
			fmt.Println("recv:", cases[chosen].Dir, recv, ok)
		} else { // send case
			fmt.Println("send:", cases[chosen].Dir, ok)
		}
		time.Sleep(time.Second)
	}
}
如有疑问关注公众号给我留言
wx

关注公众号

©2017-2023 鲁ICP备17023316号-1 Powered by Hugo