并发安全和锁

并发安全和锁 #

Go语言的sync包提供了常见的并发编程控制锁; 在并发编程中锁的主要作用是保证多个线程或者 goroutine在访问同一片内存时不会出现混乱;

golang 中使用 go 语句来开启一个新的协程。 goroutine 是非常轻量的,除了给它分配栈空间,它所占用的内存空间是微乎其微的; 但当多个 goroutine 同时进行处理的时候,就会遇到比如同时抢占一个资源,某个 goroutine 等待另一个 goroutine 处理完某一个步骤之后才能继续的需求。 在 golang 的官方文档上,作者明确指出,golang 并不希望依靠共享内存的方式进行进程的协同操作。 而是希望通过管道 channel 的方式进行;

但是一些特殊情况下,我们依然需要用到锁,所以 sync 包提供了我们需要的功能.

整个包都围绕这 Locker 进行,这是一个 interface:

type Locker interface {
        Lock()
        Unlock()
}

sync.WaitGroup #

它的使用场景是在一个goroutine等待一组goroutine执行完成. WaitGroup拥有一个内部计数器; 当计数器等于0时,则Wait()方法会立即返回; 否则它将阻塞执行Wait()方法的goroutine直到计数器等于0时为止;

增加计数器,使用Add(int)方法。 减少计数器,我们可以使用Done()(将计数器减1), 也可以传递负数给Add方法把计数器减少指定大小,Done()方法底层就是通过Add(-1)实现的.

var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
    wg.Add(1)
    go func(a int){
        println(a)
        defer wg.Done()
    }(i)
}
wg.Wait()

//示例2
func main() {
	//runtime.GOMAXPROCS(runtime.NumCPU())
	var wg sync.WaitGroup
	wg.Add(2)
	go func(){
		defer wg.Done()
		for i:=1;i<100;i++ {
			fmt.Println("A:",i)
		}
	}()
	go func(){
		defer wg.Done()
		for i:=1;i<100;i++ {
			fmt.Println("B:",i)
		}
	}()

	wg.Wait()
}

sync.Mutex #

有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。

举个例子:

var x int64
var wg sync.WaitGroup

