做单元测试的时候,我们需要打桩mock掉一些中间件,miniredis是mock掉redis的一个利器它既可以通过非网络本地mock,也可通过tcp请求,经过redis协议完整mockredis代码,首先看下如何使用:
package mainimport ("github.com/alicebob/miniredis/v2""github.com/gomodule/redigo/redis""fmt""testing""time")func TestSomething(t *testing.T) {s := miniredis.RunT(t)// Optionally set some keys your code expects:s.Set("foo", "bar")s.HSet("some", "other", "key")// Run your code and see if it behaves.// An example using the redigo library from "github.com/gomodule/redigo/redis":c, err := redis.Dial("tcp", s.Addr())fmt.Println(err)_, err = c.Do("SET", "foo", "bar")// Optionally check values in redis...if got, err := s.Get("foo"); err != nil || got != "bar" {t.Error("'foo' has the wrong value")}// ... or use a helper for that:s.CheckGet(t, "foo", "bar")// TTL and expiration:s.Set("foo", "bar")s.SetTTL("foo", 10*time.Second)s.FastForward(11 * time.Second)if s.Exists("foo") {t.Fatal("'foo' should not have existed anymore")}}
可以通过 s := miniredis.RunT(t)启动一个模拟的redis server服务器,然后直接本地set值 s.Set("foo", "bar")
当然也可以通过redis协议进行远程设置,比如我们通过 "github.com/gomodule/redigo/redis"的redis客户端来进行设置。
c, err := redis.Dial("tcp", s.Addr())_, err = c.Do("SET", "foo", "bar")
接着我们分析下看它的源码是如何实现的:
s := miniredis.RunT(t)本质是是启动了一个tcp服务器,然后注册了一系列处理函数,根据每个不同的redis命令进行了不同的处理,server的初始化位于github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go
func RunT(t Tester) *Miniredis {m := NewMiniRedis()if err := m.Start(); err != nil {t.Cleanup(m.Close)
func NewMiniRedis() *Miniredis {m := Miniredis{dbs: map[int]*RedisDB{},scripts: map[string]string{},subscribers: map[*Subscriber]struct{}{},}m.signal = sync.NewCond(&m)
核心结构体定义如下:
type RedisDB struct {master *Miniredis // pointer to the lock in Miniredisid int // db idkeys map[string]string // Master map of keys with their typestringKeys map[string]string // GET/SET &c. keyshashKeys map[string]hashKey // MGET/MSET &c. keyslistKeys map[string]listKey // LPUSH &c. keyssetKeys map[string]setKey // SADD &c. keyshllKeys map[string]*hll // PFADD &c. keyssortedsetKeys map[string]sortedSet // ZADD &c. keysstreamKeys map[string]*streamKey // XADD &c. keysttl map[string]time.Duration // effective TTL valueskeyVersion map[string]uint // used to watch values}
type Miniredis struct {sync.Mutexsrv *server.Serverport intpasswords map[string]string // username passworddbs map[int]*RedisDBselectedDB int // DB id used in the direct Get(), Set() &c.scripts map[string]string // sha1 -> lua srcsignal *sync.Condnow time.Time // time.Now() if not set.subscribers map[*Subscriber]struct{}rand *rand.RandCtx context.ContextCtxCancel context.CancelFunc}
然后启动server开始服务
func (m *Miniredis) Start() error {s, err := server.NewServer(fmt.Sprintf("127.0.0.1:%d", m.port))if err != nil {return err}return m.start(s)}
github.com/alicebob/miniredis/v2@v2.30.2/server/server.go
type Server struct {l net.Listenercmds map[string]CmdpreHook Hookpeers map[net.Conn]struct{}mu sync.Mutexwg sync.WaitGroupinfoConns intinfoCmds int}
每接受一个请求,起一个协程来处理请求:
func newServer(l net.Listener) *Server {s := Server{cmds: map[string]Cmd{},peers: map[net.Conn]struct{}{},l: l,}s.wg.Add(1)go func() {defer s.wg.Done()s.serve(l)
func (s *Server) serve(l net.Listener) {for {conn, err := l.Accept()if err != nil {return}s.ServeConn(conn)
func (s *Server) ServeConn(conn net.Conn) {s.peers[conn] = struct{}{}go func() {defer s.wg.Done()defer conn.Close()s.servePeer(conn)
解析请求命令,然后分发进行处理:
func (s *Server) servePeer(c net.Conn) {go func() {defer close(readCh)for {args, err := readArray(r)if err != nil {peer.Close()return}readCh <- argsfor args := range readCh {s.Dispatch(peer, args)peer.Flush()
func (s *Server) Dispatch(c *Peer, args []string) {cb, ok := s.cmds[cmdUp]cb(c, cmdUp, args)
github.com/alicebob/miniredis/v2@v2.30.2/server/proto.go其中会解析redis协议得到一个个命令:
func readArray(rd *bufio.Reader) ([]string, error) {line, err := rd.ReadString('\n')switch line[0] {default:return nil, ErrProtocolcase '*':
比如执行命令s.Set("foo", "bar")的过程,本质上是将key对应的val存入一个map,其具体过程如下:
github.com/alicebob/miniredis/v2@v2.30.2/direct.go
func (m *Miniredis) Set(k, v string) error {return m.DB(m.selectedDB).Set(k, v)}
func (m *Miniredis) DB(i int) *RedisDB {m.Lock()defer m.Unlock()return m.db(i)}
// get DB. No locks!func (m *Miniredis) db(i int) *RedisDB {if db, ok := m.dbs[i]; ok {return db}db := newRedisDB(i, m) // main miniredis has our mutex.m.dbs[i] = &dbreturn &db}
github.com/alicebob/miniredis/v2@v2.30.2/direct.go先定位到db然后设置值:
func (db *RedisDB) Set(k, v string) error {db.master.Lock()defer db.master.Unlock()defer db.master.signal.Broadcast()if db.exists(k) && db.t(k) != "string" {return ErrWrongType}db.del(k, true) // Remove expiredb.stringSet(k, v)
func (db *RedisDB) del(k string, delTTL bool) {t := db.t(k)delete(db.keys, k)switch t {case "string":delete(db.stringKeys, k)case "hash":delete(db.hashKeys, k)case "list":delete(db.listKeys, k)case "set":delete(db.setKeys, k)case "zset":delete(db.sortedsetKeys, k)case "stream":delete(db.streamKeys, k)case "hll":delete(db.hllKeys, k)default:panic("Unknown key type: " + t)}func (db *RedisDB) t(k string) string {return db.keys[k]}
github.com/alicebob/miniredis/v2@v2.30.2/db.go比如字符串的设置如下:
func (db *RedisDB) stringSet(k, v string) {db.del(k, false)db.keys[k] = "string"db.stringKeys[k] = vdb.keyVersion[k]++
比如我们用redis客户端连接后c, err := redis.Dial("tcp", s.Addr()) 发送命令_, err = c.Do("SET", "foo", "bar")后,server端会接受命令,然后用提前注册好的命令处理函数来处理github.com/alicebob/miniredis/v2@v2.30.2/server/server.go
func NewServer(addr string) (*Server, error) {l, err := net.Listen("tcp", addr)
func newServer(l net.Listener) *Server {s.serve(l)
在这里会取出对应的命令处理函数:
func (s *Server) Dispatch(c *Peer, args []string) {h := s.preHookcb, ok := s.cmds[cmdUp]cb(c, cmdUp, args)
命令的注册函数是:
func (s *Server) Register(cmd string, f Cmd) error {cmd = strings.ToUpper(cmd)s.cmds[cmd] = f
命令注册的位置位于:github.com/alicebob/miniredis/v2@v2.30.2/cmd_generic.go
func commandsGeneric(m *Miniredis) {m.srv.Register("COPY", m.cmdCopy)m.srv.Register("DEL", m.cmdDel)
github.com/alicebob/miniredis/v2@v2.30.2/cmd_string.go
func commandsString(m *Miniredis) {m.srv.Register("GET", m.cmdGet)
比如get命令的处理函数如下
func (m *Miniredis) cmdGet(c *server.Peer, cmd string, args []string) {if !m.handleAuth(c) {return}if m.checkPubsub(c, cmd) {return}withTx(m, c, func(c *server.Peer, ctx *connCtx) {db := m.db(ctx.selectedDB)c.WriteBulk(db.stringGet(key))
func (db *RedisDB) stringGet(k string) string {if t, ok := db.keys[k]; !ok || t != "string" {return ""}return db.stringKeys[k]}
命令注册发生在服务启动的过程中github.com/alicebob/miniredis/v2@v2.30.2/miniredis.go
func (m *Miniredis) start(s *server.Server) error {commandsConnection(m)commandsGeneric(m)commandsServer(m)commandsString(m)