源码学习

发布时间 2023-07-06 00:12:16作者: JiuYou2020

源码学习

spring源码

1. AOP

AOP(Aspect-Oriented Programming)即面向切面编程,是一种编程范式,用于将横切关注点(例如日志记录、安全性、事务管理等)与应用程序的业务逻辑分离开来,以提高代码的可重用性和可维护性。

1. 在Spring中,AOP有以下三种实现方式:

  1. 基于JDK动态代理的AOP实现:针对实现了接口的类,Spring使用JDK动态代理生成代理对象,并通过代理对象拦截方法调用,实现AOP功能。
  2. 基于CGLIB的AOP实现:针对没有实现接口的类,Spring使用CGLIB动态字节码技术生成代理对象,并通过代理对象拦截方法调用,实现AOP功能。相对于JDK动态代理,CGLIB在生成代理对象时更加灵活,但是性能方面会稍微差一些。
  3. 基于AspectJ的AOP实现:AspectJ是一个独立的AOP框架,Spring可以通过与AspectJ的集成来实现AOP功能。AspectJ支持更加丰富的切点表达式和切面类型,可以实现更加复杂的AOP功能。

代理的解释:

在 Spring AOP 中,代理是用来增强目标对象的行为的。代理对象会拦截目标对象的方法调用,并在方法调用前、后或抛出异常时执行一些特定的逻辑,例如记录日志、安全检查、性能监控等。代理对象可以在不修改目标对象源代码的情况下实现这些增强行为,从而提高代码的可维护性和灵活性。

Spring AOP 中采用的代理模式是基于接口的 JDK 动态代理和基于类的 CGLIB 代理。当目标对象实现了至少一个接口时,Spring AOP 将使用 JDK 动态代理来创建代理对象;否则,Spring AOP 将使用 CGLIB 代理来创建代理对象。

需要注意的是,代理对象并不是目标对象的真正子类,而是通过动态生成的字节码来创建的新的对象。因此,在使用代理时,需要注意代理对象不会继承目标对象的非 public 方法和 final 方法的行为。

2. 在一个项目中,使用AOP可以提高代码的可重用性和可维护性,下面是使用AOP的一般步骤:

  1. 定义切面类:切面类是一个包含切点和增强逻辑的类。在切面类中,需要定义切点,即需要拦截的目标方法,并在切点周围织入增强逻辑。
  2. 配置AOP:在Spring配置文件中,需要配置AOP相关的内容,包括切面类、切点、增强类型等。
  3. 定义目标类:在项目中定义需要增强的目标类,即需要被切面类拦截的类。
  4. 配置目标类:在Spring配置文件中,需要配置目标类的相关内容,包括类名、属性、方法等。
  5. 使用AOP:在项目中使用AOP,即调用目标类的方法时,切面类会拦截方法调用,并在方法执行前、执行后或执行过程中织入增强逻辑。

使用AOP可以提高代码的可重用性和可维护性,但需要注意以下几点:

  1. 尽量减少切面的数量:过多的切面会增加代码的复杂度和维护成本,影响系统的性能。
  2. 定义合适的切点:合适的切点可以减少不必要的拦截和增强,提高系统的性能。
  3. 避免使用过多的环绕通知:过多的环绕通知会影响系统的性能,应该尽量避免使用。
  4. 注意切面和目标类的依赖关系:切面和目标类之间的依赖关系可能会影响代码的可维护性和可测试性,需要注意处理。

3. AOP能够增强某些方法,这里的增强指的是什么?

在AOP中,代理可以增强某些方法,具体指的是在目标方法执行前、执行后或执行过程中,通过织入切面逻辑来实现对目标方法的增强。

代理的增强通常包括以下几个方面:

  1. 日志记录:记录方法的参数、返回值、执行时间等信息,以便后续的调试和排查问题。
  2. 安全性控制:对方法进行权限验证,确保只有具有特定权限的用户才能调用该方法。
  3. 缓存控制:在方法执行前检查缓存中是否有目标数据,如果有,则直接返回缓存中的数据,避免重复执行方法。
  4. 事务管理:在方法执行前开启事务,在方法执行后根据执行结果决定是否提交或回滚事务。
  5. 性能监控:记录方法执行的时间、调用次数等信息,以便分析系统的性能瓶颈并进行优化。

通过代理增强方法,可以将这些横切关注点从业务逻辑中分离出来,避免代码的重复和耦合,提高代码的可重用性和可维护性。

在模拟时加深了对接口的理解

使用接口的好处主要有以下几个方面:

  1. 实现多态:使用接口可以实现多态,使得程序更加灵活和可扩展。
  2. 抽象出公共行为:接口可以抽象出公共行为,将其定义在接口中,在不同的实现类中实现这些行为,从而提高代码的可重用性和可维护性。
  3. 解耦合:使用接口可以将程序的各个组成部分解耦合,使得程序更加模块化和易于维护。

举例:

  1. 定义一个接口Interface Test1,存在一个invoke方法
  2. 在另外一个类Test2中调用这里的invoke()方法(在Test2中创建一个Test1的成员变量并编写构造方法)
  3. 在main中创建一个Test2对象,并通过Test2的构造方法,重写其中的invoke()方法来实现程序的灵活性
//1.
interface InvocationHandler {
    void invoke();
}
//2.
public class $Proxy0 implements A12.Foo {
    private InvocationHandler h;

    public $Proxy0(InvocationHandler h) {
        this.h = h;
    }

    @Override
    public void foo() {
        h.invoke();
    }
}
//3.
    public static void main(String[] args) {
        new $Proxy0(new InvocationHandler() {
            @Override
            public void invoke() {
                System.out.println("需要实现的其它功能");
                System.out.println("foo");
            }
        }).foo();
    }

2. SpringMvc

