导言

本文为社区首发签约文章,14天内制止转载,14天后未获授权制止转载,侵权必究!

信任咱们关于业务问题都不生疏,在之前《MySQL业务篇》中曾详解过MySQL的业务机制,在传统的单库环境下开发,咱们可依托于MySQL所供给的业务机制,来保证单个业务内的一组操作,要么悉数履行成功,要么悉数履行失利。

例如一个下单业务中,假定由「扣减库存、新增订单」两个操作组成,在单库中经过MySQL供给的业务机制,能够保证该业务中任意操作履行呈现问题时,另一个操作改变的数据能够回滚,然后保证整库数据的一致性,避免发生库存扣了,但订单却未增加的状况呈现。

在传统的单体架构中做单库开发,数据的一致性能够经过InnoDB业务机制来保证,但当项目换到分布式架构的环境时,或许当项目换到分库分表的环境时,答案亦是如此吗?并非如此,在分布式环境下,由于每个库都维护着自己的业务机制,相互之间无法感知对方的业务,因而就会呈现分布式业务问题,这也是分布式体系中头疼多年的一个扎手问题!

本章的中心则是讲清楚分布式业务问题,以及该怎样去处理这种扎手问题,但实际现在关于分布式业务的处理计划现已非常红熟了,即Spring Cloud Alibaba中的Seata结构,以及更前期的GTS、LCN、 Atomikos、RocketMQ、Sharding-Sphere...等结构都能够很好的处理分布式业务问题。

也正由于分布式业务问题的处理计划现已比较完善,根本上一个注解、几行代码、几行装备的作业,就能够轻松处理分布式业务问题,因而本章并非单纯去叙述这些结构的根本运用,而是从另一个视点来考虑分布式业务问题,即假定没有这些老练的处理问题,咱们遇到这个问题时又该怎样处理呢?接下来会与咱们一起,手把手的自己编写一个分布式业务结构。

由于触及到了分布式业务结构的手写,可能内容会比较倾向于底层原理的分享,我会尽量在把这些内容写的简略一点,一起关于每个中心段落也会画图示例,但本身这个体裁就比较硬核,因而想要彻底读懂这章,最好具有如下根底:

  • 分布式常识储藏:首要是指SpringCloud微服务与RPC长途调用的根本运用。
  • 网络常识储藏:首要是指Netty结构的运用、序列化常识、P2P网络通讯的原理。
  • Spring相关的常识储藏:首要是Transactional业务机制原理、AOP切面运用。
  • Java-JUC并发编程的常识储藏:首要是ThreadLocal、Condition、Lock、线程池技术。
  • 其他的常识储藏:首要是指自界说注解式开发、Maven打包技术、MySQL业务原理。

如若咱们不具有上述根底,实则也无需忧虑,通篇读下来应该大致原理也能够弄懂,本章要做的是让诸位知其然并知其所以然,对技术不要停留在单纯的运用层面,而应该恰当性的去参悟底层的完结原理,这才是咱们与其他开发者摆开距离的中心竞争力。

由于个人还并未阅览过Seata结构的源码,因而本章是之前在阅览LCN这个老牌分布式业务结构仿写的,所以很多完结是借鉴于LCN的部分完结,但LCNSeata-AT形式大致相同,因而诸位也可将本篇作为Seata-AT形式的原理篇来阅览,在本章末尾也会供给源码完结。终究,如若你关于手写结构系列的内容感兴趣,那也能够看看之前曾发布过的《手写SpringMVC结构》这篇文章。

一、何谓分布式业务问题?

首要来聊聊啥是分布式业务问题,由于现在的分布式/微服务体系开发中,根本上每个中心服务都会具有自己的独享库,也就是垂直分库的形式,以前面的比如来说,订单服务有订单DB,库存服务有库存DB,每个服务之间的数据库都是独立的。此刻先回顾本来单库环境中处理业务问题的办法,如下:

// 下单服务
@Transactional
public void placeAnOrder(){
    // 调用扣减库存的办法
    inventoryService.minusInventory();
    // 调用增加订单的办法
    orderService.insertOrder();
}

一个下单业务的伪代码如上,会先调用「扣减库存」的办法,接着再调用「新增订单」的办法,为了保证下单这组操作的数据一致性,一般会在办法上加一个@Transactional注解,这样就会将业务托管给Spring来担任,如下:

  • 在该办法履行时,Spring会首要向数据库发送一条begin敞开业务的指令。
  • 假如履行进程中呈现反常,Spring会向数据库发送一条rollback回滚业务的指令。
  • 假如履行一切正常,Spring会向数据库发送一条commit提交业务的指令。

Spring注解式业务的逻辑图如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

