Golang 实现 Redis 之二: Redis 通信协议与解析器的实现
本文是 《用 Golang 实现一个 Redis》系列文章第二篇,本文将分别介绍Redis 通信协议 以及 协议解析器 的实现,若您对协议有所了解可以直接阅读协议解析器部分。

Redis 通信协议

Redis 自 2.0 版本起使用了统一的协议 RESP (REdis Serialization Protocol),该协议易于实现,计算机可以高效的进行解析且易于被人类读懂。

RESP 是一个二进制安全的文本协议,工作于 TCP 协议上。RESP 以行作为单位,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)作为换行符。

\0\0\0
\r\n

RESP 定义了5种格式:

  • 简单字符串(Simple String): 服务器用来返回简单的结果,比如"OK”。非二进制安全,且不允许换行。
  • 错误信息(Error): 服务器用来返回简单的错误信息,比如"ERR Invalid Synatx”。非二进制安全,且不允许换行。
  • 整数(Integer): llen、scard 等命令的返回值, 64位有符号整数
  • 字符串(Bulk String): 二进制安全字符串, 比如 get 等命令的返回值
  • 数组(Array, 又称 Multi Bulk Strings): Bulk String 数组,客户端发送指令以及 lrange 等命令响应的格式

RESP 通过第一个字符来表示格式:

$*
$
$3\r\nSET\r\n

Bulk String 是二进制安全的可以包含任意字节,就是说可以在 Bulk String 内部包含 “\r\n” 字符(行尾的CRLF被隐藏):

$4
a\r\nb
$-1$-1
["foo", "bar"]
*2
$3
foo
$3
bar
SET key value
*3
$3
SET
$3
key
$5
value

将换行符打印出来:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
协议解析器

们在 实现TCP服务器 一文中已经介绍过TCP服务器的实现,协议解析器将实现其 Handler 接口充当应用层服务器。

[][]byte"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n"['SET', 'key', 'value']
CRLF
bufioreader.ReadBytes('\n')
二进制安全CRLFSET "a\r\nb" 1
*3  
$3
SET
$4
a\r\nb 
$7
myvalue
ReadBytes
*3  
$3
SET
$4
a  // 错误的分行
b // 错误的分行
$7
myvalue
$4ReadBytes('\n')io.ReadFull(reader, msg)
msg = make([]byte, 4 + 2) // 正文长度4 + 换行符长度2
_, err = io.ReadFull(reader, msg)

首先们来定义解析器的接口:

// Payload stores redis.Reply or error
type Payload struct {
    Data redis.Reply
    Err  error
}

// ParseStream 通过 io.Reader 读取数据并将结果通过 channel 将结果返回给调用者
// 流式处理的接口适合供客户端/服务端使用
func ParseStream(reader io.Reader) <-chan *Payload {
    ch := make(chan *Payload)
    go parse0(reader, ch)
    return ch
}

// ParseOne 解析 []byte 并返回 redis.Reply 
func ParseOne(data []byte) (redis.Reply, error) {
    ch := make(chan *Payload)
    reader := bytes.NewReader(data)
    go parse0(reader, ch)
    payload := <-ch // parse0 will close the channel
    if payload == nil {
        return nil, errors.New("no reply")
    }
    return payload.Data, payload.Err
}

接下来们可以看一下解析器核心流程的伪代码,您可以在parser.go看到完整代码:

