Loading...
正在加载...
请稍候

Golang实现Redis本地原子性事务

✨步子哥 (steper) 2025年09月17日 07:45
Golang实现Redis本地原子性事务

Golang实现Redis本地原子性事务

securityRedis事务的两个重要保证

为了支持多个命令的原子性执行,Redis提供了事务机制。Redis官方文档中称事务带有以下两个重要的保证:

事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。

error_outlineRedis事务中的错误处理

我们在使用事务的过程中可能会遇到两类错误:

  1. 在命令入队过程中出现语法错误
  2. 在命令执行过程中出现运行时错误,比如对string类型的key进行lpush操作

在遇到语法错误时,Redis会中止命令入队并丢弃事务。在遇到运行时错误时,Redis仅会报错然后继续执行事务中剩下的命令,不会像大多数数据库那样回滚事务。

对此,Redis官方的解释是:

Redis命令只会因为错误的语法而失败(并且这些问题不能在入队时发现),或是命令用在了错误类型的键上面:这也就是说,从实用性的角度来说,失败的命令是由编程错误造成的,而这些错误应该在开发的过程中被发现,而不应该出现在生产环境中。

因为不需要对回滚进行支持,所以Redis的内部可以保持简单且快速。

有种观点认为Redis处理事务的做法会产生bug,然而需要注意的是,在通常情况下,回滚并不能解决编程错误带来的问题。举个例子,如果你本来想通过INCR命令将键的值加上1,却不小心加上了2,又或者对错误类型的键执行了INCR,回滚是没有办法处理这些情况的。鉴于没有任何机制能避免程序员自己造成的错误,并且这类错误通常不会在生产环境中出现,所以Redis选择了更简单、更快速的无回滚方式来处理事务。

build在Godis中实现事务的考虑

接下来我们尝试在Godis中实现具有原子性、隔离性的事务。

事务的特性

事务的原子性具有两个特点:

  1. 事务执行过程不可被其它事务(线程)插入
  2. 事务要么完全成功要么完全不执行,不存在部分成功的状态

事务的隔离性是指事务中操作的结果是否对其它并发事务可见。由于KV数据库不存在幻读问题,因此我们需要避免脏读和不可重复度问题。

锁机制设计

与Redis的单线程引擎不同,godis的存储引擎是并行的,因此需要设计锁机制来保证执行多条命令执行时的原子性和隔离性。

实现一个常规命令需要提供3个函数:

  • ExecFunc:是实际执行命令的函数
  • PrepareFunc:在ExecFunc前执行,负责分析命令行读写了哪些key便于进行加锁
  • UndoFunc:仅在事务中被使用,负责准备undo logs以备事务执行过程中遇到错误需要回滚

其中的PrepareFunc会分析命令行返回要读写的key,以prepareMSet为例:

go
// return writtenKeys, readKeys
func prepareMSet(args [][]byte) ([]string, []string) {
    size := len(args) / 2
    keys := make([]string, size)
    for i := 0; i < size; i++ {
        keys[i] = string(args[2*i])
    }
    return keys, nil
}

结合LockMap即可完成加锁。由于其它协程无法获得相关key的锁所以不可能插入到事务中,所以我们实现了原子性中不可被插入的特性。

重要:事务需要把所有key一次性完成加锁,只有在事务提交或回滚时才能解锁。不能用到一个key就加一次锁用完就解锁,这种方法可能导致脏读。

例如,以下场景会导致脏读:

时间 事务1 事务2
t1 锁定key A
t2 修改key A
t3 解锁key A
t4 锁定key A
t5 读取key A
t6 提交 解锁key A

如上图所示,t4时刻,事务2读到了事务1未提交的数据,出现了脏读异常。

undo回滚机制的实现

为了在遇到运行时错误时事务可以回滚(原子性),可用的回滚方式有两种:

  1. 保存修改前的value,在回滚时用修改前的value进行覆盖
  2. 使用回滚命令来撤销原命令的影响。举例来说:键A原值为1,调用了Incr A之后变为了2,我们可以再执行一次Set A 1命令来撤销incr命令

