文|

刘月财

seata-go 项目担任人

北京小桔科技有限公司【滴滴】开发工程师

赵新(花名:于雨)

蚂蚁集团 Seata 项目开源担任人

本文5343字 阅览14分钟

布景

Seata 四种业务形式中,AT 业务形式是阿里体系首创的业务形式,对业务无侵入,也是 Seata 用户最多的一种业务形式,兼具易用性与高功用。

现在,Seata 社区正大力推动其多言语版别建造,Go、PHP、JS 和 Python 四个言语版别基本完成了 TCC 业务形式的完成。参照 Seata v1.5.2 版别的 AT 形式的完成,并结合 Seata 官方文档,本文测验从代码视点详解 Seata AT 业务形式的具体流程,意图是整理 Seata Java 版别 AT 形式的完成细节后,在多言语版别后续开发中,优先完成 AT 业务形式。

1、什么是 AT 形式?

AT 形式是一种二阶段提交的分布式业务形式,它采用了本地 undo log 的办法来数据在修正前后的状况,并用它来完成回滚。从功用上来说,AT 形式由于有 undo log 的存在,一阶段履行完能够立即开释锁和衔接资源,吞吐量比 XA 形式高。用户在运用 AT 形式的时分,只需求装备好对应的数据源即可,业务提交、回滚的流程都由 Seata 主动完成,对用户业务简直没有侵略,运用便当。

2、AT 形式与 ACID 和 CAP

谈论数据库的业务形式,一般都会先谈论业务相关的 ACID 特性,但在分布式场景下,还需求考虑其 CAP 性质。

2.1 AT 与 ACID

数据库业务要满意原子性、一致性、持久性以及阻隔性四个性质,即 ACID 。在分布式业务场景下,一般地,首先保证原子性和持久性,其次保证一致性,阻隔性则由于其运用的不同数据库的锁、数据 MVCC 机制以及相关业务形式的差异, 具有多种阻隔等级,如 MySQL 本身业务就有读未提交(Read Uncommitted)、读已提交(Read Committed)、可重复读(Repeatable Read)、序列化(Serializable)等四种阻隔等级。

2.1.1 AT形式的读阻隔

在数据库本地业务阻隔等级读已提交(Read Committed) 或以上的基础上,Seata(AT 形式)的默许大局阻隔等级是读未提交(Read Uncommitted)

假如应用在特定场景下,有必要要求大局的读已提交,现在 Seata 的办法是经过 SELECT FOR UPDATE 句子的署理。

SELECT FOR UPDATE 句子的履行会查询大局锁,假如大局锁被其他业务持有,则开释本地锁(回滚 SELECT FOR UPDATE 句子的本地履行)并重试。这个过程中,查询是被 block 住的,直到大局锁拿到,即读取的相关数据是已提交的,才返回。

出于总体功用上的考虑,Seata 现在的方案并没有对一切 SELECT 句子都进行署理,仅针对 FOR UPDATE 的 SELECT 句子。

具体比方参阅 Seata 官网:seata.io/zh-cn/docs/…

2.1.2 AT 形式的写阻隔

AT 会对写操作的 SQL 进行阻拦,提交本地业务前,会向 TC 获取大局锁,未获取到大局锁的情况下,不能进行写,以此来保证不会发生写抵触:

一阶段本地业务提交前,需求保证先拿到大局锁

拿不到大局锁,不能提交本地业务;

大局锁的测验被约束在一定范围内,超出范围将放弃,并回滚本地业务,开释本地锁。

具体比方参阅 Seata 官网:seata.io/zh-cn/docs/…

2.2 AT 与 CAP

Seata 一切的业务形式在一般情况下,是需求保证 CP,即一致性和分区容错性,由于分布式业务的核心便是要保证数据的一致性(包含弱一致性)。比方,在一些买卖场景下,涉及到多个体系的金额的改动,保证一致功用够防止体系产生资损。

分布式体系不可防止地会出现服务不可用的情况,如 Seata 的 TC 出现不可用时,用户或许期望经过服务降级,优先保证整个服务的可用性,此刻 Seata 需求从 CP 体系转换为一个保证 AP 的体系。

比方,有一个服务是给用户端供给用户修正信息的功用,假设此刻 TC 服务出现问题,为了不影响用户的运用体验,咱们期望服务仍然可用,只不过一切的 SQL 的履行降级为不走大局业务,而是当做本地业务履行。

AT 形式默许优先保证 CP,但供给了装备通道让用户在 CP 和 AP 两种形式下进行切换:

装备文件的tm.degrade-check参数,其值为true则分支业务保证 AP,反之保证 CP;

手动修正装备中心的service.disableGlobalTransaction特点为true,则封闭大局业务完成 AP。

3、AT 数据源署理

在 AT 形式中,用户只需求装备好 AT 的署理数据源即可, AT 的一切流程都在署理数据源中完成,对用户无感知。

AT 数据源署理的全体类结构如下图:

Seata AT 模式代码级详解

AT 业务数据源署理类结构图【fromseata.io/zh-cn/docs/…

AT 的数据源署理中,别离对方针数据库的 DataSource 、 Connection 和 Statement 进行了署理,在履行方针 SQL 动作之前,完成了 RM 资源注册、 undo log 生成、分支业务注册、分支业务提交/回滚等操作,而这些操刁难用户并无感知。

下面的时序图中,展示了 AT 形式在履行过程中,这几个署理类的动作细节:

Seata AT 模式代码级详解

注:图片主张在 PC 端检查

4、AT 形式流程

以下是 AT 形式的全体流程,从这儿能够看到分布式业务各个关键动作的履行机遇,每个动作细节,咱们后边来评论:

Seata AT 模式代码级详解

注:图片主张在 PC 端检查

4.1 一阶段

在 AT 形式的第一阶段, Seata 会经过署理数据源,阻拦用户履行的业务 SQL ,假设用户没有敞开业务,会主动敞开一个新业务。假如业务 SQL 是写操作(增、删、改操作)类型,会解析业务 SQL 的语法,生成 SELECT SQL 句子,把要被修正的记载查出来,保存为 “before image” 。然后履职业务 SQL ,履行完后用相同的原理,将现已被修正的记载查出来,保存为 “after image” ,至此一个 undo log 记载就完整了。随后 RM 会向 TC 注册分支业务, TC 侧会新加锁记载,锁能够保证 AT 形式的读、写阻隔。RM 再将 undo log 和业务 SQL 的本地业务提交,保证业务 SQL 和保存 undo log 记载 SQL 的原子性。

Seata AT 模式代码级详解

4.2 二阶段提交

AT 形式的二阶段提交,TC 侧会将该业务的锁删去,然后通知 RM 异步删去 undo log 记载即可。

Seata AT 模式代码级详解

4.3 二阶段回滚

假如 AT 形式的二阶段是回滚,那么 RM 侧需求依据一阶段保存的 undo log 数据中的 before image 记载,经过逆向 SQL 的办法,对在一阶段修正过的业务数据进行复原即可。

但是在复原数据之前,需求进行脏数据校验。由于在一阶段提交后,到现在进行回滚的中心这段时刻,该记载有或许被别的业务改动过。校验的办法,便是用 undo log 的 after image 和现在数据库的数据做比较,假设数据一致,阐明没有脏数据;不一致则阐明有脏数据,出现脏数据就需求人工进行处理了。

Seata AT 模式代码级详解

5、关键代码模块

如下是 AT 形式整个流程的首要模块,咱们从中能够了解开发 AT 形式需求做哪些工作:

Seata AT 模式代码级详解

5.1 Undo log数据格局

undo log 存在表 undo_log 表中,undo_log 表的表结构如下:

Seata AT 模式代码级详解

rollback_info 寄存了业务数据修正前后的内容,数据表寄存的是经过紧缩后的格局,他的明文格局如下:

{
    "branchId":2828558179596595558,
    "sqlUndoLogs":[
        {
            "afterImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PRIMARY_KEY",
                                "name":"id",
                                "type":4,
                                "value":3
                            },
                            {
                                "keyType":"NULL",
                                "name":"count",
                                "type":4,
                                "value":70
                            }
                        ]
                    }
                ],
                "tableName":"stock_tbl"
            },
            "beforeImage":{
                "rows":[
                    {
                        "fields":[
                            {
                                "keyType":"PRIMARY_KEY",
                                "name":"id",
                                "type":4,
                                "value":3
                            },
                            {
                                "keyType":"NULL",
                                "name":"count",
                                "type":4,
                                "value":100
                            }
                        ]
                    }
                ],
                "tableName":"stock_tbl"
            },
            "sqlType":"UPDATE",
            "tableName":"stock_tbl"
        }
    ],
    "xid":"192.168.51.102:8091:2828558179596595550"
}

5.2 UndoLogManager

UndoLogManager 担任 undo log 的新加、删去、回滚操作,不同的数据库有不同的完成(不同数据库的 SQL 语法会不同),公共逻辑放在了 AbstractUndoLogManager 抽象类中,全体的类承继关系如下图:

Seata AT 模式代码级详解

注:图片主张在 PC 端检查

刺进和删去 undo log 的逻辑都比较简单,直接操作数据表就行。这儿重点看下回滚 undo log 的逻辑:

Seata AT 模式代码级详解

源码剖析如下:

@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
	Connection conn = null;b
		ResultSet rs = null;
	PreparedStatement selectPST = null;
	boolean originalAutoCommit = true;
	for (; ; ) {
		try {
			conn = dataSourceProxy.getPlainConnection();
			// The entire undo process should run in a local transaction.
			// 敞开本地业务,保证删去undo log和恢复业务数据的SQL在一个业务中commit
			if (originalAutoCommit = conn.getAutoCommit()) {
				conn.setAutoCommit(false);
			}
			// Find UNDO LOG
			selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
			selectPST.setLong(1, branchId);
			selectPST.setString(2, xid);
			// 查出branchId的一切undo log记载,用来恢复业务数据
			rs = selectPST.executeQuery();
			boolean exists = false;
			while (rs.next()) {
				exists = true;
				// It is possible that the server repeatedly sends a rollback request to roll back
				// the same branch transaction to multiple processes,
				// ensuring that only the undo_log in the normal state is processed.
				int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
				// 假如state=1,阐明能够回滚;state=1阐明不能回滚
				if (!canUndo(state)) {
					if (LOGGER.isInfoEnabled()) {
						LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
					}
					return;
				}
				String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
				Map<String, String> context = parseContext(contextString);
				byte[] rollbackInfo = getRollbackInfo(rs);
				String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
				// 依据serializer获取序列化东西类
				UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
					: UndoLogParserFactory.getInstance(serializer);
				// 反序列化undo log,得到业务记载修正前后的明文
				BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
				try {
					// put serializer name to local
					setCurrentSerializer(parser.getName());
					List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
					if (sqlUndoLogs.size() > 1) {
						Collections.reverse(sqlUndoLogs);
					}
					for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
						TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
								conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
						sqlUndoLog.setTableMeta(tableMeta);
						AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
								dataSourceProxy.getDbType(), sqlUndoLog);
						undoExecutor.executeOn(conn);
					}
				} finally {
					// remove serializer name
					removeCurrentSerializer();
				}
			}
			// If undo_log exists, it means that the branch transaction has completed the first phase,
			// we can directly roll back and clean the undo_log
			// Otherwise, it indicates that there is an exception in the branch transaction,
			// causing undo_log not to be written to the database.
			// For example, the business processing timeout, the global transaction is the initiator rolls back.
			// To ensure data consistency, we can insert an undo_log with GlobalFinished state
			// to prevent the local transaction of the first phase of other programs from being correctly submitted.
			// See https://github.com/seata/seata/issues/489
			if (exists) {
				deleteUndoLog(xid, branchId, conn);
				conn.commit();
				if (LOGGER.isInfoEnabled()) {
					LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
							State.GlobalFinished.name());
				}
			} else {
				// 假如不存在undo log,或许是由于分支业务还未履行完成(比方,分支业务履行超时),TM发起了回滚大局业务的恳求。
				// 这个时分,往undo_log表刺进一条记载,能够使分支业务提交的时分失败(undo log)
				insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
				conn.commit();
				if (LOGGER.isInfoEnabled()) {
					LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
							State.GlobalFinished.name());
				}
			}
			return;
		} catch (SQLIntegrityConstraintViolationException e) {
			// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
			if (LOGGER.isInfoEnabled()) {
				LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
			}
		} catch (Throwable e) {
			if (conn != null) {
				try {
					conn.rollback();
				} catch (SQLException rollbackEx) {
					LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
				}
			}
			throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
					.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
						branchId, e.getMessage()), e);
		} finally {
			try {
				if (rs != null) {
					rs.close();
				}
				if (selectPST != null) {
					selectPST.close();
				}
				if (conn != null) {
					if (originalAutoCommit) {
						conn.setAutoCommit(true);
					}
					conn.close();
				}
			} catch (SQLException closeEx) {
				LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
			}
		}
	}
}

备注:需求特别注意下,当回滚的时分,发现 undo log 不存在,需求往 undo_log 表新加一条记载,防止由于 RM 在 TM 宣布回滚恳求后,又成功提交分支业务的场景。

5.3 Compressor 紧缩算法

Compressor 接口界说了紧缩算法的标准,用来紧缩文本,节省存储空间:


public interface Compressor {
    /**
     * compress byte[] to byte[].
     * @param bytes the bytes
     * @return the byte[]
     */
    byte[] compress(byte[] bytes);
    /**
     * decompress byte[] to byte[].
     * @param bytes the bytes
     * @return the byte[]
     */
    byte[] decompress(byte[] bytes);
}

现在现已完成的紧缩算法有如下这些:

Seata AT 模式代码级详解

5.4 UndoLogParser 序列化算法

Serializer 接口界说了序列化算法的标准,用来序列化代码:

public interface UndoLogParser {
    /**
     * Get the name of parser;
     * 
     * @return the name of parser
     */
    String getName();
    /**
     * Get default context of this parser
     * 
     * @return the default content if undo log is empty
     */
    byte[] getDefaultContent();
    /**
     * Encode branch undo log to byte array.
     *
     * @param branchUndoLog the branch undo log
     * @return the byte array
     */
    byte[] encode(BranchUndoLog branchUndoLog);
    /**
     * Decode byte array to branch undo log.
     *
     * @param bytes the byte array
     * @return the branch undo log
     */
    BranchUndoLog decode(byte[] bytes);
}

现在现已完成的序列化算法有如下这些:

Seata AT 模式代码级详解

5.5 Executor 履行器

Executor 是 SQL 履行的进口类, AT 在履行 SQL 前后,需求管理 undo log 的 image 记载,首要是构建 undo log ,包含依据不同的业务 SQL ,来组装查询 undo log 的 SQL 句子;履行查询 undo log 的 SQL ,获取到镜像记载数据;履行刺进 undo log 的逻辑(未提交业务)。

public interface Executor<T> {​    /**     * Execute t.     *     * @param args the args     * @return the t     * @throws Throwable the throwable     */    T execute(Object... args) throws Throwable;}

针对不同的业务 SQL ,有不同的 Executor 完成,首要是由于不同操作/不同数据库类型的业务 SQL ,生成 undo log 的 SQL 的逻辑不同,所以都别离重写了 beforeImage() 和 afterImage() 办法。全体的承继关系如下图所示:

Seata AT 模式代码级详解

注:图片主张在 PC 端检查

为了直观地看到不同类型的 SQL 生成的 before image SQL 和 after iamge SQL ,这儿做个整理。假设方针数据表的结构如下:


public interface Executor<T> {
    /**
     * Execute t.
     *
     * @param args the args
     * @return the t
     * @throws Throwable the throwable
     */
    T execute(Object... args) throws Throwable;
}

Seata AT 模式代码级详解

注:图片主张在 PC 端检查

5.6 AsyncWorker

AsyncWorker 是用来做异步履行的,用来做分支业务提交和 undo log 记载删去等操作。

Seata AT 模式代码级详解

6、关于功用

并不存在某一种完美的分布式业务机制能够习惯一切场景,完美满意一切需求。无论 AT 形式、TCC 形式仍是 Saga 形式,本质上都是对 XA 标准在各种场景下安全性或者功用的缺乏的改善。Seata 不同的业务形式是在一致性、可靠性、易用性、功用四个特性之间进行不同的取舍。

近期 Seata 社区发现有同行,在未具体剖析 Java 版别 AT 形式的代码的具体完成的情况下,仅对某个前期的 Go 版别的 Seata 进行短链接压测后,质疑 AT 模型的功用及其数据安全性,请具有一定思辨能力的用户朋友们在接受这个定论前细心查阅其测试办法与测试对象,区分好 “李鬼” 与 “李逵”。

实际上,这个前期的 Go 版别完成仅参照了 Seata v1.4.0,且未严格把 Seata AT 形式的一切功用都予以完成。话说回来,即便其推崇的 Seata XA 形式,其也依赖于单 DB 的XA 形式。而当下最新版别的 MySQL XA 业务形式的 BUG 仍然很多,这个地基并没有其幻想中的那样百分百稳固。

由阿里与蚂蚁集团共建的 Seata,是咱们多年内部分布式业务工程实践与技术经历的结晶,开源出来后得到了多达 150+ 以上职业同行生产环境的验证。开源大路既长且宽,这个道路上能够有机动车道也有非机动车道,还能够有人行道,我们携手把道路拓展延伸,而非站在人行道上宣扬机动车道危险性高且车速慢。

7、总结

Seata AT 形式依赖于各个 DB 厂商的不同版别的 DB Driver(数据库驱动),每种数据库发布新版别后,其 SQL 语义及其运用形式都或许发生改动。跟着近年 Seata 被其用户们广泛应用于多种业务场景,在开发者们的努力下,Seata AT 形式保持了编程接口与其 XA 形式简直一致,适配了简直一切的干流数据库,并覆盖了这些数据库的首要盛行版别的 Driver:真实做到了把分布式体系的 “复杂性”留在了结构层面,把易用性和高功用交给了用户。

当然,Seata Java 版别的 XA 和 AT 形式还有许多需求完善与改善的地方,遑论其它多言语版别的完成。欢迎对 Seata 及其多言语版别建造感兴趣的同行参与到 Seata 的建造中来,共同努力把 Seata 打造成一个标准化分布式业务平台。

本周推荐阅览

Go内存泄漏,pprof 够用了么?

Go 原生插件运用问题全解析

Go 代码城市上云–KusionStack 实践

Seata-php 半年规划