Goroutine
Go語言的主要的功能在於令人簡易使用的並行設計,這個方法叫做Goroutine,通過Goroutine能夠讓你的程序以異步的方式運行,而不需要擔心一個函數導致程序中斷,因此Go語言也非常地適合網絡服務。
我們通過go讓其中一個函數同步運行,如此就不需要等待該函數運行完後才能運行下一個函數。
func main() {
// 通過 `go`,我們可以把這個函數異步執行,這樣就不會阻塞往下執行。
go loop()
// 執行 Other
}
Goroutine是類似線程的概念(但Goroutine並不是線程)。線程屬於系統層面,通常來說創建一個新的線程會消耗較多的資源且管理不易。而 Goroutine就像輕量級的線程,但我們稱其為並發,一個Go程序可以運行超過數萬個 Goroutine,並且這些性能都是原生級的,隨時都能夠關閉、結束。一個核心裏面可以有多個Goroutine,通過GOMAXPROCS參數你能夠限制Gorotuine可以占用幾個系統線程來避免失控。
在內置的官方包中也不時能夠看見Goroutine的應用,像是net/http中用來監聽網絡服務的函數實際上是創建一個不斷運行循環的Goroutine。
設置同時執行的cpu數(GOMAXPROCS)
GOMAXPROCS 在調度程序優化後會去掉,默認用系統所有資源。
func main() {
num := runtime.NumCPU() //本地機器的邏輯CPU個數
runtime.GOMAXPROCS(num) //設置可同時執行的最大CPU數,並返回先前的設置
fmt.Println(num)
}
Goroutine中使用recover
應用場景,如果某個goroutine panic了,而且這個goroutine裡面沒有捕獲(recover),那麼整個進程就會掛掉。所以,好的習慣是每當go產生一個goroutine,就需要寫下recover。
var (
domainSyncChan = make(chan int, 10)
)
func domainPut(num int) {
defer func() {
err := recover()
if err != nil {
fmt.Println("error to chan put.")
}
}()
domainSyncChan <- num
panic("error....")
}
func main() {
for i := 0; i < 10; i++ {
domainName := i
go domainPut(domainName)
}
time.Sleep(time.Second * 2)
}
Goroutine 栗子
package main
import (
"fmt"
"sync"
"time"
)
var (
m = make(map[int]uint64)
lock sync.Mutex //申明一個互斥鎖
)
type task struct {
n int
}
func calc(t *task) {
defer func() {
err := recover()
if err != nil {
fmt.Println("error...")
return
}
}()
var sum uint64
sum = 1
for i := 1; i < t.n; i++ {
sum *= uint64(i)
}
lock.Lock() //寫全局數據加互斥鎖
m[t.n] = sum
lock.Unlock() //解鎖
}
func main() {
for i := 0; i < 10; i++ {
t := &task{n: i}
go calc(t) // Goroutine來執行任務
}
time.Sleep(time.Second) // Goroutine異步,所以等一秒到任務完成
lock.Lock() //讀全局數據加鎖
for k, v := range m {
fmt.Printf("%d! = %v\n", k, v)
}
fmt.Println(len(m))
lock.Unlock() //解鎖
}
Goroutine 栗子(等待所有任務退出主程序再退出)
package main
import (
"sync"
"fmt"
"time"
)
func calc(w *sync.WaitGroup, i int) {
fmt.Println("calc: ", i)
time.Sleep(time.Second)
w.Done()
}
func main() {
wg := sync.WaitGroup{}
for i:=0; i<10; i++ {
wg.Add(1)
go calc(&wg, i)
}
wg.Wait()
fmt.Println("all goroutine finish")
}
Channel
channel,管道、隊列,先進先出,用來異步傳遞數據。channel加上goroutine,就形成了一種既簡單又強大的請求處理模型,使高並發和線程同步之間代碼的編寫變得異常簡單。
線程安全,多個goroutine同時訪問,不需要加鎖。
channel是有類型的,一個整數的channel只能存放整數。
channel使用
//chan申明
var userChan chan interface{} // chan裡面放interface類型
userChan = make(chan interface{}, 10) // make初始化,大小為10
var readOnlyChan <-chan int // 只讀chan
var writeOnlyChan chan<- int // 只寫chan
//chan放取數據
userChan <- "nick"
name := <- userChan
name, ok := <- userChan
//關閉chan
intChan := make(chan int, 1)
intChan <- 9
close(intChan)
// range chan
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
close(intChan)
for v := range intChan {
fmt.Println(v)
}
放入chan數據個數超過初始化指定大小會怎樣?
userChan := make(chan interface{})
userChan <- "nick"
// 錯誤!fatal error: all goroutines are asleep - deadlock!
// 開啟race會一直阻塞
開啟一個goroutine來放入初始化未指定大小的chan不會報錯。
即放即走,在等放入時有來拿數據的,就直接拿走。
userChan := make(chan interface{})
go func() {
userChan <- "nick"
}()
name := <- userChan
userChan := make(chan interface{})
go func() {
for {
userChan <- "nick"
}
}()
for {
name := <- userChan
fmt.Println(name)
time.Sleep(time.Millisecond)
}
chan關閉與不關閉
關閉chan後再放入數據會 panic: send on closed channel。
chan不關閉取超數據的情況會報 deadlock
func main() {
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
for {
//十次後 fatal error: all goroutines are asleep - deadlock!
i := <- intChan
fmt.Println(i)
time.Sleep(time.Second)
}
}
chan關閉的情況取超出值為類型默認值,如int為0
func main() {
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
close(intChan)
for {
i := <- intChan
//十次後i值都為0,不報錯
time.Sleep(time.Second)
fmt.Println(i)
}
}
判斷chan是否取完
func main() {
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
close(intChan)
for {
i, ok := <- intChan
if !ok {
fmt.Println("channel is close.")
return
}
fmt.Println(i)
}
}
channel 栗子
栗子一
func sendData(ch chan<- string) {
ch <- "go"
ch <- "java"
ch <- "c"
ch <- "c++"
ch <- "python"
close(ch)
}
func getData(ch <-chan string, chColse chan bool) {
for {
str, ok := <-ch
if !ok {
fmt.Println("chan is close.")
break
}
fmt.Println(str)
}
chColse <- true
}
func main() {
ch := make(chan string, 10)
chColse := make(chan bool, 1)
go sendData(ch)
go getData(ch, chColse)
<-chColse
close(chColse)
}
栗子二:interface類型chan,取出後轉化為對應類型。
type user struct {
Name string
}
func main() {
userChan := make(chan interface{}, 1)
u := user{Name: "nick"}
userChan <- &u
close(userChan)
var u1 interface{}
u1 = <-userChan
var u2 *user
u2, ok := u1.(*user)
if !ok {
fmt.Println("cant not convert.")
return
}
fmt.Println(u2)
}
channel 超時處理
利用select來處理chan超時。
for {
select {
case v := <-chan1:
fmt.Println(v)
case v := <-chan2:
fmt.Println(v)
default:
time.Sleep(time.Second)
fmt.Println("timeout...")
}
}
time.After()定時器來做處理。
在time.After()計時器觸發之前,底層計時器不會被垃圾收集器回收。
select {
case m := <-c:
handle(m)
case <-time.After(5 * time.Minute):
fmt.Println("timed out")
}
定時器栗子
Goroutine+Channel 栗子
栗子一
多個goroutine處理任務;
等待一組channel的返回結果。
func calc(taskChan, resChan chan int, exitChan chan bool) {
defer func() {
err := recover()
if err != nil {
fmt.Println("error...")
return
}
}()
for v := range taskChan {
// 任務處理邏輯
flag := true
for i := 2; i < v; i++ {
if v%i == 0 {
flag = false
break
}
}
if flag {
//結果進chan
resChan <- v
}
}
//處理完進退出chan
exitChan <- true
}
func main() {
//任務chan
intChan := make(chan int, 1000)
//結果chan
resChan := make(chan int, 1000)
//退出chan
exitChan := make(chan bool, 8)
go func() {
for i := 0; i < 1000; i++ {
intChan <- i
}
close(intChan)
}()
//啟動8個goroutine做任務
for i := 0; i < 8; i++ {
go calc(intChan, resChan, exitChan)
}
go func() {
//等所有goroutine結束
for i := 0; i < 8; i++ {
<-exitChan
}
close(resChan)
close(exitChan)
}()
for v := range resChan {
fmt.Println(v)
}
}
栗子二
等待一組channel的返回結果 sync.WaitGroup 的解決方法。
WaitGroup用於等待一組線程的結束。父線程調用Add方法來設定應等待的線程的數量。每個被等待的線程在結束時應調用Done方法。同時,主線程里可以調用Wait方法阻塞至所有線程結束。
func merge(cs <-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}