1.背景

1.1.项目介绍

cache2go是一款由golang实现的本地缓存库,提供并发安全的读写操作,具有过期时间控制等特性。项目地址:https://github.com/muesli/cache2go

1.2.使用方法

go get github.com/muesli/cache2go

  • 核心操作API:
    • Cache:创建Cache Table
    • Add:添加Cache
    • Value:读取Cache
    • Delete:删除指定Key Cache
    • Flush:清空整个Cache Table
package main

import (
  "github.com/muesli/cache2go"
  "log"
  "time"
)

type Item struct {
  Name   string `json:"name"`
  Prices int64  `json:"prices"`
  Stocks int64  `json:"stocks"`
}

func basicOpTest() {
  // 初始化itemCache本地缓存
  itemCache := cache2go.Cache("itemCache")
  item := &Item{
     Name:   "MacBookPro",
     Prices: 10000,
     Stocks: 1,
  }

  // 添加item1缓存,过期时间为5秒钟
  itemCache.Add("item1", 5*time.Second, item)

  // 读取item1缓存
  if v, err := itemCache.Value("item1"); err != nil {
     log.Printf("item1 err = %v", err)
  } else {
     log.Printf("读取item1缓存:%#v", v.Data())
  }

  // 睡眠6s后读取
  time.Sleep(6 * time.Second)
  if v, err := itemCache.Value("item1"); err != nil {
     log.Printf("item1 err = %v", err)
  } else {
     log.Printf("6s后读取item1缓存:%#v", v.Data())
  }

  // 添加item2,不设置过期时间
  itemCache.Add("item2", 0, item)

  // 读取item2缓存
  if v, err := itemCache.Value("item2"); err != nil {
     log.Printf("item2 err = %v", err)
  } else {
     log.Printf("读取item2缓存:%#v", v.Data())
  }

  // 删除掉item2缓存
  itemCache.Delete("item2")

  // 再读取item2缓存
  if v, err := itemCache.Value("item2"); err != nil {
     log.Printf("item2 err = %v", err)
  } else {
     log.Printf("读取item2缓存:%#v", v.Data())
  }

  // 添加item3缓存,并删除所有缓存
  itemCache.Add("item3", 0, item)
  itemCache.Flush()

  // 读取item3缓存
  if v, err := itemCache.Value("item3"); err != nil {
     log.Printf("item3 err = %v", err)
  } else {
     log.Printf("读取item3缓存:%#v", v.Data())
  }
}
运行结果:
2022/10/17 20:52:00 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 20:52:06 item1 err = Key not found in cache
2022/10/17 20:52:06 读取item2缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 20:52:06 item2 err = Key not found in cache
2022/10/17 20:52:06 item3 err = Key not found in cache
  • 操作回调API:
    • AddAddedItemCallback:新增Cache回调函数
    • AddAboutToDeleteItemCallback:删除Cache回调函数
    • AddAboutToExpireCallback:过期CacheItem回调函数
    • 以上三个方法分别再对应着RemoveXXX,表示删去对应操作的全部回调函数
