将要并发执行的任务包装成一个函数,调用函数的时候前面加上go
关键字,就能够开启一个goroutine去执行该函数的任务
goroutine对应的函数执行完,该goroutine就结束了。
程序启动的时候就会自动创建一个goroutine去执行main函数
main函数结束了,那么程序也就结束了,由该程序启动的所有其他goroutine也都结束了。
goroutine的调度模型:GMP
M:N
:把m个goroutine分配给n个操作系统线程
goroutine是用户态的线程,比内核态的线程更轻量级一点。初始时只占用2KB的栈空间,可以轻松开启数十万的goroutine也不会崩内存
Go1.5之后默认就是操作系统的逻辑核心数,默认跑满CPU
runtime.GOMAXPROCS(1)
:只占用一个核。多用于日志监控等轻量级程序
开启一定数量的goroutine
package main import ( "fmt" "sync" "time" ) // worker pool var wg sync.WaitGroup var notice = make(chan struct{}, 5) func worker(id int, jobs <-chan int, results chan<- int) { defer wg.Done() for j := range jobs { fmt.Printf("worker:%d start job:%d\n", id, j) time.Sleep(time.Second) fmt.Printf("worker:%d end job:%d\n", id, j) results <- j * 2 notice <- struct{}{} // 通知 } } func main() { jobs := make(chan int, 100) results := make(chan int, 100) // 5个任务 go func() { for j := 0; j < 5; j++ { jobs <- j } close(jobs) }() // 开启3个goroutine wg.Add(3) for w := 0; w < 3; w++ { go worker(w, jobs, results) } go func() { for i := 0; i < 5; i++ { <-notice } close(results) }() // 输出结果 for x := range results { fmt.Println(x) } // for a := 1; a < 5; a++ { // <-results // } }
var wg sync.WaitGroup
通过channel实现多个goroutine之间的通信
CSP
通过通信来共享内存
channel是一种类型,一种引用类型。make函数初始化之后才能使用.(slice,map,channel)
var ch chan 元素类型
ch = make(chan 元素类型, [缓冲区大小])
ch <- 100
x:= <- ch
close(ch)
快递员送快递的示例,有缓冲区就是有快递柜
for { x, ok := <-ch // 再取阻塞 if !ok{ // 什么时候ok=false? ch通道被关闭的时候 break } fmt.Println(x, ok) time.Sleep(time.Second) } for x := range ch { fmt.Println(x) }
通常是用作函数的参数,只读通道<- chan
和只写通道chan <- int
同一时刻有多个通道要操作的场景下,使用select。
使用select
语句能提高代码的可读性。
case
同时满足,select
会随机选择一个。case
的select{}
会一直等待,可用于阻塞main函数。package main import ( "fmt" "sync" ) // 锁 var x = 0 var wg sync.WaitGroup var lock sync.Mutex func add() { defer wg.Done() for i := 0; i < 50000; i++ { lock.Lock() x++ lock.Unlock() } } func main() { wg.Add(2) go add() go add() wg.Wait() fmt.Println(x) }
package main import ( "fmt" "sync" "time" ) // 读写互斥锁rwlock var lock sync.Mutex var rwlock sync.RWMutex var wg sync.WaitGroup var x = 0 func read() { defer wg.Done() rwlock.RLock() fmt.Println(x) time.Sleep(time.Millisecond) rwlock.RUnlock() } func write() { defer wg.Done() rwlock.RLock() x++ time.Sleep(5 * time.Millisecond) rwlock.RUnlock() } func main() { start := time.Now() for i := 0; i < 10; i++ { wg.Add(1) go write() } for i := 0; i < 1000; i++ { wg.Add(1) go read() } wg.Wait() fmt.Println(time.Since(start)) }
package main import ( "fmt" "sync" ) // sync.once var wg sync.WaitGroup var once sync.Once func f1(ch1 chan<- int) { defer wg.Done() for i := 0; i < 100; i++ { ch1 <- i } close(ch1) } func f2(ch1 <-chan int, ch2 chan<- int) { defer wg.Done() for { x, ok := <-ch1 if !ok { break } ch2 <- x * x } once.Do(func() { close(ch2) }) // 确保某个操作只执行一次 } func main() { a := make(chan int, 100) b := make(chan int, 100) wg.Add(3) go f1(a) go f2(a, b) go f2(a, b) wg.Wait() for ret := range b { fmt.Println(ret) } }
版本1:慢
package main import ( "fmt" "strconv" "sync" ) // sync.map // Go内置的map不是并发安全的 var ( m = make(map[string]int) lock sync.Mutex ) func get(key string) int { return m[key] } func set(key string, value int) { m[key] = value } func main() { wg := sync.WaitGroup{} for i := 0; i < 21; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) lock.Lock() set(key, n) lock.Unlock() fmt.Printf("k=:%v,v:=%v\n", key, get(key)) wg.Done() }(i) } wg.Wait() }
版本2:快
func main() { m2 := sync.Map{} for i := 0; i < 25; i++ { wg.Add(1) go func(n int) { key := strconv.Itoa(n) m2.Store(key, n) // 必须使用sync.map内置的store去存值 value, _ := m2.Load(key) // 必须使用sync.map内置的load方法根据key去取值 fmt.Printf("k=:%v,v:=%v\n", key, value) wg.Done() }(i) } wg.Wait() }
package main import ( "fmt" "sync" "sync/atomic" ) // 原子操作 var x int64 var wg sync.WaitGroup var lock sync.Mutex // func add() { // defer wg.Done() // lock.Lock() // x++ // lock.Unlock() // } func add1() { atomic.AddInt64(&x, 1) wg.Done() } func main() { // wg.Add(100000) // for i := 0; i < 100000; i++ { // go add1() // } // wg.Wait() // fmt.Println(x) // 比较并交换 x = 100 ok := atomic.CompareAndSwapInt64(&x, 100, 300) // x是否等于100,若是则改为300,不是则返回false fmt.Println(ok, x) }
package main import ( "fmt" "net" "strings" ) // tcp server端 func processConn(conn net.Conn) { defer conn.Close() // 3.与客户端通信 var tmp [128]byte for { n, err := conn.Read(tmp[:]) if err != nil { fmt.Printf("read failed, err:%v\n", err) return } msg := string(tmp[:n]) fmt.Println(msg) reback := strings.ToUpper(msg) conn.Write([]byte(reback)) } } func main() { // 1.本地端口启动服务 listener, err := net.Listen("tcp", "localhost:9000") if err != nil { fmt.Printf("start failed, err:%v\n", err) return } defer listener.Close() // 2.等待别人来连接 for { conn, err := listener.Accept() if err != nil { fmt.Printf("accept failed, err:%v\n", err) return } go processConn(conn) } }
client端
package main import ( "bufio" "fmt" "net" "os" ) // tcp client func main() { // 1.与server端建立链接 conn, err := net.Dial("tcp", "localhost:9000") if err != nil { fmt.Printf("dial failed, err:%v\n", err) return } // 2.发送数据 // var msg string var tmp [128]byte reader := bufio.NewReader(os.Stdin) for { fmt.Print("发送到服务端") // fmt.Scan(&msg) // 有空格会有点小问题 text, _ := reader.ReadString('\n') // 读到换行 if text == "quit" { break } conn.Write([]byte(text)) n, err := conn.Read(tmp[:]) if err != nil { fmt.Printf("read failed, err:%v\n", err) return } fmt.Println(string(tmp[:n])) } }
大端和小端
解决粘包问题
// socket_stick/proto/proto.go package proto import ( "bufio" "bytes" "encoding/binary" ) // Encode 将消息编码 func Encode(message string) ([]byte, error) { // 读取消息的长度,转换成int32类型(占4个字节) var length = int32(len(message)) var pkg = new(bytes.Buffer) // 写入消息头 err := binary.Write(pkg, binary.LittleEndian, length) if err != nil { return nil, err } // 写入消息实体 err = binary.Write(pkg, binary.LittleEndian, []byte(message)) if err != nil { return nil, err } return pkg.Bytes(), nil } // Decode 解码消息 func Decode(reader *bufio.Reader) (string, error) { // 读取消息的长度 lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据 lengthBuff := bytes.NewBuffer(lengthByte) var length int32 err := binary.Read(lengthBuff, binary.LittleEndian, &length) if err != nil { return "", err } // Buffered返回缓冲中现有的可读取的字节数。 if int32(reader.Buffered()) < length+4 { return "", err } // 读取真正的消息数据 pack := make([]byte, int(4+length)) _, err = reader.Read(pack) if err != nil { return "", err } return string(pack[4:]), nil }
server服务端
package main import ( "bufio" "fmt" "io" "net" proto "go.study.com/hina/day01/day09/04nianbao/protocol" ) // socket_stick/server/main.go func process(conn net.Conn) { defer conn.Close() reader := bufio.NewReader(conn) for { recvStr, err := proto.Decode(reader) // n, err := reader.Read(buf[:]) if err == io.EOF { break } if err != nil { fmt.Println("read from client failed, err:", err) break } fmt.Println("收到client发来的数据:", recvStr) } } func main() { listen, err := net.Listen("tcp", "127.0.0.1:30000") if err != nil { fmt.Println("listen failed, err:", err) return } defer listen.Close() for { conn, err := listen.Accept() if err != nil { fmt.Println("accept failed, err:", err) continue } go process(conn) } }
client客户端
package main import ( "fmt" "net" proto "go.study.com/hina/day01/day09/04nianbao/protocol" ) // socket_stick/client/main.go func main() { conn, err := net.Dial("tcp", "127.0.0.1:30000") if err != nil { fmt.Println("dial failed, err", err) return } defer conn.Close() for i := 0; i < 20; i++ { msg := `Hello, Hello. How are you?` // 调用协议编码数据 b, _ := proto.Encode(msg) conn.Write([]byte(b)) } }
服务端server
package main import ( "fmt" "net" "strings" ) // UDP server func main() { conn, err := net.ListenUDP("udp", &net.UDPAddr{ IP: net.IPv4(127, 0, 0, 1), Port: 9000, }) if err != nil { fmt.Printf("conn failed, err:%v\n", err) return } defer conn.Close() // 不需要建立链接,直接发数据 var data [1024]byte for { n, addr, err := conn.ReadFromUDP(data[:]) if err != nil { fmt.Printf("read failed, err:%v\n", err) return } fmt.Println(data[:n]) reply := strings.ToUpper(string(data[:n])) // 发送数据 conn.WriteToUDP([]byte(reply), addr) } }
客户端client
package main import ( "bufio" "fmt" "net" "os" ) // UDP client func main() { socket, err := net.DialUDP("udp", nil, &net.UDPAddr{ IP: net.IPv4(127, 0, 0, 1), Port: 9000, }) if err != nil { fmt.Printf("socket failed, err:%v\n", err) return } defer socket.Close() var reply [1024]byte reader := bufio.NewReader(os.Stdin) for { fmt.Print("发送到服务端:") msg, _ := reader.ReadString('\n') socket.Write([]byte(msg)) // 收回复的数据 n, _, err := socket.ReadFromUDP(reply[:]) if err != nil { fmt.Printf("read failed, err:%v\n", err) return } fmt.Println("收到回复信息:", string(reply[:n])) } }