这种业务办理机制,在单体架构中显然非常好用,但放到分布式环境中,状况则不同,如下:
(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

由于分布式体系都会依据业务去拆分子体系/子服务,因而不同业务之间只能经过RPC的办法,长途调用对方所供给的API接口,假定这儿在库存服务本地的「扣减库存」办法上加一个@Transactional注解,一起在订单服务本地的「新增订单」办法也加一个@Transactional注解,Spring内部的处理逻辑如下:

  • 下单业务长途调用「减库存」接口时,Spring会先向库存DB发送一个begin指令敞开业务。
  • 当扣减库存的业务履行完结后,Spring会直接向库存DB发送一个commit指令提交业务。
  • 下单业务调用本地的「新增订单」办法时,Spring又会向订单DB发送begin指令敞开业务。
  • 当新增订单履行呈现反常时,Spring会向订单DB发送一个rollback指令回滚业务。

此刻剖析如上场景,下单业务理论上应该归于同一个业务,但之前《MySQL业务篇》聊到过,InnoDB的业务机制是依据Undo-log日志完结的,那么减库存发生的回滚记载会记载到库存DBUndo-log中,而新增订单发生的回滚记载则会记载到订单DBUndo-log中,此刻由于服务不同、库不同,因而相互之间无法感知到对方的业务。

当后续「新增订单」的操作履行呈现反常,Spring结构发送的rollback指令,就只能依据订单DB中的回滚记载去复原数据,此刻前面扣减过的库存数据就无法回滚,因而导致了整体数据呈现了不一致性。

1.1、分布式业务问题演示

前面简略叙述了分布式业务问题,但这样讲起来好像有些令人费脑,那接下来直接上个案例,实际感受一下分布式业务造成的数据不一致问题,这儿依据SpringCloud快速建立了一个微服务项目,为了节约篇幅就不带着诸位一起走简略的建立流程了,完好的源码地址会在终究给出,其间有订单、库存两个子服务,库存服务供给了一个减库存的接口,如下:

@RestController
@RequestMapping("/inventory")
public class InventoryAPI {
    // 注入本地的InventoryService
    @Autowired
    private InventoryService inventoryService;
    @RequestMapping("/minusInventory")
    public String minusInventory(Inventory inventory) {
        // 依据传入的产品ID先查询库存
        Inventory inventoryResult =
            inventoryService.selectByPrimaryKey(inventory.getInventoryId());
        // 假如库存缺乏则回来相应提示
        if (inventoryResult.getShopCount() <= 0) {
            return "库存缺乏,请联系卖家....";
        }
        // 假如产品还有剩下库存则对库存减一,接着修正数据库中的库存数量
        inventoryResult.setShopCount(inventoryResult.getShopCount() - 1);
        int n = inventoryService.updateByPrimaryKeySelective(inventoryResult);
        System.out.println("库存信息:" + inventoryResult.toString());
        // 扣减库存成功后,向客户端回来对应的提示
        if (n > 0) {
            return "端口:" + port + ",库存扣减成功!!!";
        }
        return "端口:" + port + ",库存扣减失利!!!";
    }
}
// 库存服务本地的InventoryService完结类
@Service
public class InventoryServiceImpl implements InventoryService {
    // 减库存会调用的修正办法,在上面增加了@Transactional注解
    @Override
    @Transactional
    public Integer updateByPrimaryKeySelective(Inventory record) {
        int i = inventoryMapper.updateByPrimaryKeySelective(record);
        return i;
    }
}

而订单服务中供给了一个下单接口,如下:

@RestController
@RequestMapping("/order")
public class OrderAPI {
    // 注入本地的OrderService
    @Autowired
    private OrderService orderService;
    // 库存服务的长途调用地址
    private static final String URL_PREFIX =
        "http://localhost:8002/inventory/minusInventory";
    // 担任长途调用的RestTemplate
    @Autowired
    private RestTemplate restTemplate;
    // 下单接口
    @RequestMapping("/placeAnOrder")
    public String placeAnOrder(){
        // 随便指定一个产品的ID
        String inventoryId = "92b1162a-eb7a-4d72-9645-dea3fe03c8e2";
        // 然后经过HttpClient调用库存服务的减库存接口
        String result = HttpClient.get(URL_PREFIX +
                "/minusInventory?inventoryId=" + inventoryId);
        System.out.println("\n调用减库存接口后的呼应成果:" + result + "\n");
        // 调用减库存接口成功后,向订单库中刺进一笔订单记载
        String orderId = UUID.randomUUID().toString();
        Order order = new Order(orderId,"黄金竹子","8888.88",inventoryId);
        Integer n = orderService.insertSelective(order);
        System.out.println("\n\n\n" + n + "\n\n\n");
        return "下单调用成功,需求处理事物.....";
    }
}
// 订单服务本地的OrderService完结类
@Service
public class OrderServiceImpl implements OrderService {
    // 新增订单会调用的刺进办法
    @Override
    @Transactional
    public Integer insertSelective(Order record) {
        // 刻意制作出一个反常
        int i = 100 / 0;
        return orderMapper.insertSelective(record);;
    }
}

要留意看,在orderService.insertSelective(order)刺进订单数据的办法中,咱们经过100/0手动制作了一个反常,以此来模仿出「扣减库存」履行成功、「新增订单」履行失利的场景,接着看看库存DB、订单DB中对应的库存表、订单表数据,如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

很显着,现在订单表中还没有任何数据,而库存表中仅有一条测验数据,但要留意:这两张表别离坐落db_inventory、db_order两个不同的库中,此刻「黄金竹子」的库存数量为100,现在别离发动库存服务、订单服务来做简略模仿:

  • 订单服务的下单接口:http://localhost:8001/order/placeAnOrder

这儿就直接用浏览器做测验,浏览器调用下单接口后,操控台的日志如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

两个服务对应的数据库中的数据如下:
(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

成果非常显着,此刻对应产品的库存扣掉了,但由于新增订单时呈现反常,所以订单却并未增加,终究造成了数据不一致问题,这也就是前面所说到的分布式业务问题,这也是分布式体系中,需求处理的一个扎手问题。

1.2、该怎样处理分布式业务问题呢?

早年间分布式架构并不像如今这么干流,一般只有一些互联网大厂才会运用,因而相关的技术生态和处理计划,并不像那么老练,而分布式业务问题,也成为了运用分布式架构不得不处理的扎手问题,在分布式业务问题被发现后,期间推出了各式各样的处理计划,但如今保存下来的干流计划共有四种:

  • ①依据Best Efforts 1PC形式处理分布式业务问题。
  • ②依据XA协议的2PC、3PC形式做大局业务操控。
  • ③依据TTC计划做业务补偿。
  • ④依据MQ完结业务的终究一致性。

但上述四种仅是办法论,也就是一些虚无缥缈的理论,想要运用时还得依据其概念去自己落地,但如今分布式/微服务生态现已非常红熟,所以也有很多现成的落地技术,早已能够处理分布式业务问题,如Seata、GTS、LCN、 Atomikos、RocketMQ、Sharding-Sphere...等结构,都供给了完善的分布式业务支撑,现在较为干流的是引进Seata结构处理,其内部供给了AT、TCC、XA、Seaga-XA四种形式,主推的是AT形式,运用起来也较为简略,大体进程如下:

  • ①引进相关的Maven依托。
  • ②修正相关的装备文件。
  • ③在需求保证分布式业务的办法上加一个@GlobalTransactional注解。

经过上述三步后,就能够轻松处理前期困扰大厂多年的分布式业务问题,是不是尤为轻松?其他的分布式业务结构运用进程也相差无几,引进依托、修正装备、加一个注解即可。

二、手写分布式业务结构的思路剖析

前面对分布式业务问题的描述讲了一大堆,但真实要处理对应问题时,好像仅靠一个注解就够了,这这这……,究竟是怎样处理的呢?信任许多运用Seata或其他分布式业务结构的小伙伴,心中不免都会发生这个疑惑。OK,咱们先假定现在没有这些老练的分布式业务结构,假如自己要处理分布式业务问题,又该怎样去完结呢?那么接下来就要点说说这块,真实让咱们做到知其然并知其所以然!

不过做任何作业得先有规划,没有提前做好预备与计划,任何事一般都不会有太好的成果,因而先剖析一下手写分布式业务结构的思路,把思路捋清楚之后,接着再去编写对应的代码完结。

前面讲的分布式业务问题,实质原因在于Spring是依托数据库本身的业务机制来保证数据一致性的,而两个服务对应着两个库,两个库各自都维护着自己的业务机制,一起无法感知对方的业务状况,终究造成库存服务先提交了业务,而订单服务后回滚业务的状况呈现。

所以想要自己处理分布式业务问题,首要就不能依托MySQL本身的业务机制来处理问题,关于业务的办理有必要要是大局性质的,也就是需求引进一个第三方来进行大局业务办理,而办理业务的这个人物,咱们将其称之为业务办理者。

已然需求把业务交给第三者办理,那么每个参加大局业务的子服务,其业务的操控权有必要要拿到,也就是不允许任何一个参加者私自提交或回滚业务,业务的操控权彻底交给业务办理者,一组大局业务的成果究竟是提交,仍是回滚,这点全权由业务办理者决议。

到这儿呈现了两个人物:业务办理者、业务参加者,所谓的业务参加者,即代表参加一个大局业务的子服务,如前面的下单业务中,库存服务和订单服务,就能够了解成是两个业务参加者。

由于业务参加者要把自己的履行状况告知给办理者,一起办理者需求把业务的终究处理告诉给每个参加者,所以办理者、参加者之间要能相互通讯,所以等会儿会选用Netty网络结构,完结点对点对端通讯。

但为了不影响业务,也就是减小代码的侵入性,关于业务参加者而言,改动的代码量越小越好,所以这儿能够用Maven构建一个本地工程,在这个工程中供给一个大局业务注解,然后将该工程打成一个依托包,其他需求运用分布式业务的子服务,直接在pom.xml中引进该依托即可,触及到分布式业务的办法,直接在业务办法上面加上注解。

OK,到这儿剖析出了大体进程,大体的逻辑图如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

有三个中心进程:

  • ①完结业务办理者,能够协调大局业务的参加者,等整个业务调用链履行完毕后决议业务处理计划。
  • ②完结业务参加者,能够向业务办理者中注册/参加大局业务、告知履行成果、接收终究处理计划。
  • ③构建Maven工程打成依托包,完结自界说注解,尽量对业务代码做到低入侵。

但上述仅是大体思路,下面来开端逐渐完结各个进程,每个进程中的细节会渐渐展开。

三、手写分布式业务结构实战

将完结的大约思路弄清楚后,接着先来完结一下业务办理者,完结经过Netty建立一个服务端,假如关于Netty还不了解的小伙伴,后续我会更新Netty结构的文章,这儿能够套用Java-NIO的概念去了解,代码如下:

// 业务办理者的发动类
public class Main {
    public static void main(String[] args){
        // 这个是自界说的一个服务端
        NettyServer nettyServer = new NettyServer();
        // 为其绑定IP和端口号
        nettyServer.start("localhost", 8080);
        System.out.println("\n>>>>>>业务办理者发动成功<<<<<\n");
    }
}
// Netty服务端 - 业务办理者
public class NettyServer {
    // 发动类
    private ServerBootstrap bootstrap = new ServerBootstrap();
    // NIO事情循环组
    private NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
    // 发动办法
    public void start(String host, int port) {
        try {
            // 调用下面的初始化办法
            init();
            // 绑定端口和IP
            bootstrap.bind(host, port).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // 初始化办法
    private void init(){
        bootstrap.group(nioEventLoopGroup)
                .channel(NioServerSocketChannel.class)
                // 增加一个自界说的处理器
                .childHandler(new ServerInitializer());
    }
    // 封闭办法
    public void close(){
        nioEventLoopGroup.shutdownGracefully();
        bootstrap.clone();
    }
}

到这儿相对来说还比较简略,就是创立了一个服务端,然后绑定了一个IP和端口,最首要是上面NettyServer.init()办法,在里边增加了一个自界说的处理器,该处理器代码如下:

// NIO的通道处理器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 设置编码器、解码器、处理器
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new NettyServerHandler());
    }
}

前面增加的是解码器和编码器,首要用于数据传输时的编解码作业,要点是要关注终究增加的这个处理器,这个处理器是自界说的,里边会编写处理分布式业务的中心逻辑,根底架构如下:

// 分布式业务的中心处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    private static ChannelGroup channelGroup =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // 业务组中的业务状况列表
    private static Map<String, List<String>> transactionTypeMap = new ConcurrentHashMap<>();
    // 业务组是否现已接收到完毕的符号
    private static Map<String, Boolean> isEndMap = new ConcurrentHashMap<>();
    // 业务组中应该有的业务个数
    private static Map<String, Integer> transactionCountMap = new ConcurrentHashMap<>();
    // 把整个Channel参加到channelGroup中
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.add(channel);
    }
    /***
     * 这儿是待会儿完结分布式业务的中心
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 等会儿逐渐完结
    }
    // 向客户端(业务参加者)发送终究处理成果的办法
    private void sendResult(JSONObject result){
        System.out.println("业务终究处理成果:" + result.toJSONString());
        for (Channel channel : channelGroup){
            channel.writeAndFlush(result.toJSONString());
        }
    }
}

在这个处理器类中,首要是四个地方需求了解,即里边的三个容器,两个办法:

  • transactionTypeMap:记载一个业务组中,一切子业务的履行状况。
  • isEndMap:记载一个业务组,当时一切的子业务是否已抵达。
  • transactionCountMap:记载一个业务组,应该由多少个子业务组成。
  • handlerAdded():业务办理者的中心办法,一切来自客户端的音讯都会被该办法监听到。
  • sendResult():这个办法会在一组业务悉数履行完结后,用来给参加者回来处理成果运用。

OK,这儿其实并非触及到任何大局业务的处理,只是依据Netty建立了一个服务端结构,后续会一点点去展开完结,这样也能够更便于诸位了解,接着再预备着手完结业务参加者。

3.1、业务参加者的完结进程

前面说过,所谓的业务参加者其实就是指每个业务子服务,所以按理来说,业务参加者的代码完结应该放到库存服务、订单服务中,但这样会对业务服务的造成较高的代码侵入性,一起每个需求运用分布式业务的子服务,都需求复制一遍业务参加者的代码,不免有些表现的“。不太智能”。

因而这儿将业务参加者的中心代码,独自拧出来构建一个Maven子工程,完结完结后将其打成依托包放到本地库房,然后需求运用分布式业务功用的子服务,只需求引进Maven依托即可(我这儿将其命名为zhuzi-distributed-tx,即代表「竹子爱熊猫」开发的一款分布式业务结构,哈哈哈,臭美一下~)。

3.1.1、Netty客户端的完结

作为业务参加者,由于要和业务办理者之间进行通讯,而业务办理者的实质是一个Netty-Server服务端,所以这儿的业务参加者,实质就是Netty-Client客户端,所以Netty客户端的代码完结如下:

// Netty-Client客户端代码完结
@Component
public class NettyClient implements InitializingBean {
    // 这个是业务参加者的中心处理器
    private NettyClientHandler client = null;
    private static ExecutorService executorService = Executors.newCachedThreadPool();
    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("\n\n>>>>>>业务参加者发动成功<<<<<<\n\n");
        start("localhost", 8080);
    }
    // 依据IP、端口地址,向服务端注册客户端
    public void start(String host, int port) {
        client = new NettyClientHandler();
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                // 这儿增加了一个自界说的处理器
                .handler(new ClientInitializer(client));
        try {
            bootstrap.connect(host, port).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public void send(JSONObject sendData) {
        try {
            // 调用处理器中向服务端发送数据的办法
            client.sendData(sendData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

结构根本上和业务办理者类似,首要有这么一个发动类,会在业务服务发动时,顺势一起随同发动,接着会依据地址找到业务办理者并注册,而业务参加者这儿相同有一个自界说的处理器,代码如下:

// 自界说的处理器
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    private NettyClientHandler nettyClientHandler;
    public ClientInitializer(NettyClientHandler nettyClientHandler) {
        this.nettyClientHandler = nettyClientHandler;
    }
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        // 这儿初始化的是客户端的处理器
        pipeline.addLast("handler", nettyClientHandler);
    }
}
// 业务参加者的中心处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private ChannelHandlerContext channelHandlerContext;
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channelHandlerContext = ctx;
    }
    // 一切服务端(业务办理者)回来的数据都会被该办法监听到
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) 
                                                            throws Exception {
        // 等会儿逐渐完结
    }
    // 向服务端(业务办理者)发送数据的办法
    public void sendData(JSONObject result){
        System.out.println("向事物办理者发送数据:" + result.toJSONString());
        channelHandlerContext.writeAndFlush(result.toJSONString());
    }
}

在业务参加者的处理器中,相同存在两个办法:

  • channelRead():一切来自服务端(业务办理者)的音讯都会被这个办法监听到。
  • sendData():客户端(参加者)向服务端(办理者)发送数据的办法。

OK,看到这儿信任有些关于Netty不大了解的小伙伴会有些懵,那下面画一幅图来简略阐明一下现在的结构,就算你不会Netty结构也不要紧,只需记住这幅图即可:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

这儿实则是依据Netty建立了一个客户端与服务端的通讯架构,这样既能够让参加者向办理者注册业务,也能够让办理者向参加者回来成果,了解这个中心思想就OK

3.1.2、业务参加者的中心完结

建立出办理者、参加者之间的根底通讯架构后,接着来完结业务参加者的中心代码,先界说个枚举类:

// 业务类型的枚举类
public enum TransactionalType {
    // 提交和回滚状况
    commit, rollback;
}

这个枚举类会用来在后续判别业务状况,它会作为子业务目标的一个子特点,业务目标的界说如下:

// 分布式业务 - 子业务目标
public class ZhuziTx {
    // 当时子业务归于哪个业务组
    private String groupId;
    // 当时子业务的业务ID
    private String transactionalId;
    // 当时子业务的业务类型
    private TransactionalType transactionalType;
    // 当时子业务的使命等候行列(依据此完结业务操控权)
    private Task task;
    public ZhuziTx(String groupId, String transactionalId, TransactionalType transactionalType) {
        this.groupId = groupId;
        this.transactionalId = transactionalId;
        this.transactionalType = transactionalType;
        this.task = new Task();
    }
    // 省掉其他结构办法、以及get/set办法......
}

每个参加大局业务的子业务,在这儿都会被封装为一个个ZhuziTx目标,每个子业务具有下述几个特点:

  • groupId:表明当时子业务归于那一组大局业务,由于一起可能存在多组分布式业务。
  • transactionalId:当时子业务的业务ID值,具有大局仅有特性。
  • transactionalType:这个是前面的枚举类,表明当时子业务终究的履行状况(回滚/提交)。
  • task:这个现在暂时不会用到,后面用来完结业务提交或回滚的操控权。

了解这个根本的子业务目标后,接着再来看看业务参加者的中心完结类:

// 业务参加者的中心完结类
public class ZhuziTxParticipant {
    // 获取前面随同服务发动发生的NettyClient客户端
    private static NettyClient nettyClient = 
            ApplicationContextProvider.getBean(com.zhuzi.distributedtx.netty.NettyClient.class);
    // 存储当时线程在履行的子业务目标
    private static ThreadLocal<ZhuziTx> current = new ThreadLocal<>();
    // 存储当时子业务所属的业务组ID值
    private static ThreadLocal<String> currentGroupId = new ThreadLocal<>();
    // 存储当时子业务所属的业务组子业务总量
    private static ThreadLocal<Integer> transactionalCount = new ThreadLocal<>();
    // 业务ID和子业务目标的映射组
    private static Map<String,ZhuziTx> ZHUZI_TRANSACTIONAL_MAP = new HashMap();
    /**
     *  向业务办理者中发送一个创立业务组的指令
     * @return
     */
    public static String createZhuziTransactionalManagerGroup(){
        // 随机发生一个UUID作为业务组ID
        String groupID = UUID.randomUUID().toString();
        // 经过JSON做序列化
        JSONObject sendData = new JSONObject();
        // 传入前面发生的业务组ID,以及本次操作为create创立指令
        sendData.put("groupId", groupID);
        sendData.put("command", "create");
        // 调用客户端的send()办法向服务端发送数据
        nettyClient.send(sendData);
        System.out.println(">>>>>向办理者发送创立业务组指令成功<<<<<");
        // 把业务组ID存在currentGroupId当中
        currentGroupId.set(groupID);
        // 对外回来业务组ID值
        return groupID;
    }
    /***
     *  创立一个子业务目标
     */
    public static ZhuziTx createTransactional(String groupId){
        // 随机发生一个UUID作为子业务ID
        String transactionalId = UUID.randomUUID().toString();
        // 示例化出一个子业务目标
        ZhuziTx zhuziTransactional = new ZhuziTx(groupId, transactionalId);
        // 将创立出的子业务目标保存到相关的变量中
        ZHUZI_TRANSACTIONAL_MAP.put(groupId, zhuziTransactional);
        current.set(zhuziTransactional);
        // 对业务组数量+1
        Integer integer = addTransactionCount();
        System.out.println("创立子业务,现在业务组长度为:" + integer);
        return zhuziTransactional;
    }
    /**
     * 注册业务(向业务办理者的业务组中增加子业务)
     */
    public static ZhuziTx addZhuziTransactional(ZhuziTx ztp,
                                                Boolean isEnd, TransactionalType type){
        // 经过JSON序列化一个目标
        JSONObject sendData = new JSONObject();
        // 传入当时子业务的组ID、业务ID、业务类型、操作类型....信息
        sendData.put("groupId", ztp.getGroupId());
        sendData.put("transactionalId", ztp.getTransactionalId());
        sendData.put("transactionalType", type);
        sendData.put("command", "add");
        sendData.put("isEnd", isEnd);
        sendData.put("transactionalCount", ZhuziTxParticipant.getTransactionalCount());
        // 将封装好的JSON发送给业务办理者
        nettyClient.send(sendData);
        System.out.println(">>>>>向办理者发送增加子业务指令成功<<<<<");
        return ztp;
    }
    // 增加业务组数量的办法
    public static Integer addTransactionCount() {
        System.out.println(transactionalCount.get());
        int i = (transactionalCount.get() == null
                ? 0 : transactionalCount.get()) + 1;
        transactionalCount.set(i);
        return i;
    }
    // 省掉前面类成员的Get/Set办法.....
}

上述这个类就是业务参加者的中心处理类,里边首要供给了三个中心办法:

  • createZhuziTransactionalManagerGroup():向办理者中恳求创立一个业务组。
  • createTransactional():依据当时服务的业务状况,创立一个子业务目标。
  • addZhuziTransactional():向办理者的指定业务组中增加一个子业务。

一起为了供给给多个服务运用,这个类中的成员根本上都选用ThreadLocal来润饰,也就是每条履行不同业务的线程,都会具有自己的current、currentGroupId、transactionalCount这三个特点,这三个特点会用来辅助完结在办理者中创立业务组、增加子业务的作业。

3.1.3、接收参加者的业务操控权

经过上述的一些流程后,虽然构建出了一些根底组件,但这并不能阻挠每个服务各自提交业务,默许状况下,MySQL履行完一条SQL语句后会立马提交业务,如若想让MySQL不自动提交业务,则有必要经过begin之类的办法来手动办理业务。

之前这个作业交给Spring来办理,Spring在业务操作未抛出反常的状况下,则会向MySQL发送commit指令,这儿也不去彻底重写Spring的业务机制,由于这样会导致作业量尤为巨大,而是依据Spring原有的业务机制根底上,掠夺掉Spring自动提交、回滚业务的权限。

那终究该怎样掠夺掉Spring自动提交、回滚业务的权限呢?这儿需求先简略了解一下Spring业务机制的原理。

