深入剖析Go語言中的Channel:高級特性與注意事項

阿琪說 發佈 2024-01-09T10:48:26.343824+00:00

#從今天起記錄我的2023#一、channel的基本原理chan 類型的數據結構如下圖所示:下面我們先來具體解釋各個欄位的含義:qcount:代表 chan 中已經接收但還沒被取走的元素的個數。內建函數 len 可以返回這個欄位的值。dataqsiz:隊列的大小。

一、channel的基本原理

chan 類型的數據結構如下圖所示:

下面我們先來具體解釋各個欄位的含義:

  • qcount:代表 chan 中已經接收但還沒被取走的元素的個數。內建函數 len 可以返回這個欄位的值。dataqsiz:隊列的大小。chan 使用一個循環隊列來存放元素,循環隊列很適合這種生產者 - 消費者的場景(我很好奇為什麼這個欄位省略 size 中的 e)。
  • buf:存放元素的循環隊列的 buffer。elemtype 和 elemsize:chan 中元素的類型和 size。因為 chan 一旦聲明,它的元素類型是固定的,即普通類型或者指針類型,所以元素大小也是固定的。
  • sendx:處理髮送數據的指針在 buf 中的位置。一旦接收了新的數據,指針就會加上 elemsize,移向下一個位置。buf 的總大小是 elemsize 的整數倍,而且 buf 是一個循環列表。
  • recvx:處理接收請求時的指針在 buf 中的位置。一旦取出數據,此指針會移動到下一個位置。
  • recvq:chan 是多生產者多消費者的模式,如果消費者因為沒有數據可讀而被阻塞了,就會被加入到 recvq 隊列中。
  • sendq:如果生產者因為 buf 滿了而阻塞,會被加入到 sendq 隊列中。

二、channel的高級特性

1.雙向 Channel 和單向 Channel

在使用 Channel 時,我們可以將其定義為雙向 Channel 或者單向 Channel。雙向 Channel 可以進行發送和接收操作,而單向 Channel 只能進行發送或者接收操作。

雙向 Channel 的聲明方式如下:

var ch chan int // 雙向 Channel

單向 Channel 的聲明方式如下:

var sendCh chan<- int // 只能用於發送數據的 Channel
var recvCh <-chan int // 只能用於接收數據的 Channel

2.帶緩衝區的 Channel 和無緩衝區的 Channel

在創建 Channel 時,我們可以指定緩衝區的大小。如果緩衝區大小為 0,則代表該 Channel 沒有緩衝區,也就是無緩衝區的 Channel。如果緩衝區大小大於 0,則代表該 Channel 是帶緩衝區的 Channel。

帶緩衝區的 Channel 可以在緩衝區未滿時進行發送操作,而無緩衝區的 Channel 只能在接收操作完成前進行發送操作,否則會阻塞當前 goroutine。

3.Range 遍歷 Channel

在 Go 語言中,我們可以使用 for range 語句來遍歷 Channel,這種方式可以避免手動進行循環和判斷 Channel 是否關閉的操作。

語法如下:

for data := range ch {
    // 處理從 ch 中接收到的數據
}

當 Channel 被關閉後,for range 就會自動退出循環。

4.Channel 的選擇器

在使用多個 Channel 進行數據交換時,我們可以使用 Channel 的選擇器來實現非阻塞式的數據交換。選擇器會等待多個 Channel 中的一個可用,並執行相應的操作,如果多個 Channel 同時可用,則隨機選擇一個進行操作。

選擇器的語法如下:

select {
case data, ok := <-ch1:
    if ok {
        // 處理從 ch1 接收到的數據
    } else {
        // ch1 已經關閉
    }
case data, ok := <-ch2:
    if ok {
        // 處理從 ch2 接收到的數據
    } else {
        // ch2 已經關閉
    }
case ch3 <- data:
    // 向 ch3 發送數據 data
default:
    // 如果所有 Channel 都沒有數據可接收或者發送,執行 default 分支
}

5.Channel 的超時機制

在使用 Channel 進行數據交換時,我們可以使用帶有超時機制的操作來避免 goroutine 的長時間阻塞。在 Go 語言中,我們可以使用 time 包中的 Timer 和 Ticker 來實現超時機制。

Timer 用於等待一段時間後執行某個操作,而 Ticker 則用於每隔一段時間執行某個操作。

Timer 的語法如下:

timer := time.NewTimer(time.Second) // 創建一個 1 秒鐘的 Timer
<-timer.C // 等待 Timer 的時間到達

Ticker 的語法如下:

for range ticker.C {
    // 每秒鐘執行一次的操作
}

三、使用 Channel 容易犯的錯誤

首先,我們來總結下會 panic 的情況,總共有 3 種:

  • close 為 nil 的 chan;
  • send 已經 close 的 chan;
  • close 已經 close 的 chan。

goroutine 泄漏的問題也很常見,下面的代碼也是一個實際項目中的例子:

func process(timeout time.Duration) bool {
    ch := make(chan bool)

    go func() {
        // 模擬處理耗時的業務
        time.Sleep((timeout + time.Second))
        ch <- true // block
        fmt.Println("exit goroutine")
    }()
    select {
    case result := <-ch:
        return result
    case <-time.After(timeout):
        return false
    }
}

在這個例子中,process 函數會啟動一個 goroutine,去處理需要長時間處理的業務,處理完之後,會發送 true 到 chan 中,目的是通知其它等待的 goroutine,可以繼續處理了。

我們來看一下第 10 行到第 15 行,主 goroutine 接收到任務處理完成的通知,或者超時後就返回了。

如果發生超時,process 函數就返回了,這就會導致 unbuffered 的 chan 從來就沒有被讀取。我們知道,unbuffered chan 必須等 reader 和 writer 都準備好了才能交流,否則就會阻塞。

超時導致未讀,結果就是子 goroutine 就阻塞在第 7 行永遠結束不了,進而導致 goroutine 泄漏。解決這個 Bug 的辦法很簡單,就是將 unbuffered chan 改成容量為 1 的 chan,這樣第 7 行就不會被阻塞了。

一般而言,有幾個原則可以記住:

  • 共享資源的並發訪問使用傳統並發原語;
  • 複雜的任務編排和消息傳遞使用 Channel;
  • 消息通知機制使用 Channel,除非只想 signal 一個 goroutine,才使用 Cond;
  • 簡單等待所有任務的完成用 WaitGroup
關鍵字: