seata源码解析:seata server各种消息处理流程_seata关闭channel-程序员宅基地

技术标签: 分布式事务  

请添加图片描述

seata-server消息处理流程

上一篇文章我们分析了seata-server端启动流程。本篇文章我们来看seata-server消息处理流程。
请添加图片描述
seata中有一个全局事务协调器DefaultCoordinator,它主要是处理来自RM和TM的请求来做相应的操作,但是实际的执行者并不是DefaultCoordinator,而是DefaultCore

DefaultCore的继承关系如下图,从继承图中我们可以看到其实Core类的实现类才是一个事务管理器。在seata中有4种事务管理模式,所以每种模式有一个具体的事务管理器。

请添加图片描述
而DefaultCore则是聚合了4种具体的事务管理器,根据事务的不同类型调用不同的事务管理器。组件的关系如下图
请添加图片描述
所以事务协调的主要工作就是接受请求然后调用事务管理器进行相应的操作

事务协调器接收请求

之前我们说到,所有的消息都会交给AbstractNettyRemotingServer.ServerHandler来处理,而AbstractNettyRemotingServer.ServerHandler根据消息的不同类型,交给不同的RemotingProcessor来处理

在这里插入图片描述
所以我们对那种消息感兴趣只需要看对应的RemotingProcessor实现类即可,我们挑几个常见的消息分析以下,思路都差不多。

事务管理器执行操作

RegRmProcessor和RegTmProcessor

tm和rm这部分注册代码看的我有点晕(不重要就没耐心看下去),主要作用就是在tc保存tm和rm的长连接,当tc需要往tm和rm发送消息的时候,就从ChannelManager中找到对应的长连接,然后发送消息

各模式中rm注册的时机如下

xa模式:构建DataSourceProxyXA
at模式:构建DataSourceProxy
tcc模式:GlobalTransactionScanner(Bean初始化后阶段),生成代理对象的时候判定这个方法是tcc的prepare方法

ServerOnRequestProcessor

在TC端,全局事务的状态被保存在GlobalSession对象中,分支事务的状态被保存在BranchSession中

ServerOnRequestProcessor处理消息的公共流程为

  1. 对应的channel是否注册过,没注册过直接关闭连接,否则到第二步
  2. 针对不同的消息交给DefaultCoordinator类的不同方法来处理,并返回结果
开启全局事务
// DefaultCoordinator
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    if (LOGGER.isInfoEnabled()) {
    
        LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
            rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    }
}

消息的接收是通过DefaultCoordinator,然后交给DefaultCore来执行对应的操作,DefaultCore生成xid并返回

// DefaultCore
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    
    // 创建一个 GlobalSession
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    // 将 ROOT_SESSION_MANAGER 加入到这个 GlobalSession 的监听器列表中
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
 
    // 开启 GlobalSession
    session.begin();

    // 发布事件,如果你对这个事件感兴趣,可以注册这个事件
    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    // 返回 xid
    return session.getXid();
}

可以只返回一个xid,xid由DefaultCore#begin方法生成,xid的生成策略如下

// seata server ip地址 + seata server 端口号 + 雪花算法生成的唯一id
ipAddress + ":" + port + ":" + tranId;

从GlobalSession#begin方法可以看到GlobalSession用到了观察者模式,当GlobalSession的状态发生变更时,会通过给相应的观察者,观察者都是SessionManager,当接收到相应的事件后,将变更的状态进行持久化存储,当使用db模式存储时,这里会在global_table中插入一条记录。