1. 如何通过配置文件批量读取配置

  1. 新建xxxxProperties类,加上注解@ConfigurationProperties(prefix="spring.mvc",这里是以WebMvcProperties为例,在其中添加想要读取的变量,与配置文件的名称相互对应,这样就可以通过WebMvcProperties来批量读取配置.

2. 不直接返回Response对象而是通过配置返回Response对象

背景:

在crud中,我们经常需要在控制器Controller中返回固定的json格式响应体如:

{
	code:"",
	msg:"",
	data:{}
}

一般来说,我们是直接在控制器方法中返回自定义的Response类来实现的,那么,如何直接返回对象而把对象通过配置直接包装到data中呢?

利用@ControllerAdvice来实现

@ControllerAdvice
    static class MyControllerAdvice implements ResponseBodyAdvice<Object> {
        // 满足条件才转换
        public boolean supports(MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
            if (returnType.getMethodAnnotation(ResponseBody.class) != null ||
                AnnotationUtils.findAnnotation(returnType.getContainingClass(), ResponseBody.class) != null) {
//                returnType.getContainingClass().isAnnotationPresent(ResponseBody.class)) {
                return true;
            }
            return false;
        }

        // 将 User 或其它类型统一为 Result 类型
        public Object beforeBodyWrite(Object body, MethodParameter returnType, MediaType selectedContentType, Class<? extends HttpMessageConverter<?>> selectedConverterType, ServerHttpRequest request, ServerHttpResponse response) {
            if (body instanceof Result) {
                return body;
            }
            return Result.ok(bod y);
        }
    }

而此方法而可以用作将一些其它类转化为某一些特定的类.

3.DispatcherServlet的作用

DispatcherServlet是Spring框架中的一个重要组件,它是Web应用程序中的前置控制器(Front Controller),主要作用是将客户端的请求分发给相应的处理器(Controller)进行处理,以及将处理器的处理结果分发给视图(View)进行渲染。

具体来说,当客户端发起一个请求时,DispatcherServlet会拦截该请求,根据请求的URL和HTTP方法,将其分发给对应的HandlerMapping找到对应的HandlerAdapter,并调用HandlerAdapter的方法来执行相应的处理器方法。处理器方法会根据请求参数、路径变量等信息进行处理,并返回一个ModelAndView对象,其中包含了需要呈现的模型数据以及对应的视图名。DispatcherServlet再将这个ModelAndView对象传递给对应的ViewResolver,进行视图解析和渲染,最终将结果发送给客户端。

除此之外,DispatcherServlet还提供了一些其他的功能,例如:

  • 支持文件上传和下载
  • 支持国际化
  • 支持Flash属性- 支持异步请求和响应
  • 支持Spring Security等安全框架的集成

4.Tomcat异常处理

  • 我们知道 @ExceptionHandler 只能处理发生在 mvc 流程中的异常,例如控制器内、拦截器内,那么如果是 Filter 出现了异常,如何进行处理呢?

  • 在 Spring Boot 中,是这么实现的:

    1. 因为内嵌了 Tomcat 容器,因此可以配置 Tomcat 的错误页面,Filter 与 错误页面之间是通过请求转发跳转的,可以在这里做手脚
    2. 先通过 ErrorPageRegistrarBeanPostProcessor 这个后处理器配置错误页面地址,默认为 /error 也可以通过 ${server.error.path} 进行配置
    3. 当 Filter 发生异常时,不会走 Spring 流程,但会走 Tomcat 的错误处理,于是就希望转发至 /error 这个地址
      • 当然,如果没有 @ExceptionHandler,那么最终也会走到 Tomcat 的错误处理
    4. Spring Boot 又提供了一个 BasicErrorController,它就是一个标准 @Controller,@RequestMapping 配置为 /error,所以处理异常的职责就又回到了 Spring
    5. 异常信息由于会被 Tomcat 放入 request 作用域,因此 BasicErrorController 里也能获取到
    6. 具体异常信息会由 DefaultErrorAttributes 封装好
    7. BasicErrorController 通过 Accept 头判断需要生成哪种 MediaType 的响应
      • 如果要的不是 text/html,走 MessageConverter 流程
      • 如果需要 text/html,走 mvc 流程,此时又分两种情况
        • 配置了 ErrorViewResolver,根据状态码去找 View
        • 没配置或没找到,用 BeanNameViewResolver 根据一个固定为 error 的名字找到 View,即所谓的 WhitelabelErrorView

评价

  • 一个错误处理搞得这么复杂,就问恶心不?
演示1 - 错误页处理
关键代码
@Bean // ⬅️修改了 Tomcat 服务器默认错误地址, 出错时使用请求转发方式跳转
public ErrorPageRegistrar errorPageRegistrar() {
    return webServerFactory -> webServerFactory.addErrorPages(new ErrorPage("/error"));
}

@Bean // ⬅️TomcatServletWebServerFactory 初始化前用它增强, 注册所有 ErrorPageRegistrar
public ErrorPageRegistrarBeanPostProcessor errorPageRegistrarBeanPostProcessor() {
    return new ErrorPageRegistrarBeanPostProcessor();
}
收获?
  1. Tomcat 的错误页处理手段
演示2 - BasicErrorController
关键代码
@Bean // ⬅️ErrorProperties 封装环境键值, ErrorAttributes 控制有哪些错误信息
public BasicErrorController basicErrorController() {
    ErrorProperties errorProperties = new ErrorProperties();
    errorProperties.setIncludeException(true);
    return new BasicErrorController(new DefaultErrorAttributes(), errorProperties);
}

@Bean // ⬅️名称为 error 的视图, 作为 BasicErrorController 的 text/html 响应结果
public View error() {
    return new View() {
        @Override
        public void render(
            Map<String, ?> model, 
            HttpServletRequest request, 
            HttpServletResponse response
        ) throws Exception {
            System.out.println(model);
            response.setContentType("text/html;charset=utf-8");
            response.getWriter().print("""
                    <h3>服务器内部错误</h3>
                    """);
        }
    };
}

@Bean // ⬅️收集容器中所有 View 对象, bean 的名字作为视图名
public ViewResolver viewResolver() {
    return new BeanNameViewResolver();
}
收获?
  1. Spring Boot 中 BasicErrorController 如何工作

5. SpringBoot内嵌Tomcat服务器

        Server
        └───Service
            ├───Connector (协议, 端口)
            └───Engine
                └───Host(虚拟主机 localhost)
                    ├───Context1 (应用1, 可以设置虚拟路径, / 即 url 起始路径; 项目磁盘路径, 即 docBase )
                    │   │   index.html
                    │   └───WEB-INF
                    │       │   web.xml (servlet, filter, listener) 3.0
                    │       ├───classes (servlet, controller, service ...)
                    │       ├───jsp
                    │       └───lib (第三方 jar 包)
                    └───Context2 (应用2)
                        │   index.html
                        └───WEB-INF
                                web.xml

6. 自定义注解使用

详见spring源码A42_2

例如,需要实现一个自定义注解,传入类名和一个布尔值,判断是否存在该bean

  1. 定义注解
@Retention(RetentionPolicy.RUNTIME)//在运行时
    @Target({ElementType.METHOD, ElementType.TYPE})//前者指定可以在方法上使用,后者则是可以在类上使用
    @Conditional(MyCondition.class)//表示被标注的元素是否存在依赖于 MyCondition 类的判断条件
    @interface ConditionalOnClass {
        boolean exists(); // true 判断存在 false 判断不存在

        String className(); // 要判断的类名
    }

@Conditional(MyCondition.class) 这个注解表示被标注的元素是否存在依赖于 MyCondition 类的判断条件。它用于根据某些条件有条件地启用或禁用被注解的元素。

MyCondition 类的 matches() 方法返回 true 时,被标注的元素将被启用,而当它返回 false 时,元素将被禁用。

@ConditionalOnClass 的情况下,它用于根据类路径中是否存在指定的类来有条件地启用或禁用类或方法。如果 exists 属性设置为 true,则表示只有当类路径中存在指定的类时,被注解的类或方法才会被启用。如果它设置为 false,则表示只有当类路径中不存在指定的类时,被注解的类或方法才会被启用。className 属性用于指定要检查存在性的类的完全限定名称。

  1. 实现MyCondition.class
static class MyCondition implements Condition { // 存在 Druid 依赖
        @Override
        public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
            Map<String, Object> attributes = metadata.getAnnotationAttributes(ConditionalOnClass.class.getName());
            String className = attributes.get("className").toString();
            boolean exists = (boolean) attributes.get("exists");
            boolean present = ClassUtils.isPresent(className, null);
            return exists ? present : !present;
        }
    }

Seata源码

建议先看完官方文档的用户文档及开发者指南部分

各事务模式(AT,TCC,SAGA,XA)

具体介绍详见官方文档

领域模型(TM,RM,TC)

具体介绍详见官方文档

Metrics设计

具体介绍详见官方文档

Manual Transaction 模式

一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commit 或 rollback 行为

根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction ModeManual (Branch) Transaction Mode.

AT 模式(参考链接 TBD)基于 支持本地 ACID 事务关系型数据库

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

相应的,MT 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

所谓 MT 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

[1].https://seata.io/zh-cn/blog/manual-transaction-mode.html

Seata是如何实现领域模型的

本文主要内容为Seata代码的理解

项目地址

这里的代码可能与最新版本的Seata有些区别,博主也借鉴了一些其它人的博客

Seata的TXC模型

img

TXC的实现通过三个组件来完成。也就是上图的三个深黄色部分,其作用如下:

  1. TM:全局事务管理器,在标注开启fescar分布式事务的服务端开启,并将全局事务发送到TC事务控制端管理
  2. TC:事务控制中心,控制全局事务的提交或者回滚。这个组件需要独立部署维护,目前只支持单机版本,后续迭代计划会有集群版本
  3. RM:资源管理器,主要负责分支事务的上报,本地事务的管理

实现过程:

  • 服务起始方发起全局事务并注册到TC。
  • 在调用协同服务时,协同服务的事务分支事务会先完成阶段一的事务提交或回滚,并生成事务回滚的undo_log日志,同时注册当前协同服务到TC并上报其事务状态,归并到同一个业务的全局事务中。
  • 此时若没有问题继续下一个协同服务的调用,期间任何协同服务的分支事务回滚,都会通知到TC,TC在通知全局事务包含的所有已完成一阶段提交的分支事务回滚。
  • 如果所有分支事务都正常,最后回到全局事务发起方时,也会通知到TC,TC在通知全局事务包含的所有分支删除回滚日志。
  • 在这个过程中为了解决写隔离和度隔离的问题会涉及到TC管理的全局锁。

本博文的目标是深入代码细节,探究其基本思路是如何实现的。

有大佬画了一副流程图,我贴在这里:https://www.processon.com/view/link/6007f5c00791294a0e9b611a

项目结构解析

项目拉下来,用IDE打开后的目录结构如下,下面先大致的看下每个模块的实现

