Golang 实现 Redis(10): 本地原子性事务

持续创造,加快生长!这是我参加「日新计划 6 月更文挑战」的第27天,点击查看活动概况

为了支撑多个指令的原子性履行 Redis 供给了业务机制。 Redis 官方文档中称业务带有以下两个重要的确保:

  • 业务是一个独自的阻隔操作携程网站官网:业务中的一切指令都会序列化、按顺序地履行。业务在履行的过程携程网官网中,不会被其他客户端发送来的指令恳求所打断。
  • 业务是一个原子操作:业务中的指令要么携程网上订票火车票悉数被履行,要么悉数都不履行

redis.io/dredis的五种数据类型ocs源码时代/manual…

咱们在运用业务的过程中或许会遇到两类过错:

  1. 在指令入队过程中呈现语法过错
  2. 在指令履行过程中呈现运行时过错,比方对 strredis持久化ing 类型的 key 进行 lpush 操作

在遇到语法过错时 Redis 会间断指令入队并丢掉业务。在遇到运行时过错时 Redis 仅会报错然后持续履行业务中剩下的指令,不会像大多数数据库那样回滚业务。对此,Redis 官方的解说是:

Redis 指令只会因为过错的语法而失利(而且这些问题不能在入队时发现),或是指令用在了过错类型的键上面:这也就是说,从实用性的源码1688视点来说,失利的指命令行窗口怎么打开令是由编程过错形成的,而这些过错应该在开发携程网上订票飞机的过程中被发现,而不应该呈现在出产环境中。

因为不需求对回滚进行支撑命令行窗口怎么打开,所以 Redis 的内部能够坚持简略且快速。
有种观点以为命令行进入指定目录 Redis 处理业务的做法会产生 bug , 但是需求注意的是, 在通常情况下, 回滚并不能解决编程过错带来的问题。 举个比方, 如果你本来想经过源码1688 INCR 指令将键的值携程旅游网加上 1 , 却不小心加上了 2 , 又或许对过错类型的键履行了 INCR , 回滚是没有办法处理这些情况的。鉴于没有任何机制能避免程序员自己形成的过错, 而且这类过错通常不会在Redis出产环境中呈现, 所redis数据结构以 Redis 选择了更简略、更快速的无回滚办法来处理业务。

emmmm, 接下来咱们测验在 Godis 中完成具有原子性、阻隔性的业务吧。

业务的原子性具有两个特点:1. 业务履行过程不行被其它业务(线程)刺进 2. 业务要么完全成功要么完全不履行,不存在部分成功的状态
业务的阻隔性是指业务中操作的成果是否对其它并发业务可见。因为KV数据库不存在幻读问题,因而咱们需求避免脏读和不行重复度问题。

业务机制浅析

与 Redis 的单线程引擎不同 godis 的存储引擎是并行的,因而需求设计锁机制来确保履行多条指令履行时的原子性和阻隔性。

咱们在完成内存数据库Redis一文中提到:

完成一个常规指令需求供给3个函数:

  • ExecFunc 是实际履行指令的函数
  • PrepareFunc 在 ExecFunc 前履行,担任剖析指令行读写了哪些 key 便于进行加源码
  • UndoFunc 仅在业务中被运用redist,担任预备 undo logs 以备业务履行过程中遇到过错需求回滚。

其中的 PrepareFunc 会剖析指令行回来要读写的 key, 以 prepareMSe命令行怎么打开t 为例:

// return writtenKeys, readKeys
func prepareMSet(args [][]byte) ([]string, []string) {
	size := len(args) / 2
	keys := make([]string, size)
	for i := 0; i < size; i++ {
		keys[i] = string(args[2*i])
	}
	return keys, nil
}

结合完成内存数据库 中提到的 LockMap 即可完成加锁。因为其它协程无法取得相关 key 的锁所以不行能刺进到业务中,所以咱们完成了原源码编辑器子性中不行被刺进的特性。

业务需求把一切 key 一次性完成加锁, 只有在redis缓存业务提交或回滚时才干解锁。不能用到一个 key 就加一次锁用完就解锁,这种办法或许导致脏读:

时间业务1业务2
t1确定key A
t2修正key A
t3解锁key A
t4确定key A
t4读取key A
t5解锁key A携程旅游网
t6提交

携程旅行app官方下载上图所示 t4 时刻, 业务 2 读到了业务 1未提交命令行窗口的数据,呈现了脏读异常。

回滚

为了在遇到运行时过错时业务能够回滚(原子性),可用的回滚办法有两种:

  • 保存修正前的value, 在回滚时用修正前的value进行掩盖
  • 运用回滚指令来吊销原指令的影响。举例来说:键A原值为1,调用了Incr A 之后变为了2,咱们能够再履行一次Set A 1指令来吊销 incr 指令。

出于节约内存的考虑咱们最终选择了第二种计划。比方 HSet 指令只需求另一条 HSet 将 field 改回原值即可,若选用保存源码中的图片 value 的办法咱们则需求保存整个 HashMap。类似情况的还有 LPushRPop 等指令。

有一些指令或许需求多条指令来回滚,比方回滚 Del 时不仅需求恢复对应的 key-value 还需求恢复 TTL 数据。或许 Del 指令删除了多个 key 时,也需求多条指令进行回滚。综上咱们给出 UndoFunc 的界说:

// UndoFunc returns undo logs for the given command line
// execute from head to tail when undo
type UndoFunc func(db *DB, args [][]byte) []CmdLine

咱们以能够redis数据结构回滚恣意操作的redis的五种数据类型rollbackGivenKeys为例进行阐明,当然运用roll携程旅行app官方下载backGiredis数据结构venKeys的成本较高,在或许的情况下尽量完命令行快捷键成针对性的 undo log.

func rollbackGivenKeys(db *DB, keys ...string) []CmdLine {
	var undoCmdLines [][][]byte
	for _, key := range keys {
		entity, ok := db.GetEntity(key)
		if !ok {
			// 本来不存在 key 删掉
			undoCmdLines = append(undoCmdLines,
				utils.ToCmdLine("DEL", key),
			)
		} else {
			undoCmdLines = append(undoCmdLines,
				utils.ToCmdLine("DEL", key), // 先把新 key 删除去
				aof.EntityToCmd(key, entity).Args, // 把 DataEntity 序列化成指令行
				toTTLCmd(db, key).Args,
			)
		}
	}
	return undoCmdLines
}

接下来看一下 EntityToCmd, 非常简略易懂:

func EntityToCmd(key string, entity *database.DataEntity) *protocol.MultiBulkReply {
	if entity == nil {
		return nil
	}
	var cmd *protocol.MultiBulkReply
	switch val := entity.Data.(type) {
	case []byte:
		cmd = stringToCmd(key, val)
	case *List.LinkedList:
		cmd = listToCmd(key, val)
	case *set.Set:
		cmd = setToCmd(key, val)
	case dict.Dict:
		cmd = hashToCmd(key, val)
	case *SortedSet.SortedSet:
		cmd = zSetToCmd(key, val)
	}
	return cmd
}
var hMSetCmd = []byte("HMSET")
func hashToCmd(key string, hash dict.Dict) *protocol.MultiBulkReply {
	args := make([][]byte, 2+hash.Len()*2)
	args[0] = hMSetCmd
	args[1] = []byte(key)
	i := 0
	hash.ForEach(func(field string, val interface{}) bool {
		bytes, _ := val.([]byte)
		args[2+i*2] = []byte(field)
		args[3+i*2] = bytes
		i++
		return true
	})
	return protocol.MakeMultiBulkReply(args)
}

Watch

Redis Watch 指令用于监视一个(或多个) key ,如果在业务履行之前这个(或这些) key 被其他指令所改动,那么业务将被抛弃。

完成 Watch 指令的核心是发现 key 是否被改动,咱们运用简略牢靠的版本号计划:为每个 key 存携程网飞机票预订官网储一个版本号,版本号改动阐明 key 被修正了:

// database/single_db.go
func (db *DB) GetVersion(key string) uint32 {
	entity, ok := db.versionMap.Get(key)
	if !ok {
		return 0
	}
	return entity.(uint32)
}
// database/transaciton.go
func Watch(db *DB, conn redis.Connection, args [][]byte) redis.Reply {
	watching := conn.GetWatching()
	for _, bkey := range args {
		key := string(bkey)
		watching[key] = db.GetVersion(key) // 将当前版本号存在 conn 目标中
	}
	return protocol.MakeOkReply()
}

在履行业务前比较版本号:

// database/transaciton.go
func isWatchingChanged(db *DB, watching map[string]uint32) bool {
	for key, ver := range watching {
		currentVersion := db.GetVersion(key)
		if ver != currentVersion {
			return true
		}
	}
	return false
}

源码导读

在了解业务相关机制后,咱们能够来看一下业务履行的核心代码Redis ExecMulti

func (db *DB) ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply {
	// 预备阶段
	// 运用 prepareFunc 获取业务要读写的 key
	writeKeys := make([]string, 0) // may contains duplicate
	readKeys := make([]string, 0)
	for _, cmdLine := range cmdLines {
		cmdName := strings.ToLower(string(cmdLine[0]))
		cmd := cmdTable[cmdName]
		prepare := cmd.prepare
		write, read := prepare(cmdLine[1:])
		writeKeys = append(writeKeys, write...)
		readKeys = append(readKeys, read...)
	}
	watchingKeys := make([]string, 0, len(watching))
	for key := range watching {
		watchingKeys = append(watchingKeys, key)
	}
	readKeys = append(readKeys, watchingKeys...)
	// 即将读写的 key 和被 watch 的 key 一起加锁
	db.RWLocks(writeKeys, readKeys)
	defer db.RWUnLocks(writeKeys, readKeys)
	// 检查被 watch 的 key 是否发生了改动
	if isWatchingChanged(db, watching) { // watching keys changed, abort
		return protocol.MakeEmptyMultiBulkReply()
	}
	// 履行阶段
	results := make([]redis.Reply, 0, len(cmdLines))
	aborted := false
	undoCmdLines := make([][]CmdLine, 0, len(cmdLines))
	for _, cmdLine := range cmdLines {
		// 在指令履行前再预备 undo log, 这样才干确保例如用 decr 回滚 incr 指令的完成能够正常工作
		undoCmdLines = append(undoCmdLines, db.GetUndoLogs(cmdLine))
		result := db.execWithLock(cmdLine)
		if protocol.IsErrorReply(result) {
			aborted = true
			// don't rollback failed commands
			undoCmdLines = undoCmdLines[:len(undoCmdLines)-1]
			break
		}
		results = append(results, result)
	}
	// 履行成功
	if !aborted { 
		db.addVersion(writeKeys...)
		return protocol.MakeMultiRawReply(results)
	}
	// 业务失利进行回滚
	size := len(undoCmdLines)
	for i := size - 1; i >= 0; i-- {
		curCmdLines := undoCmdLines[i]
		if len(curCmdLines) == 0 {
			continue
		}
		for _, cmdLine := range curCmdLines {
			db.execWithLock(cmdLine)
		}
	}
	return protocol.MakeErrReply("EXECABORT Transaction discarded because of previous errors.")
}

发表回复

提供最优质的资源集合

立即查看 了解详情