func parse0(reader io.Reader, ch chan<- *Payload) {
    // 初始化读取状态
    readingMultiLine := false
    expectedArgsCount := 0
    var args [][]byte
    var bulkLen int64
    for {
        // 上文中们提到 RESP 是以行为单位的
        // 因为行分为简单字符串和二进制安全的BulkString,们需要封装一个 readLine 函数来兼容
        line, err = readLine(reader, bulkLen)
        if err != nil { 
            // 处理错误
            return
        }
        // 接下来们对刚刚读取的行进行解析
        // 们简单的将 Reply 分为两类:
        // 单行: StatusReply, IntReply, ErrorReply
        // 多行: BulkReply, MultiBulkReply

        if !readingMultiLine {
            if isMulitBulkHeader(line) {
                // 们收到了 MulitBulkReply 的第一行
                // 获得 MulitBulkReply 中 BulkString 的个数
                expectedArgsCount = parseMulitBulkHeader(line)
                // 等待 MulitBulkReply 后续行
                readingMultiLine = true
                // 们收到了 BulkReply 的第一行
                // 获得 BulkReply 第二行的长度, 通过 bulkLen 告诉 readLine 函数下一行 BulkString 的长度
                bulkLen = parseBulkHeader()
                // 这个 Reply 中一共有 1 个 BulkString
                expectedArgsCount = 1 
                // 等待 BulkReply 后续行
                readingMultiLine = true
                // 处理 StatusReply, IntReply, ErrorReply 等单行 Reply
                reply := parseSingleLineReply(line)
                // 通过 ch 返回结果
                emitReply(ch)
            }
            // 进入此分支说明们正在等待 MulitBulkReply 或 BulkReply 的后续行
            // MulitBulkReply 的后续行有两种,BulkHeader 或者 BulkString
            if isBulkHeader(line) {
                bulkLen = parseBulkHeader()
                // 们正在读取一个 BulkString, 它可能是 MulitBulkReply 或 BulkReply 
                args = append(args, line)
            }
            if len(args) == expectedArgsCount { // 们已经读取了所有后续行
                // 通过 ch 返回结果
                emitReply(ch)
                // 重置状态, 准备解析下一条 Reply
                readingMultiLine = false
                expectedArgsCount = 0
                args = nil
                bulkLen = 0
            }
        }
    }
}

贴一下工具函数的实现:

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
    var msg []byte
    var err error
    if state.bulkLen == 0 { // read simple line
        msg, err = bufReader.ReadBytes('\n')
        if err != nil {
            return nil, true, err
        }
        if len(msg) == 0 || msg[len(msg)-2] != '\r' {
            return nil, false, errors.New("protocol error: " + string(msg))
        }
    } else { // read bulk line (binary safe)
        msg = make([]byte, state.bulkLen+2)
        _, err = io.ReadFull(bufReader, msg)
        if err != nil {
            return nil, true, err
        }
        if len(msg) == 0 ||
        msg[len(msg)-2] != '\r' ||
            msg[len(msg)-1] != '\n' {
            return nil, false, errors.New("protocol error: " + string(msg))
        }
        state.bulkLen = 0
    }
    return msg, false, nil
}

func parseMultiBulkHeader(msg []byte, state *readState) error {
    var err error
    var expectedLine uint64
    expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
    if err != nil {
        return errors.New("protocol error: " + string(msg))
    }
    if expectedLine == 0 {
        state.expectedArgsCount = 0
        return nil
        // first line of multi bulk reply
        state.msgType = msg[0]
        state.readingMultiLine = true
        state.expectedArgsCount = int(expectedLine)
        state.args = make([][]byte, 0, expectedLine)
        return nil
        return errors.New("protocol error: " + string(msg))
    }
}

func parseBulkHeader(msg []byte, state *readState) error {
    var err error
    state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
    if err != nil {
        return errors.New("protocol error: " + string(msg))
    }
    if state.bulkLen == -1 { // null bulk
        return nil
        state.msgType = msg[0]
        state.readingMultiLine = true
        state.expectedArgsCount = 1
        state.args = make([][]byte, 0, 1)
        return nil
        return errors.New("protocol error: " + string(msg))
    }
}

func parseSingleLineReply(msg []byte) (redis.Reply, error) {
    str := strings.TrimSuffix(string(msg), "\n")
    str = strings.TrimSuffix(str, "\r")
    var result redis.Reply
    switch msg[0] {
    case '+': // status reply
        result = reply.MakeStatusReply(str[1:])
    case '-': // err reply
        result = reply.MakeErrReply(str[1:])
    case ':': // int reply
        val, err := strconv.ParseInt(str[1:], 10, 64)
        if err != nil {
            return nil, errors.New("protocol error: " + string(msg))
        }
        result = reply.MakeIntReply(val)
    default:
        // parse as text protocol
        strs := strings.Split(str, " ")
        args := make([][]byte, len(strs))
        for i, s := range strs {
            args[i] = []byte(s)
        }
        result = reply.MakeMultiBulkReply(args)
    }
    return result, nil
}

原文创作:

原文链接:https://www.cnblogs.com/Finley/p/11923168.html