# Day08 标准库sync网络编程与测试

# 并发安全与锁

多个goroutine同时操作一个资源(临界区),就会发生竞态问题(数据竞态)

例如下面实例,两个goroutine在访问和修改全局变量x时,就会存在数据竞争,导致最终的结果不是10000

var (
	x int64
	wg sync.WaitGroup // 等待组
)

// add 对全局变量x执行5000次加1操作
func add() {
	for i := 0; i < 5000; i++ {
		x = x + 1
	}
	wg.Done()
}

func main() {
	wg.Add(2)

	go add()
	go add()

	wg.Wait()
	fmt.Println(x)
}

# 互斥锁 sync.Mutex

  • 互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源
  • Go语言中使用sync包中提供的Mutex类型来实现互斥锁
var (
	x int64
	wg sync.WaitGroup // 等待组
    mutex sync.Mutex  // 互斥锁
)

// add 对全局变量x执行5000次加1操作
func add() {
	for i := 0; i < 5000; i++ {
        mutex.Lock()     // 修改x前加锁
		x = x + 1
        mutex.Unlock()  // 修改x后解锁
	}
	wg.Done()
}

func main() {
	wg.Add(2)

	go add()
	go add()

	wg.Wait()
	fmt.Println(x)
}

// 编译后多次执行,每一次都会得到预期中的结果 10000
  • 使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;
  • 当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的;

# 读写互斥锁 sync.RWMutex

  • 互斥锁是完全互斥的,但是在读多写少的场景下,并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,读写互斥锁是更好的选择
  • Go语言中使用sync包中的RWMutex类型实现读写互斥锁
var (
	x       int64
	wg      sync.WaitGroup // 等待组
	mutex   sync.Mutex     // 互斥锁
	rwMutex sync.RWMutex   // 读写互斥锁
)

// writeWithLock 使用互斥锁的写操作
func writeWithLock() {
	mutex.Lock() // 加互斥锁
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假设写操作耗时10毫秒
	mutex.Unlock()                    // 解互斥锁

	wg.Done()
}

// writeWithLock 使用互斥锁的读操作
func readWithLock() {
	mutex.Lock()                 // 加互斥锁
	time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
	mutex.Unlock()               // 解互斥锁
	wg.Done()
}

// writeWithLock 使用读写互斥锁的写操作
func writeWithRWLock() {
	rwMutex.Lock() // 加写锁
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假设写操作耗时10毫秒
	rwMutex.Unlock()                  // 释放写锁
	wg.Done()
}

// readWithRWLock 使用读写互斥锁的读操作
func readWithRWLock() {
	rwMutex.RLock()              // 加读锁
	time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
	rwMutex.RUnlock()            // 释放读锁
	wg.Done()
}

func do(wf, rf func(), wc int, rc int) {
	start := time.Now()
	// wc个并发写操作
	for i := 0; i < wc; i++ {
		wg.Add(1)
		go wf()
	}
	//  rc个并发读操作
	for i := 0; i < rc; i++ {
		wg.Add(1)
		go rf()
	}

	wg.Wait()
	cost := time.Since(start)
	fmt.Printf("x:%v cost:%v\n", x, cost)
}

func rwmutex_demo() {
	// 互斥锁模拟10次写操作和1000次读操作
	do(writeWithLock, readWithLock, 10, 1000)      // x:10 cost:1.255622833s

    // 读写互斥锁模拟10次写操作和1000次读操作
	do(writeWithRWLock, readWithRWLock, 10, 1000)  // x:20 cost:112.072375ms
}

// 从最终的执行结果可以看出,使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能

读写锁分为两种:读锁和写锁

  • 当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;
  • 而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待;

# 等待组 sync.WaitGroup

Go语言中可以使用sync.WaitGroup来实现并发任务的同步

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少

  • 启动了N个并发任务时,通过调用wg.Add将计数器值增加N;
  • 每个任务完成时,通过调用wg.Done方法将计数器减1;
  • 通过调用wg.Wait来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成;
var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)
	go hello() // 启动另外一个goroutine去执行hello函数
	fmt.Println("main goroutine done!")
	
    wg.Wait()
}
  • 需要注意sync.WaitGroup是一个结构体,进行参数传递的时候要传递指针

# 执行一次 sync.Once

在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次,例如只加载一次配置文件等

func (o *Once) Do(f func())

如果要执行的函数f需要传递参数就需要搭配闭包来使用

var icons map[string]image.Image
var once sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是并发安全的
func Icon(name string) image.Image {
	once.Do(loadIcons)
	return icons[name]
}

并发安全的单例模式

type singleton struct{}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
	once.Do(func() {
		instance = &singleton{}
	})
	return instance
}

# 安全映射 sync.Map

Go语言中内置的map不是并发安全的

var m = make(map[string]int)

func get(key string) int {
	return m[key]
}

func set(key string, value int) {
	m[key] = value
}

func map_demo() {
	var wg = sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			key := strconv.Itoa(n)
			set(key, n)
			fmt.Printf("key=%s, value=%d\n", key, get(key))
		}(i)
	}
	wg.Wait()
}

// fatal error: concurrent map writes

这种场景需要为map加锁来保证并发的安全性,Go语言的sync包中提供了一个开箱即用的并发安全版sync.Map

// 并发安全的map
var m = sync.Map{}

func map_demo1() {
	var wg = sync.WaitGroup{}
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			key := strconv.Itoa(n)
			m.Store(key, n)     // 存储key-value
			v, _ := m.Load(key) // 根据key取值
			fmt.Printf("key=%v, value=%v\n", key, v)
		}(i)
	}
	wg.Wait()
}

# 原子操作 sync/atomic