image-20230607165227149

  1. all:这是一个聚合模块,它包含了所有其他模块的依赖,用于方便构建和部署整个Seata系统。
  2. bom:这是一个Bill of Materials(BOM)模块,它定义了Seata的版本和所需的所有依赖库的版本,以确保它们在构建和部署时保持一致。
  3. build:这个模块包含了Seata的构建脚本和相关的工具,用于构建Seata项目。
  4. common:这个模块包含了Seata中通用的工具类和功能,例如序列化、反序列化、日志处理,常用辅助类,静态变量、扩展机制类加载器、以及定义全局的异常等。
  5. compressor:这个模块实现了Seata中的数据压缩功能,用于在分布式事务中减少网络传输的数据量。
  6. config:这个模块提供了Seata的配置管理功能,包括读取和解析配置文件,以及提供配置信息的API。
  7. console:这个模块是Seata的控制台模块,提供了一个可视化的管理控制台,用于监控和管理Seata的分布式事务。
  8. core:核心模块主要封装了TM、RM和TC通讯用RPC相关内容。
  9. dependencies:这个模块定义了Seata的所有依赖库的版本和引用,用于管理和控制Seata的依赖关系。
  10. discovery:这个模块提供了服务发现和注册功能,用于在分布式环境中定位和连接Seata的各个组件。
  11. distribution:这个模块包含了Seata的发布版本,用于方便用户下载和使用。
  12. integration-tx-api:这个模块提供了与Spring等框架集成的相关API,用于在应用程序中使用Seata的分布式事务功能。
  13. metrics:这个模块实现了Seata的性能指标收集和监控功能,用于统计和展示Seata的性能数据。
  14. rm:这个模块是Seata的资源管理器(Resource Manager,RM)模块,负责管理和协调分布式事务中的资源。
  15. rm-datasource:这个模块提供了Seata与不同数据源的集成支持,例如MySQL、Oracle等数据库。
  16. saga:这个模块实现了Seata的Saga模式,用于处理长时间跨越多个服务的分布式事务。
  17. seata-spring-autoconfigure:这个模块提供了Seata在Spring Boot环境中自动配置的功能,简化了Seata的集成和配置过程。
  18. seata-spring-boot-starter:这个模块是Seata在Spring Boot应用中使用的起步依赖模块,方便用户快速集成和使用Seata。
  19. serializer:这个模块提供了Seata的序列化和反序列化功能,用于在分布式事务中对对象进行序列化和传输。
  20. server:这个模块是Seata的服务器模块,提供了Seata服务的启动、关闭和管理功能,同时维护全局锁。
  21. spring:这个模块提供了Seata在Spring框架中的集成支持,例如事务管理器、AOP切面等,是研究Seata的突破口。
  22. sqlparser:这个模块实现了Seata的SQL解析功能,用于解析和处理分布式事务中的SQL语句。
  23. tcc:这个模块实现了Seata的TCC(Try-Confirm-Cancel)模式,用于处理分布式事务中的两阶段提交逻辑。
  24. test:这个模块包含了Seata的测试代码和工具,用于进行单元测试和集成测试。
  25. tm:这个模块是Seata的事务管理器(Transaction Manager,TM)模块,负责管理和协调分布式事务的执行和提交,全局事务事务管理模块,管理全局事务的边界,全局事务开启回滚点都在这个模块控制。
  26. changes:这个模块包含了Seata的变更日志文件,用于记录版本之间的更新和变更。
  27. ext:这个模块是Seata的扩展模块,提供了一些额外的功能和扩展点,可以通过该模块进行自定义扩展。
  28. integration:这个模块包含了Seata与其他框架和组件的集成支持,例如Dubbo、RocketMQ等。
  29. script:这个模块包含了Seata的数据库脚本文件,用于创建和初始化Seata所需的数据库表结构。
  30. seata-plugin:这个模块是Seata的插件模块,用于支持Seata在不同平台和框架下的插件机制,方便扩展和定制。
  31. sessionStore:这个模块提供了Seata的会话存储功能,用于存储和管理分布式事务的会话信息。
  32. style:这个模块定义了Seata的代码风格和规范,包括代码格式化、命名规范等,用于保持整个项目的一致性和可读性。

以上介绍可能有误,欢迎指正补充

Seata事务过程分析

初始化
  1. RM、TM的初始化都是由类GlobalTransactionScanner触发的
  2. TM的初始化:
    1. 当Seata服务器(server)启动时,会初始化TM实例。TM负责管理和协调分布式事务的执行和提交。
    2. TM会向注册中心(Registry Center)注册自己的信息,以便TC可以找到并与其通信。
    3. TM还会启动一个定时任务,定期向TC发送心跳消息,以保持与TC的连接和状态同步。
  3. RM的初始化:
    1. 每个参与分布式事务的RM都需要在自己的应用中进行初始化。
    2. RM实例负责管理和操作本地资源,并与TM和TC进行通信。
    3. RM在初始化时会向注册中心注册自己的信息,以便TC可以找到并与其通信。
    4. RM还会订阅与自己相关的事务信息,以便接收TC发送的事务消息和指令。

Seata的事务分析在官网的用户文档的Api支持,微服务框架支持以及附录的事务隔离中解释的非常清楚,这里仅补充博主不太理解的部分,其中重点关注事务隔离,它将事务讲的非常清晰

1. RpcContext和RootContext异同

博主初次看见RpcContext和RootContext时觉得很迷,特此比较了一下它们的异同

RpcContext是在分布式事务框架Seata中用于封装和管理与远程服务通信的上下文信息的对象。它存储了与远程服务通信相关的各种属性和状态,如客户端角色、版本、应用程序ID、事务服务组、资源集合等。RpcContext还提供了方法来持有和释放上下文信息,并在不同的通道和持有者之间进行关联。通过RpcContext,可以在分布式事务的执行过程中传递和共享上下文信息,以实现分布式事务的管理和协调。

RootContext是Seata分布式事务框架中的一个类,用于管理全局事务的上下文信息。它是一个线程级别的上下文,用于在分布式事务的不同参与者之间传递和共享全局事务ID和分支事务ID。RootContext提供了静态方法来设置和获取全局事务ID和分支事务ID,并通过ThreadLocal来保证在同一线程中的全局事务上下文的一致性。

区别:

  1. 功能不同:RpcContext主要用于管理与远程服务通信相关的上下文信息,而RootContext用于管理全局事务的上下文信息。
  2. 作用范围不同:RpcContext的作用范围是在远程服务通信过程中,用于传递和共享上下文信息;而RootContext的作用范围是在整个分布式事务执行过程中,用于传递和共享全局事务上下文信息。
  3. 存储内容不同:RpcContext存储与远程服务通信相关的属性和状态,如客户端角色、版本、应用程序ID、事务服务组、资源集合等;而RootContext主要存储全局事务ID和分支事务ID。
  4. 使用方式不同:RpcContext通过实例对象进行操作,而RootContext是通过静态方法直接操作全局事务上下文。

其中:

  1. RootContext:事务的根上下文:负责在应用的运行时,维护 XID 。

  2. RpcContext:RpcContext的作用是在分布式事务框架Seata中,用于封装和管理与远程服务通信的上下文信息。它存储了与远程服务通信相关的各种属性和状态,如客户端角色、版本、应用程序ID、事务服务组、资源集合等。RpcContext还提供了方法来持有和释放上下文信息,并在不同的通道和持有者之间进行关联。通过RpcContext,可以在分布式事务的执行过程中传递和共享上下文信息,以实现分布式事务的管理和协调。

/*
 *  RpcContext是Seata中用于在分布式环境中传递事务上下文信息的工具类。
 *  它负责存储和传递各个参与者服务的事务上下文信息,并提供了一些方法来操作上下文信息。
 *  RpcContext对象与具体的服务实例和线程绑定,每个线程在执行业务逻辑时都可以访问和修改自己的RpcContext对象。
 *  RpcContext的作用主要有以下几点:
 *  
 *  1. 存储上下文信息:
 *     - applicationId:应用程序ID
 *     - transactionServiceGroup:事务分组名称
 *     - clientId:客户端ID
 *     - channel:与远程服务通信的Netty Channel对象
 *     - resourceSets:参与者服务涉及的资源集合
 *  
 *  2. 传递上下文信息:
 *     - 通过holdInClientChannels()方法,将RpcContext存储在客户端的通道中,用于传递给事务管理器(TM)
 *     - 通过holdInIdentifiedChannels()方法,将RpcContext存储在标识的通道中,用于传递给资源管理器(RM)
 *     - 通过holdInResourceManagerChannels()方法,将RpcContext存储在资源管理器的通道中,用于传递给资源管理器(RM)
 *  
 *  3. 释放上下文信息:
 *     - 通过release()方法,释放RpcContext对象及其关联的资源,清空持有的上下文信息
 *  
 *  4. 操作上下文信息:
 *     - 通过addResource()方法,向resourceSets中添加资源
 *     - 通过addResources()方法,向resourceSets中添加多个资源
 *     - 通过getPortMap()方法,获取指定资源ID对应的端口映射
 *     - 通过getClientRMHolderMap()方法,获取客户端资源管理器(RM)持有者的映射表
 *  
 *  RpcContext在Seata的分布式事务中起到了传递上下文和协调服务之间的作用,确保分布式事务的一致性和可靠性。
 */

