Skip to content

Commit

Permalink
Fix concurrent map write/read bug
Browse files Browse the repository at this point in the history
  • Loading branch information
cgxxv committed May 1, 2022
1 parent 0545a93 commit 7c16526
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 134 deletions.
31 changes: 27 additions & 4 deletions arc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func (c *ArcCache) Init(clock Clock, capacity int) {
}

func (c *ArcCache) Set(ctx context.Context, key string, val interface{}, ttl time.Duration) error {
c.Lock()
defer c.Unlock()

value := deref(val)
item, ok := c.items[key]
if ttl > 0 {
Expand All @@ -50,7 +53,7 @@ func (c *ArcCache) Set(ctx context.Context, key string, val interface{}, ttl tim
}

defer func() {
c.Evict(ctx, 1)
c.evict(ctx, 1)
if c.t1.Has(key) || c.t2.Has(key) {
return
}
Expand All @@ -65,35 +68,55 @@ func (c *ArcCache) Set(ctx context.Context, key string, val interface{}, ttl tim
}

func (c *ArcCache) Get(ctx context.Context, key string) (interface{}, error) {
c.Lock()
defer c.Unlock()

item, ok := c.items[key]
if !ok {
return nil, KeyNotFoundError
}

c.update(ctx, key)
if item.IsExpired(c.clock) {
c.Remove(ctx, key)
c.remove(ctx, key)
return nil, KeyExpiredError
}

return item.value, nil
}

func (c *ArcCache) Exists(ctx context.Context, key string) bool {
c.Lock()
defer c.Unlock()

item, ok := c.items[key]
if !ok {
return false
}

c.update(ctx, key)
if item.IsExpired(c.clock) {
c.Remove(ctx, key)
c.remove(ctx, key)
return false
}
return true
}

func (c *ArcCache) Remove(ctx context.Context, key string) bool {
c.Lock()
defer c.Unlock()

return c.remove(ctx, key)
}

func (c *ArcCache) Evict(ctx context.Context, count int) {
c.Lock()
defer c.Unlock()

c.evict(ctx, count)
}

func (c *ArcCache) remove(ctx context.Context, key string) bool {
delete(c.items, key)
if elt := c.b1.Lookup(key); elt != nil {
c.b1.Remove(key, elt)
Expand All @@ -115,7 +138,7 @@ func (c *ArcCache) Remove(ctx context.Context, key string) bool {
return false
}

func (c *ArcCache) Evict(ctx context.Context, count int) {
func (c *ArcCache) evict(ctx context.Context, count int) {
if !c.isCacheFull() && c.t1.Len()+c.t2.Len() < c.cap {
return
}
Expand Down
5 changes: 2 additions & 3 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ type Cache interface {

//only for debug
DebugShardIndex(key string) uint64
debugFromLocal2(ctx context.Context, key string, onLoad bool) (interface{}, error)
debugFromLocal(ctx context.Context, key string, onLoad bool) (interface{}, error)
debugRemove(ctx context.Context, key string) bool
debugLocalGet(ctx context.Context, key string) (interface{}, error)
debugLocalRemove(ctx context.Context, key string) bool
serializer
}

Expand Down
65 changes: 42 additions & 23 deletions lfu.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func (c *LfuCache) Init(clock Clock, capacity int) {
}

func (c *LfuCache) Set(ctx context.Context, key string, val interface{}, ttl time.Duration) error {
c.Lock()
defer c.Unlock()

value := deref(val)
item, ok := c.items[key]
if ttl > 0 {
Expand All @@ -43,7 +46,7 @@ func (c *LfuCache) Set(ctx context.Context, key string, val interface{}, ttl tim
if ok {
item.value = value
} else {
c.Evict(ctx, 1)
c.evict(ctx, 1)
item.key = key
item.value = value
item.freqElement = nil
Expand All @@ -60,6 +63,9 @@ func (c *LfuCache) Set(ctx context.Context, key string, val interface{}, ttl tim
}

func (c *LfuCache) Get(ctx context.Context, key string) (interface{}, error) {
c.Lock()
defer c.Unlock()

item, ok := c.items[key]
if ok {
if !item.IsExpired(c.clock) {
Expand All @@ -72,29 +78,10 @@ func (c *LfuCache) Get(ctx context.Context, key string) (interface{}, error) {
return nil, KeyNotFoundError
}

func (c *LfuCache) Evict(ctx context.Context, count int) {
if len(c.items) < c.cap {
return
}

entry := c.freqList.Front()
for i := 0; i < count; {
if entry == nil {
return
} else {
for _, item := range entry.Value.(*freqEntry).items {
if i >= count {
return
}
c.removeItem(item)
i++
}
entry = entry.Next()
}
}
}

func (c *LfuCache) Exists(ctx context.Context, key string) bool {
c.Lock()
defer c.Unlock()

item, ok := c.items[key]
if !ok {
return false
Expand All @@ -109,6 +96,9 @@ func (c *LfuCache) Exists(ctx context.Context, key string) bool {
}

func (c *LfuCache) Remove(ctx context.Context, key string) bool {
c.Lock()
defer c.Unlock()

item, ok := c.items[key]
if ok {
c.removeItem(&item)
Expand All @@ -117,6 +107,35 @@ func (c *LfuCache) Remove(ctx context.Context, key string) bool {
return false
}

func (c *LfuCache) Evict(ctx context.Context, count int) {
c.Lock()
defer c.Unlock()

c.evict(ctx, count)
}

func (c *LfuCache) evict(ctx context.Context, count int) {
if len(c.items) < c.cap {
return
}

entry := c.freqList.Front()
for i := 0; i < count; {
if entry == nil {
return
} else {
for _, item := range entry.Value.(*freqEntry).items {
if i >= count {
return
}
c.removeItem(item)
i++
}
entry = entry.Next()
}
}
}

func (c *LfuCache) removeItem(item *lfuItem) {
entry := item.freqElement.Value.(*freqEntry)
delete(c.items, item.key)
Expand Down
51 changes: 35 additions & 16 deletions lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func (c *LruCache) Init(clock Clock, capacity int) {
}

func (c *LruCache) Set(ctx context.Context, key string, val interface{}, ttl time.Duration) error {
c.Lock()
defer c.Unlock()

value := deref(val)
it, ok := c.items[key]
if ok {
Expand All @@ -35,7 +38,7 @@ func (c *LruCache) Set(ctx context.Context, key string, val interface{}, ttl tim
}
c.evictList.MoveToFront(it)
} else {
c.Evict(ctx, 1)
c.evict(ctx, 1)
item := lruItem{
key: key,
value: value,
Expand All @@ -52,6 +55,9 @@ func (c *LruCache) Set(ctx context.Context, key string, val interface{}, ttl tim
}

func (c *LruCache) Get(ctx context.Context, key string) (interface{}, error) {
c.Lock()
defer c.Unlock()

item, ok := c.items[key]
if ok {
it := item.Value.(*lruItem)
Expand All @@ -64,22 +70,10 @@ func (c *LruCache) Get(ctx context.Context, key string) (interface{}, error) {
return nil, KeyNotFoundError
}

func (c *LruCache) Evict(ctx context.Context, count int) {
if c.evictList.Len() < c.cap {
return
}

for i := 0; i < count; i++ {
ent := c.evictList.Back()
if ent == nil {
return
} else {
c.removeElement(ent)
}
}
}

func (c *LruCache) Exists(ctx context.Context, key string) bool {
c.Lock()
defer c.Unlock()

item, ok := c.items[key]
if !ok {
return false
Expand All @@ -93,13 +87,38 @@ func (c *LruCache) Exists(ctx context.Context, key string) bool {
}

func (c *LruCache) Remove(ctx context.Context, key string) bool {
c.Lock()
defer c.Unlock()

if ent, ok := c.items[key]; ok {
c.removeElement(ent)
return true
}
return false
}

func (c *LruCache) Evict(ctx context.Context, count int) {
c.Lock()
defer c.Unlock()

c.evict(ctx, count)
}

func (c *LruCache) evict(ctx context.Context, count int) {
if c.evictList.Len() < c.cap {
return
}

for i := 0; i < count; i++ {
ent := c.evictList.Back()
if ent == nil {
return
} else {
c.removeElement(ent)
}
}
}

func (c *LruCache) removeElement(e *list.Element) {
c.evictList.Remove(e)
entry := e.Value.(*lruItem)
Expand Down
Loading

0 comments on commit 7c16526

Please sign in to comment.