出于节省内存的考虑,我们最终选择了第二种方案。比如HSet命令只需要另一条HSet将field改回原值即可,若采用保存value的方法我们则需要保存整个HashMap。类似情况的还有LPushRPop等命令。

有一些命令可能需要多条命令来回滚,比如回滚Del时不仅需要恢复对应的key-value还需要恢复TTL数据。或者Del命令删除了多个key时,也需要多条命令进行回滚。综上我们给出UndoFunc的定义:

go
// UndoFunc returns undo logs for the given command line
// execute from head to tail when undo
type UndoFunc func(db *DB, args [][]byte) []CmdLine

我们以可以回滚任意操作的rollbackGivenKeys为例进行说明,当然使用rollbackGivenKeys的成本较高,在可能的情况下尽量实现针对性的undo log。

go
func rollbackGivenKeys(db *DB, keys ...string) []CmdLine {
    var undoCmdLines [][][]byte
    for _, key := range keys {
        entity, ok := db.GetEntity(key)
        if !ok {
            // 原来不存在 key 删掉
            undoCmdLines = append(undoCmdLines, utils.ToCmdLine("DEL", key), )
        } else {
            undoCmdLines = append(undoCmdLines, 
                utils.ToCmdLine("DEL", key), // 先把新 key 删除掉
                aof.EntityToCmd(key, entity).Args, // 把 DataEntity 序列化成命令行
                toTTLCmd(db, key).Args, 
            )
        }
    }
    return undoCmdLines
}

接下来看一下EntityToCmd,非常简单易懂:

go
func EntityToCmd(key string, entity *database.DataEntity) *protocol.MultiBulkReply {
    if entity == nil {
        return nil
    }
    var cmd *protocol.MultiBulkReply
    switch val := entity.Data.(type) {
    case []byte:
        cmd = stringToCmd(key, val)
    case *List.LinkedList:
        cmd = listToCmd(key, val)
    case *set.Set:
        cmd = setToCmd(key, val)
    case dict.Dict:
        cmd = hashToCmd(key, val)
    case *SortedSet.SortedSet:
        cmd = zSetToCmd(key, val)
    }
    return cmd
}
go
var hMSetCmd = []byte("HMSET")

func hashToCmd(key string, hash dict.Dict) *protocol.MultiBulkReply {
    args := make([][]byte, 2+hash.Len()*2)
    args[0] = hMSetCmd
    args[1] = []byte(key)
    i := 0
    hash.ForEach(func(field string, val interface{}) bool {
        bytes, _ := val.([]byte)
        args[2+i*2] = []byte(field)
        args[3+i*2] = bytes
        i++
        return true
    })
    return protocol.MakeMultiBulkReply(args)
}

visibilityRedis Watch命令的实现

Redis Watch命令用于监视一个(或多个)key,如果在事务执行之前这个(或这些)key被其他命令所改动,那么事务将被放弃。

实现Watch命令的核心是发现key是否被改动,我们使用简单可靠的版本号方案:为每个key存储一个版本号,版本号变化说明key被修改了。

设计思想:通过版本号机制,我们可以在事务执行前检查被监视的key是否被修改,从而决定是否继续执行事务。这种机制简单高效,适合于Redis这种高性能的键值存储系统。

总结

在Golang中实现Redis的本地原子性事务,需要考虑以下几个关键点:

  1. 锁机制:由于godis的存储引擎是并行的,需要设计锁机制来保证执行多条命令执行时的原子性和隔离性
  2. 回滚机制:使用回滚命令来撤销原命令的影响,而不是保存修改前的value,以节省内存
  3. Watch命令:通过版本号机制实现,用于监视key是否被修改

通过以上机制,我们可以在Golang中实现具有原子性、隔离性的Redis事务,保证数据的一致性和完整性。

讨论回复

1 条回复
✨步子哥 (steper) #1
09-17 08:06
Golang实现Redis之RDB文件格式

Golang实现Redis之RDB文件格式

RDB文件使用二进制方式存储Redis内存中的数据,具有体积小、加载快的优点。本文主要介绍RDB文件的结构和编码方式,并借此探讨二进制编解码和文件处理方式。

architectureRDB文件的整体结构