public class RpcContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcContext.class);

   private NettyPoolKey.TransactionRole clientRole;  // 客户端角色

   private String version;  // 版本

    private String applicationId;  // 应用程序ID

    private String transactionServiceGroup;  // 事务分组名称

    private String clientId;  // 客户端ID

    private Channel channel;  // 与远程服务通信的Netty Channel对象

    private Set<String> resourceSets;  // 参与者服务涉及的资源集合

    private ConcurrentMap<Channel, RpcContext> clientIDHolderMap;  // 客户端ID持有者的映射表

    private ConcurrentMap<Integer, RpcContext> clientTMHolderMap;  // 客户端事务管理器(TM)持有者的映射表

    private ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientRMHolderMap;  // 客户端资源管理器(RM)持有者的映射表

    // 释放上下文信息
    public void release() {
        Integer clientPort = ChannelUtil.getClientPortFromChannel(channel);
        if (clientIDHolderMap != null) {
            clientIDHolderMap = null;
        }
        if (clientRole == NettyPoolKey.TransactionRole.TMROLE && clientTMHolderMap != null) {
            clientTMHolderMap.remove(clientPort);
            clientTMHolderMap = null;
        }
        if (clientRole == NettyPoolKey.TransactionRole.RMROLE && clientRMHolderMap != null) {
            for (Map<Integer, RpcContext> portMap : clientRMHolderMap.values()) {
                portMap.remove(clientPort);
            }
            clientRMHolderMap = null;
        }
        if (resourceSets != null) {
            resourceSets.clear();
        }
    }

    // 在客户端通道中存储RpcContext
    public void holdInClientChannels(ConcurrentMap<Integer, RpcContext> clientTMHolderMap) {
        if (this.clientTMHolderMap != null) {
            throw new IllegalStateException();
        }
        this.clientTMHolderMap = clientTMHolderMap;
        Integer clientPort = ChannelUtil.getClientPortFromChannel(channel);
        this.clientTMHolderMap.put(clientPort, this);
    }

    // 在标识的通道中存储RpcContext
    public void holdInIdentifiedChannels(ConcurrentMap<Channel, RpcContext> clientIDHolderMap) {
        if (this.clientIDHolderMap != null) {
            throw new IllegalStateException();
        }
        this.clientIDHolderMap = clientIDHolderMap;
        this.clientIDHolderMap.put(channel, this);
    }

    // 在资源管理器的通道中存储RpcContext
    public void holdInResourceManagerChannels(String resourceId, ConcurrentMap<Integer, RpcContext> portMap) {
        if (this.clientRMHolderMap == null) {
            this.clientRMHolderMap = new ConcurrentHashMap<>();
        }
        Integer clientPort = ChannelUtil.getClientPortFromChannel(channel);
        portMap.put(clientPort, this);
        this.clientRMHolderMap.put(resourceId, portMap);
    }

    // 在资源管理器的通道中存储RpcContext
    public void holdInResourceManagerChannels(String resourceId, Integer clientPort) {
        if (this.clientRMHolderMap == null) {
            this.clientRMHolderMap = new ConcurrentHashMap<>();
        }
        ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(clientRMHolderMap, resourceId,
            key -> new ConcurrentHashMap<>());
        portMap.put(clientPort, this);
    }

    // 获取客户端资源管理器(RM)持有者的映射表
    public ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> getClientRMHolderMap() {
        return clientRMHolderMap;
    }

    // 获取指定资源ID对应的端口映射
    public Map<Integer, RpcContext> getPortMap(String resourceId) {
        return clientRMHolderMap.get(resourceId);
    }

    // 获取客户端ID
    public String getClientId() {
        return clientId;
    }

    // 获取通道
    public Channel getChannel() {
        return channel;
    }

    // 设置通道
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    // 获取应用程序ID
    public String getApplicationId() {
        return applicationId;
    }

    // 设置应用程序ID
    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    // 获取事务服务组
    public String getTransactionServiceGroup() {
        return transactionServiceGroup;
    }

    // 设置事务服务组
    public void setTransactionServiceGroup(String transactionServiceGroup) {
        this.transactionServiceGroup = transactionServiceGroup;
    }

    // 获取客户端角色
    public NettyPoolKey.TransactionRole getClientRole() {
        return clientRole;
    }

    // 设置客户端角色
    public void setClientRole(NettyPoolKey.TransactionRole clientRole) {
        this.clientRole = clientRole;
    }

    // 获取版本
    public String getVersion() {
        return version;
    }

    // 设置版本
    public void setVersion(String version) {
        this.version = version;
    }

    // 获取参与者服务涉及的资源集合
    public Set<String> getResourceSets() {
        return resourceSets;
    }

    // 设置参与者服务涉及的资源集合
    public void setResourceSets(Set<String> resourceSets) {
        this.resourceSets = resourceSets;
    }

    // 添加资源
    public void addResource(String resource) {
        if (StringUtils.isBlank(resource)) {
            return;
        }
        if (resourceSets == null) {
            this.resourceSets = new HashSet<String>();
        }
        this.resourceSets.add(resource);
    }

    // 添加资源集合
    public void addResources(Set<String> resources) {
        if (resources == null) {
            return;
        }
        if (resourceSets == null) {
            this.resourceSets = new HashSet<String>();
        }
        this.resourceSets.addAll(resources);
    }

    // 设置客户端ID
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}
2. 看TransactionPropagationFilter 的源码可能有些不太理解,这里以一个下订单的例子进行讲述(源码太长,这里不贴出来了)
  1. 用户调用 OrderService 的下订单接口,发起一个订单创建请求。
  2. OrderService 在处理请求之前,会通过代码中的 RootContext.getXID() 获取当前的全局事务 ID(XID)。
  3. 如果当前存在全局事务 ID(XID),说明该请求是在一个分布式事务中发起的。OrderService 将把 XID 设置到 RpcContext 的附件中,以便传递给下游的服务提供方(InventoryService)。
  4. 如果当前不存在全局事务 ID(XID),但在请求的 RpcContext 附件中存在 XID,说明该请求是从上游传递过来的。OrderService 将从 RpcContext 的附件中获取 XID,并将其绑定到当前运行时的事务上下文(RootContext)中。
  5. 接下来,OrderService 执行业务逻辑,比如创建订单。
  6. 一旦业务逻辑执行完毕,会调用下游的服务提供方 InventoryService 来减少商品库存。
  7. 在调用 InventoryService 之前,事务传播过滤器会将当前的全局事务 ID(XID)设置到 RpcContext 的附件中,确保 InventoryService 可以获取到正确的事务上下文。
  8. 接着,OrderService 执行 invoker.invoke(invocation) 调用,即调用 InventoryService 的减少库存方法。
  9. InventoryService 收到请求后,会从 RpcContext 的附件中获取传递过来的全局事务 ID(XID),以确保在减少库存的操作中参与到正确的分布式事务中。
  10. 一旦库存操作完成,返回结果给 OrderService。
  11. 在 finally 块中,事务传播过滤器会解绑当前绑定的 XID,并进行清理工作。
  12. 如果解绑后的 XID 与传递过来的 XID 不一致,会发出警告。这种情况可能发生在调用过程中有新的事务上下文开启,导致 XID 发生变化。为了保持事务的一致性,事务传播过滤器会重新将解绑后的 XID 绑定到事务上下文中。
3. 看AbstractConnectionProxy 的源码,仍以下订单举例

这段代码是 Seata 在处理数据库连接的代理逻辑,其中涉及到了分支事务的类型判断和 PreparedStatement 的创建。

假设有一个下订单的例子,包含以下 SQL 语句:

INSERT INTO orders (order_id, customer_id, total_amount) VALUES (?, ?, ?)

现在假设该订单的分支事务类型是 AT(原子事务)。在执行该 SQL 语句时,以下是代码的执行逻辑:

  1. 首先,通过调用 RootContext.getBranchType() 获取当前的分支事务类型。

  2. 如果分支事务类型是 AT(BranchType.AT),表示当前事务是原子事务,即不需要关注主键的返回情况。

  3. 接下来,使用 SQL 解析器(SQLRecognizer)来解析 SQL 语句,获取表名和 SQL 类型。

  4. 在这个例子中,SQL 类型是 INSERT,所以需要检查表的元数据(TableMeta)来获取主键信息。

  5. 如果表的元数据中包含主键信息,说明该表有自动生成的主键,需要使用带有主键参数的 prepareStatement() 方法创建 PreparedStatement 对象。

  6. 如果表的元数据中没有主键信息,或者 SQL 解析过程中出现异常,将使用普通的 prepareStatement() 方法创建 PreparedStatement 对象。

  7. 最后,返回一个 PreparedStatementProxy 对象,该对象封装了原始 PreparedStatement 对象和相关的代理逻辑。

通过以上逻辑,Seata 可以根据不同的分支事务类型和 SQL 语句类型,在创建 PreparedStatement 时进行特定的处理。在这个例子中,当分支事务类型为 AT 且 SQL 类型为 INSERT 时,会使用带有主键参数的 prepareStatement() 方法创建 PreparedStatement 对象。

注意:以上仅是对代码逻辑的分析和假设的情况。实际的执行结果可能会根据具体的代码实现和配置进行调整和变化。

4. ExecuteTemplate 源码
public class ExecuteTemplate {
 /**
     * 执行数据库操作
     *
     * @param statementProxy    StatementProxy 对象
     * @param statementCallback StatementCallback 对象
     * @param args              参数列表
     * @param <T>               返回结果类型
     * @param <S>               Statement 的具体类型
     * @return 执行结果
     * @throws SQLException SQL 异常
     */
    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }

    /**
     * 执行数据库操作
     *
     * @param sqlRecognizers    SQLRecognizer 列表
     * @param statementProxy    StatementProxy 对象
     * @param statementCallback StatementCallback 对象
     * @param args              参数列表
     * @param <T>               返回结果类型
     * @param <S>               Statement 的具体类型
     * @return 执行结果
     * @throws SQLException SQL 异常
     */
    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        // 如果不需要全局锁且分支事务类型不是 AT,则直接执行原始的数据库操作
        if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        String dbType = statementProxy.getConnectionProxy().getDbType();
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            // 如果 SQLRecognizer 列表为空,则根据 SQL 语句和数据库类型获取 SQLRecognizer 列表
            sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
        }

        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            // 如果 SQLRecognizer 列表为空,则使用 PlainExecutor 执行原始的数据库操作
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        // 如果是 INSERT 语句,根据数据库类型选择相应的 Executor
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        // 如果是 UPDATE 语句,根据数据库类型选择相应的 Executor
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case DELETE:
                        // 如果是 DELETE 语句,根据数据库类型选择相应的 Executor
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case SELECT_FOR_UPDATE:
                        // 如果是 SELECT FOR UPDATE 语句,根据数据库类型选择相应的 Executor
                        if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                            executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        } else {
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        }
                        break;
                    case INSERT_ON_DUPLICATE_UPDATE:
                        // 如果是 INSERT ON DUPLICATE UPDATE 语句,根据数据库类型选择相应的 Executor
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                            case JdbcConstants.MARIADB:
                                executor = new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                        }
                        break;
                    case UPDATE_JOIN:
                        // 如果是 UPDATE JOIN 语句,根据数据库类型选择相应的 Executor
                        switch (dbType) {
                            case JdbcConstants.MYSQL:
                                executor = new MySQLUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                                break;
                            default:
                                throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
                        }
                        break;
                    default:
                        // 默认情况下,使用 PlainExecutor 执行原始的数据库操作
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                // 如果有多个 SQLRecognizer,则使用 MultiExecutor 执行多个 SQLRecognizer
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }

        T rs;
        try {
            // 执行具体的数据库操作
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // 将其他异常转换为 SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
}

假设我们有一个名为 OrderService 的微服务,该服务使用了 Seata 进行分布式事务管理。我们将使用上述的 ExecuteTemplate 类来执行数据库操作。

假设我们的订单服务需要在创建订单时插入订单信息到数据库中。下面是示例的订单创建方法:

public class OrderService {

    private DataSource dataSource; // 假设已注入数据源

    public void createOrder(Order order) {
        Connection connection = null;
        try {
            connection = dataSource.getConnection();
            StatementProxy<Statement> statementProxy = new StatementProxy<>(connection.createStatement());
            String sql = "INSERT INTO orders (order_id, customer_id, total_amount) VALUES (?, ?, ?)";
            ExecuteTemplate.execute(statementProxy, (statement, args) -> {
                PreparedStatement preparedStatement = (PreparedStatement) statement;
                preparedStatement.setString(1, order.getId());
                preparedStatement.setString(2, order.getCustomerId());
                preparedStatement.setDouble(3, order.getTotalAmount());
                return preparedStatement.executeUpdate();
            }, sql);
            connection.commit(); // 提交事务
        } catch (SQLException e) {
            if (connection != null) {
                try {
                    connection.rollback(); // 回滚事务
                } catch (SQLException ex) {
                    // 处理回滚异常
                }
            }
            // 处理异常
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    // 处理连接关闭异常
                }
            }
        }
    }
}

在上述代码中,我们通过 dataSource 获取数据库连接,并创建了一个 StatementProxy 对象,该对象用于代理原始的 Statement 对象。我们要执行的 SQL 语句是插入订单信息的语句。

在执行 ExecuteTemplate.execute 方法时,会判断当前的分支事务类型和是否需要全局锁。如果不需要全局锁且分支事务类型不是 AT(原子性事务),则直接执行原始的数据库操作。

但是,在我们的例子中,假设需要全局锁且分支事务类型是 AT,因此会根据 SQL 类型选择相应的 Executor 来执行数据库操作。对于插入语句(SQLType.INSERT),会使用 InsertExecutor 进行执行。

InsertExecutor 是 Seata 提供的一个实现,它会在插入操作前进行一些额外的处理,例如生成全局唯一的 ID(如果配置了 ID 生成器)或者处理分布式锁等。

通过使用 ExecuteTemplate 和相应的 Executor,我们可以在分布式事务中执行数据库操作,并根据事务的类型和需要进行相应的处理,以保证事务的一致性和隔离性。

  1. AbstractDMLBaseExecutor源码
/**
 * 抽象的 DML 基础执行器
 * @param <T> 执行结果的类型
 * @param <S> Statement 的子类型
 */
public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

    public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
                                   SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    /**
     * 处理自动提交为 true 的情况
     * @param args 方法参数
     * @return 执行结果
     * @throws Throwable 执行过程中的异常
     */
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.changeAutoCommit(); // 开启事务
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit(); // 提交事务
                return result;
            });
        } catch (Exception e) {
            // 当 finally 块中发生异常时,该异常会丢失,所以在这里打印异常信息
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }

    /**
     * 处理自动提交为 false 的情况
     * @param args 方法参数
     * @return 执行结果
     * @throws Exception 执行过程中的异常
     */
    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
}

/**
 * UPDATE 语句的执行器
 * @param <T> 执行结果的类型
 * @param <S> Statement 的子类型
 */
public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

    public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
                          SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

}
  • executeAutoCommitTrue 方法处理自动提交为 true 的情况。首先,获取连接代理对象,并通过调用 changeAutoCommit 方法将自动提交设置为 false,即开启事务。然后,使用 LockRetryPolicy 类实现了一个重试策略,执行 executeAutoCommitFalse 方法获取执行结果,并通过连接代理对象的 commit 方法提交事务。如果在执行过程中出现异常,会将异常打印,并根据配置决定是否回滚事务。
  • executeAutoCommitFalse 方法处理自动提交为 false 的情况。首先,检查数据库类型是否为 MySQL 并且是否存在多个主键,如果是则抛出不支持的异常。然后,获取执行前的数据镜像(beforeImage),执行具体的数据库操作,并获取执行结果。接着,根据执行前后的数据镜像获取变更的数据(afterImage),并准备撤销日志(undo log)。最后,返回执行结果。

以下订单为例,流程追踪

博主源码和demo在这:https://github.com/JiuYou2020/seata-demo

先给一张比较古早版本的,虽然和现版本的不一致,但对博主的帮助很大,感谢创作这幅图的作者!

request和response的处理流程

request

rm client -> server

RegisterRMRequest
MergedWarpMessage
BranchRegisterRequest
BranchReportRequest
GlobalLockQueryRequest

tm client -> server

RegisterTMRequest
MergedWarpMessage
GlobalBeginRequest
GlobalCommitRequest
GlobalRollbackRequest
GlobalStatusRequest
GlobalReportRequest

server -> rm client

BranchCommitRequest
BranchRollbackRequest
UndoLogDeleteRequest

server -> tm client

// null

response

Server -> rm client

RegisterRMResponse
MergeResultMessage
BranchRegisterResponse
BranchReportResponse
GlobalLockQueryResponse