func callBackTest() {
   // 初始化itemCache本地缓存
   itemCache := cache2go.Cache("itemCache")

   // 设置各操作回调函数
   itemCache.AddAddedItemCallback(func(item *cache2go.CacheItem) {
      log.Printf("added callback, item = %#v", item)
   })
   itemCache.AddAboutToDeleteItemCallback(func(item *cache2go.CacheItem) {
      log.Printf("deleted callback, item = %#v", item)
   })
   item := itemCache.Add("expire_item", 1*time.Second, Item{
      Name:   "expire_item",
      Prices: 1,
      Stocks: 1,
   })
   item.AddAboutToExpireCallback(func(item interface{}) {
      log.Printf("expired callback, item = %#v", item)
   })
   // 执行基本操作
   basicOpTest()
}
输出结果
2022/10/17 21:12:09 added callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
2022/10/17 21:12:09 读取item1缓存:&main.Item{Name:"MacBookPro", Prices:10000, Stocks:1}
2022/10/17 21:12:10 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"expire_item", data:main.Item{Name:"expire_item", Prices:1, Stocks:1}, lifeSpan:1000000000, createdOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55e4d7d8, ext:374551, loc:(*time.Location)(0x1187880)}, accessCount:0, aboutToExpire:[]func(interface {}){(func(interface {}))(0x10a0530)}}
2022/10/17 21:12:10 expired callback, item = "expire_item"
2022/10/17 21:12:14 deleted callback, item = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:(*main.Item)(0xc00008c040), lifeSpan:5000000000, createdOn:time.Time{wall:0xc0cb730a55e5a2f8, ext:426392, loc:(*time.Location)(0x1187880)}, accessedOn:time.Time{wall:0xc0cb730a55eaa820, ext:755728, loc:(*time.Location)(0x1187880)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
// ...
  • 设置自定义缓存加载器:SetDataLoader
func dataLoaderTest() {
   // 初始化itemCache本地缓存
   redisItemCache := cache2go.Cache("redisItemCache")

   // 设置自定义的cache加载逻辑
   redisItemCache.SetDataLoader(func(key interface{}, args ...interface{}) *cache2go.CacheItem {
      // 如果是redis开头的key,先从redis中获取
      if strings.HasPrefix(key.(string), "redis") {
         return cache2go.NewCacheItem(key, 0, Item{
            Name: "redis_item",
         })
      }
      return nil
   })

   // 写入一条数据
   redisItemCache.Add("item1", 0, Item{
      Name: "item1",
   })

   item1, _ := redisItemCache.Value("item1")
   log.Printf("item1 = %#v", item1)

   redisItem, _ := redisItemCache.Value("redis_item")
   log.Printf("redisItem = %#v", redisItem)
}
输出结果
2022/10/17 21:59:37 item1 = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"item1", data:main.Item{Name:"item1", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601954b8, ext:492934, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d260196840, ext:497913, loc:(*time.Location)(0x11858c0)}, accessCount:1, aboutToExpire:[]func(interface {})(nil)}
2022/10/17 21:59:37 redisItem = &cache2go.CacheItem{RWMutex:sync.RWMutex{w:sync.Mutex{state:0, sema:0x0}, writerSem:0x0, readerSem:0x0, readerCount:0, readerWait:0}, key:"redis_item", data:main.Item{Name:"redis_item", Prices:0, Stocks:0}, lifeSpan:0, createdOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessedOn:time.Time{wall:0xc0cb75d2601d34e8, ext:746274, loc:(*time.Location)(0x11858c0)}, accessCount:0, aboutToExpire:[]func(interface {})(nil)}
2.源码分析

2.1.项目结构

在这里插入图片描述

核心代码文件为:

  • cachetable.go:封装了CacheTable结构体,实现Cache列表相关操作API
  • cacheitem.go:封装了CacheItem结构体,实现了Cache对象相关操作API
  • cache.go:提供cache全局map,存储了CacheTable结构体与table名称映射

2.2.数据结构

  • CacheTable
type CacheTable struct {
   sync.RWMutex

   // The table's name.
   name string
   // All cached items.
   items map[interface{}]*CacheItem

   // Timer responsible for triggering cleanup.
   cleanupTimer *time.Timer
   // Current timer duration.
   cleanupInterval time.Duration

   // The logger used for this table.
   logger *log.Logger

   // Callback method triggered when trying to load a non-existing key.
   loadData func(key interface{}, args ...interface{}) *CacheItem
   // Callback method triggered when adding a new item to the cache.
   addedItem []func(item *CacheItem)
   // Callback method triggered before deleting an item from the cache.
   aboutToDeleteItem []func(item *CacheItem)
}
  • CacheItem
type CacheItem struct {
   sync.RWMutex

   // The item's key.
   key interface{}
   // The item's data.
   data interface{}
   // How long will the item live in the cache when not being accessed/kept alive.
   lifeSpan time.Duration

   // Creation timestamp.
   createdOn time.Time
   // Last access timestamp.
   accessedOn time.Time
   // How often the item was accessed.
   accessCount int64

   // Callback method triggered right before removing the item from the cache
   aboutToExpire []func(key interface{})
}

2.3.API代码流程

1.Cache

位于cache.go文件,维护了全局CacheTable Map。

var (
   // 全局cache map
   cache = make(map[string]*CacheTable)
   // cache map 读写锁
   mutex sync.RWMutex
)

// 从cache map中获取对应的CacheTable,不存在则创建新的
func Cache(table string) *CacheTable {
   // 先上读锁,获取cacheTable
   mutex.RLock()
   t, ok := cache[table]
   mutex.RUnlock()

   // 不存在,则新建
   if !ok {
      // 写操作需要上写锁
      mutex.Lock()
      t, ok = cache[table]
      // 双重校验是否存在
      if !ok {
         // 不存在则新建cacheTable
         t = &CacheTable{
            name:  table,
            items: make(map[interface{}]*CacheItem),
         }
         cache[table] = t
      }
      mutex.Unlock()
   }
   return t
}

2.Add

位于cachetable.go文件,是CacheTable结构体的方法之一,实现了添加KV缓存的逻辑。

func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
   // 封装一个item
   item := NewCacheItem(key, lifeSpan, data)

   // 锁表,将item添加进去
   table.Lock()
   table.addInternal(item)

   return item
}

