一、channel的基本原理
chan 類型的數據結構如下圖所示:
下面我們先來具體解釋各個欄位的含義:
- qcount:代表 chan 中已經接收但還沒被取走的元素的個數。內建函數 len 可以返回這個欄位的值。dataqsiz:隊列的大小。chan 使用一個循環隊列來存放元素,循環隊列很適合這種生產者 - 消費者的場景(我很好奇為什麼這個欄位省略 size 中的 e)。
- buf:存放元素的循環隊列的 buffer。elemtype 和 elemsize:chan 中元素的類型和 size。因為 chan 一旦聲明,它的元素類型是固定的,即普通類型或者指針類型,所以元素大小也是固定的。
- sendx:處理髮送數據的指針在 buf 中的位置。一旦接收了新的數據,指針就會加上 elemsize,移向下一個位置。buf 的總大小是 elemsize 的整數倍,而且 buf 是一個循環列表。
- recvx:處理接收請求時的指針在 buf 中的位置。一旦取出數據,此指針會移動到下一個位置。
- recvq:chan 是多生產者多消費者的模式,如果消費者因為沒有數據可讀而被阻塞了,就會被加入到 recvq 隊列中。
- sendq:如果生產者因為 buf 滿了而阻塞,會被加入到 sendq 隊列中。
二、channel的高級特性
1.雙向 Channel 和單向 Channel
在使用 Channel 時,我們可以將其定義為雙向 Channel 或者單向 Channel。雙向 Channel 可以進行發送和接收操作,而單向 Channel 只能進行發送或者接收操作。
雙向 Channel 的聲明方式如下:
var ch chan int // 雙向 Channel
單向 Channel 的聲明方式如下:
var sendCh chan<- int // 只能用於發送數據的 Channel
var recvCh <-chan int // 只能用於接收數據的 Channel
2.帶緩衝區的 Channel 和無緩衝區的 Channel
在創建 Channel 時,我們可以指定緩衝區的大小。如果緩衝區大小為 0,則代表該 Channel 沒有緩衝區,也就是無緩衝區的 Channel。如果緩衝區大小大於 0,則代表該 Channel 是帶緩衝區的 Channel。
帶緩衝區的 Channel 可以在緩衝區未滿時進行發送操作,而無緩衝區的 Channel 只能在接收操作完成前進行發送操作,否則會阻塞當前 goroutine。
3.Range 遍歷 Channel
在 Go 語言中,我們可以使用 for range 語句來遍歷 Channel,這種方式可以避免手動進行循環和判斷 Channel 是否關閉的操作。
語法如下:
for data := range ch {
// 處理從 ch 中接收到的數據
}
當 Channel 被關閉後,for range 就會自動退出循環。
4.Channel 的選擇器
在使用多個 Channel 進行數據交換時,我們可以使用 Channel 的選擇器來實現非阻塞式的數據交換。選擇器會等待多個 Channel 中的一個可用,並執行相應的操作,如果多個 Channel 同時可用,則隨機選擇一個進行操作。
選擇器的語法如下:
select {
case data, ok := <-ch1:
if ok {
// 處理從 ch1 接收到的數據
} else {
// ch1 已經關閉
}
case data, ok := <-ch2:
if ok {
// 處理從 ch2 接收到的數據
} else {
// ch2 已經關閉
}
case ch3 <- data:
// 向 ch3 發送數據 data
default:
// 如果所有 Channel 都沒有數據可接收或者發送,執行 default 分支
}
5.Channel 的超時機制
在使用 Channel 進行數據交換時,我們可以使用帶有超時機制的操作來避免 goroutine 的長時間阻塞。在 Go 語言中,我們可以使用 time 包中的 Timer 和 Ticker 來實現超時機制。
Timer 用於等待一段時間後執行某個操作,而 Ticker 則用於每隔一段時間執行某個操作。
Timer 的語法如下:
timer := time.NewTimer(time.Second) // 創建一個 1 秒鐘的 Timer
<-timer.C // 等待 Timer 的時間到達
Ticker 的語法如下:
for range ticker.C {
// 每秒鐘執行一次的操作
}
三、使用 Channel 容易犯的錯誤
首先,我們來總結下會 panic 的情況,總共有 3 種:
- close 為 nil 的 chan;
- send 已經 close 的 chan;
- close 已經 close 的 chan。
goroutine 泄漏的問題也很常見,下面的代碼也是一個實際項目中的例子:
func process(timeout time.Duration) bool {
ch := make(chan bool)
go func() {
// 模擬處理耗時的業務
time.Sleep((timeout + time.Second))
ch <- true // block
fmt.Println("exit goroutine")
}()
select {
case result := <-ch:
return result
case <-time.After(timeout):
return false
}
}
在這個例子中,process 函數會啟動一個 goroutine,去處理需要長時間處理的業務,處理完之後,會發送 true 到 chan 中,目的是通知其它等待的 goroutine,可以繼續處理了。
我們來看一下第 10 行到第 15 行,主 goroutine 接收到任務處理完成的通知,或者超時後就返回了。
如果發生超時,process 函數就返回了,這就會導致 unbuffered 的 chan 從來就沒有被讀取。我們知道,unbuffered chan 必須等 reader 和 writer 都準備好了才能交流,否則就會阻塞。
超時導致未讀,結果就是子 goroutine 就阻塞在第 7 行永遠結束不了,進而導致 goroutine 泄漏。解決這個 Bug 的辦法很簡單,就是將 unbuffered chan 改成容量為 1 的 chan,這樣第 7 行就不會被阻塞了。
一般而言,有幾個原則可以記住:
- 共享資源的並發訪問使用傳統並發原語;
- 複雜的任務編排和消息傳遞使用 Channel;
- 消息通知機制使用 Channel,除非只想 signal 一個 goroutine,才使用 Cond;
- 簡單等待所有任務的完成用 WaitGroup