针对整数数据类型(int32、uint32、int64、uint64)使用原子操作来保证并发安全。Go语言中原子操作由内置的标准库sync/atomic提供

type Counter interface {
	Inc()
	Load() int64
}

// 普通版
type CommonCounter struct {
	counter int64
}

func (c *CommonCounter) Inc() {
	c.counter++
}

func (c *CommonCounter) Load() int64 {
	return c.counter
}

// 互斥锁版
type MutexCounter struct {
	counter int64
	lock    sync.Mutex
}

func (m *MutexCounter) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.counter++
}

func (m *MutexCounter) Load() int64 {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.counter
}

// 原子操作版
type AtomicCounter struct {
	counter int64
}

func (a *AtomicCounter) Inc() {
	atomic.AddInt64(&a.counter, 1)
}

func (a *AtomicCounter) Load() int64 {
	return atomic.LoadInt64(&a.counter)
}

func test(c Counter) {
	var wg1 sync.WaitGroup
	start := time.Now()

	for i := 0; i < 100000; i++ {
		wg1.Add(1)
		go func() {
			c.Inc()
			wg1.Done()
		}()
	}
	wg1.Wait()

	end := time.Now()
	fmt.Println(c.Load(), end.Sub(start))
}

func main(){
    c1 := CommonCounter{} // 非并发安全
	test(&c1)

	c2 := MutexCounter{} // 使用互斥锁实现并发安全
	test(&c2)

	c3 := AtomicCounter{} // 并发安全比互斥锁效率高
	test(&c3)
}

# 网络编程-TCP通信

TCP是面向连接的协议,数据像水流一样传输,会存在黏包问题

  • TCP服务端

一个TCP服务端可以同时连接很多个客户端,因为Go语言中创建多个goroutine实现并发非常方便和高效,所以我们可以每建立一次链接就创建一个goroutine去处理

TCP服务端程序的处理流程:

  1. 监听端口;
  2. 接收客户端请求建立链接;
  3. 创建goroutine处理链接;
func process(conn net.Conn) {
	defer conn.Close()

	for {
		reader := bufio.NewReader(conn)
		var buf [128]byte
		n, err := reader.Read(buf[:]) // 读取数据
		if err != nil {
			fmt.Println("read from client failed, err:", err)
			break
		}
		recvStr := string(buf[:n])
		fmt.Println("收到client端发来的数据:", recvStr)
		conn.Write([]byte(recvStr)) // 发送数据
	}
}

func main() {
	// 1. 监听端口;
	listener, err := net.Listen("tcp", "127.0.0.1:80")
	if err != nil {
		fmt.Println("listen failed err: ", err)
		return
	}

	for {
		// 2. 接收客户端请求建立链接;
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println("conn failed err: ", err)
			continue
		}

		// 3. 创建goroutine处理链接;
		go process(conn)
	}

}
  • TPC客户端

一个TCP客户端进行TCP通信的流程如下:

  1. 建立与服务端的链接
  2. 进行数据收发
  3. 关闭链接
func main() {
	// 1. 建立与服务端的链接
	client, err := net.Dial("tcp", "127.0.0.1:80")
	if err != nil {
		fmt.Println("Dial err:", err)
		return
	}
	// 3. 关闭链接
	defer client.Close()

	// 2. 进行数据收发
	inputReader := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("请输入数据,换行结束:")
		input, _ := inputReader.ReadString('\n') // 读取用户输入
		inputInfo := strings.Trim(input, "\r\n")
		if strings.ToUpper(inputInfo) == "Q" { // 如果输入q就退出
			return
		}

		_, err = client.Write([]byte(inputInfo)) // 发送数据
		if err != nil {
			fmt.Println("Write err:", err)
			return
		}

		buf := [512]byte{}
		n, err := client.Read(buf[:])
		if err != nil {
			fmt.Println("Recv failed, err:", err)
			return
		}
		fmt.Println("Recv data:", string(buf[:n]))
	}
}

# 网络编程-UDP通信

需要建立连接就能直接进行数据发送和接收,属于不可靠的、没有时序的通信,但是UDP协议的实时性比较好,通常用于视频直播相关领域

  • UDP服务端
func main() {
	listen, err := net.ListenUDP("udp", &net.UDPAddr{
		IP:   net.IPv4(0, 0, 0, 0),
		Port: 30000,
	})
	if err != nil {
		fmt.Println("listen failed, err:", err)
		return
	}
	defer listen.Close()
	for {
		var data [1024]byte
		n, addr, err := listen.ReadFromUDP(data[:]) // 接收数据
		if err != nil {
			fmt.Println("read udp failed, err:", err)
			continue
		}
		fmt.Printf("data:%v addr:%v count:%v\n", string(data[:n]), addr, n)
		_, err = listen.WriteToUDP(data[:n], addr) // 发送数据
		if err != nil {
			fmt.Println("write to udp failed, err:", err)
			continue
		}
	}
}
  • UDP客户端
func main() {
	client, err := net.DialUDP("udp", nil, &net.UDPAddr{
		IP:   net.IPv4(0, 0, 0, 0),
		Port: 30000,
	})
	if err != nil {
		fmt.Println("连接服务端失败,err:", err)
		return
	}
	defer client.Close()
	sendData := []byte("Hello server")
	_, err = client.Write(sendData) // 发送数据
	if err != nil {
		fmt.Println("发送数据失败,err:", err)
		return
	}
	data := make([]byte, 4096)
	n, remoteAddr, err := client.ReadFromUDP(data) // 接收数据
	if err != nil {
		fmt.Println("接收数据失败,err:", err)
		return
	}
	fmt.Printf("recv:%v addr:%v count:%v\n", string(data[:n]), remoteAddr, n)
}
上次更新: 1/10/2023, 12:41:52 PM