func (table *CacheTable) addInternal(item *CacheItem) {
   // 添加kv值到map中
   table.items[item.key] = item

   expDur := table.cleanupInterval
   addedItem := table.addedItem
   // 添加完成解除写锁
   table.Unlock()

   // 触发Add回调函数
   if addedItem != nil {
      for _, callback := range addedItem {
         callback(item)
      }
   }

   // 如果一个item有设置过期时间,且比检查失效间隔小,则进行过期key清理(懒加载思想,只有存在这类Key才会启动清理,而不是定时任务)
   if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
      table.expirationCheck()
   }
}

func (table *CacheTable) expirationCheck() {
   table.Lock()
   if table.cleanupTimer != nil {
      table.cleanupTimer.Stop()
   }
   if table.cleanupInterval > 0 {
      table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
   } else {
      table.log("Expiration check installed for table", table.name)
   }
   
   now := time.Now()
   smallestDuration := 0 * time.Second
   for key, item := range table.items {
      // 遍历该table下的所有items
      item.RLock()
      lifeSpan := item.lifeSpan
      accessedOn := item.accessedOn
      item.RUnlock()

      if lifeSpan == 0 {
         continue
      }
      if now.Sub(accessedOn) >= lifeSpan {
         // 该item已超出存活时间,删除key
         table.deleteInternal(key)
      } else {
         // 找到最小的需要过期的item,计算最优时间间隔
         if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
            smallestDuration = lifeSpan - now.Sub(accessedOn)
         }
      }
   }

   // 在最优时间间隔后启动定时任务检查table的过期key
   table.cleanupInterval = smallestDuration
   if smallestDuration > 0 {
      table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
         go table.expirationCheck()
      })
   }
   table.Unlock()
}

3.Value

Value用于读取Key匹配的CacheItem。

func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
   table.RLock()
   // 先尝试从items中获取该item
   r, ok := table.items[key]
   loadData := table.loadData
   table.RUnlock()

   if ok {
      // 如果存在,则更新该item的accessOn时间和accessCount计数
      r.KeepAlive()
      return r, nil
   }

   // 如果不存在,则从loadData自定义加载函数中尝试获取
   if loadData != nil {
      item := loadData(key, args...)
      if item != nil {
         // 如果自定义加载函数中存在该item,则添加到table中并返回
         table.Add(key, item.lifeSpan, item.data)
         return item, nil
      }

      return nil, ErrKeyNotFoundOrLoadable
   }

   return nil, ErrKeyNotFound
}

4.Delete

Delete函数用于删除指定Key的Item。

func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
   table.Lock()
   defer table.Unlock()

   return table.deleteInternal(key)
}

func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
   // 判断key是否存在
   r, ok := table.items[key]
   if !ok {
      return nil, ErrKeyNotFound
   }
   
   aboutToDeleteItem := table.aboutToDeleteItem
   table.Unlock()

   // 先触发删除回调函数
   if aboutToDeleteItem != nil {
      for _, callback := range aboutToDeleteItem {
         callback(r)
      }
   }

   r.RLock()
   defer r.RUnlock()
   // 触发item的过期回调函数
   if r.aboutToExpire != nil {
      for _, callback := range r.aboutToExpire {
         callback(key)
      }
   }

   table.Lock()
   table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
   // 将item从table中删除
   delete(table.items, key)

   return r, nil
}

5.Flush

清空整个table的cache。

func (table *CacheTable) Flush() {
   table.Lock()
   defer table.Unlock()

   table.log("Flushing table", table.name)

   // 直接将items重新初始化
   table.items = make(map[interface{}]*CacheItem)
   table.cleanupInterval = 0
   if table.cleanupTimer != nil {
      // 如果此时还有清理过期定时器,则终止其运行
      table.cleanupTimer.Stop()
   }
}
3.总结

cache2go这个项目写得很精简,本质上就是使用到了map来作为本地缓存kv存储结构,但是有一些值得学习的地方:

  • 使用map时,由于其是非线程安全的,所以在并发场景下需要上锁,可以选择RWMutex读写锁来控制并发的读和串行写,避免panic
  • 对于需要清理过期Key的场景,如果使用定时任务定时遍历整个集合来做清理,会耗费较多时间和资源,可以由写入时判断是否存在需要清理的Key,再启动定时任务来做清理,避免频繁遍历
  • 可以利用golang函数式的特性,方便地实现各操作回调函数,比如添加、删除、失效操作回调等
  • 另外个人觉得这个项目可以优化的空间:由于使用的是本地缓存,为了避免内存oom可以在创建时指定限制key的最大数量,以及在内存不足时的写入策略(如直接报错或者随机清理掉一批Key等)。