文章有点长,我决定用半个小时来和你分享~? 废话不多说,上代码。。。
基于Seata 1.5.2,项目中用 seata-spring-boot-starter
SeataDataSourceAutoConfiguration
SeataDataSourceAutoConfiguration 主要是配置数据源自动代理,可以看到:
- 默认seata.enabled、seata.enableAutoDataSourceProxy、seata.enable-auto-data-source-proxy都是true
- 只有当classpath中有DataSource时才会进行此配置
- 创建了一个SeataAutoDataSourceProxyCreator,用于自动代理数据源
先记一下,SeataAutoDataSourceProxyCreator是一个BeanPostProcessor
刚才new了一个SeataAutoDataSourceProxyCreator,继续看构造方法,默认useJdkProxy是false,excludes为空,dataSourceProxyMode是AT
构造方法中最重要的一件事情是构造AOP通知(拦截器),这里new了一个SeataAutoDataSourceProxyAdvice
SeataAutoDataSourceProxyAdvice是一个MethodInterceptor。
MethodInterceptor是aop中的一个接口,当目标方法被调用时就会调用与之关联的MethodInterceptor的invoke方法
至此,在构造方法中完成了advisors的赋值,advisors[]中有一个DefaultIntroductionAdvisor,DefaultIntroductionAdvisor中引用了SeataAutoDataSourceProxyAdvice
前面说过,SeataAutoDataSourceProxyCreator是一个BeanPostProcessor,而BeanPostProcessor是BeanFactory中的一个钩子(回调),称之为后置处理器
AbstractAutoProxyCreator#postProcessBeforeInstantiation()
AbstractAutoProxyCreator#postProcessAfterInitialization()
AbstractAutoProxyCreator#wrapIfNecessary()
AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()
SeataAutoDataSourceProxyCreator#getAdvicesAndAdvisorsForBean()
SeataAutoDataSourceProxyCreator#wrapIfNecessary()
DataSourceProxyHolder维护了数据源对象与数据源代理对象的映射
至此,数据源代理部分就看完了,下面总结一下:
- 启动的时候自动配置数据源代理,创建了一个SeataAutoDataSourceProxyCreator
- SeataAutoDataSourceProxyCreator在构造方法中创建AOP通知,并赋值给其属性
- AbstractAutoProxyCreator是一个抽象类不能被实例化,能实例化的只有SeataAutoDataSourceProxyCreator
- SeataAutoDataSourceProxyCreator从AbstractAutoProxyCreator那里继承了很多属性和方法,其中就包括postProcessBeforeInstantiation()、postProcessAfterInitialization()、createProxy()等等
- SeataAutoDataSourceProxyCreator间接实现了BeanPostProcessor接口,也就是说它也是BeanPostProcessor的一个实现类
- BeanFactory回调所有的BeanPostProcessor#postProcessAfterInitialization()时,就会调用SeataAutoDataSourceProxyCreator的postProcessAfterInitialization()方法,最终会调用wrapIfNecessary()方法
- wrapIfNecessary()只关心DataSource对象,它负责为DataSource对象生成代理对象,并且在SeataAutoDataSourceProxyCreator中维护了DataSource对象与SeataDataSourceProxy对象之间的映射关心
- 创建代理对象时,会给DataSource对象应用AOP拦截器。用AOP的话来讲,就是给目标对象DataSource织入通知,并创建一个被增强的代理对象
- 通知(拦截器)是SeataAutoDataSourceProxyAdvice,它实现了MethodInterceptor接口
- SeataAutoDataSourceProxyAdvice#invoke()方法所做的事情就是,拿到原始DataSource的代理对象,并且在代理对象上调用目标方法
综上所述,以上做的所有工作都是为了将来调用 javax.sql.DataSource 上的任意方法时都会被拦截,然后调用其代理对象上对应的方法。而DataSource中最重要的一个方法就是getConnection()
划重点:将来,所有调用 javax.sql.DataSource#getConnection() 都会被拦截,然后在代理对象上执行getConnection(),因此可以这样说
调 javax.sql.DataSource#getConnection() 实际上执行的是 io.seata.rm.datasource.SeataDataSourceProxy#getConnection(),AT模式下返回的是DataSourceProxy
SeataAutoConfiguration
SeataAutoConfiguration里面主要是配置GlobalTransactionScanner(全局事务扫描器)
seata.enabled=true 才会开启 SeataAutoConfiguration
GlobalTransactionScanner 也继承自 AbstractAutoProxyCreator,同时还实现了InitializingBean接口。BeanFactory在设置了所有bean属性之后会调用InitializingBean的afterPropertiesSet()方法
GlobalTransactionScanner#afterPropertiesSet()
io.seata.common.DefaultValues中定义了很多默认值
同样地,因为实现了BeanPostProcessor接口,所以在启动时BeanFactory实例化Bean之后,会调用GlobalTransactionScanner的postProcessAfterInitialization(),尽管这个postProcessAfterInitialization()方法时从AbstractAutoProxyCreator那里继承来的,但是不影响啊,还是会调用GlobalTransactionScanner这个bean的postProcessAfterInitialization()方法。于是,最终又会调wrapIfNecessary()方法。
GlobalTransactionScanner#wrapIfNecessary()
这里面有一个很重要的逻辑就是,创建了一个GlobalTransactionalInterceptor对象,并赋值给interceptor
AbstractAutoProxyCreator#getAdvicesAndAdvisorsForBean()是一个抽象方法,实现在子类GlobalTransactionScanner中
因此,所有在GlobalTransactionScanner#wrapIfNecessary()中被代理的对象,都被应用GlobalTransactionalInterceptor
GlobalTransactionalInterceptor也是一个MethodInterceptor
也就是说,目标方法的调用都会转到GlobalTransactionalInterceptor#invoke()上
GlobalTransactionalInterceptor#handleGlobalTransaction()
事务执行直接调用TransactionalTemplate的execute()方法
io.seata.tm.api.TransactionalTemplate#execute()
io.seata.tm.api.GlobalTransactionContext#getCurrent() 获取当前事务
io.seata.tm.api.TransactionalTemplate#beginTransaction()
tx是DefaultGlobalTransaction
io.seata.tm.api.DefaultGlobalTransaction#begin()
DefaultGlobalTransaction中的TransactionManager是DefaultTransactionManager
DefaultTransactionManager中提供了事务相关的底层操作
io.seata.tm.api.DefaultGlobalTransaction#commit()
io.seata.tm.api.DefaultGlobalTransaction#rollback()的逻辑与commit()类似,都是重试调用transactionManager.rollback(xid)
全局事务扫描器部分的代码就看到这里,下面总结一下:
- 配置项seata.enabled=true 会触发 SeataAutoConfiguration 自动配置
- SeataAutoConfiguration中创建了一个GlobalTransactionScanner
- GlobalTransactionScanner继承了AbstractAutoProxyCreator,并实现InitializingBean接口
- 初始化TM、RM
- 由于继承了AbstractAutoProxyCreator,所以BeanFactory会调用GlobalTransactionScanner#方法postProcessAfterInitialization(),最终会调用GlobalTransactionScanner#wrapIfNecessary()来为目标对象创建代理对象
- GlobalTransactionScanner#wrapIfNecessary()中创建了一个GlobalTransactionalInterceptor,GlobalTransactionalInterceptor是一个MethodInterceptor
- 在创建代理对象的时候,在AbstractAutoProxyCreator#wrapIfNecessary()方法中,为代理对象应用GlobalTransactionalInterceptor,于是所有目标对象上的方法调用就会转为调用GlobalTransactionalInterceptor#invoke()
- GlobalTransactionalInterceptor#invoke()方法中,首先获取被调用的目标对象的Class和Method对象,然后检查目标方法或类上是否有@GlobalTransactional或@GlobalLock注解,而且配置项中不能禁用全局事务
- 如果加了@GlobalTransactional注解,则创建一个AspectTransactional,然后开始处理全局事务,默认传播特性是REQUIRED
- 如果加了@GlobalLock注解,则开始处理全局锁
- 处理全局事务就是直接调用事务模板中的execute方法,TransactionalTemplate#execute()是一个模板方法,其中定义了事务处理的流程。首先开启事务,然后执行业务逻辑,最后提交事务,异常回滚事务。
- 事务操作是在DefaultGlobalTransaction中处理的,最终处理在DefaultTransactionManager。DefaultTransactionManager负责同步远程调用,向TC发请求来开启、提交、回滚事务等操作
数据库操作执行SQL语句
RM执行本地事务
通过Java自带的JDBC操作数据库通常是这样的:
MyBatis底层也是这一套
接下来看Seata是如何做的
首先是获取数据库连接Connection,前面已经说过了,调用DataSource的getConnection()方法底层是在代理对象SeataDataSourceProxy上调用getConnection()。SeataDataSourceProxy是接口,如果是AT模式,则这个数据源代理对象是DataSourceProxy
DataSourceProxy#getConnection()获取数据库连接
ConnectionProxy#createStatement()
ConnectionProxy#prepareStatement()
PreparedStatementProxy 继承自 StatementProxy,因此下面就直接看PreparedStatementProxy如何执行SQL
PreparedStatementProxy#execute()
ExecuteTemplate#execute() 是一个模板方法
挑一个看看吧,就挑UpdateExecutor
UpdateExecutor构造方法中一直调父类的构造法,既然如此,那么直接看BaseTransactionalExecutor
UpdateExecutor#execute()
这个方法是从BaseTransactionalExecutor那里继承来的,又是一个模板方法,可见设计模式是多么重要
AbstractDMLBaseExecutor#doExecute()
AbstractDMLBaseExecutor#executeAutoCommitTrue()
ConnectionProxy#changeAutoCommit()
现在事务自动提交已经被Seata改成false了
UpdateExecutor#beforeImage()
BaseTransactionalExecutor#prepareUndoLog()
接下来,提交事务
ConnectionProxy#processGlobalTransactionCommit() 处理全局事务提交
分支事务提交以后,业务数据更改和undo_log就都提交了
回想一下,为什么在执行业务修改前要先将默认的自动提交改成手动提交,最后再改成自动提交呢?
因为,要将业务数据修改和插入undo_log放在同一个事务里,一起提交
这一切都归功于代理
回顾一下整个调用链
结合之前的案例,AT模式TC、TM、RM三者的交互应该是这样的:
问题一:为什么在执行的时候,先将数据库自动提交autoCommit设为false,最后再改成true呢?
答:因为,需要将undo_log和业务数据修改放到同一个事务中,这样可以保证业务数据修改成功后undo_log必然插入成功,所以Seata要将其改为手动提交。最后再改成true是因为默认autoCommit就是true,这样可以不影响其它业务。
问题二:什么情况下ConnectionContext中xid=null,且isGlobalLockRequire=true呢?或者换一种问法,什么情况下不在全局事务中,当仍然需要全局锁呢?
答:当业务方法上不加@GlobalTransactional,而是只加了@GlobalLock注解的情况下,就会出现上述情况,也就会执行 ConnectionProxy#processLocalCommitWithGlobalLocks()方法,在事务提交前检查全局锁,这样设计的目的是在AT模式下,不出现脏读、脏写。由于数据源被代理了,当一个加了@GlobalTransactional的全局事务,与另一个加了@GlobalTransactional或@GlobalLock注解的事务在本地事务提交前就会检查全局锁,要先获得全局锁才能提交本地事务,这样就避免了脏读脏写,从而相当于实现了全局事务的读已提交隔离级别。参见:https://seata.io/zh-cn/blog/seata-at-lock.html
关于Seata 1.5.2 Client端的源码学习就先到这里,欢迎交流~
如果你都已经看到了这里,不妨给我点个赞吧?