func add() {
    for i := 0; i < 5000; i++ {
        x = x + 1
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

上面的代码中我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。

Go语言中使用sync包的Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 5000; i++ {
        lock.Lock() // 加锁
        x = x + 1
        lock.Unlock() // 解锁
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

sync.RWMutex #

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

读写锁示例:

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    // lock.Lock()   // 加互斥锁
    rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
    rwlock.Unlock()                   // 解写锁
    // lock.Unlock()                     // 解互斥锁
    wg.Done()
}

func read() {
    // lock.Lock()                  // 加互斥锁
    rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
    rwlock.RUnlock()             // 解读锁
    // lock.Unlock()                // 解互斥锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。

共享内容通信,其实go推荐channel通信共享内存

func add()
{
    for i := 0; i < 5000; i++ {
         ch <- 1
    }
}

通过方式计算
for i := range ch {
    x += i
}

sync.Once #

sync.Once 是 Golang package 中使方法只执行一次的对象实现,作用与 init 函数类似。但也有所不同。

  • init 函数是在文件包首次被加载的时候执行,且只执行一次
  • sync.Onc 是在代码运行中需要的时候执行,且只执行一次

当一个函数不希望程序在一开始的时候就被执行的时候,我们可以使用 sync.Once

import (
	"fmt"
	"sync"
)

type Option struct {
	Address string
	Port int
}

func NewOption(addr string,port int) *Option {
	fmt.Println("init Option config Object")
	return &Option{
		Address:addr,
		Port:port,
	}
}

var OptionPtr *Option

func main() {
	var once sync.Once
	for i := 0; i < 20 ; i++ {
		fmt.Println("run for-loop ",i)
		once.Do(func() {
			OptionPtr = NewOption("127.0.0.1",80)
		})
	}
}

#output
run for-loop  0
init Option config Object
run for-loop  1
run for-loop  2

sync.Once源码 #

package sync

import (
	"sync/atomic"
)

// Once is an object that will perform exactly one action.
type Once struct {
	m    Mutex
	done uint32
}
func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 1 {
		return
	}
	// Slow-path.
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

sync.Map #

package main

var m = make(map[int]int)

func get(key int) int {
	return m[key]
}

func set(key int, value int) {
	m[key] = value
}

func main() {
	go func() {
		for i := 0; i < (1 << 32); i++ {
			set(i, i)
		}
	}()

	go func() {
		for i := 0; ; i++ {
			get(i)
		}
	}()

	<-make(chan bool)
}

#output
fatal error: concurrent map read and map write
goroutine 19 [running]:

错误信息显示,并发的 map 读和 map 写,也就是说使用了两个并发函数不断地对 map 进行读和写而发生了竞态问题,map 内部会对这种并发操作进行检查并提前发现。

需要并发读写时,一般的做法是加锁,但这样性能并不高,Go语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Map,sync.Map 和 map 不同,不是以语言原生形态提供,而是在 sync 包下的特殊结构

sync.Map 有以下特性:

  • 无须初始化,直接声明即可。
  • sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用,Store 表示存储,Load 表示获取,Delete 表示删除。
  • 使用 Range 配合一个回调函数进行遍历操作,通过回调函数返回内部遍历出来的值,Range 参数中回调函数的返回值在需要继续迭代遍历时,返回 true,终止迭代遍历时,返回 false。

基本操作

  var scene sync.Map
    // 将键值对保存到sync.Map
    scene.Store("greece", 97)
    scene.Store("london", 100)
    scene.Store("egypt", 200)
    // 从sync.Map中根据键取值
    fmt.Println(scene.Load("london"))
    // 根据键删除对应的键值对
    scene.Delete("london")
    // 遍历所有sync.Map中的键值对
    scene.Range(func(k, v interface{}) bool {
        fmt.Println("iterate:", k, v)
        return true
    })

package main
import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var m1 sync.Map
	go func() {
		for i := 0; i < (1 << 32); i++ {
			fmt.Println("sync.Map Store",i)
			m1.Store(i, i)
			time.Sleep(time.Millisecond)
		}
	}()

	go func() {
		for i := 0; ; i++ {
			fmt.Println("sync.Map Load",i)
			m1.Load(i)
			time.Sleep(time.Millisecond)
		}
	}()

	<- make(chan bool)
}

#output
...
sync.Map Store 3358
sync.Map Store 3359
sync.Map Load 3358
sync.Map Load 3359
...

sync.Cond #

sync.Cond就是用于实现条件变量的,是基于sync.Mutext的基础上,增加了一个通知队列,通知的线程会从通知队列中唤醒一个或多个被通知的线程。 主要有以下几个方法:

sync.NewCond(&mutex):生成一个cond,需要传入一个mutex,因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于sync.Mutex来实现的。
sync.Wait():用于等待通知
sync.Signal():用于发送单个通知
sync.Broadcat():用于广播
package main

import (
	"fmt"
	"sync"
	"time"
)

var locker sync.Mutex
var cond = sync.NewCond(&locker)
// NewCond(l Locker)里面定义的是一个接口,拥有lock和unlock方法。
// 看到sync.Mutex的方法,func (m *Mutex) Lock(),可以看到是指针有这两个方法,所以应该传递的是指针
func main() {
	for i := 0; i < 10; i++ {
		go func(x int) {
			cond.L.Lock()         // 获取锁
			defer cond.L.Unlock() // 释放锁
			cond.Wait()           // 等待通知,阻塞当前 goroutine
			// 通知到来的时候, cond.Wait()就会结束阻塞, do something. 这里仅打印
			fmt.Println(x)
		}(i)
	}
	time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 进入 Wait 阻塞状态
	fmt.Println("Signal...")
	cond.Signal() // 1 秒后下发一个通知给已经获取锁的 goroutine
	time.Sleep(time.Second * 1)
	fmt.Println("Signal...")
	cond.Signal() // 1 秒后下发下一个通知给已经获取锁的 goroutine
	time.Sleep(time.Second * 1)
	cond.Broadcast() // 1 秒后下发广播给所有等待的goroutine
	fmt.Println("Broadcast...")
	time.Sleep(time.Second * 1) // 睡眠 1 秒,等待所有 goroutine 执行完毕
}

引入自己程序中结构控制

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Queue interface {
	Push(e interface{}) error
	Pop() interface{}
}

type FiFo struct {
	cond *sync.Cond
	lock  sync.Mutex
	queue []interface{}
}

func (f *FiFo) Push(e interface{}) (error) {
	f.lock.Lock()         //获取锁
	defer f.lock.Unlock() //释放锁
	f.queue = append(f.queue, e)
	f.cond.Broadcast()
	return nil
}

func (f *FiFo) Pop() (interface{}) {
	f.lock.Lock()         //获取锁
	defer f.lock.Unlock() //释放锁
	if len(f.queue) == 0 {
		f.cond.Wait()
	}
	item := f.queue[0]
	f.queue = f.queue[1:]
	return item
}

func NewFifo() *FiFo  {
	lock := sync.Mutex{}
	return  &FiFo{
		cond:  sync.NewCond(&lock),
		lock:  lock,
		queue: []interface{}{},
	}
}

func main()  {
	fio := NewFifo()
	go func() {
		for {
			_ = fio.Push(rand.Intn(300))
		}
	}()
	time.Sleep(time.Second)
	go func() {
		for {
			e := fio.Pop()
			fmt.Println(e)
		}
	}()

	<-make(chan bool)
}