// GlobalSession
public void begin() throws TransactionException {
    
    this.status = GlobalStatus.Begin;
    this.beginTime = System.currentTimeMillis();
    this.active = true;
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
    
        lifecycleListener.onBegin(this);
    }
}
注册分支事务
// AbstractCore
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                           String applicationData, String lockKeys) throws TransactionException {
    
    // 根据 xid 从 SessionManager 中获取到 GlobalSession
    GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
    return SessionHolder.lockAndExecute(globalSession, () -> {
    
        globalSessionStatusCheck(globalSession);
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // 创建新的分支事务即 branchSession
        BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                applicationData, lockKeys, clientId);
        MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
        // 对分支事务需要的资源加锁,加锁的逻辑在别的文章详解
        branchSessionLock(globalSession, branchSession);
        try {
    
            // 将 branchSession 加到 globalSession 的属性中
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
    
            branchSessionUnlock(branchSession);
            throw new BranchTransactionException(FailedToAddBranch, String
                    .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
                            branchSession.getBranchId()), ex);
        }
        if (LOGGER.isInfoEnabled()) {
    
            LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
                globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
        }
        return branchSession.getBranchId();
    });
}
  1. 根据 xid 从 SessionManager 中获取到 GlobalSession
  2. 创建新的分支事务即 BranchSession
  3. 将 branchSession 加到 globalSession 的属性中,此时GlobalSession会发布分支事务注册事件,SessionManager 收到事件后会在 branch_table 中插入一条记录

注意:AT模式下,当分支事务注册的时候,会将修改的数据加锁,如果加锁失败,则抛出异常

提交全局事务
// DefaultCore
public GlobalStatus commit(String xid) throws TransactionException {
    
    // 根据xid找到全局事务对象GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
    
        // 已经被commit过了,直接返回成功
        return GlobalStatus.Finished;
    }
    // 添加监听器
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus

    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
    
        // Highlight: Firstly, close the session, then no more branch can be registered.
        // 关闭 GlobalSession 防止再次有新的 BranchSession 注册进来
        globalSession.closeAndClean();
        if (globalSession.getStatus() == GlobalStatus.Begin) {
    
            // 判断是否可以异步提交
            // 目前只有at模式可以异步提交,因为是通过undolog的方式去做的
            if (globalSession.canBeCommittedAsync()) {
    
                globalSession.asyncCommit();
                return false;
            } else {
    
                globalSession.changeStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    // 同步提交
    // XA/TCC只能同步提交
    if (shouldCommit) {
    
        boolean success = doGlobalCommit(globalSession, false);
        //If successful and all remaining branches can be committed asynchronously, do async commit.
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
    
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
    
            return globalSession.getStatus();
        }
    } else {
    
        // 异步提交
        // 只有AT模式能异步提交
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    
    boolean success = true;
    // start committing event
    // 发布事件
    eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
        globalSession.getBeginTime(), null, globalSession.getStatus()));

    if (globalSession.isSaga()) {
    
        success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
    } else {
    
        // 取出所有的分支事务,然后提交
        Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
    
            // if not retrying, skip the canBeCommittedAsync branches
            if (!retrying && branchSession.canBeCommittedAsync()) {
    
                return CONTINUE;
            }

            BranchStatus currentStatus = branchSession.getStatus();
            // 一阶段失败
            if (currentStatus == BranchStatus.PhaseOne_Failed) {
    
                globalSession.removeBranch(branchSession);
                return CONTINUE;
            }
            try {
    
                BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                switch (branchStatus) {
    
                    case PhaseTwo_Committed:
                        globalSession.removeBranch(branchSession);
                        return CONTINUE;
                    case PhaseTwo_CommitFailed_Unretryable:
                        if (globalSession.canBeCommittedAsync()) {
    
                            LOGGER.error(
                                "Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
                            return CONTINUE;
                        } else {
    
                            // 分支事务,不能异步提交,并且还不重试,全局事务执行失败
                            SessionHelper.endCommitFailed(globalSession);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                    default:
                        // 当前是否正在重试
                        // retrying=true,说明是从重试队列进来的任务,不用再往重试队列放了
                        if (!retrying) {
    
                            globalSession.queueToRetryCommit();
                            return false;
                        }
                        if (globalSession.canBeCommittedAsync()) {
    
                            LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                branchSession.getBranchId(), branchStatus);
                            return CONTINUE;
                        } else {
    
                            LOGGER.error(
                                "Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        }
                }
            } catch (Exception ex) {
    
                StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                    new String[] {
    branchSession.toString()});
                if (!retrying) {
    
                    globalSession.queueToRetryCommit();
                    throw new TransactionException(ex);
                }
            }
            return CONTINUE;
        });
        // Return if the result is not null
        // result 不为null 则为 false
        if (result != null) {
    
            return result;
        }
        //If has branch and not all remaining branches can be committed asynchronously,
        //do print log and return false
        // 有分支事务,并且不允许异步提交,说明失败了
        if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
    
            LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
            return false;
        }
    }
    //If success and there is no branch, end the global transaction.
    // 分支事务全部提交成功了
    if (success && globalSession.getBranchSessions().isEmpty()) {
    
        // 全局事务状态改为已提交
        SessionHelper.endCommitted(globalSession);

        // committed event
        eventBus.post(new GlobalTransactionEvent(globalSession.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            globalSession.getTransactionName(), globalSession.getApplicationId(), globalSession.getTransactionServiceGroup(),
            globalSession.getBeginTime(), System.currentTimeMillis(), globalSession.getStatus()));

        LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
    }
    return success;
}