我们可以将RDB文件划分为四个部分:文件头、元属性区、数据区和结尾。

文件头

包含Magic Number和版本号两部分:

  • RDB文件以ASCII编码的'REDIS'开头作为魔数(File Magic Number)表示自身的文件类型
  • 接下来的4个字节表示RDB文件的版本号
元属性区

保存诸如文件创建时间、创建它的Redis实例的版本号、文件中键的个数等信息

数据格式为:RDB_OPCODE_AUX(0xFA) + key + value

数据区

按照数据库来组织,开头为当前数据库的编号和数据库中的键个数,随后是数据库中各键值对

Redis定义了一系列RDB_OPCODE来存储一些特殊信息

结尾

RDB_OPCODE_EOF表示RDB文件结尾,剩下的部分是RDB的CRC64校验码

codeRDB中的各种编码

Length Encoding

Length Encoding是一种可变长度的无符号整型编码,因为通常被用来存储字符串长度、列表长度等长度数据所以被称为Length Encoding。

Length Encoding的四种类型:

  1. 如果前两位是00,那么下面剩下的6位就表示具体长度
  2. 如果前两位是01,那么会再读取一个字节的数据,加上前面剩下的6位,共14位用于表示具体长度
  3. 如果前两位是10,如果剩下的6位都是0那么后面32个字节表示具体长度。如果剩下的6位为000001,那么后面的64个字节表示具体长度
  4. 如果前两位是11,表示为使用字符串存储整数的特殊编码
go
func (dec *Decoder) readLength() (uint64, bool, error) {
    firstByte, err := dec.readByte() // 先读一个字节
    if err != nil {
        return 0, false, fmt.Errorf("read length failed: %v", err)
    }
    lenType := (firstByte & 0xc0) >> 6 // 取前两位
    var length uint64
    special := false
    switch lenType {
    case len6Bit: // 前两位是 00
        length = uint64(firstByte) & 0x3f // 读剩余 6 位
    case len14Bit: // 前两位是01
        nextByte, err := dec.readByte()
        if err != nil {
            return 0, false, fmt.Errorf("read len14Bit failed: %v", err)
        }
        length = (uint64(firstByte)&0x3f)<<8 | uint64(nextByte)
    case len32or64Bit: // 前两位是 10
        if firstByte == len32Bit {
            err = dec.readFull(dec.buffer[0:4]) // 接下来的 4 个字节 32 位表示具体长度
            if err != nil {
                return 0, false, fmt.Errorf("read len32Bit failed: %v", err)
            }
            length = uint64(binary.BigEndian.Uint32(dec.buffer))
        } else if firstByte == len64Bit {
            err = dec.readFull(dec.buffer) // 接下来的 8 个字节 64 位表示具体长度
            if err != nil {
                return 0, false, fmt.Errorf("read len64Bit failed: %v", err)
            }
            length = binary.BigEndian.Uint64(dec.buffer)
        } else {
            return 0, false, fmt.Errorf("illegal length encoding: %x", firstByte)
        }
    case lenSpecial: // 前两位为 11
        special = true
        length = uint64(firstByte) & 0x3f
    }
    return length, special, nil
}

String Encoding

RDB的String Encoding可以分为三种类型:

编码类型 描述
简单字符串编码 由普通长度编码 + 字符串的ASCII序列组成
整数字符串 以特殊长度编码开头,直接存储整数值
LZF压缩字符串 由表示压缩后长度的Length Encoding + 表示压缩前长度的Length Encoding + 压缩后的二进制数据三部分组成
go
func (dec *Decoder) readString() ([]byte, error) {
    length, special, err := dec.readLength()
    if err != nil {
        return nil, err
    }
    if special { // 前两位为 11 时 special = true
        switch length { // 此时的 length 为第一个字节的后 6 位
        case encodeInt8: // 第一个字节为 0xc0
            // 第一个字节后 6 位为 000000,表示下一个字节为补码表示的整数
            b, err := dec.readByte()
            return []byte(strconv.Itoa(int(b))), err
        case encodeInt16: // 第一个字节为 0xc1
            b, err := dec.readUint16()
            return []byte(strconv.Itoa(int(b))), err
        case encodeInt32: // 第一个字节为 0xc2
            b, err := dec.readUint32()
            return []byte(strconv.Itoa(int(b))), err
        case encodeLZF: // 第一个字节为 0xc3
            // 读取 LZF 压缩字符串
            return dec.readLZF()
        default:
            return []byte{}, errors.New("Unknown string encode type ")
        }
    }
    res := make([]byte, length)
    err = dec.readFull(res)
    return res, err
}

