项目实战
mall项目(SpringBoot项目)
1. 添加Swagger-UI配置,修改MyBatis Generator注释的生成规则
文档地址:https://www.macrozheng.com/mall/architect/mall_arch_02.html#给pmsbrandcontroller添加swagger注解
2. redis基础配置
获取验证码及检验验证码是否过期
3. SpringSecurity和JWT及HuTool使用
文档地址:https://www.macrozheng.com/mall/architect/mall_arch_04.html#在pom-xml中添加项目依赖
4. SpringTask实现定时任务
SpringTask是Spring自主研发的轻量级定时任务工具,相比于Quartz更加简单方便,且不需要引入其他依赖即可使用。该例以批量修改超时订单为例
文档地址:https://www.macrozheng.com/mall/architect/mall_arch_06.html#cron表达式
5. 整合Elasticsearch实现商品搜索
本文主要讲解mall整合Elasticsearch的过程,以实现商品信息在Elasticsearch中的导入、查询、修改、删除为例。
文档地址:https://www.macrozheng.com/mall/architect/mall_arch_07.html#推荐资料
6. MongoDB的使用
本文主要讲解mall整合Mongodb的过程,以实现商品浏览记录在Mongodb中的添加、删除、查询为例。
文档地址:https://www.macrozheng.com/mall/architect/mall_arch_08.html#项目使用框架介绍
7. RabbitMQ实现延迟消息
本文主要讲解mall整合RabbitMQ实现延迟消息的过程,以发送延迟消息取消超时订单为例。
文档地址:https://www.macrozheng.com/mall/architect/mall_arch_09.html#项目使用框架介绍
8. springboot项目整合OSS实现文件上传
本文主要讲解mall整合OSS实现文件上传的过程,采用的是服务端签名后前端直传的方式。
文档地址:https://www.macrozheng.com/mall/architect/mall_arch_10.html#minio
JiuYouのBlog项目
1. 自定义注解
文档地址:https://blog.csdn.net/qq_62656514/article/details/129645806?spm=1001.2014.3001.5501
2. 对某些 API 进行切面处理的情况,比如日志记录、权限验证等。
文档地址:https://blog.csdn.net/qq_62656514/article/details/129645806?spm=1001.2014.3001.5501
源码学习
spring源码
1. AOP
AOP(Aspect-Oriented Programming)即面向切面编程,是一种编程范式,用于将横切关注点(例如日志记录、安全性、事务管理等)与应用程序的业务逻辑分离开来,以提高代码的可重用性和可维护性。
1. 在Spring中,AOP有以下三种实现方式:
- 基于JDK动态代理的AOP实现:针对实现了接口的类,Spring使用JDK动态代理生成代理对象,并通过代理对象拦截方法调用,实现AOP功能。
- 基于CGLIB的AOP实现:针对没有实现接口的类,Spring使用CGLIB动态字节码技术生成代理对象,并通过代理对象拦截方法调用,实现AOP功能。相对于JDK动态代理,CGLIB在生成代理对象时更加灵活,但是性能方面会稍微差一些。
- 基于AspectJ的AOP实现:AspectJ是一个独立的AOP框架,Spring可以通过与AspectJ的集成来实现AOP功能。AspectJ支持更加丰富的切点表达式和切面类型,可以实现更加复杂的AOP功能。
代理的解释:
在 Spring AOP 中,代理是用来增强目标对象的行为的。代理对象会拦截目标对象的方法调用,并在方法调用前、后或抛出异常时执行一些特定的逻辑,例如记录日志、安全检查、性能监控等。代理对象可以在不修改目标对象源代码的情况下实现这些增强行为,从而提高代码的可维护性和灵活性。
Spring AOP 中采用的代理模式是基于接口的 JDK 动态代理和基于类的 CGLIB 代理。当目标对象实现了至少一个接口时,Spring AOP 将使用 JDK 动态代理来创建代理对象;否则,Spring AOP 将使用 CGLIB 代理来创建代理对象。
需要注意的是,代理对象并不是目标对象的真正子类,而是通过动态生成的字节码来创建的新的对象。因此,在使用代理时,需要注意代理对象不会继承目标对象的非 public 方法和 final 方法的行为。
2. 在一个项目中,使用AOP可以提高代码的可重用性和可维护性,下面是使用AOP的一般步骤:
- 定义切面类:切面类是一个包含切点和增强逻辑的类。在切面类中,需要定义切点,即需要拦截的目标方法,并在切点周围织入增强逻辑。
- 配置AOP:在Spring配置文件中,需要配置AOP相关的内容,包括切面类、切点、增强类型等。
- 定义目标类:在项目中定义需要增强的目标类,即需要被切面类拦截的类。
- 配置目标类:在Spring配置文件中,需要配置目标类的相关内容,包括类名、属性、方法等。
- 使用AOP:在项目中使用AOP,即调用目标类的方法时,切面类会拦截方法调用,并在方法执行前、执行后或执行过程中织入增强逻辑。
使用AOP可以提高代码的可重用性和可维护性,但需要注意以下几点:
- 尽量减少切面的数量:过多的切面会增加代码的复杂度和维护成本,影响系统的性能。
- 定义合适的切点:合适的切点可以减少不必要的拦截和增强,提高系统的性能。
- 避免使用过多的环绕通知:过多的环绕通知会影响系统的性能,应该尽量避免使用。
- 注意切面和目标类的依赖关系:切面和目标类之间的依赖关系可能会影响代码的可维护性和可测试性,需要注意处理。
3. AOP能够增强某些方法,这里的增强指的是什么?
在AOP中,代理可以增强某些方法,具体指的是在目标方法执行前、执行后或执行过程中,通过织入切面逻辑来实现对目标方法的增强。
代理的增强通常包括以下几个方面:
- 日志记录:记录方法的参数、返回值、执行时间等信息,以便后续的调试和排查问题。
- 安全性控制:对方法进行权限验证,确保只有具有特定权限的用户才能调用该方法。
- 缓存控制:在方法执行前检查缓存中是否有目标数据,如果有,则直接返回缓存中的数据,避免重复执行方法。
- 事务管理:在方法执行前开启事务,在方法执行后根据执行结果决定是否提交或回滚事务。
- 性能监控:记录方法执行的时间、调用次数等信息,以便分析系统的性能瓶颈并进行优化。
通过代理增强方法,可以将这些横切关注点从业务逻辑中分离出来,避免代码的重复和耦合,提高代码的可重用性和可维护性。
在模拟时加深了对接口的理解
使用接口的好处主要有以下几个方面:
- 实现多态:使用接口可以实现多态,使得程序更加灵活和可扩展。
- 抽象出公共行为:接口可以抽象出公共行为,将其定义在接口中,在不同的实现类中实现这些行为,从而提高代码的可重用性和可维护性。
- 解耦合:使用接口可以将程序的各个组成部分解耦合,使得程序更加模块化和易于维护。
举例:
- 定义一个接口Interface Test1,存在一个invoke方法
- 在另外一个类Test2中调用这里的invoke()方法(在Test2中创建一个Test1的成员变量并编写构造方法)
- 在main中创建一个Test2对象,并通过Test2的构造方法,重写其中的invoke()方法来实现程序的灵活性
//1.
interface InvocationHandler {
void invoke();
}
//2.
public class $Proxy0 implements A12.Foo {
private InvocationHandler h;
public $Proxy0(InvocationHandler h) {
this.h = h;
}
@Override
public void foo() {
h.invoke();
}
}
//3.
public static void main(String[] args) {
new $Proxy0(new InvocationHandler() {
@Override
public void invoke() {
System.out.println("需要实现的其它功能");
System.out.println("foo");
}
}).foo();
}
2. SpringMvc
1. 如何通过配置文件批量读取配置
- 新建
xxxxProperties类,加上注解@ConfigurationProperties(prefix="spring.mvc",这里是以WebMvcProperties为例,在其中添加想要读取的变量,与配置文件的名称相互对应,这样就可以通过WebMvcProperties来批量读取配置.
2. 不直接返回Response对象而是通过配置返回Response对象
背景:
在crud中,我们经常需要在控制器Controller中返回固定的json格式响应体如:
{ code:"", msg:"", data:{} }一般来说,我们是直接在控制器方法中返回自定义的Response类来实现的,那么,如何直接返回对象而把对象通过配置直接包装到data中呢?
利用@ControllerAdvice来实现
@ControllerAdvice
static class MyControllerAdvice implements ResponseBodyAdvice<Object> {
// 满足条件才转换
public boolean supports(MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
if (returnType.getMethodAnnotation(ResponseBody.class) != null ||
AnnotationUtils.findAnnotation(returnType.getContainingClass(), ResponseBody.class) != null) {
// returnType.getContainingClass().isAnnotationPresent(ResponseBody.class)) {
return true;
}
return false;
}
// 将 User 或其它类型统一为 Result 类型
public Object beforeBodyWrite(Object body, MethodParameter returnType, MediaType selectedContentType, Class<? extends HttpMessageConverter<?>> selectedConverterType, ServerHttpRequest request, ServerHttpResponse response) {
if (body instanceof Result) {
return body;
}
return Result.ok(bod y);
}
}
而此方法而可以用作将一些其它类转化为某一些特定的类.
3.DispatcherServlet的作用
DispatcherServlet是Spring框架中的一个重要组件,它是Web应用程序中的前置控制器(Front Controller),主要作用是将客户端的请求分发给相应的处理器(Controller)进行处理,以及将处理器的处理结果分发给视图(View)进行渲染。
具体来说,当客户端发起一个请求时,DispatcherServlet会拦截该请求,根据请求的URL和HTTP方法,将其分发给对应的HandlerMapping找到对应的HandlerAdapter,并调用HandlerAdapter的方法来执行相应的处理器方法。处理器方法会根据请求参数、路径变量等信息进行处理,并返回一个ModelAndView对象,其中包含了需要呈现的模型数据以及对应的视图名。DispatcherServlet再将这个ModelAndView对象传递给对应的ViewResolver,进行视图解析和渲染,最终将结果发送给客户端。
除此之外,DispatcherServlet还提供了一些其他的功能,例如:
- 支持文件上传和下载
- 支持国际化
- 支持Flash属性- 支持异步请求和响应
- 支持Spring Security等安全框架的集成
4.Tomcat异常处理
-
我们知道 @ExceptionHandler 只能处理发生在 mvc 流程中的异常,例如控制器内、拦截器内,那么如果是 Filter 出现了异常,如何进行处理呢?
-
在 Spring Boot 中,是这么实现的:
- 因为内嵌了 Tomcat 容器,因此可以配置 Tomcat 的错误页面,Filter 与 错误页面之间是通过请求转发跳转的,可以在这里做手脚
- 先通过 ErrorPageRegistrarBeanPostProcessor 这个后处理器配置错误页面地址,默认为
/error也可以通过${server.error.path}进行配置 - 当 Filter 发生异常时,不会走 Spring 流程,但会走 Tomcat 的错误处理,于是就希望转发至
/error这个地址- 当然,如果没有 @ExceptionHandler,那么最终也会走到 Tomcat 的错误处理
- Spring Boot 又提供了一个 BasicErrorController,它就是一个标准 @Controller,@RequestMapping 配置为
/error,所以处理异常的职责就又回到了 Spring - 异常信息由于会被 Tomcat 放入 request 作用域,因此 BasicErrorController 里也能获取到
- 具体异常信息会由 DefaultErrorAttributes 封装好
- BasicErrorController 通过 Accept 头判断需要生成哪种 MediaType 的响应
- 如果要的不是 text/html,走 MessageConverter 流程
- 如果需要 text/html,走 mvc 流程,此时又分两种情况
- 配置了 ErrorViewResolver,根据状态码去找 View
- 没配置或没找到,用 BeanNameViewResolver 根据一个固定为 error 的名字找到 View,即所谓的 WhitelabelErrorView
评价
- 一个错误处理搞得这么复杂,就问恶心不?
演示1 - 错误页处理
关键代码
@Bean // ⬅️修改了 Tomcat 服务器默认错误地址, 出错时使用请求转发方式跳转
public ErrorPageRegistrar errorPageRegistrar() {
return webServerFactory -> webServerFactory.addErrorPages(new ErrorPage("/error"));
}
@Bean // ⬅️TomcatServletWebServerFactory 初始化前用它增强, 注册所有 ErrorPageRegistrar
public ErrorPageRegistrarBeanPostProcessor errorPageRegistrarBeanPostProcessor() {
return new ErrorPageRegistrarBeanPostProcessor();
}
收获?
- Tomcat 的错误页处理手段
演示2 - BasicErrorController
关键代码
@Bean // ⬅️ErrorProperties 封装环境键值, ErrorAttributes 控制有哪些错误信息
public BasicErrorController basicErrorController() {
ErrorProperties errorProperties = new ErrorProperties();
errorProperties.setIncludeException(true);
return new BasicErrorController(new DefaultErrorAttributes(), errorProperties);
}
@Bean // ⬅️名称为 error 的视图, 作为 BasicErrorController 的 text/html 响应结果
public View error() {
return new View() {
@Override
public void render(
Map<String, ?> model,
HttpServletRequest request,
HttpServletResponse response
) throws Exception {
System.out.println(model);
response.setContentType("text/html;charset=utf-8");
response.getWriter().print("""
<h3>服务器内部错误</h3>
""");
}
};
}
@Bean // ⬅️收集容器中所有 View 对象, bean 的名字作为视图名
public ViewResolver viewResolver() {
return new BeanNameViewResolver();
}
收获?
- Spring Boot 中 BasicErrorController 如何工作
5. SpringBoot内嵌Tomcat服务器
Server
└───Service
├───Connector (协议, 端口)
└───Engine
└───Host(虚拟主机 localhost)
├───Context1 (应用1, 可以设置虚拟路径, / 即 url 起始路径; 项目磁盘路径, 即 docBase )
│ │ index.html
│ └───WEB-INF
│ │ web.xml (servlet, filter, listener) 3.0
│ ├───classes (servlet, controller, service ...)
│ ├───jsp
│ └───lib (第三方 jar 包)
└───Context2 (应用2)
│ index.html
└───WEB-INF
web.xml
6. 自定义注解使用
详见spring源码A42_2
例如,需要实现一个自定义注解,传入类名和一个布尔值,判断是否存在该bean
- 定义注解
@Retention(RetentionPolicy.RUNTIME)//在运行时
@Target({ElementType.METHOD, ElementType.TYPE})//前者指定可以在方法上使用,后者则是可以在类上使用
@Conditional(MyCondition.class)//表示被标注的元素是否存在依赖于 MyCondition 类的判断条件
@interface ConditionalOnClass {
boolean exists(); // true 判断存在 false 判断不存在
String className(); // 要判断的类名
}
@Conditional(MyCondition.class)这个注解表示被标注的元素是否存在依赖于MyCondition类的判断条件。它用于根据某些条件有条件地启用或禁用被注解的元素。当
MyCondition类的matches()方法返回true时,被标注的元素将被启用,而当它返回false时,元素将被禁用。在
@ConditionalOnClass的情况下,它用于根据类路径中是否存在指定的类来有条件地启用或禁用类或方法。如果exists属性设置为true,则表示只有当类路径中存在指定的类时,被注解的类或方法才会被启用。如果它设置为false,则表示只有当类路径中不存在指定的类时,被注解的类或方法才会被启用。className属性用于指定要检查存在性的类的完全限定名称。
- 实现MyCondition.class
static class MyCondition implements Condition { // 存在 Druid 依赖
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Map<String, Object> attributes = metadata.getAnnotationAttributes(ConditionalOnClass.class.getName());
String className = attributes.get("className").toString();
boolean exists = (boolean) attributes.get("exists");
boolean present = ClassUtils.isPresent(className, null);
return exists ? present : !present;
}
}
Seata源码
建议先看完官方文档的用户文档及开发者指南部分
各事务模式(AT,TCC,SAGA,XA)
具体介绍详见官方文档
领域模型(TM,RM,TC)
具体介绍详见官方文档
Metrics设计
具体介绍详见官方文档
Manual Transaction 模式
一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
- 一阶段 prepare 行为
- 二阶段 commit 或 rollback 行为
根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.
AT 模式(参考链接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:
- 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
- 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
- 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。
相应的,MT 模式,不依赖于底层数据资源的事务支持:
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 MT 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
[1].https://seata.io/zh-cn/blog/manual-transaction-mode.html
Seata是如何实现领域模型的
本文主要内容为Seata代码的理解
项目地址
这里的代码可能与最新版本的Seata有些区别,博主也借鉴了一些其它人的博客
Seata的TXC模型

TXC的实现通过三个组件来完成。也就是上图的三个深黄色部分,其作用如下:
- TM:全局事务管理器,在标注开启fescar分布式事务的服务端开启,并将全局事务发送到TC事务控制端管理
- TC:事务控制中心,控制全局事务的提交或者回滚。这个组件需要独立部署维护,目前只支持单机版本,后续迭代计划会有集群版本
- RM:资源管理器,主要负责分支事务的上报,本地事务的管理
实现过程:
- 服务起始方发起全局事务并注册到TC。
- 在调用协同服务时,协同服务的事务分支事务会先完成阶段一的事务提交或回滚,并生成事务回滚的undo_log日志,同时注册当前协同服务到TC并上报其事务状态,归并到同一个业务的全局事务中。
- 此时若没有问题继续下一个协同服务的调用,期间任何协同服务的分支事务回滚,都会通知到TC,TC在通知全局事务包含的所有已完成一阶段提交的分支事务回滚。
- 如果所有分支事务都正常,最后回到全局事务发起方时,也会通知到TC,TC在通知全局事务包含的所有分支删除回滚日志。
- 在这个过程中为了解决写隔离和度隔离的问题会涉及到TC管理的全局锁。
本博文的目标是深入代码细节,探究其基本思路是如何实现的。
有大佬画了一副流程图,我贴在这里:https://www.processon.com/view/link/6007f5c00791294a0e9b611a
项目结构解析
项目拉下来,用IDE打开后的目录结构如下,下面先大致的看下每个模块的实现

- all:这是一个聚合模块,它包含了所有其他模块的依赖,用于方便构建和部署整个Seata系统。
- bom:这是一个Bill of Materials(BOM)模块,它定义了Seata的版本和所需的所有依赖库的版本,以确保它们在构建和部署时保持一致。
- build:这个模块包含了Seata的构建脚本和相关的工具,用于构建Seata项目。
- common:这个模块包含了Seata中通用的工具类和功能,例如序列化、反序列化、日志处理,常用辅助类,静态变量、扩展机制类加载器、以及定义全局的异常等。
- compressor:这个模块实现了Seata中的数据压缩功能,用于在分布式事务中减少网络传输的数据量。
- config:这个模块提供了Seata的配置管理功能,包括读取和解析配置文件,以及提供配置信息的API。
- console:这个模块是Seata的控制台模块,提供了一个可视化的管理控制台,用于监控和管理Seata的分布式事务。
- core:核心模块主要封装了TM、RM和TC通讯用RPC相关内容。
- dependencies:这个模块定义了Seata的所有依赖库的版本和引用,用于管理和控制Seata的依赖关系。
- discovery:这个模块提供了服务发现和注册功能,用于在分布式环境中定位和连接Seata的各个组件。
- distribution:这个模块包含了Seata的发布版本,用于方便用户下载和使用。
- integration-tx-api:这个模块提供了与Spring等框架集成的相关API,用于在应用程序中使用Seata的分布式事务功能。
- metrics:这个模块实现了Seata的性能指标收集和监控功能,用于统计和展示Seata的性能数据。
- rm:这个模块是Seata的资源管理器(Resource Manager,RM)模块,负责管理和协调分布式事务中的资源。
- rm-datasource:这个模块提供了Seata与不同数据源的集成支持,例如MySQL、Oracle等数据库。
- saga:这个模块实现了Seata的Saga模式,用于处理长时间跨越多个服务的分布式事务。
- seata-spring-autoconfigure:这个模块提供了Seata在Spring Boot环境中自动配置的功能,简化了Seata的集成和配置过程。
- seata-spring-boot-starter:这个模块是Seata在Spring Boot应用中使用的起步依赖模块,方便用户快速集成和使用Seata。
- serializer:这个模块提供了Seata的序列化和反序列化功能,用于在分布式事务中对对象进行序列化和传输。
- server:这个模块是Seata的服务器模块,提供了Seata服务的启动、关闭和管理功能,同时维护全局锁。
- spring:这个模块提供了Seata在Spring框架中的集成支持,例如事务管理器、AOP切面等,是研究Seata的突破口。
- sqlparser:这个模块实现了Seata的SQL解析功能,用于解析和处理分布式事务中的SQL语句。
- tcc:这个模块实现了Seata的TCC(Try-Confirm-Cancel)模式,用于处理分布式事务中的两阶段提交逻辑。
- test:这个模块包含了Seata的测试代码和工具,用于进行单元测试和集成测试。
- tm:这个模块是Seata的事务管理器(Transaction Manager,TM)模块,负责管理和协调分布式事务的执行和提交,全局事务事务管理模块,管理全局事务的边界,全局事务开启回滚点都在这个模块控制。
- changes:这个模块包含了Seata的变更日志文件,用于记录版本之间的更新和变更。
- ext:这个模块是Seata的扩展模块,提供了一些额外的功能和扩展点,可以通过该模块进行自定义扩展。
- integration:这个模块包含了Seata与其他框架和组件的集成支持,例如Dubbo、RocketMQ等。
- script:这个模块包含了Seata的数据库脚本文件,用于创建和初始化Seata所需的数据库表结构。
- seata-plugin:这个模块是Seata的插件模块,用于支持Seata在不同平台和框架下的插件机制,方便扩展和定制。
- sessionStore:这个模块提供了Seata的会话存储功能,用于存储和管理分布式事务的会话信息。
- style:这个模块定义了Seata的代码风格和规范,包括代码格式化、命名规范等,用于保持整个项目的一致性和可读性。
以上介绍可能有误,欢迎指正补充
Seata事务过程分析
初始化
- RM、TM的初始化都是由类GlobalTransactionScanner触发的
- TM的初始化:
- 当Seata服务器(server)启动时,会初始化TM实例。TM负责管理和协调分布式事务的执行和提交。
- TM会向注册中心(Registry Center)注册自己的信息,以便TC可以找到并与其通信。
- TM还会启动一个定时任务,定期向TC发送心跳消息,以保持与TC的连接和状态同步。
- RM的初始化:
- 每个参与分布式事务的RM都需要在自己的应用中进行初始化。
- RM实例负责管理和操作本地资源,并与TM和TC进行通信。
- RM在初始化时会向注册中心注册自己的信息,以便TC可以找到并与其通信。
- RM还会订阅与自己相关的事务信息,以便接收TC发送的事务消息和指令。
Seata的事务分析在官网的用户文档的Api支持,微服务框架支持以及附录的事务隔离中解释的非常清楚,这里仅补充博主不太理解的部分,其中重点关注事务隔离,它将事务讲的非常清晰
1. RpcContext和RootContext异同
博主初次看见RpcContext和RootContext时觉得很迷,特此比较了一下它们的异同
RpcContext是在分布式事务框架Seata中用于封装和管理与远程服务通信的上下文信息的对象。它存储了与远程服务通信相关的各种属性和状态,如客户端角色、版本、应用程序ID、事务服务组、资源集合等。RpcContext还提供了方法来持有和释放上下文信息,并在不同的通道和持有者之间进行关联。通过RpcContext,可以在分布式事务的执行过程中传递和共享上下文信息,以实现分布式事务的管理和协调。
RootContext是Seata分布式事务框架中的一个类,用于管理全局事务的上下文信息。它是一个线程级别的上下文,用于在分布式事务的不同参与者之间传递和共享全局事务ID和分支事务ID。RootContext提供了静态方法来设置和获取全局事务ID和分支事务ID,并通过ThreadLocal来保证在同一线程中的全局事务上下文的一致性。
区别:
- 功能不同:RpcContext主要用于管理与远程服务通信相关的上下文信息,而RootContext用于管理全局事务的上下文信息。
- 作用范围不同:RpcContext的作用范围是在远程服务通信过程中,用于传递和共享上下文信息;而RootContext的作用范围是在整个分布式事务执行过程中,用于传递和共享全局事务上下文信息。
- 存储内容不同:RpcContext存储与远程服务通信相关的属性和状态,如客户端角色、版本、应用程序ID、事务服务组、资源集合等;而RootContext主要存储全局事务ID和分支事务ID。
- 使用方式不同:RpcContext通过实例对象进行操作,而RootContext是通过静态方法直接操作全局事务上下文。
其中:
RootContext:事务的根上下文:负责在应用的运行时,维护 XID 。
RpcContext:RpcContext的作用是在分布式事务框架Seata中,用于封装和管理与远程服务通信的上下文信息。它存储了与远程服务通信相关的各种属性和状态,如客户端角色、版本、应用程序ID、事务服务组、资源集合等。RpcContext还提供了方法来持有和释放上下文信息,并在不同的通道和持有者之间进行关联。通过RpcContext,可以在分布式事务的执行过程中传递和共享上下文信息,以实现分布式事务的管理和协调。
/* * RpcContext是Seata中用于在分布式环境中传递事务上下文信息的工具类。 * 它负责存储和传递各个参与者服务的事务上下文信息,并提供了一些方法来操作上下文信息。 * RpcContext对象与具体的服务实例和线程绑定,每个线程在执行业务逻辑时都可以访问和修改自己的RpcContext对象。 * RpcContext的作用主要有以下几点: * * 1. 存储上下文信息: * - applicationId:应用程序ID * - transactionServiceGroup:事务分组名称 * - clientId:客户端ID * - channel:与远程服务通信的Netty Channel对象 * - resourceSets:参与者服务涉及的资源集合 * * 2. 传递上下文信息: * - 通过holdInClientChannels()方法,将RpcContext存储在客户端的通道中,用于传递给事务管理器(TM) * - 通过holdInIdentifiedChannels()方法,将RpcContext存储在标识的通道中,用于传递给资源管理器(RM) * - 通过holdInResourceManagerChannels()方法,将RpcContext存储在资源管理器的通道中,用于传递给资源管理器(RM) * * 3. 释放上下文信息: * - 通过release()方法,释放RpcContext对象及其关联的资源,清空持有的上下文信息 * * 4. 操作上下文信息: * - 通过addResource()方法,向resourceSets中添加资源 * - 通过addResources()方法,向resourceSets中添加多个资源 * - 通过getPortMap()方法,获取指定资源ID对应的端口映射 * - 通过getClientRMHolderMap()方法,获取客户端资源管理器(RM)持有者的映射表 * * RpcContext在Seata的分布式事务中起到了传递上下文和协调服务之间的作用,确保分布式事务的一致性和可靠性。 */ public class RpcContext { private static final Logger LOGGER = LoggerFactory.getLogger(RpcContext.class); private NettyPoolKey.TransactionRole clientRole; // 客户端角色 private String version; // 版本 private String applicationId; // 应用程序ID private String transactionServiceGroup; // 事务分组名称 private String clientId; // 客户端ID private Channel channel; // 与远程服务通信的Netty Channel对象 private Set<String> resourceSets; // 参与者服务涉及的资源集合 private ConcurrentMap<Channel, RpcContext> clientIDHolderMap; // 客户端ID持有者的映射表 private ConcurrentMap<Integer, RpcContext> clientTMHolderMap; // 客户端事务管理器(TM)持有者的映射表 private ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientRMHolderMap; // 客户端资源管理器(RM)持有者的映射表 // 释放上下文信息 public void release() { Integer clientPort = ChannelUtil.getClientPortFromChannel(channel); if (clientIDHolderMap != null) { clientIDHolderMap = null; } if (clientRole == NettyPoolKey.TransactionRole.TMROLE && clientTMHolderMap != null) { clientTMHolderMap.remove(clientPort); clientTMHolderMap = null; } if (clientRole == NettyPoolKey.TransactionRole.RMROLE && clientRMHolderMap != null) { for (Map<Integer, RpcContext> portMap : clientRMHolderMap.values()) { portMap.remove(clientPort); } clientRMHolderMap = null; } if (resourceSets != null) { resourceSets.clear(); } } // 在客户端通道中存储RpcContext public void holdInClientChannels(ConcurrentMap<Integer, RpcContext> clientTMHolderMap) { if (this.clientTMHolderMap != null) { throw new IllegalStateException(); } this.clientTMHolderMap = clientTMHolderMap; Integer clientPort = ChannelUtil.getClientPortFromChannel(channel); this.clientTMHolderMap.put(clientPort, this); } // 在标识的通道中存储RpcContext public void holdInIdentifiedChannels(ConcurrentMap<Channel, RpcContext> clientIDHolderMap) { if (this.clientIDHolderMap != null) { throw new IllegalStateException(); } this.clientIDHolderMap = clientIDHolderMap; this.clientIDHolderMap.put(channel, this); } // 在资源管理器的通道中存储RpcContext public void holdInResourceManagerChannels(String resourceId, ConcurrentMap<Integer, RpcContext> portMap) { if (this.clientRMHolderMap == null) { this.clientRMHolderMap = new ConcurrentHashMap<>(); } Integer clientPort = ChannelUtil.getClientPortFromChannel(channel); portMap.put(clientPort, this); this.clientRMHolderMap.put(resourceId, portMap); } // 在资源管理器的通道中存储RpcContext public void holdInResourceManagerChannels(String resourceId, Integer clientPort) { if (this.clientRMHolderMap == null) { this.clientRMHolderMap = new ConcurrentHashMap<>(); } ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(clientRMHolderMap, resourceId, key -> new ConcurrentHashMap<>()); portMap.put(clientPort, this); } // 获取客户端资源管理器(RM)持有者的映射表 public ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> getClientRMHolderMap() { return clientRMHolderMap; } // 获取指定资源ID对应的端口映射 public Map<Integer, RpcContext> getPortMap(String resourceId) { return clientRMHolderMap.get(resourceId); } // 获取客户端ID public String getClientId() { return clientId; } // 获取通道 public Channel getChannel() { return channel; } // 设置通道 public void setChannel(Channel channel) { this.channel = channel; } // 获取应用程序ID public String getApplicationId() { return applicationId; } // 设置应用程序ID public void setApplicationId(String applicationId) { this.applicationId = applicationId; } // 获取事务服务组 public String getTransactionServiceGroup() { return transactionServiceGroup; } // 设置事务服务组 public void setTransactionServiceGroup(String transactionServiceGroup) { this.transactionServiceGroup = transactionServiceGroup; } // 获取客户端角色 public NettyPoolKey.TransactionRole getClientRole() { return clientRole; } // 设置客户端角色 public void setClientRole(NettyPoolKey.TransactionRole clientRole) { this.clientRole = clientRole; } // 获取版本 public String getVersion() { return version; } // 设置版本 public void setVersion(String version) { this.version = version; } // 获取参与者服务涉及的资源集合 public Set<String> getResourceSets() { return resourceSets; } // 设置参与者服务涉及的资源集合 public void setResourceSets(Set<String> resourceSets) { this.resourceSets = resourceSets; } // 添加资源 public void addResource(String resource) { if (StringUtils.isBlank(resource)) { return; } if (resourceSets == null) { this.resourceSets = new HashSet<String>(); } this.resourceSets.add(resource); } // 添加资源集合 public void addResources(Set<String> resources) { if (resources == null) { return; } if (resourceSets == null) { this.resourceSets = new HashSet<String>(); } this.resourceSets.addAll(resources); } // 设置客户端ID public void setClientId(String clientId) { this.clientId = clientId; } }
2. 看TransactionPropagationFilter 的源码可能有些不太理解,这里以一个下订单的例子进行讲述(源码太长,这里不贴出来了)
- 用户调用 OrderService 的下订单接口,发起一个订单创建请求。
- OrderService 在处理请求之前,会通过代码中的
RootContext.getXID()获取当前的全局事务 ID(XID)。- 如果当前存在全局事务 ID(XID),说明该请求是在一个分布式事务中发起的。OrderService 将把 XID 设置到 RpcContext 的附件中,以便传递给下游的服务提供方(InventoryService)。
- 如果当前不存在全局事务 ID(XID),但在请求的 RpcContext 附件中存在 XID,说明该请求是从上游传递过来的。OrderService 将从 RpcContext 的附件中获取 XID,并将其绑定到当前运行时的事务上下文(RootContext)中。
- 接下来,OrderService 执行业务逻辑,比如创建订单。
- 一旦业务逻辑执行完毕,会调用下游的服务提供方 InventoryService 来减少商品库存。
- 在调用 InventoryService 之前,事务传播过滤器会将当前的全局事务 ID(XID)设置到 RpcContext 的附件中,确保 InventoryService 可以获取到正确的事务上下文。
- 接着,OrderService 执行
invoker.invoke(invocation)调用,即调用 InventoryService 的减少库存方法。- InventoryService 收到请求后,会从 RpcContext 的附件中获取传递过来的全局事务 ID(XID),以确保在减少库存的操作中参与到正确的分布式事务中。
- 一旦库存操作完成,返回结果给 OrderService。
- 在 finally 块中,事务传播过滤器会解绑当前绑定的 XID,并进行清理工作。
- 如果解绑后的 XID 与传递过来的 XID 不一致,会发出警告。这种情况可能发生在调用过程中有新的事务上下文开启,导致 XID 发生变化。为了保持事务的一致性,事务传播过滤器会重新将解绑后的 XID 绑定到事务上下文中。
3. 看AbstractConnectionProxy 的源码,仍以下订单举例
这段代码是 Seata 在处理数据库连接的代理逻辑,其中涉及到了分支事务的类型判断和 PreparedStatement 的创建。
假设有一个下订单的例子,包含以下 SQL 语句:
INSERT INTO orders (order_id, customer_id, total_amount) VALUES (?, ?, ?)现在假设该订单的分支事务类型是 AT(原子事务)。在执行该 SQL 语句时,以下是代码的执行逻辑:
首先,通过调用
RootContext.getBranchType()获取当前的分支事务类型。如果分支事务类型是 AT(
BranchType.AT),表示当前事务是原子事务,即不需要关注主键的返回情况。接下来,使用 SQL 解析器(
SQLRecognizer)来解析 SQL 语句,获取表名和 SQL 类型。在这个例子中,SQL 类型是 INSERT,所以需要检查表的元数据(
TableMeta)来获取主键信息。如果表的元数据中包含主键信息,说明该表有自动生成的主键,需要使用带有主键参数的
prepareStatement()方法创建PreparedStatement对象。如果表的元数据中没有主键信息,或者 SQL 解析过程中出现异常,将使用普通的
prepareStatement()方法创建PreparedStatement对象。最后,返回一个
PreparedStatementProxy对象,该对象封装了原始PreparedStatement对象和相关的代理逻辑。通过以上逻辑,Seata 可以根据不同的分支事务类型和 SQL 语句类型,在创建
PreparedStatement时进行特定的处理。在这个例子中,当分支事务类型为 AT 且 SQL 类型为 INSERT 时,会使用带有主键参数的prepareStatement()方法创建PreparedStatement对象。注意:以上仅是对代码逻辑的分析和假设的情况。实际的执行结果可能会根据具体的代码实现和配置进行调整和变化。
4. ExecuteTemplate 源码
public class ExecuteTemplate { /** * 执行数据库操作 * * @param statementProxy StatementProxy 对象 * @param statementCallback StatementCallback 对象 * @param args 参数列表 * @param <T> 返回结果类型 * @param <S> Statement 的具体类型 * @return 执行结果 * @throws SQLException SQL 异常 */ public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { return execute(null, statementProxy, statementCallback, args); } /** * 执行数据库操作 * * @param sqlRecognizers SQLRecognizer 列表 * @param statementProxy StatementProxy 对象 * @param statementCallback StatementCallback 对象 * @param args 参数列表 * @param <T> 返回结果类型 * @param <S> Statement 的具体类型 * @return 执行结果 * @throws SQLException SQL 异常 */ public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { // 如果不需要全局锁且分支事务类型不是 AT,则直接执行原始的数据库操作 if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { return statementCallback.execute(statementProxy.getTargetStatement(), args); } String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { // 如果 SQLRecognizer 列表为空,则根据 SQL 语句和数据库类型获取 SQLRecognizer 列表 sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType); } Executor<T> executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { // 如果 SQLRecognizer 列表为空,则使用 PlainExecutor 执行原始的数据库操作 executor = new PlainExecutor<>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); switch (sqlRecognizer.getSQLType()) { case INSERT: // 如果是 INSERT 语句,根据数据库类型选择相应的 Executor executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; case UPDATE: // 如果是 UPDATE 语句,根据数据库类型选择相应的 Executor if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } else { executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } break; case DELETE: // 如果是 DELETE 语句,根据数据库类型选择相应的 Executor if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); } else { executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); } break; case SELECT_FOR_UPDATE: // 如果是 SELECT FOR UPDATE 语句,根据数据库类型选择相应的 Executor if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } else { executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } break; case INSERT_ON_DUPLICATE_UPDATE: // 如果是 INSERT ON DUPLICATE UPDATE 语句,根据数据库类型选择相应的 Executor switch (dbType) { case JdbcConstants.MYSQL: case JdbcConstants.MARIADB: executor = new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE"); } break; case UPDATE_JOIN: // 如果是 UPDATE JOIN 语句,根据数据库类型选择相应的 Executor switch (dbType) { case JdbcConstants.MYSQL: executor = new MySQLUpdateJoinExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name()); } break; default: // 默认情况下,使用 PlainExecutor 执行原始的数据库操作 executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } else { // 如果有多个 SQLRecognizer,则使用 MultiExecutor 执行多个 SQLRecognizer executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { // 执行具体的数据库操作 rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // 将其他异常转换为 SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs; } }假设我们有一个名为
OrderService的微服务,该服务使用了 Seata 进行分布式事务管理。我们将使用上述的ExecuteTemplate类来执行数据库操作。假设我们的订单服务需要在创建订单时插入订单信息到数据库中。下面是示例的订单创建方法:
public class OrderService { private DataSource dataSource; // 假设已注入数据源 public void createOrder(Order order) { Connection connection = null; try { connection = dataSource.getConnection(); StatementProxy<Statement> statementProxy = new StatementProxy<>(connection.createStatement()); String sql = "INSERT INTO orders (order_id, customer_id, total_amount) VALUES (?, ?, ?)"; ExecuteTemplate.execute(statementProxy, (statement, args) -> { PreparedStatement preparedStatement = (PreparedStatement) statement; preparedStatement.setString(1, order.getId()); preparedStatement.setString(2, order.getCustomerId()); preparedStatement.setDouble(3, order.getTotalAmount()); return preparedStatement.executeUpdate(); }, sql); connection.commit(); // 提交事务 } catch (SQLException e) { if (connection != null) { try { connection.rollback(); // 回滚事务 } catch (SQLException ex) { // 处理回滚异常 } } // 处理异常 } finally { if (connection != null) { try { connection.close(); } catch (SQLException e) { // 处理连接关闭异常 } } } } }在上述代码中,我们通过
dataSource获取数据库连接,并创建了一个StatementProxy对象,该对象用于代理原始的Statement对象。我们要执行的 SQL 语句是插入订单信息的语句。在执行
ExecuteTemplate.execute方法时,会判断当前的分支事务类型和是否需要全局锁。如果不需要全局锁且分支事务类型不是 AT(原子性事务),则直接执行原始的数据库操作。但是,在我们的例子中,假设需要全局锁且分支事务类型是 AT,因此会根据 SQL 类型选择相应的
Executor来执行数据库操作。对于插入语句(SQLType.INSERT),会使用InsertExecutor进行执行。
InsertExecutor是 Seata 提供的一个实现,它会在插入操作前进行一些额外的处理,例如生成全局唯一的 ID(如果配置了 ID 生成器)或者处理分布式锁等。通过使用
ExecuteTemplate和相应的Executor,我们可以在分布式事务中执行数据库操作,并根据事务的类型和需要进行相应的处理,以保证事务的一致性和隔离性。
AbstractDMLBaseExecutor源码
/** * 抽象的 DML 基础执行器 * @param <T> 执行结果的类型 * @param <S> Statement 的子类型 */ public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> { public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) { super(statementProxy, statementCallback, sqlRecognizer); } @Override public T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); if (connectionProxy.getAutoCommit()) { return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); } } /** * 处理自动提交为 true 的情况 * @param args 方法参数 * @return 执行结果 * @throws Throwable 执行过程中的异常 */ protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.changeAutoCommit(); // 开启事务 return new LockRetryPolicy(connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); // 提交事务 return result; }); } catch (Exception e) { // 当 finally 块中发生异常时,该异常会丢失,所以在这里打印异常信息 LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); } } /** * 处理自动提交为 false 的情况 * @param args 方法参数 * @return 执行结果 * @throws Exception 执行过程中的异常 */ protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException("multi pk only support mysql!"); } TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; } } /** * UPDATE 语句的执行器 * @param <T> 执行结果的类型 * @param <S> Statement 的子类型 */ public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> { public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) { super(statementProxy, statementCallback, sqlRecognizer); } }
executeAutoCommitTrue方法处理自动提交为 true 的情况。首先,获取连接代理对象,并通过调用changeAutoCommit方法将自动提交设置为 false,即开启事务。然后,使用LockRetryPolicy类实现了一个重试策略,执行executeAutoCommitFalse方法获取执行结果,并通过连接代理对象的commit方法提交事务。如果在执行过程中出现异常,会将异常打印,并根据配置决定是否回滚事务。executeAutoCommitFalse方法处理自动提交为 false 的情况。首先,检查数据库类型是否为 MySQL 并且是否存在多个主键,如果是则抛出不支持的异常。然后,获取执行前的数据镜像(beforeImage),执行具体的数据库操作,并获取执行结果。接着,根据执行前后的数据镜像获取变更的数据(afterImage),并准备撤销日志(undo log)。最后,返回执行结果。
以下订单为例,流程追踪
博主源码和demo在这:https://github.com/JiuYou2020/seata-demo
先给一张比较古早版本的图,虽然和现版本的不一致,但对博主的帮助很大,感谢创作这幅图的作者!
request和response的处理流程
request
rm client -> server
RegisterRMRequest
MergedWarpMessage
BranchRegisterRequest
BranchReportRequest
GlobalLockQueryRequest
tm client -> server
RegisterTMRequest
MergedWarpMessage
GlobalBeginRequest
GlobalCommitRequest
GlobalRollbackRequest
GlobalStatusRequest
GlobalReportRequest
server -> rm client
BranchCommitRequest
BranchRollbackRequest
UndoLogDeleteRequest
server -> tm client
// null
response
Server -> rm client
RegisterRMResponse
MergeResultMessage
BranchRegisterResponse
BranchReportResponse
GlobalLockQueryResponse
Server -> tm client
RegisterTMResponse
MergeResultMessage
GlobalBeginResponse
GlobalCommitResponse
GlobalReportResponse
GlobalRollbackResponse
rm client -> server
BranchCommitResponse
BranchRollbackResponse
tm client -> server
// null
0. 梦开始的地方
io.seata.spring.annotation.GlobalTransactionScanner所作的工作
- 扫描 Spring 容器中的 Bean 定义,识别带有
@GlobalTransactional注解的方法。 - 初始化客户端,RM和TM的初始化流程在这里开始,与TC的沟通也从这里开始
- 添加拦截器(那些拦截带有
@GlobalTransactional注解的方法) - 注册
spring关闭钩子,执行应用程序关闭时所需要进行的一些处理(例如清理TM和RM)
1. RM及TM注册流程
- 启动seata-demo的任意一个微服务,这里是RM注册流程
- Netty是Seata中的网络传输框架,构建了客户端与服务端直接的通信框架,因此猜测发送的请求会经过这里,我们找到两个与远程netty相关的类
- AbstractNettyRemotingClient:
- 这是一个抽象类,用于实现基于 Netty 的远程通信客户端。它定义了一些通用的方法和属性,包括启动、关闭、发送请求等。
- Seata 的客户端可以使用 Netty 作为底层通信框架,与服务端进行通信。
- AbstractNettyRemotingServer:
- 这也是一个抽象类,用于实现基于 Netty 的远程通信服务端。它定义了一些通用的方法和属性,包括启动、关闭、处理请求等
- Seata 的服务端可以使用 Netty 作为底层通信框架,接收客户端的请求并进行相应的处理。
- AbstractNettyRemotingClient:
- 在AbstractNettyRemotingServer中,我们找到了请求的处理方法,源码如下:
/**
* The type ServerHandler.
*/
@ChannelHandler.Sharable
class ServerHandler extends ChannelDuplexHandler {
/**
* Channel read.
* 当从通道读取到消息时被调用,用于处理接收到的消息。
* @param ctx the ctx
* @param msg the msg
* @throws Exception the exception
*/
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
processMessage(ctx, (RpcMessage) msg);
}
/**
* 当通道的可写状态发生改变时被调用。
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
synchronized (lock) {
if (ctx.channel().isWritable()) {
lock.notifyAll();
}
}
ctx.fireChannelWritabilityChanged();
}
/**
* Channel inactive.
*当通道变为非活跃状态时被调用,表示通道已断开连接。
* @param ctx the ctx
* @throws Exception the exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
debugLog("inactive:{}", ctx);
if (messageExecutor.isShutdown()) {
return;
}
handleDisconnect(ctx);
super.channelInactive(ctx);
}
/**
* 获取远程客户端的 IP 地址和端口信息。
* 通过 ChannelManager 从通道中获取与之关联的 RpcContext 对象。
* 如果获取到了有效的 RpcContext 对象,并且该对象的客户端角色不为空,说明该通道是用于与客户端进行通信的,需要释放相应资源。
* 释放 RpcContext 对象,即清理与通道相关的上下文信息。
* 如果日志级别为 INFO,则记录日志,指示通道已变为非活跃状态。
* 如果未获取到有效的 RpcContext 对象,说明该通道是未使用的,直接移除该通道。
* 如果日志级别为 INFO,则记录日志,指示移除了未使用的通道。
* @param ctx
*/
private void handleDisconnect(ChannelHandlerContext ctx) {
final String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (LOGGER.isInfoEnabled()) {
LOGGER.info(ipAndPort + " to server channel inactive.");
}
if (rpcContext != null && rpcContext.getClientRole() != null) {
rpcContext.release();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);
}
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("remove unused channel:" + ctx.channel());
}
}
}
/**
* Exception caught.
*当处理过程中发生异常时被调用,用于捕获并处理异常情况。
* @param ctx the ctx
* @param cause the cause
* @throws Exception the exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
try {
if (cause instanceof DecoderException && null == ChannelManager.getContextFromIdentified(ctx.channel())) {
return;
}
LOGGER.error("exceptionCaught:{}, channel:{}", cause.getMessage(), ctx.channel());
super.exceptionCaught(ctx, cause);
} finally {
ChannelManager.releaseRpcContext(ctx.channel());
}
}
/**
* User event triggered.
*当用户自定义事件被触发时被调用。
* @param ctx the ctx
* @param evt the evt
* @throws Exception the exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
debugLog("idle:{}", evt);
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("channel:" + ctx.channel() + " read idle.");
}
handleDisconnect(ctx);
try {
closeChannelHandlerContext(ctx);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(ctx + " will closed");
}
super.close(ctx, future);
}
}
- 查阅文档可知,channelRead的作用是当从通道读取到消息时被调用,用于处理接收到的消息。确定了,是从这里处理客户端发来的消息,跟进
processMessage(ctx, (RpcMessage) msg);的源码AbstractNettyRemoting类中 - 简单解释一下这里的源码:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
//打印日志
Object body = rpcMessage.getBody();
// 检查消息体是否实现了MessageTypeAware接口,即具有消息类型
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
// 根据消息类型从processorTable中获取对应的处理器和执行器
if (pair != null) {
if (pair.getSecond() != null) {
try {
// 在线程池中异步执行处理器逻辑
pair.getSecond().execute(() -> {
try {
// 调用处理器的process方法处理消息
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
} finally {
MDC.clear();
}
});
} catch (RejectedExecutionException e) {
//打印日志
// 当线程池已满时,尝试生成线程堆栈信息
}
} else {
// 直接在当前线程中执行处理器逻辑
}
} else {
// 没有找到对应的处理器,记录错误日志
}
} else {
// 消息体不是MessageTypeAware类型,记录错误日志
}
}
- 跟进process的实现类,会发现一个
RegRmProcesser类,大胆猜测在这里处理注册 - 跟进到
onRegRmMessage方法,会看到ChannelManager.registerRMChannel(message, ctx.channel());,确定了,就是在这里注册,源码如下

- 跟进
registerRMChannel方法,源码如下:

此时,RM已经注册完成,TM注册流程除注册细节实现不一致,其它流程基本一致,读者可自行查看
2. 全局事务拦截处理器执行主要流程
- 利用postman发送请求
io.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler会拦截到方法并执行doInvoke方法

- 如果开启了全局事务,则会进入这里处理全局事务逻辑

- 跟进,会在
transactionalTemplate.execute(new TransactionalExecutor(){...}方法中执行

-
在
io.seata.tm.api.TransactionalTemplate#execute做了很多事情,下列列出几个- 处理事务传播行为,如果没有配置,默认为
SUPPORTS

- 开启全局事务并执行业务逻辑,如果发生异常,则回滚

- 在
finally块中,做一些清理

- 处理事务传播行为,如果没有配置,默认为
-
在这里,与TC沟通,开启全局事务,下面的第三点

- 执行业务逻辑,会被
PrepareStatementProxy#execute方法拦截到,做一些具体处理,并作分支注册

- 处理异常,需要回滚则回滚

- 提交事务

3. 如何绑定全局事务并返回xid
中间步骤省略,同第1步2-5
-
继续跟进到处理消息的方法
ServerOnRequestProcessor(根据名称猜测这个最有可能是处理请求的实现) -
跟进
onRequestMessage(ctx, rpcMessage);继续跟进到ServerOnRequestProcessor类的onRequestMessage方法,源码如下:
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
// 检查消息类型,如果不是AbstractMessage类型,则记录错误日志并返回
if (!(message instanceof AbstractMessage)) {
//...
}
// 处理批量发送的请求消息(MergedWarpMessage)
if (message instanceof MergedWarpMessage) {
// 检查是否开启了 TCServer 批量发送响应功能,并且客户端版本号大于等于 1.5.0
if (NettyServerConfig.isEnableTcServerBatchSendResponse() && StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion150(rpcContext.getVersion())) {
List<AbstractMessage> msgs = ((MergedWarpMessage)message).msgs;
List<Integer> msgIds = ((MergedWarpMessage)message).msgIds;
// 遍历每个消息,并根据 PARALLEL_REQUEST_HANDLE 标志决定是否并行处理
for (int i = 0; i < msgs.size(); i++) {
AbstractMessage msg = msgs.get(i);
int msgId = msgIds.get(i);
if (PARALLEL_REQUEST_HANDLE) {
// 并行处理
CompletableFuture.runAsync(() -> handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext));
} else {
// 串行处理
handleRequestsByMergedWarpMessageBy150(msg, msgId, rpcMessage, ctx, rpcContext);
}
}
} else {
List<AbstractResultMessage> results = new ArrayList<>();
List<CompletableFuture<AbstractResultMessage>> completableFutures = null;
// 遍历每个消息,并根据 PARALLEL_REQUEST_HANDLE 标志决定是否并行处理
for (int i = 0; i < ((MergedWarpMessage)message).msgs.size(); i++) {
if (PARALLEL_REQUEST_HANDLE) {
// 并行处理
if (completableFutures == null) {
completableFutures = new ArrayList<>();
}
int finalI = i;
completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage(
((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));
} else {
// 串行处理
results.add(i, handleRequestsByMergedWarpMessage(((MergedWarpMessage)message).msgs.get(i), rpcContext));
}
}
if (CollectionUtils.isNotEmpty(completableFutures)) {
// 等待并行处理的结果完成
try {
for (CompletableFuture<AbstractResultMessage> completableFuture : completableFutures) {
results.add(completableFuture.get());
}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("handle request error: {}", e.getMessage(), e);
}
}
// 构造 MergeResultMessage,将处理结果发送给客户端
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
}
} else {
// 处理单个发送的请求消息
final AbstractMessage msg = (AbstractMessage) message;
if (LOGGER.isInfoEnabled()) {
// 记录请求消息的日志
String receiveMsgLog = String.format("receive msg[single]: %s, clientIp: %s, vgroup: %s", message,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(receiveMsgLog);
}
// 调用事务消息处理器处理请求消息,并获取处理结果
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
// 将处理结果发送给客户端
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
if (LOGGER.isInfoEnabled()) {
// 记录处理结果的日志
String resultMsgLog = String.format("result msg[single]: %s, clientIp: %s, vgroup: %s", result,
NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
BatchLogHandler.INSTANCE.writeLog(resultMsgLog);
}
}
}
- 应该会在
completableFutures.add(CompletableFuture.supplyAsync(() -> handleRequestsByMergedWarpMessage( ((MergedWarpMessage)message).msgs.get(finalI), rpcContext)));这里串行处理消息,继续跟进到ServerOnRequestProcessor.java类的handleRequestsByMergedWarpMessage方法,看到上面的doc注释,放心了,果然是用来处理rpc请求的 - 继续跟进
AbstractResultMessage resultMessage = transactionMessageHandler.onRequest(subMessage, rpcContext);到DefaultCoordinator.java类的onRequest,因为该方法的接口doc文档中写了On a request received.且是服务模块下的 return transactionRequest.handle(context);跟进到GlobalBeginRequest类中的handle,再跟进到AbstractTCInboundHandler.java的handle- 到这里,终于看见开启全局事务的希望了,
doGlobalBegin(request, response, rpcContext);

- 继续跟进
core.begin

- 此时,已经得到了xid了并开启全局事务了,那么,server是什么时候把xid返回给客户端的呢,请关注第3点的这几行
// 构造 MergeResultMessage,将处理结果发送给客户端
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results.toArray(new AbstractResultMessage[0]));
//在这里,将结果传递会TM
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
- 到这里,或许就有小伙伴有疑惑了,seata-server是什么时候将xid存储到
rootcontext中的呢,代码中没有看见呀,其实是有的,在这里,我先给大家介绍一个知识点(见附),所以说,在第7步的MDC.put(RootContext.MDC_KEY_XID, session.getXid());行中就已经把xid存储到了RootContext中.
附:
MDC(Mapped Diagnostic Context)
MDC是日志框架(如log4j、logback)中的一个重要组件,用于在多线程环境下将上下文信息与日志事件相关联。MDC通过一个Map结构来存储和获取线程的上下文信息,这些信息可以被日志框架捕获并输出到日志中。MDC提供了一种方便的方式来跟踪、记录和区分不同线程中的日志信息。
XID(Transaction ID)
XID是Seata中的一个重要概念,代表分布式事务的全局唯一标识。在分布式事务中,每个参与者都需要使用相同的XID来关联和协调各自的事务操作。XID通常由一串字符串表示,可以是全局唯一的ID,例如UUID。
RootContext
RootContext是Seata中的一个工具类,它提供了操作当前线程上下文信息的静态方法。RootContext主要用于存储和获取当前线程的XID。通过RootContext,可以在不同的代码模块中方便地访问和传递XID,以便进行分布式事务的处理。
它们之间的关系
RootContext内部通过封装MDC的操作来实现XID的存储和获取。它定义了一个常量MDC_KEY_XID,用于指定在MDC中存储XID的键名。通过调用MDC.put(RootContext.MDC_KEY_XID, xid)方法,可以将XID存储到MDC中,并使用MDC_KEY_XID作为键名。然后,通过调用RootContext.getXID()方法,可以从MDC中获取当前线程的XID。
通过将XID存储到MDC中,并通过RootContext提供的方法获取,可以方便地在分布式事务处理中使用和传递XID。
综上所述,MDC是一个日志框架中的组件,用于在多线程环境下存储和获取上下文信息。XID是Seata中用于表示分布式事务全局唯一标识的概念。RootContext是Seata提供的工具类,通过封装MDC的操作,提供了方便的方式来存储和获取当前线程的XID。
4. 执行业务sql
首先介绍两个代理对象的概念:
StatementProxy:
- 当执行的 SQL 语句是普通的、不带占位符的语句时,可以使用
StatementProxy进行执行。- 例如,执行的 SQL 语句是
SELECT * FROM table_name,没有占位符需要填充。
PreparedStatementProxy:
- 当执行的 SQL 语句是预编译语句,需要通过填充占位符来执行时,可以使用
PreparedStatementProxy进行执行。- 例如,执行的 SQL 语句是
SELECT * FROM table_name WHERE column_name = ?,需要填充占位符的值。区别:
StatementProxy是用于执行普通的 SQL 语句,不进行预处理或编译,直接将完整的 SQL 语句发送到数据库执行。PreparedStatementProxy是用于执行预编译的 SQL 语句,通过占位符来表示参数的位置,可以多次执行并填充不同的参数值。
-
这是Seata中的3个类,在执行业务sql时,
PrepareStatementProxy会拦截,并到PrepareStatementProxy#execute执行
-
跟进

-
再继续跟进会进入
io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy<S>, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object...)方法中- 在这个方法中,会对需要执行的sql语句进行识别,并确定适合的执行器(根据数据库类型和进行的操作(如插入,删除))
- 确定执行器后,使用执行器来执行sql
- 源码如下:
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { // 检查是否需要全局锁以及分支类型是否为AT if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // 如果不需要全局锁且分支类型不是AT,则直接执行语句 return statementCallback.execute(statementProxy.getTargetStatement(), args); } // 从连接中获取数据库类型 String dbType = statementProxy.getConnectionProxy().getDbType(); // 检查是否提供了SQL识别器,否则使用SQLVisitorFactory创建识别器 if (CollectionUtils.isEmpty(sqlRecognizers)) { sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } Executor<T> executor; // 根据SQL类型确定适当的执行器 if (CollectionUtils.isEmpty(sqlRecognizers)) { // 如果找不到SQL识别器,则使用PlainExecutor executor = new PlainExecutor<>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); switch (sqlRecognizer.getSQLType()) { case INSERT: // 对于INSERT语句,使用InsertExecutor executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; case UPDATE: // 对于UPDATE语句,使用UpdateExecutor if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } else { executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } break; case DELETE: // 对于DELETE语句,使用DeleteExecutor if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); } else { executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); } break; case SELECT_FOR_UPDATE: // 对于SELECT FOR UPDATE语句,使用SelectForUpdateExecutor if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } else { executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); } break; case INSERT_ON_DUPLICATE_UPDATE: // 根据数据库类型选择相应的执行器来处理INSERT ON DUPLICATE UPDATE语句 switch (dbType) { case JdbcConstants.MYSQL: case JdbcConstants.MARIADB: executor = new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE"); } break; case UPDATE_JOIN: // 对于MySQL中的UPDATE JOIN语句,使用MySQLUpdateJoinExecutor switch (dbType) { case JdbcConstants.MYSQL: executor = new MySQLUpdateJoinExecutor<>(statementProxy,statementCallback,sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name()); } break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } else { executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // 将其他异常转换为SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs; } -
在本例中,采用的时mysql的insert语句,所以我在源码中对insert的执行器加载进行了解释,有兴趣的同学可以去查看
-
-
跟进执行器执行的
io.seata.rm.datasource.exec.BaseTransactionalExecutor#execute方法

- 继续跟进执行具体业务的
doExecute方法

- 默认情况下,自动提交是开启的,但是我们需要将自动提交关闭再执行,这是为了保证通过在自动提交关闭的状态下执行业务 SQL,可以将多个 SQL 操作放在一个事务中,当所有操作都执行成功后,再通过手动提交事务来确保数据的一致性。如果其中任何一个操作失败,可以回滚整个事务,避免对数据造成部分修改。
- 我们先进
executeAutoCommitTrue方法,源码如下:

发现在这里还是通过executeAutoCommitFalse来执行业务sql

- 获取执行前快照和执行后快照代码中有少量注释,这里就步解释了,可以看到,在这里执行了业务sql并存储了执行前后快照,接下来,执行提交

- 跟进

- 继续跟进

- 继续跟进


分支注册流程如下第五步
在注册后,刷新undolog到数据库
5. 分支注册
在Seata中,Branch ID(分支ID)用于唯一标识一个分支事务。每个分支事务都有一个唯一的Branch ID。XID(事务ID)用于唯一标识一个全局事务,而Branch ID则用于标识全局事务下的分支事务。
分支事务是全局事务下的子事务,用于表示在分布式环境中的具体操作或资源。例如,一个全局事务可能涉及多个数据库操作,每个数据库操作都可以作为一个分支事务。分支事务在全局事务的协调下进行提交或回滚。
Branch ID的作用如下:
- 唯一标识分支事务:每个分支事务都有一个唯一的Branch ID,用于在全局事务中进行标识和关联。
- 事务状态管理:Seata使用Branch ID来管理分支事务的状态。每个分支事务都有自己的状态,例如已提交、已回滚或待提交等。
- 协调操作:在全局事务提交或回滚时,需要通过Branch ID来协调和执行相应的操作。全局事务的提交或回滚会涉及到所有分支事务的提交或回滚。
- 同绑定全局事务2-5
- 在
DefaultCoordinator的onRequest中,继续跟进return transactionRequest.handle(context);,选择实现类BranchRegisterRequest.java的handle方法,继续跟进return handler.handle(this, rpcContext); - 到了
AbstractTCInboundHandler.java类中,看源码:

DefaultCoordinator.java的doBranchRegister方法,看来DefaultCoordinator.java这个类还是蛮重要滴

- 继续跟进到
io.seata.server.coordinator.DefaultCore#branchRegister方法,没啥内容,继续跟进到io.seata.server.coordinator.AbstractCore#branchRegister方法

这就设计到AT模式的核心了,给分支事务加全局锁和本地锁(概念见下附),详见文档
- 全局锁:确保该分支执行过程中不会受到其它分支干扰
- 本地锁,确保分支事务在本地环境中的原子性和隔离性。
附:
- 全局锁:全局锁是用于保护全局事务的执行过程,确保全局事务的各个分支在执行过程中不会受到并发干扰。在分布式事务中,全局锁通常是由事务协调器(如 Seata 的 TC)管理和控制的。全局锁的作用是在分布式环境中协调各个分支事务的提交或回滚,保证全局事务的一致性。
- 本地锁:本地锁是用于保护单个分支事务的执行过程,确保分支事务在本地环境中的原子性和隔离性。在 AT 模式中,每个参与分布式事务的服务都会有自己的本地锁,用于保护对本地资源的访问。本地锁的作用是在分支事务内部控制本地资源的并发访问,防止并发冲突和数据不一致。
至此,分支事务注册完成,返回branchid也与xid返回类似,请读者自行查看
6. 全局提交
在第一个分支注册完成后,客户端会接着执行业务代码发现有远程调用,则去另一个微服务从犯执行业务sql和分支注册的流程,在执行完成后,提交分支事务并记录undolog,如果发生异常,则回滚,至此,业务逻辑完成,进行全局提交

由于博主时机紧迫,这里就简单讲述一下接下来的流程,不再贴出源码了
- TM(
GlobalTransactionRole.launcher发起GlobalCommitRequest请求,返回GlobalStatus,提交全局事务,其中有异常重试机制,默认5次 - TC接受到请求,在
io.seata.server.coordinator.DefaultCoordinator#doGlobalCommit执行提交 - 跟进到
io.seata.server.coordinator.DefaultCore#commit,在这里,会去查全局锁的表global_table并获取对应的GlobalSession
@Override
public GlobalStatus commit(String xid) throws TransactionException {
// 查找全局会话
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
// 如果全局会话不存在,则表示事务已完成
return GlobalStatus.Finished;
}
if (globalSession.isTimeout()) {
// 如果全局会话已超时,则返回超时回滚状态
LOGGER.info("TC detected timeout, xid = {}", globalSession.getXid());
return GlobalStatus.TimeoutRollbacking;
}
// 加锁并执行状态变更操作
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
boolean shouldCommitNow = false;
if (globalSession.getStatus() == GlobalStatus.Begin) {
// 首先关闭全局会话,之后不允许再注册分支事务
globalSession.close();
if (globalSession.canBeCommittedAsync()) {
// 如果可以异步提交,则进行异步提交
globalSession.asyncCommit();
MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
} else {
// 修改全局事务状态为 Committing,表示需要立即提交
globalSession.changeGlobalStatus(GlobalStatus.Committing);
shouldCommitNow = true;
}
// 在成功修改状态后清理全局会话
globalSession.clean();
}
return shouldCommitNow;
});
if (shouldCommit) {
// 执行全局提交操作
boolean success = doGlobalCommit(globalSession, false);
// 如果成功并且剩余的分支事务可以异步提交,则进行异步提交
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
// 返回当前全局事务状态
return globalSession.getStatus();
}
} else {
// 如果状态为 AsyncCommitting,则表示已提交;否则返回当前全局事务状态
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
- 上面代码块中,其实做了很多事情,我接着讲,首先进行异步提交,我们跟进
globalSession.asyncCommit(); - 跟进到
io.seata.server.storage.db.session.DataBaseSessionManager#updateGlobalSessionStatus,继续跟进到io.seata.server.storage.db.session.DataBaseSessionManager#updateGlobalSessionStatus,这里会将global_table的status设为1 - 此时
io.seata.server.coordinator.DefaultCoordinator#handleAsyncCommitting会处理AsyncCommiting任务,它会获取GlobalSession列表,如果GlobalStatus为AsyncCommiting,则先遍历BranchSession列表,提交分支事务,删除undolog释放全局锁再删除全局事务信息,删除global_table的信息 - 自此,基本流程走完了
由于时间比较赶,博主难免有很多疏漏之处,欢迎大家指出,另外,这里的异常处理已经回滚机制没有展现,大家有兴趣的同学可以自己去看看源码,参考这张图
编码规范
1. 浮点数之间的等值判断,基本数据类型不能使用 == 进行比较,包装数据类型不能使用 equals
进行判断。
说明:浮点数采用“尾数+阶码”的编码方式,类似于科学计数法的“有效数字+指数”的表示方式。二进制无法精确表
示大部分的十进制小数,具体原理参考《码出高效》。
反例:
float a = 1.0F - 0.9F;
float b = 0.9F - 0.8F;
if (a == b) {
// 预期进入此代码块,执行其它业务逻辑
// 但事实上 a == b 的结果为 false
}
Float x = Float.valueOf(a);
Float y = Float.valueOf(b);
if (x.equals(y)) {
// 预期进入此代码块,执行其它业务逻辑
// 但事实上 equals 的结果为 false
}
正例:
(1)指定一个误差范围,两个浮点数的差值在此范围之内,则认为是相等的。
float a = 1.0F - 0.9F;
float b = 0.9F - 0.8F;
float diff = 1e-6F;
if (Math.abs(a - b) < diff) {
System.out.println("true");
}
(2)使用 BigDecimal 来定义值,再进行浮点数的运算操作。
BigDecimal a = new BigDecimal("1.0");
BigDecimal b = new BigDecimal("0.9");
BigDecimal c = new BigDecimal("0.8");
BigDecimal x = a.subtract(b);
BigDecimal y = b.subtract(c);
if (x.compareTo(y) == 0) {
System.out.println("true");
}
2. BigDecimal 的等值比较应使用 compareTo() 方法,而不是 equals() 方法。
说明:equals() 方法会比较值和精度(1.0 与 1.00 返回结果为 false),而 compareTo() 则会忽略精度。
3. 禁止使用构造方法 BigDecimal(double) 的方式把 double 值转化为 BigDecimal 对象。
说明:BigDecimal(double) 存在精度损失风险,在精确计算或值比较的场景中可能会导致业务逻辑异常。如:
BigDecimal g = new BigDecimal(0.1F);实际的存储值为:0.100000001490116119384765625
正例:优先推荐入参为 String 的构造方法,或使用 BigDecimal 的 valueOf 方法,此方法内部其实执行了 Double 的
toString,而 Double 的 toString 按 double 的实际能表达的精度对尾数进行了截断。
BigDecimal recommend1 = new BigDecimal("0.1");
BigDecimal recommend2 = BigDecimal.valueOf(0.1)
4. 关于基本数据类型与包装数据类型的使用标准如下:
1)【强制】所有的 POJO 类属性必须使用包装数据类型。
2)【强制】RPC 方法的返回值和参数必须使用包装数据类型。
3)【推荐】所有的局部变量使用基本数据类型。
Java 开发手册(黄山版)
8/51
说明:POJO 类属性没有初值是提醒使用者在需要使用时,必须自己显式地进行赋值,任何 NPE 问题,或者入库检查,
都由使用者来保证。
正例:数据库的查询结果可能是 null,因为自动拆箱,用基本数据类型接收有 NPE 风险。
反例:某业务的交易报表上显示成交总额涨跌情况,即正负 x%,x 为基本数据类型,调用的 RPC 服务,调用不成功时,
返回的是默认值,页面显示为 0%,这是不合理的,应该显示成中划线-。所以包装数据类型的 null 值,能够表示额外的
信息,如:远程调用失败,异常退出。
5. 定义 DO / PO / DTO / VO 等 POJO 类时,不要设定任何属性默认值。
反例:某业务的 DO 的 createTime 默认值为 new Date();但是这个属性在数据提取时并没有置入具体值,在更新其
它字段时又附带更新了此字段,导致创建时间被修改成当前时间。
6. 构造方法里面禁止加入任何业务逻辑,如果有初始化逻辑,请放在 init 方法中。
7. POJO 类必须写 toString 方法。使用 IDE 中的工具 source > generate toString 时,如果继
承了另一个 POJO 类,注意在前面加一下 super.toString()。
说明:在方法执行抛出异常时,可以直接调用 POJO 的 toString() 方法打印其属性值,便于排查问题。
8. 使用索引访问用 String 的 split 方法得到的数组时,需做最后一个分隔符后有无内容的检查,
否则会有抛 IndexOutOfBoundsException 的风险。
说明:
String str = "a,b,c,,";
String[] ary = str.split(",");
// 预期大于 3,结果等于 3
System.out.println(ary.length);
9. 类成员与方法访问控制从严:
1)如果不允许外部直接通过 new 来创建对象,那么构造方法必须是 private。
2)工具类不允许有 public 或 default 构造方法。
3)类非 static 成员变量并且与子类共享,必须是 protected。
4)类非 static 成员变量并且仅在本类使用,必须是 private。
5)类 static 成员变量如果仅在本类使用,必须是 private。
6)若是 static 成员变量,考虑是否为 final。
7)类成员方法只供类内部调用,必须是 private。
8)类成员方法只对继承类公开,那么限制为 protected。
说明:任何类、方法、参数、变量,严控访问范围。过于宽泛的访问范围,不利于模块解耦。思考:如果是一个
private 的方法,想删除就删除,可是一个 public 的 service 成员方法或成员变量,删除一下,不得手心冒点汗吗?
变量像自己的小孩,尽量在自己的视线内,变量作用域太大,无限制的到处跑,那么你会担心的。
10. 日期格式化时,传入 pattern 中表示年份统一使用小写的 y。
说明:日期格式化时,yyyy 表示当天所在的年,而大写的 YYYY 代表是 week in which year(JDK7 之后引入的概念),
意思是当天所在的周属于的年份,一周从周日开始,周六结束,只要本周跨年,返回的 YYYY 就是下一年。
正例:表示日期和时间的格式如下所示:
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
反例:某程序员因使用 YYYY/MM/dd 进行日期格式化,2017/12/31 执行结果为 2018/12/31,造成线上故障。
11. 禁止在程序中写死一年为 365 天,避免在公历闰年时出现日期转换错误或程序逻辑错误。
正例:
// 获取今年的天数
int daysOfThisYear = LocalDate.now().lengthOfYear();
// 获取指定某年的天数
LocalDate.of(2011, 1, 1).lengthOfYear();
反例:
// 第一种情况:在闰年 366 天时,出现数组越界异常
int[] dayArray = new int[365];
// 第二种情况:一年有效期的会员制,2020 年 1 月 26 日注册,硬编码 365 返回的却是 2021 年 1 月 25 日
Calendar calendar = Calendar.getInstance();
calendar.set(2020, 1, 26);
calendar.add(Calendar.DATE, 365);
12. 关于 hashCode 和 equals 的处理,遵循如下规则:
1)只要覆写 equals,就必须覆写 hashCode。
2)因为 Set 存储的是不重复的对象,依据 hashCode 和 equals 进行判断,所以 Set 存储的对象必须覆写这两种方法。
3)如果自定义对象作为 Map 的键,那么必须覆写 hashCode 和 equals。
说明:String 因为覆写了 hashCode 和 equals 方法,所以可以愉快地将 String 对象作为 key 来使用。
13. 判断所有集合内部的元素是否为空,使用 isEmpty() 方法,而不是 size() == 0 的方式。
说明:在某些集合中,前者的时间复杂度为 O(1),而且可读性更好。
正例:
Map<String, Object> map = new HashMap<>(16);
if (map.isEmpty()) {
System.out.println("no element in this map.");
}
14. 在使用 java.util.stream.Collectors 类的 toMap() 方法转为 Map 集合时,一定要注意当 value
为 null 时会抛 NPE 异常。
说明:在 java.util.HashMap 的 merge 方法里会进行如下的判断:
if (value == null || remappingFunction == null)
throw new NullPointerException();
反例:
List<Pair<String, Double>> pairArrayList = new ArrayList<>(2);
pairArrayList.add(new Pair<>("version1", 8.3));
pairArrayList.add(new Pair<>("version2", null));
// 抛出 NullPointerException 异常
Map<String, Double> map = pairArrayList.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue, (v1, v2) -> v2));
15. ArrayList 的 subList 结果不可强转成 ArrayList,否则会抛出 ClassCastException 异常:
java.util.RandomAccessSubList cannot be cast to java.util.ArrayList。
说明:subList() 返回的是 ArrayList 的内部类 SubList,并不是 ArrayList 本身,而是 ArrayList 的一个视图,对于
SubList 的所有操作最终会反映到原列表上。
16. 在 subList 场景中,高度注意对父集合元素的增加或删除,均会导致子列表的遍历、增加、删
除产生 ConcurrentModificationException 异常。
说明:抽查表明,90% 的程序员对此知识点都有错误的认知。
17. 使用集合转数组的方法,必须使用集合的 toArray(T[] array),传入的是类型完全一致、长度为
0 的空数组。
反例:直接使用 toArray 无参方法存在问题,此方法返回值只能是 Object[]类,若强转其它类型数组将出现
ClassCastException 错误。
正例:
List<String> list = new ArrayList<>(2);
list.add("guan");
list.add("bao");
String[] array = list.toArray(new String[0]);
说明:使用 toArray 带参方法,数组空间大小的 length:
1)等于 0,动态创建与 size 相同的数组,性能最好。
2)大于 0 但小于 size,重新创建大小等于 size 的数组,增加 GC 负担。
3)等于 size,在高并发情况下,数组创建完成之后,size 正在变大的情况下,负面影响与 2 相同。
4)大于 size,空间浪费,且在 size 处插入 null 值,存在 NPE 隐患。
18. 使用工具类 Arrays.asList() 把数组转换成集合时,不能使用其修改集合相关的方法,它的 add
/ remove / clear 方法会抛出 UnsupportedOperationException 异常。
说明:asList 的返回对象是一个 Arrays 内部类,并没有实现集合的修改方法。Arrays.asList 体现的是适配器模式,只
是转换接口,后台的数据仍是数组,指向的仍是同一个对象。
String[] str = new String[]{ "yang", "guan", "bao" };
List list = Arrays.asList(str);
第一种情况:list.add("yangguanbao"); 运行时异常。
第二种情况:str[0] = "change"; list 中的元素也会随之修改,反之亦然。
19. 不要在 foreach 循环里进行元素的 remove / add 操作。remove 元素请使用 iterator 方式,
如果并发操作,需要对 iterator 对象加锁。
正例:
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
String item = iterator.next();
if (删除元素的条件) {
iterator.remove();
}
}
反例:
for (String item : list) {
if ("1".equals(item)) {
list.remove(item);
}
}
20. 集合初始化时,指定集合初始值大小。
说明:HashMap 使用构造方法 HashMap(int initialCapacity) 进行初始化时,如果暂时无法确定集合大小,那么指
定默认值(16)即可。
正例:initialCapacity = (需要存储的元素个数 / 负载因子) + 1。注意负载因子(即 loaderfactor)默认为 0.75,如果
暂时无法确定初始值大小,请设置为 16(即默认值)。
反例:HashMap 需要放置 1024 个元素,由于没有设置容量初始大小,随着元素增加而被迫不断扩容,resize() 方法
总共会调用 8 次,反复重建哈希表和数据迁移。当放置的集合元素个数达千万级时会影响程序性能。
21. 使用 entrySet 遍历 Map 类集合 KV,而不是 keySet 方式进行遍历。
说明:keySet 其实是遍历了 2 次,一次是转为 Iterator 对象,另一次是从 hashMap 中取出 key 所对应的 value。而
entrySet 只是遍历了一次就把 key 和 value 都放到了 entry 中,效率更高。如果是 JDK8,使用 Map.forEach 方法。
正例:values() 返回的是 V 值集合,是一个 list 集合对象;keySet() 返回的是 K 值集合,是一个 Set 集合对象;
entrySet() 返回的是 K-V 值组合的 Set 集合。
- 高度注意 Map 类集合 K / V 能不能存储 null 值的情况,如下表格:
| 集合类 | Key 是否允许为 null | Value 是否允许为 null | Super 类 | 线程安全性 |
|---|---|---|---|---|
| Hashtable | 不允许 | 不允许 | Dictionary | 线程安全 |
| TreeMap | 不允许 | 允许 | AbstractMap | 线程不安全 |
| ConcurrentHashMap | 不允许 | 不允许 | AbstractMap | 锁分段技术(JDK8:CAS) |
| HashMap | 允许 | 允许 | AbstractMap | 线程不安全 |
23. 获取单例对象需要保证线程安全,其中的方法也要保证线程安全。
说明:资源驱动类、工具类、单例工厂类都需要注意。
24. 创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
正例:自定义线程工厂,并且根据外部特征进行分组,比如,来自同一机房的调用,把机房编号赋值给
whatFeatureOfGroup:
public class UserThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
// 定义线程组名称,在利用 jstack 来排查问题时,非常有帮助
UserThreadFactory(String whatFeatureOfGroup) {
namePrefix = "FromUserThreadFactory's" + whatFeatureOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(null, task, name, 0, false);
System.out.println(thread.getName());
return thread;
}
}
25. 在使用阻塞等待获取锁的方式中,必须在 try 代码块之外,并且在加锁方法与 try 代码块之间没
有任何可能抛出异常的方法调用,避免加锁成功后,在 finally 中无法解锁。
说明一:在 lock 方法与 try 代码块之间的方法调用抛出异常,无法解锁,造成其它线程无法成功获取锁。
说明二:如果 lock 方法在 try 代码块之内,可能由于其它方法抛出异常,导致在 finally 代码块中,unlock 对未加锁的对
象解锁,它会调用 AQS 的 tryRelease 方法(取决于具体实现类),抛出 IllegalMonitorStateException 异常。
说明三:在 Lock 对象的 lock 方法实现中可能抛出 unchecked 异常,产生的后果与说明二相同。
正例:
Lock lock = new XxxLock();
// ...
lock.lock();
try {
doSomething();
doOthers();
} finally {
lock.unlock();
}
反例:
Lock lock = new XxxLock();
// ...
try {
// 如果此处抛出异常,则直接执行 finally 代码块
doSomething();
// 无论加锁是否成功,finally 代码块都会执行
lock.lock();
doOthers();
} finally {
lock.unlock();
}
26. 并发修改同一记录时,避免更新丢失,需要加锁。要么在应用层加锁,要么在缓存加锁,要么
在数据库层使用乐观锁,使用 version 作为更新依据。
说明:如果每次访问冲突概率小于 20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不得小于 3 次。
27. 避免 Random 实例被多线程使用,虽然共享该实例是线程安全的,但会因竞争同一 seed 导致
的性能下降。
说明:Random 实例包括 java.util.Random 的实例或者 Math.random() 的方式。
正例:在 JDK7 之后,可以直接使用 API ThreadLocalRandom,而在 JDK7 之前,需要编码保证每个线程持有一个
单独的 Random 实例。
通过双重检查锁(double-checked locking),实现延迟初始化需要将目标属性声明为
volatile 型,(比如修改 helper 的属性声明为 private volatile Helper helper = null;)。
正例:
public class LazyInitDemo {
private volatile Helper helper = null;
public Helper getHelper() {
if (helper == null) {
synchronized(this) {
if (helper == null) {
helper = new Helper();
}
}
}
return helper;
}
// other methods and fields...
28. 当 switch 括号内的变量类型为 String 并且此变量为外部参数时,必须先进行 null 判断。
反例:如下的代码输出是什么?
public static void main(String[] args) {
method(null);
}
public static void method(String param) {
switch (param) {
case "sth":
System.out.println("it's sth");
break;
case "null":
System.out.println("it's null");
break;
default:
System.out.println("default");
}
}
29. 在高并发场景中,避免使用“等于”判断作为中断或退出的条件。
说明:如果并发控制没有处理好,容易产生等值判断被“击穿”的情况,使用大于或小于的区间判断条件来代替。
反例:判断剩余奖品数量等于 0 时,终止发放奖品,但因为并发处理错误导致奖品数量瞬间变成了负数,这样的话,
活动无法终止。
30. 服务端发生错误时,返回给前端的响应信息必须包含 HTTP 状态码,errorCode、
errorMessage、用户提示信息四个部分。
说明:四个部分的涉众对象分别是浏览器、前端开发、错误排查人员、用户。其中输出给用户的提示信息要求:简短清
晰、提示友好,引导用户进行下一步操作或解释错误原因,提示信息可以包括错误原因、上下文环境、推荐操作等。
errorCode:参考 。errorMessage:简要描述后端出错原因,便于错误排查人员快速定位问题,注意不要包含敏
感数据信息。
正例:常见的 HTTP 状态码如下
1)200 OK:表明该请求被成功地完成,所请求的资源发送到客户端。
2)401 Unauthorized:请求要求身份验证,常见对于需要登录而用户未登录的情况。
3)403 Forbidden:服务器拒绝请求,常见于机密信息或复制其它登录用户链接访问服务器的情况。
4)404 NotFound:服务器无法取得所请求的网页,请求资源不存在。
5)500 InternalServerError:服务器内部错误。
31. 在使用正则表达式时,利用好其预编译功能,可以有效加快正则匹配速度。
说明:不要在方法体内定义:Pattern pattern = Pattern.compile("规则");
正确:public class MyClass{
private static final Pattern pattern = Pattern.compile("规则");
static
{
// 初始化代码块,在类加载时运行,预编译正则表达式
}
public static void myMethod(String str)
{
// 使用预编译的Pattern对象进行匹配
Matcher matcher = pattern.matcher(str);
// ...
}
}
32. 错误码分为一级宏观错误码、二级宏观错误码、三级宏观错误码。
说明:在无法更加具体确定的错误场景中,可以直接使用一级宏观错误码,分别是:A0001(用户端错误)、B0001(系
统执行出错)、C0001(调用第三方服务出错)。
正例:调用第三方服务出错是一级,中间件错误是二级,消息服务出错是三级。
33. 好的单元测试必须遵守 AIR 原则。
说明:单元测试在线上运行时,感觉像空气(AIR)一样感觉不到,但在测试质量的保障上,却是非常关键的。好的单元
测试宏观上来说,具有自动化、独立性、可重复执行的特点。
⚫ A:Automatic(自动化)
⚫ I:Independent(独立性)
⚫ R:Repeatable(可重复)
34. 用户请求传入的任何参数必须做有效性验证。
说明:忽略参数校验可能导致:
⚫ 页面 page size 过大导致内存溢出
⚫ 恶意 order by 导致数据库慢查询
⚫ 缓存击穿
⚫ SSRF
⚫ 任意重定向
⚫ SQL 注入,Shell 注入,反序列化注入
⚫ 正则输入源串拒绝服务 ReDoS
扩展:Java 代码用正则来验证客户端的输入,有些正则写法验证普通用户输入没有问题,但是如果攻击人员使
用的是特殊构造的字符串来验证,有可能导致死循环的结果。
35. 对于文件上传功能,需要对于文件大小、类型进行严格检查和控制。
说明:攻击者可以利用上传漏洞,上传恶意文件到服务器,并且远程执行,达到控制网站服务器的目的。
36. 配置文件中的密码需要加密。
37. 小数类型为 decimal,禁止使用 float 和 double。
说明:在存储的时候,float 和 double 都存在精度损失的问题,很可能在比较值的时候,得到不正确的结果。如果存
储的数据范围超过 decimal 的范围,建议将数据拆成整数和小数并分开存储。
Java面试
1. Jvm
1.Thread是如何解决内存泄漏问题的
- ThreadLocal持有着对ThreadLocalMap的引用。
- ThreadLocalMap持有着对各个值(Value)的引用。
- 如果Thread退出而ThreadLocal和ThreadLocalMap还存在,就会造成内存泄漏。
ThreadLocal内部提供了一个remove()方法用于解决此问题:
public void remove() {
Map<ThreadLocal<T>, T> m = getMap(this.threadLocalMap);
m.remove(ThreadLocal.this);
}
remove()方法会从ThreadLocalMap中移除当前ThreadLocal对象对应的引用。
此外,ThreadLocal也实现了ThreadLocal.ThreadLocalMap.Entry类的lazySet()和get()方法:
void lazySet(ThreadLocal<?> key, Object value) {
Entry e = table[indexFor(hash, table.length)];
...
// Overrides AbstractMap.SimpleEntry.setValue().
e.value = value;
}
@SuppressWarnings("unchecked")
T get() {
Entry[] tab = table;
int len = tab.length;
...
Entry e = tab[index];
if (e != null) {
return (T)e.value;
}
}
这就保证了,只有在调用get()方法时,ThreadLocalMap才会创建Value,而不会在调用set()方法时创建。
总的来说,ThreadLocal通过这些机制来:
- 在Thread退出时自动清理ThreadLocalMap引用以避免内存泄漏
- 惰性地创建Value,避免大量ThreadLocal内存管理开销
2. 计算机网络
网络分层模型
OSI七层模型
是什么?每一层的作用?
OSI 七层模型 是国际标准化组织提出一个网络分层模型,其大体结构以及每一层提供的功能如下图所示:

每一层都专注做一件事情,并且每一层都需要使用下一层提供的功能。
虽然体系完整,但是太复杂,不实用,有些功能在多个层重复出现。
更生动的图:

TCP/IP四层模型
是什么?每一层的作用
是七层模型的精简版,把会话层、表示层、应用层合并为应用层,把物理层、数据链路层合并为网络接口层,由以下四层组成:
-
- 应用层(定义了不同设备应用程序间信息交换的格式,消息会交给下一层传输层来传输)
- 传输层(负责向两台终端设备进程之间的通信提供通用的数据传输服务)
- 网络层(为分组交换网上的不同主机提供通信服务)
- 网络接口层(封装成数据帧发送到网络上)
为什么网络要分层?
-
- 各层之间相互独立,只需要顾好自己。
- 提高整体灵活性,高内聚,低耦合。
- 大问题小化。
OSI 和 TCP/IP 网络分层模型详解(基础)
应用层常见协议:
应用层常见协议
- HTTP(Hypertext Transfer Protocol,超文本传输协议):基于 TCP 协议,是一种用于传输超文本和多媒体内容的协议,主要是为 Web 浏览器与 Web 服务器之间的通信而设计的。当我们使用浏览器浏览网页的时候,我们网页就是通过 HTTP 请求进行加载的。
- SMTP(Simple Mail Transfer Protocol,简单邮件发送协议):基于 TCP 协议,是一种用于发送电子邮件的协议。注意 ⚠️:SMTP 协议只负责邮件的发送,而不是接收。要从邮件服务器接收邮件,需要使用 POP3 或 IMAP 协议。
- POP3/IMAP(邮件接收协议):基于 TCP 协议,两者都是负责邮件接收的协议。IMAP 协议是比 POP3 更新的协议,它在功能和性能上都更加强大。IMAP 支持邮件搜索、标记、分类、归档等高级功能,而且可以在多个设备之间同步邮件状态。几乎所有现代电子邮件客户端和服务器都支持 IMAP。
- FTP(File Transfer Protocol,文件传输协议) : 基于 TCP 协议,是一种用于在计算机之间传输文件的协议,可以屏蔽操作系统和文件存储方式。注意 ⚠️:FTP 是一种不安全的协议,因为它在传输过程中不会对数据进行加密。建议在传输敏感数据时使用更安全的协议,如 SFTP。
- Telnet(远程登陆协议):基于 TCP 协议,用于通过一个终端登陆到其他服务器。Telnet 协议的最大缺点之一是所有数据(包括用户名和密码)均以明文形式发送,这有潜在的安全风险。这就是为什么如今很少使用 Telnet,而是使用一种称为 SSH 的非常安全的网络传输协议的主要原因。
- SSH(Secure Shell Protocol,安全的网络传输协议):基于 TCP 协议,通过加密和认证机制实现安全的访问和文件传输等业务
- RTP(Real-time Transport Protocol,实时传输协议):通常基于 UDP 协议,但也支持 TCP 协议。它提供了端到端的实时传输数据的功能,但不包含资源预留存、不保证实时传输质量,这些功能由 WebRTC 实现。
- DNS(Domain Name System,域名管理系统): 基于 UDP 协议,用于解决域名和 IP 地址的映射问题。
关于这些协议的详细介绍请看 应用层常见协议总结(应用层) 这篇文章。
传输层常见协议:
传输层常见协议
- TCP(Transmisson Control Protocol,传输控制协议 ):提供 面向连接 的,可靠 的数据传输服务。
- UDP(User Datagram Protocol,用户数据协议):提供 无连接 的,尽最大努力 的数据传输服务(不保证数据传输的可靠性),简单高效。
网络层常见协议:
网络层常见协议
- IP(Internet Protocol,网际协议):TCP/IP 协议中最重要的协议之一,主要作用是定义数据包的格式、对数据包进行路由和寻址,以便它们可以跨网络传播并到达正确的目的地。目前 IP 协议主要分为两种,一种是过去的 IPv4,另一种是较新的 IPv6,目前这两种协议都在使用,但后者已经被提议来取代前者。
- ARP(Address Resolution Protocol,地址解析协议):ARP 协议解决的是网络层地址和链路层地址之间的转换问题。因为一个 IP 数据报在物理上传输的过程中,总是需要知道下一跳(物理上的下一个目的地)该去往何处,但 IP 地址属于逻辑地址,而 MAC 地址才是物理地址,ARP 协议解决了 IP 地址转 MAC 地址的一些问题。
- ICMP(Internet Control Message Protocol,互联网控制报文协议):一种用于传输网络状态和错误消息的协议,常用于网络诊断和故障排除。例如,Ping 工具就使用了 ICMP 协议来测试网络连通性。
- NAT(Network Address Translation,网络地址转换协议):NAT 协议的应用场景如同它的名称——网络地址转换,应用于内部网到外部网的地址转换过程中。具体地说,在一个小的子网(局域网,LAN)内,各主机使用的是同一个 LAN 下的 IP 地址,但在该 LAN 以外,在广域网(WAN)中,需要一个统一的 IP 地址来标识该 LAN 在整个 Internet 上的位置。
- OSPF(Open Shortest Path First,开放式最短路径优先) ):一种内部网关协议(Interior Gateway Protocol,IGP),也是广泛使用的一种动态路由协议,基于链路状态算法,考虑了链路的带宽、延迟等因素来选择最佳路径。
- RIP(Routing Information Protocol,路由信息协议):一种内部网关协议(Interior Gateway Protocol,IGP),也是一种动态路由协议,基于距离向量算法,使用固定的跳数作为度量标准,选择跳数最少的路径作为最佳路径。
- BGP(Border Gateway Protocol,边界网关协议):一种用来在路由选择域之间交换网络层可达性信息(Network Layer Reachability Information,NLRI)的路由选择协议,具有高度的灵活性和可扩展性。