可以看到AT模式可以异步提交,因为AT模式全局提交只是删除undoLog,异步提交可以提高执行效率。而其他模式得同步提交,依次向RM发送分支事务提交请求,当所有分支事务都执行成功后,全局事务提交成功。否则,将任务交给管理重试的SessionManager进行重试

全局事务的提交和回滚逻辑差不多,回滚逻辑就不分析了

ServerOnResponseProcessor

当我们需要进行全局提交时,需要向各个RM发送对应的请求,注意发送的是同步请求,阻塞获取结果。

实现思路主要是如下一个map

// 消息id -> 消息对应的MessageFuture
ConcurrentMap<Integer, MessageFuture> futures

每个消息有一个消息id,当发送的时候给每条消息创建一个MessageFuture,放在futures中,然后这个MessageFuture(底层其实就是CompletableFuture)阻塞获取结果

而ServerOnResponseProcessor则是用来接收分支提交(请求和响应对应的消息id是一样的),当收到结果后,设置消息对应的MessageFuture为完成,此时阻塞的同步请求就能获取到结果了

请添加图片描述

public class ServerOnResponseProcessor implements RemotingProcessor {
    

    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    
        // 根据消息id找到对应的MessageFuture
        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
        if (messageFuture != null) {
    
            messageFuture.setResultMessage(rpcMessage.getBody());
        } else {
    
            // 没有找到对应的消息发送记录
            // 删除部分代码
        }
    }
}

参考博客

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zzti_erlie/article/details/120894915

智能推荐

while循环&CPU占用率高问题深入分析与解决方案_main函数使用while(1)循环cpu占用99-程序员宅基地