List Encoding & Set Encoding & Hash Encoding

List Encoding:开头为一个普通长度编码块表示List的长度,随后是对应个数的String Encoding块。

Set Encoding:与List Encoding完全相同。

Hash Encoding:开头为一个普通长度编码块表示哈希表中的键值对个数,随后为对应个数的:Key String Encoding + Value String Encoding组成的键值对。

ZSet Encoding & ZSet2 Encoding

这两种表示有序集合方式非常类似,开头是一个普通长度编码块表示元素数,随后是对应个数的表示score的float值 + 表示member的String Encode。

唯一的区别是,ZSet的score采用字符串来存储浮点数,ZSet2使用IEEE 754规定的二进制格式存储float。

IntSet Encoding

当Set中的元素全部为整数时,Redis可能使用IntSet编码进行存储。

IntSet在逻辑上是升序排列的整数列表,它的前4字节表示其中整数编码格式,4-8位表示元素数,其后为对应个数的整数。

go
// IntSet 示例:0200 0000 0400 0000 0100 0200 0300 0400
// 前4个字节 0200 0000 是一个小端序的32位整数2,表示使用2个字节表示一个整数
// 第4至8字节 0400 0000 是小端序的整数4表示IntSet中有4个元素
// 接下来是4个小端序表示的整数 0100 0200 0300 0400 即1、2、3、4

// 编码IntSet
buf := make([]byte, 8, 8+int(intSize)*len(values))
binary.LittleEndian.PutUint32(buf[0:4], intSize) // 写入 intSize=2 表示使用2个字节表示一个整数
binary.LittleEndian.PutUint32(buf[4:8], uint32(len(values))) // 写入元素数
for _, value := range intList {
    switch intSize {
    case 2:
        binary.LittleEndian.PutUint16(enc.buffer[0:2], uint16(value))
        buf = append(buf, enc.buffer[0:2]...)
    // 省略其它 intSize 的源码
    }
}
// buf 中是原始的 int set,需要调用 writeString 将其编码为 StringEncoding 然后写入
err = enc.writeString(string(buf))

zipList

ziplist是一种非常紧凑的顺序结构,它将数据和编码信息存储在一段连续空间中。在RDB文件中除了list结构外,hash、sorted set结构也会使用ziplist编码。

ziplist结构:

  • zlbytes:整个ziplist所占的字节数,包括自己所占的4个字节
  • zltail:表示从ziplist开头到最后一个entry开头的偏移量,从而可以在O(1)时间内访问尾节点
  • zllen:表示ziplist中entry的个数
  • entry:ziplist中元素,格式为
  • zlend:表示ziplist的结束,固定为255(0xff)

entry中的prevlen表示前一个entry的长度,用于从尾节点开始向前遍历。前节点长度小于254时,占用1字节用来表示前节点长度,否则占用5字节。

lightbulb总结

RDB文件是Redis持久化的一种方式,通过精心设计的二进制格式存储数据,具有体积小、加载快的优点。RDB文件的结构清晰,分为文件头、元属性区、数据区和结尾四个部分,其中数据区按照数据库组织,存储了各种数据类型的键值对。

RDB文件使用了多种编码方式来优化存储空间,包括Length Encoding、String Encoding、List Encoding、Set Encoding、Hash Encoding、ZSet Encoding、IntSet Encoding和zipList等。这些编码方式根据数据的特点和大小,选择最合适的存储方式,从而在保证数据完整性的同时,最小化存储空间。

理解RDB文件格式和编码方式,对于Redis的持久化机制、数据恢复以及性能优化都有重要意义。同时,这些编码思想也可以应用到其他需要高效存储和传输数据的场景中。