之前提到过,Spring结构的业务机制,仍旧是依托于数据库本身所供给的业务机制来完结,这也就意味着当被Spring@Transactional注解润饰的办法,履行完毕后,Spring会去调用JDBC接口中的commit()/rollback()办法,然后完结业务的提交或回滚。

简略了解上述原理后,那再来考虑一下咱们该怎样接收业务的操控权呢?局势就非常明亮了,已然Spring是经过JDBC接口中的办法,来完结业务提交或回滚的,那咱们只需求编撰一个AOP切面,去阻拦尝试调用对应接口提交/回滚业务的线程即可。

可是问题又又又来了,咱们将调用JDBC业务接口的线程阻拦后,这还不行,由于这并不能让咱们接收业务的操控权,这只是只能掠夺掉Spring自动提交、回滚业务的权限。但终究每个业务服务(业务参加者)在收到业务办理者终究的处理计划后,仍旧需求提交或回滚业务,所以咱们的zhuzi-distributed-tx还需求彻底拿到业务操控权。

但这儿该怎样去拿到业务操控权呢?其实很简略,这儿先写一下AOP切面的完结,如下:

// 掠夺并接收Spring业务操控权的切面
@Aspect
@Component
public class ZhuziDataSourceAspect {
    @Around("execution(* javax.sql.DataSource.getConnection(..))")
    public Connection dataSourceAround(ProceedingJoinPoint proceedingJoinPoint)
                                                throws Throwable {
        System.out.println("业务切面成功阻拦,正在接收操控权......");
        // 假如当时调用业务接口的线程正在参加分布式业务,
        // 则回来自界说的Connection目标接收业务操控权
        if (ZhuziTxParticipant.getCurrent() != null){
            System.out.println("回来自界说的Connection目标.......");
            Connection connection = (Connection) proceedingJoinPoint.proceed();
            return new ZhuziConnection(connection, ZhuziTxParticipant.getCurrent());
        }
        // 假如当时线程没有参加分布式业务,让其正常提交/回滚业务
        System.out.println("回来JDBC的Connection目标.............");
        return (Connection) proceedingJoinPoint.proceed();
    }
}

这个切面的代码不多,首要是阻拦了一切调用DataSource.getConnection()的线程,然后会进行判别,假如当时线程履行的业务办法,正在参加分布式业务,则回来自界说的数据库衔接目标,假如是未参加分布式业务的本地业务操作,则让其正常提交/回滚业务。

那这儿为何要去阻拦履行DataSource.getConnection()这个办法的线程呢?由于Spring在提交业务时,会先调用该办法获取数据库衔接目标,然后经过数据库衔接目标中的rollback/commit办法完结业务回滚或提交,因而能够以该办法作为切入点,然后接收Spring的业务操控权。

但详细怎样接收的呢?中心完结则坐落ZhuziConnection这个自界说的数据库衔接类中,如下:

// 自界说的数据库衔接类(有必要要完结JDBC的Connection接口)
public class ZhuziConnection implements Connection {
    // 本来应该回来的数据库衔接目标
    private Connection connection;
    // 存放参加分布式业务的子业务
    private ZhuziTx zhuziTx;
    // 担任提交业务的线程
    private ExecutorService commitT = Executors.newSingleThreadExecutor();
    // 担任回滚业务的线程
    private ExecutorService rollbackT = Executors.newSingleThreadExecutor();
    public ZhuziConnection(Connection connection, ZhuziTx zhuziTx) {
        this.connection = connection;
        this.zhuziTx = zhuziTx;
    }
    @Override
    public void commit() throws SQLException {
        // 交给线程池中的线程来做终究的业务提交
        commitT.execute(() -> {
            try {
                // 堵塞线程,制止提交
                zhuziTx.getTask().waitTask();
                // 假如办理者回来业务能够提交,则提交业务
                if (zhuziTx.getTransactionalType().equals(TransactionalType.commit)) {
                    System.out.println("\n收到办理者终究决断:提交业务中\n");
                    connection.commit();
                    System.out.println("\n子业务提交业务成功...\n");
                }
                // 不然调用rollback()办法回滚业务
                else {
                    System.out.println("\n收到办理者终究决断:回滚业务中...\n");
                    connection.rollback();
                    System.out.println("\n子业务回滚业务成功...\n");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    @Override
    public void rollback() throws SQLException {
        // 交给线程池中的线程来做终究的业务回滚
        rollbackT.execute(() -> {
            zhuziTx.getTask().waitTask();
            try {
                connection.rollback();
                System.out.println("\n\n子业务回滚业务成功...\n\n");
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    @Override
    public void close() throws SQLException {
        connection.close();
    }
    // 省掉其他Connection接口需求完结的办法......
}

这个自界说的数据库衔接类,有必要要完结JDBC中的Connection接口,毕竟当线程在尝试调用DataSource.getConnection()获取衔接时,经过切面回来一个并非Connection类型的目标回去,这自然会导致程序报错,一起咱们仍旧需求经过本来的衔接目标,完结业务终究的提交或回滚,因而也要将本来应该回来的Connection目标注入到自界说的衔接类中。

关于接口Connection中的其他办法,直接调用本来衔接目标的办法来履行即可,咱们想要接收业务的操控权,仅有需求重写的就是rollback()/commit()这两个办法,接着来看看重写后的两个办法。

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

这儿先将代码截出来便于了解,但想要了解这两段代码,需求具有必定程度的多线程编程根底,不然会非常困惑,首要要记住:不管履行哪个办法,都会有两条线程而并非一条!当一条线程履行完业务办法后,接着会依据Spring的业务机制,来获取衔接目标调用commit/rollback办法完毕业务,而这儿咱们将commit/rollback办法改成了向线程池中提交一个使命,那也就意味着:业务线程向对应的线程池提交办法后,会当即回来,这儿业务线程并不会被堵塞

而这儿不堵塞,堵塞的是什么呢?其实被堵塞住的是线程池中的线程,这些线程会堵塞至业务办理者回来终究决断时,才会持续往下履行(后续代码中会唤醒这些线程),这样就做到了即不堵塞业务线程,又没有真实提交/回滚业务,然后真实的拿到了业务操控权,究竟啥时候提交/回滚业务,这彻底能够由咱们自己决议。

但留意:堵塞线程的办法是调用了子业务目标中的Task.waitTask()办法,还记得咱们之前界说的子业务目标嘛?其间有一个这样的成员:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

这个成员的完结类如下:

// 子业务的等候行列:依据此完结业务操控权
public class Task {
    // 经过ReentrantLock的Condition条件等候行列完结线程堵塞/唤醒
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    // 堵塞挂起线程的办法
    public void waitTask(){
        System.out.println("业务操控权现已被阻拦挂起........");
        lock.lock();
        try {
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    // 唤醒放下线程的办法
    public void signalTask(){
        System.out.println("业务操控权现已被阻拦放下........");
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

这个Task类的完结尤为简略,内部经过ReentrantLockCondition多条件等候行列,完结waitTask()堵塞线程、signalTask唤醒线程这两个办法,当Spring尝试提交/回滚参加者的业务时,由于运用的是咱们自界说的衔接目标,因而调用commit/rollback办法后并不会真实完毕业务,而是会把提交/回滚业务的作业交给线程池完结。

当线程池收到业务线程提交的使命后,会首要挂起本身线程,等候后续呈现唤醒指令时,才会真实的履行commit/rollback操作。

那这儿的线程被堵塞后,究竟什么时候会唤醒呢?也就是多久才会真实的提交/回滚业务呢?这儿需求回到之前的NettyClientHandler.channelRead()办法,如下:

// 当业务办理者回来终究决断时,该办法会被触发,进而会履行这个办法的代码
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) 
        throws Exception {
    System.out.println("接收到业务办理者的终究决断:" + msg.toString());
    // 反序列化解析JSON数据
    JSONObject data = JSON.parseObject((String) msg);
    String groupId = data.getString("groupId");
    String command = data.getString("command");
    System.out.println("接收command:" + command);
    // 对业务进行操作
    ZhuziTx zhuziTx = ZhuziTxParticipant.getZhuziTransactional(groupId);
    // 假如业务办理者终究决议提交业务
    if ("commit".equals(command)){
        // 依据groupID找到子业务并设置commit状况
        zhuziTx.setTransactionalType(TransactionalType.commit);
    }
    // 假如业务办理者终究决议回滚业务
    else{
        // 依据groupID找到子业务并设置rollback回滚状况
        zhuziTx.setTransactionalType(TransactionalType.rollback);
    }
    // 唤醒在之前堵塞的、担任提交/回滚业务的线程
    zhuziTx.getTask().signalTask();
}

当参加者收到办理者的终究告诉后,依据业务办理者的终究决断来设置业务状况,然后再唤醒前面堵塞的线程,真实履行提交或回滚业务的操作:

  • 假如办理者的告诉为commit,这儿会将子业务的状况设为TransactionalType.commit
  • 不然这儿会将子业务的状况设为TransactionalType.rollback

channelRead()办法被触发后,终究会调用signalTask()唤醒前面堵塞的线程,前面堵塞的线程被唤醒后,会接着履行if (zhuziTx.getTransactionalType().equals(TransactionalType.commit))这行代码,也就是判别办理者终究给出的决断是否为commit,假如是则提交当时子业务,不然会调用connection.rollback()办法回滚当时子业务。

看到这儿,关于业务终究是怎样提交或回滚的,信任咱们现已明白了其间原理,但一通代码看下来咱们估量有些绕,所以接着上一个流程图,帮咱们总结一下这个进程,如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

3.1.4、自界说分布式业务注解

为了保证对业务代码的零侵入性,这儿运用自界说注解来完结参加者嵌入业务服务的功用,当其他子服务需求运用时,只需求在对应的办法上加上一个注解即可,自界说注解如下:

// 自界说的分布式业务注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface ZhuziTransactional {
    // 标识当时是大局业务的敞开者
    boolean isStart() default false;
    // 标识当时是大局业务的完毕者
    boolean isEnd() default false;
}

这个注解中有两个值可选,isStart=true表明当时被该注解润饰的办法,是一个分布式业务中的第一个业务操作,isEnd=true则代表是终究一个业务操作,但光界说注解是没有意义,接着还需求经过AOP切面来阻拦运用该注解的办法,切面如下:

// 担任阻拦自界说注解的切面
@Aspect
@Component
public class ZhuziTransactionalAspect implements Ordered {
    @Around("@annotation(com.zhuzi.distributedtx.annotation.ZhuziTransactional)")
    public Integer invoke(ProceedingJoinPoint proceedingJoinPoint){
        System.out.println("分布式业务注解收效,切面成功阻拦............");
        // 获取对应注解的业务办法,以及办法上的注解目标
        MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
        Method method = signature.getMethod();
        ZhuziTransactional zta = method.getAnnotation(ZhuziTransactional.class);
        // 创立业务组
        String groupId = "";
        // 假如现在触发切面的办法,是一组大局业务的第一个子业务
        if (zta.isStart()){
            // 则向业务办理者注册一个业务组
            groupId = ZhuziTxParticipant.createZhuziTransactionalManagerGroup();
        }
        // 不然获取当时业务所属的业务组ID
        else {
            groupId = ZhuziTxParticipant.getCurrentGroupId();
        }
        // 创立子业务
        ZhuziTx zhuziTx = ZhuziTxParticipant.createTransactional(groupId);
        // spring会敞开MySQL业务
        try {
            //履行spring切面(dataSource切面),履行详细的业务办法
            Object result = proceedingJoinPoint.proceed();
            // 没有抛出反常证明该业务能够提交,把子业务增加进业务组
            ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
                    TransactionalType.commit);
            // 回来履行成功的成果
            return (Integer) result;
        }  catch (Exception e){
                e.printStackTrace();
                // 抛出反常证明该业务需求回滚,把子业务增加进业务组
                ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
                        TransactionalType.rollback);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
            // 把子业务增加进业务组,抛出反常证明该业务需求回滚
            ZhuziTxParticipant.addZhuziTransactional(zhuziTx, zta.isEnd(),
                    TransactionalType.rollback);
            // 回来履行失利的成果
            return -1;
        }
        return -1;
    }
    // 设置优先级,让前面阻拦业务的切面先履行
    @Override
    public int getOrder() {
        return 10000;
    }
}

这个切面代码并不多,但逻辑相对来说也并不简略,整个办法履行的中心逻辑如下:

  • ①经过反射机制获取自界说注解润饰的Method办法目标,以及注解目标本身。
  • ②判别业务办法上注解的值,看看是isStart是否为True
    • true,表明触发切面的业务办法,是分布式业务中的第一个业务操作,所以会先向办理者恳求创立一个业务组,并获取业务组ID
    • 不为true,判别当时子业务应该归于哪个业务组,获取业务组ID
  • ③经过前面拿到的业务组ID,调用createTransactional()办法实例化一个子业务目标。
  • ④经过AOP中的proceedingJoinPoint.proceed()办法,履行切面阻拦的详细业务操作。
  • ⑤假如业务操作履行进程中没有抛出反常,则向办理者的业务组中增加一个commit状况的子业务。
  • ⑥假如业务操作履行进程中抛出反常,则向办理者的业务组中增加一个rollback状况的子业务。

上述就是整个AOP切面的中心作业,一句话总结就是:会依据当时子业务的履行状况,向业务办理者的业务组中增加一个子业务。但为了避免这个切面的优先级高过前面的切面,因而也需求重写一下getOrder()办法,将当时切面的优先级放的低一些,让阻拦Spring业务的切面先履行。

3.1.5、业务组ID是怎样在上下流服务中传递的?

不过在这个切面中,有一个细节,即当时子业务是怎样知道自己是归于哪个业务组的呢?在代码中运用了ZhuziTxParticipant.getCurrentGroupId()获取了当时子业务的组ID,这个GroupId是怎样传递过来的呢?现在的调用状况如下:

  • ①客户端调用订单服务的下单接口时,订单服务会先调用库存服务的减库存接口。
  • ②库存服务中敞开一个分布式业务,生成一个大局仅有ID,并创立一个业务组。
  • ③库存服务依据前面生成得到的组ID,把本身业务的履行状况,参加到办理者的业务组中。
  • ④库存服务向订单服务回来调用成果,即OK/200,以及调用减库存接口成功的信息。
  • ⑤订单服务收到调用成果后,持续调用本地的新增订单办法,履行完结后增加履行状况到办理者。

这个进程听着好像不是特别难对吧?但问题就在于④、⑤之间,订单服务中的「新增订单」履行完结后,又怎样知道自己是归于哪个分布式业务组的呢?所以这儿需求把库存服务中,生成的groupId传递过来,这样才干保证两个子业务,会增加到同一个业务组里边。

但这个需求听起来简略,但完结起来却并不容易,不管是经过阻拦器、亦或是过滤器,都无法完结这个需求,由于在库存服务履行完结后,呼应报文就现已生成,所以在阻拦器、过滤器中新增呼应头信息,这是无法收效的。

那终究我是怎样处理的呢?简略翻阅源码后,这儿用到了Spring结构预留的一个钩子接口:ResponseBodyAdvice,完结这个接口的钩子类,会在Controller办法履行完结之后,呼应报文拼装之前被调用,因而咱们能够在这儿织入业务组ID,完结如下:

//  Spring结构预留的钩子接口:织入业务组ID
@ControllerAdvice
public class GroupIdRespAdvice implements ResponseBodyAdvice {
    // 钩子类的前置办法:有必要为true才会履行beforeBodyWrite()办法
    @Override
    public boolean supports(MethodParameter methodParameter, Class aClass) {
        return true;
    }
    // Controller办法履行完结之后,呼应报文拼装之前履行
    @Override
    public Object beforeBodyWrite(Object body, MethodParameter methodParameter,
                                  MediaType mediaType, Class aClass,
                                  ServerHttpRequest request,
                                  ServerHttpResponse response) {
        // 假如ThreadLocal中的业务组ID不为空,代表当时恳求参加了分布式业务,
        // 会获取对应的业务组ID放入到呼应头中(关于一般恳求不会改写呼应头)
        if (ZhuziTxParticipant.getCurrentGroupId() != null){
            // 把需求传递的业务组ID、子业务数量放入呼应头中
            response.getHeaders().set("groupId",
                ZhuziTxParticipant.getCurrentGroupId());
            response.getHeaders().set("transactionalCount",
                String.valueOf(ZhuziTxParticipant.getTransactionCount()));
        }
        return body;
    }
}

这样处理之后,就能够在上游服务的恳求出口,为每个触及分布式业务的恳求增加上一个呼应头信息,在呼应头中会传输下流服务所需的业务组ID、组中子业务数量信息,接着还需求在下流服务的呼应进口,获取这些恳求头信息,完结如下:

// HttpClient长途调用东西
public class HttpClient {
    // GET恳求的办法
    public static String get(String url) {
        String result = "";
        try {
            // 创立一个httpClient目标,并调用传入的URL接口
            CloseableHttpClient httpClient = HttpClients.createDefault();
            HttpGet httpGet = new HttpGet(url);
            CloseableHttpResponse response = httpClient.execute(httpGet);
            // 假如调用成果是回来OK,状况码为200
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                // 获取response目标中的一切呼应头
                Header[] allHeaders = response.getAllHeaders();
                for (Header header : allHeaders) {
                    // 从中找到上游服务传递的组ID、业务数量,并赋值给自己的子业务
                    if ("groupId".equals(header.getName())){
                        String groupId = header.getValue();
                        ZhuziTxParticipant.setCurrentGroupId(groupId);
                    }
                    if ("transactionalCount".equals(header.getName())){
                        String transactionalCount = header.getValue();
                        ZhuziTxParticipant.setTransactionCount(
                                Integer.valueOf(transactionalCount == null ?
                                    "0" : transactionalCount));
                    }
                }
                // 向调用方回来上游服务终究的调用成果
                result = EntityUtils.toString(response.getEntity(), "utf-8");
            }
            response.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
}

由于我这儿是经过HttpClient来完结长途调用的,所以我只需求在调用完毕后,读取response目标的恳求头信息,然后获取其间的业务组ID,并保存到自己的ThreadLocal中即可。

但假如是用了Dubbo、gRPC、Fegin、RestTemplate....等长途调用的办法,咱们可自行依据RPC东西的类型,去编写Filter过滤器截断呼应成果,然后获取呼应头中的数据,接着放入自己的ThreadLocal中即可。

这个地方的实质完结就是分布式体系中,按调用链路去依次传递一个大局同享数据,在上游服务的出口写入呼应头信息、下流服务的进口获取呼应头信息即可。

我这儿的groupId、transactionalCount都是放入到ZhuziTxParticipantThreadLocal<String> currentGroupId成员中,由于这儿运用了ThreadLocal来存储,所以多个分布式业务一起履行的状况,仍旧不会冲突。

3.1.6、业务参加者的收尾作业

到这儿,业务参加者就完好的完结出来了,但为了供给给其他业务子服务运用,因而咱们还需求履行一下mvn -package指令,将当时完结好的zhuzi-distributed-tx结构,打包到Maven本地库房中,后续其他子服务可经过下述GAV坐标导入依托:

<dependency>
    <groupId>com.zhuzi</groupId>
    <artifactId>zhuzi-distributed-tx</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

3.2、业务办理者的中心完结

在前面的进程中,咱们只依据Netty构建出了一个最根本的服务端,但关于业务办理者的中心逻辑还并未开端完结,因而现在开端编撰办理者的中心完结,也就是回到NettyServerHandler.channelRead()办法中,完结中心的逻辑,代码如下:

/***
 *
 *  创立业务组,而且增加保存业务
 *  而且需求判别,假如一切业务都现已履行了(有成果了,要么提交,要么回滚)
 *      假如其间有一个业务需求回滚,那么告诉一切客户进行回滚,不然则告诉一切客户端进行提交
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("承受数据:" + msg.toString());
    JSONObject jsonObject = JSON.parseObject((String)msg);
    // create:创立一个业务组,add:增加业务
    String command = jsonObject.getString("command");
    // 业务组ID
    String groupId = jsonObject.getString("groupId");
    // 子业务类型(commit:待提交、rollback:待回滚)
    String transactionType = jsonObject.getString("transactionalType");
    // 业务数量(当时这个大局业务的总参加者数量)
    Integer transactionCount = jsonObject.getInteger("transactionalCount");
    // 是否完毕业务(是否为终究一个业务)
    Boolean isEnd = jsonObject.getBoolean("isEnd");
    // 假如参加者发来的是create指令,则创立一个业务组
    if ("create".equals(command)){
        transactionTypeMap.put(groupId, new ArrayList<String>());
    }
    // 假如参加者是add操作,则将对应子业务参加业务组
    else if ("add".equals(command)){
        transactionTypeMap.get(groupId).add(transactionType);
        // 判别当时子业务是否为整组终究一个业务
        if (isEnd) {
            // 是则声明本组业务已完毕
            isEndMap.put(groupId, true);
            transactionCountMap.put(groupId, transactionCount);
        } else {
            // 不然声明后续仍旧会有业务到来
            isEndMap.put(groupId, false);
            transactionCountMap.put(groupId, transactionCount);
        }
        // 调试时的输出信息
        System.out.println("isEndMap长度:" + isEndMap.size());
        System.out.println("transactionCountMap长度:" + transactionCountMap.get(groupId));
        System.out.println("transactionTypeMap长度:" + transactionTypeMap.get(groupId).size());
        JSONObject result = new JSONObject();
        result.put("groupId",groupId);
        // 假如现已接收到完毕业务的符号,则判别业务是否现已悉数抵达
        if (isEndMap.get(groupId) &&
                transactionCountMap.get(groupId)
                        .equals(transactionTypeMap.get(groupId).size())){
            // 假如现已悉数抵达则看是否需求回滚
            if (transactionTypeMap.get(groupId).contains("rollback")){
                System.out.println("业务终究回滚..........");
                result.put("command","rollback");
                sendResult(result);
            // 假如一组业务中没有任何业务需求回滚,则提交整组业务
            } else {
                System.out.println("业务终究提交..........");
                result.put("command","commit");
                sendResult(result);
            }
        }
    }
}

之前聊到过,客户端(参加者)一切发送给服务端(办理者)的数据,都会被这个channelRead()办法监听到,也就是每当有客户端给服务端发送数据时,都会触发这个办法履行,因而咱们只需在这个办法中完结中心逻辑即可,代码逻辑如下:

  • ①经过JSON反序列化,解析客户端(参加者)发送过来的数据包。
  • ②假如参加者数据包的command=create,则先创立一个业务组。
  • ③假如参加者数据包的command=add,则将对应子业务的履行状况增加进业务组。
  • ④将子业务增加进业务组后,接着判别一下isEnd是否为true
    • 否:持续等候其他子业务的到来。
    • 是:进入第⑤步,对一个分布式业务进行终究处理。
  • ⑤判别整组业务中是否包括rollback,只需有一个子业务的状况为rollback,整组业务都需求回滚,反之则提交。
  • ⑥终究构建一个JSON数据包,并调用sendResult()办法,把办理者的终究决断告诉给每个参加者。

上面是整个业务办理者的中心逻辑,简略来说其实就两个功用:

  • 依据参加者数据包中的command指令,来创立业务组或增加子业务。
  • 在一组业务悉数已抵达后,判别整组业务终究究竟要回滚仍是提交。

相较于业务参加者的完结来说,业务办理者的代码还比较简略,接着来做个简略的测验。

3.3、测验自界说的分布式业务结构

前面完结了业务参加者和业务办理者的中心功用后,接着在对应的业务服务中引进zhuzi-distributed-tx结构的依托:

<dependency>
    <groupId>com.zhuzi</groupId>
    <artifactId>zhuzi-distributed-tx</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

然后先在库存服务service层的扣减库存办法上,加上一个@ZhuziTransactional注解:

@Override
@Transactional
// 这儿要写上isStart = true,由于这是第一个业务操作
@ZhuziTransactional(isStart = true)
public Integer updateByPrimaryKeySelective(Inventory record) {
    int i = inventoryMapper.updateByPrimaryKeySelective(record);
    return i;
}

接着在订单服务service层的新增订单办法上,要相同加一个@ZhuziTransactional注解:

@Override
@Transactional
// 这儿要写上isEnd = true,由于这是终究一个业务操作
@ZhuziTransactional(isEnd = true)
public Integer insertSelective(Order record) {
    // 刻意抛出一个反常
    int i = 100 / 0;
    int n = orderMapper.insertSelective(record);
    System.out.println("\n\n\n" + n + "\n\n\n");
    return n;
}

接着别离发动业务办理者、注册中心、库存服务、订单服务四个进程,然后开端测验,仍旧经过浏览器调用之前的下单接口:

  • http://localhost:8001/order/placeAnOrder

此刻要点调查操控台的日志输出,来看看成果,业务办理者的操控台输出如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

业务参加者-库存服务的操控台输出如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

业务参加者-订单服务的操控台输出如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

上述三个日志输出中,要点调查业务办理者的输出,整个流程如下:

(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

当业务办理者完结图中第七步后,接着业务参加者(业务服务)这边会收到来自办理者的告诉,各自把自己子业务(「扣减库存、新增订单」)回滚,终究来看看数据库的表数据:
(二十七)舞动手指速写一个Seata-AT框架解决棘手的分布式事务问题

终究会发现,在两个数据库中,数据仍旧没有发生变化,库存表中的数据仍旧是99,而订单表中也没有新增订单数据,终究做到了数据的彻底一致性,然后处理了分布式业务造成的数据不一致问题。

四、分布式业务手写/原理篇总结

经过前面三个阶段的论述后,咱们一点点的从分布式业务问题引出、演示,再到逐渐去琢磨手写分布式业务结构的思路,再渐渐的手写出了一切代码,终究成功处理了分布式业务问题,这个进程相对来说也并不轻松,尤其是一些根柢较弱的小伙伴,阅览起来可能存在很大压力,所以在终究再完好总结一下:

  • ①客户端调用订单服务的下单接口时,订单服务会先调用库存服务的减库存接口。
  • ②库存服务中敞开一个分布式业务,生成一个大局仅有ID,并创立一个业务组。
  • ③库存服务依据前面生成得到的组ID,把本身业务的履行状况(commit),参加到办理者的业务组中。
  • ④库存服务向订单服务回来调用成功,而且经过Spring钩子类,将groupId放到呼应头中。
  • ⑤订单服务收到调用成果后,从呼应头中拿到业务组ID、子业务数量,放到本身的ThreadLocal中。
  • ⑥订单服务持续调用本地的新增订单办法,但由于咱们手动制作了反常,所以履行会报错,终究会依据前面的groupId,在办理者的业务组中增加一条履行状况为rollback的子业务。
  • ⑦业务办理者发现「下单」这个分布式业务的一切子业务悉数抵达后,接着会进行终究审判,发现其间存在一个rollback,然后告诉对应的一切业务参加者回滚。
  • ⑧库存、订单服务收到业务办理者的终究审判后,终究回滚各自的一切业务操作,保证数据的彻底一致性。

咱们写的整个分布式业务结构,其间心处理流程如上,而LCN、Seata-AT形式的履行流程也大致如此,都是依据数据库的业务机制来完结的,但实际上会比咱们这个更加复杂很多倍,会牵扯到资源办理者、大局锁等概念。

一起真实的分布式业务结构中,都只会有一个分布式业务注解,生成大局业务ID的操作,会放到最开端完结,然后向下进行传递,默许是终究一个子业务来完毕整组业务操作,伪逻辑如下:

@GlobalTransactional // ①会在这儿先生成大局业务ID
public String placeAnOrder(String shopID){
    RPC.减库存接口();  // ②传递大局ID,减库存履行完结后,会依据大局ID增加一个子业务
    Local.新增订单办法(); // ③传递大局ID,新增订单履行后,再向业务组增加一个子业务
    // ④由于后面没有其他操作了,默许会完毕这组分布式业务,进行终究的提交/回滚操作
}

但咱们规划的这款分布式业务结构,则规划出了两个分布式业务注解,用isStrart来敞开分布式业务 isEnd来完毕业务,这儿首要是能让咱们更便于了解分布式业务的中心原理,不过这种做法并不完善。

但咱们的目的并不是打造一款商用结构,而是摒弃繁枝末节,真实了解分布式业务结构的中心原理,因而我就不持续去完善zhuzi-distributed-tx这个“分布式业务结构”啦~,咱们感兴趣的可自行Down下源码,这儿我附上GitHub的源码地址:>>>>戳我拜访<<<<。

源码中涵盖了整个业务体系和分布式业务结构的完好完结,但为了快速建立,所以关于微服务项目的架构并不全面,如RPC结构用的HttpClientGateWay网关也没有,限流熔断也没做,注册中心仍旧用的是Eureka....,仍是那句话,咱们有兴趣可自行完善~