文章浏览阅读3.8k次,点赞9次,收藏28次。直接上一个工作中碰到的问题,另外一个系统开启多线程调用我这边的接口,然后我这边会开启多线程批量查询第三方接口并且返回给调用方。使用的是两三年前别人遗留下来的方法,放到线上后发现确实是可以正常取到结果,但是一旦调用,CPU占用就直接100%(部署环境是win server服务器)。因此查看了下相关的老代码并使用JProfiler查看发现是在某个while循环的时候有问题。具体项目代码就不贴了,类似于下面这段代码。​​​​​​while(flag) {//your code;}这里的flag._main函数使用while(1)循环cpu占用99

【无标题】jetbrains idea shift f6不生效_idea shift +f6快捷键不生效-程序员宅基地

文章浏览阅读347次。idea shift f6 快捷键无效_idea shift +f6快捷键不生效

node.js学习笔记之Node中的核心模块_node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是-程序员宅基地

文章浏览阅读135次。Ecmacript 中没有DOM 和 BOM核心模块Node为JavaScript提供了很多服务器级别,这些API绝大多数都被包装到了一个具名和核心模块中了,例如文件操作的 fs 核心模块 ,http服务构建的http 模块 path 路径操作模块 os 操作系统信息模块// 用来获取机器信息的var os = require('os')// 用来操作路径的var path = require('path')// 获取当前机器的 CPU 信息console.log(os.cpus._node模块中有很多核心模块,以下不属于核心模块,使用时需下载的是

数学建模【SPSS 下载-安装、方差分析与回归分析的SPSS实现(软件概述、方差分析、回归分析)】_化工数学模型数据回归软件-程序员宅基地

文章浏览阅读10w+次,点赞435次,收藏3.4k次。SPSS 22 下载安装过程7.6 方差分析与回归分析的SPSS实现7.6.1 SPSS软件概述1 SPSS版本与安装2 SPSS界面3 SPSS特点4 SPSS数据7.6.2 SPSS与方差分析1 单因素方差分析2 双因素方差分析7.6.3 SPSS与回归分析SPSS回归分析过程牙膏价格问题的回归分析_化工数学模型数据回归软件

利用hutool实现邮件发送功能_hutool发送邮件-程序员宅基地

文章浏览阅读7.5k次。如何利用hutool工具包实现邮件发送功能呢?1、首先引入hutool依赖<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.19</version></dependency>2、编写邮件发送工具类package com.pc.c..._hutool发送邮件

docker安装elasticsearch,elasticsearch-head,kibana,ik分词器_docker安装kibana连接elasticsearch并且elasticsearch有密码-程序员宅基地

文章浏览阅读867次,点赞2次,收藏2次。docker安装elasticsearch,elasticsearch-head,kibana,ik分词器安装方式基本有两种,一种是pull的方式,一种是Dockerfile的方式,由于pull的方式pull下来后还需配置许多东西且不便于复用,个人比较喜欢使用Dockerfile的方式所有docker支持的镜像基本都在https://hub.docker.com/docker的官网上能找到合..._docker安装kibana连接elasticsearch并且elasticsearch有密码

随便推点

Python 攻克移动开发失败!_beeware-程序员宅基地

文章浏览阅读1.3w次,点赞57次,收藏92次。整理 | 郑丽媛出品 | CSDN(ID:CSDNnews)近年来,随着机器学习的兴起,有一门编程语言逐渐变得火热——Python。得益于其针对机器学习提供了大量开源框架和第三方模块,内置..._beeware

Swift4.0_Timer 的基本使用_swift timer 暂停-程序员宅基地

文章浏览阅读7.9k次。//// ViewController.swift// Day_10_Timer//// Created by dongqiangfei on 2018/10/15.// Copyright 2018年 飞飞. All rights reserved.//import UIKitclass ViewController: UIViewController { ..._swift timer 暂停

元素三大等待-程序员宅基地

文章浏览阅读986次,点赞2次,收藏2次。1.硬性等待让当前线程暂停执行,应用场景:代码执行速度太快了,但是UI元素没有立马加载出来,造成两者不同步,这时候就可以让代码等待一下,再去执行找元素的动作线程休眠,强制等待 Thread.sleep(long mills)package com.example.demo;import org.junit.jupiter.api.Test;import org.openqa.selenium.By;import org.openqa.selenium.firefox.Firefox.._元素三大等待

Java软件工程师职位分析_java岗位分析-程序员宅基地

文章浏览阅读3k次,点赞4次,收藏14次。Java软件工程师职位分析_java岗位分析

Java:Unreachable code的解决方法_java unreachable code-程序员宅基地

文章浏览阅读2k次。Java:Unreachable code的解决方法_java unreachable code

标签data-*自定义属性值和根据data属性值查找对应标签_如何根据data-*属性获取对应的标签对象-程序员宅基地

文章浏览阅读1w次。1、html中设置标签data-*的值 标题 11111 222222、点击获取当前标签的data-url的值$('dd').on('click', function() { var urlVal = $(this).data('ur_如何根据data-*属性获取对应的标签对象

推荐文章

热门文章

相关标签