Server -> tm client

RegisterTMResponse
MergeResultMessage
GlobalBeginResponse
GlobalCommitResponse
GlobalReportResponse
GlobalRollbackResponse

rm client -> server

BranchCommitResponse
BranchRollbackResponse

tm client -> server

// null

0. 梦开始的地方

io.seata.spring.annotation.GlobalTransactionScanner所作的工作

  1. 扫描 Spring 容器中的 Bean 定义,识别带有 @GlobalTransactional 注解的方法。
  2. 初始化客户端,RM和TM的初始化流程在这里开始,与TC的沟通也从这里开始
  3. 添加拦截器(那些拦截带有@GlobalTransactional注解的方法)
  4. 注册spring关闭钩子,执行应用程序关闭时所需要进行的一些处理(例如清理TM和RM)

1. RM及TM注册流程

  1. 启动seata-demo的任意一个微服务,这里是RM注册流程
  2. Netty是Seata中的网络传输框架,构建了客户端与服务端直接的通信框架,因此猜测发送的请求会经过这里,我们找到两个与远程netty相关的类
    • AbstractNettyRemotingClient:
      • 这是一个抽象类,用于实现基于 Netty 的远程通信客户端。它定义了一些通用的方法和属性,包括启动、关闭、发送请求等。
      • Seata 的客户端可以使用 Netty 作为底层通信框架,与服务端进行通信。
    • AbstractNettyRemotingServer:
      • 这也是一个抽象类,用于实现基于 Netty 的远程通信服务端。它定义了一些通用的方法和属性,包括启动、关闭、处理请求等
      • Seata 的服务端可以使用 Netty 作为底层通信框架,接收客户端的请求并进行相应的处理。
  3. AbstractNettyRemotingServer中,我们找到了请求的处理方法,源码如下:
/**
     * The type ServerHandler.
     */
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {

        /**
         * Channel read.
         * 当从通道读取到消息时被调用,用于处理接收到的消息。
         * @param ctx the ctx
         * @param msg the msg
         * @throws Exception the exception
         */
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }
		/**
		 * 当通道的可写状态发生改变时被调用。
		 */
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            synchronized (lock) {
                if (ctx.channel().isWritable()) {
                    lock.notifyAll();
                }
            }
            ctx.fireChannelWritabilityChanged();
        }

        /**
         * Channel inactive.
         *当通道变为非活跃状态时被调用,表示通道已断开连接。
         * @param ctx the ctx
         * @throws Exception the exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            debugLog("inactive:{}", ctx);
            if (messageExecutor.isShutdown()) {
                return;
            }
            handleDisconnect(ctx);
            super.channelInactive(ctx);
        }
	    /**
         * 获取远程客户端的 IP 地址和端口信息。
         * 通过 ChannelManager 从通道中获取与之关联的 RpcContext 对象。
         * 如果获取到了有效的 RpcContext 对象,并且该对象的客户端角色不为空,说明该通道是用于与客户端进行通信的,需要释放相应资源。
         * 释放 RpcContext 对象,即清理与通道相关的上下文信息。
         * 如果日志级别为 INFO,则记录日志,指示通道已变为非活跃状态。
         * 如果未获取到有效的 RpcContext 对象,说明该通道是未使用的,直接移除该通道。
         * 如果日志级别为 INFO,则记录日志,指示移除了未使用的通道。
         * @param ctx
         */
        private void handleDisconnect(ChannelHandlerContext ctx) {
            final String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
            RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ipAndPort + " to server channel inactive.");
            }
            if (rpcContext != null && rpcContext.getClientRole() != null) {
                rpcContext.release();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);
                }
            } else {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove unused channel:" + ctx.channel());
                }
            }
        }

        /**
         * Exception caught.
         *当处理过程中发生异常时被调用,用于捕获并处理异常情况。
         * @param ctx   the ctx
         * @param cause the cause
         * @throws Exception the exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            try {
                if (cause instanceof DecoderException && null == ChannelManager.getContextFromIdentified(ctx.channel())) {
                    return;
                }
                LOGGER.error("exceptionCaught:{}, channel:{}", cause.getMessage(), ctx.channel());
                super.exceptionCaught(ctx, cause);
            } finally {
                ChannelManager.releaseRpcContext(ctx.channel());
            }
        }

        /**
         * User event triggered.
         *当用户自定义事件被触发时被调用。
         * @param ctx the ctx
         * @param evt the evt
         * @throws Exception the exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                debugLog("idle:{}", evt);
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                if (idleStateEvent.state() == IdleState.READER_IDLE) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("channel:" + ctx.channel() + " read idle.");
                    }
                    handleDisconnect(ctx);
                    try {
                        closeChannelHandlerContext(ctx);
                    } catch (Exception e) {
                        LOGGER.error(e.getMessage());
                    }
                }
            }
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ctx + " will closed");
            }
            super.close(ctx, future);
        }

    }
  1. 查阅文档可知,channelRead的作用是当从通道读取到消息时被调用,用于处理接收到的消息。确定了,是从这里处理客户端发来的消息,跟进processMessage(ctx, (RpcMessage) msg);的源码AbstractNettyRemoting类中
  2. 简单解释一下这里的源码:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    //打印日志
    Object body = rpcMessage.getBody();
    // 检查消息体是否实现了MessageTypeAware接口,即具有消息类型
    if (body instanceof MessageTypeAware) {
        MessageTypeAware messageTypeAware = (MessageTypeAware) body;
        final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
        // 根据消息类型从processorTable中获取对应的处理器和执行器
        if (pair != null) {
            if (pair.getSecond() != null) {
                try {
                    // 在线程池中异步执行处理器逻辑
                    pair.getSecond().execute(() -> {
                        try {
                            // 调用处理器的process方法处理消息
                            pair.getFirst().process(ctx, rpcMessage);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        } finally {
                            MDC.clear();
                        }
                    });
                } catch (RejectedExecutionException e) {   
                    //打印日志
                    // 当线程池已满时,尝试生成线程堆栈信息
                }
            } else {
                // 直接在当前线程中执行处理器逻辑
            }
        } else {
            // 没有找到对应的处理器,记录错误日志
        }
    } else {
        // 消息体不是MessageTypeAware类型,记录错误日志
    }
}

  1. 跟进process的实现类,会发现一个RegRmProcesser类,大胆猜测在这里处理注册
  2. 跟进到onRegRmMessage方法,会看到ChannelManager.registerRMChannel(message, ctx.channel());,确定了,就是在这里注册,源码如下

image-20230611143900530

  1. 跟进registerRMChannel方法,源码如下:

image-20230611143712884

此时,RM已经注册完成,TM注册流程除注册细节实现不一致,其它流程基本一致,读者可自行查看

2. 全局事务拦截处理器执行主要流程

  1. 利用postman发送请求
  2. io.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler会拦截到方法并执行doInvoke方法

image-20230612193443223

  1. 如果开启了全局事务,则会进入这里处理全局事务逻辑

image-20230612193609622

  1. 跟进,会在transactionalTemplate.execute(new TransactionalExecutor(){...}方法中执行

image-20230612193754791

  1. io.seata.tm.api.TransactionalTemplate#execute做了很多事情,下列列出几个

    • 处理事务传播行为,如果没有配置,默认为SUPPORTS

    image-20230612193959117

    • 开启全局事务并执行业务逻辑,如果发生异常,则回滚

    image-20230612194057171

    • finally块中,做一些清理

    image-20230612194123936

  2. 在这里,与TC沟通,开启全局事务,下面的第三点

image-20230612215620583

  1. 执行业务逻辑,会被PrepareStatementProxy#execute方法拦截到,做一些具体处理,并作分支注册

image-20230612215939304

  1. 处理异常,需要回滚则回滚

image-20230612220013992

  1. 提交事务

image-20230612220021335

3. 如何绑定全局事务并返回xid

中间步骤省略,同第1步2-5

  1. 继续跟进到处理消息的方法ServerOnRequestProcessor(根据名称猜测这个最有可能是处理请求的实现)

  2. 跟进onRequestMessage(ctx, rpcMessage);继续跟进到ServerOnRequestProcessor类的onRequestMessage方法,源码如下:

private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    Object message = rpcMessage.getBody();
    RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());

    // 检查消息类型,如果不是AbstractMessage类型,则记录错误日志并返回
    if (!(message instanceof AbstractMessage)) {
        //...
    }

    // 处理批量发送的请求消息(MergedWarpMessage)
    if (message instanceof MergedWarpMessage) {
        // 检查是否开启了 TCServer 批量发送响应功能,并且客户端版本号大于等于 1.5.0
        if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
                && Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
            List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
            List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;

            // 遍历每个消息,并根据 PARALLEL_REQUEST_HANDLE 标志决定是否并行处理
            for (int i = 0; i < msgs.size(); i++) {
                AbstractMessage msg = msgs.get(i);
                int msgId = msgIds.get(i);

                if (PARALLEL_REQUEST_HANDLE) {
                    // 并行处理
                    CompletableFuture.runAsync(() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
                } else {
                    // 串行处理
                    handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
                }
            }
        } else {
            List<AbstractResultMessage> results = new ArrayList<>();
            List<CompletableFuture<AbstractResultMessage>> completableFutures = null;

            // 遍历每个消息,并根据 PARALLEL_REQUEST_HANDLE 标志决定是否并行处理
            for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
                if (PARALLEL_REQUEST_HANDLE) {
                    // 并行处理
                    if (completableFutures == null) {
                        completableFutures = new ArrayList<>();
                    }
                    int finalI = i;
                    completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(
                            ((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));
                } else {
                    // 串行处理
                    results.add(i, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
                }
            }

            if (CollectionUtils.isNotEmpty(completableFutures)) {
                // 等待并行处理的结果完成
                try {
                    for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
                        results.add(completableFuture.get());
                    }
                } catch (InterruptedException | ExecutionException e) {
                    LOGGER.error("handle request error: {}", e.getMessage(), e);
                }
            }

            // 构造 MergeResultMessage,将处理结果发送给客户端
            MergeResultMessage resultMessage = new MergeResultMessage();
            resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
        }
    } else {
        // 处理单个发送的请求消息
        final AbstractMessage msg = (AbstractMessage) message;

        if (LOGGER.isInfoEnabled()) {
            // 记录请求消息的日志
            String receiveMsgLog = String.format("receive msg[single]: %s, clientIp: %s, vgroup: %s", message,
                    NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
            BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
        }

        // 调用事务消息处理器处理请求消息,并获取处理结果
        AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);

        // 将处理结果发送给客户端
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);

        if (LOGGER.isInfoEnabled()) {
            // 记录处理结果的日志
            String resultMsgLog = String.format("result msg[single]: %s, clientIp: %s, vgroup: %s", result,
                    NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
            BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
        }
    }
}
  1. 应该会在completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage( ((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));这里串行处理消息,继续跟进到ServerOnRequestProcessor.java类的handleRequestsByMergedWarpMessage方法,看到上面的doc注释,放心了,果然是用来处理rpc请求的
  2. 继续跟进AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(subMessage, rpcContext);DefaultCoordinator.java类的onRequest,因为该方法的接口doc文档中写了On a request received.且是服务模块下的
  3. return transactionRequest.handle(context);跟进到GlobalBeginRequest类中的handle,再跟进到AbstractTCInboundHandler.javahandle
  4. 到这里,终于看见开启全局事务的希望了,doGlobalBegin(request, response, rpcContext);

image-20230611143401603

  1. 继续跟进core.begin

image-20230611143420242

  1. 此时,已经得到了xid了并开启全局事务了,那么,server是什么时候把xid返回给客户端的呢,请关注第3点的这几行
// 构造 MergeResultMessage,将处理结果发送给客户端
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
//在这里,将结果传递会TM
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
  1. 到这里,或许就有小伙伴有疑惑了,seata-server是什么时候将xid存储到rootcontext中的呢,代码中没有看见呀,其实是有的,在这里,我先给大家介绍一个知识点(见附),所以说,在第7步的MDC.put(RootContext.MDC_KEY_XID, session.getXid());行中就已经把xid存储到了RootContext中.

附:

MDC(Mapped Diagnostic Context)

MDC是日志框架(如log4j、logback)中的一个重要组件,用于在多线程环境下将上下文信息与日志事件相关联。MDC通过一个Map结构来存储和获取线程的上下文信息,这些信息可以被日志框架捕获并输出到日志中。MDC提供了一种方便的方式来跟踪、记录和区分不同线程中的日志信息。

XID(Transaction ID)

XID是Seata中的一个重要概念,代表分布式事务的全局唯一标识。在分布式事务中,每个参与者都需要使用相同的XID来关联和协调各自的事务操作。XID通常由一串字符串表示,可以是全局唯一的ID,例如UUID。

RootContext

RootContext是Seata中的一个工具类,它提供了操作当前线程上下文信息的静态方法。RootContext主要用于存储和获取当前线程的XID。通过RootContext,可以在不同的代码模块中方便地访问和传递XID,以便进行分布式事务的处理。

它们之间的关系

RootContext内部通过封装MDC的操作来实现XID的存储和获取。它定义了一个常量MDC_KEY_XID,用于指定在MDC中存储XID的键名。通过调用MDC.put(RootContext.MDC_KEY_XID, xid)方法,可以将XID存储到MDC中,并使用MDC_KEY_XID作为键名。然后,通过调用RootContext.getXID()方法,可以从MDC中获取当前线程的XID。

通过将XID存储到MDC中,并通过RootContext提供的方法获取,可以方便地在分布式事务处理中使用和传递XID。

综上所述,MDC是一个日志框架中的组件,用于在多线程环境下存储和获取上下文信息。XID是Seata中用于表示分布式事务全局唯一标识的概念。RootContext是Seata提供的工具类,通过封装MDC的操作,提供了方便的方式来存储和获取当前线程的XID。

4. 执行业务sql

首先介绍两个代理对象的概念:

StatementProxy

  • 当执行的 SQL 语句是普通的、不带占位符的语句时,可以使用 StatementProxy 进行执行。
  • 例如,执行的 SQL 语句是 SELECT * FROM table_name,没有占位符需要填充。

PreparedStatementProxy

  • 当执行的 SQL 语句是预编译语句,需要通过填充占位符来执行时,可以使用 PreparedStatementProxy 进行执行。
  • 例如,执行的 SQL 语句是 SELECT * FROM table_name WHERE column_name = ?,需要填充占位符的值。

区别:

  • StatementProxy 是用于执行普通的 SQL 语句,不进行预处理或编译,直接将完整的 SQL 语句发送到数据库执行。
  • PreparedStatementProxy 是用于执行预编译的 SQL 语句,通过占位符来表示参数的位置,可以多次执行并填充不同的参数值。
image-20230612220706669
  1. 这是Seata中的3个类,在执行业务sql时,PrepareStatementProxy会拦截,并到PrepareStatementProxy#execute执行image-20230612221109818

  2. 跟进image-20230612221142688

    1. 再继续跟进会进入io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy<S>, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object...)方法中

      1. 在这个方法中,会对需要执行的sql语句进行识别,并确定适合的执行器(根据数据库类型和进行的操作(如插入,删除))
      2. 确定执行器后,使用执行器来执行sql
      3. 源码如下:
          public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                           StatementProxy<S> statementProxy,
                                                           StatementCallback<T, S> statementCallback,
                                                           Object... args) throws SQLException {
              // 检查是否需要全局锁以及分支类型是否为AT
              if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
                  // 如果不需要全局锁且分支类型不是AT,则直接执行语句
                  return statementCallback.execute(statementProxy.getTargetStatement(), args);
              }
      
              // 从连接中获取数据库类型
              String dbType = statementProxy.getConnectionProxy().getDbType();
      
              // 检查是否提供了SQL识别器,否则使用SQLVisitorFactory创建识别器
              if (CollectionUtils.isEmpty(sqlRecognizers)) {
                  sqlRecognizers = SQLVisitorFactory.get(
                          statementProxy.getTargetSQL(),
                          dbType);
              }
      
              Executor<T> executor;
      
              // 根据SQL类型确定适当的执行器
              if (CollectionUtils.isEmpty(sqlRecognizers)) {
                  // 如果找不到SQL识别器,则使用PlainExecutor
                  executor = new PlainExecutor<>(statementProxy, statementCallback);
              } else {
                  if (sqlRecognizers.size() == 1) {
                      SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                      switch (sqlRecognizer.getSQLType()) {
                          case INSERT:
                              // 对于INSERT语句,使用InsertExecutor
                              executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                          new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                          new Object[]{statementProxy, statementCallback, sqlRecognizer});
                              break;
                          case UPDATE:
                              // 对于UPDATE语句,使用UpdateExecutor
                              if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                                  executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              } else {
                                  executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              }
                              break;
                          case DELETE:
                              // 对于DELETE语句,使用DeleteExecutor
                              if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                                  executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              } else {
                                  executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              }
                              break;
                          case SELECT_FOR_UPDATE:
                              // 对于SELECT FOR UPDATE语句,使用SelectForUpdateExecutor
                              if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                                  executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              } else {
                                  executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                              }
                              break;
                          case INSERT_ON_DUPLICATE_UPDATE:
                              // 根据数据库类型选择相应的执行器来处理INSERT ON DUPLICATE UPDATE语句
                              switch (dbType) {
                                  case JdbcConstants.MYSQL:
                                  case JdbcConstants.MARIADB:
                                      executor =
                                          new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                      break;
                                  default:
                                      throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                              }
                              break;
                          case UPDATE_JOIN:
                              // 对于MySQL中的UPDATE JOIN语句,使用MySQLUpdateJoinExecutor
                              switch (dbType) {
                                  case JdbcConstants.MYSQL:
                                      executor = new MySQLUpdateJoinExecutor<>(statementProxy,statementCallback,sqlRecognizer);
                                      break;
                                  default:
                                      throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
                              }
                              break;
                          default:
                              executor = new PlainExecutor<>(statementProxy, statementCallback);
                              break;
                      }
                  } else {
                      executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                  }
              }
      
              T rs;
              try {
                  rs = executor.execute(args);
              } catch (Throwable ex) {
                  if (!(ex instanceof SQLException)) {
                      // 将其他异常转换为SQLException
                      ex = new SQLException(ex);
                  }
                  throw (SQLException) ex;
              }
      
              return rs;
          }
      
    2. 在本例中,采用的时mysql的insert语句,所以我在源码中对insert的执行器加载进行了解释,有兴趣的同学可以去查看

  3. 跟进执行器执行的io.seata.rm.datasource.exec.BaseTransactionalExecutor#execute方法

image-20230612221915799

  1. 继续跟进执行具体业务的doExecute方法

image-20230612221950091

  1. 默认情况下,自动提交是开启的,但是我们需要将自动提交关闭再执行,这是为了保证通过在自动提交关闭的状态下执行业务 SQL,可以将多个 SQL 操作放在一个事务中,当所有操作都执行成功后,再通过手动提交事务来确保数据的一致性。如果其中任何一个操作失败,可以回滚整个事务,避免对数据造成部分修改。
  2. 我们先进executeAutoCommitTrue方法,源码如下:

image-20230612222241618

发现在这里还是通过executeAutoCommitFalse来执行业务sql

image-20230612223001039

  1. 获取执行前快照和执行后快照代码中有少量注释,这里就步解释了,可以看到,在这里执行了业务sql并存储了执行前后快照,接下来,执行提交

image-20230612223531178

  1. 跟进

image-20230612225554178

  1. 继续跟进

image-20230612225616528

  1. 继续跟进

image-20230612225813113

image-20230612225919888

分支注册流程如下第五步

在注册后,刷新undolog到数据库

5. 分支注册

在Seata中,Branch ID(分支ID)用于唯一标识一个分支事务。每个分支事务都有一个唯一的Branch ID。XID(事务ID)用于唯一标识一个全局事务,而Branch ID则用于标识全局事务下的分支事务。

分支事务是全局事务下的子事务,用于表示在分布式环境中的具体操作或资源。例如,一个全局事务可能涉及多个数据库操作,每个数据库操作都可以作为一个分支事务。分支事务在全局事务的协调下进行提交或回滚。

Branch ID的作用如下:

  1. 唯一标识分支事务:每个分支事务都有一个唯一的Branch ID,用于在全局事务中进行标识和关联。
  2. 事务状态管理:Seata使用Branch ID来管理分支事务的状态。每个分支事务都有自己的状态,例如已提交、已回滚或待提交等。
  3. 协调操作:在全局事务提交或回滚时,需要通过Branch ID来协调和执行相应的操作。全局事务的提交或回滚会涉及到所有分支事务的提交或回滚。
  1. 同绑定全局事务2-5
  2. DefaultCoordinatoronRequest中,继续跟进return transactionRequest.handle(context);,选择实现类BranchRegisterRequest.javahandle方法,继续跟进return handler.handle(this, rpcContext);
  3. 到了AbstractTCInboundHandler.java类中,看源码:

image-20230611143457919

  1. DefaultCoordinator.javadoBranchRegister方法,看来DefaultCoordinator.java这个类还是蛮重要滴

image-20230611143544374

  1. 继续跟进到io.seata.server.coordinator.DefaultCore#branchRegister方法,没啥内容,继续跟进到io.seata.server.coordinator.AbstractCore#branchRegister方法

image-20230611143605738

这就设计到AT模式的核心了,给分支事务加全局锁和本地锁(概念见下附),详见文档

  • 全局锁:确保该分支执行过程中不会受到其它分支干扰
  • 本地锁,确保分支事务在本地环境中的原子性和隔离性。

附:

  1. 全局锁:全局锁是用于保护全局事务的执行过程,确保全局事务的各个分支在执行过程中不会受到并发干扰。在分布式事务中,全局锁通常是由事务协调器(如 Seata 的 TC)管理和控制的。全局锁的作用是在分布式环境中协调各个分支事务的提交或回滚,保证全局事务的一致性。
  2. 本地锁:本地锁是用于保护单个分支事务的执行过程,确保分支事务在本地环境中的原子性和隔离性。在 AT 模式中,每个参与分布式事务的服务都会有自己的本地锁,用于保护对本地资源的访问。本地锁的作用是在分支事务内部控制本地资源的并发访问,防止并发冲突和数据不一致。

至此,分支事务注册完成,返回branchid也与xid返回类似,请读者自行查看

6. 全局提交

在第一个分支注册完成后,客户端会接着执行业务代码发现有远程调用,则去另一个微服务从犯执行业务sql和分支注册的流程,在执行完成后,提交分支事务并记录undolog,如果发生异常,则回滚,至此,业务逻辑完成,进行全局提交

image-20230612230542660

由于博主时机紧迫,这里就简单讲述一下接下来的流程,不再贴出源码了

  1. TM(GlobalTransactionRole.launcher发起GlobalCommitRequest请求,返回GlobalStatus,提交全局事务,其中有异常重试机制,默认5次
  2. TC接受到请求,在io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit执行提交
  3. 跟进到io.seata.server.coordinator.DefaultCore#commit,在这里,会去查全局锁的表global_table并获取对应的GlobalSession
@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // 查找全局会话
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        // 如果全局会话不存在,则表示事务已完成
        return GlobalStatus.Finished;
    }

    if (globalSession.isTimeout()) {
        // 如果全局会话已超时,则返回超时回滚状态
        LOGGER.info("TC detected timeout, xid = {}", globalSession.getXid());
        return GlobalStatus.TimeoutRollbacking;
    }

    // 加锁并执行状态变更操作
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        boolean shouldCommitNow = false;
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            // 首先关闭全局会话,之后不允许再注册分支事务
            globalSession.close();
            if (globalSession.canBeCommittedAsync()) {
                // 如果可以异步提交,则进行异步提交
                globalSession.asyncCommit();
                MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
            } else {
                // 修改全局事务状态为 Committing,表示需要立即提交
                globalSession.changeGlobalStatus(GlobalStatus.Committing);
                shouldCommitNow = true;
            }
            // 在成功修改状态后清理全局会话
            globalSession.clean();
        }
        return shouldCommitNow;
    });

    if (shouldCommit) {
        // 执行全局提交操作
        boolean success = doGlobalCommit(globalSession, false);
        // 如果成功并且剩余的分支事务可以异步提交,则进行异步提交
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            // 返回当前全局事务状态
            return globalSession.getStatus();
        }
    } else {
        // 如果状态为 AsyncCommitting,则表示已提交;否则返回当前全局事务状态
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}
  1. 上面代码块中,其实做了很多事情,我接着讲,首先进行异步提交,我们跟进globalSession.asyncCommit();
  2. 跟进到io.seata.server.storage.db.session.DataBaseSessionManager#updateGlobalSessionStatus,继续跟进到io.seata.server.storage.db.session.DataBaseSessionManager#updateGlobalSessionStatus ,这里会将global_table的status设为1
  3. 此时io.seata.server.coordinator.DefaultCoordinator#handleAsyncCommitting会处理AsyncCommiting任务,它会获取GlobalSession列表,如果GlobalStatus为AsyncCommiting,则先遍历BranchSession列表,提交分支事务,删除undolog释放全局锁再删除全局事务信息,删除global_table的信息
  4. 自此,基本流程走完了

由于时间比较赶,博主难免有很多疏漏之处,欢迎大家指出,另外,这里的异常处理已经回滚机制没有展现,大家有兴趣的同学可以自己去看看源码,参考这张图