Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

阻塞读且并发安全的map问题 #76

Open
cxlearning opened this issue Sep 4, 2024 · 0 comments
Open

阻塞读且并发安全的map问题 #76

cxlearning opened this issue Sep 4, 2024 · 0 comments

Comments

@cxlearning
Copy link

原代码存在的问题:

  1. Out:若存在item且item.isExist=false时,没有将其置为true
  2. Rd:若存在两个协程a、b同时读相同的key且都阻塞在m.rmx.Lock(),可能会存在a刚放入m的item会被b覆盖

原版

type sp interface {
	Out(key string, val interface{})                  //存入key /val,如果该key读取的goroutine挂起,则唤醒。此方法不会阻塞,时刻都可以立即执行并返回
	Rd(key string, timeout time.Duration) interface{} //读取一个key,如果key不存在阻塞,等待key存在或者超时
}

type Map struct {
	c   map[string]*entry
	rmx *sync.RWMutex
}
type entry struct {
	ch      chan struct{}
	value   interface{}
	isExist bool
}

func (m *Map) Out(key string, val interface{}) {
	m.rmx.Lock()
	defer m.rmx.Unlock()
	item, ok := m.c[key]
	if !ok {
		m.c[key] = &entry{
			value:   val,
			isExist: true,
		}
		return
	}
	item.value = val
	if !item.isExist {
		if item.ch != nil {
			close(item.ch)
			item.ch = nil
		}
	}
	return
}

func (m *Map) Rd(key string, timeout time.Duration) interface{} {
	m.rmx.RLock()
	if e, ok := m.c[key]; ok && e.isExist {
		m.rmx.RUnlock()
		return e.value
	} else if !ok {
		m.rmx.RUnlock()
		m.rmx.Lock()
		e = &entry{ch: make(chan struct{}), isExist: false}
		m.c[key] = e
		m.rmx.Unlock()
		log.Println("协程阻塞 -> ", key)
		select {
		case <-e.ch:
			return e.value
		case <-time.After(timeout):
			log.Println("协程超时 -> ", key)
			return nil
		}
	} else {
		m.rmx.RUnlock()
		log.Println("协程阻塞 -> ", key)
		select {
		case <-e.ch:
			return e.value
		case <-time.After(timeout):
			log.Println("协程超时 -> ", key)
			return nil
		}
	}
}

func main() {
	mapval := Map{
		c:   make(map[string]*entry),
		rmx: &sync.RWMutex{},
	}

	for i := 0; i < 10; i++ {
		go func() {
			val := mapval.Rd("key", time.Second*6)
			log.Println("读取值为->", val)
		}()
	}

	time.Sleep(time.Second * 3)
	for i := 0; i < 10; i++ {
		go func(val int) {
			mapval.Out("key", val)
		}(i)
	}

	time.Sleep(time.Second * 30)
}
func init() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
}

改进

type sp interface {
	Out(key string, val interface{})                  //存入key /val,如果该key读取的goroutine挂起,则唤醒。此方法不会阻塞,时刻都可以立即执行并返回
	Rd(key string, timeout time.Duration) interface{} //读取一个key,如果key不存在阻塞,等待key存在或者超时
}

type xMap struct {
	c  map[string]*entry
	mx *sync.Mutex
}
type entry struct {
	ch      chan struct{}
	value   interface{}
	isExist bool
}

func (x *xMap) Out(key string, val interface{}) {
	x.mx.Lock()
	defer x.mx.Unlock()

	if e, ok := x.c[key]; ok {
		exist := e.isExist
		e.value = val
		e.isExist = true

		if !exist && e.ch != nil {
			close(e.ch)
			e.ch = nil
		}
		return
	}

	e := &entry{
		value:   val,
		isExist: true,
		ch:      nil,
	}
	x.c[key] = e
}

func (x *xMap) Rd(key string, timeout time.Duration) interface{} {
	x.mx.Lock()
	defer x.mx.Unlock()

	if e, ok := x.c[key]; ok {
		if e.isExist {
			return e.value
		}

		select {
		case <-e.ch:
			return e.value
		case <-time.After(timeout):
			return nil
		}
	}

	e := &entry{
		value:   nil,
		isExist: false,
		ch:      make(chan struct{}),
	}
	x.c[key] = e
	select {
	case <-e.ch:
		return e.value
	case <-time.After(timeout):
		return nil
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant