分布式事务解决方案
二阶段提交(2PC)
在分布式系统里,每个节点都可以知晓自己操作的成功或者失败,却无法知道其他节点操作的成功或失败。当一个事务跨多个节点时,需要引入一个协调者(Coordinator)来统一掌控所有参与者(Participant)的操作结果。
(1)投票阶段(voting phase):参与者将操作结果通知协调者;
(2)提交阶段(commit phase):收到参与者的通知后,协调者再向参与者发出通知,根据反馈情况决定各参与者是否要提交还是回滚;
两阶段提交的正向流程
第一阶段:
- 事务协调者的节点会首先向所有的参与者节点发送Prepare请求。
- 接到Prepare请求之后,每一个参与者节点会各自执行与事务有关的数据更新,写入Undo Log和Redo Log,完成了事务提交外的所有任务。向事务协调节点返回"完成"的消息。

第二阶段:
当事务协调者接到了所有参与者的返回的完成消息。
- 如果所有参与者都返回正常完成,那么协调者将会向所有事务参与者发出Commit请求。
- 接到Commit请求之后,事务参与者节点会各自进行本地的事务提交,并释放锁资源。当本地事务完成提交后,将会向事务协调者返回"完成"消息。
- 当事务协调者接收到所有事务参与者的"完成"反馈,整个分布式事务完成。

两阶段调教失败处理流程
第一阶段:
某个事务参与者反馈失败消息,说明该节点的本地事务执行不成功,必须回滚

第二阶段:
事务协调节点向所有的事务参与者发送Abort(中止)请求。接收到Abort请求之后,各个事务参与者节点需要在本地进行事务的回滚操作,回滚操作依照Undo Log来进行。

总结
低并发强一致性
性能问题:
- 在第一阶段返回后,事务并没有提交,而是处于阻塞状态,要等到所有节点准备完毕,事务协调者才会发送提交通知,这中间将会一直占用数据库资源
- prepare和commit阶段都需要网络信息交流,会耗费时间。
协调者单点故障问题:2PC模型的核心,一旦事务协调者节点挂掉,参与者收不到提交或是回滚通知,参与者会一直处于中间状态无法完成事务。
丢失消息导致的不一致问题:第二个阶段,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。
XA协议(基于2PC的标准化方式)
XA 协议是实现 2PC 的一种标准化方式,AP 通过 TM 来定义事务操作,TM 和 RM 之间会通过 XA 规范进行通信,执行两阶段提交,而 AP 的资源是从 RM 拿的

- AP 应用程序,就是我们的应用,事务的发起者。
- RM 资源管理器:简单的认为就是数据库,具备事务提交和回滚能力,对应我们上面的 2PC 就是参与者。
- TM 事务管理器:就是协调者了,和每个 RM 通信。
先定义一个全局唯一的 XID,然后告知每个事务分支要进行的操作,可以看到图中执行了两个操作,分别是改名字和插入日志,等于先注册下要做的事情,通过 XA START XID 和 XA END XID 来包裹要执行的 SQL。

然后需要发送准备命令,来执行第一阶段,也就是除了事务的提交啥都干了的阶段。

然后根据准备的情况来选择执行提交事务命令还是回滚事务命令。

Percolator模型(基于2PC改进)
三阶段提交(3PC)
为了解决 2PC 同步阻塞和减少数据不一致的情况,相较于2PC多了一个询问阶段(其余阶段与2PC对应),该阶段仅是协调者去访问参与者是否能接请求,防止个别参与者不正常的情况下,其他参与者都执行了事务,锁定资源。、
3PC 在参与者处也引入了超时机制,这样在协调者挂了的情况下,如果已经到了提交阶段了,超时没收到协调者的情况的话就会自动提交事务,这就可能出现数据不一致(期望于询问阶段所有参与者没问题,执行任务也就没问题)

而 3PC 的出发点是为了解决 2PC 的缺点,但是多了一个阶段就多了一次通讯的开销,而且是绝大部分情况下无用的通讯。
虽说引入参与者超时来解决协调者挂了的阻塞问题,但是还是可能导致数据不一致。

代码补偿事务(TCC)
2PC 还是 3PC 都是依赖于数据库的事务提交和回滚而TCC 就是一种业务层面或者是应用层的两阶段提交。
针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。TCC模式要求从服务提供三个接口:Try、Confirm、Cancel。
第一阶段
- Try:检测是否满足条件及资源预留,在订单服务中,Try 接口可能会检查库存是否足够,并预留库存,但不实际扣减库存。
第二阶段
提交或回滚
- Confirm:try成功的话,真正执行业务,不作任何业务检查;只使用Try阶段预留的业务资源;Confirm操作满足幂等性。
- Cancel:try失败的话,释放Try阶段预留的业务资源;Cancel操作满足幂等性。

总结
与2PC的区别
- TCC业务有侵入,但是 TCC 没有资源的阻塞,每一个方法都是直接提交事务的,如果出错是通过业务层面的 Cancel 来进行补偿
- XA业务无侵入,但是 XA存在资源的阻塞,如果出错是通过数据库层面的回滚来进行补偿
三大注意

- 幂等问题:
- 因为网络调用无法保证请求一定能到达,所以都会有重调机制,因此对于 Try、Confirm、Cancel 三个方法都需要幂等实现。
- 如果所有try成功后,存在个别Confirm失败,只能是多次重试调失败了的 Confirm 直到成功为止,当超过重试次数之后,通过mq或其他手段记录到数据库,人工处理。
- 空回滚问题:
- 指的是 Try 方法由于网络问题没收到超时了,此时事务管理器就会发出 Cancel 命令,那么需要支持 Cancel 在未执行 Try 的情况下能正常的 Cancel。
- 悬挂问题:
- 这个问题也是指 Try 方法由于网络阻塞超时触发了事务管理器发出了 Cancel 命令,但是执行了 Cancel 命令之后 Try 请求到了,你说气不气。
TCC变体
对于没有提供try接口的服务,只能直接调用confirm,如果confirm执行失败,则调用cancel。也就是在第一阶段直接就执行完整个业务操作了,所以要重点关注回滚操作,如果回滚失败得有提醒,要人工介入等。

本地消息表
高并发最终一致,本地消息就是利用了本地事务,会在数据库中存放一张本地事务消息表,在进行本地事务操作中加入了本地消息的插入,即将业务的执行和将消息放入消息表中的操作放在同一个事务中提交
- 如果本地事务执行成功,消息肯定也插入成功,然后再调用其他服务,如果调用成功就修改这条本地消息的状态。
- 如果本地事务执行失败,会有一个后台线程扫描,发现这些状态的消息,会在重试次数内再次调用相应的服务,超过次数则特殊记录,待人工介入处理,所以需要保证服务的幂等性。

执行流程:
订单系统,添加一条订单和一条消息,在一个事务里提交。
订单系统,使用定时任务轮询查询状态为未同步的消息表,发送到 MQ,如果发送失败,就重试发送。
库存系统,接收 MQ 消息,修改库存表,需要保证幂等操作。
如果修改成功,调用 RPC 接口修改订单系统消息表的状态为已完成或者直接删除这条消息。
如果修改失败,可以不做处理,等待重试。

MQ 事务消息
RocketMQ事务消息
实现了最终一致性,不需要依赖本地数据库事务,解决的是本地事务的执行和发消息这两个动作满足事务的约束。
RocketMQ 的事务消息也可以被认为是一个两阶段提交,在事务开始的时候会先发送一个半消息(对 Consumer 是不可见,储存在了一个特殊队列)给 Broker。
发送完半消息之后再执行本地事务,再根据本地事务的执行结果来决定是向 Broker 发送提交消息,还是发送回滚消息。为了应对发送的消息丢失,半消息长时间存在,Broker 会定时的向 Producer (需要提供一个查询接口)来反查这个事务是否成功,如果没成功,会进行多次查询,因为有可能事务还在执行。如果成功那么就将半消息恢复到正常要发送的队列中,这样消费者就可以消费这条消息了。

在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
Kafka事务消息
和RocketMQ不一样,一次事务中需要发送多个消息的情况,保证多个消息之间的事务约束,即多条消息要么都发送成功,要么都发送失败。
在开始事务的时候,生产者会向事务协调者发起请求表示事务开启,事务协调者会将这个消息记录到特殊的日志-事务日志中,然后生产者再发送真正想要发送的消息,这里 Kafka 和 RocketMQ 处理不一样,Kafka 会像对待正常消息一样处理这些事务消息,由消费端来过滤这个消息,即在消费消息时排除开启事务的消息。
所有消息发送完毕之后,生产者会向事务协调者发送提交或者回滚请求,由事务协调者来进行两阶段提交,如果是提交那么会先执行预提交,即把事务的状态置为预提交然后写入事务日志,然后再向所有事务有关的分区写入一条类似事务结束的消息,这样消费端消费到这个消息的时候就知道事务好了,可以把消息放出来了。
AT模式(Seata实现)
- AT 模式就是两阶段提交,对于其存在的同步阻塞的问题进行了改进
- 第一阶段:直接就把事务提交了,直接释放了本地锁,和本地消息表有点类似,执行真正的事务操作中还会插入回滚日志,然后在一个事务中提交。
- 第二阶段-回滚:通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录,拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改根据配置策略处理,否则的话回滚数据
- 第二阶段-提交:异步和批量地删除相应 UNDO LOG 记录
- AT 模式默认全局是读未提交的隔离级别,如果应用在特定场景下,必需要求全局的读已提交 ,可以通过 SELECT FOR UPDATE 语句的代理。
回滚日志来源
通过框架代理 JDBC 的一些类,在执行 SQL 的时候解析 SQL 得到执行前的数据镜像,然后执行 SQL ,再得到执行后的数据镜像,然后把这些数据组装成回滚日志。再伴随这个本地事务的提交把回滚日志也插入到数据库的 UNDO_LOG 表中(所以数据库需要有一张UNDO_LOG 表)。
然后一阶段如果成功,那么二阶段可以异步的删除那些回滚日志,如果一阶段失败那么可以通过回滚日志来反向补偿恢复。
为了避免中间有其它线程修改了这条数据怎么办,在事务提交前需要拿到全局锁(可以理解为对这条数据的锁),然后才能顺利提交本地事务,如果一直拿不到那就需要回滚本地事务了。
步骤概述
此时有两个事务,分别是 tx1、和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的全局锁 ,本地提交释放本地锁。
tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待全局锁 。

可以看到 tx2 的修改被阻塞了,之后重试拿到全局锁之后就能提交然后释放本地锁。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的全局锁等锁超时,放弃全局锁并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程全局锁在 tx1 结束前一直是被 tx1 持有的,所以不会发生脏写的问题。

事务模型
Transaction Coordinator (TC):事务协调器,就是seata,维护全局事务的运行状态,负责协调并决定全局事务的提交或回滚。
Transaction Manager(TM): 资源管理器,标注全局@GlobalTransactional启动入口动作的微服务模块,它是事务的发起者,负责定义全局事务的范围,并根据TC 维护的全局事务和分支事务状态,做出开始事务、提交事务、回滚事务的决议
Resource Manager (RM):资源管理器(数据库本身),负责本地事务的注册,本地事务状态的汇报(投票),并且负责本地事务的提交和回滚。
XID:一个全局事务的唯一标识
其中,TM是一个分布式事务的发起者和终结者,TC负责维护分布式事务的运行状态,而RM则负责本地事务的运行。

TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
XID 在微服务调用链路的上下文中传播。
RM 向 TC 注册分支事务,接着执行这个分支事务并提交(重点:RM在第一阶段就已经执行了本地事务的提交/回滚),最后将执行结果汇报给TC
TM 根据 TC 中所有的分支事务的执行情况,发起全局提交或回滚决议。
TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
MT模式(Seata实现)
MT模式本质上是一种TCC方案,业务逻辑需要被拆分为 Prepare/Commit/Rollback 3 部分,形成一个 MT 分支,加入全局事务。
不依赖于底层数据资源的事务支持,需自定义prepare/commit/rollback操作,对业务有侵入

Saga 模式
假设有 N 个操作,直接从 T1 开始就是直接执行提交事务,然后再执行 T2,可以看到就是无锁的直接提交,到 T3 发现执行失败了,然后就进入 Compenstaing 阶段,开始一个一个倒回补偿了。
可以看到这种情况是不保证事务的隔离性的,并且 Saga 也有 TCC 的一样的注意点,需要空补偿,防悬挂和幂等。
而且极端情况下会因为数据被改变了导致无法回滚的情况。比如第一步给我打了 2 万块钱,我给取出来花了,这时候你回滚,我账上余额已经 0 了。
这种情况只能在业务流程上入手,就拿买皮肤的场景来说,正确的流程应该是先扣钱再给皮肤,钱到自己袋里先,皮肤没给成功用户自然而然会找过来,这时候再给他呗。

seata
快速构建
建表
在分布式数据库的每一个数据库节点下创建对应UNDO表:incubator-seata/script/server/db/mysql.sql at develop · apache/incubator-seata (github.com)
-- ------------------------------------------- MySQL建表语句 ------------------------------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);配置修改
先备份,再修改seata-server-2.0.0\conf\application.yml配置文件,启动nacos,执行seata bin目录下seata-server.bat命令,启动后访问local host:7091,登录账号和密码都为seata
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
server:
port: 7091
spring:
application:
name: seata-server
logging:
config: classpath:logback-spring.xml
file:
path: ${log.home:${user.home}/logs/seata}
extend:
logstash-appender:
destination: 127.0.0.1:4560
kafka-appender:
bootstrap-servers: 127.0.0.1:9092
topic: logback_to_logstash
console:
user:
username: seata
password: seata
seata:
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP #后续自己在nacos里面新建,不想新建SEATA_GROUP,就写DEFAULT_GROUP
username: nacos
password: nacos
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP #后续自己在nacos里面新建,不想新建SEATA_GROUP,就写DEFAULT_GROUP
namespace:
cluster: default
username: nacos
password: nacos
store:
mode: db
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true
user: root
password: 123456
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**pom文件
<dependencies>
<!-- nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--alibaba-seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!--SpringBoot集成druid连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<!--mybatis和springboot整合-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<!--Mysql数据库驱动8 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>yml配置

seata:
registry:
type: nacos # 注册中心类型,这里使用 Nacos 作为注册中心
nacos:
server-addr: 127.0.0.1:8848 # Nacos 服务器地址
namespace: "" # Nacos 命名空间,默认为空
group: SEATA_GROUP # Nacos 服务分组,默认为 SEATA_GROUP
application: seata-server # 注册到 Nacos 中的应用名称
tx-service-group: default_tx_group #事务组,由它获得 TC(Transaction Coordinator)服务的集群名称
service:
vgroup-mapping: # 事务组与 TC 服务集群的映射关系
default_tx_group: default # 将事务组 default_tx_group 映射到 TC 服务集群 default
data-source-proxy-mode: AT # 数据源代理模式,使用 AT 模式(Automatic Transaction)
logging:
level:
io:
seata: info注解添加
在调用微服务入口上添加注解即可,如@GlobalTransactional(name = "zzyy-create-order",rollbackFor = Exception.class),基本上使用的都是AT模式。
@Slf4j
@Service
public class OrderServiceImpl implements OrderService
{
@Resource
private OrderMapper orderMapper;
@Resource//订单微服务通过OpenFeign去调用库存微服务
private StorageFeignApi storageFeignApi;
@Resource//订单微服务通过OpenFeign去调用账户微服务
private AccountFeignApi accountFeignApi;
@Override
@GlobalTransactional(name = "zzyy-create-order",rollbackFor = Exception.class) //AT
//@GlobalTransactional @Transactional(rollbackFor = Exception.class) //XA
public void create(Order order) {
//xid检查
String xid = RootContext.getXID();
//1. 新建订单
log.info("==================>开始新建订单"+"\t"+"xid_order:" +xid);
//订单状态status:0:创建中;1:已完结
order.setStatus(0);
int result = orderMapper.insertSelective(order);
//插入订单成功后获得插入mysql的实体对象
Order orderFromDB = null;
if(result > 0)
{
orderFromDB = orderMapper.selectOne(order);
//orderFromDB = orderMapper.selectByPrimaryKey(order.getId());
log.info("-------> 新建订单成功,orderFromDB info: "+orderFromDB);
System.out.println();
//2. 扣减库存
log.info("-------> 订单微服务开始调用Storage库存,做扣减count");
storageFeignApi.decrease(orderFromDB.getProductId(), orderFromDB.getCount());
log.info("-------> 订单微服务结束调用Storage库存,做扣减完成");
System.out.println();
//3. 扣减账号余额
log.info("-------> 订单微服务开始调用Account账号,做扣减money");
accountFeignApi.decrease(orderFromDB.getUserId(), orderFromDB.getMoney());
log.info("-------> 订单微服务结束调用Account账号,做扣减完成");
System.out.println();
//4. 修改订单状态
//订单状态status:0:创建中;1:已完结
log.info("-------> 修改订单状态");
orderFromDB.setStatus(1);
Example whereCondition=new Example(Order.class);
Example.Criteria criteria=whereCondition.createCriteria();
criteria.andEqualTo("userId",orderFromDB.getUserId());
criteria.andEqualTo("status",0);
int updateResult = orderMapper.updateByExampleSelective(orderFromDB, whereCondition);
log.info("-------> 修改订单状态完成"+"\t"+updateResult);
log.info("-------> orderFromDB info: "+orderFromDB);
}
System.out.println();
log.info("==================>结束新建订单"+"\t"+"xid_order:" +xid);
}
}